Skip to content

Commit

Permalink
[clusterer] add reporting on the shared tag state changes
Browse files Browse the repository at this point in the history
The E_CLUSTERER_SHARING_TAG_CHANGED event is raised.
Also such events are logged via the status/report "sharing_tags" identifier

(opensips-cli): mi sr_list_reports clusterer
[
    {
        "Name": "sharing_tags",
        "Reports": [
            {
                "Timestamp": 1652367224,
                "Date": "Thu May 12 17:53:44 2022",
                "Log": "TAG <vip>, cluster 1, became backup due to cluster broadcast from 2"
            },
            {
                "Timestamp": 1652367326,
                "Date": "Thu May 12 17:55:26 2022",
                "Log": "TAG <vip>, cluster 1, became active due to MI command"
            }
        ]
    }
]
  • Loading branch information
bogdan-iancu committed May 12, 2022
1 parent 63524cc commit cb2c6b2
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 22 deletions.
35 changes: 33 additions & 2 deletions modules/clusterer/README
Expand Up @@ -72,8 +72,10 @@ CLUSTERER Module
1.8.1. E_CLUSTERER_REQ_RECEIVED
1.8.2. E_CLUSTERER_RPL_RECEIVED
1.8.3. E_CLUSTERER_NODE_STATE_CHANGED
1.8.4. E_CLUSTERER_SHARING_TAG_CHANGED

1.9. Usage Example
1.9. Provided Status/Report Identifiers
1.10. Usage Example

2. Developer Guide

Expand Down Expand Up @@ -983,7 +985,36 @@ f-repl 1
* new_state - The new state of the node, with the possible
values: 0 - down, 1 - up.

1.9. Usage Example
1.8.4. E_CLUSTERER_SHARING_TAG_CHANGED

This event is raised when the state of a sharing tag changes.

Parameters:
* name - The name of the sharing tag.
* cluster_id - The cluster ID.
* node_id - The ID of the node.
* state - The new state of the sharing tag, the possible
values: "active" or "backup".
* reason - short text describing what triggered the change of
the state, like another node stepping in as active, an MI
command or a script variable.

1.9. Provided Status/Report Identifiers

The module provides the "clusterer" Status/Report group used so
far only for reporting purposes.

The "sharing_tags" identifier is provided for reporting state
changes of the sharing_tags (between active and backup), along
with the reason of the change. This identifier keeps a history
of 200 records before discarding the oldest ones.

For how to access and use the Status/Report information, please
see
https://www.opensips.org/Documentation/Interface-StatusReport-3-3.

1.10. Usage Example

This section provides a usage example for replicating
ratelimit pipes between two OpenSIPS instances. It uses the
Expand Down
2 changes: 1 addition & 1 deletion modules/clusterer/clusterer.c
Expand Up @@ -1066,7 +1066,7 @@ void bin_rcv_cl_extra_packets(bin_packet_t *packet, int packet_type,
else if (packet_type == CLUSTERER_MI_CMD)
handle_cl_mi_msg(packet);
else if (packet_type == CLUSTERER_SHTAG_ACTIVE)
handle_shtag_active(packet, cluster_id);
handle_shtag_active(packet, cluster_id, source_id);
else if (packet_type == CLUSTERER_SYNC_REQ)
handle_sync_request(packet, cl, node);
else if (packet_type == CLUSTERER_SYNC || packet_type == CLUSTERER_SYNC_END)
Expand Down
1 change: 1 addition & 0 deletions modules/clusterer/clusterer.h
Expand Up @@ -145,6 +145,7 @@ extern enum sip_protos clusterer_proto;

extern str cl_internal_cap;
extern str cl_extra_cap;
extern void *cl_srg;

void seed_fb_check_timer(utime_t ticks, void *param);

Expand Down
13 changes: 12 additions & 1 deletion modules/clusterer/clusterer_mod.c
Expand Up @@ -31,6 +31,7 @@
#include "../../mi/mi.h"
#include "../../timer.h"
#include "../../bin_interface.h"
#include "../../status_report.h"

#include "api.h"
#include "node_info.h"
Expand Down Expand Up @@ -63,6 +64,9 @@ str description_col = str_init("description");
extern db_con_t *db_hdl;
extern db_func_t dr_dbf;

/* status-report group for clusterer */
void *cl_srg=NULL;

/* module interface functions */
static int mod_init(void);
static int child_init(int rank);
Expand Down Expand Up @@ -482,8 +486,15 @@ static int mod_init(void)
return -1;
}

