Skip to content

Commit

Permalink
Move Bind/Connect/Attach to FairMQChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx authored and dennisklein committed Nov 13, 2018
1 parent 3ca0d72 commit 25fcf13
Show file tree
Hide file tree
Showing 20 changed files with 592 additions and 685 deletions.
394 changes: 170 additions & 224 deletions fairmq/FairMQChannel.cxx

Large diffs are not rendered by default.

199 changes: 167 additions & 32 deletions fairmq/FairMQChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
#define FAIRMQCHANNEL_H_

#include <string>
#include <memory> // unique_ptr
#include <memory> // unique_ptr, shared_ptr
#include <vector>
#include <atomic>
#include <mutex>
#include <stdexcept>
#include <utility> // std::move

#include <FairMQTransportFactory.h>
#include <FairMQSocket.h>
Expand Down Expand Up @@ -43,27 +44,35 @@ class FairMQChannel
/// @param factory TransportFactory
FairMQChannel(const std::string& name, const std::string& type, std::shared_ptr<FairMQTransportFactory> factory);

/// Constructor
/// @param name Channel name
/// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
/// @param method Socket method (bind/connect)
/// @param address Network address to bind/connect to (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
/// @param factory TransportFactory
FairMQChannel(const std::string& name, const std::string& type, const std::string& method, const std::string& address, std::shared_ptr<FairMQTransportFactory> factory);

/// Copy Constructor
FairMQChannel(const FairMQChannel&);

/// Assignment operator
FairMQChannel& operator=(const FairMQChannel&);

/// Default destructor
virtual ~FairMQChannel();
virtual ~FairMQChannel() {}

struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; };

FairMQSocket& GetSocket() const;

auto Bind(const std::string& address) -> bool
bool Bind(const std::string& address)
{
fMethod = "bind";
fAddress = address;
return fSocket->Bind(address);
}

auto Connect(const std::string& address) -> void
bool Connect(const std::string& address)
{
fMethod = "connect";
fAddress = address;
Expand All @@ -72,15 +81,18 @@ class FairMQChannel

/// Get channel name
/// @return Returns full channel name (e.g. "data[0]")
std::string GetChannelName() const;
std::string GetChannelName() const { return GetName(); } // TODO: deprecate this in favor of following
std::string GetName() const;

/// Get channel prefix
/// @return Returns channel prefix (e.g. "data" in "data[0]")
std::string GetChannelPrefix() const;
std::string GetChannelPrefix() const { return GetPrefix(); } // TODO: deprecate this in favor of following
std::string GetPrefix() const;

/// Get channel index
/// @return Returns channel index (e.g. 0 in "data[0]")
std::string GetChannelIndex() const;
std::string GetChannelIndex() const { return GetPrefix(); } // TODO: deprecate this in favor of following
std::string GetIndex() const;

/// Get socket type
/// @return Returns socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
Expand Down Expand Up @@ -122,6 +134,18 @@ class FairMQChannel
/// @return Returns socket rate logging interval (in seconds)
int GetRateLogging() const;

/// Get start of the port range for automatic binding
/// @return start of the port range
int GetPortRangeMin() const;

/// Get end of the port range for automatic binding
/// @return end of the port range
int GetPortRangeMax() const;

/// Set automatic binding (pick random port if bind fails)
/// @return true/false, true if automatic binding is enabled
bool GetAutoBind() const;

/// Set socket type
/// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
void UpdateType(const std::string& type);
Expand Down Expand Up @@ -162,17 +186,43 @@ class FairMQChannel
/// @param rateLogging Socket rate logging interval (in seconds)
void UpdateRateLogging(const int rateLogging);

/// Set start of the port range for automatic binding
/// @param minPort start of the port range
void UpdatePortRangeMin(const int minPort);

/// Set end of the port range for automatic binding
/// @param maxPort end of the port range
void UpdatePortRangeMax(const int maxPort);

/// Set automatic binding (pick random port if bind fails)
/// @param autobind true/false, true to enable automatic binding
void UpdateAutoBind(const bool autobind);

/// Set channel name
/// @param name Arbitrary channel name
void UpdateChannelName(const std::string& name);
void UpdateChannelName(const std::string& name) { UpdateName(name); } // TODO: deprecate this in favor of following
void UpdateName(const std::string& name);

/// Checks if the configured channel settings are valid (checks the validity parameter, without running full validation (as oposed to ValidateChannel()))
/// @return true if channel settings are valid, false otherwise.
bool IsValid() const;

/// Validates channel configuration
/// @return true if channel settings are valid, false otherwise.
bool ValidateChannel();
bool ValidateChannel() // TODO: deprecate this
{
return Validate();
}

/// Validates channel configuration
/// @return true if channel settings are valid, false otherwise.
bool Validate();

void Init();

bool ConnectEndpoint(const std::string& endpoint);

bool BindEndpoint(std::string& endpoint);

/// Resets the channel (requires validation to be used again).
void ResetChannel();
Expand All @@ -181,31 +231,63 @@ class FairMQChannel
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1);
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msg);
return fSocket->Send(msg, sndTimeoutInMs);
}

