Skip to content

Commit

Permalink
UDP support builds, but it not yet working.
Browse files Browse the repository at this point in the history
  • Loading branch information
Lindley French authored and Lindley French committed Jan 3, 2014
1 parent f5b6bd7 commit c5f576e
Show file tree
Hide file tree
Showing 13 changed files with 794 additions and 6 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Expand Up @@ -348,6 +348,8 @@ set(cxx-sources
tcp_listener.cpp
thread.cpp
trie.cpp
udp_sender.cpp
udp_receiver.cpp
v1_decoder.cpp
v1_encoder.cpp
v2_decoder.cpp
Expand Down Expand Up @@ -587,6 +589,7 @@ set(tests
test_pair_tcp
test_reqrep_inproc
test_reqrep_tcp
test_pubsub_udp
test_hwm
test_reqrep_device
test_sub_forward
Expand Down
16 changes: 16 additions & 0 deletions builds/msvc/libzmq/libzmq.vcproj
Expand Up @@ -612,6 +612,14 @@
RelativePath="..\..\..\src\trie.cpp"
>
</File>
<File
RelativePath="..\..\..\src\udp_sender.cpp"
>
</File>
<File
RelativePath="..\..\..\src\udp_receiver.cpp"
>
</File>
<File
RelativePath="..\..\..\src\v1_decoder.cpp"
>
Expand Down Expand Up @@ -934,6 +942,14 @@
RelativePath="..\..\..\src\trie.hpp"
>
</File>
<File
RelativePath="..\..\..\src\udp_sender.hpp"
>
</File>
<File
RelativePath="..\..\..\src\udp_receiver.hpp"
>
</File>
<File
RelativePath="..\..\..\src\v1_decoder.hpp"
>
Expand Down
4 changes: 4 additions & 0 deletions builds/msvc/libzmq/libzmq.vcxproj
Expand Up @@ -227,6 +227,8 @@
<ClCompile Include="..\..\..\src\tcp_listener.cpp" />
<ClCompile Include="..\..\..\src\thread.cpp" />
<ClCompile Include="..\..\..\src\trie.cpp" />
<ClCompile Include="..\..\..\src\udp_sender.cpp" />
<ClCompile Include="..\..\..\src\udp_receiver.cpp" />
<ClCompile Include="..\..\..\src\v1_decoder.cpp" />
<ClCompile Include="..\..\..\src\v1_encoder.cpp" />
<ClCompile Include="..\..\..\src\v2_decoder.cpp" />
Expand Down Expand Up @@ -310,6 +312,8 @@
<ClInclude Include="..\..\..\src\tcp_listener.hpp" />
<ClInclude Include="..\..\..\src\thread.hpp" />
<ClInclude Include="..\..\..\src\trie.hpp" />
<ClInclude Include="..\..\..\src\udp_sender.hpp" />
<ClInclude Include="..\..\..\src\udp_receiver.hpp" />
<ClInclude Include="..\..\..\src\v1_decoder.hpp" />
<ClInclude Include="..\..\..\src\v1_encoder.hpp" />
<ClInclude Include="..\..\..\src\v1_protocol.hpp" />
Expand Down
4 changes: 4 additions & 0 deletions builds/msvc/libzmq/libzmq11.vcxproj
Expand Up @@ -265,6 +265,8 @@
<ClCompile Include="..\..\..\src\tcp_listener.cpp" />
<ClCompile Include="..\..\..\src\thread.cpp" />
<ClCompile Include="..\..\..\src\trie.cpp" />
<ClCompile Include="..\..\..\src\udp_sender.cpp" />
<ClCompile Include="..\..\..\src\udp_receiver.cpp" />
<ClCompile Include="..\..\..\src\v1_decoder.cpp" />
<ClCompile Include="..\..\..\src\v1_encoder.cpp" />
<ClCompile Include="..\..\..\src\v2_decoder.cpp" />
Expand Down Expand Up @@ -344,6 +346,8 @@
<ClInclude Include="..\..\..\src\tcp_listener.hpp" />
<ClInclude Include="..\..\..\src\thread.hpp" />
<ClInclude Include="..\..\..\src\trie.hpp" />
<ClInclude Include="..\..\..\src\udp_sender.hpp" />
<ClInclude Include="..\..\..\src\udp_receiver.hpp" />
<ClInclude Include="..\..\..\src\v1_decoder.hpp" />
<ClInclude Include="..\..\..\src\v1_encoder.hpp" />
<ClInclude Include="..\..\..\src\v1_protocol.hpp" />
Expand Down
6 changes: 5 additions & 1 deletion src/Makefile.am
Expand Up @@ -171,7 +171,11 @@ libzmq_la_SOURCES = \
tipc_listener.cpp \
tipc_listener.hpp \
tipc_connecter.cpp \
tipc_connecter.hpp
tipc_connecter.hpp \
udp_sender.hpp \
udp_sender.cpp \
udp_receiver.hpp \
udp_receiver.cpp


if ON_MINGW
Expand Down
38 changes: 38 additions & 0 deletions src/session_base.cpp
Expand Up @@ -27,6 +27,8 @@
#include "tipc_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "udp_sender.hpp"
#include "udp_receiver.hpp"
#include "address.hpp"

#include "ctx.hpp"
Expand Down Expand Up @@ -548,6 +550,42 @@ void zmq::session_base_t::start_connecting (bool wait_)
}
#endif

// UDP support.
if (addr->protocol == "udp") {

// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with UDP anyway.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {

// UDP sender.
udp_sender_t *udp_sender = new (std::nothrow) udp_sender_t (
io_thread, options);
alloc_assert (udp_sender);

int rc = udp_sender->init (addr->address.c_str ());
zmq_assert (rc == 0);

send_attach (this, udp_sender);
}
else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {

// UDP receiver.
udp_receiver_t *udp_receiver = new (std::nothrow) udp_receiver_t (
io_thread, options);
alloc_assert (udp_receiver);

int rc = udp_receiver->init (addr->address.c_str ());
zmq_assert (rc == 0);

send_attach (this, udp_receiver);
}
else
zmq_assert (false);

return;
}

