From ea42079c9ee7f0c371b3bb5cc85af4be36bef7cc Mon Sep 17 00:00:00 2001 From: Daniel Nachbaur Date: Tue, 31 May 2016 17:51:49 +0200 Subject: [PATCH] Fix #157 --- doc/Changelog.md | 5 +++- tests/http/server.cpp | 46 ++++++++++++++++++++++++------- zeroeq/connection/broker.cpp | 2 +- zeroeq/connection/broker.h | 2 +- zeroeq/http/server.cpp | 53 ++++++++++++++++++++++++++---------- zeroeq/http/server.h | 2 +- zeroeq/publisher.cpp | 1 - zeroeq/receiver.cpp | 2 +- zeroeq/receiver.h | 3 +- zeroeq/subscriber.cpp | 5 ++-- zeroeq/subscriber.h | 2 +- 11 files changed, 89 insertions(+), 34 deletions(-) diff --git a/doc/Changelog.md b/doc/Changelog.md index ee26854..1f34b93 100644 --- a/doc/Changelog.md +++ b/doc/Changelog.md @@ -2,11 +2,14 @@ # git master +* [169](https://github.com/HBPVIS/ZeroEQ/pull/169): + Fix #157: http server may hang in receive() when remote connection is closed + abruptly * [167](https://github.com/HBPVIS/ZeroEQ/pull/167): Move HBP vocabulary to Lexis * [166](https://github.com/HBPVIS/ZeroEQ/pull/166): Implement event-based communication as per - [152](https://github.com/HBPVIS/ZeroEQ/pull/152) + [152](https://github.com/HBPVIS/ZeroEQ/pull/152) * [161](https://github.com/HBPVIS/ZeroEQ/pull/161): Move progressMonitor tool to Lexis * [145](https://github.com/HBPVIS/ZeroEQ/pull/145): diff --git a/tests/http/server.cpp b/tests/http/server.cpp index 5168815..ac1f1bc 100644 --- a/tests/http/server.cpp +++ b/tests/http/server.cpp @@ -71,6 +71,10 @@ class Client { if( ::zmq_connect( _socket, std::to_string( uri ).c_str( )) == -1 ) throw std::runtime_error( "Connect failed" ); + + // Get server identity + BOOST_CHECK_EQUAL( zmq_getsockopt( _socket, ZMQ_IDENTITY, _id, + &_idSize ), 0 ); } ~Client() @@ -81,26 +85,25 @@ class Client ::zmq_ctx_destroy( _ctx ); } - void test( const std::string& request, const std::string& expected, - const int line ) + void sendRequest( const std::string& request ) { - // Get server identity - uint8_t id[256]; - size_t idSize = sizeof( id ); - BOOST_CHECK_EQUAL( zmq_getsockopt( _socket, ZMQ_IDENTITY, id, &idSize ), - 0 ); - - if( ::zmq_send( _socket, id, idSize, ZMQ_SNDMORE ) != int( idSize ) || + if( ::zmq_send( _socket, _id, _idSize, ZMQ_SNDMORE ) != int(_idSize) || ::zmq_send( _socket, request.c_str(), request.length(), 0 ) != int( request.length( ))) { throw std::runtime_error( "Send failed" ); } + } + + void test( const std::string& request, const std::string& expected, + const int line ) + { + sendRequest( request ); std::string response; while( response.size() < expected.size( )) { - if( ::zmq_recv( _socket, id, idSize, 0 ) != int( idSize )) + if( ::zmq_recv( _socket, _id, _idSize, 0 ) != int( _idSize )) throw std::runtime_error( "Recv failed" ); char msg[256]; @@ -117,6 +120,9 @@ class Client private: void* _ctx; void* _socket; + + uint8_t _id[256]; + size_t _idSize = sizeof( _id ); }; } @@ -439,6 +445,26 @@ BOOST_AUTO_TEST_CASE(largeGet) thread.join(); } +BOOST_AUTO_TEST_CASE(issue157) +{ + bool running = true; + zeroeq::http::Server server; + Foo foo; + server.register_( foo ); + + std::thread thread( [&]() { while( running ) server.receive( 100 ); }); + + // Close client before receiving request to provoke #157 + { + Client client( server.getURI( )); + client.sendRequest( "GET" + std::string( 4096, ' ' ) + + "/test/Foo HTTP/1.0\r\n\r\n" ); + } + + running = false; + thread.join(); +} + BOOST_AUTO_TEST_CASE(garbage) { bool running = true; diff --git a/zeroeq/connection/broker.cpp b/zeroeq/connection/broker.cpp index 153c6ab..5ed2c74 100644 --- a/zeroeq/connection/broker.cpp +++ b/zeroeq/connection/broker.cpp @@ -122,7 +122,7 @@ void Broker::addSockets( std::vector< zeroeq::detail::Socket >& entries ) _impl->addSockets( entries ); } -void Broker::process( zeroeq::detail::Socket& socket ) +void Broker::process( zeroeq::detail::Socket& socket, const uint32_t ) { _impl->process( socket ); } diff --git a/zeroeq/connection/broker.h b/zeroeq/connection/broker.h index 97af236..6572380 100644 --- a/zeroeq/connection/broker.h +++ b/zeroeq/connection/broker.h @@ -90,7 +90,7 @@ class Broker : public Receiver // Receiver API void addSockets( std::vector< zeroeq::detail::Socket >& entries ) final; - void process( zeroeq::detail::Socket& socket ) final; + void process( zeroeq::detail::Socket& socket, uint32_t timeout ) final; void addConnection( const std::string& ) final { ZEROEQDONTCALL; } // LCOV_EXCL_LINE }; diff --git a/zeroeq/http/server.cpp b/zeroeq/http/server.cpp index 5c7567e..fd2a31d 100644 --- a/zeroeq/http/server.cpp +++ b/zeroeq/http/server.cpp @@ -130,7 +130,7 @@ class Server::Impl : public detail::Sender entries.push_back( entry ); } - void process( detail::Socket& ) + void process( const uint32_t timeout ) { // Read request and body httpxx::BufferedRequest request; @@ -213,25 +213,50 @@ class Server::Impl : public detail::Sender response.headers()[ "Access-Control-Allow-Methods" ] = access_control_allow_methods; const std::string& rep = response.to_string(); - const int more = body.empty() ? 0 : ZMQ_SNDMORE; - if( ::zmq_send( socket, id, idSize, ZMQ_SNDMORE ) != idSize || - ::zmq_send( socket, rep.c_str(), rep.length(), more ) != - int( rep.length( ))) + + if( !sendResponse( id, idSize, ZMQ_SNDMORE, timeout )) + return; + if( !sendResponse( rep.c_str(), rep.length(), + body.empty() ? 0 : ZMQ_SNDMORE, timeout )) { - ZEROEQWARN << "Could not send HTTP response header: " - << zmq_strerror( zmq_errno( )) << std::endl; return; } // response body - if( !body.empty() && - ( ::zmq_send( socket, id, idSize, ZMQ_SNDMORE ) != idSize || - ::zmq_send( socket, body.c_str(), body.length(), 0 ) != - int( body.length( )))) + if( !body.empty( )) { - ZEROEQWARN << "Could not send HTTP response body: " + if( !sendResponse( id, idSize, ZMQ_SNDMORE, timeout )) + return; + if( !sendResponse( body.c_str(), body.length(), 0, timeout )) + return; + } + } + + bool sendResponse( const void* data, const size_t length, const int flags, + const uint32_t timeout ) + { + while( ::zmq_send( socket, data, length, + flags | ZMQ_NOBLOCK ) != int( length )) + { + // could be disconnect, send buffer full, ... + if( zmq_errno() == EAGAIN ) + { + detail::Socket pollItem; + pollItem.socket = socket; + pollItem.events = ZMQ_POLLERR; + if( ::zmq_poll( &pollItem, 1, timeout == TIMEOUT_INDEFINITE + ? -1 : timeout ) > 0 ) + { + // client still alive, send again + continue; + } + } + + ZEROEQWARN << "HTTP server sendResponse failed: " << zmq_strerror( zmq_errno( )) << std::endl; + return false; } + return true; } protected: @@ -419,9 +444,9 @@ void Server::addSockets( std::vector< detail::Socket >& entries ) _impl->addSockets( entries ); } -void Server::process( detail::Socket& socket ) +void Server::process( detail::Socket&, const uint32_t timeout ) { - _impl->process( socket ); + _impl->process( timeout ); } } diff --git a/zeroeq/http/server.h b/zeroeq/http/server.h index cbdebb0..9865f06 100644 --- a/zeroeq/http/server.h +++ b/zeroeq/http/server.h @@ -182,7 +182,7 @@ class Server : public zeroeq::Receiver // Receiver API void addSockets( std::vector< detail::Socket >& entries ) final; - void process( detail::Socket& socket ) final; + void process( detail::Socket& socket, uint32_t timeout ) final; void addConnection( const std::string& ) final { ZEROEQDONTCALL; } // LCOV_EXCL_LINE }; } diff --git a/zeroeq/publisher.cpp b/zeroeq/publisher.cpp index d385872..4e9f4d3 100644 --- a/zeroeq/publisher.cpp +++ b/zeroeq/publisher.cpp @@ -5,7 +5,6 @@ */ #include "publisher.h" -#include "fbevent.h" #include "log.h" #include "detail/broker.h" #include "detail/byteswap.h" diff --git a/zeroeq/receiver.cpp b/zeroeq/receiver.cpp index 7e270c3..68b3466 100644 --- a/zeroeq/receiver.cpp +++ b/zeroeq/receiver.cpp @@ -136,7 +136,7 @@ class Receiver } if( socket.revents & ZMQ_POLLIN ) - (*i)->process( socket ); + (*i)->process( socket, timeout ); } return true; } diff --git a/zeroeq/receiver.h b/zeroeq/receiver.h index 17d73c7..d83ee5c 100644 --- a/zeroeq/receiver.h +++ b/zeroeq/receiver.h @@ -71,8 +71,9 @@ class Receiver * Process data on a signalled socket. * * @param socket the socket provided from addSockets(). + * @param timeout user provided timeout from receive(). */ - virtual void process( detail::Socket& socket ) = 0; + virtual void process( detail::Socket& socket, uint32_t timeout ) = 0; /** * Update the internal connection list. diff --git a/zeroeq/subscriber.cpp b/zeroeq/subscriber.cpp index 7bd6c5e..ca71c70 100644 --- a/zeroeq/subscriber.cpp +++ b/zeroeq/subscriber.cpp @@ -154,7 +154,8 @@ class Subscriber::Impl EventFuncMap::const_iterator i = _eventFuncs.find( type ); if( i == _eventFuncs.cend( )) - ZEROEQTHROW( std::runtime_error( "Got unsubscribed event " + type.getString( ))); + ZEROEQTHROW( std::runtime_error( "Got unsubscribed event " + + type.getString( ))); if( payload ) { @@ -381,7 +382,7 @@ void Subscriber::addSockets( std::vector< detail::Socket >& entries ) _impl->addSockets( entries ); } -void Subscriber::process( detail::Socket& socket ) +void Subscriber::process( detail::Socket& socket, const uint32_t ) { _impl->process( socket ); } diff --git a/zeroeq/subscriber.h b/zeroeq/subscriber.h index c40d4bd..d67ee91 100644 --- a/zeroeq/subscriber.h +++ b/zeroeq/subscriber.h @@ -191,7 +191,7 @@ class Subscriber : public Receiver // Receiver API void addSockets( std::vector< detail::Socket >& entries ) final; - void process( detail::Socket& socket ) final; + void process( detail::Socket& socket, uint32_t timeout ) final; void update() final; void addConnection( const std::string& uri ) final; };