
Majordomo APIs first cut

1 parent 834e960 commit 40b726343499742473e39647811a053468d171bd @hintjens committed Mar 1, 2011
Showing with 67 additions and 67 deletions.
  1. +16 −7 chapter4.txt
  2. +6 −4 examples/C/mdcliapi.c
  3. +44 −56 examples/C/mdwrkapi.c
  4. +1 −0 wip.txt
chapter4.txt
@@ -342,9 +342,9 @@ You should see the workers die, one by one, as they simulate a crash, and the cl
++++ Heartbeating
-When writing the Paranoid Pirate examples, it took about five hours to get the heartbeating working properly. The rest of the request-reply chain took perhaps ten minutes. Heartbeating is one of those reliability layers that often causes more trouble than it saves. It is especially easy to create 'false failures', i.e. peers decide that they are disconnected because the heartbeats aren't sent properly.
+When writing the Paranoid Pirate examples, it took about five hours to get the queue-to-worker heartbeating working properly. The rest of the request-reply chain took perhaps ten minutes. Heartbeating is one of those reliability layers that often causes more trouble than it saves. It is especially easy to create 'false failures', i.e. peers decide that they are disconnected because the heartbeats aren't sent properly.
-Here are some tips to getting this right:
+Some points to consider when understanding and implementing heartbeating:
* Note that heartbeats are not request-reply. They flow asynchronously in both directions. Either peer can decide the other is 'dead' and stop talking to it.
@@ -380,6 +380,8 @@ Here are some tips to getting this right:
* You might be tempted to open a separate socket dialog for heartbeats. This is superficially nice because you can separate different dialogs, e.g. the synchronous request-reply from the asynchronous heartbeating. However it's a bad idea for several reasons. First, if you're sending data you don't need to send heartbeats. Second, sockets may, due to network vagaries, become jammed. You need to know when your main data socket is silent because it's dead, rather than just not busy, so you need heartbeats on that socket. Lastly, two sockets is more complex than one.
+* We're not doing heartbeating from client to queue. We could, but it would add //significant// complexity for no real benefit.
+
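The worker code in this commit implements exactly this liveness scheme. Distilled to its bones, with the socket setup elided and the constants taken from mdwrkapi.c below (under 0MQ 2.x, zmq_poll takes its timeout in microseconds, hence the * 1000), the loop each peer runs looks roughly like this:

[[code]]
//  Liveness-based heartbeating sketch, distilled from mdwrkapi.c in
//  this commit. Assumes 0MQ 2.x and the Guide's s_clock/s_sleep/s_send
//  helpers from zhelpers.h; 'socket' is a connected socket, setup elided.
#define HEARTBEAT_LIVENESS  3       //  Missed beats before peer is "dead"
#define HEARTBEAT_INTERVAL  1000    //  msecs
#define RECONNECT_INTERVAL  1000    //  msecs

size_t liveness = HEARTBEAT_LIVENESS;
uint64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;

while (1) {
    zmq_pollitem_t items [] = { { socket, 0, ZMQ_POLLIN, 0 } };
    zmq_poll (items, 1, HEARTBEAT_INTERVAL * 1000);

    if (items [0].revents & ZMQ_POLLIN) {
        //  Any traffic at all, data or heartbeat, proves the peer alive
        liveness = HEARTBEAT_LIVENESS;
        //  ... receive and process the message here ...
    }
    else
    if (--liveness == 0) {
        //  Peer has been silent too long: pause, then reconnect
        s_sleep (RECONNECT_INTERVAL);
        liveness = HEARTBEAT_LIVENESS;
        //  ... close and reopen the socket here ...
    }
    //  Send our own heartbeat whenever the interval expires
    if (s_clock () > heartbeat_at) {
        s_send (socket, "HEARTBEAT");
        heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
    }
}
[[/code]]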
++++ Contracts and Protocols
If you're paying attention you'll realize that Paranoid Pirate is not compatible with Simple Pirate, because of the heartbeats.
@@ -406,9 +408,9 @@ The nice thing about progress is how fast it happens. Just a few sentences ago w
This one-page specification takes PPP and turns it into something more solid. This is how we should design complex architectures: start by writing down the contracts, and only //then// write software to implement them.
-The Majordomo Protocol extends and improves PPP in one interesting way apart from the two points above. It adds a "service name" to requests that the client sends, and asks workers to register for specific services.
+The Majordomo Protocol (MDP) extends and improves PPP in one interesting way apart from the two points above. It adds a "service name" to requests that the client sends, and asks workers to register for specific services. The nice thing about MDP is that it came from working code, a simpler protocol, and a precise set of improvements. It took literally only a couple of hours to draft, review, and publish.
-This is a small but significant change that turns our Paranoid Pirate queue into a service-oriented broker:
+Adding service names is a small but significant change that turns our Paranoid Pirate queue into a service-oriented broker:
[[code type="textdiagram"]]
@@ -444,9 +446,9 @@ This is a small but significant change that turns our Paranoid Pirate queue into
Figure # - Majordomo Pattern
[[/code]]
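Concretely, the service name travels as a frame of its own in every client request. This layout is a sketch inferred from the mdcliapi.c changes below; the "echo" service name is a hypothetical example:

[[code]]
//  MDP client request, frame by frame (inferred from mdcliapi.c below;
//  "echo" is a hypothetical service name):
//
//  Frame 1:  "MDPCxy"       six bytes, MDP/Client protocol version x.y
//  Frame 2:  Service name   printable string, e.g. "echo"
//  Frame 3+: Request body   opaque frames, application-defined
//
//  Workers register with the broker for one service, and the broker
//  uses Frame 2 to route each request to a registered worker.
[[/code]]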
-To implement Majordomo we need to write a framework for clients and workers. It's really not efficient to ask every application to read the spec and make it work, when they could be using a simpler API.
+To implement Majordomo we need to write a framework for clients and workers. It's really not sane to ask every application developer to read the spec and make it work, when they could be using a simpler API built and tested just once.
-So, while our first contract defines how the pieces of our distributed architecture talk to each other, our second contract defines how user applications talk to the technical framework we're going to design.
+So, while our first contract (MDP itself) defines how the pieces of our distributed architecture talk to each other, our second contract defines how user applications talk to the technical framework we're going to design.
Majordomo has two halves, a client side and a worker side. Since we'll write both client and worker applications, we will need two APIs. Here is a sketch for the client API, using a simple object-oriented approach. We write this in C, using the style of the [http://zfl.zeromq.org/page:read-the-manual ZFL library]:
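The sketch itself falls outside the context lines of this diff. As a purely hypothetical usage example of its shape (the endpoint, the "echo" service, and the zmsg_new/zmsg_append helper calls are assumptions, not part of this commit):

[[code]]
//  Hypothetical client usage sketch; the endpoint, service name, and
//  zmsg_new/zmsg_append helpers are assumptions for illustration.
mdcli_t *session = mdcli_new ("tcp://localhost:5555");

zmsg_t *request = zmsg_new ();
zmsg_append (request, "Hello world");

//  mdcli_send works on a copy of the request, retries on timeout, and
//  reconnects to the broker if needed (see mdcliapi.c below)
zmsg_t *reply = mdcli_send (session, "echo", request);
if (reply)
    zmsg_destroy (&reply);      //  Use the reply, then discard it

zmsg_destroy (&request);        //  Caller keeps ownership of request
mdcli_destroy (&session);
[[/code]]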
@@ -468,8 +470,12 @@ It's more or less symmetrical but the worker dialog is a little different. The f
The client and worker APIs are fairly simple to construct.
+
.end
+- reconnect interval fixed, no backoff
+- timeout for worker recv?
+
- single threaded worker API
- will not send heartbeats while busy
- synchronous, simple
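Put together, a worker built on this API boils down to one loop. This is a hypothetical sketch: the mdwrk_new arguments and the echo behavior are assumptions, while mdwrk_recv is the function reworked in mdwrkapi.c below:

[[code]]
//  Hypothetical worker loop; mdwrk_new arguments and the "echo"
//  service are assumptions. mdwrk_recv sends the previous reply, if
//  any, then blocks for the next request, heartbeating internally.
mdwrk_t *session = mdwrk_new ("tcp://localhost:5555", "echo");

zmsg_t *reply = NULL;
while (1) {
    zmsg_t *request = mdwrk_recv (session, reply);
    zmsg_destroy (&reply);      //  mdwrk_recv sent a copy of it
    if (request == NULL)
        break;                  //  NULL means the broker disconnected us
    reply = request;            //  Echo service: send the request back
}
mdwrk_destroy (&session);
[[/code]]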
@@ -525,7 +531,10 @@ We use the common solution of detecting and rejecting duplicate requests. This m
* The server, before sending back a reply, stores it using the client id + message number as a key.
* The server, when getting a request from a given client, first checks if it has a reply for that client id + message number. If so, it does not process the request but just resends the reply.
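A minimal sketch of this check, assuming a generic hash table; hash_lookup, hash_insert, process_request, and the surrounding variables are hypothetical names, not the Guide's helpers:

[[code]]
//  Sketch of server-side duplicate detection. The hash table calls
//  (hash_lookup/hash_insert) and process_request are hypothetical.
char key [256];
snprintf (key, sizeof (key), "%s/%d", client_id, message_number);

zmsg_t *stored = hash_lookup (replies, key);
if (stored) {
    //  Duplicate request: resend a copy of the stored reply
    zmsg_t *resend = zmsg_dup (stored);
    zmsg_send (&resend, server);
}
else {
    zmsg_t *reply = process_request (request);      //  Do the work
    hash_insert (replies, key, zmsg_dup (reply));   //  Keep a copy
    zmsg_send (&reply, server);                     //  Send original
}
[[/code]]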
-++++ Distributed Reliable Client-Server (P2P Pirate Pattern)
+++++ Distributed Reliable Client-Server (Freelance Pattern)
+
+- heartbeating from clients outwards
+- ZMQ_ROOT service
We've seen how to make a reliable client-server architecture using a queue device (essentially a broker) in the middle. We've discussed the advantages of a central broker a few times. The biggest pro is that workers can come and go silently, there is just one 'stable' node on the network.
examples/C/mdcliapi.c
@@ -134,8 +134,9 @@ mdcli_send (mdcli_t *self, char *service, zmsg_t *request)
    //  Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    //  Frame 2: Service name (printable string)
    zmsg_t *msg = zmsg_dup (request);
-   zmsg_wrap (msg, MDPC_HEADER, service);
-   zmsg_send (self->client, &msg);
+   zmsg_push (msg, service);
+   zmsg_push (msg, MDPC_HEADER);
+   zmsg_send (&msg, self->client);
    while (1) {
        //  Poll socket for a reply, with timeout
@@ -164,8 +165,9 @@ mdcli_send (mdcli_t *self, char *service, zmsg_t *request)
            //  Reconnect, and resend message
            s_connect_to_broker (self);
            zmsg_t *msg = zmsg_dup (request);
-           zmsg_wrap (msg, MDPC_HEADER, service);
-           zmsg_send (self->client, &msg);
+           zmsg_push (msg, service);
+           zmsg_push (msg, MDPC_HEADER);
+           zmsg_send (&msg, self->client);
        }
        else
            break;          //  Give up
examples/C/mdwrkapi.c
@@ -40,8 +40,7 @@
//  Reliability parameters
#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL  1000    //  msecs
-#define INTERVAL_INIT       1000    //  Initial reconnect
-#define INTERVAL_MAX        32000   //  After exponential backoff
+#define RECONNECT_INTERVAL  1000    //  Delay between attempts
//  Protocol commands
#define MDPS_READY          "\001"
@@ -79,7 +78,9 @@ struct _mdwrk_t {
    //  Heartbeat management
    uint64_t heartbeat_at;      //  When to send HEARTBEAT
    size_t liveness;            //  How many attempts left
-   size_t interval;            //  Reconnect interval
+
+   //  Internal state
+   int expect_reply;           //  Zero only at start
};
@@ -104,7 +105,6 @@ void s_connect_to_broker (mdwrk_t *self)
    //  If liveness hits zero, queue is considered disconnected
    self->liveness = HEARTBEAT_LIVENESS;
-   self->interval = INTERVAL_INIT;
    self->heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
}
@@ -152,74 +152,62 @@ mdwrk_destroy (mdwrk_t **self_p)
//  --------------------------------------------------------------------------
-//  Send reply, if any, to broker and wait for request.
+//  Send reply, if any, to broker and wait for next request.
zmsg_t *
mdwrk_recv (mdwrk_t *self, zmsg_t *reply)
{
-   //  Prefix reply with protocol frames
-   //  Frame 1: "MDPSxy" (six bytes, MDP/Server x.y)
-   //  Frame 2: Service name (printable string)
-   //  zmsg_t *msg = zmsg_dup (request);
-   //  zmsg_wrap (msg, MDPS_HEADER, service);
-   //  zmsg_send (self->worker, &msg);
-
-   - if reply not null,
-       take copy
-       prefix with frames
-       send to broker
-
-   - poll loop for input request
-       return to caller
+   //  Format and send the reply if we were provided one
+   assert (reply || !self->expect_reply);
+   if (reply) {
+       zmsg_t *msg = zmsg_dup (reply);
+       zmsg_push (msg, MDPS_REPLY);
+       zmsg_push (msg, MDPS_HEADER);
+       zmsg_send (&msg, self->worker);
+   }
+   self->expect_reply = 1;
    while (1) {
-       zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
+       zmq_pollitem_t items [] = { { self->worker, 0, ZMQ_POLLIN, 0 } };
        zmq_poll (items, 1, HEARTBEAT_INTERVAL * 1000);
        if (items [0].revents & ZMQ_POLLIN) {
-           //  Get message
-           //  - 3-part envelope + content -> request
-           //  - 1-part "HEARTBEAT" -> heartbeat
-           zmsg_t *zmsg = zmsg_recv (worker);
-
-           if (zmsg_parts (zmsg) == 3) {
-               zmsg_send (&zmsg, worker);
-               liveness = HEARTBEAT_LIVENESS;
-           }
+           zmsg_t *msg = zmsg_recv (self->worker);
+           self->liveness = HEARTBEAT_LIVENESS;
+
+           //  Don't try to handle errors, just assert noisily
+           assert (zmsg_parts (msg) >= 3);
+
+           char *header = zmsg_pop (msg);
+           assert (strcmp (header, MDPS_HEADER) == 0);
+           free (header);
+
+           char *command = zmsg_pop (msg);
+           if (strcmp (command, MDPS_REQUEST) == 0) {
+               free (command);
+               return msg;     //  We have a request to process
+           }
+           else
+           if (strcmp (command, MDPS_HEARTBEAT) == 0)
+               ;               //  Do nothing for heartbeats
            else
-           if (zmsg_parts (zmsg) == 1
-           &&  strcmp (zmsg_body (zmsg), "HEARTBEAT") == 0)
-               liveness = HEARTBEAT_LIVENESS;
+           if (strcmp (command, MDPS_DISCONNECT) == 0) {
+               free (command);
+               zmsg_destroy (&msg);
+               break;          //  Return empty handed
+           }
            else {
-               printf ("E: (%s) invalid message\n", identity);
-               zmsg_dump (zmsg);
+               printf ("E: invalid input message (%d)\n", (int) command [0]);
+               zmsg_dump (msg);
            }
-           interval = INTERVAL_INIT;
+           free (command);
+           zmsg_destroy (&msg);    //  Heartbeat or invalid: discard it
        }
        else
-       if (--liveness == 0) {
-           printf ("W: (%s) heartbeat failure, can't reach queue\n",
-               identity);
-           printf ("W: (%s) reconnecting in %zd msec...\n",
-               identity, interval);
-           s_sleep (interval);
-
-           if (interval < INTERVAL_MAX)
-               interval *= 2;
-           zmq_close (worker);
-           worker = s_worker_socket (context);
-           liveness = HEARTBEAT_LIVENESS;
+       if (--self->liveness == 0) {
+           s_sleep (RECONNECT_INTERVAL);
+           s_connect_to_broker (self);
        }
-
-       //  Send heartbeat to queue if it's time
+       //  Send HEARTBEAT if it's time
        if (s_clock () > self->heartbeat_at) {
-           heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
-           printf ("I: (%s) worker heartbeat\n", identity);
-           s_send (worker, "HEARTBEAT");
+           self->heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
+           s_send (self->worker, "HEARTBEAT");
        }
    }
-
-
-//  return request;
-return NULL;
+   //  We exit if we've been disconnected
+   return NULL;
}
wip.txt
@@ -75,4 +75,5 @@ The Harmony pattern takes pipeline and makes it robust against the only failure
- if task missing, resend
- if end of batch missing, resend from last response
+++ How to deliver jobs one by one using push/pull? i.e. ensure jobs don't get lost...
