From 39fe62dd80df153e33d620826e9432eacd55ede9 Mon Sep 17 00:00:00 2001 From: Bhaskar Muppana Date: Thu, 25 Apr 2019 15:37:34 -0700 Subject: [PATCH] Cosmetic cleanup. --- src/QLPlan.actor.cpp | 55 ++++++++++++++------------------------------ 1 file changed, 17 insertions(+), 38 deletions(-) diff --git a/src/QLPlan.actor.cpp b/src/QLPlan.actor.cpp index 94260c9..6bc0db3 100644 --- a/src/QLPlan.actor.cpp +++ b/src/QLPlan.actor.cpp @@ -335,7 +335,6 @@ ACTOR static Future toDocInfo(PlanCheckpoint* checkpoint, inputLock->release(); Void _ = wait(outputLock->take()); lastKey = Key(kv.key, kv.arena()); - // fprintf(stderr, "lastkey: %s\n", printable(lastKey).c_str()); Standalone last(DataKey::decode_item_rev(kv.key, 0), kv.arena()); Reference output(new ScanReturnedContext(base->getSubContext(last), scanID, lastKey)); dis.send(output); @@ -373,9 +372,6 @@ ACTOR static Future simpleWouldBeLast(Reference doc, break; } } - // fprintf(stderr, "SWBL last: %s\n", printable(last).c_str()); - // fprintf(stderr, "SWBL scankey: %s\n", printable(doc->scanKey()).c_str()); - // fprintf(stderr, "SWBL upperBound: %s\n", printable(indexUpperBound).c_str()); if (doc->scanKey().startsWith(last)) return true; } @@ -416,9 +412,6 @@ ACTOR static Future compoundWouldBeLast(Reference doc break; } } - // fprintf(stderr, "CWBL last: %s\n", printable(last).c_str()); - // fprintf(stderr, "CWBL scankey: %s\n", printable(doc->scanKey()).c_str()); - // fprintf(stderr, "CWBL upperBound: %s\n", printable(indexUpperBound).c_str()); if (doc->scanKey().startsWith(last)) return true; } @@ -558,7 +551,6 @@ ACTOR static Future doPKScan(PlanCheckpoint* checkpoint, lastKey = Key(kv.key, kv.arena()); } } catch (Error& e) { - // fprintf(stderr, "doPKScan: %s %d\n", e.what(), checkpoint->splitBoundWanted()); if (e.code() == error_code_actor_cancelled) { if (checkpoint->splitBoundWanted()) { DataKey splitKey = DataKey::decode_bytes(lastKey); @@ -577,8 +569,8 @@ FutureStream> PrimaryKeyLookupPlan::execute(PlanC Reference bcx = cx->bindCollectionContext(tr); if (begin.present() && end.present() && begin.get() == end.get()) { PromiseStream> p; - checkpoint->addOperation(doSinglePKLookup(checkpoint, p, bcx, begin.get(), scanID), - p); // ??? Can we skip this overhead? + // ??? Can we skip this overhead? + checkpoint->addOperation(doSinglePKLookup(checkpoint, p, bcx, begin.get(), scanID), p); return p.getFuture(); } else { PromiseStream> p; @@ -590,7 +582,6 @@ FutureStream> PrimaryKeyLookupPlan::execute(PlanC Standalone endKey = std::max>( beginKey, std::min(end.present() ? strinc(end.get().encode_key_part()) : LiteralStringRef("\xff"), checkpoint->getBounds(scanID).end)); - // fprintf(stderr, "PK scan executing from %s to %s\n", printable(beginKey).c_str(), printable(endKey).c_str()); GenFutureStream kvs = bcx->cx->getDescendants(beginKey, endKey, descendantFlowControlLock); checkpoint->addOperation(doPKScan(checkpoint, bcx, scanID, kvs, p, descendantFlowControlLock), @@ -704,17 +695,14 @@ ACTOR static Future doNonIsolatedRO(PlanCheckpoint* outerCheckpoint, try { state uint64_t metadataVersion = wait(cx->bindCollectionContext(dtr)->getMetadataVersion()); loop { - // printf("Trying nonIsolatedRO with %d outputs and checkpoint '%s'-'%s'\n", nResults, - // printable(innerCheckpoint->getBounds(0).begin).c_str(), - // printable(innerCheckpoint->getBounds(0).end).c_str()); state FutureStream> docs = subPlan->execute(innerCheckpoint.getPtr(), dtr); state FlowLock* innerLock = innerCheckpoint->getDocumentFinishedLock(); state bool first = true; state Future timeout = delay(3.0); loop choose { - when(state Reference doc = - waitNext(docs)) { // throws end_of_stream when totally finished + when(state Reference doc = waitNext(docs)) { + // throws end_of_stream when totally finished Void _ = wait(outerLock->take()); innerLock->release(); output.send(doc); @@ -723,7 +711,6 @@ ACTOR static Future doNonIsolatedRO(PlanCheckpoint* outerCheckpoint, timeout = delay(DOCLAYER_KNOBS->NONISOLATED_INTERNAL_TIMEOUT); first = false; } - // if (oCount == 3) timeout = delay(0); } when(Void _ = wait(timeout)) { break; } } @@ -785,9 +772,10 @@ ACTOR static Future doNonIsolatedRW(PlanCheckpoint* outerCheckpoint, loop { if (bufferedDocs.size() + committingDocs.size() >= DOCLAYER_KNOBS->NONISOLATED_RW_INTERNAL_BUFFER_MAX) - timeout = delay(0); // We do this instead of breaking so that when stopAndCheckpoint() gets - // called below, the actor for the plan immediately inside us is never - // on the call stack, so gets its actor_cancelled delivered immediately. + // We do this instead of breaking so that when stopAndCheckpoint() gets + // called below, the actor for the plan immediately inside us is never + // on the call stack, so gets its actor_cancelled delivered immediately. + timeout = delay(0); choose { when(state Reference doc = waitNext(docs)) { // throws end_of_stream when totally finished @@ -1045,10 +1033,10 @@ ACTOR static Future doFlushChanges(PlanCheckpoint* checkpoint, try { choose { when(Reference nextInput = waitNext(input)) { + // FIXME: this will be unsafe with unique indexes. Something has to happen here that doesn't + // kill performance. futures.push_back(std::pair, Future>( - nextInput, - nextInput->commitChanges())); // FIXME: this will be unsafe with unique indexes. Something - // has to happen here that doesn't kill performance. + nextInput, nextInput->commitChanges())); } when(Void _ = wait(futures.empty() ? Never() : futures.front().second)) { output.send(futures.front().first); @@ -1127,7 +1115,6 @@ ACTOR static Future doUpdate(PlanCheckpoint* checkpoint, throw end_of_stream(); } catch (Error& e) { - // printf("doUpdate: %s\n", e.what()); if (e.code() == error_code_actor_cancelled) { if (checkpoint->splitBoundWanted()) { for (int i = futures.size() - 1; i >= 0; i--) @@ -1172,9 +1159,6 @@ ACTOR static Future findAndModify(PlanCheckpoint* outerCheckpoint, try { state uint64_t metadataVersion = wait(cx->bindCollectionContext(dtr)->getMetadataVersion()); loop { - // printf("Trying nonIsolatedRO with %d outputs and checkpoint '%s'-'%s'\n", nResults, - // printable(innerCheckpoint->getBounds(0).begin).c_str(), - // printable(innerCheckpoint->getBounds(0).end).c_str()); state FutureStream> docs = subPlan->execute(innerCheckpoint.getPtr(), dtr); state FlowLock* innerLock = innerCheckpoint->getDocumentFinishedLock(); state Future timeout = delay(1.0); @@ -1182,8 +1166,8 @@ ACTOR static Future findAndModify(PlanCheckpoint* outerCheckpoint, try { loop choose { - when(state Reference doc = - waitNext(docs)) { // throws end_of_stream when totally finished + when(state Reference doc = waitNext(docs)) { + // throws end_of_stream when totally finished firstDoc = doc; innerLock->release(); done = true; @@ -1441,10 +1425,9 @@ ACTOR static Future doInsert(PlanCheckpoint* checkpoint, Reference mm, Namespace ns, PromiseStream> output) { - // state int64_t& inserted = checkpoint->getIntState(0); <- This is broken for now. state Deque>> f; state FlowLock* flowControlLock = checkpoint->getDocumentFinishedLock(); - state int i = 0; // = inserted; + state int i = 0; try { state Reference ucx = wait(mm->getUnboundCollectionContext(tr, ns)); @@ -1459,7 +1442,6 @@ ACTOR static Future doInsert(PlanCheckpoint* checkpoint, when(Reference doc = wait(f.empty() ? Never() : f.front())) { output.send(ref(new ScanReturnedContext(doc, -1, Key()))); // Are these the right scanId etc? f.pop_front(); - // inserted++; } } } @@ -1467,7 +1449,6 @@ ACTOR static Future doInsert(PlanCheckpoint* checkpoint, for (; j < f.size(); j++) { Reference doc = wait(f[j]); output.send(ref(new ScanReturnedContext(doc, -1, Key()))); // Are these the right scanId etc? - // inserted++; } throw end_of_stream(); } catch (Error& e) { @@ -1503,11 +1484,9 @@ ACTOR static Future doSort(PlanCheckpoint* outerCheckpoint, loop { try { Reference doc = waitNext(docs); - returnProjections.push_back( - doc->toDataValue().get().getPackedObject().getOwned()); // Note that this call to get() is safe here but - // not in general, because we know that - // doc is wrapping a BsonContext, which means - // toDataValue() is synchronous. + // Note that this call to get() is safe here but not in general, because we know that doc is wrapping a + // BsonContext, which means toDataValue() is synchronous. + returnProjections.push_back(doc->toDataValue().get().getPackedObject().getOwned()); innerLock->release(); } catch (Error& e) { if (e.code() == error_code_end_of_stream) {