Skip to content

Commit

Permalink
cleanup msgpack related str/bytes mess, fixes borgbackup#968
Browse files Browse the repository at this point in the history
see ticket and borg.helpers.msgpack docstring.

this changeset implements the full migration to
msgpack 2.0 spec (use_bin_type=True, raw=False).

Backwards compatibility with old data, where still needed, is provided by the want_bytes decoder in borg.item.
  • Loading branch information
ThomasWaldmann committed May 6, 2022
1 parent bef1356 commit 0e7d0ae
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 113 deletions.
11 changes: 4 additions & 7 deletions src/borg/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -1753,13 +1753,10 @@ def rebuild_manifest(self):
Iterates through all objects in the repository looking for archive metadata blocks.
"""
required_archive_keys = frozenset(key.encode() for key in REQUIRED_ARCHIVE_KEYS)

def valid_archive(obj):
if not isinstance(obj, dict):
return False
keys = set(obj)
return required_archive_keys.issubset(keys)
return REQUIRED_ARCHIVE_KEYS.issubset(obj)

logger.info('Rebuilding missing manifest, this might take some time...')
# as we have lost the manifest, we do not know any more what valid item keys we had.
Expand Down Expand Up @@ -1939,10 +1936,10 @@ def list_keys_safe(keys):
def valid_item(obj):
if not isinstance(obj, StableDict):
return False, 'not a dictionary'
# A bug in Attic up to and including release 0.13 added a (meaningless) b'acl' key to every item.
# A bug in Attic up to and including release 0.13 added a (meaningless) 'acl' key to every item.
# We ignore it here, should it exist. See test_attic013_acl_bug for details.
obj.pop(b'acl', None)
keys = set(k.decode('utf-8', errors='replace') for k in obj)
obj.pop('acl', None)
keys = set(obj)
if not required_item_keys.issubset(keys):
return False, 'missing required keys: ' + list_keys_safe(required_item_keys - keys)
if not keys.issubset(item_keys):
Expand Down
2 changes: 1 addition & 1 deletion src/borg/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2326,7 +2326,7 @@ def output(fd):

unpacker = msgpack.Unpacker(use_list=False, object_hook=StableDict)
first = True
for item_id in archive_org_dict[b'items']:
for item_id in archive_org_dict['items']:
data = key.decrypt(item_id, repository.get(item_id))
unpacker.feed(data)
for item in unpacker:
Expand Down
10 changes: 5 additions & 5 deletions src/borg/crypto/key.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,24 +232,24 @@ def unpack_and_verify_manifest(self, data, force_tam_not_required=False):
unpacker = get_limited_unpacker('manifest')
unpacker.feed(data)
unpacked = unpacker.unpack()
if b'tam' not in unpacked:
if 'tam' not in unpacked:
if tam_required:
raise TAMRequiredError(self.repository._location.canonical_path())
else:
logger.debug('TAM not found and not required')
return unpacked, False
tam = unpacked.pop(b'tam', None)
tam = unpacked.pop('tam', None)
if not isinstance(tam, dict):
raise TAMInvalid()
tam_type = tam.get(b'type', b'<none>').decode('ascii', 'replace')
tam_type = tam.get('type', '<none>')
if tam_type != 'HKDF_HMAC_SHA512':
if tam_required:
raise TAMUnsupportedSuiteError(repr(tam_type))
else:
logger.debug('Ignoring TAM made with unsupported suite, since TAM is not required: %r', tam_type)
return unpacked, False
tam_hmac = tam.get(b'hmac')
tam_salt = tam.get(b'salt')
tam_hmac = tam.get('hmac')
tam_salt = tam.get('salt')
if not isinstance(tam_salt, bytes) or not isinstance(tam_hmac, bytes):
raise TAMInvalid()
offset = data.index(tam_hmac)
Expand Down
4 changes: 2 additions & 2 deletions src/borg/helpers/msgpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
version = mp_version

USE_BIN_TYPE = True
RAW = True # should become False later when we do not need to read old stuff any more
RAW = False
UNICODE_ERRORS = 'surrogateescape' # previously done by safe_encode, safe_decode


Expand Down Expand Up @@ -161,7 +161,7 @@ def unpackb(packed, *, raw=RAW, unicode_errors=UNICODE_ERRORS,
def unpack(stream, *, raw=RAW, unicode_errors=UNICODE_ERRORS,
strict_map_key=False,
**kwargs):
# assert raw == RAW
assert raw == RAW
assert unicode_errors == UNICODE_ERRORS
try:
kw = dict(raw=raw, unicode_errors=unicode_errors,
Expand Down
35 changes: 22 additions & 13 deletions src/borg/item.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ def fix_tuple_of_str_and_int(t):
return t


def want_bytes(v):
"""we know that we want bytes and the value should be bytes"""
# legacy support: it being str can be caused by msgpack unpack decoding old data that was packed with use_bin_type=False
if isinstance(v, str):
v = v.encode('utf-8', errors='surrogateescape')
assert isinstance(v, bytes)
return v


class PropDict:
"""
Manage a dictionary via properties.
Expand Down Expand Up @@ -204,10 +213,10 @@ class Item(PropDict):
user = PropDict._make_property('user', (str, type(None)), 'surrogate-escaped str or None')
group = PropDict._make_property('group', (str, type(None)), 'surrogate-escaped str or None')

