diff --git a/modules/cachedb_redis/cachedb_redis_dbase.c b/modules/cachedb_redis/cachedb_redis_dbase.c index 5b9f037086e..5aa3d0a88ae 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.c +++ b/modules/cachedb_redis/cachedb_redis_dbase.c @@ -30,11 +30,13 @@ #include "../../ut.h" #include "../../pt.h" #include "../../cachedb/cachedb.h" +#include "../../lib/csv.h" #include #include #define QUERY_ATTEMPTS 2 +#define REDIS_DF_PORT 6379 int redis_query_tout = CACHEDB_REDIS_DEFAULT_TIMEOUT; int redis_connnection_tout = CACHEDB_REDIS_DEFAULT_TIMEOUT; @@ -49,6 +51,9 @@ redisContext *redis_get_ctx(char *ip, int port) static char warned = 0; redisContext *ctx; + if (!port) + port = REDIS_DF_PORT; + if (!redis_connnection_tout) { if (!warned++) LM_WARN("Connecting to redis without timeout might block your server\n"); @@ -211,7 +216,7 @@ int redis_connect(redis_con *con) struct tls_domain *tls_dom = NULL; /* connect to redis DB */ - ctx = redis_get_ctx(con->id->host,con->id->port); + ctx = redis_get_ctx(con->host,con->port); if (!ctx) return -1; @@ -241,7 +246,7 @@ int redis_connect(redis_con *con) if (rpl == NULL || rpl->type == REDIS_REPLY_ERROR) { /* single instace mode */ con->flags |= REDIS_SINGLE_INSTANCE; - len = strlen(con->id->host); + len = strlen(con->host); con->nodes = pkg_malloc(sizeof(cluster_node) + len + 1); if (con->nodes == NULL) { LM_ERR("no more pkg\n"); @@ -251,8 +256,8 @@ int redis_connect(redis_con *con) } con->nodes->ip = (char *)(con->nodes + 1); - strcpy(con->nodes->ip,con->id->host); - con->nodes->port = con->id->port; + strcpy(con->nodes->ip,con->host); + con->nodes->port = con->port; con->nodes->start_slot = 0; con->nodes->end_slot = 4096; con->nodes->context = NULL; @@ -299,39 +304,123 @@ int redis_connect(redis_con *con) return -1; } +/* free a circular list of Redis connections */ +void redis_free_conns(redis_con *con) +{ + redis_con *aux = NULL, *head = con; + + while (con && (con != head || !aux)) { + aux = con; + con = con->next_con; + pkg_free(aux->host); + pkg_free(aux); + } +} + +/* parse a string of: "host[:port]" */ +int redis_get_hostport(const str *hostport, char **host, unsigned short *port) +{ + str in, out; + + char *p = q_memchr(hostport->s, ':', hostport->len); + if (!p) { + if (pkg_nt_str_dup(&out, hostport) != 0) { + LM_ERR("oom\n"); + return -1; + } + + *host = out.s; + *port = REDIS_DF_PORT; + } else { + in.s = hostport->s; + in.len = p - hostport->s; + if (pkg_nt_str_dup(&out, &in) != 0) { + LM_ERR("oom\n"); + return -1; + } + *host = out.s; + + in.s = p + 1; + in.len = hostport->s + hostport->len - (p + 1); + if (in.len <= 0) { + LM_ERR("bad/missing Redis port in URL\n"); + return -1; + } + + unsigned int out_port; + if (str2int(&in, &out_port) != 0) { + LM_ERR("failed to parse Redis port in URL\n"); + return -1; + } + + *port = out_port; + } + + LM_DBG("extracted from '%.*s': '%s' and %d\n", hostport->len, hostport->s, + *host, *port); + + return 0; +} + redis_con* redis_new_connection(struct cachedb_id* id) { - redis_con *con; + redis_con *con, *cons = NULL; + csv_record *r, *it; + unsigned int multi_hosts; if (id == NULL) { LM_ERR("null cachedb_id\n"); - return 0; + return NULL; } - if (id->flags & CACHEDB_ID_MULTIPLE_HOSTS) { - LM_ERR("multiple hosts are not supported for redis\n"); - return 0; - } + if (id->flags & CACHEDB_ID_MULTIPLE_HOSTS) + multi_hosts = REDIS_MULTIPLE_HOSTS; + else + multi_hosts = 0; - con = pkg_malloc(sizeof(redis_con)); - if (con == NULL) { - LM_ERR("no more pkg \n"); - return 0; - } + r = parse_csv_record(_str(id->host)); + for (it = r; it; it = it->next) { + LM_DBG("parsed Redis host: '%.*s'\n", it->s.len, it->s.s); - memset(con,0,sizeof(redis_con)); - con->id = id; - con->ref = 1; + con = pkg_malloc(sizeof(redis_con)); + if (con == NULL) { + LM_ERR("no more pkg\n"); + goto out_err; + } - if (redis_connect(con) < 0) { - LM_ERR("failed to connect to DB\n"); - if (shutdown_on_error) { - pkg_free(con); - return NULL; + memset(con,0,sizeof(redis_con)); + + if (redis_get_hostport(&it->s, &con->host, &con->port) != 0) { + LM_ERR("no more pkg\n"); + goto out_err; } + + con->id = id; + con->ref = 1; + con->flags |= multi_hosts; /* if the case */ + + /* if doing failover Redises, only connect the 1st one for now! */ + if (!cons && redis_connect(con) < 0) { + LM_ERR("failed to connect to DB\n"); + if (shutdown_on_error) + goto out_err; + } + + _add_last(con, cons, next_con); } - return con; + /* turn @cons into a circular list */ + con->next_con = cons; + /* set the "last-known-to-work" connection */ + cons->current = cons; + + free_csv_record(r); + return cons; + +out_err: + free_csv_record(r); + redis_free_conns(cons); + return NULL; } cachedb_con *redis_init(str *url) @@ -339,16 +428,22 @@ cachedb_con *redis_init(str *url) return cachedb_do_init(url,(void *)redis_new_connection); } -void redis_free_connection(cachedb_pool_con *con) +void redis_free_connection(cachedb_pool_con *cpc) { - redis_con * c; + redis_con *con = (redis_con *)cpc, *aux = NULL, *head = con; LM_DBG("in redis_free_connection\n"); - if (!con) return; - c = (redis_con *)con; - destroy_cluster_nodes(c); - pkg_free(c); + if (!con) + return; + + while (con && (con != head || !aux)) { + aux = con; + con = con->next_con; + destroy_cluster_nodes(aux); + pkg_free(aux->host); + pkg_free(aux); + } } void redis_destroy(cachedb_con *con) { @@ -356,65 +451,112 @@ void redis_destroy(cachedb_con *con) { cachedb_do_close(con,redis_free_connection); } -#define redis_run_command(con,key,fmt,args...) \ - do {\ - con = (redis_con *)connection->data; \ - if (!(con->flags & REDIS_INIT_NODES) && redis_connect(con) < 0) { \ - LM_ERR("failed to connect to DB\n"); \ - return -9; \ - } \ - node = get_redis_connection(con,key); \ - if (node == NULL) { \ - LM_ERR("Bad cluster configuration\n"); \ - return -10; \ - } \ - if (node->context == NULL) { \ - if (redis_reconnect_node(con,node) < 0) { \ - return -1; \ - } \ - } \ - for (i = QUERY_ATTEMPTS; i; i--) { \ - reply = redisCommand(node->context,fmt,##args); \ - if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { \ - LM_INFO("Redis query failed: %p %.*s\n",\ - reply,reply?(unsigned)reply->len:7,reply?reply->str:"FAILURE"); \ - if (reply) \ - freeReplyObject(reply); \ - if (node->context->err == REDIS_OK || redis_reconnect_node(con,node) < 0) { \ - i = 0; break; \ - }\ - } else break; \ - } \ - if (i==0) { \ - LM_ERR("giving up on query\n"); \ - return -1; \ - } \ - if (i != QUERY_ATTEMPTS) \ - LM_INFO("successfully ran query after %d failed attempt(s)\n", \ - QUERY_ATTEMPTS - i); \ - } while (0) +/* + * Upon returning 0 (success), @rpl is guaranteed to be: + * - non-NULL + * - non-REDIS_REPLY_ERROR + * + * On error, a negative code is returned + */ +static int redis_run_command(cachedb_con *connection, redisReply **rpl, + str *key, char *cmd_fmt, ...) +{ + redis_con *con = NULL, *first; + cluster_node *node; + redisReply *reply = NULL; + va_list ap; + int i, last_err = 0; + + first = ((redis_con *)connection->data)->current; + while (((redis_con *)connection->data)->current != first || !con) { + con = ((redis_con *)connection->data)->current; + + if (!(con->flags & REDIS_INIT_NODES) && redis_connect(con) < 0) { + LM_ERR("failed to connect to DB\n"); + last_err = -9; + goto try_next_con; + } + + node = get_redis_connection(con,key); + if (node == NULL) { + LM_ERR("Bad cluster configuration\n"); + last_err = -10; + goto try_next_con; + } + + if (node->context == NULL) { + if (redis_reconnect_node(con,node) < 0) { + last_err = -1; + goto try_next_con; + } + } + + va_start(ap, cmd_fmt); + + for (i = QUERY_ATTEMPTS; i; i--) { + reply = redisvCommand(node->context, cmd_fmt, ap); + if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { + LM_INFO("Redis query failed: %p %.*s (%s)\n", + reply,reply?(unsigned)reply->len:7,reply?reply->str:"FAILURE", + node->context->errstr); + if (reply) { + freeReplyObject(reply); + reply = NULL; + } + if (node->context->err == REDIS_OK || redis_reconnect_node(con,node) < 0) { + i = 0; break; + } + } else break; + } + + va_end(ap); + + if (i==0) { + LM_ERR("giving up on query to %s:%d\n", con->host, con->port); + last_err = -1; + goto try_next_con; + } + + if (i != QUERY_ATTEMPTS) + LM_INFO("successfully ran query after %d failed attempt(s)\n", + QUERY_ATTEMPTS - i); + + last_err = 0; + break; + +try_next_con: + ((redis_con *)connection->data)->current = con->next_con; + if (con->next_con != first) + LM_INFO("failing over to next Redis host (%s:%d)\n", + con->next_con->host, con->next_con->port); + } + + *rpl = reply; + return last_err; +} int redis_get(cachedb_con *connection,str *attr,str *val) { - redis_con *con; - cluster_node *node; redisReply *reply; - int i; + int rc; if (!attr || !val || !connection) { LM_ERR("null parameter\n"); return -1; } - redis_run_command(con,attr,"GET %b",attr->s,attr->len); + rc = redis_run_command(connection, &reply, attr, "GET %b", attr->s, attr->len); + if (rc != 0) { + if (reply && (reply->type == REDIS_REPLY_NIL || reply->str == NULL + || reply->len == 0)) { + LM_DBG("no such key - %.*s\n",attr->len,attr->s); + val->s = NULL; + val->len = 0; + freeReplyObject(reply); + return -2; + } - if (reply->type == REDIS_REPLY_NIL || reply->str == NULL - || reply->len == 0) { - LM_DBG("no such key - %.*s\n",attr->len,attr->s); - val->s = NULL; - val->len = 0; - freeReplyObject(reply); - return -2; + goto out_err; } LM_DBG("GET %.*s - %.*s\n",attr->len,attr->s,(unsigned)reply->len,reply->str); @@ -422,29 +564,33 @@ int redis_get(cachedb_con *connection,str *attr,str *val) val->s = pkg_malloc(reply->len); if (val->s == NULL) { LM_ERR("no more pkg\n"); - freeReplyObject(reply); - return -1; + goto out_err; } memcpy(val->s,reply->str,reply->len); val->len = reply->len; freeReplyObject(reply); return 0; + +out_err: + freeReplyObject(reply); + return rc; } int redis_set(cachedb_con *connection,str *attr,str *val,int expires) { - redis_con *con; - cluster_node *node; redisReply *reply; - int i; + int rc; if (!attr || !val || !connection) { LM_ERR("null parameter\n"); return -1; } - redis_run_command(con,attr,"SET %b %b",attr->s,attr->len,val->s,val->len); + rc = redis_run_command(connection, &reply, attr, "SET %b %b", + attr->s, (size_t)attr->len, val->s, (size_t)val->len); + if (rc != 0) + goto out_err; LM_DBG("set %.*s to %.*s - status = %d - %.*s\n",attr->len,attr->s,val->len, val->s,reply->type,(unsigned)reply->len,reply->str); @@ -452,7 +598,10 @@ int redis_set(cachedb_con *connection,str *attr,str *val,int expires) freeReplyObject(reply); if (expires) { - redis_run_command(con,attr,"EXPIRE %b %d",attr->s,attr->len,expires); + rc = redis_run_command(connection, &reply, attr, "EXPIRE %b %d", + attr->s, (size_t)attr->len, expires); + if (rc != 0) + goto out_err; LM_DBG("set %.*s to expire in %d s - %.*s\n",attr->len,attr->s,expires, (unsigned)reply->len,reply->str); @@ -461,6 +610,10 @@ int redis_set(cachedb_con *connection,str *attr,str *val,int expires) } return 0; + +out_err: + freeReplyObject(reply); + return rc; } /* returns 0 in case of successful remove @@ -468,49 +621,58 @@ int redis_set(cachedb_con *connection,str *attr,str *val,int expires) * return -1 in case of error */ int redis_remove(cachedb_con *connection,str *attr) { - redis_con *con; - cluster_node *node; redisReply *reply; - int ret=0,i; + int rc; if (!attr || !connection) { LM_ERR("null parameter\n"); return -1; } - redis_run_command(con,attr,"DEL %b",attr->s,attr->len); + rc = redis_run_command(connection, &reply, attr, "DEL %b", + attr->s, (size_t)attr->len); + if (rc != 0) + goto out_err; if (reply->integer == 0) { LM_DBG("Key %.*s does not exist in DB\n",attr->len,attr->s); - ret = 1; + rc = 1; } else LM_DBG("Key %.*s successfully removed\n",attr->len,attr->s); freeReplyObject(reply); - return ret; + return rc; + +out_err: + freeReplyObject(reply); + return rc; } /* returns the new value of the counter */ int redis_add(cachedb_con *connection,str *attr,int val,int expires,int *new_val) { - redis_con *con; - cluster_node *node; redisReply *reply; - int i; + int rc; if (!attr || !connection) { LM_ERR("null parameter\n"); return -1; } - redis_run_command(con,attr,"INCRBY %b %d",attr->s,attr->len,val); + rc = redis_run_command(connection, &reply, attr, "INCRBY %b %d", + attr->s, (size_t)attr->len,val); + if (rc != 0) + goto out_err; if (new_val) *new_val = reply->integer; freeReplyObject(reply); if (expires) { - redis_run_command(con,attr,"EXPIRE %b %d",attr->s,attr->len,expires); + rc = redis_run_command(connection, &reply, attr, "EXPIRE %b %d", + attr->s, (size_t)attr->len,expires); + if (rc != 0) + goto out_err; LM_DBG("set %.*s to expire in %d s - %.*s\n",attr->len,attr->s,expires, (unsigned)reply->len,reply->str); @@ -518,29 +680,37 @@ int redis_add(cachedb_con *connection,str *attr,int val,int expires,int *new_val freeReplyObject(reply); } - return 0; + return rc; + +out_err: + freeReplyObject(reply); + return rc; } int redis_sub(cachedb_con *connection,str *attr,int val,int expires,int *new_val) { - redis_con *con; - cluster_node *node; redisReply *reply; - int i; + int rc; if (!attr || !connection) { LM_ERR("null parameter\n"); return -1; } - redis_run_command(con,attr,"DECRBY %b %d",attr->s,attr->len,val); + rc = redis_run_command(connection, &reply, attr, "DECRBY %b %d", + attr->s, (size_t)attr->len, val); + if (rc != 0) + goto out_err; if (new_val) *new_val = reply->integer; freeReplyObject(reply); if (expires) { - redis_run_command(con,attr,"EXPIRE %b %d",attr->s,attr->len,expires); + rc = redis_run_command(connection, &reply, attr, "EXPIRE %b %d", + attr->s, (size_t)attr->len, expires); + if (rc != 0) + goto out_err; LM_DBG("set %.*s to expire in %d s - %.*s\n",attr->len,attr->s,expires, (unsigned)reply->len,reply->str); @@ -549,14 +719,16 @@ int redis_sub(cachedb_con *connection,str *attr,int val,int expires,int *new_val } return 0; + +out_err: + freeReplyObject(reply); + return rc; } int redis_get_counter(cachedb_con *connection,str *attr,int *val) { - redis_con *con; - cluster_node *node; redisReply *reply; - int i,ret; + int ret, rc; str response; if (!attr || !val || !connection) { @@ -564,7 +736,10 @@ int redis_get_counter(cachedb_con *connection,str *attr,int *val) return -1; } - redis_run_command(con,attr,"GET %b",attr->s,attr->len); + rc = redis_run_command(connection, &reply, attr, "GET %b", + attr->s, (size_t)attr->len); + if (rc != 0) + goto out_err; if (reply->type == REDIS_REPLY_NIL || reply->str == NULL || reply->len == 0) { @@ -588,6 +763,10 @@ int redis_get_counter(cachedb_con *connection,str *attr,int *val) freeReplyObject(reply); return 0; + +out_err: + freeReplyObject(reply); + return rc; } int redis_raw_query_handle_reply(redisReply *reply,cdb_raw_entry ***ret, @@ -740,68 +919,26 @@ int redis_raw_query_extract_key(str *attr,str *query_key) return 0; } -int redis_raw_query_send(cachedb_con *connection,redisReply **reply,cdb_raw_entry ***rpl,int expected_kv_no,int *reply_no,str *attr, ...) +int redis_raw_query_send(cachedb_con *connection, redisReply **reply, + cdb_raw_entry ***_, int __, int *___, str *attr, ...) { - redis_con *con; - cluster_node *node; - int i,end; - va_list ap; + static str attr_nt; str query_key; - con = (redis_con *)connection->data; - - if (!(con->flags & REDIS_INIT_NODES) && redis_connect(con) < 0) { - LM_ERR("failed to connect to DB\n"); - return -9; - } - - if (redis_raw_query_extract_key(attr,&query_key) < 0) { - LM_ERR("Failed to extra Redis raw query key \n"); + if (redis_raw_query_extract_key(attr, &query_key) < 0) { + LM_ERR("Failed to extract Redis raw query key\n"); return -1; } - node = get_redis_connection(con,&query_key); - if (node == NULL) { - LM_ERR("Bad cluster configuration\n"); - return -10; - } - - if (node->context == NULL) { - if (redis_reconnect_node(con,node) < 0) { - return -1; - } - } - - va_start(ap,attr); - end = attr->s[attr->len]; - attr->s[attr->len] = 0; - - for (i = QUERY_ATTEMPTS; i; i--) { - *reply = redisvCommand(node->context,attr->s,ap); - if (*reply == NULL || (*reply)->type == REDIS_REPLY_ERROR) { - LM_INFO("Redis query failed: %.*s\n", - *reply?(unsigned)((*reply)->len):7,*reply?(*reply)->str:"FAILURE"); - if (*reply) - freeReplyObject(*reply); - if (node->context->err == REDIS_OK || redis_reconnect_node(con,node) < 0) { - i = 0; break; - } - } else break; - } - - va_end(ap); - attr->s[attr->len]=end; - - if (i==0) { - LM_ERR("giving up on query\n"); + if (pkg_str_extend(&attr_nt, attr->len + 1) < 0) { + LM_ERR("oom\n"); return -1; } - if (i != QUERY_ATTEMPTS) - LM_INFO("successfully ran query after %d failed attempt(s)\n", - QUERY_ATTEMPTS - i); + memcpy(attr_nt.s, attr->s, attr->len); + attr_nt.s[attr->len] = '\0'; - return 0; + return redis_run_command(connection, reply, &query_key, attr_nt.s); } int redis_raw_query(cachedb_con *connection,str *attr,cdb_raw_entry ***rpl,int expected_kv_no,int *reply_no) diff --git a/modules/cachedb_redis/cachedb_redis_dbase.h b/modules/cachedb_redis/cachedb_redis_dbase.h index efc0cd04746..dcadf4ba704 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.h +++ b/modules/cachedb_redis/cachedb_redis_dbase.h @@ -62,16 +62,29 @@ enum redis_flag { REDIS_SINGLE_INSTANCE = 1 << 0, REDIS_CLUSTER_INSTANCE = 1 << 1, REDIS_INIT_NODES = 1 << 2, + + /* failover set (combination of single and/or cluster instances) */ + REDIS_MULTIPLE_HOSTS = 1 << 3, }; -typedef struct { +typedef struct _redis_con { + /* ------ Fixed conn header -------- */ struct cachedb_id *id; unsigned int ref; struct cachedb_pool_con_t *next; + /* --------------------------------- */ + + char *host; // Note: the .id may contain multi-hosts, so the + unsigned short port; // host/port of this connection are extracted here enum redis_flag flags; unsigned short slots_assigned; /* total slots for cluster */ cluster_node *nodes; /* one or more Redis nodes */ + + /* circular list of Redis instances to be attempted in failover fashion */ + struct _redis_con *next_con; + /* only populated for 1st item in the list: the "last-known-to-work" con */ + struct _redis_con *current; } redis_con; cachedb_con* redis_init(str *url);