From 778fe7d79be51b226d2445cf76364c6a181d3f6d Mon Sep 17 00:00:00 2001 From: Eseanu Marius Cristian Date: Wed, 12 Aug 2015 16:52:32 +0300 Subject: [PATCH] ratelimit: improved clusterer api usagefor replication --- modules/ratelimit/ratelimit_helper.c | 225 +++++++++++++-------------- 1 file changed, 107 insertions(+), 118 deletions(-) diff --git a/modules/ratelimit/ratelimit_helper.c b/modules/ratelimit/ratelimit_helper.c index 253ac5fa1c2..36a4e9305c4 100644 --- a/modules/ratelimit/ratelimit_helper.c +++ b/modules/ratelimit/ratelimit_helper.c @@ -88,12 +88,13 @@ int rl_buffer_th = RL_BUF_THRESHOLD; static str rl_name_buffer = {0, 0}; + static inline int rl_set_name(str * name) { if (name->len + db_prefix.len > rl_name_buffer.len) { rl_name_buffer.len = name->len + db_prefix.len; rl_name_buffer.s = pkg_realloc(rl_name_buffer.s, - rl_name_buffer.len); + rl_name_buffer.len); if (!rl_name_buffer.s) { LM_ERR("cannot realloc buffer\n"); rl_name_buffer.len = 0; @@ -105,7 +106,6 @@ static inline int rl_set_name(str * name) return 0; } - /* NOTE: assumes that the pipe has been locked. If fails, releases the lock */ static int rl_change_counter(str *name, rl_pipe_t *pipe, int c) { @@ -128,7 +128,7 @@ static int rl_change_counter(str *name, rl_pipe_t *pipe, int c) } else { if (pipe->my_counter) { ret = cdbf.sub(cdbc, &rl_name_buffer, pipe->my_counter, rl_expire_time, - &new_counter); + &new_counter); } else { ret = cdbf.get_counter(cdbc, &rl_name_buffer, &new_counter); } @@ -136,14 +136,14 @@ static int rl_change_counter(str *name, rl_pipe_t *pipe, int c) if (ret < 0) { LM_ERR("cannot change counter for pipe %.*s with %d\n", - name->len, name->s, c); + name->len, name->s, c); return -1; } pipe->my_counter = c ? pipe->my_counter + c : 0; pipe->counter = new_counter; LM_DBG("changed with %d; my_counter: %d; counter: %d\n", - c, pipe->my_counter, new_counter); + c, pipe->my_counter, new_counter); return 0; } @@ -168,11 +168,11 @@ int init_cachedb(str * db_url) { if (cachedb_bind_mod(db_url, &cdbf) < 0) { LM_ERR("cannot bind functions for db_url %.*s\n", - db_url->len, db_url->s); + db_url->len, db_url->s); return -1; } if (!CACHEDB_CAPABILITY(&cdbf, - CACHEDB_CAP_GET|CACHEDB_CAP_ADD|CACHEDB_CAP_SUB)) { + CACHEDB_CAP_GET | CACHEDB_CAP_ADD | CACHEDB_CAP_SUB)) { LM_ERR("not enough capabilities\n"); return -1; } @@ -231,15 +231,15 @@ int init_rl_table(unsigned int size) rl_default_algo = get_rl_algo(rl_default_algo_s); if (rl_default_algo < 0) { LM_ERR("unknown algoritm <%.*s>\n", rl_default_algo_s.len, - rl_default_algo_s.s); + rl_default_algo_s.s); return -1; } LM_DBG("default algorithm is %.*s [ %d ]\n", - rl_default_algo_s.len, rl_default_algo_s.s, rl_default_algo); + rl_default_algo_s.len, rl_default_algo_s.s, rl_default_algo); /* if at least 25% of the size locks can't be allocated * we return an error */ - for ( i = size; i > size / 4; i--) { + for (i = size; i > size / 4; i--) { rl_htable.locks = lock_set_alloc(i); if (!rl_htable.locks) continue; @@ -253,13 +253,13 @@ int init_rl_table(unsigned int size) if (!rl_htable.locks) { LM_ERR("unable to allocted at least %d locks for the hash table\n", - size/4); + size / 4); goto error; } rl_htable.locks_no = i; LM_DBG("%d locks allocated for %d hashsize\n", - rl_htable.locks_no, rl_htable.size); + rl_htable.locks_no, rl_htable.size); return 0; error: @@ -272,12 +272,14 @@ struct { str name; rl_algo_t algo; } rl_algo_names[] = { - { str_init("NOP"), PIPE_ALGO_NOP}, - { str_init("RED"), PIPE_ALGO_RED}, - { str_init("TAILDROP"), PIPE_ALGO_TAILDROP}, - { str_init("FEEDBACK"), PIPE_ALGO_FEEDBACK}, - { str_init("NETWORK"), PIPE_ALGO_NETWORK}, - { { 0, 0 }, 0}, + { str_init("NOP"), PIPE_ALGO_NOP}, + { str_init("RED"), PIPE_ALGO_RED}, + { str_init("TAILDROP"), PIPE_ALGO_TAILDROP}, + { str_init("FEEDBACK"), PIPE_ALGO_FEEDBACK}, + { str_init("NETWORK"), PIPE_ALGO_NETWORK}, + { + { 0, 0}, 0 + }, }; static rl_algo_t get_rl_algo(str name) @@ -286,9 +288,9 @@ static rl_algo_t get_rl_algo(str name) if (!name.s || !name.len) return -1; - for ( i = 0 ; rl_algo_names[i].name.s ; i++) { + for (i = 0; rl_algo_names[i].name.s; i++) { if (rl_algo_names[i].name.len == name.len && - strncasecmp(rl_algo_names[i].name.s, name.s, name.len) == 0) + strncasecmp(rl_algo_names[i].name.s, name.s, name.len) == 0) return rl_algo_names[i].algo; } return -1; @@ -297,13 +299,12 @@ static rl_algo_t get_rl_algo(str name) static str * get_rl_algo_name(rl_algo_t algo) { int i; - for (i = 0; rl_algo_names[i].name.s ; i++) + for (i = 0; rl_algo_names[i].name.s; i++) if (rl_algo_names[i].algo == algo) return &rl_algo_names[i].name; return NULL; } - int w_rl_check_2(struct sip_msg *_m, char *_n, char *_l) { return w_rl_check_3(_m, _n, _l, NULL); @@ -324,17 +325,17 @@ int w_rl_check_3(struct sip_msg *_m, char *_n, char *_l, char *_a) LM_ERR("invalid parameters\n"); goto end; } - if (fixup_get_svalue(_m, (gparam_p)_n, &name) < 0) { + if (fixup_get_svalue(_m, (gparam_p) _n, &name) < 0) { LM_ERR("cannot retrieve identifier\n"); goto end; } - if (fixup_get_ivalue(_m, (gparam_p)_l, &limit) < 0) { + if (fixup_get_ivalue(_m, (gparam_p) _l, &limit) < 0) { LM_ERR("cannot retrieve limit\n"); goto end; } algorithm.s = 0; - if (!_a || fixup_get_svalue(_m, (gparam_p)_a, &algorithm) < 0 || - (algo = get_rl_algo(algorithm)) < 0) { + if (!_a || fixup_get_svalue(_m, (gparam_p) _a, &algorithm) < 0 || + (algo = get_rl_algo(algorithm)) < 0) { algo = PIPE_ALGO_NOP; } @@ -344,8 +345,8 @@ int w_rl_check_3(struct sip_msg *_m, char *_n, char *_l, char *_a) if (*rl_feedback_limit) { if (*rl_feedback_limit != limit) { LM_WARN("FEEDBACK limit should be the same for all pipes, but" - " new limit %d differs - setting to %d\n", - limit, *rl_feedback_limit); + " new limit %d differs - setting to %d\n", + limit, *rl_feedback_limit); limit = *rl_feedback_limit; } } else { @@ -380,16 +381,16 @@ int w_rl_check_3(struct sip_msg *_m, char *_n, char *_l, char *_a) } memset(*pipe, 0, sizeof(rl_pipe_t)); LM_DBG("Pipe %.*s doens't exist, but was created %p\n", - name.len, name.s, *pipe); + name.len, name.s, *pipe); if (algo == PIPE_ALGO_NETWORK) should_update = 1; (*pipe)->algo = (algo == PIPE_ALGO_NOP) ? rl_default_algo : algo; } else { LM_DBG("Pipe %.*s found: %p - last used %lu\n", - name.len, name.s, *pipe, (*pipe)->last_used); + name.len, name.s, *pipe, (*pipe)->last_used); if (algo != PIPE_ALGO_NOP && (*pipe)->algo != algo) { LM_WARN("algorithm %d different from the initial one %d for pipe " - "%.*s", algo, (*pipe)->algo, name.len, name.s); + "%.*s", algo, (*pipe)->algo, name.len, name.s); } } @@ -410,8 +411,8 @@ int w_rl_check_3(struct sip_msg *_m, char *_n, char *_l, char *_a) ret = rl_pipe_check(*pipe); LM_DBG("Pipe %.*s counter:%d load:%d limit:%d should %sbe blocked (%p)\n", - name.len, name.s, (*pipe)->counter, (*pipe)->load, - (*pipe)->limit, ret == 1? "NOT " : "", *pipe); + name.len, name.s, (*pipe)->counter, (*pipe)->load, + (*pipe)->limit, ret == 1 ? "NOT " : "", *pipe); release: @@ -461,7 +462,7 @@ void rl_timer(unsigned int ticks, void *param) goto next_map; } for (; iterator_is_valid(&it);) { - pipe = (rl_pipe_t **)iterator_val(&it); + pipe = (rl_pipe_t **) iterator_val(&it); if (!pipe || !*pipe) { LM_ERR("[BUG] bogus map[%d] state\n", i); goto next_pipe; @@ -483,7 +484,7 @@ void rl_timer(unsigned int ticks, void *param) lock_release(rl_lock); } LM_DBG("Deleting ratelimit pipe key \"%.*s\"\n", - key->len, key->s); + key->len, key->s); value = iterator_delete(&del); /* free resources */ if (value) @@ -498,19 +499,19 @@ void rl_timer(unsigned int ticks, void *param) } } switch ((*pipe)->algo) { - case PIPE_ALGO_NETWORK: - /* handle network algo */ - (*pipe)->load = - (*rl_network_load > (*pipe)->limit) ? -1 : 1; - break; - - case PIPE_ALGO_RED: - if ((*pipe)->limit && rl_timer_interval) - (*pipe)->load = (*pipe)->counter / - ((*pipe)->limit * rl_timer_interval); - break; - default: - break; + case PIPE_ALGO_NETWORK: + /* handle network algo */ + (*pipe)->load = + (*rl_network_load > (*pipe)->limit) ? -1 : 1; + break; + + case PIPE_ALGO_RED: + if ((*pipe)->limit && rl_timer_interval) + (*pipe)->load = (*pipe)->counter / + ((*pipe)->limit * rl_timer_interval); + break; + default: + break; } (*pipe)->last_counter = rl_get_all_counters(*pipe); if (RL_USE_CDB(*pipe)) { @@ -524,7 +525,7 @@ void rl_timer(unsigned int ticks, void *param) next_pipe: if (iterator_next(&it) < 0) break; - } + } next_map: RL_RELEASE_LOCK(i); } @@ -541,9 +542,9 @@ static int rl_map_print(void *param, str key, void *value) struct mi_attr* attr; char* p; int len; - struct rl_param_t * rl_param = (struct rl_param_t *)param; + struct rl_param_t * rl_param = (struct rl_param_t *) param; struct mi_node * rpl; - rl_pipe_t *pipe = (rl_pipe_t *)value; + rl_pipe_t *pipe = (rl_pipe_t *) value; struct mi_node * node; str *alg; @@ -579,15 +580,15 @@ static int rl_map_print(void *param, str key, void *value) } if (!(attr = add_mi_attr(node, MI_DUP_VALUE, "algorithm", 9, - alg->s, alg->len))) + alg->s, alg->len))) return -1; - p = int2str((unsigned long)(pipe->limit), &len); + p = int2str((unsigned long) (pipe->limit), &len); if (!(attr = add_mi_attr(node, MI_DUP_VALUE, "limit", 5, p, len))) return -1; - p = int2str((unsigned long)rl_get_all_counters(pipe), &len); + p = int2str((unsigned long) rl_get_all_counters(pipe), &len); if (!(attr = add_mi_attr(node, MI_DUP_VALUE, "counter", 7, p, len))) return -1; @@ -619,7 +620,7 @@ int rl_stats(struct mi_root *rpl_tree, str * value) } if (rl_map_print(¶m, *value, *pipe)) { LM_ERR("cannot print value for key %.*s\n", - value->len, value->s); + value->len, value->s); goto error; } RL_RELEASE_LOCK(i); @@ -640,7 +641,8 @@ int rl_stats(struct mi_root *rpl_tree, str * value) return -1; } -static int bin_status_aux(struct mi_node *root, clusterer_node_t *nodes,int type,int cluster_id){ +static int bin_status_aux(struct mi_node *root, clusterer_node_t *nodes, int type, int cluster_id) +{ clusterer_node_t *d; struct mi_node *node = NULL; struct mi_attr* attr; @@ -648,7 +650,7 @@ static int bin_status_aux(struct mi_node *root, clusterer_node_t *nodes,int type str machine_id_s; str state; - for (d = nodes; d; d = d->next){ + for (d = nodes; d; d = d->next) { cluster_id_s.s = int2str(cluster_id, &cluster_id_s.len); node = add_mi_node_child(root, MI_DUP_VALUE, "Cluster ID", 10, cluster_id_s.s, cluster_id_s.len); @@ -666,13 +668,13 @@ static int bin_status_aux(struct mi_node *root, clusterer_node_t *nodes,int type if (d->description.s) attr = add_mi_attr(node, MI_DUP_VALUE, "DESCRIPTION", 11, - d->description.s, d->description.len); + d->description.s, d->description.len); else attr = add_mi_attr(node, MI_DUP_VALUE, "DESCRIPTION", 11, - "none", 4); + "none", 4); if (attr == NULL) goto error; - - if(type) + + if (type) attr = add_mi_attr(node, MI_DUP_VALUE, "TYPE", 4, "server", 6); else @@ -680,45 +682,44 @@ static int bin_status_aux(struct mi_node *root, clusterer_node_t *nodes,int type "client", 6); if (attr == NULL) goto error; } - + return 0; error: return -1; } - int rl_bin_status(struct mi_root *rpl_tree) { clusterer_node_t *nodes; struct mi_node *root = NULL; - if(rpl_tree == NULL) + if (rpl_tree == NULL) return -1; - root = &rpl_tree->node; - if(rl_repl_cluster){ + root = &rpl_tree->node; + if (rl_repl_cluster) { nodes = clusterer_api.get_nodes(rl_repl_cluster, PROTO_BIN); - if(nodes == NULL) + if (nodes == NULL) return -1; - if(bin_status_aux(root, nodes, 1, rl_repl_cluster) < 0) + if (bin_status_aux(root, nodes, 1, rl_repl_cluster) < 0) goto error; clusterer_api.free_nodes(nodes); } - - if(accept_repl_pipes){ + + if (accept_repl_pipes) { nodes = clusterer_api.get_nodes(accept_repl_pipes, PROTO_BIN); - if(nodes == NULL) + if (nodes == NULL) return -1; - if(bin_status_aux(root, nodes, 0, accept_repl_pipes) < 0) + if (bin_status_aux(root, nodes, 0, accept_repl_pipes) < 0) goto error; clusterer_api.free_nodes(nodes); } - - + + return 0; error: - clusterer_api.free_nodes(nodes); - return -1; + clusterer_api.free_nodes(nodes); + return -1; } int w_rl_set_count(str key, int val) @@ -751,7 +752,7 @@ int w_rl_set_count(str key, int val) } LM_DBG("new counter for key %.*s is %d\n", - key.len, key.s, (*pipe)->counter); + key.len, key.s, (*pipe)->counter); ret = 0; @@ -760,11 +761,11 @@ int w_rl_set_count(str key, int val) return ret; } -static inline int w_rl_change_counter(struct sip_msg *_m, char *_n, int dec) +static inline int w_rl_change_counter(struct sip_msg *_m, char *_n, int dec) { str name; - if (!_n || fixup_get_svalue(_m, (gparam_p)_n, &name) < 0) { + if (!_n || fixup_get_svalue(_m, (gparam_p) _n, &name) < 0) { LM_ERR("cannot retrieve identifier\n"); return -1; } @@ -816,13 +817,11 @@ static rl_repl_counter_t* find_destination(rl_pipe_t *pipe, int machine_id) -void rl_rcv_bin(int packet_type, struct receive_info *ri) +void rl_rcv_bin(int packet_type, struct receive_info *ri, int server_id) { rl_algo_t algo; int limit; int counter; - int rc; - int server_id; str name; char *ip; unsigned short port; @@ -839,16 +838,20 @@ void rl_rcv_bin(int packet_type, struct receive_info *ri) if (packet_type != RL_PIPE_COUNTER) return; - rc = bin_pop_int(&server_id); - if (rc < 0) - return; - - if (!clusterer_api.check(accept_repl_pipes, &ri->src_su, server_id, ri->proto)){ - get_su_info(&ri->src_su.s, ip, port); - LM_WARN("received bin packet from unknown source: %s:%hu\n", - ip, port); + if (packet_type == SERVER_TEMP_DISABLED) { + get_su_info(&ri->src_su.s, ip, port); + LM_WARN("server: %s:%hu temporary disabled\n", ip, port); + return; + } + + if (packet_type == SERVER_TIMEOUT) { + LM_WARN("server with clustererer id %d timeout\n", server_id); return; } + + if (packet_type != RL_PIPE_COUNTER) + return; + now = time(0); for (;;) { @@ -889,18 +892,18 @@ void rl_rcv_bin(int packet_type, struct receive_info *ri) } memset(*pipe, 0, sizeof(rl_pipe_t)); LM_DBG("Pipe %.*s doesn't exist, but was created %p\n", - name.len, name.s, *pipe); + name.len, name.s, *pipe); (*pipe)->algo = algo; (*pipe)->limit = limit; } else { LM_DBG("Pipe %.*s found: %p - last used %lu\n", - name.len, name.s, *pipe, (*pipe)->last_used); + name.len, name.s, *pipe, (*pipe)->last_used); if ((*pipe)->algo != algo) LM_WARN("algorithm %d different from the initial one %d for " - "pipe %.*s", algo, (*pipe)->algo, name.len, name.s); + "pipe %.*s", algo, (*pipe)->algo, name.len, name.s); if ((*pipe)->limit != limit) LM_WARN("limit %d different from the initial one %d for " - "pipe %.*s", limit, (*pipe)->limit, name.len, name.s); + "pipe %.*s", limit, (*pipe)->limit, name.len, name.s); } /* set the last used time */ (*pipe)->last_used = time(0); @@ -921,12 +924,14 @@ int rl_repl_init(void) if (rl_buffer_th > (BUF_SIZE * 0.9)) { LM_WARN("Buffer size too big %d - pipe information might get lost", - rl_buffer_th); + rl_buffer_th); return -1; } if (accept_repl_pipes && - bin_register_cb("ratelimit", rl_rcv_bin) < 0) { + clusterer_api.register_module("ratelimit", PROTO_BIN, rl_rcv_bin, + 10, 1, accept_repl_pipes) < 0) { + //bin_register_cb("ratelimit", rl_rcv_bin) < 0) { LM_ERR("Cannot register binary packet callback!\n"); return -1; } @@ -936,25 +941,9 @@ int rl_repl_init(void) static inline void rl_replicate(void) { - str send_buffer; - clusterer_node_t *nodes; - clusterer_node_t *d; - - nodes = clusterer_api.get_nodes(rl_repl_cluster, PROTO_BIN); - if(nodes == NULL) - return; - - bin_get_buffer(&send_buffer); - - for (d = nodes; d; d = d->next){ - if(msg_send(NULL, PROTO_BIN, &d->addr, 0, send_buffer.s,send_buffer.len,0) != 0){ - LM_ERR("cannot send message\n"); - clusterer_api.set_state(rl_repl_cluster, d->machine_id, 2, PROTO_BIN); - } - - } - - clusterer_api.free_nodes(nodes); + if (clusterer_api.send_to(rl_repl_cluster, PROTO_BIN) < 0) { + LM_INFO("ratelimit replicate failed\n"); + } } void rl_timer_repl(utime_t ticks, void *param) @@ -982,7 +971,7 @@ void rl_timer_repl(utime_t ticks, void *param) goto next_map; } for (; iterator_is_valid(&it);) { - pipe = (rl_pipe_t **)iterator_val(&it); + pipe = (rl_pipe_t **) iterator_val(&it); if (!pipe || !*pipe) { LM_ERR("[BUG] bogus map[%d] state\n", i); goto next_pipe; @@ -1026,7 +1015,7 @@ void rl_timer_repl(utime_t ticks, void *param) next_pipe: if (iterator_next(&it) < 0) break; - } + } next_map: RL_RELEASE_LOCK(i); } @@ -1048,7 +1037,7 @@ int rl_get_all_counters(rl_pipe_t *pipe) rl_repl_counter_t *nodes = pipe->dsts; rl_repl_counter_t *d; - for (d = nodes; d; d = d->next){ + for (d = nodes; d; d = d->next) { /* if the replication expired, reset its counter */ if ((d->update + rl_repl_timer_expire) < now) d->counter = 0;