Skip to content

Commit

Permalink
Added fragments
Browse files Browse the repository at this point in the history
  • Loading branch information
hintjens committed Oct 22, 2012
1 parent 00eb8af commit a0703ac
Show file tree
Hide file tree
Showing 54 changed files with 578 additions and 0 deletions.
3 changes: 3 additions & 0 deletions fragments/C/binding.c
@@ -0,0 +1,3 @@
zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "ipc://myserver.ipc");
22 changes: 22 additions & 0 deletions fragments/C/bstar.c
@@ -0,0 +1,22 @@
// Create a new Binary Star instance, using local (bind) and
// remote (connect) endpoints to set-up the server peering.
bstar_t *bstar_new (int primary, char *local, char *remote);

// Destroy a Binary Star instance
void bstar_destroy (bstar_t **self_p);

// Return underlying zloop reactor, for timer and reader
// registration and cancelation.
zloop_t *bstar_zloop (bstar_t *self);

// Register voting reader
int bstar_voter (bstar_t *self, char *endpoint, int type,
zloop_fn handler, void *arg);

// Register main state change handlers
void bstar_new_master (bstar_t *self, zloop_fn handler, void *arg);
void bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg);

// Start the reactor, ends if a callback function returns -1, or the
// process received SIGINT or SIGTERM.
int bstar_start (bstar_t *self);
22 changes: 22 additions & 0 deletions fragments/C/bstarcli.c
@@ -0,0 +1,22 @@
// Create a new Binary Star instance, using local (bind) and
// remote (connect) endpoints to set-up the server peering.
bstar_t *bstar_new (int primary, char *local, char *remote);

// Destroy a Binary Star instance
void bstar_destroy (bstar_t **self_p);

// Return underlying zloop reactor, for timer and reader
// registration and cancelation.
zloop_t *bstar_zloop (bstar_t *self);

// Register voting reader
int bstar_voter (bstar_t *self, char *endpoint, int type,
zloop_fn handler, void *arg);

// Register main state change handlers
void bstar_new_master (bstar_t *self, zloop_fn handler, void *arg);
void bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg);

// Start the reactor, ends if a callback function returns -1, or the
// process received SIGINT or SIGTERM.
int bstar_start (bstar_t *self);
9 changes: 9 additions & 0 deletions fragments/C/cloneapi1.c
@@ -0,0 +1,9 @@
// Specify endpoints for each socket we need
clone_subscribe (clone, "tcp://localhost:5556");
clone_snapshot (clone, "tcp://localhost:5557");
clone_updates (clone, "tcp://localhost:5558");

// Times two, since we have two servers
clone_subscribe (clone, "tcp://localhost:5566");
clone_snapshot (clone, "tcp://localhost:5567");
clone_updates (clone, "tcp://localhost:5568");
3 changes: 3 additions & 0 deletions fragments/C/cloneapi2.c
@@ -0,0 +1,3 @@
// Specify primary and backup servers
clone_connect (clone, "tcp://localhost:5551");
clone_connect (clone, "tcp://localhost:5561");
5 changes: 5 additions & 0 deletions fragments/C/cloneapi3.c
@@ -0,0 +1,5 @@
clone_t *clone_new (void);
void clone_destroy (clone_t **self_p);
void clone_connect (clone_t *self, char *address, char *service);
void clone_set (clone_t *self, char *key, char *value);
char *clone_get (clone_t *self, char *key);
9 changes: 9 additions & 0 deletions fragments/C/clonesrv5.c
@@ -0,0 +1,9 @@
// Specify endpoints for each socket we need
clone_subscribe (clone, "tcp://localhost:5556");
clone_snapshot (clone, "tcp://localhost:5557");
clone_updates (clone, "tcp://localhost:5558");

// Times two, since we have two servers
clone_subscribe (clone, "tcp://localhost:5566");
clone_snapshot (clone, "tcp://localhost:5567");
clone_updates (clone, "tcp://localhost:5568");
9 changes: 9 additions & 0 deletions fragments/C/errorhandling.c
@@ -0,0 +1,9 @@
void *context = zmq_ctx_new ();
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:5555");
if (rc != 0) {
printf ("E: bind failed: %s\n", strerror (errno));
return -1;
}
12 changes: 12 additions & 0 deletions fragments/C/fileio3.c
@@ -0,0 +1,12 @@
#include "czmq.h"
#include "nom_server.h"

int main (int argc, char *argv [])
{
printf ("Starting NOM protocol server on port 6000...\n");
nom_server_t *server = nom_server_new ();
nom_server_bind (server, "tcp://*:6000");
nom_server_wait (server);
nom_server_destroy (&server);
return 0;
}
15 changes: 15 additions & 0 deletions fragments/C/filemq-main.c
@@ -0,0 +1,15 @@
fmq_server_t *server = fmq_server_new ();
fmq_server_bind (server, "tcp://*:6000");
fmq_server_publish (server, "/home/ph/filemq/share", "/public");
fmq_server_publish (server, "/home/ph/photos/stream", "/photostream");

