rewrite the cache to be asynchronous
We've been seeing "Task X took Y seconds" warnings in our tests for a
long time, especially on Windows. Running git commands synchronously
blocks other tasks from running, like display redrawing. It's bad
practice in an async program.

One of the barriers to async-ifying the cache code earlier was that many
commands relied on having exclusive ownership of the index file while
they were running. For example, 1) read a tree into the index, 2) merge
another tree into some subdirectory, 3) write out the result. If any
other git commands ran in the middle of that, it would screw up the
result. So if we want cache functions to run in parallel, we need to
rewrite every one of them to use its own temporary index file.
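
As a rough sketch of the idea (illustrative only; the real code lives in
cache.py and differs in detail), each git invocation can point
GIT_INDEX_FILE at its own throwaway index, so parallel commands never
share mutable state:

    import asyncio
    import os
    import tempfile

    @asyncio.coroutine
    def git_with_private_index(repo_dir, *args):
        # Hypothetical helper, not peru's actual API: run one git command
        # against a private index file inside a temporary directory.
        with tempfile.TemporaryDirectory() as tmp:
            env = dict(os.environ, GIT_INDEX_FILE=os.path.join(tmp, 'index'))
            proc = yield from asyncio.create_subprocess_exec(
                'git', *args, cwd=repo_dir, env=env,
                stdout=asyncio.subprocess.PIPE)
            stdout, _ = yield from proc.communicate()
            return stdout.decode()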

The reason I'm finally getting around to this now is that I'm trying to
reduce the number of git commands that run in a no-op sync. One of the
optimizations I want to make is to reuse the index file from the last
sync, so that we don't need a `read-tree` and an `update-index` just to
set up for `diff-files`. But the plumbing to do that right is pretty
much the same as what we need in order to run every git command with
its own index anyway. So let's bite the bullet and do that now; reusing
index files will be easy afterwards.
oconnor663 committed Nov 27, 2015
1 parent 264ede4 commit e18aa33
Showing 27 changed files with 602 additions and 380 deletions.
532 changes: 320 additions & 212 deletions peru/cache.py

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions peru/imports.py
@@ -11,16 +11,16 @@
def checkout(runtime, scope, imports, path):
imports_tree = yield from get_imports_tree(runtime, scope, imports)
last_imports_tree = _get_last_imports(runtime)
runtime.cache.export_tree(imports_tree, path, last_imports_tree,
force=runtime.force)
yield from runtime.cache.export_tree(
imports_tree, path, last_imports_tree, force=runtime.force)
_set_last_imports(runtime, imports_tree)


@asyncio.coroutine
def get_imports_tree(runtime, scope, imports, base_tree=None):
target_trees = yield from get_trees(runtime, scope, imports.keys())
imports_tree = merge_imports_tree(runtime.cache, imports, target_trees,
base_tree)
imports_tree = yield from merge_imports_tree(
runtime.cache, imports, target_trees, base_tree)
return imports_tree


9 changes: 9 additions & 0 deletions peru/keyval.py
@@ -1,3 +1,4 @@
import contextlib
import os
import shutil
import tempfile
@@ -46,3 +47,11 @@ def _tmp_file(self):
fd, path = tempfile.mkstemp(dir=self._tmp_dir)
os.close(fd)
return path

@contextlib.contextmanager
def tmp_dir_context(self):
try:
path = tempfile.mkdtemp(dir=self._tmp_dir)
yield path
finally:
shutil.rmtree(path, ignore_errors=True)
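
A minimal usage sketch of the new context manager (the `keyval` object
and the work inside the block are stand-ins, not real peru code):

    with keyval.tmp_dir_context() as scratch:
        # `scratch` is a fresh directory under the cache's tmp dir, and
        # it's removed when the block exits, even on error.
        build_something_in(scratch)
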
5 changes: 3 additions & 2 deletions peru/main.py
@@ -175,7 +175,8 @@ def do_copy(params):
dest = params.args['<dest>']
tree = yield from imports.get_tree(
params.runtime, params.scope, params.args['<target>'])
params.runtime.cache.export_tree(tree, dest, force=params.runtime.force)
yield from params.runtime.cache.export_tree(
tree, dest, force=params.runtime.force)
if not params.args['<dest>']:
print(dest)

@@ -335,7 +336,7 @@ def main(*, argv=None, env=None, nocatch=False):
return ret

try:
runtime = Runtime(args, env)
runtime = async.run_task(Runtime(args, env))
if not args['--quiet']:
parser.warn_duplicate_keys(runtime.peru_file)
scope, imports = parser.parse_file(runtime.peru_file)
6 changes: 4 additions & 2 deletions peru/merge.py
@@ -1,8 +1,10 @@
import asyncio
import textwrap

from .cache import compute_key, MergeConflictError


@asyncio.coroutine
def merge_imports_tree(cache, imports, target_trees, base_tree=None):
'''Take an Imports struct and a dictionary of resolved trees and merge the
unified imports tree. If base_tree is supplied, merge that too. There are a
@@ -20,11 +22,11 @@ def merge_imports_tree(cache, imports, target_trees, base_tree=None):
# We always want to merge imports in the same order, so that any conflicts
# we run into will be deterministic. Sort the imports alphabetically by
# target name.
unified_tree = base_tree or cache.get_empty_tree()
unified_tree = base_tree or (yield from cache.get_empty_tree())
for target, paths in imports.items():
for path in paths:
try:
unified_tree = cache.merge_trees(
unified_tree = yield from cache.merge_trees(
unified_tree, target_trees[target], path)
except MergeConflictError as e:
message = 'Merge conflict in import "{}" at "{}":\n\n{}'
13 changes: 9 additions & 4 deletions peru/module.py
@@ -24,7 +24,9 @@ def __init__(self, name, type, default_rule, plugin_fields, yaml_name,
def _get_base_tree(self, runtime):
override_path = runtime.get_override(self.name)
if override_path is not None:
return self._get_override_tree(runtime, override_path)
override_tree = yield from self._get_override_tree(
runtime, override_path)
return override_tree

key = compute_key({
'type': self.type,
@@ -48,7 +50,7 @@ def _get_base_tree(self, runtime):
runtime.get_plugin_context(), self.type,
self.plugin_fields, tmp_dir,
runtime.display.get_handle(self.name))
tree = runtime.cache.import_tree(tmp_dir)
tree = yield from runtime.cache.import_tree(tmp_dir)
# Note that we still *write* to cache even when --no-cache is True.
# That way we avoid confusing results on subsequent syncs.
runtime.cache.keyval[key] = tree
@@ -78,7 +80,8 @@ def parse_peru_file(self, runtime):
yaml = json.loads(runtime.cache.keyval[cache_key])
else:
try:
yaml_bytes = runtime.cache.read_file(tree, self.peru_file)
yaml_bytes = yield from runtime.cache.read_file(
tree, self.peru_file)
yaml = yaml_bytes.decode('utf8')
except FileNotFoundError:
yaml = None
@@ -106,6 +109,7 @@ def reup(self, runtime):
for line in output_lines:
runtime.display.print(line)

@asyncio.coroutine
def _get_override_tree(self, runtime, path):
if not os.path.exists(path):
raise PrintableError(
@@ -115,4 +119,5 @@ def _get_override_tree(self, runtime, path):
raise PrintableError(
"override path for module '{}' is not a directory: {}".format(
self.name, path))
return runtime.cache.import_tree(path)
tree = yield from runtime.cache.import_tree(path)
return tree
48 changes: 31 additions & 17 deletions peru/rule.py
@@ -39,26 +39,28 @@ def get_tree(self, runtime, input_tree):

tree = input_tree
if self.copy:
tree = copy_files(runtime.cache, tree, self.copy)
tree = yield from copy_files(runtime.cache, tree, self.copy)
if self.move:
tree = move_files(runtime.cache, tree, self.move)
tree = yield from move_files(runtime.cache, tree, self.move)
if self.pick:
tree = pick_files(runtime.cache, tree, self.pick)
tree = yield from pick_files(runtime.cache, tree, self.pick)
if self.executable:
tree = make_files_executable(
tree = yield from make_files_executable(
runtime.cache, tree, self.executable)
if self.export:
tree = get_export_tree(runtime.cache, tree, self.export)
tree = yield from get_export_tree(
runtime.cache, tree, self.export)

runtime.cache.keyval[key] = tree

return tree


@asyncio.coroutine
def _copy_files_modifications(_cache, tree, paths_multimap):
modifications = {}
for source in paths_multimap:
source_info_dict = _cache.ls_tree(tree, source)
source_info_dict = yield from _cache.ls_tree(tree, source)
if not source_info_dict:
raise NoMatchingFilesError(
'Path "{}" does not exist.'.format(source))
@@ -67,7 +69,7 @@ def _copy_files_modifications(_cache, tree, paths_multimap):
# If dest is a directory, put the source inside dest instead of
# overwriting dest entirely.
dest_is_dir = False
dest_info_dict = _cache.ls_tree(tree, dest)
dest_info_dict = yield from _cache.ls_tree(tree, dest)
if dest_info_dict:
dest_info = list(dest_info_dict.items())[0][1]
dest_is_dir = (dest_info.type == cache.TREE_TYPE)
@@ -79,16 +81,21 @@ def _copy_files_modifications(_cache, tree, paths_multimap):
return modifications


@asyncio.coroutine
def copy_files(_cache, tree, paths_multimap):
modifications = _copy_files_modifications(_cache, tree, paths_multimap)
return _cache.modify_tree(tree, modifications)
modifications = yield from _copy_files_modifications(
_cache, tree, paths_multimap)
tree = yield from _cache.modify_tree(tree, modifications)
return tree


@asyncio.coroutine
def move_files(_cache, tree, paths_multimap):
# First obtain the copies from the original tree. Moves are not ordered but
# happen all at once, so if you move a->b and b->c, the contents of c will
# always end up being b rather than a.
modifications = _copy_files_modifications(_cache, tree, paths_multimap)
modifications = yield from _copy_files_modifications(
_cache, tree, paths_multimap)
# Now add in deletions, but be careful not to delete a file that just got
# moved. Note that if "a" gets moved into "dir", it will end up at "dir/a",
# even if "dir" is deleted (because modify_tree always modifies parents
@@ -97,9 +104,11 @@ def move_files(_cache, tree, paths_multimap):
for source in paths_multimap:
if source not in modifications:
modifications[source] = None
return _cache.modify_tree(tree, modifications)
tree = yield from _cache.modify_tree(tree, modifications)
return tree


@asyncio.coroutine
def _get_glob_entries(_cache, tree, globs_list):
matches = {}
for glob_str in globs_list:
@@ -108,7 +117,7 @@ def _get_glob_entries(_cache, tree, globs_list):
# like 'a/b/**/foo', only list the paths under 'a/b'.
regex = glob.glob_to_path_regex(glob_str)
prefix = glob.unglobbed_prefix(glob_str)
entries = _cache.ls_tree(tree, prefix, recursive=True)
entries = yield from _cache.ls_tree(tree, prefix, recursive=True)
found = False
for path, entry in entries.items():
if re.match(regex, path):
Expand All @@ -120,23 +129,28 @@ def _get_glob_entries(_cache, tree, globs_list):
return matches


@asyncio.coroutine
def pick_files(_cache, tree, globs_list):
picks = _get_glob_entries(_cache, tree, globs_list)
return _cache.modify_tree(None, picks)
picks = yield from _get_glob_entries(_cache, tree, globs_list)
tree = yield from _cache.modify_tree(None, picks)
return tree


@asyncio.coroutine
def make_files_executable(_cache, tree, globs_list):
entries = _get_glob_entries(_cache, tree, globs_list)
entries = yield from _get_glob_entries(_cache, tree, globs_list)
exes = {}
for path, entry in entries.items():
# Ignore directories.
if entry.type == cache.BLOB_TYPE:
exes[path] = entry._replace(mode=cache.EXECUTABLE_FILE_MODE)
return _cache.modify_tree(tree, exes)
tree = yield from _cache.modify_tree(tree, exes)
return tree


@asyncio.coroutine
def get_export_tree(_cache, tree, export_path):
entries = _cache.ls_tree(tree, export_path)
entries = yield from _cache.ls_tree(tree, export_path)
if not entries:
raise NoMatchingFilesError('Export path "{}" doesn\'t exist.'
.format(export_path))
16 changes: 14 additions & 2 deletions peru/runtime.py
@@ -13,12 +13,20 @@
from . import plugin


class Runtime:
@asyncio.coroutine
def Runtime(args, env):
'This is the async constructor for the _Runtime class.'
r = _Runtime(args, env)
yield from r._init_cache()
return r


class _Runtime:
def __init__(self, args, env):
"Don't instantiate this class directly. Use the Runtime() constructor."
self._set_paths(args, env)

compat.makedirs(self.state_dir)
self.cache = cache.Cache(self.cache_dir)

self._tmp_root = os.path.join(self.state_dir, 'tmp')
compat.makedirs(self._tmp_root)
@@ -49,6 +57,10 @@ def __init__(self, args, env):

self.display = get_display(args)

@asyncio.coroutine
def _init_cache(self):
self.cache = yield from cache.Cache(self.cache_dir)

def _set_paths(self, args, env):
explicit_peru_file = args['--file']
explicit_sync_dir = args['--sync-dir']
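
Since `__init__` can't be a coroutine, construction now goes through the
Runtime() coroutine factory. Callers use it in one of two ways (the
first is what peru/main.py does above; the second is a sketch of how
another coroutine would do it):

    # From synchronous code:
    runtime = async.run_task(Runtime(args, env))

    # From inside another coroutine:
    runtime = yield from Runtime(args, env)
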
37 changes: 36 additions & 1 deletion tests/shared.py
@@ -1,4 +1,7 @@
import asyncio
import difflib
import functools
import inspect
import io
import os
from pathlib import Path
@@ -7,14 +10,27 @@
import sys
import tempfile
import textwrap
import unittest

import peru.async
from peru.compat import makedirs
import peru.main


test_resources = Path(__file__).parent.resolve() / 'resources'


def make_synchronous(f):
'''This lets you turn coroutines into regular functions and call them from
synchronous code, so for example test methods can be coroutines. It does
NOT let you call coroutines as regular functions *inside* another
coroutine. That will raise an "Event loop is running" error.'''
@functools.wraps(f)
def wrapper(*args, **kwargs):
return peru.async.run_task(asyncio.coroutine(f)(*args, **kwargs))
return wrapper


def tmp_dir():
return tempfile.mkdtemp(dir=_tmp_root())

@@ -103,9 +119,10 @@ def assert_contents(dir, expected_contents, *, message='', excludes=(),
raise AssertionError(assertion_msg)


@asyncio.coroutine
def assert_tree_contents(cache, tree, expected_contents, **kwargs):
export_dir = create_dir()
cache.export_tree(tree, export_dir)
yield from cache.export_tree(tree, export_dir)
assert_contents(export_dir, expected_contents, **kwargs)


@@ -209,3 +226,21 @@ def assert_executable(path):

def assert_not_executable(path):
_check_executable(path, False)


class PeruTest(unittest.TestCase):
'''Behaves like a standard TestCase, but checks to make sure that we don't
accidentally define any generator tests. (Normally using yield in a test
turns it into a silent no-op. Very sad.)'''

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Complain if it looks like an important test function is a generator.
for name in dir(self):
is_test = (name.startswith('test') or
name in ('setUp', 'tearDown'))
is_generator = inspect.isgeneratorfunction(getattr(self, name))
if is_test and is_generator:
raise TypeError("{}() is a generator, which makes it a silent "
"no-op!\nUse @make_synchronous or something."
.format(type(self).__name__ + '.' + name))
