Skip to content

Commit

Permalink
fmt and nits
Browse files Browse the repository at this point in the history
  • Loading branch information
kakaiu committed May 6, 2024
1 parent 54f9e73 commit 413590f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 19 deletions.
2 changes: 1 addition & 1 deletion fdbclient/include/fdbclient/BulkLoading.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ struct BulkLoadState {

template <class Ar>
void serialize(Ar& ar) {
serializer(ar, range, loadType, filePaths, taskId);
serializer(ar, range, loadType, phase, filePaths, taskId);
}

KeyRange range;
Expand Down
44 changes: 26 additions & 18 deletions fdbserver/DataDistribution.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1053,9 +1053,9 @@ ACTOR Future<UID> startBulkLoadTask(Reference<DataDistributor> self, KeyRange ra
}
}

ACTOR Future<Void> waitOnBulkLoadInQueue(Reference<DataDistributor> self, UID taskId) {
ACTOR Future<Void> waitOnBulkLoadDequeue(Reference<DataDistributor> self, UID taskId) {
loop choose {
when (int phaseInQueue = waitNext(self->bulkLoadTasks[taskId].getFuture())) {
when(int phaseInQueue = waitNext(self->bulkLoadTasks[taskId].getFuture())) {
// ...
break; // under a certain condition
}
Expand All @@ -1065,7 +1065,7 @@ ACTOR Future<Void> waitOnBulkLoadInQueue(Reference<DataDistributor> self, UID ta

// Check progress of bulk loading via reading from bulk loading metadata
// Exit when the metadata indicates the task completes
ACTOR Future<Void> waitOnBulkLoad(Reference<DataDistributor> self, KeyRange range, UID taskId) {
ACTOR Future<Void> waitOnBulkLoadEnd(Reference<DataDistributor> self, KeyRange range, UID taskId) {
state double startTime = now();
state int retryTimes = 0;
state std::string exitReason;
Expand Down Expand Up @@ -1100,9 +1100,9 @@ ACTOR Future<Void> waitOnBulkLoad(Reference<DataDistributor> self, KeyRange rang
}
}
TraceEvent(SevInfo, "BulkLoadingTaskEnd")
.detail("Reason", exitReason)
.detail("RunningTime", now() - startTime)
.detail("RetryTimes", retryTimes);
.detail("Reason", exitReason)
.detail("RunningTime", now() - startTime)
.detail("RetryTimes", retryTimes);
return Void();
}

Expand All @@ -1112,12 +1112,15 @@ ACTOR Future<Void> doBulkLoadTask(Reference<DataDistributor> self, KeyRange rang
TraceEvent("DoBulkLoadTask").detail("Range", range);
try {
state UID taskId = wait(startBulkLoadTask(self, range));
wait(waitOnBulkLoadInQueue(self, taskId));
wait(waitOnBulkLoad(self, range, taskId));
TraceEvent("DoBulkLoadTaskStarted").detail("Range", range).detail("TaskId", taskId);
wait(waitOnBulkLoadDequeue(self, taskId));
TraceEvent("DoBulkLoadTaskDequeued").detail("Range", range).detail("TaskId", taskId);
wait(waitOnBulkLoadEnd(self, range, taskId));
TraceEvent("DoBulkLoadTaskFinished").detail("Range", range).detail("TaskId", taskId);
// At this point, bulk load task phase in metadata becomes complete
auto it = self->bulkLoadTasks.find(taskId);
/*auto it = self->bulkLoadTasks.find(taskId);
ASSERT(it != self->bulkLoadTasks.end());
self->bulkLoadTasks.erase(it); // Only place to erase the promise
self->bulkLoadTasks.erase(it); // Only place to erase the promise*/
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw e;
Expand All @@ -1135,8 +1138,8 @@ ACTOR Future<Void> doBulkLoadTask(Reference<DataDistributor> self, KeyRange rang
// fileChecksum is unique to a bulk load task
void runBulkLoadTaskOnRange(Reference<DataDistributor> self, KeyRange range, bool retry) {
TraceEvent(SevInfo, "RunBulkLoadTask", self->ddId)
.detail("Range", range)
.detail("Context", retry ? "Retry" : "New");
.detail("Range", range)
.detail("Context", retry ? "Retry" : "New");
self->bulkLoadActors.add(doBulkLoadTask(self, range));
return;
}
Expand All @@ -1158,13 +1161,16 @@ ACTOR Future<Void> scheduleBulkLoadTasks(Reference<DataDistributor> self) {
BulkLoadState bulkLoadState = decodeBulkLoadState(result[i].value);
if (range != bulkLoadState.range) {
TraceEvent(SevWarn, "BulkLoadingScheduleFailed")
.detail("Reason", "Task boundary changed")
.detail("BulkLoadTask", bulkLoadState.toString())
.detail("RangeInSpace", range);
} else if (bulkLoadState.phase == BulkLoadPhase::Invalid || bulkLoadState.phase == BulkLoadPhase::Error) {
runBulkLoadTaskOnRange(self, bulkLoadState.range, /*retry=*/bulkLoadState.phase == BulkLoadPhase::Error);
.detail("Reason", "Task boundary changed")
.detail("BulkLoadTask", bulkLoadState.toString())
.detail("RangeInSpace", range);
} else if (bulkLoadState.phase == BulkLoadPhase::Invalid ||
bulkLoadState.phase == BulkLoadPhase::Error) {
runBulkLoadTaskOnRange(
self, bulkLoadState.range, /*retry=*/bulkLoadState.phase == BulkLoadPhase::Error);
} else {
ASSERT(bulkLoadState.phase == BulkLoadPhase::Running || bulkLoadState.phase == BulkLoadPhase::Complete);
ASSERT(bulkLoadState.phase == BulkLoadPhase::Running ||
bulkLoadState.phase == BulkLoadPhase::Complete);
}
}
}
Expand All @@ -1176,6 +1182,7 @@ ACTOR Future<Void> scheduleBulkLoadTasks(Reference<DataDistributor> self) {
wait(delay(10.0));
}
}
TraceEvent(SevInfo, "ScheduleBulkLoadTasksEnd", self->ddId);
return Void();
}

Expand All @@ -1187,6 +1194,7 @@ ACTOR Future<Void> bulkLoadingCore(Reference<DataDistributor> self) {
TraceEvent("BulkLoadingCore").detail("Status", "Mode On");
self->bulkLoadActors.add(scheduleBulkLoadTasks(self));
wait(self->bulkLoadActors.getResult());
TraceEvent("BulkLoadingCore").detail("Status", "Round complete");
} catch (Error& e) {
TraceEvent("BulkLoadingCore").detail("Status", "Error").errorUnsuppressed(e);
wait(delay(5.0));
Expand Down

0 comments on commit 413590f

Please sign in to comment.