fmq_client_t *client = fmq_client_new ();
fmq_client_connect (client, "tcp://pieter.filemq.org:6000");
fmq_client_subscribe (server, "/public/", "/home/ph/filemq/share");
fmq_client_subscribe (server, "/photostream/", "/home/ph/photos/stream");

while (!zctx_interrupted)
sleep (1);

fmq_server_destroy (&server);
fmq_client_destroy (&client);
5 changes: 5 additions & 0 deletions fragments/C/fmq-server-api.c
@@ -0,0 +1,5 @@
server = fmq_server_new ();
fmq_server_configure (server, "server_test.cfg");
fmq_server_publish (server, "./fmqroot/send", "/");
fmq_server_publish (server, "./fmqroot/logs", "/logs");
fmq_server_bind (server, "tcp://*:6000");
3 changes: 3 additions & 0 deletions fragments/C/fsm-instance.c
@@ -0,0 +1,3 @@
event_t next_event; // Next event
state_t state; // Current state
event_t event; // Current event
36 changes: 36 additions & 0 deletions fragments/C/gsl-client-fsm.c
@@ -0,0 +1,36 @@
client_execute (client_t *self, int event)
{
self->next_event = event;
while (self->next_event) {
self->event = self->next_event;
self->next_event = 0;
switch (self->state) {
.for class.state
case $(name:c)_state:
. for event
. if index () > 1
else
. endif
if (self->event == $(name:c)_event) {
. for action
. if name = "send"
zmsg_addstr (self->reply, "$(message:)");
. else
$(name:c)_action (self);
. endif
. endfor
. if defined (event.next)
self->state = $(next:c)_state;
. endif
}
. endfor
break;
.endfor
}
if (zmsg_size (self->reply) > 1) {
zmsg_send (&self->reply, self->router);
self->reply = zmsg_new ();
zmsg_add (self->reply, zframe_dup (self->address));
}
}
}
14 changes: 14 additions & 0 deletions fragments/C/heartbeats.c
@@ -0,0 +1,14 @@
// Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

while (true) {
...
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
...
// Send heartbeat to queue if it's time
if (zclock_time () > heartbeat_at) {
... Send heartbeats to all peers that expect them
// Set timer for next heartbeat
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
}
}
6 changes: 6 additions & 0 deletions fragments/C/highreader.c
@@ -0,0 +1,6 @@
while (true) {
zmsg_t *zmsg = zmsg_recv (worker);
zframe_print (zmsg_last (zmsg), "Worker: ");
zframe_reset (zmsg_last (zmsg), "OK", 2);
zmsg_send (&zmsg, worker);
}
9 changes: 9 additions & 0 deletions fragments/C/interface.c
@@ -0,0 +1,9 @@
#define BEACON_PROTOCOL "ZRE"
#define BEACON_VERSION 0x01

typedef struct {
byte protocol [3];
byte version;
uuid_t uuid;
uint16_t port;
} beacon_t;
9 changes: 9 additions & 0 deletions fragments/C/interrupt.c
@@ -0,0 +1,9 @@
while (true) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break; // Interrupted
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
4 changes: 4 additions & 0 deletions fragments/C/iothreads.c
@@ -0,0 +1,4 @@
int io_threads = 4;
void *context = zmq_ctx_new ();
zmq_ctx_set (context, ZMQ_IO_THREADS, io_threads);
assert (zmq_ctx_get (context, ZMQ_IO_THREADS) == io_threads);
7 changes: 7 additions & 0 deletions fragments/C/killsignal.c
@@ -0,0 +1,7 @@
void *control = zmq_socket (context, ZMQ_PUB);
zmq_bind (control, "tcp://*:5559");
...
// Send kill signal to workers
zmq_msg_init_data (&message, "KILL", 5);
zmq_msg_send (control, &message, 0);
zmq_msg_close (&message);
1 change: 1 addition & 0 deletions fragments/C/kvmsg.c
@@ -0,0 +1 @@
kvmsg_set_prop (kvmsg, "ttl", "%d", randof (30));
1 change: 1 addition & 0 deletions fragments/C/kvsetttl.c
@@ -0,0 +1 @@
kvmsg_set_prop (kvmsg, "ttl", "%d", randof (30));
21 changes: 21 additions & 0 deletions fragments/C/listing_5.c
@@ -0,0 +1,21 @@
void *mousetrap;

// Create socket for catching mice
mousetrap = zmq_socket (context, ZMQ_PULL);

// Configure the socket
int64_t jawsize = 10000;
zmq_setsockopt (mousetrap, ZMQ_HWM, &jawsize, sizeof jawsize);

// Plug socket into mouse hole
zmq_connect (mousetrap, "tcp://192.168.55.221:5001");

// Wait for juicy mouse to arrive
zmq_msg_t mouse;
zmq_msg_init (&mouse);
zmq_msg_recv (&mouse, mousetrap, 0);
// Destroy the mouse
zmq_msg_close (&mouse);

