Skip to content

Commit

Permalink
bulk loading interface
Browse files Browse the repository at this point in the history
  • Loading branch information
kakaiu committed Apr 26, 2024
1 parent 9789c7f commit 42f624b
Show file tree
Hide file tree
Showing 10 changed files with 556 additions and 0 deletions.
18 changes: 18 additions & 0 deletions fdbclient/NativeAPI.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1608,6 +1608,24 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
std::make_unique<FaultToleranceMetricsImpl>(
singleKeyRange("fault_tolerance_metrics_json"_sr)
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METRICS).begin)));
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::BULKLOADING,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<BulkLoadStatusImpl>(
KeyRangeRef("status/"_sr, "status0"_sr)
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin)));
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::BULKLOADING,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<BulkLoadTaskImpl>(
KeyRangeRef("task/"_sr, "task0"_sr)
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin)));
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::BULKLOADING,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<BulkLoadCancelImpl>(
KeyRangeRef("cancel/"_sr, "cancel0"_sr)
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin)));
}

if (apiVersion.version() >= 700) {
Expand Down
255 changes: 255 additions & 0 deletions fdbclient/SpecialKeySpace.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@
#include <unordered_set>

#include "fdbclient/ActorLineageProfiler.h"
#include "fdbclient/AuditUtils.actor.h"
#include "fdbclient/BulkLoading.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/ProcessInterface.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/SystemData.h"
#include "flow/Arena.h"
#include "flow/UnitTest.h"
#include "fdbclient/ManagementAPI.actor.h"
Expand Down Expand Up @@ -74,6 +78,7 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB
{ SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF,
KeyRangeRef("\xff\xff/actor_profiler_conf/"_sr, "\xff\xff/actor_profiler_conf0"_sr) },
{ SpecialKeySpace::MODULE::CLUSTERID, singleKeyRange("\xff\xff/cluster_id"_sr) },
{ SpecialKeySpace::MODULE::BULKLOADING, KeyRangeRef("\xff\xff/bulk_loading/"_sr, "\xff\xff/bulk_loading0"_sr) },
};

std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandToRange = {
Expand Down Expand Up @@ -2717,6 +2722,256 @@ Future<Optional<std::string>> DataDistributionImpl::commit(ReadYourWritesTransac
return msg;
}

bool existingBulkLoadUpdate(ReadYourWritesTransaction* ryw, KeyRange range) {
KeyRange rangeToCheck =
range.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin);
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(rangeToCheck);
for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) {
if (iter->value().first && iter->value().second.present()) {
return true;
}
}
return false;
}

BulkLoadStatusImpl::BulkLoadStatusImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}

ACTOR static Future<RangeResult> BulkLoadStatusGetRangeActor(ReadYourWritesTransaction* ryw,
KeyRef prefix,
KeyRangeRef kr) {
state Key taskPrefixKey =
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin.withSuffix("status/"_sr);
state KeyRange range = kr.removePrefix(taskPrefixKey);
if (!normalKeys.contains(range)) {
TraceEvent(SevWarn, "BulkLoadCheckTaskStatusError")
.detail("Reason", "Input range to check is out of normal range")
.detail("InputRange", range);
throw bulkload_check_status_input_error();
}
// Check if there are existing updates in the current transaction
if (existingBulkLoadUpdate(ryw, Standalone(KeyRangeRef("task/"_sr, "task0"_sr))) ||
existingBulkLoadUpdate(ryw, Standalone(KeyRangeRef("cancel/"_sr, "cancel0"_sr)))) {
TraceEvent(SevWarn, "BulkLoadCheckTaskStatusError")
.detail("Reason", "Exist bulk loading update in the same transaction")
.detail("InputRange", range);
throw bulkload_check_status_input_error();
}
RangeResult result = wait(krmGetRanges(&(ryw->getTransaction()), bulkLoadPrefix, range));
RangeResult res;
res.more = result.more;
res.readThrough = result.readThrough;
res.readToBegin = result.readToBegin;
res.readThroughEnd = result.readThroughEnd;
for (int i = 0; i < result.size(); ++i) {
Key keyToCopy = result[i].key.withPrefix(taskPrefixKey);
res.push_back_deep(res.arena(), KeyValueRef(keyToCopy, result[i].value));
}
return rywGetRange(ryw, kr, res);
}

Future<RangeResult> BulkLoadStatusImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {
return BulkLoadStatusGetRangeActor(ryw, getKeyRange().begin, kr);
}

BulkLoadTaskImpl::BulkLoadTaskImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}

Future<RangeResult> BulkLoadTaskImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {
throw not_implemented();
}

void BulkLoadTaskImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
throw not_implemented();
}

void BulkLoadTaskImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
throw not_implemented();
}

void BulkLoadTaskImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
Key taskPrefixKey =
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin.withSuffix("task/"_sr);
BulkLoadState bulkLoadState = decodeBulkLoadState(value);
if (key != taskPrefixKey) {
TraceEvent(SevWarn, "BulkLoadSetTaskError")
.detail("Reason", "Input key error")
.detail("CorrectKey", taskPrefixKey)
.detail("InputKey", key)
.detail("InputState", bulkLoadState.toString());
throw bulkload_add_task_input_error();
}
ASSERT(bulkLoadState.isValid());
KeyRangeRef bulkLoadRange = bulkLoadState.range;
ASSERT(!bulkLoadRange.empty());
if (bulkLoadRange.begin >= normalKeys.end || bulkLoadRange.end >= normalKeys.end) {
TraceEvent(SevWarn, "BulkLoadSetTaskError")
.detail("Reason", "Input range is out of normal key space")
.detail("InputState", bulkLoadState.toString());
throw bulkload_add_task_input_error();
}
auto ranges = ryw->getSpecialKeySpaceWriteMap().intersectingRanges(bulkLoadRange.withPrefix(taskPrefixKey));
for (auto range : ranges) {
if (!range.value().first || !range.value().second.present()) {
continue;
}
BulkLoadState oldBulkLoadState = decodeBulkLoadState(range.value().second.get());
ASSERT(oldBulkLoadState.isValid());
TraceEvent(SevWarnAlways, "BulkLoadSetTaskError")
.detail("Reason", "Input task is trying to overwrite the existing task in the same transaction")
.detail("InputState", bulkLoadState.toString())
.detail("ExistState", oldBulkLoadState.toString());
throw bulkload_add_task_input_error();
}
ryw->getSpecialKeySpaceWriteMap().insert(bulkLoadRange.withPrefix(taskPrefixKey),
std::make_pair(true, bulkLoadStateValue(bulkLoadState)));
}

ACTOR static Future<Optional<std::string>> BulkLoadingTaskCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
// Check if there are existing bulk loading cancellation in the current transaction
if (existingBulkLoadUpdate(ryw, Standalone(KeyRangeRef("cancel/"_sr, "cancel0"_sr)))) {
TraceEvent(SevWarn, "BulkLoadTaskCommitError")
.detail("Reason", "Exist bulk loading cancel in the same transaction");
throw bulkload_check_status_input_error();
}

state KeyRange taskRange =
Standalone(KeyRangeRef("task/"_sr, "task0"_sr))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin);

// Validate current transaction bulk loading tasks
state std::vector<BulkLoadState> updateTasks;
state std::vector<KeyRange> updateRanges;
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(taskRange);
for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) {
if (!iter->value().first || !iter->value().second.present()) {
continue;
}
BulkLoadState bulkLoadTask = decodeBulkLoadState(iter->value().second.get());
ASSERT(iter->range() == bulkLoadTask.range.withPrefix(taskRange.begin));
for (const auto& updateTask : updateTasks) {
if (updateTask.filePaths == bulkLoadTask.filePaths) {
TraceEvent(SevWarnAlways, "BulkLoadTaskCommitError")
.detail("Reason", "Different ranges are mapped to the same file path set");
throw bulkload_add_task_input_error();
}
}
updateTasks.push_back(bulkLoadTask);
updateRanges.push_back(bulkLoadTask.range);
}
updateRanges = coalesceRangeList(updateRanges);

// Conflict check between local change and global
state int i = 0;
state Key readBeginKey;
state Key readEndKey;
for (; i < updateRanges.size(); i++) {
readBeginKey = updateRanges[i].begin;
readEndKey = updateRanges[i].end;
while (readBeginKey < readEndKey) {
KeyRange rangeToRead = Standalone(KeyRangeRef(readBeginKey, readEndKey));
RangeResult result =
wait(krmGetRanges(&(ryw->getTransaction()), bulkLoadPrefix, rangeToRead.withPrefix(bulkLoadPrefix)));
for (int j = 0; j < result.size() - 1; j++) {
if (!result[j].value.empty()) {
KeyRange existRange = Standalone(KeyRangeRef(result[i].key, result[i + 1].key));
BulkLoadState existBulkLoadTask = decodeBulkLoadState(result[i].value);
ASSERT(existBulkLoadTask.isValid());
ASSERT(existBulkLoadTask.range ==
existRange.removePrefix(bulkLoadPrefix)); // check existing ones, unsafe, may remove
TraceEvent(SevWarnAlways, "BulkLoadTaskCommitError")
.detail("Reason", "New range conflicts to existing ones");
throw bulkload_add_task_input_error();
}
}
readBeginKey = result.back().key;
}
}

// Update to global
i = 0;
for (; i < updateTasks.size(); i++) {
wait(krmSetRange(
&(ryw->getTransaction()), bulkLoadPrefix, updateTasks[i].range, bulkLoadStateValue(updateTasks[i])));
TraceEvent("BulkLoadCommitEach").detail("Task", updateTasks[i].toString()).detail("KR", kr.toString());
}
return Optional<std::string>();
}

Future<Optional<std::string>> BulkLoadTaskImpl::commit(ReadYourWritesTransaction* ryw) {
return BulkLoadingTaskCommitActor(ryw, getKeyRange());
}

BulkLoadCancelImpl::BulkLoadCancelImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}

Future<RangeResult> BulkLoadCancelImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {
throw not_implemented();
}

void BulkLoadCancelImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
Key cancelPrefixKey =
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin.withSuffix("cancel/"_sr);
KeyRange rangeToCancel = range.removePrefix(cancelPrefixKey);
if (!normalKeys.contains(rangeToCancel)) {
TraceEvent(SevWarn, "BulkLoadCancelTaskError")
.detail("Reason", "Input range to check is out of normal range")
.detail("InputRange", rangeToCancel);
throw bulkload_cancel_task_input_error();
}
ryw->getSpecialKeySpaceWriteMap().insert(rangeToCancel.withPrefix(cancelPrefixKey),
std::make_pair(true, Optional<Value>()));
}

void BulkLoadCancelImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
throw not_implemented();
}

void BulkLoadCancelImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
throw not_implemented();
}

ACTOR static Future<Optional<std::string>> BulkLoadingCancelCommitActor(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) {
// Check if there are existing bulk loading new task in the current transaction
if (existingBulkLoadUpdate(ryw, Standalone(KeyRangeRef("task/"_sr, "task0"_sr)))) {
TraceEvent(SevWarn, "BulkLoadTaskCommitError")
.detail("Reason", "Exist bulk loading cancel in the same transaction");
throw bulkload_check_status_input_error();
}

state KeyRange cancelRange =
Standalone(KeyRangeRef("cancel/"_sr, "cancel0"_sr))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin);

state std::vector<KeyRange> cancelRanges;
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(cancelRange);
for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) {
if (!iter->value().first) {
continue;
}
ASSERT(!iter->value().second.present());
cancelRanges.push_back(iter->range().removePrefix(cancelRange.begin));
}
cancelRanges = coalesceRangeList(cancelRanges);

// Update to global
state int i = 0;
for (; i < cancelRanges.size(); i++) {
wait(krmSetRange(&(ryw->getTransaction()), bulkLoadPrefix, cancelRanges[i], Value()));
TraceEvent("BulkLoadCancelCommitEach").detail("Range", cancelRanges[i].toString()).detail("KR", kr.toString());
}

return Optional<std::string>();
}

Future<Optional<std::string>> BulkLoadCancelImpl::commit(ReadYourWritesTransaction* ryw) {
return BulkLoadingCancelCommitActor(ryw, getKeyRange());
}

// Clears the special management api keys excludeLocality and failedLocality.
void includeLocalities(ReadYourWritesTransaction* ryw) {
ryw->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Expand Down
15 changes: 15 additions & 0 deletions fdbclient/SystemData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,21 @@ const KeyRef moveKeysLockWriteKey = "\xff/moveKeysLock/Write"_sr;
const KeyRef dataDistributionModeKey = "\xff/dataDistributionMode"_sr;
const UID dataDistributionModeLock = UID(6345, 3425);

// Bulk loading keys
const KeyRef bulkLoadTriggerKey = "\xff/bulkLoadTrigger"_sr;
const KeyRef bulkLoadPrefix = "\xff/bulkLoad/"_sr;

const Value bulkLoadStateValue(const BulkLoadState& bulkLoadState) {
return ObjectWriter::toValue(bulkLoadState, IncludeVersion());
}

BulkLoadState decodeBulkLoadState(const ValueRef& value) {
BulkLoadState bulkLoadState;
ObjectReader reader(value.begin(), IncludeVersion());
reader.deserialize(bulkLoadState);
return bulkLoadState;
}

// Keys to view and control tag throttling
const KeyRangeRef tagThrottleKeys = KeyRangeRef("\xff\x02/throttledTags/tag/"_sr, "\xff\x02/throttledTags/tag0"_sr);
const KeyRef tagThrottleKeysPrefix = tagThrottleKeys.begin;
Expand Down
69 changes: 69 additions & 0 deletions fdbclient/include/fdbclient/BulkLoading.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* BulkLoading.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FDBCLIENT_BULKLOADING_H
#define FDBCLIENT_BULKLOADING_H
#pragma once

#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"

enum class BulkLoadPhase : uint8_t {
Invalid = 0,
Running = 1,
Complete = 2,
Error = 3,
};

enum class BulkLoadType : uint8_t {
Invalid = 0,
SQLite = 1,
RocksDB = 2,
ShardedRocksDB = 3,
};

struct BulkLoadState {
constexpr static FileIdentifier file_identifier = 1384499;

BulkLoadState() = default;

BulkLoadState(BulkLoadType loadType, std::set<std::string> filePaths) : loadType(loadType), filePaths(filePaths) {}

BulkLoadState(KeyRange range, BulkLoadType loadType, std::set<std::string> filePaths)
: range(range), loadType(loadType), filePaths(filePaths) {}

bool isValid() const { return loadType != BulkLoadType::Invalid; }

std::string toString() const {
return "BulkLoadState: [Range]: " + Traceable<KeyRangeRef>::toString(range) +
", [Type]: " + std::to_string(static_cast<uint8_t>(loadType)) + ", [FilePath]: " + describe(filePaths);
}

template <class Ar>
void serialize(Ar& ar) {
serializer(ar, range, loadType, filePaths);
}

KeyRange range;
BulkLoadType loadType;
std::set<std::string> filePaths;
};

#endif

0 comments on commit 42f624b

Please sign in to comment.