acl_access = PropDict._make_property('acl_access', bytes)
acl_default = PropDict._make_property('acl_default', bytes)
acl_extended = PropDict._make_property('acl_extended', bytes)
acl_nfs4 = PropDict._make_property('acl_nfs4', bytes)
acl_access = PropDict._make_property('acl_access', bytes, decode=want_bytes)
acl_default = PropDict._make_property('acl_default', bytes, decode=want_bytes)
acl_extended = PropDict._make_property('acl_extended', bytes, decode=want_bytes)
acl_nfs4 = PropDict._make_property('acl_nfs4', bytes, decode=want_bytes)

mode = PropDict._make_property('mode', int)
uid = PropDict._make_property('uid', int)
Expand All @@ -224,7 +233,7 @@ class Item(PropDict):
# compatibility note: this is a new feature, in old archives size will be missing.
size = PropDict._make_property('size', int)

hlid = PropDict._make_property('hlid', bytes) # hard link id: same value means same hard link.
hlid = PropDict._make_property('hlid', bytes, decode=want_bytes) # hard link id: same value means same hard link.
hardlink_master = PropDict._make_property('hardlink_master', bool) # legacy

chunks = PropDict._make_property('chunks', (list, type(None)), 'list or None')
Expand Down Expand Up @@ -364,9 +373,9 @@ class EncryptedKey(PropDict):
version = PropDict._make_property('version', int)
algorithm = PropDict._make_property('algorithm', str)
iterations = PropDict._make_property('iterations', int)
salt = PropDict._make_property('salt', bytes)
hash = PropDict._make_property('hash', bytes)
data = PropDict._make_property('data', bytes)
salt = PropDict._make_property('salt', bytes, decode=want_bytes)
hash = PropDict._make_property('hash', bytes, decode=want_bytes)
data = PropDict._make_property('data', bytes, decode=want_bytes)
argon2_time_cost = PropDict._make_property('argon2_time_cost', int)
argon2_memory_cost = PropDict._make_property('argon2_memory_cost', int)
argon2_parallelism = PropDict._make_property('argon2_parallelism', int)
Expand Down Expand Up @@ -400,10 +409,10 @@ class Key(PropDict):
__slots__ = ("_dict", ) # avoid setting attributes not supported by properties

version = PropDict._make_property('version', int)
repository_id = PropDict._make_property('repository_id', bytes)
enc_key = PropDict._make_property('enc_key', bytes)
enc_hmac_key = PropDict._make_property('enc_hmac_key', bytes)
id_key = PropDict._make_property('id_key', bytes)
repository_id = PropDict._make_property('repository_id', bytes, decode=want_bytes)
enc_key = PropDict._make_property('enc_key', bytes, decode=want_bytes)
enc_hmac_key = PropDict._make_property('enc_hmac_key', bytes, decode=want_bytes)
id_key = PropDict._make_property('id_key', bytes, decode=want_bytes)
chunk_seed = PropDict._make_property('chunk_seed', int)
tam_required = PropDict._make_property('tam_required', bool)