// Destroy the socket
zmq_close (mousetrap);
32 changes: 32 additions & 0 deletions fragments/C/lowreader.c
@@ -0,0 +1,32 @@
while (true) {
// Read and save all frames until we get an empty frame
// In this example there is only 1 but it could be more
zmq_msg_t address;
zmq_msg_init (&address);
zmq_msg_recv (worker, &address, 0);

zmq_msg_t empty;
zmq_msg_init (&empty);
zmq_msg_recv (worker, &empty, 0);

// Get request, send reply
zmq_msg_t payload;
zmq_msg_init (&payload);
zmq_msg_recv (worker, &payload, 0);

int char_nbr;
printf ("Worker: ");
for (char_nbr = 0; char_nbr < zmq_msg_size (&payload); char_nbr++)
printf ("%c", *(char *) (zmq_msg_data (&payload) + char_nbr));
printf ("\n");

zmq_msg_init_size (&payload, 2);
memcpy (zmq_msg_data (&payload), "OK", 2);

zmq_msg_send (worker, &address, ZMQ_SNDMORE);
zmq_close (&address);
zmq_msg_send (worker, &empty, ZMQ_SNDMORE);
zmq_close (&empty);
zmq_msg_send (worker, &payload, 0);
zmq_close (&payload);
}
18 changes: 18 additions & 0 deletions fragments/C/lruqueue.c
@@ -0,0 +1,18 @@
while (true) {
// Read and save all frames until we get an empty frame
// In this example there is only 1 but it could be more
char *address = s_recv (worker);
char *empty = s_recv (worker);
assert (*empty == 0);
free (empty);

// Get request, send reply
char *request = s_recv (worker);
printf ("Worker: %s\n", request);
free (request);

s_sendmore (worker, address);
s_sendmore (worker, "");
s_send (worker, "OK");
free (address);
}
9 changes: 9 additions & 0 deletions fragments/C/lruqueue2.c
@@ -0,0 +1,9 @@
while (true) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break; // Interrupted
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
18 changes: 18 additions & 0 deletions fragments/C/lrureader.c
@@ -0,0 +1,18 @@
while (true) {
// Read and save all frames until we get an empty frame
// In this example there is only 1 but it could be more
char *address = s_recv (worker);
char *empty = s_recv (worker);
assert (*empty == 0);
free (empty);

// Get request, send reply
char *request = s_recv (worker);
printf ("Worker: %s\n", request);
free (request);

s_sendmore (worker, address);
s_sendmore (worker, "");
s_send (worker, "OK");
free (address);
}
4 changes: 4 additions & 0 deletions fragments/C/maxsockets.c
@@ -0,0 +1,4 @@
int max_sockets = 1024;
void *context = zmq_ctx_new ();
zmq_ctx_get (context, ZMQ_MAX_SOCKETS, max_sockets);
assert (zmq_ctx_get (context, ZMQ_MAX_SOCKETS) == max_sockets);
4 changes: 4 additions & 0 deletions fragments/C/mdclient-async.c
@@ -0,0 +1,4 @@
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
int mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
zmsg_t *mdcli_recv (mdcli_t *self);
3 changes: 3 additions & 0 deletions fragments/C/mdclient.c
@@ -0,0 +1,3 @@
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
3 changes: 3 additions & 0 deletions fragments/C/mdworker.c
@@ -0,0 +1,3 @@
mdwrk_t *mdwrk_new (char *broker,char *service);
void mdwrk_destroy (mdwrk_t **self_p);
zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply);
21 changes: 21 additions & 0 deletions fragments/C/mousetrap.c
@@ -0,0 +1,21 @@
void *mousetrap;

// Create socket for catching mice
mousetrap = zmq_socket (context, ZMQ_PULL);

// Configure the socket
int64_t jawsize = 10000;
zmq_setsockopt (mousetrap, ZMQ_HWM, &jawsize, sizeof jawsize);

// Plug socket into mouse hole
zmq_connect (mousetrap, "tcp://192.168.55.221:5001");

// Wait for juicy mouse to arrive
zmq_msg_t mouse;
zmq_msg_init (&mouse);
zmq_msg_recv (&mouse, mousetrap, 0);
// Destroy the mouse
zmq_msg_close (&mouse);

// Destroy the socket
zmq_close (mousetrap);
9 changes: 9 additions & 0 deletions fragments/C/mspoller.c
@@ -0,0 +1,9 @@
void *context = zmq_ctx_new ();
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:5555");
if (rc != 0) {
printf ("E: bind failed: %s\n", strerror (errno));
return -1;
}
12 changes: 12 additions & 0 deletions fragments/C/nomserver.c
@@ -0,0 +1,12 @@
#include "czmq.h"
#include "nom_server.h"

int main (int argc, char *argv [])
{
printf ("Starting NOM protocol server on port 6000...\n");
nom_server_t *server = nom_server_new ();
nom_server_bind (server, "tcp://*:6000");
nom_server_wait (server);
nom_server_destroy (&server);
return 0;
}
2 changes: 2 additions & 0 deletions fragments/C/polling.c
@@ -0,0 +1,2 @@
if (zmq_poll (items, 2, 1000 * 1000) == -1)
break; // Interrupted

0 comments on commit a0703ac

Please sign in to comment.