Commit

Merge pull request #4 from hayesgb/abfs-tests

Abfs tests

hayesgb committed Sep 5, 2019
2 parents 1475765 + c5e6627 commit 0cc39df
Showing 16 changed files with 250 additions and 74 deletions.
Binary file removed adlfs/__pycache__/__init__.cpython-34.pyc
Binary file removed adlfs/__pycache__/__init__.cpython-36.pyc
Binary file modified adlfs/__pycache__/__init__.cpython-37.pyc
Binary file removed adlfs/__pycache__/core.cpython-34.pyc
Binary file removed adlfs/__pycache__/core.cpython-36.pyc
Binary file modified adlfs/__pycache__/core.cpython-37.pyc
Binary file removed adlfs/__pycache__/tests.cpython-37-pytest-5.0.1.pyc
130 changes: 59 additions & 71 deletions adlfs/core.py
@@ -125,9 +125,9 @@ class AzureBlobFileSystem(AbstractFileSystem):

protocol = 'abfs'

def __init__(self, tenant_id, client_id, client_secret, storage_account,
filesystem, token=None):
def __init__(self, tenant_id: str, client_id: str, client_secret: str,
storage_account: str, filesystem: str, token=None):

"""
Parameters
----------
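For orientation, the constructor now spells out its string parameters explicitly; a minimal instantiation sketch (all values below are placeholders, and instantiation is assumed to call connect() and fetch a token, as the tests' monkeypatching later in this diff suggests):

    from adlfs.core import AzureBlobFileSystem

    # Placeholder credentials -- substitute a real Azure AD app registration
    # and storage account / filesystem. Constructing the filesystem is assumed
    # to trigger connect(), so a live run needs valid values here.
    fs = AzureBlobFileSystem(
        tenant_id="my-tenant-id",
        client_id="my-client-id",
        client_secret="my-client-secret",
        storage_account="mystorageaccount",
        filesystem="myfilesystem",
    )
    # fs.ls("") would then list the root of the configured filesystem.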
@@ -177,11 +177,11 @@ def connect(self):
"scope": "https://storage.azure.com/.default",
"grant_type": "client_credentials"}
response = requests.post(url=url, headers=header, data=data).json()
self.token_type=response['token_type']
expires_in=response['expires_in']
ext_expires_in=response['ext_expires_in']
self.token=response['access_token']
self.token_type = response['token_type']
expires_in = response['expires_in']
ext_expires_in = response['ext_expires_in']
self.token = response['access_token']

def _make_headers(self, range: str = None, encoding: str = None):
""" Creates the headers for an API request to Azure Datalake Gen2
@@ -206,28 +206,25 @@ def _parse_path(self, path: str):
return []
else:
return "/".join(fparts)
def _make_url(self, resource: str = None):

def _make_url(self, path: str =None):
""" Creates a url for making a request to the Azure Datalake Gen2 API """
return f"https://{self.storage_account}{self.dns_suffix}/{self.filesystem}"

if not path:
return f"https://{self.storage_account}{self.dns_suffix}/{self.filesystem}"
else: return f"https://{self.storage_account}{self.dns_suffix}/{self.filesystem}/{path}"

def ls(self, path: str, detail: bool = False, resource: str = 'filesystem',
recursive: bool = False):
""" List a single filesystem directory, with or without details
Parameters
__________
path - string
The Azure Datalake Gen2 filesystem name, followed by subdirectories and files
detail - boolean
Specified by the AbstractFileSystem.
If false, return a list of strings (without protocol) that detail the full path of
resource - string
recursive - boolean
Determines if the files should be listed recursively or not.
path: The Azure Datalake Gen2 filesystem name, followed by subdirectories and files
detail: Specified by the AbstractFileSystem. If false, return a list of strings (without protocol) that detail the full path of each entry
resource: Variable to be passed to the Microsoft API
recursive: Determines if the files should be listed recursively or not.
"""

try:
path = self._strip_protocol(path)
directory = self._parse_path(path)
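The reworked _make_url above appends the path only when one is supplied; a small self-contained sketch of the URLs it produces (account and filesystem names are the same placeholders the tests below use, and .dfs.core.windows.net is the assumed dns_suffix):

    storage_account = "test_storage_account"
    dns_suffix = ".dfs.core.windows.net"
    filesystem = "test_filesystem"

    def make_url(path=None):
        base = f"https://{storage_account}{dns_suffix}/{filesystem}"
        return base if not path else f"{base}/{path}"

    assert make_url() == "https://test_storage_account.dfs.core.windows.net/test_filesystem"
    assert make_url("subfolder/testfile.csv").endswith("/test_filesystem/subfolder/testfile.csv")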
@@ -259,10 +256,13 @@ def ls(self, path: str, detail: bool = False, resource: str = 'filesystem',
# Finally, fsspec expects the API response to return a key 'size'
# that specifies the size of the file in bytes, but the Azure DL
# Gen2 API returns the key 'contentLength'. We update this below.
path_['size'] = int(path_.pop('contentLength'))
if 'contentLength' in path_.keys():
path_['size'] = int(path_.pop('contentLength'))
else: path_['size'] = int(0)
if len(pathlist) == 1:
return pathlist[0]
else:
print(pathlist)
return pathlist
else:
files = []
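The guard added above handles listing entries that omit contentLength by defaulting size to 0; a minimal sketch of that normalization over a mocked paths payload (the third entry is hypothetical, added to show the fallback):

    paths = [
        {"name": "subfolder", "isDirectory": "true", "contentLength": "5"},
        {"name": "testfile.csv", "contentLength": "18"},
        {"name": "empty_entry"},   # hypothetical entry with no contentLength
    ]
    for path_ in paths:
        # fsspec expects an integer 'size'; the Gen2 API reports 'contentLength'.
        if "contentLength" in path_:
            path_["size"] = int(path_.pop("contentLength"))
        else:
            path_["size"] = 0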
@@ -278,71 +278,65 @@ def ls(self, path: str, detail: bool = False, resource: str = 'filesystem',
else:
raise KeyError(f'{response}')

def info(self, path, detail=True):
def info(self, path: str = '', detail=True):
""" Give details of entry at path"""
path = self._strip_protocol(path)
path = self.ls(path, detail=detail)
return path
print('info..')
print(path)
url = self._make_url(path=path)
print(url)
headers = self._make_headers()
payload = {'action': 'getStatus'}
response = requests.head(url=url, headers=headers, params=payload)
print(response.url)
if not response.status_code == requests.codes.ok:
try:
detail = self.ls(path, detail=False)
return detail
except:
response.raise_for_status()
h = response.headers
detail = {'name': path,
'size': int(h['Content-Length']),
'type': h['x-ms-resource-type']
}
return detail


def _open(self, path, mode='rb', block_size=None, autocommit=True):
""" Return a file-like object from the ADL Gen2 in raw bytes-mode """

return AzureBlobFile(self, path, mode)

# def glob(self, path):
# raise NotImplementedError

class AzureBlobFile(AbstractBufferedFile):
""" Buffered Azure Datalake Gen2 File Object """
def __init__(self, fs, path, mode='rb', block_size='default',

def __init__(self, fs, path, mode='rb', blocksize='default',
cache_type='bytes', autocommit=True):
super().__init__(fs, path, mode, block_size=block_size,
super().__init__(fs, path, mode, blocksize=blocksize,
cache_type=cache_type, autocommit=autocommit)
self.fs = fs
self.path = path
self.cache = b''
self.closed = False

def read(self, length=-1):
"""Read bytes from file
"""

if (
(length < 0 and self.loc == 0) or
(length > (self.size or length)) or
(self.size and self.size < self.block_size)
):
self._fetch_all()
if self.size is None:
if length < 0:
self._fetch_all()
else:
length = min(self.size - self.loc, length)
return super().read(length)

def _fetch_all(self):
"""Read the whole file in one show. Without caching"""

headers = self.fs._make_headers(range=(0, self.size))
url = f'{self.fs._make_url()}/{self.path}'
response = requests.get(url=url, headers=headers)
data = response.content
self.cache = AllBytes(data)
self.size = len(data)

def _fetch_range(self, start=None, end=None):
""" Gets the specified byte range from Azure Datalake Gen2 """
print('?????? _fetch_range ??????')
print(start, end)
if start is not None or end is not None:
start = start or 0
end - end or 0
end = end or 0
print(f'start is: {start}, end is: {end}')
headers = self.fs._make_headers(range=(start, end-1))
else:
headers = self.fs._make_headers(range=None)

headers = self.fs._make_headers(range=(start, end))
headers = self.fs._make_headers(range=(None))

url = f'{self.fs._make_url()}/{self.path}'
response = requests.get(url=url, headers=headers)
data = response.content
print(f'This is data from _fetch_range: {data}')
return data

def _upload_chunk(self, final: bool = False, resource: str = None):
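The range handling in _fetch_range above ends up as a byte-range GET; _make_headers itself is not part of this diff, so the sketch below only assumes it emits a standard HTTP Range header alongside the bearer token:

    def make_headers(token, byte_range=None):
        # Assumed shape of the headers -- the real _make_headers is outside this hunk.
        headers = {"Authorization": f"Bearer {token}"}
        if byte_range is not None:
            start, end = byte_range
            headers["Range"] = f"bytes={start}-{end}"   # inclusive byte positions
        return headers

    headers = make_headers("dummy-token", (0, 17))
    assert headers["Range"] == "bytes=0-17"   # first 18 bytes, e.g. the test fixture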
@@ -355,13 +349,7 @@ def _upload_chunk(self, final: bool = False, resource: str = None):
response = requests.put(url, headers=headers, data=self.buffer, params=params)
if not response.status_code == requests.codes.ok:
response.raise_for_status()



class AllBytes:
""" Cache the entire contents of a remote file """
def __init__(self, data):
self.data = data

def _fetch(self, start, end):
return self.data[start:end]
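_fetch_all and the AllBytes helper above implement whole-file caching: the object is downloaded once and later reads are served by slicing the bytes held in memory; a standalone sketch of the same pattern (a separate class, to keep the example self-contained):

    class WholeFileCache:
        """Serve byte-range reads from a single fully downloaded payload."""
        def __init__(self, data: bytes):
            self.data = data

        def fetch(self, start: int, end: int) -> bytes:
            return self.data[start:end]

    cache = WholeFileCache(b",test\n0,0\n1,1\n2,2\n")   # same bytes as the test fixture
    assert cache.fetch(0, 5) == b",test"
    assert cache.fetch(6, 9) == b"0,0"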


Empty file added adlfs/tests/__init__.py
Empty file.
Binary file added adlfs/tests/__pycache__/__init__.cpython-37.pyc
4 changes: 4 additions & 0 deletions adlfs/tests/dummyfiles/testfile.csv
@@ -0,0 +1,4 @@
,test
0,0
1,1
2,2
180 changes: 180 additions & 0 deletions adlfs/tests/test_core.py
@@ -0,0 +1,180 @@
from unittest import mock
import pytest
import requests
import responses
from dask.bytes.core import read_bytes

from adlfs.core import AzureBlobFileSystem, AzureBlobFile


# class MockResponseSingleFile:
# def __init__(self):
# self.status_code = 200

# def json(self):
# return {'paths': [
# {'name': 'testfile.csv', 'contentLength': '10'},
# ]}

# class MockResponseReadFile:
# def __init__():
# self.status_code = 200

# def content(self):
# return b',test\n0,0\n1,1\n2,2\n'


@pytest.fixture()
def test_mock_abfs(monkeypatch):
tenant_id = 'test_tenant'
client_id = 'test_client'
client_secret = 'client_secret'
storage_account = 'test_storage_account'
filesystem = 'test_filesystem'

def mock_connect(conn):
return "None"

monkeypatch.setattr(AzureBlobFileSystem, "connect", mock_connect)

mock_fs = AzureBlobFileSystem(tenant_id=tenant_id, client_id=client_id,
client_secret=client_secret,
storage_account=storage_account, filesystem=filesystem,
)
return mock_fs

def test_make_url(test_mock_abfs):
fs = test_mock_abfs
url = fs._make_url()
assert url == "https://test_storage_account.dfs.core.windows.net/test_filesystem"

@responses.activate
def test_ls(test_mock_abfs):
responses.add(responses.GET, 'https://test_storage_account.dfs.core.windows.net/test_filesystem',
json={'paths': [
{'name': 'subfolder', 'isDirectory': 'true', 'contentLength': '5'},
{'name': 'testfile.csv', 'contentLength': '18'},
{'name': 'testfile2.csv', 'contentLength': '20'}
]
})

""" Test that ls returns a list of approopriate files """
# Get the mock filesystem
fs = test_mock_abfs
files = fs.ls("")
assert files == ['subfolder', 'testfile.csv', 'testfile2.csv']

@responses.activate
def test_ls_nested(test_mock_abfs):
""" Verify a ls call to a nested directory returns the correct response """
responses.add(responses.GET,
'https://test_storage_account.dfs.core.windows.net/test_filesystem?resource=filesystem&recursive=False&directory=subfolder%2Fnested_subfolder',
json={'paths': [{'name': 'subfolder/nested_subfolder/testfile.csv'},
{'name': 'subfolder/nested_subfolder/writefile.csv'}]
}
)

mock_fs = test_mock_abfs
files = mock_fs.ls("abfs://subfolder/nested_subfolder")
assert files == ['subfolder/nested_subfolder/testfile.csv',
'subfolder/nested_subfolder/writefile.csv']

@responses.activate
def test_ls_detail(test_mock_abfs):
""" Verify a directory can be found when requested """

responses.add(responses.GET, 'https://test_storage_account.dfs.core.windows.net/test_filesystem',
json={'paths': [
{'name': 'subfolder', 'isDirectory': 'true', 'contentLength': '5'},
{'name': 'testfile.csv', 'contentLength': '18'},
{'name': 'testfile2.csv', 'contentLength': '20'}
]
})
fs = test_mock_abfs

files = fs.ls(path="", detail=True)
assert files == [
{'name': 'subfolder', 'type': 'directory', 'isDirectory': 'true', 'size': 5},
{'name': 'testfile.csv', 'type': 'file', 'size': 18},
{'name': 'testfile2.csv', 'type': 'file', 'size': 20}
]

@responses.activate
def test_info(test_mock_abfs):
responses.add(responses.HEAD, 'https://test_storage_account.dfs.core.windows.net/test_filesystem/testfile.csv?action=getStatus',
headers={'Content-Length': '10',
'name': 'testfile.csv',
'x-ms-resource-type': 'file'},
)
mock_abfs = test_mock_abfs
mock_details = mock_abfs.info("testfile.csv")
assert mock_details == {'name': 'testfile.csv', 'size': 10, 'type': 'file'}

@responses.activate
def test_fs_open(test_mock_abfs):
""" Test opening an AzureBlobFile using the AzureBlobFileSystem """

responses.add(responses.HEAD, 'https://test_storage_account.dfs.core.windows.net/test_filesystem/testfile.csv?action=getStatus',
headers={'Content-Length': '10',
'name': 'testfile.csv',
'x-ms-resource-type': 'file'},
)
responses.add(responses.GET, 'https://test_storage_account.dfs.core.windows.net/test_filesystem/testfile.csv',
body=b',test\n0,0\n1,1\n2,2\n')
mock_abfs = test_mock_abfs
mock_file_object = mock_abfs.open('testfile.csv')
assert mock_file_object == AzureBlobFile(fs=mock_abfs, path='testfile.csv')

#### Tests against the AzureBlobFile Class
@responses.activate
@pytest.mark.parametrize("start, end", [(None, None), (0,0), (300, 18), (3, 30)])
def test_fetch_range(test_mock_abfs, start, end):
""" Test opening an AzureBlobFile using the AzureBlobFileSystem """

responses.add(responses.HEAD, 'https://test_storage_account.dfs.core.windows.net/test_filesystem/testfile.csv?action=getStatus',
headers={'Content-Length': '10',
'name': 'testfile.csv',
'x-ms-resource-type': 'file'},
)
responses.add(responses.GET,
'https://test_storage_account.dfs.core.windows.net/test_filesystem/testfile.csv',
body=b',test\n0,0\n1,1\n2,2\n')
mock_abfs = test_mock_abfs
f = mock_abfs.open('testfile.csv')
cache = f._fetch_range(start=start, end=end)
assert cache == b',test\n0,0\n1,1\n2,2\n'

@responses.activate
@pytest.mark.parametrize('pointer_location', [0, 300])
def test_read(test_mock_abfs, pointer_location):
""" Test read method on AzureBlobFile """
responses.add(responses.HEAD, 'https://test_storage_account.dfs.core.windows.net/test_filesystem/testfile.csv?action=getStatus',
headers={'Content-Length': '18',
'name': 'testfile.csv',
'x-ms-resource-type': 'file'},
)
responses.add(responses.GET,
'https://test_storage_account.dfs.core.windows.net/test_filesystem/testfile.csv',
body=b',test\n0,0\n1,1\n2,2\n')
mock_abfs = test_mock_abfs
mock_abf = AzureBlobFile(fs=mock_abfs, path='testfile.csv')
mock_abf.loc = pointer_location
out = mock_abf.read()
assert out==b',test\n0,0\n1,1\n2,2\n'

@responses.activate
def test_fetch_range(test_mock_abfs):
""" Test _fetch_range method, to verify that the start and end locations when cacheing files are passed properlly """

# Set up the mocked API calls
responses.add(responses.HEAD, 'https://test_storage_account.dfs.core.windows.net/test_filesystem/testfile.csv?action=getStatus',
headers={'Content-Length': '18',
'name': 'testfile.csv',
'x-ms-resource-type': 'file'},
)
responses.add(responses.GET,
'https://test_storage_account.dfs.core.windows.net/test_filesystem/testfile.csv',
body=b',test\n0,0\n1,1\n2,2\n')
mock_abfs = test_mock_abfs
mock_abf = AzureBlobFile(fs=mock_abfs, path='testfile.csv')
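All of the tests above stub the Gen2 REST endpoints with the responses library rather than hitting Azure; a condensed, standalone sketch of that mocking pattern (URL and payload mirror the fixtures used above):

    import requests
    import responses

    @responses.activate
    def demo():
        responses.add(
            responses.GET,
            "https://test_storage_account.dfs.core.windows.net/test_filesystem",
            json={"paths": [{"name": "testfile.csv", "contentLength": "18"}]},
        )
        r = requests.get("https://test_storage_account.dfs.core.windows.net/test_filesystem")
        assert r.json()["paths"][0]["name"] == "testfile.csv"

    demo()

Running pytest against adlfs/tests exercises the real fixtures in the same way.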

Binary file added mydask.png
