From c38d951a7ce03a5570a7056177f41ce22848833f Mon Sep 17 00:00:00 2001 From: Ben Caimano Date: Thu, 14 Sep 2023 13:13:56 -0400 Subject: [PATCH] Refactor: Make CountedDelete simpler This commit does a few notable things: - Counted deletes now use the lastFlush promise instead of their own fulfillers. This does mean that counted delete promises will resolve later in some rare cases like a counted delete and a delete all being requested within the same scheduled flush or a counted delete being issued as part of a transaction. The goal here is to get to a point where *all* operations just have to wait for the flush promise after they are scheduled. - Counted deletes now maintain their own list of participating entries (somewhat like delete all). This means that we no longer have to "inherit" counted deletes when entries are overridden or do iterator tracking when building their flush batches. --- src/workerd/io/actor-cache-test.c++ | 35 ++-- src/workerd/io/actor-cache.c++ | 305 ++++++++++++++-------------- src/workerd/io/actor-cache.h | 83 ++++++-- 3 files changed, 241 insertions(+), 182 deletions(-) diff --git a/src/workerd/io/actor-cache-test.c++ b/src/workerd/io/actor-cache-test.c++ index 366edf50fc4..500182baeaa 100644 --- a/src/workerd/io/actor-cache-test.c++ +++ b/src/workerd/io/actor-cache-test.c++ @@ -750,7 +750,7 @@ KJ_TEST("ActorCache deleteAll()") { { auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction"); mockTxn->expectCall("delete", ws) - .withParams(CAPNP(keys = ["bar", "grault", "baz"])) + .withParams(CAPNP(keys = ["bar", "baz", "grault"])) .thenReturn(CAPNP(numDeleted = 2)); mockTxn->expectCall("delete", ws) .withParams(CAPNP(keys = ["garply"])) @@ -766,8 +766,6 @@ KJ_TEST("ActorCache deleteAll()") { mockTxn->expectDropped(ws); } - KJ_ASSERT(deletePromise.wait(ws) == 2); - mockStorage->expectCall("deleteAll", ws) .thenReturn(CAPNP(numDeleted = 2)); @@ -782,6 +780,8 @@ KJ_TEST("ActorCache deleteAll()") { .thenReturn(CAPNP()); } + KJ_ASSERT(deletePromise.wait(ws) == 2); + KJ_ASSERT(expectCached(test.get("foo")) == nullptr); KJ_ASSERT(expectCached(test.get("baz")) == nullptr); KJ_ASSERT(expectCached(test.get("corge")) == nullptr); @@ -862,7 +862,7 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") { { auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction"); mockTxn->expectCall("delete", ws) - .withParams(CAPNP(keys = ["bar", "grault", "baz"])) + .withParams(CAPNP(keys = ["bar", "baz", "grault"])) .thenReturn(CAPNP(numDeleted = 2)); mockTxn->expectCall("delete", ws) .withParams(CAPNP(keys = ["garply"])) @@ -875,8 +875,6 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") { mockTxn->expectDropped(ws); } - KJ_ASSERT(deletePromise.wait(ws) == 2); - // Do another deleteAll() before the first one is done. auto deleteAllB = test.cache.deleteAll({}); @@ -886,7 +884,6 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") { // Now finish it. mockStorage->expectCall("deleteAll", ws) .thenReturn(CAPNP(numDeleted = 2)); - KJ_ASSERT(deleteAllA.count.wait(ws) == 2); KJ_ASSERT(deleteAllB.count.wait(ws) == 0); @@ -896,6 +893,7 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") { .withParams(CAPNP(entries = [(key = "fred", value = "2323")])) .thenReturn(CAPNP()); } + KJ_ASSERT(deletePromise.wait(ws) == 2); } KJ_TEST("ActorCache coalescing") { @@ -950,7 +948,7 @@ KJ_TEST("ActorCache coalescing") { .withParams(CAPNP(keys = ["grault"])) .thenReturn(CAPNP(numDeleted = 0)); mockTxn->expectCall("delete", ws) - .withParams(CAPNP(keys = ["garply", "fred", "waldo"])) + .withParams(CAPNP(keys = ["garply", "waldo", "fred"])) .thenReturn(CAPNP(numDeleted = 2)); mockTxn->expectCall("delete", ws) .withParams(CAPNP(keys = ["bar", "foo"])) @@ -1172,10 +1170,9 @@ KJ_TEST("ActorCache flush retry") { KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("corge"))) == "555"); KJ_ASSERT(expectCached(test.get("grault")) == nullptr); - // Although the transaction didn't complete, the delete did, and so it resolves. - KJ_ASSERT(promise1.wait(ws) == 1); - - // The second delete had failed, though, so is still outstanding. + // Although the counted delete succeeded, the promise will not resolve until our flush succeeds! + KJ_ASSERT(!promise1.poll(ws)); + // The second delete had failed, though; it is still outstanding. KJ_ASSERT(!promise2.poll(ws)); // The transaction will be retried, with the updated puts and deletes. @@ -1188,11 +1185,14 @@ KJ_TEST("ActorCache flush retry") { // last time, because it hasn't been further overwritten, and that delete from last time // wasn't actually committed. mockTxn->expectCall("delete", ws) - .withParams(CAPNP(keys = ["grault", "corge"])) + .withParams(CAPNP(keys = ["quux"])) + .thenReturn(CAPNP()); // count ignored because we got it on the first try! + mockTxn->expectCall("delete", ws) + .withParams(CAPNP(keys = ["corge", "grault"])) .thenReturn(CAPNP(numDeleted = 2)); mockTxn->expectCall("delete", ws) - .withParams(CAPNP(keys = ["quux", "baz"])) - .thenReturn(CAPNP(numDeleted = 1234)); // count ignored + .withParams(CAPNP(keys = ["baz"])) + .thenReturn(CAPNP(numDeleted = 1234)); mockTxn->expectCall("put", ws) .withParams(CAPNP(entries = [(key = "foo", value = "123"), (key = "bar", value = "654"), @@ -1214,6 +1214,9 @@ KJ_TEST("ActorCache flush retry") { // Second delete finished this time. KJ_ASSERT(promise2.wait(ws) == 2); + + // Our flush has succeeded and we've obtained our count! + KJ_ASSERT(promise1.wait(ws) == 1); } KJ_TEST("ActorCache output gate blocked during flush") { @@ -3496,7 +3499,7 @@ KJ_TEST("ActorCache listReverse() retry on failure") { // ======================================================================================= // LRU purge -constexpr size_t ENTRY_SIZE = 128; +constexpr size_t ENTRY_SIZE = 120; KJ_TEST("ActorCache LRU purge") { ActorCacheTest test({.softLimit = 1 * ENTRY_SIZE}); auto& ws = test.ws; diff --git a/src/workerd/io/actor-cache.c++ b/src/workerd/io/actor-cache.c++ index 0609b61f871..99a9057ddc1 100644 --- a/src/workerd/io/actor-cache.c++ +++ b/src/workerd/io/actor-cache.c++ @@ -1881,6 +1881,18 @@ kj::Maybe> ActorCache::setAlarm(kj::Maybe newAlarmTi return getBackpressure(); } +namespace { +template +kj::OneOf, kj::PromiseForResult> mapPromise( + kj::Maybe> maybePromise, F&& f) { + KJ_IF_SOME(promise, maybePromise) { + return promise.then(kj::fwd(f)); + } else { + return kj::fwd(f)(); + } +} +} // namespace + kj::OneOf> ActorCache::delete_(Key key, WriteOptions options) { options.noCache = options.noCache || lru.options.noCache; requireNotTerminal(); @@ -1893,30 +1905,17 @@ kj::OneOf> ActorCache::delete_(Key key, WriteOptions opt evictOrOomIfNeeded(lock); } - if (countedDelete->isShared()) { - // The entry kept a reference to `countedDelete`, so it must be waiting on an RPC. Set up a - // fulfiller. - auto paf = kj::newPromiseAndFulfiller(); - countedDelete->resultFulfiller = kj::mv(paf.fulfiller); - KJ_IF_SOME(p, getBackpressure()) { - return p.then([promise = kj::mv(paf.promise)]() mutable { - return promise.then([](uint i) { return i > 0; }); - }); - } else { - return paf.promise.then([](uint i) { return i > 0; }); - } - } else { - // It looks like there was a pre-existing cache entry for this key, so we already know whether - // there was a value to delete. - bool result = countedDelete->countDeleted > 0; - KJ_IF_SOME(p, getBackpressure()) { - return p.then([result]() mutable { - return result; - }); - } else { - return result; - } + auto waiter = kj::heap(*this, kj::addRef(*countedDelete)); + kj::Maybe> maybePromise; + KJ_IF_SOME(p, getBackpressure()) { + // This might be more than one flush but that's okay as long as our state gets taken care of. + maybePromise = countedDelete->forgiveIfFinished(kj::mv(p)); + } else if (countedDelete->entries.size()) { + maybePromise = countedDelete->forgiveIfFinished(lastFlush.addBranch()); } + return mapPromise(kj::mv(maybePromise), [waiter = kj::mv(waiter)]() { + return waiter->getCountedDelete().countDeleted > 0; + }); } kj::OneOf> ActorCache::delete_(kj::Array keys, WriteOptions options) { @@ -1933,30 +1932,17 @@ kj::OneOf> ActorCache::delete_(kj::Array keys, Writ evictOrOomIfNeeded(lock); } - if (countedDelete->isShared()) { - // At least one entry kept a reference to `countedDelete`, so it must be waiting on an RPC. - // Set up a fulfiller. - auto paf = kj::newPromiseAndFulfiller(); - countedDelete->resultFulfiller = kj::mv(paf.fulfiller); - KJ_IF_SOME(p, getBackpressure()) { - return p.then([promise = kj::mv(paf.promise)]() mutable { - return kj::mv(promise); - }); - } else { - return kj::mv(paf.promise); - } - } else { - // It looks like the count of deletes is fully known based on cache content, so we don't need - // to wait. - uint result = countedDelete->countDeleted; - KJ_IF_SOME(p, getBackpressure()) { - return p.then([result]() mutable { - return result; - }); - } else { - return result; - } + auto waiter = kj::heap(*this, kj::addRef(*countedDelete)); + kj::Maybe> maybePromise; + KJ_IF_SOME(p, getBackpressure()) { + // This might be more than one flush but that's okay as long as our state gets taken care of. + maybePromise = countedDelete->forgiveIfFinished(kj::mv(p)); + } else if (countedDelete->entries.size()) { + maybePromise = countedDelete->forgiveIfFinished(lastFlush.addBranch()); } + return mapPromise(kj::mv(maybePromise), [waiter = kj::mv(waiter)]() { + return waiter->getCountedDelete().countDeleted; + }); } kj::Own ActorCache::startTransaction() { @@ -2031,8 +2017,9 @@ ActorCache::DeleteAllResults ActorCache::deleteAll(WriteOptions options) { }; } -void ActorCache::putImpl(Lock& lock, kj::Own newEntry, - const WriteOptions& options, kj::Maybe countedDelete) { +void ActorCache::putImpl( + Lock& lock, kj::Own newEntry, const WriteOptions& options, + kj::Maybe maybeCountedDelete) { auto& map = currentValues.get(lock); auto ordered = map.ordered(); @@ -2053,8 +2040,10 @@ void ActorCache::putImpl(Lock& lock, kj::Own newEntry, return; } - KJ_IF_SOME(c, countedDelete) { - // Overwrote an entry that was in cache, so we can count it now. + KJ_IF_SOME(c, maybeCountedDelete) { + // Overwrote an entry that was in cache, so we can count it now. Note that because we + // are PRESENT, we will not be added to the CountedDelete's `entries`, since we only + // do this for UNKNOWN entries! Instead, we'll be part of a regular delete. ++c.countDeleted; } break; @@ -2066,16 +2055,32 @@ void ActorCache::putImpl(Lock& lock, kj::Own newEntry, return; } + if (slot->isCountedDelete) { + // We are overwriting an entry that is slated for a counted delete operation. + // There may be a situation where all the entries associated with a counted delete are + // actually successfully deleted (and we get the count), but the transaction the deletes + // execute within fails. + // + // Since we are currently overwriting the Entry, we might as well inform the + // `CountedDelete` that this Entry has since been overwritten. Then, if we hit the case + // described above, we won't need to include this Entry in a subsequent counted delete + // retry, since we already have the count AND the Entry has been overwritten. + // + // For more details, see how we filter the entries to be deleted for a CountedDeleteFlush + // as part of a flush. + slot->overwritingCountedDelete = true; + } // We don't have to worry about the counted delete since we were already deleted. break; } case EntryValueStatus::UNKNOWN: { // This was a list end marker, we should just overwrite it. - KJ_IF_SOME(c, countedDelete) { + KJ_IF_SOME(c, maybeCountedDelete) { // Despite an entry being present, we don't know if the key exists, because it's just an // UNKNOWN entry. So we will still have to arrange to count the delete later. - newEntry->countedDelete = kj::addRef(c); + newEntry->isCountedDelete = true; + c.entries.add(kj::atomicAddRef(*newEntry)); } break; } @@ -2086,16 +2091,6 @@ void ActorCache::putImpl(Lock& lock, kj::Own newEntry, // Inherit gap state. newEntry->gapIsKnownEmpty = slot->gapIsKnownEmpty; - if (slot->isDirty()) { - // Entry may have `countedDelete` indicating we're still waiting to get a count from a - // previous delete operation. If so, we'll need to inherit it in case that delete operation - // fails and we end up retrying. Note that the new entry could be a positive entry rather - // than a negative one (a `put()` rather than a `delete()`). That is OK -- in `flushImpl()` - // we will still see the presence of `countedDelete` and realize we have to issue a delete - // on the key before we issue a put, just for the sake of counting it. - newEntry->countedDelete = kj::mv(slot->countedDelete); - } - // Swap in the new entry. removeEntry(lock, *slot); @@ -2121,8 +2116,9 @@ void ActorCache::putImpl(Lock& lock, kj::Own newEntry, // inserting a new entry, to avoid repeating the lookup. auto& slot = map.insert(kj::mv(newEntry)); slot->gapIsKnownEmpty = previousGapKnownEmpty; - KJ_IF_SOME(c, countedDelete) { - slot->countedDelete = kj::addRef(c); + KJ_IF_SOME(c, maybeCountedDelete) { + slot->isCountedDelete = true; + c.entries.add(kj::atomicAddRef(*slot)); } addToDirtyList(*slot); } @@ -2325,56 +2321,78 @@ kj::Promise ActorCache::startFlushTransaction() { batch.wordCount += words; }; - kj::Vector countedDeleteFlushes; + kj::Vector countedDeleteFlushes(countedDeletes.size()); + for (auto countedDelete : countedDeletes) { + if (countedDelete->isFinished) { + // This countedDelete has already be executed, but we haven't delivered the final count to + // the waiter yet. We'll skip it here since the destructor of CountedDeleteWaiter should + // eventually remove this entry from `countedDeletes`. + continue; + } + + auto& countedDeleteFlush = countedDeleteFlushes.add(CountedDeleteFlush{ + .countedDelete = kj::addRef(*countedDelete), + }); + // We might have successfully deleted these entries, but had the broader transaction fail. + // In that case, we might have entries that have since been overwritten, and which no longer + // need to be scheduled for deletion. + kj::Vector> entriesToDelete(countedDelete->entries.size()); + for (auto& entry : countedDelete->entries) { + if (entry->overwritingCountedDelete && countedDelete->completedInTransaction) { + // Not only is this a retry, but we have since modified the entry with a put(). + // Since we already have the delete count, we don't need to delete this entry again. + continue; + } + entriesToDelete.add(kj::mv(entry)); + } + + // Now that we've filtered our entries down to only those that need to be deleted, + // we need to overwrite the CountedDelete's `entries`. + countedDelete->entries = kj::mv(entriesToDelete); + for (auto& entry : countedDelete->entries) { + // A delete() call on this key is waiting to find out if the key existed in storage. Since + // each delete() call needs to return the count of keys deleted, we must issue + // corresponding delete calls to storage with the same batching, so that storage returns + // the right counts to us. We can't batch all the deletes into a single delete operation + // since then we'd only get a single count back and we wouldn't know how to split that up + // to satisfy all the callers. + // + // Note that a subsequent put() call could have set entry.value to non-null, but we still + // have to perform the delete first in order to determine the count that the delete() call + // should return. + // + // There is a minor quirk here because the counted delete set does not distinguish between + // before and after a delete all. That's actually okay because we should be able to + // immediately resolve counted deletes requested after a delete all (either the values are + // absent or they have a dirty put). This might also be an issue if we respected noCache for + // delete all's dummy value, but we do not. + entry->flushStarted = true; + + auto keySizeInWords = bytesToWordsRoundUp(entry->key.size()); + auto words = keySizeInWords + 1; + includeInCurrentBatch(countedDeleteFlush.batches, words); + } + } auto countEntry = [&](Entry& entry) { // Counts up the number of operations and RPC message sizes we'll need to cover this entry. + if (entry.isCountedDelete) { + // We should have already put this entry into a batch, so just skip it. + KJ_ASSERT(entry.flushStarted); + return; + } + entry.setFlushing(); auto keySizeInWords = bytesToWordsRoundUp(entry.key.size()); - KJ_IF_SOME(c, entry.countedDelete) { - if (c->resultFulfiller->isWaiting()) { - // A delete() call on this key is waiting to find out if the key existed in storage. Since - // each delete() call needs to return the count of keys deleted, we must issue - // corresponding delete calls to storage with the same batching, so that storage returns - // the right counts to us. We can't batch all the deletes into a single delete operation - // since then we'd only get a single count back and we wouldn't know how to split that up - // to satisfy all the callers. - // - // Note that a subsequent put() call could have set entry.value to non-null, but we still - // have to perform the delete first in order to determine the count that the delete() call - // should return. - CountedDeleteFlush* countedDeleteFlush; - KJ_IF_SOME(i, c->flushIndex) { - countedDeleteFlush = &countedDeleteFlushes[i]; - } else { - c->flushIndex = countedDeleteFlushes.size(); - countedDeleteFlush = &countedDeleteFlushes.add(CountedDeleteFlush{ - .countedDelete = kj::addRef(*c).attach(kj::defer([&cd = *c]() mutable { - // Note that this is attached to the `Own`, not the value. We actually want this, - // because it allows us to reset the `flushIndex` when *this flush* finishes, - // regardless of if we need to retry. - cd.flushIndex = kj::none; - })), - }); - } - auto words = keySizeInWords + 1; - includeInCurrentBatch(countedDeleteFlush->batches, words); - countedDeleteFlush->entries.add(kj::atomicAddRef(entry)); - } else { - // No one is waiting on this `CountedDelete` anymore so we can just drop it. - entry.countedDelete = kj::none; - } - } - KJ_IF_SOME(v, entry.getValuePtr()) { auto words = keySizeInWords + bytesToWordsRoundUp(v.size()) + capnp::sizeInWords(); includeInCurrentBatch(putFlush.batches, words); putFlush.entries.add(kj::atomicAddRef(entry)); - } else if (entry.countedDelete == kj::none) { + } else { auto words = keySizeInWords + 1; includeInCurrentBatch(mutedDeleteFlush.batches, words); mutedDeleteFlush.entries.add(kj::atomicAddRef(entry)); @@ -2547,7 +2565,7 @@ kj::Promise ActorCache::flushImpl(uint retryCount) { // will have inherited the `countedDelete`, and will still be DIRTY at this point. That is // OK, because the `countedDelete`'s fulfiller will have already been fulfilled, and // therefore the next flushImpl() will see that it is obsolete and discard it. - entry.countedDelete = kj::none; + entry.isCountedDelete = false; dirtyList.remove(entry); if (entry.noCache) { @@ -2666,35 +2684,26 @@ kj::Promise ActorCache::flushImplUsingSingleCountedDelete(CountedDeleteFlu KJ_ASSERT(countedFlush.batches.size() == 1); auto& batch = countedFlush.batches[0]; + auto countedDelete = kj::mv(countedFlush.countedDelete); KJ_ASSERT(batch.wordCount < MAX_ACTOR_STORAGE_RPC_WORDS); - KJ_ASSERT(batch.pairCount == countedFlush.entries.size()); + KJ_ASSERT(batch.pairCount == countedDelete->entries.size()); auto request = storage.deleteRequest(capnp::MessageSize { 4 + batch.wordCount, 0 }); auto listBuilder = request.initKeys(batch.pairCount); - auto entryIt = countedFlush.entries.begin(); + auto entryIt = countedDelete->entries.begin(); for (size_t i = 0; i < batch.pairCount; ++i) { auto& entry = **(entryIt++); listBuilder.set(i, entry.key.asBytes()); } // We're done with the batching instructions, free them before we go async. - countedFlush.entries.clear(); countedFlush.batches.clear(); - auto countedDelete = kj::mv(countedFlush.countedDelete); - try { - auto writeObserver = recordStorageWrite(hooks, clock); - util::DurationExceededLogger logger(clock, 1*kj::SECONDS, "storage operation took longer than expected: counted delete"); - auto response = co_await request.send(); - countedDelete->resultFulfiller->fulfill(response.getNumDeleted()); - } catch (kj::Exception& e) { - if (e.getType() == kj::Exception::Type::DISCONNECTED) { - // This deletion will be retried, so don't touch the fulfiller. - } else { - countedDelete->resultFulfiller->reject(kj::cp(e)); - } - throw kj::mv(e); - } + auto writeObserver = recordStorageWrite(hooks, clock); + util::DurationExceededLogger logger(clock, 1*kj::SECONDS, "storage operation took longer than expected: counted delete"); + auto response = co_await request.send(); + countedDelete->countDeleted += response.getNumDeleted(); + countedDelete->isFinished = true; } kj::Promise ActorCache::flushImplAlarmOnly(DirtyAlarm dirty) { @@ -2753,7 +2762,8 @@ kj::Promise ActorCache::flushImplUsingTxn( auto rpcPuts = kj::heapArrayBuilder(putFlush.batches.size()); for (auto& flush: countedDeleteFlushes) { - auto entryIt = flush.entries.begin(); + auto countedDelete = kj::mv(flush.countedDelete); + auto entryIt = countedDelete->entries.begin(); kj::Vector rpcDeletes; for (auto& batch: flush.batches) { KJ_ASSERT(batch.wordCount < MAX_ACTOR_STORAGE_RPC_WORDS); @@ -2761,19 +2771,20 @@ kj::Promise ActorCache::flushImplUsingTxn( auto request = txn.deleteRequest(capnp::MessageSize { 4 + batch.wordCount, 0 }); auto listBuilder = request.initKeys(batch.pairCount); for (size_t i = 0; i < batch.pairCount; ++i) { - KJ_ASSERT(entryIt != flush.entries.end()); + KJ_ASSERT(entryIt != countedDelete->entries.end()); auto& entry = **(entryIt++); listBuilder.set(i, entry.key.asBytes()); } rpcDeletes.add(kj::mv(request)); } + KJ_ASSERT(entryIt == countedDelete->entries.end()); rpcCountedDeletes.add(RpcCountedDelete{ - .countedDelete = kj::mv(flush.countedDelete), + .countedDelete = kj::mv(countedDelete), .rpcDeletes = rpcDeletes.releaseAsArray(), }); - KJ_ASSERT(entryIt == flush.entries.end()); } + countedDeleteFlushes = nullptr; { auto entryIt = mutedDeleteFlush.entries.begin(); @@ -2791,6 +2802,8 @@ kj::Promise ActorCache::flushImplUsingTxn( } KJ_ASSERT(entryIt == mutedDeleteFlush.entries.end()); } + mutedDeleteFlush.entries.clear(); + mutedDeleteFlush.batches.clear(); { auto entryIt = putFlush.entries.begin(); @@ -2810,13 +2823,8 @@ kj::Promise ActorCache::flushImplUsingTxn( } KJ_ASSERT(entryIt == putFlush.entries.end()); } - - // We're done with the batching instructions, free them before we go async. putFlush.entries.clear(); putFlush.batches.clear(); - mutedDeleteFlush.entries.clear(); - mutedDeleteFlush.batches.clear(); - countedDeleteFlushes = nullptr; // Send all the RPCs. It's important that counted deletes are sent first since they can overlap // with puts. Specifically this can happen if someone does a delete() immediately followed by a @@ -2839,30 +2847,26 @@ kj::Promise ActorCache::flushImplUsingTxn( }); }; + size_t recordsDeleted = 0; for (auto& promise : promises) { - // Reuse `countDeleted` since it's already in a state object anyway. - rpcCountedDelete.countedDelete->countDeleted += co_await promise; + recordsDeleted += co_await promise; } + + // This may be a retry following a successful counted delete within a failed transaction. + // In that case, we don't want to update the count again, since we've already considered it. + if (!rpcCountedDelete.countedDelete->completedInTransaction) { + // We only increment our `countDeleted` if *ALL* the delete batches succeeded. + rpcCountedDelete.countedDelete->countDeleted += recordsDeleted; + } + + // This delete succeeded, but we may need to retry it in some cases, ex. if the transaction fails. + // If we *do* retry after a successful counted delete, we won't want to update our + // `countDeleted` since we already got it. + rpcCountedDelete.countedDelete->completedInTransaction = true; }; + for (auto& rpcCountedDelete: rpcCountedDeletes) { - promises.add(joinCountedDelete(rpcCountedDelete).then( - [&countedDelete = *rpcCountedDelete.countedDelete]() mutable { - // Note that it's OK to trust the delete count even if the transaction ultimately gets rolled - // back, because: - // - We know that nothing else could be concurrently modifying our storage in a way that - // makes the count different on a retry. - // - If retries fail and the flush never completes at all, the output gate will kick in and - // make it impossible for anyone to observe the bogus result. - // HACK: This uses a `kj::mv()` because promise fulfillers require rvalues even for trivially - // copyable types. - countedDelete.resultFulfiller->fulfill(kj::mv(countedDelete.countDeleted)); - }, [&countedDelete = *rpcCountedDelete.countedDelete](kj::Exception&& e) { - if (e.getType() == kj::Exception::Type::DISCONNECTED) { - // This deletion will be retried, so don't touch the fulfiller. - } else { - countedDelete.resultFulfiller->reject(kj::mv(e)); - } - })); + promises.add(joinCountedDelete(rpcCountedDelete)); } for (auto& request: rpcMutedDeletes) { @@ -2923,6 +2927,11 @@ kj::Promise ActorCache::flushImplUsingTxn( promises.add(txn.commitRequest(capnp::MessageSize { 4, 0 }).send().ignoreResult()); co_await kj::joinPromises(promises.finish()); + for (auto& rpcCountedDelete: rpcCountedDeletes) { + // Now that the transaction has successfully completed, we can mark all our CountedDeletes + // as having completed as well. + rpcCountedDelete.countedDelete->isFinished = true; + } } } diff --git a/src/workerd/io/actor-cache.h b/src/workerd/io/actor-cache.h index 54f4d710bd9..264d775464b 100644 --- a/src/workerd/io/actor-cache.h +++ b/src/workerd/io/actor-cache.h @@ -469,17 +469,15 @@ class ActorCache final: public ActorCacheInterface { // In the DIRTY state, if this entry was originally created as the result of a // `delete()` call, and as such the caller needs to receive a count of deletions, then this // tracks that need. Note that only one caller could ever be waiting on this, because - // subsequent delete() calls can be counted based on the cache content. This can be null + // subsequent delete() calls can be counted based on the cache content. This can be false // if no delete operations need a count from this entry. - // - // If an entry is overwritten, `countedDelete` needs to be inherited by the replacement entry, - // so that the delete is still counted upon `flushImpl()`. (If the entry being replaced is - // already flushing, and that flush succeeds, then countedDelete->fulfiller will be fulfilled. - // In that case, it's no longer relevant to have `countedDelete` on the replacement entry, - // because it's already fulfilled and so will be ignored anyway. However, in the unlikely case - // that the flush failed, then it is actually important that the `countedDelete` has been moved - // to the replacement entry, so that it can be retried.) - kj::Maybe> countedDelete; + bool isCountedDelete = false; + + // This Entry is part of a CountedDelete, but has since been overwritten via a put(). + // This is really only useful in determining if we need to retry the deletion of this entry from + // storage, since we're interested in the number of deleted records. If we already got the count, + // we won't include this entry as part of our retried delete. + bool overwritingCountedDelete = false; // If CLEAN, the entry will be in the SharedLru's `cleanList`. // @@ -512,19 +510,69 @@ class ActorCache final: public ActorCacheInterface { // This object can only be manipulated in the thread that owns the specific actor that made // the request. That works out fine since CountedDelete only ever exists for dirty entries, // which won't be touched cross-thread by the LRU. - struct CountedDelete: public kj::Refcounted { + struct CountedDelete final: public kj::Refcounted { + CountedDelete() = default; + KJ_DISALLOW_COPY_AND_MOVE(CountedDelete); + + kj::Promise forgiveIfFinished(kj::Promise promise) { + try { + co_await promise; + } catch (...) { + if (isFinished) { + // We already flushed, so it's okay that the promise threw. + co_return; + } else { + throw; + } + } + } + // Running count of entries that existed before the delete. uint countDeleted = 0; - // When `countOutstanding` reaches zero, fulfill this with `countDeleted`. - kj::Own> resultFulfiller; + // Did this particular counted delete succeed within a transaction? In other words, did we + // already get the count? Even if we got the count, we may need to retry if the transaction + // itself failed, though we won't need to get the count again. + bool completedInTransaction = false; - // During `flushImpl()`, when this CountedDelete is first encountered, `flushIndex` will be set - // to track this delete batch. It will be set back to `kj::none` before `flushImpl()` returns. - // This field exists here to avoid the need for a HashMap in `flushImpl()`. - kj::Maybe flushIndex; + // Did this particular counted delete succeed? Note that this can be true even if the flush + // failed on a different batch of operations. + bool isFinished = false; + + // The entries are associated with this counted delete. + kj::Vector> entries; }; + class CountedDeleteWaiter { + public: + explicit CountedDeleteWaiter(ActorCache& cache, kj::Own state) + : cache(cache), state(kj::mv(state)) { + // Register this operation so that we can batch it properly during flush. + cache.countedDeletes.insert(this->state.get()); + } + KJ_DISALLOW_COPY_AND_MOVE(CountedDeleteWaiter); + ~CountedDeleteWaiter() noexcept(false) { + for (auto& entry : state->entries) { + // Let each entry associated with this counted delete know that we aren't waiting anymore. + entry->isCountedDelete = false; + } + + // Since the count of deleted pairs is no longer required, we don't need to batch the ops. + // Note that we're doing eraseMatch since the pointer is a temporary literal. + cache.countedDeletes.eraseMatch(state.get()); + } + + const CountedDelete& getCountedDelete() const { + return *state; + } + + private: + ActorCache& cache; + kj::Own state; + }; + + kj::HashSet countedDeletes; + rpc::ActorStorage::Stage::Client storage; const SharedLru& lru; OutputGate& gate; @@ -729,7 +777,6 @@ class ActorCache final: public ActorCacheInterface { }; struct CountedDeleteFlush { kj::Own countedDelete; - kj::Vector> entries; kj::Vector batches; }; using CountedDeleteFlushes = kj::Array;