diff --git a/src/workerd/io/actor-cache-test.c++ b/src/workerd/io/actor-cache-test.c++ index 366edf50fc4..500182baeaa 100644 --- a/src/workerd/io/actor-cache-test.c++ +++ b/src/workerd/io/actor-cache-test.c++ @@ -750,7 +750,7 @@ KJ_TEST("ActorCache deleteAll()") { { auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction"); mockTxn->expectCall("delete", ws) - .withParams(CAPNP(keys = ["bar", "grault", "baz"])) + .withParams(CAPNP(keys = ["bar", "baz", "grault"])) .thenReturn(CAPNP(numDeleted = 2)); mockTxn->expectCall("delete", ws) .withParams(CAPNP(keys = ["garply"])) @@ -766,8 +766,6 @@ KJ_TEST("ActorCache deleteAll()") { mockTxn->expectDropped(ws); } - KJ_ASSERT(deletePromise.wait(ws) == 2); - mockStorage->expectCall("deleteAll", ws) .thenReturn(CAPNP(numDeleted = 2)); @@ -782,6 +780,8 @@ KJ_TEST("ActorCache deleteAll()") { .thenReturn(CAPNP()); } + KJ_ASSERT(deletePromise.wait(ws) == 2); + KJ_ASSERT(expectCached(test.get("foo")) == nullptr); KJ_ASSERT(expectCached(test.get("baz")) == nullptr); KJ_ASSERT(expectCached(test.get("corge")) == nullptr); @@ -862,7 +862,7 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") { { auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction"); mockTxn->expectCall("delete", ws) - .withParams(CAPNP(keys = ["bar", "grault", "baz"])) + .withParams(CAPNP(keys = ["bar", "baz", "grault"])) .thenReturn(CAPNP(numDeleted = 2)); mockTxn->expectCall("delete", ws) .withParams(CAPNP(keys = ["garply"])) @@ -875,8 +875,6 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") { mockTxn->expectDropped(ws); } - KJ_ASSERT(deletePromise.wait(ws) == 2); - // Do another deleteAll() before the first one is done. auto deleteAllB = test.cache.deleteAll({}); @@ -886,7 +884,6 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") { // Now finish it. mockStorage->expectCall("deleteAll", ws) .thenReturn(CAPNP(numDeleted = 2)); - KJ_ASSERT(deleteAllA.count.wait(ws) == 2); KJ_ASSERT(deleteAllB.count.wait(ws) == 0); @@ -896,6 +893,7 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") { .withParams(CAPNP(entries = [(key = "fred", value = "2323")])) .thenReturn(CAPNP()); } + KJ_ASSERT(deletePromise.wait(ws) == 2); } KJ_TEST("ActorCache coalescing") { @@ -950,7 +948,7 @@ KJ_TEST("ActorCache coalescing") { .withParams(CAPNP(keys = ["grault"])) .thenReturn(CAPNP(numDeleted = 0)); mockTxn->expectCall("delete", ws) - .withParams(CAPNP(keys = ["garply", "fred", "waldo"])) + .withParams(CAPNP(keys = ["garply", "waldo", "fred"])) .thenReturn(CAPNP(numDeleted = 2)); mockTxn->expectCall("delete", ws) .withParams(CAPNP(keys = ["bar", "foo"])) @@ -1172,10 +1170,9 @@ KJ_TEST("ActorCache flush retry") { KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("corge"))) == "555"); KJ_ASSERT(expectCached(test.get("grault")) == nullptr); - // Although the transaction didn't complete, the delete did, and so it resolves. - KJ_ASSERT(promise1.wait(ws) == 1); - - // The second delete had failed, though, so is still outstanding. + // Although the counted delete succeeded, the promise will not resolve until our flush succeeds! + KJ_ASSERT(!promise1.poll(ws)); + // The second delete had failed, though; it is still outstanding. KJ_ASSERT(!promise2.poll(ws)); // The transaction will be retried, with the updated puts and deletes. 
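(Aside: the reordered `deletePromise.wait(ws)` / `promise1` assertions above encode the behavioral change this patch makes: a `delete()`'s count promise no longer resolves as soon as the counted-delete RPC returns, but only once the whole flush/transaction has succeeded. Below is a minimal, self-contained sketch of that ordering contract using plain kj promise primitives only — it is not the actual ActorCache or mock-storage test harness, just an illustration of the gating.)

```cpp
// Sketch: the caller-visible count is gated on "flush committed", not on the
// counted-delete RPC itself. Assumes only plain kj; names are illustrative.
#include <kj/async.h>
#include <kj/debug.h>

int main() {
  kj::EventLoop loop;
  kj::WaitScope ws(loop);

  // Stand-ins for "the counted-delete RPC returned a count" and
  // "the whole flush/transaction committed".
  auto countedDeleteDone = kj::newPromiseAndFulfiller<uint>();
  auto flushCommitted = kj::newPromiseAndFulfiller<void>();

  uint observedCount = 0;
  // The caller-visible promise: release the count only after the flush commits.
  auto callerPromise = flushCommitted.promise
      .then([p = kj::mv(countedDeleteDone.promise)]() mutable { return kj::mv(p); })
      .then([&](uint count) { observedCount = count; });

  // The storage delete reports its count first...
  countedDeleteDone.fulfiller->fulfill(2);
  KJ_ASSERT(!callerPromise.poll(ws));  // ...but the caller still cannot observe it.

  // Only once the flush commits does the caller's promise resolve.
  flushCommitted.fulfiller->fulfill();
  callerPromise.wait(ws);
  KJ_ASSERT(observedCount == 2);
  return 0;
}
```

Tying the caller-visible count to flush completion is what lets a retried transaction reuse an already-obtained count without ever exposing a count for a flush that never committed.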
@@ -1188,11 +1185,14 @@ KJ_TEST("ActorCache flush retry") { // last time, because it hasn't been further overwritten, and that delete from last time // wasn't actually committed. mockTxn->expectCall("delete", ws) - .withParams(CAPNP(keys = ["grault", "corge"])) + .withParams(CAPNP(keys = ["quux"])) + .thenReturn(CAPNP()); // count ignored because we got it on the first try! + mockTxn->expectCall("delete", ws) + .withParams(CAPNP(keys = ["corge", "grault"])) .thenReturn(CAPNP(numDeleted = 2)); mockTxn->expectCall("delete", ws) - .withParams(CAPNP(keys = ["quux", "baz"])) - .thenReturn(CAPNP(numDeleted = 1234)); // count ignored + .withParams(CAPNP(keys = ["baz"])) + .thenReturn(CAPNP(numDeleted = 1234)); mockTxn->expectCall("put", ws) .withParams(CAPNP(entries = [(key = "foo", value = "123"), (key = "bar", value = "654"), @@ -1214,6 +1214,9 @@ KJ_TEST("ActorCache flush retry") { // Second delete finished this time. KJ_ASSERT(promise2.wait(ws) == 2); + + // Our flush has succeeded and we've obtained our count! + KJ_ASSERT(promise1.wait(ws) == 1); } KJ_TEST("ActorCache output gate blocked during flush") { @@ -3496,7 +3499,7 @@ KJ_TEST("ActorCache listReverse() retry on failure") { // ======================================================================================= // LRU purge -constexpr size_t ENTRY_SIZE = 128; +constexpr size_t ENTRY_SIZE = 120; KJ_TEST("ActorCache LRU purge") { ActorCacheTest test({.softLimit = 1 * ENTRY_SIZE}); auto& ws = test.ws; diff --git a/src/workerd/io/actor-cache.c++ b/src/workerd/io/actor-cache.c++ index 0609b61f871..99a9057ddc1 100644 --- a/src/workerd/io/actor-cache.c++ +++ b/src/workerd/io/actor-cache.c++ @@ -1881,6 +1881,18 @@ kj::Maybe<kj::Promise<void>> ActorCache::setAlarm(kj::Maybe<kj::Date> newAlarmTi return getBackpressure(); } +namespace { +template <typename F> +kj::OneOf, kj::PromiseForResult> mapPromise( + kj::Maybe<kj::Promise<void>> maybePromise, F&& f) { + KJ_IF_SOME(promise, maybePromise) { + return promise.then(kj::fwd<F>(f)); + } else { + return kj::fwd<F>(f)(); + } +} +} // namespace + kj::OneOf<bool, kj::Promise<bool>> ActorCache::delete_(Key key, WriteOptions options) { options.noCache = options.noCache || lru.options.noCache; requireNotTerminal(); @@ -1893,30 +1905,17 @@ kj::OneOf<bool, kj::Promise<bool>> ActorCache::delete_(Key key, WriteOptions opt evictOrOomIfNeeded(lock); } - if (countedDelete->isShared()) { - // The entry kept a reference to `countedDelete`, so it must be waiting on an RPC. Set up a - // fulfiller. - auto paf = kj::newPromiseAndFulfiller<uint>(); - countedDelete->resultFulfiller = kj::mv(paf.fulfiller); - KJ_IF_SOME(p, getBackpressure()) { - return p.then([promise = kj::mv(paf.promise)]() mutable { - return promise.then([](uint i) { return i > 0; }); - }); - } else { - return paf.promise.then([](uint i) { return i > 0; }); - } - } else { - // It looks like there was a pre-existing cache entry for this key, so we already know whether - // there was a value to delete. - bool result = countedDelete->countDeleted > 0; - KJ_IF_SOME(p, getBackpressure()) { - return p.then([result]() mutable { - return result; - }); - } else { - return result; - } + auto waiter = kj::heap<CountedDeleteWaiter>(*this, kj::addRef(*countedDelete)); + kj::Maybe<kj::Promise<void>> maybePromise; + KJ_IF_SOME(p, getBackpressure()) { + // This might be more than one flush but that's okay as long as our state gets taken care of.
+ maybePromise = countedDelete->forgiveIfFinished(kj::mv(p)); + } else if (countedDelete->entries.size()) { + maybePromise = countedDelete->forgiveIfFinished(lastFlush.addBranch()); } + return mapPromise(kj::mv(maybePromise), [waiter = kj::mv(waiter)]() { + return waiter->getCountedDelete().countDeleted > 0; + }); } kj::OneOf<uint, kj::Promise<uint>> ActorCache::delete_(kj::Array<Key> keys, WriteOptions options) { @@ -1933,30 +1932,17 @@ kj::OneOf<uint, kj::Promise<uint>> ActorCache::delete_(kj::Array<Key> keys, Writ evictOrOomIfNeeded(lock); } - if (countedDelete->isShared()) { - // At least one entry kept a reference to `countedDelete`, so it must be waiting on an RPC. - // Set up a fulfiller. - auto paf = kj::newPromiseAndFulfiller<uint>(); - countedDelete->resultFulfiller = kj::mv(paf.fulfiller); - KJ_IF_SOME(p, getBackpressure()) { - return p.then([promise = kj::mv(paf.promise)]() mutable { - return kj::mv(promise); - }); - } else { - return kj::mv(paf.promise); - } - } else { - // It looks like the count of deletes is fully known based on cache content, so we don't need - // to wait. - uint result = countedDelete->countDeleted; - KJ_IF_SOME(p, getBackpressure()) { - return p.then([result]() mutable { - return result; - }); - } else { - return result; - } + auto waiter = kj::heap<CountedDeleteWaiter>(*this, kj::addRef(*countedDelete)); + kj::Maybe<kj::Promise<void>> maybePromise; + KJ_IF_SOME(p, getBackpressure()) { + // This might be more than one flush but that's okay as long as our state gets taken care of. + maybePromise = countedDelete->forgiveIfFinished(kj::mv(p)); + } else if (countedDelete->entries.size()) { + maybePromise = countedDelete->forgiveIfFinished(lastFlush.addBranch()); } + return mapPromise(kj::mv(maybePromise), [waiter = kj::mv(waiter)]() { + return waiter->getCountedDelete().countDeleted; + }); } kj::Own<ActorCacheInterface::Transaction> ActorCache::startTransaction() { @@ -2031,8 +2017,9 @@ ActorCache::DeleteAllResults ActorCache::deleteAll(WriteOptions options) { }; } -void ActorCache::putImpl(Lock& lock, kj::Own<Entry> newEntry, - const WriteOptions& options, kj::Maybe<CountedDelete&> countedDelete) { +void ActorCache::putImpl( + Lock& lock, kj::Own<Entry> newEntry, const WriteOptions& options, + kj::Maybe<CountedDelete&> maybeCountedDelete) { auto& map = currentValues.get(lock); auto ordered = map.ordered(); @@ -2053,8 +2040,10 @@ void ActorCache::putImpl(Lock& lock, kj::Own<Entry> newEntry, return; } - KJ_IF_SOME(c, countedDelete) { - // Overwrote an entry that was in cache, so we can count it now. + KJ_IF_SOME(c, maybeCountedDelete) { + // Overwrote an entry that was in cache, so we can count it now. Note that because we + // are PRESENT, we will not be added to the CountedDelete's `entries`, since we only + // do this for UNKNOWN entries! Instead, we'll be part of a regular delete. ++c.countDeleted; } break; @@ -2066,16 +2055,32 @@ void ActorCache::putImpl(Lock& lock, kj::Own<Entry> newEntry, return; } + if (slot->isCountedDelete) { + // We are overwriting an entry that is slated for a counted delete operation. + // There may be a situation where all the entries associated with a counted delete are + // actually successfully deleted (and we get the count), but the transaction the deletes + // execute within fails. + // + // Since we are currently overwriting the Entry, we might as well inform the + // `CountedDelete` that this Entry has since been overwritten. Then, if we hit the case + // described above, we won't need to include this Entry in a subsequent counted delete + // retry, since we already have the count AND the Entry has been overwritten.
+ // + // For more details, see how we filter the entries to be deleted for a CountedDeleteFlush + // as part of a flush. + slot->overwritingCountedDelete = true; + } // We don't have to worry about the counted delete since we were already deleted. break; } case EntryValueStatus::UNKNOWN: { // This was a list end marker, we should just overwrite it. - KJ_IF_SOME(c, countedDelete) { + KJ_IF_SOME(c, maybeCountedDelete) { // Despite an entry being present, we don't know if the key exists, because it's just an // UNKNOWN entry. So we will still have to arrange to count the delete later. - newEntry->countedDelete = kj::addRef(c); + newEntry->isCountedDelete = true; + c.entries.add(kj::atomicAddRef(*newEntry)); } break; } @@ -2086,16 +2091,6 @@ void ActorCache::putImpl(Lock& lock, kj::Own<Entry> newEntry, // Inherit gap state. newEntry->gapIsKnownEmpty = slot->gapIsKnownEmpty; - if (slot->isDirty()) { - // Entry may have `countedDelete` indicating we're still waiting to get a count from a - // previous delete operation. If so, we'll need to inherit it in case that delete operation - // fails and we end up retrying. Note that the new entry could be a positive entry rather - // than a negative one (a `put()` rather than a `delete()`). That is OK -- in `flushImpl()` - // we will still see the presence of `countedDelete` and realize we have to issue a delete - // on the key before we issue a put, just for the sake of counting it. - newEntry->countedDelete = kj::mv(slot->countedDelete); - } - // Swap in the new entry. removeEntry(lock, *slot); @@ -2121,8 +2116,9 @@ void ActorCache::putImpl(Lock& lock, kj::Own<Entry> newEntry, // inserting a new entry, to avoid repeating the lookup. auto& slot = map.insert(kj::mv(newEntry)); slot->gapIsKnownEmpty = previousGapKnownEmpty; - KJ_IF_SOME(c, countedDelete) { - slot->countedDelete = kj::addRef(c); + KJ_IF_SOME(c, maybeCountedDelete) { + slot->isCountedDelete = true; + c.entries.add(kj::atomicAddRef(*slot)); } addToDirtyList(*slot); } @@ -2325,56 +2321,78 @@ kj::Promise ActorCache::startFlushTransaction() { batch.wordCount += words; }; - kj::Vector<CountedDeleteFlush> countedDeleteFlushes; + kj::Vector<CountedDeleteFlush> countedDeleteFlushes(countedDeletes.size()); + for (auto countedDelete : countedDeletes) { + if (countedDelete->isFinished) { + // This countedDelete has already been executed, but we haven't delivered the final count to + // the waiter yet. We'll skip it here since the destructor of CountedDeleteWaiter should + // eventually remove this entry from `countedDeletes`. + continue; + } + + auto& countedDeleteFlush = countedDeleteFlushes.add(CountedDeleteFlush{ + .countedDelete = kj::addRef(*countedDelete), + }); + // We might have successfully deleted these entries, but had the broader transaction fail. + // In that case, we might have entries that have since been overwritten, and which no longer + // need to be scheduled for deletion. + kj::Vector<kj::Own<Entry>> entriesToDelete(countedDelete->entries.size()); + for (auto& entry : countedDelete->entries) { + if (entry->overwritingCountedDelete && countedDelete->completedInTransaction) { + // Not only is this a retry, but we have since modified the entry with a put(). + // Since we already have the delete count, we don't need to delete this entry again. + continue; + } + entriesToDelete.add(kj::mv(entry)); + } + + // Now that we've filtered our entries down to only those that need to be deleted, + // we need to overwrite the CountedDelete's `entries`.
+ countedDelete->entries = kj::mv(entriesToDelete); + for (auto& entry : countedDelete->entries) { + // A delete() call on this key is waiting to find out if the key existed in storage. Since + // each delete() call needs to return the count of keys deleted, we must issue + // corresponding delete calls to storage with the same batching, so that storage returns + // the right counts to us. We can't batch all the deletes into a single delete operation + // since then we'd only get a single count back and we wouldn't know how to split that up + // to satisfy all the callers. + // + // Note that a subsequent put() call could have set entry.value to non-null, but we still + // have to perform the delete first in order to determine the count that the delete() call + // should return. + // + // There is a minor quirk here because the counted delete set does not distinguish between + // before and after a delete all. That's actually okay because we should be able to + // immediately resolve counted deletes requested after a delete all (either the values are + // absent or they have a dirty put). This might also be an issue if we respected noCache for + // delete all's dummy value, but we do not. + entry->flushStarted = true; + + auto keySizeInWords = bytesToWordsRoundUp(entry->key.size()); + auto words = keySizeInWords + 1; + includeInCurrentBatch(countedDeleteFlush.batches, words); + } + } auto countEntry = [&](Entry& entry) { // Counts up the number of operations and RPC message sizes we'll need to cover this entry. + if (entry.isCountedDelete) { + // We should have already put this entry into a batch, so just skip it. + KJ_ASSERT(entry.flushStarted); + return; + } + entry.setFlushing(); auto keySizeInWords = bytesToWordsRoundUp(entry.key.size()); - KJ_IF_SOME(c, entry.countedDelete) { - if (c->resultFulfiller->isWaiting()) { - // A delete() call on this key is waiting to find out if the key existed in storage. Since - // each delete() call needs to return the count of keys deleted, we must issue - // corresponding delete calls to storage with the same batching, so that storage returns - // the right counts to us. We can't batch all the deletes into a single delete operation - // since then we'd only get a single count back and we wouldn't know how to split that up - // to satisfy all the callers. - // - // Note that a subsequent put() call could have set entry.value to non-null, but we still - // have to perform the delete first in order to determine the count that the delete() call - // should return. - CountedDeleteFlush* countedDeleteFlush; - KJ_IF_SOME(i, c->flushIndex) { - countedDeleteFlush = &countedDeleteFlushes[i]; - } else { - c->flushIndex = countedDeleteFlushes.size(); - countedDeleteFlush = &countedDeleteFlushes.add(CountedDeleteFlush{ - .countedDelete = kj::addRef(*c).attach(kj::defer([&cd = *c]() mutable { - // Note that this is attached to the `Own`, not the value. We actually want this, - // because it allows us to reset the `flushIndex` when *this flush* finishes, - // regardless of if we need to retry. - cd.flushIndex = kj::none; - })), - }); - } - auto words = keySizeInWords + 1; - includeInCurrentBatch(countedDeleteFlush->batches, words); - countedDeleteFlush->entries.add(kj::atomicAddRef(entry)); - } else { - // No one is waiting on this `CountedDelete` anymore so we can just drop it. 
- entry.countedDelete = kj::none; - } - } - KJ_IF_SOME(v, entry.getValuePtr()) { auto words = keySizeInWords + bytesToWordsRoundUp(v.size()) + capnp::sizeInWords(); includeInCurrentBatch(putFlush.batches, words); putFlush.entries.add(kj::atomicAddRef(entry)); - } else if (entry.countedDelete == kj::none) { + } else { auto words = keySizeInWords + 1; includeInCurrentBatch(mutedDeleteFlush.batches, words); mutedDeleteFlush.entries.add(kj::atomicAddRef(entry)); @@ -2547,7 +2565,7 @@ kj::Promise ActorCache::flushImpl(uint retryCount) { // will have inherited the `countedDelete`, and will still be DIRTY at this point. That is // OK, because the `countedDelete`'s fulfiller will have already been fulfilled, and // therefore the next flushImpl() will see that it is obsolete and discard it. - entry.countedDelete = kj::none; + entry.isCountedDelete = false; dirtyList.remove(entry); if (entry.noCache) { @@ -2666,35 +2684,26 @@ kj::Promise ActorCache::flushImplUsingSingleCountedDelete(CountedDeleteFlu KJ_ASSERT(countedFlush.batches.size() == 1); auto& batch = countedFlush.batches[0]; + auto countedDelete = kj::mv(countedFlush.countedDelete); KJ_ASSERT(batch.wordCount < MAX_ACTOR_STORAGE_RPC_WORDS); - KJ_ASSERT(batch.pairCount == countedFlush.entries.size()); + KJ_ASSERT(batch.pairCount == countedDelete->entries.size()); auto request = storage.deleteRequest(capnp::MessageSize { 4 + batch.wordCount, 0 }); auto listBuilder = request.initKeys(batch.pairCount); - auto entryIt = countedFlush.entries.begin(); + auto entryIt = countedDelete->entries.begin(); for (size_t i = 0; i < batch.pairCount; ++i) { auto& entry = **(entryIt++); listBuilder.set(i, entry.key.asBytes()); } // We're done with the batching instructions, free them before we go async. - countedFlush.entries.clear(); countedFlush.batches.clear(); - auto countedDelete = kj::mv(countedFlush.countedDelete); - try { - auto writeObserver = recordStorageWrite(hooks, clock); - util::DurationExceededLogger logger(clock, 1*kj::SECONDS, "storage operation took longer than expected: counted delete"); - auto response = co_await request.send(); - countedDelete->resultFulfiller->fulfill(response.getNumDeleted()); - } catch (kj::Exception& e) { - if (e.getType() == kj::Exception::Type::DISCONNECTED) { - // This deletion will be retried, so don't touch the fulfiller. 
- } else { - countedDelete->resultFulfiller->reject(kj::cp(e)); - } - throw kj::mv(e); - } + auto writeObserver = recordStorageWrite(hooks, clock); + util::DurationExceededLogger logger(clock, 1*kj::SECONDS, "storage operation took longer than expected: counted delete"); + auto response = co_await request.send(); + countedDelete->countDeleted += response.getNumDeleted(); + countedDelete->isFinished = true; } kj::Promise ActorCache::flushImplAlarmOnly(DirtyAlarm dirty) { @@ -2753,7 +2762,8 @@ kj::Promise ActorCache::flushImplUsingTxn( auto rpcPuts = kj::heapArrayBuilder(putFlush.batches.size()); for (auto& flush: countedDeleteFlushes) { - auto entryIt = flush.entries.begin(); + auto countedDelete = kj::mv(flush.countedDelete); + auto entryIt = countedDelete->entries.begin(); kj::Vector rpcDeletes; for (auto& batch: flush.batches) { KJ_ASSERT(batch.wordCount < MAX_ACTOR_STORAGE_RPC_WORDS); @@ -2761,19 +2771,20 @@ kj::Promise ActorCache::flushImplUsingTxn( auto request = txn.deleteRequest(capnp::MessageSize { 4 + batch.wordCount, 0 }); auto listBuilder = request.initKeys(batch.pairCount); for (size_t i = 0; i < batch.pairCount; ++i) { - KJ_ASSERT(entryIt != flush.entries.end()); + KJ_ASSERT(entryIt != countedDelete->entries.end()); auto& entry = **(entryIt++); listBuilder.set(i, entry.key.asBytes()); } rpcDeletes.add(kj::mv(request)); } + KJ_ASSERT(entryIt == countedDelete->entries.end()); rpcCountedDeletes.add(RpcCountedDelete{ - .countedDelete = kj::mv(flush.countedDelete), + .countedDelete = kj::mv(countedDelete), .rpcDeletes = rpcDeletes.releaseAsArray(), }); - KJ_ASSERT(entryIt == flush.entries.end()); } + countedDeleteFlushes = nullptr; { auto entryIt = mutedDeleteFlush.entries.begin(); @@ -2791,6 +2802,8 @@ kj::Promise ActorCache::flushImplUsingTxn( } KJ_ASSERT(entryIt == mutedDeleteFlush.entries.end()); } + mutedDeleteFlush.entries.clear(); + mutedDeleteFlush.batches.clear(); { auto entryIt = putFlush.entries.begin(); @@ -2810,13 +2823,8 @@ kj::Promise ActorCache::flushImplUsingTxn( } KJ_ASSERT(entryIt == putFlush.entries.end()); } - - // We're done with the batching instructions, free them before we go async. putFlush.entries.clear(); putFlush.batches.clear(); - mutedDeleteFlush.entries.clear(); - mutedDeleteFlush.batches.clear(); - countedDeleteFlushes = nullptr; // Send all the RPCs. It's important that counted deletes are sent first since they can overlap // with puts. Specifically this can happen if someone does a delete() immediately followed by a @@ -2839,30 +2847,26 @@ kj::Promise ActorCache::flushImplUsingTxn( }); }; + size_t recordsDeleted = 0; for (auto& promise : promises) { - // Reuse `countDeleted` since it's already in a state object anyway. - rpcCountedDelete.countedDelete->countDeleted += co_await promise; + recordsDeleted += co_await promise; } + + // This may be a retry following a successful counted delete within a failed transaction. + // In that case, we don't want to update the count again, since we've already considered it. + if (!rpcCountedDelete.countedDelete->completedInTransaction) { + // We only increment our `countDeleted` if *ALL* the delete batches succeeded. + rpcCountedDelete.countedDelete->countDeleted += recordsDeleted; + } + + // This delete succeeded, but we may need to retry it in some cases, ex. if the transaction fails. + // If we *do* retry after a successful counted delete, we won't want to update our + // `countDeleted` since we already got it. 
+ rpcCountedDelete.countedDelete->completedInTransaction = true; }; + for (auto& rpcCountedDelete: rpcCountedDeletes) { - promises.add(joinCountedDelete(rpcCountedDelete).then( - [&countedDelete = *rpcCountedDelete.countedDelete]() mutable { - // Note that it's OK to trust the delete count even if the transaction ultimately gets rolled - // back, because: - // - We know that nothing else could be concurrently modifying our storage in a way that - // makes the count different on a retry. - // - If retries fail and the flush never completes at all, the output gate will kick in and - // make it impossible for anyone to observe the bogus result. - // HACK: This uses a `kj::mv()` because promise fulfillers require rvalues even for trivially - // copyable types. - countedDelete.resultFulfiller->fulfill(kj::mv(countedDelete.countDeleted)); - }, [&countedDelete = *rpcCountedDelete.countedDelete](kj::Exception&& e) { - if (e.getType() == kj::Exception::Type::DISCONNECTED) { - // This deletion will be retried, so don't touch the fulfiller. - } else { - countedDelete.resultFulfiller->reject(kj::mv(e)); - } - })); + promises.add(joinCountedDelete(rpcCountedDelete)); } for (auto& request: rpcMutedDeletes) { @@ -2923,6 +2927,11 @@ kj::Promise ActorCache::flushImplUsingTxn( promises.add(txn.commitRequest(capnp::MessageSize { 4, 0 }).send().ignoreResult()); co_await kj::joinPromises(promises.finish()); + for (auto& rpcCountedDelete: rpcCountedDeletes) { + // Now that the transaction has successfully completed, we can mark all our CountedDeletes + // as having completed as well. + rpcCountedDelete.countedDelete->isFinished = true; + } } } diff --git a/src/workerd/io/actor-cache.h b/src/workerd/io/actor-cache.h index 54f4d710bd9..264d775464b 100644 --- a/src/workerd/io/actor-cache.h +++ b/src/workerd/io/actor-cache.h @@ -469,17 +469,15 @@ class ActorCache final: public ActorCacheInterface { // In the DIRTY state, if this entry was originally created as the result of a // `delete()` call, and as such the caller needs to receive a count of deletions, then this // tracks that need. Note that only one caller could ever be waiting on this, because - // subsequent delete() calls can be counted based on the cache content. This can be null + // subsequent delete() calls can be counted based on the cache content. This can be false // if no delete operations need a count from this entry. - // - // If an entry is overwritten, `countedDelete` needs to be inherited by the replacement entry, - // so that the delete is still counted upon `flushImpl()`. (If the entry being replaced is - // already flushing, and that flush succeeds, then countedDelete->fulfiller will be fulfilled. - // In that case, it's no longer relevant to have `countedDelete` on the replacement entry, - // because it's already fulfilled and so will be ignored anyway. However, in the unlikely case - // that the flush failed, then it is actually important that the `countedDelete` has been moved - // to the replacement entry, so that it can be retried.) - kj::Maybe> countedDelete; + bool isCountedDelete = false; + + // This Entry is part of a CountedDelete, but has since been overwritten via a put(). + // This is really only useful in determining if we need to retry the deletion of this entry from + // storage, since we're interested in the number of deleted records. If we already got the count, + // we won't include this entry as part of our retried delete. 
+ bool overwritingCountedDelete = false; // If CLEAN, the entry will be in the SharedLru's `cleanList`. // @@ -512,19 +510,69 @@ class ActorCache final: public ActorCacheInterface { // This object can only be manipulated in the thread that owns the specific actor that made // the request. That works out fine since CountedDelete only ever exists for dirty entries, // which won't be touched cross-thread by the LRU. - struct CountedDelete: public kj::Refcounted { + struct CountedDelete final: public kj::Refcounted { + CountedDelete() = default; + KJ_DISALLOW_COPY_AND_MOVE(CountedDelete); + + kj::Promise<void> forgiveIfFinished(kj::Promise<void> promise) { + try { + co_await promise; + } catch (...) { + if (isFinished) { + // We already flushed, so it's okay that the promise threw. + co_return; + } else { + throw; + } + } + } + // Running count of entries that existed before the delete. uint countDeleted = 0; - // When `countOutstanding` reaches zero, fulfill this with `countDeleted`. - kj::Own<kj::PromiseFulfiller<uint>> resultFulfiller; + // Did this particular counted delete succeed within a transaction? In other words, did we + // already get the count? Even if we got the count, we may need to retry if the transaction + // itself failed, though we won't need to get the count again. + bool completedInTransaction = false; - // During `flushImpl()`, when this CountedDelete is first encountered, `flushIndex` will be set - // to track this delete batch. It will be set back to `kj::none` before `flushImpl()` returns. - // This field exists here to avoid the need for a HashMap in `flushImpl()`. - kj::Maybe flushIndex; + // Did this particular counted delete succeed? Note that this can be true even if the flush + // failed on a different batch of operations. + bool isFinished = false; + + // The entries associated with this counted delete. + kj::Vector<kj::Own<Entry>> entries; }; + class CountedDeleteWaiter { + public: + explicit CountedDeleteWaiter(ActorCache& cache, kj::Own<CountedDelete> state) + : cache(cache), state(kj::mv(state)) { + // Register this operation so that we can batch it properly during flush. + cache.countedDeletes.insert(this->state.get()); + } + KJ_DISALLOW_COPY_AND_MOVE(CountedDeleteWaiter); + ~CountedDeleteWaiter() noexcept(false) { + for (auto& entry : state->entries) { + // Let each entry associated with this counted delete know that we aren't waiting anymore. + entry->isCountedDelete = false; + } + + // Since the count of deleted pairs is no longer required, we don't need to batch the ops. + // Note that we use eraseMatch() because we only have the pointer value here, not a + // reference to the element stored in the table. + cache.countedDeletes.eraseMatch(state.get()); + } + + const CountedDelete& getCountedDelete() const { + return *state; + } + + private: + ActorCache& cache; + kj::Own<CountedDelete> state; + }; + + kj::HashSet<CountedDelete*> countedDeletes; + rpc::ActorStorage::Stage::Client storage; const SharedLru& lru; OutputGate& gate; @@ -729,7 +777,6 @@ class ActorCache final: public ActorCacheInterface { }; struct CountedDeleteFlush { kj::Own<CountedDelete> countedDelete; - kj::Vector<kj::Own<Entry>> entries; kj::Vector batches; }; using CountedDeleteFlushes = kj::Array<CountedDeleteFlush>;
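(Aside: below is a minimal sketch of the RAII registration pattern that CountedDeleteWaiter introduces in the header above, using hypothetical stand-in names — `Registry`, `State`, `Waiter` — rather than the real ActorCache/CountedDelete types. The waiter owns the refcounted state, registers a raw pointer in a set that the flush path would iterate for batching, and unregisters on destruction, mirroring the `insert()` / `eraseMatch()` calls in the patch.)

```cpp
// Sketch only: simplified stand-ins for CountedDeleteWaiter's registration lifecycle.
#include <kj/refcount.h>
#include <kj/map.h>
#include <kj/debug.h>

struct State: public kj::Refcounted {
  uint countDeleted = 0;
  bool isFinished = false;
};

struct Registry {
  // What a flush would iterate to build counted-delete batches.
  kj::HashSet<State*> active;
};

class Waiter {
public:
  Waiter(Registry& registry, kj::Own<State> state)
      : registry(registry), state(kj::mv(state)) {
    // Register the operation so it can be batched during a flush.
    registry.active.insert(this->state.get());
  }
  KJ_DISALLOW_COPY_AND_MOVE(Waiter);
  ~Waiter() noexcept(false) {
    // Pass the pointer value; eraseMatch() looks it up rather than requiring a
    // reference to the element stored in the table.
    registry.active.eraseMatch(state.get());
  }
  const State& get() const { return *state; }

private:
  Registry& registry;
  kj::Own<State> state;
};

int main() {
  Registry registry;
  {
    Waiter waiter(registry, kj::refcounted<State>());
    KJ_ASSERT(registry.active.size() == 1);
  }
  // Once the caller stops waiting, the flush no longer needs to count this delete.
  KJ_ASSERT(registry.active.size() == 0);
  return 0;
}
```

Keeping only raw pointers in the registry leaves ownership with the waiter, so a caller that stops waiting automatically drops its delete from future flush batching.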