Skip to content

Commit

Permalink
storage_service: merge_topology_snapshot: freeze_in_thread
Browse files Browse the repository at this point in the history
Freezing large mutations synchronously may cause
reactor stalls, as seen in the test_add_many_nodes_under_load
dtest:
```
  ++[1#1/37 5%] addr=0x15b0bf total=99 count=2 avg=50: ?? ??:0
  | ++[2#1/2 67%] addr=0x15a331f total=66 count=1 avg=66:
  | |             bytes_ostream::write at ././bytes_ostream.hh:248
  | |             (inlined by) bytes_ostream::write at ././bytes_ostream.hh:263
  | |             (inlined by) ser::serialize_integral<unsigned int, bytes_ostream> at ././serializer.hh:203
  | |             (inlined by) ser::integral_serializer<unsigned int>::write<bytes_ostream> at ././serializer.hh:217
  | |             (inlined by) ser::serialize<unsigned int, bytes_ostream> at ././serializer.hh:254
  | |             (inlined by) ser::writer_of_column<bytes_ostream>::write_id at ./build/dev/gen/idl/mutation.dist.impl.hh:4680
  | | ++[3#1/1 100%] addr=0x159df71 total=132 count=2 avg=66:
  | | |              (anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)scylladb#1}::operator() at ./mutation/mutation_partition_serializer.cc:99
  | | |              (inlined by) row::maybe_invoke_with_hash<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)scylladb#1} const, cell_and_hash const> at ./mutation/mutation_partition.hh:133
  | | |              (inlined by) row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)scylladb#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)scylladb#1}::operator() at ./mutation/mutation_partition.hh:152
  | | |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)scylladb#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)scylladb#1}, true>::operator() at ././utils/compact-radix-tree.hh:1888
  | | |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::visit_slot<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)scylladb#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)scylladb#1}, true>&> at ././utils/compact-radix-tree.hh:1560
  | | ++           - addr=0x159d84d:
  | | |              compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u>::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)scylladb#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)scylladb#1}, true>&> at ././utils/compact-radix-tree.hh:1364
  | | |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::node_base<cell_and_hash, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::direct_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u> >::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)scylladb#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)scylladb#1}, true>&, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::direct_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u> > at ././utils/compact-radix-tree.hh:799
  | | |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::node_base<cell_and_hash, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::direct_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u> >::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)scylladb#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)scylladb#1}, true>&> at ././utils/compact-radix-tree.hh:807
  | |   ++[4#1/1 100%] addr=0x1596f4a total=329 count=5 avg=66:
  | |   |              compact_radix_tree::tree<cell_and_hash, unsigned int>::node_head::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)scylladb#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)scylladb#1}, true> > at ././utils/compact-radix-tree.hh:473
  | |   |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)scylladb#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)scylladb#1}, true> > at ././utils/compact-radix-tree.hh:1626
  | |   |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::walk<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)scylladb#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)scylladb#1}> at ././utils/compact-radix-tree.hh:1909
  | |   |              (inlined by) row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)scylladb#1}> at ./mutation/mutation_partition.hh:151
  | |   |              (inlined by) (anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> > at ./mutation/mutation_partition_serializer.cc:97
  | |   |              (inlined by) write_row<ser::writer_of_deletable_row<bytes_ostream> > at ./mutation/mutation_partition_serializer.cc:168
  | |     ++[5#1/2 80%] addr=0x15a310c total=263 count=4 avg=66:
  | |     |             mutation_partition_serializer::write_serialized<ser::writer_of_mutation_partition<bytes_ostream> > at ./mutation/mutation_partition_serializer.cc:180
  | |     | ++[6#1/2 62%] addr=0x14eb60a total=428 count=7 avg=61:
  | |     | |             frozen_mutation::frozen_mutation(mutation const&)::$_0::operator()<ser::writer_of_mutation_partition<bytes_ostream> > at ./mutation/frozen_mutation.cc:85
  | |     | |             (inlined by) ser::after_mutation__key<bytes_ostream>::partition<frozen_mutation::frozen_mutation(mutation const&)::$_0> at ./build/dev/gen/idl/mutation.dist.impl.hh:7058
  | |     | |             (inlined by) frozen_mutation::frozen_mutation at ./mutation/frozen_mutation.cc:84
  | |     | | ++[7#1/1 100%] addr=0x14ed388 total=532 count=9 avg=59:
  | |     | | |              freeze at ./mutation/frozen_mutation.cc:143
  | |     | |   ++[8#1/2 74%] addr=0x252cf55 total=394 count=6 avg=66:
  | |     | |   |             service::storage_service::merge_topology_snapshot at ./service/storage_service.cc:763
```

