From ac44769b9f01fbfcc6be141b41a0e829433c9105 Mon Sep 17 00:00:00 2001 From: Bart Wiegmans Date: Tue, 7 Aug 2018 16:44:18 +0200 Subject: [PATCH] [ConcBlockingQueue] Unmoving body The locks structure was already malloc'd, and I extended this to include the entire body. This prevents the GC from moving the body while we are blocked on acquiring a lock, simplifying the code, and has negligible cost. --- src/6model/reprs/ConcBlockingQueue.c | 126 ++++++++++++--------------- src/6model/reprs/ConcBlockingQueue.h | 22 ++--- src/types.h | 1 - 3 files changed, 66 insertions(+), 83 deletions(-) diff --git a/src/6model/reprs/ConcBlockingQueue.c b/src/6model/reprs/ConcBlockingQueue.c index 59a66a29f4..086d614928 100644 --- a/src/6model/reprs/ConcBlockingQueue.c +++ b/src/6model/reprs/ConcBlockingQueue.c @@ -19,23 +19,24 @@ static MVMObject * type_object_for(MVMThreadContext *tc, MVMObject *HOW) { /* Initializes a new instance. */ static void initialize(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data) { - MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; - + MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue*)root; + MVMConcBlockingQueueBody *body = MVM_calloc(1, sizeof(MVMConcBlockingQueueBody)); /* Initialize locks. */ int init_stat; - cbq->locks = MVM_calloc(1, sizeof(MVMConcBlockingQueueLocks)); - if ((init_stat = uv_mutex_init(&cbq->locks->head_lock)) < 0) + + if ((init_stat = uv_mutex_init(&body->head_lock)) < 0) MVM_exception_throw_adhoc(tc, "Failed to initialize mutex: %s", uv_strerror(init_stat)); - if ((init_stat = uv_mutex_init(&cbq->locks->tail_lock)) < 0) + if ((init_stat = uv_mutex_init(&body->tail_lock)) < 0) MVM_exception_throw_adhoc(tc, "Failed to initialize mutex: %s", uv_strerror(init_stat)); - if ((init_stat = uv_cond_init(&cbq->locks->head_cond)) < 0) + if ((init_stat = uv_cond_init(&body->head_cond)) < 0) MVM_exception_throw_adhoc(tc, "Failed to initialize condition variable: %s", uv_strerror(init_stat)); /* Head and tail point to a null node. */ - cbq->tail = cbq->head = MVM_calloc(1, sizeof(MVMConcBlockingQueueNode)); + body->tail = body->head = MVM_calloc(1, sizeof(MVMConcBlockingQueueNode)); + cbq->body = body; } /* Copies the body of one object to another. */ @@ -47,7 +48,7 @@ static void copy_to(MVMThreadContext *tc, MVMSTable *st, void *src, MVMObject *d static void gc_mark(MVMThreadContext *tc, MVMSTable *st, void *data, MVMGCWorklist *worklist) { /* At this point we know the world is stopped, and thus we can safely do a * traversal of the data structure without needing locks. */ - MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; + MVMConcBlockingQueueBody *cbq = *(MVMConcBlockingQueueBody **)data; MVMConcBlockingQueueNode *cur = cbq->head; while (cur) { MVM_gc_worklist_add(tc, worklist, &cur->value); @@ -60,20 +61,22 @@ static void gc_free(MVMThreadContext *tc, MVMObject *obj) { MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue *)obj; /* First, free all the nodes. */ - MVMConcBlockingQueueNode *cur = cbq->body.head; + MVMConcBlockingQueueBody *body = cbq->body; + MVMConcBlockingQueueNode *cur = body->head; while (cur) { MVMConcBlockingQueueNode *next = cur->next; MVM_free(cur); cur = next; } - cbq->body.head = cbq->body.tail = NULL; - - /* Clean up locks. */ - uv_mutex_destroy(&cbq->body.locks->head_lock); - uv_mutex_destroy(&cbq->body.locks->tail_lock); - uv_cond_destroy(&cbq->body.locks->head_cond); - MVM_free(cbq->body.locks); - cbq->body.locks = NULL; + body->head = body->tail = NULL; + + /* Clean up */ + uv_mutex_destroy(&body->head_lock); + uv_mutex_destroy(&body->tail_lock); + uv_cond_destroy(&body->head_cond); + + /* Clean up body */ + MVM_free(body); } static const MVMStorageSpec storage_spec = { @@ -96,7 +99,7 @@ static void compose(MVMThreadContext *tc, MVMSTable *st, MVMObject *info) { } static void at_pos(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMint64 index, MVMRegister *value, MVMuint16 kind) { - MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; + MVMConcBlockingQueueBody *body = *(MVMConcBlockingQueueBody **)data; if (index != 0) MVM_exception_throw_adhoc(tc, @@ -105,22 +108,18 @@ static void at_pos(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *d MVM_exception_throw_adhoc(tc, "Can only get objects from a concurrent blocking queue"); - if (MVM_load(&cbq->elems) > 0) { + if (MVM_load(&body->elems) > 0) { MVMConcBlockingQueueNode *peeked; unsigned int interval_id; interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.at_pos"); MVMROOT(tc, root, { MVM_gc_mark_thread_blocked(tc); - data = OBJECT_BODY(root); - cbq = (MVMConcBlockingQueueBody *)data; - uv_mutex_lock(&cbq->locks->head_lock); + uv_mutex_lock(&body->head_lock); MVM_gc_mark_thread_unblocked(tc); - data = OBJECT_BODY(root); - cbq = (MVMConcBlockingQueueBody *)data; }); - peeked = cbq->head->next; + peeked = body->head->next; value->o = peeked ? peeked->value : tc->instance->VMNull; - uv_mutex_unlock(&cbq->locks->head_lock); + uv_mutex_unlock(&body->head_lock); MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.at_pos"); } else { @@ -129,12 +128,12 @@ static void at_pos(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *d } static MVMuint64 elems(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data) { - MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; + MVMConcBlockingQueueBody *cbq = *(MVMConcBlockingQueueBody **)data; return MVM_load(&(cbq->elems)); } static void push(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMRegister value, MVMuint16 kind) { - MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; + MVMConcBlockingQueueBody *body = *(MVMConcBlockingQueueBody**)data; MVMConcBlockingQueueNode *add; AO_t orig_elems; MVMObject *to_add = value.o; @@ -152,37 +151,29 @@ static void push(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *dat interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.push"); MVMROOT2(tc, root, to_add, { MVM_gc_mark_thread_blocked(tc); - data = OBJECT_BODY(root); - cbq = (MVMConcBlockingQueueBody *)data; - uv_mutex_lock(&cbq->locks->tail_lock); + uv_mutex_lock(&body->tail_lock); MVM_gc_mark_thread_unblocked(tc); - data = OBJECT_BODY(root); - cbq = (MVMConcBlockingQueueBody *)data; }); MVM_ASSIGN_REF(tc, &(root->header), add->value, to_add); - cbq->tail->next = add; - cbq->tail = add; - orig_elems = MVM_incr(&cbq->elems); - uv_mutex_unlock(&cbq->locks->tail_lock); + body->tail->next = add; + body->tail = add; + orig_elems = MVM_incr(&body->elems); + uv_mutex_unlock(&body->tail_lock); if (orig_elems == 0) { MVMROOT(tc, root, { MVM_gc_mark_thread_blocked(tc); - data = OBJECT_BODY(root); - cbq = (MVMConcBlockingQueueBody *)data; - uv_mutex_lock(&cbq->locks->head_lock); + uv_mutex_lock(&body->head_lock); MVM_gc_mark_thread_unblocked(tc); - data = OBJECT_BODY(root); - cbq = (MVMConcBlockingQueueBody *)data; }); - uv_cond_signal(&cbq->locks->head_cond); - uv_mutex_unlock(&cbq->locks->head_lock); + uv_cond_signal(&body->head_cond); + uv_mutex_unlock(&body->head_lock); } MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.push"); } static void shift(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMRegister *value, MVMuint16 kind) { - MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; + MVMConcBlockingQueueBody *body = *(MVMConcBlockingQueueBody**)data; MVMConcBlockingQueueNode *taken; unsigned int interval_id; @@ -192,36 +183,28 @@ static void shift(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *da interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.shift"); MVMROOT(tc, root, { MVM_gc_mark_thread_blocked(tc); - data = OBJECT_BODY(root); - cbq = (MVMConcBlockingQueueBody *)data; - uv_mutex_lock(&cbq->locks->head_lock); + uv_mutex_lock(&body->head_lock); MVM_gc_mark_thread_unblocked(tc); - data = OBJECT_BODY(root); - cbq = (MVMConcBlockingQueueBody *)data; - while (MVM_load(&cbq->elems) == 0) { + while (MVM_load(&body->elems) == 0) { MVM_gc_mark_thread_blocked(tc); - data = OBJECT_BODY(root); - cbq = (MVMConcBlockingQueueBody *)data; - uv_cond_wait(&cbq->locks->head_cond, &cbq->locks->head_lock); + uv_cond_wait(&body->head_cond, &body->head_lock); MVM_gc_mark_thread_unblocked(tc); - data = OBJECT_BODY(root); - cbq = (MVMConcBlockingQueueBody *)data; } }); - taken = cbq->head->next; - MVM_free(cbq->head); - cbq->head = taken; + taken = body->head->next; + MVM_free(body->head); + body->head = taken; MVM_barrier(); value->o = taken->value; taken->value = NULL; MVM_barrier(); - if (MVM_decr(&cbq->elems) > 1) - uv_cond_signal(&cbq->locks->head_cond); + if (MVM_decr(&body->elems) > 1) + uv_cond_signal(&body->head_cond); - uv_mutex_unlock(&cbq->locks->head_lock); + uv_mutex_unlock(&body->head_lock); MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.shift"); } @@ -293,30 +276,31 @@ MVMObject * MVM_concblockingqueue_jit_poll(MVMThreadContext *tc, MVMObject *queu /* Polls a queue for a value, returning NULL if none is available. */ MVMObject * MVM_concblockingqueue_poll(MVMThreadContext *tc, MVMConcBlockingQueue *queue) { MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue *)queue; - MVMConcBlockingQueueNode *taken; + MVMConcBlockingQueueBody *body = cbq->body; + MVMConcBlockingQueueNode *taken; MVMObject *result = tc->instance->VMNull; unsigned int interval_id; interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.poll"); MVMROOT(tc, cbq, { MVM_gc_mark_thread_blocked(tc); - uv_mutex_lock(&cbq->body.locks->head_lock); + uv_mutex_lock(&body->head_lock); MVM_gc_mark_thread_unblocked(tc); }); - if (MVM_load(&cbq->body.elems) > 0) { - taken = cbq->body.head->next; - MVM_free(cbq->body.head); - cbq->body.head = taken; + if (MVM_load(&body->elems) > 0) { + taken = body->head->next; + MVM_free(body->head); + body->head = taken; MVM_barrier(); result = taken->value; taken->value = NULL; MVM_barrier(); - if (MVM_decr(&cbq->body.elems) > 1) - uv_cond_signal(&cbq->body.locks->head_cond); + if (MVM_decr(&body->elems) > 1) + uv_cond_signal(&body->head_cond); } - uv_mutex_unlock(&cbq->body.locks->head_lock); + uv_mutex_unlock(&body->head_lock); MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.poll"); return result; diff --git a/src/6model/reprs/ConcBlockingQueue.h b/src/6model/reprs/ConcBlockingQueue.h index 4d89778d40..c3d75ed9bf 100644 --- a/src/6model/reprs/ConcBlockingQueue.h +++ b/src/6model/reprs/ConcBlockingQueue.h @@ -4,16 +4,12 @@ struct MVMConcBlockingQueueNode { MVMConcBlockingQueueNode *next; }; -/* Memory used for mutexes and cond vars; these can't live in the object body - * directly as they are sensitive to being moved, but putting them together in - * a single struct means we can malloc a single bit of memory to hold them. */ -struct MVMConcBlockingQueueLocks { - uv_mutex_t head_lock; - uv_mutex_t tail_lock; - uv_cond_t head_cond; -}; -/* Representation used for concurrent blocking queue. */ +/* Representation used for concurrent blocking queue. Rather than hold the body + * inline, the object holds a pointer to the body. The body itself is allocated + * by malloc() rather than GC. This prevents it from being moved which would be + * a problem for mutexes and condition variables. Also, it prevents the GC from + * moving the body while we are blocked on acquiring a lock (for example) */ struct MVMConcBlockingQueueBody { /* Head and tail of the queue. */ MVMConcBlockingQueueNode *head; @@ -23,11 +19,15 @@ struct MVMConcBlockingQueueBody { AO_t elems; /* Locks and condition variables storage. */ - MVMConcBlockingQueueLocks *locks; + uv_mutex_t head_lock; + uv_mutex_t tail_lock; + uv_cond_t head_cond; }; + struct MVMConcBlockingQueue { MVMObject common; - MVMConcBlockingQueueBody body; + /* As noted, a pointer, not an inline struct */ + MVMConcBlockingQueueBody *body; }; /* Function for REPR setup. */ diff --git a/src/types.h b/src/types.h index 8499185fa3..917779c330 100644 --- a/src/types.h +++ b/src/types.h @@ -125,7 +125,6 @@ typedef struct MVMSemaphoreBody MVMSemaphoreBody; typedef struct MVMConcBlockingQueue MVMConcBlockingQueue; typedef struct MVMConcBlockingQueueBody MVMConcBlockingQueueBody; typedef struct MVMConcBlockingQueueNode MVMConcBlockingQueueNode; -typedef struct MVMConcBlockingQueueLocks MVMConcBlockingQueueLocks; typedef struct MVMObject MVMObject; typedef struct MVMObjectId MVMObjectId; typedef struct MVMObjectStooge MVMObjectStooge;