Skip to content

Commit

Permalink
0MQ load balancer. randomized, threaded test client.
Browse files Browse the repository at this point in the history
  • Loading branch information
abyrd committed Feb 13, 2013
1 parent 13b1073 commit 70c0323
Show file tree
Hide file tree
Showing 12 changed files with 363 additions and 41 deletions.
27 changes: 17 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
CC=gcc
CFLAGS=-g -Wall -std=gnu99 -o3
LIBS=-lfcgi
CFLAGS=-g -Wall -std=gnu99 -o2
LIBS=-lfcgi -lzmq -lczmq
SOURCES=$(wildcard *.c)
OBJECTS=$(SOURCES:.c=.o)
EXECUTABLE=rrrr

all: $(SOURCES) $(EXECUTABLE)

$(EXECUTABLE): $(OBJECTS)
$(CC) $(OBJECTS) $(LIBS) -o $@
all: loadbalancer rrrr test

%.o: %.c
$(CC) -c $(CFLAGS) $^ -o $@

loadbalancer: loadbalancer.o
$(CC) $(CFLAGS) $^ $(LIBS) -o $@

rrrr: qstring.o router.o transitdata.o util.o rrrr.o
$(CC) $(CFLAGS) $^ $(LIBS) -o $@

test: qstring.o router.o transitdata.o util.o test.o
$(CC) $(CFLAGS) $^ $(LIBS) -o $@

clean:
rm -rf $(OBJECTS) $(EXECUTABLE)
rm -f *.o *~ core rrrr


test: $(SOURCES)
$(CC) -o $@ $^ $(CFLAGS) $(LIBS)
7 changes: 4 additions & 3 deletions config.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* config.h */

#define CONFIG_PROGRAM_NAME "rrrr"
#define CONFIG_INPUT_FILE "/tmp/timetable.dat"
#define CONFIG_MAX_ROUNDS 4
#define RRRR_TEST_CONCURRENCY 4
#define RRRR_TEST_REQUESTS 500
#define RRRR_INPUT_FILE "/tmp/timetable.dat"
#define RRRR_MAX_ROUNDS 6

30 changes: 30 additions & 0 deletions iter.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/* iter.c : iterators */
#include "iter.h"
#include <stdbool.h>

inline void iter_init(iter it, int *ary, int ilo, int ihi) {
it.lo = ary + ilo;
it.hi = ary + ihi;
it.cur = it.lo;
}

inline bool iter_has_next(iter it) {
return it.cur < it.hi;
}

inline int iter_index(iter it) {
return it.cur - it.lo;
}

inline int iter_value(iter it) {
return *(it.cur);
}

inline int iter_next(iter it) {
return *((it.cur)++);
}

inline void iter_reset(iter it) {
it.cur = it.lo;
}

19 changes: 19 additions & 0 deletions iter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* iter.h */
#include <stdbool.h>

typedef struct iter iter;
struct iter {
int *lo;
int *cur;
int *hi;
};

inline bool iter_has_next(iter);

inline int iter_index(iter);

inline int iter_value(iter);

inline int iter_next(iter);

inline void iter_reset(iter);
80 changes: 80 additions & 0 deletions loadbalancer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/* Load-balancing broker using CZMQ API. Borrows heavily from load balancer pattern in 0MQ Guide. */

#include <syslog.h>
#include <czmq.h>
#include "rrrr.h"

int main (void) {

// initialize logging
setlogmask(LOG_UPTO(LOG_DEBUG));
openlog(PROGRAM_NAME, LOG_CONS | LOG_PID | LOG_PERROR, LOG_USER);
syslog(LOG_INFO, "broker starting up");

zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
//zsocket_bind (frontend, "ipc://frontend.ipc");
//zsocket_bind (backend, "ipc://backend.ipc");
zsocket_bind (frontend, "tcp://*:9292");
zsocket_bind (backend, "tcp://*:9293");
int frx = 0, ftx = 0, brx = 0, btx = 0, nworkers = 0, npoll = 0;

// Queue of available workers
zlist_t *workers = zlist_new ();

while (true) {
if (++npoll % 72 == 0)
syslog(LOG_INFO, "broker: frx %04d ftx %04d brx %04d btx %04d / %d workers\n",
frx, ftx, brx, btx, nworkers);
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll frontend only if we have available workers
int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
if (rc == -1)
break; // Interrupted
// Handle worker activity on backend
if (items [0].revents & ZMQ_POLLIN) {
// Use worker identity for load-balancing
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // Interrupted
zframe_t *identity = zmsg_unwrap (msg);
zlist_append (workers, identity);

// Forward message to client if it's not a READY
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0) {
zmsg_destroy (&msg);
nworkers++;
} else {
brx++;
zmsg_send (&msg, frontend);
ftx++;
}
}
if (items [1].revents & ZMQ_POLLIN) {
// Get client request, route to first available worker
zmsg_t *msg = zmsg_recv (frontend);
frx++;
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
zmsg_send (&msg, backend);
btx++;
}
}
}

