Imported ZFL list and hash containers
hintjens committed Mar 1, 2011
1 parent 40b7263 commit 91e8d92
Showing 8 changed files with 1,091 additions and 100 deletions.
2 changes: 0 additions & 2 deletions chapter3.txt
@@ -946,8 +946,6 @@ Some comments on this code:

* We can set and use identities on both bound and connected sockets, as this example shows.

* 0MQ currently has [http://github.com/zeromq/zeromq2/issues/issue/82 a bug] when cross-connecting XREP sockets over {{inproc}}. The binding socket does not use any identity you set, and always uses a generated UUID.

Although the router-to-router pattern looks ideal for asynchronous N-to-N routing, it has some pitfalls. First, any design with N-to-N connections will not scale beyond a small number of clients and servers. You should really create a device in the middle that turns it into two 1-to-N patterns. This gives you a structure like the LRU queue broker, though you would use XREQ at the front-end and worker sides to get streaming.

Second, it may become confusing if you try to put two XREP sockets at the same logical level. One must bind, one must connect, and request-reply is inherently asymmetric. However, the next point takes care of this.
34 changes: 26 additions & 8 deletions chapter4.txt
@@ -391,7 +391,7 @@ In fact what we have here is a protocol that needs writing down. It's fun to exp
Lack of contracts is a sure sign of a disposable application. So, let's write a contract for this protocol. How do we do that?

* There's a wiki, at [http://rfc.zeromq.org rfc.zeromq.org], that we made especially as a home for public 0MQ contracts.
* To create a new specification, register, and follow the instructions. It's fairly simple if you are not afraid of writing contracts.
* To create a new specification, register, and follow the instructions. It's straightforward, though technical writing is not for everyone.

It took me about fifteen minutes to draft the new [http://rfc.zeromq.org/spec:6 Pirate Pattern Protocol]. It's not a big specification but it does capture enough to act as the basis for arguments ("your queue isn't PPP compatible, please fix it!").

@@ -408,7 +408,7 @@ The nice thing about progress is how fast it happens. Just a few sentences ago w

This one-page specification takes PPP and turns it into something more solid. This is how we should design complex architectures: start by writing down the contracts, and only //then// write software to implement them.

The Majordomo Protocol (MDP) extends and improves PPP in one interesting way apart from the two points above. It adds a "service name" to requests that the client sends, and asks workers to register for specific services. The nice thing about MDP is that it came from working code, a simpler protocol, and a precise set of improvements. It took literally only a couple of hours to draft, review, and publish.
The Majordomo Protocol (MDP) extends and improves PPP in one interesting way apart from the two points above. It adds a "service name" to requests that the client sends, and asks workers to register for specific services. The nice thing about MDP is that it came from working code, a simpler protocol, and a precise set of improvements. It took literally only an hour to draft.
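
To make that concrete, here is a minimal sketch, mirroring what mdcliapi.c later in this commit does, of how a request picks up its service name on the wire; the "echo" service and the client_socket variable are hypothetical:

[[code]]
//  Sketch only: stamp a request with its service name and the
//  MDP/Client protocol header before it goes to the broker
zmsg_t *msg = zmsg_dup (request);       //  Don't touch the caller's request
zmsg_push (msg, "echo");                //  Frame: service name (example)
zmsg_push (msg, MDPC_CLIENT);           //  Frame: "MDPC01", from mdp.h
zmsg_send (&msg, client_socket);        //  client_socket is hypothetical
[[/code]]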

Adding service names is a small but significant change that turns our Paranoid Pirate queue into a service-oriented broker:

@@ -468,22 +468,40 @@ zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply);

It's more or less symmetrical, but the worker dialog is a little different. The first time a worker does a recv (), it passes a null reply; thereafter it passes the current reply and gets back a new request.
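
For instance, a trivial echo worker's main loop might look like the following sketch; mdwrk_new () and the "echo" service name are assumptions here, only mdwrk_recv () is defined above:

[[code]]
//  Sketch of the worker dialog: pass NULL the first time, then
//  pass the reply to the previous request on every later call
mdwrk_t *session = mdwrk_new ("tcp://localhost:5556", "echo");  //  Assumed constructor
zmsg_t *reply = NULL;
while (1) {
    zmsg_t *request = mdwrk_recv (session, reply);
    if (request == NULL)
        break;              //  Worker was interrupted or gave up
    reply = request;        //  Echo the request straight back
}
[[/code]]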

The client and worker APIs are fairly simple to construct.
The client and worker APIs are fairly simple to construct, since they're heavily based on the Paranoid Pirate code we already developed. Here is the client API:

[[code type="example" title="Majordomo Client API" name="mdcliapi"]]
[[/code]]
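
As a usage sketch (the constructor is my assumption; only mdcli_send () appears in the API above), sending one request to a hypothetical "echo" service looks something like this:

[[code]]
//  Sketch only: one request-reply cycle through the client API
mdcli_t *session = mdcli_new ("tcp://localhost:5555");   //  Assumed constructor
zmsg_t *request = zmsg_new ();
zmsg_body_set (request, "Hello world");
zmsg_t *reply = mdcli_send (session, "echo", request);
zmsg_destroy (&request);        //  mdcli_send () works on a duplicate
if (reply)
    zmsg_destroy (&reply);      //  Got an answer back from a worker
[[/code]]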

And here is the worker API:

[[code type="example" title="Majordomo Worker API" name="mdwrkapi"]]
[[/code]]

Notes on this code:

* The APIs are single threaded. This means, for example, that the worker won't send heartbeats in the background. Happily, this is exactly what we want: if the worker application gets stuck, heartbeats will stop and the broker will stop sending requests to the worker.
* The worker API doesn't do an exponential backoff; it's not worth the extra complexity.
* The APIs don't do any error reporting. If something isn't as expected, they raise an assertion (or exception depending on the language). This is ideal for a reference implementation, so any protocol errors show immediately. For real applications the API should be robust against invalid messages.

Let's design the Majordomo broker. Its core structure is a set of queues, one per service. We will create these queues as workers appear (we could delete them as workers disappear, but forget that for now; it gets complex). Additionally, we keep a queue of workers per service.

To make the C examples easier to write and read, I've taken the hash and list containers from the [http://zfl.zeromq.org ZFL project], and renamed them as zlist and zhash, similar to what we did with zmsg. In any modern language you can of course use built-in containers.
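
Here is a sketch, not the final broker code, of the per-service state this suggests; the field names and the "echo" service are placeholders, and I'm assuming only the basic zhash/zlist calls (zhash_new, zhash_lookup, zhash_insert, zlist_new):

[[code]]
//  Sketch only: per-service state for the Majordomo broker
typedef struct {
    char *name;             //  Service name
    zlist_t *requests;      //  Client requests waiting for a worker
    zlist_t *waiting;       //  Workers waiting for a request
} service_t;

//  All services, keyed by service name
zhash_t *services = zhash_new ();

//  Fetch or create the service a request names
service_t *service = zhash_lookup (services, "echo");
if (service == NULL) {
    service = (service_t *) calloc (1, sizeof (service_t));
    service->name = strdup ("echo");
    service->requests = zlist_new ();
    service->waiting = zlist_new ();
    zhash_insert (services, "echo", service);
}
[[/code]]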


.end

- reconnect interval fixed, no backoff
- timeout for worker recv?

- single threaded worker API
- will not send heartbeats while busy
- synchronous, simple
provides.


broker
- design first
- containers, zlist, zhash

- extensions to MD
- service presence
- delivery failure - mandatory, immediate


++++ Rust-based Reliability (Titanic Pattern)
208 changes: 208 additions & 0 deletions examples/C/mdbroker.c
@@ -0,0 +1,208 @@
//
// Majordomo broker
// A minimal implementation
//
#include "zhelpers.h"
#include "zmsg.c"
#include "zlist.c"
#include "zhash.c"
#include "mdp.h"

#define MAX_SERVICES 100
#define MAX_WORKERS 100
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
#define HEARTBEAT_INTERVAL 1000 // msecs

// TODO: hash of services, keyed by service name (zhash)

// This defines a single service
// NB: the list fields are a sketch of what's needed; the code
// below still uses the simple worker queue from Paranoid Pirate
typedef struct {
char *name; // Service name
zlist_t *requests; // List of client requests waiting
zlist_t *waiting; // List of workers waiting
// Counts come from the sizes of the two lists
} service_t;


// This defines one active worker in our worker queue
typedef struct {
char *identity; // Address of worker
int64_t expiry; // Expires at this time
} worker_t;

typedef struct {
size_t size; // Number of workers
worker_t workers [MAX_WORKERS];
} queue_t;

// Dequeue operation for queue implemented as array of anything
// Shifts the remaining entries down by one; note the "- 1" so we
// never read past the end of the array
#define DEQUEUE(queue, index) memmove ( \
&(queue) [index], &(queue) [index + 1], \
(sizeof (queue) / sizeof (*queue) - index - 1) * sizeof (queue [0]))

// Insert worker at end of queue, reset expiry
// Worker must not already be in queue
static void
s_worker_append (queue_t *queue, char *identity)
{
int index;
for (index = 0; index < queue->size; index++)
if (strcmp (queue->workers [index].identity, identity) == 0)
break;

if (index < queue->size)
printf ("E: duplicate worker identity %s", identity);
else {
assert (queue->size < MAX_WORKERS);
queue->workers [queue->size].identity = identity;
queue->workers [queue->size].expiry = s_clock ()
+ HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
queue->size++;
}
}

// Remove worker from queue, if present
static void
s_worker_delete (queue_t *queue, char *identity)
{
int index;
for (index = 0; index < queue->size; index++)
if (strcmp (queue->workers [index].identity, identity) == 0)
break;

if (index < queue->size) {
free (queue->workers [index].identity);
DEQUEUE (queue->workers, index);
queue->size--;
}
}

// Reset worker expiry, worker must be present
static void
s_worker_refresh (queue_t *queue, char *identity)
{
int index;
for (index = 0; index < queue->size; index++)
if (strcmp (queue->workers [index].identity, identity) == 0)
break;

if (index < queue->size)
queue->workers [index].expiry = s_clock ()
+ HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
else
printf ("E: worker %s not ready\n", identity);
}

// Pop next available worker off queue, return identity
static char *
s_worker_dequeue (queue_t *queue)
{
assert (queue->size);
char *identity = queue->workers [0].identity;
DEQUEUE (queue->workers, 0);
queue->size--;
return identity;
}

// Look for & kill expired workers
static void
s_queue_purge (queue_t *queue)
{
// Work backwards from the end so that dequeuing an expired worker
// only shifts entries we have already checked
int index;
for (index = queue->size - 1; index >= 0; index--) {
if (s_clock () > queue->workers [index].expiry) {
free (queue->workers [index].identity);
DEQUEUE (queue->workers, index);
queue->size--;
}
}
}

int main (void)
{
s_version_assert (2, 1);

// Prepare our context and sockets
void *context = zmq_init (1);
void *frontend = zmq_socket (context, ZMQ_XREP);
void *backend = zmq_socket (context, ZMQ_XREP);
zmq_bind (frontend, "tcp://*:5555"); // For clients
zmq_bind (backend, "tcp://*:5556"); // For workers

// Queue of available workers
queue_t *queue = (queue_t *) calloc (1, sizeof (queue_t));

// Send out heartbeats at regular intervals
uint64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;

while (1) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll frontend only if we have available workers
if (queue->size)
zmq_poll (items, 2, HEARTBEAT_INTERVAL * 1000);
else
zmq_poll (items, 1, HEARTBEAT_INTERVAL * 1000);

// Handle worker activity on backend
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *zmsg = zmsg_recv (backend);
char *identity = zmsg_unwrap (zmsg);

// A one-part message is a control command; anything else is a
// reply that we route back to the requesting client
if (zmsg_parts (zmsg) == 1) {
if (strcmp (zmsg_address (zmsg), "READY") == 0) {
s_worker_delete (queue, identity);
s_worker_append (queue, identity);
}
else
if (strcmp (zmsg_address (zmsg), "HEARTBEAT") == 0)
s_worker_refresh (queue, identity);
else {
printf ("E: invalid message from %s\n", identity);
zmsg_dump (zmsg);
free (identity);
}
zmsg_destroy (&zmsg);
}
else {
zmsg_send (&zmsg, frontend);
s_worker_append (queue, identity);
}
}
if (items [1].revents & ZMQ_POLLIN) {
// Now get next client request, route to next worker
zmsg_t *zmsg = zmsg_recv (frontend);
char *identity = s_worker_dequeue (queue);
zmsg_wrap (zmsg, identity, "");
zmsg_send (&zmsg, backend);
free (identity);
}

// Send heartbeats to idle workers if it's time
if (s_clock () > heartbeat_at) {
int index;
for (index = 0; index < queue->size; index++) {
zmsg_t *zmsg = zmsg_new ();
zmsg_body_set (zmsg, "HEARTBEAT");
zmsg_wrap (zmsg, queue->workers [index].identity, NULL);
zmsg_send (&zmsg, backend);
}
heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
}
s_queue_purge (queue);
}
// We never exit the main loop
// But pretend to do the right shutdown anyhow
while (queue->size)
free (s_worker_dequeue (queue));
free (queue);
zmq_close (frontend);
zmq_close (backend);
zmq_term (context);
return 0;
}
50 changes: 8 additions & 42 deletions examples/C/mdcliapi.c
@@ -1,41 +1,10 @@
/* =========================================================================
mdcliapi.c
Majordomo Protocol Client API
Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
Follows the ZFL class conventions and is further developed as the ZFL
mdcli class. See http://zfl.zeromq.org for more details.
-------------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under the
terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option)
any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL-
ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
=========================================================================
*/

#ifndef __MDCLIAPI_H_INCLUDED__
#define __MDCLIAPI_H_INCLUDED__

//
// Majordomo Protocol Client API
// Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7
//
#include "zhelpers.h"
#include "zmsg.c"

// This is the version of MDP/Client we implement
#define MDPC_HEADER "MDPC01"
#include "mdp.h"

// Reliability parameters
#define REQUEST_TIMEOUT 2500 // msecs, (> 1000!)
@@ -56,8 +25,6 @@ zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t *request);
}
#endif

#endif

// Structure of our class
// We access these properties only via class methods

@@ -135,7 +102,7 @@ mdcli_send (mdcli_t *self, char *service, zmsg_t *request)
// Frame 2: Service name (printable string)
zmsg_t *msg = zmsg_dup (request);
zmsg_push (msg, service);
zmsg_push (msg, MDPC_HEADER);
zmsg_push (msg, MDPC_CLIENT);
zmsg_send (&msg, self->client);

while (1) {
@@ -151,7 +118,7 @@ mdcli_send (mdcli_t *self, char *service, zmsg_t *request)
assert (zmsg_parts (msg) >= 3);

char *header = zmsg_pop (msg);
assert (strcmp (header, MDPC_HEADER) == 0);
assert (strcmp (header, MDPC_CLIENT) == 0);
free (header);

char *service = zmsg_pop (msg);
@@ -166,7 +133,7 @@ mdcli_send (mdcli_t *self, char *service, zmsg_t *request)
s_connect_to_broker (self);
zmsg_t *msg = zmsg_dup (request);
zmsg_push (msg, service);
zmsg_push (msg, MDPC_HEADER);
zmsg_push (msg, MDPC_CLIENT);
zmsg_send (&msg, self->client);
}
else
@@ -175,4 +142,3 @@ mdcli_send (mdcli_t *self, char *service, zmsg_t *request)
}
return NULL;
}

22 changes: 22 additions & 0 deletions examples/C/mdp.h
@@ -0,0 +1,22 @@
//
// mdp.h
// Majordomo Protocol definitions
//
#ifndef __MDP_H_INCLUDED__
#define __MDP_H_INCLUDED__

// This is the version of MDP/Client we implement
#define MDPC_CLIENT "MDPC01"

// This is the version of MDP/Worker we implement
#define MDPS_WORKER "MDPW01"

// MDP/Server commands, as strings
#define MDPS_READY "\001"
#define MDPS_REQUEST "\002"
#define MDPS_REPLY "\003"
#define MDPS_HEARTBEAT "\004"
#define MDPS_DISCONNECT "\005"

#endif
