Skip to content

Commit

Permalink
Drop new xmpp connection for existing connection with same name
Browse files Browse the repository at this point in the history
If the IP address is different, then it implies mis-configuration of
the xmpp agents. In such cases, drop new connection request and
retain existing connection intact

Since xmpp tasks run concurrently (across different IP addresses),
use a XmppConnectionManager scoped mutex to solve concurrency issues
among such parallel xmpp connection requests processing

Also increase severity of a couple of related log messages

Re-enable and fix few UTs that test this very exact scenario

Closes-Bug: #1687096

Change-Id: Ic7252060abfc4b34fa184ff163afc217a88bb40c
  • Loading branch information
ananth-at-camphor-networks committed May 2, 2017
1 parent 8610a42 commit 77fa8e6
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 22 deletions.
71 changes: 53 additions & 18 deletions src/bgp/test/bgp_xmpp_basic_test.cc
Expand Up @@ -109,7 +109,7 @@ class BgpXmppBasicTest : public ::testing::Test {
xs_x_ = new XmppServer(&evm_,
test::XmppDocumentMock::kControlNodeJID, &xs_cfg);
} else {
xs_x_ = new XmppServer(&evm_,
xs_x_ = new XmppServer(&evm_,
test::XmppDocumentMock::kControlNodeJID);
}
xs_x_->Initialize(0, false);
Expand All @@ -123,14 +123,17 @@ class BgpXmppBasicTest : public ::testing::Test {
}

virtual void TearDown() {
XmppStateMachineTest::set_notify_fn(NULL);
xs_x_->Shutdown();
task_util::WaitForIdle();
bs_x_->Shutdown();
task_util::WaitForIdle();
TASK_UTIL_EXPECT_EQ(0, xs_x_->ConnectionCount());
cm_x_.reset();
task_util::WaitForIdle();

TcpServerManager::DeleteServer(xs_x_);
xs_x_ = NULL;
bs_x_->Shutdown();
task_util::WaitForIdle();

evm_.Shutdown();
thread_.Join();
Expand Down Expand Up @@ -274,6 +277,7 @@ class BgpXmppBasicTest : public ::testing::Test {
bool queue_disable) {
if (sm->IsActiveChannel())
return;
tbb::mutex::scoped_lock lock(mutex_);
if (create) {
xmpp_state_machines_.insert(sm);
sm->set_queue_disable(queue_disable);
Expand All @@ -285,7 +289,7 @@ class BgpXmppBasicTest : public ::testing::Test {
protected:
EventManager evm_;
ServerThread thread_;
BgpServerTestPtr bs_x_;
boost::scoped_ptr<BgpServerTest> bs_x_;
XmppServer *xs_x_;
XmppLifetimeManagerTest *xltm_x_;
bool auth_enabled_;
Expand All @@ -303,6 +307,7 @@ class BgpXmppBasicTest : public ::testing::Test {
test::NetworkAgentMockPtr agent_x3_;
boost::scoped_ptr<BgpXmppChannelManager> cm_x_;
std::set<XmppStateMachineTest *> xmpp_state_machines_;
mutable tbb::mutex mutex_;
};

bool BgpXmppBasicTest::validate_done_ = false;
Expand Down Expand Up @@ -1279,7 +1284,7 @@ class BgpXmppBasicParamTest2 : public BgpXmppBasicTest,
public ::testing::WithParamInterface<TestParams2> {
protected:
virtual void SetUp() {
bool agent_address_same_ = std::tr1::get<0>(GetParam());
agent_address_same_ = std::tr1::get<0>(GetParam());
auth_enabled_ = std::tr1::get<1>(GetParam());

LOG(DEBUG, "BgpXmppBasicParamTest Agent Address: " <<
Expand Down Expand Up @@ -1318,13 +1323,19 @@ class BgpXmppBasicParamTest2 : public BgpXmppBasicTest,
}

void DestroyAgents() {
agent_x1_->SessionDown();
agent_x2_->SessionDown();
agent_x3_->SessionDown();

agent_x1_->Delete();
agent_x2_->Delete();
agent_x3_->Delete();
}

bool agent_address_same_;
};

TEST_P(BgpXmppBasicParamTest2, DISABLED_DuplicateEndpointName1) {
TEST_P(BgpXmppBasicParamTest2, DuplicateEndpointName1) {
CreateAgents();

// Bring up one agent with given name.
Expand All @@ -1339,18 +1350,34 @@ TEST_P(BgpXmppBasicParamTest2, DISABLED_DuplicateEndpointName1) {
agent_x2_->SessionUp();
agent_x3_->SessionUp();

// Make sure that latter two agents see sessions getting closed.
TASK_UTIL_EXPECT_TRUE(
agent_x2_->get_session_close() >= client_x2_session_close + 3);
TASK_UTIL_EXPECT_TRUE(
agent_x3_->get_session_close() >= client_x3_session_close + 3);
TASK_UTIL_EXPECT_TRUE(
agent_x1_->get_session_close() >= client_x1_session_close);
// Make sure that latter two agents see sessions getting closed if their IP
// address is not the same.
if (!agent_address_same_) {
TASK_UTIL_EXPECT_TRUE(
agent_x2_->get_session_close() >= client_x2_session_close + 3);
TASK_UTIL_EXPECT_TRUE(
agent_x3_->get_session_close() >= client_x3_session_close + 3);

// Session which was up to begin with should remain up and not flap.
TASK_UTIL_EXPECT_TRUE(
agent_x1_->get_session_close() == client_x1_session_close);
} else {
// Sessions flap, triggering GR Helper mode if configured so. Even
// otherwise, in case like 'reboot' we expect existing session to reset
// if new connection formation is attempted from the same name-address
// combination.
TASK_UTIL_EXPECT_TRUE(
agent_x2_->get_session_close() >= client_x2_session_close + 3);
TASK_UTIL_EXPECT_TRUE(
agent_x3_->get_session_close() >= client_x3_session_close + 3);
TASK_UTIL_EXPECT_TRUE(
agent_x1_->get_session_close() >= client_x1_session_close + 3);
}

DestroyAgents();
}

TEST_P(BgpXmppBasicParamTest2, DISABLED_DuplicateEndpointName2) {
TEST_P(BgpXmppBasicParamTest2, DuplicateEndpointName2) {
CreateAgents();

// Bring up one agent with given name.
Expand All @@ -1364,10 +1391,18 @@ TEST_P(BgpXmppBasicParamTest2, DISABLED_DuplicateEndpointName2) {
agent_x2_->SessionUp();

// Make sure that second agent sees sessions getting closed.
TASK_UTIL_EXPECT_TRUE(
agent_x2_->get_session_close() >= client_x2_session_close + 3);
TASK_UTIL_EXPECT_TRUE(
agent_x1_->get_session_close() >= client_x1_session_close);
if (!agent_address_same_) {
TASK_UTIL_EXPECT_TRUE(
agent_x2_->get_session_close() >= client_x2_session_close + 3);
TASK_UTIL_EXPECT_TRUE(
agent_x1_->get_session_close() == client_x1_session_close);

} else {
TASK_UTIL_EXPECT_TRUE(
agent_x2_->get_session_close() >= client_x2_session_close + 3);
TASK_UTIL_EXPECT_TRUE(
agent_x1_->get_session_close() >= client_x1_session_close + 3);
}

// Bring down the first agent and make sure that second comes up.
agent_x1_->SessionDown();
Expand Down
2 changes: 2 additions & 0 deletions src/xmpp/xmpp_connection_manager.h
Expand Up @@ -24,12 +24,14 @@ class XmppConnectionManager : public SslServer {
void EnqueueSession(XmppSession *session);
size_t GetSessionQueueSize() const;
virtual LifetimeActor *deleter() = 0;
tbb::mutex &mutex() const { return mutex_; }

private:
bool DequeueSession(TcpSessionPtr tcp_session);
void WorkQueueExitCallback(bool done);

WorkQueue<TcpSessionPtr> session_queue_;
mutable tbb::mutex mutex_;

DISALLOW_COPY_AND_ASSIGN(XmppConnectionManager);
};
Expand Down
26 changes: 22 additions & 4 deletions src/xmpp/xmpp_state_machine.cc
Expand Up @@ -1366,6 +1366,9 @@ bool XmppStateMachine::PassiveOpen(XmppSession *session) {
// Return true if msg is enqueued for further processing, false otherwise.
void XmppStateMachine::ProcessStreamHeaderMessage(XmppSession *session,
const XmppStanza::XmppMessage *msg) {
XmppConnectionManager *connection_manager =
dynamic_cast<XmppConnectionManager *>(connection_->server());
tbb::mutex::scoped_lock lock(connection_manager->mutex());

// Update "To" information which can be used to map an older session
session->Connection()->SetTo(msg->from);
Expand All @@ -1381,6 +1384,21 @@ void XmppStateMachine::ProcessStreamHeaderMessage(XmppSession *session,
// check if older connection is under graceful-restart.
if (endpoint && endpoint->connection()) {
if (connection_ != endpoint->connection()) {
// Close new connection and retain old connection if the endpoint
// IP addresses do not match.
boost::asio::ip::address addr =
endpoint->connection()->endpoint().address();
if (connection_->endpoint().address() != addr) {
XMPP_WARNING(XmppDeleteConnection,
"Drop new xmpp connection " + session->ToString() +
" as another connection with same name " + msg->from +
" but with different IP address " + addr.to_string() +
" already exists");
ProcessEvent(xmsm::EvTcpClose(session));
delete msg;
return;
}

XmppChannelMux *channel = endpoint->connection()->ChannelMux();

// If GR is not supported, then close all new connections until old
Expand All @@ -1398,16 +1416,16 @@ void XmppStateMachine::ProcessStreamHeaderMessage(XmppSession *session,
if (ready) {
XmppStateMachine *sm =
endpoint->connection()->state_machine();
XMPP_DEBUG(XmppDeleteConnection,
XMPP_NOTICE(XmppDeleteConnection,
"Delete old xmpp connection " +
sm->session()->ToString() +
" as a new connection as been initiated");
sm->Enqueue(xmsm::EvTcpClose(sm->session()));
}

XMPP_DEBUG(XmppDeleteConnection,
"Drop new xmpp connection " + session->ToString() +
" as current connection is still not deleted");
XMPP_NOTICE(XmppDeleteConnection,
"Drop new xmpp connection " + session->ToString() +
" as current connection is still not deleted");
ProcessEvent(xmsm::EvTcpClose(session));
delete msg;
return;
Expand Down

0 comments on commit 77fa8e6

Please sign in to comment.