Skip to content

Commit

Permalink
Merge pull request #169 from tribal-tec/157
Browse files Browse the repository at this point in the history
Fix #157
  • Loading branch information
tribal-tec committed Jun 1, 2016
2 parents bf5f68b + ea42079 commit aab499d
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 34 deletions.
5 changes: 4 additions & 1 deletion doc/Changelog.md
Expand Up @@ -2,11 +2,14 @@

# git master

* [169](https://github.com/HBPVIS/ZeroEQ/pull/169):
Fix #157: http server may hang in receive() when remote connection is closed
abruptly
* [167](https://github.com/HBPVIS/ZeroEQ/pull/167):
Move HBP vocabulary to Lexis
* [166](https://github.com/HBPVIS/ZeroEQ/pull/166):
Implement event-based communication as per
[152](https://github.com/HBPVIS/ZeroEQ/pull/152)
[152](https://github.com/HBPVIS/ZeroEQ/pull/152)
* [161](https://github.com/HBPVIS/ZeroEQ/pull/161):
Move progressMonitor tool to Lexis
* [145](https://github.com/HBPVIS/ZeroEQ/pull/145):
Expand Down
46 changes: 36 additions & 10 deletions tests/http/server.cpp
Expand Up @@ -71,6 +71,10 @@ class Client
{
if( ::zmq_connect( _socket, std::to_string( uri ).c_str( )) == -1 )
throw std::runtime_error( "Connect failed" );

// Get server identity
BOOST_CHECK_EQUAL( zmq_getsockopt( _socket, ZMQ_IDENTITY, _id,
&_idSize ), 0 );
}

~Client()
Expand All @@ -81,26 +85,25 @@ class Client
::zmq_ctx_destroy( _ctx );
}

void test( const std::string& request, const std::string& expected,
const int line )
void sendRequest( const std::string& request )
{
// Get server identity
uint8_t id[256];
size_t idSize = sizeof( id );
BOOST_CHECK_EQUAL( zmq_getsockopt( _socket, ZMQ_IDENTITY, id, &idSize ),
0 );

if( ::zmq_send( _socket, id, idSize, ZMQ_SNDMORE ) != int( idSize ) ||
if( ::zmq_send( _socket, _id, _idSize, ZMQ_SNDMORE ) != int(_idSize) ||
::zmq_send( _socket, request.c_str(), request.length(), 0 ) !=
int( request.length( )))
{
throw std::runtime_error( "Send failed" );
}
}

void test( const std::string& request, const std::string& expected,
const int line )
{
sendRequest( request );

std::string response;
while( response.size() < expected.size( ))
{
if( ::zmq_recv( _socket, id, idSize, 0 ) != int( idSize ))
if( ::zmq_recv( _socket, _id, _idSize, 0 ) != int( _idSize ))
throw std::runtime_error( "Recv failed" );

char msg[256];
Expand All @@ -117,6 +120,9 @@ class Client
private:
void* _ctx;
void* _socket;

uint8_t _id[256];
size_t _idSize = sizeof( _id );
};

}
Expand Down Expand Up @@ -439,6 +445,26 @@ BOOST_AUTO_TEST_CASE(largeGet)
thread.join();
}

BOOST_AUTO_TEST_CASE(issue157)
{
bool running = true;
zeroeq::http::Server server;
Foo foo;
server.register_( foo );

std::thread thread( [&]() { while( running ) server.receive( 100 ); });

// Close client before receiving request to provoke #157
{
Client client( server.getURI( ));
client.sendRequest( "GET" + std::string( 4096, ' ' ) +
"/test/Foo HTTP/1.0\r\n\r\n" );
}

running = false;
thread.join();
}

BOOST_AUTO_TEST_CASE(garbage)
{
bool running = true;
Expand Down
2 changes: 1 addition & 1 deletion zeroeq/connection/broker.cpp
Expand Up @@ -122,7 +122,7 @@ void Broker::addSockets( std::vector< zeroeq::detail::Socket >& entries )
_impl->addSockets( entries );
}

void Broker::process( zeroeq::detail::Socket& socket )
void Broker::process( zeroeq::detail::Socket& socket, const uint32_t )
{
_impl->process( socket );
}
Expand Down
2 changes: 1 addition & 1 deletion zeroeq/connection/broker.h
Expand Up @@ -90,7 +90,7 @@ class Broker : public Receiver

// Receiver API
void addSockets( std::vector< zeroeq::detail::Socket >& entries ) final;
void process( zeroeq::detail::Socket& socket ) final;
void process( zeroeq::detail::Socket& socket, uint32_t timeout ) final;
void addConnection( const std::string& ) final { ZEROEQDONTCALL; } // LCOV_EXCL_LINE
};

Expand Down
53 changes: 39 additions & 14 deletions zeroeq/http/server.cpp
Expand Up @@ -130,7 +130,7 @@ class Server::Impl : public detail::Sender
entries.push_back( entry );
}

void process( detail::Socket& )
void process( const uint32_t timeout )
{
// Read request and body
httpxx::BufferedRequest request;
Expand Down Expand Up @@ -213,25 +213,50 @@ class Server::Impl : public detail::Sender
response.headers()[ "Access-Control-Allow-Methods" ] =
access_control_allow_methods;
const std::string& rep = response.to_string();
const int more = body.empty() ? 0 : ZMQ_SNDMORE;
if( ::zmq_send( socket, id, idSize, ZMQ_SNDMORE ) != idSize ||
::zmq_send( socket, rep.c_str(), rep.length(), more ) !=
int( rep.length( )))

if( !sendResponse( id, idSize, ZMQ_SNDMORE, timeout ))
return;
if( !sendResponse( rep.c_str(), rep.length(),
body.empty() ? 0 : ZMQ_SNDMORE, timeout ))
{
ZEROEQWARN << "Could not send HTTP response header: "
<< zmq_strerror( zmq_errno( )) << std::endl;
return;
}

// response body
if( !body.empty() &&
( ::zmq_send( socket, id, idSize, ZMQ_SNDMORE ) != idSize ||
::zmq_send( socket, body.c_str(), body.length(), 0 ) !=
int( body.length( ))))
if( !body.empty( ))
{
ZEROEQWARN << "Could not send HTTP response body: "
if( !sendResponse( id, idSize, ZMQ_SNDMORE, timeout ))
return;
if( !sendResponse( body.c_str(), body.length(), 0, timeout ))
return;
}
}

bool sendResponse( const void* data, const size_t length, const int flags,
const uint32_t timeout )
{
while( ::zmq_send( socket, data, length,
flags | ZMQ_NOBLOCK ) != int( length ))
{
// could be disconnect, send buffer full, ...
if( zmq_errno() == EAGAIN )
{
detail::Socket pollItem;
pollItem.socket = socket;
pollItem.events = ZMQ_POLLERR;
if( ::zmq_poll( &pollItem, 1, timeout == TIMEOUT_INDEFINITE
? -1 : timeout ) > 0 )
{
// client still alive, send again
continue;
}
}

ZEROEQWARN << "HTTP server sendResponse failed: "
<< zmq_strerror( zmq_errno( )) << std::endl;
return false;
}
return true;
}

protected:
Expand Down Expand Up @@ -419,9 +444,9 @@ void Server::addSockets( std::vector< detail::Socket >& entries )
_impl->addSockets( entries );
}

void Server::process( detail::Socket& socket )
void Server::process( detail::Socket&, const uint32_t timeout )
{
_impl->process( socket );
_impl->process( timeout );
}

}
Expand Down
2 changes: 1 addition & 1 deletion zeroeq/http/server.h
Expand Up @@ -182,7 +182,7 @@ class Server : public zeroeq::Receiver

// Receiver API
void addSockets( std::vector< detail::Socket >& entries ) final;
void process( detail::Socket& socket ) final;
void process( detail::Socket& socket, uint32_t timeout ) final;
void addConnection( const std::string& ) final { ZEROEQDONTCALL; } // LCOV_EXCL_LINE
};
}
Expand Down
1 change: 0 additions & 1 deletion zeroeq/publisher.cpp
Expand Up @@ -5,7 +5,6 @@
*/

