Skip to content

Commit

Permalink
Cancel syncGets() when ActorCache destroyed
Browse files Browse the repository at this point in the history
This commit attempts to provide a mechanism to cancel the promise
that waits for the read batches to complete in syncGets(). This is
needed for when the `ActorCache` is destroyed, since we need to destroy
all `Entry`s before we destroy the `SharedLru`.
  • Loading branch information
MellowYarker committed Mar 28, 2024
1 parent a32ce31 commit b0e4e9a
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
25 changes: 22 additions & 3 deletions src/workerd/io/actor-cache.c++
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@ auto recordStorageWrite(ActorCache::Hooks& hooks, const kj::MonotonicClock& cloc


ActorCache::ActorCache(rpc::ActorStorage::Stage::Client storage, const SharedLru& lru,
OutputGate& gate, Hooks& hooks)
OutputGate& gate, Hooks& hooks, kj::PromiseFulfillerPair<void> paf)
: storage(kj::mv(storage)), lru(lru), gate(gate), hooks(hooks), clock(kj::systemPreciseMonotonicClock()),
currentValues(lru.cleanList.lockExclusive()) {}
currentValues(lru.cleanList.lockExclusive()),
readCancellation(paf.promise.fork()),
cancelReadFlush(kj::mv(paf.fulfiller)) {}

ActorCache::~ActorCache() noexcept(false) {
// Need to remove all entries from any lists they might be in.
auto lock = lru.cleanList.lockExclusive();
clear(lock);

// Also cancel any pending syncGets(), since they create capabilities that have strong references
// to `Entry`s, and if they don't get dropped our LRU size accounting will break.
cancelReadFlush->fulfill();
}

void ActorCache::clear(Lock& lock) {
Expand Down Expand Up @@ -130,6 +136,8 @@ size_t ActorCache::Entry::setPresentValue(Value newValue) {
void ActorCache::Entry::setAbsentValue() {
KJ_ASSERT(valueStatus == EntryValueStatus::UNKNOWN);
valueStatus = EntryValueStatus::ABSENT;
KJ_ASSERT_NONNULL(maybeCache).lru.size.fetch_sub(value.size());
value = {};
}

ActorCache::SharedLru::SharedLru(const Options options): options(options) {}
Expand Down Expand Up @@ -669,10 +677,19 @@ kj::Promise<void> ActorCache::syncGets(
}
KJ_DASSERT(entryIt == getFlush.entries.end());

// After this point, the only strong references to `Entry`s should be in the individual
// GetMultiStreamImpl objects.
getFlush.batches.clear();
getFlush.entries.clear();

co_await kj::joinPromises(promises.releaseAsArray());
// TODO(now): This doesn't seem to be firing...
auto cancellationPromise = kj::mv(getFlush.canceler).then([](){
KJ_DBG("[PENDING GET MULTIPLE READ CANCELED]");
});
auto joinedGetPromise = kj::joinPromises(promises.releaseAsArray());

// Exclusive join will cancel the promise that doesn't finish first.
co_await cancellationPromise.exclusiveJoin(kj::mv(joinedGetPromise));
}

kj::OneOf<kj::Maybe<kj::Date>, kj::Promise<kj::Maybe<kj::Date>>> ActorCache::getAlarm(
Expand Down Expand Up @@ -2548,6 +2565,8 @@ kj::Promise<void> ActorCache::startFlushTransaction() {
getFlush.entries.clear();
co_await syncGet(storage, kj::mv(entry));
} else if (getFlush.entries.size() > 1) {
// Set our canceler to ensure we destroy any strong refs to `Entry`s upon ~ActorCache().
getFlush.canceler = readCancellation.addBranch();
co_await syncGets(storage, kj::mv(getFlush));
} else {
// We had no gets!
Expand Down
19 changes: 18 additions & 1 deletion src/workerd/io/actor-cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ class ActorCache final: public ActorCacheInterface {
"Durable Object storage is no longer accessible."_kj;

ActorCache(rpc::ActorStorage::Stage::Client storage, const SharedLru& lru, OutputGate& gate,
Hooks& hooks = Hooks::DEFAULT);
Hooks& hooks = Hooks::DEFAULT,
kj::PromiseFulfillerPair<void> paf = kj::newPromiseAndFulfiller<void>());
~ActorCache() noexcept(false);

kj::Maybe<SqliteDatabase&> getSqliteDatabase() override { return kj::none; }
Expand Down Expand Up @@ -727,6 +728,20 @@ class ActorCache final: public ActorCacheInterface {
// writes and not have to worry about this. However, at present, ActorStorage has automatic
// reconnect behavior at the supervisor layer which violates e-order.

// syncGets() creates a GetMultiStreamImpl for each batch that we read, and each
// GetMultiStreamImpl has an array of strong references to `Entry`s. Although we may be holding
// strong references, these `Entry`s are really sitting in the dirtyList, and GetMultiStreamImpl
// is filling the Entry with its value from storage. However, upon the destruction of the
// ActorCache, we need to make sure that all `Entry`s are dropped before we try to destroy our
// SharedLru, otherwise we may see use-after-frees.
//
// To ensure this destruction ordering is met, we give syncGets() a promise, which is just
// a branch of this `readCancellation` task. When we use the cancelReadFlush fulfiller, it signals
// to all syncGets() promises to drop their GetMultiStreamImpls, thereby dropping the strong
// Entry references.
kj::ForkedPromise<void> readCancellation;
kj::Own<kj::PromiseFulfiller<void>> cancelReadFlush;

// Did we hit a problem that makes the ActorCache unusable? If so this is the exception that
// describes the problem.
kj::Maybe<kj::Exception> maybeTerminalException;
Expand Down Expand Up @@ -834,6 +849,8 @@ class ActorCache final: public ActorCacheInterface {
kj::Vector<kj::Own<Entry>> entries;
// Metadata on each of the batches we will flush.
kj::Vector<FlushBatch> batches;
// Resolves if this get should be cancelled.
kj::Promise<void> canceler = kj::READY_NOW;
};

// All the `Entry`s we want to put next flush.
Expand Down

0 comments on commit b0e4e9a

Please sign in to comment.