Add progress bar to hca dss upload (closes #117)
Adds a progress bar to `hca dss upload`. This introduces a new dependency,
`tqdm` (which already exists as an indirect development requirement). The
progress bar will show by default unless one of the following conditions
is true:
* `hca` has been invoked non-interactively (i.e. stdout is not a TTY)
* The logging level is higher than INFO
* The `--no-progress` flag is passed

The progress bar displays the amount of data uploaded (rather than the number of
files uploaded) and estimates the time remaining in the upload. A sketch of the
gating logic follows below.
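For reference, the gating amounts to the check below — a condensed sketch of what this commit implements in `upload_to_cloud.py` (`should_show_progress` is a hypothetical helper name; the real code inlines the same `all(...)` test):

```python
import logging
import sys

def should_show_progress(no_progress, logger):
    # Show progress only if all three hold: the user did not pass
    # --no-progress, the effective log level is INFO or lower, and
    # stdout is attached to a terminal.
    return all((
        not no_progress,
        logger.getEffectiveLevel() <= logging.INFO,
        sys.stdout.isatty(),
    ))
```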
natanlao committed Sep 10, 2019
1 parent 40a35f9 commit a8c345e
Showing 5 changed files with 104 additions and 9 deletions.
hca/config.py: 16 additions, 1 deletion
@@ -1,7 +1,11 @@
-import os, logging
+# -*- coding: utf-8 -*-
+import logging
+import os
+
+import tqdm
 from tweak import Config as _Config
 
 
 class HCAConfig(_Config):
     default_config_file = os.path.join(os.path.dirname(__file__), "default_config.json")
 
@@ -18,11 +22,22 @@ def user_config_dir(self):
 
 
 _config = None
 
+
 def get_config():
     global _config
     if _config is None:
         _config = HCAConfig()
     return _config
 
+
+class ProgressBarStreamHandler(object):
+    """
+    Stream handler that allows for logging with a :mod:`tqdm` progress bar.
+    """
+    @staticmethod
+    def write(msg):
+        tqdm.tqdm.write(msg, end='')
+
+
 logger = logging.getLogger("hca")
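`ProgressBarStreamHandler` exists so that log output routed through it is printed with `tqdm.tqdm.write`, which temporarily clears the bar, prints the message, and redraws the bar rather than corrupting it. A minimal standalone demonstration of that `tqdm` pattern (not part of this commit; the loop and messages are illustrative):

```python
import time

import tqdm

with tqdm.tqdm(total=3) as progress:
    for i in range(3):
        # tqdm.write() prints above the bar without clobbering it.
        tqdm.tqdm.write("processed item %d" % i)
        progress.update(1)
        time.sleep(0.1)
```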
hca/dss/__init__.py: 5 additions, 2 deletions
@@ -89,7 +89,7 @@ def create_version(self):
     def _create_version(self):
         return datetime.utcnow().strftime("%Y-%m-%dT%H%M%S.%fZ")
 
-    def upload(self, src_dir, replica, staging_bucket, timeout_seconds=1200):
+    def upload(self, src_dir, replica, staging_bucket, timeout_seconds=1200, no_progress=False):
         """
         Upload a directory of files from the local filesystem and create a bundle containing the uploaded files.
@@ -98,6 +98,9 @@ def upload(self, src_dir, replica, staging_bucket, timeout_seconds=1200):
             `gcp` for Google Cloud Platform. [aws, gcp]
         :param str staging_bucket: a client controlled AWS S3 storage bucket to upload from.
         :param int timeout_seconds: the time to wait for a file to upload to replica.
+        :param bool no_progress: if set, will not report upload progress. Note that even if this flag
+                                 is not set, progress will not be reported if the logging level is higher
+                                 than INFO or if the session is not interactive.
 
         Upload a directory of files from the local filesystem and create a bundle containing the uploaded files.
         This method requires the use of a client-controlled object storage bucket to stage the data for upload.
@@ -112,7 +115,7 @@ def upload(self, src_dir, replica, staging_bucket, timeout_seconds=1200):
 
         logger.info("Uploading %i files from %s to %s", len(files_to_upload), src_dir, staging_bucket)
         file_uuids, uploaded_keys, abs_file_paths = upload_to_cloud(files_to_upload, staging_bucket=staging_bucket,
-                                                                    replica=replica, from_cloud=False)
+                                                                    replica=replica, from_cloud=False, log_progress=not no_progress)
         for file_handle in files_to_upload:
             file_handle.close()
         filenames = [object_name_builder(p, src_dir) for p in abs_file_paths]
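A usage sketch of the new parameter (the bucket and directory names are placeholders, not from this commit):

```python
from hca.dss import DSSClient

dss = DSSClient()
# Progress is reported by default when the session is interactive and the
# log level is INFO or lower; no_progress=True suppresses it unconditionally.
dss.upload(src_dir="data/bundle",
           replica="aws",
           staging_bucket="my-staging-bucket",
           no_progress=True)
```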
hca/dss/upload_to_cloud.py: 20 additions, 2 deletions
@@ -6,11 +6,14 @@
 import logging
 import mimetypes
 import os
+import sys
 import uuid
 
 import boto3
 from boto3.s3.transfer import TransferConfig
+import tqdm
 
+from ..config import logger, ProgressBarStreamHandler
 from dcplib import s3_multipart
 from dcplib.checksumming_io import ChecksummingBufferedReader
 
@@ -50,25 +53,37 @@ def _copy_from_s3(path, s3):
     return file_uuids, key_names
 
 
-def upload_to_cloud(file_handles, staging_bucket, replica, from_cloud=False):
+def upload_to_cloud(file_handles, staging_bucket, replica, from_cloud=False, log_progress=False):
     """
     Upload files to cloud.
 
     :param file_handles: If from_cloud, file_handles is an AWS S3 directory path to files with appropriate
                          metadata uploaded. Otherwise, a list of binary file handles to upload.
     :param staging_bucket: The AWS bucket to upload the files to.
     :param replica: The cloud replica to write to. One of 'aws', 'gcp', or 'azure'. Currently unused.
+    :param bool log_progress: set to True to log progress to stdout. The progress bar reflects bytes
+                              uploaded (not files uploaded). This is off by default, as direct calls
+                              to this function are assumed to be programmatic. In addition, even if
+                              this is set to True, a progress bar will not be shown if (a) the
+                              logging level is higher than INFO or (b) an interactive session is
+                              not detected.
     :return: a list of file uuids, key-names, and absolute file paths (local) for uploaded files
     """
     s3 = boto3.resource("s3")
     file_uuids = []
     key_names = []
     abs_file_paths = []
+    log_progress = all((logger.getEffectiveLevel() <= logging.INFO, sys.stdout.isatty(), log_progress))
 
     if from_cloud:
         file_uuids, key_names = _copy_from_s3(file_handles[0], s3)
     else:
         destination_bucket = s3.Bucket(staging_bucket)
+        if log_progress:
+            total_upload_size = sum(os.fstat(f.fileno()).st_size for f in file_handles)
+            logger.addHandler(ProgressBarStreamHandler())
+            progress = tqdm.tqdm(total=total_upload_size, desc="Uploading to " + replica,
+                                 unit="B", unit_scale=True, unit_divisor=1024)
         for raw_fh in file_handles:
             file_size = os.path.getsize(raw_fh.name)
             multipart_chunksize = s3_multipart.get_s3_multipart_chunk_size(file_size)
@@ -81,6 +96,7 @@ def upload_to_cloud(file_handles, staging_bucket, replica, from_cloud=False):
                     fh,
                     key_name,
                     Config=tx_cfg,
+                    Callback=lambda x: progress.update(x) if log_progress else None,
                     ExtraArgs={
                         'ContentType': _mime_type(fh.raw.name),
                     }
@@ -98,5 +114,7 @@ def upload_to_cloud(file_handles, staging_bucket, replica, from_cloud=False):
                 file_uuids.append(file_uuid)
                 key_names.append(key_name)
                 abs_file_paths.append(fh.raw.name)
-
+    if log_progress:
+        logger.handlers = [l for l in logger.handlers if not isinstance(l, ProgressBarStreamHandler)]
+        progress.close()
     return file_uuids, key_names, abs_file_paths
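The `Callback` hook is how byte-level progress reaches the bar: boto3 invokes it with the number of bytes transferred in each chunk, which maps directly onto `tqdm.update()`. A self-contained sketch of the same pattern (the function and its names are illustrative, not from this commit):

```python
import os

import boto3
import tqdm

def upload_with_progress(path, bucket_name, key):
    # boto3 calls Callback with the byte count of each transferred chunk.
    s3 = boto3.resource("s3")
    with open(path, "rb") as fh, \
            tqdm.tqdm(total=os.path.getsize(path), unit="B",
                      unit_scale=True, unit_divisor=1024) as progress:
        s3.Bucket(bucket_name).upload_fileobj(fh, key, Callback=progress.update)
```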
requirements.txt: 1 addition
@@ -18,4 +18,5 @@ requests >= 2.20.0, < 3
 rsa<=3.5.0,>=3.1.2
 s3transfer<0.3.0,>=0.2.0
 tenacity >=5.0.2, < 5.1
+tqdm >=4.33.0, < 5
 tweak >= 1.0.2, < 2
test/integration/dss/test_dss_cli.py: 62 additions, 4 deletions
@@ -4,6 +4,7 @@
 import filecmp
 import json
 import os
+import pty
 import sys
 import unittest
 import uuid
@@ -114,6 +115,63 @@ def _put_test_col(args, uuid=str(uuid.uuid4()), replica='aws'):
         with CapturingIO('stdout'):
             hca.cli.main(args=base_args)
 
+    def test_upload_progress_bar(self):
+        dirpath = os.path.join(TEST_DIR, 'tutorial', 'data')  # arbitrary and small
+        put_args = ['dss', 'upload', '--src-dir', dirpath, '--replica',
+                    'aws', '--staging-bucket', 'org-humancellatlas-dss-cli-test']
+
+        with self.subTest("Suppress progress bar if not interactive"):
+            with CapturingIO('stdout') as stdout:
+                hca.cli.main(args=put_args)
+            # Under CapturingIO, `hca dss upload` should detect that it is not
+            # being invoked interactively and therefore not show a progress
+            # bar, which means that stdout should parse cleanly as JSON.
+            self.assertTrue(json.loads(stdout.captured()))
+
+    @unittest.skipIf(os.name == 'nt', 'No pty support on Windows')
+    def test_upload_progress_bar_interactive(self):
+        """Tests upload progress bar with a simulated interactive session"""
+        dirpath = os.path.join(TEST_DIR, 'tutorial', 'data')  # arbitrary and small
+        put_args = ['dss', 'upload', '--src-dir', dirpath, '--replica',
+                    'aws', '--staging-bucket', 'org-humancellatlas-dss-cli-test']
+
+        # In an interactive session, we should see a progress bar if we
+        # don't pass `--no-progress`.
+        with self.subTest("Show progress bar if interactive"):
+            child_pid, fd = pty.fork()
+            if child_pid == 0:
+                hca.cli.main(args=put_args)
+                os._exit(0)
+            output = self._get_child_output(child_pid, fd)
+            self.assertTrue('Uploading to aws' in output, output)
+
+        with self.subTest("Don't show progress bar if interactive and not logging INFO"):
+            child_pid, fd = pty.fork()
+            if child_pid == 0:
+                hca.cli.main(args=['--log-level', 'WARNING'] + put_args)
+                os._exit(0)
+            output = self._get_child_output(child_pid, fd)
+            self.assertFalse('Uploading to aws' in output, output)
+
+        with self.subTest("Don't show progress bar if interactive and --no-progress"):
+            child_pid, fd = pty.fork()
+            if child_pid == 0:
+                hca.cli.main(args=put_args + ['--no-progress'])
+                os._exit(0)
+            output = self._get_child_output(child_pid, fd)
+            self.assertFalse('Uploading to aws' in output, output)
+
+    @staticmethod
+    def _get_child_output(child_pid, fd):
+        output = ''
+        while not os.waitpid(child_pid, os.WNOHANG)[0]:
+            try:
+                output += os.read(fd, 128).decode()
+            except OSError:
+                break
+        os.close(fd)
+        return output
+
     @staticmethod
     @contextlib.contextmanager
     def _put_test_bdl(dirpath=os.path.join(TEST_DIR, 'res', 'bundle'),
@@ -130,8 +188,8 @@ def _put_test_bdl(dirpath=os.path.join(TEST_DIR, 'res', 'bundle'),
         :rtype: dict
         :returns: upload response object
         """
-        put_args = ['dss', 'upload', '--src-dir', dirpath, '--replica',
-                    replica, '--staging-bucket', staging_bucket]
+        put_args = ['dss', 'upload', '--src-dir', dirpath, '--replica',
+                    replica, '--staging-bucket', staging_bucket, '--no-progress']
         with CapturingIO('stdout') as stdout:
             hca.cli.main(args=put_args)
         rv = json.loads(stdout.captured())
@@ -168,8 +226,8 @@ def test_collection_download(self):
                         '--name', 'collection_test:%s' % bdl['bundle_uuid']
                         ]
         with self._put_test_col(put_col_args) as col, \
-             tempfile.TemporaryDirectory() as t1, \
-             tempfile.TemporaryDirectory() as t2:
+                tempfile.TemporaryDirectory() as t1, \
+                tempfile.TemporaryDirectory() as t2:
            dl_col_args = ['dss', 'download-collection', '--uuid', col['uuid'],
                           '--replica', 'aws', '--download-dir', t1]
            hca.cli.main(args=dl_col_args)
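The interactive tests above hinge on `pty.fork()`: the child process runs attached to a pseudo-terminal, so `sys.stdout.isatty()` reports True and the progress bar is shown, while the parent reads the child's output through the master file descriptor. A condensed, standalone sketch of that pattern (POSIX only; not part of this commit):

```python
import os
import pty

child_pid, fd = pty.fork()
if child_pid == 0:
    # In the child: stdout is a tty, so isatty() reports True.
    print("stdout is a tty:", os.isatty(1))
    os._exit(0)

output = b""
while True:
    try:
        chunk = os.read(fd, 128)
    except OSError:  # raised once the child closes its end of the pty
        break
    if not chunk:
        break
    output += chunk
os.close(fd)
print(output.decode())
```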
