From c5700cce4a37e484869941502e495733b8138b98 Mon Sep 17 00:00:00 2001 From: ionutrazvanionita Date: Mon, 18 Apr 2016 19:03:11 +0300 Subject: [PATCH] [sipcapture][HOMERv5 incompatibility fix]generic capturing function Added new function called report_capture which allows capturing hep packets in a generic format. It will be very effective for the third version of the hep protocol for capturing packets other than SIP. Also added a new table called rtcp_capture for storing everything capture with report capture. --- modules/sipcapture/sipcapture.c | 810 ++++++++++++++++++++----- modules/sipcapture/sql/rtcpcapture.sql | 18 + 2 files changed, 679 insertions(+), 149 deletions(-) create mode 100644 modules/sipcapture/sql/rtcpcapture.sql diff --git a/modules/sipcapture/sipcapture.c b/modules/sipcapture/sipcapture.c index bdbaf08b85..961b887f14 100644 --- a/modules/sipcapture/sipcapture.c +++ b/modules/sipcapture/sipcapture.c @@ -102,6 +102,8 @@ #define HAVE_SHARED_QUERIES (max_async_queries > 1) #define HAVE_MULTIPLE_ASYNC_INSERT (DB_CAPABILITY(db_funcs, DB_CAP_ASYNC_RAW_QUERY) && HAVE_SHARED_QUERIES) +#define IS_ASYNC_F (resume_f && resume_param) + #define MAX_QUERY 65535 struct _async_query { str last_query_suffix; @@ -144,14 +146,18 @@ str current_table; /* modparam defined table */ tz_table_t tz_table; +tz_table_t rc_table; /* list of script used tables - we use this list to hold async queries; * when opensips is closed we need to run all queries for all the tables * in case max_async_queries is used */ struct tz_table_list* tz_list=NULL; +struct tz_table_list* rc_list=NULL; /* modparam defined table */ struct tz_table_list tz_global; +/* modparam defined report_capture table */ +struct tz_table_list rc_global; struct _sipcapture_object { str method; @@ -205,9 +211,11 @@ struct _sipcapture_object { * VALUES_STR */ #define NR_KEYS 38 +#define RTCP_NR_KEYS 12 typedef void* sc_async_param_t; db_key_t db_keys[NR_KEYS]; +db_key_t rtcp_db_keys[RTCP_NR_KEYS]; /* module function prototypes */ static int mod_init(void); @@ -221,6 +229,27 @@ static int sip_capture_fixup(void** param, int param_no); static int sip_capture_async_fixup(void** param, int param_no); static int w_sip_capture(struct sip_msg *msg, char *table_name, async_resume_module **resume_f, void **resume_param); + + +static void set_rtcp_keys(void); + +static int rc_fixup_1(void** param, int param_no); +static int rc_async_fixup_1(void** param, int param_no); + +static int rc_fixup(void** param, int param_no); +static int rc_async_fixup(void** param, int param_no); + + +static int w_report_capture_1(struct sip_msg* msg, char* cor_id_p); +static int w_report_capture_2(struct sip_msg* msg, char* table_p, char* cor_id_p); +static int w_report_capture_async_1(struct sip_msg* msg, + async_resume_module** resume_f, void** resume_param, char* cor_id_p); +static int w_report_capture_async_2(struct sip_msg* msg, + async_resume_module** resume_f, void** resume_param, + char* table_p, char* cor_id_p); +static int w_report_capture(struct sip_msg* msg, char* table_p, char* cor_id_p, + async_resume_module **resume_f, void** resume_param); + int hep_msg_received(void); int extract_host_port(void); int raw_capture_socket(struct ip_addr* ip, str* iface, int port_start, int port_end, int proto); @@ -229,10 +258,16 @@ int sipcapture_db_init(const str* db_url); void sipcapture_db_close(void); static struct mi_root* sip_capture_mi(struct mi_root* cmd, void* param ); -static int db_sync_store(db_val_t* db_vals); -static int db_async_store(db_val_t* db_vals, - async_resume_module **resume_f, void **resume_param, - struct tz_table_list* t_el); +static int db_sync_store(db_val_t* vals, db_key_t* keys, int num_keys); + +typedef int (*append_db_vals_f)(char *buf, int max_len, db_val_t* db_vals); +static inline int append_sc_values(char* buf, int max_len, db_val_t* db_vals); +static inline int append_rc_values(char* buf, int max_len, db_val_t* db_vals); + +static int +db_async_store(db_val_t* vals, db_key_t* keys, int num_keys, + append_db_vals_f append_db_vals, async_resume_module **resume_f, + void **resume_param, struct tz_table_list* t_el); int resume_async_dbquery(int fd, struct sip_msg *msg, void *_param); /* setter functions */ @@ -275,6 +310,7 @@ static int parse_hep_index(str *s_index); static str db_url = {NULL, 0}; static str table_name = str_init("sip_capture"); +static str rtcp_table_name = str_init("rtcp_capture"); static str id_column = str_init("id"); static str date_column = str_init("date"); static str micro_ts_column = str_init("micro_ts"); @@ -317,6 +353,9 @@ static str type_column = str_init("type"); static str node_column = str_init("node"); static str msg_column = str_init("msg"); static str capture_node = str_init("homer01"); + +static str correlation_column = str_init("correlation_id"); + /* hep pvar related */ static str afinet_str = str_init("AF_INET"); static str afinet6_str = str_init("AF_INET6"); @@ -368,6 +407,9 @@ struct sip_msg dummy_req; "'%.*s','%.*s','%.*s','%.*s','%.*s','%.*s',%d,'%.*s',%d," \ "'%.*s',%d,'%.*s',%d,%d,%d,%d,'%.*s',%d,'%.*s','%.*s')" +#define RTCP_VALUES_STR "(%ld, %lld, '%.*s', '%.*s', %d, '%.*s', %d," \ + "%d, %d, %d, '%.*s', '%.*s')" + int max_async_queries=5; int raw_sock_desc = -1; /* raw socket used for ip packets */ @@ -412,8 +454,12 @@ static struct sock_filter BPF_code[] = { { 0x28, 0, 0, 0x0000000c }, { 0x15, 0, db_func_t db_funcs; /*!< Database functions */ db_con_t* db_con = 0; /*!< database connection */ -static db_ps_t sipcapture_ps = NULL; -static query_list_t *ins_list = NULL; +static db_ps_t sc_ps = NULL; +static query_list_t *sc_ins_list = NULL; + + +static db_ps_t rc_ps = NULL; +static query_list_t *rc_ins_list = NULL; proto_hep_api_t hep_api; load_hep_f load_hep; @@ -446,14 +492,19 @@ static cmd_export_t cmds[] = { REQUEST_ROUTE|FAILURE_ROUTE|ONREPLY_ROUTE|BRANCH_ROUTE|LOCAL_ROUTE}, {"hep_resume_sip", (cmd_function)w_hep_resume_sip, 0, 0, 0, REQUEST_ROUTE}, - + {"report_capture", (cmd_function)w_report_capture_1, 1, rc_fixup_1, 0, + REQUEST_ROUTE|FAILURE_ROUTE|ONREPLY_ROUTE|BRANCH_ROUTE|LOCAL_ROUTE}, + {"report_capture", (cmd_function)w_report_capture_2, 2, rc_fixup, 0, + REQUEST_ROUTE|FAILURE_ROUTE|ONREPLY_ROUTE|BRANCH_ROUTE|LOCAL_ROUTE}, {0, 0, 0, 0, 0, 0} }; static acmd_export_t acmds[] = { - {"sip_capture", (acmd_function)async_sip_capture, 0, 0}, - {"sip_capture", (acmd_function)async_sip_capture, 1, sip_capture_async_fixup}, + {"sip_capture", (acmd_function)async_sip_capture, 0, 0}, + {"sip_capture", (acmd_function)async_sip_capture, 1, sip_capture_async_fixup}, + {"report_capture", (acmd_function)w_report_capture_async_1, 1, rc_async_fixup_1}, + {"report_capture", (acmd_function)w_report_capture_async_2, 2, rc_async_fixup}, {0, 0, 0, 0} }; @@ -469,6 +520,7 @@ static proc_export_t procs[] = { static param_export_t params[] = { {"db_url", STR_PARAM, &db_url.s }, {"table_name", STR_PARAM, &table_name.s }, + {"rtcp_table_name", STR_PARAM, &rtcp_table_name.s }, {"id_column", STR_PARAM, &id_column.s }, {"date_column", STR_PARAM, &date_column.s }, {"micro_ts_column", STR_PARAM, µ_ts_column.s }, @@ -656,6 +708,7 @@ void parse_table_str(str* table_s, tz_table_t* tz_table) /*! \brief Initialize sipcapture module */ static int mod_init(void) { + int i; struct ip_addr *ip = NULL; /* init db keys */ @@ -698,6 +751,8 @@ static int mod_init(void) { db_keys[36] = &node_column; db_keys[37] = &msg_column; + set_rtcp_keys(); + #ifdef STATISTICS /* register statistics */ @@ -712,6 +767,7 @@ static int mod_init(void) { procs[0].no = (ipip_capture_on || moni_capture_on) ? raw_sock_children:0; table_name.len = strlen(table_name.s); + rtcp_table_name.len = strlen(rtcp_table_name.s); date_column.len = strlen(date_column.s); id_column.len = strlen(id_column.s); micro_ts_column.len = strlen(micro_ts_column.s); @@ -750,8 +806,10 @@ static int mod_init(void) { msg_column.len = strlen(msg_column.s); capture_node.len = strlen(capture_node.s); + /* extract prefix and suffix from table name */ parse_table_str(&table_name, &tz_table); + parse_table_str(&rtcp_table_name, &rc_table); if(raw_socket_listen.s) raw_socket_listen.len = strlen(raw_socket_listen.s); @@ -788,9 +846,11 @@ static int mod_init(void) { } /* db_url is mandatory if sip_capture is used */ - if ((is_script_func_used("sip_capture", -1) || + if (((is_script_func_used("sip_capture", -1) || is_script_async_func_used("sip_capture", -1)) || - hep_route_id == HEP_NO_ROUTE) { + hep_route_id == HEP_NO_ROUTE) || + (is_script_func_used("report_capture", -1) || + is_script_async_func_used("report_capture", -1))) { init_db_url(db_url, 0); } else { init_db_url(db_url, 1); @@ -821,33 +881,48 @@ static int mod_init(void) { if (DB_CAPABILITY(db_funcs, DB_CAP_ASYNC_RAW_QUERY)) { if (!HAVE_SHARED_QUERIES) { - global_async_query = pkg_malloc(sizeof(struct _async_query)); - if (global_async_query == NULL) { - LM_ERR("no more pkg\n"); - return -1; + for (i=0; i < 2; i++) { + global_async_query = pkg_malloc(sizeof(struct _async_query)); + if (global_async_query == NULL) { + LM_ERR("no more pkg\n"); + return -1; + } + memset(global_async_query, 0, sizeof(struct _async_query)); + + if (i==0) { + tz_global.as_qry = global_async_query; + } else { + rc_global.as_qry = global_async_query; + } } - memset(global_async_query, 0, sizeof(struct _async_query)); } else { - global_async_query = shm_malloc(sizeof(struct _async_query)); - if (global_async_query == NULL) { - LM_ERR("no more shm\n"); - return -1; - } - memset(global_async_query, 0, sizeof(struct _async_query)); + for (i=0; i < 2; i++) { + global_async_query = shm_malloc(sizeof(struct _async_query)); + if (global_async_query == NULL) { + LM_ERR("no more shm\n"); + return -1; + } + memset(global_async_query, 0, sizeof(struct _async_query)); - LAST_SUFFIX(global_async_query).s = shm_malloc(CAPTURE_TABLE_MAX_LEN); - if (global_async_query == NULL) { - LM_ERR("no more shm\n"); - return -1; - } + LAST_SUFFIX(global_async_query).s = shm_malloc(CAPTURE_TABLE_MAX_LEN); + if (global_async_query == NULL) { + LM_ERR("no more shm\n"); + return -1; + } - LAST_SUFFIX(global_async_query).len = 0; + LAST_SUFFIX(global_async_query).len = 0; - INIT_QUERY_LOCK(global_async_query); + INIT_QUERY_LOCK(global_async_query); + if( i == 0) { + tz_global.as_qry = global_async_query; + } else { + rc_global.as_qry = global_async_query; + } + } } - tz_global.as_qry = global_async_query; tz_global.table = &tz_table; + rc_global.table = &rc_table; } /*Check the table name*/ @@ -2110,16 +2185,35 @@ static void destroy(void) /* execute the uninserted queries - async only */ if (DB_CAPABILITY(db_funcs, DB_CAP_ASYNC_RAW_QUERY)) { while (it) { - if (CURR_QUERIES(it->as_qry)) { + if (HAVE_SHARED_QUERIES && CURR_QUERIES(it->as_qry)) { query_str.s = QUERY_BUF(it->as_qry); query_str.len = QUERY_LEN(it->as_qry); do_remaining_queries(&query_str); } - if (!HAVE_SHARED_QUERIES) { + if (HAVE_SHARED_QUERIES) { shm_free(LAST_SUFFIX(it->as_qry).s); + DESTROY_QUERY_LOCK(it->as_qry); shm_free(it->as_qry); + } + + tz_free=it; + it=it->next; + pkg_free(tz_free); + } + + it=rc_list; + while (it) { + if (HAVE_SHARED_QUERIES && CURR_QUERIES(it->as_qry)) { + query_str.s = QUERY_BUF(it->as_qry); + query_str.len = QUERY_LEN(it->as_qry); + do_remaining_queries(&query_str); + } + + if (HAVE_SHARED_QUERIES) { + shm_free(LAST_SUFFIX(it->as_qry).s); DESTROY_QUERY_LOCK(it->as_qry); + shm_free(it->as_qry); } tz_free=it; @@ -2128,11 +2222,35 @@ static void destroy(void) } if (!HAVE_SHARED_QUERIES) { - pkg_free(global_async_query); + if (tz_global.as_qry) + pkg_free(tz_global.as_qry); + if (rc_global.as_qry) + pkg_free(rc_global.as_qry); } else { - shm_free(LAST_SUFFIX(global_async_query).s); - shm_free(global_async_query); - DESTROY_QUERY_LOCK(global_async_query); + /* execute remaining queries for both sip_capture and report_capture */ + if (tz_global.as_qry) { + if (CURR_QUERIES(tz_global.as_qry)) { + query_str.s = QUERY_BUF(tz_global.as_qry); + query_str.len = QUERY_LEN(tz_global.as_qry); + do_remaining_queries(&query_str); + } + + shm_free(LAST_SUFFIX(tz_global.as_qry).s); + DESTROY_QUERY_LOCK(tz_global.as_qry); + shm_free(tz_global.as_qry); + } + + if (rc_global.as_qry) { + if (CURR_QUERIES(rc_global.as_qry)) { + query_str.s = QUERY_BUF(rc_global.as_qry); + query_str.len = QUERY_LEN(rc_global.as_qry); + do_remaining_queries(&query_str); + } + + shm_free(LAST_SUFFIX(rc_global.as_qry).s); + DESTROY_QUERY_LOCK(rc_global.as_qry); + shm_free(rc_global.as_qry); + } } } @@ -2265,18 +2383,14 @@ int hep_msg_received(void) return 0; } -static int sip_capture_fixup(void** param, int param_no) + +static int fixup_tz_table(void** param, struct tz_table_list** list) { str table_s; tz_table_t* tz_fxup_param; struct tz_table_list* list_el,* it; - if (param_no != 1) { - LM_ERR("Invalid param number!\n"); - return -1; - } - tz_fxup_param = pkg_malloc(sizeof(tz_table_t)); if (tz_fxup_param == NULL) { LM_ERR("no more pkg mem!\n"); @@ -2291,7 +2405,7 @@ static int sip_capture_fixup(void** param, int param_no) *param = tz_fxup_param; /* if not there add this table to the list */ - for ( it=tz_list; it; it=it->next) { + for ( it=*list; it; it=it->next) { if (it->table->prefix.len == tz_fxup_param->prefix.len && it->table->suffix.len == tz_fxup_param->suffix.len && !memcmp(it->table->prefix.s, tz_fxup_param->prefix.s, @@ -2311,24 +2425,25 @@ static int sip_capture_fixup(void** param, int param_no) memset(list_el, 0, sizeof(struct tz_table_list)); list_el->table = tz_fxup_param; - if (tz_list == NULL) { - tz_list = list_el; + if (*list == NULL) { + *list = list_el; } else { - list_el->next = tz_list; - tz_list = list_el; + list_el->next = *list; + *list = list_el; } return 0; } -static int sip_capture_async_fixup(void** param, int param_no) + +static int fixup_async_tz_table(void** param, struct tz_table_list** list) { struct tz_table_list* list_el; - sip_capture_fixup(param, param_no); + if (fixup_tz_table(param, list) < 0) + return -1; - /* last inserted element(current one) is always in front of the list*/ - list_el = tz_list; + list_el = *list; /* we store this in shm; need the queries in the end */ if (HAVE_MULTIPLE_ASYNC_INSERT) { @@ -2355,6 +2470,28 @@ static int sip_capture_async_fixup(void** param, int param_no) } +static int sip_capture_fixup(void** param, int param_no) +{ + if (param_no != 1) { + LM_ERR("Invalid param number!\n"); + return -1; + } + + return fixup_tz_table(param, &tz_list); +} + +static int sip_capture_async_fixup(void** param, int param_no) +{ + + if (param_no != 1) { + LM_ERR("Invalid param number!\n"); + return -1; + } + + + return fixup_async_tz_table(param, &tz_list); +} + @@ -2501,11 +2638,20 @@ static int sip_capture_store(struct _sipcapture_object *sco, db_vals[i].nul = 0; ret=1; - if (!resume_f && db_sync_store(db_vals) != 1) { + + + /* each query has it's own parameters for the prepared statements */ + if (con_set_inslist(&db_funcs,db_con,&sc_ins_list,db_keys,NR_KEYS) < 0 ) + CON_RESET_INSLIST(db_con); + CON_PS_REFERENCE(db_con) = &sc_ps; + + + if (!resume_f && db_sync_store(db_vals, db_keys, NR_KEYS) != 1) { LM_ERR("failed to insert into database\n"); return -1; } else if (resume_f) { - ret = db_async_store(db_vals, resume_f, resume_param, t_el); + ret = db_async_store(db_vals, db_keys, NR_KEYS, append_sc_values, + resume_f, resume_param, t_el); } #ifdef STATISTICS @@ -2516,14 +2662,10 @@ static int sip_capture_store(struct _sipcapture_object *sco, } -static int db_sync_store(db_val_t* db_vals) +static int db_sync_store(db_val_t* vals, db_key_t* keys, int num_keys) { LM_DBG("storing info...\n"); - if (con_set_inslist(&db_funcs,db_con,&ins_list,db_keys,NR_KEYS) < 0 ) - CON_RESET_INSLIST(db_con); - CON_PS_REFERENCE(db_con) = &sipcapture_ps; - if (current_table.s && current_table.len) { if (db_funcs.use_table(db_con, ¤t_table) < 0) { LM_ERR("use table failed!\n"); @@ -2532,7 +2674,7 @@ static int db_sync_store(db_val_t* db_vals) } - if (db_funcs.insert(db_con, db_keys, db_vals, NR_KEYS) < 0) { + if (db_funcs.insert(db_con, keys, vals, num_keys) < 0) { LM_ERR("failed to insert into database\n"); goto error; } @@ -2586,19 +2728,21 @@ static inline int append_sc_values(char* buf, int max_len, db_val_t* db_vals) return len; } -static inline int init_raw_query(char* buf, int max_len, str* table_name) +static inline int init_raw_query(char* buf, int max_len, str* table_name, + db_key_t* keys, int num_keys) { int len, i, ret; len = snprintf(buf, max_len, "INSERT INTO %.*s(", table_name->len, table_name->s); - for (i=0; ilen, db_keys[i]->s); + for (i=0; ilen, keys[i]->s); if (ret<0) return ret; len += ret; } - ret=snprintf(buf+len, max_len-len, "%.*s) VALUES", db_keys[NR_KEYS-1]->len, db_keys[NR_KEYS-1]->s); + ret=snprintf(buf+len, max_len-len, "%.*s) VALUES", + keys[num_keys-1]->len, keys[num_keys-1]->s); if (ret<0) return ret; len += ret; @@ -2606,9 +2750,10 @@ static inline int init_raw_query(char* buf, int max_len, str* table_name) } -static int db_async_store(db_val_t* db_vals, - async_resume_module **resume_f, void **resume_param, - struct tz_table_list* t_el) +static int +db_async_store(db_val_t* vals, db_key_t* keys, int num_keys, + append_db_vals_f append_db_vals, async_resume_module **resume_f, + void **resume_param, struct tz_table_list* t_el) { int ret; int read_fd; @@ -2623,7 +2768,7 @@ static int db_async_store(db_val_t* db_vals, *resume_f = NULL; *resume_param = NULL; async_status = ASYNC_NO_IO; - return db_sync_store(db_vals); + return db_sync_store(vals, keys, num_keys); } if (HAVE_MULTIPLE_ASYNC_INSERT && t_el == NULL) { @@ -2633,12 +2778,7 @@ static int db_async_store(db_val_t* db_vals, return -1; } - if (t_el) { - crt_as_query = t_el->as_qry; - } else { - /* Do we really need this? */ - crt_as_query = global_async_query; - } + crt_as_query = t_el->as_qry; if (HAVE_SHARED_QUERIES) GET_QUERY_LOCK(crt_as_query); @@ -2646,22 +2786,18 @@ static int db_async_store(db_val_t* db_vals, /* use the global async query; we do this only once */ if (CURR_QUERIES(crt_as_query) == 0) { QUERY_LEN(crt_as_query)=init_raw_query(QUERY_BUF(crt_as_query), MAX_QUERY, - ¤t_table); - - ret=append_sc_values(QUERY_BUF(crt_as_query)+QUERY_LEN(crt_as_query), MAX_QUERY-QUERY_LEN(crt_as_query), db_vals); - if (ret < 0) - goto no_buffer; - - QUERY_LEN(crt_as_query) += ret; + ¤t_table, keys, num_keys); } else { QUERY_BUF(crt_as_query)[QUERY_LEN(crt_as_query)++] = ','; - ret=append_sc_values(QUERY_BUF(crt_as_query)+QUERY_LEN(crt_as_query), MAX_QUERY-QUERY_LEN(crt_as_query), db_vals); - if (ret < 0) - goto no_buffer; - - QUERY_LEN(crt_as_query) += ret; } + ret=append_db_vals(QUERY_BUF(crt_as_query)+QUERY_LEN(crt_as_query), + MAX_QUERY-QUERY_LEN(crt_as_query), vals); + if (ret < 0) + goto no_buffer; + + QUERY_LEN(crt_as_query) += ret; + if ((++CURR_QUERIES(crt_as_query)) == max_async_queries) { CURR_QUERIES(crt_as_query) = 0; @@ -2718,6 +2854,93 @@ int resume_async_dbquery(int fd, struct sip_msg *msg, void *_param) return 1; } +static inline int change_table_unsafe(struct tz_table_list* t_el, str* new_table_name) +{ + str query_str; + + /* execute remaining queries for the old table */ + if (CURR_QUERIES(t_el->as_qry)) { + query_str.s = QUERY_BUF(t_el->as_qry); + query_str.len = QUERY_LEN(t_el->as_qry); + if (do_remaining_queries(&query_str) < 0){ + LM_ERR("failed to execute remaining queries " + "when switching to new table!\n"); + RELEASE_QUERY_LOCK(t_el->as_qry); + return -1; + } + CURR_QUERIES(t_el->as_qry) = 0; + + /* update the suffix */ + LAST_SUFFIX(t_el->as_qry).len = new_table_name->len - t_el->table->prefix.len; + memcpy(LAST_SUFFIX(t_el->as_qry).s, + new_table_name->s+t_el->table->prefix.len, + LAST_SUFFIX(t_el->as_qry).len); + } + + return 0; +} + +static inline int try_change_suffix(struct tz_table_list* t_el, str* new_table) +{ + int ret=0; + + struct _async_query* as_qry=t_el->as_qry; + + + GET_QUERY_LOCK(as_qry); + + if (LAST_SUFFIX(as_qry).len) { + if (memcmp(LAST_SUFFIX(as_qry).s, new_table->s+t_el->table->prefix.len, + LAST_SUFFIX(as_qry).len)) { + /* try changing table */ + if (change_table_unsafe(t_el, new_table) < 0) { + LM_ERR("failed changing tables!\n"); + ret=-1; + goto out_safe; + } + } + } + +out_safe: + RELEASE_QUERY_LOCK(t_el->as_qry); + return ret; +} + +/* + * no need to allocate output string buffer + * */ +static inline void build_table_name(tz_table_t* table_format, str* table_s) +{ + time_t rawtime; + struct tm* gmtm; + + table_s->s = table_buf; + memcpy(current_table.s, table_format->prefix.s, table_format->prefix.len); + table_s->len = table_format->prefix.len; + + if (table_format->suffix.len && table_format->suffix.s) { + time(&rawtime); + gmtm = gmtime(&rawtime); + table_s->len += strftime(table_s->s+table_s->len, CAPTURE_TABLE_MAX_LEN-table_s->len, + table_format->suffix.s, gmtm); + } +} + +static inline struct tz_table_list* search_table(tz_table_t* el, struct tz_table_list* list) { + struct tz_table_list* it = NULL; + + for (it=list; it; it=it->next) + if (el->prefix.len && el->prefix.len == it->table->prefix.len && + !memcmp(el->prefix.s, it->table->prefix.s, el->prefix.len) && + el->suffix.len == it->table->suffix.len && + !memcmp(el->suffix.s, it->table->suffix.s, el->suffix.len)) + return it; + + return it; +} + + + static int sip_capture(struct sip_msg *msg, char* s1, char* s2) { return w_sip_capture(msg, s1, NULL, NULL); @@ -2745,88 +2968,33 @@ static int w_sip_capture(struct sip_msg *msg, char *table_name, struct timezone tz; char tmp_node[100]; - time_t rawtime; - struct tm* gmtm; - struct hep_desc *h=NULL; struct hep_context* ctx; - tz_table_t* tzt = (tz_table_t*)table_name, *t_el; + tz_table_t* tzt = (tz_table_t*)table_name; - struct tz_table_list* t_it=NULL; - str query_str; + struct tz_table_list* t_it=&tz_global; if (tzt == NULL ) { tzt = &tz_table; } - /**/ - if (resume_f && resume_param && HAVE_MULTIPLE_ASYNC_INSERT) + /* need list element only if for async */ + if (IS_ASYNC_F && HAVE_MULTIPLE_ASYNC_INSERT) { - if (table_name != NULL) { /* find the table in the list */ - for (t_it=tz_list; t_it; t_it=t_it->next) { - t_el = t_it->table; - if (t_el->prefix.len == tzt->prefix.len && - t_el->suffix.len == tzt->suffix.len && - !memcmp(t_el->prefix.s, tzt->prefix.s, t_el->prefix.len) && - !memcmp(t_el->suffix.s, tzt->suffix.s, t_el->suffix.len)) - break; - } - - if (t_it == NULL) { + if ((t_it=search_table(tzt, tz_list)) == NULL) { LM_ERR("Invalid table given!\n"); return -1; } - } else{ - /* or use the default one */ - t_it = &tz_global; } - } - - current_table.s = table_buf; - memcpy(current_table.s, tzt->prefix.s, tzt->prefix.len); - current_table.len = tzt->prefix.len; - if (tzt->suffix.s && tzt->suffix.len) { - time(&rawtime); - gmtm = gmtime(&rawtime); - - current_table.len += strftime(current_table.s+current_table.len, - CAPTURE_TABLE_MAX_LEN - current_table.len, tzt->suffix.s, gmtm); - - /* check if table name changed and execute the remaining queries - * we do this only if we have a suffix, else table name won't change */ - if (resume_f && resume_param && HAVE_MULTIPLE_ASYNC_INSERT) { - GET_QUERY_LOCK(t_it->as_qry); - if (LAST_SUFFIX(t_it->as_qry).len && - memcmp(LAST_SUFFIX(t_it->as_qry).s, - current_table.s+tzt->prefix.len, - current_table.len-tzt->prefix.len)) { - - /* execute remaining queries for the old table */ - if (CURR_QUERIES(t_it->as_qry)) { - query_str.s = QUERY_BUF(t_it->as_qry); - query_str.len = QUERY_LEN(t_it->as_qry); - if (do_remaining_queries(&query_str) < 0){ - LM_ERR("failed to execute remaining queries " - "when switching to new table!\n"); - RELEASE_QUERY_LOCK(t_it->as_qry); - return -1; - } - CURR_QUERIES(t_it->as_qry) = 0; - - /* update the suffix */ - LAST_SUFFIX(t_it->as_qry).len = current_table.len - tzt->prefix.len; - memcpy(LAST_SUFFIX(t_it->as_qry).s, - current_table.s+t_it->table->prefix.len, - LAST_SUFFIX(t_it->as_qry).len); - } - } - RELEASE_QUERY_LOCK(t_it->as_qry); - } + build_table_name(tzt, ¤t_table); + if (tzt->suffix.s && tzt->suffix.len && IS_ASYNC_F && HAVE_MULTIPLE_ASYNC_INSERT) { + if (try_change_suffix(t_it, ¤t_table) < 0) + return -1; } gettimeofday( &tvb, &tz ); @@ -4370,6 +4538,350 @@ static int w_hep_resume_sip(struct sip_msg *msg) (capture_on_flag==NULL || *capture_on_flag==0) +/* + * Report Capture logic + */ + + +/* fixup */ +static void set_rtcp_keys(void) +{ + rtcp_db_keys[0] = &date_column; + rtcp_db_keys[1] = µ_ts_column; + rtcp_db_keys[2] = &correlation_column; + rtcp_db_keys[3] = &source_ip_column; + rtcp_db_keys[4] = &source_port_column; + rtcp_db_keys[5] = &dest_ip_column; + rtcp_db_keys[6] = &dest_port_column; + rtcp_db_keys[7] = &proto_column; + rtcp_db_keys[8] = &family_column; + rtcp_db_keys[9] = &type_column; + rtcp_db_keys[10] = &node_column; + rtcp_db_keys[11] = &msg_column; +} + + +#define FIXUP_RC_PARAMS(param, fix_func, param_no) \ + do { \ + switch (param_no) { \ + case 1: \ + return fix_func(param, &rc_list); \ + case 2: \ + return fixup_sgp(param); \ + default: \ + LM_ERR("Invalid param number!\n"); \ + return -1; \ + } \ + } while(0); + + +static int rc_fixup_1(void** param, int param_no) +{ + if (param_no != 1) { + LM_ERR("Invalid param number!\n"); + return -1; + } + + return fixup_sgp(param); +} + +static int rc_fixup(void** param, int param_no) +{ + if (param_no < 1 || param_no > 2) { + LM_ERR("Invalid param number!\n"); + return -1; + } + + FIXUP_RC_PARAMS(param, fixup_tz_table, param_no); + + return 0; +} + +static int rc_async_fixup_1(void** param, int param_no) +{ + if (param_no != 1) { + LM_ERR("Invalid param number!\n"); + return -1; + } + + return fixup_sgp(param); +} + +static int rc_async_fixup(void** param, int param_no) +{ + if (param_no < 1 || param_no > 2) { + LM_ERR("Invalid param number!\n"); + return -1; + } + + FIXUP_RC_PARAMS(param, fixup_async_tz_table, param_no); + + return 0; +} + +static inline void build_hepv3_obj(struct hepv3* h3, struct _sipcapture_object* sco) { + + sco->proto = h3->hg.ip_proto.data; + sco->family = h3->hg.ip_family.data; + + if (h3->hg.ip_family.data == AF_INET) { + inet_ntop(AF_INET, &(h3->addr.ip4_addr.dst_ip4.data), sco->destination_ip.s, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &(h3->addr.ip4_addr.src_ip4.data), sco->source_ip.s, INET_ADDRSTRLEN); + } else { + inet_ntop(AF_INET, &(h3->addr.ip6_addr.dst_ip6.data), sco->destination_ip.s, INET6_ADDRSTRLEN); + inet_ntop(AF_INET, &(h3->addr.ip6_addr.src_ip6.data), sco->source_ip.s, INET6_ADDRSTRLEN); + } + + sco->source_ip.len = strlen(sco->source_ip.s); + sco->source_port = h3->hg.src_port.data; + + sco->destination_ip.len = strlen(sco->destination_ip.s); + sco->destination_port = h3->hg.dst_port.data; + + if (h3->hg.proto_t.data == 5 || h3->hg.proto_t.data == 99 || h3->hg.proto_t.data==100) + sco->proto_type = 1; + else if (h3->hg.proto_t.data == 32) + sco->proto_type = 2; + else + sco->proto_type = h3->hg.proto_t.data; + + sco->tmstamp = h3->hg.time_sec.data*1000000 + h3->hg.time_usec.data; + + /* WARN node must be allocated */ + sco->node.len = snprintf(sco->node.s, 100, "%.*s:%i", capture_node.len, capture_node.s, h3->hg.capt_id.data); +} + +static inline void build_hepv2_obj(struct hepv12* h2, struct _sipcapture_object* sco) { + struct timeval tvb; + + sco->proto = h2->hdr.hp_p; + sco->family = h2->hdr.hp_f; + + if (h2->hdr.hp_f == AF_INET) { + inet_ntop(AF_INET, &(h2->addr.hep_ipheader.hp_dst), sco->destination_ip.s, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &(h2->addr.hep_ipheader.hp_src), sco->source_ip.s, INET_ADDRSTRLEN); + } else { + inet_ntop(AF_INET, &(h2->addr.hep_ip6header.hp6_dst), sco->destination_ip.s, INET6_ADDRSTRLEN); + inet_ntop(AF_INET, &(h2->addr.hep_ip6header.hp6_src), sco->source_ip.s, INET6_ADDRSTRLEN); + } + + sco->source_ip.len = strlen(sco->source_ip.s); + sco->source_port = h2->hdr.hp_sport; + + sco->destination_ip.len = strlen(sco->destination_ip.s); + sco->destination_port = h2->hdr.hp_dport; + + /* only sip in hepv1/2 */ + sco->proto_type = 1; + + if (h2->hdr.hp_v == 2) { + sco->tmstamp = h2->hep_time.tv_sec*1000000 + h2->hep_time.tv_usec; + + /* WARN node must be allocated */ + sco->node.len = snprintf(sco->node.s, 100, "%.*s:%i", capture_node.len, capture_node.s, h2->hep_time.captid); + } else { + gettimeofday(&tvb, NULL); + sco->tmstamp = tvb.tv_sec * 1000000 + tvb.tv_usec; + + sco->node = capture_node; + } +} + +static inline int append_rc_values(char* buf, int max_len, db_val_t* db_vals) +{ + int len; + + len = snprintf(buf, max_len, RTCP_VALUES_STR, + VAL_TIME(db_vals+0), VAL_BIGINT(db_vals+1), + VAL_STR(db_vals+2).len, VAL_STR(db_vals+2).s, + VAL_STR(db_vals+3).len, VAL_STR(db_vals+3).s, + VAL_INT(db_vals+4), + VAL_STR(db_vals+5).len, VAL_STR(db_vals+5).s, + VAL_INT(db_vals+6), VAL_INT(db_vals+7), VAL_INT(db_vals+8), VAL_INT(db_vals+9), + VAL_STR(db_vals+10).len, VAL_STR(db_vals+10).s, + VAL_STR(db_vals+11).len, VAL_STR(db_vals+11).s + ); + + return len; +} + + +static int report_capture(struct sip_msg* msg, str* table, str* cor_id, + struct tz_table_list* t_el, async_resume_module **resume_f, void** resume_param) +{ + char node[100]; + char src_ip[INET6_ADDRSTRLEN], dst_ip[INET6_ADDRSTRLEN]; + + struct _sipcapture_object sco; + + struct hep_desc *h; + struct hep_context *ctx; + + db_val_t db_vals[RTCP_NR_KEYS]; + + if ((ctx=HEP_GET_CONTEXT(hep_api)) == NULL) { + LM_WARN("not a hep message!\n"); + return -1; + } + + h= &ctx->h; + + memset(&sco, 0, sizeof(struct _sipcapture_object)); + + sco.node.s = node; + sco.source_ip.s = src_ip; + sco.destination_ip.s = dst_ip; + + if (h->version == 3) { + build_hepv3_obj(&h->u.hepv3, &sco); + } else { + build_hepv2_obj(&h->u.hepv12, &sco); + } + + + memset(db_vals, 0, sizeof(db_val_t) * RTCP_NR_KEYS); + + db_vals[0].type = DB_DATETIME; + db_vals[0].val.time_val = time(NULL); + + db_vals[1].type = DB_BIGINT; + db_vals[1].val.bigint_val = sco.tmstamp; + + db_vals[2].type = DB_STR; + db_vals[2].val.str_val = *cor_id; + + db_vals[3].type = DB_STR; + db_vals[3].val.str_val = sco.source_ip; + + db_vals[4].type = DB_INT; + db_vals[4].val.int_val = sco.source_port; + + db_vals[5].type = DB_STR; + db_vals[5].val.str_val = sco.destination_ip; + + db_vals[6].type = DB_INT; + db_vals[6].val.int_val = sco.destination_port; + + db_vals[7].type = DB_INT; + db_vals[7].val.int_val = sco.proto; + + db_vals[8].type = DB_INT; + db_vals[8].val.int_val = sco.family; + + db_vals[9].type = DB_INT; + db_vals[9].val.int_val = sco.type; + + db_vals[10].type = DB_STR; + db_vals[10].val.str_val = sco.node; + + db_vals[11].type = DB_BLOB; + + + /* we can have other pyload than sip only for hepv3 */ + if (h->version == 3) { + db_vals[11].val.str_val.s = h->u.hepv3.payload_chunk.data; + db_vals[11].val.str_val.len = h->u.hepv3.payload_chunk.chunk.length - sizeof(h->u.hepv3.payload_chunk.chunk); + } else { + db_vals[11].val.str_val.s = msg->buf; + db_vals[11].val.str_val.len = msg->len; + } + + + /* each query has it's own parameters for the prepared statements */ + if (con_set_inslist(&db_funcs,db_con,&rc_ins_list,db_keys,NR_KEYS) < 0 ) + CON_RESET_INSLIST(db_con); + CON_PS_REFERENCE(db_con) = &rc_ps; + + if (!resume_f && db_sync_store(db_vals, rtcp_db_keys, RTCP_NR_KEYS) != 1) { + LM_ERR("failed to insert into database\n"); + return -1; + } else if (resume_f) { + return db_async_store(db_vals, rtcp_db_keys, RTCP_NR_KEYS, append_rc_values, + resume_f, resume_param, t_el); + } + + return 1; +} + +static int w_report_capture_1(struct sip_msg* msg, char* cor_id_p) +{ + return w_report_capture(msg, NULL, cor_id_p, NULL, NULL); +} + +static int w_report_capture_2(struct sip_msg* msg, char* table_p, char* cor_id_p) +{ + return w_report_capture(msg, table_p, cor_id_p, NULL, NULL); +} + +static int w_report_capture_async_1(struct sip_msg* msg, + async_resume_module** resume_f, void** resume_param, char* cor_id_p) +{ + return w_report_capture(msg, NULL, cor_id_p, resume_f, resume_param); +} + +static int w_report_capture_async_2(struct sip_msg* msg, + async_resume_module** resume_f, void** resume_param, + char* table_p, char* cor_id_p) +{ + return w_report_capture(msg, table_p, cor_id_p, resume_f, resume_param); +} + + + +static int w_report_capture(struct sip_msg* msg, char* table_p, char* cor_id_p, + async_resume_module **resume_f, void** resume_param) +{ + str cor_id_s; + + tz_table_t* rct; + struct tz_table_list* t_el=&rc_global; + + + if (cor_id_p == NULL) { + LM_ERR("correaltion id param is mandatory!\n"); + return -1; + } + + + if (table_p) { + rct = (tz_table_t *)table_p; + } else { + rct = &rc_table; + } + + if (fixup_get_svalue(msg, (gparam_p)cor_id_p, &cor_id_s) < 0 ) { + LM_ERR("failed to fetch correlation id!\n"); + return -1; + } + + if (cor_id_s.s == NULL || cor_id_s.len == 0) { + LM_ERR("empty correlation id!\n"); + return -1; + } + + + if (IS_ASYNC_F && HAVE_MULTIPLE_ASYNC_INSERT) { + if (table_p) { + if ((t_el=search_table(rct, rc_list)) == NULL) { + LM_ERR("Invalid table given!\n"); + return -1; + } + } + } + + build_table_name(rct, ¤t_table); + if (rct->suffix.s && rct->suffix.len && IS_ASYNC_F && HAVE_MULTIPLE_ASYNC_INSERT) { + if (try_change_suffix(t_el, ¤t_table) < 0) + return -1; + } + + return report_capture(msg, ¤t_table, &cor_id_s, t_el, resume_f, resume_param); +} + +/* + * + */ + /*! \brief * MI Sip_capture command * diff --git a/modules/sipcapture/sql/rtcpcapture.sql b/modules/sipcapture/sql/rtcpcapture.sql new file mode 100644 index 0000000000..a1e775a293 --- /dev/null +++ b/modules/sipcapture/sql/rtcpcapture.sql @@ -0,0 +1,18 @@ +CREATE TABLE IF NOT EXISTS `rtcp_capture` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `micro_ts` bigint(18) NOT NULL DEFAULT '0', + `correlation_id` varchar(256) NOT NULL DEFAULT '', + `source_ip` varchar(60) NOT NULL DEFAULT '', + `source_port` int(10) NOT NULL DEFAULT 0, + `destination_ip` varchar(60) NOT NULL DEFAULT '', + `destination_port` int(10) NOT NULL DEFAULT 0, + `proto` int(5) NOT NULL DEFAULT 0, + `family` int(1) DEFAULT NULL, + `type` int(2) NOT NULL DEFAULT 0, + `node` varchar(125) NOT NULL DEFAULT '', + `msg` varchar(1500) NOT NULL DEFAULT '', + PRIMARY KEY (`id`,`date`), + KEY `date` (`date`), + KEY `correlationid` (`correlation_id`(255)) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=8;