Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logs and stats in DeleteScheduler #6927

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1569,6 +1569,12 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
auto sfm = static_cast<SstFileManagerImpl*>(
impl->immutable_db_options_.sst_file_manager.get());
if (s.ok() && sfm) {
// Set Statistics ptr for SstFileManager to dump the stats of
// DeleteScheduler.
sfm->SetStatisticsPtr(impl->immutable_db_options_.statistics);
ROCKS_LOG_INFO(impl->immutable_db_options_.info_log,
"SstFileManager instance %p", sfm);

// Notify SstFileManager about all sst files that already exist in
// db_paths[0] and cf_paths[0] when the DB is opened.

Expand Down
10 changes: 10 additions & 0 deletions db/db_sst_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
env_->SetTimeElapseOnlySleep(&options);
options.disable_auto_compactions = true;
options.env = env_;
options.statistics = CreateDBStatistics();

int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
Status s;
Expand Down Expand Up @@ -425,6 +426,9 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
}
ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
ASSERT_LT(time_spent_deleting, expected_penlty * 1.1);
ASSERT_EQ(4, options.statistics->getAndResetTickerCount(FILES_MARKED_TRASH));
ASSERT_EQ(
0, options.statistics->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
Expand Down Expand Up @@ -816,6 +820,12 @@ TEST_F(DBSSTTest, CancellingCompactionsWorks) {
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
// Make sure the stat is bumped
ASSERT_GT(dbfull()->immutable_db_options().statistics.get()->getTickerCount(COMPACTION_CANCELLED), 0);
ASSERT_EQ(0,
dbfull()->immutable_db_options().statistics.get()->getTickerCount(
FILES_MARKED_TRASH));
ASSERT_EQ(4,
dbfull()->immutable_db_options().statistics.get()->getTickerCount(
FILES_DELETED_IMMEDIATELY));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

Expand Down
41 changes: 34 additions & 7 deletions file/delete_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "file/delete_scheduler.h"

#include <cinttypes>
#include <thread>
#include <vector>

Expand Down Expand Up @@ -65,24 +66,36 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path,
s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
if (s.ok()) {
sst_file_manager_->OnDeleteFile(file_path);
ROCKS_LOG_INFO(info_log_,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can print a message during db start, similar to the following:

SstFileManager instance: <addr in hex>

"Deleted file %s immediately, rate_bytes_per_sec %" PRIi64
", total_trash_size %" PRIu64 " max_trash_db_ratio %lf",
file_path.c_str(), rate_bytes_per_sec_.load(),
total_trash_size_.load(), max_trash_db_ratio_.load());
RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY);
}
return s;
}

// Move file to trash
std::string trash_file;
s = MarkAsTrash(file_path, &trash_file);
ROCKS_LOG_INFO(info_log_, "Mark file: %s as trash -- %s", trash_file.c_str(),
s.ToString().c_str());

if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_, "Failed to mark %s as trash -- %s",
file_path.c_str(), s.ToString().c_str());
s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
if (s.ok()) {
sst_file_manager_->OnDeleteFile(file_path);
ROCKS_LOG_INFO(info_log_, "Deleted file %s immediately",
trash_file.c_str());
RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY);
}
return s;
}

RecordTick(stats_.get(), FILES_MARKED_TRASH);
// Update the total trash size
uint64_t trash_file_size = 0;
fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr);
Expand Down Expand Up @@ -210,6 +223,8 @@ void DeleteScheduler::BackgroundEmptyTrash() {
current_delete_rate = rate_bytes_per_sec_.load();
start_time = env_->NowMicros();
total_deleted_bytes = 0;
ROCKS_LOG_INFO(info_log_, "rate_bytes_per_sec is changed to %" PRIi64,
current_delete_rate);
}

// Get new file to delete
Expand All @@ -233,19 +248,27 @@ void DeleteScheduler::BackgroundEmptyTrash() {
bg_errors_[path_in_trash] = s;
}

