Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
2c03c23
Added more exception handling in tail thread, added flush mechanism b…
timvaillancourt May 23, 2017
83d2f0d
More fixes for safer oplog tailing, still broken but getting closer
timvaillancourt May 24, 2017
c9f65c4
Field rename, plus config file example update
timvaillancourt May 26, 2017
18d5f2c
Several oplog durability fixes and tailer reliability fixes
timvaillancourt May 28, 2017
930129d
merge islue/160_tailer_thread_utf8_codec_cant_decode
timvaillancourt May 29, 2017
a22b076
Use pymongo.DESCENDING for sort
timvaillancourt May 29, 2017
47d2ad0
Merge remote-tracking branch 'upstream/master' into oplog_tailer_flush
timvaillancourt May 29, 2017
2e46673
Merge remote-tracking branch 'upstream/master' into oplog_tailer_flush
timvaillancourt May 31, 2017
90e7f2f
The code does not act as the warning message (#156)
maulal Jun 9, 2017
b85839b
Merge remote-tracking branch 'upstream/1.0.4' into oplog_tailer_flush
timvaillancourt Jun 9, 2017
e54007f
Merge remote-tracking branch 'upstream/master' into oplog_tailer_flush
timvaillancourt Jun 10, 2017
8aa5125
Accept dict as .set() for State.py to reduce locking on multiprocessi…
timvaillancourt Jun 13, 2017
c811fe0
Fixes for Oplog self.config and autoflush
timvaillancourt Jun 13, 2017
54dcee1
Flush more often
timvaillancourt Jun 13, 2017
eee28b1
Merge remote-tracking branch 'upstream/1.0.4' into oplog_tailer_flush
timvaillancourt Jun 13, 2017
3c550b5
Update conf example
timvaillancourt Jun 13, 2017
e791c87
Code cleanup
timvaillancourt Jun 13, 2017
2b9b936
Code cleanup #2
timvaillancourt Jun 13, 2017
b5884d1
Don't stop tailer on AutoReconnect errors
timvaillancourt Jun 13, 2017
23a2212
Several fixes for exiting on many TailThread failures
timvaillancourt Jun 14, 2017
51c1768
Remove inner try loop on TailThread, the outer try will catch and log…
timvaillancourt Jun 14, 2017
73e0776
Improved code comments
timvaillancourt Jun 15, 2017
023d373
Oplog tailing functions moved to Common/DB.py
timvaillancourt Jun 15, 2017
260f8ce
Fix check_cursor() to understand temp loss of server instead of dying
timvaillancourt Jun 15, 2017
8886e6d
Use for tailer and skip anything we've written so we don't miss the …
timvaillancourt Jun 15, 2017
8d32072
Use for tailer and skip anything we've written so we don't miss the …
timvaillancourt Jun 15, 2017
9675aeb
remove unused funcs I added
timvaillancourt Jun 16, 2017
924d3fe
rollback some changes that are not required
timvaillancourt Jun 16, 2017
52e884a
Code cleanup and sub-second flush secs
timvaillancourt Jun 16, 2017
924dae0
Code cleanup again
timvaillancourt Jun 16, 2017
f2ae0a8
Code cleanup again #2
timvaillancourt Jun 16, 2017
7ebb4c4
Add script for failure testing oplog tailer
timvaillancourt Jun 16, 2017
5bcc6ce
better comment for usage
timvaillancourt Jun 16, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/mongodb-consistent-backup.example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ production:
# ping_secs: [1+] (default: 3)
#oplog:
# compression: [none|gzip] (default: gzip - if gzip is used by backup stage)
# flush:
# max_docs: 100
# max_secs: 1
# resolver_threads: [1+] (default: 2 per CPU)
# tailer:
# enabled: true
# status_interval: 30
archive:
method: tar
Expand Down
4 changes: 2 additions & 2 deletions mongodb_consistent_backup/Backup/Backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


class Backup(Stage):
    """Stage wrapper that runs the configured backup method (e.g. mongodump).

    'backup_stop' is an event (presumably multiprocessing.Event — set by
    Main on fatal errors) used to signal the running backup task to stop
    early; it defaults to None for backward compatibility with callers
    that do not pass it.
    """
    def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, backup_stop=None, sharding=None):
        super(Backup, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, replsets=replsets, backup_stop=backup_stop, sharding=sharding)
        # backup method name (e.g. "mongodump") selects the Task to run
        self.task = self.config.backup.method
        self.init()
