Skip to content

Commit

Permalink
Enable multithread for crate download/verify
Browse files Browse the repository at this point in the history
This should significantly increase the download speed for crates
  • Loading branch information
hehaoqian committed Jul 4, 2021
1 parent e71f709 commit ed51214
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/romt/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def close_optional(f: Optional[IO[Any]]) -> None:
def make_dirs_for(path: Path) -> None:
parent = path.parent
if not parent.is_dir():
parent.mkdir(parents=True)
parent.mkdir(parents=True, exist_ok=True)


def log(log_file: Optional[IO[Any]], message: Any) -> None:
Expand Down
105 changes: 70 additions & 35 deletions src/romt/crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@

import argparse
import json
import multiprocessing
import os
from pathlib import Path
import re
from typing import Generator, List, Optional, Tuple
import urllib.parse
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

import git
import git.remote
Expand Down Expand Up @@ -250,6 +252,38 @@ def list_crates(crates: List[Crate]) -> None:
common.iprint(crate.rel_path().name)


def _process_one_crate(
downloader: romt.download.Downloader,
dl_template: Optional[str],
path: Path,
crate: Crate,
assume_ok: bool,
) -> True:
is_good = False
try:
if dl_template is None:
downloader.verify_hash(path, crate.hash)
else:
url = dl_template.format(
crate=crate.name, version=crate.version
)
downloader.download_verify_hash(
url, path, crate.hash, assume_ok=assume_ok
)
is_good = True
except error.DownloadError as e:
common.eprint(
"Error: Download failure for {}: {}".format(
e.name, e.exception
)
)
except error.MissingFileError as e:
common.eprint("Error: Missing {}".format(e.name))
except error.IntegrityError as e:
common.eprint(str(e))
return is_good


def _process_crates(
downloader: romt.download.Downloader,
dl_template: Optional[str],
Expand All @@ -259,45 +293,33 @@ def _process_crates(
bad_paths_log_path: str,
*,
keep_going: bool,
assume_ok: bool
assume_ok: bool,
threads: int,
) -> None:
good_paths_file = common.open_optional(good_paths_log_path, "w")
bad_paths_file = common.open_optional(bad_paths_log_path, "w")

num_good_paths = 0
num_bad_paths = 0
for crate in crates:
rel_path = crate.rel_path()
path = crates_root / rel_path
is_good = False
try:
if dl_template is None:
downloader.verify_hash(path, crate.hash)
else:
url = dl_template.format(
crate=crate.name, version=crate.version
)
downloader.download_verify_hash(
url, path, crate.hash, assume_ok=assume_ok
)
is_good = True
except error.DownloadError as e:
common.eprint(
"Error: Download failure for {}: {}".format(
e.name, e.exception
)
)
except error.MissingFileError as e:
common.eprint("Error: Missing {}".format(e.name))
except error.IntegrityError as e:
common.eprint(str(e))

if is_good:
num_good_paths += 1
common.log(good_paths_file, path)
else:
num_bad_paths += 1
common.log(bad_paths_file, path)
with ThreadPoolExecutor(max_workers=threads) as executor:
running_threads = set()
for i, crate in enumerate(crates):
rel_path = crate.rel_path()
path = crates_root / rel_path
thread = executor.submit(_process_one_crate, downloader, dl_template, path, crate, assume_ok)
running_threads.add(thread)
is_last_crate = (i + 1 == len(crates))
while (len(running_threads) > threads) or (is_last_crate and running_threads):
done, not_done = wait(running_threads, return_when=FIRST_COMPLETED)
for thread in done:
is_good = thread.result()
if is_good:
num_good_paths += 1
common.log(good_paths_file, path)
else:
num_bad_paths += 1
common.log(bad_paths_file, path)
running_threads = not_done

common.iprint(
"{} bad paths, {} good paths".format(num_bad_paths, num_good_paths)
Expand All @@ -319,7 +341,8 @@ def download_crates(
bad_paths_log_path: str,
*,
keep_going: bool,
assume_ok: bool
assume_ok: bool,
threads: int,
) -> None:
_process_crates(
downloader,
Expand All @@ -330,6 +353,7 @@ def download_crates(
bad_paths_log_path,
keep_going=keep_going,
assume_ok=assume_ok,
threads=threads,
)


Expand All @@ -341,7 +365,8 @@ def verify_crates(
bad_paths_log_path: str,
*,
keep_going: bool,
assume_ok: bool
assume_ok: bool,
threads: int,
) -> None:
_process_crates(
downloader,
Expand All @@ -352,6 +377,7 @@ def verify_crates(
bad_paths_log_path,
keep_going=keep_going,
assume_ok=assume_ok,
threads=threads,
)


Expand Down Expand Up @@ -667,6 +693,13 @@ def add_arguments(parser: argparse.ArgumentParser) -> None:
""",
)

parser.add_argument(
'--threads',
type=int,
default=(multiprocessing.cpu_count() // 2) or 1,
help="Number of CPU cores used to download or verify (default=half of logical cpus(%(default)s))",
)

parser.add_argument(
"commands",
nargs="*",
Expand Down Expand Up @@ -820,6 +853,7 @@ def cmd_download(self) -> None:
self.args.bad_paths,
keep_going=self.args.keep_going,
assume_ok=self.args.assume_ok,
threads=self.args.threads,
)

def cmd_verify(self) -> None:
Expand All @@ -831,6 +865,7 @@ def cmd_verify(self) -> None:
self.args.bad_paths,
keep_going=self.args.keep_going,
assume_ok=self.args.assume_ok,
threads=self.args.threads,
)

def cmd_pack(self) -> None:
Expand Down

0 comments on commit ed51214

Please sign in to comment.