.output chapter4.wd
++ Chapter 4 - Reliability Patterns
In Chapter Three we looked at advanced use of 0MQ's request-reply pattern with worked examples. In this chapter we'll look at the general question of reliability and build a set of reliable messaging patterns on top of 0MQ's core patterns.
We'll cover:
* How we define 'reliability'.
* The types of failures we will experience in 0MQ applications.
* How to implement reliability on top of the 0MQ core patterns.
* How to implement heartbeating between 0MQ peers.
* How to write a reusable protocol specification.
* How to design a service-oriented framework API.
In this chapter we focus heavily on user-space 'patterns', which are reusable models that help you design your 0MQ architecture:
* The //Suicidal Snail// pattern: how to handle slow clients.
* The //Lazy Pirate// pattern: reliable request reply from the client side.
* The //Simple Pirate// pattern: reliable request-reply using a LRU queue.
* The //Paranoid Pirate// pattern: reliable request-reply with heartbeating.
* The //Majordomo// pattern: service-oriented reliable queuing.
* The //Titanic// pattern: disk-based reliable queuing.
* The //Freelance// pattern: brokerless reliable request-reply.
* The //Clone// pattern: reliable publish-subscribe using distributed key-value tables.
+++ What is "Reliability"?
To understand what 'reliability' means, we have to look at its opposite, namely //failure//. If we can handle a certain set of failures, we are reliable with respect to those failures. No more, no less. So let's look at the possible causes of failure in a distributed 0MQ application, in roughly descending order of probability:
* Application code is the worst offender. It can crash and exit, freeze and stop responding to input, run too slowly for its input, exhaust all memory, etc.
* System code - like brokers we write using 0MQ - can die. System code should be more reliable than application code but can still crash and burn, and especially run out of memory if it tries to compensate for slow clients.
* Message queues can overflow, typically in system code that has learned to deal brutally with slow clients. When a queue overflows, it starts to discard messages.
* Networks can fail temporarily, causing intermittent message loss. Such failures are hidden from 0MQ applications, since 0MQ automatically reconnects peers after a network-forced disconnection.
* Hardware can fail and take with it all the processes running on that box.
* Networks can fail in exotic ways, e.g. some ports on a switch may die and those parts of the network become inaccessible.
* Entire data centers can be struck by lightning, earthquakes, fire, or more mundane power or cooling failures.
To make a software system fully reliable against //all// of these possible failures is an enormously difficult and expensive job and goes beyond the scope of this modest guide.
Since the first five cases cover 99.9% of real world requirements outside large companies (according to a highly scientific study I just ran), that's what we'll look at. If you're a large company with money to spend on the last two cases, contact me immediately! There's a large hole behind my beach house waiting to be converted into a pool.
++++ The Slow Client Problem
In any high-volume architecture (data or workload distribution), applications need to be able to keep up with incoming data. The problem is that application developers too often don't have the skills to write fast code, or use languages that are inherently slow, or deploy to boxes that can easily run very slowly. Even a fast, well-written client application can appear to run "slowly" if the network is congested, or the application gets temporarily disconnected from the server.
Handling slow clients correctly is delicate. On the one hand, you really don't want to face application developers with an excuse like "sorry, our messaging layer lost your message somewhere". On the other hand, if you allow message queues to build up, especially in publishers that handle many clients, things just break.
0MQ does two things to handle the slow client problem:
* It moves messages as rapidly as possible to the client, and queues them there. In all the asynchronous messaging patterns (that is, all except synchronous request-reply), messages are sent to their destination without pause. By itself, this strategy avoids the bulk of queue overflow problems. If the client application runs out of memory, we don't really care.
* For cases where network congestion or client disconnection stops the sender getting rid of messages, 0MQ offers a "high water mark" that limits the size of a given socket queue. Since each use case has its own needs, 0MQ lets you set this per outgoing socket. When a queue hits the HWM, messages are just dropped. It's brutal, but there is no other sane strategy.
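As a hedged sketch (against the 0MQ 2.x C API; the endpoint and the 1,000-message limit are illustrative), setting a high water mark looks like this:

```c
#include <stdint.h>
#include <zmq.h>

//  Sketch: create a publisher whose outgoing queue is capped at 1,000
//  messages. Once the queue is full, further messages are dropped.
//  Note: the option must be set before bind/connect to take effect.
static void *
s_capped_publisher (void *context)
{
    void *publisher = zmq_socket (context, ZMQ_PUB);
    uint64_t hwm = 1000;        //  ZMQ_HWM takes a uint64_t in 0MQ 2.x
    zmq_setsockopt (publisher, ZMQ_HWM, &hwm, sizeof (hwm));
    zmq_bind (publisher, "tcp://*:5556");    //  Illustrative endpoint
    return publisher;
}
```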
++++ Designing Reliability
So to make things brutally simple, reliability is "keeping things working properly when code freezes or crashes", a situation we'll shorten to "dies". However, the things we want to keep working properly are more complex than just messages. We need to take each core 0MQ messaging pattern and see how to make it work (if we can) even when code dies.
Let's take them one by one:
* Request-reply: if the server dies (while processing a request), the client can figure that out since it won't get an answer back. Then it can give up in a huff, wait and try again later, find another server, etc. As for the client dying, we can brush that off as "someone else's problem" for now.
* Publish-subscribe: if the client dies (having gotten some data), the server doesn't know about it. Pubsub doesn't send any information back from client to server. But the client can contact the server out-of-band, e.g. via request-reply, and ask, "please resend everything I missed". As for the server dying, that's out of scope for here. Subscribers can also self-verify that they're not running too slowly, and take action (e.g. warn the operator, and die) if they are.
* Pipeline: if a worker dies (while working), the ventilator doesn't know about it. Pipelines, like pubsub, and the grinding gears of time, only work in one direction. But the downstream collector can detect that one task didn't get done, and send a message back to the ventilator saying, "hey, resend task 324!" If the ventilator or collector die, then whatever upstream client originally sent the work batch can get tired of waiting and resend the whole lot. It's not elegant but system code should really not die often enough to matter.
+++ Slow Subscriber Detection (Suicidal Snail Pattern)
One of the most common problems you will hit when using the pubsub pattern is the slow subscriber. In an ideal world, we stream data at full speed from publishers to subscribers. In reality, subscriber applications are often written in interpreted languages, or just do a lot of work, or are just badly written, to the extent that they can't keep up with publishers.
How do we handle a slow subscriber? The ideal fix is to make the subscriber faster, but that might take work and time. Some of the classic strategies for handling a slow subscriber are:
* **Queue messages on the publisher**. This is what Gmail does when I don't read my email for a couple of hours. But in high-volume messaging, pushing queues upstream has the thrilling but unprofitable result of making publishers run out of memory and crash. Especially if there are lots of subscribers and it's not possible to flush to disk for performance reasons.
* **Queue messages on the subscriber**. This is much better, and it's what 0MQ does by default if the network can keep up with things. If anyone's going to run out of memory and crash, it'll be the subscriber rather than the publisher, which is fair. This is perfect for "peaky" streams where a subscriber can't keep up for a while, but can catch up when the stream slows down. However it's no answer to a subscriber that's simply too slow in general.
* **Stop queuing new messages after a while**. This is what Gmail does when my mailbox overflows its 7.554GB, no 7.555GB of space. New messages just get rejected or dropped. This is a great strategy from the perspective of the publisher, and it's what 0MQ does when the publisher sets a high water mark or HWM. However it still doesn't help us fix the slow subscriber. Now we just get gaps in our message stream.
* **Punish slow subscribers with disconnect**. This is what Hotmail does when I don't login for two weeks, which is why I'm on my fifteenth Hotmail account. It's a nice brutal strategy that forces subscribers to sit up and pay attention, and would be ideal, but 0MQ doesn't do this, and there's no way to layer it on top since subscribers are invisible to publisher applications.
None of these classic strategies fit. So we need to get creative. Rather than disconnect the publisher, let's convince the subscriber to kill itself. This is the Suicidal Snail pattern. When a subscriber detects that it's running too slowly (where "too slowly" is presumably a configured option that really means "so slowly that if you ever get here, shout really loudly because I need to know, so I can fix this!"), it croaks and dies.
How can a subscriber detect this? One way would be to sequence messages (number them in order), and use a HWM at the publisher. Now, if the subscriber detects a gap (i.e. the numbering isn't consecutive), it knows something is wrong. We then tune the HWM to the "croak and die if you hit this" level.
There are two problems with this solution. First, if we have many publishers, how do we sequence messages? The solution is to give each publisher a unique ID and add that to the sequencing. Second, if subscribers use ZMQ_SUBSCRIBE filters, they will get gaps by definition. Our precious sequencing will be for nothing.
Some use cases won't use filters, and sequencing will work for them. But a more general solution is that the publisher timestamps each message. When a subscriber gets a message it checks the time, and if the difference is more than, say, one second, it does the "croak and die" thing. Possibly firing off a squawk to some operator console first.
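The timestamp check itself is trivial. Here is a minimal sketch (the function name and threshold handling are illustrative, not from any 0MQ API):

```c
#include <stdint.h>

//  Sketch: decide whether this subscriber has fallen too far behind.
//  'now' and 'sent' are millisecond clocks (e.g. from s_clock ());
//  'max_latency' is the configured "croak and die" threshold.
static int
s_too_slow (int64_t now, int64_t sent, int64_t max_latency)
{
    return (now - sent) > max_latency;
}
```

A subscriber would run this check on each message and, if it returns true, squawk to the operator console and exit.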
The Suicidal Snail pattern works especially well when subscribers have their own clients and service-level agreements and need to guarantee certain maximum latencies. Aborting a subscriber may not seem like a constructive way to guarantee a maximum latency, but it's the assertion model. Abort today, and the problem will be fixed. Allow late data to flow downstream, and the problem may cause wider damage and take longer to appear on the radar.
So here is a minimal example of a Suicidal Snail:
[[code type="example" title="Suicidal Snail" name="suisnail"]]
[[/code]]
Notes about this example:
* The message here consists simply of the current system clock as a number of milliseconds. In a realistic application you'd have at least a message header with the timestamp, and a message body with data.
* The example has subscriber and publisher in a single process, as two threads. In reality they would be separate processes. Using threads is just convenient for the demonstration.
+++ Reliable Request-Reply (Pirate Pattern)
The basic request-reply pattern (a REQ client socket doing a blocking send/recv to a REP server socket) scores low on handling the most common types of failure. If the server crashes while processing the request, the client just hangs forever. If the network loses the request or the reply, the client hangs forever.
It is a lot better than TCP, thanks to 0MQ's ability to reconnect peers silently, to load-balance messages, and so on. But it's still not good enough for real work. The only use case where you can trust the basic request-reply pattern is between two threads in the same process where there's no network or separate server process to die.
However, with a little extra work this humble pattern becomes a good basis for real work across a distributed network, and we get a reliable request-reply pattern I like to call the "Pirate" pattern. RRR!
There are, roughly, three ways to connect clients to servers, each needing a specific approach to reliability:
* Multiple clients talking directly to a single server. Use case: single well-known server that clients need to talk to. Types of failure we aim to handle: server crashes and restarts, network disconnects.
* Multiple clients talking to a single queue device that distributes work to multiple servers. Use case: workload distribution to workers. Types of failure we aim to handle: worker crashes and restarts, worker busy looping, worker overload, queue crashes and restarts, network disconnects.
* Multiple clients talking to multiple servers with no intermediary devices. Use case: distributed services such as name resolution. Types of failure we aim to handle: service crashes and restarts, service busy looping, service overload, network disconnects.
Each of these has its trade-offs, and often you'll mix them. We'll look at all three in detail.
++++ Client-side Reliability (Lazy Pirate Pattern)
We can get very simple reliable request-reply with only some changes in the client. We call this the Lazy Pirate pattern. Rather than doing a blocking receive, we:
* Poll the REQ socket and only receive from it when we're sure a reply has arrived.
* Resend the request several times, if no reply arrives within a timeout period.
* Abandon the transaction if, after several requests, there is still no reply.
[[code type="textdiagram"]]
+-----------+   +-----------+   +-----------+
|  Client   |   |  Client   |   |  Client   |
+-----------+   +-----------+   +-----------+
|   Retry   |   |   Retry   |   |   Retry   |
+-----------+   +-----------+   +-----------+
|    REQ    |   |    REQ    |   |    REQ    |
\-----------/   \-----------/   \-----------/
      ^               ^               ^
      |               |               |
      \---------------+---------------/
                      |
                      v
               /-------------\
               |     REP     |
               +-------------+
               |             |
               |   Server    |
               |             |
               +-------------+
Figure # - Lazy Pirate pattern
[[/code]]
If you try to use a REQ socket in anything other than a strict send-recv fashion, you'll get an EFSM error. This is slightly annoying when we want to use REQ in a pirate pattern, because we may send several requests before getting a reply. The pretty good brute-force solution is to close and reopen the REQ socket after an error:
[[code type="example" title="Lazy Pirate client" name="lpclient"]]
[[/code]]
Run this together with the matching server:
[[code type="example" title="Lazy Pirate server" name="lpserver"]]
[[/code]]
To run this testcase, start the client and the server in two console windows. The server will randomly misbehave after a few messages. You can check the client's response. Here is a typical output from the server:
[[code]]
I: normal request (1)
I: normal request (2)
I: normal request (3)
I: simulating CPU overload
I: normal request (4)
I: simulating a crash
[[/code]]
And here is the client's response:
[[code]]
I: connecting to server...
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
W: no response from server, retrying...
I: connecting to server...
W: no response from server, retrying...
I: connecting to server...
E: server seems to be offline, abandoning
[[/code]]
The client sequences each message, and checks that replies come back exactly in order: that no requests or replies are lost, and no replies come back more than once, or out of order. Run the test a few times until you're convinced this mechanism actually works.
The client uses a REQ socket, and does the brute-force close/reopen because REQ sockets impose a strict send/receive cycle. You might be tempted to use an XREQ instead, but it would not be a good decision. First, it would mean emulating the secret sauce that REQ does with envelopes (if you've forgotten what that is, it's a good sign you don't want to have to do it). Second, it would mean potentially getting back replies that you didn't expect.
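For reference, the brute-force reset is only a few lines (a sketch against the 0MQ 2.x C API; the SERVER_ENDPOINT name is illustrative):

```c
#include <zmq.h>

#define SERVER_ENDPOINT "tcp://localhost:5555"    //  Illustrative

//  Sketch: abandon the in-flight request and reset the REQ send/receive
//  state machine by destroying the socket and creating a fresh one
static void *
s_client_socket_reset (void *context, void *client)
{
    int linger = 0;         //  Don't wait around for the pending request
    zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger));
    zmq_close (client);
    client = zmq_socket (context, ZMQ_REQ);
    zmq_connect (client, SERVER_ENDPOINT);
    return client;
}
```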
Handling failures only at the client works when we have a set of clients talking to a single server. It can handle a server crash, but only if recovery means restarting that same server. If there's a permanent error - e.g. a dead power supply on the server hardware - this approach won't work. Since the application code in servers is usually the biggest source of failures in any architecture, depending on a single server is not a great idea.
So, pros and cons:
* Pro: simple to understand and implement.
* Pro: works easily with existing client and server application code.
* Pro: 0MQ automatically retries the actual reconnection until it works.
* Con: doesn't do failover to backup / alternate servers.
++++ Basic Reliable Queuing (Simple Pirate Pattern)
Our second approach takes the Lazy Pirate pattern and extends it with a queue device that lets us talk, transparently, to multiple servers, which we can more accurately call 'workers'. We'll develop this in stages, starting with a minimal working model, the Simple Pirate pattern.
In all these Pirate patterns, workers are stateless, or have some shared state we don't know about, e.g. a shared database. Having a queue device means workers can come and go without clients knowing anything about it. If one worker dies, another takes over. This is a nice simple topology with only one real weakness, namely the central queue itself, which can become a problem to manage, and a single point of failure.
The basis for the queue device is the least-recently-used (LRU) routing queue from Chapter 3. What is the very //minimum// we need to do to handle dead or blocked workers? Turns out, it's surprisingly little. We already have a retry mechanism in the client. So using the standard LRU queue will work pretty well. This fits with 0MQ's philosophy that we can extend a peer-to-peer pattern like request-reply by plugging naive devices in the middle:
[[code type="textdiagram"]]
+-----------+   +-----------+   +-----------+
|  Client   |   |  Client   |   |  Client   |
+-----------+   +-----------+   +-----------+
|   Retry   |   |   Retry   |   |   Retry   |
+-----------+   +-----------+   +-----------+
|    REQ    |   |    REQ    |   |    REQ    |
\-----------/   \-----------/   \-----------/
      ^               ^               ^
      |               |               |
      \---------------+---------------/
                      |
                      v
                /-----------\
                |   XREP    |
                +-----------+
                |    LRU    |
                |   Queue   |
                +-----------+
                |   XREP    |
                \-----------/
                      ^
                      |
      /---------------+---------------\
      |               |               |
      v               v               v
/-----------\   /-----------\   /-----------\
|    REQ    |   |    REQ    |   |    REQ    |
+-----------+   +-----------+   +-----------+
|    LRU    |   |    LRU    |   |    LRU    |
|  Worker   |   |  Worker   |   |  Worker   |
+-----------+   +-----------+   +-----------+
Figure # - Simple Pirate Pattern
[[/code]]
We don't need a special client, we're still using the Lazy Pirate client. Here is the queue, which is exactly a LRU queue, no more or less:
[[code type="example" title="Simple Pirate queue" name="spqueue"]]
[[/code]]
Here is the worker, which takes the Lazy Pirate server and adapts it for the LRU pattern (using the REQ 'ready' signaling):
[[code type="example" title="Simple Pirate worker" name="spworker"]]
[[/code]]
To test this, start a handful of workers, a client, and the queue, in any order. You'll see that the workers eventually all crash and burn, and the client retries and then gives up. The queue never stops, and you can restart workers and clients ad nauseam. This model works with any number of clients and workers.
++++ Robust Reliable Queuing (Paranoid Pirate Pattern)
The Simple Pirate Queue pattern works pretty well, especially since it's just a combination of two existing patterns, but it has some weaknesses:
* It's not robust against a queue crash and restart. The client will recover, but the workers won't. While 0MQ will reconnect workers' sockets automatically, as far as the newly started queue is concerned, the workers haven't signalled "READY", so don't exist. To fix this we have to do heartbeating from queue to worker, so that the worker can detect when the queue has gone away.
* The queue does not detect worker failure, so if a worker dies while idle, the queue can only remove it from its worker queue by first sending it a request. The client waits and retries for nothing. It's not a critical problem but it's not nice. To make this work properly we do heartbeating from worker to queue, so that the queue can detect a lost worker at any stage.
We'll fix these in a properly pedantic Paranoid Pirate Pattern.
We previously used a REQ socket for the worker. For the Paranoid Pirate worker we'll switch to an XREQ socket. This has the advantage of letting us send and receive messages at any time, rather than the lock-step send/receive that REQ imposes. The downside of XREQ is that we have to do our own envelope management. If you don't know what I mean, please re-read Chapter 3.
[[code type="textdiagram"]]
+-----------+   +-----------+   +-----------+
|  Client   |   |  Client   |   |  Client   |
+-----------+   +-----------+   +-----------+
|   Retry   |   |   Retry   |   |   Retry   |
+-----------+   +-----------+   +-----------+
|    REQ    |   |    REQ    |   |    REQ    |
\-----------/   \-----------/   \-----------/
      ^               ^               ^
      |               |               |
      \---------------+---------------/
                      |
                      v
                /-----------\
                |   XREP    |
                +-----------+
                |   Queue   |
                +-----------+
                | Heartbeat |
                +-----------+
                |   XREP    |
                \-----------/
                      ^
                      |
      /---------------+---------------\
      |               |               |
      v               v               v
/-----------\   /-----------\   /-----------\
|   XREQ    |   |   XREQ    |   |   XREQ    |
+-----------+   +-----------+   +-----------+
| Heartbeat |   | Heartbeat |   | Heartbeat |
+-----------+   +-----------+   +-----------+
|  Worker   |   |  Worker   |   |  Worker   |
+-----------+   +-----------+   +-----------+
Figure # - Paranoid Pirate Pattern
[[/code]]
We're still using the Lazy Pirate client. Here is the Paranoid Pirate queue device:
[[code type="example" title="Paranoid Pirate queue" name="ppqueue"]]
[[/code]]
Some comments about this example:
* In C, it's quite horrid to manage any kind of data structure. The queue really needs two data structures: a least-recently used list of servers, and a hash of the same set of servers. The C code is not optimized, and won't scale as such. A proper version would use hash and list containers such as the [http://zfl.zeromq.org ZFL project] provides.
* The queue extends the LRU pattern with heartbeating of workers. It's simple once it works, but quite difficult to invent. I'll explain more about heartbeating in a second.
Here is the Paranoid Pirate worker:
[[code type="example" title="Paranoid Pirate worker" name="ppworker"]]
[[/code]]
Some comments about this example:
* The code includes simulation of failures, as before. This makes it (a) very hard to debug, and (b) dangerous to reuse. When you want to debug this, disable the failure simulation.
* As for the Paranoid Pirate queue, the heartbeating is quite tricky to get right. See below for a discussion about this.
* The worker uses a reconnect strategy similar to the one we designed for the Lazy Pirate client. With two major differences: (a) it does an exponential back-off, and (b) it never abandons.
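The exponential back-off can be sketched on its own (the interval constants and function name are illustrative):

```c
//  Illustrative reconnect delays, in msecs
#define INTERVAL_INIT   1000    //  First reconnect delay
#define INTERVAL_MAX   32000    //  Ceiling on the delay

//  Sketch: after each failed reconnect, double the delay up to the
//  ceiling; unlike the Lazy Pirate client, the worker never abandons
static int
s_next_interval (int interval)
{
    if (interval < INTERVAL_MAX)
        interval *= 2;
    if (interval > INTERVAL_MAX)
        interval = INTERVAL_MAX;
    return interval;
}
```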
Try the client, queue, and workers, e.g. using a script like this:
[[code]]
ppqueue &
for i in 1 2 3 4; do
ppworker &
sleep 1
done
lpclient &
[[/code]]
You should see the workers die, one by one, as they simulate a crash, and the client eventually give up. You can stop and restart the queue and both client and workers will reconnect and carry on. No matter what you do to queues and workers, the client will never get an out-of-order reply: the whole chain either works, or the client abandons.
++++ Heartbeating
When writing the Paranoid Pirate examples, it took about five hours to get the queue-to-worker heartbeating working properly. The rest of the request-reply chain took perhaps ten minutes. Heartbeating is one of those reliability layers that often causes more trouble than it saves. It is especially easy to create 'false failures', i.e. peers decide that they are disconnected because the heartbeats aren't sent properly.
Some points to consider when understanding and implementing heartbeating:
* Note that heartbeats are not request-reply. They flow asynchronously in both directions. Either peer can decide the other is 'dead' and stop talking to it.
* If one of the peers uses durable sockets, this means it may get heartbeats queued up that it will receive if it reconnects. For this reason, workers should //not// reuse durable sockets. The example code uses durable sockets for debugging purposes, but the identities are randomized so that (in theory) no socket is ever reused.
* First, get the heartbeating working, and only //then// add in the rest of the message flow. You should be able to prove the heartbeating works by starting peers in any order, stopping and restarting them, simulating freezes, and so on.
* When your main loop is based on zmq_poll[3], use a secondary timer to trigger heartbeats. Do //not// use the poll loop for this, because it will either send too many heartbeats (overloading the network), or too few (causing peers to disconnect). The zhelpers package provides an s_clock() method that returns the current system clock in milliseconds. It's easy to use this to calculate when to send the next heartbeats. Thus, in C:
[[code]]
//  Send out heartbeats at regular intervals
uint64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
while (1) {
    ...
    zmq_poll (items, 1, HEARTBEAT_INTERVAL * 1000);
    ...
    //  Do this unconditionally, whatever zmq_poll did
    if (s_clock () > heartbeat_at) {
        ... Send heartbeats to all peers that expect them
        //  Set timer for next heartbeat
        heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
    }
}
[[/code]]
* Your main poll loop should use the heartbeat interval as its timeout. Obviously, don't use infinity. Using anything shorter than the heartbeat interval will just waste cycles.
* Use simple tracing, i.e. print to console, to get this working. Some tricks to help you trace the flow of messages between peers: use a dump method, such as the one zmsg offers, and number messages incrementally so you can see if there are gaps.
* In a real application, heartbeating must be configurable and usually negotiated with the peer. Some peers will want aggressive heartbeating, as low as 10 msecs. Other peers will be far away and want heartbeating as high as 30 seconds.
* If you have different heartbeat intervals for different peers, your poll timeout should be the lowest of these.
* You might be tempted to open a separate socket dialog for heartbeats. This is superficially nice because you can separate different dialogs, e.g. the synchronous request-reply from the asynchronous heartbeating. However it's a bad idea for several reasons. First, if you're sending data you don't need to send heartbeats. Second, sockets may, due to network vagaries, become jammed. You need to know when your main data socket is silent because it's dead, rather than just not busy, so you need heartbeats on that socket. Lastly, two sockets is more complex than one.
* We're not doing heartbeating from client to queue. We could, but it would add //significant// complexity for no real benefit.
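The "lowest interval wins" rule for the poll timeout can be sketched like this (the function and parameter names are illustrative):

```c
#include <stddef.h>

//  Sketch: the poll timeout is the smallest heartbeat interval (msecs)
//  among all connected peers; fall back to a default if there are none
static int
s_poll_timeout (const int *intervals, size_t count, int default_timeout)
{
    if (count == 0)
        return default_timeout;
    int lowest = intervals [0];
    size_t index;
    for (index = 1; index < count; index++)
        if (intervals [index] < lowest)
            lowest = intervals [index];
    return lowest;
}
```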
++++ Contracts and Protocols
If you're paying attention you'll realize that Paranoid Pirate is not compatible with Simple Pirate, because of the heartbeats.
In fact what we have here is a protocol that needs writing down. It's fun to experiment without specifications, but that's not a sensible basis for real applications. What happens if we want to write a worker in another language? Do we have to read code to see how things work? What if we want to change the protocol for some reason? The protocol may be simple but it's not obvious, and if it's successful it'll become more complex.
Lack of contracts is a sure sign of a disposable application. So, let's write a contract for this protocol. How do we do that?
* There's a wiki, at [http://rfc.zeromq.org rfc.zeromq.org], that we made especially as a home for public 0MQ contracts.
* To create a new specification, register, and follow the instructions. It's straightforward, though technical writing is not for everyone.
It took me about fifteen minutes to draft the new [http://rfc.zeromq.org/spec:6 Pirate Pattern Protocol]. It's not a big specification but it does capture enough to act as the basis for arguments ("your queue isn't PPP compatible, please fix it!").
Turning PPP into a real protocol would take more work:
* There should be a protocol version number in the READY command so that it's possible to create new versions of PPP safely.
* Right now, READY and HEARTBEAT are not entirely distinct from requests and replies. To make them distinct, we would want a message structure that includes a "message type" part.
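One way to make commands distinct is a one-byte "message type" frame at the start of every message. A hedged sketch (these command values are illustrative, not taken from the PPP specification):

```c
//  Illustrative one-byte message types; a real protocol would pin
//  these down in the specification, along with a version number
#define MSG_READY      '\001'
#define MSG_REQUEST    '\002'
#define MSG_REPLY      '\003'
#define MSG_HEARTBEAT  '\004'

//  Sketch: returns 1 if a frame is pure protocol chatter (READY or
//  HEARTBEAT) that the queue must never forward to a client
static int
s_is_control (char msg_type)
{
    return msg_type == MSG_READY || msg_type == MSG_HEARTBEAT;
}
```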
++++ Service-Oriented Reliable Queuing (Majordomo Pattern)
The nice thing about progress is how fast it happens when lawyers and committees aren't involved. Just a few sentences ago we were dreaming of a better protocol that would fix the world. And here we have it:
* http://rfc.zeromq.org/spec:7
This one-page specification takes PPP and turns it into something more solid. This is how we should design complex architectures: start by writing down the contracts, and only //then// write software to implement them.
The Majordomo Protocol (MDP) extends and improves PPP in one interesting way apart from the two points above. It adds a "service name" to requests that the client sends, and asks workers to register for specific services. The nice thing about MDP is that it came from working code, a simpler protocol, and a precise set of improvements. This made it easy to draft.
Adding service names is a small but significant change that turns our Paranoid Pirate queue into a service-oriented broker:
[[code type="textdiagram"]]
+-----------+   +-----------+   +-----------+
|           |   |           |   |           |
|  Client   |   |  Client   |   |  Client   |
|           |   |           |   |           |
\-----------/   \-----------/   \-----------/
      ^               ^               ^
      |               |               |
      \---------------+---------------/
    "Give me coffee"  |  "Give me tea"
                      v
                /-----------\
                |           |
                |  Broker   |
                |           |
                \-----------/
                      ^
                      |
      /---------------+---------------\
      |               |               |
      v               v               v
/-----------\   /-----------\   /-----------\
|  "Water"  |   |   "Tea"   |   | "Coffee"  |
+-----------+   +-----------+   +-----------+
|           |   |           |   |           |
|  Worker   |   |  Worker   |   |  Worker   |
|           |   |           |   |           |
+-----------+   +-----------+   +-----------+
Figure # - Majordomo Pattern
[[/code]]
To implement Majordomo we need to write a framework for clients and workers. It's really not sane to ask every application developer to read the spec and make it work, when they could be using a simpler API built and tested just once.
So, while our first contract (MDP itself) defines how the pieces of our distributed architecture talk to each other, our second contract defines how user applications talk to the technical framework we're going to design.
Majordomo has two halves, a client side and a worker side. Since we'll write both client and worker applications, we will need two APIs. Here is a sketch for the client API, using a simple object-oriented approach. We write this in C, using the style of the [http://zfl.zeromq.org/page:read-the-manual ZFL library]:
[[code]]
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t *request);
[[/code]]
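Usage would look something like this (a sketch; it assumes the zmsg helper API from Chapter 3, and the endpoint and service name are illustrative):

```c
//  Sketch: one request-reply round-trip through the broker, asking
//  the (illustrative) "echo" service
static void
s_client_example (void)
{
    mdcli_t *session = mdcli_new ("tcp://localhost:5555");
    zmsg_t *request = zmsg_new ();
    zmsg_append (request, "Hello world");
    zmsg_t *reply = mdcli_send (session, "echo", request);
    if (reply)
        zmsg_destroy (&reply);  //  Use the reply, then free it
    mdcli_destroy (&session);
}
```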
That's it. We open a connection to the broker, we send a request message and get a reply message back, and we eventually close the connection. Here's a sketch for the worker API:
[[code]]
mdwrk_t *mdwrk_new (char *broker, char *service);
void mdwrk_destroy (mdwrk_t **self_p);
zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply);
[[/code]]
It's more or less symmetrical, but the worker dialog is a little different. The first time a worker does a recv (), it passes a null reply; thereafter it passes the current reply, and gets a new request.
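In use, the worker dialog looks something like this (a sketch; the service name and endpoint are illustrative):

```c
//  Sketch: a trivial echo worker. It hands each reply back to recv ()
//  and gets the next request in exchange; the first reply is null.
static void
s_worker_example (void)
{
    mdwrk_t *session = mdwrk_new ("tcp://localhost:5555", "echo");
    zmsg_t *reply = NULL;
    while (1) {
        zmsg_t *request = mdwrk_recv (session, reply);
        if (request == NULL)
            break;              //  Interrupted or cannot continue
        reply = request;        //  Echo the request straight back
    }
    mdwrk_destroy (&session);
}
```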
The client and worker APIs are fairly simple to construct, since they're heavily based on the Paranoid Pirate code we already developed. Here is the client API:
[[code type="example" title="Majordomo Client API" name="mdcliapi"]]
[[/code]]
And here is the worker API:
[[code type="example" title="Majordomo Worker API" name="mdwrkapi"]]
[[/code]]
Notes on this code:
* The APIs are single threaded. This means, for example, that the worker won't send heartbeats in the background. Happily, this is exactly what we want: if the worker application gets stuck, heartbeats will stop and the broker will stop sending requests to the worker.
* The worker API doesn't do an exponential backoff; it's not worth the extra complexity.
* The APIs don't do any error reporting. If something isn't as expected, they raise an assertion (or exception depending on the language). This is ideal for a reference implementation, so any protocol errors show immediately. For real applications the API should be robust against invalid messages.
Let's design the Majordomo broker. Its core structure is a set of queues, one per service. We will create these queues as workers appear (we could delete them as workers disappear, but forget that for now; it gets complex). Additionally, we keep a queue of workers per service.
To make the C examples easier to write and read, I've taken the hash and list containers from the [http://zfl.zeromq.org ZFL project], and renamed them as zlist and zhash, as we did with zmsg. In any modern language you can of course use built-in containers.
++++ Rust-based Reliability (Titanic Pattern)
Once you realize that the Paranoid Pirate queue is basically a message broker, you might be tempted to add rust-based reliability to it (by "rust" I mean spinning disks, i.e. persistence, not anything to do with a programming language). After all, this works for all the enterprise messaging systems. It's such a tempting idea that it's a little sad to have to be negative. But that's one of my specialties. So, here are the reasons you don't want rust-based brokers sitting in the center of your architecture:
* As you've seen, the Lazy Pirate client performs surprisingly well. It works across a whole range of architectures, from direct client-to-server to distributed queue devices. It does assume that workers are stateless and idempotent (see below). But we can work around that limitation without resorting to rust.
* Rust brings a whole set of problems, from slow performance to additional pieces that you have to manage and repair, and that create 6am panics as they inevitably break at the start of trading. The beauty of the Pirate queues we saw is their simplicity: they won't crash. And if you're still worried about the hardware, you can move to a peer-to-peer pattern that has no broker at all; I'll explain that later in this chapter.
There is one sane use case for rust-based reliability, which is asynchronous fire-and-forget. This pattern, which I'll just sketch, works as follows:
* Clients use durable sockets to talk to a broker.
* They use a request-reply dialog to send requests to the broker, which accepts them and stores them on disk.
* When the broker has confirmed receipt of a request, this means it's stored on disk, safely.
* The broker then looks for workers to process the request, and it does this over time as fast as it can.
* Replies from workers are saved to disk in the same way, with an acknowledgment from the broker to the worker that the reply was received.
* When clients reconnect (or immediately if they stay connected), they receive these replies, and they confirm to the broker that they've received them, again with a request-reply handshake.
* The broker erases requests and replies only when they've been processed.
This model makes sense when clients and workers come and go. It solves the major problem with the Pirate pattern, namely that a client waits for an answer in realtime. When clients and workers are randomly connected like this, raw performance is not a big concern: it's far more important to just never lose messages. Some people will argue that "just never lose messages" is a use case by itself, but if clients and workers are connected, you don't need rust to do that: Pirate can work, as we've demonstrated.
++++ Idempotency
Idempotency is not something to take a pill for. What it means is that it's safe to repeat an operation. While many client-to-server use cases are idempotent, some are not. Examples of idempotent use cases are:
* Stateless task distribution, i.e. a collapsed pipeline where the client is both ventilator and sink, and the servers are stateless workers that compute a reply based purely on the state provided by a request. In such a case it's safe (though inefficient) to execute the same request many times.
* A name service that translates logical addresses into endpoints to bind or connect to. In such a case it's safe to make the same lookup request many times.
And here are examples of non-idempotent use cases:
* A logging service. One does not want the same log information recorded more than once.
* Any service that has impact on downstream nodes, e.g. sends on information to other nodes. If that service gets the same request more than once, downstream nodes will get duplicate information.
* Any service that modifies shared data in some non-idempotent way. E.g. a service that debits a bank account is definitely not idempotent.
When our server is not idempotent, we have to think more carefully about when exactly it might crash. If it dies while idle, or while processing a request, that's usually fine. We can use database transactions to make sure a debit and a credit are always done together, if at all. If the server dies while sending its reply, that's a problem, because as far as it's concerned, it has done its work.

If the network dies just as the reply is making its way back to the client, the same problem arises. The client will think the server died, will resend the request, and the server will do the same work twice. Which is not what we want.
We use the common solution of detecting and rejecting duplicate requests. This means:
* The client must stamp every request with a unique client identifier and a unique message number.
* The server, before sending back a reply, stores it using the client id + message number as a key.
* The server, when getting a request from a given client, first checks if it has a reply for that client id + message number. If so, it does not process the request but just resends the reply.
++++ Distributed Reliable Client-Server (Freelance Pattern)
We've seen how to make a reliable client-server architecture using a queue device (essentially a broker) in the middle. We've discussed the advantages of a central broker a few times. The biggest pro is that workers can come and go silently, there is just one 'stable' node on the network.
But for many cases this isn't worth the hassle of an extra device. If you are not managing a pool of anonymous workers, but want to make the intelligence explicitly addressable, then a broker is an extra step for little gain.
Again, this is a matter of taste. Some architects will swear by a broker. Others hate them. Let's take a real example and see how this plays. Say we want a name service (we do, we do!) that translates logical names (like "authentication") into physical network addresses (like "tcp://192.168.55.121:5051").
Without a broker, every application needs to know the address of the name service. It can then talk to the name service (using a Pirate pattern) to translate logical names into endpoints as needed. Fair enough.
With a broker, every application needs to know the address of the broker. The broker hopefully supports some kind of service-based routing. So clients can then send a request to the "name lookup service", and this will be routed to the name service.
This, then, is the use case the Freelance pattern addresses:
* Multiple clients talking to multiple servers with no intermediary devices. Use case: distributed services such as name resolution. Types of failure we aim to handle: service crashes and restarts, service busy looping, service overload, network disconnects.
+++++ Handshaking at Startup
We must use XREP-to-XREP sockets because we want to connect N clients to N servers without (necessarily) an intermediary queue device.
In an XREP-to-XREP socket connection, one side of the connection must know the identity of the other. You cannot do XREP-to-XREP flows between two anonymous sockets, since an XREP socket needs an explicit identity to address its peer. In practice this means we will need a name service to share the identities of the servers. The client connects to the server, then sends it a message using the server's known identity as the address, and the server can then respond to the client.
In this prototype we'll use fixed, hardcoded identities for the servers. We'll develop the name service in a later prototype.
+++++ Pool Management
* If there is just one server in the pool, we wait with a timeout for the server to reply. If the server does not reply within the timeout, we retry a number of times before abandoning.
* If there are multiple servers in the pool, we try each server in succession, but do not retry the same server twice.
* If a server appears to be really dead (i.e. has not responded for some time), we remove it from the pool.