Skip to content

Commit

Permalink
freeswitch: Refactor API
Browse files Browse the repository at this point in the history
    * add generic FS event subscribe/unsubscribe function stubs
    * new modparam: "event_heartbeat_interval"
    * expose "event_heartbeat_interval" in API, use wherever needed
    * improve naming for stats-based socket API functions
    * remove the callback-based support for updating stats
      (this never worked anyway, ds/lb would always pull data)
  • Loading branch information
liviuchircu committed Dec 20, 2017
1 parent 580fdb0 commit ff83a31
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 81 deletions.
20 changes: 10 additions & 10 deletions modules/dispatcher/dispatch.c
Expand Up @@ -135,7 +135,7 @@ static void ds_destroy_data_set( ds_data_t *d)
if(dest->param)
shm_free(dest->param);
if (dest->fs_sock)
fs_api.del_hb_evs(dest->fs_sock, &ds_str);
fs_api.put_stats_evs(dest->fs_sock, &ds_str);
dest = dest->next;
}while(dest);
shm_free(sp_curr->dlist);
Expand Down Expand Up @@ -318,7 +318,7 @@ int add_dest2list(int id, str uri, struct socket_info *sock, str *comsock, int s

if (fetch_freeswitch_stats) {
if (comsock->s && comsock->len > 0) {
dp->fs_sock = fs_api.add_hb_evs(comsock, &ds_str, NULL, NULL);
dp->fs_sock = fs_api.get_stats_evs(comsock, &ds_str);
if (!dp->fs_sock) {
LM_ERR("failed to create FreeSWITCH stats socket!\n");
} else {
Expand Down Expand Up @@ -398,21 +398,21 @@ static inline void re_calculate_active_dsts(ds_set_p sp)
/* pre-calculate the running weights for each destination */
for( j=0,i=-1,sp->active_nr=sp->nr ; j<sp->nr ; j++ ) {
dst = &sp->dlist[j];
if (dst->fs_sock && dst->fs_sock->hb_data.is_valid) {
lock_start_read(dst->fs_sock->hb_data_lk);
if (dst->fs_sock && dst->fs_sock->stats.valid) {
lock_start_read(dst->fs_sock->stats_lk);

oldw = dst->weight;
dst->weight = round(max_freeswitch_weight *
(1 - dst->fs_sock->hb_data.sess /
(float)dst->fs_sock->hb_data.max_sess) *
(dst->fs_sock->hb_data.id_cpu / (float)100));
(1 - dst->fs_sock->stats.sess /
(float)dst->fs_sock->stats.max_sess) *
(dst->fs_sock->stats.id_cpu / (float)100));

LM_DBG("weight update for %.*s: %d -> %d (%d %d %.3f)\n",
dst->uri.len, dst->uri.s, oldw, dst->weight,
dst->fs_sock->hb_data.sess, dst->fs_sock->hb_data.max_sess,
dst->fs_sock->hb_data.id_cpu);
dst->fs_sock->stats.sess, dst->fs_sock->stats.max_sess,
dst->fs_sock->stats.id_cpu);

lock_stop_read(dst->fs_sock->hb_data_lk);
lock_stop_read(dst->fs_sock->stats_lk);
}

/* running weight is the current weight plus the running weight of
Expand Down
2 changes: 1 addition & 1 deletion modules/dispatcher/dispatcher.c
Expand Up @@ -879,7 +879,7 @@ static int mod_init(void)
/* Register the weight-recalculation timer */
if (fetch_freeswitch_stats &&
register_timer("ds-update-weights", ds_update_weights, NULL,
FS_HEARTBEAT_ITV, TIMER_FLAG_SKIP_ON_DELAY)<0) {
fs_api.stats_update_interval, TIMER_FLAG_SKIP_ON_DELAY)<0) {
LM_ERR("failed to register timer for weight recalc!\n");
return -1;
}
Expand Down
52 changes: 37 additions & 15 deletions modules/freeswitch/fs_api.c
Expand Up @@ -28,21 +28,24 @@
#include "../../resolve.h"
#include "../../forward.h"
#include "../../ut.h"
#include "../../lib/url.h"

#include "fs_api.h"

extern int event_heartbeat_interval;

struct list_head *fs_boxes;
rw_lock_t *box_lock;

static fs_mod_ref *mk_fs_mod_ref(str *tag, ev_hb_cb_f cbf, const void *priv);
static fs_mod_ref *mk_fs_mod_ref(str *tag);
static void free_fs_mod_ref(fs_mod_ref *mref);
static fs_evs *get_fs_evs(str *hostport);

/*
* Parses and validates FreeSWITCH URLs:
* "fs://[username]:password@host[:port]"
* "[fs://][[username]:password@]host[:port][;EVENT1[,EVENT2[,.. ]]]"
*/
static int parse_fs_url(str *in, str *user_out, str *pass_out, str *host_out,
static int parse_fs_url(const str *in, str *user_out, str *pass_out, str *host_out,
unsigned int *port_out)
{
str st = *in;
Expand All @@ -58,7 +61,7 @@ static int parse_fs_url(str *in, str *user_out, str *pass_out, str *host_out,

p = memchr(st.s, ':', st.len);
if (!p || !(h = memchr(p, '@', st.len - (p - st.s)))) {
LM_ERR("missing password!\n");
LM_ERR("failed to locate \":password\" part!\n");
return -1;
}

Expand Down Expand Up @@ -93,7 +96,7 @@ static int parse_fs_url(str *in, str *user_out, str *pass_out, str *host_out,
return 0;
}

static fs_mod_ref *mk_fs_mod_ref(str *tag, ev_hb_cb_f cbf, const void *priv)
static fs_mod_ref *mk_fs_mod_ref(str *tag)
{
fs_mod_ref *mref = NULL;

Expand All @@ -108,9 +111,6 @@ static fs_mod_ref *mk_fs_mod_ref(str *tag, ev_hb_cb_f cbf, const void *priv)
mref->tag.len = tag->len;
memcpy(mref->tag.s, tag->s, tag->len);

mref->hb_cb = cbf;
mref->priv = priv;

return mref;
}

Expand Down Expand Up @@ -169,6 +169,15 @@ static fs_evs *mk_fs_evs(str *fs_url)
fs_evs *evs;
str user, pass, host;
unsigned int port;
struct url *url;

url = parse_url(fs_url, URL_REQ_SCHEME|URL_REQ_PASS, 0);
if (!url) {
LM_ERR("bad FS URL: '%.*s'! "
"Need: fs://[user]:pass@host[:port][;EVENT1[EVENT2,..]]\n",
fs_url->len, fs_url->s);
return NULL;
}

if (parse_fs_url(fs_url, &user, &pass, &host, &port) != 0) {
LM_ERR("bad FS URL: '%.*s'! Need: fs://[user]:pass@host[:port]\n",
Expand All @@ -184,8 +193,8 @@ static fs_evs *mk_fs_evs(str *fs_url)
memset(evs, 0, sizeof *evs);
INIT_LIST_HEAD(&evs->modules);

evs->hb_data_lk = lock_init_rw();
if (!evs->hb_data_lk) {
evs->stats_lk = lock_init_rw();
if (!evs->stats_lk) {
LM_ERR("out of mem\n");
shm_free(evs);
return NULL;
Expand All @@ -212,7 +221,17 @@ static fs_evs *mk_fs_evs(str *fs_url)
return evs;
}

fs_evs *add_hb_evs(str *evs_str, str *tag, ev_hb_cb_f cbf, const void *priv)
fs_evs *get_evs(str *evs_str, str *tag, struct str_list *sub_events)
{
return NULL;
}

int put_evs(fs_evs *evs, str *tag, struct str_list *unsub_events)
{
return 0;
}

fs_evs *get_stats_evs(str *evs_str, str *tag)
{
fs_evs *evs;
fs_mod_ref *mref;
Expand All @@ -238,7 +257,7 @@ fs_evs *add_hb_evs(str *evs_str, str *tag, ev_hb_cb_f cbf, const void *priv)
list_add(&evs->list, fs_boxes);
}

mref = mk_fs_mod_ref(tag, cbf, priv);
mref = mk_fs_mod_ref(tag);
if (!mref) {
LM_ERR("mk tag failed\n");
goto out_err;
Expand All @@ -256,7 +275,7 @@ fs_evs *add_hb_evs(str *evs_str, str *tag, ev_hb_cb_f cbf, const void *priv)
return NULL;
}

int del_hb_evs(fs_evs *evs, str *tag)
int put_stats_evs(fs_evs *evs, str *tag)
{
fs_mod_ref *mref;

Expand Down Expand Up @@ -306,8 +325,11 @@ int fs_bind(struct fs_binds *fapi)

memset(fapi, 0, sizeof *fapi);

fapi->add_hb_evs = add_hb_evs;
fapi->del_hb_evs = del_hb_evs;
fapi->stats_update_interval = event_heartbeat_interval;
fapi->get_evs = get_evs;
fapi->put_evs = put_evs;
fapi->get_stats_evs = get_stats_evs;
fapi->put_stats_evs = put_stats_evs;

return 0;
}
73 changes: 51 additions & 22 deletions modules/freeswitch/fs_api.h
Expand Up @@ -27,6 +27,7 @@
#ifndef __FREESWITCH_API__
#define __FREESWITCH_API__

#include "../../str_list.h"
#include "../../lib/list.h"
#include "../../rw_locking.h"
#include "../../ip_addr.h"
Expand All @@ -38,32 +39,27 @@
#define FS_SOCK_PREFIX_LEN (sizeof(FS_SOCK_PREFIX) - 1)

#define FS_DEFAULT_EVS_PORT 8021
#define FS_HEARTBEAT_ITV 1 /* assumed value, for best performance */

enum fs_evs_types {
FS_GW_STATS,
};

typedef struct _fs_evs fs_evs;
typedef struct _fs_ev_hb fs_ev_hb;

typedef int (*ev_hb_cb_f) (fs_evs *evs, str *tag, const void *priv);
typedef struct _fs_stats fs_stats;

typedef struct _fs_mod_ref {
str tag;
ev_hb_cb_f hb_cb;
const void *priv;

struct list_head list;
} fs_mod_ref;

/* statistics contained within a FreeSWITCH "HEARTBEAT" event */
struct _fs_ev_hb {
/* statistics contained within a FreeSWITCH instance */
struct _fs_stats {
float id_cpu;
int sess;
int max_sess;

int is_valid; /* FS load stats are invalid until the first heartbeat */
int valid; /* FS stats are invalid until the first heartbeat */
};

struct _fs_evs {
Expand All @@ -75,37 +71,70 @@ struct _fs_evs {

esl_handle_t *handle;

rw_lock_t *hb_data_lk;
fs_ev_hb hb_data;
rw_lock_t *stats_lk;
fs_stats stats;

int ref;

struct list_head list; /* distinct FS boxes */
struct list_head modules; /* distinct modules referencing the same box */
};

/* host[:port] (8021) struct/lock/etc. */
typedef fs_evs* (*add_hb_evs_f) (str *evs_str, str *tag,
ev_hb_cb_f scb, const void *priv);
typedef fs_evs* (*get_evs_f) (str *evs_str, str *tag,
struct str_list *sub_events);
typedef int (*put_evs_f) (fs_evs *evs, str *tag,
struct str_list *unsub_events);

typedef int (*del_hb_evs_f) (fs_evs *evs, str *tag);
typedef fs_evs* (*get_stats_evs_f) (str *evs_str, str *tag);
typedef int (*put_stats_evs_f) (fs_evs *evs, str *tag);

struct fs_binds {
/*
* Creates & registers a new FS "HEARTBEAT" event socket
* (all FS connections will be managed by one process)
* seconds-based interval at which the latest stats from a FreeSWITCH
* instance are expected to periodically arrive
*/
int stats_update_interval;

/*
* Obtain a FreeSWITCH event socket that is guaranteed to be subscribed
* to all given events that are FreeSWITCH-valid.
*
* Examples of some valid events FreeSWITCH allows subscriptions to:
* CHANNEL_STATE, CHANNEL_ANSWER, BACKGROUND_JOB, DTMF, HEARTBEAT
*
* NOTE: each get() must be paired up with an eventual put()
* (e.g., put() makes sense during a reload or shutdown)
*/
get_evs_f get_evs;

/*
* Return a FreeSWITCH event socket and unsubscribe from the given
* list of FreeSWITCH-valid events.
*
* Return: 0 on success, < 0 on failure
*/
put_evs_f put_evs;

/*
* Obtain a FreeSWITCH statistics event socket. The relevant statistics
* can be found under "evs->stats", and calling code can expect them to be
* updated at most every "api->stats_update_interval" seconds
*
* Return: the newly created event socket
* NOTE 1: always grab "evs->stats_lk" before reading the stats,
* otherwise you may read partially updated / corrupt data!
*
* NOTE 2: each get() must be paired up with an eventual put()
* (e.g., put() makes sense during a reload or shutdown)
*/
add_hb_evs_f add_hb_evs;
get_stats_evs_f get_stats_evs;

/*
* Detach & free a FS "HEARTBEAT" event sock from the
* stat-fetching process' iteration list
* Return a FreeSWITCH statistics event socket.
*
* Return: 0 on success, < 0 on failure
*/
del_hb_evs_f del_hb_evs;
put_stats_evs_f put_stats_evs;

};

static inline int is_fs_url(str *in)
Expand Down
28 changes: 27 additions & 1 deletion modules/freeswitch/fs_mod.c
Expand Up @@ -31,24 +31,34 @@
#include "../../parser/msg_parser.h"
#include "../../mem/mem.h"
#include "../../lib/osips_malloc.h"
#include "../../lib/csv.h"
#include "../../lib/url.h"

#include "fs_api.h"
#include "fs_proc.h"

extern struct list_head *fs_boxes;
extern rw_lock_t *box_lock;

/* this correlates with FreeSWITCH's "event-heartbeat-interval" param,
* located in autoload_configs/switch.conf.xml. The default there is 20s,
* but we're using a more granular default, just to be on the safe side */
int event_heartbeat_interval = 1;

static int mod_init(void);

int fs_bind(struct fs_binds *fapi);
int modparam_sub_evs(modparam_t type, void *string);

static cmd_export_t cmds[] = {
{ "fs_bind", (cmd_function)fs_bind, 1, NULL, NULL, 0 },
{ NULL, NULL, 0, NULL, NULL, 0 }
};

static param_export_t mod_params[] = {
{ 0,0,0 }
{"event_heartbeat_interval", INT_PARAM, &event_heartbeat_interval},
{"fs_subscribe", STR_PARAM|USE_FUNC_PARAM, modparam_sub_evs },
{0, 0, 0}
};

static proc_export_t procs[] = {
Expand Down Expand Up @@ -111,3 +121,19 @@ static int mod_init(void)

return 0;
}

int modparam_sub_evs(modparam_t type, void *string)
{
struct url *url;
str st = {string, strlen(string)};

url = parse_url(&st, URL_REQ_SCHEME|URL_REQ_PASS, 0);
if (!url) {
LM_ERR("failed to parse FS URL '%.*s'\n", st.len, st.s);
return 0;
}

print_url(url);

return 0;
}

0 comments on commit ff83a31

Please sign in to comment.