Skip to content

Commit

Permalink
b2b_sdp_demux: fix replication events
Browse files Browse the repository at this point in the history
  • Loading branch information
razvancrainea committed May 13, 2022
1 parent 8f8c12d commit f20b55c
Showing 1 changed file with 132 additions and 15 deletions.
147 changes: 132 additions & 15 deletions modules/b2b_sdp_demux/b2b_sdp_demux.c
Expand Up @@ -27,6 +27,7 @@
#include "../../parser/sdp/sdp.h"
#include "../../lib/list.h"
#include "../../msg_translator.h"
#include "../../rw_locking.h"

static dep_export_t mod_deps = {
{ /* OpenSIPS module dependencies */
Expand Down Expand Up @@ -74,10 +75,25 @@ static cmd_export_t mod_cmds[] = {
{0,0,{{0,0,0}},0}
};

static rw_lock_t *b2b_sdp_contexts_lock;
static struct list_head *b2b_sdp_contexts;

/** Module init function */
static int mod_init(void)
{

b2b_sdp_contexts_lock = lock_init_rw();
if (!b2b_sdp_contexts_lock) {
LM_ERR("could not allocate contexts lock\n");
return -1;
}
b2b_sdp_contexts = shm_malloc(sizeof *b2b_sdp_contexts);
if (!b2b_sdp_contexts) {
LM_ERR("cannot create B2B SDP contexts list\n");
return -1;
}
INIT_LIST_HEAD(b2b_sdp_contexts);

/* load b2b_entities api */
if(load_b2b_api(&b2b_api) < 0) {
LM_ERR("Failed to load b2b api\n");
Expand Down Expand Up @@ -190,6 +206,7 @@ static struct b2b_sdp_stream *b2b_sdp_stream_raw_new(struct b2b_sdp_client *clie
#define B2B_SDP_CLIENT_EARLY (1<<0)
#define B2B_SDP_CLIENT_STARTED (1<<1)
#define B2B_SDP_CLIENT_PENDING (1<<2)
#define B2B_SDP_CLIENT_REPL (1<<3)

struct b2b_sdp_client {
unsigned int flags;
Expand All @@ -211,8 +228,11 @@ struct b2b_sdp_ctx {
gen_lock_t lock;
struct list_head clients;
struct list_head streams;
struct list_head contexts;
};



static str *b2b_sdp_label_from_sdp(sdp_stream_cell_t *stream)
{
sdp_attr_t *attr;
Expand Down Expand Up @@ -400,7 +420,11 @@ static void b2b_sdp_client_terminate(struct b2b_sdp_client *client, str *key, in

static void b2b_sdp_client_release(struct b2b_sdp_client *client, int lock)
{
struct b2b_sdp_ctx *ctx = client->ctx;
struct list_head *it, *safe;

client->ctx = NULL;

if (client->hdrs.s)
shm_free(client->hdrs.s);

Expand All @@ -410,11 +434,11 @@ static void b2b_sdp_client_release(struct b2b_sdp_client *client, int lock)
list_for_each_safe(it, safe, &client->streams)
b2b_sdp_stream_free(list_entry(it, struct b2b_sdp_stream, list));
if (lock)
lock_get(&client->ctx->lock);
lock_get(&ctx->lock);
list_del(&client->list);
client->ctx->clients_no--;
ctx->clients_no--;
if (lock)
lock_release(&client->ctx->lock);
lock_release(&ctx->lock);
shm_free(client);
}

Expand All @@ -436,10 +460,51 @@ static struct b2b_sdp_ctx *b2b_sdp_ctx_new(void)
INIT_LIST_HEAD(&ctx->streams);
lock_init(&ctx->lock);
time(&ctx->sess_id);
lock_start_write(b2b_sdp_contexts_lock);
list_add(&ctx->contexts, b2b_sdp_contexts);
lock_stop_write(b2b_sdp_contexts_lock);
return ctx;
}

static void b2b_sdp_ctx_free(struct b2b_sdp_ctx *ctx)
static struct b2b_sdp_ctx *b2b_sdp_ctx_get(str *key)
{
struct list_head *it;
struct b2b_sdp_ctx *ctx;

lock_start_read(b2b_sdp_contexts_lock);
list_for_each(it, b2b_sdp_contexts) {
ctx = list_entry(it, struct b2b_sdp_ctx, contexts);
if (!ctx->b2b_key.len)
continue;
if (str_match(key, &ctx->b2b_key)) {
lock_stop_read(b2b_sdp_contexts_lock);
return ctx;
}
}
lock_stop_read(b2b_sdp_contexts_lock);
return NULL;
}


/* has the ctx's lock taken */
static struct b2b_sdp_client *b2b_sdp_client_get(struct b2b_sdp_ctx *ctx, str *key)
{
struct list_head *it;
struct b2b_sdp_client *client;

lock_get(&ctx->lock);
list_for_each(it, &ctx->clients) {
client = list_entry(it, struct b2b_sdp_client, list);
if (!ctx->b2b_key.len)
continue;
if (str_match(key, &client->b2b_key))
return client;
}
lock_release(&ctx->lock);
return NULL;
}

static void b2b_sdp_ctx_free(struct b2b_sdp_ctx *ctx, int replicate)
{
struct list_head *it, *safe;

Expand All @@ -449,9 +514,12 @@ static void b2b_sdp_ctx_free(struct b2b_sdp_ctx *ctx)
list_for_each_safe(it, safe, &ctx->streams)
b2b_sdp_stream_free(list_entry(it, struct b2b_sdp_stream, ordered));
if (ctx->b2b_key.s) {
b2b_api.entity_delete(B2B_SERVER, &ctx->b2b_key, NULL, 1, 1);
b2b_api.entity_delete(B2B_SERVER, &ctx->b2b_key, NULL, 1, replicate);
shm_free(ctx->b2b_key.s);
}
lock_start_write(b2b_sdp_contexts_lock);
list_del(&ctx->contexts);
lock_stop_write(b2b_sdp_contexts_lock);
shm_free(ctx);
}

Expand Down Expand Up @@ -1278,14 +1346,14 @@ static int b2b_sdp_server_reply_invite(struct sip_msg *msg, struct b2b_sdp_ctx *

static int b2b_sdp_server_reply_bye(struct sip_msg *msg, struct b2b_sdp_ctx *ctx)
{
b2b_sdp_ctx_free(ctx);
b2b_sdp_ctx_free(ctx, 1);
return 0;
}

static int b2b_sdp_server_bye(struct sip_msg *msg, struct b2b_sdp_ctx *ctx)
{
b2b_sdp_reply(&ctx->b2b_key, B2B_SERVER, msg->REQ_METHOD, 200, NULL);
b2b_sdp_ctx_free(ctx);
b2b_sdp_ctx_free(ctx, 1);
return 0;
}

Expand Down Expand Up @@ -1525,7 +1593,7 @@ static int b2b_sdp_demux(struct sip_msg *msg, str *uri,
return 0;
error:
free_sdp_content(&sdp);
b2b_sdp_ctx_free(ctx);
b2b_sdp_ctx_free(ctx, 1);
return -1;
}

Expand Down Expand Up @@ -1621,6 +1689,10 @@ static int b2b_sdp_client_restore(struct b2b_sdp_client *client)
str hack;
hack.s = (char *)&client;
hack.len = sizeof(void *);

if ((client->flags & B2B_SDP_CLIENT_REPL) == 0)
return 0;

if (b2b_api.update_b2bl_param(B2B_CLIENT, &client->b2b_key, &hack, 0) < 0) {
LM_ERR("could not update restore param!\n");
return -1;
Expand All @@ -1629,6 +1701,7 @@ static int b2b_sdp_client_restore(struct b2b_sdp_client *client)
LM_ERR("could not register restore logic!\n");
return -1;
}
client->flags &= ~B2B_SDP_CLIENT_REPL;

return 0;
}
Expand Down Expand Up @@ -1672,10 +1745,15 @@ static void b2b_sdp_server_event_received_create(str *key, bin_packet_t *store)
LM_ERR("could not duplicate b2b client key!\n");
goto error;
}
if (b2b_sdp_client_restore(client) < 0) {
client->flags |= B2B_SDP_CLIENT_REPL;

/* check if the key exists */
if (b2b_api.entity_exists(B2B_CLIENT, &client->b2b_key) &&
b2b_sdp_client_restore(client) < 0) {
LM_ERR("could not restore b2b client logic!\n");
goto error;
}

bin_pop_str(store, &tmp);
if (tmp.len && shm_str_sync(&client->hdrs, &tmp) < 0) {
LM_ERR("could not duplicate b2b client headers!\n");
Expand Down Expand Up @@ -1705,12 +1783,12 @@ static void b2b_sdp_server_event_received_create(str *key, bin_packet_t *store)
}
return;
error:
b2b_sdp_ctx_free(ctx);
b2b_sdp_ctx_free(ctx, 0);
}

static void b2b_sdp_server_event_received_delete(struct b2b_sdp_ctx *ctx, bin_packet_t *store)
{
b2b_sdp_ctx_free(ctx);
b2b_sdp_ctx_free(ctx, 0);
}

static void b2b_sdp_server_event_trigger(enum b2b_entity_type et, str *key,
Expand All @@ -1726,7 +1804,7 @@ static void b2b_sdp_server_event_trigger(enum b2b_entity_type et, str *key,
* but for clusterer, during CREATE, thus we don't need to store
* it one more type on the ACK
*/
if (backend == B2BCB_BACKEND_CLUSTER)
if (backend & B2BCB_BACKEND_CLUSTER)
return;
case B2B_EVENT_CREATE:
b2b_sdp_server_event_trigger_create(ctx, store);
Expand Down Expand Up @@ -1759,6 +1837,12 @@ static void b2b_sdp_server_event_received(enum b2b_entity_type et, str *key,
}
}

static void b2b_sdp_client_event_trigger_create(struct b2b_sdp_client *client,
bin_packet_t *store)
{
bin_push_str(store, &client->ctx->b2b_key);
}

static void b2b_sdp_client_event_trigger_update(struct b2b_sdp_client *client,
bin_packet_t *store)
{
Expand Down Expand Up @@ -1807,6 +1891,31 @@ static void b2b_sdp_client_event_receive_update(struct b2b_sdp_client *client,
lock_release(&client->ctx->lock);
}

static void b2b_sdp_client_event_receive_create(str *key, bin_packet_t *store)
{
str ctx_key;
struct b2b_sdp_ctx *ctx;
struct b2b_sdp_client *client;

/* in the packet, we have the server's key */
bin_pop_str(store, &ctx_key);
ctx = b2b_sdp_ctx_get(&ctx_key);
if (!ctx) {
LM_DBG("no ctx available for %.*s\n", ctx_key.len, ctx_key.s);
return;
}
client = b2b_sdp_client_get(ctx, key);
if (!client) {
LM_DBG("no client %.*s available for ctx %.*s\n", key->len, key->s,
ctx_key.len, ctx_key.s);
return;
}

if (b2b_sdp_client_restore(client) < 0)
LM_ERR("could not restore b2b client logic!\n");
lock_release(&ctx->lock);
}

static void b2b_sdp_client_event_receive_delete(struct b2b_sdp_client *client,
bin_packet_t *store)
{
Expand All @@ -1822,7 +1931,7 @@ static void b2b_sdp_client_event_trigger(enum b2b_entity_type et, str *key,

switch (event_type) {
case B2B_EVENT_CREATE:
/* already handled by server */
b2b_sdp_client_event_trigger_create(client, store);
break;
case B2B_EVENT_UPDATE:
b2b_sdp_client_event_trigger_update(client, store);
Expand All @@ -1840,17 +1949,25 @@ static void b2b_sdp_client_event_received(enum b2b_entity_type et, str *key,
str *param, enum b2b_event_type event_type, bin_packet_t *store,
int backend)
{
struct b2b_sdp_client *client = *(struct b2b_sdp_client **)((str *)param)->s;
struct b2b_sdp_client *client;
if (!store)
return;

if (!param || !param->s)
client = *(struct b2b_sdp_client **)((str *)param)->s;
else
client = NULL;

switch (event_type) {
case B2B_EVENT_CREATE:
b2b_sdp_client_event_receive_create(key, store);
break;
case B2B_EVENT_UPDATE:
b2b_sdp_client_event_receive_update(client, store);
break;
case B2B_EVENT_DELETE:
b2b_sdp_client_event_receive_delete(client, store);
break;
case B2B_EVENT_CREATE:
default:
/* nothing else for now */
break;
Expand Down

0 comments on commit f20b55c

Please sign in to comment.