diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index 8eecf714cd..47f9a117cd 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -66,7 +66,7 @@ class PikaClientConn : public net::RedisConn { net::ServerThread* const server_thread_; std::string current_table_; WriteCompleteCallback write_completed_cb_; - bool is_pubsub_; + bool is_pubsub_ = false; std::shared_ptr DoCmd(const PikaCmdArgsType& argv, const std::string& opt, std::shared_ptr resp_ptr); diff --git a/include/pika_command.h b/include/pika_command.h index e4e4c0ff8a..5dedc21ac7 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -471,8 +471,8 @@ class Cmd : public std::enable_shared_from_this { void LogCommand() const; std::string name_; - int arity_; - uint16_t flag_; + int arity_ = -2; + uint16_t flag_ = 0; CmdRes res_; PikaCmdArgsType argv_; @@ -480,8 +480,8 @@ class Cmd : public std::enable_shared_from_this { std::weak_ptr conn_; std::weak_ptr resp_; - CmdStage stage_; - uint64_t do_duration_; + CmdStage stage_ = kNone; + uint64_t do_duration_ = 0; private: virtual void DoInitial() = 0; diff --git a/include/pika_dispatch_thread.h b/include/pika_dispatch_thread.h index 1e427fb221..e13e16a22f 100644 --- a/include/pika_dispatch_thread.h +++ b/include/pika_dispatch_thread.h @@ -28,8 +28,7 @@ class PikaDispatchThread { explicit ClientConnFactory(int max_conn_rbuf_size) : max_conn_rbuf_size_(max_conn_rbuf_size) {} virtual std::shared_ptr NewNetConn(int connfd, const std::string& ip_port, net::Thread* server_thread, void* worker_specific_data, net::NetMultiplexer* net) const { - return std::static_pointer_cast(std::make_shared( - connfd, ip_port, server_thread, net, net::HandleType::kAsynchronous, max_conn_rbuf_size_)); + return std::make_shared(connfd, ip_port, server_thread, net, net::HandleType::kAsynchronous, max_conn_rbuf_size_); } private: diff --git a/include/pika_version.h b/include/pika_version.h index 3788ad0142..73b422bbfc 100644 --- a/include/pika_version.h +++ b/include/pika_version.h @@ -8,6 +8,6 @@ #define PIKA_MAJOR 3 #define PIKA_MINOR 4 -#define PIKA_PATCH 1 +#define PIKA_PATCH 2 #endif // INCLUDE_PIKA_VERSION_H_ diff --git a/src/net/include/net_conn.h b/src/net/include/net_conn.h index 9bb2dc1c7f..323ac4cdcf 100644 --- a/src/net/include/net_conn.h +++ b/src/net/include/net_conn.h @@ -7,6 +7,7 @@ #define NET_INCLUDE_NET_CONN_H_ #include +#include #include #ifdef __ENABLE_SSL @@ -17,6 +18,7 @@ #include "net/include/net_define.h" #include "net/include/server_thread.h" #include "net/src/net_multiplexer.h" +#include "pstd/include/testutil.h" namespace net { @@ -28,7 +30,7 @@ class NetConn : public std::enable_shared_from_this { virtual ~NetConn(); /* - * Set the fd to nonblock && set the flag_ the the fd flag + * Set the fd to nonblock && set the flag_ the fd flag */ bool SetNonblock(); @@ -64,7 +66,7 @@ class NetConn : public std::enable_shared_from_this { void set_name(std::string name) { name_ = std::move(name); } bool IsClose() { return close_; } - void SetClose(bool close) { close_ = close; } + void SetClose(bool close); void set_last_interaction(const struct timeval& now) { last_interaction_ = now; } @@ -76,6 +78,12 @@ class NetConn : public std::enable_shared_from_this { NetMultiplexer* net_multiplexer() const { return net_multiplexer_; } + std::string String() const { + std::stringstream ss; + ss << "fd: " << fd_ << ", ip_port: " << ip_port_ << ", name: " << name_ << ", is_reply: " << is_reply_ << ", close: " << close_; + return ss.str(); + } + #ifdef __ENABLE_SSL SSL* ssl() { return ssl_; } @@ -83,13 +91,13 @@ class NetConn : public std::enable_shared_from_this { #endif private: - int fd_; + int fd_ = -1; std::string ip_port_; - bool is_reply_; - bool is_writable_; - bool close_; + bool is_reply_ = false; + bool is_writable_ = false; + bool close_ = false; struct timeval last_interaction_; - int flags_; + int flags_ = 0; std::string name_; #ifdef __ENABLE_SSL @@ -97,9 +105,9 @@ class NetConn : public std::enable_shared_from_this { #endif // thread this conn belong to - Thread* thread_; + Thread* thread_ = nullptr; // the net epoll this conn belong to - NetMultiplexer* net_multiplexer_; + NetMultiplexer* net_multiplexer_ = nullptr; /* * No allowed copy and copy assign operator diff --git a/src/net/include/redis_conn.h b/src/net/include/redis_conn.h index 1d2b370e95..e384f5c76e 100644 --- a/src/net/include/redis_conn.h +++ b/src/net/include/redis_conn.h @@ -45,21 +45,21 @@ class RedisConn : public NetConn { static int ParserCompleteCb(RedisParser* parser, const std::vector& argvs); ReadStatus ParseRedisParserStatus(RedisParserStatus status); - HandleType handle_type_; + HandleType handle_type_ = kSynchronous; - char* rbuf_; - int rbuf_len_; - int rbuf_max_len_; - int msg_peak_; - int command_len_; + char* rbuf_ = nullptr; + int rbuf_len_ = 0; + int rbuf_max_len_ = 0; + int msg_peak_ = 0; + int command_len_ = 0; - uint32_t wbuf_pos_; + uint32_t wbuf_pos_ = 0; std::string response_; // For Redis Protocol parser - int last_read_pos_; + int last_read_pos_ = -1; RedisParser redis_parser_; - long bulk_len_; + long bulk_len_ = -1; }; } // namespace net diff --git a/src/net/include/server_thread.h b/src/net/include/server_thread.h index 3c44880211..387db032fb 100644 --- a/src/net/include/server_thread.h +++ b/src/net/include/server_thread.h @@ -169,14 +169,14 @@ class ServerThread : public Thread { friend class DispatchThread; friend class WorkerThread; - int cron_interval_; + int cron_interval_ = 0; virtual void DoCronTask(); // process events in notify_queue virtual void ProcessNotifyEvents(const NetFiredEvent* pfe); const ServerHandle* handle_; - bool own_handle_; + bool own_handle_ = false; #ifdef __ENABLE_SSL bool security_; @@ -186,7 +186,7 @@ class ServerThread : public Thread { /* * The tcp server port and address */ - int port_; + int port_ = -1; std::set ips_; std::vector server_sockets_; std::set server_fds_; diff --git a/src/net/src/dispatch_thread.cc b/src/net/src/dispatch_thread.cc index d5ab3e26da..44052bef7f 100644 --- a/src/net/src/dispatch_thread.cc +++ b/src/net/src/dispatch_thread.cc @@ -5,8 +5,9 @@ #include -#include "net/src/dispatch_thread.h" +#include +#include "net/src/dispatch_thread.h" #include "net/src/net_item.h" #include "net/src/net_multiplexer.h" #include "net/src/worker_thread.h" @@ -150,6 +151,7 @@ void DispatchThread::HandleNewConn(const int connfd, const std::string& ip_port) // Slow workers may consume many fds. // We simply loop to find next legal worker. NetItem ti(connfd, ip_port); + LOG(INFO) << "accept new conn " << ti.String(); int next_thread = last_thread_; bool find = false; for (int cnt = 0; cnt < work_num_; cnt++) { diff --git a/src/net/src/holy_thread.cc b/src/net/src/holy_thread.cc index 97d82fd36c..85a2069ea0 100644 --- a/src/net/src/holy_thread.cc +++ b/src/net/src/holy_thread.cc @@ -194,6 +194,9 @@ void HolyThread::DoCronTask() { } conns_.clear(); deleting_conn_ipport_.clear(); + for (const auto conn : to_close) { + CloseFd(conn); + } return; } diff --git a/src/net/src/holy_thread.h b/src/net/src/holy_thread.h index 25ed65118a..2cdbc36ab2 100644 --- a/src/net/src/holy_thread.h +++ b/src/net/src/holy_thread.h @@ -22,7 +22,7 @@ class NetConn; class HolyThread : public ServerThread { public: - // This type thread thread will listen and work self list redis thread + // This type thread will listen and work self list redis thread HolyThread(int port, ConnFactory* conn_factory, int cron_interval = 0, const ServerHandle* handle = nullptr, bool async = true); HolyThread(const std::string& bind_ip, int port, ConnFactory* conn_factory, int cron_interval = 0, @@ -57,8 +57,8 @@ class HolyThread : public ServerThread { mutable pstd::RWMutex rwlock_; /* For external statistics */ std::map> conns_; - ConnFactory* conn_factory_; - void* private_data_; + ConnFactory* conn_factory_ = nullptr; + void* private_data_ = nullptr; std::atomic keepalive_timeout_; // keepalive second bool async_; diff --git a/src/net/src/net_conn.cc b/src/net/src/net_conn.cc index 160965bd04..f02d4fb7d5 100644 --- a/src/net/src/net_conn.cc +++ b/src/net/src/net_conn.cc @@ -32,6 +32,10 @@ NetConn::~NetConn() { #endif } +void NetConn::SetClose(bool close) { + close_ = close; +} + bool NetConn::SetNonblock() { flags_ = Setnonblocking(fd()); if (flags_ == -1) { diff --git a/src/net/src/net_item.h b/src/net/src/net_item.h index 9452d9d441..0cfc1f5a6a 100644 --- a/src/net/src/net_item.h +++ b/src/net/src/net_item.h @@ -20,13 +20,16 @@ class NetItem { int fd() const { return fd_; } std::string ip_port() const { return ip_port_; } + std::string String() const { + return std::to_string(fd_) + ":" + ip_port_ + ":" + std::to_string(notify_type_); + } NotifyType notify_type() const { return notify_type_; } private: - int fd_; + int fd_ = -1; std::string ip_port_; - NotifyType notify_type_; + NotifyType notify_type_ = kNotiConnect; }; } // namespace net diff --git a/src/net/src/redis_cli.cc b/src/net/src/redis_cli.cc index 604c54c3a4..672c70e739 100644 --- a/src/net/src/redis_cli.cc +++ b/src/net/src/redis_cli.cc @@ -521,7 +521,7 @@ int redisvFormatCommand(std::string* cmd, const char* format, va_list ap) { memcpy(_format, c, _l); _format[_l] = '\0'; - int n = vsnprintf(buf, REDIS_MAX_MESSAGE, _format, _cpy); + int n = vsnprintf(buf, sizeof(buf), _format, _cpy); curarg.append(buf, n); /* Update current position (note: outer blocks diff --git a/src/net/src/redis_conn.cc b/src/net/src/redis_conn.cc index db270f60f7..6504d35c4a 100644 --- a/src/net/src/redis_conn.cc +++ b/src/net/src/redis_conn.cc @@ -91,7 +91,7 @@ ReadStatus RedisConn::GetRequest() { nread = read(fd(), rbuf_ + next_read_pos, remain); if (nread == -1) { - if (errno == EAGAIN) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { nread = 0; return kReadHalf; // HALF } else { @@ -150,7 +150,7 @@ WriteStatus RedisConn::SendReply() { } } if (nwritten == -1) { - if (errno == EAGAIN) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { return kWriteHalf; } else { // Here we should close the connection diff --git a/src/net/src/server_thread.cc b/src/net/src/server_thread.cc index 0445d4723d..7c34b38fd3 100644 --- a/src/net/src/server_thread.cc +++ b/src/net/src/server_thread.cc @@ -14,6 +14,7 @@ #include "net/src/server_socket.h" #include "pstd/include/xdebug.h" +#include "pstd/include/testutil.h" namespace net { diff --git a/src/net/src/worker_thread.cc b/src/net/src/worker_thread.cc index 0234520320..f089f367d5 100644 --- a/src/net/src/worker_thread.cc +++ b/src/net/src/worker_thread.cc @@ -5,6 +5,9 @@ #include +#include + +#include "pstd/include/testutil.h" #include "net/src/worker_thread.h" #include "net/include/net_conn.h" @@ -49,6 +52,7 @@ std::shared_ptr WorkerThread::MoveConnOut(int fd) { int fd = iter->first; conn = iter->second; net_multiplexer_->NetDelEvent(fd, 0); + DLOG(INFO) << "move out connection " << conn->String(); conns_.erase(iter); } return conn; @@ -171,6 +175,7 @@ void* WorkerThread::ThreadMain() { in_conn->set_is_reply(false); if (in_conn->IsClose()) { should_close = 1; + LOG(INFO) << "will close client connection " << in_conn->String(); } } else if (write_status == kWriteHalf) { continue; @@ -238,6 +243,7 @@ void WorkerThread::DoCronTask() { to_close.push_back(conn); deleting_conn_ipport_.erase(conn->ip_port()); iter = conns_.erase(iter); + LOG(INFO) << "will close client connection " << conn->String(); continue; } @@ -245,6 +251,7 @@ void WorkerThread::DoCronTask() { if (keepalive_timeout_ > 0 && (now.tv_sec - conn->last_interaction().tv_sec > keepalive_timeout_)) { to_timeout.push_back(conn); iter = conns_.erase(iter); + LOG(INFO) << "connection " << conn->String() << " keepalive timeout, the keepalive_timeout_ is " << keepalive_timeout_.load(); continue; } diff --git a/src/net/src/worker_thread.h b/src/net/src/worker_thread.h index 3583834dad..c2d8f245a0 100644 --- a/src/net/src/worker_thread.h +++ b/src/net/src/worker_thread.h @@ -52,12 +52,12 @@ class WorkerThread : public Thread { mutable pstd::RWMutex rwlock_; /* For external statistics */ std::map> conns_; - void* private_data_; + void* private_data_ = nullptr; private: - ServerThread* server_thread_; - ConnFactory* conn_factory_; - int cron_interval_; + ServerThread* server_thread_ = nullptr; + ConnFactory* conn_factory_ = nullptr; + int cron_interval_ = 0; /* * The epoll handler diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 9b7a210505..beea5c92f6 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -10,6 +10,8 @@ #include +#include + #include "include/build_version.h" #include "include/pika_conf.h" #include "include/pika_rm.h" @@ -2289,6 +2291,7 @@ void QuitCmd::DoInitial() { void QuitCmd::Do(std::shared_ptr partition) { res_.SetRes(CmdRes::kOk); + LOG(INFO) << "QutCmd will close connection " << GetConn()->String(); GetConn()->SetClose(true); } diff --git a/src/pika_dispatch_thread.cc b/src/pika_dispatch_thread.cc index 93007e9be5..947c5a63c5 100644 --- a/src/pika_dispatch_thread.cc +++ b/src/pika_dispatch_thread.cc @@ -9,6 +9,7 @@ #include "include/pika_conf.h" #include "include/pika_server.h" +#include "pstd/include/testutil.h" extern PikaConf* g_pika_conf; extern PikaServer* g_pika_server; @@ -56,7 +57,7 @@ bool PikaDispatchThread::Handles::AccessHandle(std::string& ip) const { return false; } - DLOG(INFO) << "new clinet comming, ip: " << ip; + DLOG(INFO) << "new client comming, ip: " << ip; g_pika_server->incr_accumulative_connections(); return true; } diff --git a/src/pika_monitor_thread.cc b/src/pika_monitor_thread.cc index e2d46106d9..11291cedcb 100644 --- a/src/pika_monitor_thread.cc +++ b/src/pika_monitor_thread.cc @@ -110,7 +110,7 @@ net::WriteStatus PikaMonitorThread::SendMessage(int32_t fd, std::string& message ssize_t nwritten = 0, message_len_sended = 0, message_len_left = message.size(); while (message_len_left > 0) { nwritten = write(fd, message.data() + message_len_sended, message_len_left); - if (nwritten == -1 && errno == EAGAIN) { + if (nwritten == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // If the write buffer is full, but the client no longer consumes, it will // get stuck in the loop and cause the entire Pika to block becase of monitor_mutex_protector_. // So we put a limit on the number of retries diff --git a/src/pstd/CMakeLists.txt b/src/pstd/CMakeLists.txt index 67fcf2fbd2..f6e915a1e2 100644 --- a/src/pstd/CMakeLists.txt +++ b/src/pstd/CMakeLists.txt @@ -28,7 +28,7 @@ add_subdirectory(examples) aux_source_directory(./src DIR_SRCS) -add_library(pstd STATIC ${DIR_SRCS} ) +add_library(pstd STATIC ${DIR_SRCS}) target_include_directories(pstd PUBLIC ${PROJECT_SOURCE_DIR}/.. diff --git a/src/pstd/include/pstd_hash.h b/src/pstd/include/pstd_hash.h index 564ffac3ab..7a23a4de19 100644 --- a/src/pstd/include/pstd_hash.h +++ b/src/pstd/include/pstd_hash.h @@ -83,6 +83,6 @@ namespace pstd { std::string md5(const std::string& str, bool raw = false); std::string sha256(const std::string& input, bool raw = false); -} // namespace pstd +} // namespace pstd #endif // __PSTD_HASH_H__ diff --git a/src/pstd/include/testutil.h b/src/pstd/include/testutil.h index 4ebbf8e048..f5a5d84950 100644 --- a/src/pstd/include/testutil.h +++ b/src/pstd/include/testutil.h @@ -2,12 +2,19 @@ #define __PSTD_INCLUDE_TESTUTIL_H__ #include +#include namespace pstd { -extern std::string RandomString(const int len); -extern int RandomSeed(); +extern char* get_date_time(); extern int GetTestDirectory(std::string* result); +extern void current_time_str(char * str, size_t max_len); + +#define output(fmt, args...) do { \ + char __time_str__[1024];\ + current_time_str(__time_str__, sizeof(__time_str__)); \ + printf("[%s %s %d]" fmt "\n", __time_str__, __FILE_NAME__, __LINE__, ##args); \ + } while (0) }; // namespace pstd diff --git a/src/pstd/include/xdebug.h b/src/pstd/include/xdebug.h index 39dead6c25..12782c230c 100644 --- a/src/pstd/include/xdebug.h +++ b/src/pstd/include/xdebug.h @@ -44,7 +44,7 @@ # define clean_errno() (errno == 0 ? "None" : strerror(errno)) # define log_err(M, ...) \ { \ - fprintf(stderr, "[ERROR] (%s:%d: errno: %s) " M "\n", __FILE__, __LINE__, clean_errno(), ##__VA_ARGS__); \ + fprintf(stderr, "[ERROR] (%s:%d %s errno: %s) " M "\n", __FILE__, __LINE__, get_date_time().c_str(), clean_errno(), ##__VA_ARGS__); \ exit(-1); \ } # define log_warn(M, ...) \ diff --git a/src/pstd/src/testutil.cc b/src/pstd/src/testutil.cc index 34064d2152..df2f6b90cb 100644 --- a/src/pstd/src/testutil.cc +++ b/src/pstd/src/testutil.cc @@ -1,18 +1,30 @@ #include "pstd/include/testutil.h" +#include +#include + #include -#include "pstd/include/env.h" #include "pstd/include/random.h" namespace pstd { -std::string RandomString(const int len) { - char buf[len]; - for (int i = 0; i < len; i++) { - buf[i] = Random::Uniform('z' - 'a') + 'a'; - } - return std::string(buf, len); +void current_time_str(char * str, size_t max_len) +{ + struct timeval tv; + struct tm tmm; + + gettimeofday(&tv, 0); + + localtime_r(&(tv.tv_sec), &tmm); + snprintf(str, max_len, "%04d-%02d-%02dT%02d:%02d:%02d.%06d", + tmm.tm_year + 1900, + tmm.tm_mon+1, + tmm.tm_mday, + tmm.tm_hour, + tmm.tm_min, + tmm.tm_sec, + tv.tv_usec); } int GetTestDirectory(std::string* result) { @@ -27,13 +39,4 @@ int GetTestDirectory(std::string* result) { return 0; } -int RandomSeed() { - const char* env = getenv("TEST_RANDOM_SEED"); - int result = (env != NULL ? atoi(env) : 301); - if (result <= 0) { - result = 301; - } - return result; -} - } // namespace pstd