Skip to content

Commit

Permalink
Merge "app_queue: queue members can receive multiple calls" into 13
Browse files Browse the repository at this point in the history
  • Loading branch information
jcolp authored and Gerrit Code Review committed Apr 25, 2016
2 parents 6c28e8c + c345e53 commit 83dadc4
Showing 1 changed file with 101 additions and 44 deletions.
145 changes: 101 additions & 44 deletions apps/app_queue.c
Expand Up @@ -1510,7 +1510,6 @@ struct member {
struct call_queue *lastqueue; /*!< Last queue we received a call */
unsigned int dead:1; /*!< Used to detect members deleted in realtime */
unsigned int delme:1; /*!< Flag to delete entry on reload */
unsigned int call_pending:1; /*!< TRUE if the Q is attempting to place a call to the member. */
char rt_uniqueid[80]; /*!< Unique id of realtime member entry */
unsigned int ringinuse:1; /*!< Flag to ring queue members even if their status is 'inuse' */
};
Expand Down Expand Up @@ -2267,6 +2266,70 @@ static int get_member_status(struct call_queue *q, int max_penalty, int min_pena
return -1;
}

/*
* A "pool" of member objects that calls are currently pending on. If an
* agent is a member of multiple queues it's possible for that agent to be
* called by each of the queues at the same time. This happens because device
* state is slow to notify the queue app of one of it's member's being rung.
* This "pool" allows us to track which members are currently being rung while
* we wait on the device state change.
*/
static struct ao2_container *pending_members;
#define MAX_CALL_ATTEMPT_BUCKETS 353

static int pending_members_hash(const void *obj, const int flags)
{
const struct member *object;
const char *key;

switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_KEY:
key = obj;
break;
case OBJ_SEARCH_OBJECT:
object = obj;
key = object->interface;
break;
default:
ast_assert(0);
return 0;
}
return ast_str_case_hash(key);
}

static int pending_members_cmp(void *obj, void *arg, int flags)
{
const struct member *object_left = obj;
const struct member *object_right = arg;
const char *right_key = arg;
int cmp;

switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_OBJECT:
right_key = object_right->interface;
/* Fall through */
case OBJ_SEARCH_KEY:
cmp = strcasecmp(object_left->interface, right_key);
break;
case OBJ_SEARCH_PARTIAL_KEY:
/* Not supported by container. */
ast_assert(0);
return 0;
default:
cmp = 0;
break;
}
if (cmp) {
return 0;
}
return CMP_MATCH;
}

static void pending_members_remove(struct member *mem)
{
ao2_find(pending_members, mem, OBJ_POINTER | OBJ_NODATA | OBJ_UNLINK);
}

/*! \brief set a member's status based on device state of that member's state_interface.
*
* Lock interface list find sc, iterate through each queues queue_member list for member to
Expand All @@ -2276,6 +2339,9 @@ static void update_status(struct call_queue *q, struct member *m, const int stat
{
m->status = status;

/* Whatever the status is clear the member from the pending members pool */
pending_members_remove(m);

queue_publish_member_blob(queue_member_status_type(), queue_member_blob_create(q, m));
}

Expand Down Expand Up @@ -3132,6 +3198,7 @@ static void member_add_to_queue(struct call_queue *queue, struct member *mem)
*/
static void member_remove_from_queue(struct call_queue *queue, struct member *mem)
{
pending_members_remove(mem);
ao2_lock(queue->members);
ast_devstate_changed(QUEUE_UNKNOWN_PAUSED_DEVSTATE, AST_DEVSTATE_CACHABLE, "Queue:%s_pause_%s", queue->name, mem->interface);
queue_member_follower_removal(queue, mem);
Expand Down Expand Up @@ -4108,41 +4175,6 @@ static int member_status_available(int status)
return status == AST_DEVICE_NOT_INUSE || status == AST_DEVICE_UNKNOWN;
}

