Skip to content

Commit

Permalink
Merge 23ca345 into 189f0c2
Browse files Browse the repository at this point in the history
  • Loading branch information
yiwu-arbug committed Jun 26, 2018
2 parents 189f0c2 + 23ca345 commit 357fe6f
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 36 deletions.
4 changes: 1 addition & 3 deletions utilities/blob_db/blob_db.h
Expand Up @@ -38,9 +38,7 @@ struct BlobDBOptions {

// When max_db_size is reached, evict blob files to free up space
// instead of returnning NoSpace error on write. Blob files will be
// evicted in this order until enough space is free up:
// * the TTL blob file cloeset to expire,
// * the oldest non-TTL blob file.
// evicted from oldest to newest, based on file creation time.
bool is_fifo = false;

// Maximum size of the database (including SST files and blob files).
Expand Down
37 changes: 15 additions & 22 deletions utilities/blob_db/blob_db_impl.cc
Expand Up @@ -52,8 +52,15 @@ WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound(
return WalFilter::WalProcessingOption::kContinueProcessing;
}

bool blobf_compare_ttl::operator()(const std::shared_ptr<BlobFile>& lhs,
const std::shared_ptr<BlobFile>& rhs) const {
bool BlobFileComparator::operator()(
const std::shared_ptr<BlobFile>& lhs,
const std::shared_ptr<BlobFile>& rhs) const {
return lhs->BlobFileNumber() > rhs->BlobFileNumber();
}

bool BlobFileComparatorTTL::operator()(
const std::shared_ptr<BlobFile>& lhs,
const std::shared_ptr<BlobFile>& rhs) const {
assert(lhs->HasTTL() && rhs->HasTTL());
if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
return true;
Expand Down Expand Up @@ -852,14 +859,9 @@ Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
}

std::vector<std::shared_ptr<BlobFile>> candidate_files;
CopyBlobFiles(&candidate_files,
[&](const std::shared_ptr<BlobFile>& blob_file) {
// Only evict TTL files
return blob_file->HasTTL();
});
CopyBlobFiles(&candidate_files);
std::sort(candidate_files.begin(), candidate_files.end(),
blobf_compare_ttl());
std::reverse(candidate_files.begin(), candidate_files.end());
BlobFileComparator());
fifo_eviction_seq_ = GetLatestSequenceNumber();

WriteLock l(&mutex_);
Expand Down Expand Up @@ -887,10 +889,9 @@ Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
"Evict oldest blob file since DB out of space. Current "
"live SST file size: %" PRIu64 ", total blob size: %" PRIu64
", max db size: %" PRIu64 ", evicted blob file #%" PRIu64
" with expiration range (%" PRIu64 ", %" PRIu64 ").",
".",
live_sst_size, total_blob_size_.load(),
bdb_options_.max_db_size, blob_file->BlobFileNumber(),
expiration_range.first, expiration_range.second);
bdb_options_.max_db_size, blob_file->BlobFileNumber());
ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/);
evict_expiration_up_to_ = expiration_range.first;
RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
Expand Down Expand Up @@ -1741,18 +1742,10 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
}

void BlobDBImpl::CopyBlobFiles(
std::vector<std::shared_ptr<BlobFile>>* bfiles_copy,
std::function<bool(const std::shared_ptr<BlobFile>&)> predicate) {
std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
ReadLock rl(&mutex_);

for (auto const& p : blob_files_) {
bool pred_value = true;
if (predicate) {
pred_value = predicate(p.second);
}
if (pred_value) {
bfiles_copy->push_back(p.second);
}
bfiles_copy->push_back(p.second);
}
}

Expand Down
13 changes: 8 additions & 5 deletions utilities/blob_db/blob_db_impl.h
Expand Up @@ -64,7 +64,12 @@ class BlobReconcileWalFilter : public WalFilter {

// Comparator to sort "TTL" aware Blob files based on the lower value of
// TTL range.
struct blobf_compare_ttl {
struct BlobFileComparatorTTL {
bool operator()(const std::shared_ptr<BlobFile>& lhs,
const std::shared_ptr<BlobFile>& rhs) const;
};

struct BlobFileComparator {
bool operator()(const std::shared_ptr<BlobFile>& lhs,
const std::shared_ptr<BlobFile>& rhs) const;
};
Expand Down Expand Up @@ -315,9 +320,7 @@ class BlobDBImpl : public BlobDB {
bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file);
bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);

void CopyBlobFiles(
std::vector<std::shared_ptr<BlobFile>>* bfiles_copy,
std::function<bool(const std::shared_ptr<BlobFile>&)> predicate = {});
void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy);

uint64_t EpochNow() { return env_->NowMicros() / 1000000; }

Expand Down Expand Up @@ -373,7 +376,7 @@ class BlobDBImpl : public BlobDB {

// all the blob files which are currently being appended to based
// on variety of incoming TTL's
std::set<std::shared_ptr<BlobFile>, blobf_compare_ttl> open_ttl_files_;
std::set<std::shared_ptr<BlobFile>, BlobFileComparatorTTL> open_ttl_files_;

// Flag to check whether Close() has been called on this DB
bool closed_;
Expand Down
26 changes: 21 additions & 5 deletions utilities/blob_db/blob_db_test.cc
Expand Up @@ -1120,26 +1120,42 @@ TEST_F(BlobDBTest, FIFOEviction) {

ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());

// Adding another 100 byte blob would take the total size to 264 bytes
// Adding another 100 bytes blob would take the total size to 264 bytes
// (2*132). max_db_size will be exceeded
// than max_db_size and trigger FIFO eviction.
ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
ASSERT_EQ(1, evict_count);
// key1 will exist until corresponding file be deleted.
VerifyDB({{"key1", value}, {"key2", value}});

// Adding another 100 bytes blob without TTL.
ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", value));
ASSERT_EQ(2, evict_count);
// key1 and key2 will exist until corresponding file be deleted.
VerifyDB({{"key1", value}, {"key2", value}, {"key3", value}});

// The fourth blob file, without TTL.
ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", value));
ASSERT_EQ(3, evict_count);
VerifyDB(
{{"key1", value}, {"key2", value}, {"key3", value}, {"key4", value}});

auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(2, blob_files.size());
ASSERT_EQ(4, blob_files.size());
ASSERT_TRUE(blob_files[0]->Obsolete());
ASSERT_FALSE(blob_files[1]->Obsolete());
ASSERT_TRUE(blob_files[1]->Obsolete());
ASSERT_TRUE(blob_files[2]->Obsolete());
ASSERT_FALSE(blob_files[3]->Obsolete());
auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
ASSERT_EQ(1, obsolete_files.size());
ASSERT_EQ(3, obsolete_files.size());
ASSERT_EQ(blob_files[0], obsolete_files[0]);
ASSERT_EQ(blob_files[1], obsolete_files[1]);
ASSERT_EQ(blob_files[2], obsolete_files[2]);

blob_db_impl()->TEST_DeleteObsoleteFiles();
obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
ASSERT_TRUE(obsolete_files.empty());
VerifyDB({{"key2", value}});
VerifyDB({{"key4", value}});
}

TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
Expand Down
3 changes: 2 additions & 1 deletion utilities/blob_db/blob_file.h
Expand Up @@ -23,7 +23,8 @@ class BlobDBImpl;

class BlobFile {
friend class BlobDBImpl;
friend struct blobf_compare_ttl;
friend struct BlobFileComparator;
friend struct BlobFileComparatorTTL;

private:
// access to parent
Expand Down

0 comments on commit 357fe6f

Please sign in to comment.