Skip to content

Commit

Permalink
[BugFix] Release pk state cache when txn is aborted
Browse files Browse the repository at this point in the history
Signed-off-by: Zijie Lu <wslzj40@gmail.com>
  • Loading branch information
TszKitLo40 committed Jan 5, 2024
1 parent e523177 commit 58fb403
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 1 deletion.
4 changes: 4 additions & 0 deletions be/src/storage/lake/transactions.cpp
Expand Up @@ -23,6 +23,7 @@
#include "storage/lake/tablet_manager.h"
#include "storage/lake/txn_log.h"
#include "storage/lake/txn_log_applier.h"
#include "storage/lake/update_manager.h"
#include "storage/lake/vacuum.h" // delete_files_async
#include "util/lru_cache.h"

Expand Down Expand Up @@ -285,6 +286,7 @@ Status publish_log_version(TabletManager* tablet_mgr, int64_t tablet_id, const i

void abort_txn(TabletManager* tablet_mgr, int64_t tablet_id, std::span<const int64_t> txn_ids,
std::span<const int32_t> txn_types) {
TEST_SYNC_POINT("transactions::abort_txn:enter");
std::vector<std::string> files_to_delete;
for (size_t i = 0; i < txn_ids.size(); ++i) {
auto txn_id = txn_ids[i];
Expand Down Expand Up @@ -337,6 +339,8 @@ void abort_txn(TabletManager* tablet_mgr, int64_t tablet_id, std::span<const int
files_to_delete.emplace_back(log_path);

tablet_mgr->metacache()->erase(log_path);

tablet_mgr->update_mgr()->try_remove_cache(tablet_id, txn_id);
}

delete_files_async(std::move(files_to_delete));
Expand Down
9 changes: 9 additions & 0 deletions be/src/storage/lake/update_manager.cpp
Expand Up @@ -492,6 +492,7 @@ void UpdateManager::expire_cache() {
if (MonotonicMillis() - _last_clear_expired_cache_millis > _cache_expire_ms) {
_update_state_cache.clear_expired();
_index_cache.clear_expired();
_compaction_cache.clear_expired();
_last_clear_expired_cache_millis = MonotonicMillis();
}
}
Expand Down Expand Up @@ -717,6 +718,12 @@ void UpdateManager::TEST_remove_compaction_cache(uint32_t tablet_id, int64_t txn
}
}

// Attempts to drop the cached state associated with (tablet_id, txn_id) from both
// the update-state cache and the compaction-state cache. The same cache_key() value
// is used for both lookups; the results of the removal attempts are not inspected.
// NOTE(review): tablet_id is uint32_t here, while abort_txn() passes an int64_t
// tablet id — confirm that the narrowing conversion cannot change the key.
void UpdateManager::try_remove_cache(uint32_t tablet_id, int64_t txn_id) {
    const auto entry_key = cache_key(tablet_id, txn_id);
    _update_state_cache.try_remove_by_key(entry_key);
    _compaction_cache.try_remove_by_key(entry_key);
}

void UpdateManager::preload_update_state(const TxnLog& txnlog, Tablet* tablet) {
// use tabletid-txnid as update state cache's key, so it can retry safe.
auto state_entry = _update_state_cache.get_or_create(cache_key(tablet->id(), txnlog.txn_id()));
Expand Down Expand Up @@ -745,6 +752,7 @@ void UpdateManager::preload_update_state(const TxnLog& txnlog, Tablet* tablet) {
} else {
_update_state_cache.remove(state_entry);
}
TEST_SYNC_POINT("UpdateManager::preload_update_state:return");
}

void UpdateManager::preload_compaction_state(const TxnLog& txnlog, const Tablet& tablet,
Expand Down Expand Up @@ -776,6 +784,7 @@ void UpdateManager::preload_compaction_state(const TxnLog& txnlog, const Tablet&
// just release it, will use it again in publish
_compaction_cache.release(compaction_entry);
}
TEST_SYNC_POINT("UpdateManager::preload_compaction_state:return");
}

} // namespace starrocks::lake
2 changes: 2 additions & 0 deletions be/src/storage/lake/update_manager.h
Expand Up @@ -150,6 +150,8 @@ class UpdateManager {

// Unlocks the lock object returned by _get_pk_index_shard_lock() for `tablet_id`.
// Presumably paired with a matching lock call elsewhere in this class — verify
// against the caller before relying on it.
void unlock_pk_index_shard(int64_t tablet_id) {
    _get_pk_index_shard_lock(tablet_id).unlock();
}

// Tries to remove the entries keyed by (tablet_id, txn_id) from the update-state
// cache and the compaction-state cache. Called from abort_txn() so that state
// cached for an aborted txn is released.
// NOTE(review): tablet_id is declared uint32_t while abort_txn() holds an int64_t
// tablet id — confirm the narrowing is intended.
void try_remove_cache(uint32_t tablet_id, int64_t txn_id);

private:
// print memory tracker state
void _print_memory_stats();
Expand Down
70 changes: 70 additions & 0 deletions be/test/storage/lake/primary_key_compaction_task_test.cpp
Expand Up @@ -786,6 +786,76 @@ TEST_P(LakePrimaryKeyCompactionTest, test_remove_compaction_state) {
ASSERT_EQ(kChunkSize, read(version));
}

// Verifies that aborting a compaction txn leaves no entry for that txn in the
// compaction cache.
//
// Three overlapping writes are published to build three rowsets, then a compaction
// task and an abort_txn request for the same txn id run on separate threads. The
// sync-point dependency is meant to hold abort_txn() at its entry until
// preload_compaction_state() has returned.
TEST_P(LakePrimaryKeyCompactionTest, test_abort_txn) {
    // Fatal ASSERT_* failures return from the test body immediately. Disable
    // sync-point processing from a scope guard so that an early return cannot leave
    // it enabled for the tests that run afterwards in the same process.
    struct SyncPointDisabler {
        ~SyncPointDisabler() { SyncPoint::GetInstance()->DisableProcessing(); }
    } sync_point_disabler;

    SyncPoint::GetInstance()->EnableProcessing();
    SyncPoint::GetInstance()->LoadDependency(
            {{"UpdateManager::preload_compaction_state:return", "transactions::abort_txn:enter"}});
    // Prepare data for writing
    auto chunk0 = generate_data(kChunkSize, 0);
    auto indexes = std::vector<uint32_t>(kChunkSize);
    for (int i = 0; i < kChunkSize; i++) {
        indexes[i] = i;
    }

    auto version = 1;
    auto tablet_id = _tablet_metadata->id();
    // Write and publish the same chunk three times; each round bumps the version.
    for (int i = 0; i < 3; i++) {
        auto txn_id = next_id();
        ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
                                                   .set_tablet_manager(_tablet_mgr.get())
                                                   .set_tablet_id(tablet_id)
                                                   .set_txn_id(txn_id)
                                                   .set_partition_id(_partition_id)
                                                   .set_mem_tracker(_mem_tracker.get())
                                                   .set_index_id(_tablet_schema->id())
                                                   .build());
        ASSERT_OK(delta_writer->open());
        ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
        ASSERT_OK(delta_writer->finish());
        delta_writer->close();
        // Publish version
        ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
        version++;
    }
    ASSERT_EQ(kChunkSize, read(version));
    ASSIGN_OR_ABORT(auto new_tablet_metadata1, _tablet_mgr->get_tablet_metadata(tablet_id, version));
    EXPECT_EQ(new_tablet_metadata1->rowsets_size(), 3);

    ExecEnv::GetInstance()->delete_file_thread_pool()->wait();
    // make sure delvecs have been generated
    for (int i = 0; i < 2; i++) {
        auto itr = new_tablet_metadata1->delvec_meta().version_to_file().find(version - i);
        EXPECT_TRUE(itr != new_tablet_metadata1->delvec_meta().version_to_file().end());
        auto delvec_file = itr->second;
        EXPECT_TRUE(fs::path_exist(_lp->delvec_location(tablet_id, delvec_file.name())));
    }

    auto txn_id = next_id();
    // t1: run the compaction task for txn_id to completion.
    std::thread t1([&]() {
        ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(_tablet_metadata->id(), version, txn_id));
        check_task(task);
        CompactionTask::Progress progress;
        ASSERT_OK(task->execute(&progress, CompactionTask::kNoCancelFn));
        EXPECT_EQ(100, progress.value());
    });

    // t2: abort the same txn through the lake service, with cleanup enabled.
    std::thread t2([&]() {
        lake::AbortTxnRequest request;
        request.add_tablet_ids(tablet_id);
        request.add_txn_ids(txn_id);
        request.set_skip_cleanup(false);
        lake::AbortTxnResponse response;
        auto lake_service = LakeServiceImpl(ExecEnv::GetInstance(), _tablet_mgr.get());
        lake_service.abort_txn(nullptr, &request, &response, nullptr);
    });

    t1.join();
    t2.join();

    ASSERT_TRUE(_update_mgr->TEST_check_compaction_cache_absent(tablet_id, txn_id));
}

