Skip to content

Commit

Permalink
emerge: enable parallel-fetch during pkg_pretend (bug 710432)
Browse files Browse the repository at this point in the history
Execute pkg_pretend phases in a coroutine while parallel-fetch
is running concurrently. When it's time to execute the pkg_pretend
phase for a remote binary package, use a Scheduler _get_prefetcher
method to get a running prefetcher if available, and otherwise
start a new fetcher.

Since pkg_pretend phases now run inside of the --keep-going retry
loop, --keep-going is now able to recover from pkg_pretend
failures, which fixes bug 404157.

Bug: https://bugs.gentoo.org/404157
Bug: https://bugs.gentoo.org/710432
Signed-off-by: Zac Medico <zmedico@gentoo.org>
  • Loading branch information
zmedico committed Sep 21, 2020
1 parent 302a93e commit a93f20b
Showing 1 changed file with 96 additions and 43 deletions.
139 changes: 96 additions & 43 deletions lib/_emerge/Scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from portage._sets.base import InternalPackageSet
from portage.util import ensure_dirs, writemsg, writemsg_level
from portage.util.futures import asyncio
from portage.util.futures.compat_coroutine import coroutine, coroutine_return
from portage.util.SlotObject import SlotObject
from portage.util._async.SchedulerInterface import SchedulerInterface
from portage.package.ebuild.digestcheck import digestcheck
Expand Down Expand Up @@ -766,15 +767,16 @@ def _create_prefetcher(self, pkg):

return prefetcher

def _run_pkg_pretend(self):
@coroutine
def _run_pkg_pretend(self, loop=None):
"""
Since pkg_pretend output may be important, this method sends all
output directly to stdout (regardless of options like --quiet or
--jobs).
"""

failures = 0
sched_iface = self._sched_iface
sched_iface = loop = asyncio._wrap_loop(loop or self._sched_iface)

for x in self._mergelist:
if not isinstance(x, Package):
Expand All @@ -789,18 +791,28 @@ def _run_pkg_pretend(self):
if "pretend" not in x.defined_phases:
continue

out_str =">>> Running pre-merge checks for " + colorize("INFORM", x.cpv) + "\n"
portage.util.writemsg_stdout(out_str, noiselevel=-1)
out_str = "Running pre-merge checks for " + colorize("INFORM", x.cpv)
self._status_msg(out_str)

root_config = x.root_config
settings = self.pkgsettings[root_config.root]
settings = self._allocate_config(root_config.root)
settings.setcpv(x)
if not x.built:
# Get required SRC_URI metadata (it's not cached in x.metadata
# because some packages have an extremely large SRC_URI value).
portdb = root_config.trees["porttree"].dbapi
(settings.configdict["pkg"]["SRC_URI"],) = yield portdb.async_aux_get(
x.cpv, ["SRC_URI"], myrepo=x.repo, loop=loop
)

# setcpv/package.env allows for per-package PORTAGE_TMPDIR so we
# have to validate it for each package
rval = _check_temp_dir(settings)
if rval != os.EX_OK:
return rval
failures += 1
self._record_pkg_failure(x, settings, FAILURE)
self._deallocate_config(settings)
continue

build_dir_path = os.path.join(
os.path.realpath(settings["PORTAGE_TMPDIR"]),
Expand All @@ -809,7 +821,7 @@ def _run_pkg_pretend(self):
settings["PORTAGE_BUILDDIR"] = build_dir_path
build_dir = EbuildBuildDir(scheduler=sched_iface,
settings=settings)
sched_iface.run_until_complete(build_dir.async_lock())
yield build_dir.async_lock()
current_task = None

try:
Expand All @@ -835,7 +847,7 @@ def _run_pkg_pretend(self):
phase='clean', scheduler=sched_iface, settings=settings)
current_task = clean_phase
clean_phase.start()
clean_phase.wait()
yield clean_phase.async_wait()

if x.built:
tree = "bintree"
Expand All @@ -845,13 +857,19 @@ def _run_pkg_pretend(self):
# Display fetch on stdout, so that it's always clear what
# is consuming time here.
if bintree.isremote(x.cpv):
fetcher = BinpkgFetcher(pkg=x,
scheduler=sched_iface)
fetcher.start()
if fetcher.wait() != os.EX_OK:
fetcher = self._get_prefetcher(x)
if fetcher is None:
fetcher = BinpkgFetcher(pkg=x, scheduler=loop)
fetcher.start()
# We only set the fetched value when fetcher
# is a BinpkgFetcher, since BinpkgPrefetcher
# handles fetch, verification, and the
# bintree.inject call which moves the file.
fetched = fetcher.pkg_path
if (yield fetcher.async_wait()) != os.EX_OK:
failures += 1
self._record_pkg_failure(x, settings, fetcher.returncode)
continue
fetched = fetcher.pkg_path

if fetched is False:
filename = bintree.getname(x.cpv)
Expand All @@ -861,17 +879,17 @@ def _run_pkg_pretend(self):
scheduler=sched_iface, _pkg_path=filename)
current_task = verifier
verifier.start()
if verifier.wait() != os.EX_OK:
if (yield verifier.async_wait()) != os.EX_OK:
failures += 1
self._record_pkg_failure(x, settings, verifier.returncode)
continue

if fetched:
bintree.inject(x.cpv, filename=fetched)

infloc = os.path.join(build_dir_path, "build-info")
ensure_dirs(infloc)
self._sched_iface.run_until_complete(
bintree.dbapi.unpack_metadata(settings, infloc, loop=self._sched_iface))
yield bintree.dbapi.unpack_metadata(settings, infloc, loop=loop)
ebuild_path = os.path.join(infloc, x.pf + ".ebuild")
settings.configdict["pkg"]["EMERGE_FROM"] = "binary"
settings.configdict["pkg"]["MERGE_TYPE"] = "binary"
Expand Down Expand Up @@ -905,28 +923,42 @@ def _run_pkg_pretend(self):

