Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
WIP FIXUP - Multiple requests in barrier
Browse files Browse the repository at this point in the history
Signed-off-by: Mathieu Borderé <mathieu.bordere@canonical.com>
  • Loading branch information
Mathieu Borderé committed Jul 4, 2023
1 parent b74369b commit de93e52
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 18 deletions.
8 changes: 4 additions & 4 deletions src/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,20 +328,20 @@ int UvAppend(struct raft_io *io,
raft_io_append_cb cb);

/* Pause request object and callback. */
struct UvBarrier;
typedef void (*UvBarrierCb)(struct UvBarrier *req);
struct UvBarrierReq;
typedef void (*UvBarrierCb)(struct UvBarrierReq *req);
struct UvBarrierReq
{
bool blocking; /* Whether this barrier should block future writes */
void *data; /* User data */
UvBarrierCb cb; /* Completion callback */
queue queue;
};

struct UvBarrier
{
bool blocking; /* Whether this barrier should block future writes */
void *data; /* User data */
UvBarrierCb cb; /* Completion callback */
queue reqs; /* Queue of UvBarrierReq */
};

/* Submit a barrier request to interrupt the normal flow of append
Expand Down
33 changes: 23 additions & 10 deletions src/uv_append.c
Original file line number Diff line number Diff line change
Expand Up @@ -760,9 +760,24 @@ void UvBarrierTrigger(struct UvBarrier *barrier)
return;
}

if (barrier->cb) {
barrier->cb(barrier);
while (!QUEUE_IS_EMPTY(&barrier->reqs)) {
queue *head;
struct UvBarrierReq *r;
head = QUEUE_HEAD(&barrier->reqs);
QUEUE_REMOVE(head);
r = QUEUE_DATA(head, struct UvBarrierReq, queue);
r->cb(r);
}
}

struct UvBarrier *uvBarrierAlloc(void) {
struct UvBarrier *barrier;
barrier = RaftHeapMalloc(sizeof(*barrier));
if (!barrier) {
return NULL;
}
QUEUE_INIT(&barrier->reqs);
return barrier;
}

int UvBarrier(struct uv *uv,
Expand Down Expand Up @@ -794,7 +809,7 @@ int UvBarrier(struct uv *uv,
}

if (!barrier) {
barrier = RaftHeapMalloc(sizeof(*barrier));
barrier = uvBarrierAlloc();
if (!barrier) {
return RAFT_NOMEM;
}
Expand All @@ -821,7 +836,7 @@ int UvBarrier(struct uv *uv,
barrier = uv->barrier;
/* There is no uv->barrier, make new one. */
} else {
barrier = RaftHeapMalloc(sizeof(*barrier));
barrier = uvBarrierAlloc();
if (!barrier) {
return RAFT_NOMEM;
}
Expand Down Expand Up @@ -875,13 +890,11 @@ void UvUnblock(struct uv *uv)

void UvBarrierAddReq(struct UvBarrier *barrier, struct UvBarrierReq *req)
{
if (!barrier) {
return;
}

assert(barrier != NULL);
assert(req != NULL);
/* Once there's a blocking req, this barrier becomes blocking. */
barrier->blocking |= req->blocking;
barrier->cb = req->cb;
barrier->data = req->data;
QUEUE_PUSH(&barrier->reqs, &req->queue);
}

/* Fire all pending barrier requests, the barrier callback will notice that
Expand Down
2 changes: 1 addition & 1 deletion src/uv_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ static void uvSnapshotPutStart(struct uvSnapshotPut *put)
}
}

static void uvSnapshotPutBarrierCb(struct UvBarrier *barrier)
static void uvSnapshotPutBarrierCb(struct UvBarrierReq *barrier)
{
/* Ensure that we don't invoke this callback more than once. */
barrier->cb = NULL;
Expand Down
2 changes: 1 addition & 1 deletion src/uv_truncate.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ static void uvTruncateAfterWorkCb(uv_work_t *work, int status)
UvUnblock(uv);
}

static void uvTruncateBarrierCb(struct UvBarrier *barrier)
static void uvTruncateBarrierCb(struct UvBarrierReq *barrier)
{
struct uvTruncate *truncate = barrier->data;
struct uv *uv = truncate->uv;
Expand Down
4 changes: 2 additions & 2 deletions test/integration/test_uv_append.c
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ struct barrierData
struct uv *uv;
};

static void barrierCbCompareCounter(struct UvBarrier *barrier)
static void barrierCbCompareCounter(struct UvBarrierReq *barrier)
{
struct barrierData *bd = barrier->data;
munit_assert_false(bd->done);
Expand Down Expand Up @@ -831,7 +831,7 @@ static void longAfterWorkCb(uv_work_t *work, int status)
free(work);
}

static void barrierCbLongWork(struct UvBarrier *barrier)
static void barrierCbLongWork(struct UvBarrierReq *barrier)
{
struct barrierData *bd = barrier->data;
munit_assert_false(bd->done);
Expand Down

0 comments on commit de93e52

Please sign in to comment.