Edited Chapter 4

commit ea0339c1ca8299f3f5e80b6cdd4cce168930e452 1 parent 79e1162
@hintjens authored
Showing with 433 additions and 456 deletions.
  1. +36 −33 chapter3.txt
  2. +174 −144 chapter4.txt
  3. +4 −4 chapter5.txt
  4. +0 −39 chapter6.txt
  5. 0  examples/C#/{lruqueue.cs → lbbroker.cs}
  6. 0  examples/C#/{lruqueue2.cs → lbbroker2.cs}
  7. 0  examples/C++/{lruqueue.cpp → lbbroker.cpp}
  8. +4 −4 examples/C/.gitignore
  9. +4 −4 examples/C/asyncsrv.c
  10. +36 −36 examples/C/bstar.c
  11. +12 −12 examples/C/bstarsrv.c
  12. +2 −2 examples/C/bstarsrv2.c
  13. +31 −31 examples/C/clonesrv6.c
  14. +2 −2 examples/C/flserver2.c
  15. +2 −2 examples/C/flserver3.c
  16. +23 −23 examples/C/{lruqueue.c → lbbroker.c}
  17. +11 −11 examples/C/{lruqueue2.c → lbbroker2.c}
  18. +13 −13 examples/C/{lruqueue3.c → lbbroker3.c}
  19. +22 −22 examples/C/mdbroker.c
  20. +1 −1  examples/C/mdwrkapi.c
  21. +10 −10 examples/C/peering2.c
  22. +12 −11 examples/C/peering3.c
  23. +17 −17 examples/C/ppqueue.c
  24. +9 −9 examples/C/spqueue.c
  25. +3 −3 examples/C/spworker.c
  26. +0 −23 examples/C/testit.c
  27. 0  examples/CL/{lruqueue.asd → lbbroker.asd}
  28. 0  examples/CL/{lruqueue.lisp → lbbroker.lisp}
  29. +5 −0 examples/ChangeLog
  30. 0  examples/Clojure/{lruqueue.clj → lbbroker.clj}
  31. 0  examples/Erlang/{lruqueue.es → lbbroker.es}
  32. 0  examples/F#/{lruqueue.fsx → lbbroker.fsx}
  33. 0  examples/Haskell/{lruqueue.hs → lbbroker.hs}
  34. 0  examples/Haxe/{lruqueue.hx → lbbroker.hx}
  35. 0  examples/Haxe/{lruqueue2.hx → lbbroker2.hx}
  36. 0  examples/Haxe/{lruqueue3.hx → lbbroker3.hx}
  37. 0  examples/Java/{lruqueue.java → lbbroker.java}
  38. 0  examples/Lua/{lruqueue.lua → lbbroker.lua}
  39. 0  examples/Lua/{lruqueue2.lua → lbbroker2.lua}
  40. 0  examples/PHP/{lruqueue.php → lbbroker.php}
  41. 0  examples/PHP/{lruqueue2.php → lbbroker2.php}
  42. 0  examples/Python/{lruqueue.py → lbbroker.py}
  43. 0  examples/Python/{lruqueue2.py → lbbroker2.py}
  44. 0  examples/Python/{lruqueue3.py → lbbroker3.py}
  45. 0  examples/Scala/{lruqueue.scala → lbbroker.scala}
  46. 0  examples/Scala/{lruqueue2.scala → lbbroker2.scala}
  47. 0  examples/Tcl/{lruqueue.tcl → lbbroker.tcl}
69 chapter3.txt
@@ -9,7 +9,7 @@ We'll cover:
* How the request-reply mechanisms work.
* How to combine REQ, REP, DEALER, and ROUTER sockets.
* How ROUTER sockets work, in detail.
-* The least-recently used worker pattern.
+* The load-balancing pattern.
* Building a simple load-balancing message broker.
* Designing a high-level API for 0MQ.
* Building an asynchronous request-reply server.
@@ -160,6 +160,7 @@ These are the legal combinations:
* REQ to ROUTER
* DEALER to ROUTER
* DEALER to DEALER
+* ROUTER to ROUTER
And these combinations are invalid (and I'll explain why):
@@ -167,7 +168,6 @@ And these combinations are invalid (and I'll explain why):
* REQ to DEALER
* REP to REP
* REP to ROUTER
-* ROUTER to ROUTER
Here are some tips for remembering the semantics. DEALER is like an asynchronous REQ socket, and ROUTER is like an asynchronous REP socket. Where we use a REQ socket we can use a DEALER; we just have to read and write the envelope ourselves. Where we use a REP socket we can stick a ROUTER; we just need to manage the identities ourselves.
@@ -210,10 +210,14 @@ Since both DEALER and ROUTER can work with arbitrary message formats, if you hop
++++ The DEALER to DEALER Combination
-This is the odd one out, and things would be more symmetric if this combination was invalid. But there are cases where it's useful. I said you can swap a REP with a ROUTER, but you can also swap a REP with a DEALER, if the DEALER is talking to one and only one peer.
+You can swap a REP with a ROUTER, but you can also swap a REP with a DEALER, if the DEALER is talking to one and only one peer.
When you replace a REP with a DEALER, your worker can suddenly go full asynchronous, sending any number of replies back. The cost is that you have to manage the reply envelopes yourself, and get them right, or nothing at all will work. We'll see a worked example later. Let's just say for now that DEALER to DEALER is one of the trickier patterns to get right, and happily it's rare that we need it.
+++++ The ROUTER to ROUTER Combination
+
+This sounds perfect for N-to-N connections but it's the most difficult combination to use. You should avoid it until you are well advanced with 0MQ. We'll see one example of it in the Freelance pattern in [#reliable-request-reply], and an alternative DEALER to ROUTER design for peer-to-peer work in [#moving-pieces].
+
++++ Invalid Combinations
Mostly, trying to connect clients to clients, or servers to servers, is a bad idea and won't work. However rather than give general vague warnings, I'll explain in detail:
@@ -226,8 +230,6 @@ Mostly, trying to connect clients to clients, or servers to servers, is a bad id
* REP to ROUTER: the ROUTER socket can in theory initiate the dialog and send a properly-formatted request, if it knows the REP socket has connected //and// it knows the identity of that connection. It's messy and adds nothing over DEALER to ROUTER.
-* ROUTER to ROUTER: this is a combination that many people feel //should// work, especially for peer-to-peer architectures. But it has the same problems as REP to ROUTER. We'll come back to peer-to-peer architectures in [#moving-pieces].
-
The common thread in this valid vs. invalid breakdown is that a 0MQ socket connection is always biased towards one peer that binds to an endpoint, and another that connects to that. Further, which side binds and which side connects is not arbitrary but follows natural patterns. The side which we expect to "be there" binds: it'll be a server, a broker, a publisher, a collector. The side that "comes and goes" connects: it'll be clients and workers. Remembering this will help you design better 0MQ architectures.
+++ Exploring ROUTER Sockets
@@ -273,11 +275,11 @@ ROUTER sockets do have a somewhat brutal way of dealing with messages they can't
Since 0MQ/3.2 there's a socket option you can set to catch this error: {{ZMQ_ROUTER_MANDATORY}}. Set that on the ROUTER socket and then, when you provide an unroutable identity on a send call, the socket will signal an EHOSTUNREACH error.
-+++ The Least-Recently Used (LRU) Worker Pattern
++++ The Load-balancing Pattern
-Let's now look at some code. We'll see how to connect a ROUTER socket to a REQ socket, and then to a DEALER socket. These two examples follow the same logic, which is a //least-recently used worker// or LRU pattern. This pattern is our first exposure to using the ROUTER socket for deliberate routing, rather than simply acting as a reply channel.
+Let's now look at some code. We'll see how to connect a ROUTER socket to a REQ socket, and then to a DEALER socket. These two examples follow the same logic, which is a //load-balancing// pattern. This pattern is our first exposure to using the ROUTER socket for deliberate routing, rather than simply acting as a reply channel.
-The LRU worker pattern is very common and we'll see it several times in the Guide. It solves the main problem with simple round-robin routing (as PUSH and DEALER offer) which is that round-robin becomes inefficient if tasks do not all roughly take the same time.
+The load-balancing pattern is very common and we'll see it several times in the Guide. It solves the main problem with simple round-robin routing (as PUSH and DEALER offer) which is that round-robin becomes inefficient if tasks do not all roughly take the same time.
It's the post office analogy. If you have one queue per counter, and you have some people buying stamps (a fast, simple transaction), and some people opening new accounts (a very slow transaction), then you will find stamp-buyers getting unfairly stuck in queues. Just as in a post office, if your messaging architecture is unfair, people will get annoyed.
@@ -289,13 +291,13 @@ This is a recurring theme with 0MQ: the world's problems are diverse and you can
Back to a worker (DEALER or REQ) connected to a broker (ROUTER). The broker has to know when the worker is ready, and keep a list of workers so that it can take the //least recently used// worker each time.
-The solution is really simple in fact: workers send a "Ready" message when they start, and after they finish each task. The broker reads these messages one by one. Each time it reads a message, that is from the least-recently used worker. And since we're using a ROUTER socket, we get an identity that we can then use to send a task back to the worker.
+The solution is really simple, in fact: workers send a "Ready" message when they start and after they finish each task. The broker reads these messages one by one. Each time it reads a message, it is from the last used worker. And since we're using a ROUTER socket, we get an identity that we can then use to send a task back to the worker.
It's a twist on request-reply because the task is sent with the reply, and any response for the task is sent as a new request. The following code examples should make it clearer.
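The worker side of that handshake is tiny. Here is a sketch in C, using CZMQ for brevity (the endpoint, the "READY" string, and the "OK" reply are illustrative, not taken from the Guide's examples):

[[code]]
#include "czmq.h"

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (worker, "ipc://backend.ipc");

    //  Tell the broker we're ready for work
    zstr_send (worker, "READY");

    while (true) {
        //  Next message is a task the broker routed to us
        char *request = zstr_recv (worker);
        if (!request)
            break;              //  Interrupted
        //  Do the work; the reply doubles as the next "ready" signal
        zstr_send (worker, "OK");
        free (request);
    }
    zctx_destroy (&ctx);
    return 0;
}
[[/code]]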
++++ ROUTER Broker and REQ Workers
-Here is an example of the LRU pattern using a ROUTER broker talking to a set of REQ workers:
+Here is an example of the load-balancing pattern using a ROUTER broker talking to a set of REQ workers:
[[code type="example" title="ROUTER-to-REQ" name="rtreq"]]
[[/code]]
@@ -347,11 +349,11 @@ However remember the reason for that empty delimiter frame: it's to allow multih
If we never need to pass the message along to a REP socket, we can simply drop the empty delimiter frame at both sides, which makes things simpler. This is usually the design I use for pure DEALER to ROUTER protocols.
-++++ A Load-Balancing Broker
+++++ A Load-Balancing Message Broker
The previous example is half-complete. It can manage a set of workers with dummy requests and replies, but it has no way to talk to clients.
-If we add a second //frontend// ROUTER socket that accepts client requests, and turn our example into a proxy that can switch messages from frontend to backend, we get a useful and reusable little message broker!figref().
+If we add a second //frontend// ROUTER socket that accepts client requests, and turn our example into a proxy that can switch messages from frontend to backend, we get a useful and reusable tiny load-balancing message broker!figref().
[[code type="textdiagram" title="Load-Balancing Broker"]]
+--------+ +--------+ +--------+
@@ -365,7 +367,7 @@ If we add a second //frontend// ROUTER socket that accepts client requests, and
+---+----+
| ROUTER | Frontend
+--------+
- | Proxy | LRU queue
+ | Proxy | Load-balancer
+--------+
| ROUTER | Backend
+---+----+
@@ -384,16 +386,16 @@ What this broker does is:
* Accepts connections from a set of clients.
* Accepts connections from a set of workers.
* Accepts requests from clients and holds these in a single queue.
-* Sends these requests to workers using the LRU load-balancing pattern.
+* Sends these requests to workers using the load-balancing pattern.
* Receives replies back from workers.
* Sends these replies back to the original requesting client.
The broker code is fairly long but worth understanding:
-[[code type="example" title="Load-balancing broker" name="lruqueue"]]
+[[code type="example" title="Load-balancing broker" name="lbbroker"]]
[[/code]]
-The difficult part of this program is (a) the envelopes that each socket reads and writes, and (b) the LRU algorithm. We'll take these in turn, starting with the message envelope formats.
+The difficult part of this program is (a) the envelopes that each socket reads and writes, and (b) the load-balancing algorithm. We'll take these in turn, starting with the message envelope formats.
Let's walk through a full request-reply chain from client to worker and back. In this code we set the identity of client and worker sockets to make it easier to trace the message frames. In reality we'd allow the ROUTER sockets to invent identities for connections. Let's assume the client's identity is "CLIENT" and the worker's identity is "WORKER". The client application sends a single frame containing "HELLO"!figref().
@@ -447,25 +449,25 @@ The worker has to save the envelope (which is all the parts up to and including
On the return path the messages are the same as when they come in, i.e. the backend socket gives the broker a message in five parts, and the broker sends the frontend socket a message in three parts, and the client gets a message in one part.
-Now let's look at the LRU algorithm. It requires that both clients and workers use REQ sockets, and that workers correctly store and replay the envelope on messages they get. The algorithm is:
+Now let's look at the load-balancing algorithm. It requires that both clients and workers use REQ sockets, and that workers correctly store and replay the envelope on messages they get. The algorithm is:
* Create a pollset which polls the backend always, and the frontend only if there are one or more workers available.
* Poll for activity with infinite timeout.
-* If there is activity on the backend, we either have a "ready" message or a reply for a client. In either case we store the worker address (the first part) on our LRU queue, and if the rest is a client reply we send it back to that client via the frontend.
+* If there is activity on the backend, we either have a "ready" message or a reply for a client. In either case we store the worker address (the first part) on our worker queue, and if the rest is a client reply we send it back to that client via the frontend.
-* If there is activity on the frontend, we take the client request, pop the next worker (which is the least-recently used), and send the request to the backend. This means sending the worker address, empty part, and then the three parts of the client request.
+* If there is activity on the frontend, we take the client request, pop the next worker (which is the last used), and send the request to the backend. This means sending the worker address, empty part, and then the three parts of the client request.
-You should now see that you can reuse and extend the LRU algorithm with variations based on the information the worker provides in its initial "ready" message. For example, workers might start up and do a performance self-test, then tell the broker how fast they are. The broker can then choose the fastest available worker rather than LRU or round-robin.
+You should now see that you can reuse and extend the load-balancing algorithm with variations based on the information the worker provides in its initial "ready" message. For example, workers might start up and do a performance self-test, then tell the broker how fast they are. The broker can then choose the fastest available worker rather than the oldest.
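To make that loop concrete, here is a sketch of the broker's core in C, using CZMQ for brevity; the socket names, the "READY" string, and the missing error handling are all simplifications, not the Guide's actual listing:

[[code]]
//  Queue of available worker addresses, oldest first
zlist_t *workers = zlist_new ();

while (true) {
    zmq_pollitem_t items [] = {
        { backend,  0, ZMQ_POLLIN, 0 },
        { frontend, 0, ZMQ_POLLIN, 0 }
    };
    //  Poll frontend only if we have at least one worker available
    if (zmq_poll (items, zlist_size (workers)? 2: 1, -1) == -1)
        break;              //  Interrupted

    if (items [0].revents & ZMQ_POLLIN) {
        //  First frame is the worker's address: it is now available
        zmsg_t *msg = zmsg_recv (backend);
        zlist_append (workers, zmsg_unwrap (msg));
        //  The rest is either "READY" or a client reply to route back
        if (zframe_streq (zmsg_first (msg), "READY"))
            zmsg_destroy (&msg);
        else
            zmsg_send (&msg, frontend);
    }
    if (items [1].revents & ZMQ_POLLIN) {
        //  Prefix the client request with the next available worker
        zmsg_t *msg = zmsg_recv (frontend);
        zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
        zmsg_send (&msg, backend);
    }
}
[[/code]]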
+++ A High-Level API for 0MQ
++++ Making a Detour
-We're going to push request-reply onto the stack and open a different area, which is the 0MQ API itself. There's a reason for this detour: as we write more complex examples, the low-level 0MQ API starts to look increasingly clumsy. Look at the core of the worker thread from our load-balancing message broker:
+We're going to push request-reply onto the stack and open a different area, which is the 0MQ API itself. There's a reason for this detour: as we write more complex examples, the low-level 0MQ API starts to look increasingly clumsy. Look at the core of the worker thread from our load-balancing broker:
-[[code type="fragment" name="lrureader"]]
+[[code type="fragment" name="lbreader"]]
while (true) {
// Read and save all frames until we get an empty frame
// In this example there is only 1 but it could be more
@@ -561,9 +563,9 @@ Cutting the amount of code we need to read and write complex messages is great:
Turning this wishlist into reality for the C language gives us [http://zero.mq/c CZMQ], a 0MQ language binding for C. This high-level binding in fact developed out of earlier versions of the Guide. It combines nicer semantics for working with 0MQ with some portability layers, and (importantly for C but less for other languages) containers like hashes and lists. CZMQ also uses an elegant object model that leads to frankly lovely code.
-Here is the load-balancing message broker rewritten to use a higher-level API (CZMQ for the C case):
+Here is the load-balancing broker rewritten to use a higher-level API (CZMQ for the C case):
-[[code type="example" title="Load-balancing broker using high-level API" name="lruqueue2"]]
+[[code type="example" title="Load-balancing broker using high-level API" name="lbbroker2"]]
[[/code]]
One thing CZMQ provides is clean interrupt handling. This means that Ctrl-C will cause any blocking 0MQ call to exit with a return code -1 and errno set to EINTR. The high-level recv methods will return NULL in such cases. So, you can cleanly exit a loop like this:
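For instance, a receive loop that exits cleanly on Ctrl-C can be as simple as this fragment (a sketch; {{worker}} is any CZMQ-created socket):

[[code]]
while (true) {
    char *reply = zstr_recv (worker);
    if (!reply)
        break;              //  Ctrl-C: zstr_recv returned NULL
    printf ("Received: %s\n", reply);
    free (reply);
}
[[/code]]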
@@ -609,7 +611,7 @@ While the actual handling of messages sits inside dedicated functions or methods
Here is the load-balancing broker rewritten once again, this time to use {{zloop}}:
-[[code type="example" title="LRU queue broker using zloop" name="lruqueue3"]]
+[[code type="example" title="Load balancing broker using zloop" name="lbbroker3"]]
[[/code]]
Getting applications to properly shut down when you send them Ctrl-C can be tricky. If you use the {{zctx}} class it'll automatically set up signal handling, but your code still has to cooperate. You must break any loop if {{zmq_poll}} returns -1 or if any of the {{zstr_recv}}, {{zframe_recv}}, or {{zmsg_recv}} methods return NULL. If you have nested loops, it can be useful to make the outer ones conditional on {{!zctx_interrupted}}.
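A sketch of that cooperation, for a task that both polls and receives ({{server}} and the one-second timeout are illustrative):

[[code]]
while (!zctx_interrupted) {
    zmq_pollitem_t items [] = { { server, 0, ZMQ_POLLIN, 0 } };
    if (zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC) == -1)
        break;              //  zmq_poll was interrupted
    if (items [0].revents & ZMQ_POLLIN) {
        zmsg_t *msg = zmsg_recv (server);
        if (!msg)
            break;          //  zmsg_recv was interrupted
        //  ... handle the request and reply ...
        zmsg_destroy (&msg);
    }
}
[[/code]]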
@@ -708,7 +710,7 @@ Note that we're doing DEALER to ROUTER dialog between client and server, but int
Let's think about the routing envelope. The client sends a simple message. The server thread receives a two-part message (real message prefixed by client identity). We have two possible designs for the server-to-worker interface:
-* Workers get unaddressed messages, and we manage the connections from server thread to worker threads explicitly using a ROUTER socket as backend. This would require that workers start by telling the server they exist, which can then route requests to workers and track which client is 'connected' to which worker. This is the LRU load-balancing pattern we already covered.
+* Workers get unaddressed messages, and we manage the connections from server thread to worker threads explicitly using a ROUTER socket as backend. This would require that workers start by telling the server they exist, which can then route requests to workers and track which client is 'connected' to which worker. This is the load-balancing pattern again.
* Workers get addressed messages, and they return addressed replies. This requires that workers can properly decode and recode envelopes but it doesn't need any other mechanisms.
@@ -766,7 +768,7 @@ It's a straight-forward problem that requires no exotic hardware or protocols, j
++++ Architecture of a Single Cluster
-Workers and clients are synchronous. We want to use the LRU pattern to route tasks to workers. Workers are all identical, our facility has no notion of different services. Workers are anonymous, clients never address them directly. We make no attempt here to provide guaranteed delivery, retry, etc.
+Workers and clients are synchronous. We want to use the load-balancing pattern to route tasks to workers. Workers are all identical, our facility has no notion of different services. Workers are anonymous, clients never address them directly. We make no attempt here to provide guaranteed delivery, retry, etc.
For reasons we already looked at, clients and workers won't speak to each other directly. It makes it impossible to add or remove nodes dynamically. So our basic model consists of the request-reply message broker we saw earlier!figref().
@@ -784,7 +786,8 @@ For reasons we already looked at, clients and workers won't speak to each other
| +-----+------+ |
| | ROUTER | |
| +------------+ |
-| | LRU Queue | |
+| | Load | |
+| | balancer | |
| +------------+ |
| | ROUTER | |
| +-----+------+ |
@@ -829,7 +832,7 @@ The question is: how do we get the clients of each cluster talking to the worker
* Clients could connect directly to both brokers. The advantage is that we don't need to modify brokers or workers. But clients get more complex, and become aware of the overall topology. If we want to add, e.g., a third or fourth cluster, all the clients are affected. In effect we have to move routing and fail-over logic into the clients and that's not nice.
-* Workers might connect directly to both brokers. But REQ workers can't do that, they can only reply to one broker. We might use REPs but REPs don't give us customizable broker-to-worker routing like LRU, only the built-in load balancing. That's a fail, if we want to distribute work to idle workers: we precisely need LRU. One solution would be to use ROUTER sockets for the worker nodes. Let's label this "Idea #1".
+* Workers might connect directly to both brokers. But REQ workers can't do that; they can only reply to one broker. We might use REP sockets, but they don't give us customizable broker-to-worker routing, only built-in round-robin distribution. That's a fail if we want to distribute work to idle workers: we precisely need the load-balancing approach. One solution would be to use ROUTER sockets for the worker nodes. Let's label this "Idea #1".
* Brokers could connect to each other. This looks neatest because it creates the fewest additional connections. We can't add clusters on the fly but that is probably out of scope. Now clients and workers remain ignorant of the real network topology, and brokers tell each other when they have spare capacity. Let's label this "Idea #2".
@@ -896,7 +899,7 @@ We know the basic model well by now:
* The REQ client (REQ) threads create workloads and pass them to the broker (ROUTER).
* The REQ worker (REQ) threads process workloads and return the results to the broker (ROUTER).
-* The broker queues and distributes workloads using the LRU routing model.
+* The broker queues and distributes workloads using the load-balancing pattern.
++++ Federation vs. Peering
@@ -926,9 +929,9 @@ The simplest interconnect is //federation// in which brokers simulate clients an
This would give us simple logic in both brokers and a reasonably good mechanism: when there are no clients, tell the other broker 'ready', and accept one job from it. The trouble is that it is too simple for this problem. A federated broker would be able to handle only one task at once. If the broker emulates a lock-step client and worker, it is by definition also going to be lock-step and if it has lots of available workers they won't be used. Our brokers need to be connected in a fully asynchronous fashion.
-The federation model is perfect for other kinds of routing, especially service-oriented architectures (SOAs) which route by service name and proximity rather than LRU or round-robin. So don't dismiss it as useless, it's just not right for least-recently used and cluster load-balancing.
+The federation model is perfect for other kinds of routing, especially service-oriented architectures (SOAs), which route by service name and proximity rather than by load balancing or round-robin. So don't dismiss it as useless; it's just not right for all use cases.
-So instead of federation, let's look at a //peering// approach in which brokers are explicitly aware of each other and talk over privileged channels. Let's break this down, assuming we want to interconnect N brokers. Each broker has (N - 1) peers, and all brokers are using exactly the same code and logic. There are two distinct flows of information between brokers:
+Instead of federation, let's look at a //peering// approach in which brokers are explicitly aware of each other and talk over privileged channels. Let's break this down, assuming we want to interconnect N brokers. Each broker has (N - 1) peers, and all brokers are using exactly the same code and logic. There are two distinct flows of information between brokers:
* Each broker needs to tell its peers how many workers it has available at any time. This can be fairly simple information, just a quantity that is updated regularly. The obvious (and correct) socket pattern for this is publish-subscribe. So every broker opens a PUB socket and publishes state information on that, and every broker also opens a SUB socket and connects that to the PUB socket of every other broker, to get state information from its peers.
@@ -1123,7 +1126,7 @@ Before we jump into the code, which is getting a little complex, let's sketch th
We need two queues, one for requests from local clients and one for requests from cloud clients. One option would be to pull messages off the local and cloud frontends, and pump these onto their respective queues. But this is kind of pointless because 0MQ sockets //are// queues already. So let's use the 0MQ socket buffers as queues.
-This was the technique we used in the LRU queue broker, and it worked nicely. We only read from the two frontends when there is somewhere to send the requests. We can always read from the backends, since they give us replies to route back. As long as the backends aren't talking to us, there's no point in even looking at the frontends.
+This was the technique we used in the load-balancing broker, and it worked nicely. We only read from the two frontends when there is somewhere to send the requests. We can always read from the backends, since they give us replies to route back. As long as the backends aren't talking to us, there's no point in even looking at the frontends.
So our main loop becomes:
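In outline, and simplifying away the choice between routing locally or to the cloud, it looks something like this sketch ({{localbe}}, {{cloudbe}}, {{localfe}}, {{cloudfe}}, and {{capacity}} stand for the broker's sockets and its count of idle local workers; this is not the Guide's exact listing):

[[code]]
while (true) {
    //  Backends can always talk to us: replies and "ready" messages
    zmq_pollitem_t backends [] = {
        { localbe, 0, ZMQ_POLLIN, 0 },
        { cloudbe, 0, ZMQ_POLLIN, 0 }
    };
    //  If we have no capacity, wait as long as it takes for a backend event
    if (zmq_poll (backends, 2, capacity? 1000 * ZMQ_POLL_MSEC: -1) == -1)
        break;              //  Interrupted
    //  ... read backend activity, route replies, update capacity ...

    //  Only now look at the frontends, and only while we have capacity
    while (capacity) {
        zmq_pollitem_t frontends [] = {
            { localfe, 0, ZMQ_POLLIN, 0 },
            { cloudfe, 0, ZMQ_POLLIN, 0 }
        };
        if (zmq_poll (frontends, 2, 0) < 1)
            break;          //  No client requests waiting
        //  ... take one request and send it to an available worker ...
        capacity--;
    }
}
[[/code]]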
318 chapter4.txt
@@ -6,7 +6,7 @@
In this chapter we focus heavily on user-space request-reply "patterns", reusable models that help you design your own 0MQ architectures:
* The //Lazy Pirate// pattern: reliable request reply from the client side
-* The //Simple Pirate// pattern: reliable request-reply using a LRU queue
+* The //Simple Pirate// pattern: reliable request-reply using load-balancing
* The //Paranoid Pirate// pattern: reliable request-reply with heartbeating
* The //Majordomo// pattern: service-oriented reliable queuing
* The //Titanic// pattern: disk-based / disconnected reliable queuing
@@ -41,30 +41,27 @@ So to make things brutally simple, reliability is "keeping things working proper
Let's take them one by one:
-Request-reply:: If the server dies (while processing a request), the client can figure that out because it won't get an answer back. Then it can give up in a huff, wait and try again later, find another server, etc. As for the client dying, we can brush that off as "someone else's problem" for now.
+* Request-reply - if the server dies (while processing a request), the client can figure that out because it won't get an answer back. Then it can give up in a huff, wait and try again later, find another server, etc. As for the client dying, we can brush that off as "someone else's problem" for now.
-Publish-subscribe:: if the client dies (having gotten some data), the server doesn't know about it. Pubsub doesn't send any information back from client to server. But the client can contact the server out-of-band, e.g. via request-reply, and ask, "please resend everything I missed". As for the server dying, that's out of scope for here. Subscribers can also self-verify that they're not running too slowly, and take action (e.g., warn the operator, and die) if they are.
+* Publish-subscribe - if the client dies (having gotten some data), the server doesn't know about it. Pubsub doesn't send any information back from client to server. But the client can contact the server out-of-band, e.g. via request-reply, and ask, "please resend everything I missed". As for the server dying, that's out of scope for here. Subscribers can also self-verify that they're not running too slowly, and take action (e.g., warn the operator, and die) if they are.
-Pipeline:: if a worker dies (while working), the ventilator doesn't know about it. Pipelines, like pubsub, and the grinding gears of time, only work in one direction. But the downstream collector can detect that one task didn't get done, and send a message back to the ventilator saying, "hey, resend task 324!" If the ventilator or collector dies, whatever upstream client originally sent the work batch can get tired of waiting and resend the whole lot. It's not elegant, but system code should really not die often enough to matter.
+* Pipeline - if a worker dies (while working), the ventilator doesn't know about it. Pipelines, like pubsub, and the grinding gears of time, only work in one direction. But the downstream collector can detect that one task didn't get done, and send a message back to the ventilator saying, "hey, resend task 324!" If the ventilator or collector dies, whatever upstream client originally sent the work batch can get tired of waiting and resend the whole lot. It's not elegant, but system code should really not die often enough to matter.
-In this chapter we'll focus on just on request-reply, which is the low-hanging Durian fruit of reliable messaging. We'll cover reliable pub-sub and pipeline in later chapters.
+In this chapter we'll focus just on request-reply, which is the low-hanging fruit of reliable messaging.
The basic request-reply pattern (a REQ client socket doing a blocking send/recv to a REP server socket) scores low on handling the most common types of failure. If the server crashes while processing the request, the client just hangs forever. If the network loses the request or the reply, the client hangs forever.
Request-reply is still much better than TCP, thanks to 0MQ's ability to reconnect peers silently, to load-balance messages, and so on. But it's still not good enough for real work. The only case where you can really trust the basic request-reply pattern is between two threads in the same process where there's no network or separate server process to die.
-However, with a little extra work this humble pattern becomes a good basis for real work across a distributed network, and we get a set of reliable request-reply (RRR) patterns I like to call the "Pirate" patterns (you'll get the joke, eventually).
+However, with a little extra work this humble pattern becomes a good basis for real work across a distributed network, and we get a set of reliable request-reply (RRR) patterns I like to call the //Pirate// patterns (you'll eventually get the joke, I hope).
There are, in my experience, roughly three ways to connect clients to servers. Each needs a specific approach to reliability:
* Multiple clients talking directly to a single server. Use case: a single well-known server that clients need to talk to. Types of failure we aim to handle: server crashes and restarts, network disconnects.
-////
-AO: I'd prefer to state the use case differently, because the current use case sounds exactly like the approach itself; it's tautological.
-////
-* Multiple clients talking to a single queue device that distributes work to multiple servers. Use case: workload distribution to workers. Types of failure we aim to handle: worker crashes and restarts, worker busy looping, worker overload, queue crashes and restarts, network disconnects.
+* Multiple clients talking to a broker proxy that distributes work to multiple workers. Use case: service-oriented transaction processing. Types of failure we aim to handle: worker crashes and restarts, worker busy looping, worker overload, queue crashes and restarts, network disconnects.
-* Multiple clients talking to multiple servers with no intermediary devices. Use case: distributed services such as name resolution. Types of failure we aim to handle: service crashes and restarts, service busy looping, service overload, network disconnects.
+* Multiple clients talking to multiple servers with no intermediary proxies. Use case: distributed services such as name resolution. Types of failure we aim to handle: service crashes and restarts, service busy looping, service overload, network disconnects.
Each of these approaches has its trade-offs and often you'll mix them. We'll look at all three in detail.
@@ -146,20 +143,13 @@ So, pros and cons:
* Pro: 0MQ automatically retries the actual reconnection until it works.
* Con: doesn't do fail-over to backup / alternate servers.
-////
-AO: You didn't explain where the term Pirate comes from.
-////
-
+++ Basic Reliable Queuing (Simple Pirate Pattern)
-Our second approach extends the Lazy Pirate pattern with a queue device that lets us talk, transparently, to multiple servers, which we can more accurately call "workers". We'll develop this in stages, starting with a minimal working model, the Simple Pirate pattern.
+Our second approach extends the Lazy Pirate pattern with a queue proxy that lets us talk, transparently, to multiple servers, which we can more accurately call "workers". We'll develop this in stages, starting with a minimal working model, the Simple Pirate pattern.
-In all these Pirate patterns, workers are stateless. If the application requires some shared state--e.g., a shared database--we don't know about it as we design our messaging framework. Having a queue device means workers can come and go without clients knowing anything about it. If one worker dies, another takes over. This is a nice, simple topology with only one real weakness, namely the central queue itself, which can become a problem to manage, and a single point of failure.
+In all these Pirate patterns, workers are stateless. If the application requires some shared state--e.g., a shared database--we don't know about it as we design our messaging framework. Having a queue proxy means workers can come and go without clients knowing anything about it. If one worker dies, another takes over. This is a nice, simple topology with only one real weakness, namely the central queue itself, which can become a problem to manage, and a single point of failure.
-////
-AO: It would be nice to point directly to the section within Chapter 3, but your tools might not support that.
-////
-The basis for the queue device is the least-recently-used (LRU) routing queue from [#advanced-request-reply]. What is the very //minimum// we need to do to handle dead or blocked workers? Turns out, it's surprisingly little. We already have a retry mechanism in the client. So using the standard LRU queue will work pretty well. This fits with 0MQ's philosophy that we can extend a peer-to-peer pattern like request-reply by plugging naive devices in the middle!figref().
+The basis for the queue proxy is the load-balancing broker from [#advanced-request-reply]. What is the very //minimum// we need to do to handle dead or blocked workers? Turns out, it's surprisingly little. We already have a retry mechanism in the client. So using the load-balancing pattern will work pretty well. This fits with 0MQ's philosophy that we can extend a peer-to-peer pattern like request-reply by plugging naive proxies in the middle!figref().
[[code type="textdiagram" title="The Simple Pirate Pattern"]]
+-----------+ +-----------+ +-----------+
@@ -177,8 +167,8 @@ The basis for the queue device is the least-recently-used (LRU) routing queue fr
/-----------\
| ROUTER |
+-----------+
- | LRU |
- | Queue |
+ | Load |
+ | balancer |
+-----------+
| ROUTER |
\-----------/
@@ -190,25 +180,21 @@ The basis for the queue device is the least-recently-used (LRU) routing queue fr
/-----------\ /-----------\ /-----------\
| REQ | | REQ | | REQ |
+-----------+ +-----------+ +-----------+
-| LRU | | LRU | | LRU |
| Worker | | Worker | | Worker |
+-----------+ +-----------+ +-----------+
[[/code]]
-////
-AO: Actually, it may be more accurate to say that you are basing your client on the previous section's Lazy Pirate, because there are changes.
-////
-We don't need a special client; we're still using the Lazy Pirate client. Here is the queue, which is precisely the LRU queue described in [#advanced-request-reply], no more or less:
+We don't need a special client; we're still using the Lazy Pirate client. Here is the queue, which is identical to the main task of the load-balancing broker:
[[code type="example" title="Simple Pirate queue" name="spqueue"]]
[[/code]]
-Here is the worker, which takes the Lazy Pirate server and adapts it for the LRU pattern (using the REQ 'ready' signaling):
+Here is the worker, which takes the Lazy Pirate server and adapts it for the load-balancing pattern (using the REQ 'ready' signaling):
[[code type="example" title="Simple Pirate worker" name="spworker"]]
[[/code]]
-To test this, start a handful of workers, a client, and the queue, in any order. You'll see that the workers eventually all crash and burn, and the client retries and then gives up. The queue never stops, and you can restart workers and clients ad-nauseam. This model works with any number of clients and workers.
+To test this, start a handful of workers, a Lazy Pirate client, and the queue, in any order. You'll see that the workers eventually all crash and burn, and the client retries and then gives up. The queue never stops, and you can restart workers and clients ad-nauseam. This model works with any number of clients and workers.
+++ Robust Reliable Queuing (Paranoid Pirate Pattern)
@@ -258,12 +244,12 @@ We previously used a REQ socket for the worker. For the Paranoid Pirate worker w
+-----------+ +-----------+ +-----------+
[[/code]]
-We're still using the Lazy Pirate client. Here is the Paranoid Pirate queue device:
+We're still using the Lazy Pirate client. Here is the Paranoid Pirate queue proxy:
[[code type="example" title="Paranoid Pirate queue" name="ppqueue"]]
[[/code]]
-The queue extends the LRU pattern with heartbeating of workers. Heartbeating is one of those "simple" things that can be subtle to get right. I'll explain more about that in a second.
+The queue extends the load-balancing pattern with heartbeating of workers. Heartbeating is one of those "simple" things that can be subtle to get right. I'll explain more about that in a second.
Here is the Paranoid Pirate worker:
@@ -274,10 +260,7 @@ Some comments about this example:
* The code includes simulation of failures, as before. This makes it (a) very hard to debug, and (b) dangerous to reuse. When you want to debug this, disable the failure simulation.
-* The worker uses a reconnect strategy similar to the one we designed for the Lazy Pirate client, with two major differences: (a) it does an exponential back-off, and (b) it never abandons.
-////
-AO: Abandons the queue?
-////
+* The worker uses a reconnect strategy similar to the one we designed for the Lazy Pirate client, with two major differences: (a) it does an exponential back-off, and (b) it retries indefinitely (whereas the client retries a few times before reporting a failure).
Try the client, queue, and workers, such as by using a script like this:
@@ -294,49 +277,120 @@ You should see the workers die, one by one, as they simulate a crash, and the cl
+++ Heartbeating
-When writing the Paranoid Pirate examples, it took about five hours to get the queue-to-worker heartbeating working properly. The rest of the request-reply chain took perhaps ten minutes. Heartbeating is one of those reliability layers that often causes more trouble than it saves. It is especially easy to create "false failures", i.e., when peers decide that they are disconnected because the heartbeats aren't sent properly.
+Heartbeating solves the problem of knowing whether a peer is alive or dead. This is not an issue specific to 0MQ. TCP has a long timeout (30 minutes or so), which means that it can be impossible to know whether a peer has died, been disconnected, or gone on a weekend to Prague with a case of vodka, a redhead, and a large expense account.
-Some points to consider when understanding and implementing heartbeating:
-////
-AO: The following points are useful guidelines, but they are no substitute for an explanation of how to do heartbeating. The code is hard to trace through, particularly given that multiple nodes are doing different things that interact somehow through messages. You need to explain what you did.
-////
+It is not easy to get heartbeating right. When writing the Paranoid Pirate examples, it took about five hours to get the heartbeating working properly. The rest of the request-reply chain took perhaps ten minutes. It is especially easy to create "false failures", i.e., when peers decide that they are disconnected because the heartbeats aren't sent properly.
+
+We'll look at the three main answers people use for heartbeating with 0MQ.
+
+++++ Shrugging It Off
+
+The most common approach is to do no heartbeating at all and hope for the best. Many if not most 0MQ applications do this. 0MQ encourages this by hiding peers in many cases. What problems does this approach cause?
+
+* When we use a ROUTER socket in an application that tracks peers, as peers disconnect and reconnect, the application will leak memory (resources that the application holds for each peer) and get slower and slower.
+
+* When we use SUB or DEALER-based data recipients, we can't tell the difference between good silence (there's no data) and bad silence (the other end died). When a recipient knows the other side died, it can for example switch over to a backup route.
+
+* If we use a TCP connection that stays silent for a long while, it will, in some networks, just die. Sending something (technically a "keep-alive" rather than a heartbeat) will keep the connection alive.
+
+++++ One-Way Heartbeats
+
+A second option is to send a heartbeat message from each node to its peers every second or so. When one node hears nothing from another within some timeout (several seconds, typically), it will treat that peer as dead. Sounds good, right? Sadly, no. This works in some cases but has nasty edge cases in others.
-* Note that heartbeats are not request-reply. They flow asynchronously in both directions. Either peer can decide the other is 'dead' and stop talking to it.
+For PUB-SUB, this does work, and it's the only model you can use. SUB sockets cannot talk back to PUB sockets, but PUB sockets can happily send "I'm alive" messages to their subscribers.
-* First, get the heartbeating working, and only //then// add in the rest of the message flow. You should be able to prove the heartbeating works by starting peers in any order, stopping and restarting them, simulating freezes, and so on.
+As an optimization, you can send heartbeats only when there is no real data to send. Furthermore, you can send heartbeats progressively slower and slower, if network activity is an issue (e.g. on mobile networks where activity drains the battery). As long as the recipient can detect a failure (sharp stop in activity), that's fine.
-* When your main loop is based on {{zmq_poll[3]}}, use a secondary timer to trigger heartbeats. Do //not// use the poll loop for this, because you'll enter the loop every time you receive any message--fun, when you have two peers sending each other heartbeats (think about it).
+Now the typical problems with this design:
-Your language or binding should provide a method that returns the current system clock in milliseconds. It's easy to use this to calculate when to send the next heartbeats. Thus, in C:
+* It can be inaccurate when we send large amounts of data, since heartbeats will be delayed behind that data. If heartbeats are delayed, you can get false timeouts and disconnections due to network congestion. Thus, always treat //any// incoming data as a heartbeat, whether or not the sender optimizes out heartbeats.
+
+* While the PUB-SUB pattern will drop messages for disappeared recipients, PUSH and DEALER sockets will queue them. So, if you send heartbeats to a dead peer, and it comes back, it'll get all the heartbeats you sent. Which can be thousands. Whoa, whoa!
+
+* This design assumes that heartbeat timeouts are the same across the whole network. But that won't be accurate. Some peers will want very aggressive heartbeating, to detect faults rapidly. And some will want very relaxed heartbeating, to let sleeping networks lie and save power.
+
+++++ Ping-Pong Heartbeats
+
+The third option is to use a ping-pong dialog. One peer sends a ping command to the other, which replies with a pong command. Neither command has any payload. Pings and pongs are not correlated. Since the roles of "client" and "server" are arbitrary in some networks, we usually specify that either peer can in fact send a ping and expect a pong in response. However, since the timeouts depend on network topologies known best to dynamic clients, it is usually the client which pings the server.
+
+This works for all ROUTER-based brokers. The same optimizations we used in the second model make this work even better: treat any incoming data as a pong, and only send a ping when not otherwise sending data.
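A client-side sketch of that dialog (the PING/PONG command strings, the constants, and the {{server}} socket are illustrative; the "ping only when idle" optimization is not shown):

[[code]]
#define PING_INTERVAL  1000         //  msecs between pings
#define PING_LIVENESS  3            //  missed pongs before giving up

size_t liveness = PING_LIVENESS;
uint64_t ping_at = zclock_time () + PING_INTERVAL;

while (true) {
    zmq_pollitem_t items [] = { { server, 0, ZMQ_POLLIN, 0 } };
    if (zmq_poll (items, 1, PING_INTERVAL * ZMQ_POLL_MSEC) == -1)
        break;              //  Interrupted

    if (items [0].revents & ZMQ_POLLIN) {
        //  Any traffic at all counts as a pong
        zmsg_t *msg = zmsg_recv (server);
        liveness = PING_LIVENESS;
        //  ... handle real replies here; discard bare PONG commands ...
        zmsg_destroy (&msg);
    }
    else
    if (--liveness == 0) {
        //  ... server presumed dead: reconnect or fail over ...
        liveness = PING_LIVENESS;
    }
    //  Time to ping the server again
    if (zclock_time () > ping_at) {
        zstr_send (server, "PING");
        ping_at = zclock_time () + PING_INTERVAL;
    }
}
[[/code]]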
+
+++++ Heartbeating for Paranoid Pirate
+
+For Paranoid Pirate we chose the second approach. It might not have been the simplest option: if designing this today, I'd probably try a ping-pong approach instead. However, the principles are similar. The heartbeat messages flow asynchronously in both directions, and either peer can decide the other is 'dead' and stop talking to it.
+
+In the worker, this is how we handle heartbeats from the queue:
+
+* We calculate a //liveness// which is how many heartbeats we can still miss before deciding the queue is dead. It starts at 3 and we decrement it each time we miss a heartbeat.
+* We wait, in the {{zmq_poll}} loop, for one second each time, which is our heartbeat interval.
+* If there's any message from the queue during that time we reset our liveness to three.
+* If there's no message during that time, we count down our liveness.
+* If the liveness reaches zero, we consider the queue dead.
+* If the queue is 'dead', we destroy our socket, create a new one, and reconnect.
+* To avoid opening and closing too many sockets we wait for a certain //interval// before reconnecting, and we double the interval each time until it reaches 32 seconds.
+
+And this is how we handle heartbeats //to// the queue:
+
+* We calculate when to send the next heartbeat; this is a single variable since we're talking to one peer, the queue.
+* In the {{zmq_poll}} loop, whenever we pass this time, we send a heartbeat to the queue.
+
+Here's the essential heartbeating code for the worker:
[[code type="fragment" name="heartbeats"]]
+#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
+#define HEARTBEAT_INTERVAL 1000 // msecs
+#define INTERVAL_INIT 1000 // Initial reconnect
+#define INTERVAL_MAX 32000 // After exponential backoff
+
+...
+// If liveness hits zero, queue is considered disconnected
+size_t liveness = HEARTBEAT_LIVENESS;
+size_t interval = INTERVAL_INIT;
+
// Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
while (true) {
- ...
+ zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
- ...
+
+ if (items [0].revents & ZMQ_POLLIN) {
+ // Receive any message from queue
+ liveness = HEARTBEAT_LIVENESS;
+ interval = INTERVAL_INIT;
+ }
+ else
+ if (--liveness == 0) {
+ zclock_sleep (interval);
+ if (interval < INTERVAL_MAX)
+ interval *= 2;
+ zsocket_destroy (ctx, worker);
+ ...
+ liveness = HEARTBEAT_LIVENESS;
+ }
// Send heartbeat to queue if it's time
if (zclock_time () > heartbeat_at) {
- ... Send heartbeats to all peers that expect them
- // Set timer for next heartbeat
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
+ // Send heartbeat message to queue
}
}
[[/code]]
-* Your main poll loop should use the heartbeat interval as its timeout. Anything less will just waste cycles. Obviously, don't use infinity.
+The queue does the same, but manages an expiry time for each worker.
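In outline, the queue keeps a small record per worker and purges any whose expiry has passed. This is a sketch only; the real ppqueue code differs in its details and names:

[[code]]
typedef struct {
    zframe_t *address;          //  Worker identity frame
    int64_t expiry;             //  Expires at this time
} worker_t;

//  Called whenever we hear anything from a worker
static void
s_worker_refresh (zlist_t *workers, worker_t *worker)
{
    worker->expiry = zclock_time ()
        + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
    zlist_remove (workers, worker);
    zlist_append (workers, worker);
}

//  Called once per poll cycle: drop workers we haven't heard from
static void
s_workers_purge (zlist_t *workers)
{
    worker_t *worker = (worker_t *) zlist_first (workers);
    while (worker) {
        if (zclock_time () < worker->expiry)
            break;              //  Still alive; list is ordered by expiry
        zlist_remove (workers, worker);
        zframe_destroy (&worker->address);
        free (worker);
        worker = (worker_t *) zlist_first (workers);
    }
}
[[/code]]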
+
+Here are some tips for your own heartbeating implementation:
+
+* Use {{zmq_poll}} or a reactor as the core of your application's main task.
+
+* Start by building the heartbeating between peers, test it by simulating failures, and //then// build the rest of the message flow. Adding heartbeating afterwards is much trickier.
* Use simple tracing, i.e., print to console, to get this working. Some tricks to help you trace the flow of messages between peers: use a dump method such as the one zmsg offers, and number messages incrementally so you can see if there are gaps.
* In a real application, heartbeating must be configurable and usually negotiated with the peer. Some peers will want aggressive heartbeating, as low as 10 msecs. Other peers will be far away and want heartbeating as high as 30 seconds.
-* If you have different heartbeat intervals for different peers, your poll timeout should be the lowest (shortest time) of these.
-
-* You might be tempted to open a separate socket dialog for heartbeats. This is superficially nice because you can separate different dialogs, e.g. the synchronous request-reply from the asynchronous heartbeating. However it's a bad idea for several reasons. First, if you're sending data you don't need to send heartbeats. Second, sockets may, due to network vagaries, become jammed. You need to know when your main data socket is silent because it's dead, rather than just not busy, so you need heartbeats on that socket. Lastly, two sockets is more complex than one.
+* If you have different heartbeat intervals for different peers, your poll timeout should be the lowest (shortest time) of these. Do not use an infinite timeout.
-* We're not doing heartbeating from client to queue. It does make things more complex, but we do that in real applications so that clients can detect when brokers die, and do clever things like switch to alternate brokers.
+* Do heartbeating on the same socket as you use for messages, so your heartbeats also act as a //keep-alive// to stop the network connection from going stale (some firewalls can be unkind to silent connections).
+++ Contracts and Protocols
@@ -345,6 +399,7 @@ If you're paying attention you'll realize that Paranoid Pirate is not interopera
It's fun to experiment without specifications, but that's not a sensible basis for real applications. What happens if we want to write a worker in another language? Do we have to read code to see how things work? What if we want to change the protocol for some reason? The protocol may be simple but it's not obvious, and if it's successful, it must become more complex.
Lack of contracts is a sure sign of a disposable application. So, let's write a contract for this protocol. How do we do that?
+
There's a wiki at [http://rfc.zeromq.org rfc.zeromq.org] that we made especially as a home for public 0MQ contracts.
To create a new specification, register, and follow the instructions. It's straightforward, though technical writing is not for everyone.
@@ -361,13 +416,7 @@ Turning PPP into a real protocol would take more work:
The nice thing about progress is how fast it happens when lawyers and committees aren't involved. Just a few sentences ago we were dreaming of a better protocol that would fix the world. And [http://rfc.zeromq.org/spec:7 here we have it].
This one-page specification turns PPP into something more solid!figref(). This is how we should design complex architectures: start by writing down the contracts, and only //then// write software to implement them.
-The Majordomo Protocol (MDP) extends and improves PPP in one interesting way apart from the two points above.
-////
-AO: Where is above?
-////
-It adds a "service name" to requests that the client sends, and asks workers to register for specific services. The nice thing about MDP is that it came from working code, a simpler protocol, and a precise set of improvements. This made it easy to draft.
-
-Adding service names is a small but significant change that turns our Paranoid Pirate queue into a service-oriented broker.
+The Majordomo Protocol (MDP) extends and improves on PPP in one interesting way: it adds a "service name" to requests that the client sends, and asks workers to register for specific services. Adding service names turns our Paranoid Pirate queue into a service-oriented broker. The nice thing about MDP is that it came out of working code, a simpler ancestor protocol (PPP), and a precise set of improvements. This made it easy to draft.
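On the wire, a client request is just a few frames. Here is a sketch using CZMQ (the endpoint and the "echo" service name are illustrative; the frame layout follows the MDP/0.1 spec):

[[code]]
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "tcp://localhost:5555");

//  Build the request: protocol header, service name, then the body
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "MDPC01");        //  MDP/Client v0.1
zmsg_addstr (request, "echo");          //  Service name
zmsg_addstr (request, "Hello world");   //  Request body
zmsg_send (&request, client);           //  REQ adds the empty delimiter

//  Reply comes back as: "MDPC01", service name, reply body
zmsg_t *reply = zmsg_recv (client);
zmsg_dump (reply);
zmsg_destroy (&reply);
zctx_destroy (&ctx);
[[/code]]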
[[code type="textdiagram" title="The Majordomo Pattern"]]
+-----------+ +-----------+ +-----------+
@@ -457,15 +506,12 @@ And here is the broker:
[[code type="example" title="Majordomo broker" name="mdbroker"]]
[[/code]]
-////
-AO: I suggest you break the code into several parts and perhaps turn some of the comments before major functions into text that introduces the reader to the part of the code that's coming next.
-////
This is by far the most complex example we've seen. It's almost 500 lines of code. To write this and make it somewhat robust took two days. However, this is still a short piece of code for a full service-oriented broker.
Notes on this code:
-* The Majordomo Protocol lets us handle both clients and workers on a single socket. This is nicer for those deploying and managing the broker: it just sits on one 0MQ endpoint rather than the two that most devices need.
+* The Majordomo Protocol lets us handle both clients and workers on a single socket. This is nicer for those deploying and managing the broker: it just sits on one 0MQ endpoint rather than the two that most proxies need.
* The broker implements all of MDP/0.1 properly (as far as I know), including disconnection if the broker sends invalid commands, heartbeating, and the rest.
@@ -496,7 +542,7 @@ Asynchronous round-trip test...
173010 calls/second
[[/code]]
-Note that the client thread does a small pause before starting. This is to get around one of the "features" of the router socket: if you send a message with the address of a peer that's not yet connected, the message gets discarded. In this example we don't use the LRU mechanism, so without the sleep, if the worker thread is too slow to connect, it will lose messages, making a mess of our test.
+Note that the client thread does a small pause before starting. This is to get around one of the "features" of the ROUTER socket: if you send a message with the address of a peer that's not yet connected, the message gets discarded. In this example we don't use the load-balancing mechanism, so without the sleep, if the worker thread is too slow to connect, it will lose messages, making a mess of our test.
As we see, round-tripping in the simplest case is 20 times slower than the asynchronous, "shove it down the pipe as fast as it'll go" approach. Let's see if we can apply this to Majordomo to make it faster.
@@ -510,14 +556,19 @@ zmsg_t *mdcli_recv (mdcli_t *self);
[[/code]]
It's literally a few minutes' work to refactor the synchronous client API to become asynchronous:
-////
-AO: Summarize the changes. Some are undocumente and some are buried in the code. I see you substitute a DEALER socket, but I don't see the asynchronous behavior.
-////
[[code type="example" title="Majordomo asynchronous client API" name="mdcliapi2"]]
[[/code]]
-And here's the corresponding client test program:
+The differences are:
+
+* We use a DEALER socket instead of REQ, so we emulate REQ with an empty delimiter frame before each request and each response (see the sketch after this list).
+* We don't retry requests; if the application needs to retry, it can do this itself.
+* We break the synchronous {{send}} method into separate {{send}} and {{recv}} methods.
+* The {{send}} method is asynchronous and returns immediately after sending. The caller can thus send a number of messages before getting a response.
+* The {{recv}} method waits for (with a timeout) one response and returns that to the caller.
+
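The delimiter emulation is the only fiddly bit. Here is a sketch of what the send and recv methods do with it ({{client}} is the DEALER socket; {{original}} and {{timeout}} are illustrative, and the real API also adds the MDP protocol frames):

[[code]]
//  send: push an empty delimiter so the broker sees a REQ-style envelope
zmsg_t *request = zmsg_dup (original);
zmsg_pushstr (request, "");
zmsg_send (&request, client);           //  Returns immediately

//  recv: wait (with a timeout) for one reply, then strip the delimiter
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
if (zmq_poll (items, 1, timeout * ZMQ_POLL_MSEC) > 0) {
    zmsg_t *reply = zmsg_recv (client);
    zframe_t *empty = zmsg_pop (reply);
    zframe_destroy (&empty);
    //  ... hand the rest of the reply back to the caller ...
}
[[/code]]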
+And here's the corresponding client test program, which sends 100,000 messages and then receives 100,000 back:
[[code type="example" title="Majordomo client application" name="mdclient2"]]
[[/code]]
@@ -555,7 +606,7 @@ user 0m0.730s
sys 0m0.470s
[[/code]]
-It isn't fully asynchronous since workers get their messages on a strict LRU basis. But it will scale better with more workers. On my PC, after eight or so workers it doesn't get any faster. Four cores only stretches so far. But we got a 4x improvement in throughput with just a few minutes' work. The broker is still unoptimized. It spends most of its time copying message frames around, instead of doing zero copy, which it could. But we're getting 25K reliable request/reply calls a second, with pretty low effort.
+It isn't fully asynchronous since workers get their messages on a strict last-used basis. But it will scale better with more workers. On my PC, after eight or so workers it doesn't get any faster. Four cores only stretches so far. But we got a 4x improvement in throughput with just a few minutes' work. The broker is still unoptimized. It spends most of its time copying message frames around, instead of doing zero copy, which it could. But we're getting 25K reliable request/reply calls a second, with pretty low effort.
However, the asynchronous Majordomo pattern isn't all roses. It has a fundamental weakness, namely that it cannot survive a broker crash without more work. If you look at the mdcliapi2 code you'll see it does not attempt to reconnect after a failure. A proper reconnect would require:
@@ -573,15 +624,22 @@ Another option is to do what email does, and ask that undeliverable requests be
Let's try to use what we've already built, building on top of MDP instead of modifying it. Service discovery is, itself, a service. It might indeed be one of several management services, such as "disable service X", "provide statistics", and so on. What we want is a general, extensible solution that doesn't affect the protocol or existing applications.
-////
-AO: Your display of the code should explain the major things you did in it.
-////
-So here's a small RFC that layers this on top of MDP: [http://rfc.zeromq.org/spec:8 the Majordomo Management Interface (MMI)]. We already implemented it in the broker, though unless you read the whole thing you probably missed that. Here's how we use the service discovery in an application:
+So here's a small RFC that layers this on top of MDP: [http://rfc.zeromq.org/spec:8 the Majordomo Management Interface (MMI)]. We already implemented it in the broker, though unless you read the whole thing you probably missed that. I'll explain how it works in the broker:
+
+* When a client requests a service that starts with "mmi.", instead of routing this to a worker, we handle it internally.
+
+* We handle just one service in this broker, which is "mmi.service", the service discovery service.
+
+* The payload for the request is the name of an external service (a real one, provided by a worker).
+
+* The broker returns "200" (OK) or "404" (Not found) depending on whether there are workers registered for that service, or not.
+
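As a sketch of what that looks like inside the broker's client-request handling (the {{service_exists}} helper and the frame names are illustrative, not the exact mdbroker code):

[[code]]
//  Sketch: intercepting "mmi." requests inside the broker.
//  'service_frame' holds the requested service name and 'msg' the rest
//  of the request; 'service_exists' stands in for a lookup of whether
//  any workers are registered for the named service.
if (zframe_size (service_frame) >= 4
&&  memcmp (zframe_data (service_frame), "mmi.", 4) == 0) {
    char *return_code;
    if (zframe_streq (service_frame, "mmi.service")) {
        //  The payload names the (real) service the client asks about
        char *name = zframe_strdup (zmsg_last (msg));
        return_code = service_exists (name)? "200": "404";
        free (name);
    }
    else
        return_code = "501";        //  We don't implement other mmi. services
    //  Overwrite the payload with the return code and send the message
    //  straight back to the client instead of routing it to a worker
    zframe_reset (zmsg_last (msg), return_code, strlen (return_code));
}
[[/code]]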
+Here's how we use the service discovery in an application:
[[code type="example" title="Service discovery over Majordomo" name="mmiecho"]]
[[/code]]
-The broker checks the service name, and handles any service starting with "mmi." itself, rather than passing the request on to a worker. Try this with and without a worker running, and you should see the little program report '200' or '404' accordingly. The implementation of MMI in our example broker is pretty weak. For example if a worker disappears, services remain "present". In practice, a broker should remove services that have no workers after some configurable timeout.
+Try this with and without a worker running, and you should see the little program report "200" or "404" accordingly. The implementation of MMI in our example broker is flimsy. For example, if a worker disappears, services remain "present". In practice, a broker should remove services that have no workers after some configurable timeout.
+++ Idempotent Services
@@ -610,16 +668,13 @@ To handle non-idempotent operations, use the fairly standard solution of detecti
Once you realize that Majordomo is a "reliable" message broker, you might be tempted to add some spinning rust (that is, ferrous-based hard disk platters). After all, this works for all the enterprise messaging systems. It's such a tempting idea that it's a little sad to have to be negative toward it. But brutal cynicism is one of my specialties. So, some reasons you don't want rust-based brokers sitting in the center of your architecture are:
-* As you've seen, the Lazy Pirate client performs surprisingly well. It works across a whole range of architectures, from direct client-to-server to distributed queue devices. It does tend to assume that workers are stateless and idempotent. But we can work around that limitation without resorting to rust.
+* As you've seen, the Lazy Pirate client performs surprisingly well. It works across a whole range of architectures, from direct client-to-server to distributed queue proxies. It does tend to assume that workers are stateless and idempotent. But we can work around that limitation without resorting to rust.
* Rust brings a whole set of problems, from slow performance to additional pieces that you have to manage, repair, and handle 6am panics from, as they inevitably break at the start of daily operations. The beauty of the Pirate patterns in general is their simplicity. They won't crash. And if you're still worried about the hardware, you can move to a peer-to-peer pattern that has no broker at all. I'll explain later in this chapter.
Having said this, however, there is one sane use case for rust-based reliability, which is an asynchronous disconnected network. It solves a major problem with Pirate, namely that a client has to wait for an answer in real-time. If clients and workers are only sporadically connected (think of email as an analogy), we can't use a stateless network between clients and workers. We have to put state in the middle.
-////
-AO: This name seems pretty ironic.
-////
-So, here's the Titanic pattern!figref(), in which we write messages to disk to ensure they never get lost, no matter how sporadically clients and workers are connected. As we did for service discovery, we're going to layer Titanic on top of Majordomo rather than extend MDP. It's wonderfully lazy because it means we can implement our fire-and-forget reliability in a specialized worker, rather than in the broker. This is excellent for several reasons:
+So, here's the Titanic pattern!figref(), in which we write messages to disk to ensure they never get lost, no matter how sporadically clients and workers are connected. As we did for service discovery, we're going to layer Titanic on top of MDP rather than extend it. It's wonderfully lazy because it means we can implement our fire-and-forget reliability in a specialized worker, rather than in the broker. This is excellent for several reasons:
* It is //much// easier because we divide and conquer: the broker handles message routing and the worker handles reliability.
* It lets us mix brokers written in one language with workers written in another.
@@ -668,11 +723,9 @@ Titanic is thus both a worker and a client. The dialog between client and Titani
Whereas the dialog between Titanic and broker and worker goes like this:
-////
-AO: I'm not sure what you mean by "echo". Is this a redundant request that is handled gracefully somehow?
-////
-* Titanic: hey, broker, is there an echo service? Broker: uhm, yeah, seems like.
-* Titanic: hey, echo, please handle this for me. Echo: sure, here you are.
+* Titanic: hey, broker, is there a Coffee service? Broker: uhm, yeah, seems like.
+* Titanic: hey, Coffee service, please handle this for me.
+* Coffee: sure, here you are.
* Titanic: sweeeeet!
You can work through this, and the possible failure scenarios. If a worker crashes while processing a request, Titanic retries, indefinitely. If a reply gets lost somewhere, Titanic will retry. If the request gets processed but the client doesn't get the reply, it will ask again. If Titanic crashes while processing a request, or a reply, the client will try again. As long as requests are fully committed to safe storage, work can't get lost.
@@ -681,47 +734,38 @@ The handshaking is pedantic, but can be pipelined, i.e., clients can use the asy
We need some way for a client to request //its// replies. We'll have many clients asking for the same services, and clients disappear and reappear with different identities. So here is a simple, reasonably secure solution:
-////
-AO: Did I rewrite the following correctly?
-////
-* Every request generates a universally unique ID (UUID), which Titanic returns to the client after the client has queued the request.
+* Every request generates a universally unique ID (UUID), which Titanic returns to the client after it has queued the request.
* When a client asks for a reply, it must specify the UUID for the original request.
-This puts some onus on the client to store its request UUIDs safely, but it removes any need for authentication. What alternatives are there?
-////
-AO: Is that question rhetorical? You don't seem to answer it.
-////
+In a realistic case, the client would want to store its request UUIDs safely, e.g., in a local database.
Before we jump off and write yet another formal specification (fun, fun!) let's consider how the client talks to Titanic. One way is to use a single service and send it three different request types. Another way, which seems simpler, is to use three services:
-**titanic.request**:: Store a request message, and return a UUID for the request.
-**titanic.reply**:: Fetch a reply, if available, for a given request UUID.
-**titanic.close**:: Confirm that a reply has been stored and processed.
+* {{titanic.request}} - store a request message, and return a UUID for the request.
+* {{titanic.reply}} - fetch a reply, if available, for a given request UUID.
+* {{titanic.close}} - confirm that a reply has been stored and processed.
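To make the three services concrete, here's a sketch of the cycle a client goes through. It assumes a {{session}} created with the synchronous Majordomo client API from earlier, and a hypothetical {{s_titanic_call}} helper that sends one request, checks the leading status frame of the reply, and returns the remaining frames on "200" or NULL otherwise (the ticlient example below uses a similar helper of its own):

[[code]]
//  Sketch of the client-side cycle over the three Titanic services
//  (illustrative; see the ticlient example for the full version)

//  1. Queue a request for the "echo" service; on success we get a UUID
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "echo");               //  Target service
zmsg_addstr (request, "Hello world");        //  Request body
zmsg_t *reply = s_titanic_call (session, "titanic.request", &request);
char *uuid = reply? zmsg_popstr (reply): NULL;
zmsg_destroy (&reply);

//  2. Poll titanic.reply with that UUID until the reply is ready
while (uuid && !zctx_interrupted) {
    zmsg_t *poll = zmsg_new ();
    zmsg_addstr (poll, uuid);
    reply = s_titanic_call (session, "titanic.reply", &poll);
    if (reply) {
        //  3. Got the answer; tell Titanic it can discard its copy
        zmsg_t *close_request = zmsg_new ();
        zmsg_addstr (close_request, uuid);
        zmsg_t *ack = s_titanic_call (session, "titanic.close", &close_request);
        zmsg_destroy (&ack);
        zmsg_destroy (&reply);
        break;
    }
    zclock_sleep (5000);                     //  Not ready yet; try again later
}
free (uuid);
[[/code]]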
-We'll just make a multithreaded worker, which as we've seen from our multithreading experience with 0MQ, is trivial. However before jumping into code, let's sketch down what Titanic would look like in terms of 0MQ messages and frames: the [http://rfc.zeromq.org/spec:9 Titanic Service Protocol (TSP)].
+We'll just make a multithreaded worker, which, as we've seen from our multithreading experience with 0MQ, is trivial. However, let's first sketch what Titanic would look like in terms of 0MQ messages and frames. This gives us the [http://rfc.zeromq.org/spec:9 Titanic Service Protocol (TSP)].
Using TSP is clearly more work for client applications than accessing a service directly via MDP. Here's the shortest robust "echo" client example:
-////
-AO: Seems like you've created a simple example by polling in a wait loop for a reply, but wouldn't it be better to do something else and check for replies in a more efficient way?
-////
[[code type="example" title="Titanic client example" name="ticlient"]]
[[/code]]
-Of course this can and in practice would be wrapped up in some kind of framework. Real application developers should never see messaging up close, it's a tool for more technically-minded experts to build frameworks and APIs. If we had infinite time to explore this, I'd make a TSP API example, and bring the client application back down to a few lines of code. But it's the same principle as we saw for MDP, no need to be repetitive.
+Of course this can be, and should be, wrapped up in some kind of framework or API. It's not healthy to ask average application developers to learn the full details of messaging: it hurts their brains, costs time, and offers too many ways to make buggy complexity. Additionally, it makes it hard to add intelligence.
+
+For example, this client blocks on each request, whereas in a real application we'd want to be doing useful work while tasks are executed. This requires some non-trivial plumbing: we have to build a background thread and talk to it cleanly. It's the kind of thing you want to wrap in a nice, simple API that the average developer cannot misuse. It's the same approach that we used for Majordomo.
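To show the shape of that plumbing, here's a sketch using the CZMQ attached-thread pattern. The agent body is just a stub that echoes; a real agent would run the titanic.request/reply/close cycle internally while the application gets on with its work.

[[code]]
//  Sketch: hide the blocking Titanic conversation behind a background
//  thread; the application talks to it over an inproc pipe. The agent
//  body is a stub, the pattern is the point.
#include "czmq.h"

static void
titanic_agent (void *args, zctx_t *ctx, void *pipe)
{
    while (!zctx_interrupted) {
        zmsg_t *request = zmsg_recv (pipe);     //  New work from application
        if (!request)
            break;                              //  Interrupted
        //  ... queue via titanic.request, poll titanic.reply, then ...
        zmsg_send (&request, pipe);             //  Hand result back (stub: echo)
    }
}

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *pipe = zthread_fork (ctx, titanic_agent, NULL);

    zstr_send (pipe, "Hello world");            //  Fire-and-forget from the app's view
    //  ... do useful work here ...
    char *result = zstr_recv (pipe);            //  Collect the result when convenient
    free (result);

    zctx_destroy (&ctx);
    return 0;
}
[[/code]]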
Here's the Titanic implementation. This server handles the three services using three threads, as proposed. It does full persistence to disk using the most brute-force approach possible: one file per message. It's so simple it's scary. The only complex part is that it keeps a separate 'queue' of all requests to avoid reading the directory over and over:
[[code type="example" title="Titanic broker example" name="titanic"]]
[[/code]]
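At its heart, the brute-force persistence is little more than this (a sketch; the directory and filename scheme here are illustrative, and the real code also maintains the queue file):

[[code]]
//  Sketch: one file per message, using CZMQ's zmsg_save.
//  The ".titanic" directory and ".req" suffix are illustrative.
static int
s_store_request (zmsg_t *request, char *uuid)
{
    char filename [256];
    snprintf (filename, sizeof (filename), ".titanic/%s.req", uuid);
    FILE *file = fopen (filename, "w");
    if (!file)
        return -1;                          //  Can't create the file
    int rc = zmsg_save (request, file);     //  Serialize all frames to disk
    fclose (file);
    return rc;
}
[[/code]]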
-////
-AO: Seems strange to put a send at the top of the main loop. I think some explanation is required.
-////
To test this, start {{mdbroker}} and {{titanic}}, then run {{ticlient}}. Now start {{mdworker}} arbitrarily, and you should see the client getting a response and exiting happily.
Some notes about this code:
+* Note that some loops start by sending messages, and others by receiving them. This is because Titanic acts both as a client and as a worker, in different roles.
* We send requests only to services that appear to be running, using MMI. This works as well as the MMI implementation in the broker.
* We use an inproc connection to send new request data from the **titanic.request** service through to the main dispatcher. This saves the dispatcher from having to scan the disk directory, load all request files, and sort them by date/time.
@@ -735,8 +779,7 @@ If you want to use Titanic in real cases, you'll rapidly be asking "how do we ma
* Use a solid-state drive rather than spinning iron oxide platters.
* Preallocate the entire file, or allocate it in large chunks allowing the circular buffer to grow and shrink as needed. This avoids fragmentation and ensures most reads and writes are contiguous.
-And so on. What I'd not recommend is storing messages in a database, not even a
-"fast" key/value store, unless you really like a specific database and don't have performance worries. You will pay a steep price for the abstraction, 10 to 1000x over a raw disk file.
+And so on. What I'd not recommend is storing messages in a database, not even a "fast" key/value store, unless you really like a specific database and don't have performance worries. You will pay a steep price for the abstraction, ten to a thousand times over a raw disk file.
If you want to make Titanic //even more reliable//, duplicate the requests to a second server, which you'd place in a second location just far enough to survive nuclear attack on your primary location, yet not so far that you get too much latency.
@@ -744,10 +787,7 @@ If you want to make Titanic //much faster and less reliable//, store requests an
+++ High-availability Pair (Binary Star Pattern)
-////
-AO: Usually, master and slave in computing have a different connotation. A slave makes no independent decisions (such as writing to a database) but follows along doing what the master does. So what you describe is not a master/slave relationship. The second server is merely a backup server. The distinction you create later between master and primary servers doesn't justify the use of the term "master". The terms used in the code, active and passive, are fine.
-////
-The Binary Star pattern puts two servers in a primary-backup high-availability pair!figref(). At any given time, one of these (the master) accepts connections from client applications. The other (the slave) does nothing, but the two servers monitor each other. If the master disappears from the network, after a certain time the slave takes over as master.
+The Binary Star pattern puts two servers in a primary-backup high-availability pair!figref(). At any given time, one of these (the active) accepts connections from client applications. The other (the passive) does nothing, but the two servers monitor each other. If the active disappears from the network, after a certain time the passive takes over as active.
Binary Star pattern was developed by Pieter Hintjens and Martin Sustrik for the iMatix [http://www.openamq.org OpenAMQ server]. We designed it:
@@ -759,7 +799,7 @@ Binary Star pattern was developed by Pieter Hintjens and Martin Sustrik for the
+------------+ +------------+
| | | |
| Primary |<------------->| Backup |
-| "master" | | "slave" |
+| "active" | | "passive" |
| | | |
+------------+ +------------+
^
@@ -783,7 +823,7 @@ Assuming we have a Binary Star pair running, here are the different scenarios th
+------------+ +------------+
| | | |
| Primary |<------------->| Backup |
-| "slave" | | "master" |
+| "passive" | | "active" |
| | | |
+------------+ +------------+
^
@@ -803,7 +843,7 @@ Recovery from fail-over works as follows:
. The operators stop the backup server at a moment when it will cause minimal disruption to applications.
. When applications have reconnected to the primary server, the operators restart the backup server.
-Recovery (to using the primary server as master) is a manual operation. Painful experience teaches us that automatic recovery is undesirable. There are several reasons:
+Recovery (to using the primary server as active) is a manual operation. Painful experience teaches us that automatic recovery is undesirable. There are several reasons:
* Failover creates an interruption of service to applications, possibly lasting 10-30 seconds. If there is a real emergency, this is much better than total outage. But if recovery creates a further 10-30 second outage, it is better that this happens off-peak, when users have gone off the network.
@@ -837,7 +877,7 @@ AO: In fact, I think such recoveries are built in to some earlier solutions in C
* The semantics for client applications should be simple and easy for developers to understand. Ideally they should be hidden in the client API.
-* There should be clear instructions for network architects on how to avoid designs that could lead to //split brain syndrome//, in which both servers in a Binary Star pair think they are the master server.
+* There should be clear instructions for network architects on how to avoid designs that could lead to //split brain syndrome//, in which both servers in a Binary Star pair think they are the active server.
* There should be no dependencies on the order in which the two servers are started.
@@ -867,13 +907,13 @@ We don't attempt to cover:
Here is the key terminology we use in Binary Star:
-**Primary**:: The server that is normally 'master'.
+* //Primary// - the server that is normally or initially 'active'.
-**Backup**:: The server that is normally 'slave'. It will become master if and when the primary server disappears from the network, and when client applications ask the backup server to connect.
+* //Backup// - the server that is normally 'passive'. It will become active if and when the primary server disappears from the network, and when client applications ask the backup server to connect.
-**Master**:: The server that accepts client connections. There is at most one master server.
+* //Active// - the server that accepts client connections. There is at most one active server.
-**Slave**:: The server that takes over if the master disappears. Note that when a Binary Star pair is running normally, the primary server is master, and the backup is slave. When a fail-over has happened, the roles are switched.
+* //Passive// - the server that takes over if the active disappears. Note that when a Binary Star pair is running normally, the primary server is active, and the backup is passive. When a fail-over has happened, the roles are switched.
To configure a Binary Star pair, you need to:
@@ -881,7 +921,7 @@ To configure a Binary Star pair, you need to:
# Tell the backup server where the primary server is.
# Optionally, tune the fail-over response times, which must be the same for both servers.
-The main tuning concern is how frequently you want the servers to check their peering status, and how quickly you want to activate fail-over. In our example, the fail-over timeout value defaults to 2,000 msec. If you reduce this, the backup server will take over as master more rapidly but may take over in cases where the primary server could recover. You may for example have wrapped the primary server in a shell script that restarts it if it crashes. In that case the timeout should be higher than the time needed to restart the primary server.
+The main tuning concern is how frequently you want the servers to check their peering status, and how quickly you want to activate fail-over. In our example, the fail-over timeout value defaults to 2,000 msec. If you reduce this, the backup server will take over as active more rapidly but may take over in cases where the primary server could recover. You may for example have wrapped the primary server in a shell script that restarts it if it crashes. In that case the timeout should be higher than the time needed to restart the primary server.
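In code, the two sides of the pair end up as mirror images of each other. Here's a sketch using the endpoints and socket names from the bstarsrv example later in this chapter; the {{frontend}}, {{statepub}}, and {{statesub}} sockets and the {{primary}} flag are assumed to be set up as in that example.

[[code]]
//  Sketch: primary and backup configuration, mirror images of each
//  other (endpoints and socket names as in the bstarsrv example)
if (primary) {
    zsocket_bind (frontend, "tcp://*:5001");              //  Clients connect here
    zsocket_bind (statepub, "tcp://*:5003");              //  We publish our state
    zsocket_connect (statesub, "tcp://localhost:5004");   //  We watch the peer's state
}
else {
    zsocket_bind (frontend, "tcp://*:5002");
    zsocket_bind (statepub, "tcp://*:5004");
    zsocket_connect (statesub, "tcp://localhost:5003");
}
//  Both servers must use the same fail-over timeout, or one side will
//  declare the other dead too early
[[/code]]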
For client applications to work properly with a Binary Star pair, they must:
@@ -901,14 +941,14 @@ These are the main limitations of the Binary Star pattern:
* A server process cannot be part of more than one Binary Star pair.
* A primary server can have a single backup server, no more.
-* The backup server cannot do useful work while in slave mode.
+* Whichever server is passive is wasted.
* The backup server must be capable of handling full application loads.
* Failover configuration cannot be modified at runtime.
* Client applications must do some work to benefit from fail-over.
++++ Preventing Split-Brain Syndrome
-//Split-brain syndrome// occurs when different parts of a cluster think they are "master" at the same time. It causes applications to stop seeing each other. Binary Star has an algorithm for detecting and eliminating split brain, based on a three-way decision mechanism (a server will not decide to become master until it gets application connection requests and it cannot see its peer server).
+//Split-brain syndrome// occurs when different parts of a cluster think they are active at the same time. It causes applications to stop seeing each other. Binary Star has an algorithm for detecting and eliminating split brain, based on a three-way decision mechanism (a server will not decide to become active until it gets application connection requests and it cannot see its peer server).
However it is still possible to (mis)design a network to fool this algorithm. A typical scenario would be a Binary Star pair distributed between two buildings, where each building also had a set of applications, and there was a single network link between both buildings. Breaking this link would create two sets of client applications, each with half of the Binary Star pair, and each fail-over server would become active.
@@ -940,10 +980,7 @@ bstarcli
You can then provoke fail-over by killing the primary server, and recovery by restarting the primary and killing the backup. Note how it's the client vote that triggers fail-over, and recovery.
-////
-AO: Some readers will have black-and-white print books, while others will have black-and-white e-readers. So in addition to color, it's a good idea to use hatching or some other background to distinguish boxes. Our artist can do that, but we should explain what to do in Figure_list.
-////
-Binary star is driven by a finite state machine!figref(). States in green accept client requests, states in pink refuse them. Events are the peer state, so "Peer Active" means the other server has told us it's active. "Client Request" means we've received a client request. "Client Vote" means we've received a client request AND our peer is inactive for two heartbeats.
+Binary Star is driven by a finite state machine!figref(). States in white accept client requests; states in grey refuse them. Events are the peer state, so "Peer Active" means the other server has told us it's active. "Client Request" means we've received a client request. "Client Vote" means we've received a client request AND our peer has been inactive for two heartbeats.
[[code type="textdiagram" title="Binary Star Finite State Machine"]]
Start /-----------------------\ /----------\ Start
@@ -953,7 +990,7 @@ Binary star is driven by a finite state machine!figref(). States in green accept
| | Peer Backup | +------/ | |
| Primary +---------------->| Active |<--------------\ | Backup |
| | /------>| |<-----\ | | |
-| {o} c9FB | | | {o} c9FB | | | | {o} cF9B |
+| {o} | | | {o} | | | | {o} c333 |
+-----+-----+ | +-----+-----+ | | +-----+-----+
| | | | | |
Peer|Active | Peer|Active | | Peer|Active
@@ -962,7 +999,7 @@ Binary star is driven by a finite state machine!figref(). States in green accept
| | | | | | |
| Peer|Backup | Error! | Peer|Primary | |
| | | | | | |
- | | | {o} cF00 | | | |
+ | | | {o} c333 | | | |
| | +-----------+ | | |
| | ^ | | |
| | Peer|Passive | Client|Vote |
@@ -971,7 +1008,7 @@ Binary star is driven by a finite state machine!figref(). States in green accept
| | | +------/ | |
| \-------+ Passive +---------------/ |
\---------------------->| |<-----------------------/
- | {o} cF9B |
+ | {o} c333 |
+-----------+
[[/code]]
@@ -980,10 +1017,8 @@ Note that the servers use PUB-SUB sockets for state exchange. No other socket co
++++ Binary Star Reactor
Binary Star is useful and generic enough to package up as a reusable reactor class. The reactor then runs and calls our code whenever it has a message to process. This is much nicer than copying/pasting the Binary Star code into each server where we want that capability.
-////
-The following is vague: what does zloop offer, how do we use it, and what would differ in other languages?
-////
-In C we wrap the CZMQ {{zloop}} class, though your mileage may vary in other languages. Here is the {{bstar}} API:
+
+In C we wrap the CZMQ {{zloop}} class that we saw before. {{zloop}} lets you register handlers that react to socket and timer events. In the Binary Star reactor, we provide handlers for voters and for state changes (active to passive, and vice versa). Here is the {{bstar}} API:
[[code type="fragment" name="bstar"]]
// Create a new Binary Star instance, using local (bind) and
@@ -1002,8 +1037,8 @@ int bstar_voter (bstar_t *self, char *endpoint, int type,
zloop_fn handler, void *arg);
// Register main state change handlers
-void bstar_new_master (bstar_t *self, zloop_fn handler, void *arg);
-void bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg);
+void bstar_new_active (bstar_t *self, zloop_fn handler, void *arg);
+void bstar_new_passive (bstar_t *self, zloop_fn handler, void *arg);
// Start the reactor, ends if a callback function returns -1, or the
// process received SIGINT or SIGTERM.
@@ -1022,9 +1057,9 @@ Which gives us the following short main program for the server:
+++ Brokerless Reliability (Freelance Pattern)
-It might seem ironic to focus so much on broker-based reliability, when we often explain 0MQ as "brokerless messaging". However in messaging, as in real life, the middleman is both a burden and a benefit. In practice, most messaging architectures benefit from a mix of distributed and brokered messaging. You get the best results when you can decide freely what tradeoffs you want to make. This is why I can drive 10km to a wholesaler to buy five cases of wine for a party, but I can also walk 10 minutes to a corner store to buy one bottle for a dinner. Our highly context-sensitive relative valuations of time, energy, and cost are essential to the real world economy. And they are essential to an optimal message-based architecture.
+It might seem ironic to focus so much on broker-based reliability, when we often explain 0MQ as "brokerless messaging". However in messaging, as in real life, the middleman is both a burden and a benefit. In practice, most messaging architectures benefit from a mix of distributed and brokered messaging. You get the best results when you can decide freely what trade-offs you want to make. This is why I can drive 10km to a wholesaler to buy five cases of wine for a party, but I can also walk 10 minutes to a corner store to buy one bottle for a dinner. Our highly context-sensitive relative valuations of time, energy, and cost are essential to the real world economy. And they are essential to an optimal message-based architecture.
-Which is why 0MQ does not //impose// a broker-centric architecture, though it gives you the tools to build brokers, aka "devices", and we've built a dozen or so different ones so far, just for practice.
+Which is why 0MQ does not //impose// a broker-centric architecture, though it gives you the tools to build brokers, aka //proxies//, and we've built a dozen or so different ones so far, just for practice.
So we'll end this chapter by deconstructing the broker-based reliability we've built so far, and turning it back into a distributed peer-to-peer architecture I call the Freelance pattern. Our use case will be a name resolution service. This is a common problem with 0MQ architectures: how do we know the endpoint to connect to? Hard-coding TCP/IP addresses in code is insanely fragile. Using configuration files creates an administration nightmare. Imagine if you had to hand-configure your web browser, on every PC or mobile phone you used, to realize that "google.com" was "74.125.230.82".
@@ -1116,9 +1151,6 @@ Start one or more servers, specifying a bind endpoint each time:
[[code type="example" title="Freelance server, Model Two" name="flserver2"]]
[[/code]]
-////
-AO: Is the "address" the number sent by the client? If so, that seems an odd term.
-////
Then start the client, specifying the connect endpoints as arguments:
@@ -1149,10 +1181,7 @@ The shotgun approach seems too good to be true. Let's be scientific and work thr
We can solve the main problems of the client by switching to a ROUTER socket. That lets us send requests to specific servers, avoid servers we know are dead, and in general be as smart as we want to make it. We can also solve the main problem of the server (single-threadedness) by switching to a ROUTER socket.
-////
-AO: I added a bit of justification below.
-////
-But doing ROUTER-to-ROUTER between two anonymous sockets (which haven't set an identity) is not possible. Both sides generate an identity (for the other peer) only when they receive a first message, and thus neither can talk to the other until it has first received a message. The only way out of this conundrum is to cheat, and use hard-coded identities in one direction. The proper way to cheat, in a client server case, is to let client 'know' the identity of the server. Doing it the other way around would be insane, on top of complex and nasty, because any number of clients should be able to arise independently. Insane, complex, and nasty are great attributes for a genocidal dictator, but terrible ones for software.
+But doing ROUTER to ROUTER between two anonymous sockets (which haven't set an identity) is not possible. Both sides generate an identity (for the other peer) only when they receive a first message, and thus neither can talk to the other until it has first received a message. The only way out of this conundrum is to cheat, and use hard-coded identities in one direction. The proper way to cheat, in a client-server case, is to let the client 'know' the identity of the server. Doing it the other way around would be insane, on top of complex and nasty, because any number of clients should be able to arise independently. Insane, complex, and nasty are great attributes for a genocidal dictator, but terrible ones for software.
Rather than invent yet another concept to manage, we'll use the connection endpoint as identity. This is a unique string both sides can agree on without more prior knowledge than they already have for the shotgun model. It's a sneaky and effective way to connect two ROUTER sockets.
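Here's the trick in isolation, as a sketch. The endpoint string is illustrative, and both sides live in one process just to keep the fragment short; note that an identity only takes effect if it is set before bind or connect.

[[code]]
//  Sketch: using the connection endpoint as a ROUTER identity
char *endpoint = "tcp://127.0.0.1:5555";    //  Illustrative endpoint
zctx_t *ctx = zctx_new ();

//  Server side: adopt the endpoint as our identity, then bind
void *server = zsocket_new (ctx, ZMQ_ROUTER);
zmq_setsockopt (server, ZMQ_IDENTITY, endpoint, strlen (endpoint));
zsocket_bind (server, "tcp://*:5555");

//  Client side: connect, then address the server by that same string
void *client = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_connect (client, endpoint);
zclock_sleep (100);                 //  Allow the connection handshake to finish
zstr_sendm (client, endpoint);      //  Address frame = server's endpoint
zstr_send (client, "request");      //  Body; the server sees [client id][request]
[[/code]]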
@@ -1165,6 +1194,7 @@ There's a small paradox here. We need to know when servers become connected and
My solution is to mix in a little of the shotgun approach from model 2, meaning we'll fire (harmless) shots at anything we can, and if anything moves, we know it's alive. We're not going to fire real requests, but rather a kind of ping-pong heartbeat.
This brings us to the realm of protocols again, so here's a [http://rfc.zeromq.org/spec:10 short spec that defines how a Freelance client and server exchange PING-PONG commands and request-reply commands].
+
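On the client side, the heartbeat is about as simple as it gets: periodically send a PING, addressed by endpoint, to every server we know about, and treat any reply at all as proof of life. Here's a sketch; {{servers}} as a list of endpoint strings and the client's {{router}} socket are assumptions, and the real client wraps this in a background agent.

[[code]]
//  Sketch: client-side FLP heartbeat. 'servers' is assumed to be a
//  zlist of endpoint strings, 'router' the client's ROUTER socket.
char *endpoint = (char *) zlist_first (servers);
while (endpoint) {
    zstr_sendm (router, endpoint);      //  Address the server by its endpoint
    zstr_send (router, "PING");         //  Any reply counts as liveness
    endpoint = (char *) zlist_next (servers);
}
[[/code]]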
It is short and sweet to implement as a server. Here's our echo server, Model Three, now speaking FLP:
[[code type="example" title="Freelance server, Model Three" name="flserver3"]]
View
8 chapter5.txt
@@ -449,11 +449,11 @@ So, Model Six introduces these changes over Model Five:
* We add heartbeats to server updates (to clients), so that a client can detect when the primary server has died. It can then switch over to the backup server.
-* We connect the two servers using the Binary Star {{bstar}} reactor class. Binary Star relies on the clients to 'vote' by making an explicit request to the server they consider "master". We'll use snapshot requests for this.
+* We connect the two servers using the Binary Star {{bstar}} reactor class. Binary Star relies on the clients to 'vote' by making an explicit request to the server they consider "active". We'll use snapshot requests for this.
* We make all update messages uniquely identifiable by adding a UUID field. The client generates this, and the server propagates it back on re-published updates.
-* The slave server keeps a "pending list" of updates that it has received from clients, but not yet from the master server. Or, updates it's received from the master, but not yet clients. The list is ordered from oldest to newest, so that it is easy to remove updates off the head.
+* The passive server keeps a "pending list" of updates that it has received from clients, but not yet from the active server. Or, updates it's received from the active, but not yet clients. The list is ordered from oldest to newest, so that it is easy to remove updates off the head.
It's useful to design the client logic as a finite state machine. The client cycles through three states:
@@ -503,7 +503,7 @@ Fail-over happens as follows:
When the primary server comes back on-line, it will:
-* Start up as slave server, and connect to the backup server as a Clone client.
+* Start up as passive server, and connect to the backup server as a Clone client.
* Start to receive updates from clients, via its SUB socket.
@@ -611,7 +611,7 @@ This main program is only a few hundred lines of code, but it took some time to
I'm sure the code still has flaws which kind readers will spend weekends debugging and fixing for me. I'm happy enough with this model to use it as the basis for real applications.
-To test the sixth model, start the primary server and backup server, and a set of clients, in any order. Then kill and restart one of the servers, randomly, and keep doing this. If the design and code is accurate, clients will continue to get the same stream of updates from whatever server is currently master.
+To test the sixth model, start the primary server and backup server, and a set of clients, in any order. Then kill and restart one of the servers, randomly, and keep doing this. If the design and code is accurate, clients will continue to get the same stream of updates from whatever server is currently active.
++++ Clone Protocol Specification
View
39 chapter6.txt
@@ -15,7 +15,6 @@ We'll cover:
* How to write and license an protocol specification.
* How to do fast restartable file transfer over 0MQ.
* How to do credit-based flow control.
-* How to do heartbeating for different 0MQ patterns.
* How to build protocol servers and clients as state machines.
* How to make a secure protocol over 0MQ (yay!).
* A large-scale file publishing system (FileMQ).
@@ -936,44 +935,6 @@ To make this work (and we will, my dear readers), we need to be a little more ex
And this gives us "credit-based flow control", which effectively removes the need for HWMs, and any risk of memory overflow.
-+++ Heartbeating
-
-Just as a real protocol needs to solve the problem of flow control, it also needs to solve the problem of knowing whether a peer is alive or dead. This is not an issue specific to 0MQ. TCP has a long timeout (30 minutes or so), that means that it can be impossible to know whether a peer has died, been disconnected, or gone on a weekend to Prague with a case of vodka, a redhead, and a large expense account.
-
-Heartbeating is not easy to get right, and as with flow control it can make the difference between a working, and failing architecture. So using our standard approach, let's start with the simplest possible heartbeat design, and develop better and better designs until we have one with no visible faults.
-
-++++ Shrugging It Off
-
-A decent first iteration is to do no heartbeating at all and see what actually happens. Many if not most 0MQ applications do this. 0MQ encourages this by hiding peers in many cases. What problems does this approach cause?
-
-* When we use a ROUTER socket in an application that tracks peers, as peers disconnect and reconnect, the application will leak memory and get slower and slower.
-
-* When we use SUB or DEALER-based data recipients, we can't tell the difference between good silence (there's no data) and bad silence (the other end died). When a recipient knows the other side died, it can for example switch over to a backup route.
-
-* If we use a TCP connection that stays silent for a long while, it will, in some networks, just die. Sending something (technically, a "keep-alive" more than a heartbeat), will keep the network alive.
-
-++++ One-Way Heartbeats
-
-So, our first solution is to sending a "heartbeat" message from each node to its peers, every second or so. When one node hears nothing from another, within some timeout (several seconds, typically), it will treat that peer as dead. Sounds good, right? Sadly no. This works in some cases but has nasty edge cases in other cases.
-
-For PUB-SUB, this does work, and it's the only model you can use. SUB sockets cannot talk back to PUB sockets, but PUB sockets can happily send "I'm alive" messages to their subscribers.
-
-As an optimization, you can send heartbeats only when there is no real data to send. Furthermore, you can send heartbeats progressively slower and slower, if network activity is an issue (e.g. on mobile networks where activity drains the battery). As long as the recipient can detect a failure (sharp stop in activity), that's fine.
-
-Now the typical problems with this design:
-
-* It can be inaccurate when we send large amounts of data, since heartbeats will be delayed behind that data. If heartbeats are delayed, you can get false timeouts and disconnections due to network congestion. Thus, always treat //any// incoming data as a heartbeat, whether or not the sender optimizes out heartbeats.
-
-* While the PUB-SUB pattern will drop messages for disappeared recipients, PUSH and DEALER sockets will queue them. So, if you send heartbeats to a dead peer, and it comes back, it'll get all the heartbeats you sent. Which can be thousands. Whoa, whoa!
-
-* This design assumes that heartbeat timeouts are the same across the whole network. But that won't be accurate. Some peers will want very aggressive heart-beating, to detect faults rapidly. And some will want very relaxed heart-beating, to let sleeping networks lie, and save power.
-
-++++ Ping-Pong Heartbeats
-
-Our third design uses a ping-pong dialog. One peer sends a ping command to the other, which replies with a pong command. Neither command has any payload. Pings and pongs are not correlated. Since the roles of "client" and "server" are often arbitrary, we specify that either peer can in fact send a ping and expect a pong in response. However, since the timeouts depend on network topologies known best to dynamic clients, it is usually the client which pings the server.
-
-This works for all ROUTER-based brokers. The same optimizations we used in the second model make this work even better: treat any incoming data as a pong, and only send a ping when not otherwise sending data.
-
+++ State Machines
Software engineers tend to treat (finite) state machines as a kind of intermediary interpreter. That is, you take a regular language and compile that into a state machine, then execute the state machine. The state machine itself is rarely visible to the developer: it's an internal representation, optimized, compressed, and bizarre.
View
0  examples/C#/lruqueue.cs → examples/C#/lbbroker.cs
File renamed without changes
View
0  examples/C#/lruqueue2.cs → examples/C#/lbbroker2.cs
File renamed without changes
View
0  examples/C++/lruqueue.cpp → examples/C++/lbbroker.cpp
File renamed without changes
View
8 examples/C/.gitignore
@@ -34,8 +34,8 @@ durapub
durapub2
durasub
identity
-lruqueue
-lruqueue2
+lbbroker
+lbbroker2
peering1
peering2
peering3
@@ -85,7 +85,7 @@ bstarsrv
bstarcli
bstarsrv2
version
-lruqueue3
+lbbroker3
tstkvsimple
espresso
rrworker
@@ -100,4 +100,4 @@ udpping1
udpping2
udpping3
eagain
-
+rtreq
View
8 examples/C/asyncsrv.c
@@ -88,9 +88,9 @@ server_worker (void *args, zctx_t *ctx, void *pipe)
zsocket_connect (worker, "inproc://backend");
while (true) {
- // The DEALER socket gives us the address envelope and message
+ // The DEALER socket gives us the reply envelope and message
zmsg_t *msg = zmsg_recv (worker);
- zframe_t *address = zmsg_pop (msg);
+ zframe_t *identity = zmsg_pop (msg);
zframe_t *content = zmsg_pop (msg);
assert (content);
zmsg_destroy (&msg);
@@ -100,10 +100,10 @@ server_worker (void *args, zctx_t *ctx, void *pipe)
for (reply = 0; reply < replies; reply++) {
// Sleep for some fraction of a second
zclock_sleep (randof (1000) + 1);
- zframe_send (&address, worker, ZFRAME_REUSE + ZFRAME_MORE);
+ zframe_send (&identity, worker, ZFRAME_REUSE + ZFRAME_MORE);
zframe_send (&content, worker, ZFRAME_REUSE);
}
- zframe_destroy (&address);
+ zframe_destroy (&identity);
zframe_destroy (&content);
}
}
View
72 examples/C/bstar.c
@@ -33,10 +33,10 @@ struct _bstar_t {
int64_t peer_expiry; // When peer is considered 'dead'
zloop_fn *voter_fn; // Voting socket handler
void *voter_arg; // Arguments for voting handler
- zloop_fn *master_fn; // Call when become master
- void *master_arg; // Arguments for handler
- zloop_fn *slave_fn; // Call when become slave
- void *slave_arg; // Arguments for handler
+ zloop_fn *active_fn; // Call when become active
+ void *active_arg; // Arguments for handler
+ zloop_fn *passive_fn; // Call when become passive
+ void *passive_arg; // Arguments for handler
};
// The finite-state machine is the same as in the proof-of-concept server.
@@ -59,32 +59,32 @@ s_execute_fsm (bstar_t *self)
// Accepts CLIENT_REQUEST events in this state
if (self->state == STATE_PRIMARY) {
if (self->event == PEER_BACKUP) {
- zclock_log ("I: connected to backup (slave), ready as master");
+ zclock_log ("I: connected to backup (passive), ready as active");
self->state = STATE_ACTIVE;
- if (self->master_fn)
- (self->master_fn) (self->loop, NULL, self->master_arg);
+ if (self->active_fn)
+ (self->active_fn) (self->loop, NULL, self->active_arg);
}
else
if (self->event == PEER_ACTIVE) {
- zclock_log ("I: connected to backup (master), ready as slave");
+ zclock_log ("I: connected to backup (active), ready as passive");
self->state = STATE_PASSIVE;
- if (self->slave_fn)
- (self->slave_fn) (self->loop, NULL, self->slave_arg);
+ if (self->passive_fn)
+ (self->passive_fn) (self->loop, NULL, self->passive_arg);
}
else
if (self->event == CLIENT_REQUEST) {
- // Allow client requests to turn us into the master if we've
+ // Allow client requests to turn us into the active if we've
// waited sufficiently long to believe the backup is not
- // currently acting as master (i.e., after a failover)
+ // currently acting as active (i.e., after a failover)
assert (self->peer_expiry > 0);
if (zclock_time () >= self->peer_expiry) {
- zclock_log ("I: request from client, ready as master");
+ zclock_log ("I: request from client, ready as active");
self->state = STATE_ACTIVE;
- if (self->master_fn)
- (self->master_fn) (self->loop, NULL, self->master_arg);
+ if (self->active_fn)
+ (self->active_fn) (self->loop, NULL, self->active_arg);
} else
// Don't respond to clients yet - it's possible we're
- // performing a failback and the backup is currently master
+ // performing a failback and the backup is currently active
rc = -1;
}
}
@@ -93,10 +93,10 @@ s_execute_fsm (bstar_t *self)
// Rejects CLIENT_REQUEST events in this state
if (self->state == STATE_BACKUP) {
if (self->event == PEER_ACTIVE) {
- zclock_log ("I: connected to primary (master), ready as slave");
+ zclock_log ("I: connected to primary (active), ready as passive");
self->state = STATE_PASSIVE;
- if (self->slave_fn)
- (self->slave_fn) (self->loop, NULL, self->slave_arg);
+ if (self->passive_fn)
+ (self->passive_fn) (self->loop, NULL, self->passive_arg);
}
else
if (self->event == CLIENT_REQUEST)
@@ -108,8 +108,8 @@ s_execute_fsm (bstar_t *self)
// The only way out of ACTIVE is death
if (self->state == STATE_ACTIVE) {
if (self->event == PEER_ACTIVE) {
- // Two masters would mean split-brain
- zclock_log ("E: fatal error - dual masters, aborting");
+ // Two actives would mean split-brain
+ zclock_log ("E: fatal error - dual actives, aborting");
rc = -1;
}
}
@@ -119,29 +119,29 @@ s_execute_fsm (bstar_t *self)
if (self->state == STATE_PASSIVE) {
if (self->event == PEER_PRIMARY) {
// Peer is restarting - become active, peer will go passive
- zclock_log ("I: primary (slave) is restarting, ready as master");
+ zclock_log ("I: primary (passive) is restarting, ready as active");
self->state = STATE_ACTIVE;
}
else
if (self->event == PEER_BACKUP) {
// Peer is restarting - become active, peer will go passive
- zclock_log ("I: backup (slave) is restarting, ready as master");
+ zclock_log ("I: backup (passive) is restarting, ready as active");
self->state = STATE_ACTIVE;
}
else
if (self->event == PEER_PASSIVE) {
// Two passives would mean cluster would be non-responsive
- zclock_log ("E: fatal error - dual slaves, aborting");
+ zclock_log ("E: fatal error - dual passives, aborting");
rc = -1;
}
else
if (self->event == CLIENT_REQUEST) {
- // Peer becomes master if timeout has passed
+ // Peer becomes active if timeout has passed
// It's the client request that triggers the failover
assert (self->peer_expiry > 0);
if (zclock_time () >= self->peer_expiry) {
// If peer is dead, switch to the active state
- zclock_log ("I: failover successful, ready as master");
+ zclock_log ("I: failover successful, ready as active");
self->state = STATE_ACTIVE;
}
else
@@ -149,8 +149,8 @@ s_execute_fsm (bstar_t *self)
rc = -1;
}
// Call state change handler if necessary
- if (self->state == STATE_ACTIVE && self->master_fn)
- (self->master_fn) (self->loop, NULL, self->master_arg);
+ if (self->state == STATE_ACTIVE && self->active_fn)
+ (self->active_fn) (self->loop, NULL, self->active_arg);
}
return rc;
}
@@ -289,19 +289,19 @@ bstar_voter (bstar_t *self, char *endpoint, int type, zloop_fn handler,
// Register handlers to be called each time there's a state change:
void
-bstar_new_master (bstar_t *self, zloop_fn handler, void *arg)
+bstar_new_active (bstar_t *self, zloop_fn handler, void *arg)
{
- assert (!self->master_fn);
- self->master_fn = handler;
- self->master_arg = arg;
+ assert (!self->active_fn);
+ self->active_fn = handler;
+ self->active_arg = arg;
}
void
-bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg)
+bstar_new_passive (bstar_t *self, zloop_fn handler, void *arg)
{
- assert (!self->slave_fn);
- self->slave_fn = handler;
- self->slave_arg = arg;
+ assert (!self->passive_fn);
+ self->passive_fn = handler;
+ self->passive_arg = arg;
}
// .split enable/disable tracing
View
24 examples/C/bstarsrv.c
@@ -46,12 +46,12 @@ s_state_machine (bstar_t *fsm)
// ACTIVE or PASSIVE depending on events we get from our peer:
if (fsm->state == STATE_PRIMARY) {
if (fsm->event == PEER_BACKUP) {
- printf ("I: connected to backup (slave), ready as master\n");
+ printf ("I: connected to backup (passive), ready as active\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_ACTIVE) {
- printf ("I: connected to backup (master), ready as slave\n");
+ printf ("I: connected to backup (active), ready as passive\n");
fsm->state = STATE_PASSIVE;
}
// Accept client connections
@@ -59,7 +59,7 @@ s_state_machine (bstar_t *fsm)
else
if (fsm->state == STATE_BACKUP) {
if (fsm->event == PEER_ACTIVE) {
- printf ("I: connected to primary (master), ready as slave\n");
+ printf ("I: connected to primary (active), ready as passive\n");
fsm->state = STATE_PASSIVE;
}
else
@@ -73,8 +73,8 @@ s_state_machine (bstar_t *fsm)
if (fsm->state == STATE_ACTIVE) {
if (fsm->event == PEER_ACTIVE) {
- // Two masters would mean split-brain
- printf ("E: fatal error - dual masters, aborting\n");
+ // Two actives would mean split-brain
+ printf ("E: fatal error - dual actives, aborting\n");
exception = TRUE;
}
}
@@ -84,29 +84,29 @@ s_state_machine (bstar_t *fsm)
if (fsm->state == STATE_PASSIVE) {
if (fsm->event == PEER_PRIMARY) {
// Peer is restarting - become active, peer will go passive
- printf ("I: primary (slave) is restarting, ready as master\n");
+ printf ("I: primary (passive) is restarting, ready as active\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_BACKUP) {
// Peer is restarting - become active, peer will go passive
- printf ("I: backup (slave) is restarting, ready as master\n");
+ printf ("I: backup (passive) is restarting, ready as active\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_PASSIVE) {
// Two passives would mean cluster would be non-responsive
- printf ("E: fatal error - dual slaves, aborting\n");
+ printf ("E: fatal error - dual passives, aborting\n");
exception = TRUE;
}
else
if (fsm->event == CLIENT_REQUEST) {
- // Peer becomes master if timeout has passed
+ // Peer becomes active if timeout has passed
// It's the client request that triggers the failover
assert (fsm->peer_expiry > 0);
if (zclock_time () >= fsm->peer_expiry) {
// If peer is dead, switch to the active state
- printf ("I: failover successful, ready as master\n");
+ printf ("I: failover successful, ready as active\n");
fsm->state = STATE_ACTIVE;
}
else
@@ -137,7 +137,7 @@ int main (int argc, char *argv [])
bstar_t fsm = { 0 };
if (argc == 2 && streq (argv [1], "-p")) {
- printf ("I: Primary master, waiting for backup (slave)\n");
+ printf ("I: Primary active, waiting for backup (passive)\n");
zsocket_bind (frontend, "tcp://*:5001");
zsocket_bind (statepub, "tcp://*:5003");
zsocket_connect (statesub, "tcp://localhost:5004");
@@ -145,7 +145,7 @@ int main (int argc, char *argv [])
}
else
if (argc == 2 && streq (argv [1], "-b")) {
- printf ("I: Backup slave, waiting for primary (master)\n");
+ printf ("I: Backup passive, waiting for primary (active)\n");
zsocket_bind (frontend, "tcp://*:5002");
zsocket_bind (statepub, "tcp://*:5004");
zsocket_connect (statesub, "tcp://localhost:5003");
View
4 examples/C/bstarsrv2.c
@@ -20,14 +20,14 @@ int main (int argc, char *argv [])
// -b backup server, at tcp://localhost:5002
bstar_t *bstar;
if (argc == 2 && streq (argv [1], "-p")) {
- printf ("I: Primary master, waiting for backup (slave)\n");
+ printf ("I: Primary active, waiting for backup (passive)\n");
bstar = bstar_new (BSTAR_PRIMARY,
"tcp://*:5003", "tcp://localhost:5004");
bstar_voter (bstar, "tcp://*:5001", ZMQ_ROUTER, s_echo, NULL);
}
else
if (argc == 2 && streq (argv [1], "-b")) {
- printf ("I: Backup slave, waiting for primary (master)\n");
+ printf ("I: Backup passive, waiting for primary (active)\n");
bstar = bstar_new (BSTAR_BACKUP,
"tcp://*:5004", "tcp://localhost:5003");
bstar_voter (bstar, "tcp://*:5002", ZMQ_ROUTER, s_echo, NULL);
View
62 examples/C/clonesrv6.c
@@ -10,13 +10,13 @@
// We define a set of reactor handlers and our server object structure:
// Bstar reactor handlers
-static int s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args);
-static int s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args);
-static int s_flush_ttl (zloop_t *loop, zmq_pollitem_t *poller, void *args);
-static int s_send_hugz (zloop_t *loop, zmq_pollitem_t *poller, void *args);
-static int s_new_master (zloop_t *loop, zmq_pollitem_t *poller, void *args);
-static int s_new_slave (zloop_t *loop, zmq_pollitem_t *poller, void *args);
-static int s_subscriber (zloop_t *loop, zmq_pollitem_t *poller, void *args);
+static int s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args);
+static int s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args);
+static int s_flush_ttl (zloop_t *loop, zmq_pollitem_t *poller, void *args);
+static int s_send_hugz (zloop_t *loop, zmq_pollitem_t *poller, void *args);
+static int s_new_active (zloop_t *loop, zmq_pollitem_t *poller, void *args);
+static int s_new_passive (zloop_t *loop, zmq_pollitem_t *poller, void *args);
+static int s_subscriber (zloop_t *loop, zmq_pollitem_t *poller, void *args);
// Our server is defined by these properties
typedef struct {
@@ -31,8 +31,8 @@ typedef struct {
void *subscriber; // Get updates from peer
zlist_t *pending; // Pending updates from clients
Bool primary; // TRUE if we're primary
- Bool master; // TRUE if we're master
- Bool slave; // TRUE if we're slave
+ Bool active; // TRUE if we're active
+ Bool passive; // TRUE if we're passive
} clonesrv_t;
// .split main task setup
@@ -50,7 +50,7 @@ int main (int argc, char *argv [])
{
clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
if (argc == 2 && streq (argv [1], "-p")) {
- zclock_log ("I: primary master, waiting for backup (slave)");
+ zclock_log ("I: primary active, waiting for backup (passive)");
self->bstar = bstar_new (BSTAR_PRIMARY, "tcp://*:5003",
"tcp://localhost:5004");
bstar_voter (self->bstar, "tcp://*:5556", ZMQ_ROUTER, s_snapshots, self);
@@ -60,7 +60,7 @@ int main (int argc, char *argv [])
}
else
if (argc == 2 && streq (argv [1], "-b")) {
- zclock_log ("I: backup slave, waiting for primary (master)");
+ zclock_log ("I: backup passive, waiting for primary (active)");
self->bstar = bstar_new (BSTAR_BACKUP, "tcp://*:5004",
"tcp://localhost:5003");
bstar_voter (self->bstar, "tcp://*:5566", ZMQ_ROUTER, s_snapshots, self);
@@ -73,7 +73,7 @@ int main (int argc, char *argv [])
free (self);
exit (0);
}
- // Primary server will become first master
+ // Primary server will become first active
if (self->primary)
self->kvmap = zhash_new ();
@@ -100,8 +100,8 @@ int main (int argc, char *argv [])
// interrupt:
// Register state change handlers
- bstar_new_master (self->bstar, s_new_master, self);
- bstar_new_slave (self->bstar, s_new_slave, self);
+ bstar_new_active (self->bstar, s_new_active, self);
+ bstar_new_passive (self->bstar, s_new_passive, self);
// Register our other handlers with the bstar reactor
zmq_pollitem_t poller = { self->collector, 0, ZMQ_POLLIN };
@@ -193,8 +193,8 @@ s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args)
// .split collect updates
// The collector is more complex than in the clonesrv5 example since how
-// process updates depends on whether we're master or slave. The master
-// applies them immediately to its kvmap, whereas the slave queues them
+// process updates depends on whether we're active or passive. The active
+// applies them immediately to its kvmap, whereas the passive queues them
// as pending:
// If message was already on pending list, remove it and return TRUE,
@@ -221,7 +221,7 @@ s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args)
kvmsg_t *kvmsg = kvmsg_recv (poller->socket);
if (kvmsg) {
- if (self->master) {
+ if (self->active) {
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_send (kvmsg, self->publisher);
int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl"));
@@ -232,7 +232,7 @@ s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args)
zclock_log ("I: publishing update=%d", (int) self->sequence);
}
else {
- // If we already got message from master, drop it, else
+ // If we already got message from active, drop it, else
// hold on pending list
if (s_was_pending (self, kvmsg))
kvmsg_destroy (&kvmsg);
@@ -279,7 +279,7 @@ s_flush_ttl (zloop_t *loop, zmq_pollitem_t *poller, void *args)
// .split heartbeating
// We send a HUGZ message once a second to all subscribers so that they
// can detect if our server dies. They'll then switch over to the backup
-// server, which will become master:
+// server, which will become active:
static int
s_send_hugz (zloop_t *loop, zmq_pollitem_t *poller, void *args)
@@ -296,17 +296,17 @@ s_send_hugz (zloop_t *loop, zmq_pollitem_t *poller, void *args)
}
// .split handling state changes
-// When we switch from slave to master, we apply our pending list so that
-// our kvmap is up-to-date. When we switch to slave, we wipe our kvmap
-// and grab a new snapshot from the master:
+// When we switch from passive to active, we apply our pending list so that
+// our kvmap is up-to-date. When we switch to passive, we wipe our kvmap
+// and grab a new snapshot from the active:
static int
-s_new_master (zloop_t *loop, zmq_pollitem_t *unused, void *args)
+s_new_active (zloop_t *loop, zmq_pollitem_t *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
- self->master = TRUE;
- self->slave = FALSE;
+ self->active = TRUE;
+ self->passive = FALSE;
// Stop subscribing to updates
zmq_pollitem_t poller = { self->subscriber, 0, ZMQ_POLLIN };
@@ -324,13 +324,13 @@ s_new_master (zloop_t *loop, zmq_pollitem_t *unused, void *args)
}
static int
-s_new_slave (zloop_t *loop, zmq_pollitem_t *unused, void *args)
+s_new_passive (zloop_t *loop, zmq_pollitem_t *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
zhash_destroy (&self->kvmap);
- self->master = FALSE;
- self->slave = TRUE;
+ self->active = FALSE;
+ self->passive = TRUE;
// Start subscribing to updates
zmq_pollitem_t poller = { self->subscriber, 0, ZMQ_POLLIN };
@@ -341,7 +341,7 @@ s_new_slave (zloop_t *loop, zmq_pollitem_t *unused, void *args)
// .split subscriber handler
// When we get an update, we create a new kvmap if necessary, and then
-// add our update to our kvmap. We're always slave in this case:
+// add our update to our kvmap. We're always passive in this case:
static int
s_subscriber (zloop_t *loop, zmq_pollitem_t *poller, void *args)
@@ -377,8 +377,8 @@ s_subscriber (zloop_t *loop, zmq_pollitem_t *poller, void *args)
if (strneq (kvmsg_key (kvmsg), "HUGZ")) {
if (!s_was_pending (self, kvmsg)) {
- // If master update came before client update, flip it
- // around, store master update (with sequence) on pending
+ // If active update came before client update, flip it
+ // around, store active update (with sequence) on pending
// list and use to clear client update when it comes later
zlist_append (self->pending, kvmsg_dup (kvmsg));
}
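
The pending-list check that the collector above relies on (s_was_pending) is easier to see in isolation. Below is a minimal, self-contained sketch of the same idea, assuming only CZMQ's zlist API and using plain string keys in place of the kvmsg UUIDs that clonesrv6 compares; the function and key names are illustrative, not taken from the example.

    //  Minimal sketch of the collector's pending-list check, assuming only
    //  CZMQ's zlist API. Keys are plain strings here rather than the kvmsg
    //  UUIDs used by clonesrv6; names are illustrative.
    #include "czmq.h"

    //  If the key is already on the pending list, remove it and return 1,
    //  else return 0 -- mirroring what s_was_pending does with updates
    static int
    s_pending_check (zlist_t *pending, const char *key)
    {
        char *held = (char *) zlist_first (pending);
        while (held) {
            if (streq (held, key)) {
                zlist_remove (pending, held);
                free (held);
                return 1;
            }
            held = (char *) zlist_next (pending);
        }
        return 0;
    }

    int main (void)
    {
        zlist_t *pending = zlist_new ();
        //  The active server's copy of update-17 arrived first, so it is held
        zlist_append (pending, strdup ("update-17"));

        //  An update we have not yet seen from the active is not pending
        assert (s_pending_check (pending, "update-42") == 0);
        //  The client's own copy of update-17 arrives later and matches
        assert (s_pending_check (pending, "update-17") == 1);

        zlist_destroy (&pending);
        return 0;
    }
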
View
4 examples/C/flserver2.c
@@ -22,11 +22,11 @@ int main (int argc, char *argv [])
// Fail nastily if run against wrong client
assert (zmsg_size (request) == 2);
- zframe_t *address = zmsg_pop (request);
+ zframe_t *identity = zmsg_pop (request);
zmsg_destroy (&request);
zmsg_t *reply = zmsg_new ();
- zmsg_add (reply, address);
+ zmsg_add (reply, identity);
zmsg_addstr (reply, "OK");
zmsg_send (&reply, server);
}
View
4 examples/C/flserver3.c
@@ -29,7 +29,7 @@ int main (int argc, char *argv [])
// Frame 0: identity of client
// Frame 1: PING, or client control frame
// Frame 2: request body
- zframe_t *address = zmsg_pop (request);
+ zframe_t *identity = zmsg_pop (request);
zframe_t *control = zmsg_pop (request);
zmsg_t *reply = zmsg_new ();
if (zframe_streq (control, "PING"))
@@ -39,7 +39,7 @@ int main (int argc, char *argv [])
zmsg_addstr (reply, "OK");
}
zmsg_destroy (&request);
- zmsg_push (reply, address);
+ zmsg_push (reply, identity);
if (verbose && reply)
zmsg_dump (reply);
zmsg_send (&reply, server);
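
The only change in these two servers is the rename from address to identity, which is the more accurate term: the first frame a ROUTER socket delivers is the connection identity of the peer, and any reply must lead with that same frame to be routed back. Here is a minimal sketch of that round-trip, assuming the CZMQ v2-era API these examples use and an illustrative inproc endpoint.

    //  Minimal sketch showing why the first frame is an "identity": the
    //  ROUTER socket prepends it on receive and strips it on send, so the
    //  DEALER peer never sees it. Endpoint name is illustrative.
    #include "czmq.h"

    int main (void)
    {
        zctx_t *ctx = zctx_new ();
        void *router = zsocket_new (ctx, ZMQ_ROUTER);
        void *dealer = zsocket_new (ctx, ZMQ_DEALER);
        zsocket_bind (router, "inproc://identity-demo");
        zsocket_connect (dealer, "inproc://identity-demo");

        zstr_send (dealer, "PING");

        zmsg_t *request = zmsg_recv (router);
        zframe_t *identity = zmsg_pop (request);    //  Added by ROUTER
        char *body = zmsg_popstr (request);
        assert (streq (body, "PING"));
        free (body);
        zmsg_destroy (&request);

        //  The reply must lead with the same identity frame so the ROUTER
        //  socket knows which connection to route it back to
        zmsg_t *reply = zmsg_new ();
        zmsg_add (reply, identity);
        zmsg_addstr (reply, "PONG");
        zmsg_send (&reply, router);

        char *answer = zstr_recv (dealer);    //  Identity already stripped
        assert (streq (answer, "PONG"));
        free (answer);

        zctx_destroy (&ctx);
        return 0;
    }
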
View
46 examples/C/lruqueue.c → examples/C/lbbroker.c
@@ -1,5 +1,5 @@
//
-// Least-recently used (LRU) queue device
+// Load-balancing broker
// Clients and workers are shown here in-process
//
#include "zhelpers.h"
@@ -37,7 +37,7 @@ client_task (void *args)
// While this example runs in a single process, that is just to make
// it easier to start and stop the example. Each thread has its own
// context and conceptually acts as a separate process.
-// This is the worker task, using a REQ socket to do LRU routing.
+// This is the worker task, using a REQ socket to do load-balancing.
// Since s_send and s_recv can't handle 0MQ binary identities we
// set a printable text identity to allow routing.
@@ -55,7 +55,7 @@ worker_task (void *args)
while (1) {
// Read and save all frames until we get an empty frame
// In this example there is only 1 but it could be more
- char *address = s_recv (worker);
+ char *identity = s_recv (worker);
char *empty = s_recv (worker);
assert (*empty == 0);
free (empty);
@@ -65,10 +65,10 @@ worker_task (void *args)
printf ("Worker: %s\n", request);
free (request);
- s_sendmore (worker, address);
+ s_sendmore (worker, identity);
s_sendmore (worker, "");
s_send (worker, "OK");
- free (address);
+ free (identity);
}
zmq_close (worker);
zmq_ctx_destroy (context);
@@ -79,8 +79,8 @@ worker_task (void *args)
// This is the main task. It starts the clients and workers, and then
// routes requests between the two layers. Workers signal READY when
// they start; after that we treat them as ready when they reply with
-// a response back to a client. The LRU data structure is just a queue
-// of next available workers.
+// a response back to a client. The load-balancing data structure is
+// just a queue of next available workers.
int main (void)
{
@@ -108,9 +108,9 @@ int main (void)
// one or more workers ready. This is a neat way to use 0MQ's own queues
// to hold messages we're not ready to process yet. When we get a client
// reply, we pop the next available worker, and send the request to it,
- // including the originating client address. When a worker replies, we
+ // including the originating client identity. When a worker replies, we
// re-queue that worker, and we forward the reply to the original client,
- // using the address envelope.
+ // using the reply envelope.
// Queue of available workers
int available_workers = 0;
@@ -128,41 +128,41 @@ int main (void)
// Handle worker activity on backend
if (items [0].revents & ZMQ_POLLIN) {
- // Queue worker address for LRU routing
- char *worker_addr = s_recv (backend);
+ // Queue worker identity for load-balancing
+ char *worker_id = s_recv (backend);
assert (available_workers < NBR_WORKERS);
- worker_queue [available_workers++] = worker_addr;
+ worker_queue [available_workers++] = worker_id;
// Second frame is empty
char *empty = s_recv (backend);
assert (empty [0] == 0);
free (empty);
- // Third frame is READY or else a client reply address
- char *client_addr = s_recv (backend);
+ // Third frame is READY or else a client reply identity
+ char *client_id = s_recv (backend);
// If client reply, send rest back to frontend
- if (strcmp (client_addr, "READY") != 0) {
+ if (strcmp (client_id, "READY") != 0) {
empty = s_recv (backend);
assert (empty [0] == 0);
free (empty);
char *reply = s_recv (backend);
- s_sendmore (frontend, client_addr);
+ s_sendmore (frontend, client_id);
s_sendmore (frontend, "");
s_send (frontend, reply);
free (reply);
if (--client_nbr == 0)
break; // Exit after N messages
}
- free (client_addr);
+ free (client_id);
}
// .split handling a client request
// Here is how we handle a client request:
if (items [1].revents & ZMQ_POLLIN) {
- // Now get next client request, route to LRU worker
- // Client request is [address][empty][request]
- char *client_addr = s_recv (frontend);
+ // Now get next client request, route to next available worker
+ // Client request is [identity][empty][request]
+ char *client_id = s_recv (frontend);
char *empty = s_recv (frontend);
assert (empty [0] == 0);
free (empty);
@@ -170,14 +170,14 @@ int main (void)
s_sendmore (backend, worker_queue [0]);
s_sendmore (backend, "");
- s_sendmore (backend, client_addr);
+ s_sendmore (backend, client_id);
s_sendmore (backend, "");
s_send (backend, request);
- free (client_addr);
+ free (client_id);
free (request);
- // Dequeue and drop the next worker address
+ // Dequeue and drop the next worker identity
free (worker_queue [0]);
DEQUEUE (worker_queue);
available_workers--;
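
The comment above calls the load-balancing data structure "just a queue of next available workers", and that is literally all it is: an array used FIFO-style, so each request goes to the worker that has been idle longest. Here is a tiny stand-alone sketch of that queue behaviour, with the DEQUEUE macro re-declared for the sketch and illustrative worker names.

    //  Stand-alone sketch of the worker queue used by lbbroker.c: workers
    //  are appended at the tail when they report ready, requests are served
    //  from the head. DEQUEUE is re-declared here; names are illustrative.
    #include <assert.h>
    #include <stdio.h>
    #include <string.h>

    #define NBR_WORKERS 3
    #define DEQUEUE(queue) memmove (&(queue) [0], &(queue) [1], \
        sizeof (queue) - sizeof (queue [0]))

    int main (void)
    {
        char *worker_queue [NBR_WORKERS];
        int available_workers = 0;

        //  Workers signal readiness in this order
        worker_queue [available_workers++] = "worker-A";
        worker_queue [available_workers++] = "worker-B";
        worker_queue [available_workers++] = "worker-C";

        //  Each request goes to the head of the queue, the longest-idle worker
        assert (strcmp (worker_queue [0], "worker-A") == 0);
        DEQUEUE (worker_queue);
        available_workers--;

        assert (strcmp (worker_queue [0], "worker-B") == 0);
        DEQUEUE (worker_queue);
        available_workers--;

        //  worker-A finishes and reports ready again: it rejoins at the tail
        worker_queue [available_workers++] = "worker-A";
        assert (strcmp (worker_queue [0], "worker-C") == 0);

        printf ("next worker: %s\n", worker_queue [0]);
        return 0;
    }
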
View
22 examples/C/lruqueue2.c → examples/C/lbbroker2.c
@@ -1,12 +1,12 @@
//
-// Least-recently used (LRU) queue device
+// Load-balancing broker
// Demonstrates use of the CZMQ API
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
-#define LRU_READY "\001" // Signals worker is ready
+#define WORKER_READY "\001" // Signals worker is ready
// Basic request-reply client using REQ socket
//
@@ -31,7 +31,7 @@ client_task (void *args)
return NULL;
}
-// Worker using REQ socket to do LRU routing
+// Worker using REQ socket to do load-balancing
//
static void *
worker_task (void *args)
@@ -41,7 +41,7 @@ worker_task (void *args)
zsocket_connect (worker, "ipc://backend.ipc");
// Tell broker we're ready for work
- zframe_t *frame = zframe_new (LRU_READY, 1);
+ zframe_t *frame = zframe_new (WORKER_READY, 1);
zframe_send (&frame, worker, 0);
// Process messages as they arrive
@@ -58,7 +58,7 @@ worker_task (void *args)
// .split main task
// Now we come to the main task. This has the identical functionality to
-// the previous lruqueue example but uses CZMQ to start child threads,
+// the previous lbbroker example but uses CZMQ to start child threads,
// to hold the list of workers, and to read and send messages:
int main (void)
@@ -79,8 +79,8 @@ int main (void)
// Queue of available workers
zlist_t *workers = zlist_new ();
- // .split main LRU queue loop
- // Here is the main loop for the LRU queue. It works the same way
+ // .split main load-balancer loop
+ // Here is the main loop for the load-balancer. It works the same way
// as the previous example, but is a lot shorter because CZMQ gives
// us an API that does more with fewer calls:
while (true) {
@@ -95,16 +95,16 @@ int main (void)
// Handle worker activity on backend
if (items [0].revents & ZMQ_POLLIN) {
- // Use worker address for LRU routing
+ // Use worker identity for load-balancing
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // Interrupted
- zframe_t *address = zmsg_unwrap (msg);
- zlist_append (workers, address);
+ zframe_t *identity = zmsg_unwrap (msg);
+ zlist_append (workers, identity);
// Forward message to client if it's not a READY
zframe_t *frame = zmsg_first (msg);
- if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
+ if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0)
zmsg_destroy (&msg);
else
zmsg_send (&msg, frontend);
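
Most of the brevity in this version comes from two CZMQ calls: zmsg_unwrap (), which pops the identity frame (and the empty delimiter under it) off a message, and zmsg_wrap (), which pushes one back on. A minimal round-trip, independent of any socket, assuming the CZMQ v2-era API used by these examples:

    //  Minimal sketch of the envelope handling used above: zmsg_wrap ()
    //  pushes an identity frame plus empty delimiter onto a message,
    //  zmsg_unwrap () takes them off again.
    #include "czmq.h"

    int main (void)
    {
        zmsg_t *msg = zmsg_new ();
        zmsg_addstr (msg, "Hello");

        //  Wrap with a worker identity, as the broker does before routing
        zframe_t *identity = zframe_new ("worker-1", 8);
        zmsg_wrap (msg, identity);
        assert (zmsg_size (msg) == 3);      //  [identity][empty][body]

        //  Unwrap, as the broker does when the worker's reply comes back
        zframe_t *unwrapped = zmsg_unwrap (msg);
        assert (zframe_streq (unwrapped, "worker-1"));
        assert (zmsg_size (msg) == 1);      //  Just the body again

        zframe_destroy (&unwrapped);
        zmsg_destroy (&msg);
        return 0;
    }
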
View
26 examples/C/lruqueue3.c → examples/C/lbbroker3.c
@@ -1,5 +1,5 @@
//
-// Least-recently used (LRU) queue device
+// Load-balancing broker
// Demonstrates use of the CZMQ API and reactor style
//
// The client and worker tasks are identical from the previous example.
@@ -9,7 +9,7 @@
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
-#define LRU_READY "\001" // Signals worker is ready
+#define WORKER_READY "\001" // Signals worker is ready
// Basic request-reply client using REQ socket
//
@@ -34,7 +34,7 @@ client_task (void *args)
return NULL;
}
-// Worker using REQ socket to do LRU routing
+// Worker using REQ socket to do load-balancing
//
static void *
worker_task (void *args)
@@ -44,7 +44,7 @@ worker_task (void *args)
zsocket_connect (worker, "ipc://backend.ipc");
// Tell broker we're ready for work
- zframe_t *frame = zframe_new (LRU_READY, 1);
+ zframe_t *frame = zframe_new (WORKER_READY, 1);
zframe_send (&frame, worker, 0);
// Process messages as they arrive
@@ -61,12 +61,12 @@ worker_task (void *args)
}
// .until
-// Our LRU queue structure, passed to reactor handlers
+// Our load-balancer structure, passed to reactor handlers
typedef struct {
void *frontend; // Listen to clients
void *backend; // Listen to workers
zlist_t *workers; // List of ready workers
-} lruqueue_t;
+} lbbroker_t;
// .split reactor design
@@ -77,7 +77,7 @@ typedef struct {
// Handle input from client, on frontend
int s_handle_frontend (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
- lruqueue_t *self = (lruqueue_t *) arg;
+ lbbroker_t *self = (lbbroker_t *) arg;
zmsg_t *msg = zmsg_recv (self->frontend);
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (self->workers));
@@ -95,12 +95,12 @@ int s_handle_frontend (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
// Handle input from worker, on backend
int s_handle_backend (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
- // Use worker address for LRU routing
- lruqueue_t *self = (lruqueue_t *) arg;
+ // Use worker identity for load-balancing
+ lbbroker_t *self = (lbbroker_t *) arg;
zmsg_t *msg = zmsg_recv (self->backend);
if (msg) {
- zframe_t *address = zmsg_unwrap (msg);
- zlist_append (self->workers, address);
+ zframe_t *identity = zmsg_unwrap (msg);
+ zlist_append (self->workers, identity);
// Enable reader on frontend if we went from 0 to 1 workers
if (zlist_size (self->workers) == 1) {
@@ -109,7 +109,7 @@ int s_handle_backend (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
}
// Forward message to client if it's not a READY
zframe_t *frame = zmsg_first (msg);
- if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
+ if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0)
zmsg_destroy (&msg);
else
zmsg_send (&msg, self->frontend);
@@ -126,7 +126,7 @@ int s_handle_backend (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
int main (void)
{
zctx_t *ctx = zctx_new ();
- lruqueue_t *self = (lruqueue_t *) zmalloc (sizeof (lruqueue_t));
+ lbbroker_t *self = (lbbroker_t *) zmalloc (sizeof (lbbroker_t));
self->frontend = zsocket_new (ctx, ZMQ_ROUTER);
self->backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (self->frontend, "ipc://frontend.ipc");
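
The reactor version's one subtlety is hinted at in s_handle_backend: the frontend poller is switched off when the last ready worker is taken and switched back on when a worker becomes ready again, so the reactor never reads requests it cannot route. Here is a minimal sketch of that on/off trick, assuming the CZMQ v2-era zloop API and an illustrative PUSH/PULL pair.

    //  Minimal sketch of pausing and resuming a reactor poller with
    //  zloop_poller_end () and zloop_poller (), as lbbroker3 does for its
    //  frontend. Socket types and endpoint are illustrative.
    #include "czmq.h"

    static int
    s_on_input (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
    {
        char *msg = zstr_recv (poller->socket);
        printf ("got: %s\n", msg);
        free (msg);
        //  Pretend we have no capacity left: stop reading this socket
        zloop_poller_end (loop, poller);
        return 0;
    }

    static int
    s_on_timer (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
    {
        //  Capacity is back (a worker became ready): resume reading,
        //  then return -1 so this sketch's reactor terminates
        zloop_poller (loop, (zmq_pollitem_t *) arg, s_on_input, arg);
        return -1;
    }

    int main (void)
    {
        zctx_t *ctx = zctx_new ();
        void *input = zsocket_new (ctx, ZMQ_PULL);
        void *output = zsocket_new (ctx, ZMQ_PUSH);
        zsocket_bind (input, "inproc://reactor-demo");
        zsocket_connect (output, "inproc://reactor-demo");
        zstr_send (output, "request");

        zloop_t *reactor = zloop_new ();
        zmq_pollitem_t poller = { input, 0, ZMQ_POLLIN };
        zloop_poller (reactor, &poller, s_on_input, &poller);
        zloop_timer (reactor, 100, 1, s_on_timer, &poller);
        zloop_start (reactor);

        zloop_destroy (&reactor);
        zctx_destroy (&ctx);
        return 0;
    }
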
View
44 examples/C/mdbroker.c
@@ -62,14 +62,14 @@ static void
typedef struct {
broker_t *broker; // Broker instance
- char *identity; // Identity of worker
- zframe_t *address; // Address frame to route to
+ char *id_string; // Identity of worker as string
+ zframe_t *identity; // Identity frame for routing
service_t *service; // Owning service, if known
int64_t expiry; // Expires at unless heartbeat
} worker_t;
static worker_t *
- s_worker_require (broker_t *self, zframe_t *address);
+ s_worker_require (broker_t *self, zframe_t *identity);
static void
s_worker_delete (worker_t *self, int disconnect);
static void
@@ -136,9 +136,9 @@ s_broker_worker_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
assert (zmsg_size (msg) >= 1); // At least, command
zframe_t *command = zmsg_pop (msg);
- char *identity = zframe_strhex (sender);
- int worker_ready = (zhash_lookup (self->workers, identity) != NULL);
- free (identity);
+ char *id_string = zframe_strhex (sender);
+ int worker_ready = (zhash_lookup (self->workers, id_string) != NULL);
+ free (id_string);
worker_t *worker = s_worker_require (self, sender);
if (zframe_streq (command, MDPW_READY)) {
@@ -202,7 +202,7 @@ s_broker_client_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
zframe_t *service_frame = zmsg_pop (msg);
service_t *service = s_service_require (self, service_frame);
- // Set reply return address to client sender
+ // Set reply return identity to client sender
zmsg_wrap (msg, zframe_dup (sender));
// If we got a MMI service request, process that internally
@@ -251,7 +251,7 @@ s_broker_purge (broker_t *self)
break; // Worker is alive, we're done here
if (self->verbose)
zclock_log ("I: deleting expired worker: %s",
- worker->identity);
+ worker->id_string);
s_worker_delete (worker, 0);
worker = (worker_t *) zlist_first (self->waiting);
@@ -333,27 +333,27 @@ s_service_dispatch (service_t *self, zmsg_t *msg)
// worker if there is no worker already with that identity.
static worker_t *
-s_worker_require (broker_t *self, zframe_t *address)
+s_worker_require (broker_t *self, zframe_t *identity)
{
- assert (address);
+ assert (identity);
// self->workers is keyed off worker identity
- char *identity = zframe_strhex (address);
+ char *id_string = zframe_strhex (identity);
worker_t *worker =
- (worker_t *) zhash_lookup (self->workers, identity);
+ (worker_t *) zhash_lookup (self->workers, id_string);
if (worker == NULL) {