Skip to content

Commit

Permalink
res_pjsip_pubsub: Add new pubsub module capabilities.
Browse files Browse the repository at this point in the history
The existing res_pjsip_pubsub APIs are somewhat limited in
what they can do. This adds a few API extensions that make
it possible for PJSIP pubsub modules to implement richer
features than is currently possible.

* Allow pubsub modules to get a handle to pjsip_rx_data on subscription
* Allow pubsub modules to run a callback when a subscription is renewed
* Allow pubsub modules to run a callback for outgoing NOTIFYs, with
  a handle to the tdata, so that modules can append their own headers
  to the NOTIFYs

This change does not add any features directly, but makes possible
several new features that will be added in future changes.

Resolves: #81
ASTERISK-30485 #close

Master-Only: True
  • Loading branch information
InterLinked1 committed May 18, 2023
1 parent 172a1a9 commit 32b05a7
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 37 deletions.
33 changes: 33 additions & 0 deletions include/asterisk/res_pjsip_pubsub.h
Expand Up @@ -232,6 +232,8 @@ enum ast_sip_subscription_notify_reason {
#define AST_SIP_EXTEN_STATE_DATA "ast_sip_exten_state_data"
/*! Type used for conveying mailbox state */
#define AST_SIP_MESSAGE_ACCUMULATOR "ast_sip_message_accumulator"
/*! Type used for device feature synchronization */
#define AST_SIP_DEVICE_FEATURE_SYNC_DATA "ast_sip_device_feature_sync_data"

/*!
* \brief Data used to create bodies for NOTIFY/PUBLISH requests.
Expand Down Expand Up @@ -268,6 +270,18 @@ struct ast_sip_notifier {
* \return The response code to send to the SUBSCRIBE.
*/
int (*new_subscribe)(struct ast_sip_endpoint *endpoint, const char *resource);
/*!
* \brief Same as new_subscribe, but also pass a handle to the pjsip_rx_data
*
* \note If this callback exists, it will be executed, otherwise new_subscribe will be.
* Only use this if you need the rdata. Otherwise, use new_subscribe.
*
* \param endpoint The endpoint from which we received the SUBSCRIBE
* \param resource The name of the resource to which the subscription is being made
* \param rdata The pjsip_rx_data for incoming subscription
* \return The response code to send to the SUBSCRIBE.
*/
int (*new_subscribe_with_rdata)(struct ast_sip_endpoint *endpoint, const char *resource, pjsip_rx_data *rdata);
/*!
* \brief Called when an inbound subscription has been accepted.
*
Expand All @@ -282,6 +296,25 @@ struct ast_sip_notifier {
* \retval -1 Failure
*/
int (*subscription_established)(struct ast_sip_subscription *sub);
/*!
* \brief Called when a SUBSCRIBE arrives for an already active subscription.
*
* \param sub The existing subscription
* \retval 0 Success
* \retval -1 Failure
*/
int (*refresh_subscribe)(struct ast_sip_subscription *sub, pjsip_rx_data *rdata);
/*!
* \brief Optional callback to execute before sending outgoing NOTIFY requests.
* Because res_pjsip_pubsub creates the tdata internally, this allows modules
* to access the tdata if needed, e.g. to add custom headers.
*
* \param sub The existing subscription
* \param tdata The pjsip_tx_data to use for the outgoing NOTIFY
* \retval 0 Success
* \retval -1 Failure
*/
int (*notify_created)(struct ast_sip_subscription *sub, pjsip_tx_data *tdata);
/*!
* \brief Supply data needed to create a NOTIFY body.
*
Expand Down
94 changes: 57 additions & 37 deletions res/res_pjsip_pubsub.c
Expand Up @@ -1018,6 +1018,8 @@ static int have_visited(const char *resource, struct resources *visited)
return 0;
}

#define NEW_SUBSCRIBE(notifier, endpoint, resource, rdata) notifier->new_subscribe_with_rdata ? notifier->new_subscribe_with_rdata(endpoint, resource, rdata) : notifier->new_subscribe(endpoint, resource)

/*!
* \brief Build child nodes for a given parent.
*
Expand All @@ -1040,7 +1042,7 @@ static int have_visited(const char *resource, struct resources *visited)
* \param visited The resources that have already been visited.
*/
static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
struct resource_list *list, struct tree_node *parent, struct resources *visited)
struct resource_list *list, struct tree_node *parent, struct resources *visited, pjsip_rx_data *rdata)
{
int i;

Expand All @@ -1056,7 +1058,7 @@ static void build_node_children(struct ast_sip_endpoint *endpoint, const struct

child_list = retrieve_resource_list(resource, list->event);
if (!child_list) {
int resp = handler->notifier->new_subscribe(endpoint, resource);
int resp = NEW_SUBSCRIBE(handler->notifier, endpoint, resource, rdata);
if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
char display_name[AST_MAX_EXTENSION] = "";
if (list->resource_display_name && handler->notifier->get_resource_display_name) {
Expand Down Expand Up @@ -1085,7 +1087,7 @@ static void build_node_children(struct ast_sip_endpoint *endpoint, const struct
ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource);
continue;
}
build_node_children(endpoint, handler, child_list, current, visited);
build_node_children(endpoint, handler, child_list, current, visited, rdata);
if (AST_VECTOR_SIZE(&current->children) > 0) {
ast_debug(1, "List %s had no successful children.\n", resource);
if (AST_VECTOR_APPEND(&parent->children, current)) {
Expand Down Expand Up @@ -1158,19 +1160,21 @@ static void resource_tree_destroy(struct resource_tree *tree)
* \retval 300-699 Failure to subscribe to requested resource.
*/
static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
const char *resource, struct resource_tree *tree, int has_eventlist_support)
const char *resource, struct resource_tree *tree, int has_eventlist_support, pjsip_rx_data *rdata)
{
RAII_VAR(struct resource_list *, list, NULL, ao2_cleanup);
struct resources visited;

if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) {
int not_eventlist_but_needs_children = !strcmp(handler->body_type, AST_SIP_DEVICE_FEATURE_SYNC_DATA);

if ((!has_eventlist_support && !not_eventlist_but_needs_children) || !(list = retrieve_resource_list(resource, handler->event_name))) {
ast_debug(2, "Subscription '%s->%s' is not to a list\n",
ast_sorcery_object_get_id(endpoint), resource);
tree->root = tree_node_alloc(resource, NULL, 0, NULL);
if (!tree->root) {
return 500;
}
return handler->notifier->new_subscribe(endpoint, resource);
return NEW_SUBSCRIBE(handler->notifier, endpoint, resource, rdata);
}

ast_debug(2, "Subscription '%s->%s' is a list\n",
Expand All @@ -1187,7 +1191,7 @@ static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct a

tree->notification_batch_interval = list->notification_batch_interval;

build_node_children(endpoint, handler, list, tree->root, &visited);
build_node_children(endpoint, handler, list, tree->root, &visited, rdata);
AST_VECTOR_FREE(&visited);

if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
Expand Down Expand Up @@ -1380,6 +1384,7 @@ static void shutdown_subscriptions(struct ast_sip_subscription *sub)
sub->handler->subscription_shutdown(sub);
}
}

static int subscription_unreference_dialog(void *obj)
{
struct sip_subscription_tree *sub_tree = obj;
Expand Down Expand Up @@ -1674,7 +1679,7 @@ static int sub_persistence_recreate(void *obj)

memset(&tree, 0, sizeof(tree));
resp = build_resource_tree(endpoint, handler, resource, &tree,
ast_sip_pubsub_has_eventlist_support(rdata));
ast_sip_pubsub_has_eventlist_support(rdata), rdata);
if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
pj_status_t dlg_status;

Expand Down Expand Up @@ -2454,6 +2459,16 @@ static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
return require;
}

static void set_state_terminated(struct ast_sip_subscription *sub)
{
int i;

sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
set_state_terminated(AST_VECTOR_GET(&sub->children, i));
}
}

/*!
* \brief Send a NOTIFY request to a subscriber
*
Expand Down Expand Up @@ -2491,6 +2506,12 @@ static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int forc
pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require);
}

if (sub_tree->root->handler->notifier->notify_created) {
/* The module for this event wants a callback to the pjsip_tx_data,
* e.g. so it can add custom headers or do something custom to the response. */
sub_tree->root->handler->notifier->notify_created(sub_tree->root, tdata);
}

