Permalink
Browse files

Basic programs for message routing subscription.

  • Loading branch information...
1 parent 34235e3 commit 376b034c6adc24dc02a992944956529ec732693d @postwait postwait committed Jul 18, 2012
Showing with 229 additions and 25 deletions.
  1. +1 −1 Makefile
  2. +1 −1 fq_rcvr.c
  3. +15 −10 fqd.h
  4. +24 −0 fqd_prog.c
  5. +188 −13 fqd_routemgr.c
View
@@ -7,7 +7,7 @@ EXTRA_CFLAGS+=-DDEBUG
CLIENT_OBJ=fq_client.o fq_msg.o fq_utils.o
FQD_OBJ=fqd.o fqd_listener.o fqd_ccs.o fqd_dss.o fqd_config.o \
- fqd_queue.o fqd_routemgr.o fqd_queue_mem.o \
+ fqd_queue.o fqd_routemgr.o fqd_queue_mem.o fqd_prog.o \
$(CLIENT_OBJ)
FQC_OBJ=fqc.o $(CLIENT_OBJ)
CPPFLAGS=-I./$(CKDIR)/include
View
@@ -49,7 +49,7 @@ int main(int argc, char **argv) {
memcpy(breq.exchange.name, "maryland", 8);
breq.exchange.len = 8;
breq.peermode = 0;
- breq.program = (char *)"prefix:\"test.prefix.\"";
+ breq.program = (char *)"prefix:\"test.prefix.\" sample(0.2)";
fq_client_bind(c, &breq);
while(breq.out__route_id == 0) usleep(100);
View
25 fqd.h
@@ -138,32 +138,37 @@ extern void fqd_routemgr_ruleset_free(fqd_route_rules *set);
* args: arg, args
* arg: "string"
* arg: true|false
- * arg: [1-9][0-9]*
+ * arg: [0-9][0-9]*(?:.[0-9]*)
*
* functions are dynamically loadable with type signature
- * strings: s, booleans: b, integers: i
- * function: substr_eq(9,10,"tailorings",true)
- * C symbol: fqd_route_prog__substr_eq__iisb(int nargs, valnode_t *args);
+ * strings: s, booleans: b, integers: d
+ * function: substr_eq(9.3,10,"tailorings",true)
+ * C symbol: fqd_route_prog__substr_eq__ddsb(int nargs, valnode_t *args);
*/
-typedef struct {
+typedef struct valnode {
enum {
RP_VALUE_STRING = 1,
RP_VALUE_BOOLEAN = 2,
- RP_VALUE_INTEGER = 3
+ RP_VALUE_DOUBLE = 3
} value_type;
union {
- char *s;
- bool b;
- int i;
+ char *s;
+ bool b;
+ double d;
} value;
} valnode_t;
+
+#define MAX_VALNODE_ARGS 16
+
typedef struct exprnode {
- bool (*match)(int nargs, valnode_t *args);
+ bool (*match)(fq_msg *m, int nargs, valnode_t *args);
int nargs;
valnode_t *args;
} exprnode_t;
+
typedef struct rulenode {
+ uint32_t refcnt;
char oper;
struct rulenode *left;
struct rulenode *right;
View
@@ -0,0 +1,24 @@
+#include <assert.h>
+#include "fqd.h"
+
+bool fqd_route_prog__true__(fq_msg *, int, valnode_t *);
+bool fqd_route_prog__sample__d(fq_msg *, int, valnode_t *);
+
+
+
+bool fqd_route_prog__true__(fq_msg *m, int nargs, valnode_t *args) {
+ assert(nargs == 0);
+ (void)m;
+ (void)nargs;
+ (void)args;
+ return true;
+}
+
+bool
+fqd_route_prog__sample__d(fq_msg *m, int nargs, valnode_t *args) {
+ (void)m;
+ assert(nargs == 1);
+ assert(args[0].value_type == RP_VALUE_DOUBLE);
+ if(drand48() < args[0].value.d) return true;
+ return false;
+}
View
@@ -5,6 +5,7 @@
#include <assert.h>
#include <arpa/nameser_compat.h>
#include <ctype.h>
+#include <dlfcn.h>
#include "fqd.h"
uint32_t global_route_id = 1;
@@ -38,12 +39,28 @@ struct prefix_jumptable {
struct fqd_route_rules {
struct prefix_jumptable master;
};
+static bool apply_compiled_program(rulenode_t *, fq_msg *);
+static bool
+apply_compiled_program(rulenode_t *p, fq_msg *m) {
+ bool lval = false, rval = false;
+ if(p->left) lval = apply_compiled_program(p->left, m);
+ if(p->right) rval = apply_compiled_program(p->right, m);
+ if(p->oper == '|') return lval || rval;
+ if(p->oper == '&') return lval && rval;
+ if(p->expr) {
+ return p->expr->match(m, p->expr->nargs, p->expr->args);
+ }
+ assert("Bad program" == NULL);
+ return false;
+}
static void
walk_jump_table(struct prefix_jumptable *jt, fq_msg *m, int offset, int *mcnt, int *dcnt) {
if(jt->tabletype == RULETABLE) {
struct fqd_route_rule *r;
for(r=jt->rules;r;r=r->next) {
- if(m->route.len >= r->prefix.len && m->route.len <= r->match_maxlen) {
+ if(m->route.len >= r->prefix.len &&
+ m->route.len <= r->match_maxlen &&
+ apply_compiled_program(r->compiled_program, m)) {
int dropped = 0;
fq_rk *rk = (fq_rk *)r->queue;
fq_debug(FQ_DEBUG_ROUTE, "M[%p] -> Q[%.*s]\n", (void *)m, rk->len, rk->name);
@@ -162,13 +179,15 @@ fqd_routemgr_compile(const char *program, int peermode, fqd_queue *q) {
cp = parse_prog_string(cp, (char *)r->prefix.name, &alen);
r->prefix.len = (uint8_t)alen;
if(cp == NULL) {
+ fq_debug(FQ_DEBUG_ROUTE, "Failed to parse: %s\n", r->prefix.name);
free(r);
return NULL;
}
if(*cp) {
char err[128];
r->compiled_program = prog_compile(cp, sizeof(err), err);
if(r->compiled_program == NULL) {
+ fq_debug(FQ_DEBUG_ROUTE, "Failed to compile[%s]: %s\n", cp, err);
free(r);
return NULL;
}
@@ -297,6 +316,11 @@ fqd_routemgr_ruleset_add_rule(fqd_route_rules *set, fqd_route_rule *newrule) {
fq_debug(FQ_DEBUG_ROUTE, "rule[%u] -> %p\n", newrule->route_id, (void *)newrule);
return newrule->route_id;
}
+static rulenode_t *
+copy_compiled_program(rulenode_t *in) {
+ ck_pr_inc_uint(&in->refcnt);
+ return in;
+}
static fqd_route_rule *
copy_rule(fqd_route_rule *in) {
fqd_route_rule *out;
@@ -305,6 +329,7 @@ copy_rule(fqd_route_rule *in) {
memcpy(out, in, sizeof(*out));
assert(out->queue);
out->program = strdup(in->program);
+ out->compiled_program = copy_compiled_program(in->compiled_program);
fqd_queue_ref(out->queue);
out->next = NULL;
fq_debug(FQ_DEBUG_MEM, "copy to [%p] -> Q[%p]\n", (void *)out, (void *)out->queue);
@@ -374,10 +399,14 @@ fqd_routemgr_ruleset_free(fqd_route_rules *set) {
static void
prog_free(rulenode_t *p) {
+ bool zero;
if(!p) return;
+ ck_pr_dec_uint_zero(&p->refcnt, &zero);
+ if(!zero) return;
if(p->left) prog_free(p->left);
if(p->right) prog_free(p->right);
if(p->expr) expr_free(p->expr);
+ free(p);
}
static void
expr_free(exprnode_t *e) {
@@ -392,11 +421,92 @@ expr_free(exprnode_t *e) {
}
#define EAT_SPACE(p) while(*p != '\0' && isspace(*p)) (p)++
-static rulenode_t *rule_parse(const char **cp, int errlen, char *err);
-static rulenode_t *
+static int is_valid_term_char(char ch, bool first) {
+ if((ch >= 'a' && ch <= 'z') ||
+ (ch >= 'A' && ch <= 'Z') ||
+ (ch == '_')) return 1;
+ if(first) return 0;
+ if(ch >= '0' && ch <= '9') return 1;
+ return 0;
+}
+static int rule_getterm(const char **cp, char *term, int len) {
+ int idx = 0;
+ while(idx < (len-1) && is_valid_term_char((*cp)[idx], idx == 0)) {
+ term[idx] = (*cp)[idx];
+ idx++;
+ }
+ term[idx] = '\0';
+ (*cp) += idx;
+ fq_debug(FQ_DEBUG_ROUTE, "term[%s]\n", term);
+ if(idx > 0) return 0;
+ return 1;
+}
+static int rule_getstring(const char **cp, valnode_t *arg) {
+ const char *begin;
+ if(**cp != '\"') return 0;
+ (*cp)++;
+ begin = *cp;
+ while(**cp != '\0') {
+ if(**cp == '\\' && (*cp)[1] != '\0') (*cp)++;
+ (*cp)++;
+ if(**cp == '\"') {
+ int len = (*cp) - begin;
+ arg->value_type = RP_VALUE_STRING;
+ arg->value.s = malloc(len + 1);
+ /* TODO: unescape */
+ memcpy(arg->value.s, begin, len);
+ arg->value.s[len] = '\0';
+ return 1;
+ }
+ }
+ return 0;
+}
+static exprnode_t *
+rule_compose_expression(const char *fname, int nargs, valnode_t *args,
+ int errlen, char *err) {
+ int i;
+ exprnode_t *expr = NULL;
+ char symbol_name[256];
+ char argsig[MAX_VALNODE_ARGS];
+ void *symbol;
+
+ for(i=0;i<nargs;i++) {
+ switch(args[i].value_type) {
+ case RP_VALUE_STRING: argsig[i] = 's'; break;
+ case RP_VALUE_BOOLEAN: argsig[i] = 'b'; break;
+ case RP_VALUE_DOUBLE: argsig[i] = 'd'; break;
+ }
+ }
+ argsig[i] = '\0';
+ snprintf(symbol_name, sizeof(symbol_name), "fqd_route_prog__%s__%s",
+ fname, argsig);
+ symbol = dlsym(RTLD_DEFAULT, symbol_name);
+ if(!symbol) {
+ snprintf(err, errlen, "cannot find symbol: %s\n", symbol_name);
+ return NULL;
+ }
+ expr = calloc(1, sizeof(*expr));
+ expr->match = (bool (*)(fq_msg *, int, valnode_t *)) symbol;
+ if(nargs > 0) {
+ expr->nargs = nargs;
+ expr->args = calloc(nargs, sizeof(*expr->args));
+ for(i=0; i<nargs; i++) {
+ memcpy(&expr->args[i], &args[i], sizeof(*args));
+ /* don't need to double the alloc */
+ }
+ }
+ return expr;
+}
+rulenode_t *rule_parse(const char **cp, int errlen, char *err);
+#define rule_parse_busted(fmt, ...) do { \
+ snprintf(err, errlen, fmt, __VAR_ARGS__); \
+ goto busted; \
+} while(0)
+rulenode_t *
rule_parse(const char **cp, int errlen, char *err) {
- rulenode_t *nr;
+ rulenode_t *nr = NULL;
EAT_SPACE(*cp); if(**cp == '\0') goto busted;
+ fq_debug(FQ_DEBUG_ROUTE, "parse->(%s)\n", *cp);
if(**cp == '(') {
(*cp)++;
EAT_SPACE(*cp); if(**cp == '\0') goto busted;
@@ -408,14 +518,84 @@ rule_parse(const char **cp, int errlen, char *err) {
if((*cp)[0] == '&' && (*cp)[1] == '&') nr->oper = '&';
else if((*cp)[0] == '|' || (*cp)[1] == '|') nr->oper = '|';
else goto busted;
+ (*cp) += 2;
nr->right = rule_parse(cp, errlen, err);
if(nr->right == NULL) goto busted;
EAT_SPACE(*cp); if(**cp == '\0') goto busted;
if(**cp != ')') goto busted;
+ (*cp)++;
+ return nr;
+ }
+ else {
+ char term[128];
+ int nargs = 0;
+ valnode_t args[MAX_VALNODE_ARGS];
+
+ fq_debug(FQ_DEBUG_ROUTE, "parsing function: %s\n", *cp);
+ if(rule_getterm(cp, term, sizeof(term))) goto busted;
+ EAT_SPACE(*cp); if(**cp == '\0') goto busted;
+ if(**cp != '(') goto busted;
+ (*cp)++;
+ while(**cp != '\0' && **cp != ')') {
+ EAT_SPACE(*cp); if(**cp == '\0') goto busted;
+ if(nargs > 0) {
+ if(**cp != ',') goto busted;
+ EAT_SPACE(*cp); if(**cp == '\0') goto busted;
+ }
+ if(**cp == '\"') {
+ if(rule_getstring(cp, &args[nargs])) goto busted;
+ nargs++;
+ }
+ else if(!strcmp(*cp, "true")) {
+ args[nargs].value_type = RP_VALUE_BOOLEAN;
+ args[nargs].value.b = true;
+ nargs++;
+ (*cp) += strlen("true");
+ }
+ else if(!strcmp(*cp, "false")) {
+ args[nargs].value_type = RP_VALUE_BOOLEAN;
+ args[nargs].value.b = false;
+ nargs++;
+ (*cp) += strlen("false");
+ }
+ else {
+ char *endptr;
+ /* parse a double */
+ args[nargs].value_type = RP_VALUE_DOUBLE;
+ args[nargs].value.d = strtod(*cp, &endptr);
+ if(endptr == *cp) goto busted;
+ nargs++;
+ *cp = endptr;
+ }
+ EAT_SPACE(*cp); if(**cp == '\0') goto busted;
+ }
+ if(**cp != ')') goto busted;
+ (*cp)++;
+ nr = calloc(1, sizeof(*nr));
+ nr->expr = rule_compose_expression(term, nargs, args, errlen, err);
+ if(!nr->expr) {
+ int i;
+ for(i=0;i<nargs;i++)
+ if(args[i].value_type == RP_VALUE_STRING)
+ free(args[i].value.s);
+ goto busted;
+ }
+ return nr;
}
busted:
+ fq_debug(FQ_DEBUG_ROUTE, "parse failed at: %s\n", *cp);
if(nr) {
+ if(nr->expr) {
+ if(nr->expr->nargs) {
+ int i;
+ for(i=0;i<nr->expr->nargs;i++)
+ if(nr->expr->args[i].value_type == RP_VALUE_STRING)
+ free(nr->expr->args[i].value.s);
+ free(nr->expr->args);
+ }
+ free(nr->expr);
+ }
free(nr);
}
return NULL;
@@ -428,25 +608,20 @@ prog_compile(const char *program, int errlen, char *err) {
}
else {
rulenode_t *nr;
+ if(errlen>0) err[0] = '\0';
nr = rule_parse(&program, errlen, err);
EAT_SPACE(program);
+ if(nr) nr->refcnt = 1;
if(*program) {
- if(err) snprintf(err, errlen, "trailing trash");
+ if(err && err[0] == '\0') snprintf(err, errlen, "trailing trash: %s", program);
prog_free(nr);
return NULL;
}
+ return nr;
}
if(err) {
snprintf(err, errlen, "internal route program error");
}
return NULL;
}
-
-bool fqd_route_prog__true__(int nargs, valnode_t *args);
-bool fqd_route_prog__true__(int nargs, valnode_t *args) {
- assert(nargs == 0);
- (void)nargs;
- (void)args;
- return true;
-}

0 comments on commit 376b034

Please sign in to comment.