Skip to content

Commit

Permalink
qrouting: Add MI qr_reload command
Browse files Browse the repository at this point in the history
Refactor the "qr_profiles" code so it can be reloadable.  Also, after
reloading a new set of qr_profiles, make sure to refresh the cloned dr
rule structures so they point to the newly reloaded data instead.
  • Loading branch information
liviuchircu authored and razvancrainea committed Feb 11, 2020
1 parent 6bd8285 commit 3a77fee
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 94 deletions.
4 changes: 4 additions & 0 deletions modules/drouting/drouting.c
Expand Up @@ -552,6 +552,10 @@ static module_dependency_t *get_deps_probing_interval(param_export_t *param)
static dep_export_t deps = {
{ /* OpenSIPS module dependencies */
{ MOD_TYPE_SQLDB, NULL, DEP_ABORT },

/* if present, qrouting must first load its profiles,
* so they can be looked up during DRCB_RLD_INIT_RULE */
{ MOD_TYPE_DEFAULT, "qrouting", DEP_SILENT },
{ MOD_TYPE_NULL, NULL, 0 },
},
{ /* modparam dependencies */
Expand Down
2 changes: 0 additions & 2 deletions modules/qrouting/qr_acc.c
Expand Up @@ -29,8 +29,6 @@ struct tm_binds tmb;
struct dlg_binds dlgcb;
struct dr_binds drb;

#define QR_PTR_POISON ((void *)0x10203040)

/* free the parameter of the dialog callback */
inline static void release_dialog_prop(void *param)
{
Expand Down
161 changes: 120 additions & 41 deletions modules/qrouting/qr_load.c
Expand Up @@ -60,37 +60,93 @@ str crit_pdd_qp_col = str_init(CRIT_PDD_QP_COL);
str crit_ast_qp_col = str_init(CRIT_AST_QP_COL);
str crit_acd_qp_col = str_init(CRIT_ACD_QP_COL);

void add_profile(int id, char *name, double warn_asr, double warn_ccr,
double warn_pdd, double warn_ast, double warn_acd, double crit_asr,
double crit_ccr, double crit_pdd, double crit_ast, double crit_acd) {

((*qr_profiles)[*n_qr_profiles]).id = id;
((*qr_profiles)[*n_qr_profiles]).name.s = name;
((*qr_profiles)[*n_qr_profiles]).name.len = strlen(name);

(*qr_profiles)[*n_qr_profiles].asr1 = warn_asr;
(*qr_profiles)[*n_qr_profiles].ccr1 = warn_ccr;
(*qr_profiles)[*n_qr_profiles].pdd1 = warn_pdd;
(*qr_profiles)[*n_qr_profiles].ast1 = warn_ast;
(*qr_profiles)[*n_qr_profiles].acd1 = warn_acd;

(*qr_profiles)[*n_qr_profiles].asr2 = crit_asr;
(*qr_profiles)[*n_qr_profiles].ccr2 = crit_ccr;
(*qr_profiles)[*n_qr_profiles].pdd2 = crit_pdd;
(*qr_profiles)[*n_qr_profiles].ast2 = crit_ast;
(*qr_profiles)[*n_qr_profiles].acd2 = crit_acd;
(*n_qr_profiles)++;
static inline void add_profile(qr_thresholds_t *prof, int id, const char *name,
double warn_asr, double warn_ccr, double warn_pdd, double warn_ast,
double warn_acd, double crit_asr, double crit_ccr, double crit_pdd,
double crit_ast, double crit_acd)
{
prof->id = id;
strncpy(prof->name, name, QR_NAME_COL_SZ + 1);

prof->asr1 = warn_asr;
prof->ccr1 = warn_ccr;
prof->pdd1 = warn_pdd;
prof->ast1 = warn_ast;
prof->acd1 = warn_acd;

prof->asr2 = crit_asr;
prof->ccr2 = crit_ccr;
prof->pdd2 = crit_pdd;
prof->ast2 = crit_ast;
prof->acd2 = crit_acd;
}

int qr_load(db_func_t *qr_dbf, db_con_t* qr_db_hdl) {
/* refresh a single threshold set (1 row) */
static inline void qr_refresh_threshold_set(qr_thresholds_t *thr,
qr_thresholds_t *new)
{
qr_rule_t *r;
qr_partitions_t *parts;
int i;

lock_start_write(*rw_lock_qr);
parts = *qr_main_list;

/* XXX: is this dead code? also review qr_rotate_samples() */
if (!parts) {
lock_stop_write(*rw_lock_qr);
return;
}

for (i = 0; i < parts->n_parts; i++) /* for every partition */
for (r = parts->qr_rules_start[i]; r; r = r->next) /* and rule */
if (r->thresholds == thr)
r->thresholds = new;

lock_stop_write(*rw_lock_qr);
}

/* refresh all reloaded threshold sets (rows) */
static inline void qr_refresh_threshold_sets(qr_thresholds_t *old, int old_n,
qr_thresholds_t *new, int new_n)
{
int i, j, id, found;

LM_DBG("updating references for %p -> %p qr_profiles reload\n", old, new);

/* try to match each old qr profile with a new one:
* - if found, just refresh all references to it
* - otherwise, just set the references to NULL */
for (i = 0; i < old_n; i++) {
id = old[i].id;
found = 0;

for (j = 0; j < new_n; j++) {
if (id == new[j].id) {
LM_DBG("matched qr_profile %d with reloaded data\n", id);
qr_refresh_threshold_set(&old[i], &new[j]);
found = 1;
break;
}
}

/* this old threshold id was discarded (replaced?), then reloaded */
if (!found)
qr_refresh_threshold_set(&old[i], NULL);
}
}

int qr_reload(db_func_t *qr_dbf, db_con_t *qr_db_hdl)
{
int int_vals[N_INT_VALS];
char *str_vals[N_STR_VALS];
double double_vals[N_DOUBLE_VALS];

qr_thresholds_t *profs = NULL, *old_profs;
db_key_t columns[12];
db_res_t *res = 0;
db_row_t *row = 0;
int i, n, no_rows = 0;
int i, no_rows = 0, total_rows = 0, old_n;
int db_cols = 0;

memset(double_vals, 0, N_DOUBLE_VALS*sizeof(double));
Expand Down Expand Up @@ -124,7 +180,8 @@ int qr_load(db_func_t *qr_dbf, db_con_t* qr_db_hdl) {
goto error;
}

no_rows = estimate_available_rows( 4+64+10*sizeof(double), db_cols);
no_rows = estimate_available_rows(4+QR_NAME_COL_SZ+10*sizeof(double),
db_cols);
if (no_rows==0) no_rows = 10;
if(qr_dbf->fetch_result(qr_db_hdl, &res, no_rows )<0) {
LM_ERR("Error fetching rows\n");
Expand All @@ -137,20 +194,25 @@ int qr_load(db_func_t *qr_dbf, db_con_t* qr_db_hdl) {
}
}

if (RES_ROW_N(res) == 0) {
LM_INFO("table '%.*s' is empty\n",
qr_profiles_table.len, qr_profiles_table.s);
goto swap_data;
}

LM_DBG("%d records found in table %.*s\n",
RES_ROW_N(res), qr_profiles_table.len,qr_profiles_table.s);

n = 0;

*qr_profiles = (qr_thresholds_t*)shm_malloc(RES_ROW_N(res)*
sizeof(qr_thresholds_t));

if(*qr_profiles == NULL) {
LM_ERR("no more shm memory\n");
return -1;
}
do {
for(i = 0; i < RES_ROW_N(res); i++) {
profs = shm_realloc(profs, (total_rows + RES_ROW_N(res)) *
sizeof *profs);
if (!profs) {
LM_ERR("oom\n");
return -1;
}
memset(&profs[total_rows], 0, RES_ROW_N(res) * sizeof *profs);

for (i = 0; i < RES_ROW_N(res); i++) {
row = RES_ROWS(res) + i;

check_val(id_qp_col, ROW_VALUES(row), DB_INT, 1, 1);
Expand Down Expand Up @@ -188,7 +250,6 @@ int qr_load(db_func_t *qr_dbf, db_con_t* qr_db_hdl) {

check_val(crit_acd_qp_col, ROW_VALUES(row)+11, DB_DOUBLE, 1, 1);
double_vals[DOUBLE_VALS_CRIT_ACD] = VAL_DOUBLE(ROW_VALUES(row)+11);
n++;

LM_DBG("qr_profile row: %d %s %lf %lf %lf %lf %lf %lf %lf %lf %lf %lf\n",
int_vals[INT_VALS_ID], str_vals[STR_VALS_PROFILE_NAME],
Expand All @@ -197,28 +258,46 @@ int qr_load(db_func_t *qr_dbf, db_con_t* qr_db_hdl) {
double_vals[DOUBLE_VALS_WARN_ACD], double_vals[DOUBLE_VALS_CRIT_ASR],
double_vals[DOUBLE_VALS_CRIT_CCR], double_vals[DOUBLE_VALS_CRIT_PDD],
double_vals[DOUBLE_VALS_CRIT_AST], double_vals[DOUBLE_VALS_CRIT_ACD]);
add_profile(
int_vals[INT_VALS_ID], str_vals[STR_VALS_PROFILE_NAME],

add_profile(&profs[total_rows], int_vals[INT_VALS_ID],
str_vals[STR_VALS_PROFILE_NAME],
double_vals[DOUBLE_VALS_WARN_ASR], double_vals[DOUBLE_VALS_WARN_CCR],
double_vals[DOUBLE_VALS_WARN_PDD], double_vals[DOUBLE_VALS_WARN_AST],
double_vals[DOUBLE_VALS_WARN_ACD], double_vals[DOUBLE_VALS_CRIT_ASR],
double_vals[DOUBLE_VALS_CRIT_CCR], double_vals[DOUBLE_VALS_CRIT_PDD],
double_vals[DOUBLE_VALS_CRIT_AST], double_vals[DOUBLE_VALS_CRIT_ACD]);

total_rows++;
}

if (DB_CAPABILITY(*qr_dbf, DB_CAP_FETCH)) {
if(qr_dbf->fetch_result(qr_db_hdl, &res, no_rows)<0) {
LM_ERR( "fetching rows (1)\n");
if (qr_dbf->fetch_result(qr_db_hdl, &res, no_rows) < 0) {
LM_ERR("fetching rows (1)\n");
goto error;
}
} else {
break;
}

} while(RES_ROW_N(res));
} while (RES_ROW_N(res));

swap_data:
lock_start_write(qr_profiles_rwl);
old_profs = *qr_profiles;
old_n = *qr_profiles_n;

*qr_profiles = profs;
*qr_profiles_n = total_rows;

qr_refresh_threshold_sets(old_profs, old_n, profs, total_rows);
lock_stop_write(qr_profiles_rwl);

shm_free(old_profs);

LM_DBG("reloaded into %d new profiles (%p -> %p)\n",
total_rows, old_profs, profs);
return 0;

error:
return -1;

}

9 changes: 7 additions & 2 deletions modules/qrouting/qr_load.h
Expand Up @@ -22,6 +22,7 @@
*/

#include "../../db/db.h"
#include "../../rw_locking.h"

/* qr_profiles table */
#define DOUBLE_VALS_WARN_ASR 0
Expand Down Expand Up @@ -55,8 +56,12 @@
#define CRIT_AST_QP_COL "crit_threshold_ast"
#define CRIT_ACD_QP_COL "crit_threshold_acd"

extern str qr_profiles_table;
#define QR_NAME_COL_SZ 64

int qr_load(db_func_t *qr_dbf, db_con_t* qr_db_hdl);
extern str qr_profiles_table;
extern rw_lock_t *qr_profiles_rwl;

extern db_func_t qr_dbf;
extern db_con_t *qr_db_hdl;

int qr_reload(db_func_t *qr_dbf, db_con_t *qr_db_hdl);
20 changes: 12 additions & 8 deletions modules/qrouting/qr_mi.c
Expand Up @@ -175,8 +175,7 @@ int qr_fill_mi_partition(mi_item_t *part, const str *part_name,
return 0;
}

mi_response_t *mi_qr_status_0(const mi_params_t *params,
struct mi_handler *async_hdl)
mi_response_t *mi_qr_status_0(const mi_params_t *_, struct mi_handler *__)
{
mi_response_t *resp;
mi_item_t *resp_obj, *part_arr, *part;
Expand Down Expand Up @@ -205,8 +204,7 @@ mi_response_t *mi_qr_status_0(const mi_params_t *params,
return NULL;
}

mi_response_t *mi_qr_status_1(const mi_params_t *params,
struct mi_handler *async_hdl)
mi_response_t *mi_qr_status_1(const mi_params_t *params, struct mi_handler *_)
{
qr_rule_t *qr_part;
mi_response_t *resp, *err_resp = NULL;
Expand Down Expand Up @@ -245,8 +243,7 @@ mi_response_t *mi_qr_status_1(const mi_params_t *params,
return err_resp;
}

mi_response_t *mi_qr_status_2(const mi_params_t *params,
struct mi_handler *async_hdl)
mi_response_t *mi_qr_status_2(const mi_params_t *params, struct mi_handler *_)
{
qr_rule_t *qr_part, *rule;
mi_response_t *resp, *err_resp = NULL;
Expand Down Expand Up @@ -297,8 +294,7 @@ mi_response_t *mi_qr_status_2(const mi_params_t *params,
return err_resp;
}

mi_response_t *mi_qr_status_3(const mi_params_t *params,
struct mi_handler *async_hdl)
mi_response_t *mi_qr_status_3(const mi_params_t *params, struct mi_handler *_)
{
qr_rule_t *qr_part, *rule;
qr_dst_t *dst;
Expand Down Expand Up @@ -356,3 +352,11 @@ mi_response_t *mi_qr_status_3(const mi_params_t *params,
err_resp = init_mi_error(500, MI_SSTR("Server Internal Error\n"));
return err_resp;
}

mi_response_t *mi_qr_reload_0(const mi_params_t *_, struct mi_handler *__)
{
if (qr_reload(&qr_dbf, qr_db_hdl) < 0)
LM_ERR("failed to load data from db\n");

return init_mi_result_ok();
}
14 changes: 6 additions & 8 deletions modules/qrouting/qr_mi.h
Expand Up @@ -21,13 +21,11 @@
#ifndef __QR_MI__
#define __QR_MI__

mi_response_t *mi_qr_status_0(const mi_params_t *params,
struct mi_handler *async_hdl);
mi_response_t *mi_qr_status_1(const mi_params_t *params,
struct mi_handler *async_hdl);
mi_response_t *mi_qr_status_2(const mi_params_t *params,
struct mi_handler *async_hdl);
mi_response_t *mi_qr_status_3(const mi_params_t *params,
struct mi_handler *async_hdl);
mi_cmd_f mi_qr_status_0;
mi_cmd_f mi_qr_status_1;
mi_cmd_f mi_qr_status_2;
mi_cmd_f mi_qr_status_3;

mi_cmd_f mi_qr_reload_0;

#endif /* __QR_MI__ */
14 changes: 12 additions & 2 deletions modules/qrouting/qr_sort.c
Expand Up @@ -80,6 +80,10 @@ int qr_score_gw(qr_gw_t *gw, qr_thresholds_t *thresholds)
double asr_v, ccr_v, pdd_v, ast_v, acd_v;
str *nam = drb.get_gw_name(gw->dr_gw);

/* the corresponding dr_rule points to an invalid qr_profile */
if (!thresholds)
goto set_score;

/* FIXME: might be better under a single lock
* because of possible changes between lock ( a
* new sampling interval might bring new statistics)
Expand Down Expand Up @@ -130,6 +134,7 @@ int qr_score_gw(qr_gw_t *gw, qr_thresholds_t *thresholds)
}
}

set_score:
/* update gw score and status */
lock_start_write(gw->ref_lock);
gw->score = score;
Expand Down Expand Up @@ -168,6 +173,7 @@ int qr_score_grp(qr_grp_t *grp, qr_thresholds_t * thresholds) {
int qr_insert_dst(qr_sorted_list_t **sorted, qr_rule_t *rule,
int cr_id, int gw_id)
{
qr_thresholds_t thr;
int cur_dst_score;
qr_gw_t *gw;

Expand All @@ -180,9 +186,13 @@ int qr_insert_dst(qr_sorted_list_t **sorted, qr_rule_t *rule,
lock_start_read(gw->ref_lock);
if(gw->state & QR_STATUS_DIRTY) {
lock_stop_read(gw->ref_lock);

lock_start_read(qr_profiles_rwl);
thr = *rule->thresholds;
lock_stop_read(qr_profiles_rwl);

LM_DBG("evaluating score for:cr_id = %d gw_id = %d\n", cr_id, gw_id);
cur_dst_score = qr_score_gw(gw, rule->thresholds); /* compute the
score */
cur_dst_score = qr_score_gw(gw, &thr);
} else {
cur_dst_score = gw->score;
lock_stop_read(gw->ref_lock);
Expand Down

0 comments on commit 3a77fee

Please sign in to comment.