Permalink
Browse files

Rough Majordomo broker outline

  • Loading branch information...
1 parent 3617ac1 commit a8f9d920ddb3ec467d51bf0af2f68d9729f3a7b5 @hintjens hintjens committed Mar 2, 2011
Showing with 279 additions and 194 deletions.
  1. +7 −3 chapter4.txt
  2. +3 −0 examples/C/.gitignore
  3. +158 −149 examples/C/mdbroker.c
  4. +8 −7 examples/C/mdclient.c
  5. +15 −3 examples/C/mdworker.c
  6. +4 −1 examples/C/mdwrkapi.c
  7. +15 −15 examples/C/ppqueue.c
  8. +8 −8 examples/C/ppworker.c
  9. +7 −0 examples/C/xx
  10. +4 −0 examples/C/yy
  11. +0 −3 examples/C/zhash.c
  12. +50 −5 examples/C/zlist.c
View
@@ -402,13 +402,13 @@ Turning PPP into a real protocol would take more work:
++++ Service-Oriented Reliable Queuing (Majordomo Pattern)
-The nice thing about progress is how fast it happens. Just a few sentences ago we were dreaming of a better protocol that would fix the world. And here we have it:
+The nice thing about progress is how fast it happens when lawyers and committees aren't involved. Just a few sentences ago we were dreaming of a better protocol that would fix the world. And here we have it:
* http://rfc.zeromq.org/spec:7
This one-page specification takes PPP and turns it into something more solid. This is how we should design complex architectures: start by writing down the contracts, and only //then// write software to implement them.
-The Majordomo Protocol (MDP) extends and improves PPP in one interesting way apart from the two points above. It adds a "service name" to requests that the client sends, and asks workers to register for specific services. The nice thing about MDP is that it came from working code, a simpler protocol, and a precise set of improvements. It took literally only an hour to draft.
+The Majordomo Protocol (MDP) extends and improves PPP in one interesting way apart from the two points above. It adds a "service name" to requests that the client sends, and asks workers to register for specific services. The nice thing about MDP is that it came from working code, a simpler protocol, and a precise set of improvements. This made it easy to draft.
Adding service names is a small but significant change that turns our Paranoid Pirate queue into a service-oriented broker:
@@ -486,9 +486,13 @@ Notes on this code:
Let's design the Majordomo broker. Its core structure is a set of queues, one per service. We will create these queues as workers appear (we could delete them as workers disappear, but let's forget that for now — it gets complex). Additionally, we keep a queue of workers per service.
-To make the C examples easier to write and read, I've taken the hash and list containers from the [http://zfl.zeromq.org ZFL project], and renamed them as zlist and zhash, similar to what we did with zmsg. In any modern language you can of course use built-in containers.
+To make the C examples easier to write and read, I've taken the hash and list containers from the [http://zfl.zeromq.org ZFL project], and renamed them as zlist and zhash, as we did with zmsg. In any modern language you can of course use built-in containers.
+- broker as thread, multiple in one process
+- test, dev, etc... easy to do
+- how to handle broker failure - live/live
+
.end
View
@@ -50,4 +50,7 @@ spqueue
spworker
ppqueue
ppworker
+mdclient
+mdbroker
+mdworker
View
@@ -8,201 +8,210 @@
#include "zhash.c"
#include "mdp.h"
-#define MAX_SERVICES 100
-#define MAX_WORKERS 100
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
#define HEARTBEAT_INTERVAL 1000 // msecs
-hash of services
+
+// This defines a single broker
+typedef struct {
+ void *context; // 0MQ context
+ void *broker; // Socket for clients & workers
+ zhash_t *services; // Hash of known services
+ zhash_t *workers; // Hash of known workers
+ zlist_t *idle_workers; // List of idle workers
+ uint64_t heartbeat_at; // When to send HEARTBEAT
+} broker_t;
// This defines a single service
typedef struct {
char *name; // Service name
- list of messages waiting
- number of messages?
- list of workers waiting
- number of workers?
+ zlist_t *requests; // List of client requests
+ zlist_t *workers; // All workers for this service
+ zlist_t *idle_workers; // List of idle workers
} service_t;
-
-// This defines one active worker in our worker queue
+// This defines one worker, idle or active
typedef struct {
char *identity; // Address of worker
- int64_t expiry; // Expires at this time
+ char *service; // Service name
+ int64_t expiry; // Expires at this time unless heartbeat
} worker_t;
-typedef struct {
- size_t size; // Number of workers
- worker_t workers [MAX_WORKERS];
-} queue_t;
-// Dequeue operation for queue implemented as array of anything
-#define DEQUEUE(queue, index) memmove ( \
- &(queue) [index], &(queue) [index + 1], \
- (sizeof (queue) / sizeof (*queue) - index) * sizeof (queue [0]))
-
-// Insert worker at end of queue, reset expiry
-// Worker must not already be in queue
-static void
-s_worker_append (queue_t *queue, char *identity)
+// Locate or create new service entry
+static service_t *
+s_require_service (broker_t *self, char *name)
{
- int index;
- for (index = 0; index < queue->size; index++)
- if (strcmp (queue->workers [index].identity, identity) == 0)
- break;
-
- if (index < queue->size)
- printf ("E: duplicate worker identity %s", identity);
- else {
- assert (queue->size < MAX_WORKERS);
- queue->workers [queue->size].identity = identity;
- queue->workers [queue->size].expiry = s_clock ()
- + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
- queue->size++;
+ assert (name);
+ service_t *service = zhash_lookup (self->services, name);
+ if (service == NULL) {
+ service = (service_t *) malloc (sizeof (service_t));
+ service->name = strdup (name);
+ service->requests = zlist_new ();
+ service->workers = zlist_new ();
+ service->idle_workers = zlist_new ();
+ zhash_insert (self->services, name, service);
}
}
-// Remove worker from queue, if present
+// Dispatch service requests to workers as possible
static void
-s_worker_delete (queue_t *queue, char *identity)
+s_dispatch_service (broker_t *self, char *service)
{
- int index;
- for (index = 0; index < queue->size; index++)
- if (strcmp (queue->workers [index].identity, identity) == 0)
- break;
-
- if (index < queue->size) {
- free (queue->workers [index].identity);
- DEQUEUE (queue->workers, index);
- queue->size--;
+#if 0
+ char *worker_id = zlist_first (service->idle_workers);
+ while (worker_id) {
+
+ worker_t *worker = zhash_lookup (self->workers, name);
+ assert (worker);
+ if (worker->expiry > s_clock ())
+ if message, route
+ else break
+ else
+ disconnect & kill
+
+ worker_id = zlist_next (service->idle_workers);
}
+ && zlist_size ()) {
+
+ if (zlist_size (service->requests))
+
+ msg_t *msg = zlist_pop (service->requests);
+ char *identity = zlist_pop (service->idle_workers);
+
+ /* Request:
+ * Address: workerid
+ * Frame 0: "MDPW01"
+ * Frame 1: 0x02 (one byte, representing REQUEST)
+ * Frame 2: Client address (envelope stack)
+ * Frame 3: Empty (zero bytes, envelope delimiter)
+ * Frames 4+: Request body (opaque binary)
+ */
+
+ printf ("B: dispatch to %s\n", identity);
+ zmsg_destroy (&msg);
+ free (identity);
+ }
+#endif
}
-// Reset worker expiry, worker must be present
+// Process a request coming from a client
static void
-s_worker_refresh (queue_t *queue, char *identity)
+s_process_client_message (broker_t *self, char *sender, zmsg_t *msg)
{
- int index;
- for (index = 0; index < queue->size; index++)
- if (strcmp (queue->workers [index].identity, identity) == 0)
- break;
-
- if (index < queue->size)
- queue->workers [index].expiry = s_clock ()
- + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
- else
- printf ("E: worker %s not ready\n", identity);
+ printf ("s_process_client_message\n");
+ zmsg_dump (msg);
+#if 0
+ assert (zmsg_parts (msg) >= 2); // Service name + body
+
+ char *service_name = zmsg_pop (msg);
+ service_t *service = s_require_service (self, service_name);
+ free (service_name);
+
+ // Append request to service request queue
+ zlist_append (service->requests, msg);
+ s_dispatch_service (self, service);
+#endif
}
-// Pop next available worker off queue, return identity
-static char *
-s_worker_dequeue (queue_t *queue)
-{
- assert (queue->size);
- char *identity = queue->workers [0].identity;
- DEQUEUE (queue->workers, 0);
- queue->size--;
- return identity;
-}
-// Look for & kill expired workers
+// ----------------------------------------------------------------------
+
static void
-s_queue_purge (queue_t *queue)
+s_process_worker_message (broker_t *self, char *sender, zmsg_t *msg)
{
- // Work backwards from oldest so we don't do useless work
- int index;
- for (index = queue->size - 1; index >= 0; index--) {
- if (s_clock () > queue->workers [index].expiry) {
- free (queue->workers [index].identity);
- DEQUEUE (queue->workers, index);
- queue->size--;
- index--;
- }
+ printf ("s_process_worker_message\n");
+ zmsg_dump (msg);
+#if 0
+ assert (zmsg_parts (msg) >= 2); // Service name + body
+
+ // Lookup worker, create if it's a new one
+ worker_t *worker = zhash_lookup (self->workers, name);
+ if (worker == NULL) {
+ worker = (worker_t *) malloc (sizeof (worker_t));
+ worker->identity = strdup (sender);
+ zhash_insert (self->workers, sender, worker);
+ }
+ ready
+ - append to idle queue
+ worker->expiry = s_clock () + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
+ heartbeat
+ worker->expiry = s_clock () + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
+
+ reply
+ /* Reply:
+ * Address: clientid
+ * Frame 0: Empty
+ * Frame 1: "MDPC01"
+ * Frame 2: Service name
+ * Frames 3+: Reply body
+ */
+ // send message back to originating client
+ zmsg_send (&msg, broker);
+ - append to idle queue
}
+#endif
}
+
+// ----------------------------------------------------------------------
+
int main (void)
{
s_version_assert (2, 1);
- // Prepare our context and sockets
- void *context = zmq_init (1);
- void *frontend = zmq_socket (context, ZMQ_XREP);
- void *backend = zmq_socket (context, ZMQ_XREP);
- zmq_bind (frontend, "tcp://*:5555"); // For clients
- zmq_bind (backend, "tcp://*:5556"); // For workers
-
- // Queue of available workers
- queue_t *queue = (queue_t *) calloc (1, sizeof (queue_t));
-
- // Send out heartbeats at regular intervals
- uint64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
-
+ // Initialize broker state
+ broker_t *self = (broker_t *) malloc (sizeof (broker_t));
+ self->context = zmq_init (1);
+ self->broker = zmq_socket (self->context, ZMQ_XREP);
+ self->services = zhash_new ();
+ self->workers = zhash_new ();
+ self->idle_workers = zlist_new ();
+ self->heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
+
+ // We use a single socket for both clients and workers
+ char *endpoint = "tcp://*:5555";
+ zmq_bind (self->broker, endpoint);
+ printf ("I: Majordomo broker ready at %s\n", endpoint);
+
+ // Get and process messages forever
while (1) {
- zmq_pollitem_t items [] = {
- { backend, 0, ZMQ_POLLIN, 0 },
- { frontend, 0, ZMQ_POLLIN, 0 }
- };
- // Poll frontend only if we have available workers
- if (queue->size)
- zmq_poll (items, 2, HEARTBEAT_INTERVAL * 1000);
- else
- zmq_poll (items, 1, HEARTBEAT_INTERVAL * 1000);
+ zmq_pollitem_t items [] = { { self->broker, 0, ZMQ_POLLIN, 0 } };
+ zmq_poll (items, 1, HEARTBEAT_INTERVAL * 1000);
- // Handle worker activity on backend
+ // Process next input message, if any
if (items [0].revents & ZMQ_POLLIN) {
- zmsg_t *zmsg = zmsg_recv (backend);
- char *identity = zmsg_unwrap (zmsg);
-
- // Return reply to client if it's not a control message
- if (zmsg_parts (zmsg) == 1) {
- if (strcmp (zmsg_address (zmsg), "READY") == 0) {
- s_worker_delete (queue, identity);
- s_worker_append (queue, identity);
- }
- else
- if (strcmp (zmsg_address (zmsg), "HEARTBEAT") == 0)
- s_worker_refresh (queue, identity);
- else {
- printf ("E: invalid message from %s\n", identity);
- zmsg_dump (zmsg);
- free (identity);
- }
- zmsg_destroy (&zmsg);
- }
+ zmsg_t *msg = zmsg_recv (self->broker);
+ char *sender = zmsg_unwrap (msg);
+ char *header = zmsg_pop (msg);
+
+ if (strcmp (header, MDPC_CLIENT) == 0)
+ s_process_client_message (self, sender, msg);
+ else
+ if (strcmp (header, MDPS_WORKER) == 0)
+ s_process_worker_message (self, sender, msg);
else {
- zmsg_send (&zmsg, frontend);
- s_worker_append (queue, identity);
+ printf ("E: invalid message\n");
+ zmsg_dump (msg);
}
+ free (sender);
+ free (header);
+ zmsg_destroy (&msg);
}
- if (items [1].revents & ZMQ_POLLIN) {
- // Now get next client request, route to next worker
- zmsg_t *zmsg = zmsg_recv (frontend);
- char *identity = s_worker_dequeue (queue);
- zmsg_wrap (zmsg, identity, "");
- zmsg_send (&zmsg, backend);
- free (identity);
- }
-
- // Send heartbeats to idle workers if it's time
- if (s_clock () > heartbeat_at) {
- int index;
- for (index = 0; index < queue->size; index++) {
- zmsg_t *zmsg = zmsg_new ();
- zmsg_body_set (zmsg, "HEARTBEAT");
- zmsg_wrap (zmsg, queue->workers [index].identity, NULL);
- zmsg_send (&zmsg, backend);
+ // Send heartbeats to idle workers if needed
+ if (s_clock () > self->heartbeat_at) {
+ char *worker = zlist_first (self->idle_workers);
+ while (worker) {
+ zmsg_t *msg = zmsg_new ();
+ zmsg_append (msg, worker);
+ zmsg_append (msg, MDPS_WORKER);
+ zmsg_append (msg, MDPS_HEARTBEAT);
+ zmsg_send (&msg, self->broker);
+ worker = zlist_next (self->idle_workers);
}
- heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
+ self->heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
}
- s_queue_purge (queue);
}
// We never exit the main loop
- // But pretend to do the right shutdown anyhow
- while (queue->size)
- free (s_worker_dequeue (queue));
- free (queue);
- zmq_close (frontend);
- zmq_close (backend);
return 0;
}
Oops, something went wrong.

0 comments on commit a8f9d92

Please sign in to comment.