compaction_manager: flush_all_tables before major compaction
Major compaction already flushes each table to make
sure it considers any mutations that are present in the
memtable for the purpose of tombstone purging.
See 64ec1c6

However, compaction is unaware of mutations in the commitlog
and due to sharing commitlog segments by different tables,
it is possible the data or tombstones will get resurrected
by commitlog replay in case the node restarts right after
the respective data and tombstones are purged by (major) compaction
after f42eb4d.

This patch calls `database::flush_all_tables`, based on the
`compaction_flush_all_tables_before_major_seconds` interval,
before tables are compacted to ensure that any data in memtables
is flushed to sstables, AND a new commitlog segment is forced so
that major compaction would be able to purge tombstones more
efficiently, without them being locked by live commitlog segments.
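
A minimal sketch of the interval check described above, written against `std::chrono` only. The name `all_tables_flush_due` and the alias `sketch_clock` are hypothetical and stand in for the `db_clock`-based logic in the patch; this is an illustration, not the code from the commit.

```cpp
#include <chrono>
#include <cstdint>

// Hypothetical stand-in for db_clock, used only in this sketch.
using sketch_clock = std::chrono::system_clock;

// Returns true when a flush of all tables is due: the option is enabled
// (interval_seconds != 0) and the oldest recorded all-tables flush
// happened at or before (now - interval).
bool all_tables_flush_due(sketch_clock::time_point oldest_flush,
                          sketch_clock::time_point now,
                          std::uint32_t interval_seconds) {
    if (interval_seconds == 0) {
        return false; // 0 disables the automatic flush of all tables
    }
    auto threshold = now - std::chrono::seconds(interval_seconds);
    return oldest_flush <= threshold;
}
```

With the default of 86400 seconds, a major compaction started less than a day after the previous all-tables flush would not trigger another one under this model.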

Note that this requires flushing all tables (and their memtables)
in each shard since they share the commitlog and an unflushed
and unrelated table may hold a commitlog segment that stores
mutations from another table that is about to get compacted.
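
The sharing problem can be illustrated with a toy model of a single commitlog segment. `segment_sketch` and its members are hypothetical names and a deliberate simplification; the real commitlog tracks this differently.

```cpp
#include <set>
#include <string>

// Toy model: one commitlog segment that records which tables still have
// unflushed mutations stored in it.
struct segment_sketch {
    std::set<std::string> dirty_tables;

    // A mutation for `table` was appended to this segment.
    void write(const std::string& table) { dirty_tables.insert(table); }

    // `table` flushed its memtable, so its mutations here are now in sstables.
    void on_flush(const std::string& table) { dirty_tables.erase(table); }

    // The segment can only be released once no table depends on it.
    bool releasable() const { return dirty_tables.empty(); }
};
```

In this model, flushing only the table being compacted leaves the segment pinned by any other table that wrote to it, which is why the patch flushes all tables.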

In the case that not all tables are flushed prior
to major compaction, we revert to the old behavior of
flushing each table in the keyspace before major-compacting it.
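
The fallback rule can be written as a small pure function. This is a sketch that mirrors the ternary in `major_keyspace_compaction_task_impl::run()`; the enum `flush_mode_sketch` and the function `effective_flush_mode` are hypothetical names introduced here.

```cpp
// Hypothetical mirror of the flush_mode values used by the patch.
enum class flush_mode_sketch { skip, compacted_tables, all_tables };

// Picks the flush mode passed down to the per-shard compaction tasks:
// skip the per-table flush if the caller asked to skip, or if all tables
// were already flushed; otherwise flush each compacted table as before.
flush_mode_sketch effective_flush_mode(flush_mode_sketch requested,
                                       bool flushed_all_tables) {
    if (requested == flush_mode_sketch::skip || flushed_all_tables) {
        return flush_mode_sketch::skip;
    }
    return flush_mode_sketch::compacted_tables;
}
```

Note that the per-shard tasks never receive `all_tables` in this scheme: the all-tables flush is attempted once up front, and the tasks either skip flushing or flush only their own tables.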

Fixes scylladb#15777

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
bhalevy committed Nov 15, 2023
1 parent 23d2768 commit 9393c2c
Showing 5 changed files with 46 additions and 1 deletion.
27 changes: 26 additions & 1 deletion compaction/task_manager_module.cc
@@ -14,6 +14,7 @@
 #include "sstables/sstables.hh"
 #include "sstables/sstable_directory.hh"
 #include "utils/pretty_printers.hh"
+#include "db/config.hh"

 namespace replica {

@@ -275,11 +276,35 @@ sstring major_compaction_task_impl::to_string(flush_mode fm) {
     __builtin_unreachable();
 }

+static future<bool> maybe_flush_all_tables(sharded<replica::database>& db) {
+    auto interval = db.local().get_config().compaction_flush_all_tables_before_major_seconds();
+    if (interval) {
+        auto when = db_clock::now() - std::chrono::duration_cast<db_clock::duration>(std::chrono::seconds(interval));
+        if (co_await replica::database::get_all_tables_flushed_at(db) <= when) {
+            co_await db.invoke_on_all([&] (replica::database& db) -> future<> {
+                co_await db.flush_all_tables();
+            });
+            co_return true;
+        }
+    }
+    co_return false;
+}
+
 future<> major_keyspace_compaction_task_impl::run() {
+    // TODO: implement a `compact_all_keyspaces` api
+    // that will flush all tables once for all keyspaces
+    // rather than for each keyspace, using a mechanism similar to `run_table_tasks`.
+    // It can be called from `scylla-nodetool compact` with keyspace arg.
+    bool flushed_all_tables = false;
+    if (_flush_mode == flush_mode::all_tables) {
+        flushed_all_tables = co_await maybe_flush_all_tables(_db);
+    }
+
+    flush_mode fm = _flush_mode == flush_mode::skip || flushed_all_tables ? flush_mode::skip : flush_mode::compacted_tables;
     co_await _db.invoke_on_all([&] (replica::database& db) -> future<> {
         tasks::task_info parent_info{_status.id, _status.shard};
         auto& module = db.get_compaction_manager().get_task_manager_module();
-        auto task = co_await module.make_and_start_task<shard_major_keyspace_compaction_task_impl>(parent_info, _status.keyspace, _status.id, db, _table_infos, _flush_mode);
+        auto task = co_await module.make_and_start_task<shard_major_keyspace_compaction_task_impl>(parent_info, _status.keyspace, _status.id, db, _table_infos, fm);
         co_await task->done();
     });
 }
4 changes: 4 additions & 0 deletions db/config.cc
@@ -361,6 +361,10 @@ db::config::config(std::shared_ptr<db::extensions> exts)
         "If set to higher than 0, ignore the controller's output and set the compaction shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity")
     , compaction_enforce_min_threshold(this, "compaction_enforce_min_threshold", liveness::LiveUpdate, value_status::Used, false,
         "If set to true, enforce the min_threshold option for compactions strictly. If false (default), Scylla may decide to compact even if below min_threshold")
+    , compaction_flush_all_tables_before_major_seconds(this, "compaction_flush_all_tables_before_major_seconds", value_status::Used, 86400,
+        "Set the minimum interval in seconds between flushing all tables before each major compaction (default is 86400). "
+        "This option is useful for maximizing tombstone garbage collection by releasing all active commitlog segments. "
+        "Set to 0 to disable automatic flushing all tables before major compaction")
     /**
      * @Group Initialization properties
      * @GroupDescription The minimal properties needed for configuring a cluster.
1 change: 1 addition & 0 deletions db/config.hh
@@ -168,6 +168,7 @@ public:
     named_value<float> memtable_flush_static_shares;
     named_value<float> compaction_static_shares;
     named_value<bool> compaction_enforce_min_threshold;
+    named_value<uint32_t> compaction_flush_all_tables_before_major_seconds;
     named_value<sstring> cluster_name;
     named_value<sstring> listen_address;
     named_value<sstring> listen_interface;
11 changes: 11 additions & 0 deletions replica/database.cc
@@ -2513,6 +2513,17 @@ future<> database::flush_all_tables() {
     co_await get_tables_metadata().parallel_for_each_table([] (table_id, lw_shared_ptr<table> t) {
         return t->flush();
     });
+    _all_tables_flushed_at = db_clock::now();
 }

+future<db_clock::time_point> database::get_all_tables_flushed_at(sharded<database>& sharded_db) {
+    db_clock::time_point min_all_tables_flushed_at;
+    min_all_tables_flushed_at = co_await sharded_db.map_reduce0([&] (const database& db) {
+        return db._all_tables_flushed_at;
+    }, db_clock::now(), [] (db_clock::time_point l, db_clock::time_point r) {
+        return std::min(l, r);
+    });
+    co_return min_all_tables_flushed_at;
+}
+
 future<> database::drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name) {
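
The intent of `get_all_tables_flushed_at` is to take the minimum of the per-shard `_all_tables_flushed_at` timestamps, starting from `db_clock::now()`. The sketch below models that reduction without Seastar, using a plain vector in place of `sharded<database>`. The names `flush_time_point` and `oldest_all_tables_flush` are hypothetical.

```cpp
#include <algorithm>
#include <chrono>
#include <numeric>
#include <vector>

// Hypothetical stand-in for db_clock::time_point.
using flush_time_point = std::chrono::system_clock::time_point;

// Folds the per-shard flush timestamps into the oldest one, starting from
// `now`, in the same shape as a map_reduce0 with std::min as the reducer.
flush_time_point oldest_all_tables_flush(const std::vector<flush_time_point>& per_shard,
                                         flush_time_point now) {
    return std::accumulate(per_shard.begin(), per_shard.end(), now,
        [] (flush_time_point l, flush_time_point r) { return std::min(l, r); });
}
```

Taking the minimum is the conservative choice: a single shard that has not flushed recently is enough to make the whole node count as not recently flushed. A shard that has never flushed holds a default-constructed time point (the clock's epoch), so under this model the first major compaction after startup would see a flush as due.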
4 changes: 4 additions & 0 deletions replica/database.hh
@@ -1448,6 +1448,8 @@ private:
     serialized_action _update_memtable_flush_static_shares_action;
     utils::observer<float> _memtable_flush_static_shares_observer;

+    db_clock::time_point _all_tables_flushed_at;
+
 public:
     data_dictionary::database as_data_dictionary() const;
     db::commitlog* commitlog_for(const schema_ptr& schema);
@@ -1744,6 +1746,8 @@ public:
     // flushing all tables will allow reclaiming of all commitlog segments
     future<> flush_all_tables();

+    static future<db_clock::time_point> get_all_tables_flushed_at(sharded<database>& sharded_db);
+
     static future<> drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id);
     static future<> drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);

