A Threaded Prototype
enkore committed Jul 28, 2017
1 parent 69c7523 commit 4664f2d
Showing 12 changed files with 1,100 additions and 27 deletions.
9 changes: 6 additions & 3 deletions setup.py
@@ -22,9 +22,12 @@
# Are we building on ReadTheDocs?
on_rtd = os.environ.get('READTHEDOCS')

# msgpack pure python data corruption was fixed in 0.4.6.
# Also, we might use some rather recent API features.
install_requires = ['msgpack-python>=0.4.6', ]
install_requires = [
# msgpack pure python data corruption was fixed in 0.4.6.
# Also, we might use some rather recent API features.
'msgpack-python>=0.4.6',
'pyzmq',

ThomasWaldmann commented Jul 28, 2017:

btw, did you notice that (even with that line added) pip install -e . does not install pyzmq?

]

# note for package maintainers: if you package borgbackup for distribution,
# please add llfuse as a *requirement* on all platforms that have a working
5 changes: 5 additions & 0 deletions src/borg/_chunker.c
@@ -165,6 +165,8 @@ chunker_fill(Chunker *c)
return 1;
}
if(c->fh >= 0) {
PyThreadState *thread_state = PyEval_SaveThread();

offset = c->bytes_read;
// if we have a os-level file descriptor, use os-level API
n = read(c->fh, c->data + c->position + c->remaining, n);
@@ -177,6 +179,7 @@ chunker_fill(Chunker *c)
c->eof = 1;
}
else {
PyEval_RestoreThread(thread_state);
// some error happened
PyErr_SetFromErrno(PyExc_OSError);
return 0;
@@ -211,6 +214,8 @@ chunker_fill(Chunker *c)

posix_fadvise(c->fh, offset & ~pagemask, length - overshoot, POSIX_FADV_DONTNEED);
#endif

PyEval_RestoreThread(thread_state);
}
else {
// no os-level file descriptor, use Python file object API
41 changes: 41 additions & 0 deletions src/borg/_utils.c
@@ -0,0 +1,41 @@

#include "Python.h"

/*
* This is not quite as dark magic as it looks. We just convert the address of (pointer to)
* a PyObject into a bytes object in _wrap_object, and convert these bytes back to the
* pointer to the original object.
*
* This mainly looks a bit confusing due to our mental special-casing of "char*" from other
* pointers.
*
* The big upside to this is that this neither does *any* serialization (beyond creating tiny
* bytes objects as "stand-ins"), nor has to copy the entire object that's passed around.
*/

static PyObject *
_wrap_object(PyObject *obj)
{
/*
* Create a temporary reference to the object being passed around so it does not vanish.
* Note that we never decref this one in _unwrap_object, since we just transfer that reference
* there, i.e. there is an elided "Py_INCREF(x); Py_DECREF(x)".

ThomasWaldmann commented Jul 28, 2017:

Add this, just for completeness: "... and the decref happens naturally when the object leaves scope."

*/
Py_INCREF(obj);
return PyBytes_FromStringAndSize((const char*) &obj, sizeof(void*));
}

static PyObject *
_unwrap_object(PyObject *bytes)
{
if(!PyBytes_Check(bytes)) {
PyErr_SetString(PyExc_TypeError, "Cannot unwrap non-bytes object");
return NULL;
}
if(PyBytes_Size(bytes) != sizeof(void*)) {
PyErr_SetString(PyExc_TypeError, "Invalid length of bytes object");
return NULL;
}
PyObject *object = * (PyObject **) PyBytes_AsString(bytes);
return object;
}
1 change: 1 addition & 0 deletions src/borg/archive.py
@@ -220,6 +220,7 @@ class ChunkBuffer:
BUFFER_SIZE = 8 * 1024 * 1024

def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
super().__init__()
self.buffer = BytesIO()
self.packer = msgpack.Packer(unicode_errors='surrogateescape')
self.chunks = []
43 changes: 26 additions & 17 deletions src/borg/archiver.py
@@ -398,7 +398,9 @@ def do_create(self, args, repository, manifest=None, key=None):
matcher = PatternMatcher(fallback=True)
matcher.add_inclexcl(args.patterns)

def create_inner(archive, cache):
from .threads import CreationPipeline

def create_inner(archive, cache, pipeline):
# Add cache dir to inode_skip list
skip_inodes = set()
try:
@@ -418,7 +420,7 @@ def create_inner(archive, cache):
path = 'stdin'
if not dry_run:
try:
status = archive.process_stdin(path, cache)
status = pipeline.fso.process_stdin(path, cache)
except BackupOSError as e:
status = 'E'
self.print_warning('%s: %s', path, e)
@@ -438,8 +440,10 @@ def create_inner(archive, cache):
restrict_dev = None
self._process(archive, cache, matcher, args.exclude_caches, args.exclude_if_present,
args.keep_exclude_tags, skip_inodes, path, restrict_dev,
read_special=args.read_special, dry_run=dry_run, st=st)
read_special=args.read_special, dry_run=dry_run, st=st, pipeline=pipeline)
if not dry_run:
pipeline.save()
"""
archive.save(comment=args.comment, timestamp=args.timestamp)
if args.progress:
archive.stats.show_progress(final=True)
@@ -457,11 +461,14 @@ def create_inner(archive, cache):
str(archive.stats),
str(cache),
DASHES, logger=logging.getLogger('borg.output.stats'))
"""

self.output_filter = args.output_filter
self.output_list = args.output_list
self.ignore_inode = args.ignore_inode
dry_run = args.dry_run
if dry_run:
raise NotImplementedError("FIXME")
t0 = datetime.utcnow()
t0_monotonic = time.monotonic()
if not dry_run:
@@ -474,14 +481,16 @@
chunker_params=args.chunker_params, start=t0, start_monotonic=t0_monotonic,
compression=args.compression, compression_files=args.compression_files,
log_json=args.log_json)
create_inner(archive, cache)
pipeline = CreationPipeline(archive, archive.key, cache)
with pipeline:
create_inner(archive, cache, pipeline)
else:
create_inner(None, None)
return self.exit_code

def _process(self, archive, cache, matcher, exclude_caches, exclude_if_present,
keep_exclude_tags, skip_inodes, path, restrict_dev,
read_special=False, dry_run=False, st=None):
read_special=False, dry_run=False, st=None, pipeline=None):
"""
Process *path* recursively according to the various parameters.
@@ -510,32 +519,32 @@ def _process(self, archive, cache, matcher, exclude_caches, exclude_if_present,
return
if stat.S_ISREG(st.st_mode):
if not dry_run:
status = archive.process_file(path, st, cache, self.ignore_inode)
status = pipeline.fso.process_file(path, st, cache, self.ignore_inode)
elif stat.S_ISDIR(st.st_mode):
if recurse:
tag_paths = dir_is_tagged(path, exclude_caches, exclude_if_present)
if tag_paths:
if keep_exclude_tags and not dry_run:
archive.process_dir(path, st)
pipeline.fso.process_dir(path, st)
for tag_path in tag_paths:
self._process(archive, cache, matcher, exclude_caches, exclude_if_present,
keep_exclude_tags, skip_inodes, tag_path, restrict_dev,
read_special=read_special, dry_run=dry_run)
read_special=read_special, dry_run=dry_run, pipeline=pipeline)
return
if not dry_run:
status = archive.process_dir(path, st)
status = pipeline.fso.process_dir(path, st)
if recurse:
with backup_io('scandir'):
entries = helpers.scandir_inorder(path)
for dirent in entries:
normpath = os.path.normpath(dirent.path)
self._process(archive, cache, matcher, exclude_caches, exclude_if_present,
keep_exclude_tags, skip_inodes, normpath, restrict_dev,
read_special=read_special, dry_run=dry_run)
read_special=read_special, dry_run=dry_run, pipeline=pipeline)
elif stat.S_ISLNK(st.st_mode):
if not dry_run:
if not read_special:
status = archive.process_symlink(path, st)
status = pipeline.fso.process_symlink(path, st)
else:
try:
st_target = os.stat(path)
Expand All @@ -544,21 +553,21 @@ def _process(self, archive, cache, matcher, exclude_caches, exclude_if_present,
else:
special = is_special(st_target.st_mode)
if special:
status = archive.process_file(path, st_target, cache)
status = pipeline.fso.process_file(path, st_target, cache)
else:
status = archive.process_symlink(path, st)
status = pipeline.fso.process_symlink(path, st)
elif stat.S_ISFIFO(st.st_mode):
if not dry_run:
if not read_special:
status = archive.process_fifo(path, st)
status = pipeline.fso.process_fifo(path, st)
else:
status = archive.process_file(path, st, cache)
status = pipeline.fso.process_file(path, st, cache)
elif stat.S_ISCHR(st.st_mode) or stat.S_ISBLK(st.st_mode):
if not dry_run:
if not read_special:
status = archive.process_dev(path, st)
status = pipeline.fso.process_dev(path, st)
else:
status = archive.process_file(path, st, cache)
status = pipeline.fso.process_file(path, st, cache)
elif stat.S_ISSOCK(st.st_mode):
# Ignore unix sockets
return
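(The new src/borg/threads.py that provides CreationPipeline is part of this commit but not shown in this excerpt. From the call sites above one can only infer its rough shape; the following is a non-authoritative sketch — only the names CreationPipeline, fso and save() are taken from the diff, everything else is assumed.)

    # Hypothetical sketch of the interface archiver.py now relies on; the real
    # implementation lives in src/borg/threads.py and is not shown in this diff.
    class CreationPipeline:
        def __init__(self, archive, key, cache):
            self.archive = archive
            self.key = key
            self.cache = cache
            # fso ("file system objects") exposes the process_* methods that the
            # archiver used to call directly on the archive.
            self.fso = ...  # assumed: an object that hands work off to worker threads

        def __enter__(self):
            # assumed: start worker threads / zmq sockets here
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            # assumed: drain queues and join workers here
            return False

        def save(self):
            # assumed: finalize the archive once all queued items are processed,
            # replacing the archive.save() call commented out above
            ...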
15 changes: 15 additions & 0 deletions src/borg/hashindex.pyx
@@ -2,11 +2,13 @@
from collections import namedtuple
import locale
import os
import sys

ThomasWaldmann commented Jul 28, 2017:

not used


cimport cython
from libc.stdint cimport uint32_t, UINT32_MAX, uint64_t
from libc.errno cimport errno
from cpython.exc cimport PyErr_SetFromErrnoWithFilename
from cpython.ref cimport Py_INCREF, Py_DECREF

ThomasWaldmann commented Jul 28, 2017:

not used

enkore (Author) commented Jul 28, 2017:

I'd not spend too much time on details here, since I'll reroll anyway. As a prototype, the exact code wasn't supposed to be merged.


API_VERSION = '1.1_01'

@@ -31,6 +33,11 @@ cdef extern from "_hashindex.c":
double HASH_MAX_LOAD


cdef extern from "_utils.c":
object _wrap_object(object obj)
object _unwrap_object(object bytes)


cdef _NoDefault = object()

"""
@@ -56,6 +63,14 @@ cdef uint32_t _MAX_VALUE = 2**32-1025
assert _MAX_VALUE % 2 == 1


def wrap_obj(object):
return _wrap_object(object)


def unwrap_obj(ptr):
return _unwrap_object(ptr)

ThomasWaldmann commented Jul 28, 2017:

hmm, this is not really related to hashindex, so maybe we need a utils.pyx/tools.pyx?


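For illustration, a tiny round-trip through the wrappers added above (assuming they remain importable from borg.hashindex as in this hunk); the point is that only a pointer-sized handle is created, never a copy or serialization of the object:

    # Round-trip an object through the pointer-wrapping helpers (illustration only).
    from borg.hashindex import wrap_obj, unwrap_obj

    item = {'path': 'some/file', 'size': 1234}
    handle = wrap_obj(item)       # bytes object holding just the PyObject* (sizeof(void*) bytes)
    assert len(handle) == 8       # on a 64-bit build

    same = unwrap_obj(handle)     # no deserialization: the very same object comes back
    assert same is item
    # Each wrap_obj() takes one reference which the matching unwrap_obj() hands back,
    # so every handle must be unwrapped exactly once.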

@cython.internal
cdef class IndexBase:
cdef HashIndex *index
15 changes: 9 additions & 6 deletions src/borg/helpers.py
@@ -271,7 +271,7 @@ def load(cls, repository, key=None, force_tam_not_required=False):
os.unlink(tam_required_file(repository))
return manifest, key

def write(self):
def get_data(self):
from .item import ManifestItem
if self.key.tam_required:
self.config[b'tam_required'] = True
@@ -291,6 +291,10 @@ def write(self):
)
self.tam_verified = True
data = self.key.pack_and_authenticate_metadata(manifest.as_dict())
return data

def write(self):
data = self.get_data()
self.id = self.key.id_hash(data)
self.repository.put(self.MANIFEST_ID, self.key.encrypt(Chunk(data, compression={'name': 'none'})))

@@ -911,7 +915,7 @@ def format_archive(archive):
)


class Buffer:
class Buffer(threading.local):
"""
provide a thread-local buffer
"""
@@ -926,13 +930,12 @@ def __init__(self, allocator, size=4096, limit=None):
"""
assert callable(allocator), 'must give alloc(size) function as first param'
assert limit is None or size <= limit, 'initial size must be <= limit'
self._thread_local = threading.local()
self.allocator = allocator
self.limit = limit
self.resize(size, init=True)

def __len__(self):
return len(self._thread_local.buffer)
return len(self.buffer)

def resize(self, size, init=False):
"""
@@ -944,7 +947,7 @@ def resize(self, size, init=False):
if self.limit is not None and size > self.limit:
raise Buffer.MemoryLimitExceeded(size, self.limit)
if init or len(self) < size:
self._thread_local.buffer = self.allocator(size)
self.buffer = self.allocator(size)

def get(self, size=None, init=False):
"""
@@ -953,7 +956,7 @@ def get(self, size=None, init=False):
"""
if size is not None:
self.resize(size, init)
return self._thread_local.buffer
return self.buffer


@lru_cache(maxsize=None)
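The Buffer change above leans on a property of threading.local subclasses: every attribute set on the instance is thread-local, and __init__ is re-run (with the original constructor arguments) in each new thread that first touches the object, so the separate _thread_local indirection becomes unnecessary. A small self-contained sketch of that behaviour, not borg code:

    import threading

    class PerThreadBuffer(threading.local):
        def __init__(self, size=4096):
            # Runs again in every thread that first uses the instance,
            # so each thread gets its own 'buffer' attribute.
            self.buffer = bytearray(size)

    buf = PerThreadBuffer()
    buf.buffer[:5] = b'main!'

    def worker():
        # This thread sees a freshly initialized buffer, not the one
        # the main thread wrote to.
        assert bytes(buf.buffer[:5]) == b'\x00\x00\x00\x00\x00'

    t = threading.Thread(target=worker)
    t.start()
    t.join()

For the Buffer above this means the initial allocation now happens once per thread that uses it.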
2 changes: 1 addition & 1 deletion src/borg/item.pyx
@@ -136,7 +136,7 @@ class Item(PropDict):

VALID_KEYS = ITEM_KEYS | {'deleted', 'nlink', } # str-typed keys

__slots__ = ("_dict", ) # avoid setting attributes not supported by properties
__slots__ = ("_dict", "num_chunks", "status", "original_path") # avoid setting attributes not supported by properties

# properties statically defined, so that IDEs can know their names:

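The extended __slots__ matter because slotted classes have no per-instance __dict__, so attributes not declared there cannot be set; apparently the pipeline wants to stash num_chunks, status and original_path directly on Item objects while they travel between threads. A generic illustration, not borg code:

    class Slotted:
        __slots__ = ("_dict", "num_chunks")

    s = Slotted()
    s.num_chunks = 3        # fine, declared in __slots__
    try:
        s.status = 'A'      # not declared and no __dict__ to fall back on
    except AttributeError:
        print("status is not a slot")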
1 change: 1 addition & 0 deletions src/borg/platform/__init__.py
@@ -23,6 +23,7 @@
from .linux import set_flags, get_flags
from .linux import SyncFile
from .linux import swidth, umount
from .linux import set_python_thread_affinity
elif sys.platform.startswith('freebsd'): # pragma: freebsd only
from .freebsd import API_VERSION as OS_API_VERSION
from .freebsd import acl_get, acl_set
25 changes: 25 additions & 0 deletions src/borg/platform/_linux.c
@@ -0,0 +1,25 @@

#include <pthread.h>
#include <sched.h>

static int
set_python_thread_affinity(void)
{
pthread_t thread = pthread_self();
cpu_set_t cpu_set;

if(pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpu_set) != 0) {
/* Could be triggered by >64 HW threads */
return 0;
}

CPU_ZERO(&cpu_set);
CPU_SET(1, &cpu_set);

ThomasWaldmann commented Jul 28, 2017:

maybe more useful later with 1<<n - or leave it away completely?

ThomasWaldmann commented Jul 29, 2017:

btw, guess "cpu 1" will often be the same physical cpu core as "cpu 0", just hyperthreading.

enkore (Author) commented Jul 29, 2017:

On x86, CPUs 0...n-1 are usually the first hardware strand of the n cores and n...2n-1 the second strand.

Anyhow, the basic idea here is that, later on, the Python threads won't do any heavy lifting and can all be thrown on one CPU to reduce GIL thrashing. So this won't be needed for some time.

ThomasWaldmann commented Jul 29, 2017:

hmm, is that different on windows? i observed that windows preferred putting load on cores 0,2,4,6,... (and less on 1,3,5,7,...).

enkore (Author) commented Jul 29, 2017:

Possible, but not that important. Like I explained, the main point would be to avoid the OS scheduler trying to schedule two Python threads at the same time. If no heavy lifting is done in Python threads, then this should reduce GIL contention and possibly reduce latency between Python threads. Which CPU the Python threads are pinned to isn't really important for this.


if(pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpu_set) != 0) {
/* Parent affinity excludes CPU1 or CPU1 does not exist (usually implies single core system) */
return 0;
}

return 1;
}
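For comparison with the discussion above, the same pinning idea can be sketched from pure Python on Linux via os.sched_setaffinity; this only illustrates the idea, it is not what the commit does, and which CPU the Python threads end up on matters less than keeping them together:

    import os

    def pin_calling_thread(cpu=1):
        # Pin the calling thread to a single CPU so that Python threads, which are
        # expected to do no heavy lifting, stop being spread across cores and
        # cause less GIL/scheduling churn.
        try:
            # pid 0 means the calling thread for the underlying sched_setaffinity(2)
            os.sched_setaffinity(0, {cpu})
            return True
        except (OSError, ValueError):
            # CPU outside the parent affinity mask or nonexistent
            # (e.g. a single-core machine), mirroring the C version's fallback
            return False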
9 changes: 9 additions & 0 deletions src/borg/platform/linux.pyx
@@ -11,6 +11,7 @@ from .base import SyncFile as BaseSyncFile
from .base import safe_fadvise
from .posix import swidth

from cpython.exc cimport PyErr_SetFromErrno
from libc cimport errno
from libc.stdint cimport int64_t

@@ -57,9 +58,17 @@ cdef extern from "sys/ioctl.h":
cdef extern from "string.h":
char *strerror(int errnum)

cdef extern from "_linux.c":
int _set_python_thread_affinity "set_python_thread_affinity"()

_comment_re = re.compile(' *#.*', re.M)


def set_python_thread_affinity():
if not _set_python_thread_affinity():
print('cpuset failed')


BSD_TO_LINUX_FLAGS = {
stat.UF_NODUMP: FS_NODUMP_FL,
stat.UF_IMMUTABLE: FS_IMMUTABLE_FL,

1 comment on commit 4664f2d

ThomasWaldmann commented:

you copied quite some code from elsewhere into threads.py.
any plans for how to dedup that and how to test it?
