Skip to content

Commit

Permalink
Merge branch 'main' into wal-corruption-in-checkpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
andlr committed Jun 1, 2024
2 parents 1299772 + fc59d8f commit c4c99d1
Show file tree
Hide file tree
Showing 47 changed files with 1,242 additions and 561 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ set(SOURCES
db/db_impl/db_impl_write.cc
db/db_impl/db_impl_compaction_flush.cc
db/db_impl/db_impl_files.cc
db/db_impl/db_impl_follower.cc
db/db_impl/db_impl_open.cc
db/db_impl/db_impl_debug.cc
db/db_impl/db_impl_experimental.cc
Expand Down Expand Up @@ -748,6 +749,7 @@ set(SOURCES
env/env_encryption.cc
env/file_system.cc
env/file_system_tracer.cc
env/fs_on_demand.cc
env/fs_remap.cc
env/mock_env.cc
env/unique_id_gen.cc
Expand Down Expand Up @@ -1037,10 +1039,8 @@ endif()

else()
list(APPEND SOURCES
db/db_impl/db_impl_follower.cc
port/port_posix.cc
env/env_posix.cc
env/fs_on_demand.cc
env/fs_posix.cc
env/io_posix.cc)
endif()
Expand Down
105 changes: 105 additions & 0 deletions cache/tiered_secondary_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,111 @@ TEST_P(DBTieredAdmPolicyTest, CompressedCacheAdmission) {
Destroy(options);
}

TEST_F(DBTieredSecondaryCacheTest, FSBufferTest) {
class WrapFS : public FileSystemWrapper {
public:
explicit WrapFS(const std::shared_ptr<FileSystem>& _target)
: FileSystemWrapper(_target) {}
~WrapFS() override {}
const char* Name() const override { return "WrapFS"; }

IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& opts,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* dbg) override {
class WrappedRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
public:
explicit WrappedRandomAccessFile(
std::unique_ptr<FSRandomAccessFile>& file)
: FSRandomAccessFileOwnerWrapper(std::move(file)) {}

IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options,
IODebugContext* dbg) override {
for (size_t i = 0; i < num_reqs; ++i) {
FSReadRequest& req = reqs[i];
FSAllocationPtr buffer(new char[req.len], [](void* ptr) {
delete[] static_cast<char*>(ptr);
});
req.fs_scratch = std::move(buffer);
req.status = Read(req.offset, req.len, options, &req.result,
static_cast<char*>(req.fs_scratch.get()), dbg);
}
return IOStatus::OK();
}
};

std::unique_ptr<FSRandomAccessFile> file;
IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
EXPECT_OK(s);
result->reset(new WrappedRandomAccessFile(file));

return s;
}

void SupportedOps(int64_t& supported_ops) override {
supported_ops = 1 << FSSupportedOps::kAsyncIO;
supported_ops |= 1 << FSSupportedOps::kFSBuffer;
}
};

if (!LZ4_Supported()) {
ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
return;
}

std::shared_ptr<WrapFS> wrap_fs =
std::make_shared<WrapFS>(env_->GetFileSystem());
std::unique_ptr<Env> wrap_env(new CompositeEnvWrapper(env_, wrap_fs));
BlockBasedTableOptions table_options;
table_options.block_cache = NewCache(250 * 1024, 20 * 1024, 256 * 1024,
TieredAdmissionPolicy::kAdmPolicyAuto,
/*ready_before_wait=*/true);
table_options.block_size = 4 * 1024;
table_options.cache_index_and_filter_blocks = false;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.statistics = CreateDBStatistics();
options.env = wrap_env.get();

options.paranoid_file_checks = false;
DestroyAndReopen(options);
Random rnd(301);
const int N = 256;
for (int i = 0; i < N; i++) {
std::string p_v;
test::CompressibleString(&rnd, 0.5, 1007, &p_v);
ASSERT_OK(Put(Key(i), p_v));
}

ASSERT_OK(Flush());

std::vector<std::string> keys;
std::vector<std::string> values;

keys.push_back(Key(0));
keys.push_back(Key(4));
keys.push_back(Key(8));
values = MultiGet(keys, /*snapshot=*/nullptr, /*async=*/true);
ASSERT_EQ(values.size(), keys.size());
for (const auto& value : values) {
ASSERT_EQ(1007, value.size());
}
ASSERT_EQ(nvm_sec_cache()->num_insert_saved(), 3u);
ASSERT_EQ(nvm_sec_cache()->num_misses(), 3u);
ASSERT_EQ(nvm_sec_cache()->num_hits(), 0u);

std::string v = Get(Key(12));
ASSERT_EQ(1007, v.size());
ASSERT_EQ(nvm_sec_cache()->num_insert_saved(), 4u);
ASSERT_EQ(nvm_sec_cache()->num_misses(), 4u);
ASSERT_EQ(options.statistics->getTickerCount(BLOCK_CACHE_MISS), 4u);