INSTANTIATE_TEST_SUITE_P(LakePrimaryKeyCompactionTest, LakePrimaryKeyCompactionTest,
::testing::Values(CompactionParam{HORIZONTAL_COMPACTION, 5, false},
CompactionParam{VERTICAL_COMPACTION, 1, false},
Expand Down
41 changes: 40 additions & 1 deletion be/test/storage/lake/primary_key_publish_test.cpp
Expand Up @@ -647,7 +647,6 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_rebuild_persistent_index) {
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->finish());
delta_writer->close();
// Publish version
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id));
Expand All @@ -670,6 +669,46 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_rebuild_persistent_index) {
}
}

// Verifies that aborting a write txn leaves no entry for that txn in the
// update-state cache.
//
// A delta writer and an abort_txn request for the same txn id run on separate
// threads. The sync-point dependency is meant to hold abort_txn() at its entry until
// preload_update_state() has returned.
TEST_P(LakePrimaryKeyPublishTest, test_abort_txn) {
    // Fatal ASSERT_* failures return from the test body immediately. Disable
    // sync-point processing from a scope guard so that an early return cannot leave
    // it enabled for the tests that run afterwards in the same process.
    struct SyncPointDisabler {
        ~SyncPointDisabler() { SyncPoint::GetInstance()->DisableProcessing(); }
    } sync_point_disabler;

    SyncPoint::GetInstance()->EnableProcessing();
    SyncPoint::GetInstance()->LoadDependency(
            {{"UpdateManager::preload_update_state:return", "transactions::abort_txn:enter"}});

    auto tablet_id = _tablet_metadata->id();
    auto txn_id = next_id();
    // t1: write one chunk under txn_id and finish the writer. The lambda reads the
    // outer tablet_id by reference rather than redeclaring a shadowing copy.
    std::thread t1([&]() {
        auto [chunk0, indexes] = gen_data_and_index(kChunkSize, 0, true);
        ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
                                                   .set_tablet_manager(_tablet_mgr.get())
                                                   .set_tablet_id(tablet_id)
                                                   .set_txn_id(txn_id)
                                                   .set_partition_id(_partition_id)
                                                   .set_mem_tracker(_mem_tracker.get())
                                                   .set_index_id(_tablet_schema->id())
                                                   .build());
        ASSERT_OK(delta_writer->open());
        ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size()));
        ASSERT_OK(delta_writer->finish());
        delta_writer->close();
    });

    // t2: abort the same txn through the lake service, with cleanup enabled.
    std::thread t2([&]() {
        lake::AbortTxnRequest request;
        request.add_tablet_ids(tablet_id);
        request.add_txn_ids(txn_id);
        request.set_skip_cleanup(false);
        lake::AbortTxnResponse response;
        auto lake_service = LakeServiceImpl(ExecEnv::GetInstance(), _tablet_mgr.get());
        lake_service.abort_txn(nullptr, &request, &response, nullptr);
    });

    t1.join();
    t2.join();
    ASSERT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id));
}

// Instantiates every LakePrimaryKeyPublishTest case once per listed parameter:
// PrimaryKeyParam{true} and PrimaryKeyParam{false}.
// NOTE(review): the meaning of the boolean field is defined by PrimaryKeyParam,
// which is not visible here — confirm before documenting it further.
INSTANTIATE_TEST_SUITE_P(LakePrimaryKeyPublishTest, LakePrimaryKeyPublishTest,
::testing::Values(PrimaryKeyParam{true}, PrimaryKeyParam{false}));

Expand Down

0 comments on commit 58fb403

Please sign in to comment.