Skip to content

Commit

Permalink
cgrates: rework connection re-connect
Browse files Browse the repository at this point in the history
Use IPC events to re-connect to disabled connections
  • Loading branch information
razvancrainea committed Nov 19, 2021
1 parent 394ba08 commit cd6351f
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 9 deletions.
5 changes: 3 additions & 2 deletions modules/cgrates/cgrates.c
Expand Up @@ -31,6 +31,7 @@
#include "../../mem/mem.h"
#include "../../db/db.h"
#include "../../mod_fix.h"
#include "../../ipc.h"
#include "../../lib/list.h"
#include "../../resolve.h"
#include "cgrates.h"
Expand Down Expand Up @@ -277,8 +278,8 @@ static int child_init(int rank)
if ((c = cgrc_new(e)) != NULL) {
e->default_con = c;
CGRC_SET_DEFAULT(c);
if (cgrc_conn(c) >= 0)
cgrc_start_listen(c);
if (ipc_send_rpc(process_no, cgrc_conn_rpc, c) < 0)
LM_ERR("could not send connect job!\n");
}
}
return cgr_init_common();
Expand Down
4 changes: 1 addition & 3 deletions modules/cgrates/cgrates_common.c
Expand Up @@ -670,10 +670,8 @@ int cgr_handle_async_cmd(struct sip_msg *msg, json_object *jmsg,
if (!(c = cgr_get_free_conn(e)))
continue;
/* found a free connection - build the buffer */
if (cgrc_send(c, &smsg) < 0) {
cgrc_close(c, CGRC_IS_LISTEN(c));
if (cgrc_send(c, &smsg) < 0)
continue;
}
cp->c = c;
/* message successfully sent - now fetch the reply */
if (CGRC_IS_DEFAULT(c)) {
Expand Down
91 changes: 88 additions & 3 deletions modules/cgrates/cgrates_engine.c
Expand Up @@ -27,6 +27,86 @@
#include "../../resolve.h"
#include "../../async.h"

#include "../../lib/timerfd.h"

static int cgrc_conn(struct cgr_conn *c);
static void cgrc_reconn_rpc(int sender, void *p);

/*
 * Tries to (re-)establish connection `c`.
 *
 * The engine's default connection is additionally handed to
 * cgrc_start_listen() once the connect succeeds, and that call's result is
 * returned. Any other connection only needs the connect itself to succeed.
 *
 * Returns a negative value on failure; callers use that to decide whether a
 * new re-connect attempt has to be scheduled.
 */
static int cgrc_reconn(struct cgr_conn *c)
{
	if (cgrc_conn(c) < 0)
		return -1;

	if (c == c->engine->default_con)
		return cgrc_start_listen(c);

	/* a non-default connection that connected fine is a success, it must
	 * not be reported as a failure - otherwise the callers would keep
	 * re-scheduling (and re-connecting) an already established connection */
	return 0;
}

#ifdef HAVE_TIMER_FD
static void cgr_conn_schedule(struct cgr_conn *c);

/*
 * Callback registered through register_async_fd() by cgr_conn_schedule();
 * `p` carries the connection that has to be re-connected.
 *
 * Logs the attempt, tries to re-connect and, when that fails, schedules
 * another attempt. Always returns 1.
 */
static int cgrc_conn_sched(int fd, void *p)
{
	struct cgr_conn *conn = p;

	LM_INFO("re-connecting to %.*s:%d\n", conn->engine->host.len,
			conn->engine->host.s, conn->engine->port);

	if (cgrc_reconn(conn) >= 0)
		return 1;

	/* the connect did not succeed - try again later */
	cgr_conn_schedule(conn);
	return 1;
}

/*
 * Schedules a re-connect attempt for connection `c`.
 *
 * If the retry timeout (cgre_retry_tout seconds after c->disable_time) has
 * already elapsed, a re-connect job is sent right away to the current
 * process over IPC. Otherwise a one-shot timer FD is armed to fire after
 * cgre_retry_tout seconds and registered with cgrc_conn_sched() as its
 * handler.
 *
 * Failures are only logged; the timer FD is closed on every error path so
 * that a failed scheduling attempt does not leak a descriptor.
 */
static void cgr_conn_schedule(struct cgr_conn *c)
{
	int fd;
	struct itimerspec its;

	if (c->disable_time + cgre_retry_tout <= time(NULL)) {
		/* no need for timer schedule - we should re-connect immediately */
		if (ipc_send_rpc(process_no, cgrc_reconn_rpc, c) < 0)
			LM_ERR("could not send re-connect job!\n");
		return;
	}

	fd = timerfd_create(CLOCK_REALTIME, 0);
	if (fd < 0) {
		LM_ERR("failed to create new timer FD (%d) <%s>\n",
				errno, strerror(errno));
		return;
	}

	/* one-shot timer: only it_value is set, it_interval stays zeroed */
	memset(&its, 0, sizeof(its));
	its.it_value.tv_sec = cgre_retry_tout;

	if (timerfd_settime(fd, 0, &its, NULL) < 0) {
		LM_ERR("failed to set timer FD (%d) <%s>\n",
				errno, strerror(errno));
		goto error;
	}

	/* schedule re-connect */
	if (register_async_fd(fd, cgrc_conn_sched, c) < 0) {
		LM_ERR("could not schedule conn reconnect\n");
		goto error;
	}
	return;

error:
	/* nobody is going to use the timer anymore - do not leak it */
	close(fd);
}

#else /* HAVE_TIMER_FD */
#warning Your GLIB is too old, disabling cgrates async re-connect!!!
/* without timer FDs scheduling a re-connect is a no-op; the macro has to take
 * the connection argument, otherwise every cgr_conn_schedule(c) call fails
 * to preprocess */
#define cgr_conn_schedule(_c) do { (void)(_c); } while (0)
#endif /* HAVE_TIMER_FD */

/*
 * IPC job that performs the initial connect of the connection passed in `p`.
 * It is dispatched with ipc_send_rpc(); `sender` is not used.
 *
 * The result of the attempt is ignored here: a failure does not schedule a
 * re-connect from this job.
 */
void cgrc_conn_rpc(int sender, void *p)
{
	(void)cgrc_reconn((struct cgr_conn *)p);
}

/*
 * IPC job that re-connects the connection passed in `p`; `sender` is not
 * used. When the attempt fails, a new one is scheduled.
 *
 * NOTE(review): cgr_conn_schedule() decides between "retry now" and "retry
 * later" based on c->disable_time - confirm that the value is refreshed on a
 * failed connect, otherwise this may re-schedule itself back-to-back.
 */
static void cgrc_reconn_rpc(int sender, void *p)
{
	struct cgr_conn *conn = p;

	if (cgrc_reconn(conn) >= 0)
		return;

	cgr_conn_schedule(conn);
}


struct cgr_conn *cgr_get_free_conn(struct cgr_engine *e)
{
struct list_head *l;
Expand All @@ -45,6 +125,7 @@ struct cgr_conn *cgr_get_free_conn(struct cgr_engine *e)
LM_INFO("cannot connect to %.*s:%d\n", c->engine->host.len,
c->engine->host.s, c->engine->port);
c->disable_time = now;
cgr_conn_schedule(c);
} else {
c->state = CGRC_FREE;
e->disable_time = 0;
Expand Down Expand Up @@ -99,6 +180,7 @@ struct cgr_conn *cgr_get_default_conn(struct cgr_engine *e)
LM_INFO("cannot connect to %.*s:%d\n", e->host.len,
e->host.s, e->port);
e->default_con->disable_time = now;
cgr_conn_schedule(e->default_con);
} else {
LM_INFO("re-connected to %.*s:%d\n", e->host.len,
e->host.s, e->port);
Expand Down Expand Up @@ -144,17 +226,21 @@ void cgrc_close(struct cgr_conn *c, int release)
c->disable_time = time(NULL);
/* clean whatever was left in the buffer */
json_tokener_reset(c->jtok);

/* we need to schedule the re-connect before closing the socket; otherwise
 * there is a high chance that the scheduled timer takes the same FD, which
 * would later be removed from the reactor again */
cgr_conn_schedule(c);
if (release) {
reactor_del_reader(c->fd, -1, IO_FD_CLOSING);
close(c->fd);
}

LM_INFO("closing connection %.*s:%hu\n", c->engine->host.len,
c->engine->host.s, c->engine->port);
/* TODO: how should we re-enable connections? */
}

int cgrc_conn(struct cgr_conn *c)
static int cgrc_conn(struct cgr_conn *c)
{
int s = -1;
union sockaddr_union my_name;
Expand Down Expand Up @@ -187,7 +273,6 @@ int cgrc_conn(struct cgr_conn *c)
return -1;
}


/* sends a message to the cgrates engine */
int cgrc_send(struct cgr_conn *c, str *buf)
{
Expand Down
2 changes: 1 addition & 1 deletion modules/cgrates/cgrates_engine.h
Expand Up @@ -54,7 +54,7 @@ extern struct list_head cgrates_engines;
extern int cgre_retry_tout;
extern int cgrc_max_conns;
extern str cgre_bind_ip;
int cgrc_conn(struct cgr_conn *c);
void cgrc_conn_rpc(int sender, void *p);
int cgrc_send(struct cgr_conn *c, str *buf);
int cgrc_start_listen(struct cgr_conn *c);
void cgrc_close(struct cgr_conn *c, int remove);
Expand Down

0 comments on commit cd6351f

Please sign in to comment.