cl_srg = sr_register_group( CHAR_INT("clusterer"), 0 /*not public*/);
if (cl_srg==NULL) {
LM_ERR("failed to create clusterer group for 'status-report'");
return -1;
}

/* check if the cluster IDs in the sharing tag list are valid */
shtag_init_list();
shtag_init_reporting();
shtag_validate_list();

return 0;
Expand Down Expand Up @@ -1367,7 +1378,7 @@ int load_clusterer(struct clusterer_binds *binds)
binds->sync_chunk_start = cl_sync_chunk_start;
binds->sync_chunk_iter = cl_sync_chunk_iter;
binds->shtag_get = shtag_get;
binds->shtag_activate = shtag_activate;
binds->shtag_activate = shtag_activate_api;
binds->shtag_get_all_active = shtag_get_all_active;
binds->shtag_register_callback = shtag_register_callback;
binds->shtag_get_sync_status = shtag_get_sync_status;
Expand Down
53 changes: 53 additions & 0 deletions modules/clusterer/doc/clusterer_admin.xml
Expand Up @@ -1250,8 +1250,61 @@ $ opensips-cli -x mi clusterer_list_cap
</para></listitem>
</itemizedlist>
</section>

<section id="event_E_CLUSTERER_SHARING_TAG_CHANGED" xreflabel="E_CLUSTERER_SHARING_TAG_CHANGED">
<title>
<function moreinfo="none">E_CLUSTERER_SHARING_TAG_CHANGED</function>
</title>
<para>
This event is raised when the state of a sharing tag changes.
</para>
<para>Parameters:</para>
<itemizedlist>
<listitem><para>
<emphasis>name</emphasis> - The name of the sharing tag.
</para></listitem>
<listitem><para>
<emphasis>cluster_id</emphasis> - The cluster ID.
</para></listitem>
<listitem><para>
<emphasis>node_id</emphasis> - The ID of the node.
</para></listitem>
<listitem><para>
<emphasis>state</emphasis> - The new state of the sharing tag,
the possible values: "active" or "backup".
</para></listitem>
<listitem><para>
<emphasis>reason</emphasis> - short text describing what
triggered the change of the state, like another node
stepping in as active, an MI command or a script variable.
</para></listitem>
</itemizedlist>
</section>

</section>

<section id="sr_identifiers" xreflabel="Status/Report Identifiers">
<title>Provided Status/Report Identifiers</title>

<para>
The module provides the "clusterer" Status/Report group used so
far only for reporting purposes.
</para>
<para>
The "sharing_tags" identifier is provided for reporting state changes
of the sharing_tags (between active and backup), along with the reason
of the change. This identifier keeps a history of 200 records before
discarding the oldest ones.
</para>
<para>
For how to access and use the Status/Report information, please see
<ulink url='https://www.opensips.org/Documentation/Interface-StatusReport-3-3'>https://www.opensips.org/Documentation/Interface-StatusReport-3-3</ulink>.
</para>

</section>



<section>
<title>Usage Example</title>
<para> This section provides a usage example for replicating ratelimit
Expand Down
142 changes: 126 additions & 16 deletions modules/clusterer/sharing_tags.c
Expand Up @@ -23,11 +23,14 @@
#include "../../str.h"
#include "../../rw_locking.h"
#include "../../bin_interface.h"
#include "../../status_report.h"
#include "../../evi/evi.h"
#include "clusterer.h"
#include "node_info.h"
#include "sharing_tags.h"



