Skip to content

Commit

Permalink
Merge pull request #50483 from ClickHouse/fix-keeper-snapshot-install
Browse files Browse the repository at this point in the history
Keeper fix: apply uncommitted state after snapshot install
  • Loading branch information
alexey-milovidov committed Jun 5, 2023
2 parents 112826a + 336c9d7 commit bed19aa
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 14 deletions.
5 changes: 5 additions & 0 deletions src/Coordination/KeeperStateMachine.cpp
Expand Up @@ -364,6 +364,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
else if (s.get_last_log_idx() < latest_snapshot_meta->get_last_log_idx())
{
LOG_INFO(log, "A snapshot with a larger last log index ({}) was created, skipping applying this snapshot", latest_snapshot_meta->get_last_log_idx());
return true;
}

latest_snapshot_ptr = latest_snapshot_buf;
Expand All @@ -373,6 +374,10 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
std::lock_guard lock(storage_and_responses_lock);
auto snapshot_deserialization_result
= snapshot_manager.deserializeSnapshotFromBuffer(snapshot_manager.deserializeSnapshotBufferFromDisk(s.get_last_log_idx()));

/// Some logs may have been preprocessed with a log idx larger than the snapshot idx,
/// so we have to apply them to the new storage.
storage->applyUncommittedState(*snapshot_deserialization_result.storage, s.get_last_log_idx());
storage = std::move(snapshot_deserialization_result.storage);
latest_snapshot_meta = snapshot_deserialization_result.snapshot_meta;
cluster_config = snapshot_deserialization_result.cluster_config;
Expand Down
51 changes: 37 additions & 14 deletions src/Coordination/KeeperStorage.cpp
Expand Up @@ -375,25 +375,28 @@ void KeeperStorage::UncommittedState::applyDelta(const Delta & delta)
delta.operation);
}

