
Further refinement of examples for book

1 parent fd0f502, commit 151bb70b1421915bc9353e9feb650d535488ec99, @hintjens committed May 5, 2012
Showing with 170 additions and 186 deletions.
  1. +4 −3 bin/mkbook
  2. +1 −1 chapter3.txt
  3. +32 −32 chapter4.txt
  4. +4 −4 chapter5.txt
  5. +1 −3 examples/C/bstar.c
  6. +44 −39 examples/C/clone.c
  7. +36 −40 examples/C/clonesrv5.c
  8. +48 −61 examples/C/clonesrv6.c
  9. +0 −1 examples/C/flcliapi.h
  10. +0 −1 examples/C/kvmsg.h
  11. +0 −1 examples/C/kvsimple.h
bin/mkbook
@@ -13,7 +13,7 @@ require 'sflcvdp.pl'; # SFL date picture formatting
use Digest::SHA1 qw(sha1_hex);
# Listings longer than this are truncated and turned into URLs
-$cutoff = 64;
+$cutoff = 60;
@languages = ('C', 'C++', 'PHP', 'Python');
%extension = ('C' => 'c', 'C++' => 'cpp', 'PHP' => 'php', 'Python' => 'py' );
@@ -110,13 +110,14 @@ while (<>) {
$start = 0;
while (<EXAMPLE>) {
chop;
- if (/\/\/ \.split/) {
+ if (/\/\/ \.split\s(.*)/) {
# Long code example, split into separate pieces
writeln ("</programlisting>\n");
writeln ("</example>");
printf ("$filename: $lines lines ($start)\n") if $lines > $cutoff;
$text = "";
$start = $.;
+ $comment = $1;
while (<EXAMPLE>) {
chop while /\s$/;
last unless /\/\/ /; # End at any non-comment line
@@ -125,7 +126,7 @@ while (<>) {
$split++;
writeln ("<para>$text\n</para>\n");
writeln ("<example id=\"$name-$ext-$split\">");
- writeln ("<title>$title ($name.$ext) - continued</title>");
+ writeln ("<title>$title ($name.$ext) - $comment</title>");
writeln ("<programlisting language=\"$lexer\">");
$lines = 0;
}
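With this change, the text captured after "// .split" becomes the title of the continued listing instead of the generic "continued". For reference, the marker format the amended regex expects looks like this in an example source file (this fragment is taken verbatim from the clone.c changes later in this commit):

    // .split connect method
    // Connect to new server endpoint.
    // Sends [CONNECT][endpoint][service] to the agent:

mkbook closes the current <programlisting>, captures "connect method" into $comment, folds the comment lines that follow into the intervening <para>, and opens a new <example> whose <title> ends in "- connect method".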
chapter3.txt
@@ -953,7 +953,7 @@ Now we scale this out to more than one cluster. Each cluster has a set of client
The question is: how do we get the clients of each cluster talking to the workers of the other cluster? There are a few possibilities, each with pros and cons:
-* Clients could connect directly to both brokers. The advantage is that we don't need to modify brokers or workers. But clients get more complex, and become aware of the overall topology. If we want to add, e.g. a third or forth cluster, all the clients are affected. In effect we have to move routing and failover logic into the clients and that's not nice.
+* Clients could connect directly to both brokers. The advantage is that we don't need to modify brokers or workers. But clients get more complex, and become aware of the overall topology. If we want to add, e.g. a third or fourth cluster, all the clients are affected. In effect we have to move routing and fail-over logic into the clients, and that's not nice.
* Workers might connect directly to both brokers. But mama workers can't do that; they can only reply to one broker. We might use papas, but papas don't give us customizable broker-to-worker routing like LRU, only the built-in load balancing. That's a fail if we want to distribute work to idle workers: we precisely need LRU. One solution would be to use router sockets for the worker nodes. Let's label this "Idea #1", sketched below.
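A minimal sketch of what Idea #1 could look like on the worker side, using czmq; the identity and broker endpoints are made up for illustration and are not taken from the Guide:

    #include "czmq.h"

    int main (void)
    {
        zctx_t *ctx = zctx_new ();
        //  Idea #1: the worker owns a ROUTER socket and connects it to
        //  both brokers, so either broker can address it by identity
        void *worker = zsocket_new (ctx, ZMQ_ROUTER);
        //  Identity must be set before connecting, or the brokers
        //  cannot route replies and requests back to this worker
        zmq_setsockopt (worker, ZMQ_IDENTITY, "W001", 4);
        zsocket_connect (worker, "tcp://broker1.example.com:5555");
        zsocket_connect (worker, "tcp://broker2.example.com:5555");
        //  ... receive [broker identity][request], reply to that same broker ...
        zctx_destroy (&ctx);
        return 0;
    }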
chapter4.txt

Large diffs are not rendered by default.
chapter5.txt
@@ -364,9 +364,9 @@ And here is the client:
Some notes about this code:
-* The server has collapsed to one thread, which collects updates from clients and redistributes them. It manages a PULL socket for incoming updates, a ROUTER socket for state requests, and a PUB socket for outgoing updates.
+* The server has collapsed to a single task. It manages a PULL socket for incoming updates, a ROUTER socket for state requests, and a PUB socket for outgoing updates.
-* The client uses a simple tickless timer to send a random update to the server once a second. In reality, updates would be driven by application code.
+* The client uses a simple tickless timer to send a random update to the server once a second. In a real implementation we would drive updates from application code.
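As a rough sketch of that client-side tickless timer (the interval, socket name, and message handling here are illustrative assumptions, not the exact Guide code): wait for incoming updates, but never longer than the time remaining until the next outgoing update is due:

    int64_t alarm = zclock_time () + 1000;      //  Next update due in 1 second
    while (!zctx_interrupted) {
        //  Poll for at most the time left until the alarm
        int tickless = (int) (alarm - zclock_time ());
        if (tickless < 0)
            tickless = 0;
        zmq_pollitem_t items [] = { { updates, 0, ZMQ_POLLIN, 0 } };
        if (zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC) == -1)
            break;              //  Context has been shut down
        if (items [0].revents & ZMQ_POLLIN) {
            //  ... apply the incoming update to the local key-value copy ...
        }
        if (zclock_time () >= alarm) {
            //  ... send one random [key][value] update to the server ...
            alarm = zclock_time () + 1000;
        }
    }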
++++ Clone Subtrees
@@ -421,7 +421,7 @@ The Model Five server has totally changed. Instead of a poll loop, we're now usi
++++ Clone Server Reliability
-Clone models one to five are relatively simple. We're now going to get into unpleasantly complex territory here that has me getting up for another espresso. You should appreciate that making "reliable" messaging is complex enough that you always need to ask, "do we actually need this?" before jumping into it. If you can get away with unreliable, or "good enough" reliability, you can make a huge win in terms of cost and complexity. Sure, you may lose some data now and then. It is often a good trade-off. Having said, that, and since the espresso is really good, let's jump in!
+Clone models one to five are relatively simple. We're now going to get into unpleasantly complex territory here that has me getting up for another espresso. You should appreciate that making "reliable" messaging is complex enough that you always need to ask, "do we actually need this?" before jumping into it. If you can get away with unreliable, or "good enough" reliability, you can make a huge win in terms of cost and complexity. Sure, you may lose some data now and then. It is often a good trade-off. Having said that, and (sips) since the espresso is really good, let's jump in!
As you play with model three, you'll stop and restart the server. It might look like it recovers, but of course it's applying updates to an empty state, instead of the proper current state. Any new client joining the network will get just the latest updates, instead of all of them. So let's work out a design for making Clone work despite server failures.
@@ -599,7 +599,7 @@ Finally, here is the sixth and last model of the clone server:
[[code type="example" title="Clone server, Model Six" name="clonesrv6"]]
[[/code]]
-This main program is only a few hundred lines of code, but it took some time to get working. To be accurate, building Model Six was a bitch, and took about a full week of "sweet god, this is just too complex for the Guide" hacking. We've assembled pretty much everything and the kitchen sink into this small application. We have failover, ephemeral values, subtrees, and so on. What surprised me was that the upfront design was pretty accurate. But the details of writing and debugging so many socket flows is something special. Here's how I made this work:
+This main program is only a few hundred lines of code, but it took some time to get working. To be accurate, building Model Six took about a full week of "sweet god, this is just too complex for the Guide" hacking. We've assembled pretty much everything and the kitchen sink into this small application. We have fail-over, ephemeral values, subtrees, and so on. What surprised me was that the upfront design was pretty accurate. But the details of writing and debugging so many socket flows is something special. Here's how I made this work:
* By using reactors (bstar, on top of zloop), which remove a lot of grunt-work from the code, and leave what remains simpler and more obvious. The whole server runs as one thread, so there's no inter-thread weirdness going on. Just pass a structure pointer ('self') around to all handlers, which can do their thing happily. One nice side-effect of using reactors is that code, being less tightly integrated into a poll loop, is much easier to reuse. Large chunks of Model Six are taken from Model Five.
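To make the reactor point concrete, this is the general shape of the wiring, using plain zloop and leaving out the bstar layer; the socket field and endpoint names are assumptions for illustration, not the exact Model Six code. Every handler receives the same 'self' pointer as its argument, so there is no shared state between threads to worry about:

    zloop_t *loop = zloop_new ();
    zmq_pollitem_t poller = { self->collector, 0, ZMQ_POLLIN, 0 };
    zloop_poller (loop, &poller, s_collector, self);    //  Handler gets 'self' as its argument
    zloop_start (loop);         //  Blocks until interrupted or a handler returns -1
    zloop_destroy (&loop);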
examples/C/bstar.c
@@ -191,10 +191,8 @@ int s_voter_ready (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
bstar_t *self = (bstar_t *) arg;
// If server can accept input now, call appl handler
self->event = CLIENT_REQUEST;
- if (s_execute_fsm (self) == 0) {
- puts ("CLIENT REQUEST");
+ if (s_execute_fsm (self) == 0)
(self->voter_fn) (self->loop, poller, self->voter_arg);
- }
else {
// Destroy waiting message, no-one to read it
zmsg_t *msg = zmsg_recv (poller->socket);
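For context, voter_fn is the application handler registered through the Binary Star API; the registration looks roughly like this (the bstar_voter signature, the endpoint, and the use of s_snapshots as the handler are assumptions based on the Guide's bstar class, since that header is not part of this diff):

    //  Assumed usage: the snapshot ROUTER socket is the 'voter', and bstar
    //  only passes requests through to the handler when the FSM says this
    //  node may accept client input
    bstar_voter (bstar, "tcp://*:5556", ZMQ_ROUTER, s_snapshots, self);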
examples/C/clone.c
@@ -6,10 +6,6 @@
// If no server replies within this time, abandon request
#define GLOBAL_TIMEOUT 4000 // msecs
-// Server considered dead if silent for this long
-#define SERVER_TTL 5000 // msecs
-// Number of servers we will talk to
-#define SERVER_MAX 2
// =====================================================================
@@ -26,9 +22,8 @@ struct _clone_t {
// This is the thread that handles our real clone class
static void clone_agent (void *args, zctx_t *ctx, void *pipe);
-
-// ---------------------------------------------------------------------
-// Constructor
+// .split constructor and destructor
+// Constructor and destructor for the clone class:
clone_t *
clone_new (void)
@@ -42,9 +37,6 @@ clone_new (void)
return self;
}
-// ---------------------------------------------------------------------
-// Destructor
-
void
clone_destroy (clone_t **self_p)
{
@@ -57,9 +49,9 @@ clone_destroy (clone_t **self_p)
}
}
-// ---------------------------------------------------------------------
-// Specify subtree for snapshot and updates, do before connect
-// Sends [SUBTREE][subtree] to the agent
+// .split subtree method
+// Specify subtree for snapshot and updates, do before connect.
+// Sends [SUBTREE][subtree] to the agent:
void clone_subtree (clone_t *self, char *subtree)
{
@@ -70,9 +62,9 @@ void clone_subtree (clone_t *self, char *subtree)
zmsg_send (&msg, self->pipe);
}
-// ---------------------------------------------------------------------
-// Connect to new server endpoint
-// Sends [CONNECT][endpoint][service] to the agent
+// .split connect method
+// Connect to new server endpoint.
+// Sends [CONNECT][endpoint][service] to the agent:
void
clone_connect (clone_t *self, char *address, char *service)
@@ -85,9 +77,9 @@ clone_connect (clone_t *self, char *address, char *service)
zmsg_send (&msg, self->pipe);
}
-// ---------------------------------------------------------------------
-// Set new value in distributed hash table
-// Sends [SET][key][value][ttl] to the agent
+// .split set method
+// Set new value in distributed hash table.
+// Sends [SET][key][value][ttl] to the agent:
void
clone_set (clone_t *self, char *key, char *value, int ttl)
@@ -104,10 +96,10 @@ clone_set (clone_t *self, char *key, char *value, int ttl)
zmsg_send (&msg, self->pipe);
}
-// ---------------------------------------------------------------------
-// Lookup value in distributed hash table
-// Sends [GET][key] to the agent and waits for a value response
-// If there is no clone available, will eventually return NULL.
+// .split get method
+// Lookup value in distributed hash table.
+// Sends [GET][key] to the agent and waits for a value response.
+// If there is no clone available, will eventually return NULL:
char *
clone_get (clone_t *self, char *key)
@@ -129,11 +121,9 @@ clone_get (clone_t *self, char *key)
}
-// =====================================================================
-// Asynchronous part, works in the background
-
-// ---------------------------------------------------------------------
-// Simple class for one server we talk to
+// .split working with servers
+// The back-end agent manages a set of servers, which we implement using
+// our simple class model:
typedef struct {
char *address; // Server address
@@ -173,8 +163,14 @@ server_destroy (server_t **self_p)
}
}
-// ---------------------------------------------------------------------
-// Our agent class
+// .split back-end agent class
+// Here is the implementation of the back-end agent itself:
+
+// Number of servers we will talk to
+#define SERVER_MAX 2
+
+// Server considered dead if silent for this long
+#define SERVER_TTL 5000 // msecs
// States we can be in
#define STATE_INITIAL 0 // Before asking server for state
@@ -223,14 +219,17 @@ agent_destroy (agent_t **self_p)
}
}
-// Returns -1 if thread was interrupted
+// .split handling a control message
+// Here we handle the different control messages from the front-end;
+// SUBTREE, CONNECT, SET, and GET:
+
static int
agent_control_message (agent_t *self)
{
zmsg_t *msg = zmsg_recv (self->pipe);
char *command = zmsg_popstr (msg);
if (command == NULL)
- return -1;
+ return -1; // Interrupted
if (streq (command, "SUBTREE")) {
free (self->subtree);
@@ -253,6 +252,9 @@ agent_control_message (agent_t *self)
free (service);
}
else
+ // .split set and get commands
+ // When we set a property, we push the new key-value pair onto
+ // all our connected servers:
if (streq (command, "SET")) {
char *key = zmsg_popstr (msg);
char *value = zmsg_popstr (msg);
@@ -268,7 +270,6 @@ agent_control_message (agent_t *self)
kvmsg_set_prop (kvmsg, "ttl", ttl);
kvmsg_send (kvmsg, self->publisher);
kvmsg_destroy (&kvmsg);
-puts (key);
free (ttl);
free (key); // Value is owned by hash table
}
@@ -288,10 +289,9 @@ puts (key);
return 0;
}
-
-// ---------------------------------------------------------------------
-// Asynchronous agent manages server pool and handles request/reply
-// dialog when the application asks for it.
+// .split back-end agent
+// The asynchronous agent manages a server pool and handles the
+// request/reply dialog when the application asks for it:
static void
clone_agent (void *args, zctx_t *ctx, void *pipe)
@@ -325,11 +325,13 @@ clone_agent (void *args, zctx_t *ctx, void *pipe)
else
poll_size = 1;
break;
+
case STATE_SYNCING:
// In this state we read from snapshot and we expect
// the server to respond, else we fail over.
poll_set [1].socket = server->snapshot;
break;
+
case STATE_ACTIVE:
// In this state we read from subscriber and we expect
// the server to give hugz, else we fail over.
@@ -342,8 +344,11 @@ clone_agent (void *args, zctx_t *ctx, void *pipe)
if (poll_timer < 0)
poll_timer = 0;
}
- // ------------------------------------------------------------
- // Poll loop
+ // .split client poll loop
+ // We're ready to process incoming messages; if nothing at all
+ // comes from our server within the timeout, that means the
+ // server is dead:
+
int rc = zmq_poll (poll_set, poll_size, poll_timer);
if (rc == -1)
break; // Context has been shut down
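Putting the clone.c pieces together, application code would drive this class roughly as follows (the endpoints, ports, subtree, and key names are placeholders, not taken from the Guide's client examples):

    clone_t *clone = clone_new ();
    clone_subtree (clone, "/client/");                  //  Must be set before connecting
    clone_connect (clone, "tcp://localhost", "5556");   //  Primary server (placeholder port)
    clone_connect (clone, "tcp://localhost", "5566");   //  Backup server (placeholder port)

    clone_set (clone, "/client/name", "value", 30);     //  TTL makes the value ephemeral
    char *value = clone_get (clone, "/client/name");
    if (value) {
        //  ... use the value ...
        free (value);
    }
    clone_destroy (&clone);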
examples/C/clonesrv5.c
@@ -58,11 +58,9 @@ int main (void)
return 0;
}
-
-// ---------------------------------------------------------------------
-// Send snapshots to clients who ask for them
-
-static int s_send_single (char *key, void *data, void *args);
+// .split send snapshots
+// We handle ICANHAZ? requests by sending snapshot data to the
+// client that requested it:
// Routing information for a key-value snapshot
typedef struct {
@@ -71,6 +69,23 @@ typedef struct {
char *subtree; // Client subtree specification
} kvroute_t;
+// Send one state snapshot key-value pair to a socket
+// Hash item data is our kvmsg object, ready to send
+static int
+s_send_single (char *key, void *data, void *args)
+{
+ kvroute_t *kvroute = (kvroute_t *) args;
+ kvmsg_t *kvmsg = (kvmsg_t *) data;
+ if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
+ && memcmp (kvroute->subtree,
+ kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
+ zframe_send (&kvroute->identity, // Choose recipient
+ kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
+ kvmsg_send (kvmsg, kvroute->socket);
+ }
+ return 0;
+}
+
static int
s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
@@ -103,32 +118,14 @@ s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args)
kvmsg_destroy (&kvmsg);
free (subtree);
}
+ zframe_destroy (&identity);
}
return 0;
}
-
-// Send one state snapshot key-value pair to a socket
-// Hash item data is our kvmsg object, ready to send
-static int
-s_send_single (char *key, void *data, void *args)
-{
- kvroute_t *kvroute = (kvroute_t *) args;
- kvmsg_t *kvmsg = (kvmsg_t *) data;
- if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
- && memcmp (kvroute->subtree,
- kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
- // Send identity of recipient first
- zframe_send (&kvroute->identity,
- kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
- kvmsg_send (kvmsg, kvroute->socket);
- }
- return 0;
-}
-
-
-// ---------------------------------------------------------------------
-// Collect updates from clients
+// .split collect updates
+// We store each update with a new sequence number, and if necessary, a
+// time-to-live. We publish updates immediately on our publisher socket:
static int
s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args)
@@ -149,19 +146,9 @@ s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args)
return 0;
}
-
-// ---------------------------------------------------------------------
-// Purge ephemeral values that have expired
-
-static int s_flush_single (char *key, void *data, void *args);
-
-static int
-s_flush_ttl (zloop_t *loop, zmq_pollitem_t *poller, void *args)
-{
- clonesrv_t *self = (clonesrv_t *) args;
- zhash_foreach (self->kvmap, s_flush_single, args);
- return 0;
-}
+// .split flush ephemeral values
+// At regular intervals we flush ephemeral values that have expired. This
+// could be slow on very large data sets:
// If key-value pair has expired, delete it and publish the
// fact to listening clients.
@@ -182,3 +169,12 @@ s_flush_single (char *key, void *data, void *args)
}
return 0;
}
+
+static int
+s_flush_ttl (zloop_t *loop, zmq_pollitem_t *poller, void *args)
+{
+ clonesrv_t *self = (clonesrv_t *) args;
+ if (self->kvmap)
+ zhash_foreach (self->kvmap, s_flush_single, args);
+ return 0;
+}
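The flush handler is presumably driven by a repeating zloop timer in the server's setup; something along these lines (the 1000 msec interval is a guess, not taken from this diff):

    zloop_timer (loop, 1000, 0, s_flush_ttl, self);     //  1000 msec delay, 0 = repeat forever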
examples/C/clonesrv6.c

Diff not rendered.