struct n_send_info {
int node_id;
struct n_send_info *next;
Expand Down Expand Up @@ -63,6 +66,13 @@ static rw_lock_t *shtags_lock = NULL;

static struct shtag_cb *shtag_cb_list=NULL;

static str cl_sh_event = str_init("E_CLUSTERER_SHARING_TAG_CHANGED");
static event_id_t cl_sh_evi_id;

static str sh_sr_ident = str_init("sharing_tags");
static str sh_active_str = str_init("active");
static str sh_backup_str = str_init("backup");


int shtag_register_callback(str *tag_name, int c_id, void *param,
shtag_cb_f func)
Expand Down Expand Up @@ -203,22 +213,105 @@ static struct sharing_tag *shtag_get_unsafe(str *tag_name, int c_id)

int shtag_init_list(void)
{
if (shtags_list==NULL) {
if ((shtags_list = shm_malloc(sizeof *shtags_list)) == NULL) {
LM_CRIT("No more shm memory\n");
return -1;
}
*shtags_list = NULL;
if (shtags_list)
return 0;

if ((shtags_lock = lock_init_rw()) == NULL) {
LM_CRIT("Failed to init lock\n");
return -1;
}
if ((shtags_list = shm_malloc(sizeof *shtags_list)) == NULL) {
LM_CRIT("No more shm memory\n");
return -1;
}
*shtags_list = NULL;

if ((shtags_lock = lock_init_rw()) == NULL) {
LM_CRIT("Failed to init lock\n");
return -1;
}

return 0;
}


/*
 * Prepares the two channels used to report sharing tag state changes:
 *  - the "sharing_tags" identifier, registered inside the clusterer
 *    status-report group (cl_srg);
 *  - the E_CLUSTERER_SHARING_TAG_CHANGED event, whose ID is stored in
 *    cl_sh_evi_id.
 *
 * Returns 0 when both registrations succeed, -1 otherwise.
 */
int shtag_init_reporting(void)
{
	/* 200 = number of report records kept for this identifier */
	if (sr_register_identifier(cl_srg, STR2CI(sh_sr_ident),
			SR_STATUS_READY, CHAR_INT_NULL, 200) != 0) {
		LM_ERR("failed to register status report identifier\n");
		return -1;
	}

	cl_sh_evi_id = evi_publish_event(cl_sh_event);
	if (cl_sh_evi_id != EVI_ERROR)
		return 0;

	LM_ERR("cannot register %.*s event\n", cl_sh_event.len, cl_sh_event.s);
	return -1;
}


/*
 * Reports a state change of a sharing tag through both reporting
 * channels: a log record added to the "sharing_tags" status-report
 * identifier and the E_CLUSTERER_SHARING_TAG_CHANGED event.
 *
 * Parameters:
 *   tag_name   - name of the sharing tag
 *   cluster_id - ID of the cluster the tag belongs to
 *   state      - the new state; SHTAG_STATE_ACTIVE is reported as
 *                "active", any other value as "backup"
 *   reason_s,
 *   reason_len - text (used with an explicit length) describing what
 *                triggered the change; a NULL pointer or a negative
 *                length is reported as an empty reason
 *
 * The status-report record is always added. The event is built and
 * raised only if it was published and evi_probe_event() returns
 * non-zero for it (presumably: somebody is subscribed - confirm).
 *
 * NOTE(review): the event carries "name", "cluster", "state" and
 * "reason"; the module documentation lists "cluster_id" and "node_id"
 * instead - confirm which side should be aligned.
 */
static void report_shtag_change(str *tag_name, int cluster_id, int state,
		char *reason_s, int reason_len)
{
	static str cl_sh_name_str = str_init("name");
	static str cl_sh_cluster_str = str_init("cluster");
	static str cl_sh_state_str = str_init("state");
	static str cl_sh_reason_str = str_init("reason");
	evi_params_p list;
	str *txt, reason;

	txt = (state == SHTAG_STATE_ACTIVE) ? &sh_active_str : &sh_backup_str;

	/* callers may pass an snprintf() result as length; never hand a
	 * negative precision or a NULL pointer to the "%.*s" below */
	if (reason_s == NULL || reason_len < 0) {
		reason_s = "";
		reason_len = 0;
	}
	reason.s = reason_s;
	reason.len = reason_len;

	sr_add_report_fmt( cl_srg, STR2CI(sh_sr_ident), 0 /*is_public*/,
		"TAG <%.*s>, cluster %d, became %.*s due to %.*s",
		tag_name->len, tag_name->s, cluster_id,
		txt->len, txt->s, reason.len, reason.s);

	if (cl_sh_evi_id == EVI_ERROR || !evi_probe_event(cl_sh_evi_id))
		return;

	list = evi_get_params();
	if (!list) {
		LM_ERR("cannot create event params\n");
		return;
	}

	if (evi_param_add_str(list, &cl_sh_name_str, tag_name) < 0) {
		LM_ERR("cannot add tag name\n");
		goto error;
	}

	if (evi_param_add_int(list, &cl_sh_cluster_str, &cluster_id) < 0) {
		LM_ERR("cannot add cluster ID\n");
		goto error;
	}

	if (evi_param_add_str(list, &cl_sh_state_str, txt) < 0) {
		LM_ERR("cannot add state\n");
		goto error;
	}

	if (evi_param_add_str(list, &cl_sh_reason_str, &reason) < 0) {
		LM_ERR("cannot add reason\n");
		goto error;
	}

	/* the list is not freed here after raising; presumably
	 * evi_raise_event() takes ownership of it - confirm */
	if (evi_raise_event(cl_sh_evi_id, list)) {
		LM_ERR("unable to raise %.*s event\n",
			cl_sh_event.len, cl_sh_event.s);
	}
	return;

error:
	/* the list was not handed over to the event interface, drop it */
	evi_free_params(list);
}


int shtag_modparam_func(modparam_t type, void *val_s)
{
str tag_name;
Expand Down Expand Up @@ -565,7 +658,13 @@ static int shtag_send_active_info(int c_id, str *tag_name, int node_id)
}


int shtag_activate(str *tag_name, int cluster_id)
/*
 * Two-argument variant of shtag_activate(), the one exported via the
 * clusterer API binds (binds->shtag_activate).
 * It forwards the tag name and cluster ID unchanged and supplies
 * "internal API call" as the reason of the activation; the return
 * value is whatever shtag_activate() returns.
 */
int shtag_activate_api(str *tag_name, int cluster_id)
{
return shtag_activate( tag_name, cluster_id, MI_SSTR("internal API call"));
}


int shtag_activate(str *tag_name, int cluster_id, char *reason, int reason_len)
{
struct sharing_tag *tag;
int lock_old_flag;
Expand Down Expand Up @@ -629,6 +728,9 @@ int shtag_activate(str *tag_name, int cluster_id)

/* run the callbacks */
shtag_run_callbacks( tag_name, SHTAG_STATE_ACTIVE, cluster_id);

report_shtag_change( tag_name, cluster_id, SHTAG_STATE_ACTIVE,
reason, reason_len);
} else
lock_stop_sw_read(shtags_lock);

Expand Down Expand Up @@ -711,9 +813,10 @@ static void free_active_msgs_info(struct sharing_tag *tag)
}


int handle_shtag_active(bin_packet_t *packet, int cluster_id)
int handle_shtag_active(bin_packet_t *packet, int cluster_id, int source_id)
{
str tag_name;
char buf[27];
str tag_name, reason = {buf,0};
struct sharing_tag *tag;
int old_state;

Expand All @@ -739,9 +842,15 @@ int handle_shtag_active(bin_packet_t *packet, int cluster_id)

lock_stop_write(shtags_lock);

if (old_state!=SHTAG_STATE_BACKUP)
if (old_state!=SHTAG_STATE_BACKUP) {
shtag_run_callbacks( &tag_name, SHTAG_STATE_BACKUP, cluster_id);

reason.len = snprintf( reason.s, 26,
"cluster broadcast from %d", source_id);
report_shtag_change( &tag_name, cluster_id, SHTAG_STATE_BACKUP,
STR2CI(reason) );
}

return 0;
}

Expand Down Expand Up @@ -836,7 +945,7 @@ mi_response_t *shtag_mi_set_active(const mi_params_t *params,
}
lock_stop_read(cl_list_lock);

if (shtag_activate( &tag, c_id)<0) {
if (shtag_activate( &tag, c_id, MI_SSTR("MI command"))<0) {
LM_ERR("Failed set active the tag [%.*s/%d] \n",
tag.len, tag.s, c_id);
return init_mi_error(500, MI_SSTR("Internal failure when activating "
Expand Down Expand Up @@ -927,7 +1036,8 @@ int var_set_sh_tag(struct sip_msg* msg, pv_param_t *param, int op,
return 0;
}

if (shtag_activate( &v_name->shtag, v_name->cluster_id)==-1) {
if (shtag_activate( &v_name->shtag, v_name->cluster_id,
MI_SSTR("script variable"))==-1) {
LM_ERR("failed to set sharing tag <%.*s/%d> to new state %d\n",
v_name->shtag.len, v_name->shtag.s, v_name->cluster_id, state);
return -1;
Expand Down

0 comments on commit cb2c6b2

Please sign in to comment.