Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv/RocksDBStore: extract common code to a new function #16532

Merged
merged 2 commits into from Aug 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
98 changes: 33 additions & 65 deletions src/kv/RocksDBStore.cc
Expand Up @@ -498,9 +498,8 @@ void RocksDBStore::get_statistics(Formatter *f)
}
}

int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t)
{
utime_t start = ceph_clock_now();
// enable rocksdb breakdown
// considering performance overhead, default is disabled
if (g_conf->rocksdb_perf) {
Expand All @@ -510,7 +509,6 @@ int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)

RocksDBTransactionImpl * _t =
static_cast<RocksDBTransactionImpl *>(t.get());
rocksdb::WriteOptions woptions;
woptions.disableWAL = disableWAL;
lgeneric_subdout(cct, rocksdb, 30) << __func__;
RocksWBHandler bat_txc;
Expand All @@ -524,7 +522,6 @@ int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
<< " Rocksdb transaction: " << rocks_txc.seen << dendl;
}
utime_t lat = ceph_clock_now() - start;

if (g_conf->rocksdb_perf) {
utime_t write_memtable_time;
Expand All @@ -545,78 +542,49 @@ int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
}

return s.ok() ? 0 : -1;
}

int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
{
utime_t start = ceph_clock_now();
rocksdb::WriteOptions woptions;
woptions.sync = false;

int result = submit_common(woptions, t);

utime_t lat = ceph_clock_now() - start;
logger->inc(l_rocksdb_txns);
logger->tinc(l_rocksdb_submit_latency, lat);

return s.ok() ? 0 : -1;
return result;
}

int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
{
utime_t start = ceph_clock_now();
// enable rocksdb breakdown
// considering performance overhead, default is disabled
if (g_conf->rocksdb_perf) {
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
rocksdb::perf_context.Reset();
}

RocksDBTransactionImpl * _t =
static_cast<RocksDBTransactionImpl *>(t.get());
rocksdb::WriteOptions woptions;
woptions.sync = true;
woptions.disableWAL = disableWAL;
lgeneric_subdout(cct, rocksdb, 30) << __func__;
RocksWBHandler bat_txc;
_t->bat.Iterate(&bat_txc);
*_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;

rocksdb::Status s = db->Write(woptions, &_t->bat);
if (!s.ok()) {
RocksWBHandler rocks_txc;
_t->bat.Iterate(&rocks_txc);
derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
<< " Rocksdb transaction: " << rocks_txc.seen << dendl;
}

int result = submit_common(woptions, t);

utime_t lat = ceph_clock_now() - start;

if (g_conf->rocksdb_perf) {
utime_t write_memtable_time;
utime_t write_delay_time;
utime_t write_wal_time;
utime_t write_pre_and_post_process_time;
write_wal_time.set_from_double(
static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
write_memtable_time.set_from_double(
static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
write_delay_time.set_from_double(
static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
write_pre_and_post_process_time.set_from_double(
static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
}

logger->inc(l_rocksdb_txns_sync);
logger->tinc(l_rocksdb_submit_sync_latency, lat);

return s.ok() ? 0 : -1;
return result;
}

RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
{
db = _db;
}

void RocksDBStore::RocksDBTransactionImpl::set(
const string &prefix,
const string &k,
static void put_bat(
rocksdb::WriteBatch& bat,
const string &key,
const bufferlist &to_set_bl)
{
string key = combine_strings(prefix, k);

// bufferlist::c_str() is non-constant, so we can't call c_str()
if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
bat.Put(rocksdb::Slice(key),
Expand All @@ -630,6 +598,16 @@ void RocksDBStore::RocksDBTransactionImpl::set(
}
}

void RocksDBStore::RocksDBTransactionImpl::set(
const string &prefix,
const string &k,
const bufferlist &to_set_bl)
{
string key = combine_strings(prefix, k);

put_bat(bat, key, to_set_bl);
}

void RocksDBStore::RocksDBTransactionImpl::set(
const string &prefix,
const char *k, size_t keylen,
Expand All @@ -638,17 +616,7 @@ void RocksDBStore::RocksDBTransactionImpl::set(
string key;
combine_strings(prefix, k, keylen, &key);

// bufferlist::c_str() is non-constant, so we can't call c_str()
if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
bat.Put(rocksdb::Slice(key),
rocksdb::Slice(to_set_bl.buffers().front().c_str(),
to_set_bl.length()));
} else {
rocksdb::Slice key_slice(key);
vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
prepare_sliceparts(to_set_bl, &value_slices));
}
put_bat(bat, key, to_set_bl);
}

void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
Expand Down
1 change: 1 addition & 0 deletions src/kv/RocksDBStore.h
Expand Up @@ -78,6 +78,7 @@ class RocksDBStore : public KeyValueDB {
uint64_t cache_size = 0;
bool set_cache_flag = false;

int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t);
int do_open(ostream &out, bool create_if_missing);

// manage async compactions
Expand Down