WIP: Poc concurrent package validation #45
Conversation
This should not alter the behaviour of conda-mirror except for a modified log message reporting which package is being validated.
I've started by refactoring the loop into a map. This should not do anything except for slightly altering the log message. (The "# of #" info is gone.)
Codecov Report
```
@@            Coverage Diff            @@
##           master      #45    +/-   ##
==========================================
+ Coverage   94.06%   94.53%   +0.47%
==========================================
  Files           2        2
  Lines         219      238     +19
==========================================
+ Hits          206      225     +19
  Misses         13       13
==========================================
```
Continue to review full report at Codecov.
Looks like
Processes started by `multiprocessing.Pool.map()` don't see the same outer scope as iterations of `map()`. To avoid this problem, define the function to be mapped when outer vars are known.
So pickling the actual validation function also fails. We can still explicitly pass the necessary info. (Sorry for testing all this using the CI. I'm not in a proper dev environment at the moment.)
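The constraint discussed above can be sketched as follows: a function handed to `multiprocessing.Pool.map` must be defined at module level so it can be pickled, and any context it needs has to travel inside the argument itself rather than via the outer scope. The `validate` helper and the package tuples below are hypothetical stand-ins for illustration, not conda-mirror's actual API:

```python
from multiprocessing import Pool

# Must be defined at module level: Pool pickles the function by reference,
# so closures and locally defined functions fail to pickle.
def validate(args):
    # Everything the worker needs arrives in the argument tuple.
    package_path, expected_size = args
    return (package_path, expected_size > 0)

if __name__ == "__main__":
    # Pack the per-package context into tuples instead of relying on
    # outer-scope variables, which worker processes cannot see.
    work_items = [("pkg-a.tar.bz2", 100), ("pkg-b.tar.bz2", 200)]
    with Pool(2) as p:
        results = p.map(validate, work_items)
    print(results)
```

This is why the PR passes an explicit argument tuple per package instead of relying on variables captured from the enclosing function.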
Tests are passing. @ericdill, can you have a try with this? To use concurrent mode, set the number of threads in an environment variable. The following should validate a conda-forge mirror using 8 threads:

```shell
export CONDA_MIRROR_NUM_THREADS=8
conda-mirror --upstream-channel conda-forge --target-directory local_mirror --platform linux-64
```
A few thoughts on what's left to do:
@willirath I'll give this a close look tomorrow. I've got a planning meeting for the rest of the day (I'd much rather be working on this PR 😀 with you). Regarding your todo list:
I'll set it up to run as a nightly job tomorrow and monitor its status
I generally prefer CLI args to env vars, and argparse is already configured, so it shouldn't be too hard to add another "num_workers" argument or some such.
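A minimal sketch of what that argparse addition could look like; the flag name, default, and help text here are assumptions for illustration, not the final API:

```python
from argparse import ArgumentParser

ap = ArgumentParser()
# Hypothetical flag mirroring the semantics discussed in this PR:
# 1 = serial validation, 0 = use all available cores.
ap.add_argument(
    '--num-threads',
    type=int,
    default=1,
    help="Number of processes for concurrent validation (0 = all cores)"
)

args = ap.parse_args(['--num-threads', '8'])
print(args.num_threads)
```

Omitting the flag would leave `args.num_threads == 1`, preserving the current serial behaviour by default.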
Now that you bring this up, that's actually one of the main reasons I abandoned the multiprocessing validation. I'm a sucker for high coverage numbers.
Yeah that's probably a bit unnecessary. I had toyed around with a separate script that would use conda_mirror as a library, but many of the same things apply here too. Basically I think you want to zip up the path to each package and the associated metadata and then pass that to your validation function. That way you're not needing to copy that repodata dict N times. I think there's plenty of room to solve this problem in many different ways and am interested to see where this one ends. Here's how I did it as a separate script:

```python
from conda_mirror import conda_mirror as cm
from pprint import pprint
from argparse import ArgumentParser
from os.path import join
import json
import os
from multiprocessing import Pool
import sys
import time

cm._init_logger(3)

METADATA = "No metadata found"
PACKAGE_VALIDATION = "Validation failed"


def validate(pkg_tuple):
    package_path, package_metadata = pkg_tuple
    if package_metadata is None:
        return package_path, METADATA, ""
    ret = cm._validate(package_path,
                       md5=package_metadata.get('md5'),
                       sha256=package_metadata.get('sha256'),
                       size=package_metadata.get('size'))
    if ret is not None:
        return package_path, PACKAGE_VALIDATION, ret


def cli():
    ap = ArgumentParser()
    ap.add_argument(
        'pkgs_dir',
        action='store',
        help="The path to the directory that you are interested in"
    )
    ap.add_argument(
        'num_workers',
        action='store',
        help="The number of parallel processes to spin up"
    )
    ap.add_argument(
        '--cron',
        action="store_true",
        help="Disable print calls that don't do so well in logs",
        default=False
    )
    args = ap.parse_args()
    with open(join(args.pkgs_dir, 'repodata.json'), 'r') as f:
        repodata = json.load(f)
    conda_packages = cm._list_conda_packages(args.pkgs_dir)
    pkg_iter = ((join(args.pkgs_dir, pkg), repodata['packages'].get(pkg))
                for idx, pkg in enumerate(conda_packages))
    start = time.time()
    need_attention = []
    with Pool(int(args.num_workers)) as p:
        for i, ret in enumerate(p.imap_unordered(validate, pkg_iter)):
            elapsed = int(time.time()) - int(start) or 1
            pkgs_per_sec = int(i / elapsed) or 1
            eta = int((len(conda_packages) - i) / pkgs_per_sec)
            msg = ('{0}/{1} {2}s elapsed {3} processed/sec {4}s remaining'.format(
                i, len(conda_packages), elapsed, pkgs_per_sec, eta))
            if not args.cron:
                sys.stderr.write('\r%s' % msg)
            else:
                if i % 100 == 0:
                    print('%s' % msg)
            if ret is not None:
                need_attention.append(ret)
    print('\n%s packages need attention' % len(need_attention))
    for package_path, problem_type, info in need_attention:
        if problem_type == METADATA:
            print("Removing %s because it is not in the local "
                  "repodata.json" % (package_path), file=sys.stderr)
            os.remove(package_path)
        elif problem_type == PACKAGE_VALIDATION:
            print("Removing %s because it failed package validation with this "
                  "reason: %s" % (package_path, info), file=sys.stderr)
            continue
            # os.remove(package_path)
            del repodata['packages'][os.path.basename(package_path)]
            # Update the repodata immediately
            cm._write_repodata(os.path.dirname(package_path), repodata)
    print("All packages that require attention")
    pprint(need_attention)


if __name__ == "__main__":
    cli()
```
So I'm back. Running

```shell
export CONDA_MIRROR_NUM_THREADS=8
conda-mirror --upstream-channel conda-forge --target-directory local_mirror --platform linux-64
```

at 208d203 seems to work as intended. I can't say much about performance gains, as I was only testing this on my workstation, which was relatively busy otherwise. @ericdill: Did you have a chance to try this on your jobs?
https://codecov.io/gh/maxpoint/conda-mirror/compare/8159290dca5ccba2fe9e5b1f6b1053241712fbca...208d203e8d5e61d80ab950c34b90770e738ba021/diff#D1-382 and below looks like
@willirath sorry for letting this sit for 10 days. It fell off my mental stack and I forgot about it. I'll be able to think on this again next week and will reply with constructive comments then. Thanks for working on this PR!
I also was quite busy with other things, but I'll hopefully be able to get this done during the next days. A first test shows that validation does in fact run concurrently using the given number of processes. I'll continue cleaning up the args passed to the validation function. You are probably most familiar with your existing tests. Can you then add the tests using concurrent validation?
- Really interpret num_threads=0 as inf
- Do not terminate but only join Pool of workers
OK ... The problems with coverage turned out to be due to the way coverage collects data from subprocesses. The essential bits were avoiding terminating the Pool and instead only joining it (see the last commits). In the end, coverage was increased by 4.5‰ :)
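For reference, coverage.py has to be told about subprocesses explicitly; a typical configuration (assuming a standard `.coveragerc`) looks like this. It only works when worker processes exit cleanly, which is why `Pool.terminate()` had to be replaced with `close()`/`join()` — terminated workers never get a chance to flush their coverage data:

```ini
# .coveragerc
[run]
# write one data file per process instead of overwriting a shared one
parallel = True
# start coverage measurement inside multiprocessing workers
concurrency = multiprocessing
```

The per-process data files are then merged with `coverage combine` before reporting.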
@ericdill: Can you have a look? I see no further issues.
@willirath Excellent job tracking down that coverage multiprocessing thing! That's been plaguing me for a while now. Looking over the PR right now.
Looks great. Thanks @willirath . I'll test this locally and report back the results
```
@@ -79,6 +84,7 @@ def _match(all_packages, key_glob_dict):
    matched : dict
        Iterable of package metadata dicts which match the `target_packages`
        (key, glob_value) tuples
+
```
I take it that your preference is for an extra blank line at the end of docstrings? Why is that?
I think I'm relatively used to following NumPy's example, and probably pep257 was complaining at some point. But I must admit that I did not check whether this integrates with the existing docstring styles.
I don't have really any preference (space or no space) and was mostly just curious. Thanks for the clarification
[edit] spelling
conda_mirror/conda_mirror.py
Outdated
```
    are not in `repodata` and also any packages that fail the package
    validation
    NOTE2: In concurrent mode (num_threads is not None) this will be hard to
```
I think this should say "num_threads is not 1"
Yes, I'll change that.
conda_mirror/conda_mirror.py
Outdated
```
    num_threads : int
        Number of threads to be used for concurrent validation. Defaults to
        `num_threads=1` for non-concurrent mode. To use all available cores,
        set `num_threads=0`.
```
Change this to be
Number of concurrent processes to use. Set to `0` to use a number of processes
equal to the number of cores in the system. Defaults to `1` (i.e. serial
package validation).
Done.
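The `num_threads=0` convention from that docstring could be resolved with a small helper along these lines; `effective_workers` is a hypothetical name for illustration, not a function in conda-mirror:

```python
import os

def effective_workers(num_threads):
    # 0 is the documented sentinel for "use all available cores";
    # any other value is taken literally.
    if num_threads == 0:
        return os.cpu_count()
    return num_threads

print(effective_workers(1))
```

Equivalently, passing `None` as the process count to `multiprocessing.Pool()` makes it default to `os.cpu_count()`.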
conda_mirror/conda_mirror.py
Outdated
```
        p.join()


def _valiadate_or_remove_package(args):
```
Typo in function name: s/valiadate/validate/
Done.
```
        _validate_packages(desired_repodata, local_directory)
        desired_repodata = {pkgname: packages[pkgname]
                            for pkgname in possible_packages_to_mirror}
        _validate_packages(desired_repodata, local_directory, num_threads)
```
Through reviewing this PR I've discovered a bug in the current implementation of the mirror. The problematic aspect is as follows:

- `_validate_packages` can remove packages on disk that exist in the local repodata.json file
- The local repodata.json file is not updated before that problematic file is removed
- This means that a user of the conda server can ask for a package that conda thinks it has (since it is still in repodata.json) but that it can't find (because it's been removed from disk)

As such, this code needs to be changed so that:

- `_validate_packages` returns a list of packages to remove
- a new dict of package metadata is created that does not contain the packages that we are going to remove
- That new package metadata dict is written to disk as an atomic operation
- The problematic files are removed

With the above changes we will reduce the chance that the user will encounter an error from conda saying that the file cannot be found on the conda server. This is not a blocking issue on getting this PR merged. I'll fix this problem in a follow-on PR.
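That ordering — update metadata atomically first, delete files second — can be sketched like so. This is a hedged outline under the assumptions stated in the comments, not the actual follow-on implementation; the function and variable names are invented for illustration:

```python
import json
import os
import tempfile

def remove_packages_safely(package_directory, repodata, packages_to_remove):
    # 1. Build a new metadata dict without the problematic packages.
    remaining = {name: meta
                 for name, meta in repodata['packages'].items()
                 if name not in packages_to_remove}
    new_repodata = dict(repodata, packages=remaining)
    # 2. Write the updated repodata.json atomically: dump to a temp file
    #    in the same directory, then rename over the original, so readers
    #    always see either the old or the new file, never a partial one.
    fd, tmp_path = tempfile.mkstemp(dir=package_directory)
    with os.fdopen(fd, 'w') as f:
        json.dump(new_repodata, f)
    os.replace(tmp_path, os.path.join(package_directory, 'repodata.json'))
    # 3. Only now remove the files themselves: a package missing from disk
    #    is already missing from the metadata, so conda never sees a
    #    repodata entry pointing at a deleted file.
    for name in packages_to_remove:
        path = os.path.join(package_directory, name)
        if os.path.exists(path):
            os.remove(path)
```

The rename-over approach relies on `os.replace` being atomic on the same filesystem, which is why the temp file is created in the target directory rather than the system temp location.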
OK.
conda_mirror/conda_mirror.py
Outdated
```
    # create argument list (necessary because multiprocessing.Pool.map does not
    # accept additional args to be passed to the mapped function)
    val_func_arg_list = [(package, num, package_repodata, package_directory)
```
I think you want to pass in len(local_packages) as another argument, since it's unlikely that len(local_packages) == len(package_repodata).
Done.
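Applying that suggestion, each argument tuple would carry the total package count alongside the per-package data. The names below follow the diff under review, but the stand-in data and the exact tuple layout are assumptions for illustration:

```python
# Stand-in data for illustration.
local_packages = ['a.tar.bz2', 'b.tar.bz2']
package_repodata = {'a.tar.bz2': {}, 'b.tar.bz2': {}}
package_directory = 'local_mirror/linux-64'

# Each tuple carries the total count so workers can log "num of total"
# without assuming len(local_packages) == len(package_repodata).
val_func_arg_list = [
    (package, num, len(local_packages), package_repodata, package_directory)
    for num, package in enumerate(local_packages)
]
print(val_func_arg_list[0])
```

Note that the repodata dict appears in every tuple by reference here; with `Pool.map` each worker still receives its own pickled copy, which is the copying concern raised earlier in this thread.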
That's great! I'll have a look tomorrow as well. Given that it's quite a messy series of commits now, I wonder if I should clean this up in a different branch with only a single commit eventually.
In general, I am not particularly concerned about feature branches with a large number of commits. However, there is one commit that I'd like you to get rid of in this feature branch since it's a merge of master into the feature branch. If you'd like to just squash all of this into one commit that's fine with me.
Tried this locally and everything worked just fine. Validation is so much faster too. Thanks for your efforts on this @willirath. If you could just rebase this whole thing to get rid of that merge from master into this branch, then I think this LGTM!
Let me know when all this is done. I'll squash it then.
I've added a clean PR #48 which squashes all of the above and also has a proper commit message.
Closing in favor of #48 |
Proof of concept for concurrent package validation
Implements solution to #43.