Skip to content

Commit

Permalink
Merge branch 'feature/b2b-clustering'
Browse files Browse the repository at this point in the history
  • Loading branch information
rvlad-patrascu committed Mar 5, 2020
2 parents 1943983 + 39c7e83 commit f83fba9
Show file tree
Hide file tree
Showing 34 changed files with 1,634 additions and 299 deletions.
43 changes: 43 additions & 0 deletions bin_interface.c
Expand Up @@ -120,6 +120,25 @@ void bin_init_buffer(bin_packet_t *packet, char *buffer, int length)
LM_DBG("init buffer length %d\n", length);
}

int bin_append_buffer(bin_packet_t *packet, str *buf)
{
if (!packet->buffer.s || !packet->size) {
LM_ERR("not initialized yet, call bin_init before altering buffer\n");
return -1;
}

if (packet->buffer.len + buf->len > packet->size) {
if (bin_extend(packet, buf->len) < 0)
return -1;
}

memcpy(packet->buffer.s + packet->buffer.len, buf->s, buf->len);
packet->buffer.len += buf->len;
set_len(packet);

return packet->buffer.len;
}

/*
* copies the given string at the end position in the packet
* allows null strings (NULL content or NULL param)
Expand Down Expand Up @@ -493,6 +512,30 @@ int bin_get_buffer(bin_packet_t *packet, str *buffer)
return 1;
}

int bin_get_content_start(bin_packet_t *packet, str *buf)
{
if (!buf)
return -1;

buf->s = packet->buffer.s + HEADER_SIZE + LEN_FIELD_SIZE +
*(unsigned short *)(packet->buffer.s + HEADER_SIZE) + sizeof(int);
buf->len = packet->buffer.len - (buf->s - packet->buffer.s);

return buf->len;
}

int bin_get_content_pos(bin_packet_t *packet, str *buf)
{
if (!buf)
return -1;

buf->s = packet->front_pointer;
buf->len = packet->buffer.len - (packet->front_pointer -
packet->buffer.s);

return buf->len;
}

int bin_reset_back_pointer(bin_packet_t *packet)
{
int cap_len;
Expand Down
23 changes: 23 additions & 0 deletions bin_interface.h
Expand Up @@ -133,6 +133,17 @@ int bin_init(bin_packet_t *packet, str *capability, int packet_type, short versi
*/
void bin_init_buffer(bin_packet_t *packet, char *buffer, int length);

/*
* appends a buffer to a binary packet
* @buf: buffer to be appended
* @len: length of @buf
*
* @return:
* > 0: success, the size of the buffer
* < 0: internal buffer limit reached
*/
int bin_append_buffer(bin_packet_t *packet, str *buf);

/*
* adds a new string value to the packet being currently built
* @info: may also be NULL
Expand Down Expand Up @@ -243,5 +254,17 @@ int bin_reset_back_pointer(bin_packet_t *packet);
*/
int bin_get_buffer(bin_packet_t *packet, str *buffer);

/*
* returns the bin packet's buffer from the position where
* the serialized content actually starts
*/
int bin_get_content_start(bin_packet_t *packet, str *buf);

/*
* returns the bin packet's buffer from the position of the
* next field to be consumed
*/
int bin_get_content_pos(bin_packet_t *packet, str *buf);

#endif /* __BINARY_INTERFACE__ */

18 changes: 17 additions & 1 deletion db/schema/b2b_entities.xml
Expand Up @@ -9,7 +9,7 @@

<table id="b2b_entities" xmlns:db="http://docbook.org/ns/docbook">
<name>b2b_entities</name>
<version>1</version>
<version>2</version>
<type db="mysql">&MYSQL_TABLE_TYPE;</type>
<description>
<db:para>Table for the b2b_entities module. More information can be found at: &OPENSIPS_MOD_DOC;b2b_entities.html
Expand Down Expand Up @@ -160,6 +160,22 @@
<description>Logic parameter</description>
</column>

<column>
<name>mod_name</name>
<type>string</type>
<size>32</size>
<description>OpenSIPS module that this entity belongs to</description>
</column>

<column>
<name>storage</name>
<type>binary</type>
<size>4096</size>
<default><null/></default>
<null/>
<description>Generic binary data storage</description>
</column>

<column>
<name>lm</name>
<type>int</type>
Expand Down
19 changes: 18 additions & 1 deletion modules/b2b_entities/b2b_entities.c
Expand Up @@ -44,8 +44,9 @@
#include "b2b_entities.h"
#include "server.h"
#include "dlg.h"
#include "b2be_clustering.h"

#define TABLE_VERSION 1
#define TABLE_VERSION 2

/** Functions declarations */
static int mod_init(void);
Expand Down Expand Up @@ -74,6 +75,8 @@ int b2be_db_mode = WRITE_BACK;
b2b_table server_htable;
b2b_table client_htable;

