Merge pull request #56763 from ClickHouse/disable-compress-logs-default
Disable compressed logs by default in Keeper
antonio2368 committed Nov 15, 2023
2 parents 7505ea2 + 5309dc0 commit c3f7c87
Showing 5 changed files with 119 additions and 20 deletions.
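The change itself is behavioral rather than structural: Keeper's Raft changelog writer now defaults to uncompressed files, and the reader records whether each existing file is compressed so the two formats are never mixed within a single file. Compression is inferred from the file name alone (the diff below calls `chooseCompressionMethod(filepath, "")`): `changelog_*.bin` is read as plain, `changelog_*.bin.zstd` as ZSTD. A minimal sketch of that detection idea; the enum and helper here are illustrative stand-ins, not the real ClickHouse API:

```cpp
#include <iostream>
#include <string>

// Illustrative stand-ins for CompressionMethod / chooseCompressionMethod:
// the real logic lives in ClickHouse's IO layer; this only mirrors the idea
// that the file extension alone decides the changelog format.
enum class CompressionMethod { None, Zstd };

CompressionMethod detectFromExtension(const std::string & path)
{
    const std::string zstd_suffix = ".zstd";
    if (path.size() >= zstd_suffix.size()
        && path.compare(path.size() - zstd_suffix.size(), zstd_suffix.size(), zstd_suffix) == 0)
        return CompressionMethod::Zstd;
    return CompressionMethod::None;
}

int main()
{
    // prints 1 (compressed) then 0 (plain)
    std::cout << (detectFromExtension("changelog_36_55.bin.zstd") == CompressionMethod::Zstd) << '\n';
    std::cout << (detectFromExtension("changelog_1_20.bin") == CompressionMethod::Zstd) << '\n';
}
```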
11 changes: 9 additions & 2 deletions src/Coordination/Changelog.cpp
@@ -476,6 +476,9 @@ struct ChangelogReadResult
 
     /// last offset we were able to read from log
     off_t last_position;
+
+    /// Whether the changelog file was written using compression
+    bool compressed_log;
     bool error;
 };

@@ -484,7 +487,7 @@ class ChangelogReader
 public:
     explicit ChangelogReader(DiskPtr disk_, const std::string & filepath_) : disk(disk_), filepath(filepath_)
     {
-        auto compression_method = chooseCompressionMethod(filepath, "");
+        compression_method = chooseCompressionMethod(filepath, "");
         auto read_buffer_from_file = disk->readFile(filepath);
         read_buf = wrapReadBufferWithCompressionMethod(std::move(read_buffer_from_file), compression_method);
     }
@@ -493,6 +496,7 @@ class ChangelogReader
     ChangelogReadResult readChangelog(IndexToLogEntry & logs, uint64_t start_log_index, Poco::Logger * log)
     {
         ChangelogReadResult result{};
+        result.compressed_log = compression_method != CompressionMethod::None;
         try
         {
             while (!read_buf->eof())
@@ -583,13 +587,15 @@
 private:
     DiskPtr disk;
     std::string filepath;
+    CompressionMethod compression_method;
     std::unique_ptr<ReadBuffer> read_buf;
 };
 
 Changelog::Changelog(
     Poco::Logger * log_, LogFileSettings log_file_settings, FlushSettings flush_settings_, KeeperContextPtr keeper_context_)
     : changelogs_detached_dir("detached")
     , rotate_interval(log_file_settings.rotate_interval)
+    , compress_logs(log_file_settings.compress_logs)
     , log(log_)
     , write_operations(std::numeric_limits<size_t>::max())
     , append_completion_queue(std::numeric_limits<size_t>::max())
@@ -851,7 +857,8 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
             std::erase_if(logs, [last_log_read_result](const auto & item) { return item.first > last_log_read_result->last_read_index; });
             move_from_latest_logs_disks(existing_changelogs.at(last_log_read_result->log_start_index));
         }
-        else
+        /// don't mix compressed and uncompressed writes
+        else if (compress_logs == last_log_read_result->compressed_log)
         {
             initWriter(description);
         }
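The `readChangelogAndInitWriter` hunk above carries the core of the fix: the existing writer is reused only when the configured `compress_logs` value matches the format of the latest log file read from disk. Otherwise `initWriter` is skipped, so the next append starts a fresh file in the configured format. A rough sketch of that decision, under the assumption (consistent with the diff, but not verified against the rest of the file) that skipping `initWriter` amounts to rotating to a new file:

```cpp
// Illustrative sketch, not the actual ClickHouse code.
struct LastLogReadResult
{
    bool compressed_log;  // detected from the file name, as in ChangelogReadResult
};

enum class WriterAction { AppendToExisting, StartNewFile };

WriterAction chooseWriterAction(bool compress_logs_setting, const LastLogReadResult & last_log)
{
    // Never mix compressed and uncompressed entries in one file: a plain
    // writer appending to a .zstd file (or vice versa) would leave a file
    // that no single reader configuration can fully parse.
    if (compress_logs_setting == last_log.compressed_log)
        return WriterAction::AppendToExisting;
    return WriterAction::StartNewFile;
}
```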
1 change: 1 addition & 0 deletions src/Coordination/Changelog.h
@@ -182,6 +182,7 @@ class Changelog
 
     const String changelogs_detached_dir;
     const uint64_t rotate_interval;
+    const bool compress_logs;
     Poco::Logger * log;
 
     std::mutex writer_mutex;
2 changes: 1 addition & 1 deletion src/Coordination/CoordinationSettings.h
@@ -45,7 +45,7 @@ struct Settings;
     M(UInt64, max_requests_quick_batch_size, 100, "Max size of batch of requests to try to get before proceeding with RAFT. Keeper will not wait for requests but take only requests that are already in queue", 0) \
     M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consensus with similar speed", 0) \
     M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0) \
-    M(Bool, compress_logs, true, "Write compressed coordination logs in ZSTD format", 0) \
+    M(Bool, compress_logs, false, "Write compressed coordination logs in ZSTD format", 0) \
     M(Bool, compress_snapshots_with_zstd_format, true, "Write compressed snapshots in ZSTD format (instead of custom LZ4)", 0) \
     M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0) \
     M(UInt64, max_log_file_size, 50 * 1024 * 1024, "Max size of the Raft log file. If possible, each created log file will preallocate this amount of bytes on disk. Set to 0 to disable the limit", 0) \
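With the default flipped to `false`, deployments that want ZSTD-compressed Raft logs now have to opt in explicitly. A hypothetical Keeper config excerpt, assuming the usual `<keeper_server><coordination_settings>` layout:

```xml
<!-- Hypothetical excerpt of a keeper config: opt back in to compressed logs. -->
<clickhouse>
    <keeper_server>
        <coordination_settings>
            <compress_logs>true</compress_logs>
        </coordination_settings>
    </keeper_server>
</clickhouse>
```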
123 changes: 107 additions & 16 deletions src/Coordination/tests/gtest_coordination.cpp
@@ -1104,20 +1104,15 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
 }
 
 /// Truncating only some entries from the end
-TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3)
+/// For compressed logs we have no reliable way of knowing how many log entries were lost
+/// after we truncate some bytes from the end
+TEST_F(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3)
 {
-    auto params = GetParam();
-
-    /// For compressed logs we have no reliable way of knowing how many log entries were lost
-    /// after we truncate some bytes from the end
-    if (!params.extension.empty())
-        return;
-
     ChangelogDirTest test("./logs");
     setLogDirectory("./logs");
 
     DB::KeeperLogStore changelog(
-        DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20},
+        DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20},
         DB::FlushSettings(),
         keeper_context);
     changelog.init(1, 0);
@@ -1131,23 +1126,23 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3)
     changelog.end_of_append_batch(0, 0);
 
     waitDurableLogs(changelog);
-    EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
-    EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
+    EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin"));
+    EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin"));
 
     DB::WriteBufferFromFile plain_buf(
-        "./logs/changelog_1_20.bin" + params.extension, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
+        "./logs/changelog_1_20.bin", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
     plain_buf.truncate(plain_buf.size() - 30);
 
     DB::KeeperLogStore changelog_reader(
-        DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20},
+        DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20},
         DB::FlushSettings(),
         keeper_context);
     changelog_reader.init(1, 0);
 
     EXPECT_EQ(changelog_reader.size(), 19);
-    EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
-    assertBrokenLogRemoved("./logs", "changelog_21_40.bin" + params.extension);
-    EXPECT_TRUE(fs::exists("./logs/changelog_20_39.bin" + params.extension));
+    EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin"));
+    assertBrokenLogRemoved("./logs", "changelog_21_40.bin");
+    EXPECT_TRUE(fs::exists("./logs/changelog_20_39.bin"));
     auto entry = getLogEntry("hello_world", 7777);
     changelog_reader.append(entry);
     changelog_reader.end_of_append_batch(0, 0);
@@ -1158,6 +1153,102 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3)
     EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
 }
 
+TEST_F(CoordinationTest, ChangelogTestMixedLogTypes)
+{
+    ChangelogDirTest test("./logs");
+    setLogDirectory("./logs");
+
+    std::vector<std::string> changelog_files;
+
+    const auto verify_changelog_files = [&]
+    {
+        for (const auto & log_file : changelog_files)
+            EXPECT_TRUE(fs::exists(log_file)) << "File " << log_file << " not found";
+    };
+
+    size_t last_term = 0;
+    size_t log_size = 0;
+
+    const auto append_log = [&](auto & changelog, const std::string & data, uint64_t term)
+    {
+        last_term = term;
+        ++log_size;
+        auto entry = getLogEntry(data, last_term);
+        changelog.append(entry);
+    };
+
+    const auto verify_log_content = [&](const auto & changelog)
+    {
+        EXPECT_EQ(changelog.size(), log_size);
+        EXPECT_EQ(changelog.last_entry()->get_term(), last_term);
+    };
+
+    {
+        SCOPED_TRACE("Initial uncompressed log");
+        DB::KeeperLogStore changelog(
+            DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20},
+            DB::FlushSettings(),
+            keeper_context);
+        changelog.init(1, 0);
+
+        for (size_t i = 0; i < 35; ++i)
+            append_log(changelog, std::to_string(i) + "_hello_world", (i + 44) * 10);
+
+        changelog.end_of_append_batch(0, 0);
+
+        waitDurableLogs(changelog);
+        changelog_files.push_back("./logs/changelog_1_20.bin");
+        changelog_files.push_back("./logs/changelog_21_40.bin");
+        verify_changelog_files();
+
+        verify_log_content(changelog);
+    }
+
+    {
+        SCOPED_TRACE("Compressed log");
+        DB::KeeperLogStore changelog_compressed(
+            DB::LogFileSettings{.force_sync = true, .compress_logs = true, .rotate_interval = 20},
+            DB::FlushSettings(),
+            keeper_context);
+        changelog_compressed.init(1, 0);
+
+        verify_changelog_files();
+        verify_log_content(changelog_compressed);
+
+        append_log(changelog_compressed, "hello_world", 7777);
+        changelog_compressed.end_of_append_batch(0, 0);
+
+        waitDurableLogs(changelog_compressed);
+
+        verify_log_content(changelog_compressed);
+
+        changelog_files.push_back("./logs/changelog_36_55.bin.zstd");
+        verify_changelog_files();
+    }
+
+    {
+        SCOPED_TRACE("Final uncompressed log");
+        DB::KeeperLogStore changelog(
+            DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20},
+            DB::FlushSettings(),
+            keeper_context);
+        changelog.init(1, 0);
+
+        verify_changelog_files();
+        verify_log_content(changelog);
+
+        append_log(changelog, "hello_world", 7778);
+        changelog.end_of_append_batch(0, 0);
+
+        waitDurableLogs(changelog);
+
+        verify_log_content(changelog);
+
+        changelog_files.push_back("./logs/changelog_37_56.bin");
+        verify_changelog_files();
+    }
+}
+
 TEST_P(CoordinationTest, ChangelogTestLostFiles)
 {
     auto params = GetParam();
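Taken from the assertions in `ChangelogTestMixedLogTypes` above, the log directory ends up in roughly this state; the annotations are interpretive. Each restart with a different `compress_logs` value rotates into a new file whose index range overlaps the previous one, rather than rewriting existing files:

```
./logs
├── changelog_1_20.bin        # first uncompressed run
├── changelog_21_40.bin       # first uncompressed run (holds entries 21-35)
├── changelog_36_55.bin.zstd  # compressed run starts a new file at index 36
└── changelog_37_56.bin       # final uncompressed run rotates again at index 37
```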
2 changes: 1 addition & 1 deletion tests/integration/test_keeper_four_word_command/test.py
@@ -287,7 +287,7 @@ def test_cmd_conf(started_cluster):
     assert result["quorum_reads"] == "false"
     assert result["force_sync"] == "true"
 
-    assert result["compress_logs"] == "true"
+    assert result["compress_logs"] == "false"
     assert result["compress_snapshots_with_zstd_format"] == "true"
     assert result["configuration_change_tries_count"] == "20"
 
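The integration test reads these values through Keeper's `conf` four-letter-word command. A standalone sketch of the same check over a raw TCP socket; the host, port (9181 is Keeper's usual client port), and the `key=value` response format are assumptions about a typical setup, not something this diff establishes:

```python
import socket


def keeper_conf(host: str = "localhost", port: int = 9181) -> dict:
    """Send the 'conf' four-letter-word command and parse key=value lines."""
    with socket.create_connection((host, port)) as sock:
        sock.sendall(b"conf")
        data = b""
        # Four-letter-word servers close the connection after responding,
        # so read until EOF.
        while chunk := sock.recv(4096):
            data += chunk
    result = {}
    for line in data.decode().splitlines():
        if "=" in line:
            key, value = line.split("=", 1)
            result[key] = value
    return result


# With this commit applied, keeper_conf()["compress_logs"] should be "false".
```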
