Skip to content

Commit

Permalink
schema.hh: introduce schema_static_props, use it for null_sharder
Browse files Browse the repository at this point in the history
Our goal (scylladb#12642) is to mark raft tables to use
schema commitlog. There are two similar
cases in code right now - with_null_sharder
and set_wait_for_sync_to_commitlog schema_builder
methods. The problem is that if we need to
mark some new schema with one of these methods
we need to do this twice - first in
a method describing the schema
(e.g. system_keyspace::raft()) and second in the
function create_table_from_mutations, which is not
obvious and easy to forget.

create_table_from_mutations is called when schema object
is reconstructed from mutations, with_null_sharder
and set_wait_for_sync_to_commitlog must be called from it
since the schema properties they describe are
not included in the mutation representation of the schema.

This patch proposes to distinguish between the schema
properties that get into mutations and those that do not.
The former are described with schema_builder, while for
the latter we introduce schema_static_props struct and
the schema_builder::register_static_configurator method.
This way we can formulate a rule once in the code about
which schemas should have a null sharder, and it will
be enforced in all cases.
  • Loading branch information
Petr Gusev committed Mar 14, 2023
1 parent 00fc73d commit 349bc1a
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 49 deletions.
42 changes: 7 additions & 35 deletions db/schema_tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ static bool is_extra_durable(const sstring& ks_name, const sstring& cf_name) {

/** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */
namespace db {
namespace {
const auto set_null_sharder = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
if (ks_name == schema_tables::NAME) {
props.use_null_sharder = true;
}
});
}

schema_ctxt::schema_ctxt(const db::config& cfg, std::shared_ptr<data_dictionary::user_types_storage> uts)
: _extensions(cfg.extensions())
Expand Down Expand Up @@ -247,7 +254,6 @@ schema_ptr keyspaces() {
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build();
}();
return schema;
Expand All @@ -274,7 +280,6 @@ schema_ptr scylla_keyspaces() {
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build();
}();
return schema;
Expand Down Expand Up @@ -316,7 +321,6 @@ schema_ptr tables() {
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build();
}();
return schema;
Expand Down Expand Up @@ -345,7 +349,6 @@ schema_ptr scylla_tables(schema_features features) {
offset += 2;
}
sb.with_version(system_keyspace::generate_schema_version(id, offset));
sb.with_null_sharder();
return sb.build();
};
static thread_local schema_ptr schemas[2][2] = { {make(false, false), make(false, true)}, {make(true, false), make(true, true)} };
Expand Down Expand Up @@ -387,7 +390,6 @@ static schema_ptr columns_schema(const char* columns_table_name) {
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build();
}
schema_ptr columns() {
Expand Down Expand Up @@ -454,7 +456,6 @@ static schema_ptr computed_columns_schema(const char* columns_table_name) {
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build();
}

Expand Down Expand Up @@ -484,7 +485,6 @@ schema_ptr dropped_columns() {
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build();
}();
return schema;
Expand All @@ -510,7 +510,6 @@ schema_ptr triggers() {
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build();
}();
return schema;
Expand Down Expand Up @@ -555,7 +554,6 @@ schema_ptr views() {
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build();
}();
return schema;
Expand All @@ -582,7 +580,6 @@ schema_ptr indexes() {
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build();
}();
return schema;
Expand All @@ -609,7 +606,6 @@ schema_ptr types() {
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build();
}();
return schema;
Expand Down Expand Up @@ -639,7 +635,6 @@ schema_ptr functions() {
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build();
}();
return schema;
Expand Down Expand Up @@ -669,7 +664,6 @@ schema_ptr aggregates() {
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build();
}();
return schema;
Expand Down Expand Up @@ -700,7 +694,6 @@ schema_ptr scylla_aggregates() {

builder.set_gc_grace_seconds(schema_gc_grace);
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build();
}();
return schema;
Expand All @@ -720,7 +713,6 @@ schema_ptr scylla_table_schema_history() {
builder.set_comment("Scylla specific table to store a history of column mappings "
"for each table schema version upon an CREATE TABLE/ALTER TABLE operations");
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
builder.with_null_sharder();
return builder.build(schema_builder::compact_storage::no);
}();
return s;
Expand Down Expand Up @@ -3017,19 +3009,6 @@ static void prepare_builder_from_table_row(const schema_ctxt& ctxt, schema_build
}
}

