Skip to content

Commit

Permalink
fix bug: failed to start redis server
Browse files Browse the repository at this point in the history
  • Loading branch information
QlQlqiqi committed May 22, 2024
1 parent 43bd1ab commit 14f59b3
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 34 deletions.
2 changes: 0 additions & 2 deletions src/net/include/net_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
#ifndef NET_INCLUDE_NET_DEFINE_H_
#define NET_INCLUDE_NET_DEFINE_H_

#include <functional>
#include <iostream>
#include <map>

namespace net {

Expand Down
5 changes: 1 addition & 4 deletions src/net/include/random.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@
#ifndef RANDOM_H
#define RANDOM_H

#include <stdint.h>
#include <string.h>
#include <random>
#include <thread>
#include <utility>

#include "net/include/likely.h"

Expand Down Expand Up @@ -88,4 +85,4 @@ class Random {

} // namespace net

#endif // RANDOM_H
#endif // RANDOM_H
14 changes: 2 additions & 12 deletions src/net/include/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ namespace net {

using TaskFunc = void (*)(void*);

// struct Task {
// Task() = default;
// TaskFunc func = nullptr;
// void* arg = nullptr;
// Task(TaskFunc _func, void* _arg) : func(_func), arg(_arg) {}
// };

struct TimeTask {
uint64_t exec_time;
TaskFunc func;
Expand Down Expand Up @@ -103,15 +96,15 @@ class ThreadPool : public pstd::noncopyable {
// it's okay for other platforms to be no-ops
}

Node* CreateMissingNewerLinks(Node* head, int* cnt);
Node* CreateMissingNewerLinks(Node* head);
bool LinkOne(Node* node, std::atomic<Node*>* newest_node);

std::atomic<Node*> newest_node_;
std::atomic<int> node_cnt_; // for task
std::atomic<Node*> time_newest_node_;
std::atomic<int> time_node_cnt_; // for time task

const int queue_slow_size_; // default value: max(worker_num_ * 100, max_queue_size_)
const int queue_slow_size_; // default value: min(worker_num_ * 10, max_queue_size_)
size_t max_queue_size_;

const uint64_t max_yield_usec_;
Expand All @@ -121,15 +114,12 @@ class ThreadPool : public pstd::noncopyable {

size_t worker_num_;
std::string thread_pool_name_;
// std::queue<TimeTask> queue_;
// std::priority_queue<TimeTask> time_queue_;
std::vector<Worker*> workers_;
std::atomic<bool> running_;
std::atomic<bool> should_stop_;

pstd::Mutex mu_;
pstd::CondVar rsignal_;
// pstd::CondVar wsignal_;
};

} // namespace net
Expand Down
31 changes: 15 additions & 16 deletions src/net/src/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ int ThreadPool::Worker::stop() {
ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name)
: newest_node_(nullptr),
node_cnt_(0),
queue_slow_size_(std::max(worker_num_ * 100, max_queue_size_)),
time_newest_node_(nullptr),
time_node_cnt_(0),
queue_slow_size_(std::min(worker_num * 10, max_queue_size)),
max_queue_size_(max_queue_size),
max_yield_usec_(100),
slow_yield_usec_(3),
adp_ctx(),
worker_num_(worker_num),
max_queue_size_(max_queue_size),
thread_pool_name_(std::move(thread_pool_name)),
running_(false),
should_stop_(false) {}
Expand All @@ -78,7 +80,6 @@ int ThreadPool::stop_thread_pool() {
if (running_.load()) {
should_stop_.store(true);
rsignal_.notify_all();
// wsignal_.notify_all();
for (const auto worker : workers_) {
res = worker->stop();
if (res != 0) {
Expand Down Expand Up @@ -106,6 +107,7 @@ void ThreadPool::Schedule(TaskFunc func, void* arg) {
if (node_cnt_.load(std::memory_order_relaxed) >= queue_slow_size_) {
std::this_thread::yield();
}
// std::unique_lock lock(mu_);
if (LIKELY(!should_stop())) {
auto node = new Node(func, arg);
LinkOne(node, &newest_node_);
Expand All @@ -122,6 +124,7 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) {
uint64_t unow = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
uint64_t exec_time = unow + timeout * 1000;

// std::unique_lock lock(mu_);
if (LIKELY(!should_stop())) {
auto node = new Node(exec_time, func, arg);
LinkOne(node, &time_newest_node_);
Expand All @@ -132,6 +135,8 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) {

size_t ThreadPool::max_queue_size() { return max_queue_size_; }

size_t ThreadPool::worker_size() { return worker_num_; }

void ThreadPool::cur_queue_size(size_t* qsize) { *qsize = node_cnt_.load(std::memory_order_relaxed); }

void ThreadPool::cur_time_queue_size(size_t* qsize) { *qsize = time_node_cnt_.load(std::memory_order_relaxed); }
Expand Down Expand Up @@ -174,7 +179,7 @@ void ThreadPool::runInThread() {
}
AsmVolatilePause();
}

// 2. loop for a little short time again
const size_t kMaxSlowYieldsWhileSpinning = 3;
auto& yield_credit = adp_ctx.value;
Expand Down Expand Up @@ -238,24 +243,20 @@ void ThreadPool::runInThread() {
exec:
// do all normal tasks older than this task pointed last
if (LIKELY(last != nullptr)) {
int cnt = 0;
auto first = CreateMissingNewerLinks(last, &cnt);
auto first = CreateMissingNewerLinks(last);
assert(!first->is_time_task);
node_cnt_ -= cnt;
do {
first->Exec();
// node_cnt_--;
tmp = first;
first = first->Next();
node_cnt_--;
delete tmp;
} while (first != nullptr);
}

// do all time tasks older than this task pointed time_last
if (UNLIKELY(time_last != nullptr)) {
int time_cnt = 0;
auto time_first = CreateMissingNewerLinks(time_last, &time_cnt);
// time_node_cnt_ -= time_cnt;
auto time_first = CreateMissingNewerLinks(time_last);
do {
// time task may block normal task
auto now = std::chrono::system_clock::now();
Expand All @@ -265,32 +266,30 @@ void ThreadPool::runInThread() {
assert(time_first->is_time_task);
if (unow >= exec_time) {
time_first->Exec();
time_node_cnt_--;
} else {
lock.lock();
rsignal_.wait_for(lock, std::chrono::microseconds(exec_time - unow));
lock.unlock();
time_first->Exec();
time_node_cnt_--;
}
tmp = time_first;
time_first = time_first->Next();
time_node_cnt_--;
delete tmp;
} while (time_first != nullptr);
}
goto retry;
}
}

ThreadPool::Node* ThreadPool::CreateMissingNewerLinks(Node* head, int* cnt) {
ThreadPool::Node* ThreadPool::CreateMissingNewerLinks(Node* head) {
assert(head != nullptr);
Node* next = nullptr;
*cnt = 1;
while (true) {
next = head->link_older;
if (next == nullptr) {
return head;
}
++(*cnt);
next->link_newer = head;
head = next;
}
Expand Down

0 comments on commit 14f59b3

Please sign in to comment.