/*!
* \internal
* \brief Clear the member call pending flag.
*
* \param mem Queue member.
*
* \return Nothing
*/
static void member_call_pending_clear(struct member *mem)
{
ao2_lock(mem);
mem->call_pending = 0;
ao2_unlock(mem);
}

/*!
* \internal
* \brief Set the member call pending flag.
*
* \param mem Queue member.
*
* \retval non-zero if call pending flag was already set.
*/
static int member_call_pending_set(struct member *mem)
{
int old_pending;

ao2_lock(mem);
old_pending = mem->call_pending;
mem->call_pending = 1;
ao2_unlock(mem);

return old_pending;
}

/*!
* \internal
* \brief Determine if can ring a queue entry.
Expand Down Expand Up @@ -4185,12 +4217,31 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call)
}

if (!call->member->ringinuse) {
if (member_call_pending_set(call->member)) {
ast_debug(1, "%s has another call pending, can't receive call\n",
call->interface);
struct member *mem;

ao2_lock(pending_members);

mem = ao2_find(pending_members, call->member,
OBJ_SEARCH_OBJECT | OBJ_NOLOCK);
if (mem) {
/*
* If found that means this member is currently being attempted
* from another calling thread, so stop trying from this thread
*/
ast_debug(1, "%s has another call trying, can't receive call\n",
call->interface);
ao2_ref(mem, -1);
ao2_unlock(pending_members);
return 0;
}

/*
* If not found add it to the container so another queue
* won't attempt to call this member at the same time.
*/
ao2_link(pending_members, call->member);
ao2_unlock(pending_members);

/*
* The queue member is available. Get current status to be sure
* because the device state and extension state callbacks may
Expand All @@ -4199,7 +4250,7 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call)
if (!member_status_available(get_queue_member_status(call->member))) {
ast_debug(1, "%s actually not available, can't receive call\n",
call->interface);
member_call_pending_clear(call->member);
pending_members_remove(call->member);
return 0;
}
}
Expand Down Expand Up @@ -4236,7 +4287,6 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
++*busies;
return 0;
}
ast_assert(tmp->member->ringinuse || tmp->member->call_pending);

ast_copy_string(tech, tmp->interface, sizeof(tech));
if ((location = strchr(tech, '/'))) {
Expand All @@ -4253,7 +4303,7 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
qe->linpos++;
ao2_unlock(qe->parent);

member_call_pending_clear(tmp->member);
pending_members_remove(tmp->member);

publish_dial_end_event(qe->chan, tmp, NULL, "BUSY");
tmp->stillgoing = 0;
Expand Down Expand Up @@ -4324,7 +4374,7 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
/* Again, keep going even if there's an error */
ast_verb(3, "Couldn't call %s\n", tmp->interface);
do_hang(tmp);
member_call_pending_clear(tmp->member);
pending_members_remove(tmp->member);
++*busies;
return 0;
}
Expand All @@ -4344,7 +4394,6 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies

ast_verb(3, "Called %s\n", tmp->interface);

member_call_pending_clear(tmp->member);
return 1;
}

Expand Down Expand Up @@ -10805,6 +10854,7 @@ static int unload_module(void)
ast_extension_state_del(0, extension_state_cb);

ast_unload_realtime("queue_members");
ao2_cleanup(pending_members);
ao2_cleanup(queues);
queues = NULL;
return 0;
Expand Down Expand Up @@ -10833,6 +10883,13 @@ static int load_module(void)
return AST_MODULE_LOAD_DECLINE;
}

pending_members = ao2_container_alloc(
MAX_CALL_ATTEMPT_BUCKETS, pending_members_hash, pending_members_cmp);
if (!pending_members) {
unload_module();
return AST_MODULE_LOAD_DECLINE;
}

use_weight = 0;

if (reload_handler(0, &mask, NULL)) {
Expand Down

0 comments on commit 83dadc4

Please sign in to comment.