// When we're done, clean up properly
syslog(LOG_INFO, "broker terminating");
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;

}
17 changes: 14 additions & 3 deletions restart.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
#!/bin/bash
NUM_HANDLERS=8
echo killing old processes
killall loadbalancer
killall rrrr
cgi-fcgi -start -connect /tmp/fastcgi.socket ./rrrr $NUM_HANDLERS
chmod a+rw /tmp/fastcgi.socket
sleep 1
echo starting new processes
./loadbalancer&
# 8 workers is not significantly faster than 4 on a 4-core machine (confirmed twice)
for i in {1..4} ; do ./rrrr & done
sleep 1
echo done

# old fcgi:
# cgi-fcgi -start -connect /tmp/fastcgi.socket ./rrrr $NUM_HANDLERS
# chmod a+rw /tmp/fastcgi.socket

18 changes: 9 additions & 9 deletions router.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include "qstring.h"
#include "transitdata.h"
#include <stdlib.h>
#include <fcgi_stdio.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

Expand All @@ -14,7 +14,7 @@
void router_setup(router_t *router, transit_data_t *td) {
srand(time(NULL));
router->tdata = *td;
router->table_size = td->nstops * CONFIG_MAX_ROUNDS;
router->table_size = td->nstops * RRRR_MAX_ROUNDS;
router->best = malloc(sizeof(int) * td->nstops);
router->arrivals = malloc(sizeof(int) * router->table_size);
router->back_route = malloc(sizeof(int) * router->table_size);
Expand Down Expand Up @@ -69,7 +69,7 @@ static void dump_results(router_t *prouter) {
for (int s = 0; s < r.tdata.nstops; ++s) {
printf("%4d ", s);
int *a = r.arrivals + s;
for (int round = 0; round < CONFIG_MAX_ROUNDS; ++round, a += r.tdata.nstops) {
for (int round = 0; round < RRRR_MAX_ROUNDS; ++round, a += r.tdata.nstops) {
printf("%8s ", timetext(*a));
}
printf("\n");
Expand All @@ -90,8 +90,8 @@ bool router_route(router_t *prouter, router_request_t *preq) {
int *arr_prev = router.arrivals + nstops;
// set initial state (maybe group as a "state" struct?
arr_prev[req.from] = req.time;
for (int round = 0; round < CONFIG_MAX_ROUNDS; ++round) {
printf("round %d\n", round);
for (int round = 0; round < RRRR_MAX_ROUNDS; ++round) {
//printf("round %d\n", round);
int *arr = router.arrivals + round * nstops;
int *back_route = router.back_route; //+ round * nstops;
int *back_stop = router.back_stop; //+ round * nstops;
Expand Down Expand Up @@ -164,7 +164,7 @@ void router_result_dump(router_t *prouter, router_request_t *preq) {
router_t router = *prouter;
router_request_t req = *preq;
printf("routing result\n");
int last_round = router.tdata.nstops * (CONFIG_MAX_ROUNDS - 1);
int last_round = router.tdata.nstops * (RRRR_MAX_ROUNDS - 1);

int *arr = router.arrivals + last_round;
int *back_route = router.back_route;// + last_round;
Expand All @@ -191,7 +191,7 @@ int rrrrandom(int limit) {
return (int) (limit * (random() / (RAND_MAX + 1.0)));
}

inline static void set_random(router_request_t *req) {
void router_request_randomize(router_request_t *req) {
req->walk_speed = 1.5; // m/sec
req->from = rrrrandom(5500);
req->to = rrrrandom(5500);
Expand Down Expand Up @@ -226,7 +226,7 @@ bool router_request_from_qstring(router_request_t *req) {
req->walk_speed = atof(val);
} else if (strcmp(key, "randomize") == 0) {
printf("RANDOMIZING\n");
set_random(req);
router_request_randomize(req);
return true;
} else {
printf("unrecognized parameter: key=%s val=%s\n", key, val);
Expand All @@ -238,7 +238,7 @@ bool router_request_from_qstring(router_request_t *req) {
void router_request_dump(router_request_t *req) {
printf("from: %d\n"
"to: %d\n"
"time: %d\n"
"time: %ld\n"
"speed: %f\n", req->from, req->to, req->time, req->walk_speed);
}

2 changes: 2 additions & 0 deletions router.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ bool router_request_from_qstring(router_request_t*);

void router_request_dump(router_request_t*);

void router_request_randomize(router_request_t*);

void router_teardown(router_t*);

bool router_route(router_t*, router_request_t*);
Expand Down
70 changes: 54 additions & 16 deletions rrrr.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#include <syslog.h>
#include <fcgi_stdio.h>
#include <stdlib.h>
#include <zmq.h>
#include <czmq.h>
#include <assert.h>
#include "config.h"
#include "rrrr.h"
#include "transitdata.h"
#include "router.h"

Expand All @@ -11,32 +14,67 @@ int main(int argc, char **argv) {

// logging
setlogmask(LOG_UPTO(LOG_DEBUG));
openlog(CONFIG_PROGRAM_NAME, LOG_CONS | LOG_PID | LOG_PERROR, LOG_USER);
syslog(LOG_INFO, "starting up");
openlog(PROGRAM_NAME, LOG_CONS | LOG_PID | LOG_PERROR, LOG_USER);
syslog(LOG_INFO, "worker starting up");

// load transit data from disk
transit_data_t tdata;
transit_data_load(CONFIG_INPUT_FILE, &tdata);
transit_data_load(RRRR_INPUT_FILE, &tdata);

// initialize router
router_t router;
router_setup(&router, &tdata);

// establish zmq connection
zctx_t *zctx = zctx_new ();
void *zsock = zsocket_new(zctx, ZMQ_REQ);
int zrc = zsocket_connect(zsock, "tcp://localhost:9293");
if (zrc != 0) exit(1);

// signal to the broker/load balancer that this worker is ready
// move to shared header file
zframe_t *frame = zframe_new (WORKER_READY, 1);
zframe_send (&frame, zsock, 0);
syslog(LOG_INFO, "worker sent ready message to load balancer");

/* MAIN LOOP */
while(FCGI_Accept() >= 0) {
printf("Content-type: text/plain\r\n\r\n");
router_request_t req;
router_request_from_qstring(&req);
router_request_dump(&req);
//transit_data_dump(&tdata);
router_route(&router, &req);
router_result_dump(&router, &req);
}
int request_count = 0;
while (true) {
zmsg_t *msg = zmsg_recv (zsock);
if (!msg) // interrupted (signal)
break;
if (++request_count % 100 == 0)
syslog(LOG_INFO, "worker received %d requests\n", request_count);

// only manipulate the last frame, then send the recycled message back to the broker
zframe_t *frame = zmsg_last (msg);
if (zframe_size (frame) == sizeof (router_request_t)) {
router_request_t *req;
req = (router_request_t*) zframe_data (frame);
//syslog(LOG_INFO, "received request");
//router_request_from_qstring(&req);
//router_request_dump(&req);
//transit_data_dump(&tdata);
router_route(&router, req);
//router_result_dump(&router, &req);
zframe_reset (frame, "OK", 2);
} else {
syslog(LOG_WARNING, "worker received reqeust with wrong length");
zframe_reset (frame, "ERR", 3);
}

// send response to broker, thereby requesting more work
zmsg_send (&msg, zsock);
}

/* TEAR DOWN */
router_teardown(&router);
syslog(LOG_INFO, "worker terminating");
// frame = zframe_new (WORKER_LEAVING, 1);
// zframe_send (&frame, zmq_sock, 0);
// syslog(LOG_INFO, "departure message sent to load balancer");
// zmsg_t *msg = zmsg_recv (zmq_sock);
transit_data_close(&tdata);

return EXIT_SUCCESS;
zctx_destroy (&zctx); //zmq_close(socket) necessary before context destroy?
exit(EXIT_SUCCESS);
}

4 changes: 4 additions & 0 deletions rrrr.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#define PROGRAM_NAME "rrrr"
#define WORKER_READY "\001" // Signals worker is ready
#define WORKER_LEAVE "\002" // Signals worker is shutting down

Loading

0 comments on commit 70c0323

Please sign in to comment.