/// Receives a message from the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1);
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msg);
return fSocket->Receive(msg, rcvTimeoutInMs);
}

int SendAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, timeout);")));
int ReceiveAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, timeout);")));
int SendAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, timeout);")))
{
CheckSendCompatibility(msg);
return fSocket->Send(msg, 0);
}
int ReceiveAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, timeout);")))
{
CheckReceiveCompatibility(msg);
return fSocket->Receive(msg, 0);
}

/// Send a vector of messages
/// @param msgVec message vector reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1);
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec, sndTimeoutInMs);
}

/// Receive a vector of messages
/// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1);
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec, rcvTimeoutInMs);
}

int64_t SendAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msgVec, timeout);")));
int64_t ReceiveAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msgVec, timeout);")));
int64_t SendAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msgVec, timeout);")))
{
CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec, 0);
}
int64_t ReceiveAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msgVec, timeout);")))
{
CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec, 0);
}

/// Send FairMQParts
/// @param parts FairMQParts reference
Expand Down Expand Up @@ -235,10 +317,10 @@ class FairMQChannel
return Receive(parts.fParts, 0);
}

unsigned long GetBytesTx() const;
unsigned long GetBytesRx() const;
unsigned long GetMessagesTx() const;
unsigned long GetMessagesRx() const;
unsigned long GetBytesTx() const { return fSocket->GetBytesTx(); }
unsigned long GetBytesRx() const { return fSocket->GetBytesRx(); }
unsigned long GetMessagesTx() const { return fSocket->GetMessagesTx(); }
unsigned long GetMessagesRx() const { return fSocket->GetMessagesRx(); }

auto Transport() -> FairMQTransportFactory*
{
Expand All @@ -264,31 +346,26 @@ class FairMQChannel
}

private:
std::shared_ptr<FairMQTransportFactory> fTransportFactory;
fair::mq::Transport fTransportType;
std::unique_ptr<FairMQSocket> fSocket;

std::string fType;
std::string fMethod;
std::string fAddress;
fair::mq::Transport fTransportType;
int fSndBufSize;
int fRcvBufSize;
int fSndKernelSize;
int fRcvKernelSize;
int fLinger;
int fRateLogging;
int fPortRangeMin;
int fPortRangeMax;
bool fAutoBind;

std::string fName;
std::atomic<bool> fIsValid;

std::shared_ptr<FairMQTransportFactory> fTransportFactory;

void CheckSendCompatibility(FairMQMessagePtr& msg);
void CheckSendCompatibility(std::vector<FairMQMessagePtr>& msgVec);
void CheckReceiveCompatibility(FairMQMessagePtr& msg);
void CheckReceiveCompatibility(std::vector<FairMQMessagePtr>& msgVec);

void InitTransport(std::shared_ptr<FairMQTransportFactory> factory);

// use static mutex to make the class easily copyable
// implication: same mutex is used for all instances of the class
// this does not hurt much, because mutex is used only during initialization with very low contention
Expand All @@ -297,8 +374,66 @@ class FairMQChannel

bool fMultipart;
bool fModified;
auto SetModified(const bool modified) -> void;
bool fReset;

void CheckSendCompatibility(FairMQMessagePtr& msg)
{
if (fTransportType != msg->GetType()) {
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage(
msg->GetData(),
msg->GetSize(),
[](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
msg.get()
));
msg.release();
msg = move(msgWrapper);
}
}

void CheckSendCompatibility(std::vector<FairMQMessagePtr>& msgVec)
{
for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) {
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage(
msg->GetData(),
msg->GetSize(),
[](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
msg.get()
));
msg.release();
msg = move(msgWrapper);
}
}
}

void CheckReceiveCompatibility(FairMQMessagePtr& msg)
{
if (fTransportType != msg->GetType()) {
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
}

void CheckReceiveCompatibility(std::vector<FairMQMessagePtr>& msgVec)
{
for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) {
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
}
}

void InitTransport(std::shared_ptr<FairMQTransportFactory> factory)
{
fTransportFactory = factory;
fTransportType = factory->GetType();
}
auto SetModified(const bool modified) -> void;
};

#endif /* FAIRMQCHANNEL_H_ */
Loading

0 comments on commit 25fcf13

Please sign in to comment.