Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions azure/datalake/store/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,6 @@ def submit(self, src, dst, length):
"exception": None}


def _submit(self, fn, *args, **kwargs):
kwargs['shutdown_event'] = self._shutdown_event
future = self._pool.submit(fn, *args, **kwargs)
future.add_done_callback(self._update)
return future

def _start(self, src, dst):
key = (src, dst)
self._fstates[key] = 'transferring'
Expand All @@ -317,11 +311,12 @@ def _start(self, src, dst):
if obj in cs.objects and cs[obj] == 'finished':
continue
cs[obj] = 'running'
future = self._submit(
future = self._pool.submit(
self._transfer, self._adlfs, src, name, offset,
self._chunks[obj]['expected'], self._buffersize,
self._blocksize)
self._blocksize, shutdown_event=self._shutdown_event)
self._cfutures[future] = obj
future.add_done_callback(self._update)

@property
def active(self):
Expand Down Expand Up @@ -455,6 +450,9 @@ def _update(self, future):
# TODO: Re-enable progress saving when a less IO intensive solution is available.
# See issue: https://github.com/Azure/azure-data-lake-store-python/issues/117
#self.save()
else:
raise ValueError("Illegal state future {} not found in either file futures {} nor chunk futures {}"
.format(future, self._ffutures, self._cfutures))
if self.verbose:
print('\b' * 200, self.status, end='')
sys.stdout.flush()
Expand Down
4 changes: 0 additions & 4 deletions tests/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

def test_shutdown(azure):
def transfer(adlfs, src, dst, offset, size, blocksize, buffersize, retries=5, shutdown_event=None):
time.sleep(1.0)
while shutdown_event and not shutdown_event.is_set():
time.sleep(0.1)
return size, None
Expand All @@ -25,15 +24,13 @@ def transfer(adlfs, src, dst, offset, size, blocksize, buffersize, retries=5, sh
chunked=False)
client.submit('foo', 'bar', 16)
client.run(monitor=False)
time.sleep(0.1)
client.shutdown()

assert client.progress[0].state == 'finished'


def test_submit_and_run(azure):
def transfer(adlfs, src, dst, offset, size, blocksize, buffersize, shutdown_event=None):
time.sleep(0.1)
return size, None

client = ADLTransferClient(azure, transfer=transfer, chunksize=8,
Expand Down Expand Up @@ -65,7 +62,6 @@ def transfer(adlfs, src, dst, offset, size, blocksize, buffersize, shutdown_even

def test_temporary_path(azure):
def transfer(adlfs, src, dst, offset, size, blocksize, buffersize):
time.sleep(0.1)
return size, None

client = ADLTransferClient(azure, transfer=transfer, chunksize=8,
Expand Down