zmq_assert (false);
}

12 changes: 7 additions & 5 deletions src/socket_base.cpp
Expand Up @@ -186,9 +186,10 @@ int zmq::socket_base_t::parse_uri (const char *uri_,

int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
// First check out whether the protcol is something we are aware of.
// First check out whether the protocol is something we are aware of.
if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" &&
protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "tipc") {
protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "tipc" &&
protocol_ != "udp") {
errno = EPROTONOSUPPORT;
return -1;
}
Expand Down Expand Up @@ -222,7 +223,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
// Check whether socket type and transport protocol match.
// Specifically, multicast protocols can't be combined with
// bi-directional messaging patterns (socket types).
if ((protocol_ == "pgm" || protocol_ == "epgm") &&
if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "udp") &&
options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
errno = ENOCOMPATPROTO;
Expand Down Expand Up @@ -360,7 +361,7 @@ int zmq::socket_base_t::bind (const char *addr_)
return rc;
}

if (protocol == "pgm" || protocol == "epgm") {
if (protocol == "pgm" || protocol == "epgm" || protocol == "udp") {
// For convenience's sake, bind can be used interchageable with
// connect for PGM and EPGM transports.
return connect (addr_);
Expand Down Expand Up @@ -611,7 +612,8 @@ int zmq::socket_base_t::connect (const char *addr_)

// PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe.
bool subscribe_to_all = protocol == "pgm" || protocol == "epgm";
bool subscribe_to_all = protocol == "pgm" || protocol == "epgm"
|| protocol == "udp";
pipe_t *newpipe = NULL;

if (options.immediate != 1 || subscribe_to_all) {
Expand Down

0 comments on commit c5f576e

Please sign in to comment.