diff --git a/mythtv/external/nzmqt/.hgignore b/mythtv/external/nzmqt/.hgignore new file mode 100644 index 00000000000..d9c454ca593 --- /dev/null +++ b/mythtv/external/nzmqt/.hgignore @@ -0,0 +1,7 @@ +syntax: glob +Debug +Release +**.user +**.orig +**.DS_Store + diff --git a/mythtv/external/nzmqt/CHANGELOG.txt b/mythtv/external/nzmqt/CHANGELOG.txt new file mode 100644 index 00000000000..33fc9fdd8d1 --- /dev/null +++ b/mythtv/external/nzmqt/CHANGELOG.txt @@ -0,0 +1,8 @@ +Release 0.7 +----------- +* Introduced enumeration types for several ZMQ constants for type-safety. +* Added a new polling based implementation that works for all ZMQ communication protocols. +* Dropped support for REQ-REP protocol for old 'QSocketNotifier' based implementation. +* Added some more convenience methods to 'ZMQSocket' class. +* Old and new socket implementations now emit a signal with a received message as parameter. + diff --git a/mythtv/external/nzmqt/COPYING.txt b/mythtv/external/nzmqt/COPYING.txt new file mode 100644 index 00000000000..96b4ecc5b88 --- /dev/null +++ b/mythtv/external/nzmqt/COPYING.txt @@ -0,0 +1,25 @@ +Copyright 2011-2012 Johann Duscher. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, are +permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, this list of + conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright notice, this list + of conditions and the following disclaimer in the documentation and/or other materials + provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY JOHANN DUSCHER ''AS IS'' AND ANY EXPRESS OR IMPLIED +WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +The views and conclusions contained in the software and documentation are those of the +authors and should not be interpreted as representing official policies, either expressed +or implied, of Johann Duscher. diff --git a/mythtv/external/nzmqt/LICENSE.header b/mythtv/external/nzmqt/LICENSE.header new file mode 100644 index 00000000000..be98aaa13dc --- /dev/null +++ b/mythtv/external/nzmqt/LICENSE.header @@ -0,0 +1,25 @@ +// Copyright 2011-2012 Johann Duscher. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY JOHANN DUSCHER ''AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +// FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and documentation are those of the +// authors and should not be interpreted as representing official policies, either expressed +// or implied, of Johann Duscher. diff --git a/mythtv/external/nzmqt/README.markdown b/mythtv/external/nzmqt/README.markdown new file mode 100755 index 00000000000..a94d8befe9f --- /dev/null +++ b/mythtv/external/nzmqt/README.markdown @@ -0,0 +1,83 @@ +nzmqt - A lightweight C++ [Qt][] binding for [ZeroMQ][] +====================================================== + +nzmqt is a re-implementation of the approach taken by the [zeromqt][] library. The idea +is to integrate ZeroMQ into the Qt event loop, mapping ZeroMQ message events onto +Qt signals. The original implementation also provides a Qt-like API +which allows to represent messages as QByteArray instances. While I took this idea +and the original implementation as a source of information, I've done a completely +new implementation. Not only in order to get rid of some short comings, but also +because I wanted to be sure I can use the code in my projects without problems, +because until now zeromqt's author hasn't officially released his work under a certain +(open source) license. Consequently, nzmqt is released to the public under the +simplified BSD license. So you can use the code in your projects without any fear. + +While zeromqt works fine for non-multi-part messages, it doesn't support multi-part +messages yet. Also, a lot of code duplicates code of ZeroMQ's standard C++ binding. +But this requires to take care of both implementations. So in contrast to the original +implementation, nzmqt reuses as much code of ZeroMQ's original C++ binding as possible +by using inheritance. Additionally, several things have been changed from a user's +perspective. In summary, nzmqt contains the following changes compared to zeromqt: + +* The implementation is a complete re-write in the sense that it doesn't duplicate code +of ZeroMQ's official C++ binding anymore. Instead, it builds upon existing code +through inheritance and, hence, it will likely benefit from future bugfixes and +enhancements targeted at ZeroMQ's C++ binding. +* All classes are placed into a separate namespace 'nzmqt'. +* This version now also supports ZeroMQ's multi-part messages. +* The initial support for using Qt's way of handling errors using error codes +has been dropped. Instead, this code only throws exception originally thrown +by ZeroMQ's official C++ API. Note that although it looks like 'ZMQException' +is a new custom exception class there is no custom exception class, but only +a simple typedef which places the original exception class into the new +namespace giving it a new name. +* As with ZeroMQ's C++ binding all classes are contained within a singe header +file which makes integrating this Qt binding very easy. +* There is no 'ZmqContext' singleton anymore. Instead you can create your +own instance of a concrete subclass of 'ZMQContext' yourself. +* The socket class 'ZMQSocket' now also inherits from QObject, so you can +add it as a child to any QObject parent as you know it from Qt. +* The code is officially licensed under the simplified BSD license. +* Not only PUB-SUB, but also REQ-REP and PUSH-PULL are supported. + +Status +------ + +See the [official bug tracker](https://github.com/jonnydee/nzmqt/issues "https://github.com/jonnydee/nzmqt/issues"). + +Usage +----- + +As ZeroMQ's C++ binding this Qt binding only consists of a single C++ header file +which you need to include in your project. + +Consequently, using 'nzmqt' in a Qt project is as simple as adding that single header +file to your project's .pro file as follows (assumed you use QT Creator). + + HEADERS += nzmqt/nzmqt.hpp + +If not already done, you also need to link against ZeroMQ library: + + LIBS += -lzmq + +Of course, you need to make sure the header file as well as the ZeroMQ library +can be found by your compiler/linker. + +As nzmqt uses C++ exceptions for error handling so you will need to catch them +by overriding QCoreApplication::notify() method. The included sample will +show you how this can be done. + +Included Samples +---------------- + +Currently, there are samples showing PUB-SUB, REQ-REP and PUSH-PULL protocol with multi-part +messages in action. They also show how to deal with exceptions in Qt. + +More Information +---------------- + +[nzmqt](https://github.com/jonnydee/nzmqt "https://github.com/jonnydee/nzmqt") + +[Qt]: http://qt.nokia.com/ "Qt" +[ZeroMQ]: http://zeromq.com/ "ZeroMQ" +[zeromqt]: https://github.com/wttw/zeromqt "zeromqt" diff --git a/mythtv/external/nzmqt/include/nzmqt/nzmqt.hpp b/mythtv/external/nzmqt/include/nzmqt/nzmqt.hpp new file mode 100755 index 00000000000..7c75b68ec01 --- /dev/null +++ b/mythtv/external/nzmqt/include/nzmqt/nzmqt.hpp @@ -0,0 +1,822 @@ +// Copyright 2011-2012 Johann Duscher. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY JOHANN DUSCHER ''AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +// FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and documentation are those of the +// authors and should not be interpreted as representing official policies, either expressed +// or implied, of Johann Duscher. + +#ifndef NZMQT_H +#define NZMQT_H + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Define default context implementation to be used. +#ifndef NZMQT_DEFAULT_ZMQCONTEXT_IMPLEMENTATION + #define NZMQT_DEFAULT_ZMQCONTEXT_IMPLEMENTATION PollingZMQContext + //#define NZMQT_DEFAULT_ZMQCONTEXT_IMPLEMENTATION SocketNotifierZMQContext +#endif + +// Define default number of IO threads to be used by ZMQ. +#ifndef NZMQT_DEFAULT_IOTHREADS + #define NZMQT_DEFAULT_IOTHREADS 4 +#endif + +// Define default poll interval for polling-based implementation. +#ifndef NZMQT_POLLINGZMQCONTEXT_DEFAULT_POLLINTERVAL + #define NZMQT_POLLINGZMQCONTEXT_DEFAULT_POLLINTERVAL 10 /* msec */ +#endif + +// Declare metatypes for using them in Qt signals. +Q_DECLARE_METATYPE(QList< QList >) +Q_DECLARE_METATYPE(QList) + +namespace nzmqt +{ + typedef zmq::free_fn free_fn; + typedef zmq::pollitem_t pollitem_t; + + typedef zmq::error_t ZMQException; + + using zmq::poll; + using zmq::version; + + // This class wraps ZMQ's message structure. + class ZMQMessage : private zmq::message_t + { + friend class ZMQSocket; + + typedef zmq::message_t super; + + public: + inline ZMQMessage() : super() {} + + inline ZMQMessage(size_t size_) : super(size_) {} + + inline ZMQMessage(void* data_, size_t size_, free_fn *ffn_, void* hint_ = 0) + : super(data_, size_, ffn_, hint_) {} + + inline ZMQMessage(const QByteArray& b) : super(b.size()) + { + memcpy(data(), b.constData(), b.size()); + } + + using super::rebuild; + + inline void move(ZMQMessage* msg_) + { + super::move(static_cast(msg_)); + } + + inline void copy(ZMQMessage* msg_) + { + super::copy(msg_); + } + + inline void clone(ZMQMessage* msg_) + { + rebuild(msg_->size()); + memcpy(data(), msg_->data(), size()); + } + + using super::data; + + using super::size; + + inline QByteArray toByteArray() + { + return QByteArray((const char *)data(), size()); + } + }; + + class ZMQContext; + + // This class cannot be instantiated. Its purpose is to serve as an + // intermediate base class that provides Qt-based convenience methods + // to subclasses. + class ZMQSocket : public QObject, private zmq::socket_t + { + Q_OBJECT + Q_ENUMS(Type Event SendFlag ReceiveFlag Option) + Q_FLAGS(Event Events) + Q_FLAGS(SendFlag SendFlags) + Q_FLAGS(ReceiveFlag ReceiveFlags) + + typedef QObject qsuper; + typedef zmq::socket_t zmqsuper; + + public: + enum Type + { + TYP_PUB = ZMQ_PUB, + TYP_SUB = ZMQ_SUB, + TYP_PUSH = ZMQ_PUSH, + TYP_PULL = ZMQ_PULL, + TYP_REQ = ZMQ_REQ, + TYP_REP = ZMQ_REP, + TYP_DEALER = ZMQ_DEALER, + TYP_ROUTER = ZMQ_ROUTER, + TYP_PAIR = ZMQ_PAIR, + TYP_XPUB = ZMQ_XPUB, + TYP_XSUB = ZMQ_XSUB, + }; + + enum Event + { + EVT_POLLIN = ZMQ_POLLIN, + EVT_POLLOUT = ZMQ_POLLOUT, + EVT_POLLERR = ZMQ_POLLERR, + }; + Q_DECLARE_FLAGS(Events, Event) + + enum SendFlag + { + SND_MORE = ZMQ_SNDMORE, + SND_NOBLOCK = ZMQ_NOBLOCK, + }; + Q_DECLARE_FLAGS(SendFlags, SendFlag) + + enum ReceiveFlag + { + RCV_NOBLOCK = ZMQ_NOBLOCK, + }; + Q_DECLARE_FLAGS(ReceiveFlags, ReceiveFlag) + + enum Option + { + // Get only. + OPT_TYPE = ZMQ_TYPE, + OPT_RCVMORE = ZMQ_RCVMORE, + OPT_FD = ZMQ_FD, + OPT_EVENTS = ZMQ_EVENTS, + + // Set only. + OPT_SUBSCRIBE = ZMQ_SUBSCRIBE, + OPT_UNSUBSCRIBE = ZMQ_UNSUBSCRIBE, + + // Get and set. + OPT_HWM = ZMQ_HWM, + OPT_SWAP = ZMQ_SWAP, + OPT_AFFINITY = ZMQ_AFFINITY, + OPT_IDENTITY = ZMQ_IDENTITY, + OPT_RATE = ZMQ_RATE, + OPT_RECOVERY_IVL = ZMQ_RECOVERY_IVL, + OPT_RECOVERY_IVL_MSEC = ZMQ_RECOVERY_IVL_MSEC, + OPT_MCAST_LOOP = ZMQ_MCAST_LOOP, + OPT_SNDBUF = ZMQ_SNDBUF, + OPT_RCVBUF = ZMQ_RCVBUF, + OPT_LINGER = ZMQ_LINGER, + OPT_RECONNECT_IVL = ZMQ_RECONNECT_IVL, + OPT_RECONNECT_IVL_MAX = ZMQ_RECONNECT_IVL_MAX, + OPT_BACKLOG = ZMQ_BACKLOG, + }; + + using zmqsuper::operator void *; + + using zmqsuper::close; + + inline void setOption(Option optName_, const void *optionVal_, size_t optionValLen_) + { + setsockopt(optName_, optionVal_, optionValLen_); + } + + inline void setOption(Option optName_, const char* str_) + { + setOption(optName_, str_, strlen(str_)); + } + + inline void setOption(Option optName_, const QByteArray& bytes_) + { + setOption(optName_, bytes_.constData(), bytes_.size()); + } + + inline void setOption(Option optName_, int value_) + { + setOption(optName_, &value_, sizeof(value_)); + } + + inline void getOption(Option option_, void *optval_, size_t *optvallen_) const + { + const_cast(this)->getsockopt(option_, optval_, optvallen_); + } + + inline void bindTo(const QString& addr_) + { + bind(addr_.toLocal8Bit()); + } + + inline void bindTo(const char *addr_) + { + bind(addr_); + } + + inline void connectTo(const QString& addr_) + { + zmqsuper::connect(addr_.toLocal8Bit()); + } + + inline void connectTo(const char* addr_) + { + zmqsuper::connect(addr_); + } + + inline bool sendMessage(ZMQMessage& msg_, SendFlags flags_ = SND_NOBLOCK) + { + return send(msg_, flags_); + } + + inline bool sendMessage(const QByteArray& bytes_, SendFlags flags_ = SND_NOBLOCK) + { + ZMQMessage msg(bytes_); + return send(msg, flags_); + } + + // Interprets the provided list of byte arrays as a multi-part message + // and sends them accordingly. + // If an empty list is provided this method doesn't do anything and returns trua. + inline bool sendMessage(const QList& msg_, SendFlags flags_ = SND_NOBLOCK) + { + int i; + for (i = 0; i < msg_.size() - 1; i++) + { + if (!sendMessage(msg_[i], flags_ | SND_MORE)) + return false; + } + if (i < msg_.size()) + return sendMessage(msg_[i], flags_); + + return true; + } + + // Receives a message or a message part. + inline bool receiveMessage(ZMQMessage* msg_, ReceiveFlags flags_ = RCV_NOBLOCK) + { + return recv(msg_, flags_); + } + + // Receives a message. + // The message is represented as a list of byte arrays representing + // a message's parts. If the message is not a multi-part message the + // list will only contain one array. + inline QList receiveMessage() + { + QList parts; + + ZMQMessage msg; + while (receiveMessage(&msg)) + { + parts += msg.toByteArray(); + msg.rebuild(); + + if (!hasMoreMessageParts()) + break; + } + + return parts; + } + + // Receives all messages currently available. + // Each message is represented as a list of byte arrays representing the messages + // and their parts in case of multi-part messages. If a message isn't a multi-part + // message the corresponding byte array list will only contain one element. + // Note that this method won't work with REQ-REP protocol. + inline QList< QList > receiveMessages() + { + QList< QList > ret; + + QList parts = receiveMessage(); + while (!parts.isEmpty()) + { + ret += parts; + + parts = receiveMessage(); + } + + return ret; + } + + inline int fileDescriptor() const + { + int value; + size_t size = sizeof(value); + getOption(OPT_FD, &value, &size); + return value; + } + + inline Events events() const + { + quint32 value; + size_t size = sizeof(value); + getOption(OPT_EVENTS, &value, &size); + return static_cast(value); + } + + // Returns true if there are more parts of a multi-part message + // to be received. + inline bool hasMoreMessageParts() const + { + quint64 value; + size_t size = sizeof(value); + getOption(OPT_RCVMORE, &value, &size); + return value; + } + + inline void setIdentity(const char* nameStr_) + { + setOption(OPT_IDENTITY, nameStr_); + } + + inline void setIdentity(const QString& name_) + { + setOption(OPT_IDENTITY, name_.toLocal8Bit()); + } + + inline void setIdentity(const QByteArray& name_) + { + setOption(OPT_IDENTITY, const_cast(name_.constData()), name_.size()); + } + + inline QByteArray identity() const + { + char idbuf[256]; + size_t size = sizeof(idbuf); + getOption(OPT_IDENTITY, idbuf, &size); + return QByteArray(idbuf, size); + } + + inline void setLinger(int msec_) + { + setOption(OPT_LINGER, msec_); + } + + inline int linger() const + { + int msec=-1; + size_t size = sizeof(msec); + getOption(OPT_LINGER, &msec, &size); + return msec; + } + + inline void subscribeTo(const char* filterStr_) + { + setOption(OPT_SUBSCRIBE, filterStr_); + } + + inline void subscribeTo(const QString& filter_) + { + setOption(OPT_SUBSCRIBE, filter_.toLocal8Bit()); + } + + inline void subscribeTo(const QByteArray& filter_) + { + setOption(OPT_SUBSCRIBE, filter_); + } + + inline void unsubscribeFrom(const char* filterStr_) + { + setOption(OPT_UNSUBSCRIBE, filterStr_); + } + + inline void unsubscribeFrom(const QString& filter_) + { + setOption(OPT_UNSUBSCRIBE, filter_.toLocal8Bit()); + } + + inline void unsubscribeFrom(const QByteArray& filter_) + { + setOption(OPT_UNSUBSCRIBE, filter_); + } + + protected: + ZMQSocket(ZMQContext* context_, Type type_); + }; + Q_DECLARE_OPERATORS_FOR_FLAGS(ZMQSocket::Events) + Q_DECLARE_OPERATORS_FOR_FLAGS(ZMQSocket::SendFlags) + Q_DECLARE_OPERATORS_FOR_FLAGS(ZMQSocket::ReceiveFlags) + + + // This class is an abstract base class for concrete implementations. + class ZMQContext : public QObject, private zmq::context_t + { + Q_OBJECT + + typedef QObject qsuper; + typedef zmq::context_t zmqsuper; + + friend class ZMQSocket; + + public: + inline ZMQContext(QObject* parent_ = 0, int io_threads_ = NZMQT_DEFAULT_IOTHREADS) + : qsuper(parent_), zmqsuper(io_threads_) {} + + // Deleting children is necessary, because otherwise the children are deleted after the context + // which results in a blocking state. So we delete the children before the zmq::context_t + // destructor implementation is called. + inline ~ZMQContext() + { + QObjectList children_ = children(); + foreach (QObject* child, children_) + delete child; + } + + using zmqsuper::operator void*; + + // Creates a socket instance of the specified type. + // The created instance will have this context set as its parent, + // so deleting this context will first delete the socket. + // You can call 'ZMQSocket::setParent()' method to change ownership, + // but then you need to make sure the socket instance is deleted + // before its context. Otherwise, you might encounter blocking + // behavior. + inline ZMQSocket* createSocket(ZMQSocket::Type type_) + { + return createSocket(type_, this); + } + + // Creates a socket instance of the specified type and parent. + // The created instance will have the specified parent. + // You can also call 'ZMQSocket::setParent()' method to change + // ownership later on, but then you need to make sure the socket + // instance is deleted before its context. Otherwise, you might + // encounter blocking behavior. + inline ZMQSocket* createSocket(ZMQSocket::Type type_, QObject* parent_) + { + ZMQSocket* socket = createSocketInternal(type_); + socket->setParent(parent_); + return socket; + } + + // Start watching for incoming messages. + virtual void start() = 0; + + // Stop watching for incoming messages. + virtual void stop() = 0; + + // Indicates if watching for incoming messages is enabled. + virtual bool isStopped() const = 0; + + protected: + // Creates a socket instance of the specified type. + virtual ZMQSocket* createSocketInternal(ZMQSocket::Type type_) = 0; + }; + + + inline ZMQSocket::ZMQSocket(ZMQContext* context_, Type type_) + : qsuper(0), zmqsuper(*context_, type_) + { + } + + + class ZMQDevice : public QObject, public QRunnable + { + Q_OBJECT + Q_ENUMS(Type) + + public: + enum Type + { + TYP_QUEUE = ZMQ_QUEUE, + TYP_FORWARDED = ZMQ_FORWARDER, + TYP_STREAMER = ZMQ_STREAMER, + }; + + inline ZMQDevice(Type type, ZMQSocket* frontend, ZMQSocket* backend) + : type_(type), frontend_(frontend), backend_(backend) + { + } + + inline void run() + { + zmq::device(type_, *frontend_, *backend_); + } + + private: + Type type_; + ZMQSocket* frontend_; + ZMQSocket* backend_; + }; + + + // An instance of this class cannot directly be created. Use one + // of the 'PollingZMQContext::createSocket()' factory methods instead. + class PollingZMQSocket : public ZMQSocket + { + Q_OBJECT + + typedef ZMQSocket super; + + friend class PollingZMQContext; + + protected: + inline PollingZMQSocket(ZMQContext* context_, Type type_) + : super(context_, type_) {} + + // This method is called by the socket's context object in order + // to signal a new received message. + inline void onMessageReceived(const QList& message) + { + emit messageReceived(message); + } + + signals: + void messageReceived(const QList&); + }; + + class PollingZMQContext : public ZMQContext, public QRunnable + { + Q_OBJECT + + typedef ZMQContext super; + + public: + inline PollingZMQContext(QObject* parent_ = 0, int io_threads_ = NZMQT_DEFAULT_IOTHREADS) + : super(parent_, io_threads_), + m_pollItemsMutex(QMutex::Recursive), + m_interval(NZMQT_POLLINGZMQCONTEXT_DEFAULT_POLLINTERVAL), + m_stopped(false) + { + setAutoDelete(false); + } + + inline virtual ~PollingZMQContext() + { + QMutexLocker lock(&m_pollItemsMutex); + + Sockets list = m_sockets; + for (Sockets::iterator soIt = list.begin(); soIt != list.end(); soIt++) + unregisterSocket(*soIt); + } + + // Sets the polling interval. + // Note that the interval does not denote the time the zmq::poll() function will + // block in order to wait for incoming messages. Instead, it denotes the time in-between + // consecutive zmq::poll() calls. + inline void setInterval(int interval_) + { + m_interval = interval_; + } + + inline int getInterval() const + { + return m_interval; + } + + // Starts the polling process by scheduling a call to the 'run()' method into Qt's event loop. + inline void start() + { + m_stopped = false; + QTimer::singleShot(0, this, SLOT(run())); + } + + // Stops the polling process in the sense that no further 'run()' calls will be scheduled into + // Qt's event loop. + inline void stop() + { + m_stopped = true; + } + + inline bool isStopped() const + { + return m_stopped; + } + + public slots: + // If the polling process is not stopped (by a previous call to the 'stop()' method) this + // method will call the 'poll()' method once and re-schedule a subsequent call to this method + // using the current polling interval. + inline void run() + { + if (m_stopped) + return; + + poll(); + + if (!m_stopped) + QTimer::singleShot(m_interval, this, SLOT(run())); + } + + // This method will poll on all currently available poll-items (known ZMQ sockets) + // using the given timeout to wait for incoming messages. Note that this timeout has + // nothing to do with the polling interval. Instead, the poll method will block the current + // thread by waiting at most the specified amount of time for incoming messages. + // This method is public because it can be called directly if you need to. + inline void poll(long timeout_ = 0) + { + QMutexLocker lock(&m_pollItemsMutex); + + if (m_pollItems.empty()) + return; + + zmq::poll(&m_pollItems[0], m_pollItems.size(), timeout_); + + PollItems::iterator poIt = m_pollItems.begin(); + Sockets::iterator soIt = m_sockets.begin(); + while (poIt != m_pollItems.end()) + { + if (poIt->revents & ZMQSocket::EVT_POLLIN) + { + QList message = (*soIt)->receiveMessage(); + (*soIt)->onMessageReceived(message); + } + ++soIt; + ++poIt; + } + } + + protected: + inline PollingZMQSocket* createSocketInternal(ZMQSocket::Type type_) + { + PollingZMQSocket* socket = new PollingZMQSocket(this, type_); + // Add the socket to the poll-item list. + registerSocket(socket); + return socket; + } + + // Add the given socket to list list of poll-items. + inline void registerSocket(PollingZMQSocket* socket_) + { + // Make sure the socket is removed from the poll-item list as soon + // as it is destroyed. + connect(socket_, SIGNAL(destroyed(QObject*)), SLOT(unregisterSocket(QObject*))); + + pollitem_t pollItem = { *socket_, 0, ZMQSocket::EVT_POLLIN, 0 }; + + QMutexLocker lock(&m_pollItemsMutex); + m_sockets.push_back(socket_); + m_pollItems.push_back(pollItem); + } + + protected slots: + // Remove the given socket object from the list of poll-items. + inline void unregisterSocket(QObject* socket_) + { + QMutexLocker lock(&m_pollItemsMutex); + + PollItems::iterator poIt = m_pollItems.begin(); + Sockets::iterator soIt = m_sockets.begin(); + while (soIt != m_sockets.end()) + { + if (*soIt == socket_) + { + socket_->disconnect(this); + m_sockets.erase(soIt); + m_pollItems.erase(poIt); + break; + } + ++soIt; + ++poIt; + } + } + + private: + typedef QVector PollItems; + typedef QVector Sockets; + + Sockets m_sockets; + PollItems m_pollItems; + QMutex m_pollItemsMutex; + int m_interval; + volatile bool m_stopped; + }; + + + // An instance of this class cannot directly be created. Use one + // of the 'SocketNotifierZMQContext::createSocket()' factory methods instead. + class SocketNotifierZMQSocket : public ZMQSocket + { + Q_OBJECT + + friend class SocketNotifierZMQContext; + + typedef ZMQSocket super; + +// public: +// using super::sendMessage; + +// inline bool sendMessage(const QByteArray& bytes_, SendFlags flags_ = SND_NOBLOCK) +// { +// bool result = super::sendMessage(bytes_, flags_); + +// if (!result) +// socketNotifyWrite_->setEnabled(true); + +// return result; +// } + + protected: + inline SocketNotifierZMQSocket(ZMQContext* context_, Type type_) + : super(context_, type_), + socketNotifyRead_(0) +// socketNotifyWrite_(0) + { + int fd = fileDescriptor(); + + socketNotifyRead_ = new QSocketNotifier(fd, QSocketNotifier::Read, this); + QObject::connect(socketNotifyRead_, SIGNAL(activated(int)), this, SLOT(socketReadActivity())); + +// socketNotifyWrite_ = new QSocketNotifier(fd, QSocketNotifier::Write, this); +// socketNotifyWrite_->setEnabled(false); +// QObject::connect(socketNotifyWrite_, SIGNAL(activated(int)), this, SLOT(socketWriteActivity())); + } + + protected slots: + inline void socketReadActivity() + { + socketNotifyRead_->setEnabled(false); + + while(events() & EVT_POLLIN) + { + QList message = receiveMessage(); + emit messageReceived(message); + } + + socketNotifyRead_->setEnabled(true); + } + +// inline void socketWriteActivity() +// { +// if(events() == 0) +// { +// socketNotifyWrite_->setEnabled(false); +// } +// } + + signals: + void messageReceived(const QList&); + + private: + QSocketNotifier *socketNotifyRead_; +// QSocketNotifier *socketNotifyWrite_; + }; + + class SocketNotifierZMQContext : public ZMQContext + { + Q_OBJECT + + typedef ZMQContext super; + + public: + inline SocketNotifierZMQContext(QObject* parent_ = 0, int io_threads_ = NZMQT_DEFAULT_IOTHREADS) + : super(parent_, io_threads_) + { + } + + inline void start() + { + } + + inline void stop() + { + } + + inline bool isStopped() const + { + return false; + } + + protected: + inline SocketNotifierZMQSocket* createSocketInternal(ZMQSocket::Type type_) + { + return new SocketNotifierZMQSocket(this, type_); + } + }; + + inline ZMQContext* createDefaultContext(QObject* parent_ = 0, int io_threads_ = NZMQT_DEFAULT_IOTHREADS) + { + return new NZMQT_DEFAULT_ZMQCONTEXT_IMPLEMENTATION(parent_, io_threads_); + } +} + + +#endif // NZMQT_H diff --git a/mythtv/external/nzmqt/src/NzmqtApp.h b/mythtv/external/nzmqt/src/NzmqtApp.h new file mode 100644 index 00000000000..ec8dac63698 --- /dev/null +++ b/mythtv/external/nzmqt/src/NzmqtApp.h @@ -0,0 +1,214 @@ +// Copyright 2011-2012 Johann Duscher. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY JOHANN DUSCHER ''AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +// FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and documentation are those of the +// authors and should not be interpreted as representing official policies, either expressed +// or implied, of Johann Duscher. + +#ifndef NZMQTAPP_H +#define NZMQTAPP_H + +#include +#include +#include +#include +#include +#include + +#include "nzmqt/nzmqt.hpp" + +#include "pubsub/PubSubClient.h" +#include "pubsub/PubSubServer.h" +#include "reqrep/ReqRepClient.h" +#include "reqrep/ReqRepServer.h" +#include "pushpull/PushPullVentilator.h" +#include "pushpull/PushPullWorker.h" +#include "pushpull/PushPullSink.h" + + +namespace nzmqt +{ + +namespace samples +{ + +class NzmqtApp : public QCoreApplication +{ + Q_OBJECT + + typedef QCoreApplication super; + +public: + explicit NzmqtApp(int& argc, char** argv) + : super(argc, argv) + { + QTimer::singleShot(0, this, SLOT(run())); + } + + bool notify(QObject *obj, QEvent *event) + { + try + { + return super::notify(obj, event); + } + catch (std::exception& ex) + { + qWarning() << ex.what(); + return false; + } + } + +protected slots: + void run() + { + QTextStream cout(stdout); + try + { + QStringList args = arguments(); + + if (args.size() == 1 || "-h" == args[1] || "--help" == args[1]) + { + printUsage(cout); + quit(); + return; + } + + QString command = args[1]; + QRunnable* commandImpl = 0; + + if ("pubsub-server" == command) + { + if (args.size() < 4) + throw std::runtime_error("Mandatory argument(s) missing!"); + + QString address = args[2]; + QString topic = args[3]; + commandImpl = new PubSubServer(address, topic, this); + } + else if ("pubsub-client" == command) + { + if (args.size() < 4) + throw std::runtime_error("Mandatory argument(s) missing!"); + + QString address = args[2]; + QString topic = args[3]; + commandImpl = new PubSubClient(address, topic, this); + } + else if ("reqrep-server" == command) + { + if (args.size() < 4) + throw std::runtime_error("Mandatory argument(s) missing!"); + + QString address = args[2]; + QString responseMsg = args[3]; + commandImpl = new ReqRepServer(address, responseMsg, this); + } + else if ("reqrep-client" == command) + { + if (args.size() < 4) + throw std::runtime_error("Mandatory argument(s) missing!"); + + QString address = args[2]; + QString requestMsg = args[3]; + commandImpl = new ReqRepClient(address, requestMsg, this); + } + else if ("pushpull-ventilator" == command) + { + if (args.size() < 5) + throw std::runtime_error("Mandatory argument(s) missing!"); + + QString ventilatorAddress = args[2]; + QString sinkAddress = args[3]; + quint32 numberOfWorkItems = args[4].toUInt(); + commandImpl = new PushPullVentilator(ventilatorAddress, sinkAddress, numberOfWorkItems, this); + } + else if ("pushpull-worker" == command) + { + if (args.size() < 4) + throw std::runtime_error("Mandatory argument(s) missing!"); + + QString ventilatorAddress = args[2]; + QString sinkAddress = args[3]; + commandImpl = new PushPullWorker(ventilatorAddress, sinkAddress, this); + } + else if ("pushpull-sink" == command) + { + if (args.size() < 3) + throw std::runtime_error("Mandatory argument(s) missing!"); + + QString sinkAddress = args[2]; + commandImpl = new PushPullSink(sinkAddress, this); + } + else + { + throw std::runtime_error(QString("Unknown command: '%1'").arg(command).toStdString()); + } + + // Run command. + commandImpl->run(); + } + catch (std::exception& ex) + { + qWarning() << ex.what(); + exit(-1); + } + } + +protected: + void printUsage(QTextStream& out) + { + QString executable = arguments().at(0); + out << QString( +"\n\ +USAGE: %1 [-h|--help] -- Show this help message.\n\ +\n\ +USAGE: %1
-- Start PUB server.\n\ + %1
-- Start SUB client.\n\ +\n\ +USAGE: %1
-- Start REQ server.\n\ + %1
-- Start REP client.\n\ +\n\ +USAGE: %1 -- Start ventilator.\n\ + %1 -- Start a worker.\n\ + %1 -- Start sink.\n\ +\n\ +Publish-Subscribe Sample:\n\ +* Server: %1 pubsub-server tcp://127.0.0.1:1234 ping\n\ +* Client: %1 pubsub-client tcp://127.0.0.1:1234 ping\n\ +\n\ +Request-Reply Sample:\n\ +* Server: %1 reqrep-server tcp://127.0.0.1:1234 World\n\ +* Client: %1 reqrep-client tcp://127.0.0.1:1234 Hello\n\ +\n\ +Push-Pull Sample:\n\ +* Ventilator: %1 pushpull-ventilator tcp://127.0.0.1:5557 tcp://127.0.0.1:5558 100\n\ +* Worker 1..n: %1 pushpull-worker tcp://127.0.0.1:5557 tcp://127.0.0.1:5558\n\ +* Sink: %1 pushpull-sink tcp://127.0.0.1:5558\n\ +\n").arg(executable); + } +}; + +} + +} + +#endif // NZMQTAPP_H diff --git a/mythtv/external/nzmqt/src/common/Tools.h b/mythtv/external/nzmqt/src/common/Tools.h new file mode 100644 index 00000000000..a7c6992d6f9 --- /dev/null +++ b/mythtv/external/nzmqt/src/common/Tools.h @@ -0,0 +1,58 @@ +// Copyright 2011-2012 Johann Duscher. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY JOHANN DUSCHER ''AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +// FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and documentation are those of the +// authors and should not be interpreted as representing official policies, either expressed +// or implied, of Johann Duscher. + +#ifndef TOOLS_H +#define TOOLS_H + +// For sleep. +#ifdef Q_OS_WIN + #include +#else + #include +#endif + + +namespace nzmqt +{ + +namespace samples +{ + +inline void sleep(int msec) +{ +#ifdef Q_OS_WIN + Sleep(uint(msec)); +#else + struct timespec ts = { msec / 1000, (msec % 1000) * 1000 * 1000 }; + nanosleep(&ts, NULL); +#endif +} + +} + +} + +#endif // TOOLS_H diff --git a/mythtv/external/nzmqt/src/main.cpp b/mythtv/external/nzmqt/src/main.cpp new file mode 100755 index 00000000000..6feae87015a --- /dev/null +++ b/mythtv/external/nzmqt/src/main.cpp @@ -0,0 +1,37 @@ +// Copyright 2011-2012 Johann Duscher. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY JOHANN DUSCHER ''AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +// FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and documentation are those of the +// authors and should not be interpreted as representing official policies, either expressed +// or implied, of Johann Duscher. + +#include + +#include "NzmqtApp.h" + + +int main(int argc, char *argv[]) +{ + nzmqt::samples::NzmqtApp nzmqtApp(argc, argv); + + return nzmqtApp.exec(); +} diff --git a/mythtv/external/nzmqt/src/nzmqt.pro b/mythtv/external/nzmqt/src/nzmqt.pro new file mode 100755 index 00000000000..f14d0f21476 --- /dev/null +++ b/mythtv/external/nzmqt/src/nzmqt.pro @@ -0,0 +1,45 @@ +#------------------------------------------------- +# +# Project created by QtCreator 2010-10-15T17:00:35 +# +#------------------------------------------------- + +QT += core + +QT -= gui + +TARGET = nzmqt +CONFIG += console +CONFIG -= app_bundle + +TEMPLATE = app + + +SOURCES += \ + main.cpp + +HEADERS += \ + ../include/nzmqt/nzmqt.hpp \ + pubsub/PubSubServer.h \ + pubsub/PubSubClient.h \ + reqrep/ReqRepServer.h \ + reqrep/ReqRepClient.h \ + pushpull/PushPullWorker.h \ + pushpull/PushPullVentilator.h \ + pushpull/PushPullSink.h \ + NzmqtApp.h \ + common/Tools.h + +LIBS += -lzmq + +INCLUDEPATH += \ + ../include \ + /opt/local/include + +QMAKE_LIBDIR += \ + /opt/local/lib + +OTHER_FILES += \ + ../README.markdown \ + ../COPYING \ + ../LICENSE.header diff --git a/mythtv/external/nzmqt/src/pubsub/PubSubClient.h b/mythtv/external/nzmqt/src/pubsub/PubSubClient.h new file mode 100644 index 00000000000..94dfaefba31 --- /dev/null +++ b/mythtv/external/nzmqt/src/pubsub/PubSubClient.h @@ -0,0 +1,85 @@ +// Copyright 2011-2012 Johann Duscher. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY JOHANN DUSCHER ''AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +// FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and documentation are those of the +// authors and should not be interpreted as representing official policies, either expressed +// or implied, of Johann Duscher. + +#ifndef PUBSUBCLIENT_H +#define PUBSUBCLIENT_H + +#include +#include +#include +#include +#include + +#include "nzmqt/nzmqt.hpp" + + +namespace nzmqt +{ + +namespace samples +{ + +class PubSubClient : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit PubSubClient(const QString& address, const QString& topic, QObject *parent) + : super(parent), address_(address), topic_(topic) + { + ZMQContext* context = createDefaultContext(this); + context->start(); + + socket_ = context->createSocket(ZMQSocket::TYP_SUB); + connect(socket_, SIGNAL(messageReceived(const QList&)), SLOT(messageReceived(const QList&))); + } + + void run() + { + socket_->subscribeTo(topic_); + socket_->connectTo(address_); + } + +protected slots: + void messageReceived(const QList& message) + { + qDebug() << "PubSubClient> " << message; + } + +private: + QString address_; + QString topic_; + + ZMQSocket* socket_; +}; + +} + +} + +#endif // PUBSUBCLIENT_H diff --git a/mythtv/external/nzmqt/src/pubsub/PubSubServer.h b/mythtv/external/nzmqt/src/pubsub/PubSubServer.h new file mode 100644 index 00000000000..25f742c43e8 --- /dev/null +++ b/mythtv/external/nzmqt/src/pubsub/PubSubServer.h @@ -0,0 +1,96 @@ +// Copyright 2011-2012 Johann Duscher. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY JOHANN DUSCHER ''AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +// FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and documentation are those of the +// authors and should not be interpreted as representing official policies, either expressed +// or implied, of Johann Duscher. + +#ifndef PUBSUBSERVER_H +#define PUBSUBSERVER_H + +#include +#include +#include +#include +#include +#include +#include + +#include "nzmqt/nzmqt.hpp" + + +namespace nzmqt +{ + +namespace samples +{ + +class PubSubServer : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit PubSubServer(const QString& address, const QString& topic, QObject* parent) + : super(parent), address_(address), topic_(topic) + { + ZMQContext* context = createDefaultContext(this); + context->start(); + + socket_ = context->createSocket(ZMQSocket::TYP_PUB); + } + + void run() + { + socket_->bindTo(address_); + + QTimer* timer = new QTimer(socket_); + timer->setInterval(1000); + connect(timer, SIGNAL(timeout()), SLOT(sendPing())); + timer->start(); + } + +protected slots: + void sendPing() + { + static quint64 counter = 0; + + QList< QByteArray > msg; + msg += topic_.toLocal8Bit(); + msg += QString("MSG[%1: %2]").arg(++counter).arg(QDateTime::currentDateTime().toLocalTime().toString(Qt::ISODate)).toLocal8Bit(); + socket_->sendMessage(msg); + qDebug() << "PubSubServer> " << msg; + } + +private: + QString address_; + QString topic_; + + ZMQSocket* socket_; +}; + +} + +} + +#endif // PUBSUBSERVER_H diff --git a/mythtv/external/nzmqt/src/pushpull/PushPullSink.h b/mythtv/external/nzmqt/src/pushpull/PushPullSink.h new file mode 100644 index 00000000000..efb31404b33 --- /dev/null +++ b/mythtv/external/nzmqt/src/pushpull/PushPullSink.h @@ -0,0 +1,113 @@ +// Copyright 2011-2012 Johann Duscher. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY JOHANN DUSCHER ''AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +// FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and documentation are those of the +// authors and should not be interpreted as representing official policies, either expressed +// or implied, of Johann Duscher. + +#ifndef PUSHPULLSINK_H +#define PUSHPULLSINK_H + +#include +#include +#include +#include +#include +#include +#include + +#include "nzmqt/nzmqt.hpp" + + +namespace nzmqt +{ + +namespace samples +{ + +class PushPullSink : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit PushPullSink(const QString& sinkAddress, QObject *parent) + : super(parent), sinkAddress_(sinkAddress), numberOfWorkItems_(-1) + { + ZMQContext* context = createDefaultContext(this); + context->start(); + + sink_ = context->createSocket(ZMQSocket::TYP_PULL); + connect(sink_, SIGNAL(messageReceived(const QList&)), SLOT(batchEvent(const QList&))); + } + + void run() + { + sink_->bindTo(sinkAddress_); + } + +protected slots: + void batchEvent(const QList& message) + { + if (numberOfWorkItems_ < 0) + { + // 'message' is a batch start message. + numberOfWorkItems_ = message[0].toUInt(); + qDebug() << "Batch started for >" << numberOfWorkItems_ << "< work items."; + stopWatch_.start(); + + if (numberOfWorkItems_) + return; + } + + if (numberOfWorkItems_ > 0) + { + if (numberOfWorkItems_ % 10 == 0) + qDebug() << numberOfWorkItems_; + else + qDebug() << "."; + + --numberOfWorkItems_; + } + + if (!numberOfWorkItems_) + { + int msec = stopWatch_.elapsed(); + qDebug() << "FINISHED all task in " << msec << "msec"; + numberOfWorkItems_ = -1; + } + } + +private: + QString sinkAddress_; + + ZMQSocket* sink_; + int numberOfWorkItems_; + QTime stopWatch_; +}; + +} + +} + +#endif // PUSHPULLSINK_H diff --git a/mythtv/external/nzmqt/src/pushpull/PushPullVentilator.h b/mythtv/external/nzmqt/src/pushpull/PushPullVentilator.h new file mode 100644 index 00000000000..e13ef13cf72 --- /dev/null +++ b/mythtv/external/nzmqt/src/pushpull/PushPullVentilator.h @@ -0,0 +1,118 @@ +// Copyright 2011-2012 Johann Duscher. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY JOHANN DUSCHER ''AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +// FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and documentation are those of the +// authors and should not be interpreted as representing official policies, either expressed +// or implied, of Johann Duscher. + +#ifndef PUSHPULLVENTILATOR_H +#define PUSHPULLVENTILATOR_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "nzmqt/nzmqt.hpp" + + +namespace nzmqt +{ + +namespace samples +{ + +class PushPullVentilator : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit PushPullVentilator(const QString& ventilatorAddress, const QString& sinkAddress, quint32 numberOfWorkItems, QObject* parent) + : super(parent), ventilatorAddress_(ventilatorAddress), sinkAddress_(sinkAddress), numberOfWorkItems_(numberOfWorkItems) + { + ZMQContext* context = createDefaultContext(this); + context->start(); + + ventilator_ = context->createSocket(ZMQSocket::TYP_PUSH); + + sink_ = context->createSocket(ZMQSocket::TYP_PUSH); + } + + void run() + { + ventilator_->bindTo(ventilatorAddress_); + sink_->connectTo(sinkAddress_); + + // Wait for user start. + + QTextStream stream(stdin); + qDebug() << "Available work items:" << numberOfWorkItems_; + qDebug() << "Press ENTER if workers are ready!"; + stream.readLine(); + + // The first message tells the sink how much work it needs to do + // and at the same time signals start of batch. + + sink_->sendMessage(QString::number(numberOfWorkItems_).toLocal8Bit()); + + // Initialize random number generator. + + qsrand(QTime::currentTime().msec()); + + // Send work items. + + int totalCost = 0; // Total expected cost in msecs + + for (quint32 workItem = 0; workItem < numberOfWorkItems_; workItem++) { + // Random workload from 1 to 100msecs + quint32 workload = qrand() % 100 + 1;; + // Update toal cost. + totalCost += workload; + // Push workload. + ventilator_->sendMessage(QString::number(workload).toLocal8Bit()); + } + + qDebug() << "Total expected cost: " << totalCost << " msec"; + QCoreApplication::instance()->quit(); + } + +private: + QString ventilatorAddress_; + QString sinkAddress_; + quint32 numberOfWorkItems_; + + ZMQSocket* ventilator_; + ZMQSocket* sink_; +}; + +} + +} + +#endif // PUSHPULLVENTILATOR_H diff --git a/mythtv/external/nzmqt/src/pushpull/PushPullWorker.h b/mythtv/external/nzmqt/src/pushpull/PushPullWorker.h new file mode 100644 index 00000000000..1d7a584e2cd --- /dev/null +++ b/mythtv/external/nzmqt/src/pushpull/PushPullWorker.h @@ -0,0 +1,98 @@ +// Copyright 2011-2012 Johann Duscher. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY JOHANN DUSCHER ''AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +// FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and documentation are those of the +// authors and should not be interpreted as representing official policies, either expressed +// or implied, of Johann Duscher. + +#ifndef PUSHPULLWORKER_H +#define PUSHPULLWORKER_H + +#include +#include +#include +#include +#include +#include + +#include "nzmqt/nzmqt.hpp" + +#include "common/Tools.h" // For sleep() function. + + +namespace nzmqt +{ + +namespace samples +{ + +class PushPullWorker : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit PushPullWorker(const QString& ventilatorAddress, const QString& sinkAddress, QObject *parent) + : super(parent), ventilatorAddress_(ventilatorAddress), sinkAddress_(sinkAddress) + { + ZMQContext* context = createDefaultContext(this); + context->start(); + + ventilator_ = context->createSocket(ZMQSocket::TYP_PULL); + connect(ventilator_, SIGNAL(messageReceived(const QList&)), SLOT(workAvailable(const QList&))); + + sink_ = context->createSocket(ZMQSocket::TYP_PUSH); + } + + void run() + { + sink_->connectTo(sinkAddress_); + ventilator_->connectTo(ventilatorAddress_); + } + +protected slots: + void workAvailable(const QList& message) + { + quint32 work = QString(message[0]).toUInt(); + + // Do the work ;-) + qDebug() << "snore" << work << "msec"; + sleep(work); + + // Send results to sink. + sink_->sendMessage(""); + } + +private: + QString ventilatorAddress_; + QString sinkAddress_; + + ZMQSocket* ventilator_; + ZMQSocket* sink_; +}; + +} + +} + +#endif // PUSHPULLWORKER_H diff --git a/mythtv/external/nzmqt/src/reqrep/ReqRepClient.h b/mythtv/external/nzmqt/src/reqrep/ReqRepClient.h new file mode 100644 index 00000000000..1ef969876f2 --- /dev/null +++ b/mythtv/external/nzmqt/src/reqrep/ReqRepClient.h @@ -0,0 +1,109 @@ +// Copyright 2011-2012 Johann Duscher. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY JOHANN DUSCHER ''AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +// FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and documentation are those of the +// authors and should not be interpreted as representing official policies, either expressed +// or implied, of Johann Duscher. + +#ifndef REQREPCLIENT_H +#define REQREPCLIENT_H + +#include +#include +#include +#include +#include +#include +#include + + +#include "nzmqt/nzmqt.hpp" + + +namespace nzmqt +{ + +namespace samples +{ + +class ReqRepClient : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit ReqRepClient(const QString& address, const QString& requestMsg, QObject *parent) + : super(parent), address_(address), requestMsg_(requestMsg) + { + ZMQContext* context = createDefaultContext(this); + context->start(); + + socket_ = context->createSocket(ZMQSocket::TYP_REQ); + connect(socket_, SIGNAL(messageReceived(const QList&)), SLOT(replyReceived(const QList&))); + + timer_ = new QTimer(socket_); + timer_->setSingleShot(true); + timer_->setInterval(1000); + connect(timer_, SIGNAL(timeout()), SLOT(sendRequest())); + } + + void run() + { + socket_->connectTo(address_); + + timer_->start(); + } + +protected slots: + void sendRequest() + { + static quint64 counter = 0; + + QList request; + request += QString("REQUEST[%1: %2]").arg(++counter).arg(QDateTime::currentDateTime().toString(Qt::ISODate)).toLocal8Bit(); + request += requestMsg_.toLocal8Bit(); + qDebug() << "ReqRepClient::sendRequest> " << request; + socket_->sendMessage(request); + } + + void replyReceived(const QList& reply) + { + qDebug() << "ReqRepClient::replyReceived> " << reply; + + // Start timer again in order to trigger the next sendRequest() call. + timer_->start(); + } + +private: + QString address_; + QString requestMsg_; + + ZMQSocket* socket_; + QTimer* timer_; +}; + +} + +} + +#endif // REQREPCLIENT_H diff --git a/mythtv/external/nzmqt/src/reqrep/ReqRepServer.h b/mythtv/external/nzmqt/src/reqrep/ReqRepServer.h new file mode 100644 index 00000000000..d938ef653f5 --- /dev/null +++ b/mythtv/external/nzmqt/src/reqrep/ReqRepServer.h @@ -0,0 +1,96 @@ +// Copyright 2011-2012 Johann Duscher. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY JOHANN DUSCHER ''AS IS'' AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +// FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and documentation are those of the +// authors and should not be interpreted as representing official policies, either expressed +// or implied, of Johann Duscher. + +#ifndef REQREPSERVER_H +#define REQREPSERVER_H + +#include +#include +#include +#include +#include +#include +#include + +#include "nzmqt/nzmqt.hpp" + + +namespace nzmqt +{ + +namespace samples +{ + +class ReqRepServer : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit ReqRepServer(const QString& address, const QString& replyMsg, QObject* parent) + : super(parent), address_(address), replyMsg_(replyMsg) + { + } + + void run() + { + ZMQContext* context = createDefaultContext(this); + context->start(); + + socket_ = context->createSocket(ZMQSocket::TYP_REP); + connect(socket_, SIGNAL(messageReceived(const QList&)), SLOT(requestReceived(const QList&))); + + socket_->bindTo(address_); + } + +protected slots: + void requestReceived(const QList& request) + { + static quint64 counter = 0; + + qDebug() << "ReqRepServer::requestReceived> " << request; + + QList reply; + reply += QString("REPLY[%1: %2]").arg(++counter).arg(QDateTime::currentDateTime().toString(Qt::ISODate)).toLocal8Bit(); + reply += replyMsg_.toLocal8Bit(); + reply += request; // We also append original request. + qDebug() << "ReqRepServer::sendReply> " << reply; + socket_->sendMessage(reply); + } + +private: + QString address_; + QString replyMsg_; + + ZMQSocket* socket_; +}; + +} + +} + +#endif // REQREPSERVER_H