diff --git a/record/record.py b/record/record.py index 3be683e..77daa93 100755 --- a/record/record.py +++ b/record/record.py @@ -16,6 +16,8 @@ from pymongo import MongoClient, uri_parser from threading import Thread +WRITER_THREAD_NAME = "write-all-docs-to-file" + def tail_to_queue(tailer, identifier, doc_queue, state, end_time, check_duration_secs=1): """ @@ -304,7 +306,7 @@ def _generate_workers(self, files, state, start_utc_secs, end_utc_secs): # be written to the same device (disk or SSD), as a result it yields # not much benefit to have multiple writers. workers_info.append({ - "name": "write-all-docs-to-file", + "name": WRITER_THREAD_NAME, "thread": Thread( target=MongoQueryRecorder._process_doc_queue, args=(doc_queue, files, state)) @@ -370,11 +372,16 @@ def _generate_workers(self, files, state, start_utc_secs, end_utc_secs): def _join_workers(self, state, workers_info): """Prepare to exit all workers""" for idx, worker_info in enumerate(workers_info): - utils.LOG.info( - "Time to stop, waiting for thread: %s to finish", - worker_info["name"]) thread = worker_info["thread"] name = worker_info["name"] + + # stop tailing threads before attempting to stop the writer thread + if name == WRITER_THREAD_NAME: + continue + + utils.LOG.info( + "Time to stop, waiting for thread: %s to finish", + name) # Idempotently wait for thread to exit wait_secs = 5 while thread.is_alive(): @@ -384,12 +391,26 @@ def _join_workers(self, state, workers_info): worker_info["on_close"]() utils.LOG.error( "Thread %s didn't exit after %d seconds. Will wait for " - "another %d seconds", name, wait_secs, 2 * wait_secs) - wait_secs *= 2 - thread.join(wait_secs) + "another %d seconds", name, wait_secs, wait_secs) else: utils.LOG.info("Thread %s exits normally.", name) + # now find the writer thread and wait on it + for worker_info in workers_info: + name = worker_info['name'] + thread = worker_info['thread'] + if name == WRITER_THREAD_NAME: + wait_secs = 5 + while thread.is_alive(): + thread.join(wait_secs) + if thread.is_alive(): + utils.LOG.error( + "Thread %s didn't exit after %d seconds. " + "Will wait for another %d seconds", + name, wait_secs, wait_secs) + else: + utils.LOG.info("Thread %s exits normally.", name) + @utils.set_interval(3) def _periodically_report_status(self, state): return MongoQueryRecorder._report_status(state)