Skip to content

Commit

Permalink
Working on Chapter 4
Browse files Browse the repository at this point in the history
  • Loading branch information
hintjens committed Feb 20, 2011
1 parent ae1e5c0 commit 72194d0
Showing 1 changed file with 74 additions and 17 deletions.
91 changes: 74 additions & 17 deletions chapter4.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

In Chapter Three we looked at advanced use of 0MQ's request-reply pattern with worked examples. In this chapter we'll look at the general question of reliability and learn how to build reliable messaging on top of 0MQ's patterns.

+++ Necessary Philosophy
+++ Design Discussions

++++ What is "Reliability"?

Expand Down Expand Up @@ -34,7 +34,11 @@ Handling slow clients correctly is delicate. On the one hand, you really don't w

* For cases where network congestion or client disconnection stops the sender getting rid of messages, 0MQ offers a "high water mark" that limits the size of a given socket queue. Since each use case has its own needs, 0MQ lets you set this per outgoing socket. When a queue hits the HWM, messages are just dropped. It's brutal, but there is no other sane strategy.

++++ Pattern Reliability
++++ Disk-based Reliability

You can, and people do, use spinning rust to store messages. It rather makes a mess of the idea of "performance" but we're usually more comfortable knowing a really important message (such as that transfer of $400M to my Cyprus account) is stored on disk rather than only in memory. Spinning rust only makes sense for some patterns, mainly request-reply. If we get bored in this chapter we'll play with that, but otherwise, just shove really critical messages into a database that all parties can access, and skip 0MQ for those parts of your dialog.

+++ Reliability Patterns

So to make things brutally simple, reliability is "keeping things working properly when code freezes or crashes", a situation we'll shorten to "dies". However the things we want to keep working properly are more complex than just messages. We need to take each 0MQ messaging pattern and see how to make it work (if we can) even when code dies.

Expand All @@ -46,11 +50,7 @@ Let's take them one by one:

* Pipeline: if a worker dies (while working), the ventilator doesn't know about it. Pipelines, like pubsub, and the grinding gears of time, only work in one direction. But the downstream collector can detect that one task didn't get done, and send a message back to the ventilator saying, "hey, resend task 324!" If the ventilator or collector die, then whatever upstream client originally sent the work batch can get tired of waiting and resend the whole lot. It's not elegant but system code should really not die often enough to matter.

++++ Disk-based Reliability

You can, and people do, use spinning rust to store messages. It rather makes a mess of the idea of "performance" but we're usually more comfortable knowing a really important message (such as that transfer of $400M to my Cyprus account) is stored on disk rather than only in memory. Spinning rust only makes sense for some patterns, mainly request-reply. If we get bored in this chapter we'll play with that, but otherwise, just shove really critical messages into a database that all parties can access, and skip 0MQ for those parts of your dialog.

+++ Reliable Request-Reply - the Pirate Pattern
++++ Reliable Request-Reply - the Pirate Pattern

The basic request-reply pattern (a REQ socket talking to a REP socket) fails miserably if the server or network dies, so it's basically unreliable. By default the REQ socket blocks on a recv, meaning the client just waits forever. If there's a problem down the line, too bad. I'd probably only use the basic request-reply pattern between two threads in the same process where there's no network or separate server process to die.

Expand Down Expand Up @@ -96,25 +96,82 @@ We use the standard solution of detecting and rejecting duplicate requests. This

The final touch to a robust Pirate pattern is server heartbeating. This means getting the server to say "hello" every so often even when it's not doing any work. The smoothest design is where the client pings the server, which pongs back. We don't need to send heartbeats to a working server, only one that's idle. Knowing when an idle server has died means we don't uselessly send requests to dead servers, which improves response time in those cases.

We'll explore Pirates with running code later. For now, let's focus our attention on another
We'll explore Pirates with running code later. For now, let's continue the design discussion by looking at reliable pubsub.

+++ Reliable Publish-Subscribe - the Cache Pattern
++++ Reliable Publish-Subscribe - the Clone Pattern

Pubsub is inherently unreliable. In fact it's so easy to lose messages with this pattern that you might wonder why 0MQ bothers to implement it at all. :-)
Pubsub is inherently unreliable. In fact it's so easy to lose messages with this pattern that you might wonder why 0MQ bothers to implement it at all. :-) That's a rhetorical question. There are many cases where simplicity and speed are more important than reliable delivery. In fact this covers perhaps the majority of information distribution in the real world.

However, reliable pubsub is also a useful tool. Let's do as before and define what that 'reliability' means in terms of what can go wrong.

The basic request-reply pattern (a REQ socket talking to a REP socket) fails miserably if the server or network dies, so it's basically unreliable. By default the REQ socket blocks on a recv, meaning the client just waits forever. If there's a problem down the line, too bad. I'd probably only use the basic request-reply pattern between two threads in the same process where there's no network or separate server process to die.
Happens all the time:

* Subscribers join late, so miss messages the server already sent.
* Subscriber connections take a non-zero time, and can lose messages during that time.

Happens exceptionally:

* Subscribers can crash, and restart, and lose whatever data they already received.
* Subscribers can fetch messages too slowly, so queues build up and then overflow.
* Networks can become overloaded and drop data (specifically, for PGM).
* Networks can become too slow, so publisher-side queues overflow.

A lot more can go wrong but these are the typical failures we see in a realistic system. The difficulty in defining 'reliability' now is that we have no idea, at the messaging level, what the application actually does with its data. So we need a generic model that we can implement once, and then use for a wide range of applications.