int b2be_cluster;

#define DB_COLS_NO 26

/* TM bind */
Expand All @@ -100,6 +103,7 @@ static param_export_t params[]={
{ "db_mode", INT_PARAM, &b2be_db_mode },
{ "update_period", INT_PARAM, &b2b_update_period },
{ "b2b_key_prefix", STR_PARAM, &b2b_key_prefix.s },
{ "cluster_id", INT_PARAM, &b2be_cluster },
{ 0, 0, 0 }
};

Expand All @@ -119,6 +123,7 @@ static dep_export_t deps = {
},
{ /* modparam dependencies */
{ "db_url", get_deps_sqldb_url },
{ "cluster_id", get_deps_clusterer },
{ NULL, NULL },
},
};
Expand Down Expand Up @@ -292,6 +297,11 @@ static int mod_init(void)
b2b_update_period, TIMER_FLAG_SKIP_ON_DELAY);
//register_timer("b2b2-clean", b2be_clean, 0, b2b_update_period);

if (b2be_init_clustering() < 0) {
LM_ERR("Failed to init clustering support\n");
return -1;
}

return 0;
}

Expand Down Expand Up @@ -444,6 +454,9 @@ int b2b_update_b2bl_param(enum b2b_entity_type type, str* key,
dlg->param.len = param->len;
lock_release(&table[hash_index].lock);

if (b2be_cluster)
replicate_entity_update(dlg, type, hash_index, param, NULL);

return 0;
}

Expand Down Expand Up @@ -519,6 +532,7 @@ int b2b_entities_bind(b2b_api_t* api)
api->send_reply = b2b_send_reply;
api->entity_delete = b2b_entity_delete;
api->restore_logic_info = b2b_restore_logic_info;
api->register_cb = b2b_register_cb;
api->update_b2bl_param = b2b_update_b2bl_param;
api->entities_db_delete = b2b_db_delete;
api->get_b2bl_key = b2b_get_b2bl_key;
Expand Down Expand Up @@ -550,6 +564,9 @@ static inline int mi_print_b2be_dlg(mi_item_t *resp_arr, b2b_table htable, unsig
if (add_mi_string(arr_item, MI_SSTR("param"),
dlg->param.s, dlg->param.len) < 0)
goto error;
if (add_mi_string(arr_item, MI_SSTR("mod_name"),
dlg->mod_name.s, dlg->mod_name.len) < 0)
goto error;
if (add_mi_number(arr_item, MI_SSTR("state"), dlg->state) < 0)
goto error;
if (add_mi_number(arr_item, MI_SSTR("last_invite_cseq"),
Expand Down
40 changes: 0 additions & 40 deletions modules/b2b_entities/b2b_entities.h
Expand Up @@ -43,34 +43,10 @@
#define WRITE_THROUGH 1
#define WRITE_BACK 2

typedef int (*b2b_restore_linfo_t)(enum b2b_entity_type type, str* key,
b2b_notify_t cback);

typedef int (*b2b_update_b2bl_param_t)(enum b2b_entity_type type, str* key,
str* param);
typedef void (*b2b_db_delete_t)(str param);
typedef int (*b2b_get_b2bl_key_t)(str* callid, str* from_tag, str* to_tag,
str* entity_key, str* tuple_key);

extern int uac_auth_loaded;
extern str b2b_key_prefix;
#define B2B_MAX_PREFIX_LEN 5

typedef struct b2b_api
{
b2b_server_new_t server_new;
b2b_client_new_t client_new;
b2b_send_request_t send_request;
b2b_send_reply_t send_reply;
b2b_entity_delete_t entity_delete;
b2b_db_delete_t entities_db_delete;
b2b_restore_linfo_t restore_logic_info;
b2b_update_b2bl_param_t update_b2bl_param;
b2b_get_b2bl_key_t get_b2bl_key;
b2b_apply_lumps_t apply_lumps;
}b2b_api_t;


extern unsigned int server_hsize;
extern unsigned int client_hsize;
extern struct tm_binds tmb;
Expand All @@ -83,20 +59,4 @@ extern db_func_t b2be_dbf;
extern str b2be_dbtable;
extern int b2be_db_mode;

typedef int(*load_b2b_f) (b2b_api_t* api);

static inline int load_b2b_api( struct b2b_api *b2b_api)
{
load_b2b_f load_b2b;

/* import the b2b_entities auto-loading function */
if ( !(load_b2b=(load_b2b_f)find_export("load_b2b", 0))) {
LM_ERR("can't import load_b2b\n");
return -1;
}

/* let the auto-loading function load all B2B entities stuff */
return load_b2b( b2b_api );
}

#endif

0 comments on commit f83fba9

Please sign in to comment.