.output chapter2.wd
++ Chapter Two - Intermediate Stuff

In Chapter One we took 0MQ for a drive, with some basic examples of the main 0MQ patterns: request-reply, publish-subscribe, and pipeline. In this chapter we're going to get our hands dirty and start to learn how to use these tools in real programs.

We'll cover:

* How to create and work with 0MQ sockets.
* How to send and receive messages on sockets.
* How to build your apps around 0MQ's asynchronous I/O model.
* How to handle multiple sockets in one thread.
* How to handle fatal and non-fatal errors properly.
* How to handle interrupt signals like Ctrl-C.
* How to shut down a 0MQ application cleanly.
* How to check a 0MQ application for memory leaks.
* How to send and receive multi-part messages.
* How to forward messages across networks.
* How to build a simple message queuing broker.
* How to write multithreaded applications with 0MQ.
* How to use 0MQ to signal between threads.
* How to use 0MQ to coordinate a network of nodes.
* How to create and use message envelopes for publish-subscribe.
* How to use the high-water mark (HWM) to protect against memory overflows.

+++ The Zen of Zero

The Ø in 0MQ is all about tradeoffs. On the one hand this strange name lowers 0MQ's visibility on Google and Twitter. On the other hand it annoys the heck out of some Danish folk who write us things like "ØMG røtfl", and "//Ø is not a funny looking zero!//" and "//Rødgrød med Fløde!//", which is apparently an insult that means "may your neighbours be the direct descendants of Grendel!" Seems like a fair trade.

Originally the zero in 0MQ was meant as "zero broker" and (as close to) "zero latency" (as possible). In the meantime it has come to cover different goals: zero administration, zero cost, zero waste. More generally, "zero" refers to the culture of minimalism that permeates the project. We add power by removing complexity rather than exposing new functionality.

+++ The Socket API

To be perfectly honest, 0MQ does a kind of bait-and-switch on you. We don't apologize for that; it's for your own good, and it hurts us more than it hurts you. 0MQ presents a familiar socket-based API, but behind it hides a bunch of message-processing engines that will slowly fix your world-view about how to design and write distributed software.

Sockets are the de-facto standard API for network programming, as well as being useful for stopping your eyes from falling onto your cheeks. One thing that makes 0MQ especially tasty to developers is that it uses sockets and messages instead of some other arbitrary set of concepts. Kudos to Martin Sustrik for pulling this off. It turns "Message Oriented Middleware", a phrase guaranteed to send the whole room off to Catatonia, into "Extra Spicy Sockets!" which leaves us with a strange craving for pizza, and a desire to know more.

Like a nice pepperoni pizza, 0MQ sockets are easy to digest. Sockets have a life in four parts, just like BSD sockets:

* Creating and destroying sockets, which go together to form a karmic circle of socket life (see zmq_socket[3], zmq_close[3]).

* Configuring sockets by setting options on them and checking them if necessary (see zmq_setsockopt[3], zmq_getsockopt[3]).

* Plugging sockets onto the network topology by creating 0MQ connections to and from them (see zmq_bind[3], zmq_connect[3]).

* Using the sockets to carry data by writing and receiving messages on them (see zmq_msg_send[3], zmq_msg_recv[3]).

Which looks like this, in C:

[[code language="C"]]
void *mousetrap;

// Create socket for catching mice
mousetrap = zmq_socket (context, ZMQ_PULL);

// Configure the socket
int jawsize = 10000;
zmq_setsockopt (mousetrap, ZMQ_RCVHWM, &jawsize, sizeof jawsize);

// Plug socket into mouse hole
zmq_connect (mousetrap, "tcp://192.168.55.221:5001");

// Wait for juicy mouse to arrive
zmq_msg_t mouse;
zmq_msg_init (&mouse);
zmq_msg_recv (&mouse, mousetrap, 0);
// Destroy the mouse
zmq_msg_close (&mouse);

// Destroy the socket
zmq_close (mousetrap);
[[/code]]

Note that sockets are always void pointers, and messages (which we'll come to very soon) are structures. So in C you pass sockets as-such, but you pass addresses of messages in all functions that work with messages, like zmq_msg_send[3] and zmq_msg_recv[3]. As a mnemonic, realize that "in 0MQ all your sockets are belong to us", but messages are things you actually own in your code.

Creating, destroying, and configuring sockets works as you'd expect for any object. But remember that 0MQ is an asynchronous, elastic fabric. This has some impact on how we plug sockets into the network topology, and how we use the sockets after that.

+++ Plugging Sockets Into the Topology

To create a connection between two nodes you use zmq_bind[3] in one node, and zmq_connect[3] in the other. As a general rule of thumb, the node which does zmq_bind[3] is a "server", sitting on a well-known network address, and the node which does zmq_connect[3] is a "client", with unknown or arbitrary network addresses. Thus we say that we "bind a socket to an endpoint" and "connect a socket to an endpoint", the endpoint being that well-known network address.

0MQ connections are somewhat different from old-fashioned TCP connections. The main notable differences are:

* They go across an arbitrary transport ({{inproc}}, {{ipc}}, {{tcp}}, {{pgm}} or {{epgm}}). See zmq_inproc[7], zmq_ipc[7], zmq_tcp[7], zmq_pgm[7], and zmq_epgm[7].

* They exist when a client does zmq_connect[3] to an endpoint, whether or not a server has already done zmq_bind[3] to that endpoint.

* They are asynchronous, and have queues that magically exist where and when needed.

* They may express a certain "messaging pattern", according to the type of socket used at each end.

* One socket may have many outgoing and many incoming connections.

* There is no zmq_accept() method. When a socket is bound to an endpoint it automatically starts accepting connections.

* Your application code cannot work with these connections directly; they are encapsulated under the socket.

Many architectures follow some kind of client-server model, where the server is the component that is most static, and the clients are the components that are most dynamic, i.e. they come and go the most. There are sometimes issues of addressing: servers will be visible to clients, but not necessarily vice-versa. So mostly it's obvious which node should be doing zmq_bind[3] (the server) and which should be doing zmq_connect[3] (the client). It also depends on the kind of sockets you're using, with some exceptions for unusual network architectures. We'll look at socket types later.

Now, imagine we start the client //before// we start the server. In traditional networking we get a big red Fail flag. But 0MQ lets us start and stop pieces arbitrarily. As soon as the client node does zmq_connect[3] the connection exists and that node can start to write messages to the socket. At some stage (hopefully before messages queue up so much that they start to get discarded, or the client blocks), the server comes alive, does a zmq_bind[3] and 0MQ starts to deliver messages.

A server node can bind to many endpoints and it can do this using a single socket. This means it will accept connections across different transports:

[[code language="C"]]
zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "ipc://myserver.ipc");
[[/code]]

You cannot bind to the same endpoint twice; the second attempt fails with an error (an exception, in language bindings that raise them).

Each time a client node does a zmq_connect[3] to any of these endpoints, the server node's socket gets another connection. There is no inherent limit to how many connections a socket can have. A client node can also connect to many endpoints using a single socket.

In most cases, which node acts as client, and which as server, is about network topology rather than message flow. However, there //are// cases (resending when connections are broken) where the same socket type will behave differently if it's a server or if it's a client.

What this means is that you should always think in terms of "servers" as static parts of your topology, with more-or-less fixed endpoint addresses, and "clients" as dynamic parts that come and go. Then, design your application around this model. The chances that it will "just work" are much better like that.

Sockets have types. The socket type defines the semantics of the socket, its policies for routing messages inwards and outwards, queuing, etc. You can connect certain types of socket together, e.g. a publisher socket and a subscriber socket. Sockets work together in "messaging patterns". We'll look at this in more detail later.

It's the ability to connect sockets in these different ways that gives 0MQ its basic power as a message queuing system. There are layers on top of this, such as proxies, which we'll get to later. But essentially, with 0MQ you define your network architecture by plugging pieces together like a child's construction toy.

+++ Using Sockets to Carry Data

To send and receive messages you use the zmq_msg_send[3] and zmq_msg_recv[3] methods. The names are conventional but 0MQ's I/O model is different enough from the TCP model!figref() that you will need time to get your head around it.

[[code type="textdiagram" title="TCP sockets are 1 to 1"]]
+------------+
|            |
|    Node    |
|            |
+------------+
|   Socket   |
\------------/
      ^
      |
   1 to 1
      |
      v
/------------\
|   Socket   |
+------------+
|            |
|    Node    |
|            |
+------------+
[[/code]]

Let's look at the main differences between TCP sockets and 0MQ sockets when it comes to carrying data:

* 0MQ sockets carry messages, rather than bytes (as in TCP) or frames (as in UDP). A message is a length-specified blob of binary data. We'll come to messages shortly; their design is optimized for performance and thus somewhat tricky to understand.

* 0MQ sockets do their I/O in a background thread. This means that messages arrive in a local input queue, and are sent from a local output queue, no matter what your application is busy doing. These are configurable memory queues, by the way.

* 0MQ sockets can, depending on the socket type, be connected to (or from, it's the same) many other sockets. Where TCP emulates a one-to-one phone call, 0MQ implements one-to-many (like a radio broadcast), many-to-many (like a post office), many-to-one (like a mail box), and even one-to-one.

* 0MQ sockets can send to many endpoints (creating a fan-out model), or receive from many endpoints (creating a fan-in model)!figref().

[[code type="textdiagram" title="0MQ Sockets are N to N"]]
   +------------+        +------------+
   |            |        |            |
   |    Node    |        |    Node    |
   |            |        |            |
   +------------+        +------------+
   |   Socket   |        |   Socket   |
   \----+-+-----/        \------+-----/
        | |                     :
1 to N  | +------------------------+
Fan out |                          |
        +------------------------+ |  N to 1
        |                        | |  Fan in
        v                        v v
   /------------\        /------------\
   |   Socket   |        |   Socket   |
   +------------+        +------------+
   |            |        |            |
   |    Node    |        |    Node    |
   |            |        |            |
   +------------+        +------------+
[[/code]]

So writing a message to a socket may send the message to one or many other places at once, and conversely, one socket will collect messages from all connections sending messages to it. The zmq_msg_recv[3] method uses a fair-queuing algorithm so each sender gets an even chance.
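To give a feel for what fair-queuing means, here is a small model of it in plain C. This is a sketch of the idea (serve waiting senders in round-robin order), not 0MQ's actual implementation; the {{fair_queue_t}} type and {{fair_queue_recv}} function are invented for the illustration:

```c
#include <assert.h>
#include <stddef.h>

#define MAX_SENDERS 8

//  One pending-message counter per connected sender
typedef struct {
    int pending [MAX_SENDERS];
    size_t senders;             //  Number of senders in use
    size_t next;                //  Where the next scan starts
} fair_queue_t;

//  Take one message, scanning senders round-robin from where we left
//  off. Returns the index of the sender served, or -1 if none waiting.
int
fair_queue_recv (fair_queue_t *self)
{
    size_t tried;
    for (tried = 0; tried < self->senders; tried++) {
        size_t index = (self->next + tried) % self->senders;
        if (self->pending [index] > 0) {
            self->pending [index]--;
            self->next = (index + 1) % self->senders;
            return (int) index;
        }
    }
    return -1;
}
```

With three senders holding 3, 1, and 2 messages, successive calls serve senders 0, 1, 2, 0, 2, 0. No sender is starved just because another one is busier.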

The zmq_msg_send[3] method does not actually send the message to the socket connection(s). It queues the message so that the I/O thread can send it asynchronously. It does not block except in some exceptional cases. So the message is not necessarily sent when zmq_msg_send[3] returns to your application. If you created a message using zmq_msg_init_data[3] you cannot reuse the data or free it, otherwise the I/O thread will rapidly find itself writing overwritten or unallocated garbage. This is a common mistake for beginners. We'll see a little later how to properly work with messages.

+++ Unicast Transports

0MQ provides a set of unicast transports ({{inproc}}, {{ipc}}, and {{tcp}}) and multicast transports ({{epgm}}, {{pgm}}). Multicast is an advanced technique that we'll come to later. Don't even start using it unless you know that your fanout ratios will make 1-to-N unicast impossible.

For most common cases, use **{{tcp}}**, which is a //disconnected TCP// transport. It is elastic, portable, and fast enough for most cases. We call this 'disconnected' because 0MQ's {{tcp}} transport doesn't require that the endpoint exists before you connect to it. Clients and servers can connect and bind at any time, can go and come back, and it remains transparent to applications.

The inter-process transport, **{{ipc}}**, is like {{tcp}} except that it is abstracted from the LAN, so you don't need to specify IP addresses or domain names. This makes it better for some purposes, and we use it quite often in the examples in this book. 0MQ's {{ipc}} transport is disconnected, like {{tcp}}. It has one limitation: it does not work on Windows. This may be fixed in future versions of 0MQ. By convention we use endpoint names with an ".ipc" extension to avoid potential conflict with other file names. On UNIX systems, if you use {{ipc}} endpoints you need to create these with appropriate permissions otherwise they may not be shareable between processes running under different user ids. You must also make sure all processes can access the files, e.g. by running in the same working directory.

The inter-thread transport, **{{inproc}}**, is a connected signaling transport. It is much faster than {{tcp}} or {{ipc}}. This transport has a specific limitation compared to {{ipc}} and {{tcp}}: **you must do bind before connect**. This is something future versions of 0MQ may fix, but at present this defines how you use {{inproc}} sockets. We create and bind one socket, then start the child threads, which create and connect the other sockets.

+++ 0MQ is Not a Neutral Carrier

A common question that newcomers to 0MQ ask (it's one I asked myself) is something like, "//how do I write a XYZ server in 0MQ?//" For example, "how do I write an HTTP server in 0MQ?"

The implication is that if we use normal sockets to carry HTTP requests and responses, we should be able to use 0MQ sockets to do the same, only much faster and better.

Sadly the answer is "this is not how it works". 0MQ is not a neutral carrier: it imposes a framing on the transport protocols it uses. This framing is not compatible with existing protocols, which tend to use their own framing. For example, compare an HTTP request!figref(), and a 0MQ request, both over TCP/IP.

[[code type="textdiagram" title="HTTP On the Wire"]]
+----------------+----+----+----+----+
| GET /index.html| 13 | 10 | 13 | 10 |
+----------------+----+----+----+----+
[[/code]]

The HTTP request uses CR-LF as its simplest framing delimiter, whereas 0MQ uses a length-specified frame!figref().

[[code type="textdiagram" title="0MQ On the Wire"]]
+---+---+---+---+---+---+
| 5 | H | E | L | L | O |
+---+---+---+---+---+---+
[[/code]]

So you could write an HTTP-like protocol using 0MQ, using for example the request-reply socket pattern. But it would not be HTTP.
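The difference between the two framings can be sketched in a few lines of plain C. This mirrors the simplified figures above (a single length byte followed by the body, versus a CR-LF delimiter); it is not the complete wire protocol of either system, and the function names are invented for the illustration:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

//  Length-prefixed framing as in the figure: one length byte, then
//  the body. Returns bytes written, or 0 if the body is too long for
//  one length byte or the output buffer is too small.
size_t
frame_encode (const void *body, size_t size,
              unsigned char *out, size_t capacity)
{
    if (size > 255 || capacity < size + 1)
        return 0;
    out [0] = (unsigned char) size;
    memcpy (out + 1, body, size);
    return size + 1;
}

//  Delimiter framing as in the HTTP request: the line ends at CR-LF.
//  Returns the line length, or -1 if no complete line is available.
long
line_length (const char *data, size_t available)
{
    size_t index;
    for (index = 0; index + 1 < available; index++)
        if (data [index] == '\r' && data [index + 1] == '\n')
            return (long) index;
    return -1;
}
```

A reader of length-prefixed frames knows up front how many bytes to expect, while a reader of delimited lines has to scan the data. A peer expecting one framing cannot make sense of the other, which is why a 0MQ socket cannot talk to an HTTP client directly.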

There is however a good answer to the question, "How can I make profitable use of 0MQ when making my new XYZ server?" You need to implement whatever protocol you want to speak in any case, but you can connect that protocol server (which can be extremely thin) to a 0MQ backend that does the real work. The beautiful part here is that you can then extend your backend with code in any language, running locally or remotely, as you wish. Zed Shaw's [http://www.mongrel2.org Mongrel2] web server is a great example of such an architecture.

+++ I/O Threads

We said that 0MQ does I/O in a background thread. One I/O thread (for all sockets) is sufficient for all but the most extreme applications. When you create a new context it starts with one I/O thread. The general rule of thumb is to allow one I/O thread per gigabyte of data in or out per second. To raise the number of I/O threads, use the zmq_ctx_set[3] call //before// creating any sockets:

[[code language="C"]]
int io_threads = 4;
void *context = zmq_ctx_new ();
zmq_ctx_set (context, ZMQ_IO_THREADS, io_threads);
assert (zmq_ctx_get (context, ZMQ_IO_THREADS) == io_threads);
[[/code]]
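The rule of thumb can be written down as a tiny helper. This is only a sketch of the arithmetic: the name {{io_threads_for}} is invented, and it assumes you estimate traffic in megabytes per second, with 1000 megabytes to the gigabyte:

```c
#include <assert.h>

//  One I/O thread per gigabyte per second of traffic, rounded up,
//  and never fewer than the one thread a context starts with.
int
io_threads_for (long megabytes_per_second)
{
    long threads = (megabytes_per_second + 999) / 1000;
    return threads < 1? 1: (int) threads;
}
```

So an application that expects about 2500 megabytes per second would ask for three I/O threads, and anything up to a gigabyte per second stays with the default of one.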

There is a major difference between a 0MQ application and a conventional networked application, which is that you don't create one socket per connection. One socket handles all incoming and outgoing connections for a particular point of work. E.g. when you publish to a thousand subscribers, it's via one socket. When you distribute work among twenty services, it's via one socket. When you collect data from a thousand web applications, it's via one socket.

This has a fundamental impact on how you write applications. A traditional networked application has one process or one thread per remote connection, and that process or thread handles one socket. 0MQ lets you collapse this entire structure into a single thread, and then break it up as necessary for scaling.

+++ Limiting Socket Use

By default, a 0MQ socket will continue to accept connections until your operating system runs out of file handles. This isn't always the best policy for public-facing services as it leaves you open to a simple denial-of-service attack. You can set a limit using another zmq_ctx_set[3] call:

[[code language="C"]]
int max_sockets = 1024;
void *context = zmq_ctx_new ();
zmq_ctx_set (context, ZMQ_MAX_SOCKETS, max_sockets);
assert (zmq_ctx_get (context, ZMQ_MAX_SOCKETS) == max_sockets);
[[/code]]

+++ Core Messaging Patterns

Underneath the brown paper wrapping of 0MQ's socket API lies the world of messaging patterns. If you have a background in enterprise messaging, or know UDP well, these will be vaguely familiar. But to most 0MQ newcomers they are a surprise; we're so used to the TCP paradigm where a socket maps one-to-one to another node.

Let's recap briefly what 0MQ does for you. It delivers blobs of data (messages) to nodes, quickly and efficiently. You can map nodes to threads, processes, or boxes. It gives your applications a single socket API to work with, no matter what the actual transport (like in-process, inter-process, TCP, or multicast). It automatically reconnects to peers as they come and go. It queues messages at both sender and receiver, as needed. It manages these queues carefully to ensure processes don't run out of memory, overflowing to disk when appropriate. It handles socket errors. It does all I/O in background threads. It uses lock-free techniques for talking between nodes, so there are never locks, waits, semaphores, or deadlocks.

But cutting through that, it routes and queues messages according to precise recipes called //patterns//. It is these patterns that provide 0MQ's intelligence. They encapsulate our hard-earned experience of the best ways to distribute data and work. 0MQ's patterns are hard-coded but future versions may allow user-definable patterns.

0MQ patterns are implemented by pairs of sockets with matching types. In other words, to understand 0MQ patterns you need to understand socket types and how they work together. Mostly this just takes learning; there is little that is obvious at this level.

The built-in core 0MQ patterns are:

* **Request-reply**, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.

* **Publish-subscribe**, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.

* **Pipeline**, which connects nodes in a fan-out / fan-in pattern that can have multiple steps, and loops. This is a parallel task distribution and collection pattern.

We looked at each of these in the first chapter. There's one more pattern that people tend to try to use when they still think of 0MQ in terms of traditional TCP sockets:

* **Exclusive pair**, which connects two sockets in an exclusive pair. This is a pattern you should use only to connect two threads in a process. We'll see an example at the end of this chapter.

The zmq_socket[3] man page is fairly clear about the patterns; it's worth reading several times until it starts to make sense. These are the socket combinations that are valid for a connect-bind pair (either side can bind):

* PUB and SUB
* REQ and REP
* REQ and ROUTER
* DEALER and REP
* DEALER and ROUTER
* DEALER and DEALER
* ROUTER and ROUTER
* PUSH and PULL
* PAIR and PAIR

You'll also see references to XPUB and XSUB sockets, which we'll come to later (they're like raw versions of PUB and SUB). Any other combination will produce undocumented and unreliable results and future versions of 0MQ will probably return errors if you try them. You can and will of course bridge other socket types //via code//, i.e. read from one socket type and write to another.
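If you want your own code to reject invalid combinations early, the list above can be turned into a small lookup table. This is a hypothetical helper written for this guide, not something the 0MQ API provides; it works on socket type names as strings to stay independent of the library:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

//  The valid connect-bind pairs listed above; order does not matter
static const char *valid_pairs [][2] = {
    { "PUB", "SUB" },       { "REQ", "REP" },
    { "REQ", "ROUTER" },    { "DEALER", "REP" },
    { "DEALER", "ROUTER" }, { "DEALER", "DEALER" },
    { "ROUTER", "ROUTER" }, { "PUSH", "PULL" },
    { "PAIR", "PAIR" }
};

//  Return 1 if the two socket type names may be connected, else 0
int
socket_types_compatible (const char *one, const char *two)
{
    size_t index;
    size_t count = sizeof valid_pairs / sizeof valid_pairs [0];
    for (index = 0; index < count; index++) {
        const char *left = valid_pairs [index][0];
        const char *right = valid_pairs [index][1];
        if ((strcmp (one, left) == 0 && strcmp (two, right) == 0)
        ||  (strcmp (one, right) == 0 && strcmp (two, left) == 0))
            return 1;
    }
    return 0;
}
```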

+++ High-level Messaging Patterns

These four core patterns are cooked into 0MQ. They are part of the 0MQ API, implemented in the core C++ library, and guaranteed to be available in all fine retail stores.

On top, we add //high-level patterns//. We build these high-level patterns on top of 0MQ and implement them in whatever language we're using for our application. They are not part of the core library, do not come with the 0MQ package, and exist in their own space, as part of the 0MQ community. For example the Majordomo pattern, which we explore in Chapter Four, sits in the GitHub Majordomo project in the ZeroMQ organization.

One of the things we aim to provide you with in this guide is a set of such high-level patterns, from small (how to handle messages sanely) to large (how to make a reliable publish-subscribe architecture).

+++ Working with Messages

On the wire, 0MQ messages are blobs of any size from zero upwards, fitting in memory. You do your own serialization using protobufs, msgpack, JSON, or whatever else your applications need to speak. It's wise to choose a data representation that is portable and fast, but you can make your own decisions about trade-offs.

In memory, 0MQ messages are zmq_msg_t structures (or classes depending on your language). Here are the basic ground rules for using 0MQ messages in C:

* You create and pass around zmq_msg_t objects, not blocks of data.

* To read a message you use zmq_msg_init[3] to create an empty message, and then you pass that to zmq_msg_recv[3].

* To write a message from new data, you use zmq_msg_init_size[3] to create a message and at the same time allocate a block of data of some size. You then fill that data using memcpy[3], and pass the message to zmq_msg_send[3].

* To release (not destroy) a message you call zmq_msg_close[3]. This drops a reference, and eventually 0MQ will destroy the message.

* To access the message content you use zmq_msg_data[3]. To know how much data the message contains, use zmq_msg_size[3].

* Do not use zmq_msg_move[3], zmq_msg_copy[3], or zmq_msg_init_data[3] unless you read the man pages and know precisely why you need these.

Here is a typical chunk of code working with messages, which should be familiar if you have been paying attention. This is from the zhelpers.h file we use in all the examples:

[[code language="C"]]
// Receive 0MQ string from socket and convert into C string
static char *
s_recv (void *socket) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    int size = zmq_msg_recv (&message, socket, 0);
    if (size == -1)
        return NULL;
    char *string = malloc (size + 1);
    memcpy (string, zmq_msg_data (&message), size);
    zmq_msg_close (&message);
    string [size] = 0;
    return (string);
}

// Convert C string to 0MQ string and send to socket
static int
s_send (void *socket, char *string) {
    zmq_msg_t message;
    zmq_msg_init_size (&message, strlen (string));
    memcpy (zmq_msg_data (&message), string, strlen (string));
    int size = zmq_msg_send (&message, socket, 0);
    zmq_msg_close (&message);
    return (size);
}
[[/code]]

You can easily extend this code to send and receive blobs of arbitrary length.

**Note that when you have passed a message to zmq_msg_send[3], 0MQ will clear the message, i.e. set the size to zero. You cannot send the same message twice, and you cannot access the message data after sending it.**

If you want to send the same message more than once, create a second message, initialize it using zmq_msg_init[3] and then use zmq_msg_copy[3] to create a copy of the first message. This does not copy the data but the reference. You can then send the message twice (or more, if you create more copies) and the message will only be finally destroyed when the last copy is sent or closed.

0MQ also supports //multi-part// messages, which let you send or receive a list of frames as a single on-the-wire message. This is widely used in real applications and we'll look at that later in this chapter and in Chapter Three.

Some other things that are worth knowing about messages:

* 0MQ sends and receives them atomically, i.e. you get a whole message, or you don't get it at all. This is also true for multi-part messages.

* 0MQ does not send a message right away but at some indeterminate later time.

* You may send zero-length messages, e.g. for sending a signal from one thread to another.

* A message must fit in memory. If you want to send files of arbitrary sizes, you should break them into pieces and send each piece as a separate message.

* You must call zmq_msg_close[3] when finished with a message, in languages that don't automatically destroy objects when a scope closes.
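Breaking a large block of data into pieces, as suggested above, could look something like this. It is a sketch in plain C: the {{piece_fn}} callback stands in for whatever code actually sends one piece (for instance a wrapper around zmq_msg_init_size[3], memcpy[3], and zmq_msg_send[3]), and all names here are invented for the illustration:

```c
#include <assert.h>
#include <stddef.h>

//  Hypothetical callback that sends one piece; returns 0 on success
typedef int (piece_fn) (const unsigned char *piece, size_t size, void *arg);

//  Walk a large buffer in pieces of at most chunk_size bytes and hand
//  each piece to send_piece. Returns the number of pieces, or -1.
long
send_in_pieces (const unsigned char *data, size_t total, size_t chunk_size,
                piece_fn *send_piece, void *arg)
{
    long pieces = 0;
    size_t offset = 0;
    if (chunk_size == 0)
        return -1;
    while (offset < total) {
        size_t size = total - offset;
        if (size > chunk_size)
            size = chunk_size;
        if (send_piece (data + offset, size, arg) != 0)
            return -1;
        offset += size;
        pieces++;
    }
    return pieces;
}

//  Stand-in for a real sender: just totals the bytes it is given
int
count_bytes (const unsigned char *piece, size_t size, void *arg)
{
    (void) piece;
    *(size_t *) arg += size;
    return 0;
}
```

For a file you would read each piece from disk rather than from one big buffer, so that only one piece needs to be in memory at a time.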

And to be necessarily repetitive, do not use zmq_msg_init_data[3], yet. This is a zero-copy method and guaranteed to create trouble for you. There are far more important things to learn about 0MQ before you start to worry about shaving off microseconds.

+++ Handling Multiple Sockets

In all the examples so far, the main loop has been:

# wait for message on socket
# process message
# repeat

What if we want to read from multiple sockets at the same time? The simplest way is to connect one socket to multiple endpoints and get 0MQ to do the fan-in for us. This is legal if the remote endpoints are in the same pattern but it would be wrong to e.g. connect a PULL socket to a PUB endpoint.

The right way is to use zmq_poll[3]. An even better way might be to wrap zmq_poll[3] in a framework that turns it into a nice event-driven //reactor//, but it's significantly more work than we want to cover here.

Let's start with a dirty hack, partly for the fun of not doing it right, but mainly because it lets me show you how to do non-blocking socket reads. Here is a simple example of reading from two sockets using non-blocking reads. This rather confused program acts both as a subscriber to weather updates, and a worker for parallel tasks:

[[code type="example" title="Multiple socket reader" name="msreader"]]
[[/code]]

The cost of this approach is some additional latency on the first message (the sleep at the end of the loop, when there are no waiting messages to process). This would be a problem in applications where sub-millisecond latency was vital. Also, you need to check the documentation for nanosleep() or whatever function you use to make sure it does not busy-loop.

You can treat the sockets fairly by reading first from one, then the second rather than prioritizing them as we did in this example. This is called "fair-queuing", something that 0MQ does automatically when one socket receives messages from more than one source.

Now let's see the same little senseless application done right, using zmq_poll[3]:

[[code type="example" title="Multiple socket poller" name="mspoller"]]
[[/code]]

+++ Handling Errors and ETERM

0MQ's error handling philosophy is a mix of fail-fast and resilience. Processes, we believe, should be as vulnerable as possible to internal errors, and as robust as possible against external attacks and errors. To give an analogy, a living cell will self-destruct if it detects a single internal error, yet it will resist attack from the outside by all means possible.

Assertions, which pepper the 0MQ code, are absolutely vital to robust code; they just have to be on the right side of the cellular wall. And there should be such a wall. If it is unclear whether a fault is internal or external, that is a design flaw to be fixed. In C/C++, assertions stop the application immediately with an error. In other languages you may get exceptions or halts.

When 0MQ detects an external fault it returns an error to the calling code. In some rare cases it drops messages silently, if there is no obvious strategy for recovering from the error.

In most of the C examples we've seen so far there's been no error handling. **Real code should do error handling on every single 0MQ call**. If you're using a language binding other than C, the binding may handle errors for you. In C you do need to do this yourself. There are some simple rules, starting with POSIX conventions:

* Methods that create objects will return NULL if they fail.

* Methods that process data may return the number of bytes processed, or -1 on an error or failure.

* Other methods will return 0 on success and -1 on an error or failure.

* The error code is provided in {{errno}} or zmq_errno[3].

* A descriptive error text for logging is provided by zmq_strerror[3].

There are two main exceptional conditions that you may want to handle as non-fatal:

* When a thread calls zmq_msg_recv[3] with the ZMQ_DONTWAIT option and there is no waiting data. 0MQ will return -1 and set errno to EAGAIN.

* When a thread calls zmq_ctx_destroy[3] and other threads are doing blocking work. The zmq_ctx_destroy[3] call closes the context and all blocking calls exit with -1, and errno set to ETERM.

What this boils down to is that in most cases you can use assertions on 0MQ calls, like this, in C:

[[code language="C"]]
void *context = zmq_ctx_new ();
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:5555");
if (rc != 0) {
    printf ("E: bind failed: %s\n", zmq_strerror (zmq_errno ()));
    return -1;
}
[[/code]]

In C/C++, asserts can be removed entirely in optimized code, so don't make the mistake of wrapping the whole 0MQ call in an assert(). It looks neat, then the optimizer removes all the asserts and the calls you want to make, and your application breaks in impressive ways.
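The following small sketch shows the trap without needing 0MQ at all. It defines {{NDEBUG}} by hand to mimic an optimized build, and {{do_work}} is a hypothetical stand-in for a real call such as zmq_bind[3]:

```c
// Simulate an optimized build in which assertions are compiled out
#define NDEBUG
#include <assert.h>

static int calls = 0;           // Counts how often do_work actually ran

// Stand-in for a 0MQ call such as zmq_bind: it has a side effect we
// rely on, and returns 0 on success (do_work is a hypothetical name)
int do_work (void)
{
    calls++;
    return 0;
}

// WRONG: with NDEBUG defined the whole expression vanishes, so
// do_work is never called
void wrapped_in_assert (void)
{
    assert (do_work () == 0);
}

// RIGHT: the call always runs; only the check is compiled out
void checked_separately (void)
{
    int rc = do_work ();
    assert (rc == 0);
    (void) rc;                  // Avoid an unused-variable warning
}

// Restore normal assert behavior for any code that follows
#undef NDEBUG
#include <assert.h>
```

After calling {{wrapped_in_assert}} the counter is still zero; after {{checked_separately}} it is one.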

Let's see how to shut down a process cleanly. We'll take the parallel pipeline example from the previous section. If we've started a whole lot of workers in the background, we now want to kill them when the batch is finished. Let's do this by sending a kill message to the workers. The best place to do this is the sink, since it really knows when the batch is done.

How do we connect the sink to the workers? The PUSH/PULL sockets are one-way only. The standard 0MQ answer is: create a new socket flow for each type of problem you need to solve. We'll use a publish-subscribe model to send kill messages to the workers!figref():

* The sink creates a PUB socket on a new endpoint.
* Workers connect their input socket to this endpoint.
* When the sink detects the end of the batch it sends a kill to its PUB socket.
* When a worker detects this kill message, it exits.

It doesn't take much new code in the sink:

[[code language="C"]]
    void *control = zmq_socket (context, ZMQ_PUB);
    zmq_bind (control, "tcp://*:5559");
    ...
    // Send kill signal to workers
    zmq_msg_init_size (&message, 5);
    memcpy (zmq_msg_data (&message), "KILL", 5);
    zmq_msg_send (&message, control, 0);
    zmq_msg_close (&message);
[[/code]]

[[code type="textdiagram" title="Parallel Pipeline with Kill Signaling"]]
             +-------------+
             |             |
             | Ventilator  |
             |             |
             +-------------+
             |    PUSH     |
             \------+------/
                    |
                  tasks
                    |
    +---------------+---------------+
    |               |               |
    |     /=--------|-----+=--------|-----+------\
  task    |       task    |       task    |      :
    |     |         |     |         |     |      |
    v     v         v     v         v     v      |
/------+-----\  /------+-----\  /------+-----\   |
| PULL | SUB |  | PULL | SUB |  | PULL | SUB |   |
+------+-----+  +------+-----+  +------+-----+   |
|            |  |            |  |            |   |
|   Worker   |  |   Worker   |  |   Worker   |   |
|            |  |            |  |            |   |
+------------+  +------------+  +------------+   |
|    PUSH    |  |    PUSH    |  |    PUSH    |   |
\-----+------/  \-----+------/  \-----+------/   |
      |               |               |          |
    result          result          result       |
      |               |               |          |
      +---------------+---------------+          |
                      |                          |
                   results                       |
                      |                          |
                      v                          |
               /-------------\                   |
               |    PULL     |                   |
               +-------------+                   |
               |             |                   |
               |    Sink     |                   |
               |             |                   |
               +-------------+                   |
               |     PUB     |                   |
               \------+------/                   |
                      |                          |
                 KILL signal                     |
                      |                          |
                      \--------------------------/
[[/code]]

Here is the worker process, which manages two sockets (a PULL socket getting tasks, and a SUB socket getting control commands) using the zmq_poll[3] technique we saw earlier:

[[code type="example" title="Parallel task worker with kill signaling" name="taskwork2"]]
[[/code]]

Here is the modified sink application. When it's finished collecting results it broadcasts a KILL message to all workers:

[[code type="example" title="Parallel task sink with kill signaling" name="tasksink2"]]
[[/code]]

+++ Handling Interrupt Signals

Realistic applications need to shut down cleanly when interrupted with Ctrl-C or another signal such as SIGTERM. By default, these simply kill the process, meaning messages won't be flushed, files won't be closed cleanly, etc.

Here is how we handle a signal in various languages:

[[code type="example" title="Handling Ctrl-C cleanly" name="interrupt"]]
[[/code]]

The program provides s_catch_signals(), which traps Ctrl-C (SIGINT) and SIGTERM. When either of these signals arrives, the handler installed by s_catch_signals() sets the global variable s_interrupted. Your application will not die automatically; you now have to check explicitly for an interrupt, and handle it properly. Here's how:

* Call s_catch_signals() (copy this from interrupt.c) at the start of your main code. This sets up the signal handling.
* If your code is blocking in zmq_msg_recv[3], zmq_poll[3], or zmq_msg_send[3], when a signal arrives, the call will return with EINTR.
* Wrappers like s_recv() return NULL if they are interrupted.
* So, your application checks for an EINTR return code, a NULL return, and/or s_interrupted.

Here is a typical code fragment:

[[code]]
s_catch_signals ();
client = zmq_socket (...);
while (!s_interrupted) {
    char *message = s_recv (client);
    if (!message)
        break; // Ctrl-C used
}
zmq_close (client);
[[/code]]

If you call s_catch_signals() and don't test for interrupts, then your application will become immune to Ctrl-C and SIGTERM, which may be useful, but is usually not.
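For readers who want to see what such a handler might look like, here is one possible sketch using POSIX {{sigaction}}. It is an illustration written for this section, not a copy of interrupt.c. The handler name {{s_signal_handler}} and the use of {{sig_atomic_t}} for the flag are assumptions:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <string.h>

// Set to 1 by the handler; volatile sig_atomic_t is the type that is
// safe to write from a signal handler
static volatile sig_atomic_t s_interrupted = 0;

// Runs when SIGINT or SIGTERM arrives; it only records the fact
static void s_signal_handler (int signal_value)
{
    (void) signal_value;
    s_interrupted = 1;
}

// Install the handler for SIGINT (Ctrl-C) and SIGTERM
void s_catch_signals (void)
{
    struct sigaction action;
    memset (&action, 0, sizeof (action));
    action.sa_handler = s_signal_handler;
    action.sa_flags = 0;        // No SA_RESTART, so blocking calls return EINTR
    sigemptyset (&action.sa_mask);
    int rc = sigaction (SIGINT, &action, NULL);
    assert (rc == 0);
    rc = sigaction (SIGTERM, &action, NULL);
    assert (rc == 0);
    (void) rc;
}
```

The handler does nothing but set the flag, which is why the main loop has to test it.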

+++ Detecting Memory Leaks

Any long-running application has to manage memory correctly, or eventually it'll use up all available memory and crash. If you use a language that handles this automatically for you, congratulations. If you program in C or C++ or any other language where you're responsible for memory management, here's a short tutorial on using valgrind, which among other things will report on any leaks your programs have.

* To install valgrind, e.g. on Ubuntu or Debian: {{sudo apt-get install valgrind}}.

* By default, 0MQ will cause valgrind to complain a lot. To remove these warnings, create a file {{valgrind.supp}} that contains this:

[[code]]
{
   <socketcall_sendto>
   Memcheck:Param
   socketcall.sendto(msg)
   fun:send
   ...
}
{
   <socketcall_sendto>
   Memcheck:Param
   socketcall.send(msg)
   fun:send
   ...
}
[[/code]]

* Fix your applications to exit cleanly after Ctrl-C. For any application that exits by itself, that's not needed, but for long-running applications, this is essential, otherwise valgrind will complain about all currently allocated memory.

* Build your application with -DDEBUG, if it's not your default setting. That ensures valgrind can tell you exactly where memory is being leaked.

* Finally, run valgrind thus:

[[code]]
valgrind --tool=memcheck --leak-check=full --suppressions=valgrind.supp someprog
[[/code]]

And after fixing any errors it reported, you should get the pleasant message:

[[code]]
==30536== ERROR SUMMARY: 0 errors from 0 contexts...
[[/code]]

+++ Multi-part Messages

0MQ lets us compose a message out of several frames, giving us a 'multi-part message'. Realistic applications use multi-part messages heavily, both for wrapping messages with address information, and for simple serialization. We'll look at address envelopes later.

What we'll learn now is simply how to safely (but blindly) read and write multi-part messages in any application (like a proxy) that needs to forward messages without inspecting them.

When you work with multi-part messages, each part is a zmq_msg item. E.g. if you are sending a message with five parts, you must construct, send, and destroy five zmq_msg items. You can do this in advance (and store the zmq_msg items in an array or structure), or as you send them, one by one.

Here is how we send the frames in a multi-part message (we receive each frame into a message object):

[[code language="C"]]
zmq_msg_send (&message, socket, ZMQ_SNDMORE);
...
zmq_msg_send (&message, socket, ZMQ_SNDMORE);
...
zmq_msg_send (&message, socket, 0);
[[/code]]

Here is how we receive and process all the parts in a message, be it single part or multi-part:

[[code language="C"]]
while (1) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_msg_recv (&message, socket, 0);
    // Process the message frame
    zmq_msg_close (&message);
    // ZMQ_RCVMORE is an int with the zmq_ctx_new-era API used here
    int more;
    size_t more_size = sizeof (more);
    zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
    if (!more)
        break; // Last message frame
}
[[/code]]

Some things to know about multi-part messages:

* When you send a multi-part message, the first part (and all following parts) are only actually sent on the wire when you send the final part.
* If you are using zmq_poll[3], when you receive the first part of a message, all the rest has also arrived.
* You will receive all parts of a message, or none at all.
* Each part of a message is a separate zmq_msg item.
* You will receive all parts of a message whether or not you check the RCVMORE option.
* On sending, 0MQ queues message frames in memory until the last is received, then sends them all.
* There is no way to cancel a partially sent message, except by closing the socket.

+++ Intermediaries and Proxies

0MQ aims for decentralized intelligence but that doesn't mean your network is empty space in the middle. It's filled with message-aware infrastructure and quite often, we build that infrastructure with 0MQ. The 0MQ plumbing can range from tiny pipes to full-blown service-oriented brokers. The messaging industry calls this "intermediation", meaning that the stuff in the middle deals with either side. In 0MQ we call these proxies, queues, forwarders, devices, or brokers, depending on the context.

This pattern is extremely common in the real world and is why our societies and economies are filled with intermediaries who have no other real function than to reduce the complexity and scaling costs of larger networks. Real-world intermediaries are typically called wholesalers, distributors, managers, etc.

++++ The Dynamic Discovery Problem

One of the problems you will hit as you design larger distributed architectures is discovery. That is, how do pieces know about each other? It's especially difficult if pieces come and go, thus we can call this the "dynamic discovery problem".

There are several solutions to dynamic discovery. The simplest is to entirely avoid it by hard-coding (or configuring) the network architecture so discovery is done by hand. That is, when you add a new piece, you reconfigure the network to know about it.

In practice this leads to increasingly fragile and hard-to-manage architectures. Let's say you have one publisher and a hundred subscribers. You connect each subscriber to the publisher by configuring a publisher endpoint in each subscriber. That's easy!figref(). Subscribers are dynamic, the publisher is static. Now say you add more publishers. Suddenly it's not so easy any more. If you continue to connect each subscriber to each publisher, the cost of avoiding dynamic discovery gets higher and higher.

[[code type="textdiagram" title="Small-scale Pub-Sub Network"]]
                 +-----------+
                 |           |
                 | Publisher |
                 |           |
                 +-----------+
                 |    PUB    |
                 \-----------/
                     bind
           tcp://192.168.55.210:5556
                       |
                       |
      +----------------+----------------+
      |                |                |
      |                |                |
   connect          connect          connect
/------------\   /------------\   /------------\
|    SUB     |   |    SUB     |   |    SUB     |
+------------+   +------------+   +------------+
|            |   |            |   |            |
| Subscriber |   | Subscriber |   | Subscriber |
|            |   |            |   |            |
+------------+   +------------+   +------------+
[[/code]]

There are quite a few answers to this but the very simplest answer is to add an intermediary, that is, a static point in the network to which all other nodes connect. In classic messaging, this is the job of the "message broker". 0MQ doesn't come with a message broker as such, but it lets us build intermediaries quite easily.

You might wonder, if all networks eventually get large enough to need intermediaries, why don't we simply always design around a message broker? For beginners, it's a fair compromise. Just always use a star topology, forget about performance, and things will usually work. However message brokers are greedy things; in their role as central intermediaries, they become too complex, too stateful, and eventually a problem.

It's better to think of intermediaries as simple stateless message switches. The best analogy is an HTTP proxy; it's there but doesn't have any special role. Adding a pub-sub proxy solves the dynamic discovery problem in our example. We set the proxy in the "middle" of the network!figref(). The proxy opens an XSUB socket, an XPUB socket, and binds each to well-known IP addresses and ports. Then all other processes connect to the proxy, instead of to each other. It becomes trivial to add more subscribers or publishers.

[[code type="textdiagram" title="Pub-Sub Network with a Proxy"]]
+------------+   +------------+   +------------+
|            |   |            |   |            |
| Publisher  |   | Publisher  |   | Publisher  |
|            |   |            |   |            |
+------------+   +------------+   +------------+
|    PUB     |   |    PUB     |   |    PUB     |
\------------/   \------------/   \------------/
   connect          connect          connect
      |                |                |
      |                |                |
      +----------------+----------------+
                       |
                       |
                      bind
                 /------------\
                 |    XSUB    |
                 +------------+
                 |            |
                 |   Proxy    |
                 |            |
                 +------------+
                 |    XPUB    |
                 \------------/
                      bind
                       |
                       |
      +----------------+----------------+
      |                |                |
      |                |                |
   connect          connect          connect
/------------\   /------------\   /------------\
|    SUB     |   |    SUB     |   |    SUB     |
+------------+   +------------+   +------------+
|            |   |            |   |            |
| Subscriber |   | Subscriber |   | Subscriber |
|            |   |            |   |            |
+------------+   +------------+   +------------+
[[/code]]

We need XPUB and XSUB sockets because 0MQ does subscription forwarding: SUB sockets actually send subscriptions to PUB sockets as special messages. The proxy has to forward these as well, by reading them from the XPUB socket and writing them to the XSUB socket. This is the main use-case for XSUB and XPUB!figref().

[[code type="textdiagram" title="Extended Publish-Subscribe"]]
+---------+   +---------+   +---------+
|   PUB   |   |   PUB   |   |   PUB   |
\----+----/   \----+----/   \----+----/
     |             |             |
     |             |             |
     +-------------+-------------+
                   |
                   |
             /-----+-----\
             |   XSUB    |
             +-----------+
             |   code    |
             +-----------+
             |   XPUB    |
             \-----+-----/
                   |
                   |
     +-------------+-------------+
     |             |             |
     |             |             |
/----+----\   /----+----\   /----+----\
|   SUB   |   |   SUB   |   |   SUB   |
+---------+   +---------+   +---------+
[[/code]]

++++ The Shared Queue Problem

In the Hello World client-server application we have one client that talks to one service. However in real cases we usually need to allow multiple services as well as multiple clients. This lets us scale up the power of the service (many threads or processes or boxes rather than just one). The only constraint is that services must be stateless, all state being in the request or in some shared storage such as a database.

There are two ways to connect multiple clients to multiple servers. The brute-force way is to connect each client socket to multiple service endpoints. One client socket can connect to multiple service sockets, and the REQ socket will then load-balance requests among these services. Let's say you connect a client socket to three service endpoints, A, B, and C. The client makes requests R1, R2, R3, R4. R1 and R4 go to service A, R2 goes to B, and R3 goes to service C!figref().

[[code type="textdiagram" title="Load-balancing of Requests"]]
             +-----------+
             |           |
             |  Client   |
             |           |
             +-----------+
             |    REQ    |
             \-----+-----/
                   |
             R1, R2, R3, R4
                   |
     +-------------+-------------+
     |             |             |
  R1, R4          R2            R3
     |             |             |
     v             v             v
/---------\   /---------\   /---------\
|   REP   |   |   REP   |   |   REP   |
+---------+   +---------+   +---------+
|         |   |         |   |         |
| Service |   | Service |   | Service |
|    A    |   |    B    |   |    C    |
|         |   |         |   |         |
+---------+   +---------+   +---------+
[[/code]]

This design lets you add more clients cheaply. You can also add more services. Each client will load-balance its requests to the services. But each client has to know the service topology. If you have 100 clients and then you decide to add three more services, you need to reconfigure and restart 100 clients in order for the clients to know about the three new services.

That's clearly not the kind of thing we want to be doing at 3am when our supercomputing cluster has run out of resources and we desperately need to add a couple of hundred new service nodes. Too many static pieces are like liquid concrete: knowledge is distributed and the more static pieces you have, the more effort it is to change the topology. What we want is something sitting in between clients and services that centralizes all knowledge of the topology. Ideally, we should be able to add and remove services or clients at any time without touching any other part of the topology.

So we'll write a little message queuing broker that gives us this flexibility. The broker binds to two endpoints, a frontend for clients and a backend for services. It then uses zmq_poll[3] to monitor these two sockets for activity and when it has some, it shuttles messages between its two sockets. It doesn't actually manage any queues explicitly -- 0MQ does that automatically on each socket.

When you use REQ to talk to REP you get a strictly synchronous request-reply dialog. The client sends a request, the service reads the request and sends a reply. The client then reads the reply. If either the client or the service tries to do anything else (e.g. sending two requests in a row without waiting for a response) it will get an error.

But our broker has to be non-blocking. Obviously we can use zmq_poll[3] to wait for activity on either socket, but we can't use REP and REQ.

Luckily there are two sockets called DEALER and ROUTER that let you do non-blocking request-response. You'll see in Chapter Three how DEALER and ROUTER sockets let you build all kinds of asynchronous request-reply flows. For now, we're just going to see how DEALER and ROUTER let us extend REQ-REP across an intermediary, that is, our little broker.

In this simple stretched request-reply pattern, REQ talks to ROUTER and DEALER talks to REP. In between the DEALER and ROUTER we have to have code (like our broker) that pulls messages off the one socket and shoves them onto the other!figref().

[[code type="textdiagram" title="Extended Request-reply"]]
+---------+   +---------+   +---------+
|   REQ   |   |   REQ   |   |   REQ   |
\----+----/   \----+----/   \----+----/
     |             |             |
     |             |             |
     +-------------+-------------+
                   |
                   |
             /-----+-----\
             |  ROUTER   |
             +-----------+
             |   code    |
             +-----------+
             |  DEALER   |
             \-----+-----/
                   |
                   |
     +-------------+-------------+
     |             |             |
     |             |             |
/----+----\   /----+----\   /----+----\
|   REP   |   |   REP   |   |   REP   |
+---------+   +---------+   +---------+
[[/code]]

The request-reply broker binds to two endpoints, one for clients to connect to (the frontend socket) and one for workers to connect to (the backend). To test this broker, you will want to change your workers so they connect to the backend socket. Here are a client and worker that show what I mean:

[[code type="example" title="Request-reply client" name="rrclient"]]
[[/code]]

Here is the worker:

[[code type="example" title="Request-reply worker" name="rrworker"]]
[[/code]]

And here is the broker, which properly handles multi-part messages:

[[code type="example" title="Request-reply broker" name="rrbroker"]]
[[/code]]

Using a request-reply broker makes your client-server architectures easier to scale since clients don't see workers, and workers don't see clients. The only static node is the broker in the middle!figref().

[[code type="textdiagram" title="Request-reply Broker"]]
+---------+   +---------+   +---------+
|         |   |         |   |         |
| Client  |   | Client  |   | Client  |
|         |   |         |   |         |
+---------+   +---------+   +---------+
|   REQ   |   |   REQ   |   |   REQ   |
\---------/   \---------/   \---------/
  connect       connect       connect
     |             |             |
     |             |             |
  request       request       request
     |             |             |
     +-------------+-------------+
                   |
             fair-queuing
                   |
                   v
                 bind
             /-----------\
             |  ROUTER   |
             +-----------+
             |           |
             |  Broker   |
             |           |
             +-----------+
             |  DEALER   |
             \-----------/
                 bind
                   |
            load balancing
                   |
     +-------------+-------------+
     |             |             |
  request       request       request
     |             |             |
     v             v             v
  connect       connect       connect
/---------\   /---------\   /---------\
|   REP   |   |   REP   |   |   REP   |
+---------+   +---------+   +---------+
|         |   |         |   |         |
| Service |   | Service |   | Service |
|    A    |   |    B    |   |    C    |
|         |   |         |   |         |
+---------+   +---------+   +---------+
[[/code]]

++++ 0MQ's Built-in Proxy Function

It turns out that the core loop in rrbroker is very useful, and reusable. It lets us build pub-sub forwarders and shared queues and other little intermediaries, with very little effort. 0MQ wraps this up in a single method, zmq_proxy[3]:

[[code language="C"]]
zmq_proxy (frontend, backend, capture);
[[/code]]

The two sockets (or three, if we want to capture data) must be properly connected, bound, and configured. When we call the zmq_proxy method it's exactly like starting the main loop of rrbroker. Let's rewrite the request-reply broker to call zmq_proxy, and re-badge this as an expensive-sounding "message queue" (people have charged houses for code that did less):

[[code type="example" title="Message queue broker" name="msgqueue"]]
[[/code]]

If you're like most 0MQ users, at this stage your mind is starting to think, "//what kind of evil stuff can I do if I plug random socket types into the proxy?//" The short answer is: try it and work out what is happening. In practice you would usually stick to ROUTER/DEALER, XSUB/XPUB, or PULL/PUSH.

++++ The Transport Bridging Problem

A frequent request from 0MQ users is "how do I connect my 0MQ network with technology X?" where X is some other networking or messaging technology. The simple answer is to build a "bridge". A bridge is a small application that speaks one protocol at one socket, and converts to/from a second protocol at another socket. A protocol interpreter, if you like. A common bridging problem in 0MQ is to bridge two transports or networks.

As an example, we're going to write a little proxy that sits in between a publisher and a set of subscribers, bridging two networks. The frontend socket (SUB) faces the internal network, where the weather server is sitting, and the backend (PUB) faces subscribers on the external network. It subscribes to the weather service on the frontend socket, and republishes its data on the backend socket!figref().

[[code type="example" title="Weather update proxy" name="wuproxy"]]
[[/code]]

[[code type="textdiagram" title="Pub-Sub Forwarder Proxy"]]
                 +-----------+
                 |           |
                 | Publisher |
                 |           |
                 +-----------+
                 |    PUB    |
                 \-----------/
                     bind
           tcp://192.168.55.210:5556
                       |
                       |
      +----------------+----------------+
      |                |                |
      |                |                |
   connect          connect             |
/------------\   /------------\      connect
|    SUB     |   |    SUB     |   /------------\
+------------+   +------------+   |    XSUB    |
|            |   |            |   +------------+
| Subscriber |   | Subscriber |   |            |
|            |   |            |   | Forwarder  |
+------------+   +------------+   |            |
                                  +------------+
 Internal network                 |    XPUB    |
 ---------------------------------\------------/--------
 External network                     bind
                                tcp://10.1.1.0:8100
                                        |
                                        |
                               +--------+--------+
                               |                 |
                               |                 |
                            connect           connect
                         /------------\   /------------\
                         |    SUB     |   |    SUB     |
                         +------------+   +------------+
                         |            |   |            |
                         | Subscriber |   | Subscriber |
                         |            |   |            |
                         +------------+   +------------+
[[/code]]

+++ Multithreading with 0MQ

0MQ is perhaps the nicest way ever to write multithreaded (MT) applications. Whereas 0MQ sockets require some readjustment if you are used to traditional sockets, 0MQ multithreading will take everything you know about writing MT applications, throw it into a heap in the garden, pour gasoline over it, and set it alight. It's a rare book that deserves burning, but most books on concurrent programming do.

To make utterly perfect MT programs (and I mean that literally) **we don't need mutexes, locks, or any other form of inter-thread communication except messages sent across 0MQ sockets.**

By "perfect" MT programs I mean code that's easy to write and understand, that works with one design language in any programming language, and on any operating system, and that scales across any number of CPUs with zero wait states and no point of diminishing returns.

If you've spent years learning tricks to make your MT code work at all, let alone rapidly, with locks and semaphores and critical sections, you will be disgusted when you realize it was all for nothing. If there's one lesson we've learned from 30+ years of concurrent programming it is: //just don't share state//. It's like two drunkards trying to share a beer. It doesn't matter if they're good buddies. Sooner or later they're going to get into a fight. And the more drunkards you add to the table, the more they fight each other over the beer. The tragic majority of MT applications look like drunken bar fights.

The list of weird problems that you need to fight as you write classic shared-state MT code would be hilarious if it didn't translate directly into stress and risk, as code that seems to work suddenly fails under pressure. Here is a list of "//11 Likely Problems In Your Multithreaded Code//" from a large firm with world-beating experience in buggy code: forgotten synchronization, incorrect granularity, read and write tearing, lock-free reordering, lock convoys, two-step dance, and priority inversion.

Yeah, we also counted seven, not eleven. That's not the point though. The point is, do you really want that code running the power grid or stock market to start getting two-step lock convoys at 3pm on a busy Thursday? Who cares what the terms actually mean. This is not what turned us on to programming, fighting ever more complex side-effects with ever more complex hacks.

Some widely used models, despite being the basis for entire industries, are fundamentally broken, and shared state concurrency is one of them. Code that wants to scale without limit does it like the Internet does, by sending messages and sharing nothing except a common contempt for broken programming models.

You should follow some rules to write happy multithreaded code with 0MQ:

* You MUST NOT access the same data from multiple threads. Using classic MT techniques like mutexes is an anti-pattern in 0MQ applications. The only exception to this is a 0MQ context object, which is threadsafe.

* You MUST create a 0MQ context for your process, and pass that to all threads that you want to connect via {{inproc}} sockets.

* You MAY treat threads as separate tasks, with their own context, but these threads cannot communicate over {{inproc}}. However they will be easier to break into standalone processes afterwards.

* You MUST NOT share 0MQ sockets between threads. 0MQ sockets are not threadsafe. Technically it's possible to do this, but it demands semaphores, locks, or mutexes. This will make your application slow and fragile. The only place where it's remotely sane to share sockets between threads is in language bindings that need to do magic like garbage collection on sockets.

If you need to start more than one proxy in an application, for example, you will want to run each in its own thread. It is easy to make the error of creating the proxy frontend and backend sockets in one thread, and then passing the sockets to the proxy in another thread. This may appear to work but will fail randomly. Remember: //Do not use or close sockets except in the thread that created them.//

If you follow these rules, you can quite easily split threads into separate processes, when you need to. Application logic can sit in threads, processes, boxes: whatever your scale needs.

0MQ uses native OS threads rather than virtual "green" threads. The advantage is that you don't need to learn any new threading API, and that 0MQ threads map cleanly to your operating system. You can use standard tools like Intel's ThreadChecker to see what your application is doing. The disadvantages are that your code, when it for instance starts new threads, won't be portable, and that if you have a huge number of threads (thousands), some operating systems will get stressed.

Let's see how this works in practice. We'll turn our old Hello World server into something more capable. The original server was a single thread. If the work per request is low, that's fine: one ØMQ thread can run at full speed on a CPU core, with no waits, doing an awful lot of work. But realistic servers have to do non-trivial work per request. A single core may not be enough when 10,000 clients hit the server all at once. So a realistic server must start multiple worker threads. It then accepts requests as fast as it can, and distributes these to its worker threads. The worker threads grind through the work, and eventually send their replies back.

You can of course do all this using a proxy broker and external worker processes, but often it's easier to start one process that gobbles up sixteen cores, than sixteen processes, each gobbling up one core. Further, running workers as threads will cut out a network hop, latency, and network traffic.

The MT version of the Hello World service basically collapses the broker and workers into a single process:

[[code type="example" title="Multithreaded service" name="mtserver"]]
[[/code]]

All the code should be recognizable to you by now. How it works:

* The server starts a set of worker threads. Each worker thread creates a REP socket and then processes requests on this socket. Worker threads are just like single-threaded servers. The only differences are the transport ({{inproc}} instead of {{tcp}}), and the bind-connect direction.

* The server creates a ROUTER socket to talk to clients and binds this to its external interface (over {{tcp}}).

* The server creates a DEALER socket to talk to the workers and binds this to its internal interface (over {{inproc}}).

* The server starts a proxy that connects the two sockets. The proxy pulls incoming requests fairly from all clients, and distributes those out to workers. It also routes replies back to their origin.

Note that creating threads is not portable in most programming languages. The POSIX library is {{pthreads}}, but on Windows you have to use a different API. We'll see in Chapter Three how to wrap this in a portable API.

Here the 'work' is just a one-second pause. We could do anything in the workers, including talking to other nodes. This is what the MT server looks like in terms of ØMQ sockets and nodes. Note how the request-reply chain is {{REQ-ROUTER-queue-DEALER-REP}}!figref().

[[code type="textdiagram" title="Multithreaded Server"]]
               +------------+
               |            |
               |   Client   |
               |            |
               +------------+
               |    REQ     |
               \---+--------/
                   |    ^
                   |    |
              "Hello" "World"
                   |    |
/------------------|--=-|------------------\
|                  v    |                  :
|              /--------+---\              |
|              |   ROUTER   |              |
|              +------------+              |
|              |            |              |
|              |   Server   |              |
|              |            |              |
|              +------------+              |
|              |            |              |
|              |   Queue    |              |
|              |   proxy    |              |
|              |            |              |
|              +------------+              |
|              |   DEALER   |              |
|              \------------/              |
|                    ^                     |
|                    |                     |
|        +-----------+-----------+         |
|        |           |           |         |
|        v           v           v         |
|    /--------\  /--------\  /--------\    |
|    |  REP   |  |  REP   |  |  REP   |    |
|    +--------+  +--------+  +--------+    |
|    |        |  |        |  |        |    |
|    | Worker |  | Worker |  | Worker |    |
|    |        |  |        |  |        |    |
|    +--------+  +--------+  +--------+    |
|                                          |
\------------------------------------------/
[[/code]]

+++ Signaling between Threads

When you start making multithreaded applications with 0MQ, you'll hit the question of how to coordinate your threads. Though you might be tempted to insert 'sleep' statements, or use multithreading techniques such as semaphores or mutexes, **the only mechanism that you should use is 0MQ messages**. Remember the story of The Drunkards and the Beer Bottle.

Let's make three threads that signal each other when they are ready!figref(). In this example we use PAIR sockets over the {{inproc}} transport:

[[code type="example" title="Multithreaded relay" name="mtrelay"]]
[[/code]]

[[code type="textdiagram" title="The Relay Race"]]
+------------+
|            |
|   Step 1   |
|            |
+------------+
|    PAIR    |
\-----+------/
      |
      |
    Ready!
      |
      v
/------------\
|    PAIR    |
+------------+
|            |
|   Step 2   |
|            |
+------------+
|    PAIR    |
\-----+------/
      |
      |
    Ready!
      |
      v
/------------\
|    PAIR    |
+------------+
|            |
|   Step 3   |
|            |
+------------+
[[/code]]

This is a classic pattern for multithreading with 0MQ:

# Two threads communicate over {{inproc}}, using a shared context.
# The parent thread creates one socket, binds it to an inproc:// endpoint, and //then// starts the child thread, passing the context to it.
# The child thread creates the second socket, connects it to that inproc:// endpoint, and //then// signals to the parent thread that it's ready.

Note that multithreading code using this pattern is **//not scalable out to processes//**. If you use {{inproc}} and socket pairs, you are building a tightly-bound application. Do this when low latency is really vital. For all normal apps, use one context per thread, and {{ipc}} or {{tcp}}. Then you can easily break your threads out to separate processes, or boxes, as needed.

This is the first time we've shown an example using PAIR sockets. Why use PAIR? Other socket combinations might seem to work but they all have side-effects that could interfere with signaling:

* You can use PUSH for the sender and PULL for the receiver. This looks simple and will work, but remember that PUSH will load-balance messages to all available receivers. If you by accident start two receivers (e.g. you already have one running and you start a second), you'll "lose" half of your signals. PAIR has the advantage of refusing more than one connection; the pair is //exclusive//.

* You can use DEALER for the sender and ROUTER for the receiver. ROUTER however wraps your message in an "envelope", meaning your zero-size signal turns into a multi-part message. If you don't care about the data, and treat anything as a valid signal, and if you don't read more than once from the socket, that won't matter. If however you decide to send real data, you will suddenly find ROUTER providing you with "wrong" messages. DEALER also load-balances, giving the same risk as PUSH.

* You can use PUB for the sender and SUB for the receiver. This will correctly deliver your messages exactly as you sent them and PUB does not load-balance as PUSH or DEALER do. However you need to configure the subscriber with an empty subscription, which is annoying. Worse, the reliability of the PUB-SUB link is timing dependent and messages can get lost if the SUB socket is connecting while the PUB socket is sending its messages.

For these reasons, PAIR makes the best choice for coordination between pairs of threads.

+++ Node Coordination

When you want to coordinate nodes, PAIR sockets won't work well any more. This is one of the few areas where the strategies for threads and nodes are different. Principally nodes come and go whereas threads are static. PAIR sockets do not automatically reconnect if the remote node goes away and comes back.

The second significant difference between threads and nodes is that you typically have a fixed number of threads but a more variable number of nodes. Let's take one of our earlier scenarios (the weather server and clients) and use node coordination to ensure that subscribers don't lose data when starting up.

This is how the application will work:

* The publisher knows in advance how many subscribers it expects. This is just a magic number it gets from somewhere.

* The publisher starts up and waits for all subscribers to connect. This is the node coordination part. Each subscriber subscribes and then tells the publisher it's ready via another socket.

* When the publisher has all subscribers connected, it starts to publish data.

In this case we'll use a REQ-REP socket flow to synchronize subscribers and publisher!figref(). Here is the publisher:

[[code type="example" title="Synchronized publisher" name="syncpub"]]
[[/code]]

[[code type="textdiagram" title="Pub-Sub Synchronization"]]
+----------------+
|                |
|   Publisher    |
|                |
+--------+-------+
|  PUB   |  REP  |
\---+----+-----+-/
    |      ^   |
    |      |   |
    |     (1)  |
   [3]     |   |
    |      |  (2)
    |      |   |
    v      |   v
/--------+-+-----\
|  SUB   |  REQ  |
+--------+-------+
|                |
|   Subscriber   |
|                |
+----------------+
[[/code]]

And here is the subscriber:

[[code type="example" title="Synchronized subscriber" name="syncsub"]]
[[/code]]

This Linux shell script will start ten subscribers and then the publisher:

[[code]]
echo "Starting subscribers..."
for a in 1 2 3 4 5 6 7 8 9 10; do
    syncsub &
done
echo "Starting publisher..."
syncpub
[[/code]]

Which gives us this satisfying output:

[[code]]
Starting subscribers...
Starting publisher...
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
[[/code]]

We can't assume that the SUB connect will be finished by the time the REQ/REP dialog is complete. There are no guarantees that outbound connects will finish in any order whatsoever, if you're using any transport except {{inproc}}. So, the example does a brute-force sleep of one second between subscribing, and sending the REQ/REP synchronization.

A more robust model could be:

* Publisher opens PUB socket and starts sending "Hello" messages (not data).
* Subscribers connect SUB socket and when they receive a Hello message they tell the publisher via a REQ/REP socket pair.
* When the publisher has had all the necessary confirmations, it starts to send real data.

+++ Zero Copy

We teased you in Chapter One, when you were still a 0MQ newbie, about zero-copy. If you survived this far, you are probably ready to use zero-copy. However, remember that there are many roads to Hell, and premature optimization is not the most enjoyable nor profitable one, by far. To say this in English, trying to do zero-copy properly while your architecture is not perfect is a waste of time and will make things worse, not better.

0MQ's message API lets you send and receive messages directly from and to application buffers without copying data. Given that 0MQ sends messages in the background, zero-copy needs some extra sauce.

To do zero-copy you use zmq_msg_init_data[3] to create a message that refers to a block of data already allocated on the heap with malloc(), and then you pass that to zmq_msg_send[3]. When you create the message you also pass a function that 0MQ will call to free the block of data, when it has finished sending the message. This is the simplest example, assuming 'buffer' is a block of 1000 bytes allocated on the heap:

[[code language="C"]]
void my_free (void *data, void *hint) {
    free (data);
}
// Send message from buffer, which we allocate and 0MQ will free for us
zmq_msg_t message;
zmq_msg_init_data (&message, buffer, 1000, my_free, NULL);
zmq_msg_send (&message, socket, 0);
[[/code]]

There is no way to do zero-copy on receive: 0MQ delivers you a buffer that you can store as long as you wish but it will not write data directly into application buffers.

On writing, 0MQ's multi-part messages work nicely together with zero-copy. In traditional messaging you need to marshal different buffers together into one buffer that you can send. That means copying data. With 0MQ, you can send multiple buffers coming from different sources as individual message frames. We send each field as a length-delimited frame. To the application it looks like a series of send and recv calls. But internally the multiple parts get written to the network and read back with single system calls, so it's very efficient.

+++ Pub-Sub Message Envelopes

We've looked briefly at multi-part messages. Let's now look at their main use-case, which is //message envelopes//. An envelope is a way of safely packaging up data with an address, without touching the data itself.

In the pub-sub pattern, the envelope at least holds the subscription key for filtering but you can also add the sender identity in the envelope.

If you want to use pub-sub envelopes, you make them yourself. It's optional, and in previous pub-sub examples we didn't do this. Using a pub-sub envelope is a little more work for simple cases but it's cleaner especially for real cases, where the key and the data are naturally separate things. It's also faster, if you are writing the data directly from an application buffer.

Here is what a publish-subscribe message with an envelope looks like:

[[code type="textdiagram" title="Pub-Sub Envelope with Separate Key"]]
          +-------------+
Frame 1   |     Key     |   Subscription key
          +-------------+
Frame 2   |    Data     |   Actual message body
          +-------------+
[[/code]]


Recall that pub-sub matches messages based on the prefix. Putting the key into a separate frame makes the matching very obvious, since there is no chance an application will accidentally match on part of the data.
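To see why this matters, here is a small stand-alone model of prefix matching in plain C. It is not 0MQ's own code, just an illustration of the rule; {{subscription_matches}} is a made-up name:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

// Returns 1 if 'frame' starts with the subscription 'prefix', else 0.
// An empty prefix matches everything, as an empty subscription does.
static int subscription_matches (const void *prefix, size_t prefix_size,
                                 const void *frame, size_t frame_size) {
    assert (prefix && frame);
    return prefix_size <= frame_size
        && memcmp (frame, prefix, prefix_size) == 0;
}
```

With this rule, a subscription to "B" matches a single-frame message whose data happens to be "Bananas are cheap", which is probably not what the subscriber meant. With an envelope, the filter is applied to the first frame, which holds only the key, so the data can start with anything.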

Here is a minimalist example of how pub-sub envelopes look in code. This publisher sends messages of two types, A and B. The envelope holds the message type:

[[code type="example" title="Pub-Sub envelope publisher" name="psenvpub"]]
[[/code]]

The subscriber only wants messages of type B:

[[code type="example" title="Pub-Sub envelope subscriber" name="psenvsub"]]
[[/code]]

When you run the two programs, the subscriber should show you this:

[[code]]
[B] We would like to see this
[B] We would like to see this
[B] We would like to see this
[B] We would like to see this
...
[[/code]]

This example shows that the subscription filter rejects or accepts the entire multi-part message (key plus data). You won't get part of a multi-part message, ever.

If you subscribe to multiple publishers and you want to know their identity so that you can send them data via another socket (and this is a fairly typical use-case), you create a three-part message:

[[code type="textdiagram" title="Pub-Sub Envelope with Sender Address"]]
          +-------------+
Frame 1   |     Key     |   Subscription key
          +-------------+
Frame 2   |  Identity   |   Address of publisher
          +-------------+
Frame 3   |    Data     |   Actual message body
          +-------------+
[[/code]]

+++ High Water Marks

When you can send messages rapidly from process to process, you soon discover that memory is a precious resource, and one that's trivially filled up. A few seconds delay somewhere in a process can turn into a backlog that blows up a server, unless you understand the problem and take precautions.

The problem is this: if you have process A sending messages to process B, which suddenly gets very busy (garbage collection, CPU overload, whatever), then what happens to the messages that process A wants to send? Some will sit in B's network buffers. Some will sit on the Ethernet wire itself. Some will sit in A's network buffers. And the rest will accumulate in A's memory. If you don't take some precaution, A can easily run out of memory and crash. It is a consistent, classic problem with message brokers.

What are the answers? One is to pass the problem upstream. A is getting the messages from somewhere else. So tell that process, "stop!" And so on. This is called "flow control". It sounds great, but what if you're sending out a Twitter feed? Do you tell the whole world to stop tweeting while B gets its act together?

Flow control works in some cases but in others, the transport layer can't tell the application layer "stop" any more than a subway system can tell a large business, "please keep your staff at work another half an hour, I'm too busy".

The answer for messaging is to set limits on the size of buffers, and then when we reach those limits, take some sensible action. In most cases (not for a subway system, though), the answer is to throw away messages. In a few others, it's to wait.

0MQ uses the concept of "high water mark" or HWM to define the capacity of its internal pipes. Each connection out of a socket or into a socket has its own pipe, and HWM capacity.

In 0MQ/2.x the HWM was set to infinite by default. In 0MQ/3.x it's set to 1,000 by default, which is more sensible. If you're still using 0MQ/2.x you should always set a HWM on your sockets, be it 1,000 to match 0MQ/3.x or another figure that takes into account your message sizes.

The high water mark affects both the transmit and receive buffers of a single socket. Some sockets (PUB, PUSH) only have transmit buffers. Some (SUB, PULL, REQ, REP) only have receive buffers. Some (DEALER, ROUTER, PAIR) have both transmit and receive buffers.

When your socket reaches its high-water mark, it will either block or drop data depending on the socket type. PUB sockets will drop data if they reach their high-water mark, while other socket types will block.

Over the {{inproc}} transport, the sender and receiver share the same buffers, so the real HWM is the sum of the HWM set by both sides. This means in effect that if one side does not set a HWM, there is no limit to the buffer size.

+++ A Bare Necessity

ØMQ is like a box of pieces that plug together, the only limitation being your imagination and sobriety.

The scalable elastic architecture you get should be an eye-opener. You might need a coffee or two first. Don't make the mistake I made once and buy exotic German coffee labeled //Entkoffeiniert//. That does not mean "Delicious". Scalable elastic architectures are not a new idea - [http://en.wikipedia.org/wiki/Flow-based_programming flow-based programming] and languages like [http://www.erlang.org/ Erlang] already worked like this - but ØMQ makes it easier to use than ever before.

As [http://permalink.gmane.org/gmane.network.zeromq.devel/2145 Gonzo Diethelm said], '//My gut feeling is summarized in this sentence: "if ØMQ didn't exist, it would be necessary to invent it". Meaning that I ran into ØMQ after years of brain-background processing, and it made instant sense... ØMQ simply seems to me a "bare necessity" nowadays.//'