Skip to content

Commit

Permalink
Track whether an AndFuture has begun (#8937)
Browse files Browse the repository at this point in the history
* Track whether an AndFuture has begun

Previously, for deciding whether or not it was legal to set
READ_YOUR_WRITES_DISABLE, it was allowed as long as no reads were
in-flight. It's possible that a read can have been initiated, but the
RYW caches are empty and no reads are in-flight. See #8925 for an
example.

Closes #8925

* Change began to futureCount

* Remove unused and incorrect assignment operators

* Remove bogus test expectation

It shouldn't be legal to set READ_YOUR_WRITES_DISABLE after reading if
there are currently no pending reads, but this test was expecting that
to work.
  • Loading branch information
sfc-gh-anoyes authored and xumengpanda committed Dec 6, 2022
1 parent 23deb5b commit 97c3a20
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 18 deletions.
2 changes: 1 addition & 1 deletion fdbclient/ReadYourWrites.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2446,7 +2446,7 @@ void ReadYourWritesTransaction::setOptionImpl(FDBTransactionOptions::Option opti
case FDBTransactionOptions::READ_YOUR_WRITES_DISABLE:
validateOptionValueNotPresent(value);

if (!reading.isReady() || !cache.empty() || !writes.empty())
if (reading.getFutureCount() > 0 || !cache.empty() || !writes.empty())
throw client_invalid_operation();

options.readYourWritesDisabled = true;
Expand Down
8 changes: 6 additions & 2 deletions fdbserver/workloads/WriteDuringRead.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,12 @@ struct WriteDuringReadWorkload : TestWorkload {
self->finished.trigger();
self->dataWritten += txnSize;

if (readYourWritesDisabled)
tr->setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
// It's not legal to set readYourWritesDisabled after performing
// reads on a transaction, even if all those reads have completed
// and there are none in-flight.
// if (readYourWritesDisabled)
// tr->setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);

if (snapshotRYWDisabled)
tr->setOption(FDBTransactionOptions::SNAPSHOT_RYW_DISABLE);
if (readAheadDisabled)
Expand Down
27 changes: 12 additions & 15 deletions flow/include/flow/genericactors.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -1774,26 +1774,18 @@ Future<T> delayActionJittered(Future<T> what, double time) {

class AndFuture {
public:
AndFuture() {}
AndFuture() = default;
AndFuture(AndFuture const& f) = default;
AndFuture(AndFuture&& f) noexcept = default;
AndFuture& operator=(AndFuture const& f) = default;
AndFuture& operator=(AndFuture&& f) noexcept = default;

AndFuture(AndFuture const& f) { futures = f.futures; }
AndFuture(Future<Void> const& f) : futureCount(1), futures{ f } {}

AndFuture(AndFuture&& f) noexcept { futures = std::move(f.futures); }

AndFuture(Future<Void> const& f) { futures.push_back(f); }

AndFuture(Error const& e) { futures.push_back(e); }
AndFuture(Error const& e) : futureCount(1), futures{ Future<Void>(e) } {}

operator Future<Void>() { return getFuture(); }

void operator=(AndFuture const& f) { futures = f.futures; }

void operator=(AndFuture&& f) noexcept { futures = std::move(f.futures); }

void operator=(Future<Void> const& f) { futures.push_back(f); }

void operator=(Error const& e) { futures.push_back(e); }

Future<Void> getFuture() {
if (futures.empty())
return Void();
Expand Down Expand Up @@ -1833,13 +1825,18 @@ class AndFuture {
}

void add(Future<Void> const& f) {
++futureCount;
if (!f.isReady() || f.isError())
futures.push_back(f);
}

void add(AndFuture f) { add(f.getFuture()); }

// The total number of futures which have ever been added to this AndFuture
int64_t getFutureCount() const { return futureCount; }

private:
int64_t futureCount = 0;
std::vector<Future<Void>> futures;
};

Expand Down

0 comments on commit 97c3a20

Please sign in to comment.