Various bug fixes. #243

Merged (8 commits, Oct 4, 2018)
6 changes: 6 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,12 @@
Release History
===============

0.0.32 (2018-10-04)
+++++++++++++++++++
* Fixed test bug
* Fixed empty folder upload bug
* Fixed ADL Downloader block size bug

0.0.31 (2018-09-10)
+++++++++++++++++++
* Added support for batched ls
2 changes: 1 addition & 1 deletion azure/datalake/store/__init__.py
@@ -6,7 +6,7 @@
# license information.
# --------------------------------------------------------------------------

__version__ = "0.0.31"
__version__ = "0.0.32"

from .core import AzureDLFileSystem
from .multithread import ADLDownloader
30 changes: 22 additions & 8 deletions azure/datalake/store/multithread.py
@@ -289,17 +289,23 @@ def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize,
exponential_factor=backoff)
try:
nbytes = 0
with closing(_fetch_range(adlfs.azure, src, start=offset,
end=offset+size, stream=True, retry_policy=retry_policy)) as response:
with open(dst, 'rb+') as fout:
fout.seek(offset)
for chunk in response.iter_content(chunk_size=blocksize):
start = offset

with open(dst, 'rb+') as fout:
fout.seek(start)
while start < offset+size:
with closing(_fetch_range(adlfs.azure, src, start=start,
end=min(start+blocksize, offset+size), stream=True, retry_policy=retry_policy)) as response:
chunk = response.content
if shutdown_event and shutdown_event.is_set():
return total_bytes_downloaded, None
if chunk:
nwritten = fout.write(chunk)
if nwritten:
nbytes += nwritten
start += nwritten
else:
raise IOError("Failed to write to disk for {0} at location {1} with blocksize {2}".format(dst, start, blocksize))
logger.debug('Downloaded %s bytes to %s, byte offset %s', nbytes, dst, offset)

# There are certain cases where we will be throttled and receive less than the expected amount of data.
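
For context: the hunk above replaces one long streamed read (iter_content over the whole range) with a bounded request per block, advancing the write position by the number of bytes actually written, so a short or throttled response cannot corrupt the file. Below is a rough standalone sketch of that pattern, assuming a plain HTTP endpoint and the requests library; download_range is an illustrative helper, not the library's internal _fetch_range.

import requests

def download_range(url, dst, offset, size, blocksize=4 * 2**20):
    # Fetch bytes [offset, offset + size) in blocksize pieces and write each
    # piece at the matching offset of an already-allocated local file.
    start = offset
    nbytes = 0
    with open(dst, 'rb+') as fout:
        fout.seek(start)
        while start < offset + size:
            end = min(start + blocksize, offset + size)
            # HTTP Range headers are inclusive, hence end - 1.
            resp = requests.get(url, headers={'Range': 'bytes={0}-{1}'.format(start, end - 1)})
            resp.raise_for_status()
            chunk = resp.content
            if not chunk:
                raise IOError("Empty response for {0} at offset {1}".format(url, start))
            nwritten = fout.write(chunk)
            nbytes += nwritten
            start += nwritten
    return nbytes
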
@@ -456,9 +462,12 @@ def _setup(self):
"""
is_path_walk_empty = False
if "*" not in self.lpath:
out = os.walk(self.lpath)
lfiles = sum(([os.path.join(dir, f) for f in fnames] for
(dir, _, fnames) in out), [])
lfiles = []
for directory, subdir, fnames in os.walk(self.lpath):
lfiles.extend([os.path.join(directory, f) for f in fnames])
if not subdir and not fnames: # Empty Directory
self.client._adlfs._emptyDirs.append(directory)

if (not lfiles and os.path.exists(self.lpath) and
not os.path.isdir(self.lpath)):
lfiles = [self.lpath]
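
The _setup change above trades the sum(..., []) flattening for an explicit loop so the same walk can also record directories that contain neither files nor subdirectories. A minimal standalone sketch of that logic, using only the standard library (collect_local_tree is a hypothetical helper, not part of the package):

import os

def collect_local_tree(lpath):
    # Return (all files under lpath, directories that are completely empty).
    lfiles, empty_dirs = [], []
    for directory, subdirs, fnames in os.walk(lpath):
        lfiles.extend(os.path.join(directory, f) for f in fnames)
        if not subdirs and not fnames:  # leaf directory with no contents
            empty_dirs.append(directory)
    return lfiles, empty_dirs
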
@@ -502,6 +511,11 @@ def run(self, nthreads=None, monitor=True):
monitor: bool [True]
To watch and wait (block) until completion.
"""
for empty_directory in self.client._adlfs._empty_dirs_to_add():
local_rel_path = os.path.relpath(empty_directory, self.lpath)
rel_rpath = str(AzureDLPath(self.rpath).trim().globless_prefix / local_rel_path)
self.client._adlfs.mkdir(rel_rpath)

self.client.run(nthreads, monitor)

def active(self):
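
The run() hunk above creates each recorded empty local directory on the remote side before the transfer starts, by taking its path relative to the upload root and appending it to the remote prefix. A hedged sketch of that mapping, where make_remote_dirs and the mkdir callable stand in for the client calls and are assumptions rather than the package's exact API:

import os
import posixpath

def make_remote_dirs(empty_dirs, lpath, rpath, mkdir):
    for empty_directory in empty_dirs:
        local_rel_path = os.path.relpath(empty_directory, lpath)
        # Remote paths use forward slashes regardless of the local OS.
        rel_rpath = posixpath.join(rpath, local_rel_path.replace(os.sep, '/'))
        mkdir(rel_rpath)
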
2 changes: 1 addition & 1 deletion tests/test_core.py
@@ -836,7 +836,7 @@ def test_tail_head(azure):

@my_vcr.use_cassette
def test_read_delimited_block(azure):
fn = '/tmp/test/a'
fn = a
delimiter = b'\n'
data = delimiter.join([b'123', b'456', b'789'])
with azure_teardown(azure):
21 changes: 20 additions & 1 deletion tests/test_multithread.py
@@ -412,4 +412,23 @@ def test_download_root_folder(azure, tempdir):
rpath = AzureDLPath('/'/test_dir / 'data/single/single'/ 'single.txt')
ADLDownloader(azure, rpath=rpath, lpath=tempdir)
assert os.path.isfile(os.path.join(tempdir, 'single.txt'))


@my_vcr.use_cassette
def test_upload_empty_folder(tempdir, azure):
with azure_teardown(azure):
os.mkdir(os.path.join(tempdir, "dir1"))
os.mkdir(os.path.join(tempdir, "dir1", "b"))

with open(os.path.join(tempdir, "dir1", "file.txt"), 'wb') as f:
f.write(b'0123456789')

# transfer client w/ deterministic temporary directory
from azure.datalake.store.multithread import put_chunk
client = ADLTransferClient(azure, transfer=put_chunk,
unique_temporary=False)

# single chunk, empty file
up = ADLUploader(azure, test_dir / "dir1", os.path.join(tempdir, "dir1"), nthreads=1,
overwrite=True)
assert azure.info(test_dir / "dir1" /"b")['type'] == 'DIRECTORY'
azure.rm(test_dir / "dir1", recursive=True)