Skip to content
This repository has been archived by the owner on Jun 12, 2018. It is now read-only.

Commit

Permalink
[record] join writer thread after tailer threads
Browse files Browse the repository at this point in the history
_join_workers uses a simple loop to await worker threads. When ctrl+c is
pressed, force_quit is set to true, but threads are not killed until
processed by this loop. As the writer thread continues to run until the
queue is empty, this can cause an indefinite wait if _join_workers
blocks on the write thread while tailer threads are still running, since
they continue to write to the queue.

Here we move the join call on the writer thread to the end of the loop.
In addition, stop increasing the wait_time by a factor of 2, as this
can lead to long and unnecessary waits for the program to shut down.
  • Loading branch information
Travis Redman committed Jan 5, 2016
1 parent fbf2e91 commit da188ca
Showing 1 changed file with 28 additions and 7 deletions.
35 changes: 28 additions & 7 deletions record/record.py
Expand Up @@ -16,6 +16,8 @@
from pymongo import MongoClient, uri_parser
from threading import Thread

# Name of the single thread that consumes the doc queue and writes to file
# (see _generate_workers); used to recognize that worker so _join_workers can
# skip it on the first pass and join it only after all tailer threads exit.
WRITER_THREAD_NAME = "write-all-docs-to-file"

def tail_to_queue(tailer, identifier, doc_queue, state, end_time,
check_duration_secs=1):
"""
Expand Down Expand Up @@ -304,7 +306,7 @@ def _generate_workers(self, files, state, start_utc_secs, end_utc_secs):
# be written to the same device (disk or SSD), as a result it yields
# not much benefit to have multiple writers.
workers_info.append({
"name": "write-all-docs-to-file",
"name": WRITER_THREAD_NAME,
"thread": Thread(
target=MongoQueryRecorder._process_doc_queue,
args=(doc_queue, files, state))
Expand Down Expand Up @@ -370,11 +372,16 @@ def _generate_workers(self, files, state, start_utc_secs, end_utc_secs):
def _join_workers(self, state, workers_info):
"""Prepare to exit all workers"""
for idx, worker_info in enumerate(workers_info):
utils.LOG.info(
"Time to stop, waiting for thread: %s to finish",
worker_info["name"])
thread = worker_info["thread"]
name = worker_info["name"]

# stop tailing threads before attempting to stop the writer thread
if name == WRITER_THREAD_NAME:
continue

utils.LOG.info(
"Time to stop, waiting for thread: %s to finish",
name)
# Idempotently wait for thread to exit
wait_secs = 5
while thread.is_alive():
Expand All @@ -384,12 +391,26 @@ def _join_workers(self, state, workers_info):
worker_info["on_close"]()
utils.LOG.error(
"Thread %s didn't exit after %d seconds. Will wait for "
"another %d seconds", name, wait_secs, 2 * wait_secs)
wait_secs *= 2
thread.join(wait_secs)
"another %d seconds", name, wait_secs, wait_secs)
else:
utils.LOG.info("Thread %s exits normally.", name)

# now find the writer thread and wait on it
for worker_info in workers_info:
name = worker_info['name']
thread = worker_info['thread']
if name == WRITER_THREAD_NAME:
wait_secs = 5
while thread.is_alive():
thread.join(wait_secs)
if thread.is_alive():
utils.LOG.error(
"Thread %s didn't exit after %d seconds. "
"Will wait for another %d seconds",
name, wait_secs, wait_secs)
else:
utils.LOG.info("Thread %s exits normally.", name)

@utils.set_interval(3)
def _periodically_report_status(self, state):
    """Report recorder status on a fixed interval.

    Delegates to the static helper ``MongoQueryRecorder._report_status``
    with the shared ``state`` object; the ``utils.set_interval(3)``
    decorator presumably re-invokes this every 3 seconds — confirm
    against utils.set_interval's implementation.
    """
    return MongoQueryRecorder._report_status(state)
Expand Down

0 comments on commit da188ca

Please sign in to comment.