Show digest progress when uploading #465

Closed
wants to merge 6 commits into from
114 changes: 102 additions & 12 deletions dandi/support/digests.py
@@ -11,13 +11,35 @@

import hashlib
import logging
import os
import os.path
import sys
from typing import Dict, Iterable, Optional, Tuple

from fscacher import PersistentCache
import appdirs
from diskcache import FanoutCache

from ..utils import auto_repr
from ..utils import AnyPath, auto_repr

if sys.version_info < (3, 8):
from typing_extensions import TypedDict
else:
from typing import TypedDict

lgr = logging.getLogger("dandi.support.digests")

# This is the default number of workers for
# concurrent.futures.ThreadPoolExecutor (as of Python 3.8) and pyout, which
# means it's also the number of concurrent uploads/downloads dandi performs at
# once.
SHARD_QTY = min(32, (os.cpu_count() or 1) + 4)

CACHE_DIR = os.path.join(appdirs.user_cache_dir("dandi-cli", "dandi"), "digests")

DEFAULT_DIGESTS = ["md5", "sha1", "sha256", "sha512"]

DEFAULT_BLOCKSIZE = 1 << 16


@auto_repr
class Digester(object):
@@ -29,9 +51,7 @@ class Digester(object):
# Ideally we should find an efficient way to parallelize this but
# atm this one is sufficiently speedy

DEFAULT_DIGESTS = ["md5", "sha1", "sha256", "sha512"]

def __init__(self, digests=None, blocksize=1 << 16):
def __init__(self, digests=None, blocksize=DEFAULT_BLOCKSIZE):
"""
Parameters
----------
@@ -42,7 +62,7 @@ def __init__(self, digests=None, blocksize=1 << 16):
blocksize : int
Chunk size (in bytes) by which to consume a file.
"""
self._digests = digests or self.DEFAULT_DIGESTS
self._digests = digests or DEFAULT_DIGESTS
self._digest_funcs = [getattr(hashlib, digest) for digest in self._digests]
self.blocksize = blocksize

@@ -72,9 +92,79 @@ def __call__(self, fpath):
return {n: d.hexdigest() for n, d in zip(self.digests, digests)}


checksums = PersistentCache(name="dandi-checksums", envvar="DANDI_CACHE")


@checksums.memoize_path
def get_digest(filepath, digest="sha256"):
return Digester([digest])(filepath)[digest]
class DigestProgress(TypedDict, total=False):
status: str
# size: int
digests: Dict[str, str]


digest_cache = FanoutCache(
directory=CACHE_DIR, shards=SHARD_QTY, eviction_policy="least-recently-used"
)


def get_progressive_digests(
path: AnyPath,
digests: Optional[Iterable[str]] = None,
blocksize: int = DEFAULT_BLOCKSIZE,
) -> Iterable[DigestProgress]:
if not os.path.isfile(path):
raise ValueError(f"{os.fsdecode(path)}: Cannot hash a directory")
if digests is None:
digests = DEFAULT_DIGESTS
digest_tuple = tuple(sorted(digests))
if not digest_tuple:
raise ValueError("No digests specified")

def mkkey() -> Tuple[str, Tuple[str, ...], int, int, int, int]:
pathstr = os.path.realpath(os.fsdecode(path))
s = os.stat(pathstr)
return (
pathstr,
digest_tuple,
s.st_mtime_ns,
s.st_ctime_ns,
s.st_size,
s.st_ino,
)

try:
digested = digest_cache[mkkey()]
except KeyError:
lgr.debug("Digesting %s", path)
digestions = [getattr(hashlib, d)() for d in digest_tuple]
total_size = os.path.getsize(path)
current = 0
with open(path, "rb") as f:
while True:
block = f.read(blocksize)
if not block:
break
for d in digestions:
d.update(block)
current += len(block)
pct = 100 * current / total_size
yield {
"status": f"digesting ({pct:.2f}%)",
# "size": current,
}
digested = {n: d.hexdigest() for n, d in zip(digest_tuple, digestions)}
# Calculate the key again just in case:
digest_cache[mkkey()] = digested
else:
lgr.debug("Digests for %s found in cache", path)
yield {"status": "digested", "digests": digested}


def get_digests(
path: AnyPath,
digests: Optional[Iterable[str]] = None,
blocksize: int = DEFAULT_BLOCKSIZE,
) -> Dict[str, str]:
for status in get_progressive_digests(path, digests, blocksize):
if status["status"] == "digested":
return status["digests"]


def get_digest(path, digest="sha256"):
return get_digests(path)[digest]
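For orientation, a minimal sketch of how a caller might consume the new generator (the `hash_with_progress` helper is hypothetical; it assumes this branch of dandi-cli is importable):

```python
from dandi.support.digests import get_progressive_digests

# Hypothetical consumer: print each progress update, then return the digests.
def hash_with_progress(path):
    result = None
    for update in get_progressive_digests(path, digests=["sha256"]):
        # Every yielded dict has "status"; only the final one carries "digests".
        print(update["status"])  # e.g. 'digesting (42.17%)', then 'digested'
        result = update.get("digests", result)
    return result  # e.g. {"sha256": "a665a459..."}
```

On a cache hit the generator skips straight to the single `digested` item, so consumers must not assume any intermediate updates arrive.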
7 changes: 6 additions & 1 deletion dandi/support/pyout.py
@@ -3,6 +3,7 @@
from collections import Counter
import datetime
import logging
import re
import sys
import time

@@ -57,6 +58,10 @@ def counts(values):
return ["{:d} {}".format(v, k) for k, v in Counter(values).items()]


def counts_no_progress(values):
return counts(re.sub(r"\s+\(.+\)$", "", v) for v in values)


def minmax(values, fmt="%s"):
if not values:
return []
@@ -166,7 +171,7 @@ def get_style(hide_if_missing=True):
),
"status": dict(
color=dict(lookup={"skipped": "yellow", "done": "green", "error": "red"}),
aggregate=counts,
aggregate=counts_no_progress,
),
"message": dict(
color=dict(
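Why the aggregator change matters: with the plain `counts`, every distinct percentage string would open its own bucket in the summary footer; `counts_no_progress` strips the trailing parenthesized progress before counting. A self-contained sketch (sample values invented):

```python
import re
from collections import Counter

def counts(values):
    return ["{:d} {}".format(v, k) for k, v in Counter(values).items()]

def counts_no_progress(values):
    # Drop a trailing " (...)" annotation, e.g. "digesting (12.50%)" -> "digesting".
    return counts(re.sub(r"\s+\(.+\)$", "", v) for v in values)

print(counts(["digesting (12.50%)", "digesting (99.98%)", "done"]))
# -> ['1 digesting (12.50%)', '1 digesting (99.98%)', '1 done']
print(counts_no_progress(["digesting (12.50%)", "digesting (99.98%)", "done"]))
# -> ['2 digesting', '1 done']
```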
37 changes: 36 additions & 1 deletion dandi/support/tests/test_digests.py
@@ -7,7 +7,7 @@
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##

from ..digests import Digester
from ..digests import Digester, get_digests


def test_digester(tmp_path):
@@ -45,3 +45,38 @@ def test_digester(tmp_path):
"35e1d405ff1dc2763e433d69b8f299b3f4da500663b813ce176a43e29ffc"
"c31b0159",
}


def test_get_digests(tmp_path):
f = tmp_path / "sample.txt"
f.write_bytes(b"123")
assert get_digests(f) == {
"md5": "202cb962ac59075b964b07152d234b70",
"sha1": "40bd001563085fc35165329ea1ff5c5ecbdbbeef",
"sha256": "a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3",
"sha512": "3c9909afec25354d551dae21590bb26e38d53f2173b8d3dc3eee4c047e7a"
"b1c1eb8b85103e3be7ba613b31bb5c9c36214dc9f14a42fd7a2fdb84856b"
"ca5c44c2",
}

f = tmp_path / "0"
f.write_bytes(chr(0).encode())
assert get_digests(f) == {
"md5": "93b885adfe0da089cdf634904fd59f71",
"sha1": "5ba93c9db0cff93f52b521d7420e43f6eda2784f",
"sha256": "6e340b9cffb37a989ca544e6bb780a2c78901d3fb33738768511a30617afa01d",
"sha512": "b8244d028981d693af7b456af8efa4cad63d282e19ff14942c246e50d935"
"1d22704a802a71c3580b6370de4ceb293c324a8423342557d4e5c38438f0"
"e36910ee",
}

f = tmp_path / "long.txt"
f.write_bytes(b"123abz\n" * 1000000)
assert get_digests(f) == {
"md5": "81b196e3d8a1db4dd2e89faa39614396",
"sha1": "5273ac6247322c3c7b4735a6d19fd4a5366e812f",
"sha256": "80028815b3557e30d7cbef1d8dbc30af0ec0858eff34b960d2839fd88ad08871",
"sha512": "684d23393eee455f44c13ab00d062980937a5d040259d69c6b291c983bf6"
"35e1d405ff1dc2763e433d69b8f299b3f4da500663b813ce176a43e29ffc"
"c31b0159",
}
7 changes: 5 additions & 2 deletions dandi/upload.py
@@ -607,7 +607,7 @@ def _new_upload(
):
from .dandiapi import DandiAPIClient
from .dandiset import APIDandiset
from .support.digests import get_digest
from .support.digests import get_progressive_digests

client = DandiAPIClient(api_url)
client.dandi_authenticate()
@@ -704,7 +704,10 @@ def process_path(path, relpath):
#
yield {"status": "digesting"}
try:
sha256_digest = get_digest(path)
for status in get_progressive_digests(path, ["sha256"]):
if status["status"] == "digested":
sha256_digest = status.pop("digests")["sha256"]
yield status
except Exception as exc:
yield skip_file("failed to compute digests: %s" % str(exc))
return
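Net effect in the upload table: each file's status cell now ticks through the yielded `digesting (NN.NN%)` strings while hashing, and the final `digested` item supplies the sha256 the upload needs; the existing `skip_file` error path is unchanged.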
3 changes: 3 additions & 0 deletions dandi/utils.py
@@ -11,6 +11,7 @@
import subprocess
import sys
from time import sleep
from typing import Union

import dateutil.parser
import requests
@@ -49,6 +50,8 @@
platform.python_version(),
)

AnyPath = Union[str, bytes, "os.PathLike[str]", "os.PathLike[bytes]"]


def is_interactive():
"""Return True if all in/outs are tty"""
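For illustration, the new alias just names the four path flavors the digest helpers already normalize via `os.fsdecode`; a minimal sketch (the `describe` function is hypothetical):

```python
import os
from pathlib import Path
from typing import Union

AnyPath = Union[str, bytes, "os.PathLike[str]", "os.PathLike[bytes]"]

def describe(path: AnyPath) -> str:
    # os.fsdecode accepts str, bytes, and both PathLike flavors.
    return os.fsdecode(path)

for p in ("a/b.txt", b"a/b.txt", Path("a/b.txt")):
    print(describe(p))
```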
1 change: 1 addition & 0 deletions setup.cfg
@@ -30,6 +30,7 @@ install_requires =
appdirs
click
click-didyoumean
diskcache
email-validator
etelemetry >= 0.2.0
fasteners