Skip to content

Commit

Permalink
siprec: add failover logic
Browse files Browse the repository at this point in the history
  • Loading branch information
razvancrainea committed Sep 28, 2017
1 parent bccc92a commit e47b3c4
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 54 deletions.
10 changes: 8 additions & 2 deletions modules/siprec/siprec.c
Expand Up @@ -73,8 +73,9 @@ static cmd_export_t cmds[] = {

/* exported parameters */
static param_export_t params[] = {
{"media_port_min", INT_PARAM, &siprec_port_min },
{"media_port_max", INT_PARAM, &siprec_port_max },
{"media_port_min", INT_PARAM, &siprec_port_min },
{"media_port_max", INT_PARAM, &siprec_port_max },
{"skip_failover_codes", STR_PARAM, &skip_failover_codes.s },
{0, 0, 0}
};

Expand Down Expand Up @@ -111,6 +112,11 @@ static int mod_init(void)
return -1;
}

if (src_init() < 0) {
LM_ERR("cannot initialize src structures!\n");
return -1;
}

if (load_dlg_api(&srec_dlg) != 0) {
LM_ERR("dialog module not loaded! Cannot use siprec module\n");
return -1;
Expand Down
155 changes: 120 additions & 35 deletions modules/siprec/src_logic.c
Expand Up @@ -25,8 +25,71 @@

#include "src_logic.h"
#include "srs_body.h"
#include "../../mod_fix.h"

struct b2b_api srec_b2b;
str skip_failover_codes = str_init("");
static regex_t skip_codes_regex;

static int srs_send_invite(struct src_sess *sess);

int src_init(void)
{
skip_failover_codes.len = strlen(skip_failover_codes.s);
if (!skip_failover_codes.len)
return 0;

/* here skip_failover_codes.s is always NULL terminated! */
if (regcomp(&skip_codes_regex, skip_failover_codes.s, (REG_EXTENDED|REG_ICASE|REG_NOSUB))) {
LM_ERR("cannot compile skip_failover_codes regex [%.*s]!\n",
skip_failover_codes.len, skip_failover_codes.s);
return -1;
}

return 0;
}

static int srs_skip_failover(str status)
{
regmatch_t pmatch;
char tmp_buff[4];

if (skip_failover_codes.len == 0)
return 0;
if (status.len > 3) {
LM_WARN("Unknown status %.*s\n", status.len, status.s);
return 0;
}
memcpy(tmp_buff, status.s, status.len);
tmp_buff[status.len] = 0;

if (!regexec(&skip_codes_regex, tmp_buff, 1, &pmatch, 0))
return 1;
return 0;
}

static int srs_do_failover(struct src_sess *sess)
{
struct srs_node *node;

if (list_empty(&sess->srs)) {
LM_BUG("failover without any destination!\n");
return -1;
}
srec_logic_destroy(sess);

/* pop the first element */
node = list_entry(sess->srs.next, struct srs_node, list);
list_del(&node->list);
shm_free(node);

if (list_empty(&sess->srs)) {
LM_INFO("no more SRS servers to use!\n");
return -1;
}

return srs_send_invite(sess);
}

static void tm_update_recording(struct cell *t, int type, struct tmcb_params *ps);

Expand Down Expand Up @@ -64,6 +127,7 @@ static void srec_dlg_end(struct dlg_cell *dlg, int type, struct dlg_cb_params *_
if (srec_b2b.send_request(&req) < 0)
LM_ERR("Cannot end recording session for key %.*s\n",
req.b2b_key->len, req.b2b_key->s);
srec_logic_destroy(ss);
}

static void srec_dlg_sequential(struct dlg_cell *dlg, int type, struct dlg_cb_params *_params)
Expand Down Expand Up @@ -158,8 +222,12 @@ static int srec_b2b_notify(struct sip_msg *msg, str *key, int type, void *param)
/* wait for a final reply */
return 0;
} else if (msg->REPLY_STATUS > 300) {
LM_DBG("recording is not available!\n");
goto no_recording;
if (srs_skip_failover(msg->first_line.u.reply.status) ||
srs_do_failover(ss) < 0) {
LM_DBG("no more to failover!\n");
goto no_recording;
} else
return 0;
}
ret = -1;

Expand Down Expand Up @@ -211,9 +279,11 @@ static int srec_b2b_notify(struct sip_msg *msg, str *key, int type, void *param)
LM_ERR("Cannot send bye for recording session with key %.*s\n",
req.b2b_key->len, req.b2b_key->s);
}
srec_logic_destroy(ss);

/* we finishd everything with the dialog, let it be! */
srec_dlg.unref_dlg(ss->dlg, 1);
ss->dlg = NULL;
SIPREC_UNREF(ss);
return ret;
}
Expand Down Expand Up @@ -270,15 +340,12 @@ static int srec_b2b_confirm(str* key, str* entity_key, int src, b2b_dlginfo_t* i
return 0;
}