current_task = pretend_phase
pretend_phase.start()
ret = pretend_phase.wait()
ret = yield pretend_phase.async_wait()
if ret != os.EX_OK:
failures += 1
self._record_pkg_failure(x, settings, ret)
portage.elog.elog_process(x.cpv, settings)
finally:

if current_task is not None:
if current_task.isAlive():
current_task.cancel()
current_task.wait()
if current_task.returncode == os.EX_OK:
clean_phase = EbuildPhase(background=False,
phase='clean', scheduler=sched_iface,
settings=settings)
clean_phase.start()
clean_phase.wait()
yield clean_phase.async_wait()

sched_iface.run_until_complete(build_dir.async_unlock())
yield build_dir.async_unlock()
self._deallocate_config(settings)

if failures:
return FAILURE
return os.EX_OK
return coroutine_return(FAILURE)
coroutine_return(os.EX_OK)

def _record_pkg_failure(self, pkg, settings, ret):
    """
    Register a failure for pkg and refresh the failure counter shown by
    the status display.

    A failure record is built with self._failed_pkg from the build
    directory and build log named in settings, then appended to
    self._failed_pkgs. The failure message is only emitted while
    self._terminated_tasks is false.

    @param pkg: the package that failed
    @param settings: mapping queried (via get) for PORTAGE_BUILDDIR and
        PORTAGE_LOG_FILE
    @param ret: return code stored in the failure record
    """
    failure = self._failed_pkg(
        build_dir=settings.get("PORTAGE_BUILDDIR"),
        build_log=settings.get("PORTAGE_LOG_FILE"),
        pkg=pkg,
        returncode=ret,
    )
    self._failed_pkgs.append(failure)

    # Skip the message once termination is flagged.
    if not self._terminated_tasks:
        self._failed_pkg_msg(failure, "emerge", "for")

    self._status_display.failed = len(self._failed_pkgs)

def merge(self):
if "--resume" in self.myopts:
Expand Down Expand Up @@ -988,11 +1020,6 @@ def merge(self):
if rval != os.EX_OK and not keep_going:
return rval

if not fetchonly:
rval = self._run_pkg_pretend()
if rval != os.EX_OK:
return rval

while True:

received_signal = []
Expand Down Expand Up @@ -1389,8 +1416,6 @@ def _merge(self):
if self._opts_no_background.intersection(self.myopts):
self._set_max_jobs(1)

self._add_prefetchers()
self._add_packages()
failed_pkgs = self._failed_pkgs
portage.locks._quiet = self._background
portage.elog.add_listener(self._elog_listener)
Expand All @@ -1406,6 +1431,30 @@ def display_callback():
rval = os.EX_OK

try:
self._add_prefetchers()
if not self._build_opts.fetchonly:
# Run pkg_pretend concurrently with parallel-fetch, and be careful
# to respond appropriately to termination, so that we don't start
# any new tasks after we've been terminated. Temporarily make the
# status display quiet so that its output is not interleaved with
# pkg_pretend output.
status_quiet = self._status_display.quiet
self._status_display.quiet = True
try:
rval = self._sched_iface.run_until_complete(
self._run_pkg_pretend(loop=self._sched_iface)
)
except asyncio.CancelledError:
self.terminate()
finally:
self._status_display.quiet = status_quiet
self._termination_check()
if self._terminated_tasks:
rval = 128 + signal.SIGINT
if rval != os.EX_OK:
return rval

self._add_packages()
self._main_loop()
finally:
self._main_loop_cleanup()
Expand Down Expand Up @@ -1742,6 +1791,23 @@ def _schedule_tasks_imp(self):

return bool(state_change)

def _get_prefetcher(self, pkg):
    """
    Claim the prefetcher registered for pkg, if it is still running.

    The entry for pkg is always removed from self._prefetchers. When the
    prefetcher found there is no longer alive, it is also removed from
    the fetch task queue (when still present there) and None is returned
    in its place.

    @param pkg: key used to look up the prefetcher in self._prefetchers
    @return: a live prefetcher, or None when there is nothing to reuse
    """
    try:
        candidate = self._prefetchers.pop(pkg, None)
    except KeyError:
        # KeyError observed with PyPy 1.8, despite None given as default.
        # Note that PyPy 1.8 has the same WeakValueDictionary code as
        # CPython 2.7, so it may be possible for CPython to raise KeyError
        # here as well.
        return None

    if candidate is None or candidate.isAlive():
        return candidate

    # The prefetcher is no longer alive: drop it from the fetch queue if
    # it is still listed there, and report that nothing can be reused.
    try:
        self._task_queues.fetch._task_queue.remove(candidate)
    except ValueError:
        pass
    return None

def _task(self, pkg):

pkg_to_replace = None
Expand All @@ -1758,20 +1824,7 @@ def _task(self, pkg):
"installed", pkg.root_config, installed=True,
operation="uninstall")

try:
prefetcher = self._prefetchers.pop(pkg, None)
except KeyError:
# KeyError observed with PyPy 1.8, despite None given as default.
# Note that PyPy 1.8 has the same WeakValueDictionary code as
# CPython 2.7, so it may be possible for CPython to raise KeyError
# here as well.
prefetcher = None
if prefetcher is not None and not prefetcher.isAlive():
try:
self._task_queues.fetch._task_queue.remove(prefetcher)
except ValueError:
pass
prefetcher = None
prefetcher = self._get_prefetcher(pkg)

task = MergeListItem(args_set=self._args_set,
background=self._background, binpkg_opts=self._binpkg_opts,
Expand Down

0 comments on commit a93f20b

Please sign in to comment.