This change adds a preemptible mutation_partition_serializer
called from `mutation_partition_serializer::write_in_thread`
and latter called from `freeze_in_thread`.

A thread is started in `storage_service::merge_topology_snapshot`
in which we expect large enough mutations to warrant
a seastar thread, and taking into account it is not
on the hot data path, but rather it's on the management plane.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
  • Loading branch information
bhalevy committed Apr 22, 2024
1 parent 7ea53b9 commit 07afe11
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions service/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,7 @@ future<> storage_service::topology_transition() {
}

future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
return async([this, snp = std::move(snp)] () mutable {
auto it = std::partition(snp.mutations.begin(), snp.mutations.end(), [] (const canonical_mutation& m) {
return m.column_family_id() != db::system_keyspace::cdc_generations_v3()->id();
});
Expand All @@ -773,30 +774,34 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {

// Split big mutations into smaller ones, prepare frozen_muts_to_apply
std::vector<frozen_mutation> frozen_muts_to_apply;
frozen_muts_to_apply.reserve(std::distance(it, snp.mutations.end()));
{
std::vector<mutation> muts_to_apply;
muts_to_apply.reserve(std::distance(it, snp.mutations.end()));
const auto max_size = _db.local().schema_commitlog()->max_record_size() / 2;
for (auto i = it; i != snp.mutations.end(); i++) {
const auto& m = *i;
auto mut = co_await m.to_mutation_gently(s);
auto mut = m.to_mutation_gently(s).get();
if (m.representation().size() <= max_size) {
muts_to_apply.push_back(std::move(mut));
frozen_muts_to_apply.emplace_back(freeze_in_thread(mut));
} else {
co_await split_mutation(std::move(mut), muts_to_apply, max_size);
std::vector<mutation> muts_to_apply;
// FIXME: freeze mutation one at a time to reduce
// memory footprint
split_mutation(std::move(mut), muts_to_apply, max_size).get();
for (auto& m : muts_to_apply) {
frozen_muts_to_apply.emplace_back(freeze_in_thread(m));
}
}
}
frozen_muts_to_apply = freeze(muts_to_apply);
}

// Apply non-atomically so as not to hit the commitlog size limit.
// The cdc_generations_v3 data is not used in any way until
// it's referenced from the topology table.
// By applying the cdc_generations_v3 mutations before topology mutations
// we ensure that the lack of atomicity isn't a problem here.
co_await max_concurrent_for_each(frozen_muts_to_apply, 128, [&] (const frozen_mutation& m) -> future<> {
max_concurrent_for_each(frozen_muts_to_apply, 128, [&] (const frozen_mutation& m) -> future<> {
return _db.local().apply(s, m, {}, db::commitlog::force_sync::yes, db::no_timeout);
});
}).get();
}

// Apply system.topology and system.topology_requests mutations atomically
Expand All @@ -807,7 +812,8 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
auto s = _db.local().find_schema(m.column_family_id());
return m.to_mutation(s);
});
co_await _db.local().apply(freeze(muts), db::no_timeout);
_db.local().apply(freeze(muts), db::no_timeout).get();
});
}

// Moves the coroutine lambda onto the heap and extends its
Expand Down

0 comments on commit 07afe11

Please sign in to comment.