// tables in the "system" keyspace which need to use null sharder
static const std::unordered_set<sstring>& system_ks_null_shard_tables() {
static const std::unordered_set<sstring> tables = {
SCYLLA_TABLE_SCHEMA_HISTORY,
db::system_keyspace::RAFT,
db::system_keyspace::RAFT_SNAPSHOTS,
db::system_keyspace::RAFT_SNAPSHOT_CONFIG,
db::system_keyspace::GROUP0_HISTORY,
db::system_keyspace::DISCOVERY,
db::system_keyspace::BROADCAST_KV_STORE,
};
return tables;
}

schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, std::optional<table_schema_version> version)
{
Expand Down Expand Up @@ -3114,13 +3093,6 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
builder.with_sharder(smp::count, ctxt.murmur3_partitioner_ignore_msb_bits());
}

if (ks_name == NAME
|| (ks_name == db::system_keyspace::NAME
&& system_ks_null_shard_tables().contains(cf_name))) {
// Put every schema table on shard 0.
builder.with_null_sharder();
}

if (is_extra_durable(ks_name, cf_name)) {
builder.set_wait_for_sync_to_commitlog(true);
}
Expand Down
23 changes: 17 additions & 6 deletions db/system_keyspace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,23 @@
using days = std::chrono::duration<int, std::ratio<24 * 3600>>;

namespace db {
namespace {
const auto set_null_sharder = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
// tables in the "system" keyspace which need to use null sharder
static const std::unordered_set<sstring> system_ks_null_shard_tables = {
schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY,
system_keyspace::RAFT,
system_keyspace::RAFT_SNAPSHOTS,
system_keyspace::RAFT_SNAPSHOT_CONFIG,
system_keyspace::GROUP0_HISTORY,
system_keyspace::DISCOVERY,
system_keyspace::BROADCAST_KV_STORE,
};
if (ks_name == system_keyspace::NAME && system_ks_null_shard_tables.contains(cf_name)) {
props.use_null_sharder = true;
}
});
}

std::unique_ptr<query_context> qctx = {};

Expand Down Expand Up @@ -206,7 +223,6 @@ schema_ptr system_keyspace::raft() {
.set_comment("Persisted RAFT log, votes and snapshot info")
.with_version(generate_schema_version(id))
.set_wait_for_sync_to_commitlog(true)
.with_null_sharder()
.build();
}();
return schema;
Expand All @@ -228,7 +244,6 @@ schema_ptr system_keyspace::raft_snapshots() {
.set_comment("Persisted RAFT snapshot descriptors info")
.with_version(generate_schema_version(id))
.set_wait_for_sync_to_commitlog(true)
.with_null_sharder()
.build();
}();
return schema;
Expand All @@ -246,7 +261,6 @@ schema_ptr system_keyspace::raft_snapshot_config() {
.set_comment("RAFT configuration for the latest snapshot descriptor")
.with_version(generate_schema_version(id))
.set_wait_for_sync_to_commitlog(true)
.with_null_sharder()
.build();
}();
return schema;
Expand Down Expand Up @@ -924,7 +938,6 @@ schema_ptr system_keyspace::group0_history() {

.set_comment("History of Raft group 0 state changes")
.with_version(generate_schema_version(id))
.with_null_sharder()
.build();
}();
return schema;
Expand All @@ -944,7 +957,6 @@ schema_ptr system_keyspace::discovery() {
.set_comment("State of cluster discovery algorithm: the set of discovered peers")
.with_version(generate_schema_version(id))
.set_wait_for_sync_to_commitlog(true)
.with_null_sharder()
.build();
}();
return schema;
Expand All @@ -958,7 +970,6 @@ schema_ptr system_keyspace::broadcast_kv_store() {
.with_column("value", utf8_type)
.with_version(generate_schema_version(id))
.set_wait_for_sync_to_commitlog(true)
.with_null_sharder()
.build();
}();
return schema;
Expand Down
29 changes: 23 additions & 6 deletions schema/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,9 @@ schema::raw_schema::raw_schema(table_id id)
, _sharder(::get_sharder(smp::count, default_partitioner_ignore_msb))
{ }