23 changes: 10 additions & 13 deletions mongodb_consistent_backup/Backup/Mongodump/Mongodump.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@


class Mongodump(Task):
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, sharding=None):
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, backup_stop=None, sharding=None):
super(Mongodump, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir)
self.compression_method = self.config.backup.mongodump.compression
self.binary = self.config.backup.mongodump.binary
self.user = self.config.username
self.password = self.config.password
self.authdb = self.config.authdb
self.replsets = replsets
self.backup_stop = backup_stop
self.sharding = sharding

self.compression_supported = ['auto', 'none', 'gzip']
Expand Down Expand Up @@ -90,6 +91,9 @@ def wait(self):
start_threads = len(self.dump_threads)
# wait for all threads to finish
while len(self.dump_threads) > 0:
if self.backup_stop and self.backup_stop.is_set():
logging.error("Received backup stop event due to error(s), stopping backup!")
raise OperationError("Received backup stop event due to error(s)")
for thread in self.dump_threads:
if not thread.is_alive():
if thread.exitcode == 0:
Expand Down Expand Up @@ -134,15 +138,11 @@ def run(self):
self.states[shard],
mongo_uri,
self.timer,
self.user,
self.password,
self.authdb,
self.config,
self.backup_dir,
self.binary,
self.version,
self.threads(),
self.do_gzip(),
self.verbose
self.do_gzip()
)
self.dump_threads.append(thread)

Expand Down Expand Up @@ -171,20 +171,17 @@ def run(self):
self.states['configsvr'],
mongo_uri,
self.timer,
self.user,
self.password,
self.authdb,
self.config,
self.backup_dir,
self.binary,
self.version,
self.threads(),
self.do_gzip(),
self.verbose
self.do_gzip()
)]
self.dump_threads[0].start()
self.dump_threads[0].join()

self.completed = True
self.stopped = True
return self._summary

def close(self):
Expand Down
18 changes: 9 additions & 9 deletions mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@

# noinspection PyStringFormat
class MongodumpThread(Process):
def __init__(self, state, uri, timer, user, password, authdb, base_dir, binary, version,
threads=0, dump_gzip=False, verbose=False):
def __init__(self, state, uri, timer, config, base_dir, version, threads=0, dump_gzip=False):
Process.__init__(self)
self.state = state
self.uri = uri
self.timer = timer
self.user = user
self.password = password
self.authdb = authdb
self.config = config
self.base_dir = base_dir
self.binary = binary
self.version = version
self.threads = threads
self.dump_gzip = dump_gzip
self.verbose = verbose

self.user = self.config.username
self.password = self.config.password
self.authdb = self.config.authdb
self.binary = self.config.backup.mongodump.binary

self.timer_name = "%s-%s" % (self.__class__.__name__, self.uri.replset)
self.exit_code = 1
Expand Down Expand Up @@ -112,11 +112,11 @@ def mongodump_cmd(self):
mongodump_flags.extend(["--authenticationDatabase", self.authdb])
if self.user and self.password:
# >= 3.0.2 supports password input via stdin to mask from ps
if tuple("3.0.2".split(".")) <= tuple(self.version.split(".")):
if tuple(self.version.split(".")) >= tuple("3.0.2".split(".")):
mongodump_flags.extend(["-u", self.user, "-p", '""'])
self.do_stdin_passwd = True
else:
logging.warning("Mongodump is too old to set password securely! Upgrade to mongodump >= 3.2.0 to resolve this")
logging.warning("Mongodump is too old to set password securely! Upgrade to mongodump >= 3.0.2 to resolve this")
mongodump_flags.extend(["-u", self.user, "-p", self.password])
mongodump_cmd.extend(mongodump_flags)
return mongodump_cmd
Expand Down
24 changes: 23 additions & 1 deletion mongodb_consistent_backup/Common/DB.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging

