diff --git a/modules/b2b_sdp_demux/b2b_sdp_demux.c b/modules/b2b_sdp_demux/b2b_sdp_demux.c index 3edc2eb6565..aa8c56abbf9 100644 --- a/modules/b2b_sdp_demux/b2b_sdp_demux.c +++ b/modules/b2b_sdp_demux/b2b_sdp_demux.c @@ -27,6 +27,7 @@ #include "../../parser/sdp/sdp.h" #include "../../lib/list.h" #include "../../msg_translator.h" +#include "../../rw_locking.h" static dep_export_t mod_deps = { { /* OpenSIPS module dependencies */ @@ -74,10 +75,25 @@ static cmd_export_t mod_cmds[] = { {0,0,{{0,0,0}},0} }; +static rw_lock_t *b2b_sdp_contexts_lock; +static struct list_head *b2b_sdp_contexts; + /** Module init function */ static int mod_init(void) { + b2b_sdp_contexts_lock = lock_init_rw(); + if (!b2b_sdp_contexts_lock) { + LM_ERR("could not allocate contexts lock\n"); + return -1; + } + b2b_sdp_contexts = shm_malloc(sizeof *b2b_sdp_contexts); + if (!b2b_sdp_contexts) { + LM_ERR("cannot create B2B SDP contexts list\n"); + return -1; + } + INIT_LIST_HEAD(b2b_sdp_contexts); + /* load b2b_entities api */ if(load_b2b_api(&b2b_api) < 0) { LM_ERR("Failed to load b2b api\n"); @@ -190,6 +206,7 @@ static struct b2b_sdp_stream *b2b_sdp_stream_raw_new(struct b2b_sdp_client *clie #define B2B_SDP_CLIENT_EARLY (1<<0) #define B2B_SDP_CLIENT_STARTED (1<<1) #define B2B_SDP_CLIENT_PENDING (1<<2) +#define B2B_SDP_CLIENT_REPL (1<<3) struct b2b_sdp_client { unsigned int flags; @@ -211,8 +228,11 @@ struct b2b_sdp_ctx { gen_lock_t lock; struct list_head clients; struct list_head streams; + struct list_head contexts; }; + + static str *b2b_sdp_label_from_sdp(sdp_stream_cell_t *stream) { sdp_attr_t *attr; @@ -400,7 +420,11 @@ static void b2b_sdp_client_terminate(struct b2b_sdp_client *client, str *key, in static void b2b_sdp_client_release(struct b2b_sdp_client *client, int lock) { + struct b2b_sdp_ctx *ctx = client->ctx; struct list_head *it, *safe; + + client->ctx = NULL; + if (client->hdrs.s) shm_free(client->hdrs.s); @@ -410,11 +434,11 @@ static void b2b_sdp_client_release(struct b2b_sdp_client *client, int lock) list_for_each_safe(it, safe, &client->streams) b2b_sdp_stream_free(list_entry(it, struct b2b_sdp_stream, list)); if (lock) - lock_get(&client->ctx->lock); + lock_get(&ctx->lock); list_del(&client->list); - client->ctx->clients_no--; + ctx->clients_no--; if (lock) - lock_release(&client->ctx->lock); + lock_release(&ctx->lock); shm_free(client); } @@ -436,10 +460,51 @@ static struct b2b_sdp_ctx *b2b_sdp_ctx_new(void) INIT_LIST_HEAD(&ctx->streams); lock_init(&ctx->lock); time(&ctx->sess_id); + lock_start_write(b2b_sdp_contexts_lock); + list_add(&ctx->contexts, b2b_sdp_contexts); + lock_stop_write(b2b_sdp_contexts_lock); return ctx; } -static void b2b_sdp_ctx_free(struct b2b_sdp_ctx *ctx) +static struct b2b_sdp_ctx *b2b_sdp_ctx_get(str *key) +{ + struct list_head *it; + struct b2b_sdp_ctx *ctx; + + lock_start_read(b2b_sdp_contexts_lock); + list_for_each(it, b2b_sdp_contexts) { + ctx = list_entry(it, struct b2b_sdp_ctx, contexts); + if (!ctx->b2b_key.len) + continue; + if (str_match(key, &ctx->b2b_key)) { + lock_stop_read(b2b_sdp_contexts_lock); + return ctx; + } + } + lock_stop_read(b2b_sdp_contexts_lock); + return NULL; +} + + +/* has the ctx's lock taken */ +static struct b2b_sdp_client *b2b_sdp_client_get(struct b2b_sdp_ctx *ctx, str *key) +{ + struct list_head *it; + struct b2b_sdp_client *client; + + lock_get(&ctx->lock); + list_for_each(it, &ctx->clients) { + client = list_entry(it, struct b2b_sdp_client, list); + if (!ctx->b2b_key.len) + continue; + if (str_match(key, &client->b2b_key)) + return client; + } + lock_release(&ctx->lock); + return NULL; +} + +static void b2b_sdp_ctx_free(struct b2b_sdp_ctx *ctx, int replicate) { struct list_head *it, *safe; @@ -449,9 +514,12 @@ static void b2b_sdp_ctx_free(struct b2b_sdp_ctx *ctx) list_for_each_safe(it, safe, &ctx->streams) b2b_sdp_stream_free(list_entry(it, struct b2b_sdp_stream, ordered)); if (ctx->b2b_key.s) { - b2b_api.entity_delete(B2B_SERVER, &ctx->b2b_key, NULL, 1, 1); + b2b_api.entity_delete(B2B_SERVER, &ctx->b2b_key, NULL, 1, replicate); shm_free(ctx->b2b_key.s); } + lock_start_write(b2b_sdp_contexts_lock); + list_del(&ctx->contexts); + lock_stop_write(b2b_sdp_contexts_lock); shm_free(ctx); } @@ -1278,14 +1346,14 @@ static int b2b_sdp_server_reply_invite(struct sip_msg *msg, struct b2b_sdp_ctx * static int b2b_sdp_server_reply_bye(struct sip_msg *msg, struct b2b_sdp_ctx *ctx) { - b2b_sdp_ctx_free(ctx); + b2b_sdp_ctx_free(ctx, 1); return 0; } static int b2b_sdp_server_bye(struct sip_msg *msg, struct b2b_sdp_ctx *ctx) { b2b_sdp_reply(&ctx->b2b_key, B2B_SERVER, msg->REQ_METHOD, 200, NULL); - b2b_sdp_ctx_free(ctx); + b2b_sdp_ctx_free(ctx, 1); return 0; } @@ -1525,7 +1593,7 @@ static int b2b_sdp_demux(struct sip_msg *msg, str *uri, return 0; error: free_sdp_content(&sdp); - b2b_sdp_ctx_free(ctx); + b2b_sdp_ctx_free(ctx, 1); return -1; } @@ -1621,6 +1689,10 @@ static int b2b_sdp_client_restore(struct b2b_sdp_client *client) str hack; hack.s = (char *)&client; hack.len = sizeof(void *); + + if ((client->flags & B2B_SDP_CLIENT_REPL) == 0) + return 0; + if (b2b_api.update_b2bl_param(B2B_CLIENT, &client->b2b_key, &hack, 0) < 0) { LM_ERR("could not update restore param!\n"); return -1; @@ -1629,6 +1701,7 @@ static int b2b_sdp_client_restore(struct b2b_sdp_client *client) LM_ERR("could not register restore logic!\n"); return -1; } + client->flags &= ~B2B_SDP_CLIENT_REPL; return 0; } @@ -1672,10 +1745,15 @@ static void b2b_sdp_server_event_received_create(str *key, bin_packet_t *store) LM_ERR("could not duplicate b2b client key!\n"); goto error; } - if (b2b_sdp_client_restore(client) < 0) { + client->flags |= B2B_SDP_CLIENT_REPL; + + /* check if the key exists */ + if (b2b_api.entity_exists(B2B_CLIENT, &client->b2b_key) && + b2b_sdp_client_restore(client) < 0) { LM_ERR("could not restore b2b client logic!\n"); goto error; } + bin_pop_str(store, &tmp); if (tmp.len && shm_str_sync(&client->hdrs, &tmp) < 0) { LM_ERR("could not duplicate b2b client headers!\n"); @@ -1705,12 +1783,12 @@ static void b2b_sdp_server_event_received_create(str *key, bin_packet_t *store) } return; error: - b2b_sdp_ctx_free(ctx); + b2b_sdp_ctx_free(ctx, 0); } static void b2b_sdp_server_event_received_delete(struct b2b_sdp_ctx *ctx, bin_packet_t *store) { - b2b_sdp_ctx_free(ctx); + b2b_sdp_ctx_free(ctx, 0); } static void b2b_sdp_server_event_trigger(enum b2b_entity_type et, str *key, @@ -1726,7 +1804,7 @@ static void b2b_sdp_server_event_trigger(enum b2b_entity_type et, str *key, * but for clusterer, during CREATE, thus we don't need to store * it one more type on the ACK */ - if (backend == B2BCB_BACKEND_CLUSTER) + if (backend & B2BCB_BACKEND_CLUSTER) return; case B2B_EVENT_CREATE: b2b_sdp_server_event_trigger_create(ctx, store); @@ -1759,6 +1837,12 @@ static void b2b_sdp_server_event_received(enum b2b_entity_type et, str *key, } } +static void b2b_sdp_client_event_trigger_create(struct b2b_sdp_client *client, + bin_packet_t *store) +{ + bin_push_str(store, &client->ctx->b2b_key); +} + static void b2b_sdp_client_event_trigger_update(struct b2b_sdp_client *client, bin_packet_t *store) { @@ -1807,6 +1891,31 @@ static void b2b_sdp_client_event_receive_update(struct b2b_sdp_client *client, lock_release(&client->ctx->lock); } +static void b2b_sdp_client_event_receive_create(str *key, bin_packet_t *store) +{ + str ctx_key; + struct b2b_sdp_ctx *ctx; + struct b2b_sdp_client *client; + + /* in the packet, we have the server's key */ + bin_pop_str(store, &ctx_key); + ctx = b2b_sdp_ctx_get(&ctx_key); + if (!ctx) { + LM_DBG("no ctx available for %.*s\n", ctx_key.len, ctx_key.s); + return; + } + client = b2b_sdp_client_get(ctx, key); + if (!client) { + LM_DBG("no client %.*s available for ctx %.*s\n", key->len, key->s, + ctx_key.len, ctx_key.s); + return; + } + + if (b2b_sdp_client_restore(client) < 0) + LM_ERR("could not restore b2b client logic!\n"); + lock_release(&ctx->lock); +} + static void b2b_sdp_client_event_receive_delete(struct b2b_sdp_client *client, bin_packet_t *store) { @@ -1822,7 +1931,7 @@ static void b2b_sdp_client_event_trigger(enum b2b_entity_type et, str *key, switch (event_type) { case B2B_EVENT_CREATE: - /* already handled by server */ + b2b_sdp_client_event_trigger_create(client, store); break; case B2B_EVENT_UPDATE: b2b_sdp_client_event_trigger_update(client, store); @@ -1840,9 +1949,18 @@ static void b2b_sdp_client_event_received(enum b2b_entity_type et, str *key, str *param, enum b2b_event_type event_type, bin_packet_t *store, int backend) { - struct b2b_sdp_client *client = *(struct b2b_sdp_client **)((str *)param)->s; + struct b2b_sdp_client *client; + if (!store) + return; + + if (!param || !param->s) + client = *(struct b2b_sdp_client **)((str *)param)->s; + else + client = NULL; switch (event_type) { + case B2B_EVENT_CREATE: + b2b_sdp_client_event_receive_create(key, store); break; case B2B_EVENT_UPDATE: b2b_sdp_client_event_receive_update(client, store); @@ -1850,7 +1968,6 @@ static void b2b_sdp_client_event_received(enum b2b_entity_type et, str *key, case B2B_EVENT_DELETE: b2b_sdp_client_event_receive_delete(client, store); break; - case B2B_EVENT_CREATE: default: /* nothing else for now */ break;