From a5cc3a0587f972c02baee6d4017210c891266074 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 22 Aug 2017 17:50:28 +0200 Subject: [PATCH 1/3] flake8 fixes --- mongodb_consistent_backup/Archive/Archive.py | 2 - mongodb_consistent_backup/Archive/Tar/Tar.py | 2 +- .../Archive/Tar/TarThread.py | 28 ++++---- .../Archive/Tar/__init__.py | 4 +- .../Archive/Zbackup/Zbackup.py | 18 ++--- .../Archive/Zbackup/__init__.py | 2 +- mongodb_consistent_backup/Archive/__init__.py | 4 +- mongodb_consistent_backup/Backup/Backup.py | 2 +- .../Backup/Mongodump/Mongodump.py | 9 +-- .../Backup/Mongodump/MongodumpThread.py | 8 +-- .../Backup/Mongodump/__init__.py | 2 +- mongodb_consistent_backup/Backup/__init__.py | 2 +- mongodb_consistent_backup/Common/Config.py | 6 +- mongodb_consistent_backup/Common/DB.py | 11 +-- .../Common/LocalCommand.py | 2 +- mongodb_consistent_backup/Common/Lock.py | 6 +- mongodb_consistent_backup/Common/MongoUri.py | 5 +- mongodb_consistent_backup/Common/Timer.py | 4 +- mongodb_consistent_backup/Common/Util.py | 3 + mongodb_consistent_backup/Common/__init__.py | 14 ++-- mongodb_consistent_backup/Errors.py | 5 ++ mongodb_consistent_backup/Logger.py | 2 +- mongodb_consistent_backup/Main.py | 4 +- mongodb_consistent_backup/Notify/Notify.py | 4 +- mongodb_consistent_backup/Notify/Nsca/Nsca.py | 2 +- .../Notify/Nsca/__init__.py | 2 +- mongodb_consistent_backup/Notify/__init__.py | 4 +- mongodb_consistent_backup/Oplog/Oplog.py | 12 ++-- mongodb_consistent_backup/Oplog/OplogState.py | 10 +-- .../Oplog/Resolver/Resolver.py | 21 +++--- .../Oplog/Resolver/__init__.py | 3 +- .../Oplog/Tailer/TailThread.py | 12 ++-- .../Oplog/Tailer/Tailer.py | 8 +-- .../Oplog/Tailer/__init__.py | 3 +- mongodb_consistent_backup/Oplog/__init__.py | 10 +-- .../Pipeline/PoolThread.py | 2 - mongodb_consistent_backup/Pipeline/Stage.py | 1 - .../Pipeline/__init__.py | 6 +- .../Replication/Replset.py | 68 ++++++++++--------- .../Replication/ReplsetSharded.py | 10 ++- .../Replication/__init__.py | 6 +- mongodb_consistent_backup/Sharding.py | 18 ++--- mongodb_consistent_backup/State.py | 20 +++--- mongodb_consistent_backup/Upload/Gs/Gs.py | 4 +- .../Upload/Gs/GsUploadThread.py | 15 ++-- .../Upload/Gs/__init__.py | 3 +- mongodb_consistent_backup/Upload/S3/S3.py | 9 ++- .../Upload/S3/S3Session.py | 1 + .../Upload/S3/__init__.py | 3 +- mongodb_consistent_backup/Upload/Upload.py | 4 +- mongodb_consistent_backup/Upload/__init__.py | 4 +- mongodb_consistent_backup/__init__.py | 1 + 52 files changed, 207 insertions(+), 204 deletions(-) diff --git a/mongodb_consistent_backup/Archive/Archive.py b/mongodb_consistent_backup/Archive/Archive.py index 3fa19f57..bd97812f 100644 --- a/mongodb_consistent_backup/Archive/Archive.py +++ b/mongodb_consistent_backup/Archive/Archive.py @@ -1,5 +1,3 @@ -from mongodb_consistent_backup.Archive.Tar import Tar -from mongodb_consistent_backup.Archive.Zbackup import Zbackup from mongodb_consistent_backup.Pipeline import Stage diff --git a/mongodb_consistent_backup/Archive/Tar/Tar.py b/mongodb_consistent_backup/Archive/Tar/Tar.py index b08f4ba4..dbe9bf9c 100644 --- a/mongodb_consistent_backup/Archive/Tar/Tar.py +++ b/mongodb_consistent_backup/Archive/Tar/Tar.py @@ -7,7 +7,6 @@ from types import MethodType from TarThread import TarThread -from mongodb_consistent_backup.Common import parse_method from mongodb_consistent_backup.Errors import Error, OperationError from mongodb_consistent_backup.Pipeline import Task @@ -19,6 +18,7 @@ def _reduce_method(m): else: return getattr, (m.im_self, m.im_func.func_name) + pickle(MethodType, _reduce_method) diff --git a/mongodb_consistent_backup/Archive/Tar/TarThread.py b/mongodb_consistent_backup/Archive/Tar/TarThread.py index 39fbfa51..fc47f1e0 100644 --- a/mongodb_consistent_backup/Archive/Tar/TarThread.py +++ b/mongodb_consistent_backup/Archive/Tar/TarThread.py @@ -28,20 +28,20 @@ def run(self): if os.path.isdir(self.backup_dir): if not os.path.isfile(self.output_file): try: - backup_base_dir = os.path.dirname(self.backup_dir) - backup_base_name = os.path.basename(self.backup_dir) - - log_msg = "Archiving directory: %s" % self.backup_dir - cmd_flags = ["-C", backup_base_dir, "-cf", self.output_file, "--remove-files", backup_base_name] - - if self.do_gzip(): - log_msg = "Archiving and compressing directory: %s" % self.backup_dir - cmd_flags = ["-C", backup_base_dir, "-czf", self.output_file, "--remove-files", backup_base_name] - - logging.info(log_msg) - self.running = True - self._command = LocalCommand(self.binary, cmd_flags, self.verbose) - self.exit_code = self._command.run() + backup_base_dir = os.path.dirname(self.backup_dir) + backup_base_name = os.path.basename(self.backup_dir) + + log_msg = "Archiving directory: %s" % self.backup_dir + cmd_flags = ["-C", backup_base_dir, "-cf", self.output_file, "--remove-files", backup_base_name] + + if self.do_gzip(): + log_msg = "Archiving and compressing directory: %s" % self.backup_dir + cmd_flags = ["-C", backup_base_dir, "-czf", self.output_file, "--remove-files", backup_base_name] + + logging.info(log_msg) + self.running = True + self._command = LocalCommand(self.binary, cmd_flags, self.verbose) + self.exit_code = self._command.run() except Exception, e: logging.fatal("Failed archiving file: %s! Error: %s" % (self.output_file, e)) finally: diff --git a/mongodb_consistent_backup/Archive/Tar/__init__.py b/mongodb_consistent_backup/Archive/Tar/__init__.py index 0f3e742e..63e733f8 100644 --- a/mongodb_consistent_backup/Archive/Tar/__init__.py +++ b/mongodb_consistent_backup/Archive/Tar/__init__.py @@ -1,9 +1,9 @@ -from Tar import Tar +from Tar import Tar # NOQA def config(parser): parser.add_argument("--archive.tar.compression", dest="archive.tar.compression", help="Tar archiver compression method (default: gzip)", default='gzip', choices=['gzip', 'none']) - parser.add_argument("--archive.tar.threads", dest="archive.tar.threads", + parser.add_argument("--archive.tar.threads", dest="archive.tar.threads", help="Number of Tar archiver threads to use (default: 1-per-CPU)", default=0, type=int) return parser diff --git a/mongodb_consistent_backup/Archive/Zbackup/Zbackup.py b/mongodb_consistent_backup/Archive/Zbackup/Zbackup.py index b2de2fa6..3f8aa35c 100644 --- a/mongodb_consistent_backup/Archive/Zbackup/Zbackup.py +++ b/mongodb_consistent_backup/Archive/Zbackup/Zbackup.py @@ -95,20 +95,20 @@ def version(self): return None except Exception, e: raise OperationError("Could not gather ZBackup version: %s" % e) - + def has_zbackup(self): if self.version(): - return True + return True return False def close(self, exit_code=None, frame=None): del exit_code del frame if not self.stopped: - if self._zbackup and self._zbackup.poll() == None: + if self._zbackup and self._zbackup.poll() is None: logging.debug("Stopping running ZBackup command") self._zbackup.terminate() - if self._tar and self._tar.poll() == None: + if self._tar and self._tar.poll() is None: logging.debug("Stopping running ZBackup tar command") self._tar.terminate() self.stopped = True @@ -131,12 +131,12 @@ def wait(self): self.poll() if tar_done: self._zbackup.communicate() - if self._zbackup.poll() != None: + if self._zbackup.poll() is not None: logging.info("ZBackup completed successfully with exit code: %i" % self._zbackup.returncode) if self._zbackup.returncode != 0: raise OperationError("ZBackup exited with code: %i!" % self._zbackup.returncode) break - elif self._tar.poll() != None: + elif self._tar.poll() is not None: if self._tar.returncode == 0: logging.debug("ZBackup tar command completed successfully with exit code: %i" % self._tar.returncode) tar_done = True @@ -160,9 +160,9 @@ def run(self): lock = Lock(self.zbackup_lock) lock.acquire() try: - logging.info("Starting ZBackup version: %s (options: compression=%s, encryption=%s, threads=%i, cache_mb=%i)" % - (self.version(), self.compression(), self.encrypted, self.threads(), self.zbackup_cache_mb) - ) + logging.info("Starting ZBackup version: %s (options: compression=%s, encryption=%s, threads=%i, cache_mb=%i)" % ( + self.version(), self.compression(), self.encrypted, self.threads(), self.zbackup_cache_mb + )) self.running = True try: for sub_dir in os.listdir(self.backup_dir): diff --git a/mongodb_consistent_backup/Archive/Zbackup/__init__.py b/mongodb_consistent_backup/Archive/Zbackup/__init__.py index d1efe025..f1138b41 100644 --- a/mongodb_consistent_backup/Archive/Zbackup/__init__.py +++ b/mongodb_consistent_backup/Archive/Zbackup/__init__.py @@ -1,4 +1,4 @@ -from Zbackup import Zbackup +from Zbackup import Zbackup # NOQA def config(parser): diff --git a/mongodb_consistent_backup/Archive/__init__.py b/mongodb_consistent_backup/Archive/__init__.py index 84ed5da5..d48d2fca 100644 --- a/mongodb_consistent_backup/Archive/__init__.py +++ b/mongodb_consistent_backup/Archive/__init__.py @@ -1,6 +1,6 @@ -from Archive import Archive +from Archive import Archive # NOQA def config(parser): - parser.add_argument("--archive.method", dest="archive.method", help="Archiver method (default: tar)", default='tar', choices=['tar','zbackup','none']) + parser.add_argument("--archive.method", dest="archive.method", help="Archiver method (default: tar)", default='tar', choices=['tar', 'zbackup', 'none']) return parser diff --git a/mongodb_consistent_backup/Backup/Backup.py b/mongodb_consistent_backup/Backup/Backup.py index d24f39b7..fc7e60fd 100644 --- a/mongodb_consistent_backup/Backup/Backup.py +++ b/mongodb_consistent_backup/Backup/Backup.py @@ -1,4 +1,4 @@ -from mongodb_consistent_backup.Backup.Mongodump import Mongodump +from mongodb_consistent_backup.Backup.Mongodump import Mongodump # NOQA from mongodb_consistent_backup.Pipeline import Stage diff --git a/mongodb_consistent_backup/Backup/Mongodump/Mongodump.py b/mongodb_consistent_backup/Backup/Mongodump/Mongodump.py index 6e472106..92b2a2ce 100644 --- a/mongodb_consistent_backup/Backup/Mongodump/Mongodump.py +++ b/mongodb_consistent_backup/Backup/Mongodump/Mongodump.py @@ -1,14 +1,13 @@ -import os, sys +import os import logging -import signal +import sys from math import floor -from multiprocessing import cpu_count from subprocess import check_output from time import sleep from mongodb_consistent_backup.Common import MongoUri, config_to_string -from mongodb_consistent_backup.Errors import Error, OperationError +from mongodb_consistent_backup.Errors import OperationError from mongodb_consistent_backup.Oplog import OplogState from mongodb_consistent_backup.Pipeline import Task @@ -82,8 +81,6 @@ def summary(self): def get_summaries(self): for shard in self.states: state = self.states[shard] - host = state.get('host') - port = state.get('port') self._summary[shard] = state.get().copy() def wait(self): diff --git a/mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py b/mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py index 184cace5..2ac39eed 100644 --- a/mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py +++ b/mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py @@ -61,7 +61,7 @@ def parse_mongodump_line(self, line): (date, line) = line.split("\t") elif is_datetime(line): return None - return "%s:\t%s" % (self.uri, line) + return "%s:\t%s" % (self.uri, line) except: return None @@ -105,7 +105,7 @@ def wait(self): break else: logging.info(line) - if self._process.poll() != None: + if self._process.poll() is not None: break except Exception, e: logging.exception("Error reading mongodump output: %s" % e) @@ -117,7 +117,7 @@ def mongodump_cmd(self): mongodump_cmd = [self.binary] mongodump_flags = ["--host", mongodump_uri.host, "--port", str(mongodump_uri.port), "--oplog", "--out", "%s/dump" % self.backup_dir] if self.threads > 0: - mongodump_flags.extend(["--numParallelCollections="+str(self.threads)]) + mongodump_flags.extend(["--numParallelCollections=" + str(self.threads)]) if self.dump_gzip: mongodump_flags.extend(["--gzip"]) if tuple("3.4.0".split(".")) <= tuple(self.version.split(".")): @@ -131,7 +131,7 @@ def mongodump_cmd(self): mongodump_flags.extend(["-u", self.user, "-p", '""']) self.do_stdin_passwd = True else: - logging.warning("Mongodump is too old to set password securely! Upgrade to mongodump >= 3.0.2 to resolve this") + logging.warning("Mongodump is too old to set password securely! Upgrade to mongodump >= 3.0.2 to resolve this") mongodump_flags.extend(["-u", self.user, "-p", self.password]) mongodump_cmd.extend(mongodump_flags) return mongodump_cmd diff --git a/mongodb_consistent_backup/Backup/Mongodump/__init__.py b/mongodb_consistent_backup/Backup/Mongodump/__init__.py index ba960a08..50a37eb6 100644 --- a/mongodb_consistent_backup/Backup/Mongodump/__init__.py +++ b/mongodb_consistent_backup/Backup/Mongodump/__init__.py @@ -1,4 +1,4 @@ -from Mongodump import Mongodump +from Mongodump import Mongodump # NOQA def config(parser): diff --git a/mongodb_consistent_backup/Backup/__init__.py b/mongodb_consistent_backup/Backup/__init__.py index 27ba8660..4fa08237 100644 --- a/mongodb_consistent_backup/Backup/__init__.py +++ b/mongodb_consistent_backup/Backup/__init__.py @@ -1,4 +1,4 @@ -from Backup import Backup +from Backup import Backup # NOQA def config(parser): diff --git a/mongodb_consistent_backup/Common/Config.py b/mongodb_consistent_backup/Common/Config.py index 9b949336..ad52b25a 100644 --- a/mongodb_consistent_backup/Common/Config.py +++ b/mongodb_consistent_backup/Common/Config.py @@ -32,7 +32,7 @@ def __call__(self, parser, namespace, values, option_string=None): class ConfigParser(BaseConfiguration): def makeParserLoadSubmodules(self, parser): - for _, modname, ispkg in walk_packages(path=mongodb_consistent_backup.__path__, prefix=mongodb_consistent_backup.__name__+'.'): + for _, modname, ispkg in walk_packages(path=mongodb_consistent_backup.__path__, prefix=mongodb_consistent_backup.__name__ + '.'): if not ispkg: continue try: @@ -41,7 +41,7 @@ def makeParserLoadSubmodules(self, parser): for comp in components[1:]: mod = getattr(mod, comp) parser = mod.config(parser) - except AttributeError, e: + except AttributeError: continue return parser @@ -104,7 +104,7 @@ def to_dict(self, data): value = "******" ret[key] = value return ret - elif isinstance(data, (str, int, bool)): # or isinstance(data, int) or isinstance(data, bool): + elif isinstance(data, (str, int, bool)): return data def dump(self): diff --git a/mongodb_consistent_backup/Common/DB.py b/mongodb_consistent_backup/Common/DB.py index 73444cea..eec89775 100644 --- a/mongodb_consistent_backup/Common/DB.py +++ b/mongodb_consistent_backup/Common/DB.py @@ -6,7 +6,7 @@ from pymongo.errors import ConnectionFailure, OperationFailure, ServerSelectionTimeoutError from time import sleep -from mongodb_consistent_backup.Errors import DBAuthenticationError, DBConnectionError, DBOperationError, Error, OperationError +from mongodb_consistent_backup.Errors import DBAuthenticationError, DBConnectionError, DBOperationError, Error class DB: @@ -31,8 +31,9 @@ def connect(self): try: if self.do_replset: self.replset = self.uri.replset - logging.debug("Getting MongoDB connection to %s (replicaSet=%s, readPreference=%s)" % - (self.uri, self.replset, self.read_pref)) + logging.debug("Getting MongoDB connection to %s (replicaSet=%s, readPreference=%s)" % ( + self.uri, self.replset, self.read_pref + )) conn = MongoClient( connect=self.do_connect, host=self.uri.hosts(), @@ -44,7 +45,7 @@ def connect(self): w="majority" ) if self.do_connect: - conn['admin'].command({"ping":1}) + conn['admin'].command({"ping": 1}) except (ConnectionFailure, OperationFailure, ServerSelectionTimeoutError), e: logging.error("Unable to connect to %s! Error: %s" % (self.uri, e)) raise DBConnectionError(e) @@ -134,7 +135,7 @@ def get_oplog_cursor_since(self, caller, ts=None): comment = "%s:%s;%s:%i" % (caller.__name__, frame.function, frame.filename, frame.lineno) if not ts: ts = self.get_oplog_tail_ts() - query = {'ts':{'$gte':ts}} + query = {'ts': {'$gte': ts}} logging.debug("Querying oplog on %s with query: %s" % (self.uri, query)) # http://api.mongodb.com/python/current/examples/tailable.html return self.get_oplog_rs().find(query, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True).comment(comment) diff --git a/mongodb_consistent_backup/Common/LocalCommand.py b/mongodb_consistent_backup/Common/LocalCommand.py index d9238a3e..5c99bd31 100644 --- a/mongodb_consistent_backup/Common/LocalCommand.py +++ b/mongodb_consistent_backup/Common/LocalCommand.py @@ -42,7 +42,7 @@ def run(self): sleep(0.1) except Exception, e: raise Error(e) - + if self._process.returncode != 0: raise OperationError("%s command failed with exit code %i! Stderr output:\n%s" % ( self.command, diff --git a/mongodb_consistent_backup/Common/Lock.py b/mongodb_consistent_backup/Common/Lock.py index e8d97702..5b211925 100644 --- a/mongodb_consistent_backup/Common/Lock.py +++ b/mongodb_consistent_backup/Common/Lock.py @@ -8,11 +8,11 @@ class Lock: def __init__(self, lock_file, acquire=True): self.lock_file = lock_file - + self._lock = None if acquire: self.acquire() - + def acquire(self): try: self._lock = open(self.lock_file, "w") @@ -24,7 +24,7 @@ def acquire(self): if self._lock: self._lock.close() raise OperationError("Could not acquire lock on file: %s!" % self.lock_file) - + def release(self): if self._lock: logging.debug("Releasing exclusive lock on file: %s" % self.lock_file) diff --git a/mongodb_consistent_backup/Common/MongoUri.py b/mongodb_consistent_backup/Common/MongoUri.py index d52d208a..e867f529 100644 --- a/mongodb_consistent_backup/Common/MongoUri.py +++ b/mongodb_consistent_backup/Common/MongoUri.py @@ -15,6 +15,7 @@ def str(self): def __str__(self): return self.str() + class MongoUri: def __init__(self, url, default_port=27017, replset=None): self.url = url @@ -49,8 +50,8 @@ def parse(self): addr = MongoAddr() addr.replset = self.replset if ":" in url: - addr.host, addr.port = url.split(":") - addr.port = int(addr.port) + addr.host, addr.port = url.split(":") + addr.port = int(addr.port) else: addr.host = url if not addr.port: diff --git a/mongodb_consistent_backup/Common/Timer.py b/mongodb_consistent_backup/Common/Timer.py index c53740c7..a0d27bd9 100644 --- a/mongodb_consistent_backup/Common/Timer.py +++ b/mongodb_consistent_backup/Common/Timer.py @@ -8,7 +8,7 @@ def __init__(self, manager): self.timers = manager.dict() def start(self, timer_name): - self.timers[timer_name] = { 'start': time(), 'started': True } + self.timers[timer_name] = {'start': time(), 'started': True} def stop(self, timer_name): try: @@ -22,7 +22,7 @@ def stop(self, timer_name): else: raise OperationError("No started timer named %s to stop!" % timer_name) except IOError: - pass + pass def duration(self, timer_name): try: diff --git a/mongodb_consistent_backup/Common/Util.py b/mongodb_consistent_backup/Common/Util.py index e09df743..e6f25744 100644 --- a/mongodb_consistent_backup/Common/Util.py +++ b/mongodb_consistent_backup/Common/Util.py @@ -11,6 +11,7 @@ def config_to_string(config): config_pairs.append("%s=%s" % (key, config[key])) return ", ".join(config_pairs) + def is_datetime(string): try: parser.parse(string) @@ -18,9 +19,11 @@ def is_datetime(string): except: return False + def parse_method(method): return method.rstrip().lower() + def validate_hostname(hostname): try: if ":" in hostname: diff --git a/mongodb_consistent_backup/Common/__init__.py b/mongodb_consistent_backup/Common/__init__.py index beecef1a..aa30af72 100644 --- a/mongodb_consistent_backup/Common/__init__.py +++ b/mongodb_consistent_backup/Common/__init__.py @@ -1,7 +1,7 @@ -from Config import Config -from DB import DB -from LocalCommand import LocalCommand -from Lock import Lock -from MongoUri import MongoUri -from Timer import Timer -from Util import config_to_string, is_datetime, parse_method, validate_hostname +from Config import Config # NOQA +from DB import DB # NOQA +from LocalCommand import LocalCommand # NOQA +from Lock import Lock # NOQA +from MongoUri import MongoUri # NOQA +from Timer import Timer # NOQA +from Util import config_to_string, is_datetime, parse_method, validate_hostname # NOQA diff --git a/mongodb_consistent_backup/Errors.py b/mongodb_consistent_backup/Errors.py index 741c4260..8b178d63 100644 --- a/mongodb_consistent_backup/Errors.py +++ b/mongodb_consistent_backup/Errors.py @@ -2,22 +2,27 @@ class Error(Exception): """Raised when something failed in an unexpected and unrecoverable way""" pass + class OperationError(Error): """Raised when an operation failed in an expected but unrecoverable way""" pass + class NotifyError(Error): """Raised when an notify operation failed in an expected but unrecoverable way""" pass + class DBConnectionError(OperationError): """Raised when a db connection error occurs""" pass + class DBAuthenticationError(OperationError): """Raised when a db authentication error occurs""" pass + class DBOperationError(OperationError): """Raised when a db operation error occurs""" pass diff --git a/mongodb_consistent_backup/Logger.py b/mongodb_consistent_backup/Logger.py index 84b29d13..9f22e66d 100644 --- a/mongodb_consistent_backup/Logger.py +++ b/mongodb_consistent_backup/Logger.py @@ -41,7 +41,7 @@ def start_file_logger(self): self.file_log.setLevel(self.log_level) self.file_log.setFormatter(logging.Formatter(self.log_format)) logging.getLogger('').addHandler(self.file_log) - except OSError, e: + except OSError: logging.warning("Could not start file log handler, writing to stdout only") pass diff --git a/mongodb_consistent_backup/Main.py b/mongodb_consistent_backup/Main.py index a36424b1..9dd4975a 100644 --- a/mongodb_consistent_backup/Main.py +++ b/mongodb_consistent_backup/Main.py @@ -9,7 +9,7 @@ from Archive import Archive from Backup import Backup from Common import Config, DB, Lock, MongoUri, Timer -from Errors import Error, NotifyError, OperationError +from Errors import NotifyError, OperationError from Logger import Logger from Notify import Notify from Oplog import Tailer, Resolver @@ -360,7 +360,7 @@ def run(self): self.timer, self.backup_root_subdirectory, self.backup_directory, - self.replsets, + self.replsets, self.backup_stop, self.sharding ) diff --git a/mongodb_consistent_backup/Notify/Notify.py b/mongodb_consistent_backup/Notify/Notify.py index 2a51ebe9..c558364e 100644 --- a/mongodb_consistent_backup/Notify/Notify.py +++ b/mongodb_consistent_backup/Notify/Notify.py @@ -1,7 +1,7 @@ import logging from mongodb_consistent_backup.Errors import Error, NotifyError -from mongodb_consistent_backup.Notify.Nsca import Nsca +from mongodb_consistent_backup.Notify.Nsca import Nsca # NOQA from mongodb_consistent_backup.Pipeline import Stage @@ -25,7 +25,7 @@ def run(self, *args): try: (success, message) = self.notifications.pop() state = self._task.failed - if success == True: + if success is True: state = self._task.success self._task.run(state, message) except NotifyError: diff --git a/mongodb_consistent_backup/Notify/Nsca/Nsca.py b/mongodb_consistent_backup/Notify/Nsca/Nsca.py index 4ddf5410..e0649066 100644 --- a/mongodb_consistent_backup/Notify/Nsca/Nsca.py +++ b/mongodb_consistent_backup/Notify/Nsca/Nsca.py @@ -3,7 +3,7 @@ from pynsca import NSCANotifier -from mongodb_consistent_backup.Errors import Error, NotifyError, OperationError +from mongodb_consistent_backup.Errors import NotifyError, OperationError from mongodb_consistent_backup.Pipeline import Task diff --git a/mongodb_consistent_backup/Notify/Nsca/__init__.py b/mongodb_consistent_backup/Notify/Nsca/__init__.py index 6171d46a..46c963d5 100644 --- a/mongodb_consistent_backup/Notify/Nsca/__init__.py +++ b/mongodb_consistent_backup/Notify/Nsca/__init__.py @@ -1,4 +1,4 @@ -from Nsca import Nsca +from Nsca import Nsca # NOQA def config(parser): diff --git a/mongodb_consistent_backup/Notify/__init__.py b/mongodb_consistent_backup/Notify/__init__.py index 88e92838..76b85089 100644 --- a/mongodb_consistent_backup/Notify/__init__.py +++ b/mongodb_consistent_backup/Notify/__init__.py @@ -1,6 +1,6 @@ -from Notify import Notify +from Notify import Notify # NOQA def config(parser): - parser.add_argument("--notify.method", dest="notify.method", help="Notifier method (default: none)", default='none', choices=['nsca','none']) + parser.add_argument("--notify.method", dest="notify.method", help="Notifier method (default: none)", default='none', choices=['nsca', 'none']) return parser diff --git a/mongodb_consistent_backup/Oplog/Oplog.py b/mongodb_consistent_backup/Oplog/Oplog.py index 8200520c..12337cb7 100644 --- a/mongodb_consistent_backup/Oplog/Oplog.py +++ b/mongodb_consistent_backup/Oplog/Oplog.py @@ -118,9 +118,9 @@ def last_ts(self): return self._last_ts def stat(self): - return { - 'file': self.oplog_file, - 'count': self.count(), - 'first_ts': self.first_ts(), - 'last_ts': self.last_ts() - } + return { + 'file': self.oplog_file, + 'count': self.count(), + 'first_ts': self.first_ts(), + 'last_ts': self.last_ts() + } diff --git a/mongodb_consistent_backup/Oplog/OplogState.py b/mongodb_consistent_backup/Oplog/OplogState.py index 0f468795..39ba9a8e 100644 --- a/mongodb_consistent_backup/Oplog/OplogState.py +++ b/mongodb_consistent_backup/Oplog/OplogState.py @@ -9,7 +9,7 @@ def __init__(self, manager, uri, oplog_file=None): self.uri = uri self.oplog_file = oplog_file - try: + try: self._state = manager.dict() if uri: self._state['uri'] = self.uri.str() @@ -33,9 +33,9 @@ def get(self, key=None): return state[key] else: return None - return state - except IOError, e: - return None + return state + except IOError: + return None except Exception, e: raise OperationError(e) @@ -58,7 +58,7 @@ def write(self, file_name): f.write(json.dumps(self._state)) except Exception, e: logging.debug("Writing oplog state to file: '%s'! Error: %s" % (self.oplog_file, e)) - raise OperationError(e) + raise OperationError(e) finally: if f: f.close() diff --git a/mongodb_consistent_backup/Oplog/Resolver/Resolver.py b/mongodb_consistent_backup/Oplog/Resolver/Resolver.py index 1f844191..29bc6da6 100644 --- a/mongodb_consistent_backup/Oplog/Resolver/Resolver.py +++ b/mongodb_consistent_backup/Oplog/Resolver/Resolver.py @@ -5,11 +5,10 @@ from bson.timestamp import Timestamp from copy_reg import pickle from multiprocessing import Pool, TimeoutError -from time import sleep from types import MethodType from ResolverThread import ResolverThread -from mongodb_consistent_backup.Common import MongoUri, parse_method +from mongodb_consistent_backup.Common import MongoUri from mongodb_consistent_backup.Errors import Error, OperationError from mongodb_consistent_backup.Oplog import OplogState from mongodb_consistent_backup.Pipeline import Task @@ -22,6 +21,7 @@ def _reduce_method(m): else: return getattr, (m.im_self, m.im_func.func_name) + pickle(MethodType, _reduce_method) @@ -48,11 +48,11 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, tailed_oplogs, raise Error(e) def close(self, code=None, frame=None): - if self._pool and not self.stopped: - logging.debug("Stopping all oplog resolver threads") - self._pool.terminate() - logging.info("Stopped all oplog resolver threads") - self.stopped = True + if self._pool and not self.stopped: + logging.debug("Stopping all oplog resolver threads") + self._pool.terminate() + logging.info("Stopped all oplog resolver threads") + self.stopped = True def get_backup_end_max_ts(self): end_ts = None @@ -84,7 +84,7 @@ def done(self, done_uri): else: raise OperationError("Unexpected response from resolver thread: %s" % done_uri) - def wait(self, max_wait_secs=6*3600, poll_secs=2): + def wait(self, max_wait_secs=6 * 3600, poll_secs=2): if len(self._pooled) > 0: waited_secs = 0 self._pool.close() @@ -98,7 +98,7 @@ def wait(self, max_wait_secs=6*3600, poll_secs=2): if waited_secs < max_wait_secs: waited_secs += poll_secs else: - raise OperationError("Waited more than %i seconds for Oplog resolver! I will assume there is a problem and exit") + raise OperationError("Waited more than %i seconds for Oplog resolver! I will assume there is a problem and exit") self._pool.terminate() logging.debug("Stopped all oplog resolver threads") self.stopped = True @@ -109,14 +109,13 @@ def run(self): logging.info("Resolving oplogs (options: threads=%s, compression=%s)" % (self.threads(), self.compression())) self.timer.start(self.timer_name) self.running = True - + for shard in self.backup_oplogs: backup_oplog = self.backup_oplogs[shard] self.resolver_state[shard] = OplogState(self.manager, None, backup_oplog['file']) uri = MongoUri(backup_oplog['uri']).get() if shard in self.tailed_oplogs: tailed_oplog = self.tailed_oplogs[shard] - tailed_oplog_file = tailed_oplog['file'] if backup_oplog['last_ts'] is None and tailed_oplog['last_ts'] is None: logging.info("No oplog changes to resolve for %s" % uri) elif backup_oplog['last_ts'] > tailed_oplog['last_ts']: diff --git a/mongodb_consistent_backup/Oplog/Resolver/__init__.py b/mongodb_consistent_backup/Oplog/Resolver/__init__.py index 94b0a2ad..34b784d4 100644 --- a/mongodb_consistent_backup/Oplog/Resolver/__init__.py +++ b/mongodb_consistent_backup/Oplog/Resolver/__init__.py @@ -1,2 +1 @@ -from Resolver import Resolver - +from Resolver import Resolver # NOQA diff --git a/mongodb_consistent_backup/Oplog/Tailer/TailThread.py b/mongodb_consistent_backup/Oplog/Tailer/TailThread.py index bac5e22e..d97deb99 100644 --- a/mongodb_consistent_backup/Oplog/Tailer/TailThread.py +++ b/mongodb_consistent_backup/Oplog/Tailer/TailThread.py @@ -95,7 +95,7 @@ def run(self): try: logging.info("Tailing oplog on %s for changes" % self.uri) self.timer.start(self.timer_name) - + self.state.set('running', True) self.connect() oplog = self.oplog() @@ -109,11 +109,11 @@ def run(self): if self.last_ts and self.last_ts >= doc['ts']: continue oplog.add(doc) - + # update states self.count += 1 self.last_ts = doc['ts'] - if self.first_ts == None: + if self.first_ts is None: self.first_ts = self.last_ts update = { 'count': self.count, @@ -121,7 +121,7 @@ def run(self): 'last_ts': self.last_ts } self.state.set(None, update, True) - + # print status report every N seconds self.status() except NotMasterError: @@ -154,7 +154,7 @@ def run(self): logging.error("Tailer %s encountered an unexpected error: %s" % (self.uri, e)) self.exit_code = 1 self.backup_stop.set() - raise e + raise e finally: if self._cursor: logging.debug("Stopping oplog cursor on %s" % self.uri) @@ -171,5 +171,5 @@ def run(self): log_msg_extra = "%s, end ts: %s" % (log_msg_extra, self.last_ts) logging.info("Done tailing oplog on %s, %s" % (self.uri, log_msg_extra)) self.state.set('completed', True) - + sys.exit(self.exit_code) diff --git a/mongodb_consistent_backup/Oplog/Tailer/Tailer.py b/mongodb_consistent_backup/Oplog/Tailer/Tailer.py index 681aed48..80219858 100644 --- a/mongodb_consistent_backup/Oplog/Tailer/Tailer.py +++ b/mongodb_consistent_backup/Oplog/Tailer/Tailer.py @@ -1,13 +1,12 @@ -import bson import os import logging from bson.timestamp import Timestamp -from multiprocessing import Event, Manager +from multiprocessing import Event from time import time, sleep from TailThread import TailThread -from mongodb_consistent_backup.Common import parse_method, DB, MongoUri +from mongodb_consistent_backup.Common import MongoUri from mongodb_consistent_backup.Errors import OperationError from mongodb_consistent_backup.Oplog import OplogState from mongodb_consistent_backup.Pipeline import Task @@ -88,7 +87,6 @@ def stop(self, kill=False, sleep_secs=3): for shard in self.shards: replset = self.replsets[shard] state = self.shards[shard]['state'] - stop = self.shards[shard]['stop'] thread = self.shards[shard]['thread'] try: @@ -103,7 +101,7 @@ def stop(self, kill=False, sleep_secs=3): except: logging.warning("Could not get current optime from PRIMARY! Using now as a stop time") timestamp = Timestamp(int(time()), 0) - + # wait for replication to get in sync while state.get('last_ts') and state.get('last_ts') < timestamp: logging.info('Waiting for %s tailer to reach ts: %s, currrent: %s' % (uri, timestamp, state.get('last_ts'))) diff --git a/mongodb_consistent_backup/Oplog/Tailer/__init__.py b/mongodb_consistent_backup/Oplog/Tailer/__init__.py index 28cdcb23..47b22591 100644 --- a/mongodb_consistent_backup/Oplog/Tailer/__init__.py +++ b/mongodb_consistent_backup/Oplog/Tailer/__init__.py @@ -1,2 +1 @@ -from Tailer import Tailer - +from Tailer import Tailer # NOQA diff --git a/mongodb_consistent_backup/Oplog/__init__.py b/mongodb_consistent_backup/Oplog/__init__.py index d4b2b94e..18ebfd79 100644 --- a/mongodb_consistent_backup/Oplog/__init__.py +++ b/mongodb_consistent_backup/Oplog/__init__.py @@ -1,11 +1,11 @@ -from Oplog import Oplog -from OplogState import OplogState -from Resolver import Resolver -from Tailer import Tailer +from Oplog import Oplog # NOQA +from OplogState import OplogState # NOQA +from Resolver import Resolver # NOQA +from Tailer import Tailer # NOQA def config(parser): - parser.add_argument("--oplog.compression", dest="oplog.compression", help="Compression method to use on captured oplog file (default: none)", choices=["none","gzip"], default="none") + parser.add_argument("--oplog.compression", dest="oplog.compression", help="Compression method to use on captured oplog file (default: none)", choices=["none", "gzip"], default="none") parser.add_argument("--oplog.flush.max_docs", dest="oplog.flush.max_docs", help="Maximum number of oplog document writes to trigger a flush of the backup oplog file (default: 1000)", default=1000, type=int) parser.add_argument("--oplog.flush.max_secs", dest="oplog.flush.max_secs", help="Number of seconds to wait to flush the backup oplog file, if 'max_docs' is not reached (default: 1)", default=1, type=int) parser.add_argument("--oplog.resolver.threads", dest="oplog.resolver.threads", help="Number of threads to use during resolver step (default: 1-per-CPU)", default=0, type=int) diff --git a/mongodb_consistent_backup/Pipeline/PoolThread.py b/mongodb_consistent_backup/Pipeline/PoolThread.py index af95e70c..7d3c4d71 100644 --- a/mongodb_consistent_backup/Pipeline/PoolThread.py +++ b/mongodb_consistent_backup/Pipeline/PoolThread.py @@ -1,5 +1,3 @@ -import logging - from mongodb_consistent_backup.Errors import Error diff --git a/mongodb_consistent_backup/Pipeline/Stage.py b/mongodb_consistent_backup/Pipeline/Stage.py index eda46ba6..4fa7e334 100644 --- a/mongodb_consistent_backup/Pipeline/Stage.py +++ b/mongodb_consistent_backup/Pipeline/Stage.py @@ -1,7 +1,6 @@ import logging import sys -from mongodb_consistent_backup.Common import config_to_string, parse_method from mongodb_consistent_backup.Errors import Error, OperationError from Task import Task diff --git a/mongodb_consistent_backup/Pipeline/__init__.py b/mongodb_consistent_backup/Pipeline/__init__.py index 9ac1da66..93b4dc34 100644 --- a/mongodb_consistent_backup/Pipeline/__init__.py +++ b/mongodb_consistent_backup/Pipeline/__init__.py @@ -1,3 +1,3 @@ -from PoolThread import PoolThread -from Stage import Stage -from Task import Task +from PoolThread import PoolThread # NOQA +from Stage import Stage # NOQA +from Task import Task # NOQA diff --git a/mongodb_consistent_backup/Replication/Replset.py b/mongodb_consistent_backup/Replication/Replset.py index 747252fe..72e1689b 100644 --- a/mongodb_consistent_backup/Replication/Replset.py +++ b/mongodb_consistent_backup/Replication/Replset.py @@ -1,11 +1,12 @@ import logging +import pymongo.errors -from bson.timestamp import Timestamp from math import ceil -from time import mktime, time +from time import mktime from mongodb_consistent_backup.Common import DB, MongoUri -from mongodb_consistent_backup.Errors import OperationError +from mongodb_consistent_backup.Errors import Error, OperationError + class Replset: def __init__(self, config, db): @@ -104,8 +105,9 @@ def get_repl_op_lag(self, rs_status, rs_member): return op_lag def get_repl_lag(self, rs_member): - rs_status = self.get_rs_status(False, True) - rs_primary = self.find_primary(False, True) + rs_status = self.get_rs_status(False, True) + self.find_primary(False, True) + member_optime_ts = rs_member['optime'] primary_optime_ts = self.primary_optime(False, True) if isinstance(rs_member['optime'], dict) and 'ts' in rs_member['optime']: @@ -120,7 +122,7 @@ def get_electable_members(self, force=False): electable = [] rs_config = self.get_rs_config(force, True) for member in rs_config['members']: - if 'arbiterOnly' in member and member['arbiterOnly'] == True: + if 'arbiterOnly' in member and member['arbiterOnly'] is True: continue elif 'priority' in member and member['priority'] == 0: continue @@ -139,33 +141,35 @@ def is_member_electable(self, member): def find_primary(self, force=False, quiet=False): if force or not self.primary: - rs_status = self.get_rs_status(force, quiet) - rs_name = rs_status['set'] - for member in rs_status['members']: - if member['stateStr'] == 'PRIMARY' and member['health'] > 0: - member_uri = MongoUri(member['name'], 27017, rs_name) - optime_ts = member['optime'] - if isinstance(member['optime'], dict) and 'ts' in member['optime']: - optime_ts = member['optime']['ts'] - if quiet == False or not self.primary: - logging.info("Found PRIMARY: %s with optime %s" % ( - member_uri, - str(optime_ts) - )) - self.primary = { - 'uri': member_uri, - 'optime': optime_ts - } - self.replset_summary['primary'] = { "member": member, "uri": member_uri.str() } - if self.primary is None: - logging.error("Unable to locate a PRIMARY member for replset %s, giving up" % rs_name) - raise OperationError("Unable to locate a PRIMARY member for replset %s, giving up" % rs_name) + rs_status = self.get_rs_status(force, quiet) + rs_name = rs_status['set'] + for member in rs_status['members']: + if member['stateStr'] == 'PRIMARY' and member['health'] > 0: + member_uri = MongoUri(member['name'], 27017, rs_name) + optime_ts = member['optime'] + if isinstance(member['optime'], dict) and 'ts' in member['optime']: + optime_ts = member['optime']['ts'] + if quiet is False or not self.primary: + logging.info("Found PRIMARY: %s with optime %s" % ( + member_uri, + str(optime_ts) + )) + self.primary = { + 'uri': member_uri, + 'optime': optime_ts + } + self.replset_summary['primary'] = {"member": member, "uri": member_uri.str()} + if self.primary is None: + logging.error("Unable to locate a PRIMARY member for replset %s, giving up" % rs_name) + raise OperationError("Unable to locate a PRIMARY member for replset %s, giving up" % rs_name) return self.primary def find_secondary(self, force=False, quiet=False): rs_status = self.get_rs_status(force, quiet) - rs_config = self.get_rs_config(force, quiet) - db_config = self.get_mongo_config(force, quiet) + + self.get_rs_config(force, quiet) + self.get_mongo_config(force, quiet) + quorum = self.get_rs_quorum() rs_name = rs_status['set'] @@ -198,11 +202,11 @@ def find_secondary(self, force=False, quiet=False): if member_config['priority'] > 1: score -= priority - 1 elif member_config['priority'] == 0: - score += (score * self.pri0_weight) + score += (score * self.pri0_weight) if priority < self.min_priority or priority > self.max_priority: logging.info("Found SECONDARY %s with out-of-bounds priority! Skipping" % member_uri) continue - elif self.hidden_only and not 'hidden' in log_data: + elif self.hidden_only and 'hidden' not in log_data: logging.info("Found SECONDARY %s that is non-hidden and hidden-only mode is enabled! Skipping" % member_uri) continue @@ -227,7 +231,7 @@ def find_secondary(self, force=False, quiet=False): log_data['optime'] = optime_ts log_data['score'] = int(score) logging.info("%s: %s" % (log_msg, str(log_data))) - self.replset_summary['secondary'] = { "member": member, "uri": member_uri.str(), "data": log_data } + self.replset_summary['secondary'] = {"member": member, "uri": member_uri.str(), "data": log_data} if self.secondary is None or electable_count < quorum: logging.error("Not enough valid secondaries in replset %s to take backup! Num replset electable members: %i, required quorum: %i" % ( rs_name, diff --git a/mongodb_consistent_backup/Replication/ReplsetSharded.py b/mongodb_consistent_backup/Replication/ReplsetSharded.py index 798c9a83..7143f521 100644 --- a/mongodb_consistent_backup/Replication/ReplsetSharded.py +++ b/mongodb_consistent_backup/Replication/ReplsetSharded.py @@ -1,7 +1,5 @@ -import logging - from mongodb_consistent_backup.Common import DB, MongoUri -from mongodb_consistent_backup.Errors import DBConnectionError, Error +from mongodb_consistent_backup.Errors import Error from mongodb_consistent_backup.Sharding import Sharding from Replset import Replset @@ -13,7 +11,7 @@ def __init__(self, config, sharding, db): self.db = db self.max_lag_secs = self.config.replication.max_lag_secs - self.replsets = {} + self.replsets = {} self.replset_conns = {} # Check Sharding class: @@ -35,14 +33,14 @@ def summary(self): return summary def get_replset_connection(self, uri, force=False): - if force or not uri.replset in self.replset_conns: + if force or uri.replset not in self.replset_conns: self.replset_conns[uri.replset] = DB(uri, self.config, True) return self.replset_conns[uri.replset] def get_replsets(self, force=False): for shard in self.sharding.shards(): shard_uri = MongoUri(shard['host']) - if force or not shard_uri.replset in self.replsets: + if force or shard_uri.replset not in self.replsets: rs_db = self.get_replset_connection(shard_uri) self.replsets[shard_uri.replset] = Replset(self.config, rs_db) diff --git a/mongodb_consistent_backup/Replication/__init__.py b/mongodb_consistent_backup/Replication/__init__.py index 64535d65..e1559a31 100644 --- a/mongodb_consistent_backup/Replication/__init__.py +++ b/mongodb_consistent_backup/Replication/__init__.py @@ -1,5 +1,5 @@ -from Replset import Replset -from ReplsetSharded import ReplsetSharded +from Replset import Replset # NOQA +from ReplsetSharded import ReplsetSharded # NOQA def config(parser): @@ -8,5 +8,5 @@ def config(parser): parser.add_argument("--replication.max_priority", dest="replication.max_priority", help="Max priority of secondary members for backup (default: 1000)", default=1000, type=int) parser.add_argument("--replication.hidden_only", dest="replication.hidden_only", help="Only use hidden secondary members for backup (default: false)", default=False, action="store_true") # todo: add tag-specific backup option - #parser.add_argument("-replication.use_tag", dest="replication.use_tag", help="Only use secondary members with tag for backup", type=str) + # parser.add_argument("-replication.use_tag", dest="replication.use_tag", help="Only use secondary members with tag for backup", type=str) return parser diff --git a/mongodb_consistent_backup/Sharding.py b/mongodb_consistent_backup/Sharding.py index 5e4ca6da..ac60f117 100644 --- a/mongodb_consistent_backup/Sharding.py +++ b/mongodb_consistent_backup/Sharding.py @@ -3,8 +3,8 @@ from pymongo import DESCENDING from time import sleep -from mongodb_consistent_backup.Common import DB, MongoUri, validate_hostname -from mongodb_consistent_backup.Errors import DBOperationError, Error, OperationError +from mongodb_consistent_backup.Common import DB, MongoUri +from mongodb_consistent_backup.Errors import DBConnectionError, DBOperationError, Error, OperationError from mongodb_consistent_backup.Replication import Replset @@ -59,7 +59,7 @@ def get_mongos(self, force=False): self.mongos_db = DB(mongos_uri, self.config, False, 'nearest') logging.info("Connected to cluster mongos: %s" % mongos_uri) return self.mongos_db - except DBConnectionFailure, e: + except DBConnectionError: logging.debug("Failed to connect to mongos: %s, trying next available mongos" % mongos_uri) raise OperationError('Could not connect to any mongos!') @@ -107,9 +107,9 @@ def get_balancer_state(self): config = self.connection['config'] state = config['settings'].find_one({'_id': 'balancer'}) if not state: - return True + return True elif 'stopped' in state and state.get('stopped') is True: - return False + return False return True except Exception, e: raise DBOperationError(e) @@ -159,8 +159,8 @@ def stop_balancer(self): self.timer.stop(self.timer_name) logging.info("Balancer stopped after %.2f seconds" % self.timer.duration(self.timer_name)) return - logging.fatal("Could not stop balancer %s: %s!" % (self.db.uri, e)) - raise DBOperationError("Could not stop balancer %s: %s" % (self.db.uri, e)) + logging.fatal("Could not stop balancer %s!" % self.db.uri) + raise DBOperationError("Could not stop balancer %s" % self.db.uri) def get_configdb_hosts(self): try: @@ -192,9 +192,9 @@ def get_config_server(self, force=False): else: self.config_db = DB(configdb_uri, self.config, True) if self.config_db.is_replset(): - self.config_server = Replset(self.config, self.config_db) + self.config_server = Replset(self.config, self.config_db) else: - self.config_server = { 'host': configdb_uri.hosts() } + self.config_server = {'host': configdb_uri.hosts()} self.config_db.close() except Exception, e: logging.fatal("Unable to locate config servers using %s: %s!" % (self.db.uri, e)) diff --git a/mongodb_consistent_backup/State.py b/mongodb_consistent_backup/State.py index 05d0223f..7a21bd9b 100644 --- a/mongodb_consistent_backup/State.py +++ b/mongodb_consistent_backup/State.py @@ -35,19 +35,19 @@ def __init__(self, base_dir, config, filename="meta.bson", state_version=1, meta def merge(self, new, old): merged = old.copy() merged.update(new) - return merged + return merged def load(self, load_one=False): - f = None - try: + f = None + try: f = open(self.state_file, "r") data = decode_all(f.read()) if load_one and len(data) > 0: return data[0] return data - except Exception, e: + except Exception, e: raise e - finally: + finally: if f: f.close() @@ -57,7 +57,7 @@ def write(self, do_merge=False): self.lock.acquire() if do_merge and os.path.isfile(self.state_file): curr = self.load(True) - data = self.merge(self.state, curr) + self.merge(self.state, curr) f = open(self.state_file, 'w+') logging.debug("Writing %s state file: %s" % (self.__class__.__name__, self.state_file)) self.state['updated_at'] = int(time()) @@ -67,7 +67,7 @@ def write(self, do_merge=False): f.close() self.lock.release() - + class StateBaseReplset(StateBase): def __init__(self, base_dir, config, backup_time, set_name, filename): StateBase.__init__(self, base_dir, config, filename) @@ -120,7 +120,7 @@ def init(self): def set(self, name, summary): self.state[name] = summary - self.write(True) + self.write(True) class StateRoot(StateBase): @@ -152,9 +152,9 @@ def load_backups(self): continue logging.info("Found %i existing completed backups for set" % len(backups)) return backups - + + class StateDoneStamp(StateBase): def __init__(self, base_dir, config): StateBase.__init__(self, base_dir, config, "done.bson") self.state = {'done': True} - diff --git a/mongodb_consistent_backup/Upload/Gs/Gs.py b/mongodb_consistent_backup/Upload/Gs/Gs.py index a1a87523..1d3cdd94 100755 --- a/mongodb_consistent_backup/Upload/Gs/Gs.py +++ b/mongodb_consistent_backup/Upload/Gs/Gs.py @@ -8,7 +8,7 @@ from mongodb_consistent_backup.Errors import OperationError from mongodb_consistent_backup.Pipeline import Task -from mongodb_consistent_backup.Upload.Gs.GsUploadThread import GsUploadThread +from GsUploadThread import GsUploadThread # Allows pooled .apply_async()s to work on Class-methods: @@ -17,6 +17,8 @@ def _reduce_method(m): return getattr, (m.im_class, m.im_func.func_name) else: return getattr, (m.im_self, m.im_func.func_name) + + pickle(MethodType, _reduce_method) diff --git a/mongodb_consistent_backup/Upload/Gs/GsUploadThread.py b/mongodb_consistent_backup/Upload/Gs/GsUploadThread.py index ebeb44f1..ce6a5b9e 100755 --- a/mongodb_consistent_backup/Upload/Gs/GsUploadThread.py +++ b/mongodb_consistent_backup/Upload/Gs/GsUploadThread.py @@ -1,4 +1,5 @@ import boto +import hashlib import logging import os @@ -31,32 +32,32 @@ def configure(self): def get_uri(self): return boto.storage_uri(self.path, 'gs') - + def exists(self): try: self.metadata() return True except boto.exception.InvalidUriError: return False - + def metadata(self): logging.debug("Getting metadata for path: %s" % self.path) if not self._metadata: self._metadata = self.get_uri().get_key() return self._metadata - + def gs_md5hash(self): key = self.metadata() if hasattr(key, 'etag'): return key.etag.strip('"\'') - + def file_md5hash(self, blocksize=65536): md5 = hashlib.md5() with open(self.file_path, "rb") as f: for block in iter(lambda: f.read(blocksize), b""): md5.update(block) return md5.hexdigest() - + def success(self): if self.remove_uploaded and not self.file_path.startswith(os.path.join(self.backup_dir, self.meta_data_dir)): logging.debug("Removing successfully uploaded file: %s" % self.file_path) @@ -73,7 +74,7 @@ def run(self): logging.debug("Path %s checksum and local checksum differ, re-uploading" % self.path) else: logging.debug("Path %s does not exist, uploading" % self.path) - + try: f = open(self.file_path, 'r') uri = self.get_uri() @@ -81,7 +82,7 @@ def run(self): uri.new_key().set_contents_from_file(f) finally: if f: - f.close() + f.close() self.success() except Exception, e: logging.error("Uploading to Google Cloud Storage failed! Error: %s" % e) diff --git a/mongodb_consistent_backup/Upload/Gs/__init__.py b/mongodb_consistent_backup/Upload/Gs/__init__.py index 6ad604ad..7fd2f0bb 100644 --- a/mongodb_consistent_backup/Upload/Gs/__init__.py +++ b/mongodb_consistent_backup/Upload/Gs/__init__.py @@ -1,5 +1,4 @@ -from Gs import Gs -from GsUploadThread import GsUploadThread +from Gs import Gs # NOQA def config(parser): diff --git a/mongodb_consistent_backup/Upload/S3/S3.py b/mongodb_consistent_backup/Upload/S3/S3.py index 6b939f64..9cf2bec9 100644 --- a/mongodb_consistent_backup/Upload/S3/S3.py +++ b/mongodb_consistent_backup/Upload/S3/S3.py @@ -20,6 +20,8 @@ def _reduce_method(m): return getattr, (m.im_class, m.im_func.func_name) else: return getattr, (m.im_self, m.im_func.func_name) + + pickle(MethodType, _reduce_method) @@ -38,10 +40,7 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs): self.secure = self.config.upload.s3.secure self.retries = self.config.upload.s3.retries self.s3_acl = self.config.upload.s3.acl - - self.key_prefix = base_dir - if 'key_prefix' in self.args: - self.key_prefix = key_prefix + self.key_prefix = base_dir self._pool = None self._multipart = None @@ -105,7 +104,7 @@ def run(self): part_count = 0 for part in boto.s3.multipart.part_lister(self._multipart): - part_count += 1 + part_count += 1 if part_count == chunk_count: self._multipart.complete_upload() key = self.bucket.get_key(key_name) diff --git a/mongodb_consistent_backup/Upload/S3/S3Session.py b/mongodb_consistent_backup/Upload/S3/S3Session.py index aa24d0c9..ecb30160 100644 --- a/mongodb_consistent_backup/Upload/S3/S3Session.py +++ b/mongodb_consistent_backup/Upload/S3/S3Session.py @@ -6,6 +6,7 @@ from mongodb_consistent_backup.Errors import OperationError + class S3Session: def __init__(self, region, access_key, secret_key, bucket_name, secure=True, num_retries=5, socket_timeout=15): self.region = region diff --git a/mongodb_consistent_backup/Upload/S3/__init__.py b/mongodb_consistent_backup/Upload/S3/__init__.py index 123bf161..ae5f1e4c 100644 --- a/mongodb_consistent_backup/Upload/S3/__init__.py +++ b/mongodb_consistent_backup/Upload/S3/__init__.py @@ -1,4 +1,5 @@ -from S3 import S3 +from S3 import S3 # NOQA + def config(parser): parser.add_argument("--upload.s3.region", dest="upload.s3.region", help="S3 Uploader AWS region to connect to (default: us-east-1)", default="us-east-1", type=str) diff --git a/mongodb_consistent_backup/Upload/Upload.py b/mongodb_consistent_backup/Upload/Upload.py index e9fd22e2..a77c47b9 100644 --- a/mongodb_consistent_backup/Upload/Upload.py +++ b/mongodb_consistent_backup/Upload/Upload.py @@ -1,5 +1,5 @@ -from mongodb_consistent_backup.Upload.Gs import Gs -from mongodb_consistent_backup.Upload.S3 import S3 +from mongodb_consistent_backup.Upload.Gs import Gs # NOQA +from mongodb_consistent_backup.Upload.S3 import S3 # NOQA from mongodb_consistent_backup.Pipeline import Stage diff --git a/mongodb_consistent_backup/Upload/__init__.py b/mongodb_consistent_backup/Upload/__init__.py index fa10dfa6..4f5412a7 100644 --- a/mongodb_consistent_backup/Upload/__init__.py +++ b/mongodb_consistent_backup/Upload/__init__.py @@ -1,7 +1,7 @@ -from Upload import Upload +from Upload import Upload # NOQA def config(parser): parser.add_argument("--upload.method", dest="upload.method", help="Uploader method (default: none)", default='none', choices=['gs', 's3', 'none']) - parser.add_argument("--upload.remove_uploaded", dest="upload.remove_uploaded",help="Remove source files after successful upload (default: false)", default=False, action="store_true") + parser.add_argument("--upload.remove_uploaded", dest="upload.remove_uploaded", help="Remove source files after successful upload (default: false)", default=False, action="store_true") return parser diff --git a/mongodb_consistent_backup/__init__.py b/mongodb_consistent_backup/__init__.py index e66980b0..fd838dd8 100644 --- a/mongodb_consistent_backup/__init__.py +++ b/mongodb_consistent_backup/__init__.py @@ -8,6 +8,7 @@ git_commit = 'GIT_COMMIT_HASH' prog_name = 'mongodb-consistent-backup' + # noinspection PyUnusedLocal def run(): try: From b828b65be3feb127b2f005b076102184db815fbb Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 22 Aug 2017 18:04:39 +0200 Subject: [PATCH 2/3] Add 'make flake8' step and docs on code quality --- Makefile | 4 ++++ README.rst | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/Makefile b/Makefile index 1da9e0f1..7192e0fb 100644 --- a/Makefile +++ b/Makefile @@ -22,6 +22,10 @@ install: bin/mongodb-consistent-backup install -m 0644 LICENSE $(SHAREDIR)/$(NAME)/LICENSE install -m 0644 README.rst $(SHAREDIR)/$(NAME)/README.rst +flake8: + # Ignore long-lines and space-aligned = and : for now + flake8 --ignore E221,E241,E501 $(PWD)/$(NAME) + uninstall: rm -f $(BINDIR)/mongodb-consistent-backup rm -rf $(SHAREDIR)/$(NAME) diff --git a/README.rst b/README.rst index 78eb5aab..a6d85b0e 100644 --- a/README.rst +++ b/README.rst @@ -218,6 +218,12 @@ Roadmap - Documentation for running under Docker with persistent volumes - Python unit tests +Submitting Code +~~~~~~~~~~~~~~~ + +- Submitted code must pass Python `'flake8' `__ checks. Run *'make flake8'* to test. +- To make review easier pull requests must address and solve a one problem at a time. + Links ~~~~~ From 56133184113a1c2b777a3d4e19b2bc1e4f9c9367 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 22 Aug 2017 18:26:42 +0200 Subject: [PATCH 3/3] typo --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index a6d85b0e..44abe01f 100644 --- a/README.rst +++ b/README.rst @@ -222,7 +222,7 @@ Submitting Code ~~~~~~~~~~~~~~~ - Submitted code must pass Python `'flake8' `__ checks. Run *'make flake8'* to test. -- To make review easier pull requests must address and solve a one problem at a time. +- To make review easier, pull requests must address and solve one problem at a time. Links ~~~~~