Skip to content

Commit

Permalink
threads: fix missed logging at shutdown
Browse files Browse the repository at this point in the history
At shutdown, all flows that still need work are handled by the flow
force reassembly logic. This means one or more flow end pseudo packets
are generated and pushed through the engine for final detection and
logging.

In some cases this would not work correctly. This was caused by the
flow timeout logic kicking in before all the 'live' packets were
processed. Before the flow timeout handling runs the receive threads
are disabled, however the engine did not wait for the in-flight
packets to be fully processed. In autofp mode, packets could still
be in the queue between receive thread(s) and flow worker(s).

This patch adds a new function that 'drains' all the packet threads
of any in-progress packets before moving on the flow timeout logic.

Bug #1946.
  • Loading branch information
victorjulien committed Jan 26, 2017
1 parent f738062 commit 3c64cfb
Showing 1 changed file with 65 additions and 0 deletions.
65 changes: 65 additions & 0 deletions src/tm-threads.c
Expand Up @@ -1521,6 +1521,64 @@ void TmThreadKillThread(ThreadVars *tv)
return;
}

/** \internal
*
* \brief make sure that all packet threads are done processing their
* in-flight packets
*/
static void TmThreadDrainPacketThreads(void)
{
/* value in seconds */
#define THREAD_KILL_MAX_WAIT_TIME 60
/* value in microseconds */
#define WAIT_TIME 1000

uint64_t total_wait_time = 0;

ThreadVars *tv = NULL;

again:
if (total_wait_time > (THREAD_KILL_MAX_WAIT_TIME * 1000000)) {
SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads "
"to process their packets in time");
return;
}

SCMutexLock(&tv_root_lock);

/* all receive threads are part of packet processing threads */
tv = tv_root[TVT_PPT];

while (tv) {
if (tv->inq != NULL) {
/* we wait till we dry out all the inq packets, before we
* kill this thread. Do note that you should have disabled
* packet acquire by now using TmThreadDisableReceiveThreads()*/
if (!(strlen(tv->inq->name) == strlen("packetpool") &&
strcasecmp(tv->inq->name, "packetpool") == 0)) {
PacketQueue *q = &trans_q[tv->inq->id];
if (q->len != 0) {
SCMutexUnlock(&tv_root_lock);

total_wait_time += WAIT_TIME;

/* don't sleep while holding a lock */
usleep(WAIT_TIME);
goto again;
}
}
}

tv = tv->next;
}

SCMutexUnlock(&tv_root_lock);
return;

#undef THREAD_KILL_MAX_WAIT_TIME
#undef WAIT_TIME
}

/**
* \brief Disable all threads having the specified TMs.
*
Expand Down Expand Up @@ -1618,6 +1676,11 @@ void TmThreadDisableReceiveThreads(void)

SCMutexUnlock(&tv_root_lock);

/* finally wait for all packet threads to have
* processed all of their 'live' packets so we
* don't process the last live packets together
* with FFR packets */
TmThreadDrainPacketThreads();
return;
}

Expand All @@ -1635,6 +1698,8 @@ void TmThreadDisablePacketThreads(void)

ThreadVars *tv = NULL;

/* first drain all packet threads of their packets */
TmThreadDrainPacketThreads();
again:
SCMutexLock(&tv_root_lock);

Expand Down

0 comments on commit 3c64cfb

Please sign in to comment.