Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: init every class member's value to defeat the redis-cli connection closed by pika #1390

Merged
merged 16 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class PikaClientConn : public net::RedisConn {
net::ServerThread* const server_thread_;
std::string current_table_;
WriteCompleteCallback write_completed_cb_;
bool is_pubsub_;
bool is_pubsub_ = false;

std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
std::shared_ptr<std::string> resp_ptr);
Expand Down
8 changes: 4 additions & 4 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,17 +471,17 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
void LogCommand() const;

std::string name_;
int arity_;
uint16_t flag_;
int arity_ = -2;
uint16_t flag_ = 0;

CmdRes res_;
PikaCmdArgsType argv_;
std::string table_name_;

std::weak_ptr<net::NetConn> conn_;
std::weak_ptr<std::string> resp_;
CmdStage stage_;
uint64_t do_duration_;
CmdStage stage_ = kNone;
uint64_t do_duration_ = 0;

private:
virtual void DoInitial() = 0;
Expand Down
3 changes: 1 addition & 2 deletions include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ class PikaDispatchThread {
explicit ClientConnFactory(int max_conn_rbuf_size) : max_conn_rbuf_size_(max_conn_rbuf_size) {}
virtual std::shared_ptr<net::NetConn> NewNetConn(int connfd, const std::string& ip_port, net::Thread* server_thread,
void* worker_specific_data, net::NetMultiplexer* net) const {
return std::static_pointer_cast<net::NetConn>(std::make_shared<PikaClientConn>(
connfd, ip_port, server_thread, net, net::HandleType::kAsynchronous, max_conn_rbuf_size_));
return std::make_shared<PikaClientConn>(connfd, ip_port, server_thread, net, net::HandleType::kAsynchronous, max_conn_rbuf_size_);
}

private:
Expand Down
2 changes: 1 addition & 1 deletion include/pika_version.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@

#define PIKA_MAJOR 3
#define PIKA_MINOR 4
#define PIKA_PATCH 1
#define PIKA_PATCH 2

#endif // INCLUDE_PIKA_VERSION_H_
26 changes: 17 additions & 9 deletions src/net/include/net_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define NET_INCLUDE_NET_CONN_H_

#include <sys/time.h>
#include <sstream>
#include <string>

#ifdef __ENABLE_SSL
Expand All @@ -17,6 +18,7 @@
#include "net/include/net_define.h"
#include "net/include/server_thread.h"
#include "net/src/net_multiplexer.h"
#include "pstd/include/testutil.h"

namespace net {

Expand All @@ -28,7 +30,7 @@ class NetConn : public std::enable_shared_from_this<NetConn> {
virtual ~NetConn();

/*
* Set the fd to nonblock && set the flag_ the the fd flag
* Set the fd to nonblock && set the flag_ the fd flag
*/
bool SetNonblock();

Expand Down Expand Up @@ -64,7 +66,7 @@ class NetConn : public std::enable_shared_from_this<NetConn> {
void set_name(std::string name) { name_ = std::move(name); }

bool IsClose() { return close_; }
void SetClose(bool close) { close_ = close; }
void SetClose(bool close);

void set_last_interaction(const struct timeval& now) { last_interaction_ = now; }

Expand All @@ -76,30 +78,36 @@ class NetConn : public std::enable_shared_from_this<NetConn> {

NetMultiplexer* net_multiplexer() const { return net_multiplexer_; }

std::string String() const {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ToString

std::stringstream ss;
ss << "fd: " << fd_ << ", ip_port: " << ip_port_ << ", name: " << name_ << ", is_reply: " << is_reply_ << ", close: " << close_;
return ss.str();
}

#ifdef __ENABLE_SSL
SSL* ssl() { return ssl_; }

bool security() { return ssl_ != nullptr; }
#endif

private:
int fd_;
int fd_ = -1;
std::string ip_port_;
bool is_reply_;
bool is_writable_;
bool close_;
bool is_reply_ = false;
bool is_writable_ = false;
bool close_ = false;
struct timeval last_interaction_;
int flags_;
int flags_ = 0;
std::string name_;

#ifdef __ENABLE_SSL
SSL* ssl_;
#endif

// thread this conn belong to
Thread* thread_;
Thread* thread_ = nullptr;
// the net epoll this conn belong to
NetMultiplexer* net_multiplexer_;
NetMultiplexer* net_multiplexer_ = nullptr;

/*
* No allowed copy and copy assign operator
Expand Down
18 changes: 9 additions & 9 deletions src/net/include/redis_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,21 @@ class RedisConn : public NetConn {
static int ParserCompleteCb(RedisParser* parser, const std::vector<RedisCmdArgsType>& argvs);
ReadStatus ParseRedisParserStatus(RedisParserStatus status);

HandleType handle_type_;
HandleType handle_type_ = kSynchronous;

char* rbuf_;
int rbuf_len_;
int rbuf_max_len_;
int msg_peak_;
int command_len_;
char* rbuf_ = nullptr;
int rbuf_len_ = 0;
int rbuf_max_len_ = 0;
int msg_peak_ = 0;
int command_len_ = 0;

uint32_t wbuf_pos_;
uint32_t wbuf_pos_ = 0;
std::string response_;

// For Redis Protocol parser
int last_read_pos_;
int last_read_pos_ = -1;
RedisParser redis_parser_;
long bulk_len_;
long bulk_len_ = -1;
};

} // namespace net
Expand Down
6 changes: 3 additions & 3 deletions src/net/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,14 @@ class ServerThread : public Thread {
friend class DispatchThread;
friend class WorkerThread;

int cron_interval_;
int cron_interval_ = 0;
virtual void DoCronTask();

// process events in notify_queue
virtual void ProcessNotifyEvents(const NetFiredEvent* pfe);

const ServerHandle* handle_;
bool own_handle_;
bool own_handle_ = false;

#ifdef __ENABLE_SSL
bool security_;
Expand All @@ -186,7 +186,7 @@ class ServerThread : public Thread {
/*
* The tcp server port and address
*/
int port_;
int port_ = -1;
std::set<std::string> ips_;
std::vector<ServerSocket*> server_sockets_;
std::set<int32_t> server_fds_;
Expand Down
4 changes: 3 additions & 1 deletion src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

#include <vector>

#include "net/src/dispatch_thread.h"
#include <glog/logging.h>

#include "net/src/dispatch_thread.h"
#include "net/src/net_item.h"
#include "net/src/net_multiplexer.h"
#include "net/src/worker_thread.h"
Expand Down Expand Up @@ -150,6 +151,7 @@ void DispatchThread::HandleNewConn(const int connfd, const std::string& ip_port)
// Slow workers may consume many fds.
// We simply loop to find next legal worker.
NetItem ti(connfd, ip_port);
LOG(INFO) << "accept new conn " << ti.String();
int next_thread = last_thread_;
bool find = false;
for (int cnt = 0; cnt < work_num_; cnt++) {
Expand Down
3 changes: 3 additions & 0 deletions src/net/src/holy_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ void HolyThread::DoCronTask() {
}
conns_.clear();
deleting_conn_ipport_.clear();
for (const auto conn : to_close) {
CloseFd(conn);
}
return;
}

Expand Down
6 changes: 3 additions & 3 deletions src/net/src/holy_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class NetConn;

class HolyThread : public ServerThread {
public:
// This type thread thread will listen and work self list redis thread
// This type thread will listen and work self list redis thread
HolyThread(int port, ConnFactory* conn_factory, int cron_interval = 0, const ServerHandle* handle = nullptr,
bool async = true);
HolyThread(const std::string& bind_ip, int port, ConnFactory* conn_factory, int cron_interval = 0,
Expand Down Expand Up @@ -57,8 +57,8 @@ class HolyThread : public ServerThread {
mutable pstd::RWMutex rwlock_; /* For external statistics */
std::map<int, std::shared_ptr<NetConn>> conns_;

ConnFactory* conn_factory_;
void* private_data_;
ConnFactory* conn_factory_ = nullptr;
void* private_data_ = nullptr;

std::atomic<int> keepalive_timeout_; // keepalive second
bool async_;
Expand Down
4 changes: 4 additions & 0 deletions src/net/src/net_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ NetConn::~NetConn() {
#endif
}

void NetConn::SetClose(bool close) {
close_ = close;
}

bool NetConn::SetNonblock() {
flags_ = Setnonblocking(fd());
if (flags_ == -1) {
Expand Down
7 changes: 5 additions & 2 deletions src/net/src/net_item.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ class NetItem {

int fd() const { return fd_; }
std::string ip_port() const { return ip_port_; }
std::string String() const {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ToString

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ToString

我这个是借鉴 Go 的实现,ToString() 是 Java 的吗?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

额接口名字而已,String也行,别的库基本都是ToString ToStringView to_string to_stringview 这几类

return std::to_string(fd_) + ":" + ip_port_ + ":" + std::to_string(notify_type_);
}

NotifyType notify_type() const { return notify_type_; }

private:
int fd_;
int fd_ = -1;
std::string ip_port_;
NotifyType notify_type_;
NotifyType notify_type_ = kNotiConnect;
};

} // namespace net
Expand Down
2 changes: 1 addition & 1 deletion src/net/src/redis_cli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ int redisvFormatCommand(std::string* cmd, const char* format, va_list ap) {
memcpy(_format, c, _l);
_format[_l] = '\0';

int n = vsnprintf(buf, REDIS_MAX_MESSAGE, _format, _cpy);
int n = vsnprintf(buf, sizeof(buf), _format, _cpy);
curarg.append(buf, n);

/* Update current position (note: outer blocks
Expand Down
4 changes: 2 additions & 2 deletions src/net/src/redis_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ ReadStatus RedisConn::GetRequest() {

nread = read(fd(), rbuf_ + next_read_pos, remain);
if (nread == -1) {
if (errno == EAGAIN) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
nread = 0;
return kReadHalf; // HALF
} else {
Expand Down Expand Up @@ -150,7 +150,7 @@ WriteStatus RedisConn::SendReply() {
}
}
if (nwritten == -1) {
if (errno == EAGAIN) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return kWriteHalf;
} else {
// Here we should close the connection
Expand Down
1 change: 1 addition & 0 deletions src/net/src/server_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "net/src/server_socket.h"
#include "pstd/include/xdebug.h"
#include "pstd/include/testutil.h"

namespace net {

Expand Down
7 changes: 7 additions & 0 deletions src/net/src/worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

#include <vector>

#include <glog/logging.h>

#include "pstd/include/testutil.h"
#include "net/src/worker_thread.h"

#include "net/include/net_conn.h"
Expand Down Expand Up @@ -49,6 +52,7 @@ std::shared_ptr<NetConn> WorkerThread::MoveConnOut(int fd) {
int fd = iter->first;
conn = iter->second;
net_multiplexer_->NetDelEvent(fd, 0);
DLOG(INFO) << "move out connection " << conn->String();
conns_.erase(iter);
}
return conn;
Expand Down Expand Up @@ -171,6 +175,7 @@ void* WorkerThread::ThreadMain() {
in_conn->set_is_reply(false);
if (in_conn->IsClose()) {
should_close = 1;
LOG(INFO) << "will close client connection " << in_conn->String();
}
} else if (write_status == kWriteHalf) {
continue;
Expand Down Expand Up @@ -238,13 +243,15 @@ void WorkerThread::DoCronTask() {
to_close.push_back(conn);
deleting_conn_ipport_.erase(conn->ip_port());
iter = conns_.erase(iter);
LOG(INFO) << "will close client connection " << conn->String();
continue;
}

// Check keepalive timeout connection
if (keepalive_timeout_ > 0 && (now.tv_sec - conn->last_interaction().tv_sec > keepalive_timeout_)) {
to_timeout.push_back(conn);
iter = conns_.erase(iter);
LOG(INFO) << "connection " << conn->String() << " keepalive timeout, the keepalive_timeout_ is " << keepalive_timeout_.load();
continue;
}

Expand Down
8 changes: 4 additions & 4 deletions src/net/src/worker_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ class WorkerThread : public Thread {
mutable pstd::RWMutex rwlock_; /* For external statistics */
std::map<int, std::shared_ptr<NetConn>> conns_;

void* private_data_;
void* private_data_ = nullptr;

private:
ServerThread* server_thread_;
ConnFactory* conn_factory_;
int cron_interval_;
ServerThread* server_thread_ = nullptr;
ConnFactory* conn_factory_ = nullptr;
int cron_interval_ = 0;

/*
* The epoll handler
Expand Down
3 changes: 3 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

#include <algorithm>

#include <glog/logging.h>

#include "include/build_version.h"
#include "include/pika_conf.h"
#include "include/pika_rm.h"
Expand Down Expand Up @@ -2289,6 +2291,7 @@ void QuitCmd::DoInitial() {

void QuitCmd::Do(std::shared_ptr<Partition> partition) {
res_.SetRes(CmdRes::kOk);
LOG(INFO) << "QutCmd will close connection " << GetConn()->String();
GetConn()->SetClose(true);
}

Expand Down
3 changes: 2 additions & 1 deletion src/pika_dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "include/pika_conf.h"
#include "include/pika_server.h"
#include "pstd/include/testutil.h"

extern PikaConf* g_pika_conf;
extern PikaServer* g_pika_server;
Expand Down Expand Up @@ -56,7 +57,7 @@ bool PikaDispatchThread::Handles::AccessHandle(std::string& ip) const {
return false;
}

DLOG(INFO) << "new clinet comming, ip: " << ip;
DLOG(INFO) << "new client comming, ip: " << ip;
g_pika_server->incr_accumulative_connections();
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/pika_monitor_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ net::WriteStatus PikaMonitorThread::SendMessage(int32_t fd, std::string& message
ssize_t nwritten = 0, message_len_sended = 0, message_len_left = message.size();
while (message_len_left > 0) {
nwritten = write(fd, message.data() + message_len_sended, message_len_left);
if (nwritten == -1 && errno == EAGAIN) {
if (nwritten == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// If the write buffer is full, but the client no longer consumes, it will
// get stuck in the loop and cause the entire Pika to block becase of monitor_mutex_protector_.
// So we put a limit on the number of retries
Expand Down
Loading