Expand Down Expand Up @@ -444,7 +453,7 @@ class ArchiveItem(PropDict):
chunker_params = PropDict._make_property('chunker_params', tuple)
recreate_cmdline = PropDict._make_property('recreate_cmdline', list) # list of s-e-str
# recreate_source_id, recreate_args, recreate_partial_chunks were used in 1.1.0b1 .. b2
recreate_source_id = PropDict._make_property('recreate_source_id', bytes)
recreate_source_id = PropDict._make_property('recreate_source_id', bytes, decode=want_bytes)
recreate_args = PropDict._make_property('recreate_args', list) # list of s-e-str
recreate_partial_chunks = PropDict._make_property('recreate_partial_chunks', list) # list of tuples
size = PropDict._make_property('size', int)
Expand Down
75 changes: 34 additions & 41 deletions src/borg/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
RPC_PROTOCOL_VERSION = 2
BORG_VERSION = parse_version(__version__)
MSGID, MSG, ARGS, RESULT = 'i', 'm', 'a', 'r' # pack
MSGIDB, MSGB, ARGSB, RESULTB = b'i', b'm', b'a', b'r' # unpack

MAX_INFLIGHT = 100

Expand Down Expand Up @@ -139,10 +138,6 @@ def __init__(self, data):
}


def decode_keys(d):
return {k.decode(): d[k] for k in d}


