Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions rocksdb_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
*
*/

#include "rocksdb_handler.h"

#include <brpc/controller.h>
#include <brpc/server.h>
#include <brpc/stream.h>
Expand Down Expand Up @@ -66,7 +68,6 @@
#include "redis_zset_object.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb_handler.h"
#include "rocksdb_scanner.h"
#include "store_util.h"
#include "tx_record.h"
Expand Down Expand Up @@ -575,7 +576,7 @@ void RocksDBHandler::UpsertTableInternal(
alter_table_info,
cc_req,
ccs,
err_code]()
err_code](size_t)
{
int64_t term;
if (!txservice::IsStandbyTx(tx_term))
Expand Down Expand Up @@ -975,7 +976,7 @@ void RocksDBHandler::FetchTableCatalog(
txservice::FetchCatalogCc *fetch_cc)
{
query_worker_pool_->SubmitWork(
[this, ccm_table_name, fetch_cc]()
[this, ccm_table_name, fetch_cc](size_t)
{
std::shared_lock<std::shared_mutex> db_lk(db_mux_);
auto db = GetDBPtr();
Expand Down Expand Up @@ -1127,7 +1128,8 @@ RocksDBHandler::FetchRecord(txservice::FetchRecordCc *fetch_cc,
fetch_cc,
redis_key = std::move(redis_key_copy),
kv_cf_name =
fetch_cc->table_schema_->GetKVCatalogInfo()->kv_table_name_]()
fetch_cc->table_schema_->GetKVCatalogInfo()->kv_table_name_](
size_t)
{
std::shared_lock<std::shared_mutex> db_lk(db_mux_);
auto db = GetDBPtr();
Expand Down Expand Up @@ -1694,7 +1696,7 @@ void RocksDBHandler::RestoreTxCache(txservice::NodeGroupId cc_ng_id,
uint16_t thread_num = core_cnt < 2 ? 1 : (core_cnt / 2);

query_worker_pool_->SubmitWork(
[this, cc_ng_id, cc_ng_term, core_cnt, thread_num]
[this, cc_ng_id, cc_ng_term, core_cnt, thread_num](size_t)
{
LOG(INFO) << "Start restore Tx service cache from KV store when KV "
"is enabled and cache replacement is disabled.";
Expand Down Expand Up @@ -1799,7 +1801,7 @@ void RocksDBHandler::RestoreTxCache(txservice::NodeGroupId cc_ng_id,
task_mux,
core_cnt,
cancel_data_loading_on_error,
on_flying_count]
on_flying_count](size_t)
{
ParallelIterateTable(cc_ng_id,
cc_ng_term,
Expand Down Expand Up @@ -1863,7 +1865,7 @@ bool RocksDBHandler::OnLeaderStart(uint32_t *next_leader_node)
bool finished = false;

db_manage_worker_->SubmitWork(
[this, &succ, &mux, &cv, &finished, next_leader_node]
[this, &succ, &mux, &cv, &finished, next_leader_node](size_t)
{
bool res = StartDB(true, next_leader_node);
std::unique_lock<bthread::Mutex> lk(mux);
Expand All @@ -1889,7 +1891,7 @@ void RocksDBHandler::OnStartFollowing()
bool finished = false;

db_manage_worker_->SubmitWork(
[this, &mux, &cv, &finished]
[this, &mux, &cv, &finished](size_t)
{
Shutdown();
// remove outdated db snapshot
Expand Down Expand Up @@ -3081,7 +3083,7 @@ bool RocksDBHandlerImpl::OnSnapshotReceived(
bool finished = false;

db_manage_worker_->SubmitWork(
[this, &succ, &mux, &cv, &finished, req]
[this, &succ, &mux, &cv, &finished, req](size_t)
{
bool res = true;
if (txservice::Sharder::Instance().CandidateStandbyNodeTerm() !=
Expand Down Expand Up @@ -3320,7 +3322,7 @@ bool RocksDBHandlerImpl::SendSnapshotToRemote(
&rsync_task_mux_,
&rsync_task_cond_,
&unfinished_rsync_tasks_,
this]()
this](size_t)
{
bool prev_succ = succ.load(std::memory_order_acquire);
if (!Sharder::Instance().CheckLeaderTerm(ng_id, ng_term) ||
Expand Down