/* starts the recording to the srs */
int src_start_recording(struct sip_msg *msg, struct src_sess *sess)
static int srs_send_invite(struct src_sess *sess)
{
str *client;
str param, body;
struct socket_info *send_sock = sess->socket;
union sockaddr_union tmp;
client_info_t ci;
int streams;
str param, body;
str *client;

static str extra_headers = str_init(
"Require: siprec" CRLF
"Content-Type: multipart/mixed;boundary=" OSS_BOUNDARY CRLF
Expand All @@ -288,29 +355,13 @@ int src_start_recording(struct sip_msg *msg, struct src_sess *sess)
ci.method.s = INVITE;
ci.method.len = INVITE_LEN;
/* try the first srs_uri */
ci.req_uri = sess->srs_uri;
ci.req_uri = SIPREC_SRS(sess);
/* TODO: fix uris */
ci.to_uri = ci.req_uri;
ci.from_uri = ci.to_uri;
ci.extra_headers = &extra_headers;

if (!send_sock) {
send_sock = uri2sock(msg, &ci.req_uri, &tmp, PROTO_NONE);
if (!send_sock) {
LM_ERR("cannot get send socket for uri %.*s\n",
ci.req_uri.len, ci.req_uri.s);
return -3;
}
}
ci.local_contact.s = contact_builder(send_sock, &ci.local_contact.len);

streams = srs_fill_sdp_stream(msg, sess, &sess->participants[1], 0);
if (streams < 0) {
LM_ERR("cannot add SDP for callee!\n");
return -2;
}
if (streams == 0)
return 0;
ci.local_contact.s = contact_builder(sess->socket, &ci.local_contact.len);

if (srs_build_body(sess, &body, SRS_BOTH) < 0) {
LM_ERR("cannot generate request body!\n");
Expand All @@ -321,14 +372,13 @@ int src_start_recording(struct sip_msg *msg, struct src_sess *sess)
/* XXX: hack to pass a parameter :( */
param.s = (char *)&sess;
param.len = sizeof(void *);
SIPREC_REF_UNSAFE(sess);
client = srec_b2b.client_new(&ci, srec_b2b_notify, srec_b2b_confirm,
(str *)&param);
if (!client) {
LM_ERR("cannot start recording with %.*s!\n",
ci.req_uri.len, ci.req_uri.s);
pkg_free(body.s);
goto unref;
return -1;
}
/* release generated body */
pkg_free(body.s);
Expand All @@ -337,16 +387,47 @@ int src_start_recording(struct sip_msg *msg, struct src_sess *sess)
sess->b2b_key.s = shm_malloc(client->len);
if (!sess->b2b_key.s) {
LM_ERR("out of shm memory!\n");
goto unref;
return -1;
}
memcpy(sess->b2b_key.s, client->s, client->len);
sess->b2b_key.len = client->len;

return 0;
}

/* starts the recording to the srs */
int src_start_recording(struct sip_msg *msg, struct src_sess *sess)
{
union sockaddr_union tmp;
int streams, ret;

if (!sess->socket) {
sess->socket = uri2sock(msg, &SIPREC_SRS(sess), &tmp, PROTO_NONE);
if (!sess->socket) {
LM_ERR("cannot get send socket for uri %.*s\n",
SIPREC_SRS(sess).len, SIPREC_SRS(sess).s);
return -3;
}
}

streams = srs_fill_sdp_stream(msg, sess, &sess->participants[1], 0);
if (streams < 0) {
LM_ERR("cannot add SDP for callee!\n");
return -2;
}
if (streams == 0)
return 0;

SIPREC_REF_UNSAFE(sess);
ret = srs_send_invite(sess);
if (ret < 0) {
SIPREC_UNREF_UNSAFE(sess);
return ret;
}

sess->flags |= SIPREC_STARTED;

return 1;
unref:
SIPREC_UNREF_UNSAFE(sess);
return -1;
}

static int src_update_recording(struct sip_msg *msg, struct src_sess *sess, int part_no)
Expand Down Expand Up @@ -425,16 +506,20 @@ void srec_logic_destroy(struct src_sess *sess)
b2b_dlginfo_t info;
if (!sess->b2b_key.s)
return;
shm_free(sess->b2b_key.s);

info.fromtag = sess->b2b_fromtag;
info.totag = sess->b2b_totag;
info.callid = sess->b2b_callid;
srec_b2b.entity_delete(B2B_CLIENT, &sess->b2b_key, &info, 1);
srec_b2b.entity_delete(B2B_CLIENT, &sess->b2b_key,
(info.callid.s ? &info: NULL), 1);
if (sess->b2b_fromtag.s)
shm_free(sess->b2b_fromtag.s);
if (sess->b2b_totag.s)
shm_free(sess->b2b_totag.s);
if (sess->b2b_callid.s)
shm_free(sess->b2b_callid.s);
shm_free(sess->b2b_key.s);
sess->b2b_callid.s = sess->b2b_totag.s = sess->b2b_fromtag.s = NULL;
sess->b2b_key.s = NULL;
}

2 changes: 2 additions & 0 deletions modules/siprec/src_logic.h
Expand Up @@ -36,5 +36,7 @@ int srec_restore_callback(struct src_sess *sess);
void srec_logic_destroy(struct src_sess *sess);

extern struct b2b_api srec_b2b;
extern str skip_failover_codes;
int src_init(void);

#endif /* _SIPREC_LOGIC_H_ */

0 comments on commit e47b3c4

Please sign in to comment.