
Updates to read file API
annatisch committed Mar 9, 2020
1 parent d49daf1 commit 280ca3e
Showing 9 changed files with 162 additions and 61 deletions.
3 changes: 2 additions & 1 deletion sdk/storage/azure-storage-file-datalake/README.md
@@ -139,7 +139,8 @@
file = DataLakeFileClient.from_connection_string("my_connection_string",
                                                 file_system_name="myfilesystem", file_path="myfile")

with open("./BlockDestination.txt", "wb") as my_file:
-    file_data = file.read_file(stream=my_file)
+    download = file.read_file()
+    download.readinto(my_file)
```

### Enumerating paths
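For the common whole-file case, the returned downloader also exposes `readall()`. A minimal sketch of that alternative, reusing the placeholder connection string and paths from the README snippet above:

```python
from azure.storage.filedatalake import DataLakeFileClient

# Placeholder connection string and paths, as in the README example above.
file = DataLakeFileClient.from_connection_string("my_connection_string",
                                                 file_system_name="myfilesystem",
                                                 file_path="myfile")

# read_file() now returns a StorageStreamDownloader instead of raw bytes;
# readall() buffers the whole download in memory and returns it.
data = file.read_file().readall()
```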
@@ -4,6 +4,7 @@
# license information.
# --------------------------------------------------------------------------

+from ._download import StorageStreamDownloader
from ._data_lake_file_client import DataLakeFileClient
from ._data_lake_directory_client import DataLakeDirectoryClient
from ._file_system_client import FileSystemClient
@@ -66,4 +67,5 @@
    'generate_directory_sas',
    'generate_file_sas',
    'VERSION',
+    'StorageStreamDownloader'
]
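With `StorageStreamDownloader` added to `__all__`, the type can be imported directly for annotations. A small sketch (the `fetch` helper is illustrative, not part of the commit):

```python
from azure.storage.filedatalake import DataLakeFileClient, StorageStreamDownloader

def fetch(client):
    # type: (DataLakeFileClient) -> StorageStreamDownloader
    # The downloader is now a public, importable return type.
    return client.read_file()
```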
@@ -12,6 +12,7 @@
from ._shared.uploads import IterStreamer
from ._upload_helper import upload_datalake_file
from ._generated.models import StorageErrorException
+from ._download import StorageStreamDownloader
from ._path_client import PathClient
from ._serialize import get_mod_conditions, get_path_http_headers, get_access_conditions, add_metadata_headers
from ._deserialize import process_storage_error
@@ -502,22 +503,18 @@ def flush_data(self, offset,  # type: int
        except StorageErrorException as error:
            process_storage_error(error)

-    def read_file(self, offset=None,  # type: Optional[int]
-                  length=None,  # type: Optional[int]
-                  stream=None,  # type: Optional[IO]
-                  **kwargs):
-        # type: (...) -> Union[int, bytes]
-        """Download a file from the service. Return the downloaded data as bytes or
-        write the downloaded data into a user-provided stream and return the written size.
+    def read_file(self, offset=None, length=None, **kwargs):
+        # type: (Optional[int], Optional[int], Any) -> StorageStreamDownloader
+        """Downloads a file to the StorageStreamDownloader. The readall() method must
+        be used to read all the content, or readinto() must be used to download the file into
+        a stream.
        :param int offset:
            Start of byte range to use for downloading a section of the file.
            Must be set if length is provided.
        :param int length:
            Number of bytes to read from the stream. This is optional, but
            should be supplied for optimal performance.
-        :param int stream:
-            User-provided stream to write the downloaded data into.
        :keyword lease:
            If specified, download only succeeds if the file's lease is active
            and matches this ID. Required if the file has an active lease.
@@ -545,8 +542,8 @@ def read_file(self, offset=None,  # type: Optional[int]
            The timeout parameter is expressed in seconds. This method may make
            multiple calls to the Azure service and the timeout will apply to
            each call individually.
-        :returns: downloaded data or the size of data written into the provided stream
-        :rtype: bytes or int
+        :returns: A streaming object (StorageStreamDownloader)
+        :rtype: ~azure.storage.filedatalake.StorageStreamDownloader
        .. admonition:: Example:
@@ -558,9 +555,7 @@ def read_file(self, offset=None,  # type: Optional[int]
                :caption: Return the downloaded data.
        """
        downloader = self._blob_client.download_blob(offset=offset, length=length, **kwargs)
-        if stream:
-            return downloader.readinto(stream)
-        return downloader.readall()
+        return StorageStreamDownloader(downloader)

    def rename_file(self, new_name,  # type: str
                    **kwargs):
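To illustrate the new contract: a ranged read returns a downloader sized to the range, and nothing is transferred until `readall()` or `readinto()` is called. A sketch assuming an existing `file_client` whose file is at least 1536 bytes:

```python
# offset must be set whenever length is provided.
downloader = file_client.read_file(offset=512, length=1024)

print(downloader.size)          # 1024: size reflects the requested byte range
content = downloader.readall()  # blocks until the range is fully downloaded
```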
@@ -0,0 +1,53 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from ._models import FileProperties


class StorageStreamDownloader(object):  # pylint: disable=too-many-instance-attributes
    """A streaming object to download from Azure Storage.

    :ivar str name:
        The name of the file being downloaded.
    :ivar ~azure.storage.filedatalake.FileProperties properties:
        The properties of the file being downloaded. If only a range of the data is being
        downloaded, this will be reflected in the properties.
    :ivar int size:
        The size of the total data in the stream. This will be the byte range if specified,
        otherwise the total size of the file.
    """

    def __init__(self, downloader):
        self._downloader = downloader
        self.name = self._downloader.name
        self.properties = FileProperties._from_blob_properties(self._downloader.properties)
        self.size = self._downloader.size

    def __len__(self):
        return self.size

    def chunks(self):
        return self._downloader.chunks()

    def readall(self):
        """Download the contents of this file.

        This operation is blocking until all data is downloaded.

        :rtype: bytes or str
        """
        return self._downloader.readall()

    def readinto(self, stream):
        """Download the contents of this file to a stream.

        :param stream:
            The stream to download to. This can be an open file-handle,
            or any writable stream. The stream must be seekable if the download
            uses more than one parallel connection.
        :returns: The number of bytes read.
        :rtype: int
        """
        return self._downloader.readinto(stream)
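Since `chunks()` simply delegates to the underlying blob downloader, a file can be processed incrementally without buffering it whole. A sketch (the checksum use case is illustrative, not from this commit):

```python
import hashlib

def file_md5(file_client):
    # Hash the file chunk by chunk via the downloader's iterator.
    digest = hashlib.md5()
    downloader = file_client.read_file()
    for chunk in downloader.chunks():  # yields bytes as segments arrive
        digest.update(chunk)
    return digest.hexdigest()
```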
@@ -4,6 +4,7 @@
# license information.
# --------------------------------------------------------------------------

+from ._download_async import StorageStreamDownloader
from .._shared.policies_async import ExponentialRetry, LinearRetry
from ._data_lake_file_client_async import DataLakeFileClient
from ._data_lake_directory_client_async import DataLakeDirectoryClient
@@ -19,4 +20,5 @@
    'DataLakeLeaseClient',
    'ExponentialRetry',
    'LinearRetry',
+    'StorageStreamDownloader'
]
@@ -4,6 +4,7 @@
# license information.
# --------------------------------------------------------------------------

+from ._download_async import StorageStreamDownloader
from ._path_client_async import PathClient
from .._data_lake_file_client import DataLakeFileClient as DataLakeFileClientBase
from .._deserialize import process_storage_error
@@ -370,22 +371,18 @@ async def flush_data(self, offset,  # type: int
        except StorageErrorException as error:
            process_storage_error(error)

-    async def read_file(self, offset=None,  # type: Optional[int]
-                        length=None,  # type: Optional[int]
-                        stream=None,  # type: Optional[IO]
-                        **kwargs):
-        # type: (...) -> Union[int, bytes]
-        """Download a file from the service. Return the downloaded data as bytes or
-        write the downloaded data into a user-provided stream and return the written size.
+    async def read_file(self, offset=None, length=None, **kwargs):
+        # type: (Optional[int], Optional[int], Any) -> StorageStreamDownloader
+        """Downloads a file to the StorageStreamDownloader. The readall() method must
+        be used to read all the content, or readinto() must be used to download the file into
+        a stream.
        :param int offset:
            Start of byte range to use for downloading a section of the file.
            Must be set if length is provided.
        :param int length:
            Number of bytes to read from the stream. This is optional, but
            should be supplied for optimal performance.
-        :param int stream:
-            User-provided stream to write the downloaded data into.
        :keyword lease:
            If specified, download only succeeds if the file's lease is active
            and matches this ID. Required if the file has an active lease.
@@ -413,8 +410,8 @@ async def read_file(self, offset=None,  # type: Optional[int]
            The timeout parameter is expressed in seconds. This method may make
            multiple calls to the Azure service and the timeout will apply to
            each call individually.
-        :returns: downloaded data or the size of data written into the provided stream
-        :rtype: bytes or int
+        :returns: A streaming object (StorageStreamDownloader)
+        :rtype: ~azure.storage.filedatalake.aio.StorageStreamDownloader
        .. admonition:: Example:
@@ -426,9 +423,7 @@ async def read_file(self, offset=None,  # type: Optional[int]
                :caption: Return the downloaded data.
        """
        downloader = await self._blob_client.download_blob(offset=offset, length=length, **kwargs)
-        if stream:
-            return await downloader.readinto(stream)
-        return await downloader.readall()
+        return StorageStreamDownloader(downloader)

    async def rename_file(self, new_name,  # type: str
                          **kwargs):
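The async variant has the same shape, except `read_file()` is itself awaited and the downloader's methods are coroutines. A sketch assuming an existing `DataLakeFileClient` from the aio package:

```python
async def download_to_disk(file_client, path):
    # read_file() resolves to the async StorageStreamDownloader.
    downloader = await file_client.read_file()
    with open(path, "wb") as stream:
        # readinto() is a coroutine in the aio package.
        written = await downloader.readinto(stream)
    return written
```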
@@ -0,0 +1,53 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from .._models import FileProperties


class StorageStreamDownloader(object):  # pylint: disable=too-many-instance-attributes
    """A streaming object to download from Azure Storage.

    :ivar str name:
        The name of the file being downloaded.
    :ivar ~azure.storage.filedatalake.FileProperties properties:
        The properties of the file being downloaded. If only a range of the data is being
        downloaded, this will be reflected in the properties.
    :ivar int size:
        The size of the total data in the stream. This will be the byte range if specified,
        otherwise the total size of the file.
    """

    def __init__(self, downloader):
        self._downloader = downloader
        self.name = self._downloader.name
        self.properties = FileProperties._from_blob_properties(self._downloader.properties)
        self.size = self._downloader.size

    def __len__(self):
        return self.size

    def chunks(self):
        return self._downloader.chunks()

    async def readall(self):
        """Download the contents of this file.

        This operation is blocking until all data is downloaded.

        :rtype: bytes or str
        """
        return await self._downloader.readall()

    async def readinto(self, stream):
        """Download the contents of this file to a stream.

        :param stream:
            The stream to download to. This can be an open file-handle,
            or any writable stream. The stream must be seekable if the download
            uses more than one parallel connection.
        :returns: The number of bytes read.
        :rtype: int
        """
        return await self._downloader.readinto(stream)
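One asymmetry worth noting: `chunks()` is not a coroutine here; it returns the underlying async iterator, so it is consumed with `async for` rather than awaited. A sketch:

```python
async def count_bytes(file_client):
    downloader = await file_client.read_file()
    total = 0
    async for chunk in downloader.chunks():  # async iterator, no await on chunks()
        total += len(chunk)
    return total
```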
32 changes: 16 additions & 16 deletions sdk/storage/azure-storage-file-datalake/tests/test_file.py
@@ -237,7 +237,7 @@ def test_upload_data_to_none_existing_file(self):
        data = self.get_random_bytes(200*1024) * 1024
        file_client.upload_data(data, overwrite=True, max_concurrency=3)

-        downloaded_data = file_client.read_file()
+        downloaded_data = file_client.read_file().readall()
        self.assertEqual(data, downloaded_data)

    @record
@@ -260,7 +260,7 @@ def test_upload_data_to_existing_file(self):
        file_client.upload_data(data, max_concurrency=5)
        file_client.upload_data(data, overwrite=True, max_concurrency=5)

-        downloaded_data = file_client.read_file()
+        downloaded_data = file_client.read_file().readall()
        self.assertEqual(data, downloaded_data)

    @record
@@ -285,7 +285,7 @@ def test_upload_data_to_existing_file_with_content_settings(self):
            content_settings=content_settings, etag=etag,
            match_condition=MatchConditions.IfNotModified)

-        downloaded_data = file_client.read_file()
+        downloaded_data = file_client.read_file().readall()
        properties = file_client.get_file_properties()

        self.assertEqual(data, downloaded_data)
@@ -301,7 +301,7 @@ def test_read_file(self):
        file_client.flush_data(len(data))

        # download the data and make sure it is the same as uploaded data
-        downloaded_data = file_client.read_file()
+        downloaded_data = file_client.read_file().readall()
        self.assertEqual(data, downloaded_data)

    @record
@@ -327,7 +327,7 @@ def test_read_file_with_user_delegation_key(self):
            file_client.file_system_name,
            None,
            file_client.path_name,
-            user_delegation_key=user_delegation_key,
+            credential=user_delegation_key,
            permission=FileSasPermissions(read=True, create=True, write=True, delete=True),
            expiry=datetime.utcnow() + timedelta(hours=1),
        )

            file_client.file_system_name,
            file_client.path_name,
            credential=sas_token)
-        downloaded_data = new_file_client.read_file()
+        downloaded_data = new_file_client.read_file().readall()
        self.assertEqual(data, downloaded_data)

    @record
@@ -351,10 +351,10 @@ def test_read_file_into_file(self):

        # download the data into a file and make sure it is the same as uploaded data
        with open(FILE_PATH, 'wb') as stream:
-            bytes_read = file_client.read_file(stream=stream, max_concurrency=2)
+            download = file_client.read_file(max_concurrency=2)
+            download.readinto(stream)

        # Assert
-        self.assertIsInstance(bytes_read, int)
        with open(FILE_PATH, 'rb') as stream:
            actual = stream.read()
            self.assertEqual(data, actual)
@@ -369,7 +369,7 @@ def test_read_file_to_text(self):
        file_client.flush_data(len(data))

        # download the text data and make sure it is the same as uploaded data
-        downloaded_data = file_client.read_file(max_concurrency=2, encoding="utf-8")
+        downloaded_data = file_client.read_file(max_concurrency=2, encoding="utf-8").readall()

        # Assert
        self.assertEqual(data, downloaded_data)
@@ -421,7 +421,7 @@ def test_file_sas_only_applies_to_file_level(self):
            self.file_system_name,
            directory_name,
            file_name,
-            account_key=self.dsc.credential.account_key,
+            credential=self.dsc.credential.account_key,
            permission=FileSasPermissions(read=True, write=True),
            expiry=datetime.utcnow() + timedelta(hours=1),
        )
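The test updates above also reflect that `generate_file_sas` now takes the signing key through a single `credential` parameter (previously `account_key=` or `user_delegation_key=`). A sketch with placeholder account values:

```python
from datetime import datetime, timedelta
from azure.storage.filedatalake import FileSasPermissions, generate_file_sas

# Placeholder account name, paths, and key; substitute real values.
sas_token = generate_file_sas(
    "myaccount",
    "myfilesystem",
    None,                        # directory name; None for a file at the root
    "myfile",
    credential="<account-key>",  # an account key or a user delegation key
    permission=FileSasPermissions(read=True),
    expiry=datetime.utcnow() + timedelta(hours=1),
)
```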
@@ -539,7 +539,7 @@ def test_rename_file_with_non_used_name(self):
        file_client.flush_data(3)
        new_client = file_client.rename_file(file_client.file_system_name+'/'+'newname')

-        data = new_client.read_file()
+        data = new_client.read_file().readall()
        self.assertEqual(data, data_bytes)
        self.assertEqual(new_client.path_name, "newname")

@@ -559,7 +559,7 @@ def test_rename_file_to_existing_file(self):
        new_client = file_client.rename_file(file_client.file_system_name+'/'+existing_file_client.path_name)
        new_url = file_client.url

-        data = new_client.read_file()
+        data = new_client.read_file().readall()
        # the existing file was overwritten
        self.assertEqual(data, data_bytes)

@@ -585,17 +585,17 @@ def test_rename_file_will_not_change_existing_directory(self):

        new_client = f3.rename_file(f1.file_system_name+'/'+f1.path_name)

-        self.assertEqual(new_client.read_file(), b"file3")
+        self.assertEqual(new_client.read_file().readall(), b"file3")

        # make sure the data in file2 and file4 weren't touched
-        f2_data = f2.read_file()
+        f2_data = f2.read_file().readall()
        self.assertEqual(f2_data, b"file2")

-        f4_data = f4.read_file()
+        f4_data = f4.read_file().readall()
        self.assertEqual(f4_data, b"file4")

        with self.assertRaises(HttpResponseError):
-            f3.read_file()
+            f3.read_file().readall()

# ------------------------------------------------------------------------------
if __name__ == '__main__':
