Skip to content

Commit

Permalink
Broker functionality and bugfixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
gunnarbeutner committed Sep 6, 2011
1 parent 47d379f commit d82de90
Show file tree
Hide file tree
Showing 11 changed files with 691 additions and 47 deletions.
125 changes: 123 additions & 2 deletions libimq/broker.c
@@ -1,7 +1,128 @@
#include <stdlib.h>
#include <string.h>
#include <zmq.h>
#include "imq.h"

typedef struct imq_broker_s {
void *zmqcontext;
imq_listener_t *downstream;
imq_socket_t *upstream;
} imq_broker_t;

typedef struct imq_device_s {
int type;
void *frontend;
void *backend;

imq_endpoint_t **endpoints;
int endpointcount;
} imq_device_t;

static void imq_broker_adv_authn_callback(imq_socket_t *sock, void *pbroker,
imq_authn_event_t eventtype, const char *username, const char *password,
int super, const char *channel) {
imq_broker_t *broker = (imq_broker_t *)pbroker;
imq_user_t *user;

/* TODO: locking */

switch (eventtype) {
case IMQ_AUTHN_ROLLBACK:
imq_listener_clear_users(broker->downstream);
break;
case IMQ_AUTHN_COMMIT:
break;
case IMQ_AUTHN_ADD:
user = imq_alloc_user(username, password, super);

if (user == NULL)
break;

(void) imq_listener_add_user(broker->downstream, user);

break;
case IMQ_AUTHN_ALLOW:
user = imq_listener_find_user(broker->downstream, username);

if (user == NULL)
break;

(void) imq_user_allow_channel(user, channel);

break;
}
}

static void imq_broker_io_thread(void *pdevice) {
imq_device_t *device = (imq_device_t *)pdevice;

zmq_device(device->type, device->frontend, device->backend);

free(device);
}

static void imq_broker_adv_endpoint_callback(imq_socket_t *sock, void *pbroker,
const char *channel, const char *instance, int zmqtype) {
imq_broker_t *broker = (imq_broker_t *)pbroker;
int zmqupstream_type, zmqdevice_type;
pthread_t thread;
imq_device_t *device;

if (imq_listener_find_endpoint(broker->downstream, channel, instance) !=
NULL)
return;

switch (zmqtype) {
case ZMQ_XREP:
zmqupstream_type = ZMQ_XREQ;
zmqdevice_type = ZMQ_QUEUE;
break;
case ZMQ_PUB:
zmqupstream_type = ZMQ_SUB;
zmqdevice_type = ZMQ_FORWARDER;
break;
case ZMQ_PUSH:
zmqupstream_type = ZMQ_PULL;
zmqdevice_type = ZMQ_STREAMER;
break;
default:
return;
}

device = (imq_device_t *)malloc(sizeof (*device));

if (device == NULL)
return;

device->type = zmqdevice_type;

device->frontend = imq_open_zmq(sock, channel, instance,
broker->zmqcontext, zmqupstream_type);

if (zmqtype == ZMQ_PUB)
zmq_setsockopt(device->frontend, ZMQ_SUBSCRIBE, NULL, 0);

device->backend = imq_listener_create_zmq_endpoint(broker->downstream,
channel, instance, broker->zmqcontext, zmqtype);

if (pthread_create(&thread, NULL, imq_broker_io_thread, device) < 0)
return;

pthread_detach(thread);
}

int imq_run_broker(imq_listener_t *downstream, imq_socket_t *upstream) {

}
imq_broker_t broker;

memset(&broker, 0, sizeof (broker));

broker.zmqcontext = zmq_init(1);
broker.downstream = downstream;
broker.upstream = upstream;

imq_socket_set_callbacks(upstream, imq_broker_adv_authn_callback,
imq_broker_adv_endpoint_callback, &broker);

while (1)
sleep(10);
}
4 changes: 3 additions & 1 deletion libimq/endpoint.c
Expand Up @@ -39,6 +39,7 @@ imq_endpoint_t *imq_alloc_endpoint(const char *channel, const char *instance) {
}

endpoint->listenerfd = -1;
endpoint->owns_path = 1;

