Skip to content

Commit

Permalink
[ConcBlockingQueue] Unmoving body
Browse files Browse the repository at this point in the history
The locks structure was already malloc'd, and I extended this to include
the entire body. This prevents the GC from moving the body while we are
blocked on acquiring a lock, simplifying the code, and has negligible
cost.
  • Loading branch information
bdw committed Sep 4, 2018
1 parent 73156ba commit ac44769
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 83 deletions.
126 changes: 55 additions & 71 deletions src/6model/reprs/ConcBlockingQueue.c
Expand Up @@ -19,23 +19,24 @@ static MVMObject * type_object_for(MVMThreadContext *tc, MVMObject *HOW) {

/* Initializes a new instance. */
static void initialize(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data) {
MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;

MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue*)root;
MVMConcBlockingQueueBody *body = MVM_calloc(1, sizeof(MVMConcBlockingQueueBody));
/* Initialize locks. */
int init_stat;
cbq->locks = MVM_calloc(1, sizeof(MVMConcBlockingQueueLocks));
if ((init_stat = uv_mutex_init(&cbq->locks->head_lock)) < 0)

if ((init_stat = uv_mutex_init(&body->head_lock)) < 0)
MVM_exception_throw_adhoc(tc, "Failed to initialize mutex: %s",
uv_strerror(init_stat));
if ((init_stat = uv_mutex_init(&cbq->locks->tail_lock)) < 0)
if ((init_stat = uv_mutex_init(&body->tail_lock)) < 0)
MVM_exception_throw_adhoc(tc, "Failed to initialize mutex: %s",
uv_strerror(init_stat));
if ((init_stat = uv_cond_init(&cbq->locks->head_cond)) < 0)
if ((init_stat = uv_cond_init(&body->head_cond)) < 0)
MVM_exception_throw_adhoc(tc, "Failed to initialize condition variable: %s",
uv_strerror(init_stat));

/* Head and tail point to a null node. */
cbq->tail = cbq->head = MVM_calloc(1, sizeof(MVMConcBlockingQueueNode));
body->tail = body->head = MVM_calloc(1, sizeof(MVMConcBlockingQueueNode));
cbq->body = body;
}