Close();
Destroy(options);
}

INSTANTIATE_TEST_CASE_P(
DBTieredAdmPolicyTest, DBTieredAdmPolicyTest,
::testing::Values(TieredAdmissionPolicy::kAdmPolicyAuto,
Expand Down
42 changes: 42 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1947,6 +1947,10 @@ void rocksdb_iter_get_error(const rocksdb_iterator_t* iter, char** errptr) {
SaveError(errptr, iter->rep->status());
}

void rocksdb_iter_refresh(const rocksdb_iterator_t* iter, char** errptr) {
SaveError(errptr, iter->rep->Refresh());
}

rocksdb_writebatch_t* rocksdb_writebatch_create() {
return new rocksdb_writebatch_t;
}
Expand All @@ -1958,6 +1962,15 @@ rocksdb_writebatch_t* rocksdb_writebatch_create_from(const char* rep,
return b;
}

rocksdb_writebatch_t* rocksdb_writebatch_create_with_params(
size_t reserved_bytes, size_t max_bytes, size_t protection_bytes_per_key,
size_t default_cf_ts_sz) {
rocksdb_writebatch_t* b = new rocksdb_writebatch_t;
b->rep = WriteBatch(reserved_bytes, max_bytes, protection_bytes_per_key,
default_cf_ts_sz);
return b;
}

void rocksdb_writebatch_destroy(rocksdb_writebatch_t* b) { delete b; }

void rocksdb_writebatch_clear(rocksdb_writebatch_t* b) { b->rep.Clear(); }
Expand Down Expand Up @@ -2223,6 +2236,35 @@ rocksdb_writebatch_wi_t* rocksdb_writebatch_wi_create(
return b;
}

rocksdb_writebatch_wi_t* rocksdb_writebatch_wi_create_with_params(
rocksdb_comparator_t* backup_index_comparator, size_t reserved_bytes,
unsigned char overwrite_key, size_t max_bytes,
size_t protection_bytes_per_key) {
rocksdb_writebatch_wi_t* b = new rocksdb_writebatch_wi_t;
b->rep = new WriteBatchWithIndex(backup_index_comparator, reserved_bytes,
overwrite_key, max_bytes,
protection_bytes_per_key);
return b;
}

void rocksdb_writebatch_update_timestamps(
rocksdb_writebatch_t* wb, const char* ts, size_t tslen, void* state,
size_t (*get_ts_size)(void*, uint32_t), char** errptr) {
SaveError(errptr, wb->rep.UpdateTimestamps(
Slice(ts, tslen), [&get_ts_size, &state](uint32_t cf) {
return (*get_ts_size)(state, cf);
}));
}

void rocksdb_writebatch_wi_update_timestamps(
rocksdb_writebatch_wi_t* wb, const char* ts, size_t tslen, void* state,
size_t (*get_ts_size)(void*, uint32_t), char** errptr) {
SaveError(errptr, wb->rep->GetWriteBatch()->UpdateTimestamps(
Slice(ts, tslen), [&get_ts_size, &state](uint32_t cf) {
return (*get_ts_size)(state, cf);
}));
}

void rocksdb_writebatch_wi_destroy(rocksdb_writebatch_wi_t* b) {
if (b->rep) {
delete b->rep;
Expand Down
10 changes: 5 additions & 5 deletions db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ TEST_F(DBFlushTest, SyncFail) {
options.env = fault_injection_env.get();

SyncPoint::GetInstance()->LoadDependency(
{{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
{"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
{{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedWals:Start"},
{"DBImpl::SyncClosedWals:Failed", "DBFlushTest::SyncFail:2"}});
SyncPoint::GetInstance()->EnableProcessing();

CreateAndReopenWithCF({"pikachu"}, options);
Expand All @@ -111,8 +111,8 @@ TEST_F(DBFlushTest, SyncSkip) {
Options options = CurrentOptions();

SyncPoint::GetInstance()->LoadDependency(
{{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedLogs:Skip"},
{"DBImpl::SyncClosedLogs:Skip", "DBFlushTest::SyncSkip:2"}});
{{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedWals:Skip"},
{"DBImpl::SyncClosedWals:Skip", "DBFlushTest::SyncSkip:2"}});
SyncPoint::GetInstance()->EnableProcessing();

Reopen(options);
Expand Down Expand Up @@ -2381,7 +2381,7 @@ TEST_F(DBFlushTest, PickRightMemtables) {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::SyncClosedLogs:BeforeReLock", [&](void* /*arg*/) {
"DBImpl::SyncClosedWals:BeforeReLock", [&](void* /*arg*/) {
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "what", "v"));
auto* cfhi =
static_cast_with_check<ColumnFamilyHandleImpl>(handles_[1]);
Expand Down
Loading

0 comments on commit c4c99d1

Please sign in to comment.