Skip to content

Commit

Permalink
async_aux_get: Use EbuildMetadataPhase deallocate_config future
Browse files Browse the repository at this point in the history
For the portdbapi async_aux_get method, there is not a very
good place to store a config pool, so instead use asyncio.Lock
to manage access to the portdbapi doebuild_settings attribute
when using the main event loop in the main thread. For other
threads, clone a config instance since we do not have a
thread-safe config pool. This cloning is expensive, but since
portage internals do not trigger this case, it suffices for now
(an AssertionError ensures that internals do not trigger it).
For the main event loop running in the main thread, performance
with the asyncio.Lock should not be significantly different from
performance prior to commit c95fc64, since check_locale
results are typically cached, and previously there was only a
single shared doebuild_settings instance with access serialized
via the EbuildMetadataPhase _start method.

Update async_aux_get callers to use asyncio.ensure_future on
the returned coroutine when needed, since it used to return
a future instead of a coroutine, and sometimes a future is
needed for add_done_callback usage.

In the portdbapi async_fetch_map method, fix a broken reference
to "future" which should have been "aux_get_future", an error
discovered while testing this patch.

Bug: https://bugs.gentoo.org/924319
Fixes: c95fc64 ("EbuildPhase: async_check_locale")
Signed-off-by: Zac Medico <zmedico@gentoo.org>
  • Loading branch information
zmedico committed Feb 21, 2024
1 parent a42c216 commit 389bb30
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 45 deletions.
10 changes: 7 additions & 3 deletions lib/portage/_emirrordist/FetchIterator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2013-2018 Gentoo Foundation
# Copyright 2013-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import threading
Expand All @@ -14,6 +14,7 @@
from portage.package.ebuild.fetch import DistfileName
from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
from portage.util._async.TaskScheduler import TaskScheduler
from portage.util.futures import asyncio
from portage.util.futures.iter_completed import iter_gather
from .FetchTask import FetchTask
from _emerge.CompositeTask import CompositeTask
Expand Down Expand Up @@ -276,8 +277,11 @@ def aux_get_done(gather_result):
result.set_result(fetch_tasks)

def future_generator():
yield config.portdb.async_aux_get(
cpv, ("RESTRICT",), myrepo=repo_config.name, loop=loop
yield asyncio.ensure_future(
config.portdb.async_aux_get(
cpv, ("RESTRICT",), myrepo=repo_config.name, loop=loop
),
loop,
)
yield config.portdb.async_fetch_map(cpv, mytree=repo_config.location, loop=loop)

Expand Down
129 changes: 87 additions & 42 deletions lib/portage/dbapi/porttree.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 1998-2021 Gentoo Authors
# Copyright 1998-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

__all__ = ["close_portdbapi_caches", "FetchlistDict", "portagetree", "portdbapi"]
Expand Down Expand Up @@ -41,7 +41,9 @@
from portage.util.futures.iter_completed import iter_gather
from _emerge.EbuildMetadataPhase import EbuildMetadataPhase

import contextlib
import os as _os
import threading
import traceback
import warnings
import errno
Expand Down Expand Up @@ -239,6 +241,7 @@ def __init__(self, _unused_param=DeprecationWarning, mysettings=None):
# this purpose because doebuild makes many changes to the config
# instance that is passed in.
self.doebuild_settings = config(clone=self.settings)
self._doebuild_settings_lock = asyncio.Lock()
self.depcachedir = os.path.realpath(self.settings.depcachedir)

if os.environ.get("SANDBOX_ON") == "1":
Expand Down Expand Up @@ -356,6 +359,17 @@ def __init__(self, _unused_param=DeprecationWarning, mysettings=None):
self._better_cache = None
self._broken_ebuilds = set()

# Build the state used for pickling: a shallow copy of the instance
# __dict__ with the _doebuild_settings_lock entry replaced by None.
def __getstate__(self):
state = self.__dict__.copy()
# The lock is not picklable, so None is stored in its place here;
# __setstate__ assigns a fresh asyncio.Lock() after unpickling.
state["_doebuild_settings_lock"] = None
return state

# Restore the instance __dict__ from pickled state, then create a new
# lock, since __getstate__ stores None in place of the original one.
def __setstate__(self, state):
self.__dict__.update(state)
self._doebuild_settings_lock = asyncio.Lock()

def _set_porttrees(self, porttrees):
"""
Consumers, such as emirrordist, may modify the porttrees attribute in
Expand Down Expand Up @@ -669,7 +683,7 @@ def aux_get(
self.async_aux_get(mycpv, mylist, mytree=mytree, myrepo=myrepo, loop=loop)
)

def async_aux_get(self, mycpv, mylist, mytree=None, myrepo=None, loop=None):
async def async_aux_get(self, mycpv, mylist, mytree=None, myrepo=None, loop=None):
"""
Asynchronous form of aux_get.
Expand All @@ -694,13 +708,11 @@ def async_aux_get(self, mycpv, mylist, mytree=None, myrepo=None, loop=None):
# Callers of this method certainly want the same event loop to
# be used for all calls.
loop = asyncio._wrap_loop(loop)
future = loop.create_future()
cache_me = False
if myrepo is not None:
mytree = self.treemap.get(myrepo)
if mytree is None:
future.set_exception(PortageKeyError(myrepo))
return future
raise PortageKeyError(myrepo)

if (
mytree is not None
Expand All @@ -719,16 +731,14 @@ def async_aux_get(self, mycpv, mylist, mytree=None, myrepo=None, loop=None):
):
aux_cache = self._aux_cache.get(mycpv)
if aux_cache is not None:
future.set_result([aux_cache.get(x, "") for x in mylist])
return future
return [aux_cache.get(x, "") for x in mylist]
cache_me = True

try:
cat, pkg = mycpv.split("/", 1)
except ValueError:
# Missing slash. Can't find ebuild so raise PortageKeyError.
future.set_exception(PortageKeyError(mycpv))
return future
raise PortageKeyError(mycpv)

myebuild, mylocation = self.findname2(mycpv, mytree)

Expand All @@ -737,12 +747,12 @@ def async_aux_get(self, mycpv, mylist, mytree=None, myrepo=None, loop=None):
"!!! aux_get(): %s\n" % _("ebuild not found for '%s'") % mycpv,
noiselevel=1,
)
future.set_exception(PortageKeyError(mycpv))
return future
raise PortageKeyError(mycpv)