#include "publisher.h"
#include "fbevent.h"
#include "log.h"
#include "detail/broker.h"
#include "detail/byteswap.h"
Expand Down
2 changes: 1 addition & 1 deletion zeroeq/receiver.cpp
Expand Up @@ -136,7 +136,7 @@ class Receiver
}

if( socket.revents & ZMQ_POLLIN )
(*i)->process( socket );
(*i)->process( socket, timeout );
}
return true;
}
Expand Down
3 changes: 2 additions & 1 deletion zeroeq/receiver.h
Expand Up @@ -71,8 +71,9 @@ class Receiver
* Process data on a signalled socket.
*
* @param socket the socket provided from addSockets().
* @param timeout user provided timeout from receive().
*/
virtual void process( detail::Socket& socket ) = 0;
virtual void process( detail::Socket& socket, uint32_t timeout ) = 0;

/**
* Update the internal connection list.
Expand Down
5 changes: 3 additions & 2 deletions zeroeq/subscriber.cpp
Expand Up @@ -154,7 +154,8 @@ class Subscriber::Impl

EventFuncMap::const_iterator i = _eventFuncs.find( type );
if( i == _eventFuncs.cend( ))
ZEROEQTHROW( std::runtime_error( "Got unsubscribed event " + type.getString( )));
ZEROEQTHROW( std::runtime_error( "Got unsubscribed event " +
type.getString( )));

if( payload )
{
Expand Down Expand Up @@ -381,7 +382,7 @@ void Subscriber::addSockets( std::vector< detail::Socket >& entries )
_impl->addSockets( entries );
}

void Subscriber::process( detail::Socket& socket )
void Subscriber::process( detail::Socket& socket, const uint32_t )
{
_impl->process( socket );
}
Expand Down
2 changes: 1 addition & 1 deletion zeroeq/subscriber.h
Expand Up @@ -191,7 +191,7 @@ class Subscriber : public Receiver

// Receiver API
void addSockets( std::vector< detail::Socket >& entries ) final;
void process( detail::Socket& socket ) final;
void process( detail::Socket& socket, uint32_t timeout ) final;
void update() final;
void addConnection( const std::string& uri ) final;
};
Expand Down

0 comments on commit aab499d

Please sign in to comment.