// Apply penlty if necessary
uint64_t total_penlty;
// Apply penalty if necessary
uint64_t total_penalty;
if (current_delete_rate > 0) {
// rate limiting is enabled
total_penlty =
total_penalty =
((total_deleted_bytes * kMicrosInSecond) / current_delete_rate);
while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {}
ROCKS_LOG_INFO(info_log_,
"Rate limiting is enabled with penalty %" PRIu64
"after deleting file %s",
total_penalty, path_in_trash.c_str());
while (!closing_ && !cv_.TimedWait(start_time + total_penalty)) {
}
} else {
// rate limiting is disabled
total_penlty = 0;
total_penalty = 0;
ROCKS_LOG_INFO(info_log_,
"Rate limiting is disabled after deleting file %s",
path_in_trash.c_str());
}
TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
&total_penlty);
&total_penalty);

if (is_complete) {
pending_files_--;
Expand Down Expand Up @@ -353,9 +376,13 @@ void DeleteScheduler::WaitForEmptyTrash() {
}

void DeleteScheduler::MaybeCreateBackgroundThread() {
if(bg_thread_ == nullptr && rate_bytes_per_sec_.load() > 0) {
if (bg_thread_ == nullptr && rate_bytes_per_sec_.load() > 0) {
bg_thread_.reset(
new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
ROCKS_LOG_INFO(info_log_,
"Created background thread for deletion scheduler with "
"rate_bytes_per_sec: %" PRIi64,
rate_bytes_per_sec_.load());
}
}

Expand Down
5 changes: 5 additions & 0 deletions file/delete_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ class DeleteScheduler {
static Status CleanupDirectory(Env* env, SstFileManagerImpl* sfm,
const std::string& path);

void SetStatisticsPtr(const std::shared_ptr<Statistics>& stats) {
stats_ = stats;
}

private:
Status MarkAsTrash(const std::string& file_path, std::string* path_in_trash);

Expand Down Expand Up @@ -137,6 +141,7 @@ class DeleteScheduler {
// immediately
std::atomic<double> max_trash_db_ratio_;
static const uint64_t kMicrosInSecond = 1000 * 1000LL;
std::shared_ptr<Statistics> stats_;
};

} // namespace ROCKSDB_NAMESPACE
Expand Down
25 changes: 24 additions & 1 deletion file/delete_scheduler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class DeleteSchedulerTest : public testing::Test {
ToString(i));
DestroyAndCreateDir(dummy_files_dirs_.back());
}
stats_ = ROCKSDB_NAMESPACE::CreateDBStatistics();
}

~DeleteSchedulerTest() override {
Expand Down Expand Up @@ -99,13 +100,15 @@ class DeleteSchedulerTest : public testing::Test {
new SstFileManagerImpl(env_, fs, nullptr, rate_bytes_per_sec_,
/* max_trash_db_ratio= */ 1.1, 128 * 1024));
delete_scheduler_ = sst_file_mgr_->delete_scheduler();
sst_file_mgr_->SetStatisticsPtr(stats_);
}

Env* env_;
std::vector<std::string> dummy_files_dirs_;
int64_t rate_bytes_per_sec_;
DeleteScheduler* delete_scheduler_;
std::unique_ptr<SstFileManagerImpl> sst_file_mgr_;
std::shared_ptr<Statistics> stats_;
};

// Test the basic functionality of DeleteScheduler (Rate Limiting).
Expand Down Expand Up @@ -182,6 +185,8 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
ASSERT_EQ(num_files, dir_synced);

ASSERT_EQ(CountTrashFiles(), 0);
ASSERT_EQ(num_files, stats_->getAndResetTickerCount(FILES_MARKED_TRASH));
ASSERT_EQ(0, stats_->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
}
Expand Down Expand Up @@ -219,6 +224,9 @@ TEST_F(DeleteSchedulerTest, MultiDirectoryDeletionsScheduled) {
ASSERT_EQ(0, CountTrashFiles(i));
}

