From bd6949819396b780f8e9b127aa6b14ca83f5c9e6 Mon Sep 17 00:00:00 2001 From: "Alan T. DeKok" Date: Tue, 10 Oct 2017 14:43:12 -0400 Subject: [PATCH] track active packets in a connection --- src/modules/rlm_radius/rlm_radius_udp.c | 130 +++++++++++++++--------- 1 file changed, 80 insertions(+), 50 deletions(-) diff --git a/src/modules/rlm_radius/rlm_radius_udp.c b/src/modules/rlm_radius/rlm_radius_udp.c index 6e0118d5e104..f3169c2bdc0f 100644 --- a/src/modules/rlm_radius/rlm_radius_udp.c +++ b/src/modules/rlm_radius/rlm_radius_udp.c @@ -119,6 +119,8 @@ typedef struct rlm_radius_udp_connection_t { uint32_t max_packet_size; //!< Our max packet size. may be different from the parent. int fd; //!< File descriptor. + int active_requests; //!< number of active requests handled by this connection + fr_ipaddr_t dst_ipaddr; //!< IP of the home server. stupid 'const' issues. uint16_t dst_port; //!< Port of the home server. fr_ipaddr_t src_ipaddr; //!< Our source IP. @@ -134,13 +136,13 @@ typedef struct rlm_radius_udp_connection_t { typedef enum rlm_radius_request_state_t { - REQUEST_STATUS_INIT = 0, - REQUEST_STATUS_WRITE, //!< trying a short-circuit write - REQUEST_STATUS_THREAD, //!< in the thread queue - REQUEST_STATUS_QUEUED, //!< in the connection queue - REQUEST_STATUS_SENT, //!< in the connection "sent" heap - REQUEST_STATUS_RESUMABLE, //!< timed out, or received a reply - REQUEST_STATUS_FINISHED, //!< and done + PACKET_STATE_INIT = 0, + PACKET_STATE_WRITE, //!< trying a short-circuit write + PACKET_STATE_THREAD, //!< in the thread queue + PACKET_STATE_QUEUED, //!< in the connection queue + PACKET_STATE_SENT, //!< in the connection "sent" heap + PACKET_STATE_RESUMABLE, //!< timed out, or received a reply + PACKET_STATE_FINISHED, //!< and done } rlm_radius_request_state_t; @@ -278,7 +280,10 @@ static void conn_idle(rlm_radius_udp_connection_t *c) * No outstanding packets, we're idle. */ if ((fr_heap_num_elements(c->queued) == 0) && - (FR_DLIST_FIRST(c->sent) == NULL)) break; + (FR_DLIST_FIRST(c->sent) == NULL)) { + rad_assert(c->active_requests == 0); + break; + } if (c->idle_ev) (void) fr_event_timer_delete(c->thread->el, &c->idle_ev); return; @@ -391,7 +396,8 @@ static void conn_zombie_timeout(UNUSED fr_event_list_t *el, UNUSED struct timeva rlm_radius_udp_request_t *u = c->status_u; (void) fr_heap_insert(c->queued, u); - u->state = REQUEST_STATUS_QUEUED; + u->state = PACKET_STATE_QUEUED; + c->active_requests++; if (!c->pending) fd_active(c); return; } @@ -456,7 +462,7 @@ static void conn_error(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int fla static void mod_finished_request(rlm_radius_udp_connection_t *c, rlm_radius_udp_request_t *u) { - rad_assert(u->state != REQUEST_STATUS_FINISHED); + rad_assert(u->state != PACKET_STATE_FINISHED); /* * Delete the tracking table entry, and remove the @@ -466,16 +472,20 @@ static void mod_finished_request(rlm_radius_udp_connection_t *c, rlm_radius_udp_ rad_assert(u != c->status_u); switch (u->state) { - case REQUEST_STATUS_THREAD: + case PACKET_STATE_THREAD: (void) fr_heap_extract(u->thread->queued, u); break; - case REQUEST_STATUS_QUEUED: + case PACKET_STATE_QUEUED: (void) fr_heap_extract(c->queued, u); + rad_assert(c->active_requests > 0); + c->active_requests--; break; - case REQUEST_STATUS_SENT: + case PACKET_STATE_SENT: fr_dlist_remove(&u->entry); + rad_assert(c->active_requests > 0); + c->active_requests--; break; default: @@ -487,7 +497,7 @@ static void mod_finished_request(rlm_radius_udp_connection_t *c, rlm_radius_udp_ u->rr = NULL; u->c = NULL; } else { - if (u->state == REQUEST_STATUS_THREAD) { + if (u->state == PACKET_STATE_THREAD) { (void) fr_heap_extract(u->thread->queued, u); } rad_assert(u->rr == NULL); @@ -495,7 +505,7 @@ static void mod_finished_request(rlm_radius_udp_connection_t *c, rlm_radius_udp_ if (u->timer.ev) (void) fr_event_timer_delete(u->thread->el, &u->timer.ev); - u->state = REQUEST_STATUS_RESUMABLE; + u->state = PACKET_STATE_RESUMABLE; unlang_resumable(u->link->request); } @@ -867,6 +877,12 @@ static void conn_read(fr_event_list_t *el, int fd, UNUSED int flags, void *uctx) #endif fr_pair_list_free(&request->reply->vps); + rad_assert(u->state == PACKET_STATE_SENT); + fr_dlist_remove(&u->entry); + u->state = PACKET_STATE_INIT; + rad_assert(c->active_requests > 0); + c->active_requests--; + } else { /* * It's a normal request. Mark it as finished. @@ -928,7 +944,8 @@ static void status_check_timeout(UNUSED fr_event_list_t *el, struct timeval *now */ fr_dlist_remove(&u->entry); (void) fr_heap_insert(c->queued, u); - u->state = REQUEST_STATUS_QUEUED; + c->active_requests++; + u->state = PACKET_STATE_QUEUED; if (!c->pending) fd_active(c); } @@ -1042,8 +1059,7 @@ static void retransmit_packet(rlm_radius_udp_request_t *u, struct timeval *now) return; } - u->state = REQUEST_STATUS_SENT; - DEBUG("SENT %d", __LINE__); + u->state = PACKET_STATE_SENT; } static rlm_radius_udp_connection_t *connection_get(rlm_radius_udp_request_t *u); @@ -1059,7 +1075,7 @@ static void response_timeout(fr_event_list_t *el, struct timeval *now, void *uct REQUEST *request; rad_assert(u->timer.ev == NULL); - rad_assert(u->state == REQUEST_STATUS_SENT); + rad_assert(u->state == PACKET_STATE_SENT); /* * The packet timed out while it didn't have a @@ -1067,13 +1083,13 @@ static void response_timeout(fr_event_list_t *el, struct timeval *now, void *uct * a new connection. */ if (!c) { - rad_assert(u->state == REQUEST_STATUS_THREAD); + rad_assert(u->state == PACKET_STATE_THREAD); c = connection_get(u); } if (c) { rad_assert(u->timer.retry == &c->inst->parent->retry[u->code]); - rad_assert(u->state == REQUEST_STATUS_QUEUED); + rad_assert(u->state == PACKET_STATE_QUEUED); } request = u->link->request; @@ -1121,7 +1137,7 @@ static void response_timeout(fr_event_list_t *el, struct timeval *now, void *uct RDEBUG("Retransmitting ID %d on connection %s", u->rr->id, c->name); retransmit_packet(u, now); } else { - rad_assert(u->state == REQUEST_STATUS_THREAD); + rad_assert(u->state == PACKET_STATE_THREAD); RDEBUG("No available connections for retransmission. Waiting %d.%06ds for retry", u->timer.rt / USEC, u->timer.rt % USEC); } @@ -1152,9 +1168,9 @@ static int conn_write(rlm_radius_udp_connection_t *c, rlm_radius_udp_request_t * rad_assert(c->inst->parent->allowed[u->code] || (u == c->status_u)); if (c->idle_ev) (void) fr_event_timer_delete(c->thread->el, &c->idle_ev); - rad_assert((u->state == REQUEST_STATUS_WRITE) || - (u->state == REQUEST_STATUS_SENT) || - (u->state == REQUEST_STATUS_QUEUED)); + rad_assert((u->state == PACKET_STATE_WRITE) || + (u->state == PACKET_STATE_SENT) || + (u->state == PACKET_STATE_QUEUED)); request = u->link->request; /* @@ -1389,7 +1405,7 @@ static int conn_write(rlm_radius_udp_connection_t *c, rlm_radius_udp_request_t * * timers, etc. */ if (c->inst->replicate) { - u->state = REQUEST_STATUS_FINISHED; + u->state = PACKET_STATE_FINISHED; return 2; } @@ -1484,7 +1500,7 @@ static void conn_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int while ((u = fr_heap_pop(c->queued)) != NULL) { int rcode; - rad_assert(u->state == REQUEST_STATUS_QUEUED); + rad_assert(u->state == PACKET_STATE_QUEUED); rcode = conn_write(c, u); // @todo - do something intelligent on error.. @@ -1492,15 +1508,14 @@ static void conn_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int if (rcode == 1) { fr_dlist_insert_tail(&c->sent, &u->entry); - u->state = REQUEST_STATUS_SENT; - DEBUG("SENT %d", __LINE__); + u->state = PACKET_STATE_SENT; continue; } /* * Was replicated: can resume it immediately. */ - u->state = REQUEST_STATUS_RESUMABLE; + u->state = PACKET_STATE_RESUMABLE; unlang_resumable(u->link->request); } @@ -1598,17 +1613,21 @@ static int udp_request_free(rlm_radius_udp_request_t *u) } switch (u->state) { - case REQUEST_STATUS_THREAD: + case PACKET_STATE_THREAD: rad_assert(u->c == NULL); (void) fr_heap_extract(u->thread->queued, u); return 0; - case REQUEST_STATUS_QUEUED: + case PACKET_STATE_QUEUED: (void) fr_heap_extract(u->c->queued, u); + rad_assert(u->c->active_requests > 0); + u->c->active_requests--; break; - case REQUEST_STATUS_SENT: + case PACKET_STATE_SENT: fr_dlist_remove(&u->entry); + rad_assert(u->c->active_requests > 0); + u->c->active_requests--; break; default: @@ -1750,11 +1769,11 @@ static fr_connection_state_t _conn_failed(int fd, fr_connection_state_t state, v */ u->rr = NULL; - rad_assert(u->state == REQUEST_STATUS_SENT); + rad_assert(u->state == PACKET_STATE_SENT); fr_dlist_remove(&u->entry); u->c = NULL; (void) fr_heap_insert(c->thread->queued, u); - u->state = REQUEST_STATUS_THREAD; + u->state = PACKET_STATE_THREAD; c->thread->pending = true; } @@ -2016,18 +2035,23 @@ static int _conn_free(rlm_radius_udp_connection_t *c) while ((entry = FR_DLIST_FIRST(c->sent)) != NULL) { u = fr_ptr_to_type(rlm_radius_udp_request_t, entry, entry); + rad_assert(u->state == PACKET_STATE_SENT); + rad_assert(u->c == c); + rad_assert(c->active_requests > 0); + c->active_requests--; + u->c = NULL; + /* * Don't bother freeing individual entries. They * will get deleted when c->id is free'd. */ u->rr = NULL; - u->c = NULL; + if (u->timer.ev) (void) fr_event_timer_delete(c->thread->el, &u->timer.ev); - rad_assert(u->state == REQUEST_STATUS_SENT); fr_dlist_remove(&u->entry); (void) fr_heap_insert(t->queued, u); - u->state = REQUEST_STATUS_THREAD; + u->state = PACKET_STATE_THREAD; t->pending = true; } @@ -2035,15 +2059,21 @@ static int _conn_free(rlm_radius_udp_connection_t *c) * Move "queued" packets back to the main thread queue */ while ((u = fr_heap_pop(c->queued)) != NULL) { - u->rr = NULL; + rad_assert(u->state == PACKET_STATE_QUEUED); + rad_assert(u->c == c); + rad_assert(c->active_requests > 0); + c->active_requests--; u->c = NULL; - rad_assert(u->state == REQUEST_STATUS_QUEUED); + u->rr = NULL; + (void) fr_heap_insert(t->queued, u); - u->state = REQUEST_STATUS_THREAD; + u->state = PACKET_STATE_THREAD; t->pending = true; } + rad_assert(c->active_requests == 0); + switch (c->state) { default: rad_assert(0 == 1); @@ -2185,11 +2215,11 @@ static rlm_radius_udp_connection_t *connection_get(rlm_radius_udp_request_t *u) * Remove it from the main thread queue, and add * it to the connection queue. */ - rad_assert((u->state == REQUEST_STATUS_THREAD) || - (u->state == REQUEST_STATUS_WRITE)); - DEBUG("QUEUE AT %d", __LINE__); + rad_assert((u->state == PACKET_STATE_THREAD) || + (u->state == PACKET_STATE_WRITE)); (void) fr_heap_insert(c->queued, u); - u->state = REQUEST_STATUS_QUEUED; + u->state = PACKET_STATE_QUEUED; + c->active_requests++; if (!c->pending) fd_active(c); @@ -2255,7 +2285,7 @@ static rlm_rcode_t mod_push(void *instance, REQUEST *request, rlm_radius_link_t return RLM_MODULE_FAIL; } - u->state = REQUEST_STATUS_WRITE; + u->state = PACKET_STATE_WRITE; u->link = link; u->code = request->packet->code; u->thread = t; @@ -2285,7 +2315,7 @@ static rlm_rcode_t mod_push(void *instance, REQUEST *request, rlm_radius_link_t */ t->pending = true; (void) fr_heap_insert(t->queued, u); - u->state = REQUEST_STATUS_THREAD; + u->state = PACKET_STATE_THREAD; return RLM_MODULE_YIELD; } @@ -2310,7 +2340,7 @@ static rlm_rcode_t mod_push(void *instance, REQUEST *request, rlm_radius_link_t * actively trying to write. */ if (rcode == 0) { - rad_assert(u->state == REQUEST_STATUS_QUEUED); + rad_assert(u->state == PACKET_STATE_QUEUED); fd_active(c); return RLM_MODULE_YIELD; } @@ -2321,7 +2351,7 @@ static rlm_rcode_t mod_push(void *instance, REQUEST *request, rlm_radius_link_t * waiting for the reply. */ if (rcode == 1) { - rad_assert(u->state == REQUEST_STATUS_SENT); + rad_assert(u->state == PACKET_STATE_SENT); return RLM_MODULE_YIELD; } @@ -2329,7 +2359,7 @@ static rlm_rcode_t mod_push(void *instance, REQUEST *request, rlm_radius_link_t * We replicated the packet, so we return "ok", and don't * care about the reply. */ - u->state = REQUEST_STATUS_FINISHED; + u->state = PACKET_STATE_FINISHED; return RLM_MODULE_OK; }