From ff6c63acda6ce96237a4a7301f4c038456e0c241 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 11 May 2017 18:12:20 +0200 Subject: [PATCH 1/2] Raise ResolverThread.py exceptions to Resolver.py (and thus Main.py) --- .../Oplog/Resolver/Resolver.py | 14 +++++++++++--- .../Oplog/Resolver/ResolverThread.py | 3 ++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/mongodb_consistent_backup/Oplog/Resolver/Resolver.py b/mongodb_consistent_backup/Oplog/Resolver/Resolver.py index 3cc892d9..4b6bfdf2 100644 --- a/mongodb_consistent_backup/Oplog/Resolver/Resolver.py +++ b/mongodb_consistent_backup/Oplog/Resolver/Resolver.py @@ -40,6 +40,7 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, tailed_oplogs, self.completed = False self._pool = None self._pooled = [] + self._results = {} try: self._pool = Pool(processes=self.threads(None, 2)) except Exception, e: @@ -74,7 +75,13 @@ def wait(self): self._pool.close() while len(self._pooled): logging.debug("Waiting for %i oplog resolver thread(s) to stop" % len(self._pooled)) - sleep(2) + try: + for thread_name in self._pooled: + thread = self._results[thread_name] + thread.get(1) + sleep(2) + except Exception, e: + raise e self._pool.terminate() logging.debug("Stopped all oplog resolve threads") self.stopped = True @@ -100,7 +107,8 @@ def run(self): raise OperationError("Backup oplog is newer than the tailed oplog!") else: try: - self._pool.apply_async(ResolverThread( + thread_name = uri.str() + self._results[thread_name] = self._pool.apply_async(ResolverThread( self.resolver_state[shard], uri, tailed_oplog.copy(), @@ -108,7 +116,7 @@ def run(self): self.get_consistent_end_ts(), self.compression() ).run, callback=self.done) - self._pooled.append(uri.str()) + self._pooled.append(thread_name) except Exception, e: logging.fatal("Resolve failed for %s! Error: %s" % (uri, e)) raise Error(e) diff --git a/mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py b/mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py index 74662c68..a4271ff3 100644 --- a/mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py +++ b/mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py @@ -4,6 +4,7 @@ # noinspection PyPackageRequirements from bson import decode_file_iter +from mongodb_consistent_backup.Errors import Error from mongodb_consistent_backup.Oplog import Oplog from mongodb_consistent_backup.Pipeline import PoolThread @@ -46,7 +47,7 @@ def run(self): self.state.set('running', False) self.exit_code = 0 except Exception, e: - logging.exception("Resolving of oplogs failed! Error: %s" % e) + raise Error("Resolving of oplogs failed! Error: %s" % e) finally: self.close() From 0f6aa82085a0a41bebc6a35086dafa60d5c69ddc Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 12 May 2017 13:22:07 +0200 Subject: [PATCH 2/2] 'stopped' var should be set to False until stop is ran --- mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py b/mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py index a4271ff3..c7798904 100644 --- a/mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py +++ b/mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py @@ -21,7 +21,7 @@ def __init__(self, state, uri, tailed_oplog, mongodump_oplog, max_end_ts, compre self.oplogs = {} self.changes = 0 - self.stopped = True + self.stopped = False def run(self): self.oplogs['backup'] = Oplog(self.mongodump_oplog['file'], self.do_gzip(), 'a+')