Skip to content

Commit

Permalink
OCDBT: multi-key atomic transaction support (non-distributed)
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 635977382
Change-Id: I993b6f012dba143485abff55e40a88fcd7759d8f
  • Loading branch information
jbms authored and Copybara-Service committed May 22, 2024
1 parent e62858b commit d77c943
Show file tree
Hide file tree
Showing 20 changed files with 2,331 additions and 1,411 deletions.
23 changes: 11 additions & 12 deletions tensorstore/kvstore/memory/memory_key_value_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,12 @@ class MemoryDriver::TransactionNode
absl::Time commit_time = absl::Now();
if (!ValidateEntryConditions(data, single_phase_mutation, commit_time)) {
lock.unlock();
internal_kvstore::RetryAtomicWriteback(single_phase_mutation,
commit_time);
this->RetryAtomicWriteback(commit_time);
return;
}
ApplyMutation(data, single_phase_mutation, commit_time);
lock.unlock();
internal_kvstore::AtomicCommitWritebackSuccess(single_phase_mutation);
this->AtomicCommitWritebackSuccess();
} else {
internal_kvstore::WritebackError(single_phase_mutation);
}
Expand Down Expand Up @@ -321,7 +320,7 @@ class MemoryDriver::TransactionNode
BufferedReadModifyWriteEntry& entry,
const absl::Time& commit_time)
ABSL_SHARED_LOCKS_REQUIRED(data.mutex) {
auto& stamp = entry.read_result_.stamp;
auto& stamp = entry.stamp();
auto if_equal = StorageGeneration::Clean(stamp.generation);
if (StorageGeneration::IsUnknown(if_equal)) {
assert(stamp.time == absl::InfiniteFuture());
Expand All @@ -330,11 +329,11 @@ class MemoryDriver::TransactionNode
auto it = data.values.find(entry.key_);
if (it == data.values.end()) {
if (StorageGeneration::IsNoValue(if_equal)) {
entry.read_result_.stamp.time = commit_time;
stamp.time = commit_time;
return true;
}
} else if (if_equal == it->second.generation()) {
entry.read_result_.stamp.time = commit_time;
stamp.time = commit_time;
return true;
}
return false;
Expand All @@ -351,19 +350,19 @@ class MemoryDriver::TransactionNode
for (auto& entry : single_phase_mutation.entries_) {
if (entry.entry_type() == kReadModifyWrite) {
auto& rmw_entry = static_cast<BufferedReadModifyWriteEntry&>(entry);
auto& stamp = rmw_entry.read_result_.stamp;
auto& stamp = rmw_entry.stamp();
stamp.time = commit_time;
if (!StorageGeneration::IsDirty(
rmw_entry.read_result_.stamp.generation)) {
auto value_state = rmw_entry.value_state_;
if (!StorageGeneration::IsDirty(stamp.generation)) {
// Do nothing
} else if (rmw_entry.read_result_.state == ReadResult::kMissing) {
} else if (value_state == ReadResult::kMissing) {
data.values.erase(rmw_entry.key_);
stamp.generation = StorageGeneration::NoValue();
} else {
assert(rmw_entry.read_result_.state == ReadResult::kValue);
assert(value_state == ReadResult::kValue);
auto& v = data.values[rmw_entry.key_];
v.generation_number = data.next_generation_number++;
v.value = std::move(rmw_entry.read_result_.value);
v.value = std::move(rmw_entry.value_);
stamp.generation = v.generation();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,14 +649,15 @@ class ShardedKeyValueStoreWriteCache
}

// Writeback override: when `read_result` holds a value, re-encodes that value
// with the owning cache's sharding-spec `data_encoding`, then forwards to
// AtomicMultiPhaseMutation::Writeback.
void Writeback(internal_kvstore::ReadModifyWriteEntry& entry,
internal_kvstore::ReadModifyWriteEntry& source_entry,
kvstore::ReadResult&& read_result) override {
auto& value = read_result.value;
// Only an actual value is encoded; other states are forwarded unchanged.
if (read_result.state == kvstore::ReadResult::kValue) {
value = EncodeData(value,
GetOwningCache(*this).sharding_spec().data_encoding);
}
// The two argument lines that follow are the two-argument and three-argument
// forms of the same call as rendered by the diff.
// NOTE(review): the three-argument form passes `entry` twice and
// `source_entry` is never used in this body -- confirm that this is intended.
internal_kvstore::AtomicMultiPhaseMutation::Writeback(
entry, std::move(read_result));
entry, entry, std::move(read_result));
}

ApplyReceiver apply_receiver_;
Expand Down Expand Up @@ -720,7 +721,7 @@ void ShardedKeyValueStoreWriteCache::TransactionNode::WritebackError() {

namespace {
// Starts applying `node` by invoking RetryAtomicWriteback with the staleness
// bound taken from `node.apply_options_`.
void StartApply(ShardedKeyValueStoreWriteCache::TransactionNode& node) {
// Both forms of the call appear here as rendered by the diff: the free
// function that takes `node.phases_`, and the member function on `node`.
RetryAtomicWriteback(node.phases_, node.apply_options_.staleness_bound);
node.RetryAtomicWriteback(node.apply_options_.staleness_bound);
}

/// Attempts to compute the new encoded shard state that merges any mutations
Expand Down Expand Up @@ -766,20 +767,17 @@ void MergeForWriteback(ShardedKeyValueStoreWriteCache::TransactionNode& node,
auto& buffered_entry =
static_cast<internal_kvstore::AtomicMultiPhaseMutation::
BufferedReadModifyWriteEntry&>(entry);
if (StorageGeneration::IsConditional(
buffered_entry.read_result_.stamp.generation) &&
StorageGeneration::Clean(
buffered_entry.read_result_.stamp.generation) !=
auto& entry_stamp = buffered_entry.stamp();
if (StorageGeneration::IsConditional(entry_stamp.generation) &&
StorageGeneration::Clean(entry_stamp.generation) !=
StorageGeneration::Clean(stamp.generation)) {
// This mutation is conditional, and is inconsistent with a prior
// conditional mutation or with `existing_chunks`.
mismatch = true;
break;
}
if (buffered_entry.read_result_.state ==
kvstore::ReadResult::kUnspecified ||
!StorageGeneration::IsInnerLayerDirty(
buffered_entry.read_result_.stamp.generation)) {
if (buffered_entry.value_state_ == kvstore::ReadResult::kUnspecified ||
!StorageGeneration::IsInnerLayerDirty(entry_stamp.generation)) {
// This is a no-op mutation; ignore it, which has the effect of retaining
// the existing chunk with this id, if present in `existing_chunks`.
continue;
Expand All @@ -804,10 +802,10 @@ void MergeForWriteback(ShardedKeyValueStoreWriteCache::TransactionNode& node,
break;
}
}
if (buffered_entry.read_result_.state == kvstore::ReadResult::kValue) {
if (buffered_entry.value_state_ == kvstore::ReadResult::kValue) {
// The mutation specifies a new value (rather than a deletion).
chunks.push_back(EncodedChunk{minishard_and_chunk_id,
buffered_entry.read_result_.value});
chunks.push_back(
EncodedChunk{minishard_and_chunk_id, buffered_entry.value_});
changed = true;
}
}
Expand Down Expand Up @@ -871,12 +869,11 @@ void ShardedKeyValueStoreWriteCache::TransactionNode::AllEntriesDone(
auto& buffered_entry =
static_cast<AtomicMultiPhaseMutation::BufferedReadModifyWriteEntry&>(
entry);
if (buffered_entry.read_result_.state !=
kvstore::ReadResult::kUnspecified) {
if (buffered_entry.value_state_ != kvstore::ReadResult::kUnspecified) {
modified = true;
++num_chunks;
}
auto& entry_stamp = buffered_entry.read_result_.stamp;
auto& entry_stamp = buffered_entry.stamp();
if (StorageGeneration::IsConditional(entry_stamp.generation)) {
if (!StorageGeneration::IsUnknown(stamp.generation) &&
StorageGeneration::Clean(stamp.generation) !=
Expand Down
2 changes: 2 additions & 0 deletions tensorstore/kvstore/ocdbt/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ tensorstore_cc_library(
"//tensorstore/kvstore/ocdbt/non_distributed:btree_writer",
"//tensorstore/kvstore/ocdbt/non_distributed:list",
"//tensorstore/kvstore/ocdbt/non_distributed:read",
"//tensorstore/kvstore/ocdbt/non_distributed:transactional_btree_writer",
"//tensorstore/serialization",
"//tensorstore/serialization:absl_time",
"//tensorstore/util:executor",
Expand Down Expand Up @@ -166,6 +167,7 @@ tensorstore_cc_test(
"//tensorstore:transaction",
"//tensorstore/internal:global_initializer",
"//tensorstore/internal:json_gtest",
"//tensorstore/internal/cache:kvs_backed_cache_testutil",
"//tensorstore/internal/testing:dynamic",
"//tensorstore/internal/testing:scoped_directory",
"//tensorstore/kvstore",
Expand Down
37 changes: 32 additions & 5 deletions tensorstore/kvstore/ocdbt/driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "tensorstore/kvstore/ocdbt/non_distributed/btree_writer.h"
#include "tensorstore/kvstore/ocdbt/non_distributed/list.h"
#include "tensorstore/kvstore/ocdbt/non_distributed/read.h"
#include "tensorstore/kvstore/ocdbt/non_distributed/transactional_btree_writer.h"
#include "tensorstore/kvstore/operations.h"
#include "tensorstore/kvstore/read_result.h"
#include "tensorstore/kvstore/registry.h"
Expand Down Expand Up @@ -316,8 +317,8 @@ Future<const void> OcdbtDriver::ExperimentalCopyRangeFrom(
std::string target_prefix, kvstore::CopyRangeOptions options) {
if (typeid(*source.driver) == typeid(OcdbtDriver)) {
auto& source_driver = static_cast<OcdbtDriver&>(*source.driver);
if (source.transaction != no_transaction || transaction) {
return absl::UnimplementedError("Transactions not supported");
if (source.transaction != no_transaction) {
return absl::UnimplementedError("Source transactions not supported");
}
if (source_driver.base_.driver == base_.driver &&
absl::StartsWith(source_driver.base_.path, base_.path)) {
Expand All @@ -331,7 +332,8 @@ Future<const void> OcdbtDriver::ExperimentalCopyRangeFrom(
source_driver.base_.path.substr(base_.path.size()),
source_range =
KeyRange::AddPrefix(source.path, options.source_range),
source_prefix_length = source.path.size()](
source_prefix_length = source.path.size(),
transaction = std::move(transaction)](
Promise<void> promise,
ReadyFuture<const ManifestWithTime> future) mutable {
auto& manifest_with_time = future.value();
Expand Down Expand Up @@ -364,8 +366,12 @@ Future<const void> OcdbtDriver::ExperimentalCopyRangeFrom(
copy_node_options.range = std::move(source_range);
copy_node_options.strip_prefix_length = source_prefix_length;
copy_node_options.add_prefix = std::move(target_prefix);
LinkResult(std::move(promise), self->btree_writer_->CopySubtree(
std::move(copy_node_options)));
LinkResult(std::move(promise),
transaction ? internal_ocdbt::AddCopySubtree(
&*self, *self->io_handle_, transaction,
std::move(copy_node_options))
: self->btree_writer_->CopySubtree(
std::move(copy_node_options)));
},
std::move(promise), std::move(manifest_future));
return std::move(future);
Expand All @@ -386,6 +392,27 @@ Result<KvStore> OcdbtDriver::GetBase(std::string_view path,
return base_;
}

// Registers a read-modify-write operation for `key`.
//
// The OCDBT-specific path (`internal_ocdbt::AddReadModifyWrite`) is taken only
// when all of the following hold:
//   - `transaction` is non-null,
//   - the transaction is atomic, and
//   - no coordinator address is configured.
// In every other case the operation is delegated to the generic
// `kvstore::Driver` implementation.
absl::Status OcdbtDriver::ReadModifyWrite(
    internal::OpenTransactionPtr& transaction, size_t& phase, Key key,
    ReadModifyWriteSource& source) {
  // Evaluated left to right so that `transaction` is only dereferenced when it
  // is non-null.
  const bool use_ocdbt_atomic_path =
      transaction && transaction->atomic() && !coordinator_->address;
  if (use_ocdbt_atomic_path) {
    return internal_ocdbt::AddReadModifyWrite(this, *io_handle_, transaction,
                                              phase, std::move(key), source);
  }
  return kvstore::Driver::ReadModifyWrite(transaction, phase, std::move(key),
                                          source);
}

// Registers deletion of all keys in `range` within `transaction`.
//
// For an atomic transaction with no coordinator address configured, the
// deletion is added through `internal_ocdbt::AddDeleteRange`; otherwise it is
// delegated to the generic `kvstore::Driver` implementation.
//
// NOTE(review): unlike `ReadModifyWrite`, `transaction` is dereferenced here
// without a null check -- presumably callers always supply an open
// transaction; confirm.
absl::Status OcdbtDriver::TransactionalDeleteRange(
    const internal::OpenTransactionPtr& transaction, KeyRange range) {
  const bool use_ocdbt_atomic_path =
      transaction->atomic() && !coordinator_->address;
  if (use_ocdbt_atomic_path) {
    return internal_ocdbt::AddDeleteRange(this, *io_handle_, transaction,
                                          std::move(range));
  }
  return kvstore::Driver::TransactionalDeleteRange(transaction,
                                                   std::move(range));
}

} // namespace internal_ocdbt
} // namespace tensorstore

Expand Down
8 changes: 7 additions & 1 deletion tensorstore/kvstore/ocdbt/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "tensorstore/context.h"
#include "tensorstore/context_resource_provider.h"
#include "tensorstore/internal/cache/cache_pool_resource.h"
#include "tensorstore/internal/cache_key/cache_key.h"
#include "tensorstore/internal/concurrency_resource.h"
#include "tensorstore/internal/data_copy_concurrency_resource.h"
#include "tensorstore/internal/json_binding/bindable.h"
Expand Down Expand Up @@ -144,6 +143,13 @@ class OcdbtDriver
Result<KvStore> GetBase(std::string_view path,
const Transaction& transaction) const override;

// Adds a read-modify-write operation for `key`.  When `transaction` is a
// non-null atomic transaction and no coordinator address is configured, the
// operation is added via `internal_ocdbt::AddReadModifyWrite`; otherwise the
// base `kvstore::Driver::ReadModifyWrite` implementation is used.
absl::Status ReadModifyWrite(internal::OpenTransactionPtr& transaction,
size_t& phase, Key key,
ReadModifyWriteSource& source) override;

// Adds deletion of `range` to `transaction`.  When the transaction is atomic
// and no coordinator address is configured, the deletion is added via
// `internal_ocdbt::AddDeleteRange`; otherwise the base
// `kvstore::Driver::TransactionalDeleteRange` implementation is used.
absl::Status TransactionalDeleteRange(
const internal::OpenTransactionPtr& transaction, KeyRange range) override;

const Executor& executor() { return data_copy_concurrency_->executor; }

IoHandle::Ptr io_handle_;
Expand Down
52 changes: 52 additions & 0 deletions tensorstore/kvstore/ocdbt/driver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <nlohmann/json_fwd.hpp>
#include <nlohmann/json.hpp>
#include "tensorstore/context.h"
#include "tensorstore/internal/cache/kvs_backed_cache_testutil.h"
#include "tensorstore/internal/global_initializer.h"
#include "tensorstore/internal/json_gtest.h"
#include "tensorstore/internal/testing/dynamic.h"
Expand Down Expand Up @@ -57,6 +58,7 @@ namespace kvstore = ::tensorstore::kvstore;
using ::tensorstore::Context;
using ::tensorstore::KeyRange;
using ::tensorstore::internal::GetMap;
using ::tensorstore::internal::MatchesKvsReadResult;
using ::tensorstore::internal::MatchesKvsReadResultNotFound;
using ::tensorstore::internal::MatchesListEntry;
using ::tensorstore::internal::MockKeyValueStore;
Expand Down Expand Up @@ -518,4 +520,54 @@ TEST(OcdbtTest, CopyRange) {
})));
}

// Copies the "x/" range into "y/" and "z/" through a store bound to an
// atomic_isolated transaction, checks that a copied key is readable through
// the transactional store before commit, and checks the full contents of the
// underlying store after commit.
TEST(OcdbtTest, TransactionalCopyRange) {
  TENSORSTORE_ASSERT_OK_AND_ASSIGN(
      auto store, kvstore::Open({
                      {"driver", "ocdbt"},
                      {"base", "memory://"},
                      {"config", {{"max_inline_value_bytes", 0}}},
                  })
                      .result());

  // Seed the source range without any transaction.
  TENSORSTORE_ASSERT_OK(kvstore::Write(store, "x/a", absl::Cord("value_a")));
  TENSORSTORE_ASSERT_OK(kvstore::Write(store, "x/b", absl::Cord("value_b")));

  tensorstore::Transaction txn(tensorstore::atomic_isolated);
  {
    TENSORSTORE_ASSERT_OK_AND_ASSIGN(auto txn_store, store | txn);
    const auto copy_source = store.WithPathSuffix("x/");

    // Two copies of the same source range into distinct target prefixes.
    TENSORSTORE_ASSERT_OK(kvstore::ExperimentalCopyRange(
        copy_source, txn_store.WithPathSuffix("y/")));
    TENSORSTORE_ASSERT_OK(kvstore::ExperimentalCopyRange(
        copy_source, txn_store.WithPathSuffix("z/")));

    // The copied key must be readable through the transaction before commit.
    EXPECT_THAT(kvstore::Read(txn_store, "y/a").result(),
                MatchesKvsReadResult(absl::Cord("value_a")));

    TENSORSTORE_ASSERT_OK(txn.CommitAsync());
  }

  // After commit, the source keys and both copies are present.
  EXPECT_THAT(GetMap(store),
              ::testing::Optional(::testing::ElementsAreArray({
                  ::testing::Pair("x/a", absl::Cord("value_a")),
                  ::testing::Pair("x/b", absl::Cord("value_b")),
                  ::testing::Pair("y/a", absl::Cord("value_a")),
                  ::testing::Pair("y/b", absl::Cord("value_b")),
                  ::testing::Pair("z/a", absl::Cord("value_a")),
                  ::testing::Pair("z/b", absl::Cord("value_b")),
              })));
}

// Registers the basic KvsBackedCache transactional test suite under the name
// "OcdbtDriverTransactional", using an OCDBT store on top of "memory://" and
// enabling the delete-range and multi-key-atomic options.
TENSORSTORE_GLOBAL_INITIALIZER {
  ::tensorstore::internal::KvsBackedCacheBasicTransactionalTestOptions
      test_options;
  test_options.test_name = "OcdbtDriverTransactional";
  test_options.delete_range_supported = true;
  test_options.multi_key_atomic_supported = true;
  // Each invocation opens a new OCDBT store and returns its driver.
  test_options.get_store = [] {
    TENSORSTORE_CHECK_OK_AND_ASSIGN(
        auto opened_store,
        kvstore::Open({{"driver", "ocdbt"}, {"base", "memory://"}}).result());
    return opened_store.driver;
  };
  ::tensorstore::internal::RegisterKvsBackedCacheBasicTransactionalTest(
      test_options);
}

} // namespace
5 changes: 5 additions & 0 deletions tensorstore/kvstore/ocdbt/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ The ``ocdbt`` driver implements an Optionally-Cooperative Distributed B+Tree
.. json:schema:: Context.ocdbt_coordinator
.. note::

   Atomic multi-key transactions are supported only in non-distributed mode,
   i.e. when no coordinator address is specified via
   :json:schema:`Context.ocdbt_coordinator`.

Concepts
--------

Expand Down
Loading

0 comments on commit d77c943

Please sign in to comment.