Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slave thread pool #5

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# vim temp files
*.swp
# Compiled Object files
*.slo
*.lo
*.o
*.obj
*pb.cc
*pb.h
build
cmake-build-debug
cmake-build-release

# Precompiled Headers
*.gch
*.pch

# Compiled Dynamic libraries
*.so
*.dylib
*.dll


# Compiled Static libraries
*.lai *.la
*.a
*.lib

# Executables
*.exe
*.out
*.app

# Log path
make_config.mk
log/
lib/
output/

# DB
db/
dump/
src/dbsync/

# third party
gdb.txt
tags

# IDE
.vscode

# generate
make_config.mk
src/*.d
src/build_version.cc

#cache
.cache

.idea/


#build
build/
buildtrees
deps
bin/

#develop container
.devcontainer

# include codis fe javascript lib files
!codis/cmd/fe/assets/**

tests/tmp
Empty file modified clear.sh
100644 → 100755
Empty file.
1 change: 1 addition & 0 deletions pikiwidb.conf
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ maxmemory-samples 5
# CONFIG SET.
#
io-threads 2
slave-threads 2

################################ LUA SCRIPTING ###############################

Expand Down
14 changes: 14 additions & 0 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,20 @@ bool PClient::WaitFor(const PString& key, const PString* target) {

void PClient::SetSlaveInfo() { slave_info_.reset(new PSlaveInfo()); }

void PClient::TransferToSlaveThreads(){
// transfer to slave
auto loop = GetEventLoop();
if (!loop->IsSlaveEventLoop()){
auto slave_loop = tcp_connection_->SelectSlaveEventLoop();
auto id = tcp_connection_->GetUniqueId();
auto event_object = loop->GetEventObject(id);
loop->Unregister(event_object);
event_object->SetUniqueId(-1);
slave_loop->Register(event_object,0);
tcp_connection_->ResetEventLoop(slave_loop);
}
}

void PClient::AddCurrentToMonitor() {
std::unique_lock<std::mutex> guard(monitors_mutex);
monitors.insert(std::static_pointer_cast<PClient>(s_current->shared_from_this()));
Expand Down
1 change: 1 addition & 0 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class PClient : public std::enable_shared_from_this<PClient> {

void SetSlaveInfo();
PSlaveInfo* GetSlaveInfo() const { return slave_info_.get(); }
void TransferToSlaveThreads();

static void AddCurrentToMonitor();
static void FeedMonitors(const std::vector<PString>& params);
Expand Down
3 changes: 3 additions & 0 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ bool LoadPikiwiDBConfig(const char* cfgFile, PConfig& cfg) {
// io threads
cfg.io_threads_num = parser.GetData<int>("io-threads", 1);

// slave threads
cfg.slave_threads_num = parser.GetData<int>("slave-threads", 1);

// backend
cfg.backend = parser.GetData<int>("backend", BackEndNone);
cfg.backendPath = parser.GetData<PString>("backendpath", cfg.backendPath);
Expand Down
3 changes: 3 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ struct PConfig {
// THREADED I/O
int io_threads_num;

// THREADED SLAVE
int slave_threads_num;

int backend; // enum BackEndType
PString backendPath;
int backendHz; // the frequency of dump to backend
Expand Down
120 changes: 97 additions & 23 deletions src/io_thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,52 @@ bool IOThreadPool::SetWorkerNum(size_t num) {
return false;
}

if (!loops_.empty()) {
ERROR("can only called once, not empty loops size: {}", loops_.size());
if (!worker_loops_.empty()) {
ERROR("can only called once, not empty loops size: {}", worker_loops_.size());
return false;
}

if (num > kMaxWorkers) {
if ((num + slave_num_) > kMaxWorkers) {
ERROR("number of threads can't exceeds {}, now is {}", kMaxWorkers, num);
return false;
}

worker_num_.store(num);
workers_.reserve(num);
loops_.reserve(num);
worker_threads_.reserve(num);
worker_loops_.reserve(num);

return true;
}

bool IOThreadPool::SetSlaveNum(size_t num){
if (num <= 1){
return true;
}

if (state_ != State::kNone){
ERROR("can only called before application run");
return false;
}

if (!slave_loops_.empty()){
ERROR("can only called once, not empty slave loops size: {}", slave_loops_.size());
return false;
}

if ((num + worker_num_) > kMaxWorkers){
ERROR("number of threads can't exceeds {}, now is {}",kMaxWorkers, num);
return false;
}

slave_num_.store(num);
slave_threads_.reserve(num);
slave_loops_.reserve(num);

return true;
}

bool IOThreadPool::Init(const char* ip, int port, NewTcpConnectionCallback cb) {
auto f = std::bind(&IOThreadPool::Next, this);
auto f = std::bind(&IOThreadPool::ChooseNextWorkerEventLoop, this);

base_.Init();
printf("base loop %s %p, g_baseLoop %p\n", base_.GetName().c_str(), &base_, base_.Self());
Expand All @@ -87,12 +114,21 @@ void IOThreadPool::Run(int ac, char* av[]) {

// start loops in thread pool
StartWorkers();
StartSlaves();
state_ = State::kStarted;

base_.Run();

for (auto& w : workers_) {
for (auto& w : worker_threads_) {
w.join();
}
workers_.clear();

for (auto& slave_worker : slave_threads_ ){
slave_worker.join();
}

worker_threads_.clear();
slave_threads_.clear();

INFO("Process stopped, goodbye...");
}
Expand All @@ -101,65 +137,103 @@ void IOThreadPool::Exit() {
state_ = State::kStopped;

BaseLoop()->Stop();
for (size_t index = 0; index < loops_.size(); ++index) {
EventLoop* loop = loops_[index].get();

for (size_t index = 0; index < worker_loops_.size(); ++index) {
EventLoop* loop = worker_loops_[index].get();
loop->Stop();
}

for (size_t index = 0; index < slave_loops_.size(); ++index){
EventLoop* slave_loop = slave_loops_[index].get();
slave_loop->Stop();
}
}

bool IOThreadPool::IsExit() const { return state_ == State::kStopped; }

EventLoop* IOThreadPool::BaseLoop() { return &base_; }

EventLoop* IOThreadPool::Next() {
if (loops_.empty()) {
EventLoop* IOThreadPool::ChooseNextWorkerEventLoop() {
if (worker_loops_.empty()) {
return BaseLoop();
}

auto& loop = loops_[current_loop_++ % loops_.size()];
auto& loop = worker_loops_[current_worker_loop_++ % worker_loops_.size()];
return loop.get();
}

EventLoop* IOThreadPool::ChooseNextSlaveEventLoop(){
if (slave_loops_.empty()){
return BaseLoop();
}

auto& slave_loop = slave_loops_[current_slave_loop_++ % slave_loops_.size()];
return slave_loop.get();
}

void IOThreadPool::StartWorkers() {
// only called by main thread
assert(state_ == State::kNone);

size_t index = 1;
while (loops_.size() < worker_num_) {
while (worker_loops_.size() < worker_num_) {
std::unique_ptr<EventLoop> loop(new EventLoop);
if (!name_.empty()) {
loop->SetName(name_ + "_" + std::to_string(index++));
printf("loop %p, name %s\n", loop.get(), loop->GetName().c_str());
INFO("loop {}, name {}", static_cast<void*>(loop.get()), loop->GetName().c_str());
}
loops_.push_back(std::move(loop));
worker_loops_.push_back(std::move(loop));
}

for (index = 0; index < loops_.size(); ++index) {
EventLoop* loop = loops_[index].get();
for (index = 0; index < worker_loops_.size(); ++index) {
EventLoop* loop = worker_loops_[index].get();
std::thread t([loop]() {
loop->Init();
loop->Run();
});
printf("thread %lu, thread loop %p, loop name %s \n", index, loop, loop->GetName().c_str());
workers_.push_back(std::move(t));
worker_threads_.push_back(std::move(t));
}
}

state_ = State::kStarted;
void IOThreadPool::StartSlaves(){
// only called by main thread
assert(state_ == State::kNone);

size_t index = 1;
while(slave_loops_.size() < slave_num_){
std::unique_ptr<EventLoop> slave_loop(new EventLoop);
if (!name_.empty()) {
slave_loop->SetName(name_ + "-slave_" + std::to_string(index++));
INFO("loop {}, name {}", static_cast<void*>(slave_loop.get()), slave_loop->GetName().c_str());
}
slave_loops_.push_back(std::move(slave_loop));
}

for (index = 0; index < slave_loops_.size(); ++index){
EventLoop* slave_loop = slave_loops_[index].get();
std::thread t([slave_loop]() {
slave_loop->Init();
slave_loop->Run();
});
printf("thread %lu, thread loop %p, loop name %s \n", index, slave_loop, slave_loop->GetName().c_str());
slave_threads_.push_back(std::move(t));
}
}

void IOThreadPool::SetName(const std::string& name) { name_ = name; }

IOThreadPool::IOThreadPool() : state_(State::kNone) { InitSignal(); }

bool IOThreadPool::Listen(const char* ip, int port, NewTcpConnectionCallback ccb) {
auto f = std::bind(&IOThreadPool::Next, this);
auto f = std::bind(&IOThreadPool::ChooseNextWorkerEventLoop, this);
auto loop = BaseLoop();
return loop->Execute([loop, ip, port, ccb, f]() { return loop->Listen(ip, port, std::move(ccb), f); }).get();
}

void IOThreadPool::Connect(const char* ip, int port, NewTcpConnectionCallback ccb, TcpConnectionFailCallback fcb, EventLoop* loop) {
if (!loop) {
loop = Next();
loop = ChooseNextWorkerEventLoop();
}

std::string ipstr(ip);
Expand All @@ -186,7 +260,7 @@ std::shared_ptr<HttpClient> IOThreadPool::ConnectHTTP(const char* ip, int port,
auto fcb = [client](EventLoop*, const char* ip, int port) { client->OnConnectFail(ip, port); };

if (!loop) {
loop = Next();
loop = ChooseNextWorkerEventLoop();
}
client->SetLoop(loop);
Connect(ip, port, std::move(ncb), std::move(fcb), loop);
Expand Down
24 changes: 19 additions & 5 deletions src/io_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,17 @@ class IOThreadPool {
EventLoop* BaseLoop();

// choose a loop
EventLoop* Next();
EventLoop* ChooseNextWorkerEventLoop();

// choose a slave loop
EventLoop* ChooseNextSlaveEventLoop();

// set worker threads, each thread has a EventLoop object
bool SetWorkerNum(size_t n);

// set slave threads, each thread has a EventLoop object
bool SetSlaveNum(size_t n);

// app name, for top command
void SetName(const std::string& name);

Expand All @@ -57,7 +63,8 @@ class IOThreadPool {

private:
IOThreadPool();
void StartWorkers();
void StartWorkers(); // io-threads
void StartSlaves(); // slave-threads

static const size_t kMaxWorkers;

Expand All @@ -68,10 +75,17 @@ class IOThreadPool {

EventLoop base_;

// io-threads
std::atomic<size_t> worker_num_{0};
std::vector<std::thread> workers_;
std::vector<std::unique_ptr<EventLoop>> loops_;
mutable std::atomic<size_t> current_loop_{0};
std::vector<std::thread> worker_threads_;
std::vector<std::unique_ptr<EventLoop>> worker_loops_;
mutable std::atomic<size_t> current_worker_loop_{0};

// slave-threads
std::atomic<size_t> slave_num_{0};
std::vector<std::thread> slave_threads_;
std::vector<std::unique_ptr<EventLoop>> slave_loops_;
mutable std::atomic<size_t> current_slave_loop_{0};

enum class State {
kNone,
Expand Down
Loading