Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Move the FIFO Queue implementation into its own file (fifo_q.h). Work

on the nif_unload path.  Free up resources owned by wterl.c when
unloading.  Continue to evolve the build script.  Add to khash the ability
to create a hash that maps from a pointer to a value. There is still a segv
due to a race wterl.c:do_unload() which needs to be addressed.
  • Loading branch information...
commit 60dd048b7e425d53a7383e067a806ff60d787914 1 parent db953f5
Gregory Burd authored
View
3  .gdbinit
@@ -1 +1,4 @@
handle SIGPIPE nostop noprint pass
+#b erl_nif.c:1203
+#b sys/unix/erl_unix_sys_ddll.c:234
+
View
83 c_src/async_nif.h
@@ -27,6 +27,7 @@ extern "C" {
#endif
#include <assert.h>
+#include "fifo_q.h"
#ifdef ASYNC_NIF_STATS
#include "stats.h" // TODO: measure, measure... measure again
#endif
@@ -34,73 +35,6 @@ extern "C" {
#define ASYNC_NIF_MAX_WORKERS 128
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
-#define FIFO_QUEUE_TYPE(name) \
- struct fifo_q__ ## name *
-#define DECL_FIFO_QUEUE(name, type) \
- struct fifo_q__ ## name { \
- unsigned int h, t, s; \
- type *items[]; \
- }; \
- static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \
- int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\
- struct fifo_q__ ## name *q = enif_alloc(sz); \
- if (!q) \
- return 0; \
- memset(q, 0, sz); \
- q->s = n + 1; \
- return q; \
- } \
- static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \
- memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \
- enif_free(q); \
- } \
- static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \
- q->items[q->h] = n; \
- q->h = (q->h + 1) % q->s; \
- return n; \
- } \
- static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \
- type *n = q->items[q->t]; \
- q->items[q->t] = 0; \
- q->t = (q->t + 1) % q->s; \
- return n; \
- } \
- static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \
- return (q->h - q->t + q->s) % q->s; \
- } \
- static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \
- return q->s - 1; \
- } \
- static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \
- return (q->t == q->h); \
- } \
- static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \
- return ((q->h + 1) % q->s) == q->t; \
- }
-
-#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size)
-#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue)
-#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue)
-#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item)
-#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue)
-#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue)
-#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue)
-#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue)
-#define fifo_q_foreach(name, queue, item, task) do { \
- while((item = fifo_q_ ## name ## _get(queue)) != NULL) { \
- do task while(0); \
- } \
- } while(0);
-
-struct async_nif_req_entry {
- ERL_NIF_TERM ref;
- ErlNifEnv *env;
- ErlNifPid pid;
- void *args;
- void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
- void (*fn_post)(void *);
-};
-
DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry);
struct async_nif_work_queue {
@@ -141,7 +75,8 @@ struct async_nif_state {
/* argv[0] is a ref used for selective recv */ \
const ERL_NIF_TERM *argv = argv_in + 1; \
argc -= 1; \
- struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); \
+ /* Note: !!! this assumes that the first element of priv_data is ours */ \
+ struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (async_nif->shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
@@ -198,11 +133,11 @@ struct async_nif_state {
priv = async_nif_load(); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
-#define ASYNC_NIF_UNLOAD(name, env) do { \
+#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \
enif_mutex_lock(name##_async_nif_coord); \
- async_nif_unload(env); \
+ async_nif_unload(env, priv); \
enif_mutex_unlock(name##_async_nif_coord); \
enif_mutex_destroy(name##_async_nif_coord); \
name##_async_nif_coord = NULL; \
@@ -326,10 +261,9 @@ async_nif_worker_fn(void *arg)
}
static void
-async_nif_unload(ErlNifEnv *env)
+async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
{
unsigned int i;
- struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env);
unsigned int num_queues = async_nif->num_queues;
struct async_nif_work_queue *q = NULL;
@@ -359,7 +293,6 @@ async_nif_unload(ErlNifEnv *env)
/* Cleanup requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
- enif_mutex_lock(q->reqs_mutex); // TODO: unnecessary?
/* Worker threads are stopped, now toss anything left in the queue. */
struct async_nif_req_entry *req = NULL;
@@ -372,8 +305,6 @@ async_nif_unload(ErlNifEnv *env)
enif_free(req);
});
fifo_q_free(reqs, q->reqs);
-
- enif_mutex_unlock(q->reqs_mutex); // TODO: unnecessary?
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
@@ -382,7 +313,7 @@ async_nif_unload(ErlNifEnv *env)
}
static void *
-async_nif_load(void)
+async_nif_load()
{
static int has_init = 0;
unsigned int i, j, num_queues;
View
10 c_src/build_deps.sh
@@ -158,17 +158,17 @@ case "$1" in
# Build Snappy
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
- test -f system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
+ test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
# Build BZIP2
[ -d $BASEDIR/$BZIP2_DIR ] || (echo "Missing BZip2 source directory" && exit 1)
- test -f system/lib/libbz2.so.[0-9].[0-9].[0-9] || build_bzip2;
+ test -f $BASEDIR/system/lib/libbz2.so.[0-9].[0-9].[0-9] || build_bzip2;
# Build WiredTiger
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
- test -f system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
- -a -f system/lib/libwiredtiger_snappy.so \
- -a -f system/lib/libwiredtiger_bzip2.so.[0-9].[0-9].[0-9] || build_wt;
+ test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
+ -a -f $BASEDIR/system/lib/libwiredtiger_snappy.so \
+ -a -f $BASEDIR/system/lib/libwiredtiger_bzip2.so.[0-9].[0-9].[0-9] || build_wt;
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
cp $BASEDIR/system/bin/wt ${BASEDIR}/../priv
View
102 c_src/fifo_q.h
@@ -0,0 +1,102 @@
+/*
+ * fifo_q: a macro-based implementation of a FIFO Queue
+ *
+ * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
+ * Author: Gregory Burd <greg@basho.com> <greg@burd.me>
+ *
+ * This file is provided to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef __FIFO_Q_H__
+#define __FIFO_Q_H__
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+#define FIFO_QUEUE_TYPE(name) \
+ struct fifo_q__ ## name *
+#define DECL_FIFO_QUEUE(name, type) \
+ struct fifo_q__ ## name { \
+ unsigned int h, t, s; \
+ type *items[]; \
+ }; \
+ static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \
+ int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\
+ struct fifo_q__ ## name *q = enif_alloc(sz); \
+ if (!q) \
+ return 0; \
+ memset(q, 0, sz); \
+ q->s = n + 1; \
+ return q; \
+ } \
+ static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \
+ memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \
+ enif_free(q); \
+ } \
+ static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \
+ q->items[q->h] = n; \
+ q->h = (q->h + 1) % q->s; \
+ return n; \
+ } \
+ static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \
+ type *n = q->items[q->t]; \
+ q->items[q->t] = 0; \
+ q->t = (q->t + 1) % q->s; \
+ return n; \
+ } \
+ static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \
+ return (q->h - q->t + q->s) % q->s; \
+ } \
+ static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \
+ return q->s - 1; \
+ } \
+ static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \
+ return (q->t == q->h); \
+ } \
+ static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \
+ return ((q->h + 1) % q->s) == q->t; \
+ }
+
+#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size)
+#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue)
+#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue)
+#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item)
+#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue)
+#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue)
+#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue)
+#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue)
+#define fifo_q_foreach(name, queue, item, task) do { \
+ while(!fifo_q_ ## name ## _empty(queue)) { \
+ item = fifo_q_ ## name ## _get(queue); \
+ do task while(0); \
+ } \
+ } while(0);
+
+struct async_nif_req_entry {
+ ERL_NIF_TERM ref;
+ ErlNifEnv *env;
+ ErlNifPid pid;
+ void *args;
+ void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
+ void (*fn_post)(void *);
+};
+
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif // __FIFO_Q_H__
View
33 c_src/khash.h
@@ -372,6 +372,26 @@ static const double __ac_HASH_UPPER = 0.77;
*/
#define kh_int64_hash_equal(a, b) ((a) == (b))
/*! @function
+ @abstract Pointer hash function
+ @param key The integer void *
+ @return The hash value [khint_t]
+*/
+#define kh_ptr_hash_func(key) (khint32_t)(key)
+/*! @function
+ @abstract Pointer comparison function
+*/
+#define kh_ptr_hash_equal(a, b) ((a) == (b))
+/*! @function
+ @abstract 64-bit pointer hash function
+ @param key The integer void *
+ @return The hash value [khint_t]
+*/
+#define kh_ptr64_hash_func(key) (khint32_t)(((khint64_t)key)>>33^((khint64_t)key)^((khint64_t)key)<<11)
+/*! @function
+ @abstract 64-bit pointer comparison function
+*/
+#define kh_ptr64_hash_equal(a, b) ((a) == (b))
+/*! @function
@abstract const char* hash function
@param s Pointer to a null terminated string
@return The hash value
@@ -562,6 +582,19 @@ static kh_inline khint_t __ac_Wang_hash(khint_t key)
/* More conenient interfaces */
/*! @function
+ @abstract Instantiate a hash map containing (void *) keys
+ @param name Name of the hash table [symbol]
+ @param khval_t Type of values [type]
+*/
+#ifdef __x86_64__
+#define KHASH_MAP_INIT_PTR(name, khval_t) \
+ KHASH_INIT(name, void*, khval_t, 1, kh_ptr64_hash_func, kh_ptr64_hash_equal)
+#else
+#define KHASH_MAP_INIT_PTR(name, khval_t) \
+ KHASH_INIT(name, void*, khval_t, 1, kh_ptr_hash_func, kh_ptr_hash_equal)
+#endif
+
+/*! @function
@abstract Instantiate a hash set containing integer keys
@param name Name of the hash table [symbol]
*/
View
112 c_src/wterl.c
@@ -46,6 +46,7 @@
static ErlNifResourceType *wterl_conn_RESOURCE;
static ErlNifResourceType *wterl_cursor_RESOURCE;
+/* Generators for 'cursors' a named, type-specific hash table functions. */
KHASH_MAP_INIT_STR(cursors, WT_CURSOR*);
/**
@@ -90,8 +91,19 @@ static ERL_NIF_TERM ATOM_NOT_FOUND;
static ERL_NIF_TERM ATOM_FIRST;
static ERL_NIF_TERM ATOM_LAST;
+/* Generators for 'conns' a named, type-specific hash table functions. */
+KHASH_MAP_INIT_PTR(conns, WterlConnHandle*);
+
+struct wterl_priv_data {
+ void *async_nif_priv; // Note: must be first element in struct
+ ErlNifMutex *conns_mutex;
+ khash_t(conns) *conns;
+};
+
+/* Global init for async_nif. */
ASYNC_NIF_INIT(wterl);
+
/**
* Get the per-worker reusable WT_SESSION for a worker_id.
*/
@@ -135,10 +147,9 @@ __close_all_sessions(WterlConnHandle *conn_handle)
for (itr = kh_begin(h); itr != kh_end(h); ++itr) {
if (kh_exist(h, itr)) {
WT_CURSOR *cursor = kh_val(h, itr);
- char *key = (char *)kh_key(h, itr);
cursor->close(cursor);
kh_del(cursors, h, itr);
- enif_free(key);
+ kh_value(h, itr) = NULL;
}
}
kh_destroy(cursors, h);
@@ -165,10 +176,9 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
khiter_t itr = kh_get(cursors, h, (char *)uri);
if (itr != kh_end(h)) {
WT_CURSOR *cursor = kh_value(h, itr);
- char *key = (char *)kh_key(h, itr);
cursor->close(cursor);
kh_del(cursors, h, itr);
- enif_free(key);
+ kh_value(h, itr) = NULL;
}
}
}
@@ -251,6 +261,7 @@ ASYNC_NIF_DECL(
ERL_NIF_TERM config;
ERL_NIF_TERM session_config;
char homedir[4096];
+ struct wterl_priv_data *priv;
},
{ // pre
@@ -262,6 +273,8 @@ ASYNC_NIF_DECL(
}
args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]);
args->session_config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
+
+ args->priv = (struct wterl_priv_data *)enif_priv_data(env);
},
{ // work
@@ -297,13 +310,25 @@ ASYNC_NIF_DECL(
} else {
conn_handle->session_config = NULL;
}
+ conn_handle->contexts_mutex = enif_mutex_create(NULL);
+ enif_mutex_lock(conn_handle->contexts_mutex);
conn_handle->conn = conn;
conn_handle->num_contexts = 0;
memset(conn_handle->contexts, 0, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS);
- conn_handle->contexts_mutex = enif_mutex_create(NULL);
ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
+
+ khash_t(conns) *h;
+ enif_mutex_lock(args->priv->conns_mutex);
+ h = args->priv->conns;
+ int itr_status = 0;
+ khiter_t itr = kh_put(conns, h, conn, &itr_status);
+ if (itr_status != 0) // 0 indicates the key exists already
+ kh_value(h, itr) = conn_handle;
+ enif_mutex_unlock(args->priv->conns_mutex);
+
enif_release_resource(conn_handle);
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result));
+ enif_mutex_unlock(conn_handle->contexts_mutex);
}
else
{
@@ -324,6 +349,7 @@ ASYNC_NIF_DECL(
{ // struct
WterlConnHandle* conn_handle;
+ struct wterl_priv_data *priv;
},
{ // pre
@@ -332,6 +358,8 @@ ASYNC_NIF_DECL(
ASYNC_NIF_RETURN_BADARG();
}
enif_keep_resource((void*)args->conn_handle);
+
+ args->priv = (struct wterl_priv_data *)enif_priv_data(env);
},
{ // work
@@ -344,6 +372,19 @@ ASYNC_NIF_DECL(
}
WT_CONNECTION* conn = args->conn_handle->conn;
int rc = conn->close(conn, NULL);
+
+ khash_t(conns) *h;
+ enif_mutex_lock(args->priv->conns_mutex);
+ h = args->priv->conns;
+ khiter_t itr;
+ itr = kh_get(conns, h, conn);
+ if (itr == 0) {
+ /* key exists in table (as expected) delete it */
+ kh_del(conns, h, itr);
+ kh_value(h, itr) = NULL;
+ }
+ enif_mutex_unlock(args->priv->conns_mutex);
+
enif_mutex_unlock(args->conn_handle->contexts_mutex);
enif_mutex_destroy(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
@@ -1836,27 +1877,78 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
ATOM_FIRST = enif_make_atom(env, "first");
ATOM_LAST = enif_make_atom(env, "last");
- ASYNC_NIF_LOAD(wterl, *priv_data);
+ struct wterl_priv_data *priv = enif_alloc(sizeof(struct wterl_priv_data));
+ if (!priv)
+ return ENOMEM;
+ memset(priv, 0, sizeof(struct wterl_priv_data));
- return *priv_data ? 0 : -1;
+ /* Note: !!! the first element of our priv_data struct *must* be the
+ pointer to the async_nif's private data which we set here. */
+ ASYNC_NIF_LOAD(wterl, priv->async_nif_priv);
+
+ priv->conns_mutex = enif_mutex_create(NULL);
+ priv->conns = kh_init(conns);
+ *priv_data = priv;
+ return *priv_data ? 0 : ENOMEM;
}
static int
on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
{
- return 0; // TODO: Determine what should be done here.
+ return 0; // TODO: implement
}
static void
on_unload(ErlNifEnv *env, void *priv_data)
{
- ASYNC_NIF_UNLOAD(wterl, env); // TODO: Review/test this.
+ unsigned int i;
+ struct wterl_priv_data *priv = (struct wterl_priv_data *)priv_data;
+ khash_t(conns) *h;
+ khiter_t itr;
+
+ enif_mutex_lock(priv->conns_mutex);
+ h = priv->conns;
+
+ for (itr = kh_begin(h); itr != kh_end(h); ++itr) {
+ if (kh_exist(h, itr)) {
+ WterlConnHandle *c = kh_val(h, itr);
+ if (c) {
+ enif_mutex_lock(c->contexts_mutex);
+ enif_free((void*)c->session_config);
+ for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) {
+ kh_destroy(cursors, c->contexts[i].cursors);
+ }
+ }
+
+ /* This should close all cursors and sessions. */
+ c->conn->close(c->conn, NULL);
+ }
+ }
+
+ /* Continue to hold the context mutex while unloading the async_nif
+ to prevent new work from coming in while shutting down. */
+ ASYNC_NIF_UNLOAD(wterl, env, priv->async_nif_priv);
+
+ for (itr = kh_begin(h); itr != kh_end(h); ++itr) {
+ if (kh_exist(h, itr)) {
+ WterlConnHandle *c = kh_val(h, itr);
+ if (c) {
+ enif_mutex_unlock(c->contexts_mutex);
+ enif_mutex_destroy(c->contexts_mutex);
+ }
+ }
+ }
+
+ kh_destroy(conns, h);
+ enif_mutex_unlock(priv->conns_mutex);
+ enif_mutex_destroy(priv->conns_mutex);
+ enif_free(priv);
}
static int
on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info)
{
- ASYNC_NIF_UPGRADE(wterl, env); // TODO: Review/test this.
+ ASYNC_NIF_UPGRADE(wterl, env); // TODO: implement
return 0;
}
View
4 src/riak_kv_wterl_backend.erl
@@ -151,8 +151,8 @@ start(Partition, Config) ->
%% @doc Stop the wterl backend
-spec stop(state()) -> ok.
-stop(_State) ->
- ok.
+stop(#state{connection=Connection}) ->
+ wterl_conn:close(Connection).
%% @doc Retrieve an object from the wterl backend
-spec get(riak_object:bucket(), riak_object:key(), state()) ->

0 comments on commit 60dd048

Please sign in to comment.
Something went wrong with that request. Please try again.