What we'll design is a simple *shared key-value cache* that stores a set of blobs indexed by unique keys. Don't confuse this with *distributed hash tables*, which solve the wider problem of connecting peers in a distributed network, or with *distributed key-value tables*, which act like non-SQL databases. All we will build is a system that reliably clones some in-memory state from a server to a set of clients. We want to:

* Let a client join the network at any time, and reliably get the current server state.
* Let any client update the key-value cache (inserting new key-value pairs, updating existing ones, or deleting them).
* Reliably propagates changes to all clients, and does this with minimum latency overhead.
* Handle very large numbers of clients, e.g. tens of thousands or more.

The key aspect of the Clone pattern is that clients talk back to servers, which is more than we do in a simple pub-sub dialog. This is why I use the terms 'server' and 'client' instead of 'publisher' and 'subscriber'. We'll use pubsub as part of the Clone pattern but it is more than that.

When a client joins the network, it subscribes a SUB socket, as we'd expect, to the data stream coming from the server (the publisher). This goes across some pub-sub topology (a multicast bus, perhaps, or a tree of forwarder devices, or direct client-to-server connections).

At some undetermined point, it will start getting messages from the server. Note that we can't predict what the client will receive as its first message. If a zmq_connect[3] call takes 10msec, and in that time the server has sent 100 messages, the client might get messages starting from the 100th message.

Let's define a message as a key-value pair. The semantics are simple: if the value is provided, it's an insert or update operation. If there is no value, it's a delete operation. The key provides the subscription filter, so clients can treat the cache as a tree, and select whatever branches of the tree they want to hold.

The client now connects to the server using a different socket (a REQ socket) and asks for a snapshot of the cache. It tells the server two things: which message it received (which means the server has to number messages), and which branch or branches of the cache it wants. To keep things simple we'll assume that any client has exactly one server that it talks to, and gets its cache from. The server *must* be running; we do not try to solve the question of what happens if the server crashes (that's left as an exercise for you to hurt your brain over).

The server builds a snapshot and sends that to the client's REQ socket. This can take some time, especially if the cache is large. The client continues to receive updates from the server on its SUB socket, which it queues but does not process. We'll assume these updates fit into memory. At some point it gets the snapshot on its REQ socket. It then applies the updates to that snapshot, which gives it a working cache.

You'll perhaps see one difficulty here. If the client asks for a snapshot based on message 100, how does the server provide this? After all, it may have sent out lots of updates in the meantime. We solve this by cheating gracefully. The server just sends its current snapshot, but tells the client what its latest message number is. Say that's 200. The client gets the snapshot, and in its queue, it has messages 100 to 300. It throws out 100 to 200, and starts applying 201 to 300 to the snapshot.

Once the client has happily gotten its cache, it disconnects from the server (destroys that REQ socket), which is not used for anything more.

How does Clone handle updates from clients? There are several options but the simplest seems to be that each client acts as a publisher back to the server, which subscribes. In a TCP network this will mean persistent connections between clients and servers. In a PGM network this will mean using a shared multicast bus that clients write to, and the server listens to.

So the client, at startup, opens a PUB socket and part of its initial request to the server includes the address of that socket, so the server can open a SUB socket and connect back to it.

Why don't we allow clients to publish updates directly to other clients? While this would reduce latency, it makes it impossible to sequence messages. Updates *must* pass through the server to make sense to other clients. There's a more subtle second reason. In many applications it's important that updates have a single order, across many clients. Forcing all updates through the server ensures that they have the same order when they finally get to clients.

With unique sequencing, clients can detect the nastier failures - network congestion and queue overflow. If a client discovers that its incoming message stream has a hole, it can take action. It seems sensible that the client contact the server and ask for the missing messages, but in practice that isn't useful. If there are holes, adding more stress to the network will make things worse. All the client can really do is warn its users "Unable to continue", and stop, and not restart until someone has manually checked the cause of the problem.

Clone is complex enough in practice that you don't want to implement it directly in your applications. Instead, it makes a good basis for an application server framework, which talks to applications via the key-value table.

++++ Reliable Pipeline - the Harmony Pattern

0MQ's pipeline pattern (using PUSH and PULL sockets) is reliable to the extent that:

We'll look at the common question of getting a consistent state across a set of subscribers that come and go at any time. We will build a simple distributed cache:
* Workers and collectors don't crash;
* Workers and collecters read their data fast enough to avoid queue overflows.

* We have a set of peers that each hold a copy of the cache
* Any peer can update its copy of the cache at any time
* Updates are published to all other peers
* A peer can join the network at any time, and get the cache state
As with all our reliability patterns, we'll ignore what happens if an upstream node (the ventilator for a pipeline pattern) dies. In practice a ventilator will be the client of another reliability pattern, e.g. Clone.

I'll leave the design for later. We'll need a forwarder to act as stable location in the network. Should be fun!
The Harmony pattern takes pipeline and makes it robust against the only failure we can reasonably handle, namely workers and (less commonly) collectors that crash and lose messages or work.

- assume workers are idempotent
- assume batch size is known in advance (because...)
- assume memory enough to hold full batch
- batch: start (address of collector), tasks, end
- messages numbered 0 upwards inside batch
- assume multiple ventilators for same cluster
- assume collector talks to ventilator, (not same to allow walk-up-and use by ventilators)
- call ventilator the 'client'
- if task missing, resend
- if end of batch missing, resend from last response



Expand Down

0 comments on commit 72194d0

Please sign in to comment.