Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 127 additions & 1 deletion cloud/src/meta-store/txn_kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,94 @@ TxnErrorCode Transaction::get_conflicting_range(
return TxnErrorCode::TXN_OK;
}

TxnErrorCode Transaction::get_read_conflict_range(
std::vector<std::pair<std::string, std::string>>* values) {
constexpr std::string_view start = "\xff\xff/transaction/read_conflict_range/";
constexpr std::string_view end = "\xff\xff/transaction/read_conflict_range/\xff";

int limit = 0;
int target_bytes = 0;
FDBStreamingMode mode = FDB_STREAMING_MODE_WANT_ALL;
int iteration = 0;
fdb_bool_t snapshot = 0;
fdb_bool_t reverse = 0;
FDBFuture* future = fdb_transaction_get_range(
txn_, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)start.data(), start.size()),
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)end.data(), end.size()), limit,
target_bytes, mode, iteration, snapshot, reverse);

DORIS_CLOUD_DEFER {
fdb_future_destroy(future);
};

RETURN_IF_ERROR(await_future(future));

FDBKeyValue const* out_kvs;
int out_kvs_count;
fdb_bool_t out_more;
do {
fdb_error_t err =
fdb_future_get_keyvalue_array(future, &out_kvs, &out_kvs_count, &out_more);
if (err) {
LOG(WARNING) << "get_conflicting_range get keyvalue array error: "
<< fdb_get_error(err);
return cast_as_txn_code(err);
}
for (int i = 0; i < out_kvs_count; i++) {
std::string_view key((char*)out_kvs[i].key, out_kvs[i].key_length);
std::string_view value((char*)out_kvs[i].value, out_kvs[i].value_length);
key.remove_prefix(start.size());
values->emplace_back(key, value);
}
} while (out_more);

return TxnErrorCode::TXN_OK;
}

TxnErrorCode Transaction::get_write_conflict_range(
std::vector<std::pair<std::string, std::string>>* values) {
constexpr std::string_view start = "\xff\xff/transaction/write_conflict_range/";
constexpr std::string_view end = "\xff\xff/transaction/write_conflict_range/\xff";

int limit = 0;
int target_bytes = 0;
FDBStreamingMode mode = FDB_STREAMING_MODE_WANT_ALL;
int iteration = 0;
fdb_bool_t snapshot = 0;
fdb_bool_t reverse = 0;
FDBFuture* future = fdb_transaction_get_range(
txn_, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)start.data(), start.size()),
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)end.data(), end.size()), limit,
target_bytes, mode, iteration, snapshot, reverse);

DORIS_CLOUD_DEFER {
fdb_future_destroy(future);
};

RETURN_IF_ERROR(await_future(future));

FDBKeyValue const* out_kvs;
int out_kvs_count;
fdb_bool_t out_more;
do {
fdb_error_t err =
fdb_future_get_keyvalue_array(future, &out_kvs, &out_kvs_count, &out_more);
if (err) {
LOG(WARNING) << "get_conflicting_range get keyvalue array error: "
<< fdb_get_error(err);
return cast_as_txn_code(err);
}
for (int i = 0; i < out_kvs_count; i++) {
std::string_view key((char*)out_kvs[i].key, out_kvs[i].key_length);
std::string_view value((char*)out_kvs[i].value, out_kvs[i].value_length);
key.remove_prefix(start.size());
values->emplace_back(key, value);
}
} while (out_more);

return TxnErrorCode::TXN_OK;
}

TxnErrorCode Transaction::report_conflicting_range() {
if (!config::enable_logging_conflict_keys) {
return TxnErrorCode::TXN_OK;
Expand All @@ -969,7 +1057,45 @@ TxnErrorCode Transaction::report_conflicting_range() {
out += fmt::format("[{}, {}): {}", hex(start), hex(end), conflict_count);
}

LOG(WARNING) << "conflicting key ranges: " << out;
key_values.clear();
RETURN_IF_ERROR(get_read_conflict_range(&key_values));
if (key_values.size() % 2 != 0) {
LOG(WARNING) << "the read conflict range is not well-formed, size=" << key_values.size();
return TxnErrorCode::TXN_INVALID_DATA;
}
std::string read_conflict_range_out;
for (size_t i = 0; i < key_values.size(); i += 2) {
std::string_view start = key_values[i].first;
std::string_view end = key_values[i + 1].first;
std::string_view conflict_count = key_values[i].second;
if (!read_conflict_range_out.empty()) {
read_conflict_range_out += ", ";
}
read_conflict_range_out +=
fmt::format("[{}, {}): {}", hex(start), hex(end), conflict_count);
}

key_values.clear();
RETURN_IF_ERROR(get_write_conflict_range(&key_values));
if (key_values.size() % 2 != 0) {
LOG(WARNING) << "the write conflict range is not well-formed, size=" << key_values.size();
return TxnErrorCode::TXN_INVALID_DATA;
}
std::string write_conflict_range_out;
for (size_t i = 0; i < key_values.size(); i += 2) {
std::string_view start = key_values[i].first;
std::string_view end = key_values[i + 1].first;
std::string_view conflict_count = key_values[i].second;
if (!write_conflict_range_out.empty()) {
write_conflict_range_out += ", ";
}
write_conflict_range_out +=
fmt::format("[{}, {}): {}", hex(start), hex(end), conflict_count);
}

LOG(WARNING) << "conflicting key ranges: " << out
<< ", read conflict range: " << read_conflict_range_out
<< ", write conflict range: " << write_conflict_range_out;

return TxnErrorCode::TXN_OK;
}
Expand Down
4 changes: 4 additions & 0 deletions cloud/src/meta-store/txn_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,10 @@ class Transaction : public cloud::Transaction {
// It only works when the report_conflicting_ranges option is enabled.
TxnErrorCode get_conflicting_range(
std::vector<std::pair<std::string, std::string>>* key_values);
TxnErrorCode get_read_conflict_range(
std::vector<std::pair<std::string, std::string>>* key_values);
TxnErrorCode get_write_conflict_range(
std::vector<std::pair<std::string, std::string>>* key_values);
TxnErrorCode report_conflicting_range();

std::shared_ptr<Database> db_ {nullptr};
Expand Down
Loading