From cd6351fdd99ad9da5d588db82c8d59c163adb89c Mon Sep 17 00:00:00 2001 From: Razvan Crainea Date: Fri, 19 Nov 2021 12:54:28 +0200 Subject: [PATCH] cgrates: rework connection re-connect Use IPC events to re-connect to disabled connections --- modules/cgrates/cgrates.c | 5 +- modules/cgrates/cgrates_common.c | 4 +- modules/cgrates/cgrates_engine.c | 91 ++++++++++++++++++++++++++++++-- modules/cgrates/cgrates_engine.h | 2 +- 4 files changed, 93 insertions(+), 9 deletions(-) diff --git a/modules/cgrates/cgrates.c b/modules/cgrates/cgrates.c index 48c6d0a282a..7d3644e4a30 100644 --- a/modules/cgrates/cgrates.c +++ b/modules/cgrates/cgrates.c @@ -31,6 +31,7 @@ #include "../../mem/mem.h" #include "../../db/db.h" #include "../../mod_fix.h" +#include "../../ipc.h" #include "../../lib/list.h" #include "../../resolve.h" #include "cgrates.h" @@ -277,8 +278,8 @@ static int child_init(int rank) if ((c = cgrc_new(e)) != NULL) { e->default_con = c; CGRC_SET_DEFAULT(c); - if (cgrc_conn(c) >= 0) - cgrc_start_listen(c); + if (ipc_send_rpc(process_no, cgrc_conn_rpc, c) < 0) + LM_ERR("could not send connect job!\n"); } } return cgr_init_common(); diff --git a/modules/cgrates/cgrates_common.c b/modules/cgrates/cgrates_common.c index 9b091a363cb..425716ff8bf 100644 --- a/modules/cgrates/cgrates_common.c +++ b/modules/cgrates/cgrates_common.c @@ -670,10 +670,8 @@ int cgr_handle_async_cmd(struct sip_msg *msg, json_object *jmsg, if (!(c = cgr_get_free_conn(e))) continue; /* found a free connection - build the buffer */ - if (cgrc_send(c, &smsg) < 0) { - cgrc_close(c, CGRC_IS_LISTEN(c)); + if (cgrc_send(c, &smsg) < 0) continue; - } cp->c = c; /* message successfully sent - now fetch the reply */ if (CGRC_IS_DEFAULT(c)) { diff --git a/modules/cgrates/cgrates_engine.c b/modules/cgrates/cgrates_engine.c index 96089bccd82..5df79c4fd58 100644 --- a/modules/cgrates/cgrates_engine.c +++ b/modules/cgrates/cgrates_engine.c @@ -27,6 +27,86 @@ #include "../../resolve.h" #include "../../async.h" +#include "../../lib/timerfd.h" + +static int cgrc_conn(struct cgr_conn *c); +static void cgrc_reconn_rpc(int sender, void *p); + +static int cgrc_reconn(struct cgr_conn *c) +{ + if (cgrc_conn(c) >= 0 && c == c->engine->default_con) + return cgrc_start_listen(c); + return -1; +} + +#ifdef HAVE_TIMER_FD +static void cgr_conn_schedule(struct cgr_conn *c); + +static int cgrc_conn_sched(int fd, void *p) +{ + struct cgr_conn *c = (struct cgr_conn *)p; + /* if connect not succeeded, re-schedule */ + LM_INFO("re-connecting to %.*s:%d\n", c->engine->host.len, + c->engine->host.s, c->engine->port); + if (cgrc_reconn(c) < 0) + cgr_conn_schedule(c); + return 1; +} + +static void cgr_conn_schedule(struct cgr_conn *c) +{ + int fd; + struct itimerspec its; + + if (c->disable_time + cgre_retry_tout <= time(NULL)) { + /* no need for timer schedule - we should re-connect immediately */ + if (ipc_send_rpc(process_no, cgrc_reconn_rpc, c) < 0) + LM_ERR("could not send re-connect job!\n"); + return; + } + + if ((fd = timerfd_create(CLOCK_REALTIME, 0)) < 0) { + LM_ERR("failed to create new timer FD (%d) <%s>\n", + errno, strerror(errno)); + return; + } + memset(&its, 0, sizeof(its)); + + /* set the time */ + its.it_value.tv_sec = cgre_retry_tout; + its.it_value.tv_nsec = 0; + its.it_interval.tv_sec = 0; + its.it_interval.tv_nsec = 0; + if (timerfd_settime(fd, 0, &its, NULL) < 0) { + LM_ERR("failed to set timer FD (%d) <%s>\n", + errno, strerror(errno)); + return; + } + + /* schedule re-connect */ + if (register_async_fd(fd, cgrc_conn_sched, c) < 0) + LM_ERR("could not schedule conn reconnect\n"); +} + +#else /* HAVE_TIMER_FD */ +#warning Your GLIB is too old, disabling cgrates async re-connect!!! +#define cgr_conn_schedule() +#endif /* HAVE_TIMER_FD */ + +void cgrc_conn_rpc(int sender, void *p) +{ + struct cgr_conn *c = (struct cgr_conn *)p; + cgrc_reconn(c); +} + +static void cgrc_reconn_rpc(int sender, void *p) +{ + struct cgr_conn *c = (struct cgr_conn *)p; + if (cgrc_reconn(c) < 0) + cgr_conn_schedule(c); +} + + struct cgr_conn *cgr_get_free_conn(struct cgr_engine *e) { struct list_head *l; @@ -45,6 +125,7 @@ struct cgr_conn *cgr_get_free_conn(struct cgr_engine *e) LM_INFO("cannot connect to %.*s:%d\n", c->engine->host.len, c->engine->host.s, c->engine->port); c->disable_time = now; + cgr_conn_schedule(c); } else { c->state = CGRC_FREE; e->disable_time = 0; @@ -99,6 +180,7 @@ struct cgr_conn *cgr_get_default_conn(struct cgr_engine *e) LM_INFO("cannot connect to %.*s:%d\n", e->host.len, e->host.s, e->port); e->default_con->disable_time = now; + cgr_conn_schedule(e->default_con); } else { LM_INFO("re-connected to %.*s:%d\n", e->host.len, e->host.s, e->port); @@ -144,6 +226,11 @@ void cgrc_close(struct cgr_conn *c, int release) c->disable_time = time(NULL); /* clean whatever was left in the buffer */ json_tokener_reset(c->jtok); + + /* we need to schedule the re-connect before closing the socket, otherwise + * there is a high chance that the schedule take the same FD, thus it will + * be later on removed from the reactor again */ + cgr_conn_schedule(c); if (release) { reactor_del_reader(c->fd, -1, IO_FD_CLOSING); close(c->fd); @@ -151,10 +238,9 @@ void cgrc_close(struct cgr_conn *c, int release) LM_INFO("closing connection %.*s:%hu\n", c->engine->host.len, c->engine->host.s, c->engine->port); - /* TODO: how should we re-enable connections? */ } -int cgrc_conn(struct cgr_conn *c) +static int cgrc_conn(struct cgr_conn *c) { int s = -1; union sockaddr_union my_name; @@ -187,7 +273,6 @@ int cgrc_conn(struct cgr_conn *c) return -1; } - /* sends a message to the cgrates engine */ int cgrc_send(struct cgr_conn *c, str *buf) { diff --git a/modules/cgrates/cgrates_engine.h b/modules/cgrates/cgrates_engine.h index c23873be1a9..fa64b9c03aa 100644 --- a/modules/cgrates/cgrates_engine.h +++ b/modules/cgrates/cgrates_engine.h @@ -54,7 +54,7 @@ extern struct list_head cgrates_engines; extern int cgre_retry_tout; extern int cgrc_max_conns; extern str cgre_bind_ip; -int cgrc_conn(struct cgr_conn *c); +void cgrc_conn_rpc(int sender, void *p); int cgrc_send(struct cgr_conn *c, str *buf); int cgrc_start_listen(struct cgr_conn *c); void cgrc_close(struct cgr_conn *c, int remove);