mydata, ebuild_hash = self._pull_valid_cache(mycpv, myebuild, mylocation)

if mydata is not None:
future = loop.create_future()
self._aux_get_return(
future,
mycpv,
Expand All @@ -754,37 +764,71 @@ def async_aux_get(self, mycpv, mylist, mytree=None, myrepo=None, loop=None):
cache_me,
None,
)
return future
return future.result()

if myebuild in self._broken_ebuilds:
future.set_exception(PortageKeyError(mycpv))
return future

proc = EbuildMetadataPhase(
cpv=mycpv,
ebuild_hash=ebuild_hash,
portdb=self,
repo_path=mylocation,
scheduler=loop,
settings=self.doebuild_settings,
)
raise PortageKeyError(mycpv)

proc.addExitListener(
functools.partial(
self._aux_get_return,
future,
mycpv,
mylist,
myebuild,
ebuild_hash,
mydata,
mylocation,
cache_me,
)
)
future.add_done_callback(functools.partial(self._aux_get_cancel, proc))
proc.start()
return future
proc = None
deallocate_config = None
async with contextlib.AsyncExitStack() as stack:
try:
if (
threading.current_thread() is threading.main_thread()
and loop is asyncio._safe_loop()
):
# In this case use self._doebuild_settings_lock to manage concurrency.
deallocate_config = loop.create_future()
await stack.enter_async_context(self._doebuild_settings_lock)
settings = self.doebuild_settings
else:
if portage._internal_caller:
raise AssertionError(
f"async_aux_get called from thread {threading.current_thread()} with loop {loop}"
)
# Clone a config instance since we do not have a thread-safe config pool.
settings = portage.config(clone=self.settings)

proc = EbuildMetadataPhase(
cpv=mycpv,
ebuild_hash=ebuild_hash,
portdb=self,
repo_path=mylocation,
scheduler=loop,
settings=settings,
deallocate_config=deallocate_config,
)

future = loop.create_future()
proc.addExitListener(
functools.partial(
self._aux_get_return,
future,
mycpv,
mylist,
myebuild,
ebuild_hash,
mydata,
mylocation,
cache_me,
)
)
future.add_done_callback(functools.partial(self._aux_get_cancel, proc))
proc.start()

finally:
# Wait for deallocate_config before releasing
# self._doebuild_settings_lock if needed.
if deallocate_config is not None:
if proc is None or not proc.isAlive():
deallocate_config.done() or deallocate_config.cancel()
else:
await deallocate_config

# After deallocate_config is done, release self._doebuild_settings_lock
# by leaving the stack context, and wait for proc to finish and
# trigger a call to self._aux_get_return.
return await future

@staticmethod
def _aux_get_cancel(proc, future):
Expand Down Expand Up @@ -889,7 +933,7 @@ def aux_get_done(aux_get_future):
)
)
else:
result.set_exception(future.exception())
result.set_exception(aux_get_future.exception())
return

eapi, myuris = aux_get_future.result()
Expand All @@ -913,8 +957,9 @@ def aux_get_done(aux_get_future):
except Exception as e:
result.set_exception(e)

aux_get_future = self.async_aux_get(
mypkg, ["EAPI", "SRC_URI"], mytree=mytree, loop=loop
aux_get_future = asyncio.ensure_future(
self.async_aux_get(mypkg, ["EAPI", "SRC_URI"], mytree=mytree, loop=loop),
loop,
)
result.add_done_callback(
lambda result: aux_get_future.cancel() if result.cancelled() else None
Expand Down
3 changes: 3 additions & 0 deletions lib/portage/tests/update/test_move_ent.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ def testMoveEntWithSignature(self):
finally:
playground.cleanup()

# Ignore "The loop argument is deprecated" since this argument is conditionally
# added to asyncio.Lock as needed for compatibility with python 3.9.
@pytest.mark.filterwarnings("ignore:The loop argument is deprecated")
@pytest.mark.filterwarnings("error")
def testMoveEntWithCorruptIndex(self):
"""
Expand Down

0 comments on commit 389bb30

Please sign in to comment.