Skip to content

Commit

Permalink
Refactor: Make CountedDelete simpler
Browse files Browse the repository at this point in the history
This commit does a few notable things:
- Counted deletes now use the lastFlush promise instead of their own fulfillers. This does mean that counted delete promises will resolve later in some rare cases like a counted delete and a delete all being requested within the same scheduled flush or a counted delete being issued as part of a transaction. The goal here is to get to a point where *all* operations just have to wait for the flush promise after they are scheduled.
- Counted deletes now maintain their own list of participating entries (somewhat like delete all). This means that we no longer have to "inherit" counted deletes when entries are overridden or do iterator tracking when building their flush batches.
  • Loading branch information
bcaimano authored and MellowYarker committed Apr 18, 2024
1 parent e115c39 commit c38d951
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 182 deletions.
35 changes: 19 additions & 16 deletions src/workerd/io/actor-cache-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ KJ_TEST("ActorCache deleteAll()") {
{
auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
mockTxn->expectCall("delete", ws)
.withParams(CAPNP(keys = ["bar", "grault", "baz"]))
.withParams(CAPNP(keys = ["bar", "baz", "grault"]))
.thenReturn(CAPNP(numDeleted = 2));
mockTxn->expectCall("delete", ws)
.withParams(CAPNP(keys = ["garply"]))
Expand All @@ -766,8 +766,6 @@ KJ_TEST("ActorCache deleteAll()") {
mockTxn->expectDropped(ws);
}

KJ_ASSERT(deletePromise.wait(ws) == 2);

mockStorage->expectCall("deleteAll", ws)
.thenReturn(CAPNP(numDeleted = 2));

Expand All @@ -782,6 +780,8 @@ KJ_TEST("ActorCache deleteAll()") {
.thenReturn(CAPNP());
}

KJ_ASSERT(deletePromise.wait(ws) == 2);

KJ_ASSERT(expectCached(test.get("foo")) == nullptr);
KJ_ASSERT(expectCached(test.get("baz")) == nullptr);
KJ_ASSERT(expectCached(test.get("corge")) == nullptr);
Expand Down Expand Up @@ -862,7 +862,7 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") {
{
auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
mockTxn->expectCall("delete", ws)
.withParams(CAPNP(keys = ["bar", "grault", "baz"]))
.withParams(CAPNP(keys = ["bar", "baz", "grault"]))
.thenReturn(CAPNP(numDeleted = 2));
mockTxn->expectCall("delete", ws)
.withParams(CAPNP(keys = ["garply"]))
Expand All @@ -875,8 +875,6 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") {
mockTxn->expectDropped(ws);
}

KJ_ASSERT(deletePromise.wait(ws) == 2);

// Do another deleteAll() before the first one is done.
auto deleteAllB = test.cache.deleteAll({});

Expand All @@ -886,7 +884,6 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") {
// Now finish it.
mockStorage->expectCall("deleteAll", ws)
.thenReturn(CAPNP(numDeleted = 2));

KJ_ASSERT(deleteAllA.count.wait(ws) == 2);
KJ_ASSERT(deleteAllB.count.wait(ws) == 0);

Expand All @@ -896,6 +893,7 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") {
.withParams(CAPNP(entries = [(key = "fred", value = "2323")]))
.thenReturn(CAPNP());
}
KJ_ASSERT(deletePromise.wait(ws) == 2);
}

KJ_TEST("ActorCache coalescing") {
Expand Down Expand Up @@ -950,7 +948,7 @@ KJ_TEST("ActorCache coalescing") {
.withParams(CAPNP(keys = ["grault"]))
.thenReturn(CAPNP(numDeleted = 0));
mockTxn->expectCall("delete", ws)
.withParams(CAPNP(keys = ["garply", "fred", "waldo"]))
.withParams(CAPNP(keys = ["garply", "waldo", "fred"]))
.thenReturn(CAPNP(numDeleted = 2));
mockTxn->expectCall("delete", ws)
.withParams(CAPNP(keys = ["bar", "foo"]))
Expand Down Expand Up @@ -1172,10 +1170,9 @@ KJ_TEST("ActorCache flush retry") {
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("corge"))) == "555");
KJ_ASSERT(expectCached(test.get("grault")) == nullptr);

// Although the transaction didn't complete, the delete did, and so it resolves.
KJ_ASSERT(promise1.wait(ws) == 1);

// The second delete had failed, though, so is still outstanding.
// Although the counted delete succeeded, the promise will not resolve until our flush succeeds!
KJ_ASSERT(!promise1.poll(ws));
// The second delete had failed, though; it is still outstanding.
KJ_ASSERT(!promise2.poll(ws));

// The transaction will be retried, with the updated puts and deletes.
Expand All @@ -1188,11 +1185,14 @@ KJ_TEST("ActorCache flush retry") {
// last time, because it hasn't been further overwritten, and that delete from last time
// wasn't actually committed.
mockTxn->expectCall("delete", ws)
.withParams(CAPNP(keys = ["grault", "corge"]))
.withParams(CAPNP(keys = ["quux"]))
.thenReturn(CAPNP()); // count ignored because we got it on the first try!
mockTxn->expectCall("delete", ws)
.withParams(CAPNP(keys = ["corge", "grault"]))
.thenReturn(CAPNP(numDeleted = 2));
mockTxn->expectCall("delete", ws)
.withParams(CAPNP(keys = ["quux", "baz"]))
.thenReturn(CAPNP(numDeleted = 1234)); // count ignored
.withParams(CAPNP(keys = ["baz"]))
.thenReturn(CAPNP(numDeleted = 1234));
mockTxn->expectCall("put", ws)
.withParams(CAPNP(entries = [(key = "foo", value = "123"),
(key = "bar", value = "654"),
Expand All @@ -1214,6 +1214,9 @@ KJ_TEST("ActorCache flush retry") {

// Second delete finished this time.
KJ_ASSERT(promise2.wait(ws) == 2);

// Our flush has succeeded and we've obtained our count!
KJ_ASSERT(promise1.wait(ws) == 1);
}

KJ_TEST("ActorCache output gate blocked during flush") {
Expand Down Expand Up @@ -3496,7 +3499,7 @@ KJ_TEST("ActorCache listReverse() retry on failure") {
// =======================================================================================
// LRU purge

constexpr size_t ENTRY_SIZE = 128;
constexpr size_t ENTRY_SIZE = 120;
KJ_TEST("ActorCache LRU purge") {
ActorCacheTest test({.softLimit = 1 * ENTRY_SIZE});
auto& ws = test.ws;
Expand Down

0 comments on commit c38d951

Please sign in to comment.