if (sip_subscription_send_request(sub_tree, tdata)) {
/* do not call pjsip_tx_data_dec_ref(tdata). The pjsip_dlg_send_request deletes the message on error */
return -1;
Expand Down Expand Up @@ -2954,6 +2975,7 @@ static int generate_initial_notify(struct ast_sip_subscription *sub)

notify_data = sub->handler->notifier->get_notify_data(sub);
if (!notify_data) {
ast_debug(3, "No notify data, not generating any body content\n");
return -1;
}

Expand Down Expand Up @@ -3085,7 +3107,7 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)

memset(&tree, 0, sizeof(tree));
resp = build_resource_tree(endpoint, handler, resource, &tree,
ast_sip_pubsub_has_eventlist_support(rdata));
ast_sip_pubsub_has_eventlist_support(rdata), rdata);
if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
resource_tree_destroy(&tree);
Expand All @@ -3095,6 +3117,7 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree, &dlg_status, NULL);
if (!sub_tree) {
if (dlg_status != PJ_EEXISTS) {
ast_debug(3, "No dialog exists, rejecting\n");
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
}
} else {
Expand Down Expand Up @@ -3331,6 +3354,7 @@ static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoi
publication->handler = handler;
if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
AST_SIP_PUBLISH_STATE_INITIALIZED)) {
ast_debug(3, "Publication state change failed\n");
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
ao2_cleanup(publication);
return NULL;
Expand Down Expand Up @@ -3760,16 +3784,6 @@ static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
return PJ_FALSE;
}

static void set_state_terminated(struct ast_sip_subscription *sub)
{
int i;

sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
set_state_terminated(AST_VECTOR_GET(&sub->children, i));
}
}

