Skip to content

Commit

Permalink
Refactor : rename ConnPtr to NSQConnPtr
Browse files Browse the repository at this point in the history
  • Loading branch information
zieckey committed Apr 26, 2017
1 parent 53b74f7 commit b3bdab9
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 22 deletions.
24 changes: 15 additions & 9 deletions apps/evnsq/client.cc
Expand Up @@ -16,12 +16,16 @@ static const std::string kNSQMagic = " V2";
static const std::string kOK = "OK";

Client::Client(evpp::EventLoop* l, Type t, const Option& ops)
: loop_(l), type_(t), option_(ops), closing_(false) {}
: loop_(l), type_(t), option_(ops), closing_(false) {
DLOG_TRACE;
}

Client::~Client() {}
Client::~Client() {
DLOG_TRACE;
}

void Client::ConnectToNSQD(const std::string& addr) {
auto c = ConnPtr(new NSQConn(this, option_));
auto c = NSQConnPtr(new NSQConn(this, option_));
connecting_conns_[addr] = c;
c->SetMessageCallback(msg_fn_);
c->SetConnectionCallback(std::bind(&Client::OnConnection, this, std::placeholders::_1));
Expand Down Expand Up @@ -64,18 +68,18 @@ void Client::ConnectToLookupds(const std::string& lookupd_urls/*http://192.168.0
}

void Client::Close() {
LOG_INFO << "Client::Close this=" << this << " conns_.size=" << conns_.size() << " connecting_conns_.size=" << connecting_conns_.size();
DLOG_TRACE << "conns_.size=" << conns_.size() << " connecting_conns_.size=" << connecting_conns_.size();
closing_ = true;

auto f = [this]() {
ready_to_publish_fn_ = ReadyToPublishCallback();
for (auto it = this->conns_.begin(), ite = this->conns_.end(); it != ite; ++it) {
LOG_INFO << "Close connected NSQConn " << (*it).get() << (*it)->remote_addr();
DLOG_TRACE << "Close connected NSQConn " << (*it).get() << (*it)->remote_addr();
(*it)->Close();
}

for (auto it = this->connecting_conns_.begin(), ite = this->connecting_conns_.end(); it != ite; ++it) {
LOG_INFO << "Close connecting NSQConn " << it->second.get() << it->second->remote_addr();
DLOG_TRACE << "Close connecting NSQConn " << it->second.get() << it->second->remote_addr();
it->second->Close();
}

Expand Down Expand Up @@ -103,6 +107,7 @@ bool Client::IsReady() const {
void Client::HandleLoopkupdHTTPResponse(
const std::shared_ptr<evpp::httpc::Response>& response,
const std::shared_ptr<evpp::httpc::Request>& request) {
DLOG_TRACE;

std::string body = response->body().ToString();
if (response->http_code() != 200) {
Expand Down Expand Up @@ -142,7 +147,8 @@ void Client::HandleLoopkupdHTTPResponse(
}
}

void Client::OnConnection(const ConnPtr& conn) {
void Client::OnConnection(const NSQConnPtr& conn) {
DLOG_TRACE << " NSQConn remote_addr=" << conn->remote_addr() << " status=" << conn->StatusToString();
if (conn->IsConnected() || conn->IsReady()) {
conns_.push_back(conn);
connecting_conns_.erase(conn->remote_addr());
Expand Down Expand Up @@ -210,8 +216,8 @@ bool Client::IsKnownNSQDAddress(const std::string& addr) const {
return false;
}

void Client::MoveToConnectingList(const ConnPtr& conn) {
ConnPtr& connecting_conn = connecting_conns_[conn->remote_addr()];
void Client::MoveToConnectingList(const NSQConnPtr& conn) {
NSQConnPtr& connecting_conn = connecting_conns_[conn->remote_addr()];
if (connecting_conn.get()) {
// This connection is already in the connecting list
// so do not need to remove it from conns_
Expand Down
10 changes: 4 additions & 6 deletions apps/evnsq/client.h
Expand Up @@ -23,8 +23,6 @@ class Response;

namespace evnsq {

typedef std::shared_ptr<NSQConn> ConnPtr;

// A Client represents a producer or consumer who holds several NSQConns with a cluster of NSQDs
class EVNSQ_EXPORT Client {
public:
Expand Down Expand Up @@ -68,7 +66,7 @@ class EVNSQ_EXPORT Client {
void HandleLoopkupdHTTPResponse(
const std::shared_ptr<evpp::httpc::Response>& response,
const std::shared_ptr<evpp::httpc::Request>& request);
void OnConnection(const ConnPtr& conn);
void OnConnection(const NSQConnPtr& conn);
void set_topic(const std::string& t) {
topic_ = t;
}
Expand All @@ -80,15 +78,15 @@ class EVNSQ_EXPORT Client {
}
private:
bool IsKnownNSQDAddress(const std::string& addr) const;
void MoveToConnectingList(const ConnPtr& conn);
void MoveToConnectingList(const NSQConnPtr& conn);
protected:
evpp::EventLoop* loop_;
Type type_;
Option option_;
std::string topic_;
std::string channel_;
std::map<std::string/*NSQD address "host:port"*/, ConnPtr> connecting_conns_; // The TCP connections which are connecting to NSQDs
std::vector<ConnPtr> conns_; // The TCP connections which has established the connection with NSQDs
std::map<std::string/*NSQD address "host:port"*/, NSQConnPtr> connecting_conns_; // The TCP connections which are connecting to NSQDs
std::vector<NSQConnPtr> conns_; // The TCP connections which has established the connection with NSQDs
MessageCallback msg_fn_;
CloseCallback close_fn_;
std::vector<evpp::InvokeTimerPtr> lookupd_timers_;
Expand Down
2 changes: 1 addition & 1 deletion apps/evnsq/nsq_conn.cc
Expand Up @@ -66,7 +66,7 @@ const std::string& NSQConn::remote_addr() const {
}

void NSQConn::OnTCPConnectionEvent(const evpp::TCPConnPtr& conn) {
LOG_INFO << "NSQConn::OnTCPConnectionEvent status=" << StatusToString() << " TCPConn=" << conn.get() << " remote_addr=" << conn->remote_addr();
DLOG_TRACE << "status=" << StatusToString() << " TCPConn=" << conn.get() << " remote_addr=" << conn->remote_addr();
if (conn->IsConnected()) {
assert(tcp_client_->conn() == conn);
if (status_ == kConnecting) {
Expand Down
8 changes: 5 additions & 3 deletions apps/evnsq/nsq_conn.h
Expand Up @@ -36,7 +36,7 @@ class EVNSQ_EXPORT NSQConn : public std::enable_shared_from_this<NSQConn> {
kConnecting = 1,
kIdentifying = 2,
kAuthenticating = 3,
kConnected = 4, // Successfully connected to NSQD
kConnected = 4, // After identifying and authenticating, we successfully connected to NSQD
kSubscribing = 5,
kReady = 6, // Ready to produce messages to NSQD or consume messages from NSQD
kDisconnecting = 7,
Expand Down Expand Up @@ -85,6 +85,7 @@ class EVNSQ_EXPORT NSQConn : public std::enable_shared_from_this<NSQConn> {
return status_ == kAuthenticating;
}
const std::string& remote_addr() const;
const char* StatusToString() const;
private:
void WriteCommand(const Command& cmd);
void Reconnect();
Expand All @@ -101,7 +102,6 @@ class EVNSQ_EXPORT NSQConn : public std::enable_shared_from_this<NSQConn> {
void OnPublishResponse(const char* d, size_t len);
void PushWaitACKCommand(const CommandPtr& cmd);
CommandPtr PopWaitACKCommand();
const char* StatusToString() const;
private:
Client* nsq_client_;
evpp::EventLoop* loop_;
Expand All @@ -116,7 +116,9 @@ class EVNSQ_EXPORT NSQConn : public std::enable_shared_from_this<NSQConn> {
int64_t published_count_;
int64_t published_ok_count_;
int64_t published_failed_count_;

};

typedef std::shared_ptr<NSQConn> NSQConnPtr;

}

4 changes: 2 additions & 2 deletions apps/evnsq/producer.cc
Expand Up @@ -120,9 +120,9 @@ void Producer::OnPublishResponse(NSQConn* conn, const CommandPtr& cmd, bool succ
}
}

ConnPtr Producer::GetNextConn() {
NSQConnPtr Producer::GetNextConn() {
if (conns_.empty()) {
return ConnPtr();
return NSQConnPtr();
}

if (current_conn_index_ >= conns_.size()) {
Expand Down
2 changes: 1 addition & 1 deletion apps/evnsq/producer.h
Expand Up @@ -48,7 +48,7 @@ class EVNSQ_EXPORT Producer : public Client {
bool PublishInLoop(const CommandPtr& cmd);
void OnPublishResponse(NSQConn* conn, const CommandPtr& cmd, bool successfull);
void OnReady(NSQConn* conn);
ConnPtr GetNextConn();
NSQConnPtr GetNextConn();
void PrintStats();
private:
size_t current_conn_index_; // current Conn position at Client::conns_
Expand Down

0 comments on commit b3bdab9

Please sign in to comment.