Skip to content

Commit

Permalink
track active packets in a connection
Browse files Browse the repository at this point in the history
  • Loading branch information
alandekok committed Oct 10, 2017
1 parent 8cb1576 commit bd69498
Showing 1 changed file with 80 additions and 50 deletions.
130 changes: 80 additions & 50 deletions src/modules/rlm_radius/rlm_radius_udp.c
Expand Up @@ -119,6 +119,8 @@ typedef struct rlm_radius_udp_connection_t {
uint32_t max_packet_size; //!< Our max packet size. may be different from the parent.
int fd; //!< File descriptor.

int active_requests; //!< number of active requests handled by this connection

fr_ipaddr_t dst_ipaddr; //!< IP of the home server. stupid 'const' issues.
uint16_t dst_port; //!< Port of the home server.
fr_ipaddr_t src_ipaddr; //!< Our source IP.
Expand All @@ -134,13 +136,13 @@ typedef struct rlm_radius_udp_connection_t {


typedef enum rlm_radius_request_state_t {
REQUEST_STATUS_INIT = 0,
REQUEST_STATUS_WRITE, //!< trying a short-circuit write
REQUEST_STATUS_THREAD, //!< in the thread queue
REQUEST_STATUS_QUEUED, //!< in the connection queue
REQUEST_STATUS_SENT, //!< in the connection "sent" heap
REQUEST_STATUS_RESUMABLE, //!< timed out, or received a reply
REQUEST_STATUS_FINISHED, //!< and done
PACKET_STATE_INIT = 0,
PACKET_STATE_WRITE, //!< trying a short-circuit write
PACKET_STATE_THREAD, //!< in the thread queue
PACKET_STATE_QUEUED, //!< in the connection queue
PACKET_STATE_SENT, //!< in the connection "sent" heap
PACKET_STATE_RESUMABLE, //!< timed out, or received a reply
PACKET_STATE_FINISHED, //!< and done
} rlm_radius_request_state_t;


Expand Down Expand Up @@ -278,7 +280,10 @@ static void conn_idle(rlm_radius_udp_connection_t *c)
* No outstanding packets, we're idle.
*/
if ((fr_heap_num_elements(c->queued) == 0) &&
(FR_DLIST_FIRST(c->sent) == NULL)) break;
(FR_DLIST_FIRST(c->sent) == NULL)) {
rad_assert(c->active_requests == 0);
break;
}

if (c->idle_ev) (void) fr_event_timer_delete(c->thread->el, &c->idle_ev);
return;
Expand Down Expand Up @@ -391,7 +396,8 @@ static void conn_zombie_timeout(UNUSED fr_event_list_t *el, UNUSED struct timeva
rlm_radius_udp_request_t *u = c->status_u;

(void) fr_heap_insert(c->queued, u);
u->state = REQUEST_STATUS_QUEUED;
u->state = PACKET_STATE_QUEUED;
c->active_requests++;
if (!c->pending) fd_active(c);
return;
}
Expand Down Expand Up @@ -456,7 +462,7 @@ static void conn_error(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int fla

static void mod_finished_request(rlm_radius_udp_connection_t *c, rlm_radius_udp_request_t *u)
{
rad_assert(u->state != REQUEST_STATUS_FINISHED);
rad_assert(u->state != PACKET_STATE_FINISHED);

/*
* Delete the tracking table entry, and remove the
Expand All @@ -466,16 +472,20 @@ static void mod_finished_request(rlm_radius_udp_connection_t *c, rlm_radius_udp_
rad_assert(u != c->status_u);

switch (u->state) {
case REQUEST_STATUS_THREAD:
case PACKET_STATE_THREAD:
(void) fr_heap_extract(u->thread->queued, u);
break;

case REQUEST_STATUS_QUEUED:
case PACKET_STATE_QUEUED:
(void) fr_heap_extract(c->queued, u);
rad_assert(c->active_requests > 0);
c->active_requests--;
break;

case REQUEST_STATUS_SENT:
case PACKET_STATE_SENT:
fr_dlist_remove(&u->entry);
rad_assert(c->active_requests > 0);
c->active_requests--;
break;

default:
Expand All @@ -487,15 +497,15 @@ static void mod_finished_request(rlm_radius_udp_connection_t *c, rlm_radius_udp_
u->rr = NULL;
u->c = NULL;
} else {
if (u->state == REQUEST_STATUS_THREAD) {
if (u->state == PACKET_STATE_THREAD) {
(void) fr_heap_extract(u->thread->queued, u);
}
rad_assert(u->rr == NULL);
}

if (u->timer.ev) (void) fr_event_timer_delete(u->thread->el, &u->timer.ev);

u->state = REQUEST_STATUS_RESUMABLE;
u->state = PACKET_STATE_RESUMABLE;
unlang_resumable(u->link->request);
}

Expand Down Expand Up @@ -867,6 +877,12 @@ static void conn_read(fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
#endif
fr_pair_list_free(&request->reply->vps);

rad_assert(u->state == PACKET_STATE_SENT);
fr_dlist_remove(&u->entry);
u->state = PACKET_STATE_INIT;
rad_assert(c->active_requests > 0);
c->active_requests--;

} else {
/*
* It's a normal request. Mark it as finished.
Expand Down Expand Up @@ -928,7 +944,8 @@ static void status_check_timeout(UNUSED fr_event_list_t *el, struct timeval *now
*/
fr_dlist_remove(&u->entry);
(void) fr_heap_insert(c->queued, u);
u->state = REQUEST_STATUS_QUEUED;
c->active_requests++;
u->state = PACKET_STATE_QUEUED;
if (!c->pending) fd_active(c);
}

Expand Down Expand Up @@ -1042,8 +1059,7 @@ static void retransmit_packet(rlm_radius_udp_request_t *u, struct timeval *now)
return;
}

u->state = REQUEST_STATUS_SENT;
DEBUG("SENT %d", __LINE__);
u->state = PACKET_STATE_SENT;
}

