From 6665029561d40d5b89b7ee9cb2487f6c905b904a Mon Sep 17 00:00:00 2001 From: Bugra Gedik Date: Sun, 21 Jan 2018 09:43:49 -0800 Subject: [PATCH] remove workSocket call that is too early Make the work socket conditional Revert back the changes Only call workSocket() when there is pending data to read Minor update to a comment Only call workSocket() when there is pending data to read Only call workSocket() when there is pending data to read Fix the CMake build Fix the CMake build Fix the CMake build Use the correct type for ioctlsocket call Make non-blocking peek optional Revert "Make non-blocking peek optional" This reverts commit 501c4401ddb93cf9bf451dd289d4c3768c0a7b15. Review fixes Review fixes Review fixes --- build/cmake/ConfigureChecks.cmake | 3 ++- build/cmake/config.h.in | 7 ++++-- configure.ac | 1 + .../src/thrift/server/TNonblockingServer.cpp | 23 +++++++++++-------- lib/cpp/src/thrift/transport/PlatformSocket.h | 4 ++++ lib/cpp/src/thrift/transport/TSSLSocket.cpp | 11 +++++++++ lib/cpp/src/thrift/transport/TSSLSocket.h | 1 + lib/cpp/src/thrift/transport/TSocket.cpp | 23 +++++++++++++++++++ lib/cpp/src/thrift/transport/TSocket.h | 15 +++++++++++- 9 files changed, 74 insertions(+), 14 deletions(-) diff --git a/build/cmake/ConfigureChecks.cmake b/build/cmake/ConfigureChecks.cmake index 12a50df910d..e4793d41eb8 100644 --- a/build/cmake/ConfigureChecks.cmake +++ b/build/cmake/ConfigureChecks.cmake @@ -37,11 +37,12 @@ check_include_file(netinet/in.h HAVE_NETINET_IN_H) check_include_file(stdint.h HAVE_STDINT_H) check_include_file(unistd.h HAVE_UNISTD_H) check_include_file(pthread.h HAVE_PTHREAD_H) -check_include_file(sys/time.h HAVE_SYS_TIME_H) +check_include_file(sys/ioctl.h HAVE_SYS_IOCTL_H) check_include_file(sys/param.h HAVE_SYS_PARAM_H) check_include_file(sys/resource.h HAVE_SYS_RESOURCE_H) check_include_file(sys/socket.h HAVE_SYS_SOCKET_H) check_include_file(sys/stat.h HAVE_SYS_STAT_H) +check_include_file(sys/time.h HAVE_SYS_TIME_H) check_include_file(sys/un.h HAVE_SYS_UN_H) check_include_file(sys/poll.h HAVE_SYS_POLL_H) check_include_file(sys/select.h HAVE_SYS_SELECT_H) diff --git a/build/cmake/config.h.in b/build/cmake/config.h.in index 083bc55ec5e..21561b2659a 100644 --- a/build/cmake/config.h.in +++ b/build/cmake/config.h.in @@ -100,8 +100,8 @@ /* Define to 1 if you have the header file. */ #cmakedefine HAVE_PTHREAD_H 1 -/* Define to 1 if you have the header file. */ -#cmakedefine HAVE_SYS_TIME_H 1 +/* Define to 1 if you have the header file. */ +#cmakedefine HAVE_SYS_IOCTL_H 1 /* Define to 1 if you have the header file. */ #cmakedefine HAVE_SYS_PARAM_H 1 @@ -124,6 +124,9 @@ /* Define to 1 if you have the header file. */ #cmakedefine HAVE_SYS_SELECT_H 1 +/* Define to 1 if you have the header file. */ +#cmakedefine HAVE_SYS_TIME_H 1 + /* Define to 1 if you have the header file. */ #cmakedefine HAVE_SCHED_H 1 diff --git a/configure.ac b/configure.ac index bbf53d3b538..64c7d6dabf8 100755 --- a/configure.ac +++ b/configure.ac @@ -623,6 +623,7 @@ AC_CHECK_HEADERS([netinet/in.h]) AC_CHECK_HEADERS([pthread.h]) AC_CHECK_HEADERS([stddef.h]) AC_CHECK_HEADERS([stdlib.h]) +AC_CHECK_HEADERS([sys/ioctl.h]) AC_CHECK_HEADERS([sys/socket.h]) AC_CHECK_HEADERS([sys/time.h]) AC_CHECK_HEADERS([sys/un.h]) diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp index d17f77c1922..e60bffcafa5 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp +++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp @@ -472,6 +472,18 @@ void TNonblockingServer::TConnection::workSocket() { } // size known; now get the rest of the frame transition(); + + // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for + // regular sockets, because if there is more data, libevent will fire the event handler registered for read + // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the + // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In + // that case, not having this workSocket() call here would result in a hang as we will never get to work the socket, + // despite having more data. + if (tSocket_->hasPendingDataToRead()) + { + workSocket(); + } + return; case SOCKET_RECV: @@ -677,9 +689,6 @@ void TNonblockingServer::TConnection::transition() { appState_ = APP_SEND_RESULT; setWrite(); - // Try to work the socket immediately - // workSocket(); - return; } @@ -718,9 +727,6 @@ void TNonblockingServer::TConnection::transition() { // Register read event setRead(); - // Try to work the socket right away - // workSocket(); - return; case APP_READ_FRAME_SIZE: @@ -753,9 +759,6 @@ void TNonblockingServer::TConnection::transition() { socketState_ = SOCKET_RECV; appState_ = APP_READ_REQUEST; - // Work the socket right away - workSocket(); - return; case APP_CLOSE_CONNECTION: @@ -1063,7 +1066,7 @@ void TNonblockingServer::expireClose(stdcxx::shared_ptr task) { connection->forceClose(); } -void TNonblockingServer::stop() { +void TNonblockingServer::stop() { // Breaks the event loop in all threads so that they end ASAP. for (uint32_t i = 0; i < ioThreads_.size(); ++i) { ioThreads_[i]->stop(); diff --git a/lib/cpp/src/thrift/transport/PlatformSocket.h b/lib/cpp/src/thrift/transport/PlatformSocket.h index 1890b607de4..95910580613 100644 --- a/lib/cpp/src/thrift/transport/PlatformSocket.h +++ b/lib/cpp/src/thrift/transport/PlatformSocket.h @@ -51,6 +51,8 @@ # define THRIFT_LSEEK _lseek # define THRIFT_WRITE _write # define THRIFT_READ _read +# define THRIFT_IOCTL_SOCKET ioctlsocket +# define THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE u_long # define THRIFT_FSTAT _fstat # define THRIFT_STAT _stat # ifdef _WIN32_WCE @@ -111,6 +113,8 @@ # define THRIFT_LSEEK lseek # define THRIFT_WRITE write # define THRIFT_READ read +# define THRIFT_IOCTL_SOCKET ioctl +# define THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE int # define THRIFT_STAT stat # define THRIFT_FSTAT fstat # define THRIFT_GAI_STRERROR gai_strerror diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.cpp b/lib/cpp/src/thrift/transport/TSSLSocket.cpp index 3f0e28ed8c9..7bdacb0150c 100644 --- a/lib/cpp/src/thrift/transport/TSSLSocket.cpp +++ b/lib/cpp/src/thrift/transport/TSSLSocket.cpp @@ -249,6 +249,17 @@ TSSLSocket::~TSSLSocket() { close(); } +bool TSSLSocket::hasPendingDataToRead() { + if (!isOpen()) { + return false; + } + initializeHandshake(); + if (!checkHandshake()) + throw TSSLException("TSSLSocket::hasPendingDataToRead: Handshake is not completed"); + // data may be available in SSL buffers (note: SSL_pending does not have a failure mode) + return SSL_pending(ssl_) > 0 || TSocket::hasPendingDataToRead(); +} + void TSSLSocket::init() { handshakeCompleted_ = false; readRetryCount_ = 0; diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.h b/lib/cpp/src/thrift/transport/TSSLSocket.h index 852720930b4..ec30cc1491f 100644 --- a/lib/cpp/src/thrift/transport/TSSLSocket.h +++ b/lib/cpp/src/thrift/transport/TSSLSocket.h @@ -78,6 +78,7 @@ class TSSLSocket : public TSocket { bool peek(); void open(); void close(); + bool hasPendingDataToRead(); uint32_t read(uint8_t* buf, uint32_t len); void write(const uint8_t* buf, uint32_t len); uint32_t write_partial(const uint8_t* buf, uint32_t len); diff --git a/lib/cpp/src/thrift/transport/TSocket.cpp b/lib/cpp/src/thrift/transport/TSocket.cpp index d93d0ffd401..c90593d75a8 100644 --- a/lib/cpp/src/thrift/transport/TSocket.cpp +++ b/lib/cpp/src/thrift/transport/TSocket.cpp @@ -21,6 +21,9 @@ #include #include +#ifdef HAVE_SYS_IOCTL_H +#include +#endif #ifdef HAVE_SYS_SOCKET_H #include #endif @@ -167,6 +170,26 @@ TSocket::~TSocket() { close(); } +bool TSocket::hasPendingDataToRead() { + if (!isOpen()) { + return false; + } + + int32_t retries = 0; + THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE numBytesAvailable; +try_again: + int r = THRIFT_IOCTL_SOCKET(socket_, FIONREAD, &numBytesAvailable); + if (r == -1) { + int errno_copy = THRIFT_GET_SOCKET_ERROR; + if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) { + goto try_again; + } + GlobalOutput.perror("TSocket::hasPendingDataToRead() THRIFT_IOCTL_SOCKET() " + getSocketInfo(), errno_copy); + throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy); + } + return numBytesAvailable > 0; +} + bool TSocket::isOpen() { return (socket_ != THRIFT_INVALID_SOCKET); } diff --git a/lib/cpp/src/thrift/transport/TSocket.h b/lib/cpp/src/thrift/transport/TSocket.h index 1f95e68bd95..66d9e6cd37d 100644 --- a/lib/cpp/src/thrift/transport/TSocket.h +++ b/lib/cpp/src/thrift/transport/TSocket.h @@ -84,7 +84,9 @@ class TSocket : public TVirtualTransport { virtual bool isOpen(); /** - * Calls select on the socket to see if there is more data available. + * Checks whether there is more data available in the socket to read. + * + * This call blocks until at least one byte is available or the socket is closed. */ virtual bool peek(); @@ -100,6 +102,17 @@ class TSocket : public TVirtualTransport { */ virtual void close(); + /** + * Determines whether there is pending data to read or not. + * + * This call does not block. + * \throws TTransportException of types: + * NOT_OPEN means the socket has been closed + * UNKNOWN means something unexpected happened + * \returns true if there is pending data to read, false otherwise + */ + virtual bool hasPendingDataToRead(); + /** * Reads from the underlying socket. * \returns the number of bytes read or 0 indicates EOF