Work on Clone Server 4

1 parent 173ce3f commit c27ede29fda1877ac6f839892311c046568e1651 @hintjens hintjens committed Apr 11, 2011
@@ -7,7 +7,7 @@
.output chapter1.wd
Written by Pieter Hintjens <ph@imatix.com>, CEO iMatix Corporation. Thanks to Bill Desmarais, Brian Dorsey, CAF, Daniel Lin, Eric Desgranges, Gonzalo Diethelm, Guido Goldstein, Hunter Ford, Kamil Shakirov, Martin Sustrik, Mike Castleman, Naveen Chawla, Nicola Peduzzi, Oliver Smith, Olivier Chamoux, Peter Alexander, Pierre Rouleau, Randy Dryburgh, John Unwin, Alex Thomas, Mihail Minkov, Jeremy Avnet, Michael Compton, Kamil Kisiel, Mark Kharitonov, Guillaume Aubert, Ian Barber, Mike Sheridan, Faruk Akgul, Oleg Sidorov, Lev Givon, Allister MacLeod, Alexander D'Archangel, Andreas Hoelzlwimmer, Han Holl, Robert G. Jakabosky, Felipe Cruz, and Zed Shaw for their contributions, and to Stathis Sideris for [http://www.ditaa.org Ditaa].
-Please use the [$(GIT)/issues issue tracker] for all comments and errata. This version covers the latest stable release 0MQ 2.1.3 and was published on &date("ddd d mmmm, yyyy").
+Please use the [$(GIT)/issues issue tracker] for all comments and errata. This version covers the latest stable release 0MQ/2.1.3 and was published on &date("ddd d mmmm, yyyy").
++ Chapter One - Basic Stuff
@@ -494,9 +494,9 @@ worker_thread (void *arg) {
In the end, the problem was that the application was passing sockets between threads, which crashed weirdly. Passing sockets between threads became legal behavior in 0MQ/2.1, but it remains dangerous and is something we advise against doing.
-+++ 0MQ 2.1
++++ 0MQ/2.1
-History tells us that 0MQ 2.0 is when low-latency distributed messaging crawled out of the primeval mud, shook off a heavy coat of buzzwords and enterprise jargon, and reached its branches up to the sky, as if to cry, "no limits!". We've been using this stable branch since it spawned 0MQ 2.0.8 during the hot days of August, 2010.
+History tells us that 0MQ/2.0 is when low-latency distributed messaging crawled out of the primeval mud, shook off a heavy coat of buzzwords and enterprise jargon, and reached its branches up to the sky, as if to cry, "no limits!". We've been using this stable branch since it spawned 0MQ/2.0.8 during the hot days of August, 2010.
But times change, and what was cool in 2010 is no longer //a la mode// in 2011. The 0MQ developers and community have been frantically busy redefining messaging chic, and anyone who's anyone knows that 2.1 is the new stable.
@@ -513,6 +513,8 @@ zmq_setsockopt (mysocket, ZMQ_LINGER, &zero, sizeof (zero));
* In 2.0, zmq_poll[3] would return arbitrarily early, so you could not use it as a timer. We would work around this with a loop that checked how much time was left and called zmq_poll again as needed. In 2.1, zmq_poll properly waits for the full timeout if there are no events.
+* In 2.0, 0MQ would ignore interrupted system calls, which meant that no libzmq call would ever return EINTR if a signal was received during its operation. This caused problems such as lost SIGINT (Ctrl-C) handling, especially for language runtimes. In 2.1, any blocking call such as zmq_recv[3] will return EINTR if it is interrupted by a signal, as the sketch below shows.
+
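
For example, here is a minimal sketch of a 2.1 client that catches Ctrl-C cleanly; the socket type and endpoint are chosen purely for illustration:

[[code language="C"]]
//  Minimal sketch: a blocking zmq_recv[3] in 0MQ/2.1 returns -1 with
//  errno set to EINTR when a signal arrives, so we can exit cleanly
#include <zmq.h>
#include <signal.h>
#include <errno.h>

static int s_interrupted = 0;
static void s_signal_handler (int signal_value)
{
    s_interrupted = 1;
}

int main (void)
{
    void *context = zmq_init (1);
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_bind (receiver, "tcp://*:5555");

    //  Install a plain handler so SIGINT no longer kills the process
    struct sigaction action;
    action.sa_handler = s_signal_handler;
    action.sa_flags = 0;
    sigemptyset (&action.sa_mask);
    sigaction (SIGINT, &action, NULL);

    while (!s_interrupted) {
        zmq_msg_t message;
        zmq_msg_init (&message);
        if (zmq_recv (receiver, &message, 0) == -1 && errno == EINTR) {
            zmq_msg_close (&message);
            break;          //  Interrupted by a signal
        }
        //  ... process the message here ...
        zmq_msg_close (&message);
    }
    zmq_close (receiver);
    zmq_term (context);
    return 0;
}
[[/code]]
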
+++ Getting the Context Right
0MQ applications always start by creating a //context//, and then using that for creating sockets. In C, it's the zmq_init[3] call. You should create and use exactly one context in your process. Technically, the context is the container for all sockets in a single process, and acts as the transport for {{inproc}} sockets, which are the fastest way to connect threads in one process. If at runtime a process has two contexts, these are like separate 0MQ instances. If that's explicitly what you want, OK, but otherwise remember:
@@ -957,27 +957,26 @@ Binary Star is useful and generic enough to package up as a reusable reactor cla
[[code language="C"]]
// Create a new Binary Star instance, using local (bind) and
// remote (connect) endpoints to set up the server peering.
-bstar_t *
- bstar_new (int primary, char *local, char *remote);
+bstar_t *bstar_new (int primary, char *local, char *remote);
// Destroy a Binary Star instance
-void
- bstar_destroy (bstar_t **self_p);
+void bstar_destroy (bstar_t **self_p);
// Return underlying zloop reactor, for timer and reader
// registration and cancelation.
-zloop_t *
- bstar_zloop (bstar_t *self);
+zloop_t *bstar_zloop (bstar_t *self);
// Register voting reader
-int
- bstar_voter (bstar_t *self, char *endpoint, int type,
+int bstar_voter (bstar_t *self, char *endpoint, int type,
zloop_fn handler, void *arg);
+// Register main state change handlers
+void bstar_new_master (bstar_t *self, zloop_fn handler, void *arg);
+void bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg);
+
// Start the reactor; it ends if a callback function returns -1 or the
// process receives SIGINT or SIGTERM.
-int
- bstar_start (bstar_t *self);
+int bstar_start (bstar_t *self);
[[/code]]
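
To make the contract concrete, here is a hypothetical sketch of how a primary server might wire the class up; the endpoints and handler bodies are invented for illustration and are not taken from the Binary Star examples:

[[code language="C"]]
//  Hypothetical usage sketch of the bstar reactor class
#include "bstar.h"
#include <stdio.h>

//  Voting socket handler: echo each client request back
static int s_request (zloop_t *loop, void *socket, void *arg)
{
    zmsg_t *request = zmsg_recv (socket);
    zmsg_send (&request, socket);
    return 0;
}
//  Called when this server becomes master
static int s_new_master (zloop_t *loop, void *socket, void *arg)
{
    printf ("I: becoming master, recover state here\n");
    return 0;
}

int main (void)
{
    //  First argument: 1 = primary server, 0 = backup server
    bstar_t *bstar = bstar_new (1,
        "tcp://*:5003", "tcp://localhost:5004");
    bstar_voter (bstar, "tcp://*:5001", ZMQ_ROUTER, s_request, NULL);
    bstar_new_master (bstar, s_new_master, NULL);
    bstar_start (bstar);            //  Runs until interrupted
    bstar_destroy (&bstar);
    return 0;
}
[[/code]]
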
And here is the class implementation:
@@ -393,7 +393,9 @@ As you play with model three, you'll stop and restart the server. It might look
Let's list the failures we want to be able to handle:
* Clone server process crashes and is automatically or manually restarted. The process loses its state and has to get it back from somewhere.
+
* Clone server machine dies and is off-line for a significant time. Clients have to switch to an alternate server somewhere.
+
* Clone server process or machine gets disconnected from the network, e.g. a switch dies. It may come back at some point, but in the meantime clients need an alternate server.
Our first step is to add a second server. We can use the Binary Star pattern from Chapter four to organize these into primary and backup.
@@ -414,6 +416,61 @@ So, Model Four introduces these changes over Model Three:
* The backup server keeps a "pending list" of updates that it has received from clients, but not yet from the primary server. The list is ordered from oldest to newest, so that it is easy to remove updates from the head (the sketch below shows the idea).
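
Here is a hedged sketch of that pending-list bookkeeping; the helper names, and the idea of matching updates by an id string, are ours and purely for illustration:

[[code language="C"]]
//  Illustrative pending-list helpers for the backup server
//  zlist comes from the zapi helper library used throughout the Guide
#include "zlist.h"
#include <stdlib.h>
#include <string.h>

//  Remember an update that arrived directly from a client
static void s_pending_add (zlist_t *pending, char *update_id)
{
    zlist_append (pending, strdup (update_id));
}
//  An update arrived from the primary; if we are already holding it,
//  drop it from the pending list and report that we saw it before
static int s_pending_remove (zlist_t *pending, char *update_id)
{
    char *held = (char *) zlist_first (pending);
    while (held) {
        if (strcmp (held, update_id) == 0) {
            zlist_remove (pending, held);
            free (held);
            return 1;       //  Already received from a client
        }
        held = (char *) zlist_next (pending);
    }
    return 0;               //  New update from the primary
}
[[/code]]
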
+Voting is kind of delicate. If clients randomly try both servers while the network between the servers has a glitch, they will break the Binary Star three-way voting model and trigger split-brain. It's a rare error case, but there's no point aiming for resilience if we don't handle obvious errors.
+
+So, clients must work as follows:
+
+* After opening and connecting their sockets, they wait for their first server to respond with a state snapshot. Until that happens, they stubbornly stay where they are.
+
+* When that server responds, the client treats it as active, and works with it until or unless it disappears.
+
+* When the current server disappears, the client then moves on to the next server, and continues like this, wrapping back around to the first server afterwards.
+
+We can draw this as a finite state machine, with events in caps. The client switches between reading its snapshot socket (in the Waiting and Syncing states) and its subscriber socket (in the Active state):
+
+[[code type="textdiagram"]]
+
+ +-----------+
+ | |<----------------------\
+ | Initial |<-------------------\ |
+ | | | |
+ +-----+-----+ | |
+ Request|snapshot | |
+ | | |
+ | /----------------\ | |
+ | | | | |
+ v v | | |
+ +-----------+ | | |
+ | | | | |
+ | Waiting +-SILENCE------/ | |
+ | | | |
+ +-----+-----+ | |
+ INPUT | |
+ Store|snapshot | |
+ | /----------------\ | |
+ | | | | |
+ v v | | |
+ +-----------+ | | |
+ | +-INPUT--------/ | |
+ | Syncing | Store snapshot | |
+ | | | |
+ | +-SILENCE------------/ |
+ +-----+-----+ Failover to next |
+ KTHXBAI |
+ | /----------------\ |
+ | | | |
+ v v | |
+ +-----------+ | |
+ | +-INPUT--------/ |
+ | Active | Store update |
+ | | |
+ | +-SILENCE---------------/
+ +-----------+ Failover to next
+
+
+ Figure # - Clone client FSM
+[[/code]]
+
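
In C, the skeleton of this state machine is tiny. Here is an illustrative sketch; the type and function names are invented for this example rather than taken from the clone client code:

[[code language="C"]]
//  Illustrative client FSM: one state variable, one transition per event
typedef enum {
    STATE_INITIAL,      //  Before asking a server for a snapshot
    STATE_WAITING,      //  Waiting for the first snapshot reply
    STATE_SYNCING,      //  Receiving the state snapshot
    STATE_ACTIVE        //  Receiving live updates
} client_state_t;

typedef enum {
    EVENT_INPUT,        //  A message arrived on the current socket
    EVENT_SILENCE,      //  The server has been silent for too long
    EVENT_KTHXBAI       //  End-of-snapshot marker received
} client_event_t;

//  Apply one event; returns 1 when the client should fail over to
//  its next server, 0 otherwise
static int s_client_event (client_state_t *state, client_event_t event)
{
    if (*state == STATE_INITIAL) {
        //  Send a snapshot request, then wait for the first reply
        *state = STATE_WAITING;
    }
    else
    if (*state == STATE_WAITING) {
        //  On SILENCE we stubbornly keep waiting for this server
        if (event == EVENT_INPUT)
            *state = STATE_SYNCING;     //  Store first snapshot entry
    }
    else
    if (*state == STATE_SYNCING) {
        //  On INPUT we stay here and store the snapshot entry
        if (event == EVENT_KTHXBAI)
            *state = STATE_ACTIVE;      //  Snapshot is complete
        else
        if (event == EVENT_SILENCE) {
            *state = STATE_INITIAL;     //  Failover to next server
            return 1;
        }
    }
    else
    if (*state == STATE_ACTIVE) {
        //  On INPUT we stay here and store the update
        if (event == EVENT_SILENCE) {
            *state = STATE_INITIAL;     //  Failover to next server
            return 1;
        }
    }
    return 0;
}
[[/code]]
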
Failover happens as follows:
* The client detects that the primary server is no longer sending heartbeats, and so has died. The client connects to the backup server and requests a new state snapshot.
@@ -463,7 +520,7 @@ So here is our high-availability server pair, using the Binary Star pattern:
Figure # - High-availability Clone Server Pair
[[/code]]
-As a first step to building this, we're going to refactor the client as a reusable class. This is partly for fun (writing asynchronous classes with 0MQ is like an exercise in elegance), but mainly because we want Clone to be really easy to plug-in to random applications. When we start to handle failover in clients, it does get a little complex (imagine mixing a Freelance client with a Clone client). So, reusability ahoy!
+As a first step to building this, we're going to refactor the client as a reusable class. This is partly for fun (writing asynchronous classes with 0MQ is like an exercise in elegance), but mainly because we want Clone to be really easy to plug in to random applications. Since resilience depends on clients behaving correctly, it's much easier to guarantee this when there's a reusable client API. When we start to handle failover in clients, it does get a little complex (imagine mixing a Freelance client with a Clone client). So, reusability ahoy!
My usual design approach is to first design an API that feels right, then to implement that. So, we start by taking the clone client, and rewriting it to sit on top of some presumed class API called **clone**. Turning random code into an API means defining a reasonably stable and abstract contract with applications. For example, in Model Three, the client opened three separate sockets to the server, using endpoints that were hard-coded in the source. We could make an API with three methods, like this:
@@ -504,16 +561,11 @@ The frontend class talks to the agent class over an {{inproc}} 'pipe' socket. In
Without 0MQ, this kind of asynchronous class design would be weeks of really hard work. With 0MQ, it was a day or two of work. The results are kind of complex, given the simplicity of the Clone protocol it's actually running. There are some reasons for this. We could turn this into a reactor, but that'd make it harder to use in applications. So the API looks a bit like a key-value table that magically talks to some servers:
[[code language="C"]]
-clone_t *
- clone_new (void);
-void
- clone_destroy (clone_t **self_p);
-void
- clone_connect (clone_t *self, char *address, char *service);
-void
- clone_set (clone_t *self, char *key, char *value);
-char *
- clone_get (clone_t *self, char *key);
+clone_t *clone_new (void);
+void clone_destroy (clone_t **self_p);
+void clone_connect (clone_t *self, char *address, char *service);
+void clone_set (clone_t *self, char *key, char *value);
+char *clone_get (clone_t *self, char *key);
[[/code]]
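
Before we look at the client itself, here is a hypothetical sketch of how an application might drive this API; the endpoint, port, and key are invented for illustration:

[[code language="C"]]
//  Hypothetical usage sketch of the clone class
#include "clone.h"
#include <stdio.h>
#include <stdlib.h>

int main (void)
{
    clone_t *clone = clone_new ();
    clone_connect (clone, "tcp://localhost", "5556");

    //  Write a value, then read it back from the shared table
    clone_set (clone, "key1", "value1");
    char *value = clone_get (clone, "key1");
    if (value) {
        puts (value);
        free (value);   //  Assuming the class hands back a copy we own
    }
    clone_destroy (&clone);
    return 0;
}
[[/code]]
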
So here is Model Four of the clone client, which has now become just a thin shell using the clone class:
@@ -31,7 +31,7 @@ client_task (void *args)
// Tick once per second, pulling in arriving messages
int centitick;
for (centitick = 0; centitick < 100; centitick++) {
- zmq_poll (items, 1, 10000);
+ zmq_poll (items, 1, 10 * ZMQ_POLL_MSEC);
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (client);
zframe_print (zmsg_last (msg), identity);
@@ -59,8 +59,10 @@ struct _bstar_t {
int64_t peer_expiry; // When peer is considered 'dead'
zloop_fn *voter_fn; // Voting socket handler
void *voter_arg; // Arguments for voting handler
- zloop_fn *failover_fn; // Failover event handler
- void *failover_arg; // Arguments for failover handler
+ zloop_fn *master_fn; // Call when become master
+ void *master_arg; // Arguments for handler
+ zloop_fn *slave_fn; // Call when become slave
+ void *slave_arg; // Arguments for handler
};
@@ -83,6 +85,8 @@ s_execute_fsm (bstar_t *self)
if (self->event == PEER_ACTIVE) {
printf ("I: connected to backup (master), ready as slave\n");
self->state = STATE_PASSIVE;
+ if (self->slave_fn)
+ (self->slave_fn) (self->loop, NULL, self->slave_arg);
}
}
else
@@ -92,14 +96,19 @@ s_execute_fsm (bstar_t *self)
if (self->event == PEER_ACTIVE) {
printf ("I: connected to primary (master), ready as slave\n");
self->state = STATE_PASSIVE;
+ if (self->slave_fn)
+ (self->slave_fn) (self->loop, NULL, self->slave_arg);
}
else
- if (self->event == CLIENT_REQUEST)
+ if (self->event == CLIENT_REQUEST) {
rc = -1;
+ puts ("NOT VALID IN STATE BACKUP");
+ }
}
else
// Server is active
// Accepts CLIENT_REQUEST events in this state
+ // The only way out of ACTIVE is death
if (self->state == STATE_ACTIVE) {
if (self->event == PEER_ACTIVE) {
// Two masters would mean split-brain
@@ -137,14 +146,16 @@ s_execute_fsm (bstar_t *self)
// If peer is dead, switch to the active state
printf ("I: failover successful, ready as master\n");
self->state = STATE_ACTIVE;
- if (self->failover_fn)
- (self->failover_fn) (self->loop,
- NULL, self->failover_arg);
}
- else
+ else {
// If peer is alive, reject connections
+ puts ("NOT VALID WHEN PEER IS ALIVE");
rc = -1;
+ }
}
+ // Call state change handler if necessary
+ if (self->state == STATE_ACTIVE && self->master_fn)
+ (self->master_fn) (self->loop, NULL, self->master_arg);
}
return rc;
}
@@ -176,10 +187,18 @@ int zstr_recv_state (zloop_t *loop, void *socket, void *arg)
int s_voter_ready (zloop_t *loop, void *socket, void *arg)
{
bstar_t *self = (bstar_t *) arg;
+puts ("REQUEST FROM VOTER");
// If server can accept input now, call appl handler
self->event = CLIENT_REQUEST;
if (s_execute_fsm (self) == 0)
(self->voter_fn) (self->loop, socket, self->voter_arg);
+ else {
+ puts ("REJECTED...");
+ // Destroy waiting message, no-one to read it
+ zmsg_t *msg = zmsg_recv (socket);
+ zmsg_dump (msg);
+ zmsg_destroy (&msg);
+ }
return 0;
}
@@ -263,14 +282,22 @@ bstar_voter (bstar_t *self, char *endpoint, int type, zloop_fn handler,
}
// ---------------------------------------------------------------------
-// Register failover handler
+// Register state change handlers
void
-bstar_failover (bstar_t *self, zloop_fn handler, void *arg)
+bstar_new_master (bstar_t *self, zloop_fn handler, void *arg)
{
- assert (!self->failover_fn);
- self->failover_fn = handler;
- self->failover_arg = arg;
+ assert (!self->master_fn);
+ self->master_fn = handler;
+ self->master_arg = arg;
+}
+
+void
+bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg)
+{
+ assert (!self->slave_fn);
+ self->slave_fn = handler;
+ self->slave_arg = arg;
}
@@ -284,13 +311,3 @@ bstar_start (bstar_t *self)
assert (self->voter_fn);
return zloop_start (self->loop);
}
-
-// ---------------------------------------------------------------------
-// Returns TRUE if the current server is master in the pair,
-// false if it is slave.
-
-Bool
-bstar_master (bstar_t *self)
-{
- return self->state == STATE_ACTIVE;
-}
@@ -41,36 +41,26 @@ typedef struct _bstar_t bstar_t;
// Create a new Binary Star instance, using local (bind) and
// remote (connect) endpoints to set up the server peering.
-bstar_t *
- bstar_new (int primary, char *local, char *remote);
+bstar_t *bstar_new (int primary, char *local, char *remote);
// Destroy a Binary Star instance
-void
- bstar_destroy (bstar_t **self_p);
+void bstar_destroy (bstar_t **self_p);
// Return underlying zloop reactor, for timer and reader
// registration and cancelation.
-zloop_t *
- bstar_zloop (bstar_t *self);
+zloop_t *bstar_zloop (bstar_t *self);
// Register voting reader
-int
- bstar_voter (bstar_t *self, char *endpoint, int type,
+int bstar_voter (bstar_t *self, char *endpoint, int type,
zloop_fn handler, void *arg);
-// Register failover handler
-void
- bstar_failover (bstar_t *self, zloop_fn handler, void *arg);
+// Register main state change handlers
+void bstar_new_master (bstar_t *self, zloop_fn handler, void *arg);
+void bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg);
// Start the reactor; it ends if a callback function returns -1 or the
// process receives SIGINT or SIGTERM.
-int
- bstar_start (bstar_t *self);
-
-// Returns TRUE if the current server is master in the pair,
-// false if it is slave.
-Bool
- bstar_master (bstar_t *self);
+int bstar_start (bstar_t *self);
#ifdef __cplusplus
}
@@ -28,7 +28,7 @@ int main (void)
while (expect_reply) {
// Poll socket for a reply, with timeout
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
- int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * 1000);
+ int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted
@@ -156,7 +156,7 @@ int main (int argc, char *argv [])
int time_left = (int) ((send_state_at - zclock_time ()));
if (time_left < 0)
time_left = 0;
- int rc = zmq_poll (items, 2, time_left * 1000);
+ int rc = zmq_poll (items, 2, time_left * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Context has been shut down