Skip to content

Commit

Permalink
bulk loading interface
Browse files Browse the repository at this point in the history
  • Loading branch information
kakaiu committed Apr 26, 2024
1 parent 9789c7f commit 192b0b1
Show file tree
Hide file tree
Showing 10 changed files with 554 additions and 4 deletions.
20 changes: 16 additions & 4 deletions fdbclient/NativeAPI.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1603,11 +1603,23 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection

if (apiVersion.version() >= 740) {
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::METRICS,
SpecialKeySpace::MODULE::BULKLOADING,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<FaultToleranceMetricsImpl>(
singleKeyRange("fault_tolerance_metrics_json"_sr)
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METRICS).begin)));
std::make_unique<BulkLoadStatusImpl>(
KeyRangeRef("status/"_sr, "status0"_sr)
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin)));
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::BULKLOADING,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<BulkLoadTaskImpl>(
KeyRangeRef("task/"_sr, "task0"_sr)
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin)));
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::BULKLOADING,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<BulkLoadCancelImpl>(
KeyRangeRef("cancel/"_sr, "cancel0"_sr)
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin)));
}

if (apiVersion.version() >= 700) {
Expand Down
255 changes: 255 additions & 0 deletions fdbclient/SpecialKeySpace.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@
#include <unordered_set>

#include "fdbclient/ActorLineageProfiler.h"
#include "fdbclient/AuditUtils.actor.h"
#include "fdbclient/BulkLoading.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/ProcessInterface.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/SystemData.h"
#include "flow/Arena.h"
#include "flow/UnitTest.h"
#include "fdbclient/ManagementAPI.actor.h"
Expand Down Expand Up @@ -74,6 +78,7 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB
{ SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF,
KeyRangeRef("\xff\xff/actor_profiler_conf/"_sr, "\xff\xff/actor_profiler_conf0"_sr) },
{ SpecialKeySpace::MODULE::CLUSTERID, singleKeyRange("\xff\xff/cluster_id"_sr) },
{ SpecialKeySpace::MODULE::BULKLOADING, KeyRangeRef("\xff\xff/bulk_loading/"_sr, "\xff\xff/bulk_loading0"_sr) },
};

std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandToRange = {
Expand Down Expand Up @@ -2717,6 +2722,256 @@ Future<Optional<std::string>> DataDistributionImpl::commit(ReadYourWritesTransac
return msg;
}

bool existingBulkLoadUpdate(ReadYourWritesTransaction* ryw, KeyRange range) {
KeyRange rangeToCheck =
range.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin);
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(rangeToCheck);
for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) {
if (iter->value().first && iter->value().second.present()) {
return true;
}
}
return false;
}

BulkLoadStatusImpl::BulkLoadStatusImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}

ACTOR static Future<RangeResult> BulkLoadStatusGetRangeActor(ReadYourWritesTransaction* ryw,
KeyRef prefix,
KeyRangeRef kr) {
state Key taskPrefixKey =
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin.withSuffix("status/"_sr);
state KeyRange range = kr.removePrefix(taskPrefixKey);
if (!normalKeys.contains(range)) {
TraceEvent(SevWarn, "BulkLoadCheckTaskStatusError")
.detail("Reason", "Input range to check is out of normal range")
.detail("InputRange", range);
throw bulkload_check_status_input_error();
}
// Check if there are existing updates in the current transaction
if (existingBulkLoadUpdate(ryw, Standalone(KeyRangeRef("task/"_sr, "task0"_sr))) ||
existingBulkLoadUpdate(ryw, Standalone(KeyRangeRef("cancel/"_sr, "cancel0"_sr)))) {
TraceEvent(SevWarn, "BulkLoadCheckTaskStatusError")
.detail("Reason", "Exist bulk loading update in the same transaction")
.detail("InputRange", range);
throw bulkload_check_status_input_error();
}
RangeResult result = wait(krmGetRanges(&(ryw->getTransaction()), bulkLoadPrefix, range));
RangeResult res;
res.more = result.more;
res.readThrough = result.readThrough;
res.readToBegin = result.readToBegin;
res.readThroughEnd = result.readThroughEnd;
for (int i = 0; i < result.size(); ++i) {
Key keyToCopy = result[i].key.withPrefix(taskPrefixKey);
res.push_back_deep(res.arena(), KeyValueRef(keyToCopy, result[i].value));
}
return rywGetRange(ryw, kr, res);
}

Future<RangeResult> BulkLoadStatusImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {
return BulkLoadStatusGetRangeActor(ryw, getKeyRange().begin, kr);
}

BulkLoadTaskImpl::BulkLoadTaskImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}

Future<RangeResult> BulkLoadTaskImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {
throw not_implemented();
}

void BulkLoadTaskImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
throw not_implemented();
}

void BulkLoadTaskImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
throw not_implemented();
}

void BulkLoadTaskImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
Key taskPrefixKey =
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin.withSuffix("task/"_sr);
BulkLoadState bulkLoadState = decodeBulkLoadState(value);
if (key != taskPrefixKey) {
TraceEvent(SevWarn, "BulkLoadSetTaskError")
.detail("Reason", "Input key error")
.detail("CorrectKey", taskPrefixKey)
.detail("InputKey", key)
.detail("InputState", bulkLoadState.toString());
throw bulkload_add_task_input_error();
}
ASSERT(bulkLoadState.isValid());
KeyRangeRef bulkLoadRange = bulkLoadState.range;
ASSERT(!bulkLoadRange.empty());
if (bulkLoadRange.begin >= normalKeys.end || bulkLoadRange.end >= normalKeys.end) {
TraceEvent(SevWarn, "BulkLoadSetTaskError")
.detail("Reason", "Input range is out of normal key space")
.detail("InputState", bulkLoadState.toString());
throw bulkload_add_task_input_error();
}
auto ranges = ryw->getSpecialKeySpaceWriteMap().intersectingRanges(bulkLoadRange.withPrefix(taskPrefixKey));
for (auto range : ranges) {
if (!range.value().first || !range.value().second.present()) {
continue;
}
BulkLoadState oldBulkLoadState = decodeBulkLoadState(range.value().second.get());
ASSERT(oldBulkLoadState.isValid());
TraceEvent(SevWarnAlways, "BulkLoadSetTaskError")
.detail("Reason", "Input task is trying to overwrite the existing enforced task")
.detail("InputState", bulkLoadState.toString())
.detail("ExistState", oldBulkLoadState.toString());
throw bulkload_add_task_input_error();
}
ryw->getSpecialKeySpaceWriteMap().insert(bulkLoadRange.withPrefix(taskPrefixKey),
std::make_pair(true, bulkLoadStateValue(bulkLoadState)));
}

ACTOR static Future<Optional<std::string>> BulkLoadingTaskCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
// Check if there are existing bulk loading cancellation in the current transaction
if (existingBulkLoadUpdate(ryw, Standalone(KeyRangeRef("cancel/"_sr, "cancel0"_sr)))) {
TraceEvent(SevWarn, "BulkLoadTaskCommitError")
.detail("Reason", "Exist bulk loading cancel in the same transaction");
throw bulkload_check_status_input_error();
}

state KeyRange taskRange =
Standalone(KeyRangeRef("task/"_sr, "task0"_sr))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin);

// Validate current transaction bulk loading tasks
state std::vector<BulkLoadState> updateTasks;
state std::vector<KeyRange> updateRanges;
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(taskRange);
for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) {
if (!iter->value().first || !iter->value().second.present()) {
continue;
}
BulkLoadState bulkLoadTask = decodeBulkLoadState(iter->value().second.get());
ASSERT(iter->range() == bulkLoadTask.range.withPrefix(taskRange.begin));
for (const auto& updateTask : updateTasks) {
if (updateTask.filePaths == bulkLoadTask.filePaths) {
TraceEvent(SevWarnAlways, "BulkLoadTaskCommitError")
.detail("Reason", "Different ranges are mapped to the same file path set");
throw bulkload_add_task_input_error();
}
}
updateTasks.push_back(bulkLoadTask);
updateRanges.push_back(bulkLoadTask.range);
}
updateRanges = coalesceRangeList(updateRanges);

// Conflict check between local change and global
state int i = 0;
state Key readBeginKey;
state Key readEndKey;
for (; i < updateRanges.size(); i++) {
readBeginKey = updateRanges[i].begin;
readEndKey = updateRanges[i].end;
while (readBeginKey < readEndKey) {
KeyRange rangeToRead = Standalone(KeyRangeRef(readBeginKey, readEndKey));
RangeResult result =
wait(krmGetRanges(&(ryw->getTransaction()), bulkLoadPrefix, rangeToRead.withPrefix(bulkLoadPrefix)));
for (int j = 0; j < result.size() - 1; j++) {
if (!result[j].value.empty()) {
KeyRange existRange = Standalone(KeyRangeRef(result[i].key, result[i + 1].key));
BulkLoadState existBulkLoadTask = decodeBulkLoadState(result[i].value);
ASSERT(existBulkLoadTask.isValid());
ASSERT(existBulkLoadTask.range ==
existRange.removePrefix(bulkLoadPrefix)); // check existing ones, unsafe, may remove
TraceEvent(SevWarnAlways, "BulkLoadTaskCommitError")
.detail("Reason", "New range conflicts to existing ones");
throw bulkload_add_task_input_error();
}
}
readBeginKey = result.back().key;
}
}