from pymongo import MongoClient
from bson.codec_options import CodecOptions
from inspect import currentframe, getframeinfo
from pymongo import DESCENDING, CursorType, MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure, ServerSelectionTimeoutError
from time import sleep

Expand Down Expand Up @@ -117,6 +119,26 @@ def replset(self):
return isMaster['setName']
return None

def get_oplog_rs(self):
    """Return a handle to the local 'oplog.rs' collection.

    Connects lazily on first use. The codec options ignore invalid
    UTF-8 in stored documents so one undecodable entry cannot raise
    while tailing.
    """
    if not self._conn:
        self.connect()
    codec_opts = CodecOptions(unicode_decode_error_handler="ignore")
    local_db = self._conn['local']
    return local_db.oplog.rs.with_options(codec_options=codec_opts)

def get_oplog_tail_ts(self):
    """Return the 'ts' (BSON Timestamp) of the newest entry in the oplog.

    Sorting by $natural descending yields the most recently written
    document, i.e. the current tail of the capped oplog collection.
    (The previous log message said "oldest", contradicting the
    DESCENDING sort.)
    """
    logging.debug("Gathering newest 'ts' in %s oplog" % self.uri)
    return self.get_oplog_rs().find_one(sort=[('$natural', DESCENDING)])['ts']

def get_oplog_cursor_since(self, caller, ts=None):
    """Return a tailable-await cursor over oplog entries with ts >= 'ts'.

    'caller' is a class whose name, combined with the calling frame's
    function/file/line, is attached as a query comment so the tail is
    identifiable server-side. When 'ts' is not given, tailing starts
    from the newest oplog entry.
    """
    # NOTE: currentframe().f_back must stay one call deep to point at the caller
    caller_frame = getframeinfo(currentframe().f_back)
    comment = "%s:%s;%s:%i" % (caller.__name__, caller_frame.function, caller_frame.filename, caller_frame.lineno)
    if not ts:
        ts = self.get_oplog_tail_ts()
    query = {'ts': {'$gte': ts}}
    logging.debug("Querying oplog on %s with query: %s" % (self.uri, query))
    # http://api.mongodb.com/python/current/examples/tailable.html
    cursor = self.get_oplog_rs().find(query, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
    return cursor.comment(comment)

def close(self):
if self._conn:
logging.debug("Closing connection to: %s" % self.uri)
Expand Down
9 changes: 6 additions & 3 deletions mongodb_consistent_backup/Main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys

from datetime import datetime
from multiprocessing import current_process, Manager
from multiprocessing import current_process, Event, Manager

from Archive import Archive
from Backup import Backup
Expand Down Expand Up @@ -35,6 +35,7 @@ def __init__(self, prog_name="mongodb-consistent-backup"):
self.backup_time = None
self.backup_directory = None
self.backup_root_subdirectory = None
self.backup_stop = Event()
self.uri = None
self.db = None
self.is_sharded = False
Expand Down Expand Up @@ -345,7 +346,8 @@ def run(self):
self.timer,
self.backup_root_subdirectory,
self.backup_directory,
self.replsets
self.replsets,
self.backup_stop
)
except Exception, e:
self.exception("Problem initializing oplog tailer! Error: %s" % e, e)
Expand All @@ -359,6 +361,7 @@ def run(self):
self.backup_root_subdirectory,
self.backup_directory,
self.replsets,
self.backup_stop,
self.sharding
)
if self.backup.is_compressed():
Expand Down Expand Up @@ -465,7 +468,7 @@ def run(self):

