Permalink
Browse files

Cleaned up references to zmq_msg_recv/send

  • Loading branch information...
1 parent 4b936eb commit 5545672e03f69bed1234ba5bc313079bcff78c16 @hintjens hintjens committed Dec 23, 2012
View
46 .bookmarks
@@ -1,42 +1,42 @@
-Binary-Star-Implementation Binary Star Implementation
Architecture-of-the-MQ-Community Architecture of the 0MQ Community
+Binary-Star-Implementation Binary Star Implementation
Protocol-Assertions Protocol Assertions
The-ROUTER-to-ROUTER-Combination The ROUTER to ROUTER Combination
Burnout Burnout
Upgrading-from-MQ-to-MQ Upgrading from 0MQ/2.2 to 0MQ/3.2
-Republishing-Updates-from-Clients Republishing Updates from Clients
Vadim-Shalts-s-Story Vadim Shalts's Story
Writing-the-Unprotocol Writing the Unprotocol
-Service-Oriented-Reliable-Queuing-Majordomo-Pattern Service-Oriented Reliable Queuing (Majordomo Pattern)
+Republishing-Updates-from-Clients Republishing Updates from Clients
the-community The ØMQ Community
-ROUTER-Broker-and-DEALER-Workers ROUTER Broker and DEALER Workers
-Goals Goals
+Service-Oriented-Reliable-Queuing-Majordomo-Pattern Service-Oriented Reliable Queuing (Majordomo Pattern)
Preemptive-Discovery-over-Raw-Sockets Preemptive Discovery over Raw Sockets
-Messaging-Patterns Messaging Patterns
-Crazy-Beautiful-and-Easy Crazy, Beautiful, and Easy
+Goals Goals
+ROUTER-Broker-and-DEALER-Workers ROUTER Broker and DEALER Workers
Symbolic-Links Symbolic Links
-Divide-and-Conquer Divide and Conquer
+Crazy-Beautiful-and-Easy Crazy, Beautiful, and Easy
+Messaging-Patterns Messaging Patterns
Step-Draw-a-Rough-Architecture Step 2: Draw a Rough Architecture
+Divide-and-Conquer Divide and Conquer
Group-Messaging Group Messaging
-A-Minor-Note-on-Strings A Minor Note on Strings
Getting-an-Out-of-band-Snapshot Getting an Out-of-band Snapshot
+A-Minor-Note-on-Strings A Minor Note on Strings
The-Flash-Mob The Flash Mob
The-Mystic The Mystic
-ROUTER-Broker-and-REQ-Workers ROUTER Broker and REQ Workers
advanced-architecture Advanced Architecture using ØMQ
+ROUTER-Broker-and-REQ-Workers ROUTER Broker and REQ Workers
Initial-Design-Cut-the-API Initial Design Cut - the API
Making-a-Clean-Exit Making a Clean Exit
-MQ-is-Not-a-Neutral-Carrier 0MQ is Not a Neutral Carrier
Upfront-Coordination Upfront Coordination
Chapter-A-Framework-for-Distributed-Computing Chapter 8 - A Framework for Distributed Computing
-Why-We-Needed-MQ Why We Needed 0MQ
The-Zen-of-Zero The Zen of Zero
+Why-We-Needed-MQ Why We Needed 0MQ
Tom-van-Leeuwen-s-Story Tom van Leeuwen's Story
A-Plausible-Minimal-Implementation A Plausible Minimal Implementation
Using-Sockets-to-Carry-Data Using Sockets to Carry Data
Chapter-Reliable-Request-Reply-Patterns Chapter 4 - Reliable Request-Reply Patterns
-Slow-Subscriber-Detection-Suicidal-Snail-Pattern Slow Subscriber Detection (Suicidal Snail Pattern)
Configuration Configuration
+Slow-Subscriber-Detection-Suicidal-Snail-Pattern Slow Subscriber Detection (Suicidal Snail Pattern)
How-the-Guide-Happened How the Guide Happened
Working-with-Messages Working with Messages
Heartbeating Heartbeating
@@ -68,8 +68,8 @@ Ephemeral-Values Ephemeral Values
Pub-Sub-Message-Envelopes Pub-Sub Message Envelopes
Step-Decide-on-the-Contracts Step 3: Decide on the Contracts
Tracing-Activity Tracing Activity
-Language Language
Simplicity-vs-Complexity Simplicity vs. Complexity
+Language Language
Starting-Assumptions Starting Assumptions
reliable-request-reply Reliable Request-Reply Patterns
Idempotent-Services Idempotent Services
@@ -128,15 +128,15 @@ File-Stability File Stability
Programming-with-MQ Programming with 0MQ
Step-Internalize-the-Semantics Step 1: Internalize the Semantics
The-Constant-Gardener The Constant Gardener
-Robustness-in-Conflict Robustness in Conflict
-Unprotocols Unprotocols
Representing-State-as-Key-Value-Pairs Representing State as Key-Value Pairs
+Unprotocols Unprotocols
+Robustness-in-Conflict Robustness in Conflict
Exploring-ROUTER-Sockets Exploring ROUTER Sockets
How-MQ-Lost-its-Road-map How 0MQ Lost its Road-map
-Chapter-Advanced-Publish-Subscribe-Patterns Chapter 5 - Advanced Publish-Subscribe Patterns
Transport-Bridging Transport Bridging
-The-DEALER-to-REP-Combination The DEALER to REP Combination
+Chapter-Advanced-Publish-Subscribe-Patterns Chapter 5 - Advanced Publish-Subscribe Patterns
Delivery-Notifications Delivery Notifications
+The-DEALER-to-REP-Combination The DEALER to REP Combination
The-Benevolent-Tyrant The Benevolent Tyrant
Cost-of-Failure Cost of Failure
Identities-and-Addresses Identities and Addresses
@@ -263,20 +263,20 @@ Architecture-of-a-Single-Cluster Architecture of a Single Cluster
Audience Audience
Model-Three-Complex-and-Nasty Model Three - Complex and Nasty
Development-Process Development Process
-The-Rolling-Stone The Rolling Stone
Infinite-Property Infinite Property
+The-Rolling-Stone The Rolling Stone
Example-Zyre-Application Example Zyre Application
Postface Postface
Making-a-Detour Making a Detour
-State-Machines State Machines
Complexity-Oriented-Design Complexity-Oriented Design
-More-about-UDP More about UDP
+State-Machines State Machines
Multiple-Nodes-on-One-Device Multiple Nodes on One Device
+More-about-UDP More about UDP
Intermediaries-and-Proxies Intermediaries and Proxies
Reliable-Publish-Subscribe-Clone-Pattern Reliable Publish-Subscribe (Clone Pattern)
-Evolution-of-Public-Contracts Evolution of Public Contracts
The-Process The Process
-Rob-Gagnon-s-Story Rob Gagnon's Story
+Evolution-of-Public-Contracts Evolution of Public Contracts
Compatible-Changes Compatible Changes
-Code-Generation Code Generation
+Rob-Gagnon-s-Story Rob Gagnon's Story
Handling-Interrupt-Signals Handling Interrupt Signals
+Code-Generation Code Generation
View
16 .signatures
@@ -71,8 +71,8 @@ b02005487696f2e75f5fe6ebc2a9d6f34d425425 examples/Erlang/hwclient.es
8d65609f4c5a245f18abdf39ea5f17378ebe7949 examples/Python/clonecli1.py
4a39ab9dcffe45ee068128e8bb2b79071c654c70 images/fig34.txt
19950526a70c9d6d69aa49e26410278a76a2c7b9 examples/Python/mdcliapi2.py
-533912e22da75f4c27e8f865dca8858b804f0346 examples/C++/mtserver.cpp
5e920cc8ab69c567a8e86f514ed330b684ec8803 examples/Java/lpclient.java
+533912e22da75f4c27e8f865dca8858b804f0346 examples/C++/mtserver.cpp
f5255561e6c274c9214ca1e53e4b40d6fff86d47 examples/Java/tripping.java
b9176d1cf9d04301ee470dbe13e3abb4baee3360 examples/Java/version.java
dfe97e2297014f74a9941bbe0d28d6021de0eb51 examples/F#/syncpub.fsx
@@ -127,17 +127,17 @@ c9aa1cea8ad4b225bfaeaae56d2388db2a4bbe13 listings/Lua/listing_52.lua
33eb7e43b88c56411dbb639471fb164e7331d274 examples/Delphi/taskwork.dpr
5d83e50701bd3d32af15cb0739ed8888e721c32c fragments/C/iothreads.c
e0588e9167edf9fb11154ea7364ed1f45b7b8cba examples/C/mdclient2.c
-946d8d8e0dac7224685f0b82024a33573bbf6248 images/fig2.html
b55d6909f384e41673a0ac08c2fc383af38378d3 examples/Haxe/tasksink2.hx
+946d8d8e0dac7224685f0b82024a33573bbf6248 images/fig2.html
6ac10c128697c5696144b44e6b6f2b22365f8fae examples/C/rrworker.c
c2e5818c670b5999a3a1de96b1080932df2ec612 examples/Felix/wuclient.flx
aa7f062cb4b086f3c00fd02a769fc15c41751829 images/fig55.html
944b1f6aec4e513e135d2ea4ef4e4aded05f08f3 examples/Go/wuserver.go
8aa0ee8cdceada1858674ac5f7766ccd1391e0c1 examples/Ruby/psenvsub.rb
89e33e7a0541c59b007ed1cd5a7906c7f29f1e54 images/fig14.html
ddfc2281b409a383f380b8bbd2ef8b63916740e3 examples/Clojure/tasksink2.clj
-6e6502fd6552b22bc7b2ac4b3a5b1e3bf0fbafc5 examples/Python/syncpub.py
757c637e8ce0b9cdedceab28224d7081baef0172 examples/Tcl/rtdealer.tcl
+6e6502fd6552b22bc7b2ac4b3a5b1e3bf0fbafc5 examples/Python/syncpub.py
d50c97ecd4fdfc0c982323734e4f7a3554366b0b fragments/C/endpoint-hashing.c
a3c78d15e03ff754a35ad2d4d9c5129bd6e0781b examples/Erlang/syncsub.es
d4a51d25b984a549dd99076aa80900d071f9c2d1 examples/Ruby/hwclient.rb
@@ -253,8 +253,8 @@ c589caafe8d257fc4958d8a85fd646c90fd0814d examples/Haxe/asyncsrv.hx
f91e9609224e660bb80c78f42ad17038dc3193a7 fragments/C/upgrade-shim.c
db9643aac93bae57d7a76d2b8a3e381d7f07544c examples/Ruby/syncsub.rb
55f68d5fe3f08646069acf6eb0d253d5507371d0 examples/C++/lpserver.cpp
-af40fa855dd2897c3640c00d180db17918389a2a examples/Python/lpclient.py
11864c5a62aa05d8219e546a9601b79496e2235c examples/C/mdworker.c
+af40fa855dd2897c3640c00d180db17918389a2a examples/Python/lpclient.py
e63eff17ab9bb5345c3261b3c3b5cbdc4ab4af69 examples/Tcl/lbbroker.tcl
3dce90835a28094cb8a6f60ebb24f41542ea6449 examples/PHP/spqueue.php
52665bd9b5274d3cde82c0cbc6d6b9c6583b8a32 examples/C#/rrbroker.cs
@@ -1021,18 +1021,18 @@ b20eef22a2eb404f50a16ab964312d02e98e582c examples/Lua/msreader.lua
08786b43b74f2e4215888c7603ca941c50f72761 examples/C++/lpclient.cpp
af3d72a3dd615769775249428cb4326cd626fdf8 fragments/C/zyre-listener.c
304315bf28011c811f657ad49aaaaec595e82837 examples/Lua/wuclient.lua
-5f8efad2a8476391e8c779b30842fcf4ad049aa7 examples/F#/hwclient.fsx
1a8c84cde8bc82946c6ccd643ba4e34e7329949c examples/Tcl/mmiecho.tcl
8baf7592263d466e3b5be2e91d958eb5d0cbc342 examples/Objective-C/hwclient.m
+5f8efad2a8476391e8c779b30842fcf4ad049aa7 examples/F#/hwclient.fsx
513a5190bea8aef397ddc21748abd992a9b82e84 examples/F#/lbbroker.fsx
-a9aaa1bbc4fcc045ca68181f0d6aeb83d6d08520 examples/Haxe/lbbroker2.hx
dbea6f88e76f7b5406b5b616e427a7c844ecc160 fragments/C/kvsetttl.c
+a9aaa1bbc4fcc045ca68181f0d6aeb83d6d08520 examples/Haxe/lbbroker2.hx
ac1156ba2f953b3997550affe245164d50925024 examples/PHP/lbbroker2.php
a58db4e6242dcbd31dcfe8248382a69cb5ebf4dc examples/C/rtdealer.c
3505522b83dc283a63a9ce5654cd9d7071612314 images/fig4.txt
98913d674c40172faadc26833ee49e6b298c507c fragments/C/proxy.c
-b59f86b32ba40e90f104ba6aa9cbb683876b9853 examples/Java/ppworker.java
2f7baf7baed1713272d61725d94f3aa7859e47a1 examples/Go/mspoller.go
+b59f86b32ba40e90f104ba6aa9cbb683876b9853 examples/Java/ppworker.java
7377682230bfb1631480a93cd27cadacaeb4f1e3 examples/Python/psenvpub.py
-cf00f05adee687a45fa3c0bd1d1b90a7843f0070 images/fig63.html
8e4e30e4e224edd0c4a78420a6ddb3c0589e1cfb images/fig26.txt
+cf00f05adee687a45fa3c0bd1d1b90a7843f0070 images/fig63.html
View
2 bin/asciitosvg/ASCIIToSVG.php
@@ -1862,7 +1862,7 @@ private function parseText() {
if (count($boxQueue) > 0) {
$t->setOption('stroke', 'none');
$t->setOption('style',
- "font-family:Consolas,Monaco,Anonymous Pro,Anonymous,Bitstream Sans Mono,monospace;font-size:{$fSize}px");
+ "font-family:LMMono10,monospace;font-size:{$fSize}px");
$boxQueue[count($boxQueue) - 1]->addText($t);
} else {
$this->svgObjects->addObject($t);
View
43 chapter1.txt
@@ -67,7 +67,7 @@ Hello | | World
#------------#
[[/code]]
-The REQ-REP socket pair is in lockstep. The client issues {{zmq_msg_send[3]}} and then {{zmq_msg_recv[3]}}, in a loop (or once if that's all it needs). Doing any other sequence (e.g. sending two messages in a row) will result in a return code of -1 from the {{send}} or {{recv}} call. Similarly, the service issues {{zmq_msg_recv[3]}} and then {{zmq_msg_send[3]}} in that order, as often as it needs to.
+The REQ-REP socket pair is in lockstep. The client issues {{zmq_send[3]}} and then {{zmq_recv[3]}}, in a loop (or once if that's all it needs). Doing any other sequence (e.g. sending two messages in a row) will result in a return code of -1 from the {{send}} or {{recv}} call. Similarly, the service issues {{zmq_recv[3]}} and then {{zmq_send[3]}} in that order, as often as it needs to.
0MQ uses C as its reference language and this is the main language we'll use for examples. If you're reading this on-line, the link below the example takes you to translations into other programming languages. Let's compare the same server in C++:
@@ -99,7 +99,7 @@ There is a lot happening behind the scenes but what matters to us programmers is
In C and some other languages, strings are terminated with a null byte. We could send a string like "HELLO" with that extra null byte:
[[code language="C"]]
-zmq_msg_init_data (&request, "Hello", 6, NULL, NULL);
+zmq_send (requester, "Hello", 6, 0);
[[/code]]
However if you send a string from another language it probably will not include that null byte. For example, when we send that same string in Python, we do this:
@@ -126,22 +126,21 @@ Here is what we need to do, in C, to receive a 0MQ string and deliver it to the
[[code language="C"]]
// Receive 0MQ string from socket and convert into C string
+// Chops string at 255 chars, if it's longer
static char *
s_recv (void *socket) {
- zmq_msg_t message;
- zmq_msg_init (&message);
- int size = zmq_msg_recv (&message, socket, 0);
+ char buffer [256];
+ int size = zmq_recv (socket, buffer, 255, 0);
if (size == -1)
return NULL;
- char *string = malloc (size + 1);
- memcpy (string, zmq_msg_data (&message), size);
- zmq_msg_close (&message);
- string [size] = 0;
- return (string);
+ if (size > 255)
+ size = 255;
+ buffer [size] = 0;
+ return strdup (buffer);
}
[[/code]]
-This makes a very handy helper function and in the spirit of making things we can reuse profitably, let's write a similar 's_send' function that sends strings in the correct 0MQ format, and package this into a header file we can reuse.
+This makes a handy helper function and in the spirit of making things we can reuse profitably, let's write a similar 's_send' function that sends strings in the correct 0MQ format, and package this into a header file we can reuse.
The result is {{zhelpers.h}}, which lets us write sweeter and shorter 0MQ applications in C. It is a fairly long source, and only fun for C developers, so [https://github.com/imatix/zguide/blob/master/examples/C/zhelpers.h read it at leisure].
@@ -195,7 +194,7 @@ Here is client application, which listens to the stream of updates and grabs any
Note that when you use a SUB socket you **must** set a subscription using {{zmq_setsockopt[3]}} and SUBSCRIBE, as in this code. If you don't set any subscription, you won't get any messages. It's a common mistake for beginners. The subscriber can set many subscriptions, which are added together. That is, if a update matches ANY subscription, the subscriber receives it. The subscriber can also cancel specific subscriptions. A subscription is often but not necessarily a printable string. See {{zmq_setsockopt[3]}} for how this works.
-The PUB-SUB socket pair is asynchronous. The client does {{zmq_msg_recv[3]}}, in a loop (or once if that's all it needs). Trying to send a message to a SUB socket will cause an error. Similarly the service does {{zmq_msg_send[3]}} as often as it needs to, but must not do {{zmq_msg_recv[3]}} on a PUB socket.
+The PUB-SUB socket pair is asynchronous. The client does {{zmq_recv[3]}}, in a loop (or once if that's all it needs). Trying to send a message to a SUB socket will cause an error. Similarly the service does {{zmq_send[3]}} as often as it needs to, but must not do {{zmq_recv[3]}} on a PUB socket.
In theory with 0MQ sockets, it does not matter which end connects, and which end binds. However in practice there are undocumented differences that I'll come to later. For now, bind the PUB and connect the SUB, unless your network design makes that impossible.
@@ -496,13 +495,13 @@ Specifically:
* It delivers whole messages exactly as they were sent, using a simple framing on the wire. If you write a 10k message, you will receive a 10k message.
-* It does not impose any format on messages. They are blobs of zero to gigabytes large. When you want to represent data you choose some other product on top, such as Google's protocol buffers, XDR, and others.
+* It does not impose any format on messages. They are blobs of zero to gigabytes large. When you want to represent data you choose some other product on top, such as msgpack, Google's protocol buffers, and others.
-* It handles network errors intelligently. Sometimes it retries, sometimes it tells you an operation failed.
+* It handles network errors intelligently, by retrying automatically in cases where it makes sense.
* It reduces your carbon footprint. Doing more with less CPU means your boxes use less power, and you can keep your old boxes in use for longer. Al Gore would love 0MQ.
-Actually 0MQ does rather more than this. It has a subversive effect on how you develop network-capable applications. Superficially it's a socket-inspired API on which you do {{zmq_msg_recv[3]}} and {{zmq_msg_send[3]}}. But message processing rapidly becomes the central loop, and your application soon breaks down into a set of message processing tasks. It is elegant and natural. And it scales: each of these tasks maps to a node, and the nodes talk to each other across arbitrary transports. Two nodes in one process (node is a thread), two nodes on one box (node is a process), or two boxes on one network (node is a box) - it's all the same, with no application code changes.
+Actually 0MQ does rather more than this. It has a subversive effect on how you develop network-capable applications. Superficially it's a socket-inspired API on which you do {{zmq_recv[3]}} and {{zmq_send[3]}}. But message processing rapidly becomes the central loop, and your application soon breaks down into a set of message processing tasks. It is elegant and natural. And it scales: each of these tasks maps to a node, and the nodes talk to each other across arbitrary transports. Two nodes in one process (node is a thread), two nodes on one box (node is a process), or two boxes on one network (node is a box) - it's all the same, with no application code changes.
++ Socket Scalability
@@ -520,13 +519,13 @@ wuclient 56789 &
As the clients run, we take a look at the active processes using 'top', and we see something like (on a 4-core box):
[[code]]
- PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
- 7136 ph 20 0 1040m 959m 1156 R 157 12.0 16:25.47 wuserver
- 7966 ph 20 0 98608 1804 1372 S 33 0.0 0:03.94 wuclient
- 7963 ph 20 0 33116 1748 1372 S 14 0.0 0:00.76 wuclient
- 7965 ph 20 0 33116 1784 1372 S 6 0.0 0:00.47 wuclient
- 7964 ph 20 0 33116 1788 1372 S 5 0.0 0:00.25 wuclient
- 7967 ph 20 0 33072 1740 1372 S 5 0.0 0:00.35 wuclient
+ PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
+7136 ph 20 0 1040m 959m 1156 R 157 12.0 16:25.47 wuserver
+7966 ph 20 0 98608 1804 1372 S 33 0.0 0:03.94 wuclient
+7963 ph 20 0 33116 1748 1372 S 14 0.0 0:00.76 wuclient
+7965 ph 20 0 33116 1784 1372 S 6 0.0 0:00.47 wuclient
+7964 ph 20 0 33116 1788 1372 S 5 0.0 0:00.25 wuclient
+7967 ph 20 0 33072 1740 1372 S 5 0.0 0:00.35 wuclient
[[/code]]
Let's think for a second about what is happening here. The weather server has a single socket, and yet here we have it sending data to five clients in parallel. We could have thousands of concurrent clients. The server application doesn't see them, doesn't talk to them directly. So the 0MQ socket is acting like a little server, silently accepting client requests and shoving data out to them as fast as the network can handle it. And it's a multithreaded server, squeezing more juice out of your CPU.
View
112 chapter2.txt
@@ -37,9 +37,9 @@ Like a favorite dish, 0MQ sockets are easy to digest. Sockets have a life in fou
* Plugging sockets onto the network topology by creating 0MQ connections to and from them (see {{zmq_bind[3]}}, {{zmq_connect[3]}}).
-* Using the sockets to carry data by writing and receiving messages on them (see {{zmq_msg_send[3]}}, {{zmq_msg_recv[3]}}).
+* Using the sockets to carry data by writing and receiving messages on them (see {{zmq_send[3]}}, {{zmq_recv[3]}}).
-Note that sockets are always void pointers, and messages (which we'll come to very soon) are structures. So in C you pass sockets as-such, but you pass addresses of messages in all functions that work with messages, like {{zmq_msg_send[3]}} and {{zmq_msg_recv[3]}}. As a mnemonic, realize that "in 0MQ all your sockets are belong to us", but messages are things you actually own in your code.
+Note that sockets are always void pointers, and messages (which we'll come to very soon) are structures. So in C you pass sockets as-such, but you pass addresses of messages in all functions that work with messages, like {{zmq_send[3]}} and {{zmq_recv[3]}}. As a mnemonic, realize that "in 0MQ all your sockets are belong to us", but messages are things you actually own in your code.
Creating, destroying, and configuring sockets works as you'd expect for any object. But remember that 0MQ is an asynchronous, elastic fabric. This has some impact on how we plug sockets into the network topology, and how we use the sockets after that.
@@ -79,9 +79,9 @@ Sockets have types. The socket type defines the semantics of the socket, its pol
It's the ability to connect sockets in these different ways that gives 0MQ its basic power as a message queuing system. There are layers on top of this, such as proxies, which we'll get to later. But essentially, with 0MQ you define your network architecture by plugging pieces together like a child's construction toy.
-+++ Using Sockets to Carry Data
++++ Sending and Receiving Messages
-To send and receive messages you use the {{zmq_msg_send[3]}} and {{zmq_msg_recv[3]}} methods. The names are conventional but 0MQ's I/O model is different enough from the TCP model[figure] that you will need time to get your head around it.
+0MQ's I/O model is different enough from the TCP model[figure] that you will need time to get your head around it.
[[code type="textdiagram" title="TCP sockets are 1 to 1"]]
#------------#
@@ -109,7 +109,7 @@ Let's look at the main differences between TCP sockets and 0MQ sockets when it c
* 0MQ sockets have one-to-N routing behavior built-in, according to the socket type.
-The {{zmq_msg_send[3]}} method does not actually send the message to the socket connection(s). It queues the message so that the I/O thread can send it asynchronously. It does not block except in some exception cases. So the message is not necessarily sent when {{zmq_msg_send[3]}} returns to your application. If you created a message using {{zmq_msg_init_data[3]}} you cannot reuse the data or free it, otherwise the I/O thread will rapidly find itself writing overwritten or unallocated garbage. This is a common mistake for beginners. We'll see a little later how to properly work with messages.
+The {{zmq_send[3]}} method does not actually send the message to the socket connection(s). It queues the message so that the I/O thread can send it asynchronously. It does not block except in some exception cases. So the message is not necessarily sent when {{zmq_send[3]}} returns to your application.
+++ Unicast Transports
@@ -127,14 +127,16 @@ A common question that newcomers to 0MQ ask (it's one I asked myself) is, "how d
The answer used to be "this is not how it works". 0MQ is not a neutral carrier, it imposes a framing on the transport protocols it uses. This framing is not compatible with existing protocols, which tend to use their own framing. For example, compare an HTTP request[figure], and a 0MQ request[figure], both over TCP/IP.
-The HTTP request uses CR-LF as its simplest framing delimiter, whereas 0MQ uses a length-specified frame.
+The HTTP request uses CR-LF as its simplest framing delimiter:
[[code type="textdiagram" title="HTTP On the Wire"]]
#----------------+----+----+----+----#
| GET /index.html| 13 | 10 | 13 | 10 |
#----------------+----+----+----+----#
[[/code]]
+Whereas 0MQ uses a length-specified frame:
+
[[code type="textdiagram" title="0MQ On the Wire"]]
#---+---+---+---+---+---#
| 5 | H | E | L | L | O |
@@ -204,7 +206,16 @@ One of the things we aim to provide you with this guide are a set of such high-l
+++ Working with Messages
-On the wire, 0MQ messages are blobs of any size from zero upwards, fitting in memory. You do your own serialization using protobufs, msgpack, JSON, or whatever else your applications need to speak. It's wise to choose a data representation that is portable and fast, but you can make your own decisions about trade-offs.
+The libzmq core library has in fact two APIs to send and receive messages. The {{zmq_send[3]}} and {{zmq_recv[3]}} methods that we've already seen and used are simple one-liners. We will use these often, but {{zmq_recv[3]}} is bad at dealing with arbitrary message sizes: it truncates messages to whatever buffer size you provide. So there's a second API that works with zmq_msg_t structures, with a richer API:
+
+* Initialise a message: {{zmq_msg_init[3]}}, {{zmq_msg_init_size[3]}}, {{zmq_msg_init_data[3]}}.
+* Sending and receiving a message: {{zmq_msg_send[3]}}, {{zmq_msg_recv[3]}}.
+* Release a message: {{zmq_msg_close[3]}}.
+* Access message content: {{zmq_msg_data[3]}}, {{zmq_msg_size[3]}}, {{zmq_msg_more[3]}}.
+* Work with message properties: {{zmq_msg_get[3]}}, {{zmq_msg_set[3]}}.
+* Message manipulation: {{zmq_msg_copy[3]}}, {{zmq_msg_move[3]}}.
+
+On the wire, 0MQ messages are blobs of any size from zero upwards, fitting in memory. You do your own serialization using protocol buffers, msgpack, JSON, or whatever else your applications need to speak. It's wise to choose a data representation that is portable, but you can make your own decisions about trade-offs.
In memory, 0MQ messages are {{zmq_msg_t}} structures (or classes depending on your language). Here are the basic ground rules for using 0MQ messages in C:
@@ -220,40 +231,9 @@ In memory, 0MQ messages are {{zmq_msg_t}} structures (or classes depending on yo
* Do not use {{zmq_msg_move[3]}}, {{zmq_msg_copy[3]}}, or {{zmq_msg_init_data[3]}} unless you read the man pages and know precisely why you need these.
-Here is a typical chunk of code working with messages, which should be familiar if you have been paying attention. This is from the {{zhelpers.h}} file we use in all the examples:
-
-[[code language="C"]]
-// Receive 0MQ string from socket and convert into C string
-static char *
-s_recv (void *socket) {
- zmq_msg_t message;
- zmq_msg_init (&message);
- int size = zmq_msg_recv (&message, socket, 0);
- if (size == -1)
- return NULL;
- char *string = malloc (size + 1);
- memcpy (string, zmq_msg_data (&message), size);
- zmq_msg_close (&message);
- string [size] = 0;
- return (string);
-}
-
-// Convert C string to 0MQ string and send to socket
-static int
-s_send (void *socket, char *string) {
- zmq_msg_t message;
- zmq_msg_init_size (&message, strlen (string));
- memcpy (zmq_msg_data (&message), string, strlen (string));
- int size = zmq_msg_send (&message, socket, 0);
- zmq_msg_close (&message);
- return (size);
-}
-[[/code]]
-
-You can easily extend this code to send and receive blobs of arbitrary length.
+* After you pass a message to {{zmq_msg_send[3]}}, ØMQ will clear the message, i.e., set the size to zero. You cannot send the same message twice, and you cannot access the message data after sending it.
-NOTE:
-After you pass a message to {{zmq_msg_send[3]}}, ØMQ will clear the message, i.e., set the size to zero. You cannot send the same message twice, and you cannot access the message data after sending it.
+* These rules don't apply if you use {{zmq_send[3]}} and {{zmq_recv[3]}}, which work with byte arrays, not message structures.
If you want to send the same message more than once, create a second message, initialize it using {{zmq_msg_init[3]}} and then use {{zmq_msg_copy[3]}} to create a copy of the first message. This does not copy the data but the reference. You can then send the message twice (or more, if you create more copies) and the message will only be finally destroyed when the last copy is sent or closed.
@@ -263,7 +243,7 @@ Frames (also called "message parts" in the 0MQ reference manual pages) are the b
There is a wire-level [http://rfc.zeromq.org/spec:15 protocol called ZMTP] that defines how 0MQ reads and writes frames on a TCP connection. If you're interested in how this works, the spec is quite short, just a few pages.
-Originally, a 0MQ message was one frame, like UDP. We later extended this with "multipart" messages, which are quite simply series of frames with a "more" bit set to one, followed by one with that bit set to zero. The 0MQ API then lets you write messages with a "more" flag, and when you read messages, lets you check if there's "more".
+Originally, a 0MQ message was one frame, like UDP. We later extended this with "multi-part" messages, which are quite simply series of frames with a "more" bit set to one, followed by one with that bit set to zero. The 0MQ API then lets you write messages with a "more" flag, and when you read messages, lets you check if there's "more".
In the low-level 0MQ API and the reference manual, therefore, there's some fuzziness about messages vs. frames. So here's a useful lexicon:
@@ -281,11 +261,13 @@ Some other things that are worth knowing about messages:
* 0MQ does not send the message (single, or multi-part) right away but at some indeterminate later time. A multi-part message must therefore fit in memory.
-* A message (single, or multi-part) must fit in memory. If you want to send files of arbitrary sizes, you should break them into pieces and send each piece as separate single-part messages.
+* A message (single, or multi-part) must fit in memory. If you want to send files of arbitrary sizes, you should break them into pieces and send each piece as separate single-part messages. //Using multi-part data will not reduce memory consumption.//
* You must call {{zmq_msg_close[3]}} when finished with a message, in languages that don't automatically destroy objects when a scope closes.
-And to be necessarily repetitive, do not use {{zmq_msg_init_data[3]}}, yet. This is a zero-copy method and guaranteed to create trouble for you. There are far more important things to learn about 0MQ before you start to worry about shaving off microseconds.
+And to be repetitive, do not use {{zmq_msg_init_data[3]}} yet. This is a zero-copy method and guaranteed to create trouble for you. There are far more important things to learn about 0MQ before you start to worry about shaving off microseconds.
+
+This rich API can be tiresome to work with. The methods are optimized for performance, not simplicity. If you start using these you will almost definitely get them wrong until you've read the man pages with some care. So one of the main jobs of a good language binding is to wrap this API up in classes that are easier to use.
+++ Handling Multiple Sockets
@@ -350,11 +332,9 @@ while (1) {
zmq_msg_init (&message);
zmq_msg_recv (socket, &message, 0);
// Process the message frame
+ ...
zmq_msg_close (&message);
- int more;
- size_t more_size = sizeof (more);
- zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
- if (!more)
+ if (!zmq_msg_more (&message))
break; // Last message frame
}
[[/code]]
@@ -365,7 +345,7 @@ Some things to know about multi-part messages:
* If you are using {{zmq_poll[3]}}, when you receive the first part of a message, all the rest has also arrived.
* You will receive all parts of a message, or none at all.
* Each part of a message is a separate {{zmq_msg}} item.
-* You will receive all parts of a message whether or not you check the RCVMORE option.
+* You will receive all parts of a message whether or not you check the more property.
* On sending, 0MQ queues message frames in memory until the last is received, then sends them all.
* There is no way to cancel a partially sent message, except by closing the socket.
@@ -634,7 +614,7 @@ It looks very similar to the earlier proxy example but the key part is that the
#------------# #------------# +------------+
| XPUB |
Internal network '-----+------'
- ---------------------------------------|---------------
+ -------------------------------------- | --------------
External network bind
tcp://10.1.1.0:8100
|
@@ -661,13 +641,9 @@ When 0MQ detects an external fault it returns an error to the calling code. In s
In most of the C examples we've seen so far there's been no error handling. **Real code should do error handling on every single 0MQ call**. If you're using a language binding other than C, the binding may handle errors for you. In C you do need to do this yourself. There are some simple rules, starting with POSIX conventions:
* Methods that create objects return NULL if they fail.
-
* Methods that process data may return the number of bytes processed, or -1 on an error or failure.
-
* Other methods return 0 on success and -1 on an error or failure.
-
* The error code is provided in {{errno}} or {{zmq_errno[3]}}.
-
* A descriptive error text for logging is provided by {{zmq_strerror[3]}}.
For example:
@@ -678,17 +654,17 @@ assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:5555");
-if (rc != 0) {
+if (rc == -1) {
printf ("E: bind failed: %s\n", strerror (errno));
return -1;
}
[[/code]]
-There are two main exceptional conditions that you may want to handle as non-fatal:
+There are two main exceptional conditions that you should handle as non-fatal:
-* When a thread calls {{zmq_msg_recv[3]}} with the {{ZMQ_DONTWAIT}} option and there is no waiting data. 0MQ will return -1 and set {{errno}} to {{EAGAIN}}.
+* When your code receives a message with the {{ZMQ_DONTWAIT}} option and there is no waiting data. 0MQ will return -1 and set {{errno}} to {{EAGAIN}}.
-* When a thread calls {{zmq_ctx_destroy[3]}} and other threads are doing blocking work. The {{zmq_ctx_destroy[3]}} call closes the context and all blocking calls exit with -1, and errno set to {{ETERM}}.
+* When one thread calls {{zmq_ctx_destroy[3]}}, and other threads are still doing blocking work. The {{zmq_ctx_destroy[3]}} call closes the context and all blocking calls exit with -1, and errno set to {{ETERM}}.
In C/C++, asserts can be removed entirely in optimized code, so don't make the mistake of wrapping the whole 0MQ call in an assert(). It looks neat, then the optimizer removes all the asserts and the calls you want to make, and your application breaks in impressive ways.
@@ -708,9 +684,7 @@ void *control = zmq_socket (context, ZMQ_PUB);
zmq_bind (control, "tcp://*:5559");
...
// Send kill signal to workers
-zmq_msg_init_data (&message, "KILL", 5);
-zmq_msg_send (control, &message, 0);
-zmq_msg_close (&message);
+s_send (controller, "KILL");
[[/code]]
[[code type="textdiagram" title="Parallel Pipeline with Kill Signaling"]]
@@ -724,7 +698,7 @@ zmq_msg_close (&message);
.---------------+---------------.
| | |
| .--=------|-----+=--------|-----+---=--.
- task | task | task | |
+ task : task : task : |
| | | | | | |
v v v v v v |
.------+-----. .------+-----. .------+-----. |
@@ -772,7 +746,7 @@ Here is how we handle a signal in various languages:
The program provides {{s_catch_signals()}}, which traps Ctrl-C ({{SIGINT}}) and {{SIGTERM}}. When either of these signals arrive, the {{s_catch_signals()}} handler sets the global variable {{s_interrupted}}. Thanks to your signal handler, your application will not die automatically. Instead, you have a chance to clean up and exit gracefully. You have to now explicitly check for an interrupt, and handle it properly. Do this by calling {{s_catch_signals()}} (copy this from {{interrupt.c}}) at the start of your main code. This sets-up the signal handling. The interrupt will affect 0MQ calls as follows:
-* If your code is blocking in {{zmq_msg_recv[3]}}, {{zmq_poll[3]}}, or {{zmq_msg_send[3]}}, when a signal arrives, the call will return with {{EINTR}}.
+* If your code is blocking in a blocking call (sending a message, receiving a message, or polling), then when a signal arrives, the call will return with {{EINTR}}.
* Wrappers like {{s_recv()}} return NULL if they are interrupted.
So check for an {{EINTR}} return code, a NULL return, and/or {{s_interrupted}}.
@@ -796,7 +770,7 @@ If you call {{s_catch_signals()}} and don't test for interrupts, the your applic
Any long-running application has to manage memory correctly, or eventually it'll use up all available memory and crash. If you use a language that handles this automatically for you, congratulations. If you program in C or C++ or any other language where you're responsible for memory management, here's a short tutorial on using valgrind, which among other things will report on any leaks your programs have.
-* To install valgrind, e.g. on Ubuntu or Debian issue:
+* To install valgrind, e.g. on Ubuntu or Debian, issue this command:
[[code]]
sudo apt-get install valgrind
@@ -1061,7 +1035,9 @@ A more robust model could be:
++ Zero Copy
-0MQ's message API lets you can send and receive messages directly from and to application buffers without copying data. We call "zero-copy", and it can improve performance in some applications. Like all optimizations, use this when you know it helps, and measure before and after. Zero-copy makes your code more complex.
+0MQ's message API lets you can send and receive messages directly from and to application buffers without copying data. We call "zero-copy", and it can improve performance in some applications.
+
+You should think about using zero-copy in the specific case where you are sending large blocks of memory (thousands of bytes), at a high frequency. For short messages, or for lower message rates, using zero-copy will make your code message and complex with no measurable benefit. Like all optimizations, use this when you know it helps, and //measure// before and after.
To do zero-copy you use {{zmq_msg_init_data[3]}} to create a message that refers to a block of data already allocated on the heap with {{malloc()}}, and then you pass that to {{zmq_msg_send[3]}}. When you create the message you also pass a function that 0MQ will call to free the block of data, when it has finished sending the message. This is the simplest example, assuming 'buffer' is a block of 1000 bytes allocated on the heap:
@@ -1075,6 +1051,8 @@ zmq_msg_init_data (&message, buffer, 1000, my_free, NULL);
zmq_msg_send (socket, &message, 0);
[[/code]]
+Note that you don't call {{zmq_msg_close[3]}} after sending a message - libzmq will do this automatically when it's actually done sending the message.
+
There is no way to do zero-copy on receive: 0MQ delivers you a buffer that you can store as long as you wish but it will not write data directly into application buffers.
On writing, 0MQ's multi-part messages work nicely together with zero-copy. In traditional messaging you need to marshal different buffers together into one buffer that you can send. That means copying data. With 0MQ, you can send multiple buffers coming from different sources as individual message frames. Send each field as a length-delimited frame. To the application it looks like a series of send and recv calls. But internally the multiple parts get written to the network and read back with single system calls, so it's very efficient.
@@ -1168,9 +1146,9 @@ As you build applications with 0MQ you will come across this problem more than o
| messages in a +---' | Do you start | | Start all SUB |
| SUB socket? | Yes | the SUB socket +------->| sockets first |
'--------+-------' | after the PUB? | Yes | then the PUB |
- | No '--------+-------' | sockets to |
- | | No | avoid loss |
- v v '---------------'
+ | No '--------+-------' '---------------'
+ | | No
+ v v
.---------------. .-----------------.
| Are you using | Yes | See explanation | .------------------.
| REQ and REP +---. | of slow joiners | | Send and recv in |
View
59 chapter3.txt
@@ -469,8 +469,7 @@ We're going to push request-reply onto the stack and open a different area, whic
[[code type="fragment" name="lbreader"]]
while (true) {
- // Read and save all frames until we get an empty frame
- // In this example there is only 1 but it could be more
+ // Get one address frame and empty delimiter
char *address = s_recv (worker);
char *empty = s_recv (worker);
assert (*empty == 0);
@@ -488,46 +487,40 @@ while (true) {
}
[[/code]]
-That code isn't even reusable, because it can only handle one reply address in the envelope. And it already does some wrapping around the 0MQ API. If we used the libzmq API directly this is what we'd have to write:
+That code isn't even reusable, because it can only handle one reply address in the envelope. And it already does some wrapping around the 0MQ API. If we used the libzmq simple message API this is what we'd have to write:
[[code type="fragment" name="lowreader"]]
while (true) {
- // Read and save all frames until we get an empty frame
- // In this example there is only 1 but it could be more
- zmq_msg_t address;
- zmq_msg_init (&address);
- zmq_msg_recv (worker, &address, 0);
-
- zmq_msg_t empty;
- zmq_msg_init (&empty);
- zmq_msg_recv (worker, &empty, 0);
+ // Get one address frame and empty delimiter
+ char address [255];
+ int address_size = zmq_recv (worker, address, 255, 0);
+ if (address_size == -1)
+ break;
+
+ char empty [1];
+ int empty_size = zmq_recv (worker, empty, 1, 0);
+ zmq_recv (worker, &empty, 0);
+ assert (empty_size <= 0);
+ if (empty_size == -1)
+ break;
// Get request, send reply
- zmq_msg_t payload;
- zmq_msg_init (&payload);
- zmq_msg_recv (worker, &payload, 0);
-
- int char_nbr;
- printf ("Worker: ");
- for (char_nbr = 0; char_nbr < zmq_msg_size (&payload); char_nbr++)
- printf ("%c", *(char *) (zmq_msg_data (&payload) + char_nbr));
- printf ("\n");
-
- zmq_msg_init_size (&payload, 2);
- memcpy (zmq_msg_data (&payload), "OK", 2);
-
- zmq_msg_send (worker, &address, ZMQ_SNDMORE);
- zmq_close (&address);
- zmq_msg_send (worker, &empty, ZMQ_SNDMORE);
- zmq_close (&empty);
- zmq_msg_send (worker, &payload, 0);
- zmq_close (&payload);
+ char request [256];
+ int request_size = zmq_recv (worker, request, 255, 0);
+ if (request_size == -1)
+ return NULL;
+ request [request_size] = 0;
+ printf ("Worker: %s\n", request);
+
+ zmq_send (worker, address, address_size, ZMQ_SNDMORE);
+ zmq_send (worker, empty, 0, ZMQ_SNDMORE);
+ zmq_send (worker, "OK", 2, 0);
}
[[/code]]
And when code is too long to write quickly, it's also too long to understand. Up to now, I've stuck to the native API because as 0MQ users we need to know that intimately. But when it gets in our way, we have to treat it as a problem to solve.
-I'm not proposing changing the 0MQ API, which is a documented public contract that thousands of people have agreed to and depend on. What I'm proposing is to construct a higher-level API on top, based on our experience so far, and most specifically, our experience from writing more complex request-reply patterns.
+We can't of course just change the 0MQ API, which is a documented public contract that thousands of people have agreed to and depend on. Instead, we construct a higher-level API on top, based on our experience so far, and most specifically, our experience from writing more complex request-reply patterns.
What we want is an API that lets us receive and send an entire message in one shot, including the reply envelope with any number of reply addresses. One that lets us do what we want with the absolute least lines of code.
@@ -553,6 +546,8 @@ Cutting the amount of code we need to read and write complex messages is great:
* //Portable thread management.// Every non-trivial 0MQ application uses threads, but POSIX threads aren't portable. So a decent high-level API should hide this under a portable layer.
+* //Piping from parent to child threads.// It's a recurrent problem: how to signal between parent and child threads. Our API should provide a 0MQ message pipe (using PAIR sockets and {{inproc}} automatically.
+
* //Portable clocks.// Even getting the time to a millisecond resolution, or sleeping for some milliseconds, is not portable. Realistic 0MQ applications need portable clocks, so our API should provide them.
* //A reactor to replace {{zmq_poll[3]}}.// The poll loop is simple but clumsy. Writing a lot of these, we end up doing the same work over and over: calculating timers, and calling code when sockets are ready. A simple reactor with socket readers, and timers, would save a lot of repeated work.
View
4 examples/C/hwserver.c
@@ -20,9 +20,9 @@ int main (void)
while (1) {
char buffer [10];
- rc = zmq_recv (requester, buffer, 10, 0);
+ rc = zmq_recv (responder, buffer, 10, 0);
printf ("Received Hello\n");
- rc = zmq_send (requester, "World", 5, 0);
+ rc = zmq_send (responder, "World", 5, 0);
sleep (1); // Do some 'work'
}
return 0;
View
14 examples/C/interrupt.c
@@ -5,12 +5,12 @@
#include <stdio.h>
#include <signal.h>
-// ---------------------------------------------------------------------
+// ---------------------------------------------------------------
// Signal handling
//
-// Call s_catch_signals() in your application at startup, and then exit
-// your main loop if s_interrupted is ever 1. Works especially well with
-// zmq_poll.
+// Call s_catch_signals() in your application at startup, and then
+// exit your main loop if s_interrupted is ever 1. Works especially
+// well with zmq_poll.
static int s_interrupted = 0;
static void s_signal_handler (int signal_value)
@@ -37,10 +37,8 @@ int main (void)
s_catch_signals ();
while (1) {
// Blocking read will exit on a signal
- zmq_msg_t message;
- zmq_msg_init (&message);
- zmq_msg_recv (&message, socket, 0);
-
+ char buffer [255];
+ zmq_recv (socket, buffer, 255, 0);
if (s_interrupted) {
printf ("W: interrupt received, killing server...\n");
break;
View
32 examples/C/mspoller.c
@@ -6,9 +6,8 @@
int main (void)
{
- void *context = zmq_ctx_new ();
-
// Connect to task ventilator
+ void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
@@ -17,30 +16,27 @@ int main (void)
zmq_connect (subscriber, "tcp://localhost:5556");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
- // Initialize poll set
- zmq_pollitem_t items [] = {
- { receiver, 0, ZMQ_POLLIN, 0 },
- { subscriber, 0, ZMQ_POLLIN, 0 }
- };
// Process messages from both sockets
while (1) {
- zmq_msg_t message;
+ char msg [256];
+ zmq_pollitem_t items [] = {
+ { receiver, 0, ZMQ_POLLIN, 0 },
+ { subscriber, 0, ZMQ_POLLIN, 0 }
+ };
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
- zmq_msg_init (&message);
- zmq_msg_recv (&message, receiver, 0);
- // Process task
- zmq_msg_close (&message);
+ int size = zmq_recv (receiver, msg, 255, 0);
+ if (size != -1) {
+ // Process task
+ }
}
if (items [1].revents & ZMQ_POLLIN) {
- zmq_msg_init (&message);
- zmq_msg_recv (&message, subscriber, 0);
- // Process weather update
- zmq_msg_close (&message);
+ int size = zmq_recv (subscriber, msg, 255, 0);
+ if (size != -1) {
+ // Process weather update
+ }
}
}
- // We never get here
- zmq_close (receiver);
zmq_close (subscriber);
zmq_ctx_destroy (context);
return 0;
View
33 examples/C/msreader.c
@@ -6,10 +6,8 @@
int main (void)
{
- // Prepare our context and sockets
- void *context = zmq_ctx_new ();
-
// Connect to task ventilator
+ void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
@@ -21,29 +19,26 @@ int main (void)
// Process messages from both sockets
// We prioritize traffic from the task ventilator
while (1) {
- // Process any waiting tasks
- int rc;
- for (rc = 0; !rc; ) {
- zmq_msg_t task;
- zmq_msg_init (&task);
- if ((rc = zmq_msg_recv (&task, receiver, ZMQ_DONTWAIT)) != -1) {
- // process task
+ char msg [256];
+ while (1) {
+ int size = zmq_recv (receiver, msg, 255, ZMQ_DONTWAIT);
+ if (size != -1) {
+ // Process task
}
- zmq_msg_close (&task);
+ else
+ break;
}
- // Process any waiting weather updates
- for (rc = 0; !rc; ) {
- zmq_msg_t update;
- zmq_msg_init (&update);
- if ((rc = zmq_msg_recv (&update, subscriber, ZMQ_DONTWAIT)) != -1) {
- // process weather update
+ while (1) {
+ int size = zmq_recv (subscriber, msg, 255, ZMQ_DONTWAIT);
+ if (size != -1) {
+ // Process weather update
}
- zmq_msg_close (&update);
+ else
+ break;
}
// No activity, so sleep for 1 msec
s_sleep (1);
}
- // We never get here but clean up anyhow
zmq_close (receiver);
zmq_close (subscriber);
zmq_ctx_destroy (context);
View
16 examples/C/rrbroker.c
@@ -20,19 +20,16 @@ int main (void)
// Switch messages between sockets
while (1) {
zmq_msg_t message;
- int more; // Multipart detection
-
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
while (1) {
// Process all parts of the message
zmq_msg_init (&message);
zmq_msg_recv (&message, frontend, 0);
- size_t more_size = sizeof (more);
- zmq_getsockopt (frontend, ZMQ_RCVMORE, &more, &more_size);
- zmq_msg_send (&message, backend, more? ZMQ_SNDMORE: 0);
+ zmq_msg_send (&message, backend,
+ zmq_msg_more (&message)? ZMQ_SNDMORE: 0);
zmq_msg_close (&message);
- if (!more)
+ if (!zmq_msg_more (&message))
break; // Last message part
}
}
@@ -41,11 +38,10 @@ int main (void)
// Process all parts of the message
zmq_msg_init (&message);
zmq_msg_recv (&message, backend, 0);
- size_t more_size = sizeof (more);
- zmq_getsockopt (backend, ZMQ_RCVMORE, &more, &more_size);
- zmq_msg_send (&message, frontend, more? ZMQ_SNDMORE: 0);
+ zmq_msg_send (&message, frontend,
+ zmq_msg_more (&message)? ZMQ_SNDMORE: 0);
zmq_msg_close (&message);
- if (!more)
+ if (!zmq_msg_more (&message))
break; // Last message part
}
}
View
7 examples/C/tasksink2.c
@@ -6,9 +6,8 @@
int main (void)
{
- void *context = zmq_ctx_new ();
-
// Socket to receive messages on
+ void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
@@ -19,7 +18,7 @@ int main (void)
// Wait for start of batch
char *string = s_recv (receiver);
free (string);
-
+
// Start our clock now
int64_t start_time = s_clock ();
@@ -39,8 +38,6 @@ int main (void)
// Send kill signal to workers
s_send (controller, "KILL");
-
- // Finished
sleep (1); // Give 0MQ time to deliver
zmq_close (receiver);
View
14 examples/C/taskwork.c
@@ -9,9 +9,8 @@
int main (void)
{
- void *context = zmq_ctx_new ();
-
// Socket to receive messages on
+ void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
@@ -22,16 +21,11 @@ int main (void)
// Process tasks forever
while (1) {
char *string = s_recv (receiver);
- // Simple progress indicator for the viewer
+ printf ("%s.", string); // Show progress
fflush (stdout);
- printf ("%s.", string);
-
- // Do the work
- s_sleep (atoi (string));
+ s_sleep (atoi (string)); // Do the work
free (string);
-
- // Send results to sink
- s_send (sender, "");
+ s_send (sender, ""); // Send results to sink
}
zmq_close (receiver);
zmq_close (sender);
View
35 examples/C/taskwork2.c
@@ -6,9 +6,8 @@
int main (void)
{
- void *context = zmq_ctx_new ();
-
// Socket to receive messages on
+ void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
@@ -21,37 +20,25 @@ int main (void)
zmq_connect (controller, "tcp://localhost:5559");
zmq_setsockopt (controller, ZMQ_SUBSCRIBE, "", 0);
- // Process messages from receiver and controller
- zmq_pollitem_t items [] = {
- { receiver, 0, ZMQ_POLLIN, 0 },
- { controller, 0, ZMQ_POLLIN, 0 }
- };
- // Process messages from both sockets
+ // Process messages from either socket
while (1) {
- zmq_msg_t message;
+ zmq_pollitem_t items [] = {
+ { receiver, 0, ZMQ_POLLIN, 0 },
+ { controller, 0, ZMQ_POLLIN, 0 }
+ };
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
- zmq_msg_init (&message);
- zmq_msg_recv (&message, receiver, 0);
-
- // Do the work
- s_sleep (atoi ((char *) zmq_msg_data (&message)));
-
- // Send results to sink
- zmq_msg_init (&message);
- zmq_msg_send (&message, sender, 0);
-
- // Simple progress indicator for the viewer
- printf (".");
+ char *string = s_recv (receiver);
+ printf ("%s.", string); // Show progress
fflush (stdout);
-
- zmq_msg_close (&message);
+ s_sleep (atoi (string)); // Do the work
+ free (string);
+ s_send (sender, ""); // Send results to sink
}
// Any waiting controller command acts as 'KILL'
if (items [1].revents & ZMQ_POLLIN)
break; // Exit loop
}
- // Finished
zmq_close (receiver);
zmq_close (sender);
zmq_close (controller);
View
30 examples/C/zhelpers.h
@@ -42,38 +42,28 @@
// is being terminated.
static char *
s_recv (void *socket) {
- zmq_msg_t message;
- zmq_msg_init (&message);
- int size = zmq_msg_recv (&message, socket, 0);
+ char buffer [256];
+ int size = zmq_recv (socket, buffer, 255, 0);
if (size == -1)
return NULL;
- char *string = malloc (size + 1);
- memcpy (string, zmq_msg_data (&message), size);
- zmq_msg_close (&message);
- string [size] = 0;
- return (string);
+ if (size > 255)
+ size = 255;
+ buffer [size] = 0;
+ return strdup (buffer);
}
// Convert C string to 0MQ string and send to socket
static int
s_send (void *socket, char *string) {
- zmq_msg_t message;
- zmq_msg_init_size (&message, strlen (string));
- memcpy (zmq_msg_data (&message), string, strlen (string));
- int size = zmq_msg_send (&message, socket, 0);
- zmq_msg_close (&message);
- return (size);
+ int size = zmq_send (socket, string, strlen (string), 0);
+ return size;
}
// Sends string as 0MQ string, as multipart non-terminal
static int
s_sendmore (void *socket, char *string) {
- zmq_msg_t message;
- zmq_msg_init_size (&message, strlen (string));
- memcpy (zmq_msg_data (&message), string, strlen (string));
- int size = zmq_msg_send (&message, socket, ZMQ_SNDMORE);
- zmq_msg_close (&message);
- return (size);
+ int size = zmq_send (socket, string, strlen (string), ZMQ_SNDMORE);
+ return size;
}
// Receives all message parts from socket, prints neatly
View
2 preface_print.txt
@@ -24,7 +24,7 @@ We took a normal TCP socket, injected it with a mix of radioactive isotopes stol
Illegal radioisotopes from
secret Soviet atomic city
- [[/code]]
+[[/code]]
++ The Zen of Zero

0 comments on commit 5545672

Please sign in to comment.