From b44a2ddc9cc60b0e841287c29f5cc90a2516496d Mon Sep 17 00:00:00 2001 From: Nobuaki Sukegawa Date: Sun, 7 Dec 2014 21:36:51 +0900 Subject: [PATCH] THRIFT-2838 TNonblockingServer can bind to port 0 (i.e., get an OS-assigned port) but there is no way to get the port number --- .gitignore | 1 + .../src/thrift/server/TNonblockingServer.cpp | 15 +- .../src/thrift/server/TNonblockingServer.h | 8 +- lib/cpp/test/Makefile.am | 13 ++ lib/cpp/test/TNonblockingServerTest.cpp | 132 ++++++++++++++++++ 5 files changed, 167 insertions(+), 2 deletions(-) create mode 100644 lib/cpp/test/TNonblockingServerTest.cpp diff --git a/.gitignore b/.gitignore index f99691aa419..1651102bcdf 100644 --- a/.gitignore +++ b/.gitignore @@ -94,6 +94,7 @@ test-driver /lib/cpp/test/RecursiveTest /lib/cpp/test/TFDTransportTest /lib/cpp/test/TFileTransportTest +/lib/cpp/test/TNonblockingServerTest /lib/cpp/test/TPipedTransportTest /lib/cpp/test/TransportTest /lib/cpp/test/UnitTests diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp index 9833f10e31a..a952bf0bfba 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp +++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp @@ -1099,6 +1099,16 @@ void TNonblockingServer::listenSocket(THRIFT_SOCKET s) { // Cool, this socket is good to go, set it as the serverSocket_ serverSocket_ = s; + + if (!port_) { + sockaddr_in addr; + unsigned int size = sizeof(addr); + if (!getsockname(serverSocket_, reinterpret_cast(&addr), &size)) { + listenPort_ = ntohs(addr.sin_port); + } else { + GlobalOutput.perror("TNonblocking: failed to get listen port: ", THRIFT_GET_SOCKET_ERROR); + } + } } void TNonblockingServer::setThreadManager(boost::shared_ptr threadManager) { @@ -1157,6 +1167,9 @@ void TNonblockingServer::expireClose(boost::shared_ptr task) { } void TNonblockingServer::stop() { + if (!port_) { + listenPort_ = 0; + } // Breaks the event loop in all threads so that they end ASAP. for (uint32_t i = 0; i < ioThreads_.size(); ++i) { ioThreads_[i]->stop(); @@ -1196,7 +1209,7 @@ void TNonblockingServer::registerEvents(event_base* user_event_base) { assert(ioThreads_.size() > 0); GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.", - port_, + listenPort_, ioThreads_.size()); // Launch all the secondary IO threads in separate threads diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.h b/lib/cpp/src/thrift/server/TNonblockingServer.h index 3edc795f169..4f2348797e5 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.h +++ b/lib/cpp/src/thrift/server/TNonblockingServer.h @@ -155,9 +155,12 @@ class TNonblockingServer : public TServer { /// Server socket file descriptor THRIFT_SOCKET serverSocket_; - /// Port server runs on + /// Port server runs on. Zero when letting OS decide actual port int port_; + /// Port server actually runs on + int listenPort_; + /// The optional user-provided event-base (for single-thread servers) event_base* userEventBase_; @@ -280,6 +283,7 @@ class TNonblockingServer : public TServer { nextIOThread_ = 0; useHighPriorityIOThreads_ = false; port_ = port; + listenPort_ = port; userEventBase_ = NULL; threadPoolProcessing_ = false; numTConnections_ = 0; @@ -395,6 +399,8 @@ class TNonblockingServer : public TServer { void setThreadManager(boost::shared_ptr threadManager); + int getListenPort() { return listenPort_; } + boost::shared_ptr getThreadManager() { return threadManager_; } /** diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am index 9111201f929..9ab6ba4ede5 100755 --- a/lib/cpp/test/Makefile.am +++ b/lib/cpp/test/Makefile.am @@ -68,6 +68,7 @@ check_PROGRAMS = \ RecursiveTest \ SpecializationTest \ AllProtocolsTest \ + TNonblockingServerTest \ TransportTest \ ZlibTest \ TFileTransportTest \ @@ -179,6 +180,18 @@ JSONProtoTest_SOURCES = \ JSONProtoTest_LDADD = libtestgencpp.la +# +# TNonblockingServerTest +# +TNonblockingServerTest_SOURCES = TNonblockingServerTest.cpp + +TNonblockingServerTest_LDADD = libprocessortest.la \ + $(top_builddir)/lib/cpp/libthrift.la \ + $(top_builddir)/lib/cpp/libthriftnb.la \ + $(BOOST_LDFLAGS) \ + -levent \ + -l:libboost_unit_test_framework.a + # # OptionalRequiredTest # diff --git a/lib/cpp/test/TNonblockingServerTest.cpp b/lib/cpp/test/TNonblockingServerTest.cpp new file mode 100644 index 00000000000..b6c41667b8f --- /dev/null +++ b/lib/cpp/test/TNonblockingServerTest.cpp @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#define BOOST_TEST_MODULE TNonblockingServerTest +#include +#include + +#include "thrift/concurrency/Thread.h" +#include "thrift/server/TNonblockingServer.h" + +#include "gen-cpp/ParentService.h" + +using namespace apache::thrift; + +struct Handler : public test::ParentServiceIf { + void addString(const std::string& s) { strings_.push_back(s); } + void getStrings(std::vector& _return) { _return = strings_; } + std::vector strings_; + + // dummy overrides not used in this test + int32_t incrementGeneration() { return 0; } + int32_t getGeneration() { return 0; } + void getDataWait(std::string&, int32_t) {} + void onewayWait() {} + void exceptionWait(const std::string&) {} + void unexpectedExceptionWait(const std::string&) {} +}; + +class Fixture { +private: + struct Runner : public concurrency::Runnable { + boost::shared_ptr server; + bool error; + virtual void run() { + error = false; + try { + server->serve(); + } catch (const TException& x) { + error = true; + } + } + }; + +protected: + Fixture() : processor(new test::ParentServiceProcessor(boost::make_shared())) {} + + int startServer(int port) { + boost::scoped_ptr threadFactory( + new concurrency::PlatformThreadFactory( +#if !defined(USE_BOOST_THREAD) && !defined(USE_STD_THREAD) + concurrency::PlatformThreadFactory::OTHER, + concurrency::PlatformThreadFactory::NORMAL, + 1, +#endif + true)); + + int retry_count = port ? 10 : 0; + for (int p = port; p <= port + retry_count; p++) { + server.reset(new server::TNonblockingServer(processor, p)); + boost::shared_ptr runner(new Runner); + runner->server = server; + thread = threadFactory->newThread(runner); + thread->start(); + // wait 50ms for the server to begin listening + THRIFT_SLEEP_USEC(50000); + if (!runner->error) { + return p; + } + } + throw transport::TTransportException(transport::TTransportException::NOT_OPEN, + "Failed to start server."); + } + + bool canCommunicate(int serverPort) { + boost::shared_ptr socket(new transport::TSocket("localhost", serverPort)); + socket->open(); + test::ParentServiceClient client(boost::make_shared( + boost::make_shared(socket))); + client.addString("foo"); + std::vector strings; + client.getStrings(strings); + return strings.size() == 1 && !(strings[0].compare("foo")); + } + +private: + boost::shared_ptr processor; + boost::shared_ptr thread; + +protected: + boost::shared_ptr server; +}; + +BOOST_AUTO_TEST_SUITE(TNonblockingServerTest) + +BOOST_FIXTURE_TEST_CASE(get_specified_port, Fixture) { + int specified_port = startServer(12345); + BOOST_REQUIRE_GE(specified_port, 12345); + BOOST_REQUIRE_EQUAL(server->getListenPort(), specified_port); + BOOST_CHECK(canCommunicate(specified_port)); + + server->stop(); + BOOST_CHECK_EQUAL(server->getListenPort(), specified_port); +} + +BOOST_FIXTURE_TEST_CASE(get_assigned_port, Fixture) { + int specified_port = startServer(0); + BOOST_REQUIRE_EQUAL(specified_port, 0); + int assigned_port = server->getListenPort(); + BOOST_REQUIRE_NE(assigned_port, 0); + BOOST_CHECK(canCommunicate(assigned_port)); + + server->stop(); + BOOST_CHECK_EQUAL(server->getListenPort(), 0); +} + +BOOST_AUTO_TEST_SUITE_END()