ASSERT_EQ(kNumFiles, stats_->getAndResetTickerCount(FILES_MARKED_TRASH));
ASSERT_EQ(0, stats_->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

Expand Down Expand Up @@ -301,6 +309,10 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) {

ASSERT_EQ(CountNormalFiles(), 0);
ASSERT_EQ(CountTrashFiles(), 0);
ASSERT_EQ(num_files * thread_cnt,
stats_->getAndResetTickerCount(FILES_MARKED_TRASH));
ASSERT_EQ(0, stats_->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
}
Expand All @@ -318,8 +330,9 @@ TEST_F(DeleteSchedulerTest, DisableRateLimiting) {

rate_bytes_per_sec_ = 0;
NewDeleteScheduler();
constexpr int num_files = 10;

for (int i = 0; i < 10; i++) {
for (int i = 0; i < num_files; i++) {
// Every file we delete will be deleted immediately
std::string dummy_file = NewDummyFile("dummy.data");
ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file, ""));
Expand All @@ -329,6 +342,9 @@ TEST_F(DeleteSchedulerTest, DisableRateLimiting) {
}

ASSERT_EQ(bg_delete_file, 0);
ASSERT_EQ(0, stats_->getAndResetTickerCount(FILES_MARKED_TRASH));
ASSERT_EQ(num_files,
stats_->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
Expand Down Expand Up @@ -365,6 +381,8 @@ TEST_F(DeleteSchedulerTest, ConflictNames) {

auto bg_errors = delete_scheduler_->GetBackgroundErrors();
ASSERT_EQ(bg_errors.size(), 0);
ASSERT_EQ(10, stats_->getAndResetTickerCount(FILES_MARKED_TRASH));
ASSERT_EQ(0, stats_->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
Expand Down Expand Up @@ -439,9 +457,12 @@ TEST_F(DeleteSchedulerTest, StartBGEmptyTrashMultipleTimes) {

auto bg_errors = delete_scheduler_->GetBackgroundErrors();
ASSERT_EQ(bg_errors.size(), 0);
ASSERT_EQ(10, stats_->getAndResetTickerCount(FILES_MARKED_TRASH));
ASSERT_EQ(0, stats_->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));
}

ASSERT_EQ(bg_delete_file, 50);

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
}

Expand Down Expand Up @@ -653,6 +674,8 @@ TEST_F(DeleteSchedulerTest, ImmediateDeleteOn25PercDBSize) {
// When we end up with 26 files in trash we will start
// deleting new files immediately
ASSERT_EQ(fg_delete_file, 74);
ASSERT_EQ(26, stats_->getAndResetTickerCount(FILES_MARKED_TRASH));
ASSERT_EQ(74, stats_->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
Expand Down
6 changes: 6 additions & 0 deletions file/sst_file_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ class SstFileManagerImpl : public SstFileManager {
// once in the object's lifetime, and before the destructor
void Close();

void SetStatisticsPtr(const std::shared_ptr<Statistics>& stats) override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should call this function in when we open the db.

stats_ = stats;
delete_scheduler_.SetStatisticsPtr(stats);
}

private:
// REQUIRES: mutex locked
void OnAddFileImpl(const std::string& file_path, uint64_t file_size,
Expand Down Expand Up @@ -190,6 +195,7 @@ class SstFileManagerImpl : public SstFileManager {
std::list<ErrorHandler*> error_handler_list_;
// Pointer to ErrorHandler instance that is currently processing recovery
ErrorHandler* cur_instance_;
std::shared_ptr<Statistics> stats_;
};

} // namespace ROCKSDB_NAMESPACE
Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/sst_file_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <vector>

#include "rocksdb/file_system.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"

namespace ROCKSDB_NAMESPACE {
Expand Down Expand Up @@ -80,6 +81,9 @@ class SstFileManager {
// Return the total size of trash files
// thread-safe
virtual uint64_t GetTotalTrashSize() = 0;

// Set the statistics ptr to dump the stat information
virtual void SetStatisticsPtr(const std::shared_ptr<Statistics>& stats) = 0;
};

// Create a new SstFileManager that can be shared among multiple RocksDB
Expand Down
6 changes: 6 additions & 0 deletions include/rocksdb/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,12 @@ enum Tickers : uint32_t {
// <= BLOCK_CACHE_COMPRESSION_DICT_ADD
BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT,

// # of files marked as trash by sst file manager and will be deleted
// later by background thread.
FILES_MARKED_TRASH,
// # of files deleted immediately by sst file manger through delete scheduler.
FILES_DELETED_IMMEDIATELY,

TICKER_ENUM_MAX
};

Expand Down
15 changes: 12 additions & 3 deletions java/rocksjni/portal.h
Original file line number Diff line number Diff line change
Expand Up @@ -1292,9 +1292,9 @@ class ByteBufferJni : public JavaClass {
return constructWith(env, direct, nullptr, capacity, jbytebuffer_clazz);
}

static jobject constructWith(
JNIEnv* env, const bool direct, const char* buf, const size_t capacity,
jclass jbytebuffer_clazz = nullptr) {
static jobject constructWith(JNIEnv* env, const bool direct, const char* buf,
const size_t capacity,
jclass jbytebuffer_clazz = nullptr) {
if (direct) {
bool allocated = false;
if (buf == nullptr) {
Expand Down Expand Up @@ -4945,6 +4945,11 @@ class TickerTypeJni {
return -0x0C;
case ROCKSDB_NAMESPACE::Tickers::TXN_GET_TRY_AGAIN:
return -0x0D;
case ROCKSDB_NAMESPACE::Tickers::FILES_MARKED_TRASH:
return -0x0E;
case ROCKSDB_NAMESPACE::Tickers::FILES_DELETED_IMMEDIATELY:
return -0X0F;

case ROCKSDB_NAMESPACE::Tickers::TICKER_ENUM_MAX:
// 0x5F for backwards compatibility on current minor version.
return 0x5F;
Expand Down Expand Up @@ -5240,6 +5245,10 @@ class TickerTypeJni {
return ROCKSDB_NAMESPACE::Tickers::TXN_SNAPSHOT_MUTEX_OVERHEAD;
case -0x0D:
return ROCKSDB_NAMESPACE::Tickers::TXN_GET_TRY_AGAIN;
case -0x0E:
return ROCKSDB_NAMESPACE::Tickers::FILES_MARKED_TRASH;
case -0x0F:
return ROCKSDB_NAMESPACE::Tickers::FILES_DELETED_IMMEDIATELY;
case 0x5F:
// 0x5F for backwards compatibility on current minor version.
return ROCKSDB_NAMESPACE::Tickers::TICKER_ENUM_MAX;
Expand Down
10 changes: 10 additions & 0 deletions java/src/main/java/org/rocksdb/TickerType.java
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,16 @@ public enum TickerType {
*/
TXN_GET_TRY_AGAIN((byte) -0x0D),

/**
* # of files marked as trash by delete scheduler
*/
FILES_MARKED_TRASH((byte) -0x0E),

/**
* # of files deleted immediately by delete scheduler
*/
FILES_DELETED_IMMEDIATELY((byte) -0x0f),

TICKER_ENUM_MAX((byte) 0x5F);

private final byte value;
Expand Down
2 changes: 2 additions & 0 deletions monitoring/statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{BLOCK_CACHE_DATA_ADD_REDUNDANT, "rocksdb.block.cache.data.add.redundant"},
{BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT,
"rocksdb.block.cache.compression.dict.add.redundant"},
{FILES_MARKED_TRASH, "rocksdb.files.marked.trash"},
{FILES_DELETED_IMMEDIATELY, "rocksdb.files.deleted.immediately"},
};

const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
Expand Down