Disable compressed logs by default in Keeper #56763

Merged: 6 commits, Nov 15, 2023
11 changes: 9 additions & 2 deletions src/Coordination/Changelog.cpp
@@ -476,6 +476,9 @@ struct ChangelogReadResult

/// last offset we were able to read from log
off_t last_position;

/// Whether the changelog file was written using compression
bool compressed_log;
bool error;
};

@@ -484,7 +487,7 @@ class ChangelogReader
public:
explicit ChangelogReader(DiskPtr disk_, const std::string & filepath_) : disk(disk_), filepath(filepath_)
{
- auto compression_method = chooseCompressionMethod(filepath, "");
+ compression_method = chooseCompressionMethod(filepath, "");
auto read_buffer_from_file = disk->readFile(filepath);
read_buf = wrapReadBufferWithCompressionMethod(std::move(read_buffer_from_file), compression_method);
}
@@ -493,6 +496,7 @@ class ChangelogReader
ChangelogReadResult readChangelog(IndexToLogEntry & logs, uint64_t start_log_index, Poco::Logger * log)
{
ChangelogReadResult result{};
result.compressed_log = compression_method != CompressionMethod::None;
try
{
while (!read_buf->eof())
@@ -583,13 +587,15 @@ class ChangelogReader
private:
DiskPtr disk;
std::string filepath;
CompressionMethod compression_method;
std::unique_ptr<ReadBuffer> read_buf;
};

Changelog::Changelog(
Poco::Logger * log_, LogFileSettings log_file_settings, FlushSettings flush_settings_, KeeperContextPtr keeper_context_)
: changelogs_detached_dir("detached")
, rotate_interval(log_file_settings.rotate_interval)
, compress_logs(log_file_settings.compress_logs)
, log(log_)
, write_operations(std::numeric_limits<size_t>::max())
, append_completion_queue(std::numeric_limits<size_t>::max())
@@ -851,7 +857,8 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
std::erase_if(logs, [last_log_read_result](const auto & item) { return item.first > last_log_read_result->last_read_index; });
move_from_latest_logs_disks(existing_changelogs.at(last_log_read_result->log_start_index));
}
- else
+ /// don't mix compressed and uncompressed writes
+ else if (compress_logs == last_log_read_result->compressed_log)
{
initWriter(description);
}
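
The else-if above is the core behavioral change: Keeper keeps appending to the latest on-disk changelog only when that file's format matches the current compress_logs setting. A minimal C++ sketch of the rule, with invented names rather than the actual ClickHouse code:

// What ChangelogReadResult::compressed_log reports for the latest file on disk.
struct LastLogInfo
{
    bool compressed;
};

// Appending ZSTD frames to a plain file (or vice versa) would make the file
// unreadable past the format switch, so on a mismatch the old file stays
// sealed and the next append rotates into a fresh file of the new format.
bool can_continue_writing(const LastLogInfo & last_log, bool compress_logs_setting)
{
    return compress_logs_setting == last_log.compressed;
}
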
1 change: 1 addition & 0 deletions src/Coordination/Changelog.h
@@ -182,6 +182,7 @@ class Changelog

const String changelogs_detached_dir;
const uint64_t rotate_interval;
const bool compress_logs;
Poco::Logger * log;

std::mutex writer_mutex;
2 changes: 1 addition & 1 deletion src/Coordination/CoordinationSettings.h
@@ -45,7 +45,7 @@ struct Settings;
M(UInt64, max_requests_quick_batch_size, 100, "Max size of batch of requests to try to get before proceeding with RAFT. Keeper will not wait for requests but take only requests that are already in queue" , 0) \
M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consensus with similar speed", 0) \
M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0) \
- M(Bool, compress_logs, true, "Write compressed coordination logs in ZSTD format", 0) \
+ M(Bool, compress_logs, false, "Write compressed coordination logs in ZSTD format", 0) \
M(Bool, compress_snapshots_with_zstd_format, true, "Write compressed snapshots in ZSTD format (instead of custom LZ4)", 0) \
M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0) \
M(UInt64, max_log_file_size, 50 * 1024 * 1024, "Max size of the Raft log file. If possible, each created log file will preallocate this amount of bytes on disk. Set to 0 to disable the limit", 0) \
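
Because the default flips to false, deployments that want to keep writing ZSTD logs must now opt in explicitly. A sketch of the relevant configuration fragment, assuming the usual keeper_server coordination_settings layout:

<clickhouse>
    <keeper_server>
        <coordination_settings>
            <!-- default becomes false with this PR; opt back in explicitly -->
            <compress_logs>true</compress_logs>
        </coordination_settings>
    </keeper_server>
</clickhouse>

Thanks to the compressed_log check in Changelog.cpp, flipping this value on an existing node is safe: Keeper starts a new file in the new format rather than mixing formats within one file.
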
123 changes: 107 additions & 16 deletions src/Coordination/tests/gtest_coordination.cpp
@@ -1104,20 +1104,15 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
}

/// Truncating only some entries from the end
- TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3)
+ /// For compressed logs we have no reliable way of knowing how many log entries were lost
+ /// after we truncate some bytes from the end
+ TEST_F(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3)
{
- auto params = GetParam();
-
- /// For compressed logs we have no reliable way of knowing how many log entries were lost
- /// after we truncate some bytes from the end
- if (!params.extension.empty())
-     return;
-
ChangelogDirTest test("./logs");
setLogDirectory("./logs");

DB::KeeperLogStore changelog(
- DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20},
+ DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
@@ -1131,23 +1126,23 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3)
changelog.end_of_append_batch(0, 0);

waitDurableLogs(changelog);
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin"));

DB::WriteBufferFromFile plain_buf(
"./logs/changelog_1_20.bin" + params.extension, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
"./logs/changelog_1_20.bin", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(plain_buf.size() - 30);

DB::KeeperLogStore changelog_reader(
- DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20},
+ DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20},
DB::FlushSettings(),
keeper_context);
changelog_reader.init(1, 0);

EXPECT_EQ(changelog_reader.size(), 19);
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
assertBrokenLogRemoved("./logs", "changelog_21_40.bin" + params.extension);
EXPECT_TRUE(fs::exists("./logs/changelog_20_39.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin"));
assertBrokenLogRemoved("./logs", "changelog_21_40.bin");
EXPECT_TRUE(fs::exists("./logs/changelog_20_39.bin"));
auto entry = getLogEntry("hello_world", 7777);
changelog_reader.append(entry);
changelog_reader.end_of_append_batch(0, 0);
@@ -1158,6 +1153,102 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3)
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
}

TEST_F(CoordinationTest, ChangelogTestMixedLogTypes)
{
ChangelogDirTest test("./logs");
setLogDirectory("./logs");

std::vector<std::string> changelog_files;

const auto verify_changelog_files = [&]
{
for (const auto & log_file : changelog_files)
EXPECT_TRUE(fs::exists(log_file)) << "File " << log_file << " not found";
};

size_t last_term = 0;
size_t log_size = 0;

const auto append_log = [&](auto & changelog, const std::string & data, uint64_t term)
{
last_term = term;
++log_size;
auto entry = getLogEntry(data, last_term);
changelog.append(entry);
};

const auto verify_log_content = [&](const auto & changelog)
{
EXPECT_EQ(changelog.size(), log_size);
EXPECT_EQ(changelog.last_entry()->get_term(), last_term);
};

{
SCOPED_TRACE("Initial uncompressed log");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);

for (size_t i = 0; i < 35; ++i)
append_log(changelog, std::to_string(i) + "_hello_world", (i + 44) * 10);

changelog.end_of_append_batch(0, 0);

waitDurableLogs(changelog);
changelog_files.push_back("./logs/changelog_1_20.bin");
changelog_files.push_back("./logs/changelog_21_40.bin");
verify_changelog_files();

verify_log_content(changelog);
}

{
SCOPED_TRACE("Compressed log");
DB::KeeperLogStore changelog_compressed(
DB::LogFileSettings{.force_sync = true, .compress_logs = true, .rotate_interval = 20},
DB::FlushSettings(),
keeper_context);
changelog_compressed.init(1, 0);

verify_changelog_files();
verify_log_content(changelog_compressed);

append_log(changelog_compressed, "hello_world", 7777);
changelog_compressed.end_of_append_batch(0, 0);

waitDurableLogs(changelog_compressed);

verify_log_content(changelog_compressed);

changelog_files.push_back("./logs/changelog_36_55.bin.zstd");
verify_changelog_files();
}

{
SCOPED_TRACE("Final uncompressed log");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);

verify_changelog_files();
verify_log_content(changelog);

append_log(changelog, "hello_world", 7778);
changelog.end_of_append_batch(0, 0);

waitDurableLogs(changelog);

verify_log_content(changelog);

changelog_files.push_back("./logs/changelog_37_56.bin");
verify_changelog_files();
}
}
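
The file names this test expects follow directly from the rotation rule. With 35 uncompressed entries on disk and rotate_interval = 20, the format switch forces a fresh file at the next index, hence changelog_36_55.bin.zstd; switching back yields changelog_37_56.bin. A small sketch with a hypothetical helper that reproduces the naming:

#include <cstdint>
#include <string>

// Invented helper mirroring the expectations above, not a Keeper API.
std::string next_changelog_name(uint64_t start_index, uint64_t rotate_interval, bool compressed)
{
    // Each file spans one full rotate interval: 36 + 20 - 1 = 55.
    const uint64_t end_index = start_index + rotate_interval - 1;
    return "changelog_" + std::to_string(start_index) + "_" + std::to_string(end_index)
        + (compressed ? ".bin.zstd" : ".bin");
}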

TEST_P(CoordinationTest, ChangelogTestLostFiles)
{
auto params = GetParam();
2 changes: 1 addition & 1 deletion tests/integration/test_keeper_four_word_command/test.py
@@ -287,7 +287,7 @@ def test_cmd_conf(started_cluster):
assert result["quorum_reads"] == "false"
assert result["force_sync"] == "true"

assert result["compress_logs"] == "true"
assert result["compress_logs"] == "false"
assert result["compress_snapshots_with_zstd_format"] == "true"
assert result["configuration_change_tries_count"] == "20"

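The integration test reads these values through Keeper's four-word conf command. For reference, a hand-rolled C++ sketch of the same query over a raw TCP socket; host and port are assumptions (9181 is the customary Keeper client port), and the test itself goes through its own helper instead:

#include <arpa/inet.h>
#include <cstdio>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

int main()
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(9181);
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
    if (fd < 0 || connect(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) != 0)
        return 1;
    send(fd, "conf", 4, 0);  // four-word command, as exercised by the test
    char buf[4096];
    ssize_t n;
    while ((n = read(fd, buf, sizeof(buf))) > 0)
        fwrite(buf, 1, static_cast<size_t>(n), stdout);  // expect a compress_logs=false line
    close(fd);
    return 0;
}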