Skip to content

Commit

Permalink
CR HBPVIS#219 IV
Browse files Browse the repository at this point in the history
  • Loading branch information
Stefan Eilemann authored and Stefan Eilemann committed Jul 27, 2017
1 parent a18c0e1 commit d9169fb
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 62 deletions.
2 changes: 1 addition & 1 deletion doc/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# git master

* [219](https://github.com/HBPVIS/ZeroEQ/pull/219):
Implement Client-Server req-rep support:
Implement Client-Server req-rep support
* The environment variables ZEROEQ_PUB_SESSION and ZEROEQ_SERVER_SESSION
replace the now deprecated ZEROEQ_SESSION variable.
* [223](https://github.com/HBPVIS/ZeroEQ/pull/223):
Expand Down
2 changes: 1 addition & 1 deletion zeroeq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ set(ZEROEQ_PUBLIC_HEADERS
uri.h)

set(ZEROEQ_HEADERS
detail/browser.h
detail/common.h
detail/constants.h
detail/context.h
detail/port.h
detail/receiver.h
detail/sender.h
detail/socket.h)

Expand Down
16 changes: 7 additions & 9 deletions zeroeq/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,36 @@

#include "client.h"

#include "detail/browser.h"
#include "detail/common.h"
#include "detail/receiver.h"

#include <servus/servus.h>
#include <unordered_map>

namespace zeroeq
{
class Client::Impl : public detail::Browser
class Client::Impl : public detail::Receiver
{
public:
explicit Impl(const std::string& session)
: Browser(SERVER_SERVICE,
session == DEFAULT_SESSION ? getDefaultRepSession() : session)
: detail::Receiver(SERVER_SERVICE, session == DEFAULT_SESSION
? getDefaultRepSession()
: session)
, _servers(zmq_socket(getContext(), ZMQ_DEALER),
[](void* s) { ::zmq_close(s); })
{
}

explicit Impl(const URIs& uris)
: Browser(SERVER_SERVICE)
: detail::Receiver(SERVER_SERVICE)
, _servers(zmq_socket(getContext(), ZMQ_DEALER),
[](void* s) { ::zmq_close(s); })
{
for (const auto& uri : uris)
{
if (uri.getScheme() == DEFAULT_SCHEMA &&
(uri.getHost().empty() || uri.getPort() == 0))
{
if (!uri.isFullyQualified())
ZEROEQTHROW(std::runtime_error(
std::string("Non-fully qualified URI used for server")));
}

const auto& zmqURI = buildZmqURI(uri);
if (!addConnection(zmqURI))
Expand Down
27 changes: 14 additions & 13 deletions zeroeq/detail/browser.h → zeroeq/detail/receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ namespace zeroeq
namespace detail
{
/** Manages and updates a set of connections with a zeroconf browser. */
class Browser
class Receiver
{
public:
Browser(const std::string& service, const std::string session)
Receiver(const std::string& service, const std::string session)
: _servus(service)
, _session(session)
, _context(detail::getContext())
Expand All @@ -39,14 +39,14 @@ class Browser
update();
}

Browser(const std::string& service)
Receiver(const std::string& service)
: _servus(service)
, _session(zeroeq::NULL_SESSION)
, _context(detail::getContext())
{
}

virtual ~Browser()
virtual ~Receiver()
{
if (_servus.isBrowsing())
_servus.endBrowsing();
Expand Down Expand Up @@ -77,7 +77,7 @@ class Browser
const uint128_t identifier(_servus.get(instance, KEY_INSTANCE));
zmq::SocketPtr socket = createSocket(identifier);
if (socket)
_addConnection(zmqURI, socket);
_connect(zmqURI, socket);
}
}
}
Expand All @@ -86,16 +86,10 @@ class Browser
{
zmq::SocketPtr socket = createSocket(uint128_t());
if (socket)
return _addConnection(zmqURI, socket);
return _connect(zmqURI, socket);
return true;
}

/**
* Create the socket for zmqURI, return nullptr if connection is to be
* ignored.
*/
virtual zmq::SocketPtr createSocket(const uint128_t& instance) = 0;

void addSockets(std::vector<detail::Socket>& entries)
{
entries.insert(entries.end(), _entries.begin(), _entries.end());
Expand All @@ -105,9 +99,16 @@ class Browser
using SocketMap = std::map<std::string, zmq::SocketPtr>;

void* getContext() { return _context.get(); }

/**
* Create the socket for the given instance, return nullptr if connection is
* to be ignored.
*/
virtual zmq::SocketPtr createSocket(const uint128_t& instance) = 0;

const SocketMap& getSockets() { return _sockets; }

bool _addConnection(const std::string& zmqURI, zmq::SocketPtr socket)
bool _connect(const std::string& zmqURI, zmq::SocketPtr socket)
{
if (zmq_connect(socket.get(), zmqURI.c_str()) == -1)
{
Expand Down
38 changes: 19 additions & 19 deletions zeroeq/detail/sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ namespace zeroeq
{
namespace detail
{
Sender::Sender(const URI& uri_, const int type)
: Sender(uri_, type, {}, {})
{
}

Sender::Sender(const URI& uri_, const int type, const std::string service,
const std::string& session)
: _context(getContext())
Expand All @@ -37,11 +42,6 @@ Sender::Sender(const URI& uri_, const int type, const std::string service,
zmq_setsockopt(socket.get(), ZMQ_SNDHWM, &hwm, sizeof(hwm));
}

Sender::Sender(const URI& uri_, const int type)
: Sender(uri_, type, {}, {})
{
}

Sender::~Sender()
{
socket.reset();
Expand All @@ -62,23 +62,23 @@ void Sender::initURI()
host.clear();

uint16_t port = uri.getPort();
if (host.empty() || port == 0)
{
std::string hostStr, portStr;
_getEndPoint(hostStr, portStr);

if (port == 0)
{
// No overflow is possible unless ZMQ reports bad port number.
port = std::stoi(portStr);
uri.setPort(port);
}
if (!host.empty() && port != 0)
return;

if (host.empty())
uri.setHost(hostStr);
std::string hostStr, portStr;
_getEndPoint(hostStr, portStr);

ZEROEQINFO << "Bound to " << uri << std::endl;
if (port == 0)
{
// No overflow is possible unless ZMQ reports bad port number.
port = std::stoi(portStr);
uri.setPort(port);
}

if (host.empty())
uri.setHost(hostStr);

ZEROEQINFO << "Bound to " << uri << std::endl;
}

void Sender::announce()
Expand Down
2 changes: 1 addition & 1 deletion zeroeq/detail/sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ class Sender
zmq::ContextPtr _context; // must be private before socket

public:
Sender(const URI& uri_, const int type);
Sender(const URI& uri_, const int type, const std::string service,
const std::string& session);
Sender(const URI& uri_, const int type);
~Sender();

std::string getAddress() const;
Expand Down
14 changes: 6 additions & 8 deletions zeroeq/receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,9 @@ class Receiver::Impl
intervals.push_back(sockets.size() - before);
}

const uint32_t remaining =
duration_cast<milliseconds>(high_resolution_clock::now() -
startTime)
.count();
const auto remaining = duration_cast<milliseconds>(
high_resolution_clock::now() - startTime)
.count();

switch (zmq_poll(sockets.data(), int(sockets.size()), remaining))
{
Expand Down Expand Up @@ -152,10 +151,9 @@ class Receiver::Impl
}
}
}
} while (haveData &&
duration_cast<milliseconds>(high_resolution_clock::now() -
startTime)
.count() < timeout);
} while (haveData && duration_cast<milliseconds>(
high_resolution_clock::now() - startTime)
.count() < timeout);
return hadData;
}
};
Expand Down
1 change: 1 addition & 0 deletions zeroeq/receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class Receiver
* their list of sockets.
*/
virtual void update() {}

/**
* Add the given connection to the list of receiving sockets.
*
Expand Down
4 changes: 3 additions & 1 deletion zeroeq/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

#include "server.h"

#include "detail/browser.h"
#include "detail/receiver.h"
#include "detail/sender.h"

#include <zmq.h>

#include <cassert>
#include <unordered_map>

Expand Down
16 changes: 7 additions & 9 deletions zeroeq/subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

#include "subscriber.h"

#include "detail/browser.h"
#include "detail/byteswap.h"
#include "detail/common.h"
#include "detail/constants.h"
#include "detail/receiver.h"
#include "detail/sender.h"
#include "detail/socket.h"
#include "log.h"
Expand All @@ -24,28 +24,26 @@

namespace zeroeq
{
class Subscriber::Impl : public detail::Browser
class Subscriber::Impl : public detail::Receiver
{
public:
Impl(const std::string& session)
: Browser(PUBLISHER_SERVICE,
session == DEFAULT_SESSION ? getDefaultPubSession() : session)
: detail::Receiver(PUBLISHER_SERVICE, session == DEFAULT_SESSION
? getDefaultPubSession()
: session)
, _selfInstance(detail::Sender::getUUID())
{
}

Impl(const URIs& uris)
: Browser(PUBLISHER_SERVICE)
: detail::Receiver(PUBLISHER_SERVICE)
, _selfInstance(detail::Sender::getUUID())
{
for (const URI& uri : uris)
{
if (uri.getScheme() == DEFAULT_SCHEMA &&
(uri.getHost().empty() || uri.getPort() == 0))
{
if (!uri.isFullyQualified())
ZEROEQTHROW(std::runtime_error(std::string(
"Non-fully qualified URI used for subscriber")));
}

const std::string& zmqURI = buildZmqURI(uri);
if (!addConnection(zmqURI))
Expand Down
6 changes: 6 additions & 0 deletions zeroeq/uri.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,10 @@ bool URI::operator!=(const servus::URI& rhs) const
{
return servus::URI::operator!=(rhs);
}

bool URI::isFullyQualified() const
{
return getScheme() != DEFAULT_SCHEMA ||
(!getHost().empty() && getPort() != 0);
}
}
4 changes: 4 additions & 0 deletions zeroeq/uri.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class URI : private servus::URI

/** Convert this URI to a servus::URI */
const servus::URI& toServusURI() const { return *this; }

/** @return true if the host and port are given for a tcp URI. */
ZEROEQ_API bool isFullyQualified() const;

/** @name servus::URI API */
//@{
using servus::URI::getScheme;
Expand Down

0 comments on commit d9169fb

Please sign in to comment.