Skip to content

Commit

Permalink
Fix issue 18 : The delay closing feature of an incoming TCPConn will …
Browse files Browse the repository at this point in the history
…make memory leak. See #18
  • Loading branch information
zieckey committed Mar 24, 2017
1 parent 91fe913 commit 961add9
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 10 deletions.
10 changes: 9 additions & 1 deletion evpp/tcp_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ void TCPClient::Disconnect() {
loop_->RunInLoop(std::bind(&TCPClient::DisconnectInLoop, this));
}

void TCPClient::SetConnectionCallback(const ConnectionCallback& cb) {
conn_fn_ = cb;
auto c = conn();
if (c) {
c->SetConnectionCallback(cb);
}
}

void TCPClient::DisconnectInLoop() {
LOG_WARN << "TCPClient::DisconnectInLoop this=" << this << " remote_addr=" << remote_addr_;
assert(loop_->IsInLoopThread());
Expand Down Expand Up @@ -101,7 +109,7 @@ void TCPClient::OnConnection(int sockfd, const std::string& laddr) {
void TCPClient::OnRemoveConnection(const TCPConnPtr& c) {
assert(c.get() == conn_.get());
assert(loop_->IsInLoopThread());

conn_.reset();
if (auto_reconnect_.load()) {
Reconnect();
}
Expand Down
4 changes: 1 addition & 3 deletions evpp/tcp_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ class EVPP_EXPORT TCPClient {
// 1. Successfully establish a connection : TCPConn::IsConnected() == true
// 2. An exist connection broken down : TCPConn::IsDisconnecting() == true
// 3. Failed to establish a connection : TCPConn::IsDisconnected() == true and TCPConn::fd() == -1
void SetConnectionCallback(const ConnectionCallback& cb) {
conn_fn_ = cb;
}
void SetConnectionCallback(const ConnectionCallback& cb);

void SetMessageCallback(const MessageCallback& cb) {
msg_fn_ = cb;
Expand Down
27 changes: 21 additions & 6 deletions evpp/tcp_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "evpp/fd_channel.h"
#include "evpp/event_loop.h"
#include "evpp/sockets.h"
#include "evpp/invoke_timer.h"

namespace evpp {
TCPConn::TCPConn(EventLoop* l,
Expand All @@ -28,11 +29,11 @@ TCPConn::TCPConn(EventLoop* l,
chan_->SetWriteCallback(std::bind(&TCPConn::HandleWrite, this));
}

LOG_DEBUG << "TCPConn::[" << name_ << "] this=" << this << " channel=" << chan_.get() << " fd=" << sockfd;
LOG_DEBUG << "TCPConn::[" << name_ << "] this=" << this << " channel=" << chan_.get() << " fd=" << sockfd << " addr=" << Addr();
}

TCPConn::~TCPConn() {
LOG_TRACE << "TCPConn::~TCPConn() name=" << name() << " this=" << this << " channel=" << chan_.get() << " fd=" << fd_ << " type=" << int(type()) << " status=" << StatusToString();
LOG_TRACE << "TCPConn::~TCPConn() name=" << name() << " this=" << this << " channel=" << chan_.get() << " fd=" << fd_ << " type=" << int(type()) << " status=" << StatusToString() << " addr=" << Addr();;
assert(status_ == kDisconnected);

if (fd_ >= 0) {
Expand All @@ -42,10 +43,12 @@ TCPConn::~TCPConn() {
EVUTIL_CLOSESOCKET(fd_);
fd_ = INVALID_SOCKET;
}

assert(!delay_close_timer_.get());
}

void TCPConn::Close() {
LOG_INFO << "TCPConn::Close this=" << this << " fd=" << fd_ << " status=" << StatusToString() << " remote_addr=" << remote_addr_;
LOG_INFO << "TCPConn::Close this=" << this << " fd=" << fd_ << " status=" << StatusToString() << " addr=" << Addr();
auto c = shared_from_this();
auto f = [c]() {
assert(c->loop_->IsInLoopThread());
Expand Down Expand Up @@ -182,8 +185,8 @@ void TCPConn::HandleRead() {
// This is an incoming connection, we need to preserve the connection for a while so that we can reply to it.
// And we set a timer to close the connection eventually.
chan_->DisableReadEvent();
LOG_DEBUG << "TCPConn::HandleRead this=" << this << " channel (fd=" << chan_->fd() << ") DisableReadEvent";
loop_->RunAfter(close_delay_, std::bind(&TCPConn::HandleClose, shared_from_this())); // TODO leave it to user layer close.
LOG_DEBUG << "TCPConn::HandleRead this=" << this << " channel (fd=" << chan_->fd() << ") DisableReadEvent. And set a timer to delay close this TCPConn";
delay_close_timer_ = loop_->RunAfter(close_delay_, std::bind(&TCPConn::DelayClose, shared_from_this())); // TODO leave it to user layer close.
}
} else {
if (EVUTIL_ERR_RW_RETRIABLE(serrno)) {
Expand Down Expand Up @@ -221,8 +224,13 @@ void TCPConn::HandleWrite() {
}
}

void TCPConn::DelayClose() {
delay_close_timer_.reset();
HandleClose();
}

void TCPConn::HandleClose() {
LOG_INFO << "TCPConn::HandleClose this=" << this << " remote_addr=" << remote_addr_ << " fd=" << fd_ << " status_=" << StatusToString();
LOG_INFO << "TCPConn::HandleClose this=" << this << " addr=" << Addr() << " fd=" << fd_ << " status_=" << StatusToString();

// Avoid multi calling
if (status_ == kDisconnected) {
Expand All @@ -237,6 +245,12 @@ void TCPConn::HandleClose() {

TCPConnPtr conn(shared_from_this());

if (delay_close_timer_) {
LOG_INFO << "Cancel the delay closing timer.";
delay_close_timer_->Cancel();
delay_close_timer_.reset();
}

if (conn_fn_) {
// This callback must be invoked at status kDisconnecting
// e.g. when the TCPClient disconnects with remote server,
Expand All @@ -246,6 +260,7 @@ void TCPConn::HandleClose() {
}

close_fn_(conn);
LOG_INFO << "TCPConn::HandleClose exit, this=" << this << " addr=" << Addr() << " fd=" << fd_ << " status_=" << StatusToString() << " use_count=" << conn.use_count();
status_ = kDisconnected;
}

Expand Down
16 changes: 16 additions & 0 deletions evpp/tcp_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace evpp {
class EventLoop;
class FdChannel;
class TCPClient;
class InvokeTimer;

class EVPP_EXPORT TCPConn : public std::enable_shared_from_this<TCPConn> {
public:
Expand Down Expand Up @@ -46,6 +47,9 @@ class EVPP_EXPORT TCPConn : public std::enable_shared_from_this<TCPConn> {
EventLoop* loop() const {
return loop_;
}
int fd() const {
return fd_;
}
void set_context(const Any& c) {
context_[0] = c;
}
Expand Down Expand Up @@ -82,6 +86,9 @@ class EVPP_EXPORT TCPConn : public std::enable_shared_from_this<TCPConn> {
Type type() const {
return type_;
}
bool IsIncommingConn() const {
return type_ == kIncoming;
}
Status status() const {
return status_;
}
Expand Down Expand Up @@ -122,10 +129,18 @@ class EVPP_EXPORT TCPConn : public std::enable_shared_from_this<TCPConn> {
void HandleRead();
void HandleWrite();
void HandleClose();
void DelayClose();
void SendInLoop(const Slice& message);
void SendInLoop(const void* data, size_t len);
void SendStringInLoop(const std::string& message);
std::string StatusToString() const;
std::string Addr() const {
if (IsIncommingConn()) {
return "(" + remote_addr_ + "->" + local_addr_ + "(local))";
} else {
return "(" + local_addr_ + "(local)->" + remote_addr_ + ")";
}
}
private:
EventLoop* loop_;
int fd_;
Expand All @@ -145,6 +160,7 @@ class EVPP_EXPORT TCPConn : public std::enable_shared_from_this<TCPConn> {
// The delay time to close a incoming connection which has been shutdown by peer normally.
// Default is 3 second.
Duration close_delay_;
std::shared_ptr<InvokeTimer> delay_close_timer_; // The timer to delay close this TCPConn

ConnectionCallback conn_fn_; // This will be called to the user application layer
MessageCallback msg_fn_; // This will be called to the user application layer
Expand Down
1 change: 1 addition & 0 deletions evpp/tcp_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ EventLoop* TCPServer::GetNextLoop(const struct sockaddr_in* raddr) {
void TCPServer::RemoveConnection(const TCPConnPtr& conn) {
auto f = [ = ]() {
// Remove the connection in the listening EventLoop
LOG_INFO << "TCPServer::RemoveConnection conn=" << conn.get() << " fd="<< conn->fd();
assert(this->loop_->IsInLoopThread());
this->connections_.erase(conn->name());
};
Expand Down

0 comments on commit 961add9

Please sign in to comment.