Skip to content
Find file
Fetching contributors…
Cannot retrieve contributors at this time
361 lines (309 sloc) 10.9 KB
/* =====================================================================
flcliapi - Freelance Pattern agent class
Model 3: uses ROUTER socket to address specific services
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "flcliapi.h"
// Request deadline: the agent sets a request's expiry to now + GLOBAL_TIMEOUT
// and reports FAILED to the application once that time has passed
#define GLOBAL_TIMEOUT 3000 // msecs
// Interval between PINGs sent to each connected server; the timer is also
// restarted whenever a message is received from that server
#define PING_INTERVAL 2000 // msecs
// Server considered dead if silent for this long; it is then dropped from
// the active list the next time a request is being dispatched
#define SERVER_TTL 6000 // msecs
// =====================================================================
// Synchronous part, works in our application thread
// ---------------------------------------------------------------------
// Structure of our class
// Application-side handle: owns the context created in flcliapi_new and the
// pipe returned by zthread_fork, which is the only channel to the agent.
struct _flcliapi_t {
zctx_t *ctx; // Our context wrapper; created by flcliapi_new, destroyed by flcliapi_destroy
void *pipe; // Pipe through to flcliapi agent; carries CONNECT/REQUEST commands out and request replies back
};
// This is the thread that handles our real flcliapi class
// Forward declaration: flcliapi_new hands this function to zthread_fork
// before its definition appears further down the file.
static void flcliapi_agent (void *args, zctx_t *ctx, void *pipe);
// ---------------------------------------------------------------------
// Constructor
// Allocates the API object, creates its context and forks the agent
// (flcliapi_agent) on that context. The pipe returned by zthread_fork is
// kept so the other API calls can talk to the agent.
flcliapi_t *
flcliapi_new (void)
{
    flcliapi_t *api = (flcliapi_t *) zmalloc (sizeof (flcliapi_t));

    api->ctx = zctx_new ();
    api->pipe = zthread_fork (api->ctx, flcliapi_agent, NULL);
    return api;
}
// ---------------------------------------------------------------------
// Destructor
// Destroys the context, frees the object and nulls the caller's pointer.
// Does nothing when *self_p is already NULL; self_p itself must not be NULL.
void
flcliapi_destroy (flcliapi_t **self_p)
{
    assert (self_p);

    flcliapi_t *api = *self_p;
    if (api == NULL)
        return;

    zctx_destroy (&api->ctx);
    free (api);
    *self_p = NULL;
}
// ---------------------------------------------------------------------
// Connect to new server endpoint
// Builds the two-frame command [CONNECT][endpoint], sends it to the agent
// over the pipe, then sleeps briefly before returning.
void
flcliapi_connect (flcliapi_t *self, char *endpoint)
{
    assert (self);
    assert (endpoint);

    zmsg_t *command = zmsg_new ();
    zmsg_addstr (command, "CONNECT");
    zmsg_addstr (command, endpoint);
    zmsg_send (&command, self->pipe);

    //  Allow connection to come up
    zclock_sleep (100);
}
// ---------------------------------------------------------------------
// Send & destroy request, get reply
// Prefixes the request with a "REQUEST" frame, hands it to the agent over
// the pipe (zmsg_send takes the message via request_p), then blocks for the
// agent's answer.
//
// Returns the reply with its leading status frame removed, or NULL when no
// message was received, the status frame is missing, or the status is
// "FAILED". The caller owns the returned message.
zmsg_t *
flcliapi_request (flcliapi_t *self, zmsg_t **request_p)
{
    assert (self);
    assert (request_p);         //  Check the handle before dereferencing it
    assert (*request_p);

    zmsg_pushstr (*request_p, "REQUEST");
    zmsg_send (request_p, self->pipe);

    zmsg_t *reply = zmsg_recv (self->pipe);
    if (!reply)
        return NULL;

    //  First frame is the status; a reply without one is treated as a
    //  failure instead of passing NULL to streq
    char *status = zmsg_popstr (reply);
    if (!status || streq (status, "FAILED"))
        zmsg_destroy (&reply);
    free (status);
    return reply;
}
// =====================================================================
// Asynchronous part, works in the background
// ---------------------------------------------------------------------
// Simple class for one server we talk to
// Instances are created by server_new and stored in the agent's 'servers'
// hash keyed by endpoint.
typedef struct {
char *endpoint; // Server identity/endpoint; heap copy made with strdup, freed in server_destroy
uint alive; // 1 if known to be alive; set when a message arrives from it, cleared when it expires
int64_t ping_at; // Next ping at this time (zclock_time value, msecs)
int64_t expires; // Expires at this time (zclock_time value, msecs)
} server_t;
server_t *
server_new (char *endpoint)
{
server_t *self = (server_t *) zmalloc (sizeof (server_t));
self->endpoint = strdup (endpoint);
self->alive = 0;
self->ping_at = zclock_time () + PING_INTERVAL;
self->expires = zclock_time () + SERVER_TTL;
return self;
}
//  Free a server record and its endpoint copy, then null the caller's
//  pointer. Does nothing when *self_p is already NULL.
void
server_destroy (server_t **self_p)
{
    assert (self_p);

    server_t *server = *self_p;
    if (server == NULL)
        return;

    free (server->endpoint);
    free (server);
    *self_p = NULL;
}
//  Hash-iteration callback: if this server's ping time has arrived, send
//  [endpoint][PING] on the given socket and schedule the next ping.
//  'key' is unused. Always returns 0.
int
server_ping (char *key, void *server, void *socket)
{
    server_t *target = (server_t *) server;

    //  Not due yet, nothing to do
    if (zclock_time () < target->ping_at)
        return 0;

    zmsg_t *heartbeat = zmsg_new ();
    zmsg_addstr (heartbeat, target->endpoint);
    zmsg_addstr (heartbeat, "PING");
    zmsg_send (&heartbeat, socket);
    target->ping_at = zclock_time () + PING_INTERVAL;
    return 0;
}
//  Hash-iteration callback: lowers the uint64_t deadline pointed to by
//  'arg' to this server's next ping time when that comes sooner.
//  'key' is unused. Always returns 0.
int
server_tickless (char *key, void *server, void *arg)
{
    uint64_t *earliest = (uint64_t *) arg;
    server_t *candidate = (server_t *) server;

    if (candidate->ping_at < *earliest)
        *earliest = candidate->ping_at;
    return 0;
}
// ---------------------------------------------------------------------
// Simple class for one background agent
// All state used by the agent thread; created by agent_new and released by
// agent_destroy.
typedef struct {
zctx_t *ctx; // Own context
void *pipe; // Socket to talk back to application
void *router; // Socket to talk to servers
zhash_t *servers; // Servers we've connected to
zlist_t *actives; // Servers we know are alive
uint sequence; // Number of requests ever sent
zmsg_t *request; // Current request if any
zmsg_t *reply; // Current reply if any; NOTE(review): only destroyed in agent_destroy, never assigned in the visible code
int64_t expires; // Timeout for request/reply
} agent_t;
//  Create the agent state: remember the context and application pipe,
//  open a ROUTER socket on the context, and create the empty server hash
//  and active list. Remaining fields are left as zmalloc returned them.
agent_t *
agent_new (zctx_t *ctx, void *pipe)
{
    agent_t *agent = (agent_t *) zmalloc (sizeof (agent_t));

    agent->ctx = ctx;
    agent->pipe = pipe;
    agent->router = zsocket_new (agent->ctx, ZMQ_ROUTER);
    agent->servers = zhash_new ();
    agent->actives = zlist_new ();
    return agent;
}
//  Release the agent state: the server hash, the active list, any pending
//  request/reply messages and the structure itself, then null the
//  caller's pointer. Does nothing when *self_p is already NULL.
void
agent_destroy (agent_t **self_p)
{
    assert (self_p);

    agent_t *agent = *self_p;
    if (agent == NULL)
        return;

    zhash_destroy (&agent->servers);
    zlist_destroy (&agent->actives);
    zmsg_destroy (&agent->request);
    zmsg_destroy (&agent->reply);
    free (agent);
    *self_p = NULL;
}
// Callback when we remove server from agent 'servers' hash table
static void
s_server_free (void *argument)
{
server_t *server = (server_t *) argument;
server_destroy (&server);
}
//  Handle one command arriving from the application over the pipe.
//
//  [CONNECT][endpoint]  connect the ROUTER socket to the endpoint, create
//                       a server record for it and register it in the
//                       'servers' hash and the 'actives' list.
//  [REQUEST][frames...] prefix the remaining frames with the next sequence
//                       number and keep them as the current request, which
//                       expires after GLOBAL_TIMEOUT.
//
//  Any other command is discarded. Returns without doing anything when no
//  message (or no command frame) could be read.
void
agent_control_message (agent_t *self)
{
    zmsg_t *msg = zmsg_recv (self->pipe);
    if (!msg)
        return;                 //  Nothing received, e.g. interrupted

    char *command = zmsg_popstr (msg);
    if (!command) {
        zmsg_destroy (&msg);    //  Empty message, nothing to dispatch
        return;
    }
    if (streq (command, "CONNECT")) {
        char *endpoint = zmsg_popstr (msg);
        printf ("I: connecting to %s...\n", endpoint);
        int rc = zmq_connect (self->router, endpoint);
        assert (rc == 0);
        server_t *server = server_new (endpoint);
        zhash_insert (self->servers, endpoint, server);
        zhash_freefn (self->servers, endpoint, s_server_free);
        //  NOTE(review): the server is listed as active while its 'alive'
        //  flag is still 0 (as set by server_new), so the first message
        //  received from it appends it to 'actives' a second time --
        //  confirm whether that is intended.
        zlist_append (self->actives, server);
        server->ping_at = zclock_time () + PING_INTERVAL;
        server->expires = zclock_time () + SERVER_TTL;
        free (endpoint);
    }
    else
    if (streq (command, "REQUEST")) {
        assert (!self->request);    //  Strict request-reply cycle
        //  Prefix request with sequence number. A 32-bit unsigned value
        //  can take 10 digits plus the terminator, so size the buffer
        //  generously and bound the write.
        char sequence_text [16];
        snprintf (sequence_text, sizeof (sequence_text),
                  "%u", ++self->sequence);
        zmsg_pushstr (msg, sequence_text);
        //  Take ownership of request message
        self->request = msg;
        msg = NULL;
        //  Request expires after global timeout
        self->expires = zclock_time () + GLOBAL_TIMEOUT;
    }
    free (command);
    zmsg_destroy (&msg);
}
//  Handle one message arriving from a server on the ROUTER socket.
//
//  Frame 0 identifies the server that sent it; that server is marked
//  alive (and added to 'actives' if it was not) and its ping and expiry
//  timers are restarted. Frame 1 may carry a sequence number: when a
//  request is outstanding and the number equals the current sequence, the
//  remaining frames are forwarded to the application prefixed with "OK"
//  and the request is completed. Anything else (PING responses, stale or
//  duplicate replies) is discarded.
void
agent_router_message (agent_t *self)
{
    zmsg_t *reply = zmsg_recv (self->router);
    if (!reply)
        return;                 //  Nothing received, e.g. interrupted

    //  Frame 0 is server that replied
    char *endpoint = zmsg_popstr (reply);
    server_t *server =
        (server_t *) zhash_lookup (self->servers, endpoint);
    assert (server);
    free (endpoint);
    if (!server->alive) {
        zlist_append (self->actives, server);
        server->alive = 1;
    }
    server->ping_at = zclock_time () + PING_INTERVAL;
    server->expires = zclock_time () + SERVER_TTL;

    //  Frame 1 may be sequence number for reply. Only accept it when a
    //  request is actually pending and the frame is a plain decimal
    //  number; otherwise a non-numeric frame would parse as 0 and match
    //  the initial sequence, and late replies would reach the application
    //  outside a request/reply cycle.
    char *sequence = zmsg_popstr (reply);
    int matched = 0;
    if (sequence && self->request
    &&  sequence [0] >= '0' && sequence [0] <= '9') {
        char *end = NULL;
        unsigned long number = strtoul (sequence, &end, 10);
        matched = (*end == '\0' && number == self->sequence);
    }
    free (sequence);            //  zmsg_popstr result is ours to free

    if (matched) {
        zmsg_pushstr (reply, "OK");
        zmsg_send (&reply, self->pipe);
        zmsg_destroy (&self->request);
    }
    else
        zmsg_destroy (&reply);
}
// ---------------------------------------------------------------------
// Asynchronous agent manages server pool and handles request/reply
// dialog when the application asks for it.
//
// Runs until interrupted or until polling fails. Each pass:
//   1. computes how long to sleep (next request expiry or server ping,
//      capped at one hour),
//   2. polls the application pipe and the ROUTER socket,
//   3. processes whichever of the two has input,
//   4. if a request is outstanding, fails it on timeout or else sends a
//      copy to the first non-expired active server,
//   5. sends PINGs to servers whose ping timer is due.
// 'args' is unused.
static void
flcliapi_agent (void *args, zctx_t *ctx, void *pipe)
{
    agent_t *self = agent_new (ctx, pipe);

    zmq_pollitem_t items [] = {
        { self->pipe, 0, ZMQ_POLLIN, 0 },
        { self->router, 0, ZMQ_POLLIN, 0 }
    };
    while (!zctx_interrupted) {
        //  Calculate tickless timer, up to 1 hour
        uint64_t tickless = zclock_time () + 1000 * 3600;
        if (self->request
        &&  tickless > self->expires)
            tickless = self->expires;
        zhash_foreach (self->servers, server_tickless, &tickless);

        //  Work out the wait in signed arithmetic and clamp at zero: if
        //  the deadline has already passed, an unsigned subtraction would
        //  wrap to a huge value and zmq_poll could block indefinitely.
        int64_t timeout = (int64_t) tickless - zclock_time ();
        if (timeout < 0)
            timeout = 0;

        int rc = zmq_poll (items, 2, (long) (timeout * ZMQ_POLL_MSEC));
        if (rc == -1)
            break;              //  Context has been shut down

        if (items [0].revents & ZMQ_POLLIN)
            agent_control_message (self);

        if (items [1].revents & ZMQ_POLLIN)
            agent_router_message (self);

        //  If we're processing a request, dispatch to next server.
        //  Note this runs on every pass while the request is outstanding.
        if (self->request) {
            if (zclock_time () >= self->expires) {
                //  Request expired, kill it
                zstr_send (self->pipe, "FAILED");
                zmsg_destroy (&self->request);
            }
            else {
                //  Find server to talk to, remove any expired ones
                while (zlist_size (self->actives)) {
                    server_t *server =
                        (server_t *) zlist_first (self->actives);
                    if (zclock_time () >= server->expires) {
                        //  Drop from the active list only; the record
                        //  stays in the 'servers' hash
                        zlist_pop (self->actives);
                        server->alive = 0;
                    }
                    else {
                        zmsg_t *request = zmsg_dup (self->request);
                        zmsg_pushstr (request, server->endpoint);
                        zmsg_send (&request, self->router);
                        break;
                    }
                }
            }
        }
        //  Send heartbeats to servers whose ping timer is due
        zhash_foreach (self->servers, server_ping, self->router);
    }
    agent_destroy (&self);
}
Something went wrong with that request. Please try again.