void KeeperStorage::UncommittedState::addDeltas(std::vector<Delta> new_deltas)
void KeeperStorage::UncommittedState::addDelta(Delta new_delta)
{
for (auto & delta : new_deltas)
{
const auto & added_delta = deltas.emplace_back(std::move(delta));
const auto & added_delta = deltas.emplace_back(std::move(new_delta));

if (!added_delta.path.empty())
{
deltas_for_path[added_delta.path].push_back(&added_delta);
applyDelta(added_delta);
}
else if (const auto * auth_delta = std::get_if<AddAuthDelta>(&added_delta.operation))
{
auto & uncommitted_auth = session_and_auth[auth_delta->session_id];
uncommitted_auth.emplace_back(&auth_delta->auth_id);
}
if (!added_delta.path.empty())
{
deltas_for_path[added_delta.path].push_back(&added_delta);
applyDelta(added_delta);
}
else if (const auto * auth_delta = std::get_if<AddAuthDelta>(&added_delta.operation))
{
auto & uncommitted_auth = session_and_auth[auth_delta->session_id];
uncommitted_auth.emplace_back(&auth_delta->auth_id);
}
}

void KeeperStorage::UncommittedState::addDeltas(std::vector<Delta> new_deltas)
{
for (auto & delta : new_deltas)
addDelta(std::move(delta));
}

void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
{
assert(deltas.empty() || deltas.front().zxid >= commit_zxid);
Expand Down Expand Up @@ -602,6 +605,26 @@ namespace

}

/// Copies the part of this storage's uncommitted state that is newer than `last_zxid`
/// into `other`:
///  - uncommitted transactions are appended to other.uncommitted_transactions;
///  - uncommitted deltas are passed one by one to other.uncommitted_state.addDelta.
/// Entries whose zxid is less than or equal to `last_zxid` are left out.
void KeeperStorage::applyUncommittedState(KeeperStorage & other, int64_t last_zxid)
{
    for (const auto & pending_transaction : uncommitted_transactions)
    {
        if (pending_transaction.zxid > last_zxid)
            other.uncommitted_transactions.push_back(pending_transaction);
    }

    for (const auto & pending_delta : uncommitted_state.deltas)
    {
        /// addDelta takes its argument by value, so `other` receives its own copy of the delta.
        if (pending_delta.zxid > last_zxid)
            other.uncommitted_state.addDelta(pending_delta);
    }
}

Coordination::Error KeeperStorage::commit(int64_t commit_zxid)
{
// Deltas are added with increasing ZXIDs
Expand Down
5 changes: 5 additions & 0 deletions src/Coordination/KeeperStorage.h
Expand Up @@ -222,6 +222,7 @@ class KeeperStorage
{
explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { }

void addDelta(Delta new_delta);
void addDeltas(std::vector<Delta> new_deltas);
void commit(int64_t commit_zxid);
void rollback(int64_t rollback_zxid);
Expand Down Expand Up @@ -310,6 +311,10 @@ class KeeperStorage

UncommittedState uncommitted_state{*this};

// Apply uncommitted state to another storage using only transactions
// with zxid > last_zxid
void applyUncommittedState(KeeperStorage & other, int64_t last_zxid);

Coordination::Error commit(int64_t zxid);

// Create node in the storage
Expand Down
77 changes: 77 additions & 0 deletions src/Coordination/tests/gtest_coordination.cpp
Expand Up @@ -2524,6 +2524,83 @@ TEST_P(CoordinationTest, TestCheckNotExistsRequest)
}
}

/// Verifies KeeperStorage::applyUncommittedState: requests that were preprocessed but not yet
/// committed on one storage are transferred to a second, identically initialized storage.
/// After the same requests are committed on both storages, the children of "/test" must match.
TEST_P(CoordinationTest, TestReapplyingDeltas)
{
    using namespace DB;
    using namespace Coordination;

    static constexpr int64_t initial_zxid = 100;

    const auto create_request = std::make_shared<ZooKeeperCreateRequest>();
    create_request->path = "/test/data";
    create_request->is_sequential = true;

    /// Preprocesses and commits one request and expects it to succeed.
    /// The size check is fatal (ASSERT) so that responses[0] is never read when nothing was returned.
    const auto process_create = [](KeeperStorage & storage, const auto & request, int64_t zxid)
    {
        storage.preprocessRequest(request, 1, 0, zxid);
        auto responses = storage.processRequest(request, 1, zxid);
        ASSERT_GE(responses.size(), 1);
        EXPECT_EQ(responses[0].response->error, Error::ZOK);
    };

    /// Creates "/test" with zxid 1 and then commits sequential create requests up to initial_zxid.
    const auto commit_initial_data = [&](auto & storage)
    {
        int64_t zxid = 1;

        const auto root_create = std::make_shared<ZooKeeperCreateRequest>();
        root_create->path = "/test";
        process_create(storage, root_create, zxid);
        ++zxid;

        for (; zxid <= initial_zxid; ++zxid)
            process_create(storage, create_request, zxid);
    };

    /// A fatal failure inside a helper lambda only leaves that lambda;
    /// ASSERT_NO_FATAL_FAILURE propagates it so the test stops instead of running on broken state.
    KeeperStorage storage1{500, "", keeper_context};
    ASSERT_NO_FATAL_FAILURE(commit_initial_data(storage1));

    /// Preprocess (but do not commit) further requests on the first storage only.
    for (int64_t zxid = initial_zxid + 1; zxid < initial_zxid + 50; ++zxid)
        storage1.preprocessRequest(create_request, 1, 0, zxid);

    /// create identical new storage
    KeeperStorage storage2{500, "", keeper_context};
    ASSERT_NO_FATAL_FAILURE(commit_initial_data(storage2));

    storage1.applyUncommittedState(storage2, initial_zxid);

    /// Commits the requests that were only preprocessed so far.
    const auto commit_unprocessed = [&](KeeperStorage & storage)
    {
        for (int64_t zxid = initial_zxid + 1; zxid < initial_zxid + 50; ++zxid)
        {
            auto responses = storage.processRequest(create_request, 1, zxid);
            ASSERT_GE(responses.size(), 1);
            EXPECT_EQ(responses[0].response->error, Error::ZOK);
        }
    };

    ASSERT_NO_FATAL_FAILURE(commit_unprocessed(storage1));
    ASSERT_NO_FATAL_FAILURE(commit_unprocessed(storage2));

    /// Lists the children of "/test". The lambda returns a value, so ASSERT_* cannot be used here;
    /// on an unexpected response it records a failure and returns an empty list
    /// instead of indexing an empty container or dereferencing a null pointer.
    const auto get_children = [&](KeeperStorage & storage)
    {
        using ChildNames = decltype(ListResponse::names);

        const auto list_request = std::make_shared<ZooKeeperListRequest>();
        list_request->path = "/test";
        auto responses = storage.processRequest(list_request, 1, std::nullopt, /*check_acl=*/true, /*is_local=*/true);
        if (responses.size() != 1)
        {
            ADD_FAILURE() << "Expected exactly one response for the list request, got " << responses.size();
            return ChildNames{};
        }

        const auto * list_response = dynamic_cast<const ListResponse *>(responses[0].response.get());
        if (list_response == nullptr)
        {
            ADD_FAILURE() << "Response to the list request is not a ListResponse";
            return ChildNames{};
        }

        return list_response->names;
    };

    auto children1 = get_children(storage1);
    std::unordered_set<std::string> children1_set(children1.begin(), children1.end());

    auto children2 = get_children(storage2);
    std::unordered_set<std::string> children2_set(children2.begin(), children2.end());

    ASSERT_TRUE(children1_set == children2_set);
}

INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
CoordinationTest,
::testing::ValuesIn(std::initializer_list<CompressionParam>{
Expand Down

0 comments on commit bed19aa

Please sign in to comment.