Permalink
Browse files

Merge branch 'gh41-quviq-pulse-and-qc-slf-bugfixes'

  • Loading branch information...
slfritchie committed Nov 16, 2012
2 parents 859013d + 446f758 commit 616e53336897de35b86f064502760e7f954dd311
View
@@ -1,6 +1,6 @@
.eunit/*
deps/*
ebin
-priv/*
+priv/*.so
*.o
*.beam
View
@@ -33,6 +33,7 @@
#include "khash.h"
#include "murmurhash.h"
+#include <stdio.h>
#ifdef BITCASK_DEBUG
#include <stdio.h>
#include <stdarg.h>
@@ -47,7 +48,9 @@ void DEBUG(const char *fmt, ...)
# define DEBUG(X, ...) {}
#endif
-
+#ifdef PULSE
+#include "pulse_c_send.h"
+#endif
static ErlNifResourceType* bitcask_keydir_RESOURCE;
@@ -66,6 +69,7 @@ typedef struct
uint32_t total_sz;
uint64_t offset;
uint32_t tstamp;
+ uint32_t newest_put;
uint16_t key_sz;
char key[0];
} bitcask_keydir_entry;
@@ -84,6 +88,7 @@ typedef struct
uint64_t total_keys; // total number of keys written to file
uint64_t total_bytes; // total number of bytes written to file
uint32_t oldest_tstamp; // oldest observed tstamp in a file
+ uint32_t newest_tstamp; // newest observed tstamp in a file
} bitcask_fstats_entry;
KHASH_MAP_INIT_INT(fstats, bitcask_fstats_entry*);
@@ -98,8 +103,10 @@ typedef struct
fstats_hash_t* fstats;
size_t key_count;
size_t key_bytes;
+ uint32_t biggest_file_id;
unsigned int refcount;
unsigned int keyfolders;
+ uint64_t iter_generation;
uint64_t pending_updated;
uint64_t pending_start; // os:timestamp() as 64-bit integer
ErlNifPid* pending_awaken; // processes to wake once pending merged into entries
@@ -221,10 +228,13 @@ static void bitcask_nifs_file_resource_cleanup(ErlNifEnv* env, void* arg);
static ErlNifFunc nif_funcs[] =
{
+#ifdef PULSE
+ {"set_pulse_pid", 1, set_pulse_pid},
+#endif
{"keydir_new", 0, bitcask_nifs_keydir_new0},
{"keydir_new", 1, bitcask_nifs_keydir_new1},
{"keydir_mark_ready", 1, bitcask_nifs_keydir_mark_ready},
- {"keydir_put_int", 6, bitcask_nifs_keydir_put_int},
+ {"keydir_put_int", 9, bitcask_nifs_keydir_put_int},
{"keydir_get_int", 2, bitcask_nifs_keydir_get_int},
{"keydir_remove", 2, bitcask_nifs_keydir_remove},
{"keydir_remove_int", 5, bitcask_nifs_keydir_remove},
@@ -235,19 +245,19 @@ static ErlNifFunc nif_funcs[] =
{"keydir_info", 1, bitcask_nifs_keydir_info},
{"keydir_release", 1, bitcask_nifs_keydir_release},
- {"lock_acquire", 2, bitcask_nifs_lock_acquire},
- {"lock_release", 1, bitcask_nifs_lock_release},
- {"lock_readdata", 1, bitcask_nifs_lock_readdata},
- {"lock_writedata", 2, bitcask_nifs_lock_writedata},
-
- {"file_open", 2, bitcask_nifs_file_open},
- {"file_close", 1, bitcask_nifs_file_close},
- {"file_sync", 1, bitcask_nifs_file_sync},
- {"file_pread", 3, bitcask_nifs_file_pread},
- {"file_pwrite", 3, bitcask_nifs_file_pwrite},
- {"file_read", 2, bitcask_nifs_file_read},
- {"file_write", 2, bitcask_nifs_file_write},
- {"file_seekbof", 1, bitcask_nifs_file_seekbof}
+ {"lock_acquire_int", 2, bitcask_nifs_lock_acquire},
+ {"lock_release_int", 1, bitcask_nifs_lock_release},
+ {"lock_readdata_int", 1, bitcask_nifs_lock_readdata},
+ {"lock_writedata_int", 2, bitcask_nifs_lock_writedata},
+
+ {"file_open_int", 2, bitcask_nifs_file_open},
+ {"file_close_int", 1, bitcask_nifs_file_close},
+ {"file_sync_int", 1, bitcask_nifs_file_sync},
+ {"file_pread_int", 3, bitcask_nifs_file_pread},
+ {"file_pwrite_int", 3, bitcask_nifs_file_pwrite},
+ {"file_read_int", 2, bitcask_nifs_file_read},
+ {"file_write_int", 2, bitcask_nifs_file_write},
+ {"file_seekbof_int", 1, bitcask_nifs_file_seekbof}
};
ERL_NIF_TERM bitcask_nifs_keydir_new0(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
@@ -394,6 +404,11 @@ static void update_fstats(ErlNifEnv* env, bitcask_keydir* keydir,
{
entry->oldest_tstamp = tstamp;
}
+ if ((tstamp != 0 && tstamp > entry->newest_tstamp) ||
+ entry->newest_tstamp == 0)
+ {
+ entry->newest_tstamp = tstamp;
+ }
}
static khint_t keydir_entry_hash(bitcask_keydir_entry* entry)
@@ -537,13 +552,18 @@ ERL_NIF_TERM bitcask_nifs_keydir_put_int(ErlNifEnv* env, int argc, const ERL_NIF
bitcask_keydir_handle* handle;
bitcask_keydir_entry entry;
ErlNifBinary key;
+ uint32_t old_file_id;
+ uint64_t old_offset;
if (enif_get_resource(env, argv[0], bitcask_keydir_RESOURCE, (void**)&handle) &&
enif_inspect_binary(env, argv[1], &key) &&
enif_get_uint(env, argv[2], (unsigned int*)&(entry.file_id)) &&
enif_get_uint(env, argv[3], &(entry.total_sz)) &&
enif_get_uint64_bin(env, argv[4], &(entry.offset)) &&
- enif_get_uint(env, argv[5], &(entry.tstamp)))
+ enif_get_uint(env, argv[5], &(entry.tstamp)) &&
+ enif_get_uint(env, argv[6], &(entry.newest_put)) &&
+ enif_get_uint(env, argv[7], (unsigned int*)&(old_file_id)) &&
+ enif_get_uint64_bin(env, argv[8], &(old_offset)))
{
khiter_t itr;
entries_hash_t* hash;
@@ -555,7 +575,6 @@ ERL_NIF_TERM bitcask_nifs_keydir_put_int(ErlNifEnv* env, int argc, const ERL_NIF
(int) entry.file_id, (int) entry.offset,
(int)entry.total_sz);
-
// Check for put on a new key or updating a pending tombstone
int tombstone = 0;
int found = find_keydir_entry(env, keydir, &key, &hash, &itr, &old_entry, 0);
@@ -565,6 +584,12 @@ ERL_NIF_TERM bitcask_nifs_keydir_put_int(ErlNifEnv* env, int argc, const ERL_NIF
tombstone = 1;
}
+ if (!found && old_file_id != 0)
+ {
+ UNLOCK(keydir);
+ return ATOM_ALREADY_EXISTS;
+ }
+
if (!found)
{
keydir->key_count++;
@@ -590,21 +615,35 @@ ERL_NIF_TERM bitcask_nifs_keydir_put_int(ErlNifEnv* env, int argc, const ERL_NIF
{
keydir->pending_updated++;
}
+ if (entry.file_id > keydir->biggest_file_id)
+ {
+ keydir->biggest_file_id = entry.file_id;
+ }
UNLOCK(keydir);
return ATOM_OK;
}
+ // If old_file_id is > 0, then test-and-set fails,
+ // then return already_exists.
+ if (old_file_id != 0 &&
+ !(old_file_id == old_entry->file_id &&
+ old_offset == old_entry->offset))
+ {
+ UNLOCK(keydir);
+ return ATOM_ALREADY_EXISTS;
+ }
+
// Now that we've marshalled everything, see if the tstamp for this key is >=
// to what's already in the hash. Otherwise, we don't bother with the update.
- if ((old_entry->tstamp < entry.tstamp) ||
-
- ((old_entry->tstamp == entry.tstamp) &&
- (old_entry->file_id < entry.file_id)) ||
-
- ((old_entry->tstamp == entry.tstamp) &&
- ((old_entry->file_id == entry.file_id) &&
- (old_entry->offset < entry.offset))))
+ if ((entry.newest_put &&
+ (entry.file_id >= keydir->biggest_file_id)) ||
+ (! entry.newest_put &&
+ (old_entry->tstamp < entry.tstamp)) ||
+ (! entry.newest_put &&
+ ((old_entry->file_id < entry.file_id) ||
+ (((old_entry->file_id == entry.file_id) &&
+ (old_entry->offset < entry.offset))))))
{
// Remove the stats for the old entry and add the new
if (old_entry->file_id != entry.file_id) // different files
@@ -641,6 +680,10 @@ ERL_NIF_TERM bitcask_nifs_keydir_put_int(ErlNifEnv* env, int argc, const ERL_NIF
{
keydir->pending_updated++;
}
+ if (entry.file_id > keydir->biggest_file_id)
+ {
+ keydir->biggest_file_id = entry.file_id;
+ }
UNLOCK(keydir);
return ATOM_OK;
@@ -962,7 +1005,14 @@ ERL_NIF_TERM bitcask_nifs_keydir_itr(ErlNifEnv* env, int argc, const ERL_NIF_TER
{ // Grow 16-at-a-time, expect a single alloc
keydir->pending_awaken_size += 16;
size_t size = keydir->pending_awaken_size * sizeof(keydir->pending_awaken[0]);
- keydir->pending_awaken = enif_realloc_compat(env, keydir->pending_awaken, size);
+ if (keydir->pending_awaken == NULL)
+ {
+ keydir->pending_awaken = enif_alloc_compat(env, size);
+ }
+ else
+ {
+ keydir->pending_awaken = enif_realloc_compat(env, keydir->pending_awaken, size);
+ }
}
enif_self(env, &keydir->pending_awaken[keydir->pending_awaken_count]);
keydir->pending_awaken_count++;
@@ -1054,6 +1104,7 @@ ERL_NIF_TERM bitcask_nifs_keydir_itr_release(ErlNifEnv* env, int argc, const ERL
if (handle->keydir->keyfolders == 0)
{
merge_pending_entries(env, handle->keydir);
+ handle->keydir->iter_generation++;
}
UNLOCK(handle->keydir);
return ATOM_OK;
@@ -1072,10 +1123,16 @@ ERL_NIF_TERM bitcask_nifs_keydir_info(ErlNifEnv* env, int argc, const ERL_NIF_TE
if (enif_get_resource(env, argv[0], bitcask_keydir_RESOURCE, (void**)&handle))
{
bitcask_keydir* keydir = handle->keydir;
+
+ if (keydir == NULL)
+ {
+ return enif_make_badarg(env);
+ }
LOCK(keydir);
// Dump fstats info into a list of [{file_id, live_keys, total_keys,
- // live_bytes, total_bytes, oldest_tstamp}]
+ // live_bytes, total_bytes,
+ // oldest_tstamp, newest_tstamp}]
ERL_NIF_TERM fstats_list = enif_make_list(env, 0);
khiter_t itr;
bitcask_fstats_entry* curr_f;
@@ -1085,21 +1142,28 @@ ERL_NIF_TERM bitcask_nifs_keydir_info(ErlNifEnv* env, int argc, const ERL_NIF_TE
{
curr_f = kh_val(keydir->fstats, itr);
ERL_NIF_TERM fstat =
- enif_make_tuple6(env,
+ enif_make_tuple7(env,
enif_make_uint(env, curr_f->file_id),
enif_make_ulong(env, curr_f->live_keys),
enif_make_ulong(env, curr_f->total_keys),
enif_make_ulong(env, curr_f->live_bytes),
enif_make_ulong(env, curr_f->total_bytes),
- enif_make_uint(env, curr_f->oldest_tstamp));
+ enif_make_uint(env, curr_f->oldest_tstamp),
+ enif_make_uint(env, curr_f->newest_tstamp));
fstats_list = enif_make_list_cell(env, fstat, fstats_list);
}
}
- ERL_NIF_TERM result = enif_make_tuple3(env,
+ ERL_NIF_TERM iter_info =
+ enif_make_tuple3(env,
+ enif_make_uint64_bin(env, keydir->iter_generation),
+ enif_make_ulong(env, keydir->keyfolders),
+ keydir->pending == NULL ? ATOM_FALSE : ATOM_TRUE);
+ ERL_NIF_TERM result = enif_make_tuple4(env,
enif_make_ulong(env, keydir->key_count),
enif_make_ulong(env, keydir->key_bytes),
- fstats_list);
+ fstats_list,
+ iter_info);
UNLOCK(keydir);
return result;
}
@@ -1582,7 +1646,11 @@ static void msg_pending_awaken(ErlNifEnv* env, bitcask_keydir* keydir,
for (idx = 0; idx < keydir->pending_awaken_count; idx++)
{
enif_clear_env(msg_env);
+#ifdef PULSE
+ PULSE_SEND(env, &keydir->pending_awaken[idx], msg_env, msg);
+#else
enif_send(env, &keydir->pending_awaken[idx], msg_env, msg);
+#endif
}
enif_free_env(msg_env);
}
@@ -1626,7 +1694,7 @@ static void merge_pending_entries(ErlNifEnv* env, bitcask_keydir* keydir)
else
{
bitcask_keydir_entry* entries_entry = kh_key(keydir->entries, ent_itr);
- DEBUG("Entries Entry: key=%s key_sz=%d file_id=%d statmp=%u offset=%u size=%d\r\n",
+ DEBUG("Entries Entry: key=%s key_sz=%d file_id=%d statmp=%u offset=%u size=%d\r\n",
entries_entry->key, entries_entry->key_sz,
entries_entry->file_id,
(unsigned int) entries_entry->tstamp,
@@ -1658,7 +1726,10 @@ static void merge_pending_entries(ErlNifEnv* env, bitcask_keydir* keydir)
keydir->pending_updated = 0;
keydir->pending_start = 0;
- enif_free_compat(env, keydir->pending_awaken);
+ if (keydir->pending_awaken != NULL)
+ {
+ enif_free_compat(env, keydir->pending_awaken);
+ }
keydir->pending_awaken = NULL;
keydir->pending_awaken_count = 0;
keydir->pending_awaken_size = 0;
@@ -1864,6 +1935,10 @@ static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
ATOM_READONLY = enif_make_atom(env, "readonly");
ATOM_O_SYNC = enif_make_atom(env, "o_sync");
+#ifdef PULSE
+ pulse_c_send_on_load(env);
+#endif
+
return 0;
}
View
@@ -0,0 +1,62 @@
+// ---------------------------------------------------------
+// - Small utility lib for sending messages from C through
+// - the PULSE scheduler.
+// -
+// - The caveat is that you can't send messages to a named/
+// - registered process from a NIF. Therefore a little bit
+// - of machinery is used to keep track of the Pid of PULSE
+// - and to enable message sends to PULSE via the pulse_send
+// - function.
+//
+// ----------------------------------------------------------
+#ifdef PULSE
+
+#include "erl_nif.h"
+#include "erl_nif_compat.h"
+#include "pulse_c_send.h"
+
+// The global place to store the pid of PULSE
+ErlNifPid* THE_PULSE_PID;
+
+// Send atom, initialized on_load
+static ERL_NIF_TERM ATOM_SEND;
+static ERL_NIF_TERM ATOM_OK;
+
+ERL_NIF_TERM set_pulse_pid(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]){
+ if(!THE_PULSE_PID){
+ THE_PULSE_PID = (ErlNifPid *)malloc(sizeof(ErlNifPid));
+ }
+
+ enif_get_local_pid(env, argv[0], THE_PULSE_PID);
+
+ return ATOM_OK;
+}
+
+int pulse_send(ErlNifEnv* env, ErlNifPid* dest_pid,
+ ErlNifEnv* msg_env, ERL_NIF_TERM msg,
+ char* file, int line){
+ ERL_NIF_TERM t_self =
+ enif_make_pid(msg_env, enif_self(msg_env, (ErlNifPid *)malloc(sizeof(ErlNifPid))));
+ ERL_NIF_TERM t_src_loc =
+ enif_make_tuple2(msg_env, enif_make_string(msg_env, file, ERL_NIF_LATIN1),
+ enif_make_int(msg_env, line));
+ ERL_NIF_TERM t_args = enif_make_list(msg_env, 2, enif_make_pid(msg_env, dest_pid), msg);
+ ERL_NIF_TERM pulse_msg = enif_make_tuple4(msg_env,
+ ATOM_SEND,
+ t_self,
+ t_src_loc,
+ t_args);
+
+ return enif_send(env, THE_PULSE_PID, msg_env, pulse_msg);
+}
+
+int pulse_c_send_on_load(ErlNifEnv* env){
+ THE_PULSE_PID = (ErlNifPid *)0L;
+
+ ATOM_SEND = enif_make_atom(env, "send");
+ ATOM_OK = enif_make_atom(env, "ok");
+
+ return 0;
+}
+
+#endif // ifdef PULSE
Oops, something went wrong.

0 comments on commit 616e533

Please sign in to comment.