Skip to content

Commit

Permalink
freeswitch: Add heartbeat event parsing & pushing to subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
liviuchircu committed Jan 30, 2017
1 parent 8580d2b commit 1c4bdd5
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 62 deletions.
71 changes: 40 additions & 31 deletions modules/freeswitch/fs_api.c
Expand Up @@ -33,30 +33,29 @@

struct list_head *fs_boxes;

typedef struct _fs_mod_ref {
str tag;
struct list_head list;
} fs_mod_ref;

static fs_mod_ref *mk_fs_mod_ref(str *tag);
static fs_mod_ref *mk_fs_mod_ref(str *tag, ev_hb_cb_f cbf, const void *priv);
static void free_fs_mod_ref(fs_mod_ref *mod_tag);
static fs_evs *find_fs_evs(str *hostport);

static fs_mod_ref *mk_fs_mod_ref(str *tag)
static fs_mod_ref *mk_fs_mod_ref(str *tag, ev_hb_cb_f cbf, const void *priv)
{
fs_mod_ref *fs_tag = NULL;
fs_mod_ref *mref = NULL;

fs_tag = shm_malloc(sizeof *fs_tag + tag->len);
if (!fs_tag) {
mref = shm_malloc(sizeof *mref + tag->len);
if (!mref) {
LM_ERR("out of mem\n");
return NULL;
}
memset(mref, 0, sizeof *mref);

mref->tag.s = (char *)(mref + 1);
mref->tag.len = tag->len;
memcpy(mref->tag.s, tag->s, tag->len);

fs_tag->tag.s = (char *)(fs_tag + 1);
fs_tag->tag.len = tag->len;
memcpy(fs_tag->tag.s, tag->s, tag->len);
mref->hb_cb = cbf;
mref->priv = priv;

return fs_tag;
return mref;
}

static void free_fs_mod_ref(fs_mod_ref *mod_tag)
Expand All @@ -70,7 +69,7 @@ static int has_tag(fs_evs *evs, str *tag)
struct list_head *ele;
fs_mod_ref *mtag;

list_for_each(ele, &evs->modlist) {
list_for_each(ele, &evs->modules) {
mtag = list_entry(ele, fs_mod_ref, list);

if (str_strcmp(&mtag->tag, tag) == 0) {
Expand Down Expand Up @@ -129,6 +128,7 @@ static fs_evs *mk_fs_evs(str *hostport)
return NULL;
}
memset(evs, 0, sizeof *evs);
INIT_LIST_HEAD(&evs->modules);

LM_DBG("new FS box: host=%.*s, port=%d\n", st.len, st.s, port);

Expand All @@ -141,11 +141,10 @@ static fs_evs *mk_fs_evs(str *hostport)
return evs;
}

fs_evs *add_fs_event_sock(str *evs_str, str *tag, enum fs_evs_types type,
ev_hrbeat_cb_f scb, void *info)
fs_evs *add_hb_evs(str *evs_str, str *tag, ev_hb_cb_f cbf, const void *priv)
{
fs_evs *evs;
fs_mod_ref *mtag;
fs_mod_ref *mref;

if (!evs_str->s || evs_str->len == 0 || !tag) {
LM_ERR("bad params: '%.*s', %.*s\n", evs_str->len, evs_str->s,
Expand All @@ -154,31 +153,41 @@ fs_evs *add_fs_event_sock(str *evs_str, str *tag, enum fs_evs_types type,
}

evs = find_fs_evs(evs_str);
if (evs) {
if (!has_tag(evs, tag)) {
mtag = mk_fs_mod_ref(tag);
if (!mtag) {
LM_ERR("mk tag failed\n");
return NULL;
}

list_add(&mtag->list, &evs->modlist);
}
} else {
if (!evs) {
evs = mk_fs_evs(evs_str);
if (!evs) {
LM_ERR("failed to create FS box!\n");
return NULL;
}
evs->type = type;
evs->type = FS_GW_STATS;

list_add(&evs->list, fs_boxes);
}

if (!has_tag(evs, tag)) {
mref = mk_fs_mod_ref(tag, cbf, priv);
if (!mref) {
LM_ERR("mk tag failed\n");
return NULL;
}

list_add(&mref->list, &evs->modules);
}

return evs;
}

int del_fs_event_sock(fs_evs *evs, str *tag)
int del_hb_evs(fs_evs *evs, str *tag)
{
return 0;
}

int fs_bind(fs_api_t *fapi)
{
memset(fapi, 0, sizeof *fapi);

fapi->add_hb_evs = add_hb_evs;
fapi->del_hb_evs = del_hb_evs;

return 0;
}
53 changes: 31 additions & 22 deletions modules/freeswitch/fs_api.h
Expand Up @@ -39,57 +39,66 @@ enum fs_evs_types {
FS_GW_STATS,
};

typedef struct _fs_evs {
typedef struct _fs_evs fs_evs;
typedef struct _fs_ev_hb fs_ev_hb;

typedef int (*ev_hb_cb_f) (fs_evs *evs, str *tag, fs_ev_hb *hb, const void *priv);

typedef struct _fs_mod_ref {
str tag;
ev_hb_cb_f hb_cb;
const void *priv;

struct list_head list;
} fs_mod_ref;

struct _fs_evs {
enum fs_evs_types type;
str host; /* host->s is also NULL-terminated */
str host; /* host->s is NULL-terminated */
esl_port_t port;

esl_handle_t *handle;

struct list_head list; /* distinct FS boxes */
struct list_head modlist; /* distinct module references to the same box */
} fs_evs;
struct list_head list; /* distinct FS boxes */
struct list_head modules; /* distinct modules referencing the same box */
};

/* statistics contained within a FreeSWITCH "HEARTBEAT" event */
typedef struct _fs_ev_hrbeat {
struct _fs_ev_hb {
float id_cpu;
int sess;
int max_sess;
} fs_ev_hrbeat;
};

typedef struct _fs_api_t fs_api_t;
/* host[:port] (8021) struct/lock/etc. */
typedef int (*ev_hrbeat_cb_f) (fs_evs *evs, str *tag, fs_ev_hrbeat *hb, void *info);
typedef fs_evs* (*add_fs_evs_f) (str *evs_str, str *tag, enum fs_evs_types type,
ev_hrbeat_cb_f scb, void *info);
typedef fs_evs* (*add_hb_evs_f) (str *evs_str, str *tag,
ev_hb_cb_f scb, const void *priv);

typedef int (*del_fs_evs_f) (fs_evs *evs, str *tag);
typedef int (*del_hb_evs_f) (fs_evs *evs, str *tag);

// XXX remove after dev
fs_evs *add_fs_event_sock(str *evs_str, str *tag, enum fs_evs_types type,
ev_hrbeat_cb_f scb, void *info);
int del_fs_event_sock(fs_evs *evs, str *tag);
fs_evs *add_hb_evs(str *evs_str, str *tag, ev_hb_cb_f scb, const void *priv);
int del_hb_evs(fs_evs *evs, str *tag);

struct _fs_api_t {
/*
* Creates & registers a new FS event socket
* (to be managed by the stat-fetching thread)
* Creates & registers a new FS "HEARTBEAT" event socket
* (all FS connections will be managed by one process)
*
* Return: the newly created event socket
*/
add_fs_evs_f add_fs_evs;
add_hb_evs_f add_hb_evs;

/*
* Detach & free a FS event sock from the stat-fetching thread's iteration list
* Detach & free a FS "HEARTBEAT" event sock from the
* stat-fetching process' iteration list
*
* Return: 0 on success, < 0 on failure
*/
del_fs_evs_f del_fs_evs;

del_hb_evs_f del_hb_evs;
};

extern struct list_head *fs_boxes;

int fs_bind(fs_api_t *fapi);

#endif /* __FREESWITCH_API__ */
18 changes: 11 additions & 7 deletions modules/freeswitch/fs_mod.c
Expand Up @@ -29,10 +29,14 @@
#include "../../timer.h"
#include "../../mod_fix.h"
#include "../../parser/msg_parser.h"
#include "../../mem/mem.h"
#include "../../lib/osips_malloc.h"

#include "fs_api.h"
#include "fs_proc.h"

extern struct list_head *fs_boxes;

static int mod_init(void);

static cmd_export_t cmds[] = {
Expand Down Expand Up @@ -81,21 +85,21 @@ struct module_exports exports= {
static int mod_init(void)
{
str st = { "test", 4};
cJSON_Hooks hooks;

fs_boxes = shm_malloc(sizeof *fs_boxes);
if (fs_boxes == NULL) {
if (!fs_boxes) {
LM_ERR("out of mem\n");
return -1;
}
INIT_LIST_HEAD(fs_boxes);

str dst = {MI_SSTR("10.0.0.238:8021")};
add_fs_event_sock(&dst, &st, FS_GW_STATS, NULL, NULL);
hooks.malloc_fn = osips_pkg_malloc;
hooks.free_fn = osips_pkg_free;
cJSON_InitHooks(&hooks);

return 0;
}
str dst = {MI_SSTR("10.0.0.238:8021")};
add_hb_evs(&dst, &st, NULL, NULL);

int fs_bind(fs_api_t *fapi)
{
return 0;
}
47 changes: 45 additions & 2 deletions modules/freeswitch/fs_proc.c
Expand Up @@ -34,10 +34,17 @@

#include "fs_api.h"

extern struct list_head *fs_boxes;

inline static int handle_io(struct fd_map *fm, int idx, int event_type)
{
struct list_head *ele;
fs_evs *box = (fs_evs *)fm->data;
fs_mod_ref *mod;
fs_ev_hb hb;
esl_status_t rc;
cJSON *ev = NULL;
char *s, *end;

LM_DBG("FS data is available!\n");

Expand Down Expand Up @@ -65,14 +72,50 @@ inline static int handle_io(struct fd_map *fm, int idx, int event_type)
return 0;
}

/* TODO: push stats to subscribers */
LM_INFO("FS stats:\n%s\n", box->handle->last_sr_event->body);
ev = cJSON_Parse(box->handle->last_sr_event->body);
if (!ev) {
LM_ERR("oom\n");
return 0;
}

s = cJSON_GetObjectItem(ev, "Idle-CPU")->valuestring;
hb.id_cpu = strtof(s, &end);
if (*end) {
LM_ERR("bad Idle-CPU: %s\n", s);
goto out_free;
}

s = cJSON_GetObjectItem(ev, "Session-Count")->valuestring;
hb.sess = strtol(s, &end, 0);
if (*end) {
LM_ERR("bad Session-Count: %s\n", s);
goto out_free;
}

s = cJSON_GetObjectItem(ev, "Max-Sessions")->valuestring;
hb.max_sess = strtol(s, &end, 0);
if (*end) {
LM_ERR("bad Max-Sessions: %s\n", s);
goto out_free;
}

LM_DBG("FS (%s:%d) heartbeat (id: %.3f, ch: %d/%d):\n%s\n", box->host.s,
box->port, hb.id_cpu, hb.sess, hb.max_sess,
box->handle->last_sr_event->body);

list_for_each(ele, &box->modules) {
mod = list_entry(ele, fs_mod_ref, list);
mod->hb_cb(box, &mod->tag, &hb, mod->priv);
}

break;
default:
LM_CRIT("unknown fd type %d in FreeSWITCH worker\n", fm->type);
return -1;
}

out_free:
cJSON_Delete(ev);
return 0;
}

Expand Down

0 comments on commit 1c4bdd5

Please sign in to comment.