diff --git a/chapter3.txt b/chapter3.txt index fdc8e9157..d6dfafde0 100644 --- a/chapter3.txt +++ b/chapter3.txt @@ -946,8 +946,6 @@ Some comments on this code: * We can set and use identities on both bound and connected sockets, as this example shows. -* 0MQ currently has [http://github.com/zeromq/zeromq2/issues/issue/82 a bug] when cross-connecting XREP sockets over {{inproc}}. The binding socket does not use any identity you set, and always uses a generated UUID. - Although the router-to-router pattern looks ideal for asynchronous N-to-N routing, it has some pitfalls. First, any design with N-to-N connections will not scale beyond a small number of clients and servers. You should really create a device in the middle that turns it into two 1-to-N patterns. This gives you a structure like the LRU queue broker, though you would use XREQ at the front-end and worker sides to get streaming. Second, it may become confusing if you try to put two XREP sockets at the same logical level. One must bind, one must connect, and request-reply is inherently asymmetric. However, the next point takes care of this. diff --git a/chapter4.txt b/chapter4.txt index 65f0ded5b..b9cfb3ab8 100644 --- a/chapter4.txt +++ b/chapter4.txt @@ -391,7 +391,7 @@ In fact what we have here is a protocol that needs writing down. It's fun to exp Lack of contracts is a sure sign of a disposable application. So, let's write a contract for this protocol. How do we do that? * There's a wiki, at [http://rfc.zeromq.org rfc.zeromq.org], that we made especially as a home for public 0MQ contracts. -* To create a new specification, register, and follow the instructions. It's fairly simple if you are not afraid of writing contracts. +* To create a new specification, register, and follow the instructions. It's straight-forward, though technical writing is not for everyone. It took me about fifteen minutes to draft the new [http://rfc.zeromq.org/spec:6 Pirate Pattern Protocol]. It's not a big specification but it does capture enough to act as the basis for arguments ("your queue isn't PPP compatible, please fix it!"). @@ -408,7 +408,7 @@ The nice thing about progress is how fast it happens. Just a few sentences ago w This one-page specification takes PPP and turns it into something more solid. This is how we should design complex architectures: start by writing down the contracts, and only //then// write software to implement them. -The Majordomo Protocol (MDP) extends and improves PPP in one interesting way apart from the two points above. It adds a "service name" to requests that the client sends, and asks workers to register for specific services. The nice thing about MDP is that it came from working code, a simpler protocol, and a precise set of improvements. It took literally only a couple of hours to draft, review, and publish. +The Majordomo Protocol (MDP) extends and improves PPP in one interesting way apart from the two points above. It adds a "service name" to requests that the client sends, and asks workers to register for specific services. The nice thing about MDP is that it came from working code, a simpler protocol, and a precise set of improvements. It took literally only an hour to draft. Adding service names is a small but significant change that turns our Paranoid Pirate queue into a service-oriented broker: @@ -468,22 +468,40 @@ zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply); It's more or less symmetrical but the worker dialog is a little different. The first time a worker does a recv (), it passes a null reply, thereafter it passes the current reply, and gets a new request. -The client and worker APIs are fairly simple to construct. +The client and worker APIs are fairly simple to construct, since they're heavily based on the Paranoid Pirate code we already developed. Here is the client API: + +[[code type="example" title="Majordomo Client API" name="mdcliapi"]] +[[/code]] + +And here is the worker API: + +[[code type="example" title="Majordomo Worker API" name="mdwrkapi"]] +[[/code]] + +Notes on this code: + +* The APIs are single threaded. This means, for example, that the worker won't send heartbeats in the background. Happily, this is exactly what we want: if the worker application gets stuck, heartbeats will stop and the broker will stop sending requests to the worker. +* The worker API doesn't do an exponential backoff, it's not worth the extra complexity. +* The APIs don't do any error reporting. If something isn't as expected, they raise an assertion (or exception depending on the language). This is ideal for a reference implementation, so any protocol errors show immediately. For real applications the API should be robust against invalid messages. + +Let's design the Majordomo broker. Its core structure is a set of queues, one per service. We will create these queues as workers appear (we could delete them as workers disappear but forget that for now, it gets complex). Additionally, we keep a queue of workers per service. + +To make the C examples easier to write and read, I've taken the hash and list containers from the [http://zfl.zeromq.org ZFL project], and renamed them as zlist and zhash, similar to what we did with zmsg. In any modern language you can of course use built-in containers. .end -- reconnect interval fixed, no backoff -- timeout for worker recv? -- single threaded worker API -- will not send heartbeats while busy -- synchronous, simple + provides. + broker - design first - containers, zlist, zhash +- extensions to MD +- service presence +- delivery failure - mandatory, immediate ++++ Rust-based Reliability (Titanic Pattern) diff --git a/examples/C/mdbroker.c b/examples/C/mdbroker.c new file mode 100644 index 000000000..e33c15b3f --- /dev/null +++ b/examples/C/mdbroker.c @@ -0,0 +1,208 @@ +// +// Majordomo broker +// A minimal implementation +// +#include "zhelpers.h" +#include "zmsg.c" +#include "zlist.c" +#include "zhash.c" +#include "mdp.h" + +#define MAX_SERVICES 100 +#define MAX_WORKERS 100 +#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable +#define HEARTBEAT_INTERVAL 1000 // msecs + +hash of services + +// This defines a single service +typedef struct { + char *name; // Service name + list of messages waiting + number of messages? + list of workers waiting + number of workers? +} service_t; + + +// This defines one active worker in our worker queue +typedef struct { + char *identity; // Address of worker + int64_t expiry; // Expires at this time +} worker_t; + +typedef struct { + size_t size; // Number of workers + worker_t workers [MAX_WORKERS]; +} queue_t; + +// Dequeue operation for queue implemented as array of anything +#define DEQUEUE(queue, index) memmove ( \ + &(queue) [index], &(queue) [index + 1], \ + (sizeof (queue) / sizeof (*queue) - index) * sizeof (queue [0])) + +// Insert worker at end of queue, reset expiry +// Worker must not already be in queue +static void +s_worker_append (queue_t *queue, char *identity) +{ + int index; + for (index = 0; index < queue->size; index++) + if (strcmp (queue->workers [index].identity, identity) == 0) + break; + + if (index < queue->size) + printf ("E: duplicate worker identity %s", identity); + else { + assert (queue->size < MAX_WORKERS); + queue->workers [queue->size].identity = identity; + queue->workers [queue->size].expiry = s_clock () + + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS; + queue->size++; + } +} + +// Remove worker from queue, if present +static void +s_worker_delete (queue_t *queue, char *identity) +{ + int index; + for (index = 0; index < queue->size; index++) + if (strcmp (queue->workers [index].identity, identity) == 0) + break; + + if (index < queue->size) { + free (queue->workers [index].identity); + DEQUEUE (queue->workers, index); + queue->size--; + } +} + +// Reset worker expiry, worker must be present +static void +s_worker_refresh (queue_t *queue, char *identity) +{ + int index; + for (index = 0; index < queue->size; index++) + if (strcmp (queue->workers [index].identity, identity) == 0) + break; + + if (index < queue->size) + queue->workers [index].expiry = s_clock () + + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS; + else + printf ("E: worker %s not ready\n", identity); +} + +// Pop next available worker off queue, return identity +static char * +s_worker_dequeue (queue_t *queue) +{ + assert (queue->size); + char *identity = queue->workers [0].identity; + DEQUEUE (queue->workers, 0); + queue->size--; + return identity; +} + +// Look for & kill expired workers +static void +s_queue_purge (queue_t *queue) +{ + // Work backwards from oldest so we don't do useless work + int index; + for (index = queue->size - 1; index >= 0; index--) { + if (s_clock () > queue->workers [index].expiry) { + free (queue->workers [index].identity); + DEQUEUE (queue->workers, index); + queue->size--; + index--; + } + } +} + +int main (void) +{ + s_version_assert (2, 1); + + // Prepare our context and sockets + void *context = zmq_init (1); + void *frontend = zmq_socket (context, ZMQ_XREP); + void *backend = zmq_socket (context, ZMQ_XREP); + zmq_bind (frontend, "tcp://*:5555"); // For clients + zmq_bind (backend, "tcp://*:5556"); // For workers + + // Queue of available workers + queue_t *queue = (queue_t *) calloc (1, sizeof (queue_t)); + + // Send out heartbeats at regular intervals + uint64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL; + + while (1) { + zmq_pollitem_t items [] = { + { backend, 0, ZMQ_POLLIN, 0 }, + { frontend, 0, ZMQ_POLLIN, 0 } + }; + // Poll frontend only if we have available workers + if (queue->size) + zmq_poll (items, 2, HEARTBEAT_INTERVAL * 1000); + else + zmq_poll (items, 1, HEARTBEAT_INTERVAL * 1000); + + // Handle worker activity on backend + if (items [0].revents & ZMQ_POLLIN) { + zmsg_t *zmsg = zmsg_recv (backend); + char *identity = zmsg_unwrap (zmsg); + + // Return reply to client if it's not a control message + if (zmsg_parts (zmsg) == 1) { + if (strcmp (zmsg_address (zmsg), "READY") == 0) { + s_worker_delete (queue, identity); + s_worker_append (queue, identity); + } + else + if (strcmp (zmsg_address (zmsg), "HEARTBEAT") == 0) + s_worker_refresh (queue, identity); + else { + printf ("E: invalid message from %s\n", identity); + zmsg_dump (zmsg); + free (identity); + } + zmsg_destroy (&zmsg); + } + else { + zmsg_send (&zmsg, frontend); + s_worker_append (queue, identity); + } + } + if (items [1].revents & ZMQ_POLLIN) { + // Now get next client request, route to next worker + zmsg_t *zmsg = zmsg_recv (frontend); + char *identity = s_worker_dequeue (queue); + zmsg_wrap (zmsg, identity, ""); + zmsg_send (&zmsg, backend); + free (identity); + } + + // Send heartbeats to idle workers if it's time + if (s_clock () > heartbeat_at) { + int index; + for (index = 0; index < queue->size; index++) { + zmsg_t *zmsg = zmsg_new (); + zmsg_body_set (zmsg, "HEARTBEAT"); + zmsg_wrap (zmsg, queue->workers [index].identity, NULL); + zmsg_send (&zmsg, backend); + } + heartbeat_at = s_clock () + HEARTBEAT_INTERVAL; + } + s_queue_purge (queue); + } + // We never exit the main loop + // But pretend to do the right shutdown anyhow + while (queue->size) + free (s_worker_dequeue (queue)); + free (queue); + zmq_close (frontend); + zmq_close (backend); + return 0; +} diff --git a/examples/C/mdcliapi.c b/examples/C/mdcliapi.c index aa1088253..d48b35a31 100644 --- a/examples/C/mdcliapi.c +++ b/examples/C/mdcliapi.c @@ -1,41 +1,10 @@ -/* ========================================================================= - mdcliapi.c - - Majordomo Protocol Client API - Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7. - - Follows the ZFL class conventions and is further developed as the ZFL - mdcli class. See http://zfl.zeromq.org for more details. - - ------------------------------------------------------------------------- - Copyright (c) 1991-2011 iMatix Corporation - Copyright other contributors as noted in the AUTHORS file. - - This file is part of the ZeroMQ Guide: http://zguide.zeromq.org - - This is free software; you can redistribute it and/or modify it under the - terms of the GNU Lesser General Public License as published by the Free - Software Foundation; either version 3 of the License, or (at your option) - any later version. - - This software is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL- - ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General - Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . - ========================================================================= -*/ - -#ifndef __MDCLIAPI_H_INCLUDED__ -#define __MDCLIAPI_H_INCLUDED__ - +// +// Majordomo Protocol Client API +// Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7 +// #include "zhelpers.h" #include "zmsg.c" - -// This is the version of MDP/Client we implement -#define MDPC_HEADER "MDPC01" +#include "mdp.h" // Reliability parameters #define REQUEST_TIMEOUT 2500 // msecs, (> 1000!) @@ -56,8 +25,6 @@ zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t *request); } #endif -#endif - // Structure of our class // We access these properties only via class methods @@ -135,7 +102,7 @@ mdcli_send (mdcli_t *self, char *service, zmsg_t *request) // Frame 2: Service name (printable string) zmsg_t *msg = zmsg_dup (request); zmsg_push (msg, service); - zmsg_push (msg, MDPC_HEADER); + zmsg_push (msg, MDPC_CLIENT); zmsg_send (&msg, self->client); while (1) { @@ -151,7 +118,7 @@ mdcli_send (mdcli_t *self, char *service, zmsg_t *request) assert (zmsg_parts (msg) >= 3); char *header = zmsg_pop (msg); - assert (strcmp (header, MDPC_HEADER) == 0); + assert (strcmp (header, MDPC_CLIENT) == 0); free (header); char *service = zmsg_pop (msg); @@ -166,7 +133,7 @@ mdcli_send (mdcli_t *self, char *service, zmsg_t *request) s_connect_to_broker (self); zmsg_t *msg = zmsg_dup (request); zmsg_push (msg, service); - zmsg_push (msg, MDPC_HEADER); + zmsg_push (msg, MDPC_CLIENT); zmsg_send (&msg, self->client); } else @@ -175,4 +142,3 @@ mdcli_send (mdcli_t *self, char *service, zmsg_t *request) } return NULL; } - diff --git a/examples/C/mdp.h b/examples/C/mdp.h new file mode 100644 index 000000000..53f42e936 --- /dev/null +++ b/examples/C/mdp.h @@ -0,0 +1,22 @@ +// +// mdp.h +// Majordomo Protocol definitions +// +#ifndef __MDP_H_INCLUDED__ +#define __MDP_H_INCLUDED__ + +// This is the version of MDP/Client we implement +#define MDPC_CLIENT "MDPC01" + +// This is the version of MDP/Worker we implement +#define MDPS_WORKER "MDPW01" + +// MDP/Server commands, as strings +#define MDPS_READY "\001" +#define MDPS_REQUEST "\002" +#define MDPS_REPLY "\003" +#define MDPS_HEARTBEAT "\004" +#define MDPS_DISCONNECT "\005" + +#endif + diff --git a/examples/C/mdwrkapi.c b/examples/C/mdwrkapi.c index 0c31d00c9..3eefa83c9 100644 --- a/examples/C/mdwrkapi.c +++ b/examples/C/mdwrkapi.c @@ -1,54 +1,16 @@ -/* ========================================================================= - mdwrkapi.c - - Majordomo Protocol Worker API - Implements the MDP/Server spec at http://rfc.zeromq.org/spec:7. - - Follows the ZFL class conventions and is further developed as the ZFL - mdwrk class. See http://zfl.zeromq.org for more details. - - ------------------------------------------------------------------------- - Copyright (c) 1991-2011 iMatix Corporation - Copyright other contributors as noted in the AUTHORS file. - - This file is part of the ZeroMQ Guide: http://zguide.zeromq.org - - This is free software; you can redistribute it and/or modify it under the - terms of the GNU Lesser General Public License as published by the Free - Software Foundation; either version 3 of the License, or (at your option) - any later version. - - This software is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL- - ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General - Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . - ========================================================================= -*/ - -#ifndef __MDWRKAPI_H_INCLUDED__ -#define __MDWRKAPI_H_INCLUDED__ - +// +// Majordomo Protocol Worker API +// Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7. +// #include "zhelpers.h" #include "zmsg.c" - -// This is the version of MDP/Server we implement -#define MDPS_HEADER "MDPS01" +#include "mdp.h" // Reliability parameters #define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable #define HEARTBEAT_INTERVAL 1000 // msecs #define RECONNECT_INTERVAL 1000 // Delay between attempts -// Protocol commands -#define MDPS_READY "\001" -#define MDPS_REQUEST "\002" -#define MDPS_REPLY "\003" -#define MDPS_HEARTBEAT "\004" -#define MDPS_DISCONNECT "\005" - #ifdef __cplusplus extern "C" { #endif @@ -64,8 +26,6 @@ zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply); } #endif -#endif - // Structure of our class // We access these properties only via class methods @@ -98,7 +58,7 @@ void s_connect_to_broker (mdwrk_t *self) // Register service with broker zmsg_t *msg = zmsg_new (); - zmsg_append (msg, MDPS_HEADER); + zmsg_append (msg, MDPS_WORKER); zmsg_append (msg, MDPS_READY); zmsg_append (msg, self->service); zmsg_send (&msg, self->worker); @@ -162,7 +122,7 @@ mdwrk_recv (mdwrk_t *self, zmsg_t *reply) if (reply) { zmsg_t *msg = zmsg_dup (reply); zmsg_push (msg, MDPS_REPLY); - zmsg_push (msg, MDPS_HEADER); + zmsg_push (msg, MDPS_WORKER); zmsg_send (&msg, self->worker); } self->expect_reply = 1; @@ -179,7 +139,7 @@ mdwrk_recv (mdwrk_t *self, zmsg_t *reply) assert (zmsg_parts (msg) >= 3); char *header = zmsg_pop (msg); - assert (strcmp (header, MDPS_HEADER) == 0); + assert (strcmp (header, MDPS_WORKER) == 0); free (header); char *command = zmsg_pop (msg); diff --git a/examples/C/zhash.c b/examples/C/zhash.c new file mode 100644 index 000000000..c56934a1a --- /dev/null +++ b/examples/C/zhash.c @@ -0,0 +1,504 @@ +/* ========================================================================= + zfl_hash.h - ZFL singly-linked hash class + + ------------------------------------------------------------------------- + Copyright (c) 1991-2011 iMatix Corporation + Copyright other contributors as noted in the AUTHORS file. + + This file is part of the ZeroMQ Function Library: http://zfl.zeromq.org + + This is free software; you can redistribute it and/or modify it under the + terms of the GNU Lesser General Public License as published by the Free + Software Foundation; either version 3 of the License, or (at your option) + any later version. + + This software is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL- + ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General + Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . + ========================================================================= +*/ + +#ifndef __ZFL_HASH_H_INCLUDED__ +#define __ZFL_HASH_H_INCLUDED__ + +#ifdef __cplusplus +extern "C" { +#endif + +// Callback function for zfl_hash_apply method +typedef int (zfl_hash_apply_fn) (char *key, void *value, void *argument); +// Callback function for zfl_hash_freefn method +typedef void (zfl_hash_free_fn) (void *data); + +// Opaque class structure +typedef struct _zfl_hash zfl_hash_t; + +zfl_hash_t * + zfl_hash_new (void); +void + zfl_hash_destroy (zfl_hash_t **self_p); +int + zfl_hash_insert (zfl_hash_t *self, char *key, void *value); +void + zfl_hash_delete (zfl_hash_t *self, char *key); +void * + zfl_hash_lookup (zfl_hash_t *self, char *key); +void * + zfl_hash_freefn (zfl_hash_t *self, char *key, zfl_hash_free_fn *free_fn); +size_t + zfl_hash_size (zfl_hash_t *self); +int + zfl_hash_apply (zfl_hash_t *self, zfl_hash_apply_fn *callback, void *argument); +void + zfl_hash_test (int verbose); + +#ifdef __cplusplus +} +#endif + +#endif +/* ========================================================================= + zfl_hash.h - hash table + + Expandable hash table container + + Note that it's relatively slow (~50k insertions/deletes per second), so + don't do inserts/updates on the critical path for message I/O. It can + do ~2.5M lookups per second for 16-char keys. Timed on a 1.6GHz CPU. + + ------------------------------------------------------------------------- + Copyright (c) 1991-2011 iMatix Corporation + Copyright other contributors as noted in the AUTHORS file. + + This file is part of the ZeroMQ Function Library: http://zfl.zeromq.org + + This is free software; you can redistribute it and/or modify it under the + terms of the GNU Lesser General Public License as published by the Free + Software Foundation; either version 3 of the License, or (at your option) + any later version. + + This software is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL- + ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General + Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . + ========================================================================= +*/ + +#include "../include/zfl_prelude.h" +#include "../include/zfl_hash.h" + +// Hash table performance parameters + +#define INITIAL_SIZE 255 // Initial size in items +#define LOAD_FACTOR 75 // Percent loading before splitting +#define GROWTH_FACTOR 200 // Increase in % after splitting + + +// Hash item, used internally only + +typedef struct _item_t item_t; +struct _item_t { + void + *value; // Opaque item value + item_t + *next; // Next item in the hash slot + qbyte + index; // Index of item in table + char + *key; // Item's original key + zfl_hash_free_fn + *free_fn; // Value free function if any +}; + +// Hash table structure + +struct _zfl_hash { + size_t + size; // Current size of hash table + size_t + limit; // Current hash table limit + item_t + **items; // Array of items + uint + cached_index; // Avoids duplicate hash calculations +}; + + +// -------------------------------------------------------------------------- +// Local helper function +// Compute hash for key string + +static uint +s_item_hash (char *key, size_t limit) +{ + uint + key_hash = 0; + + // Torek hashing function + while (*key) { + key_hash *= 33; + key_hash += *key; + key++; + } + key_hash %= limit; + return key_hash; +} + + +// -------------------------------------------------------------------------- +// Local helper function +// Lookup item in hash table, returns item or NULL + +static item_t * +s_item_lookup (zfl_hash_t *self, char *key) +{ + // Look in bucket list for item by key + self->cached_index = s_item_hash (key, self->limit); + item_t *item = self->items [self->cached_index]; + while (item) { + if (streq (item->key, key)) + break; + item = item->next; + } + return item; +} + + +// -------------------------------------------------------------------------- +// Local helper function +// Insert new item into hash table, returns item +// If item already existed, returns NULL + +static item_t * +s_item_insert (zfl_hash_t *self, char *key, void *value) +{ + // Check that item does not already exist in hash table + // Leaves self->cached_index with calculated hash value + item_t *item = s_item_lookup (self, key); + if (item == NULL) { + item = (item_t *) zmalloc (sizeof (item_t)); + item->value = value; + item->key = strdup (key); + item->index = self->cached_index; + // Insert into start of bucket list + item->next = self->items [self->cached_index]; + self->items [self->cached_index] = item; + self->size++; + } + else + item = NULL; // Signal duplicate insertion + return item; +} + + +// -------------------------------------------------------------------------- +// Local helper function +// Destroy item in hash table, item must exist in table + +static void +s_item_destroy (zfl_hash_t *self, item_t *item) +{ + // Find previous item since it's a singly-linked list + item_t *cur_item = self->items [item->index]; + item_t **prev_item = &(self->items [item->index]); + while (cur_item) { + if (cur_item == item) + break; + prev_item = &(cur_item->next); + cur_item = cur_item->next; + } + assert (cur_item); + *prev_item = item->next; + self->size--; + if (item->free_fn) + (item->free_fn) (item->value); + free (item->key); + free (item); +} + + +// -------------------------------------------------------------------------- +// Hash table constructor + +zfl_hash_t * +zfl_hash_new (void) +{ + zfl_hash_t *self = (zfl_hash_t *) zmalloc (sizeof (zfl_hash_t)); + self->limit = INITIAL_SIZE; + self->items = (item_t **) zmalloc (sizeof (item_t *) * self->limit); + return self; +} + + +// -------------------------------------------------------------------------- +// Hash table destructor + +void +zfl_hash_destroy (zfl_hash_t **self_p) +{ + assert (self_p); + if (*self_p) { + zfl_hash_t *self = *self_p; + uint index; + for (index = 0; index < self->limit; index++) { + // Destroy all items in this hash bucket + item_t *cur_item = self->items [index]; + while (cur_item) { + item_t *next_item = cur_item->next; + s_item_destroy (self, cur_item); + cur_item = next_item; + } + } + if (self->items) + free (self->items); + + free (self); + *self_p = NULL; + } +} + + +// -------------------------------------------------------------------------- +// Insert item into hash table with specified key and value +// If key is already present returns -1 and leaves existing item unchanged +// Returns 0 on success. + +int +zfl_hash_insert (zfl_hash_t *self, char *key, void *value) +{ + assert (self); + assert (key); + + // If we're exceeding the load factor of the hash table, + // resize it according to the growth factor + if (self->size >= self->limit * LOAD_FACTOR / 100) { + item_t + *cur_item, + *next_item; + item_t + **new_items; + size_t + new_limit; + qbyte + index, + new_index; + + // Create new hash table + new_limit = self->limit * GROWTH_FACTOR / 100; + new_items = (item_t **) zmalloc (sizeof (item_t *) * new_limit); + + // Move all items to the new hash table, rehashing to + // take into account new hash table limit + for (index = 0; index != self->limit; index++) { + cur_item = self->items [index]; + while (cur_item) { + next_item = cur_item->next; + new_index = s_item_hash (cur_item->key, new_limit); + cur_item->index = new_index; + cur_item->next = new_items [new_index]; + new_items [new_index] = cur_item; + cur_item = next_item; + } + } + // Destroy old hash table + free (self->items); + self->items = new_items; + self->limit = new_limit; + } + return s_item_insert (self, key, value)? 0: -1; +} + + +// -------------------------------------------------------------------------- +// Remove an item specified by key from the hash table. If there was no such +// item, this function does nothing. + +void +zfl_hash_delete (zfl_hash_t *self, char *key) +{ + assert (self); + assert (key); + + item_t *item = s_item_lookup (self, key); + if (item) + s_item_destroy (self, item); +} + + +// -------------------------------------------------------------------------- +// Look for item in hash table and return its value, or NULL + +void * +zfl_hash_lookup (zfl_hash_t *self, char *key) +{ + assert (self); + assert (key); + + item_t *item = s_item_lookup (self, key); + if (item) + return item->value; + else + return NULL; +} + + +// -------------------------------------------------------------------------- +// Set a free function for the specified hash table item. When the item is +// destroyed, the free function, if any, is called on that item value. +// Use this when hash item values are dynamically allocated, to ensure that +// you don't have memory leaks. You can pass 'free' or NULL as a free_fn. +// Returns the item value, or NULL if there is no such item. + +void * +zfl_hash_freefn (zfl_hash_t *self, char *key, zfl_hash_free_fn *free_fn) +{ + assert (self); + assert (key); + + item_t *item = s_item_lookup (self, key); + if (item) { + item->free_fn = free_fn; + return item->value; + } + else + return NULL; +} + + +// -------------------------------------------------------------------------- +// Return size of hash table + +size_t +zfl_hash_size (zfl_hash_t *self) +{ + assert (self); + return self->size; +} + + +// -------------------------------------------------------------------------- +// Apply function to each item in the hash table. Items are iterated in no +// defined order. Stops if callback function returns non-zero and returns +// final return code from callback function (zero = success). + +int +zfl_hash_apply (zfl_hash_t *self, zfl_hash_apply_fn *callback, void *argument) +{ + assert (self); + uint + index; + item_t + *item; + int + rc = 0; + + for (index = 0; index != self->limit; index++) { + item = self->items [index]; + while (item) { + // Invoke callback, passing item properties and argument + rc = callback (item->key, item->value, argument); + if (rc) + break; // End if non-zero return code + item = item->next; + } + } + return rc; +} + + +// -------------------------------------------------------------------------- +// Runs selftest of class + +void +zfl_hash_test (int verbose) +{ + printf (" * zfl_hash: "); + + zfl_hash_t *hash = zfl_hash_new (); + assert (hash); + assert (zfl_hash_size (hash) == 0); + + // Insert some values + int rc; + rc = zfl_hash_insert (hash, "DEADBEEF", (void *) 0xDEADBEEF); + assert (rc == 0); + rc = zfl_hash_insert (hash, "ABADCAFE", (void *) 0xABADCAFE); + assert (rc == 0); + rc = zfl_hash_insert (hash, "C0DEDBAD", (void *) 0xC0DEDBAD); + assert (rc == 0); + rc = zfl_hash_insert (hash, "DEADF00D", (void *) 0xDEADF00D); + assert (rc == 0); + assert (zfl_hash_size (hash) == 4); + + // Look for existing values + void *value; + value = zfl_hash_lookup (hash, "DEADBEEF"); + assert (value == (void *) 0xDEADBEEF); + value = zfl_hash_lookup (hash, "ABADCAFE"); + assert (value == (void *) 0xABADCAFE); + value = zfl_hash_lookup (hash, "C0DEDBAD"); + assert (value == (void *) 0xC0DEDBAD); + value = zfl_hash_lookup (hash, "DEADF00D"); + assert (value == (void *) 0xDEADF00D); + + // Look for non-existent values + value = zfl_hash_lookup (hash, "0xF0000000"); + assert (value == NULL); + + // Try to insert duplicate values + rc = zfl_hash_insert (hash, "DEADBEEF", (void *) 0xF0000000); + assert (rc == -1); + value = zfl_hash_lookup (hash, "DEADBEEF"); + assert (value == (void *) 0xDEADBEEF); + + // Delete a value + zfl_hash_delete (hash, "DEADBEEF"); + value = zfl_hash_lookup (hash, "DEADBEEF"); + assert (value == NULL); + assert (zfl_hash_size (hash) == 3); + + // Check that the queue is robust against random usage + struct { + char name [100]; + Bool exists; + } testset [200]; + memset (testset, 0, sizeof (testset)); + + int + testmax = 200, + testnbr, + iteration; + + srandom ((unsigned) time (NULL)); + for (iteration = 0; iteration < 25000; iteration++) { + testnbr = randof (testmax); + if (testset [testnbr].exists) { + value = zfl_hash_lookup (hash, testset [testnbr].name); + assert (value); + zfl_hash_delete (hash, testset [testnbr].name); + testset [testnbr].exists = FALSE; + } + else { + sprintf (testset [testnbr].name, "%x-%x", rand (), rand ()); + if (zfl_hash_insert (hash, testset [testnbr].name, "") == 0) + testset [testnbr].exists = TRUE; + } + } + // Test 1M lookups + for (iteration = 0; iteration < 1000000; iteration++) + value = zfl_hash_lookup (hash, "DEADBEEFABADCAFE"); + + // Destructor should be safe to call twice + zfl_hash_destroy (&hash); + zfl_hash_destroy (&hash); + assert (hash == NULL); + + printf ("OK\n"); +} diff --git a/examples/C/zlist.c b/examples/C/zlist.c new file mode 100644 index 000000000..951be8ecc --- /dev/null +++ b/examples/C/zlist.c @@ -0,0 +1,315 @@ +/* ========================================================================= + zfl_list.h - ZFL singly-linked list class + + ------------------------------------------------------------------------- + Copyright (c) 1991-2011 iMatix Corporation + Copyright other contributors as noted in the AUTHORS file. + + This file is part of the ZeroMQ Function Library: http://zfl.zeromq.org + + This is free software; you can redistribute it and/or modify it under the + terms of the GNU Lesser General Public License as published by the Free + Software Foundation; either version 3 of the License, or (at your option) + any later version. + + This software is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL- + ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General + Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . + ========================================================================= +*/ + +#ifndef __ZFL_LIST_H_INCLUDED__ +#define __ZFL_LIST_H_INCLUDED__ + +#ifdef __cplusplus +extern "C" { +#endif + +// Opaque class structure +typedef struct _zfl_list zfl_list_t; + +zfl_list_t * + zfl_list_new (void); +void + zfl_list_destroy (zfl_list_t **self_p); +void * + zfl_list_first (zfl_list_t *self); +void + zfl_list_append (zfl_list_t *self, void *value); +void + zfl_list_push (zfl_list_t *self, void *value); +void + zfl_list_remove (zfl_list_t *self, void *value); +zfl_list_t * + zfl_list_copy (zfl_list_t *self); +size_t + zfl_list_size (zfl_list_t *self); +void + zfl_list_test (int verbose); + +#ifdef __cplusplus +} +#endif + +#endif +/* ========================================================================= + zfl_list.c - singly-linked list container + + Provides a generic container implementing a fast singly-linked list. You + can use this to construct multi-dimensional lists, and other structures + together with other generic containers like zfl_hash. + + ------------------------------------------------------------------------- + Copyright (c) 1991-2011 iMatix Corporation + Copyright other contributors as noted in the AUTHORS file. + + This file is part of the ZeroMQ Function Library: http://zfl.zeromq.org + + This is free software; you can redistribute it and/or modify it under the + terms of the GNU Lesser General Public License as published by the Free + Software Foundation; either version 3 of the License, or (at your option) + any later version. + + This software is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL- + ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General + Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . + ========================================================================= +*/ + +#include "../include/zfl_prelude.h" +#include "../include/zfl_list.h" + +// List node, used internally only + +struct node_t { + struct node_t + *next; + void + *value; +}; + +// Actual list object + +struct _zfl_list { + struct node_t + *head, *tail; + size_t + size; +}; + + +// -------------------------------------------------------------------------- +// List constructor + +zfl_list_t * +zfl_list_new (void) +{ + zfl_list_t *self = (zfl_list_t *) zmalloc (sizeof (zfl_list_t)); + return self; +} + + +// -------------------------------------------------------------------------- +// List destructor + +void +zfl_list_destroy (zfl_list_t **self_p) +{ + assert (self_p); + if (*self_p) { + zfl_list_t *self = *self_p; + struct node_t *node, *next; + for (node = (*self_p)->head; node != NULL; node = next) { + next = node->next; + free (node); + } + free (self); + *self_p = NULL; + } +} + + +// -------------------------------------------------------------------------- +// Return the value at the head of list. If the list is empty, returns NULL. +// Note that this function does not remove the value from the list. + +void * +zfl_list_first (zfl_list_t *self) +{ + assert (self); + if (self->head) + return self->head->value; + else + return NULL; +} + + +// -------------------------------------------------------------------------- +// Add value to the end of the list + +void +zfl_list_append (zfl_list_t *self, void *value) +{ + struct node_t *node; + node = (struct node_t *) zmalloc (sizeof (struct node_t)); + node->value = value; + if (self->tail) + self->tail->next = node; + else + self->head = node; + self->tail = node; + node->next = NULL; + self->size++; +} + + +// -------------------------------------------------------------------------- +// Insert value at the beginning of the list + +void +zfl_list_push (zfl_list_t *self, void *value) +{ + struct node_t *node; + node = (struct node_t *) zmalloc (sizeof (struct node_t)); + node->value = value; + node->next = self->head; + self->head = node; + if (self->tail == NULL) + self->tail = node; + self->size++; +} + + +// -------------------------------------------------------------------------- +// Remove the value value from the list. The value must be stored in the list. +// The function does not deallocate the memory pointed to by the removed value. + +void +zfl_list_remove (zfl_list_t *self, void *value) +{ + struct node_t *node, *prev = NULL; + + // First off, we need to find the list node. + for (node = self->head; node != NULL; node = node->next) { + if (node->value == value) + break; + prev = node; + } + assert (node); + + if (prev) + prev->next = node->next; + else + self->head = node->next; + + if (node->next == NULL) + self->tail = prev; + + free (node); + self->size--; +} + + +// -------------------------------------------------------------------------- +// Make copy of itself + +zfl_list_t * +zfl_list_copy (zfl_list_t *self) +{ + if (!self) + return NULL; + + zfl_list_t *copy = zfl_list_new (); + assert (copy); + + struct node_t *node; + for (node = self->head; node; node = node->next) + zfl_list_append (copy, node->value); + return copy; +} + + +// -------------------------------------------------------------------------- +// Return the number of items in the list + +size_t +zfl_list_size (zfl_list_t *self) +{ + return self->size; +} + + +// -------------------------------------------------------------------------- +// Runs selftest of class + +void +zfl_list_test (int verbose) +{ + printf (" * zfl_list: "); + + zfl_list_t *list = zfl_list_new (); + assert (list); + assert (zfl_list_size (list) == 0); + + // Three values we'll use as test data + // List values are void *, not particularly strings + char *cheese = "boursin"; + char *bread = "baguette"; + char *wine = "bordeaux"; + + zfl_list_append (list, cheese); + assert (zfl_list_size (list) == 1); + zfl_list_append (list, bread); + assert (zfl_list_size (list) == 2); + zfl_list_append (list, wine); + assert (zfl_list_size (list) == 3); + + assert (zfl_list_first (list) == cheese); + assert (zfl_list_size (list) == 3); + zfl_list_remove (list, wine); + assert (zfl_list_size (list) == 2); + + assert (zfl_list_first (list) == cheese); + zfl_list_remove (list, cheese); + assert (zfl_list_size (list) == 1); + assert (zfl_list_first (list) == bread); + + zfl_list_remove (list, bread); + assert (zfl_list_size (list) == 0); + + zfl_list_push (list, cheese); + assert (zfl_list_size (list) == 1); + assert (zfl_list_first (list) == cheese); + + zfl_list_push (list, bread); + assert (zfl_list_size (list) == 2); + assert (zfl_list_first (list) == bread); + + zfl_list_append (list, wine); + assert (zfl_list_size (list) == 3); + assert (zfl_list_first (list) == bread); + + zfl_list_remove (list, bread); + assert (zfl_list_first (list) == cheese); + + zfl_list_remove (list, cheese); + assert (zfl_list_first (list) == wine); + + zfl_list_remove (list, wine); + assert (zfl_list_size (list) == 0); + + // Destructor should be safe to call twice + zfl_list_destroy (&list); + zfl_list_destroy (&list); + assert (list == NULL); + + printf ("OK\n"); +}