diff --git a/docs/internals/repositories.rst b/docs/internals/repositories.rst new file mode 100644 index 00000000000..7911bc9f232 --- /dev/null +++ b/docs/internals/repositories.rst @@ -0,0 +1,32 @@ +All About Repositories: Handling Protocols +========================================== + +A repository is where borg keeps all its backup data. It abstracts the details of +repositories in order to support multiple remote locations behind a similar interface. + +The top-level abstraction is the Repository class which then loads the +appropriate specific repository class for the location specified by the user. + +For example, a ``file://`` location will end up loading a ``LocalRepository`` while +an ``ssh://`` location will end up loading a ``RemoteRepository`` (which communicates +with a remote borg instance over ssh). + +Adding A New Repository Backend +------------------------------- + +You can see most of what needs to be done by looking at the main ``Repository`` +class in ``repository.py``. Every call it gets, it proxies to a subclass that +does the real work. That is what you'll write. + +A few of the methods are optional and can return ``None`` or do nothing: + +- ``get_free_nonce`` +- ``commit_nonce_reservation`` +- ``config`` (if remote) +- ``save_config()`` (if remote) + +Write your new repository class in a file in the ``repositories`` directory. + +After writing your new class, add support for it in the ``Repository.__init__`` +method, which inspects a location's protocol and instantiates the appropriate +backend. diff --git a/src/borg/archive.py b/src/borg/archive.py index 212fd3bfa93..c45e066084f 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -43,8 +43,7 @@ from .patterns import PathPrefixPattern, FnmatchPattern, IECommand from .item import Item, ArchiveItem, ItemDiff from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname -from .remote import cache_if_remote -from .repository import Repository, LIST_SCAN_LIMIT +from .repository import Repository, cache_if_remote has_lchmod = hasattr(os, 'lchmod') diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 8fb6439d55e..2673b036bf4 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -76,8 +76,7 @@ from .patterns import PatternMatcher from .item import Item from .platform import get_flags, get_process_id, SyncFile - from .remote import RepositoryServer, RemoteRepository, cache_if_remote - from .repository import Repository, LIST_SCAN_LIMIT, TAG_PUT, TAG_DELETE, TAG_COMMIT + from .repository import Repository, TAG_PUT, TAG_DELETE, TAG_COMMIT, cache_if_remote from .selftest import selftest from .upgrader import AtticRepositoryUpgrader, BorgRepositoryUpgrader except BaseException: @@ -106,7 +105,7 @@ def argument(args, str_or_bool): def with_repository(fake=False, invert_fake=False, create=False, lock=True, exclusive=False, manifest=True, cache=False, secure=True, - compatibility=None): + compatibility=None, local_only=False): """ Method decorator for subcommand-handling methods: do_XYZ(self, args, repository, …) @@ -136,19 +135,17 @@ def decorator(method): @functools.wraps(method) def wrapper(self, args, **kwargs): location = args.location # note: 'location' must be always present in args - append_only = getattr(args, 'append_only', False) - storage_quota = getattr(args, 'storage_quota', None) - make_parent_dirs = getattr(args, 'make_parent_dirs', False) if argument(args, fake) ^ invert_fake: return method(self, args, repository=None, **kwargs) - elif location.proto == 'ssh': - repository = RemoteRepository(location, create=create, exclusive=argument(args, exclusive), - lock_wait=self.lock_wait, lock=lock, append_only=append_only, - make_parent_dirs=make_parent_dirs, args=args) - else: - repository = Repository(location.path, create=create, exclusive=argument(args, exclusive), - lock_wait=self.lock_wait, lock=lock, append_only=append_only, - storage_quota=storage_quota, make_parent_dirs=make_parent_dirs) + + repository = Repository(location, create=create, + exclusive=argument(args, exclusive), + lock_wait=self.lock_wait, lock=lock, + args=args) + + if local_only and repository.remote: + raise argparse.ArgumentTypeError('"%s": Repository must be local' % location.canonical_path()) + with repository: if manifest or cache: kwargs['manifest'], kwargs['key'] = Manifest.load(repository, compatibility) @@ -236,6 +233,7 @@ def build_matcher(inclexcl_patterns, include_paths): def do_serve(self, args): """Start in server mode. This command is usually not used manually.""" + from .repositories.remote import RepositoryServer RepositoryServer( restrict_to_paths=args.restrict_to_paths, restrict_to_repositories=args.restrict_to_repositories, @@ -1606,7 +1604,7 @@ def do_compact(self, args, repository): repository.commit(compact=True, cleanup_commits=args.cleanup_commits) return EXIT_SUCCESS - @with_repository(exclusive=True, manifest=False) + @with_repository(exclusive=True, manifest=False, local_only=True) def do_config(self, args, repository): """get, set, and delete values in a repository or cache config file""" @@ -1657,8 +1655,8 @@ def list_config(config): 'segments_per_dir': str(DEFAULT_SEGMENTS_PER_DIR), 'max_segment_size': str(MAX_SEGMENT_SIZE_LIMIT), 'additional_free_space': '0', - 'storage_quota': repository.storage_quota, - 'append_only': repository.append_only + 'storage_quota': '0', + 'append_only': 'False' } print('[repository]') for key in ['version', 'segments_per_dir', 'max_segment_size', @@ -1695,7 +1693,7 @@ def list_config(config): validate = cache_validate else: config = repository.config - save = lambda: repository.save_config(repository.path, repository.config) # noqa + save = lambda: repository.save_config() # noqa validate = repo_validate if args.delete: @@ -2842,7 +2840,7 @@ def define_archive_filters_group(subparser, *, sort_by=True, first_last=True): help='list the configuration of the repo') subparser.add_argument('location', metavar='REPOSITORY', nargs='?', default='', - type=location_validator(archive=False, proto='file'), + type=location_validator(archive=False), help='repository to configure') subparser.add_argument('name', metavar='NAME', nargs='?', help='name of config key') @@ -4432,21 +4430,10 @@ def main(): # pragma: no cover exit_code = archiver.run(args) except Error as e: msg = e.get_message() - msgid = type(e).__qualname__ + msgid = e.get_msgid() tb_log_level = logging.ERROR if e.traceback else logging.DEBUG - tb = "%s\n%s" % (traceback.format_exc(), sysinfo()) + tb = "%s\n%s" % (e.format_exc(), sysinfo()) exit_code = e.exit_code - except RemoteRepository.RPCError as e: - important = e.exception_class not in ('LockTimeout', ) and e.traceback - msgid = e.exception_class - tb_log_level = logging.ERROR if important else logging.DEBUG - if important: - msg = e.exception_full - else: - msg = e.get_message() - tb = '\n'.join('Borg server: ' + l for l in e.sysinfo.splitlines()) - tb += "\n" + sysinfo() - exit_code = EXIT_ERROR except Exception: msg = 'Local Exception' msgid = 'Exception' diff --git a/src/borg/cache.py b/src/borg/cache.py index de050aa4d30..a65be271ec8 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -10,7 +10,7 @@ logger = create_logger() -from .constants import CACHE_README, DEFAULT_FILES_CACHE_MODE +from .constants import CACHE_README, DEFAULT_FILES_CACHE_MODE, LIST_SCAN_LIMIT from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer from .helpers import Location from .helpers import Error @@ -30,8 +30,7 @@ from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityCheckedFile, FileIntegrityError from .locking import Lock from .platform import SaveFile -from .remote import cache_if_remote -from .repository import LIST_SCAN_LIMIT +from .repository import cache_if_remote # note: cmtime might me either a ctime or a mtime timestamp FileCacheEntry = namedtuple('FileCacheEntry', 'age inode size cmtime chunk_ids') diff --git a/src/borg/crypto/key.py b/src/borg/crypto/key.py index 7bc87fbd151..ee3271fee3c 100644 --- a/src/borg/crypto/key.py +++ b/src/borg/crypto/key.py @@ -231,7 +231,7 @@ def unpack_and_verify_manifest(self, data, force_tam_not_required=False): unpacked = unpacker.unpack() if b'tam' not in unpacked: if tam_required: - raise TAMRequiredError(self.repository._location.canonical_path()) + raise TAMRequiredError(self.repository.location.canonical_path()) else: logger.debug('TAM not found and not required') return unpacked, False @@ -549,7 +549,7 @@ def create(cls, repository, args): @classmethod def detect(cls, repository, manifest_data): - prompt = 'Enter passphrase for %s: ' % repository._location.canonical_path() + prompt = 'Enter passphrase for %s: ' % repository.location.canonical_path() key = cls(repository) passphrase = Passphrase.env_passphrase() if passphrase is None: @@ -706,9 +706,9 @@ def sanity_check(self, filename, id): # we do the magic / id check in binary mode to avoid stumbling over # decoding errors if somebody has binary files in the keys dir for some reason. if fd.read(len(file_id)) != file_id: - raise KeyfileInvalidError(self.repository._location.canonical_path(), filename) + raise KeyfileInvalidError(self.repository.location.canonical_path(), filename) if fd.read(len(repo_id)) != repo_id: - raise KeyfileMismatchError(self.repository._location.canonical_path(), filename) + raise KeyfileMismatchError(self.repository.location.canonical_path(), filename) return filename def find_key(self): @@ -723,7 +723,7 @@ def find_key(self): return self.sanity_check(filename, id) except (KeyfileInvalidError, KeyfileMismatchError): pass - raise KeyfileNotFoundError(self.repository._location.canonical_path(), get_keys_dir()) + raise KeyfileNotFoundError(self.repository.location.canonical_path(), get_keys_dir()) def get_new_target(self, args): keyfile = os.environ.get('BORG_KEY_FILE') @@ -761,7 +761,7 @@ class RepoKey(ID_HMAC_SHA_256, KeyfileKeyBase): STORAGE = KeyBlobStorage.REPO def find_key(self): - loc = self.repository._location.canonical_path() + loc = self.repository.location.canonical_path() try: self.repository.load_key() return loc diff --git a/src/borg/crypto/nonces.py b/src/borg/crypto/nonces.py index fe5abc547c3..8d5a4a84605 100644 --- a/src/borg/crypto/nonces.py +++ b/src/borg/crypto/nonces.py @@ -5,7 +5,6 @@ from ..helpers import get_security_dir from ..helpers import bin_to_hex from ..platform import SaveFile -from ..remote import InvalidRPCMethod from .low_level import bytes_to_long, long_to_bytes @@ -34,14 +33,7 @@ def commit_local_nonce_reservation(self, next_unreserved, start_nonce): fd.write(bin_to_hex(long_to_bytes(next_unreserved))) def get_repo_free_nonce(self): - try: - return self.repository.get_free_nonce() - except InvalidRPCMethod: - # old server version, suppress further calls - sys.stderr.write("Please upgrade to borg version 1.1+ on the server for safer AES-CTR nonce handling.\n") - self.get_repo_free_nonce = lambda: None - self.commit_repo_nonce_reservation = lambda next_unreserved, start_nonce: None - return None + return self.repository.get_free_nonce() def commit_repo_nonce_reservation(self, next_unreserved, start_nonce): self.repository.commit_nonce_reservation(next_unreserved, start_nonce) diff --git a/src/borg/fuse.py b/src/borg/fuse.py index 8ec61b402b0..b00ea091d7f 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -23,7 +23,6 @@ from .helpers import msgpack from .item import Item from .lrucache import LRUCache -from .remote import RemoteRepository # Does this version of llfuse support ns precision? have_fuse_xtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns') @@ -504,9 +503,8 @@ def pop_option(options, key, present, not_present, wanted_type, int_base=0): llfuse.init(self, mountpoint, options) if not foreground: old_id, new_id = daemonize() - if not isinstance(self.repository_uncached, RemoteRepository): - # local repo and the locking process' PID just changed, migrate it: - self.repository_uncached.migrate_lock(old_id, new_id) + # local repo and the locking process' PID just changed, migrate it: + self.repository_uncached.migrate_lock(old_id, new_id) # If the file system crashes, we do not want to umount because in that # case the mountpoint suddenly appears to become empty. This can have diff --git a/src/borg/helpers/errors.py b/src/borg/helpers/errors.py index 80a47a9b6bb..bc2a1170b25 100644 --- a/src/borg/helpers/errors.py +++ b/src/borg/helpers/errors.py @@ -1,3 +1,5 @@ +import traceback + from ..constants import * # NOQA import borg.crypto.low_level @@ -20,6 +22,12 @@ def __init__(self, *args): def get_message(self): return type(self).__doc__.format(*self.args) + def get_msgid(self): + return type(self).__qualname__ + + def format_exc(self): + return traceback.format_exc() + __str__ = get_message diff --git a/src/borg/helpers/parseformat.py b/src/borg/helpers/parseformat.py index c09dc868f5c..a3d1b262de2 100644 --- a/src/borg/helpers/parseformat.py +++ b/src/borg/helpers/parseformat.py @@ -477,7 +477,7 @@ def canonical_path(self): path) -def location_validator(archive=None, proto=None): +def location_validator(archive=None): def validator(text): try: loc = Location(text) @@ -487,11 +487,6 @@ def validator(text): raise argparse.ArgumentTypeError('"%s": No archive specified' % text) elif archive is False and loc.archive: raise argparse.ArgumentTypeError('"%s": No archive can be specified' % text) - if proto is not None and loc.proto != proto: - if proto == 'file': - raise argparse.ArgumentTypeError('"%s": Repository must be local' % text) - else: - raise argparse.ArgumentTypeError('"%s": Repository must be remote' % text) return loc return validator @@ -921,13 +916,12 @@ def ellipsis_truncate(msg, space): class BorgJsonEncoder(json.JSONEncoder): def default(self, o): from ..repository import Repository - from ..remote import RemoteRepository from ..archive import Archive from ..cache import LocalCache, AdHocCache - if isinstance(o, Repository) or isinstance(o, RemoteRepository): + if isinstance(o, Repository): return { - 'id': bin_to_hex(o.id), - 'location': o._location.canonical_path(), + 'id': o.id_str, + 'location': o.location.canonical_path(), } if isinstance(o, Archive): return o.info() diff --git a/src/borg/repositories/local.py b/src/borg/repositories/local.py index e53cec6d03d..5839d85027d 100644 --- a/src/borg/repositories/local.py +++ b/src/borg/repositories/local.py @@ -4,43 +4,35 @@ import shutil import struct import time -from binascii import hexlify, unhexlify +from binascii import unhexlify from collections import defaultdict from configparser import ConfigParser from datetime import datetime from functools import partial from itertools import islice -from .constants import * # NOQA -from .hashindex import NSIndex -from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size -from .helpers import Location -from .helpers import ProgressIndicatorPercent -from .helpers import bin_to_hex -from .helpers import secure_erase, truncate_and_unlink -from .helpers import Manifest -from .helpers import msgpack -from .locking import Lock, LockError, LockErrorT -from .logger import create_logger -from .lrucache import LRUCache -from .platform import SaveFile, SyncFile, sync_dir, safe_fadvise -from .algorithms.checksums import crc32 -from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError +from ..constants import * # NOQA +from ..hashindex import NSIndex +from ..helpers import IntegrityError, format_file_size, parse_file_size +from ..helpers import ProgressIndicatorPercent +from ..helpers import bin_to_hex +from ..helpers import secure_erase, truncate_and_unlink +from ..helpers import Manifest +from ..helpers import msgpack +from ..locking import Lock, LockError, LockErrorT +from ..logger import create_logger +from ..lrucache import LRUCache +from ..platform import SaveFile, SyncFile, sync_dir, safe_fadvise +from ..repository import Repository, ATTIC_MAGIC, MAGIC, MAGIC_LEN, TAG_COMMIT, TAG_DELETE, TAG_PUT +from ..algorithms.checksums import crc32 +from ..crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError logger = create_logger(__name__) -MAGIC = b'BORG_SEG' -MAGIC_LEN = len(MAGIC) -ATTIC_MAGIC = b'ATTICSEG' -assert len(ATTIC_MAGIC) == MAGIC_LEN -TAG_PUT = 0 -TAG_DELETE = 1 -TAG_COMMIT = 2 - FreeSpace = partial(defaultdict, int) -class Repository: +class LocalRepository: """ Filesystem based transactional key value store @@ -110,49 +102,10 @@ class Repository: will still get rid of them. """ - class DoesNotExist(Error): - """Repository {} does not exist.""" - - class AlreadyExists(Error): - """A repository already exists at {}.""" - - class PathAlreadyExists(Error): - """There is already something at {}.""" - - class ParentPathDoesNotExist(Error): - """The parent path of the repo directory [{}] does not exist.""" - - class InvalidRepository(Error): - """{} is not a valid repository. Check repo config.""" - - class InvalidRepositoryConfig(Error): - """{} does not have a valid configuration. Check repo config [{}].""" - - class AtticRepository(Error): - """Attic repository detected. Please run "borg upgrade {}".""" - - class CheckNeeded(ErrorWithTraceback): - """Inconsistency detected. Please run "borg check {}".""" - - class ObjectNotFound(ErrorWithTraceback): - """Object with key {} not found in repository {}.""" - - def __init__(self, id, repo): - if isinstance(id, bytes): - id = bin_to_hex(id) - super().__init__(id, repo) - - class InsufficientFreeSpaceError(Error): - """Insufficient free space to complete transaction (required: {}, available: {}).""" - - class StorageQuotaExceeded(Error): - """The storage quota ({}) has been exceeded ({}). Try deleting some archives.""" - def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False, storage_quota=None, check_segment_magic=True, make_parent_dirs=False): self.path = os.path.abspath(path) - self._location = Location('file://%s' % self.path) self.io = None # type: LoggedIO self.lock = None self.index = None @@ -235,9 +188,9 @@ def check_can_create_repository(self, path): """ if os.path.exists(path): if self.is_repository(path): - raise self.AlreadyExists(path) + raise Repository.AlreadyExists(path) if not os.path.isdir(path) or os.listdir(path): - raise self.PathAlreadyExists(path) + raise Repository.PathAlreadyExists(path) while True: # Check all parent directories for Borg's repository README @@ -248,7 +201,7 @@ def check_can_create_repository(self, path): # We reached the root of the directory hierarchy (/.. = / and C:\.. = C:\). break if self.is_repository(path): - raise self.AlreadyExists(path) + raise Repository.AlreadyExists(path) def create(self, path): """Create a new empty repository at `path` @@ -261,7 +214,7 @@ def create(self, path): try: os.mkdir(path) except FileNotFoundError as err: - raise self.ParentPathDoesNotExist(path) from err + raise Repository.ParentPathDoesNotExist(path) from err with open(os.path.join(path, 'README'), 'w') as fd: fd.write(REPOSITORY_README) os.mkdir(os.path.join(path, 'data')) @@ -279,7 +232,9 @@ def create(self, path): config.set('repository', 'id', bin_to_hex(os.urandom(32))) self.save_config(path, config) - def save_config(self, path, config): + def save_config(self, path=None, config=None): + path = path or self.path + config = config or self.config config_path = os.path.join(path, 'config') old_config_path = os.path.join(path, 'config.old') @@ -307,7 +262,7 @@ def save_key(self, keydata): assert self.config keydata = keydata.decode('utf-8') # remote repo: msgpack issue #99, getting bytes self.config.set('repository', 'key', keydata) - self.save_config(self.path, self.config) + self.save_config() def load_key(self): keydata = self.config.get('repository', 'key') @@ -362,7 +317,7 @@ def check_transaction(self): # filesystem or hardware malfunction. it means we have no identifiable # valid (committed) state of the repo which we could use. msg = '%s" - although likely this is "beyond repair' % self.path # dirty hack - raise self.CheckNeeded(msg) + raise Repository.CheckNeeded(msg) # Attempt to automatically rebuild index if we crashed between commit # tag write and index save if index_transaction_id != segments_transaction_id: @@ -387,7 +342,7 @@ def migrate_lock(self, old_id, new_id): def open(self, path, exclusive, lock_wait=None, lock=True): self.path = path if not os.path.isdir(path): - raise self.DoesNotExist(path) + raise Repository.DoesNotExist(path) if lock: self.lock = Lock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait).acquire() else: @@ -401,11 +356,11 @@ def open(self, path, exclusive, lock_wait=None, lock=True): raise self.InvalidRepository(self.path) if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1: self.close() - raise self.InvalidRepository(path) + raise Repository.InvalidRepository(path) self.max_segment_size = self.config.getint('repository', 'max_segment_size') if self.max_segment_size >= MAX_SEGMENT_SIZE_LIMIT: self.close() - raise self.InvalidRepositoryConfig(path, 'max_segment_size >= %d' % MAX_SEGMENT_SIZE_LIMIT) # issue 3592 + raise Repository.InvalidRepositoryConfig(path, 'max_segment_size >= %d' % MAX_SEGMENT_SIZE_LIMIT) # issue 3592 self.segments_per_dir = self.config.getint('repository', 'segments_per_dir') self.additional_free_space = parse_file_size(self.config.get('repository', 'additional_free_space', fallback=0)) # append_only can be set in the constructor @@ -421,7 +376,7 @@ def open(self, path, exclusive, lock_wait=None, lock=True): segment = self.io.get_latest_segment() if segment is not None and self.io.get_segment_magic(segment) == ATTIC_MAGIC: self.close() - raise self.AtticRepository(path) + raise Repository.AtticRepository(path) def close(self): if self.lock: @@ -682,7 +637,7 @@ def check_free_space(self): self._rollback(cleanup=True) formatted_required = format_file_size(required_free_space) formatted_free = format_file_size(free_space) - raise self.InsufficientFreeSpaceError(formatted_required, formatted_free) + raise Repository.InsufficientFreeSpaceError(formatted_required, formatted_free) def log_storage_quota(self): if self.storage_quota: @@ -876,7 +831,7 @@ def _update_index(self, segment, objects, report=None): else: msg = 'Unexpected tag {} in segment {}'.format(tag, segment) if report is None: - raise self.CheckNeeded(msg) + raise Repository.CheckNeeded(msg) else: report(msg) if self.segments[segment] == 0: @@ -1125,7 +1080,7 @@ def get(self, id): segment, offset = self.index[id] return self.io.read(segment, offset, id) except KeyError: - raise self.ObjectNotFound(id, self.path) from None + raise Repository.ObjectNotFound(id, self.path) from None def get_many(self, ids, is_preloaded=False): for id_ in ids: @@ -1157,7 +1112,7 @@ def put(self, id, data, wait=True): self.segments[segment] += 1 self.index[id] = segment, offset if self.storage_quota and self.storage_quota_use > self.storage_quota: - self.transaction_doomed = self.StorageQuotaExceeded( + self.transaction_doomed = Repository.StorageQuotaExceeded( format_file_size(self.storage_quota), format_file_size(self.storage_quota_use)) raise self.transaction_doomed @@ -1172,7 +1127,7 @@ def delete(self, id, wait=True): try: segment, offset = self.index.pop(id) except KeyError: - raise self.ObjectNotFound(id, self.path) from None + raise Repository.ObjectNotFound(id, self.path) from None self.shadow_index.setdefault(id, []).append(segment) self.segments[segment] -= 1 size = self.io.read(segment, offset, id, read_data=False) diff --git a/src/borg/repositories/remote.py b/src/borg/repositories/remote.py index dce028801ad..fa9805be365 100644 --- a/src/borg/repositories/remote.py +++ b/src/borg/repositories/remote.py @@ -6,32 +6,26 @@ import os import select import shlex -import shutil -import struct import sys -import tempfile import textwrap import time import traceback from subprocess import Popen, PIPE -from . import __version__ -from .compress import LZ4 -from .constants import * # NOQA -from .helpers import Error, IntegrityError -from .helpers import bin_to_hex -from .helpers import get_base_dir -from .helpers import get_limited_unpacker -from .helpers import replace_placeholders -from .helpers import sysinfo -from .helpers import format_file_size -from .helpers import truncate_and_unlink -from .helpers import prepare_subprocess_env -from .logger import create_logger, setup_logging -from .helpers import msgpack -from .repository import Repository -from .version import parse_version, format_version -from .algorithms.checksums import xxh64 +from .. import __version__ +from ..constants import * # NOQA +from ..helpers import Error, IntegrityError +from ..helpers import get_base_dir +from ..helpers import get_limited_unpacker +from ..helpers import replace_placeholders +from ..helpers import sysinfo +from ..helpers import format_file_size +from ..helpers import prepare_subprocess_env +from ..logger import create_logger, setup_logging +from ..helpers import msgpack +from .local import LocalRepository +from ..repository import Repository +from ..version import parse_version, format_version logger = create_logger(__name__) @@ -359,11 +353,12 @@ def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, ap # while "borg init --append-only" (=append_only) does, regardless of the --append-only (self.append_only) # flag for serve. append_only = (not create and self.append_only) or append_only - self.repository = Repository(path, create, lock_wait=lock_wait, lock=lock, - append_only=append_only, - storage_quota=self.storage_quota, - exclusive=exclusive, - make_parent_dirs=make_parent_dirs) + self.repository = LocalRepository(path, create, + lock_wait=lock_wait, lock=lock, + append_only=append_only, + storage_quota=self.storage_quota, + exclusive=exclusive, + make_parent_dirs=make_parent_dirs) self.repository.__enter__() # clean exit handled by serve() method return self.repository.id @@ -419,7 +414,7 @@ def write(self, fd, to_send): return written -def api(*, since, **kwargs_decorator): +def api(*, since, required=True, **kwargs_decorator): """Check version requirements and use self.call to do the remote method call. specifies the version in which borg introduced this method, @@ -455,7 +450,9 @@ def do_rpc(self, *args, **kwargs): named[name] = param.default if self.server_version < since: - raise self.RPCServerOutdated(f.__name__, format_version(since)) + if required: + raise self.RPCServerOutdated(f.__name__, format_version(since)) + return None for name, restriction in kwargs_decorator.items(): if restriction['since'] <= self.server_version: @@ -476,20 +473,30 @@ def do_rpc(self, *args, **kwargs): class RemoteRepository: extra_test_args = [] - class RPCError(Exception): + class RPCError(Error): def __init__(self, unpacked): + super().__init__() # for borg < 1.1: unpacked only has b'exception_class' as key # for borg 1.1+: unpacked has keys: b'exception_args', b'exception_full', b'exception_short', b'sysinfo' self.unpacked = unpacked def get_message(self): - if b'exception_short' in self.unpacked: - return b'\n'.join(self.unpacked[b'exception_short']).decode() + if self.traceback: + return self.exception_full else: - return self.exception_class + return self.exception_short + + def get_msgid(self): + return self.exception_class + + def format_exc(self): + sysinfo = self.sysinfo.splitlines() + return '\n'.join('Borg server: ' + l for l in sysinfo) @property def traceback(self): + if self.exception_class in ('LockTimeout', ): + return False return self.unpacked.get(b'exception_trace', True) @property @@ -501,7 +508,14 @@ def exception_full(self): if b'exception_full' in self.unpacked: return b'\n'.join(self.unpacked[b'exception_full']).decode() else: - return self.get_message() + '\nRemote Exception (see remote log for the traceback)' + return self.exception_short + '\nRemote Exception (see remote log for the traceback)' + + @property + def exception_short(self): + if b'exception_short' in self.unpacked: + return b'\n'.join(self.unpacked[b'exception_short']).decode() + else: + return self.exception_class @property def sysinfo(self): @@ -526,7 +540,7 @@ def required_version(self): def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False, make_parent_dirs=False, args=None): - self.location = self._location = location + self.location = location self.preload_ids = [] self.msgid = 0 self.rx_bytes = 0 @@ -634,10 +648,6 @@ def __exit__(self, exc_type, exc_val, exc_tb): format_file_size(self.tx_bytes), format_file_size(self.rx_bytes), self.msgid) self.close() - @property - def id_str(self): - return bin_to_hex(self.id) - def borg_cmd(self, args, testing): """return a borg serve command line""" # give some args/options to 'borg serve' process as they were given to us @@ -948,11 +958,11 @@ def save_key(self, keydata): def load_key(self): """actual remoting is done via self.call in the @api decorator""" - @api(since=parse_version('1.0.0')) + @api(since=parse_version('1.1.0'), required=False) def get_free_nonce(self): """actual remoting is done via self.call in the @api decorator""" - @api(since=parse_version('1.0.0')) + @api(since=parse_version('1.1.0'), required=False) def commit_nonce_reservation(self, next_unreserved, start_nonce): """actual remoting is done via self.call in the @api decorator""" @@ -960,6 +970,10 @@ def commit_nonce_reservation(self, next_unreserved, start_nonce): def break_lock(self): """actual remoting is done via self.call in the @api decorator""" + def migrate_lock(self, old_id, new_id): + # Not used for remote repositories + pass + def close(self): if self.p: self.p.stdin.close() @@ -974,6 +988,15 @@ def async_response(self, wait=True): def preload(self, ids): self.preload_ids += ids + @property + def config(self): + # Not currently supported + return None + + def save_config(self): + # Not currently supported + pass + def handle_remote_line(line): """ @@ -1047,191 +1070,3 @@ def handle_remote_line(line): # In non-JSON mode we circumvent logging to preserve carriage returns (\r) # which are generated by remote progress displays. sys.stderr.write('Remote: ' + line) - - -class RepositoryNoCache: - """A not caching Repository wrapper, passes through to repository. - - Just to have same API (including the context manager) as RepositoryCache. - - *transform* is a callable taking two arguments, key and raw repository data. - The return value is returned from get()/get_many(). By default, the raw - repository data is returned. - """ - def __init__(self, repository, transform=None): - self.repository = repository - self.transform = transform or (lambda key, data: data) - - def close(self): - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - - def get(self, key): - return next(self.get_many([key], cache=False)) - - def get_many(self, keys, cache=True): - for key, data in zip(keys, self.repository.get_many(keys)): - yield self.transform(key, data) - - def log_instrumentation(self): - pass - - -class RepositoryCache(RepositoryNoCache): - """ - A caching Repository wrapper. - - Caches Repository GET operations locally. - - *pack* and *unpack* complement *transform* of the base class. - *pack* receives the output of *transform* and should return bytes, - which are stored in the cache. *unpack* receives these bytes and - should return the initial data (as returned by *transform*). - """ - - def __init__(self, repository, pack=None, unpack=None, transform=None): - super().__init__(repository, transform) - self.pack = pack or (lambda data: data) - self.unpack = unpack or (lambda data: data) - self.cache = set() - self.basedir = tempfile.mkdtemp(prefix='borg-cache-') - self.query_size_limit() - self.size = 0 - # Instrumentation - self.hits = 0 - self.misses = 0 - self.slow_misses = 0 - self.slow_lat = 0.0 - self.evictions = 0 - self.enospc = 0 - - def query_size_limit(self): - stat_fs = os.statvfs(self.basedir) - available_space = stat_fs.f_bavail * stat_fs.f_frsize - self.size_limit = int(min(available_space * 0.25, 2**31)) - - def key_filename(self, key): - return os.path.join(self.basedir, bin_to_hex(key)) - - def backoff(self): - self.query_size_limit() - target_size = int(0.9 * self.size_limit) - while self.size > target_size and self.cache: - key = self.cache.pop() - file = self.key_filename(key) - self.size -= os.stat(file).st_size - os.unlink(file) - self.evictions += 1 - - def add_entry(self, key, data, cache): - transformed = self.transform(key, data) - if not cache: - return transformed - packed = self.pack(transformed) - file = self.key_filename(key) - try: - with open(file, 'wb') as fd: - fd.write(packed) - except OSError as os_error: - try: - truncate_and_unlink(file) - except FileNotFoundError: - pass # open() could have failed as well - if os_error.errno == errno.ENOSPC: - self.enospc += 1 - self.backoff() - else: - raise - else: - self.size += len(packed) - self.cache.add(key) - if self.size > self.size_limit: - self.backoff() - return transformed - - def log_instrumentation(self): - logger.debug('RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), ' - '%d evictions, %d ENOSPC hit', - len(self.cache), format_file_size(self.size), format_file_size(self.size_limit), - self.hits, self.misses, self.slow_misses, self.slow_lat, - self.evictions, self.enospc) - - def close(self): - self.log_instrumentation() - self.cache.clear() - shutil.rmtree(self.basedir) - - def get_many(self, keys, cache=True): - unknown_keys = [key for key in keys if key not in self.cache] - repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys)) - for key in keys: - if key in self.cache: - file = self.key_filename(key) - with open(file, 'rb') as fd: - self.hits += 1 - yield self.unpack(fd.read()) - else: - for key_, data in repository_iterator: - if key_ == key: - transformed = self.add_entry(key, data, cache) - self.misses += 1 - yield transformed - break - else: - # slow path: eviction during this get_many removed this key from the cache - t0 = time.perf_counter() - data = self.repository.get(key) - self.slow_lat += time.perf_counter() - t0 - transformed = self.add_entry(key, data, cache) - self.slow_misses += 1 - yield transformed - # Consume any pending requests - for _ in repository_iterator: - pass - - -def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False): - """ - Return a Repository(No)Cache for *repository*. - - If *decrypted_cache* is a key object, then get and get_many will return a tuple - (csize, plaintext) instead of the actual data in the repository. The cache will - store decrypted data, which increases CPU efficiency (by avoiding repeatedly decrypting - and more importantly MAC and ID checking cached objects). - Internally, objects are compressed with LZ4. - """ - if decrypted_cache and (pack or unpack or transform): - raise ValueError('decrypted_cache and pack/unpack/transform are incompatible') - elif decrypted_cache: - key = decrypted_cache - # 32 bit csize, 64 bit (8 byte) xxh64 - cache_struct = struct.Struct('=I8s') - compressor = LZ4() - - def pack(data): - csize, decrypted = data - compressed = compressor.compress(decrypted) - return cache_struct.pack(csize, xxh64(compressed)) + compressed - - def unpack(data): - data = memoryview(data) - csize, checksum = cache_struct.unpack(data[:cache_struct.size]) - compressed = data[cache_struct.size:] - if checksum != xxh64(compressed): - raise IntegrityError('detected corrupted data in metadata cache') - return csize, compressor.decompress(compressed) - - def transform(id_, data): - csize = len(data) - decrypted = key.decrypt(id_, data) - return csize, decrypted - - if isinstance(repository, RemoteRepository) or force_cache: - return RepositoryCache(repository, pack, unpack, transform) - else: - return RepositoryNoCache(repository, transform) diff --git a/src/borg/repository.py b/src/borg/repository.py new file mode 100644 index 00000000000..fe745e9f882 --- /dev/null +++ b/src/borg/repository.py @@ -0,0 +1,382 @@ +import errno +import os +import shutil +import struct +import tempfile +import time + +from .algorithms.checksums import xxh64 +from .compress import LZ4 +from .helpers import Error, ErrorWithTraceback, IntegrityError, Location +from .helpers import bin_to_hex +from .helpers import format_file_size +from .helpers import truncate_and_unlink +from .logger import create_logger + +logger = create_logger(__name__) + +MAGIC = b'BORG_SEG' +MAGIC_LEN = len(MAGIC) +ATTIC_MAGIC = b'ATTICSEG' +assert len(ATTIC_MAGIC) == MAGIC_LEN + +TAG_PUT = 0 +TAG_DELETE = 1 +TAG_COMMIT = 2 + + +class Repository: + + class DoesNotExist(Error): + """Repository {} does not exist.""" + + class AlreadyExists(Error): + """A repository already exists at {}.""" + + class PathAlreadyExists(Error): + """There is already something at {}.""" + + class ParentPathDoesNotExist(Error): + """The parent path of the repo directory [{}] does not exist.""" + + class InvalidRepository(Error): + """{} is not a valid repository. Check repo config.""" + + class InvalidRepositoryConfig(Error): + """{} does not have a valid configuration. Check repo config [{}].""" + + class AtticRepository(Error): + """Attic repository detected. Please run "borg upgrade {}".""" + + class CheckNeeded(ErrorWithTraceback): + """Inconsistency detected. Please run "borg check {}".""" + + class ObjectNotFound(ErrorWithTraceback): + """Object with key {} not found in repository {}.""" + + def __init__(self, id, repo): + if isinstance(id, bytes): + id = bin_to_hex(id) + super().__init__(id, repo) + + class InsufficientFreeSpaceError(Error): + """Insufficient free space to complete transaction (required: {}, available: {}).""" + + class StorageQuotaExceeded(Error): + """The storage quota ({}) has been exceeded ({}). Try deleting some archives.""" + + def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, args=None): + if type(location) == str: + # Convenience handling of a string as a local path + location = Location('file://' + location) + + self._location = location + self._remote = True + + append_only = getattr(args, 'append_only', False) + storage_quota = getattr(args, 'storage_quota', None) + make_parent_dirs = getattr(args, 'make_parent_dirs', False) + + try: + if location.proto == 'ssh': + from .repositories.remote import RemoteRepository + repo = RemoteRepository(location, create=create, + exclusive=exclusive, + lock_wait=lock_wait, lock=lock, + make_parent_dirs=make_parent_dirs, + append_only=append_only, args=args) + elif location.proto == 'file': + from .repositories.local import LocalRepository + repo = LocalRepository(location.path, create=create, + exclusive=exclusive, + lock_wait=lock_wait, lock=lock, + make_parent_dirs=make_parent_dirs, + append_only=append_only, + storage_quota=storage_quota) + self._remote = False + else: + raise Exception('Unrecognized location: ' + location.canonical_path()) + except ImportError: + logger.warning('Missing dependencies needed to handle this repository location:') + raise + + self._repo = repo + + def __repr__(self): + return '<%s %s %s>' % (self.__class__.__name__, + self._repo.__class__.__name__, + self.location.canonical_path()) + + def __len__(self): + return self._repo.__len__() + + def __enter__(self): + self._repo.__enter__() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._repo.__exit__(exc_type, exc_val, exc_tb) + + @property + def location(self): + return self._location + + @property + def remote(self): + return self._remote + + @property + def id(self): + return self._repo.id + + @property + def id_str(self): + return bin_to_hex(self.id) + + def check(self, repair=False, save_space=False, max_duration=0): + return self._repo.check(repair, save_space, max_duration) + + def commit(self, save_space=False, compact=True, cleanup_commits=False): + self._repo.commit(save_space, compact, cleanup_commits) + + def destroy(self): + self._repo.destroy() + + def list(self, limit=None, marker=None): + return self._repo.list(limit, marker) + + def scan(self, limit=None, marker=None): + return self._repo.scan(limit, marker) + + def get(self, id): + for resp in self.get_many([id]): + return resp + + def get_many(self, ids, is_preloaded=False): + for resp in self._repo.get_many(ids, is_preloaded): + yield resp + + def put(self, id, data, wait=True): + self._repo.put(id, data, wait) + + def delete(self, id, wait=True): + self._repo.delete(id, wait) + + def save_key(self, keydata): + return self._repo.save_key(keydata) + + def load_key(self): + return self._repo.load_key() + + def get_free_nonce(self): + return self._repo.get_free_nonce() + + def commit_nonce_reservation(self, next_unreserved, start_nonce): + self._repo.commit_nonce_reservation(next_unreserved, start_nonce) + + def break_lock(self): + self._repo.break_lock() + + def migrate_lock(self, old_id, new_id): + self._repo.migrate_lock(old_id, new_id) + + def async_response(self, wait=True): + return self._repo.async_response(wait) + + def preload(self, ids): + self._repo.preload(ids) + + @property + def config(self): + return self._repo.config + + def save_config(self): + self._repo.save_config() + + +class RepositoryNoCache: + """A not caching Repository wrapper, passes through to repository. + + Just to have same API (including the context manager) as RepositoryCache. + + *transform* is a callable taking two arguments, key and raw repository data. + The return value is returned from get()/get_many(). By default, the raw + repository data is returned. + """ + def __init__(self, repository, transform=None): + self.repository = repository + self.transform = transform or (lambda key, data: data) + + def close(self): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def get(self, key): + return next(self.get_many([key], cache=False)) + + def get_many(self, keys, cache=True): + for key, data in zip(keys, self.repository.get_many(keys)): + yield self.transform(key, data) + + def log_instrumentation(self): + pass + + +class RepositoryCache(RepositoryNoCache): + """ + A caching Repository wrapper. + + Caches Repository GET operations locally. + + *pack* and *unpack* complement *transform* of the base class. + *pack* receives the output of *transform* and should return bytes, + which are stored in the cache. *unpack* receives these bytes and + should return the initial data (as returned by *transform*). + """ + + def __init__(self, repository, pack=None, unpack=None, transform=None): + super().__init__(repository, transform) + self.pack = pack or (lambda data: data) + self.unpack = unpack or (lambda data: data) + self.cache = set() + self.basedir = tempfile.mkdtemp(prefix='borg-cache-') + self.query_size_limit() + self.size = 0 + # Instrumentation + self.hits = 0 + self.misses = 0 + self.slow_misses = 0 + self.slow_lat = 0.0 + self.evictions = 0 + self.enospc = 0 + + def query_size_limit(self): + stat_fs = os.statvfs(self.basedir) + available_space = stat_fs.f_bavail * stat_fs.f_frsize + self.size_limit = int(min(available_space * 0.25, 2**31)) + + def key_filename(self, key): + return os.path.join(self.basedir, bin_to_hex(key)) + + def backoff(self): + self.query_size_limit() + target_size = int(0.9 * self.size_limit) + while self.size > target_size and self.cache: + key = self.cache.pop() + file = self.key_filename(key) + self.size -= os.stat(file).st_size + os.unlink(file) + self.evictions += 1 + + def add_entry(self, key, data, cache): + transformed = self.transform(key, data) + if not cache: + return transformed + packed = self.pack(transformed) + file = self.key_filename(key) + try: + with open(file, 'wb') as fd: + fd.write(packed) + except OSError as os_error: + try: + truncate_and_unlink(file) + except FileNotFoundError: + pass # open() could have failed as well + if os_error.errno == errno.ENOSPC: + self.enospc += 1 + self.backoff() + else: + raise + else: + self.size += len(packed) + self.cache.add(key) + if self.size > self.size_limit: + self.backoff() + return transformed + + def log_instrumentation(self): + logger.debug('RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), ' + '%d evictions, %d ENOSPC hit', + len(self.cache), format_file_size(self.size), format_file_size(self.size_limit), + self.hits, self.misses, self.slow_misses, self.slow_lat, + self.evictions, self.enospc) + + def close(self): + self.log_instrumentation() + self.cache.clear() + shutil.rmtree(self.basedir) + + def get_many(self, keys, cache=True): + unknown_keys = [key for key in keys if key not in self.cache] + repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys)) + for key in keys: + if key in self.cache: + file = self.key_filename(key) + with open(file, 'rb') as fd: + self.hits += 1 + yield self.unpack(fd.read()) + else: + for key_, data in repository_iterator: + if key_ == key: + transformed = self.add_entry(key, data, cache) + self.misses += 1 + yield transformed + break + else: + # slow path: eviction during this get_many removed this key from the cache + t0 = time.perf_counter() + data = self.repository.get(key) + self.slow_lat += time.perf_counter() - t0 + transformed = self.add_entry(key, data, cache) + self.slow_misses += 1 + yield transformed + # Consume any pending requests + for _ in repository_iterator: + pass + + +def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False): + """ + Return a Repository(No)Cache for *repository*. + + If *decrypted_cache* is a key object, then get and get_many will return a tuple + (csize, plaintext) instead of the actual data in the repository. The cache will + store decrypted data, which increases CPU efficiency (by avoiding repeatedly decrypting + and more importantly MAC and ID checking cached objects). + Internally, objects are compressed with LZ4. + """ + if decrypted_cache and (pack or unpack or transform): + raise ValueError('decrypted_cache and pack/unpack/transform are incompatible') + elif decrypted_cache: + key = decrypted_cache + # 32 bit csize, 64 bit (8 byte) xxh64 + cache_struct = struct.Struct('=I8s') + compressor = LZ4() + + def pack(data): + csize, decrypted = data + compressed = compressor.compress(decrypted) + return cache_struct.pack(csize, xxh64(compressed)) + compressed + + def unpack(data): + data = memoryview(data) + csize, checksum = cache_struct.unpack(data[:cache_struct.size]) + compressed = data[cache_struct.size:] + if checksum != xxh64(compressed): + raise IntegrityError('detected corrupted data in metadata cache') + return csize, compressor.decompress(compressed) + + def transform(id_, data): + csize = len(data) + decrypted = key.decrypt(id_, data) + return csize, decrypted + + if repository.remote or force_cache: + return RepositoryCache(repository, pack, unpack, transform) + else: + return RepositoryNoCache(repository, transform) diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index 1ef802c1db4..648809f88ab 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -51,7 +51,8 @@ from ..patterns import IECommand, PatternMatcher, parse_pattern from ..item import Item, ItemDiff from ..logger import setup_logging -from ..remote import RemoteRepository, PathNotAllowed +from ..repositories.local import LocalRepository +from ..repositories.remote import RemoteRepository, PathNotAllowed from ..repository import Repository from . import has_lchflags, has_llfuse from . import BaseTestCase, changedir, environment_variable, no_selinux @@ -1621,15 +1622,13 @@ def test_unknown_feature_on_mount(self): @pytest.mark.allow_cache_wipe def test_unknown_mandatory_feature_in_cache(self): if self.prefix: - path_prefix = 'ssh://__testsuite__' + location = Location(self.repository_location) else: - path_prefix = '' + location = self.repository_path print(self.cmd('init', '--encryption=repokey', self.repository_location)) - with Repository(self.repository_path, exclusive=True) as repository: - if path_prefix: - repository._location = Location(self.repository_location) + with Repository(location, exclusive=True) as repository: manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) with Cache(repository, key, manifest) as cache: cache.begin_txn() @@ -1652,9 +1651,7 @@ def wipe_wrapper(*args): assert called - with Repository(self.repository_path, exclusive=True) as repository: - if path_prefix: - repository._location = Location(self.repository_location) + with Repository(location, exclusive=True) as repository: manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) with Cache(repository, key, manifest) as cache: assert cache.cache_config.mandatory_features == set([]) @@ -2340,7 +2337,7 @@ def verify_aes_counter_uniqueness(self, method): used = set() # counter values already used def verify_uniqueness(): - with Repository(self.repository_path) as repository: + with LocalRepository(self.repository_path) as repository: for id, _ in repository.open_index(repository.get_transaction_id()).iteritems(): data = repository.get(id) hash = sha256(data).digest() @@ -3316,7 +3313,7 @@ class RemoteArchiverTestCase(ArchiverTestCase): prefix = '__testsuite__:' def open_repository(self): - return RemoteRepository(Location(self.repository_location)) + return Repository(Location(self.repository_location)) def test_remote_repo_restrict_to_path(self): # restricted to repo directory itself: diff --git a/src/borg/testsuite/key.py b/src/borg/testsuite/key.py index b397435c2a1..0ce1ce1d7e5 100644 --- a/src/borg/testsuite/key.py +++ b/src/borg/testsuite/key.py @@ -92,7 +92,7 @@ class _Location: def canonical_path(self): return self.orig - _location = _Location() + location = _Location() id = bytes(32) id_str = bin_to_hex(id) diff --git a/src/borg/testsuite/nonces.py b/src/borg/testsuite/nonces.py index d0bc85eaff3..7c21ed75490 100644 --- a/src/borg/testsuite/nonces.py +++ b/src/borg/testsuite/nonces.py @@ -6,7 +6,6 @@ from ..crypto.nonces import NonceManager from ..crypto.key import bin_to_hex from ..helpers import get_security_dir -from ..remote import InvalidRPCMethod class TestNonceManager: @@ -15,7 +14,7 @@ class MockRepository: class _Location: orig = '/some/place' - _location = _Location() + location = _Location() id = bytes(32) id_str = bin_to_hex(id) @@ -28,10 +27,10 @@ def commit_nonce_reservation(self, next_unreserved, start_nonce): class MockOldRepository(MockRepository): def get_free_nonce(self): - raise InvalidRPCMethod("") + return None def commit_nonce_reservation(self, next_unreserved, start_nonce): - pytest.fail("commit_nonce_reservation should never be called on an old repository") + return None def setUp(self): self.repository = None diff --git a/src/borg/testsuite/remote.py b/src/borg/testsuite/remote.py index d9117717493..4082aa569d0 100644 --- a/src/borg/testsuite/remote.py +++ b/src/borg/testsuite/remote.py @@ -6,8 +6,8 @@ import pytest -from ..remote import SleepingBandwidthLimiter, RepositoryCache, cache_if_remote -from ..repository import Repository +from ..repositories.remote import SleepingBandwidthLimiter +from ..repository import Repository, RepositoryCache, cache_if_remote from ..crypto.key import PlaintextKey from ..compress import CompressionSpec from ..helpers import IntegrityError diff --git a/src/borg/testsuite/repository.py b/src/borg/testsuite/repository.py index f93d603df34..38f52e098e5 100644 --- a/src/borg/testsuite/repository.py +++ b/src/borg/testsuite/repository.py @@ -8,13 +8,15 @@ import pytest +from ..constants import MAX_DATA_SIZE from ..hashindex import NSIndex from ..helpers import Location from ..helpers import IntegrityError from ..helpers import msgpack from ..locking import Lock, LockFailed -from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, ConnectionClosedWithHint, handle_remote_line -from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE +from ..repositories.local import LocalRepository, LoggedIO +from ..repositories.remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, ConnectionClosedWithHint, handle_remote_line +from ..repository import Repository, MAGIC, TAG_DELETE from . import BaseTestCase from .hashindex import H @@ -29,7 +31,7 @@ class RepositoryTestCaseBase(BaseTestCase): def open(self, create=False, exclusive=UNSPECIFIED): if exclusive is UNSPECIFIED: exclusive = self.exclusive - return Repository(os.path.join(self.tmppath, 'repository'), exclusive=exclusive, create=create) + return LocalRepository(os.path.join(self.tmppath, 'repository'), exclusive=exclusive, create=create) def setUp(self): self.tmppath = tempfile.mkdtemp() @@ -386,7 +388,7 @@ def test_shadow_index_rollback(self): class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase): def open(self, create=False): - return Repository(os.path.join(self.tmppath, 'repository'), exclusive=True, create=create, append_only=True) + return LocalRepository(os.path.join(self.tmppath, 'repository'), exclusive=True, create=create, append_only=True) def test_destroy_append_only(self): # Can't destroy append only repo (via the API) @@ -783,7 +785,7 @@ def test_crash_before_compact(self): self.repository.put(H(0), b'data') self.repository.put(H(0), b'data2') # Simulate a crash before compact - with patch.object(Repository, 'compact_segments') as compact: + with patch.object(LocalRepository, 'compact_segments') as compact: self.repository.commit(compact=True) compact.assert_called_once_with() self.reopen() @@ -865,7 +867,7 @@ def test_rpc_exception_transport(self): self.repository.call('inject_exception', {'kind': 'divide'}) except RemoteRepository.RPCError as e: assert e.unpacked - assert e.get_message() == 'ZeroDivisionError: integer division or modulo by zero\n' + assert e.exception_short == 'ZeroDivisionError: integer division or modulo by zero\n' assert e.exception_class == 'ZeroDivisionError' assert len(e.exception_full) > 0 diff --git a/src/borg/upgrader.py b/src/borg/upgrader.py index 4e3fe576f0d..7d40e58313b 100644 --- a/src/borg/upgrader.py +++ b/src/borg/upgrader.py @@ -9,14 +9,15 @@ from .helpers import get_base_dir, get_keys_dir, get_cache_dir from .locking import Lock from .logger import create_logger -from .repository import Repository, MAGIC +from .repository import MAGIC +from .repositories.local import LocalRepository logger = create_logger(__name__) ATTIC_MAGIC = b'ATTICSEG' -class AtticRepositoryUpgrader(Repository): +class AtticRepositoryUpgrader(LocalRepository): def __init__(self, *args, **kw): kw['lock'] = False # do not create borg lock files (now) in attic repo kw['check_segment_magic'] = False # skip the Attic check when upgrading @@ -160,10 +161,10 @@ def convert_repo_index(self, dryrun, inplace): `s/ATTICIDX/BORG_IDX/` in a few locations: * the repository index (in `$ATTIC_REPO/index.%d`, where `%d` - is the `Repository.get_index_transaction_id()`), which we + is the `LocalRepository.get_index_transaction_id()`), which we should probably update, with a lock, see - `Repository.open()`, which i'm not sure we should use - because it may write data on `Repository.close()`... + `LocalRepository.open()`, which i'm not sure we should use + because it may write data on `LocalRepository.close()`... """ transaction_id = self.get_index_transaction_id() if transaction_id is None: @@ -277,7 +278,7 @@ def find_key_file(cls, repository): raise KeyfileNotFoundError(repository.path, keys_dir) -class BorgRepositoryUpgrader(Repository): +class BorgRepositoryUpgrader(LocalRepository): def upgrade(self, dryrun=True, inplace=False, progress=False): """convert an old borg repository to a current borg repository """