// Update to global
i = 0;
for (; i < updateTasks.size(); i++) {
wait(krmSetRange(
&(ryw->getTransaction()), bulkLoadPrefix, updateTasks[i].range, bulkLoadStateValue(updateTasks[i])));
TraceEvent("BulkLoadCommitEach").detail("Task", updateTasks[i].toString()).detail("KR", kr.toString());
}
return Optional<std::string>();
}

Future<Optional<std::string>> BulkLoadTaskImpl::commit(ReadYourWritesTransaction* ryw) {
return BulkLoadingTaskCommitActor(ryw, getKeyRange());
}

BulkLoadCancelImpl::BulkLoadCancelImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}

Future<RangeResult> BulkLoadCancelImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {
throw not_implemented();
}

void BulkLoadCancelImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
Key cancelPrefixKey =
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin.withSuffix("cancel/"_sr);
KeyRange rangeToCancel = range.removePrefix(cancelPrefixKey);
if (!normalKeys.contains(rangeToCancel)) {
TraceEvent(SevWarn, "BulkLoadCancelTaskError")
.detail("Reason", "Input range to check is out of normal range")
.detail("InputRange", rangeToCancel);
throw bulkload_cancel_task_input_error();
}
ryw->getSpecialKeySpaceWriteMap().insert(rangeToCancel.withPrefix(cancelPrefixKey),
std::make_pair(true, Optional<Value>()));
}

void BulkLoadCancelImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
throw not_implemented();
}

void BulkLoadCancelImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
throw not_implemented();
}

ACTOR static Future<Optional<std::string>> BulkLoadingCancelCommitActor(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) {
// Check if there are existing bulk loading new task in the current transaction
if (existingBulkLoadUpdate(ryw, Standalone(KeyRangeRef("task/"_sr, "task0"_sr)))) {
TraceEvent(SevWarn, "BulkLoadTaskCommitError")
.detail("Reason", "Exist bulk loading cancel in the same transaction");
throw bulkload_check_status_input_error();
}

state KeyRange cancelRange =
Standalone(KeyRangeRef("cancel/"_sr, "cancel0"_sr))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin);

state std::vector<KeyRange> cancelRanges;
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(cancelRange);
for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) {
if (!iter->value().first) {
continue;
}
ASSERT(!iter->value().second.present());
cancelRanges.push_back(iter->range().removePrefix(cancelRange.begin));
}
cancelRanges = coalesceRangeList(cancelRanges);

// Update to global
state int i = 0;
for (; i < cancelRanges.size(); i++) {
wait(krmSetRange(&(ryw->getTransaction()), bulkLoadPrefix, cancelRanges[i], Value()));
TraceEvent("BulkLoadCancelCommitEach").detail("Range", cancelRanges[i].toString()).detail("KR", kr.toString());
}

return Optional<std::string>();
}

Future<Optional<std::string>> BulkLoadCancelImpl::commit(ReadYourWritesTransaction* ryw) {
return BulkLoadingCancelCommitActor(ryw, getKeyRange());
}

// Clears the special management api keys excludeLocality and failedLocality.
void includeLocalities(ReadYourWritesTransaction* ryw) {
ryw->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Expand Down
15 changes: 15 additions & 0 deletions fdbclient/SystemData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,21 @@ const KeyRef moveKeysLockWriteKey = "\xff/moveKeysLock/Write"_sr;
const KeyRef dataDistributionModeKey = "\xff/dataDistributionMode"_sr;
const UID dataDistributionModeLock = UID(6345, 3425);

// Bulk loading keys
const KeyRef bulkLoadTriggerKey = "\xff/bulkLoadTrigger"_sr;
const KeyRef bulkLoadPrefix = "\xff/bulkLoad/"_sr;

const Value bulkLoadStateValue(const BulkLoadState& bulkLoadState) {
return ObjectWriter::toValue(bulkLoadState, IncludeVersion());
}

BulkLoadState decodeBulkLoadState(const ValueRef& value) {
BulkLoadState bulkLoadState;
ObjectReader reader(value.begin(), IncludeVersion());
reader.deserialize(bulkLoadState);
return bulkLoadState;
}

// Keys to view and control tag throttling
const KeyRangeRef tagThrottleKeys = KeyRangeRef("\xff\x02/throttledTags/tag/"_sr, "\xff\x02/throttledTags/tag0"_sr);
const KeyRef tagThrottleKeysPrefix = tagThrottleKeys.begin;
Expand Down

0 comments on commit 192b0b1

Please sign in to comment.