From 1f9fe1871bb87a9801fa731d7f79e71628e4c2de Mon Sep 17 00:00:00 2001 From: akharit <38331238+akharit@users.noreply.github.com> Date: Thu, 4 Oct 2018 15:20:31 -0700 Subject: [PATCH] Various bug fixes. (#243) (#244) * Fix for test case failure by adding randomized file path * Fix for empty folder upload issue * Fix chunked downloader to make block size requests --- HISTORY.rst | 6 ++++++ azure/datalake/store/__init__.py | 2 +- azure/datalake/store/multithread.py | 30 +++++++++++++++++++++-------- tests/test_core.py | 2 +- tests/test_multithread.py | 21 +++++++++++++++++++- 5 files changed, 50 insertions(+), 11 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 6afb0ed..5afef06 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,12 @@ Release History =============== +0.0.32 (2018-10-04) ++++++++++++++++++++ +* Fixed test bug +* Fixed empty folder upload bug +* Fixed ADL Downloader block size bug + 0.0.31 (2018-09-10) +++++++++++++++++++ * Added support for batched ls diff --git a/azure/datalake/store/__init__.py b/azure/datalake/store/__init__.py index f3c3736..3833a6f 100644 --- a/azure/datalake/store/__init__.py +++ b/azure/datalake/store/__init__.py @@ -6,7 +6,7 @@ # license information. # -------------------------------------------------------------------------- -__version__ = "0.0.31" +__version__ = "0.0.32" from .core import AzureDLFileSystem from .multithread import ADLDownloader diff --git a/azure/datalake/store/multithread.py b/azure/datalake/store/multithread.py index b49105a..1670bf3 100644 --- a/azure/datalake/store/multithread.py +++ b/azure/datalake/store/multithread.py @@ -289,17 +289,23 @@ def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize, exponential_factor=backoff) try: nbytes = 0 - with closing(_fetch_range(adlfs.azure, src, start=offset, - end=offset+size, stream=True, retry_policy=retry_policy)) as response: - with open(dst, 'rb+') as fout: - fout.seek(offset) - for chunk in response.iter_content(chunk_size=blocksize): + start = offset + + with open(dst, 'rb+') as fout: + fout.seek(start) + while start < offset+size: + with closing(_fetch_range(adlfs.azure, src, start=start, + end=min(start+blocksize, offset+size), stream=True, retry_policy=retry_policy)) as response: + chunk = response.content if shutdown_event and shutdown_event.is_set(): return total_bytes_downloaded, None if chunk: nwritten = fout.write(chunk) if nwritten: nbytes += nwritten + start += nwritten + else: + raise IOError("Failed to write to disk for {0} at location {1} with blocksize {2}".format(dst, start, blocksize)) logger.debug('Downloaded %s bytes to %s, byte offset %s', nbytes, dst, offset) # There are certain cases where we will be throttled and recieve less than the expected amount of data. @@ -456,9 +462,12 @@ def _setup(self): """ is_path_walk_empty = False if "*" not in self.lpath: - out = os.walk(self.lpath) - lfiles = sum(([os.path.join(dir, f) for f in fnames] for - (dir, _, fnames) in out), []) + lfiles = [] + for directory, subdir, fnames in os.walk(self.lpath): + lfiles.extend([os.path.join(directory, f) for f in fnames]) + if not subdir and not fnames: # Empty Directory + self.client._adlfs._emptyDirs.append(directory) + if (not lfiles and os.path.exists(self.lpath) and not os.path.isdir(self.lpath)): lfiles = [self.lpath] @@ -502,6 +511,11 @@ def run(self, nthreads=None, monitor=True): monitor: bool [True] To watch and wait (block) until completion. """ + for empty_directory in self.client._adlfs._empty_dirs_to_add(): + local_rel_path = os.path.relpath(empty_directory, self.lpath) + rel_rpath = str(AzureDLPath(self.rpath).trim().globless_prefix / local_rel_path) + self.client._adlfs.mkdir(rel_rpath) + self.client.run(nthreads, monitor) def active(self): diff --git a/tests/test_core.py b/tests/test_core.py index abd509d..bc41bdd 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -836,7 +836,7 @@ def test_tail_head(azure): @my_vcr.use_cassette def test_read_delimited_block(azure): - fn = '/tmp/test/a' + fn = a delimiter = b'\n' data = delimiter.join([b'123', b'456', b'789']) with azure_teardown(azure): diff --git a/tests/test_multithread.py b/tests/test_multithread.py index 81d8e3e..de48309 100644 --- a/tests/test_multithread.py +++ b/tests/test_multithread.py @@ -412,4 +412,23 @@ def test_download_root_folder(azure, tempdir): rpath = AzureDLPath('/'/test_dir / 'data/single/single'/ 'single.txt') ADLDownloader(azure, rpath=rpath, lpath=tempdir) assert os.path.isfile(os.path.join(tempdir, 'single.txt')) - \ No newline at end of file + +@my_vcr.use_cassette +def test_upload_empty_folder(tempdir, azure): + with azure_teardown(azure): + os.mkdir(os.path.join(tempdir, "dir1")) + os.mkdir(os.path.join(tempdir, "dir1", "b")) + + with open(os.path.join(tempdir, "dir1", "file.txt"), 'wb') as f: + f.write(b'0123456789') + + # transfer client w/ deterministic temporary directory + from azure.datalake.store.multithread import put_chunk + client = ADLTransferClient(azure, transfer=put_chunk, + unique_temporary=False) + + # single chunk, empty file + up = ADLUploader(azure, test_dir / "dir1", os.path.join(tempdir, "dir1") , nthreads=1, + overwrite=True) + assert azure.info(test_dir / "dir1" /"b")['type'] == 'DIRECTORY' + azure.rm(test_dir / "dir1", recursive=True) \ No newline at end of file