StateDoneStamp(self.backup_directory, self.config).write()
self.update_symlinks()
logging.info("Completed %s in %.2f sec" % (self.program_name, self.timer.duration(self.timer_name)))

self.logger.rotate()
logging.info("Completed %s in %.2f sec" % (self.program_name, self.timer.duration(self.timer_name)))
self.release_lock()
39 changes: 36 additions & 3 deletions mongodb_consistent_backup/Oplog/Oplog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,27 @@
from gzip import GzipFile
from bson import BSON, decode_file_iter
from bson.codec_options import CodecOptions
from time import time

from mongodb_consistent_backup.Errors import OperationError


class Oplog:
def __init__(self, oplog_file, do_gzip=False, file_mode="r", flush_docs=100, flush_secs=1):
    """Wrap a BSON oplog dump file with counted, periodically-fsynced writes.

    :param oplog_file: path of the BSON oplog file to read or write
    :param do_gzip:    gzip-compress the file when True
    :param file_mode:  mode passed when opening the file ("r", "w", ...)
    :param flush_docs: autoflush after this many unflushed writes
    :param flush_secs: autoflush after this many seconds without a flush
    """
    self.oplog_file = oplog_file
    self.do_gzip = do_gzip
    self.file_mode = file_mode
    self.flush_docs = flush_docs
    self.flush_secs = flush_secs

    # running totals and first/last oplog timestamps seen
    self._count = 0
    self._first_ts = None
    self._last_ts = None
    self._oplog = None

    # flush bookkeeping for the autoflush thresholds
    self._last_flush_time = time()
    self._writes_unflushed = 0

    self.open()

def handle(self):
Expand Down Expand Up @@ -56,23 +62,50 @@ def load(self):
logging.fatal("Error reading oplog file %s! Error: %s" % (self.oplog_file, e))
raise OperationError(e)

def add(self, doc):
def add(self, doc, autoflush=True):
try:
self._oplog.write(BSON.encode(doc))
self._count += 1
self._writes_unflushed += 1
self._count += 1
if not self._first_ts:
self._first_ts = doc['ts']
self._last_ts = doc['ts']
if autoflush:
self.autoflush()
except Exception, e:
logging.fatal("Cannot write to oplog file %s! Error: %s" % (self.oplog_file, e))
raise OperationError(e)

def secs_since_flush(self):
return time() - self._last_flush_time

def do_flush(self):
if self._writes_unflushed > self.flush_docs:
return True
elif self.secs_since_flush() > self.flush_secs:
return True
return False

def flush(self):
if self._oplog:
return self._oplog.flush()

def fsync(self):
if self._oplog:
# https://docs.python.org/2/library/os.html#os.fsync
self._oplog.flush()
self._last_flush_time = time()
self._writes_unflushed = 0
return os.fsync(self._oplog.fileno())

def autoflush(self):
    """Fsync the oplog file when a flush threshold has been reached."""
    if not (self._oplog and self.do_flush()):
        return None
    logging.debug("Fsyncing %s (secs_since=%.2f, changes=%i, ts=%s)" % (self.oplog_file, self.secs_since_flush(), self._writes_unflushed, self.last_ts()))
    return self.fsync()

def close(self):
if self._oplog:
self.fsync()
return self._oplog.close()

def count(self):
Expand Down
12 changes: 10 additions & 2 deletions mongodb_consistent_backup/Oplog/OplogState.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,20 @@ def get(self, key=None):
else:
return None
return state
except IOError, e:
return None
except Exception, e:
raise OperationError(e)

def set(self, key, value):
def set(self, key, value, merge=False):
try:
self._state[key] = value
if merge and isinstance(value, dict):
for key in value:
self._state[key] = value[key]
else:
self._state[key] = value
except IOError, e:
pass
except Exception, e:
raise OperationError(e)

Expand Down
Loading