class RepositoryServer: # pragma: no cover
rpc_methods = (
'__len__',
Expand Down Expand Up @@ -217,14 +212,13 @@ def serve(self):
for unpacked in unpacker:
if isinstance(unpacked, dict):
dictFormat = True
msgid = unpacked[MSGIDB]
method = unpacked[MSGB].decode()
args = decode_keys(unpacked[ARGSB])
msgid = unpacked[MSGID]
method = unpacked[MSG]
args = unpacked[ARGS]
elif isinstance(unpacked, tuple) and len(unpacked) == 4:
dictFormat = False
# The first field 'type' was always 1 and has always been ignored
_, msgid, method, args = unpacked
method = method.decode()
args = self.positional_to_named(method, args)
else:
if self.repository is not None:
Expand Down Expand Up @@ -308,7 +302,7 @@ def negotiate(self, client_data):
# clients since 1.1.0b3 use a dict as client_data
# clients since 1.1.0b6 support json log format from server
if isinstance(client_data, dict):
self.client_version = client_data[b'client_version']
self.client_version = client_data['client_version']
level = logging.getLevelName(logging.getLogger('').level)
setup_logging(is_serve=True, json=True, level=level)
logger.debug('Initialized logging system for JSON-based protocol')
Expand Down Expand Up @@ -370,7 +364,6 @@ def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, ap
return self.repository.id

def inject_exception(self, kind):
kind = kind.decode()
s1 = 'test string'
s2 = 'test string2'
if kind == 'DoesNotExist':
Expand Down Expand Up @@ -484,35 +477,35 @@ class RemoteRepository:

class RPCError(Exception):
def __init__(self, unpacked):
# for borg < 1.1: unpacked only has b'exception_class' as key
# for borg 1.1+: unpacked has keys: b'exception_args', b'exception_full', b'exception_short', b'sysinfo'
# for borg < 1.1: unpacked only has 'exception_class' as key
# for borg 1.1+: unpacked has keys: 'exception_args', 'exception_full', 'exception_short', 'sysinfo'
self.unpacked = unpacked

def get_message(self):
if b'exception_short' in self.unpacked:
return b'\n'.join(self.unpacked[b'exception_short']).decode()
if 'exception_short' in self.unpacked:
return '\n'.join(self.unpacked['exception_short'])
else:
return self.exception_class

@property
def traceback(self):
return self.unpacked.get(b'exception_trace', True)
return self.unpacked.get('exception_trace', True)

@property
def exception_class(self):
return self.unpacked[b'exception_class'].decode()
return self.unpacked['exception_class']

@property
def exception_full(self):
if b'exception_full' in self.unpacked:
return b'\n'.join(self.unpacked[b'exception_full']).decode()
if 'exception_full' in self.unpacked:
return '\n'.join(self.unpacked['exception_full'])
else:
return self.get_message() + '\nRemote Exception (see remote log for the traceback)'

@property
def sysinfo(self):
if b'sysinfo' in self.unpacked:
return self.unpacked[b'sysinfo'].decode()
if 'sysinfo' in self.unpacked:
return self.unpacked['sysinfo']
else:
return ''

Expand Down Expand Up @@ -577,9 +570,9 @@ def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock
raise ConnectionClosedWithHint('Is borg working on the server?') from None
if version == RPC_PROTOCOL_VERSION:
self.dictFormat = False
elif isinstance(version, dict) and b'server_version' in version:
elif isinstance(version, dict) and 'server_version' in version:
self.dictFormat = True
self.server_version = version[b'server_version']
self.server_version = version['server_version']
else:
raise Exception('Server insisted on using unsupported protocol version %s' % version)

Expand Down Expand Up @@ -734,9 +727,9 @@ def pop_preload_msgid(chunkid):
return msgid

def handle_error(unpacked):
error = unpacked[b'exception_class'].decode()
old_server = b'exception_args' not in unpacked
args = unpacked.get(b'exception_args')
error = unpacked['exception_class']
old_server = 'exception_args' not in unpacked
args = unpacked.get('exception_args')

if error == 'DoesNotExist':
raise Repository.DoesNotExist(self.location.processed)
Expand All @@ -748,29 +741,29 @@ def handle_error(unpacked):
if old_server:
raise IntegrityError('(not available)')
else:
raise IntegrityError(args[0].decode())
raise IntegrityError(args[0])
elif error == 'AtticRepository':
if old_server:
raise Repository.AtticRepository('(not available)')
else:
raise Repository.AtticRepository(args[0].decode())
raise Repository.AtticRepository(args[0])
elif error == 'PathNotAllowed':
if old_server:
raise PathNotAllowed('(unknown)')
else:
raise PathNotAllowed(args[0].decode())
raise PathNotAllowed(args[0])
elif error == 'ParentPathDoesNotExist':
raise Repository.ParentPathDoesNotExist(args[0].decode())
raise Repository.ParentPathDoesNotExist(args[0])
elif error == 'ObjectNotFound':
if old_server:
raise Repository.ObjectNotFound('(not available)', self.location.processed)
else:
raise Repository.ObjectNotFound(args[0].decode(), self.location.processed)
raise Repository.ObjectNotFound(args[0], self.location.processed)
elif error == 'InvalidRPCMethod':
if old_server:
raise InvalidRPCMethod('(not available)')
else:
raise InvalidRPCMethod(args[0].decode())
raise InvalidRPCMethod(args[0])
else:
raise self.RPCError(unpacked)

Expand All @@ -789,10 +782,10 @@ def handle_error(unpacked):
try:
unpacked = self.responses.pop(waiting_for[0])
waiting_for.pop(0)
if b'exception_class' in unpacked:
if 'exception_class' in unpacked:
handle_error(unpacked)
else:
yield unpacked[RESULTB]
yield unpacked[RESULT]
if not waiting_for and not calls:
return
except KeyError:
Expand All @@ -809,10 +802,10 @@ def handle_error(unpacked):
else:
return
else:
if b'exception_class' in unpacked:
if 'exception_class' in unpacked:
handle_error(unpacked)
else:
yield unpacked[RESULTB]
yield unpacked[RESULT]
if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
w_fds = [self.stdin_fd]
else:
Expand All @@ -829,26 +822,26 @@ def handle_error(unpacked):
self.unpacker.feed(data)
for unpacked in self.unpacker:
if isinstance(unpacked, dict):
msgid = unpacked[MSGIDB]
msgid = unpacked[MSGID]
elif isinstance(unpacked, tuple) and len(unpacked) == 4:
# The first field 'type' was always 1 and has always been ignored
_, msgid, error, res = unpacked
if error:
# ignore res, because it is only a fixed string anyway.
unpacked = {MSGIDB: msgid, b'exception_class': error}
unpacked = {MSGID: msgid, 'exception_class': error}
else:
unpacked = {MSGIDB: msgid, RESULTB: res}
unpacked = {MSGID: msgid, RESULT: res}
else:
raise UnexpectedRPCDataFormatFromServer(data)
if msgid in self.ignore_responses:
self.ignore_responses.remove(msgid)
# async methods never return values, but may raise exceptions.
if b'exception_class' in unpacked:
if 'exception_class' in unpacked:
self.async_responses[msgid] = unpacked
else:
# we currently do not have async result values except "None",
# so we do not add them into async_responses.
if unpacked[RESULTB] is not None:
if unpacked[RESULT] is not None:
self.async_responses[msgid] = unpacked
else:
self.responses[msgid] = unpacked
Expand Down

0 comments on commit 0e7d0ae

Please sign in to comment.