/* Copies the body of one object to another. */
Expand All @@ -47,7 +48,7 @@ static void copy_to(MVMThreadContext *tc, MVMSTable *st, void *src, MVMObject *d
static void gc_mark(MVMThreadContext *tc, MVMSTable *st, void *data, MVMGCWorklist *worklist) {
/* At this point we know the world is stopped, and thus we can safely do a
* traversal of the data structure without needing locks. */
MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;
MVMConcBlockingQueueBody *cbq = *(MVMConcBlockingQueueBody **)data;
MVMConcBlockingQueueNode *cur = cbq->head;
while (cur) {
MVM_gc_worklist_add(tc, worklist, &cur->value);
Expand All @@ -60,20 +61,22 @@ static void gc_free(MVMThreadContext *tc, MVMObject *obj) {
MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue *)obj;

/* First, free all the nodes. */
MVMConcBlockingQueueNode *cur = cbq->body.head;
MVMConcBlockingQueueBody *body = cbq->body;
MVMConcBlockingQueueNode *cur = body->head;
while (cur) {
MVMConcBlockingQueueNode *next = cur->next;
MVM_free(cur);
cur = next;
}
cbq->body.head = cbq->body.tail = NULL;

/* Clean up locks. */
uv_mutex_destroy(&cbq->body.locks->head_lock);
uv_mutex_destroy(&cbq->body.locks->tail_lock);
uv_cond_destroy(&cbq->body.locks->head_cond);
MVM_free(cbq->body.locks);
cbq->body.locks = NULL;
body->head = body->tail = NULL;

/* Clean up */
uv_mutex_destroy(&body->head_lock);
uv_mutex_destroy(&body->tail_lock);
uv_cond_destroy(&body->head_cond);

/* Clean up body */
MVM_free(body);
}

static const MVMStorageSpec storage_spec = {
Expand All @@ -96,7 +99,7 @@ static void compose(MVMThreadContext *tc, MVMSTable *st, MVMObject *info) {
}

static void at_pos(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMint64 index, MVMRegister *value, MVMuint16 kind) {
MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;
MVMConcBlockingQueueBody *body = *(MVMConcBlockingQueueBody **)data;

if (index != 0)
MVM_exception_throw_adhoc(tc,
Expand All @@ -105,22 +108,18 @@ static void at_pos(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *d
MVM_exception_throw_adhoc(tc,
"Can only get objects from a concurrent blocking queue");

if (MVM_load(&cbq->elems) > 0) {
if (MVM_load(&body->elems) > 0) {
MVMConcBlockingQueueNode *peeked;
unsigned int interval_id;
interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.at_pos");
MVMROOT(tc, root, {
MVM_gc_mark_thread_blocked(tc);
data = OBJECT_BODY(root);
cbq = (MVMConcBlockingQueueBody *)data;
uv_mutex_lock(&cbq->locks->head_lock);
uv_mutex_lock(&body->head_lock);
MVM_gc_mark_thread_unblocked(tc);
data = OBJECT_BODY(root);
cbq = (MVMConcBlockingQueueBody *)data;
});
peeked = cbq->head->next;
peeked = body->head->next;
value->o = peeked ? peeked->value : tc->instance->VMNull;
uv_mutex_unlock(&cbq->locks->head_lock);
uv_mutex_unlock(&body->head_lock);
MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.at_pos");
}
else {
Expand All @@ -129,12 +128,12 @@ static void at_pos(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *d
}

static MVMuint64 elems(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data) {
MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;
MVMConcBlockingQueueBody *cbq = *(MVMConcBlockingQueueBody **)data;
return MVM_load(&(cbq->elems));
}

static void push(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMRegister value, MVMuint16 kind) {
MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;
MVMConcBlockingQueueBody *body = *(MVMConcBlockingQueueBody**)data;
MVMConcBlockingQueueNode *add;
AO_t orig_elems;
MVMObject *to_add = value.o;
Expand All @@ -152,37 +151,29 @@ static void push(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *dat
interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.push");
MVMROOT2(tc, root, to_add, {
MVM_gc_mark_thread_blocked(tc);
data = OBJECT_BODY(root);
cbq = (MVMConcBlockingQueueBody *)data;
uv_mutex_lock(&cbq->locks->tail_lock);
uv_mutex_lock(&body->tail_lock);
MVM_gc_mark_thread_unblocked(tc);
data = OBJECT_BODY(root);
cbq = (MVMConcBlockingQueueBody *)data;
});
MVM_ASSIGN_REF(tc, &(root->header), add->value, to_add);
cbq->tail->next = add;
cbq->tail = add;
orig_elems = MVM_incr(&cbq->elems);
uv_mutex_unlock(&cbq->locks->tail_lock);
body->tail->next = add;
body->tail = add;
orig_elems = MVM_incr(&body->elems);
uv_mutex_unlock(&body->tail_lock);

if (orig_elems == 0) {
MVMROOT(tc, root, {
MVM_gc_mark_thread_blocked(tc);
data = OBJECT_BODY(root);
cbq = (MVMConcBlockingQueueBody *)data;
uv_mutex_lock(&cbq->locks->head_lock);
uv_mutex_lock(&body->head_lock);
MVM_gc_mark_thread_unblocked(tc);
data = OBJECT_BODY(root);
cbq = (MVMConcBlockingQueueBody *)data;
});
uv_cond_signal(&cbq->locks->head_cond);
uv_mutex_unlock(&cbq->locks->head_lock);
uv_cond_signal(&body->head_cond);
uv_mutex_unlock(&body->head_lock);
}
MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.push");
}

static void shift(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMRegister *value, MVMuint16 kind) {
MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;
MVMConcBlockingQueueBody *body = *(MVMConcBlockingQueueBody**)data;
MVMConcBlockingQueueNode *taken;
unsigned int interval_id;

Expand All @@ -192,36 +183,28 @@ static void shift(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *da
interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.shift");
MVMROOT(tc, root, {
MVM_gc_mark_thread_blocked(tc);
data = OBJECT_BODY(root);
cbq = (MVMConcBlockingQueueBody *)data;
uv_mutex_lock(&cbq->locks->head_lock);
uv_mutex_lock(&body->head_lock);
MVM_gc_mark_thread_unblocked(tc);
data = OBJECT_BODY(root);
cbq = (MVMConcBlockingQueueBody *)data;

while (MVM_load(&cbq->elems) == 0) {
while (MVM_load(&body->elems) == 0) {
MVM_gc_mark_thread_blocked(tc);
data = OBJECT_BODY(root);
cbq = (MVMConcBlockingQueueBody *)data;
uv_cond_wait(&cbq->locks->head_cond, &cbq->locks->head_lock);
uv_cond_wait(&body->head_cond, &body->head_lock);
MVM_gc_mark_thread_unblocked(tc);
data = OBJECT_BODY(root);
cbq = (MVMConcBlockingQueueBody *)data;
}
});

taken = cbq->head->next;
MVM_free(cbq->head);
cbq->head = taken;
taken = body->head->next;
MVM_free(body->head);
body->head = taken;
MVM_barrier();
value->o = taken->value;
taken->value = NULL;
MVM_barrier();

if (MVM_decr(&cbq->elems) > 1)
uv_cond_signal(&cbq->locks->head_cond);
if (MVM_decr(&body->elems) > 1)
uv_cond_signal(&body->head_cond);

uv_mutex_unlock(&cbq->locks->head_lock);
uv_mutex_unlock(&body->head_lock);
MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.shift");
}

Expand Down Expand Up @@ -293,30 +276,31 @@ MVMObject * MVM_concblockingqueue_jit_poll(MVMThreadContext *tc, MVMObject *queu
/* Polls a queue for a value, returning NULL if none is available. */
MVMObject * MVM_concblockingqueue_poll(MVMThreadContext *tc, MVMConcBlockingQueue *queue) {
MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue *)queue;
MVMConcBlockingQueueNode *taken;
MVMConcBlockingQueueBody *body = cbq->body;
MVMConcBlockingQueueNode *taken;
MVMObject *result = tc->instance->VMNull;
unsigned int interval_id;

interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.poll");
MVMROOT(tc, cbq, {
MVM_gc_mark_thread_blocked(tc);
uv_mutex_lock(&cbq->body.locks->head_lock);
uv_mutex_lock(&body->head_lock);
MVM_gc_mark_thread_unblocked(tc);
});

if (MVM_load(&cbq->body.elems) > 0) {
taken = cbq->body.head->next;
MVM_free(cbq->body.head);
cbq->body.head = taken;
if (MVM_load(&body->elems) > 0) {
taken = body->head->next;
MVM_free(body->head);
body->head = taken;
MVM_barrier();
result = taken->value;
taken->value = NULL;
MVM_barrier();
if (MVM_decr(&cbq->body.elems) > 1)
uv_cond_signal(&cbq->body.locks->head_cond);
if (MVM_decr(&body->elems) > 1)
uv_cond_signal(&body->head_cond);
}

uv_mutex_unlock(&cbq->body.locks->head_lock);
uv_mutex_unlock(&body->head_lock);

MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.poll");
return result;
Expand Down
22 changes: 11 additions & 11 deletions src/6model/reprs/ConcBlockingQueue.h
Expand Up @@ -4,16 +4,12 @@ struct MVMConcBlockingQueueNode {
MVMConcBlockingQueueNode *next;
};

/* Memory used for mutexes and cond vars; these can't live in the object body
* directly as they are sensitive to being moved, but putting them together in
* a single struct means we can malloc a single bit of memory to hold them. */
struct MVMConcBlockingQueueLocks {
uv_mutex_t head_lock;
uv_mutex_t tail_lock;
uv_cond_t head_cond;
};

/* Representation used for concurrent blocking queue. */
/* Representation used for concurrent blocking queue. Rather than hold the body
* inline, the object holds a pointer to the body. The body itself is allocated
* by malloc() rather than GC. This prevents it from being moved which would be
* a problem for mutexes and condition variables. Also, it prevents the GC from
* moving the body while we are blocked on acquiring a lock (for example) */
struct MVMConcBlockingQueueBody {
/* Head and tail of the queue. */
MVMConcBlockingQueueNode *head;
Expand All @@ -23,11 +19,15 @@ struct MVMConcBlockingQueueBody {
AO_t elems;

/* Locks and condition variables storage. */
MVMConcBlockingQueueLocks *locks;
uv_mutex_t head_lock;
uv_mutex_t tail_lock;
uv_cond_t head_cond;
};

struct MVMConcBlockingQueue {
MVMObject common;
MVMConcBlockingQueueBody body;
/* As noted, a pointer, not an inline struct */
MVMConcBlockingQueueBody *body;
};

/* Function for REPR setup. */
Expand Down
1 change: 0 additions & 1 deletion src/types.h
Expand Up @@ -125,7 +125,6 @@ typedef struct MVMSemaphoreBody MVMSemaphoreBody;
typedef struct MVMConcBlockingQueue MVMConcBlockingQueue;
typedef struct MVMConcBlockingQueueBody MVMConcBlockingQueueBody;
typedef struct MVMConcBlockingQueueNode MVMConcBlockingQueueNode;
typedef struct MVMConcBlockingQueueLocks MVMConcBlockingQueueLocks;
typedef struct MVMObject MVMObject;
typedef struct MVMObjectId MVMObjectId;
typedef struct MVMObjectStooge MVMObjectStooge;
Expand Down

0 comments on commit ac44769

Please sign in to comment.