Skip to content
This repository has been archived by the owner on Jan 13, 2022. It is now read-only.

Commit

Permalink
close network store when underlying connection is closed
Browse files Browse the repository at this point in the history
Summary:
As in the title. Also see the test plan below.

Test Plan:
Earlier:
	scribe-client running with a buffered store
	scribe-server running with a file store
	scribe-client's primary network store points to scribe-server

	step 1 with both scribe client and server running, log a message. This
	populates the scribe-client's connpool

	step 2 bring down scribe server

	step 3 send another message in the same category. scribe client goes into
	disconnected state. Periodically it wakes up, goes to SENDING_BUFFER
	state and then back to DISCONNECTED state. This is because the primary
	store was never closed. This leads to unnecessary reading of the
	backup file
	[Thu May 27 11:20:41 2010] "[xyz] Changing state from <DISCONNECTED> to
<SENDING_BUFFER>"
	[Thu May 27 11:20:41 2010] "[xyz] read <1> entries of <10> bytes from file
</tmp/corr/xyz/xyz_00000>"
	Thrift: Thu May 27 11:20:41 2010 TSocket::open() error on socket (after poll)
<Host: 127.0.0.1 Port: 1464>Connection refused
	[Thu May 27 11:20:41 2010] "failed to open connection to remote scribe server
<127.0.0.1:1464> thrift error <socket open() error: Connection refused>"
	[Thu May 27 11:20:41 2010] "[xyz] choosing new retry interval <1> seconds"
	[Thu May 27 11:20:41 2010] "[xyz] Changing state from <SENDING_BUFFER> to
<DISCONNECTED>"

	After the fix:
	In step 3 the scribe client keeps going from DISCONNECTED to
	DISCONNECTED, and the backup file is no longer read on each retry
	Thrift: Thu May 27 11:21:38 2010 TSocket::open() error on socket (after poll)
<Host: 127.0.0.1 Port: 1464>Connection refused
	[Thu May 27 11:21:38 2010] "failed to open connection to remote scribe server
<127.0.0.1:1464> thrift error <socket open() error: Connection refused>"
	[Thu May 27 11:21:38 2010] "[xyz] choosing new retry interval <1> seconds"
	[Thu May 27 11:21:38 2010] "[xyz] Changing state from <DISCONNECTED> to
<DISCONNECTED>"

DiffCamp Revision: 118740
Reviewed By: groys
CC: agiardullo, groys, scribe-dev@lists
Tasks:

Revert Plan:
OK

Blame Revision:
26334

git-svn-id: svn+ssh://tubbs/svnapps/fbomb/branches/scribe-os/fbcode/scribe@28771 2248de34-8caa-4a3c-bc55-5e52d9d7b73a
  • Loading branch information
pkhemani authored and groys committed Jun 7, 2010
1 parent 47f6a21 commit 78abcf7
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 56 deletions.
40 changes: 21 additions & 19 deletions src/conn_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ string ConnPool::makeKey(const string& hostname, unsigned long port) {
}

