From 3c64cfb384dd82372644fc3ff1e6e5d0c7b82043 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Thu, 26 Jan 2017 10:16:53 +0100 Subject: [PATCH] threads: fix missed logging at shutdown At shutdown, all flows that still need work are handled by the flow force reassembly logic. This means one or more flow end pseudo packets are generated and pushed through the engine for final detection and logging. In some cases this would not work correctly. This was caused by the flow timeout logic kicking in before all the 'live' packets were processed. Before the flow timeout handling runs the receive threads are disabled, however the engine did not wait for the in-flight packets to be fully processed. In autofp mode, packets could still be in the queue between receive thread(s) and flow worker(s). This patch adds a new function that 'drains' all the packet threads of any in-progress packets before moving on the flow timeout logic. Bug #1946. --- src/tm-threads.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/src/tm-threads.c b/src/tm-threads.c index 9df10ff9370..bc866aa215d 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1521,6 +1521,64 @@ void TmThreadKillThread(ThreadVars *tv) return; } +/** \internal + * + * \brief make sure that all packet threads are done processing their + * in-flight packets + */ +static void TmThreadDrainPacketThreads(void) +{ + /* value in seconds */ +#define THREAD_KILL_MAX_WAIT_TIME 60 + /* value in microseconds */ +#define WAIT_TIME 1000 + + uint64_t total_wait_time = 0; + + ThreadVars *tv = NULL; + +again: + if (total_wait_time > (THREAD_KILL_MAX_WAIT_TIME * 1000000)) { + SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads " + "to process their packets in time"); + return; + } + + SCMutexLock(&tv_root_lock); + + /* all receive threads are part of packet processing threads */ + tv = tv_root[TVT_PPT]; + + while (tv) { + if (tv->inq != NULL) { + /* we wait till we dry out all the inq packets, before we + * kill this thread. Do note that you should have disabled + * packet acquire by now using TmThreadDisableReceiveThreads()*/ + if (!(strlen(tv->inq->name) == strlen("packetpool") && + strcasecmp(tv->inq->name, "packetpool") == 0)) { + PacketQueue *q = &trans_q[tv->inq->id]; + if (q->len != 0) { + SCMutexUnlock(&tv_root_lock); + + total_wait_time += WAIT_TIME; + + /* don't sleep while holding a lock */ + usleep(WAIT_TIME); + goto again; + } + } + } + + tv = tv->next; + } + + SCMutexUnlock(&tv_root_lock); + return; + +#undef THREAD_KILL_MAX_WAIT_TIME +#undef WAIT_TIME +} + /** * \brief Disable all threads having the specified TMs. * @@ -1618,6 +1676,11 @@ void TmThreadDisableReceiveThreads(void) SCMutexUnlock(&tv_root_lock); + /* finally wait for all packet threads to have + * processed all of their 'live' packets so we + * don't process the last live packets together + * with FFR packets */ + TmThreadDrainPacketThreads(); return; } @@ -1635,6 +1698,8 @@ void TmThreadDisablePacketThreads(void) ThreadVars *tv = NULL; + /* first drain all packet threads of their packets */ + TmThreadDrainPacketThreads(); again: SCMutexLock(&tv_root_lock);