Skip to content

Commit

Permalink
OCDBT: add assume_config option
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 635977827
Change-Id: I5d1506d2eefdc79965b9ce397d2d477640b3962c
  • Loading branch information
jbms authored and copybara-github committed May 22, 2024
1 parent 7570693 commit 68957e9
Show file tree
Hide file tree
Showing 13 changed files with 272 additions and 39 deletions.
3 changes: 3 additions & 0 deletions tensorstore/kvstore/ocdbt/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ tensorstore_cc_library(
"//tensorstore/internal/json_binding:raw_bytes_hex",
"//tensorstore/kvstore",
"//tensorstore/kvstore/ocdbt/format",
"//tensorstore/util:result",
"//tensorstore/util:status",
"//tensorstore/util:str_cat",
"@com_github_nlohmann_json//:json",
"@com_google_absl//absl/status",
"@com_google_absl//absl/synchronization",
"@com_google_riegeli//riegeli/zstd:zstd_writer",
Expand Down Expand Up @@ -182,6 +184,7 @@ tensorstore_cc_test(
"//tensorstore/util:result",
"//tensorstore/util:status_testutil",
"@com_github_nlohmann_json//:json",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings:cord",
"@com_google_absl//absl/strings:str_format",
"@com_google_googletest//:gtest_main",
Expand Down
44 changes: 35 additions & 9 deletions tensorstore/kvstore/ocdbt/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@

#include "tensorstore/kvstore/ocdbt/config.h"

#include <stdint.h>

#include <atomic>
#include <string_view>
#include <type_traits>
#include <variant>

#include "absl/status/status.h"
#include "absl/synchronization/mutex.h"
#include <nlohmann/json.hpp>
#include "riegeli/zstd/zstd_writer.h"
#include "tensorstore/internal/intrusive_ptr.h"
#include "tensorstore/internal/json_binding/bindable.h"
#include "tensorstore/internal/json_binding/enum.h"
#include "tensorstore/internal/json_binding/json_binding.h"
Expand All @@ -30,6 +34,7 @@
#include "tensorstore/kvstore/ocdbt/format/config.h"
#include "tensorstore/kvstore/ocdbt/format/version_tree.h"
#include "tensorstore/kvstore/supported_features.h"
#include "tensorstore/util/result.h"
#include "tensorstore/util/status.h"
#include "tensorstore/util/str_cat.h"

Expand Down Expand Up @@ -177,21 +182,35 @@ ConfigConstraints::ConfigConstraints(const Config& config)
version_tree_arity_log2(config.version_tree_arity_log2),
compression(config.compression) {}

ConfigState::ConfigState()
: supported_features_for_manifest_{kvstore::SupportedFeatures::kNone} {}

ConfigState::ConfigState(
// Factory for `ConfigState`.
//
// Stores `constraints`, `supported_features_for_manifest` and `assume_config`
// on a newly allocated object. When `assume_config` is true, additionally
// calls `CreateNewConfig()` and stores the result as the assumed config; if
// that call fails, its error is returned instead of a `ConfigState`.
//
// NOTE(review): the two lines beginning with `: constraints_(constraints),`
// below look like leftovers of the removed constructor's initializer list as
// rendered by the diff view — confirm against the real file.
Result<ConfigStatePtr> ConfigState::Make(
const ConfigConstraints& constraints,
kvstore::SupportedFeatures supported_features_for_manifest)
: constraints_(constraints),
supported_features_for_manifest_(supported_features_for_manifest) {}
kvstore::SupportedFeatures supported_features_for_manifest,
bool assume_config) {
// The object is created with `new` and handed straight to an `IntrusivePtr`,
// so ownership is reference-counted from the first statement on.
auto self = internal::IntrusivePtr<ConfigState>(new ConfigState);
self->constraints_ = constraints;
self->supported_features_for_manifest_ = supported_features_for_manifest;
self->assume_config_ = assume_config;
if (assume_config) {
// Computed once, here, after the constraints above have been stored;
// `assumed_config_` is left default-constructed when `assume_config` is false.
TENSORSTORE_ASSIGN_OR_RETURN(self->assumed_config_,
self->CreateNewConfig());
}
return self;
}

absl::Status ConfigState::ValidateNewConfig(const Config& config) {
if (!config_set_.load(std::memory_order_acquire)) {
absl::MutexLock lock(&mutex_);
TENSORSTORE_RETURN_IF_ERROR(ValidateConfig(config, constraints_));
config_ = config;
if (assume_config_) {
ConfigConstraints assumed_constraints(assumed_config_);
assumed_constraints.uuid = config.uuid;
TENSORSTORE_RETURN_IF_ERROR(
ValidateConfig(config, assumed_constraints),
tensorstore::MaybeAnnotateStatus(
_, "Observed config does not match assumed config"));
}
constraints_ = ConfigConstraints(config);
config_ = config;
config_set_.store(true, std::memory_order_release);
return absl::OkStatus();
}
Expand All @@ -205,6 +224,13 @@ const Config* ConfigState::GetExistingConfig() const {
return &config_;
}

// Returns a pointer to the assumed config when `assume_config_` is set;
// otherwise forwards to `GetExistingConfig()` and returns whatever it reports.
const Config* ConfigState::GetAssumedOrExistingConfig() const {
  return assume_config_ ? &assumed_config_ : GetExistingConfig();
}

Result<Config> ConfigState::CreateNewConfig() {
if (!config_set_.load(std::memory_order_acquire)) {
absl::MutexLock lock(&mutex_);
Expand Down
31 changes: 25 additions & 6 deletions tensorstore/kvstore/ocdbt/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ struct ConfigConstraints {
};
};

class ConfigState;
using ConfigStatePtr = internal::IntrusivePtr<ConfigState>;

/// Tracks the configuration for an open database.
///
/// Initially, when the database is opened, the manifest has not yet been read
Expand All @@ -70,6 +73,15 @@ struct ConfigConstraints {
///
/// Once the configuration is known, it is an error for it to change.
///
/// If `assume_config` is specified, the configuration that would be inferred
/// from the constraints will be used to write data files even before the
/// manifest has been written. This reduces the initial write latency, but will
/// lead to a write error and possibly unreferenced garbage data files (but not
/// data corruption) if another concurrent writer ultimately writes the manifest
/// with an incompatible configuration (excluding UUID), as can occur if
/// different configuration constraints are specified or a different library
/// version is used by the concurrent writer.
///
/// FIXME(jbms): Because of the open kvstore cache, there is a potential for
/// this caching of the configuration to cause problems in the case that the
/// ocdbt kvstore is opened, then deleted from its underlying store, then
Expand All @@ -78,25 +90,32 @@ struct ConfigConstraints {
/// option like `recheck_cached_metadata`.
class ConfigState : public internal::AtomicReferenceCount<ConfigState> {
public:
// NOTE(review): the two public constructor declarations directly below
// coexist with the private `ConfigState() = default;` further down; this
// looks like removed lines shown by the diff view — confirm against the
// real header.
ConfigState();
explicit ConfigState(
const ConfigConstraints& constraints,
kvstore::SupportedFeatures supported_features_for_manifest);
/// Creates a reference-counted `ConfigState`.
///
/// \param constraints Configuration constraints to store.
/// \param supported_features_for_manifest Feature set stored for later use.
/// \param assume_config If `true`, a config is computed immediately via
///     `CreateNewConfig()` and kept as the assumed config.
/// \returns The new state, or the error from computing the assumed config.
static Result<ConfigStatePtr> Make(
const ConfigConstraints& constraints = {},
kvstore::SupportedFeatures supported_features_for_manifest =
kvstore::SupportedFeatures::kNone,
bool assume_config = false);

/// While no config has been recorded yet: validates `config` against the
/// stored constraints and, if `assume_config` was requested, against the
/// assumed config (with the UUID taken from `config`); on success records
/// `config` and narrows the constraints to it.
absl::Status ValidateNewConfig(const Config& config);
/// Returns a pointer to the recorded config.
/// NOTE(review): callers test the result for null, so this presumably
/// returns `nullptr` while no config has been recorded — confirm.
const Config* GetExistingConfig() const;
/// Returns the assumed config if `assume_config` was requested, otherwise
/// the result of `GetExistingConfig()`.
const Config* GetAssumedOrExistingConfig() const;
Result<Config> CreateNewConfig();
ConfigConstraints GetConstraints() const;

/// Whether this state was created with `assume_config` enabled.
bool assume_config() const { return assume_config_; }

private:
// Private: instances are created through `Make()`.
ConfigState() = default;

mutable absl::Mutex mutex_;
ConfigConstraints constraints_;
// Filled in by `Make()` only when `assume_config_` is true.
Config assumed_config_;
Config config_;
// No default member initializer; `Make()` assigns it.
kvstore::SupportedFeatures supported_features_for_manifest_;
// Set with release ordering after `config_` is written; read with acquire
// ordering before `config_` is consulted.
std::atomic<bool> config_set_{false};
bool assume_config_{false};
};

using ConfigStatePtr = internal::IntrusivePtr<ConfigState>;

absl::Status ValidateConfig(const Config& config,
const ConfigConstraints& constraints);

Expand Down
3 changes: 2 additions & 1 deletion tensorstore/kvstore/ocdbt/distributed/btree_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,8 @@ Future<TimestampedStorageGeneration> DistributedBtreeWriter::Write(
if (value) {
auto& new_entry = request.mutation->new_entry;
auto& value_ref = new_entry.value_reference;
if (auto* config = writer.io_handle_->config_state->GetExistingConfig();
if (auto* config =
writer.io_handle_->config_state->GetAssumedOrExistingConfig();
!config || value->size() <= config->max_inline_value_bytes) {
if (!config && !value->empty()) {
needs_inline_value_pass = true;
Expand Down
12 changes: 6 additions & 6 deletions tensorstore/kvstore/ocdbt/distributed/cooperator_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ class CooperatorServerTest : public ::testing::Test {
.GetResource<tensorstore::internal::DataCopyConcurrencyResource>()
.value();
base_kvstore_ = tensorstore::GetMemoryKeyValueStore();
io_handle_ =
MakeIoHandle(data_copy_concurrency, cache_pool.get(), base_kvstore_,
MakeIntrusivePtr<ConfigState>(
ConfigConstraints{},
base_kvstore_.driver->GetSupportedFeatures({})),
/*data_file_prefixes=*/{});
io_handle_ = MakeIoHandle(
data_copy_concurrency, cache_pool.get(), base_kvstore_,
ConfigState::Make(ConfigConstraints{},
base_kvstore_.driver->GetSupportedFeatures({}))
.value(),
/*data_file_prefixes=*/{});

{
CoordinatorServer::Options options;
Expand Down
14 changes: 10 additions & 4 deletions tensorstore/kvstore/ocdbt/driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ TENSORSTORE_DEFINE_JSON_DEFAULT_BINDER(
jb::Member("version_tree_node_data_prefix",
jb::Projection<&DataFilePrefixes::version_tree_node>(
jb::DefaultValue([](auto* v) { *v = "d/"; }))))),
jb::Member("assume_config",
jb::Projection<&OcdbtDriverSpecData::assume_config>(
jb::DefaultInitializedValue())),
jb::Member(
"experimental_read_coalescing_threshold_bytes",
jb::Projection<&OcdbtDriverSpecData::
Expand Down Expand Up @@ -214,12 +217,14 @@ Future<kvstore::DriverPtr> OcdbtDriverSpec::DoOpen() const {
absl::ZeroDuration());
}

TENSORSTORE_ASSIGN_OR_RETURN(
auto config_state,
ConfigState::Make(spec->data_.config, supported_manifest_features,
spec->data_.assume_config));

driver->io_handle_ = internal_ocdbt::MakeIoHandle(
driver->data_copy_concurrency_, driver->cache_pool_->get(),
driver->base_,
internal::MakeIntrusivePtr<ConfigState>(
spec->data_.config, supported_manifest_features),
driver->data_file_prefixes_,
driver->base_, std::move(config_state), driver->data_file_prefixes_,
driver->target_data_file_size_.value_or(kDefaultTargetBufferSize),
std::move(read_coalesce_options));
driver->btree_writer_ =
Expand Down Expand Up @@ -268,6 +273,7 @@ absl::Status OcdbtDriver::GetBoundSpecData(OcdbtDriverSpecData& spec) const {
spec.data_copy_concurrency = data_copy_concurrency_;
spec.cache_pool = cache_pool_;
spec.config = io_handle_->config_state->GetConstraints();
spec.assume_config = io_handle_->config_state->assume_config();
spec.data_file_prefixes = data_file_prefixes_;
spec.experimental_read_coalescing_threshold_bytes =
experimental_read_coalescing_threshold_bytes_;
Expand Down
1 change: 1 addition & 0 deletions tensorstore/kvstore/ocdbt/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ struct OcdbtDriverSpecData {
std::optional<size_t> experimental_read_coalescing_merged_bytes;
std::optional<absl::Duration> experimental_read_coalescing_interval;
std::optional<size_t> target_data_file_size;
bool assume_config = false;
Context::Resource<OcdbtCoordinatorResource> coordinator;

TENSORSTORE_DECLARE_JSON_DEFAULT_BINDER(OcdbtDriverSpecData,
Expand Down
Loading

0 comments on commit 68957e9

Please sign in to comment.