Permalink
Browse files

Working on Clone Failover

  • Loading branch information...
1 parent c94d0fa commit 03c53626d2bfbfd81a144205e4a9ea731dd8ed4c @hintjens hintjens committed Mar 31, 2011
Showing with 542 additions and 509 deletions.
  1. +1 −0 CONTRIBUTORS
  2. +1 −1 bin/z2w
  3. +5 −5 chapter1.txt
  4. +24 −24 chapter5.txt
  5. +2 −0 examples/C/.gitignore
  6. +6 −6 examples/C/bstar.c
  7. +2 −2 examples/C/bstarsrv2.c
  8. +180 −256 examples/C/clone.c
  9. +1 −1 examples/C/clone.h
  10. +5 −6 examples/C/clonecli4.c
  11. +3 −5 examples/C/clonesrv3.c
  12. +165 −0 examples/C/clonesrv4.c
  13. +123 −79 examples/C/kvmsg.c
  14. +8 −6 examples/C/kvmsg.h
  15. +5 −58 notes.txt
  16. +11 −60 wip.txt
View
@@ -29,6 +29,7 @@ Faruk Akgul <faakgul@gmail.com> (Java)
Oleg Sidorov <i4pcbr@gmail.com> (Ruby)
Lev Givon <lev@columbia.edu> (Python)
Allister MacLeod <allister.macleod@gmail.com> (Java)
+Alexander D'Archangel <darksuji@gmail.com> (Perl)
Hoelzlwimmer Andreas <Andreas.Hoelzlwimmer@fh-hagenberg.at> (C++)
Han Holl <han.holl@pobox.com> (Ruby)
Robert G. Jakabosky <bobby@sharedrealm.com> (Lua)
View
@@ -118,7 +118,7 @@ while (<>) {
writeln ("[[cell]]");
writeln ("$indent [#$anchor $header]");
writeln ("[[/cell]]");
- writeln ("[[cell style=\"text-align:right; font-size:80%; color:#333\"]]");
+ writeln ("[[cell style=\"text-align:right; font-size:80%;\"]]");
writeln ("[#top top] [#header-$prev prev] [#header-$next next]");
writeln ("[[/cell]]");
writeln ("[[/row]]");
View
@@ -5,7 +5,7 @@
.set LIST=http://lists.zeromq.org/mailman/listinfo/zeromq-dev
.output chapter1.wd
-Written by Pieter Hintjens <ph@imatix.com>, CEO iMatix Corporation. Thanks to Bill Desmarais, Brian Dorsey, CAF, Daniel Lin, Eric Desgranges, Gonzalo Diethelm, Guido Goldstein, Hunter Ford, Kamil Shakirov, Martin Sustrik, Mike Castleman, Naveen Chawla, Nicola Peduzzi, Oliver Smith, Olivier Chamoux, Peter Alexander, Pierre Rouleau, Randy Dryburgh, John Unwin, Alex Thomas, Mihail Minkov, Jeremy Avnet, Michael Compton, Kamil Kisiel, Mark Kharitonov, Guillaume Aubert, Ian Barber, Mike Sheridan, Faruk Akgul, Oleg Sidorov, Lev Givon, Allister MacLeod, Hoelzlwimmer Andreas, Han Holl, Robert G. Jakabosky, Felipe Cruz, and Zed Shaw for their contributions, and to Stathis Sideris for [http://www.ditaa.org Ditaa].
+Written by Pieter Hintjens <ph@imatix.com>, CEO iMatix Corporation. Thanks to Bill Desmarais, Brian Dorsey, CAF, Daniel Lin, Eric Desgranges, Gonzalo Diethelm, Guido Goldstein, Hunter Ford, Kamil Shakirov, Martin Sustrik, Mike Castleman, Naveen Chawla, Nicola Peduzzi, Oliver Smith, Olivier Chamoux, Peter Alexander, Pierre Rouleau, Randy Dryburgh, John Unwin, Alex Thomas, Mihail Minkov, Jeremy Avnet, Michael Compton, Kamil Kisiel, Mark Kharitonov, Guillaume Aubert, Ian Barber, Mike Sheridan, Faruk Akgul, Oleg Sidorov, Lev Givon, Allister MacLeod, Alexander D'Archangel, Andreas Hoelzlwimmer, Han Holl, Robert G. Jakabosky, Felipe Cruz, and Zed Shaw for their contributions, and to Stathis Sideris for [http://www.ditaa.org Ditaa].
Please use the [$(GIT)/issues issue tracker] for all comments and errata. This version covers the latest stable release 0MQ 2.1.3 and was published on &date("ddd d mmmm, yyyy").
@@ -797,11 +797,11 @@ As you start to program with 0MQ you will come across one problem more than once
| | Yes | drops messages | | before not after|
| {o} | | it can't route. | | you connect. |
+--------+--------+ +-----------------+ +--------+--------+
- | No
- |
- v
+ | No
+ |
+ v
+-----------------+ +--------------------+
- | | | You probably have |
+ | | | You probably have |
| Are you losing | | a client running |
| one message +------->| in the background. |
| in two? | Yes | Kill it and start |
View
@@ -64,14 +64,14 @@ So the subscriber looks something like a queue device. We could use various sock
[[code type="textdiagram"]]
- +-----------+
- | |
- | Publisher |
+ +-----------+
+ | |
+ | Publisher |
| |
- +-----------+
- | PUB |
- \-----+-----/
- |
+ +-----------+
+ | PUB |
+ \-----+-----/
+ |
+---------------------------|---------------------------+
: | Fast box :
: v :
@@ -119,7 +119,7 @@ So let's see how to shard into two streams:
+-----+-----+
| PUB | PUB |
\--+--+--+--/
- | |
+ | |
+------------------------|--=--|------------------------+
: | | Fast box :
: v v :
@@ -241,14 +241,14 @@ And here is the client:
Some notes about this code:
-* All the hard work is done in the **kvmsg** class. This class works with key-value message objects, which are multipart 0MQ messages structured as three frames: a key (a 0MQ string), a sequence number (64-bit value, in network byte order), and a binary body (holds everything else).
+* All the hard work is done in the **kvmsg** class. This class works with key-value message objects, which are multipart 0MQ messages structured as four frames: a key (a 0MQ string), a sequence number (64-bit value, in network byte order), a binary UUID, and a binary body (holds everything else).
* The server generates messages with a randomized 4-digit key, which lets us simulate a large but not enormous hash table (10K entries).
* The server does a 200 millisecond pause after binding its socket. This is to prevent "slow joiner syndrome" where the subscriber loses messages as it connects to the server's socket. We'll remove that in later models.
* We'll use the terms 'publisher' and 'subscriber' in the code to refer to sockets. This will help later when we have multiple sockets doing different things.
Here is the kvmsg class:
-[[code type="example" title="Key-value message class " name="kvmsg"]]
+[[code type="example" title="Key-value message class" name="kvmsg"]]
[[/code]]
Both the server and client maintain hash tables, but this first model only works properly if we start all clients before the server, and the clients never crash. That's not 'reliability'.
@@ -271,8 +271,8 @@ In order to allow a late (or recovering) client to catch up with a server it has
updates +---------------\
| |
/----------------+----------------\ |
- | | | |
- | | | |
+ | | | |
+ | | | |
v v v |
/------+-----\ /------+-----\ /------+--+--\
| SUB | REQ | | SUB | REQ | | SUB | REQ |
@@ -309,7 +309,7 @@ Some notes about this code:
* The server uses two threads, for simpler design. One thread produces random updates, and the second thread handles state. The two communicate across PAIR sockets. You might like to use SUB sockets but you'd hit the "slow joiner" problem where the subscriber would randomly miss some messages while connecting. PAIR sockets let us explicitly synchronize the two threads.
* We set a HWM on the updates socket pair, since hash table insertions are relatively slow. Without this, the server runs out of memory. On {{inproc}} connections, the real HWM is the sum of the HWM of //both// sockets, so we set the HWM on each socket.
-* The client is really simple. In C, under 60 lines of code. A lot of the heavy lifting is done in the kvmsg class, but still, the Clone pattern is easier to implement than it seemed at first.
+* The client is really simple. In C, under 60 lines of code. A lot of the heavy lifting is done in the kvmsg class, but still, the basic Clone pattern is easier to implement than it seemed at first.
* We don't use anything fancy for serializing the state. The hash table holds a set of kvmsg objects, and the server sends these, as a batch of messages, to the client requesting state. If multiple clients request state at once, each will get a different snapshot.
* We assume that the client has exactly one server to talk to. The server **must** be running; we do not try to solve the question of what happens if the server crashes.
@@ -335,12 +335,12 @@ Updates from clients go via a PUSH-PULL socket flow from client to server:
\----+---+--------+--------/
| ^ ^
| | | state update
- | | \---------\
- | | state request |
+ | | \---------\
+ | | state request |
updates \------------\ |
| | |
/-----------+-------------\ | |
- | | | |
+ | | | |
| ^ ^ | | |
v | | v | |
/------+--+--+--+---\ /------+--+--+--+---\
@@ -406,7 +406,7 @@ So, Model Four introduces these changes over Model Three:
Failover happens as follows:
-* The client detects that primary server is no longer sending heartbeats, so has died. The client connects to backup server and requests a new state snapshot.
+* The client detects that primary server is no longer sending heartbeats, so has died. The client connects to the backup server and requests a new state snapshot.
* The backup server starts to receive snapshot requests from clients, and detects that primary server has gone, so takes over as primary.
@@ -461,12 +461,12 @@ My usual design approach is to first design an API that feels right, then to imp
// Specify endpoints for each socket we need
clone_subscribe (clone, "tcp://localhost:5556");
clone_snapshot (clone, "tcp://localhost:5557");
- clone_endpoints (clone, "tcp://localhost:5558");
+ clone_updates (clone, "tcp://localhost:5558");
// Times two, since we have two servers
clone_subscribe (clone, "tcp://localhost:5566");
clone_snapshot (clone, "tcp://localhost:5567");
- clone_endpoints (clone, "tcp://localhost:5568");
+ clone_updates (clone, "tcp://localhost:5568");
[[/code]]
But this is both verbose and fragile. It's not a good idea to expose the internals of a design to applications. Today, we use three sockets. Tomorrow, two, or four. Do we really want to go and change every application that uses the clone class?
@@ -479,10 +479,10 @@ So we make a small abstraction, like this:
clone_connect (clone, "tcp://localhost:5561");
[[/code]]
-Which has the advantage of simplicity (one server sits at one endpoint) but has an impact on our internal design. We now need to somehow turn that single endpoint into three endpoints. One way would be to bake the knowledge "client and server talk over three consecutive ports" into our client-server protocol. Another way would be to get the two missing endpoints from the server. We'll take the simplest way, i.e.:
+Which has the advantage of simplicity (one server sits at one endpoint) but has an impact on our internal design. We now need to somehow turn that single endpoint into three endpoints. One way would be to bake the knowledge "client and server talk over three consecutive ports" into our client-server protocol. Another way would be to get the two missing endpoints from the server. We'll take the simplest way, which is:
-* The server updates publisher (PUB) is at port P.
-* The server state router (ROUTER) is at port P + 1.
+* The server state router (ROUTER) is at port P.
+* The server updates publisher (PUB) is at port P + 1.
* The server updates subscriber (SUB) is at port P + 2.
So here is Model Four of the clone client, which has now become just a thin shell using the clone class:
@@ -504,7 +504,7 @@ This is an advanced architecture for asynchronous APIs, and usually takes weeks
++++ Clone API
-We don't need a lot of code to make a Clone client, but it's still subtle enough to be profitably hidden in a small API.
+We don't need a lot of code to make a Clone client, but it's still subtle enough to be profitably hidden in a small API.
While the code we need to implement Clone is short, it's subtle. You'd not want to implement this directly in your applications. The server is fine, this can be a generic process that doesn't include any application code. For the client, let's make an event-driven API.
@@ -519,7 +519,7 @@ Clone is complex enough in practice that you don't want to implement it directly
- two or more clients asking for same state
- ...? how
-Changes to the
+Changes to the
- clones will not have identical state, unless all started at once and cloned in parallel
- clients may send updates back to server, via pub/sub
View
@@ -75,6 +75,8 @@ clonecli2
clonesrv2
clonecli3
clonesrv3
+clonecli4
+clonesrv4
bstarsrv
bstarcli
bstarsrv2
View
@@ -50,7 +50,7 @@ typedef enum {
// Structure of our class
struct _bstar_t {
- void *context; // 0MQ context
+ void *context; // 0MQ context
void *statepub; // State publisher
void *statesub; // State subscriber
zlist_t *frontends; // List of frontends
@@ -78,15 +78,15 @@ bstar_new (int primary, char *bind_to, char *connect_to)
self->frontends = zlist_new ();
self->send_state = s_clock () + BSTAR_HEARTBEAT;
s_catch_signals ();
-
+
// We'll manage our own 0MQ context and sockets
self->context = zmq_init (1);
// Create publisher for state going to peer
self->statepub = zmq_socket (self->context, ZMQ_PUB);
int rc = zmq_bind (self->statepub, bind_to);
assert (rc == 0);
-
+
// Create subscriber for state coming from peer
self->statesub = zmq_socket (self->context, ZMQ_SUB);
zmq_setsockopt (self->statesub, ZMQ_SUBSCRIBE, "", 0);
@@ -106,7 +106,7 @@ bstar_destroy (bstar_t **self_p)
assert (self_p);
if (*self_p) {
bstar_t *self = *self_p;
-
+
// Shutdown sockets and context
int zero = 0;
zmq_setsockopt (self->statepub, ZMQ_LINGER, &zero, sizeof (zero));
@@ -141,7 +141,7 @@ bstar_listen (bstar_t *self, char *endpoint, int type)
zlist_append (self->frontends, frontend);
}
-
+
// --------------------------------------------------------------------------
// Execute finite state machine (apply event to state)
// Returns -1 if there was an exception, 0 if event was valid.
@@ -233,7 +233,7 @@ bstar_wait (bstar_t *self)
{
// Build poll set
int poll_size = zlist_size (self->frontends) + 1;
- zmq_pollitem_t *poll_set
+ zmq_pollitem_t *poll_set
= calloc (1, poll_size * sizeof (zmq_pollitem_t));
poll_set [0].socket = self->statesub;
poll_set [0].events = ZMQ_POLLIN;
View
@@ -13,7 +13,7 @@ int main (int argc, char *argv [])
bstar_t *bstar;
if (argc == 2 && streq (argv [1], "-p")) {
printf ("I: Primary master, waiting for backup (slave)\n");
- bstar = bstar_new (BSTAR_PRIMARY,
+ bstar = bstar_new (BSTAR_PRIMARY,
"tcp://*:5003", "tcp://localhost:5004");
bstar_listen (bstar, "tcp://*:5001", ZMQ_ROUTER);
}
@@ -25,7 +25,7 @@ int main (int argc, char *argv [])
bstar_listen (bstar, "tcp://*:5002", ZMQ_ROUTER);
}
else {
- printf ("Usage: bstarsrv { -p | -b }\n");
+ printf ("Usage: bstarsrvs { -p | -b }\n");
exit (0);
}
// Now handle activity from clients
Oops, something went wrong.

0 comments on commit 03c5362

Please sign in to comment.