Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 17 additions & 38 deletions src/QLPlan.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,6 @@ ACTOR static Future<Void> toDocInfo(PlanCheckpoint* checkpoint,
inputLock->release();
Void _ = wait(outputLock->take());
lastKey = Key(kv.key, kv.arena());
// fprintf(stderr, "lastkey: %s\n", printable(lastKey).c_str());
Standalone<StringRef> last(DataKey::decode_item_rev(kv.key, 0), kv.arena());
Reference<ScanReturnedContext> output(new ScanReturnedContext(base->getSubContext(last), scanID, lastKey));
dis.send(output);
Expand Down Expand Up @@ -373,9 +372,6 @@ ACTOR static Future<bool> simpleWouldBeLast(Reference<ScanReturnedContext> doc,
break;
}
}
// fprintf(stderr, "SWBL last: %s\n", printable(last).c_str());
// fprintf(stderr, "SWBL scankey: %s\n", printable(doc->scanKey()).c_str());
// fprintf(stderr, "SWBL upperBound: %s\n", printable(indexUpperBound).c_str());
if (doc->scanKey().startsWith(last))
return true;
}
Expand Down Expand Up @@ -416,9 +412,6 @@ ACTOR static Future<bool> compoundWouldBeLast(Reference<ScanReturnedContext> doc
break;
}
}
// fprintf(stderr, "CWBL last: %s\n", printable(last).c_str());
// fprintf(stderr, "CWBL scankey: %s\n", printable(doc->scanKey()).c_str());
// fprintf(stderr, "CWBL upperBound: %s\n", printable(indexUpperBound).c_str());
if (doc->scanKey().startsWith(last))
return true;
}
Expand Down Expand Up @@ -558,7 +551,6 @@ ACTOR static Future<Void> doPKScan(PlanCheckpoint* checkpoint,
lastKey = Key(kv.key, kv.arena());
}
} catch (Error& e) {
// fprintf(stderr, "doPKScan: %s %d\n", e.what(), checkpoint->splitBoundWanted());
if (e.code() == error_code_actor_cancelled) {
if (checkpoint->splitBoundWanted()) {
DataKey splitKey = DataKey::decode_bytes(lastKey);
Expand All @@ -577,8 +569,8 @@ FutureStream<Reference<ScanReturnedContext>> PrimaryKeyLookupPlan::execute(PlanC
Reference<CollectionContext> bcx = cx->bindCollectionContext(tr);
if (begin.present() && end.present() && begin.get() == end.get()) {
PromiseStream<Reference<ScanReturnedContext>> p;
checkpoint->addOperation(doSinglePKLookup(checkpoint, p, bcx, begin.get(), scanID),
p); // ??? Can we skip this overhead?
// ??? Can we skip this overhead?
checkpoint->addOperation(doSinglePKLookup(checkpoint, p, bcx, begin.get(), scanID), p);
return p.getFuture();
} else {
PromiseStream<Reference<ScanReturnedContext>> p;
Expand All @@ -590,7 +582,6 @@ FutureStream<Reference<ScanReturnedContext>> PrimaryKeyLookupPlan::execute(PlanC
Standalone<StringRef> endKey = std::max<Standalone<StringRef>>(
beginKey, std::min(end.present() ? strinc(end.get().encode_key_part()) : LiteralStringRef("\xff"),
checkpoint->getBounds(scanID).end));
// fprintf(stderr, "PK scan executing from %s to %s\n", printable(beginKey).c_str(), printable(endKey).c_str());

GenFutureStream<KeyValue> kvs = bcx->cx->getDescendants(beginKey, endKey, descendantFlowControlLock);
checkpoint->addOperation(doPKScan(checkpoint, bcx, scanID, kvs, p, descendantFlowControlLock),
Expand Down Expand Up @@ -704,17 +695,14 @@ ACTOR static Future<Void> doNonIsolatedRO(PlanCheckpoint* outerCheckpoint,
try {
state uint64_t metadataVersion = wait(cx->bindCollectionContext(dtr)->getMetadataVersion());
loop {
// printf("Trying nonIsolatedRO with %d outputs and checkpoint '%s'-'%s'\n", nResults,
// printable(innerCheckpoint->getBounds(0).begin).c_str(),
// printable(innerCheckpoint->getBounds(0).end).c_str());
state FutureStream<Reference<ScanReturnedContext>> docs = subPlan->execute(innerCheckpoint.getPtr(), dtr);
state FlowLock* innerLock = innerCheckpoint->getDocumentFinishedLock();
state bool first = true;
state Future<Void> timeout = delay(3.0);

loop choose {
when(state Reference<ScanReturnedContext> doc =
waitNext(docs)) { // throws end_of_stream when totally finished
when(state Reference<ScanReturnedContext> doc = waitNext(docs)) {
// throws end_of_stream when totally finished
Void _ = wait(outerLock->take());
innerLock->release();
output.send(doc);
Expand All @@ -723,7 +711,6 @@ ACTOR static Future<Void> doNonIsolatedRO(PlanCheckpoint* outerCheckpoint,
timeout = delay(DOCLAYER_KNOBS->NONISOLATED_INTERNAL_TIMEOUT);
first = false;
}
// if (oCount == 3) timeout = delay(0);
}
when(Void _ = wait(timeout)) { break; }
}
Expand Down Expand Up @@ -785,9 +772,10 @@ ACTOR static Future<Void> doNonIsolatedRW(PlanCheckpoint* outerCheckpoint,
loop {
if (bufferedDocs.size() + committingDocs.size() >=
DOCLAYER_KNOBS->NONISOLATED_RW_INTERNAL_BUFFER_MAX)
timeout = delay(0); // We do this instead of breaking so that when stopAndCheckpoint() gets
// called below, the actor for the plan immediately inside us is never
// on the call stack, so gets its actor_cancelled delivered immediately.
// We do this instead of breaking so that when stopAndCheckpoint() gets
// called below, the actor for the plan immediately inside us is never
// on the call stack, so gets its actor_cancelled delivered immediately.
timeout = delay(0);
choose {
when(state Reference<ScanReturnedContext> doc =
waitNext(docs)) { // throws end_of_stream when totally finished
Expand Down Expand Up @@ -1045,10 +1033,10 @@ ACTOR static Future<Void> doFlushChanges(PlanCheckpoint* checkpoint,
try {
choose {
when(Reference<ScanReturnedContext> nextInput = waitNext(input)) {
// FIXME: this will be unsafe with unique indexes. Something has to happen here that doesn't
// kill performance.
futures.push_back(std::pair<Reference<ScanReturnedContext>, Future<Void>>(
nextInput,
nextInput->commitChanges())); // FIXME: this will be unsafe with unique indexes. Something
// has to happen here that doesn't kill performance.
nextInput, nextInput->commitChanges()));
}
when(Void _ = wait(futures.empty() ? Never() : futures.front().second)) {
output.send(futures.front().first);
Expand Down Expand Up @@ -1127,7 +1115,6 @@ ACTOR static Future<Void> doUpdate(PlanCheckpoint* checkpoint,

throw end_of_stream();
} catch (Error& e) {
// printf("doUpdate: %s\n", e.what());
if (e.code() == error_code_actor_cancelled) {
if (checkpoint->splitBoundWanted()) {
for (int i = futures.size() - 1; i >= 0; i--)
Expand Down Expand Up @@ -1172,18 +1159,15 @@ ACTOR static Future<Void> findAndModify(PlanCheckpoint* outerCheckpoint,
try {
state uint64_t metadataVersion = wait(cx->bindCollectionContext(dtr)->getMetadataVersion());
loop {
// printf("Trying nonIsolatedRO with %d outputs and checkpoint '%s'-'%s'\n", nResults,
// printable(innerCheckpoint->getBounds(0).begin).c_str(),
// printable(innerCheckpoint->getBounds(0).end).c_str());
state FutureStream<Reference<ScanReturnedContext>> docs = subPlan->execute(innerCheckpoint.getPtr(), dtr);
state FlowLock* innerLock = innerCheckpoint->getDocumentFinishedLock();
state Future<Void> timeout = delay(1.0);
state bool done = false;

try {
loop choose {
when(state Reference<ScanReturnedContext> doc =
waitNext(docs)) { // throws end_of_stream when totally finished
when(state Reference<ScanReturnedContext> doc = waitNext(docs)) {
// throws end_of_stream when totally finished
firstDoc = doc;
innerLock->release();
done = true;
Expand Down Expand Up @@ -1441,10 +1425,9 @@ ACTOR static Future<Void> doInsert(PlanCheckpoint* checkpoint,
Reference<MetadataManager> mm,
Namespace ns,
PromiseStream<Reference<ScanReturnedContext>> output) {
// state int64_t& inserted = checkpoint->getIntState(0); <- This is broken for now.
state Deque<Future<Reference<IReadWriteContext>>> f;
state FlowLock* flowControlLock = checkpoint->getDocumentFinishedLock();
state int i = 0; // = inserted;
state int i = 0;

try {
state Reference<UnboundCollectionContext> ucx = wait(mm->getUnboundCollectionContext(tr, ns));
Expand All @@ -1459,15 +1442,13 @@ ACTOR static Future<Void> doInsert(PlanCheckpoint* checkpoint,
when(Reference<IReadWriteContext> doc = wait(f.empty() ? Never() : f.front())) {
output.send(ref(new ScanReturnedContext(doc, -1, Key()))); // Are these the right scanId etc?
f.pop_front();
// inserted++;
}
}
}
state int j = 0;
for (; j < f.size(); j++) {
Reference<IReadWriteContext> doc = wait(f[j]);
output.send(ref(new ScanReturnedContext(doc, -1, Key()))); // Are these the right scanId etc?
// inserted++;
}
throw end_of_stream();
} catch (Error& e) {
Expand Down Expand Up @@ -1503,11 +1484,9 @@ ACTOR static Future<Void> doSort(PlanCheckpoint* outerCheckpoint,
loop {
try {
Reference<ScanReturnedContext> doc = waitNext(docs);
returnProjections.push_back(
doc->toDataValue().get().getPackedObject().getOwned()); // Note that this call to get() is safe here but
// not in general, because we know that
// doc is wrapping a BsonContext, which means
// toDataValue() is synchronous.
// Note that this call to get() is safe here but not in general, because we know that doc is wrapping a
// BsonContext, which means toDataValue() is synchronous.
returnProjections.push_back(doc->toDataValue().get().getPackedObject().getOwned());
innerLock->release();
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
Expand Down