static rlm_radius_udp_connection_t *connection_get(rlm_radius_udp_request_t *u);
Expand All @@ -1059,21 +1075,21 @@ static void response_timeout(fr_event_list_t *el, struct timeval *now, void *uct
REQUEST *request;

rad_assert(u->timer.ev == NULL);
rad_assert(u->state == REQUEST_STATUS_SENT);
rad_assert(u->state == PACKET_STATE_SENT);

/*
* The packet timed out while it didn't have a
* connection, or the connection was closed. Try to grab
* a new connection.
*/
if (!c) {
rad_assert(u->state == REQUEST_STATUS_THREAD);
rad_assert(u->state == PACKET_STATE_THREAD);
c = connection_get(u);
}

if (c) {
rad_assert(u->timer.retry == &c->inst->parent->retry[u->code]);
rad_assert(u->state == REQUEST_STATUS_QUEUED);
rad_assert(u->state == PACKET_STATE_QUEUED);
}

request = u->link->request;
Expand Down Expand Up @@ -1121,7 +1137,7 @@ static void response_timeout(fr_event_list_t *el, struct timeval *now, void *uct
RDEBUG("Retransmitting ID %d on connection %s", u->rr->id, c->name);
retransmit_packet(u, now);
} else {
rad_assert(u->state == REQUEST_STATUS_THREAD);
rad_assert(u->state == PACKET_STATE_THREAD);
RDEBUG("No available connections for retransmission. Waiting %d.%06ds for retry",
u->timer.rt / USEC, u->timer.rt % USEC);
}
Expand Down Expand Up @@ -1152,9 +1168,9 @@ static int conn_write(rlm_radius_udp_connection_t *c, rlm_radius_udp_request_t *
rad_assert(c->inst->parent->allowed[u->code] || (u == c->status_u));
if (c->idle_ev) (void) fr_event_timer_delete(c->thread->el, &c->idle_ev);

rad_assert((u->state == REQUEST_STATUS_WRITE) ||
(u->state == REQUEST_STATUS_SENT) ||
(u->state == REQUEST_STATUS_QUEUED));
rad_assert((u->state == PACKET_STATE_WRITE) ||
(u->state == PACKET_STATE_SENT) ||
(u->state == PACKET_STATE_QUEUED));
request = u->link->request;

/*
Expand Down Expand Up @@ -1389,7 +1405,7 @@ static int conn_write(rlm_radius_udp_connection_t *c, rlm_radius_udp_request_t *
* timers, etc.
*/
if (c->inst->replicate) {
u->state = REQUEST_STATUS_FINISHED;
u->state = PACKET_STATE_FINISHED;
return 2;
}

Expand Down Expand Up @@ -1484,23 +1500,22 @@ static void conn_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int
while ((u = fr_heap_pop(c->queued)) != NULL) {
int rcode;

rad_assert(u->state == REQUEST_STATUS_QUEUED);
rad_assert(u->state == PACKET_STATE_QUEUED);
rcode = conn_write(c, u);

// @todo - do something intelligent on error..
if (rcode <= 0) break;

if (rcode == 1) {
fr_dlist_insert_tail(&c->sent, &u->entry);
u->state = REQUEST_STATUS_SENT;
DEBUG("SENT %d", __LINE__);
u->state = PACKET_STATE_SENT;
continue;
}

/*
* Was replicated: can resume it immediately.
*/
u->state = REQUEST_STATUS_RESUMABLE;
u->state = PACKET_STATE_RESUMABLE;
unlang_resumable(u->link->request);
}

Expand Down Expand Up @@ -1598,17 +1613,21 @@ static int udp_request_free(rlm_radius_udp_request_t *u)
}

switch (u->state) {
case REQUEST_STATUS_THREAD:
case PACKET_STATE_THREAD:
rad_assert(u->c == NULL);
(void) fr_heap_extract(u->thread->queued, u);
return 0;

case REQUEST_STATUS_QUEUED:
case PACKET_STATE_QUEUED:
(void) fr_heap_extract(u->c->queued, u);
rad_assert(u->c->active_requests > 0);
u->c->active_requests--;
break;

case REQUEST_STATUS_SENT:
case PACKET_STATE_SENT:
fr_dlist_remove(&u->entry);
rad_assert(u->c->active_requests > 0);
u->c->active_requests--;
break;

default:
Expand Down Expand Up @@ -1750,11 +1769,11 @@ static fr_connection_state_t _conn_failed(int fd, fr_connection_state_t state, v
*/
u->rr = NULL;

rad_assert(u->state == REQUEST_STATUS_SENT);
rad_assert(u->state == PACKET_STATE_SENT);
fr_dlist_remove(&u->entry);
u->c = NULL;
(void) fr_heap_insert(c->thread->queued, u);
u->state = REQUEST_STATUS_THREAD;
u->state = PACKET_STATE_THREAD;
c->thread->pending = true;
}

Expand Down Expand Up @@ -2016,34 +2035,45 @@ static int _conn_free(rlm_radius_udp_connection_t *c)
while ((entry = FR_DLIST_FIRST(c->sent)) != NULL) {
u = fr_ptr_to_type(rlm_radius_udp_request_t, entry, entry);

rad_assert(u->state == PACKET_STATE_SENT);
rad_assert(u->c == c);
rad_assert(c->active_requests > 0);
c->active_requests--;
u->c = NULL;

/*
* Don't bother freeing individual entries. They
* will get deleted when c->id is free'd.
*/
u->rr = NULL;
u->c = NULL;

if (u->timer.ev) (void) fr_event_timer_delete(c->thread->el, &u->timer.ev);

rad_assert(u->state == REQUEST_STATUS_SENT);
fr_dlist_remove(&u->entry);
(void) fr_heap_insert(t->queued, u);
u->state = REQUEST_STATUS_THREAD;
u->state = PACKET_STATE_THREAD;
t->pending = true;
}

/*
* Move "queued" packets back to the main thread queue
*/
while ((u = fr_heap_pop(c->queued)) != NULL) {
u->rr = NULL;
rad_assert(u->state == PACKET_STATE_QUEUED);
rad_assert(u->c == c);
rad_assert(c->active_requests > 0);
c->active_requests--;
u->c = NULL;

rad_assert(u->state == REQUEST_STATUS_QUEUED);
u->rr = NULL;

(void) fr_heap_insert(t->queued, u);
u->state = REQUEST_STATUS_THREAD;
u->state = PACKET_STATE_THREAD;
t->pending = true;
}

rad_assert(c->active_requests == 0);

switch (c->state) {
default:
rad_assert(0 == 1);
Expand Down Expand Up @@ -2185,11 +2215,11 @@ static rlm_radius_udp_connection_t *connection_get(rlm_radius_udp_request_t *u)
* Remove it from the main thread queue, and add
* it to the connection queue.
*/
rad_assert((u->state == REQUEST_STATUS_THREAD) ||
(u->state == REQUEST_STATUS_WRITE));
DEBUG("QUEUE AT %d", __LINE__);
rad_assert((u->state == PACKET_STATE_THREAD) ||
(u->state == PACKET_STATE_WRITE));
(void) fr_heap_insert(c->queued, u);
u->state = REQUEST_STATUS_QUEUED;
u->state = PACKET_STATE_QUEUED;
c->active_requests++;

if (!c->pending) fd_active(c);

Expand Down Expand Up @@ -2255,7 +2285,7 @@ static rlm_rcode_t mod_push(void *instance, REQUEST *request, rlm_radius_link_t
return RLM_MODULE_FAIL;
}

u->state = REQUEST_STATUS_WRITE;
u->state = PACKET_STATE_WRITE;
u->link = link;
u->code = request->packet->code;
u->thread = t;
Expand Down Expand Up @@ -2285,7 +2315,7 @@ static rlm_rcode_t mod_push(void *instance, REQUEST *request, rlm_radius_link_t
*/
t->pending = true;
(void) fr_heap_insert(t->queued, u);
u->state = REQUEST_STATUS_THREAD;
u->state = PACKET_STATE_THREAD;
return RLM_MODULE_YIELD;
}

Expand All @@ -2310,7 +2340,7 @@ static rlm_rcode_t mod_push(void *instance, REQUEST *request, rlm_radius_link_t
* actively trying to write.
*/
if (rcode == 0) {
rad_assert(u->state == REQUEST_STATUS_QUEUED);
rad_assert(u->state == PACKET_STATE_QUEUED);
fd_active(c);
return RLM_MODULE_YIELD;
}
Expand All @@ -2321,15 +2351,15 @@ static rlm_rcode_t mod_push(void *instance, REQUEST *request, rlm_radius_link_t
* waiting for the reply.
*/
if (rcode == 1) {
rad_assert(u->state == REQUEST_STATUS_SENT);
rad_assert(u->state == PACKET_STATE_SENT);
return RLM_MODULE_YIELD;
}

/*
* We replicated the packet, so we return "ok", and don't
* care about the reply.
*/
u->state = REQUEST_STATUS_FINISHED;
u->state = PACKET_STATE_FINISHED;
return RLM_MODULE_OK;
}

Expand Down

0 comments on commit bd69498

Please sign in to comment.