return endpoint;
}
Expand All @@ -58,7 +59,7 @@ void imq_free_endpoint(imq_endpoint_t *endpoint) {
if (endpoint->listenerfd != -1)
close(endpoint->listenerfd);

if (endpoint->path != NULL) {
if (endpoint->path != NULL && endpoint->owns_path) {
(void) unlink(endpoint->path);
free(endpoint->path);
}
Expand Down Expand Up @@ -253,6 +254,7 @@ imq_endpoint_t *imq_shallow_clone_endpoint(imq_endpoint_t *endpoint) {

clone_endpoint->listenerfd = -1;
clone_endpoint->zmqtype = endpoint->zmqtype;
clone_endpoint->owns_path = 0;

return clone_endpoint;
}
54 changes: 43 additions & 11 deletions libimq/imq.h
Expand Up @@ -14,34 +14,43 @@ typedef struct imq_endpoint_s {
char *instance;

char *path;
int owns_path;

int listenerfd;
void *zmqsocket;
int zmqtype;

imq_circuit_t **circuits;
int circuitcount;

pthread_rwlock_t rwlock;
} imq_endpoint_t;

typedef struct imq_user_s {
char *username;
char *password;
int super;

char **channels;
int channelcount;
} imq_user_t;

struct imq_socket_s;

typedef int (*imq_authn_checkpw_cb)(const char *username, const char *password);
typedef int (*imq_authz_endpoint_cb)(struct imq_socket_s *socket,
imq_endpoint_t *endpoint);
typedef enum imq_authn_event_e {
IMQ_AUTHN_ROLLBACK,
IMQ_AUTHN_COMMIT,
IMQ_AUTHN_ADD,
IMQ_AUTHN_ALLOW
} imq_authn_event_t;

typedef void (*imq_adv_authn_cb_t)(struct imq_socket_s *sock, void *cookie,
imq_authn_event_t eventtype, const char *username, const char *password,
int super, const char *channel);
typedef void (*imq_adv_endpoint_cb_t)(struct imq_socket_s *sock, void *cookie,
const char *channel, const char *instance, int zmqtype);

typedef struct imq_listener_s {
int fd;

imq_authn_checkpw_cb authn_checkpw_cb;
imq_authz_endpoint_cb authz_endpoint_cb;

int has_iothread;
pthread_t iothread;
pthread_mutex_t mutex;
Expand All @@ -67,8 +76,14 @@ typedef struct imq_socket_s {

imq_socket_type_t type;

imq_adv_authn_cb_t adv_authn_cb;
imq_adv_endpoint_cb_t adv_endpoint_cb;
void *callback_cookie;

char *host;
unsigned short port;

imq_user_t *user;
char *username;
char *password;

Expand Down Expand Up @@ -106,6 +121,9 @@ int imq_detach_circuit(imq_endpoint_t *endpoint, imq_circuit_t *circuit);
/* socket functions */
imq_socket_t *imq_connect(const char *host, unsigned short port,
const char *username, const char *password);
void imq_socket_set_callbacks(imq_socket_t *sock,
imq_adv_authn_cb_t adv_authn_cb, imq_adv_endpoint_cb_t adv_endpoint_cb,
void *cookie);
imq_socket_t *imq_server_socket(int fd, imq_listener_t *listener);
void *imq_open_zmq(imq_socket_t *socket, const char *channel,
const char *instance, void *zmqcontext, int zmqtype);
Expand All @@ -115,17 +133,31 @@ void imq_close_socket(imq_socket_t *socket);
/* low-level listener functions */
int imq_listener_attach_endpoint(imq_listener_t *listener,
imq_endpoint_t *endpoint);
void imq_listener_clear_endpoints(imq_listener_t *listener);
imq_endpoint_t *imq_listener_find_endpoint(imq_listener_t *listener,
const char *channel, const char *instance);

/* listener functions */
imq_listener_t *imq_listener(unsigned short port,
imq_authn_checkpw_cb authn_checkpw_cb,
imq_authz_endpoint_cb authz_endpoint_cb);
imq_listener_t *imq_listener(unsigned short port);
void *imq_listener_create_zmq_endpoint(imq_listener_t *listener,
const char *channel, const char *instance, void *zmqcontext, int zmqtype);
void imq_close_listener(imq_listener_t *listener);

/* listener authentication functions */
void imq_listener_clear_users(imq_listener_t *listener);
int imq_listener_add_user(imq_listener_t *listener, imq_user_t *user);
int imq_listener_allow_user(imq_listener_t *listener, const char *username,
const char *channel);
imq_user_t *imq_listener_find_user(imq_listener_t *listener,
const char *username);

/* low-level user functions */
imq_user_t *imq_alloc_user(const char *username, const char *password,
int super);
void imq_free_user(imq_user_t *user);
int imq_user_allow_channel(imq_user_t *user, const char *channel);
int imq_user_authz_endpoint(imq_user_t *user, imq_endpoint_t *endpoint);

/* broker functions */
int imq_run_broker(imq_listener_t *downstream, imq_socket_t *upstream);

Expand Down
24 changes: 24 additions & 0 deletions libimq/imqmessage.h
Expand Up @@ -11,10 +11,15 @@ typedef enum imq_message_type_e {
IMQ_MSG_CLOSE_CIRCUIT,
IMQ_MSG_DATA_CIRCUIT,
IMQ_MSG_ADV_USER,
IMQ_MSG_ADV_USER_ALLOW,
IMQ_MSG_ADV_USER_COMMIT,
IMQ_MSG_ADV_ENDPOINT
} imq_message_type_t;

typedef struct imq_msg_error_s {
char *message;
} imq_msg_error_t;

typedef struct imq_msg_auth_s {
char *username;
char *password;
Expand All @@ -36,6 +41,21 @@ typedef struct imq_msg_data_circuit_s {
void *data;
} imq_msg_data_circuit_t;

typedef struct imq_msg_adv_user_s {
char *username;
char *password;
uint16_t super;
} imq_msg_adv_user_t;

typedef struct imq_msg_adv_user_allow_s {
char *username;
char *channel;
} imq_msg_adv_user_allow_t;

typedef struct imq_msg_adv_user_commit_s {
uint16_t success;
} imq_msg_adv_user_commit_t;

typedef struct imq_msg_adv_endpoint_s {
char *channel;
char *instance;
Expand All @@ -46,11 +66,15 @@ typedef struct imq_msg_s {
imq_message_type_t type;

union {
imq_msg_error_t error;
imq_msg_auth_t auth;
imq_msg_open_circuit_t open_circuit;
imq_msg_close_circuit_t close_circuit;
imq_msg_data_circuit_t data_circuit;
imq_msg_adv_endpoint_t adv_endpoint;
imq_msg_adv_user_t adv_user;
imq_msg_adv_user_allow_t adv_user_allow;
imq_msg_adv_user_commit_t adv_user_commit;
} content;
} imq_msg_t;

Expand Down

0 comments on commit d82de90

Please sign in to comment.