/*!
* \brief Callback sequence for subscription terminate:
*
Expand Down Expand Up @@ -3852,7 +3866,8 @@ static void set_state_terminated(struct ast_sip_subscription *sub)


/* The code in this function was previously in pubsub_on_evsub_state. */
static void clean_sub_tree(pjsip_evsub *evsub){
static void clean_sub_tree(pjsip_evsub *evsub)
{

struct sip_subscription_tree *sub_tree;
sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
Expand Down Expand Up @@ -3912,7 +3927,6 @@ static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
return;
}


/* It's easier to write this as what we WANT to process, then negate it. */
if (!(sub_tree->state == SIP_SUB_TREE_TERMINATE_IN_PROGRESS
|| (event->type == PJSIP_EVENT_TSX_STATE && sub_tree->state == SIP_SUB_TREE_NORMAL)
Expand All @@ -3927,9 +3941,8 @@ static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
This was previously handled by pubsub_on_rx_refresh setting:
'sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING' */
if (event->body.tsx_state.type == PJSIP_EVENT_RX_MSG &&
!pjsip_method_cmp(&event->body.tsx_state.tsx->method, &pjsip_subscribe_method) &&
pjsip_evsub_get_expires(evsub) == 0) {

!pjsip_method_cmp(&event->body.tsx_state.tsx->method, &pjsip_subscribe_method) &&
pjsip_evsub_get_expires(evsub) == 0) {
ast_debug(3, "Subscription ending, do nothing.\n");
return;
}
Expand Down Expand Up @@ -4058,6 +4071,7 @@ static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
{
struct sip_subscription_tree *sub_tree;
struct ast_sip_subscription_handler *handler;

sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
ast_debug(3, "evsub %p sub_tree %p sub_tree state %s\n", evsub, sub_tree,
Expand Down Expand Up @@ -4085,25 +4099,31 @@ static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
}

handler = sub_tree->root->handler;

/* If the handler wants a callback on refresh, then do it (some protocols require this). */
if (sub_tree->state == SIP_SUB_TREE_NORMAL && handler && handler->notifier->refresh_subscribe) {
if (!handler->notifier->refresh_subscribe(sub_tree->root, rdata)) {
return; /* If the callback handled it, we're done. */
}
}

if (sub_tree->state == SIP_SUB_TREE_NORMAL && sub_tree->is_list) {
/* update RLS */
const char *resource = sub_tree->root->resource;
struct ast_sip_subscription *old_root = sub_tree->root;
struct ast_sip_subscription *new_root = NULL;
RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
struct ast_sip_subscription_handler *handler = NULL;

struct ast_sip_pubsub_body_generator *generator = NULL;

if ((endpoint = ast_pjsip_rdata_get_endpoint(rdata))
&& (handler = subscription_get_handler_from_rdata(rdata, ast_sorcery_object_get_id(endpoint)))
&& (generator = subscription_get_generator_from_rdata(rdata, handler))) {
if ((generator = subscription_get_generator_from_rdata(rdata, handler))) {

struct resource_tree tree;
int resp;

memset(&tree, 0, sizeof(tree));
resp = build_resource_tree(endpoint, handler, resource, &tree,
ast_sip_pubsub_has_eventlist_support(rdata));
ast_sip_pubsub_has_eventlist_support(rdata), rdata);
if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
new_root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree.root);
if (new_root) {
Expand Down Expand Up @@ -5330,7 +5350,7 @@ AST_TEST_DEFINE(resource_tree)
}

tree = ast_calloc(1, sizeof(*tree));
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1, NULL);
if (resp != 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;
Expand Down Expand Up @@ -5400,7 +5420,7 @@ AST_TEST_DEFINE(complex_resource_tree)
}

tree = ast_calloc(1, sizeof(*tree));
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1, NULL);
if (resp != 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;
Expand Down Expand Up @@ -5461,7 +5481,7 @@ AST_TEST_DEFINE(bad_resource)
}

tree = ast_calloc(1, sizeof(*tree));
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1, NULL);
if (resp != 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;
Expand Down Expand Up @@ -5530,7 +5550,7 @@ AST_TEST_DEFINE(bad_branch)
}

tree = ast_calloc(1, sizeof(*tree));
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1, NULL);
if (resp != 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;
Expand Down Expand Up @@ -5603,7 +5623,7 @@ AST_TEST_DEFINE(duplicate_resource)
}

tree = ast_calloc(1, sizeof(*tree));
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1, NULL);
if (resp != 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;
Expand Down Expand Up @@ -5675,7 +5695,7 @@ AST_TEST_DEFINE(loop)
}

tree = ast_calloc(1, sizeof(*tree));
resp = build_resource_tree(NULL, &test_handler, "herp", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "herp", tree, 1, NULL);
if (resp == 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;
Expand Down Expand Up @@ -5722,7 +5742,7 @@ AST_TEST_DEFINE(bad_event)
/* Since the test_handler is for event "test", this should not build a list, but
* instead result in a single resource being created, called "foo"
*/
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1, NULL);
if (resp != 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;
Expand Down

0 comments on commit 32b05a7

Please sign in to comment.