schema::schema(private_tag, const raw_schema& raw, std::optional<raw_view_info> raw_view_info)
schema::schema(private_tag, const raw_schema& raw, std::optional<raw_view_info> raw_view_info, const schema_static_props& props)
: _raw(raw)
, _static_props(props)
, _offsets([this] {
if (_raw._columns.size() > std::numeric_limits<column_count_type>::max()) {
throw std::runtime_error(format("Column count limit ({:d}) overflowed: {:d}",
Expand Down Expand Up @@ -410,6 +411,7 @@ schema::schema(private_tag, const raw_schema& raw, std::optional<raw_view_info>

schema::schema(const schema& o, const std::function<void(schema&)>& transform)
: _raw(o._raw)
, _static_props(o._static_props)
, _offsets(o._offsets)
{
// Do the transformation after all the raw fields are initialized, but
Expand Down Expand Up @@ -984,10 +986,6 @@ schema_builder& schema_builder::with_sharder(unsigned shard_count, unsigned shar
return *this;
}

schema_builder& schema_builder::with_null_sharder() {
_raw._sharder = get_sharder(1, 0);
return *this;
}

schema_builder::schema_builder(std::string_view ks_name, std::string_view cf_name,
std::optional<table_id> id, data_type rct)
Expand Down Expand Up @@ -1275,6 +1273,11 @@ schema_builder& schema_builder::without_indexes() {
schema_ptr schema_builder::build() {
schema::raw_schema new_raw = _raw; // Copy so that build() remains idempotent.

schema_static_props static_props{};
for (const auto& c: static_configurators()) {
c(new_raw._ks_name, new_raw._cf_name, static_props);
}

if (_version) {
new_raw._version = *_version;
} else {
Expand Down Expand Up @@ -1322,7 +1325,21 @@ schema_ptr schema_builder::build() {
dynamic_pointer_cast<db::per_partition_rate_limit_extension>(it->second)->get_options();
}

return make_lw_shared<schema>(schema::private_tag{}, new_raw, _view_info);
if (static_props.use_null_sharder) {
new_raw._sharder = get_sharder(1, 0);
}

return make_lw_shared<schema>(schema::private_tag{}, new_raw, _view_info, static_props);
}

auto schema_builder::static_configurators() -> std::vector<static_configurator>& {
static std::vector<static_configurator> result{};
return result;
}

int schema_builder::register_static_configurator(static_configurator&& configurator) {
static_configurators().push_back(std::move(configurator));
return 0;
}

const cdc::options& schema::cdc_options() const {
Expand Down
10 changes: 9 additions & 1 deletion schema/schema.hh
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,10 @@ public:
}
};

struct schema_static_props {
bool use_null_sharder = false; // use a sharder that puts everything on shard 0
};

/*
* Effectively immutable.
* Not safe to access across cores because of shared_ptr's.
Expand Down Expand Up @@ -630,6 +634,7 @@ private:
std::reference_wrapper<const dht::sharder> _sharder;
};
raw_schema _raw;
schema_static_props _static_props;
thrift_schema _thrift;
v3_columns _v3_columns;
mutable schema_registry_entry* _registry_entry = nullptr;
Expand Down Expand Up @@ -680,11 +685,14 @@ private:
schema(const schema&, const std::function<void(schema&)>&);
class private_tag{};
public:
schema(private_tag, const raw_schema&, std::optional<raw_view_info>);
schema(private_tag, const raw_schema&, std::optional<raw_view_info>, const schema_static_props& props);
schema(const schema&);
// See \ref make_reversed().
schema(reversed_tag, const schema&);
~schema();
const schema_static_props& static_props() const {
return _static_props;
}
table_schema_version version() const {
return _raw._version;
}
Expand Down
5 changes: 4 additions & 1 deletion schema/schema_builder.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ class per_partition_rate_limit_options;
struct schema_builder {
public:
enum class compact_storage { no, yes };
using static_configurator = noncopyable_function<void(const sstring& ks_name, const sstring& cf_name, schema_static_props&)>;
private:
schema::raw_schema _raw;
std::optional<compact_storage> _compact_storage;
std::optional<table_schema_version> _version;
std::optional<raw_view_info> _view_info;
schema_builder(const schema::raw_schema&);
static std::vector<static_configurator>& static_configurators();
public:
schema_builder(std::string_view ks_name, std::string_view cf_name,
std::optional<table_id> = { },
Expand All @@ -43,6 +45,8 @@ public:
sstring comment = "");
schema_builder(const schema_ptr);

static int register_static_configurator(static_configurator&& configurator);

schema_builder& set_uuid(const table_id& id) {
_raw._id = id;
return *this;
Expand Down Expand Up @@ -238,7 +242,6 @@ public:
}
schema_builder& with_partitioner(sstring name);
schema_builder& with_sharder(unsigned shard_count, unsigned sharding_ignore_msb_bits);
schema_builder& with_null_sharder(); // a sharder that puts everything on shard 0
class default_names {
public:
default_names(const schema_builder&);
Expand Down

0 comments on commit 349bc1a

Please sign in to comment.