Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions mongodb_consistent_backup/Oplog/Resolver/Resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, tailed_oplogs,
self.completed = False
self._pool = None
self._pooled = []
self._results = {}
try:
self._pool = Pool(processes=self.threads(None, 2))
except Exception, e:
Expand Down Expand Up @@ -74,7 +75,13 @@ def wait(self):
self._pool.close()
while len(self._pooled):
logging.debug("Waiting for %i oplog resolver thread(s) to stop" % len(self._pooled))
sleep(2)
try:
for thread_name in self._pooled:
thread = self._results[thread_name]
thread.get(1)
sleep(2)
except Exception, e:
raise e
self._pool.terminate()
logging.debug("Stopped all oplog resolve threads")
self.stopped = True
Expand All @@ -100,15 +107,16 @@ def run(self):
raise OperationError("Backup oplog is newer than the tailed oplog!")
else:
try:
self._pool.apply_async(ResolverThread(
thread_name = uri.str()
self._results[thread_name] = self._pool.apply_async(ResolverThread(
self.resolver_state[shard],
uri,
tailed_oplog.copy(),
backup_oplog.copy(),
self.get_consistent_end_ts(),
self.compression()
).run, callback=self.done)
self._pooled.append(uri.str())
self._pooled.append(thread_name)
except Exception, e:
logging.fatal("Resolve failed for %s! Error: %s" % (uri, e))
raise Error(e)
Expand Down
5 changes: 3 additions & 2 deletions mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# noinspection PyPackageRequirements
from bson import decode_file_iter

from mongodb_consistent_backup.Errors import Error
from mongodb_consistent_backup.Oplog import Oplog
from mongodb_consistent_backup.Pipeline import PoolThread

Expand All @@ -20,7 +21,7 @@ def __init__(self, state, uri, tailed_oplog, mongodump_oplog, max_end_ts, compre

self.oplogs = {}
self.changes = 0
self.stopped = True
self.stopped = False

def run(self):
self.oplogs['backup'] = Oplog(self.mongodump_oplog['file'], self.do_gzip(), 'a+')
Expand All @@ -46,7 +47,7 @@ def run(self):
self.state.set('running', False)
self.exit_code = 0
except Exception, e:
logging.exception("Resolving of oplogs failed! Error: %s" % e)
raise Error("Resolving of oplogs failed! Error: %s" % e)
finally:
self.close()

Expand Down