This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Fix callback for second barrier not being attached #435

Merged · 6 commits · Jul 7, 2023

1 change: 1 addition & 0 deletions Makefile.am
@@ -208,6 +208,7 @@ test_integration_uv_SOURCES = \
test/integration/test_uv_tcp_listen.c \
test/integration/test_uv_snapshot_put.c \
test/integration/test_uv_truncate.c \
test/integration/test_uv_truncate_snapshot.c \
test/integration/test_uv_work.c
test_integration_uv_CFLAGS = $(AM_CFLAGS) -Wno-type-limits -Wno-conversion
test_integration_uv_LDFLAGS = -no-install
33 changes: 25 additions & 8 deletions src/uv.h
@@ -328,13 +328,24 @@ int UvAppend(struct raft_io *io,
raft_io_append_cb cb);

/* Pause request object and callback. */
struct UvBarrier;
typedef void (*UvBarrierCb)(struct UvBarrier *req);
struct UvBarrier
struct UvBarrierReq;

/* A barrier cb that plans to perform work on the threadpool MUST exit early
* and clean up resources when it detects uv->closing; this is to allow forced
* closing on shutdown. */
typedef void (*UvBarrierCb)(struct UvBarrierReq *req);
struct UvBarrierReq
{
void *data; /* User data */
bool blocking; /* Whether this barrier should block future writes */
void *data; /* User data */
UvBarrierCb cb; /* Completion callback */
queue queue; /* Queue of reqs triggered by a UvBarrier */
};

struct UvBarrier
{
bool blocking; /* Whether this barrier should block future writes */
queue reqs; /* Queue of UvBarrierReq */
};

/* Submit a barrier request to interrupt the normal flow of append
@@ -355,10 +366,16 @@ struct UvBarrier
* This API is used to implement truncate and snapshot install operations, which
* need to wait until all pending writes have settled and modify the log state,
* changing the next index. */
int UvBarrier(struct uv *uv,
raft_index next_index,
struct UvBarrier *barrier,
UvBarrierCb cb);
int UvBarrier(struct uv *uv, raft_index next_index, struct UvBarrierReq *req);

/* Trigger a callback for a barrier request in this @barrier. Returns true if a
* callback was triggered, false if there are no more requests to trigger.
* A barrier callback will call UvUnblock, which in turn will try to run the
* next callback, if any, from a barrier request in this barrier. */
bool UvBarrierMaybeTrigger(struct UvBarrier *barrier);

/* Add a Barrier @req to an existing @barrier. */
void UvBarrierAddReq(struct UvBarrier *barrier, struct UvBarrierReq *req);

/* Returns @true if there are no more segments referencing uv->barrier */
bool UvBarrierReady(struct uv *uv);
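
In the reworked API above, a barrier owns a queue of UvBarrierReq objects and each request carries its own callback: callers embed a UvBarrierReq in their operation, submit it with UvBarrier(), and call UvUnblock() once the barrier-protected work is done, which fires the next queued request or clears the barrier. The following minimal caller sketch is not part of this patch; the myOperation/myBarrierCb names are hypothetical, and it simply mirrors how uv_truncate.c and uv_snapshot.c below drive these declarations, assuming it sits next to src/uv.h with the raft heap helpers available.

#include "uv.h" /* struct uv, UvBarrierReq, UvBarrier, UvUnblock (internal header) */

struct myOperation
{
    struct uv *uv;
    struct UvBarrierReq barrier; /* Embedded request, as uvTruncate does. */
};

static void myBarrierCb(struct UvBarrierReq *req)
{
    struct myOperation *op = req->data;
    struct uv *uv = op->uv;

    /* Per the contract above: bail out and clean up when uv is closing.
     * (A real caller, like uvTruncateBarrierCb, also notifies the close
     * path here.) */
    if (uv->closing) {
        RaftHeapFree(op);
        return;
    }

    /* ... perform the work that required all pending writes to settle ... */

    RaftHeapFree(op);
    UvUnblock(uv); /* Fires the next queued request, or clears the barrier. */
}

static int myOperationStart(struct uv *uv, raft_index next_index)
{
    struct myOperation *op = RaftHeapMalloc(sizeof(*op));
    if (op == NULL) {
        return RAFT_NOMEM;
    }
    op->uv = uv;
    op->barrier.data = op;
    op->barrier.blocking = true; /* Block further writes until UvUnblock. */
    op->barrier.cb = myBarrierCb;
    return UvBarrier(uv, next_index, &op->barrier);
}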
148 changes: 125 additions & 23 deletions src/uv_append.c
@@ -754,11 +754,49 @@
return true;
}

int UvBarrier(struct uv *uv,
raft_index next_index,
struct UvBarrier *barrier,
UvBarrierCb cb)
bool UvBarrierMaybeTrigger(struct UvBarrier *barrier)
{
if (!barrier) {
return false;

[Codecov warning: added line src/uv_append.c#L760 was not covered by tests]
}

if (!QUEUE_IS_EMPTY(&barrier->reqs)) {
queue *head;
struct UvBarrierReq *r;
head = QUEUE_HEAD(&barrier->reqs);
QUEUE_REMOVE(head);
r = QUEUE_DATA(head, struct UvBarrierReq, queue);
r->cb(r);
return true;
}

return false;
}

/* Used during cleanup. */
static void uvBarrierTriggerAll(struct UvBarrier *barrier)
{
while (UvBarrierMaybeTrigger(barrier)) {
;
}
}

struct UvBarrier *uvBarrierAlloc(void)
{
struct UvBarrier *barrier;
barrier = RaftHeapMalloc(sizeof(*barrier));
if (!barrier) {
return NULL;

[Codecov warning: added line src/uv_append.c#L789 was not covered by tests]
}
QUEUE_INIT(&barrier->reqs);
return barrier;
}

int UvBarrier(struct uv *uv, raft_index next_index, struct UvBarrierReq *req)
{
/* The barrier to attach to. */
struct UvBarrier *barrier = NULL;
struct uvAliveSegment *segment = NULL;
queue *head;

assert(!uv->closing);
@@ -768,23 +806,61 @@

/* Arrange for all open segments not already involved in other barriers to
* be finalized as soon as their append requests get completed and mark them
* as involved in this specific barrier request. */
* as involved in this specific barrier request. */
QUEUE_FOREACH (head, &uv->append_segments) {
struct uvAliveSegment *segment;
segment = QUEUE_DATA(head, struct uvAliveSegment, queue);
if (segment->barrier != NULL) {
/* If a non-blocking barrier precedes this blocking request, we want
* to also block all future writes. */
if (req->blocking) {
segment->barrier->blocking = true;
}
continue;
}

if (!barrier) {
barrier = uvBarrierAlloc();
if (!barrier) {
return RAFT_NOMEM;

[Codecov warning: added line src/uv_append.c#L824 was not covered by tests]
}
/* And add the request to the barrier. */
UvBarrierAddReq(barrier, req);
}
segment->barrier = barrier;

if (segment == uvGetCurrentAliveSegment(uv)) {
uvFinalizeCurrentAliveSegmentOnceIdle(uv);
continue;
}
segment->finalize = true;
}

barrier->cb = cb;
/* Unable to attach to a segment, because all segments are involved in a
* barrier, or there are no segments. */
if (barrier == NULL) {
/* Attach req to last segment barrier. */
if (segment != NULL) {
barrier = segment->barrier;
/* There is no segment, attach to uv->barrier. */
} else if (uv->barrier != NULL) {
barrier = uv->barrier;

[Codecov warning: added line src/uv_append.c#L846 was not covered by tests]
/* There is no uv->barrier, make new one. */
} else {
barrier = uvBarrierAlloc();
if (!barrier) {
return RAFT_NOMEM;

[Codecov warning: added line src/uv_append.c#L851 was not covered by tests]
}
}
UvBarrierAddReq(barrier, req);
}

/* Let's not continue writing new entries if something down the line
* asked us to stop writing. */
if (uv->barrier != NULL && req->blocking) {
uv->barrier->blocking = true;
}

assert(barrier != NULL);
if (uv->barrier == NULL) {
uv->barrier = barrier;
/* If there's no pending append-related activity, we can fire the
Expand All @@ -794,7 +870,8 @@
if (QUEUE_IS_EMPTY(&uv->append_segments) &&
QUEUE_IS_EMPTY(&uv->finalize_reqs) &&
uv->finalize_work.data == NULL) {
barrier->cb(barrier);
/* Not interested in return value. */
UvBarrierMaybeTrigger(barrier);
}
}

@@ -803,8 +880,16 @@

void UvUnblock(struct uv *uv)
{
tracef("uv unblock");
tracef("clear uv barrier");
/* First fire the next pending barrier request, if any; UvUnblock will be
* called again when that request's callback has fired. */
if (UvBarrierMaybeTrigger(uv->barrier)) {
tracef("UvUnblock triggered barrier request callback.");
return;
}

/* All requests in barrier are finished. */
tracef("UvUnblock queue empty");
RaftHeapFree(uv->barrier);
uv->barrier = NULL;
if (uv->closing) {
uvMaybeFireCloseCb(uv);
@@ -819,6 +904,15 @@
}
}

void UvBarrierAddReq(struct UvBarrier *barrier, struct UvBarrierReq *req)
{
assert(barrier != NULL);
assert(req != NULL);
/* Once there's a blocking req, this barrier becomes blocking. */
barrier->blocking |= req->blocking;
QUEUE_PUSH(&barrier->reqs, &req->queue);
}

/* Fire all pending barrier requests; the barrier callback will notice that
* we're closing and abort there. */
static void uvBarrierClose(struct uv *uv)
@@ -830,14 +924,13 @@
QUEUE_FOREACH (head, &uv->append_segments) {
struct uvAliveSegment *segment;
segment = QUEUE_DATA(head, struct uvAliveSegment, queue);
if (segment->barrier != NULL && segment->barrier != barrier) {
if (segment->barrier != NULL && segment->barrier != barrier &&
segment->barrier != uv->barrier) {
barrier = segment->barrier;
if (barrier->cb != NULL) {
barrier->cb(barrier);
}
if (segment->barrier == uv->barrier) {
uv->barrier = NULL;
}
/* Fire all barrier cb's, this is safe because the barrier cb exits
* early when uv->closing is true. */
uvBarrierTriggerAll(barrier);
RaftHeapFree(barrier);
}
/* The segment->barrier field is used:
*
@cole-miller (Contributor) commented on Jul 6, 2023:
I'm not sure how to interpret this (pre-existing) comment.

@@ -857,13 +950,22 @@
segment->barrier = NULL;
}

/* There might still still be a current barrier set on uv->barrier, meaning
/* There might still be a current barrier set on uv->barrier, meaning
* that the open segment it was associated with has started to be finalized
* and is not anymore in the append_segments queue. Let's cancel that
* too. */
if (uv->barrier != NULL && uv->barrier->cb != NULL) {
uv->barrier->cb(uv->barrier);
uv->barrier = NULL;
* and is not anymore in the append_segments queue. Let's cancel all
* untriggered barrier request callbacks too. */
if (uv->barrier != NULL) {
uvBarrierTriggerAll(uv->barrier);
/* Clear uv->barrier if there's no active work on the thread pool. When
* the work on the threadpool finishes, UvUnblock will notice
* we're closing, clear and free uv->barrier and call
* uvMaybeFireCloseCb. UvUnblock will not try to fire any more barrier
* request callbacks because they were triggered in the line above. */
if (uv->snapshot_put_work.data == NULL &&
uv->truncate_work.data == NULL) {
RaftHeapFree(uv->barrier);
uv->barrier = NULL;
}
}
}

8 changes: 4 additions & 4 deletions src/uv_finalize.c
@@ -75,7 +75,8 @@ static void uvFinalizeAfterWorkCb(uv_work_t *work, int status)
{
struct uvDyingSegment *segment = work->data;
struct uv *uv = segment->uv;
tracef("uv finalize after work cb status:%d", status);
tracef("uv finalize after work segment %p cb status:%d", (void *)segment,
status);
queue *head;
int rv;

@@ -90,9 +91,8 @@ static void uvFinalizeAfterWorkCb(uv_work_t *work, int status)
* barrier to unblock or if we are done closing. */
if (QUEUE_IS_EMPTY(&uv->finalize_reqs)) {
tracef("unblock barrier or close");
if (uv->barrier != NULL && UvBarrierReady(uv) &&
uv->barrier->cb != NULL) {
uv->barrier->cb(uv->barrier);
if (uv->barrier != NULL && UvBarrierReady(uv)) {
UvBarrierMaybeTrigger(uv->barrier);
}
uvMaybeFireCloseCb(uv);
return;
8 changes: 4 additions & 4 deletions src/uv_snapshot.c
@@ -393,7 +393,7 @@ struct uvSnapshotPut
} meta;
char errmsg[RAFT_ERRMSG_BUF_SIZE];
int status;
struct UvBarrier barrier;
struct UvBarrierReq barrier;
};

struct uvSnapshotGet
@@ -603,7 +603,7 @@ static void uvSnapshotPutStart(struct uvSnapshotPut *put)
}
}

static void uvSnapshotPutBarrierCb(struct UvBarrier *barrier)
static void uvSnapshotPutBarrierCb(struct UvBarrierReq *barrier)
{
/* Ensure that we don't invoke this callback more than once. */
barrier->cb = NULL;
@@ -613,7 +613,6 @@ static void uvSnapshotPutBarrierCb(struct UvBarrierReq *barrier)
}

struct uv *uv = put->uv;
assert((barrier->blocking && put->trailing == 0) || !barrier->blocking);
put->barrier.data = NULL;
/* If we're closing, abort the request. */
if (uv->closing) {
@@ -659,6 +658,7 @@ int UvSnapshotPut(struct raft_io *io,
put->trailing = trailing;
put->barrier.data = put;
put->barrier.blocking = trailing == 0;
put->barrier.cb = uvSnapshotPutBarrierCb;

req->cb = cb;

@@ -692,7 +692,7 @@
* and we don't change append_next_index. */
next_index =
(trailing == 0) ? (snapshot->index + 1) : uv->append_next_index;
rv = UvBarrier(uv, next_index, &put->barrier, uvSnapshotPutBarrierCb);
rv = UvBarrier(uv, next_index, &put->barrier);
if (rv != 0) {
goto err_after_configuration_encode;
}
8 changes: 5 additions & 3 deletions src/uv_truncate.c
@@ -13,7 +13,7 @@
struct uvTruncate
{
struct uv *uv;
struct UvBarrier barrier;
struct UvBarrierReq barrier;
raft_index index;
int status;
};
@@ -118,7 +118,7 @@ static void uvTruncateAfterWorkCb(uv_work_t *work, int status)
UvUnblock(uv);
}

static void uvTruncateBarrierCb(struct UvBarrier *barrier)
static void uvTruncateBarrierCb(struct UvBarrierReq *barrier)
{
struct uvTruncate *truncate = barrier->data;
struct uv *uv = truncate->uv;
@@ -132,6 +132,7 @@ static void uvTruncateBarrierCb(struct UvBarrierReq *barrier)
if (uv->closing) {
tracef("closing => don't truncate");
RaftHeapFree(truncate);
uvMaybeFireCloseCb(uv);
return;
}

@@ -176,10 +177,11 @@ int UvTruncate(struct raft_io *io, raft_index index)
truncate->index = index;
truncate->barrier.data = truncate;
truncate->barrier.blocking = true;
truncate->barrier.cb = uvTruncateBarrierCb;

/* Make sure that we wait for any inflight writes to finish and then close
* the current segment. */
rv = UvBarrier(uv, index, &truncate->barrier, uvTruncateBarrierCb);
rv = UvBarrier(uv, index, &truncate->barrier);
if (rv != 0) {
goto err_after_req_alloc;
}
Expand Down
Loading