bool ConnPool::open(const string& hostname, unsigned long port, int timeout) {
return openCommon(makeKey(hostname, port),
return openCommon(makeKey(hostname, port),
shared_ptr<scribeConn>(new scribeConn(hostname, port, timeout)));
}

bool ConnPool::open(const string &service, const server_vector_t &servers, int timeout) {
return openCommon(service,
return openCommon(service,
shared_ptr<scribeConn>(new scribeConn(service, servers, timeout)));
}

Expand All @@ -72,12 +72,12 @@ void ConnPool::close(const string &service) {
closeCommon(service);
}

bool ConnPool::send(const string& hostname, unsigned long port,
int ConnPool::send(const string& hostname, unsigned long port,
shared_ptr<logentry_vector_t> messages) {
return sendCommon(makeKey(hostname, port), messages);
}

bool ConnPool::send(const string &service,
int ConnPool::send(const string &service,
shared_ptr<logentry_vector_t> messages) {
return sendCommon(service, messages);
}
Expand Down Expand Up @@ -141,20 +141,20 @@ void ConnPool::closeCommon(const string &key) {
pthread_mutex_unlock(&mapMutex);
}

bool ConnPool::sendCommon(const string &key,
int ConnPool::sendCommon(const string &key,
shared_ptr<logentry_vector_t> messages) {
pthread_mutex_lock(&mapMutex);
conn_map_t::iterator iter = connMap.find(key);
if (iter != connMap.end()) {
(*iter).second->lock();
pthread_mutex_unlock(&mapMutex);
bool result = (*iter).second->send(messages);
int result = (*iter).second->send(messages);
(*iter).second->unlock();
return result;
} else {
LOG_OPER("send failed. No connection pool entry for <%s>", key.c_str());
pthread_mutex_unlock(&mapMutex);
return false;
return (CONN_FATAL);
}
}

Expand Down Expand Up @@ -275,12 +275,13 @@ void scribeConn::close() {
}
}

bool scribeConn::send(boost::shared_ptr<logentry_vector_t> messages) {
int
scribeConn::send(boost::shared_ptr<logentry_vector_t> messages) {
bool fatal;
int size = messages->size();
if (!isOpen()) {
if (!open()) {
return false;
return (CONN_FATAL);
}
}

Expand All @@ -302,12 +303,12 @@ bool scribeConn::send(boost::shared_ptr<logentry_vector_t> messages) {
g_Handler->incCounter("sent", size);
LOG_OPER("Successfully sent <%d> messages to remote scribe server %s",
size, connectionString().c_str());
return true;
return (CONN_OK);
}
fatal = false;
LOG_OPER("Failed to send <%d> messages, remote scribe server %s "
"returned error code <%d>", size, connectionString().c_str(),
(int) result);
(int) result);
} catch (const TTransportException& ttx) {
fatal = true;
LOG_OPER("Failed to send <%d> messages to remote scribe server %s "
Expand All @@ -325,16 +326,17 @@ bool scribeConn::send(boost::shared_ptr<logentry_vector_t> messages) {
*/
if (serviceBased || fatal) {
close();
return (CONN_FATAL);
}
return false;
return (CONN_TRANSIENT);
}

std::string scribeConn::connectionString() {
if (serviceBased) {
return "<" + remoteHost + " Service: " + serviceName + ">";
} else {
char port[10];
snprintf(port, 10, "%lu", remotePort);
return "<" + remoteHost + ":" + string(port) + ">";
}
if (serviceBased) {
return "<" + remoteHost + " Service: " + serviceName + ">";
} else {
char port[10];
snprintf(port, 10, "%lu", remotePort);
return "<" + remoteHost + ":" + string(port) + ">";
}
}
14 changes: 10 additions & 4 deletions src/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@

#include "common.h"

/* return codes for ScribeConn and ConnPool */
#define CONN_FATAL (-1) /* fatal error. close everything */
#define CONN_OK (0) /* success */
#define CONN_TRANSIENT (1) /* transient error */

// Basic scribe class to manage network connections. Used by network store
class scribeConn {
public:
scribeConn(const std::string& host, unsigned long port, int timeout);
Expand All @@ -41,7 +47,7 @@ class scribeConn {
bool isOpen();
bool open();
void close();
bool send(boost::shared_ptr<logentry_vector_t> messages);
int send(boost::shared_ptr<logentry_vector_t> messages);

private:
std::string connectionString();
Expand Down Expand Up @@ -82,15 +88,15 @@ class ConnPool {
void close(const std::string& host, unsigned long port);
void close(const std::string &service);

bool send(const std::string& host, unsigned long port,
int send(const std::string& host, unsigned long port,
boost::shared_ptr<logentry_vector_t> messages);
bool send(const std::string &service,
int send(const std::string &service,
boost::shared_ptr<logentry_vector_t> messages);

private:
bool openCommon(const std::string &key, boost::shared_ptr<scribeConn> conn);
void closeCommon(const std::string &key);
bool sendCommon(const std::string &key,
int sendCommon(const std::string &key,
boost::shared_ptr<logentry_vector_t> messages);

protected:
Expand Down
54 changes: 21 additions & 33 deletions src/store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1936,6 +1936,8 @@ shared_ptr<Store> NetworkStore::copy(const std::string &category) {
// first try sending an empty vector to catch dfqs
bool
NetworkStore::handleMessages(boost::shared_ptr<logentry_vector_t> messages) {
int ret;

if (!isOpen()) {
if (!open()) {
LOG_OPER("[%s] Could not open NetworkStore in handleMessages",
Expand All @@ -1949,45 +1951,31 @@ NetworkStore::handleMessages(boost::shared_ptr<logentry_vector_t> messages) {

if (useConnPool) {
if (serviceBased) {
if (!tryDummySend || g_connPool.send(serviceName, dummymessages)) {
if (g_connPool.send(serviceName, messages)) {
return (true);
}
if (!tryDummySend ||
((ret = g_connPool.send(serviceName, dummymessages)) == CONN_OK)) {
ret = g_connPool.send(serviceName, messages);
}
} else {
if (!tryDummySend ||
(ret = g_connPool.send(remoteHost, remotePort, dummymessages)) ==
CONN_OK) {
ret = g_connPool.send(remoteHost, remotePort, messages);
}
/*
* let us force reopen the store on the next try. on the next
* open the serverlist might get updated. we might get a
* connection to a new server.
*/
close();
return (false);
}
} else if (unpooledConn) {
if (!tryDummySend ||
g_connPool.send(remoteHost, remotePort, dummymessages)) {
return(g_connPool.send(remoteHost, remotePort, messages));
((ret = unpooledConn->send(dummymessages)) == CONN_OK)) {
ret = unpooledConn->send(messages);
}
return (false);
} else {
ret = CONN_FATAL;
LOG_OPER("[%s] Logic error: NetworkStore::handleMessages unpooledConn "
"is NULL", categoryHandled.c_str());
}
if (unpooledConn) {
if (!tryDummySend || unpooledConn->send(dummymessages)) {
if (unpooledConn->send(messages)) {
return (true);
}
return unpooledConn->send(messages);
} else {
LOG_OPER("[%s] Logic error: NetworkStore::handleMessages unpooledConn is NULL",
categoryHandled.c_str());
return false;
}
if (serviceBased) {
// force reopen the store, try new service definitions
close();
}
return (false);
if (ret == CONN_FATAL) {
close();
}
LOG_OPER("[%s] Logic error: NetworkStore::handleMessages unpooledConn "
"is NULL", categoryHandled.c_str());
return (false);
return (ret == CONN_OK);
}

void NetworkStore::flush() {
Expand Down

0 comments on commit 78abcf7

Please sign in to comment.