
Commit

Merge pull request #38 from cjalmeida/unrevert-chunked
Fix chunked upload incompatibilities with #35
hayesgb committed Feb 8, 2020
2 parents eecca8f + 5768d8b commit 0070763
Showing 2 changed files with 101 additions and 8 deletions.
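
The substance of the change: instead of writing the whole buffer with a single create_blob_from_bytes call, the file writer now stages each flushed buffer as a block with put_block and commits the sequence with put_block_list, which is what makes chunked uploads of large blobs work. As a rough sketch of that pattern against the legacy azure-storage-blob SDK (the account, container, and blob names below are hypothetical placeholders, not part of this commit):

from azure.storage.blob import BlockBlobService, BlobBlock

# Hypothetical connection details, for illustration only.
svc = BlockBlobService(account_name="myaccount", account_key="<key>")
svc.create_container("mycontainer")

# Example payload split into fixed-size pieces.
data = b"1" * 20_000_000
chunk_size = 4 * 1024 * 1024
chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]

block_ids = []
for i, chunk in enumerate(chunks):
    # Block ids must be unique and of equal length, hence the zero-padding.
    block_id = f"{i:07d}"
    svc.put_block(
        container_name="mycontainer",
        blob_name="large.bin",
        block=chunk,
        block_id=block_id,
    )
    block_ids.append(block_id)

# Committing the staged blocks in order materializes the blob.
svc.put_block_list(
    container_name="mycontainer",
    blob_name="large.bin",
    block_list=[BlobBlock(bid) for bid in block_ids],
)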
37 changes: 30 additions & 7 deletions adlfs/core.py
@@ -1,13 +1,11 @@
# -*- coding: utf-8 -*-
-from __future__ import print_function, division, absolute_import
+from __future__ import absolute_import, division, print_function

import logging
from os.path import join


from azure.datalake.store import lib, AzureDLFileSystem
from azure.datalake.store.core import AzureDLPath, AzureDLFile
-from azure.storage.blob import BlockBlobService, BlobPrefix, Container
+from azure.storage.blob import BlockBlobService, BlobPrefix, Container, BlobBlock
from azure.storage.common._constants import SERVICE_HOST_BASE, DEFAULT_PROTOCOL
from fsspec import AbstractFileSystem
from fsspec.spec import AbstractBufferedFile
@@ -269,6 +267,9 @@ class AzureBlobFileSystem(AbstractFileSystem):
token_credential:
A token credential used to authenticate HTTPS requests. The token value
should be updated before its expiration.
blocksize:
The block size to use for download/upload operations. Defaults to the value of
``BlockBlobService.MAX_BLOCK_SIZE``
Examples
--------
@@ -299,6 +300,7 @@ def __init__(
connection_string: str = None,
socket_timeout=None,
token_credential=None,
blocksize=BlockBlobService.MAX_BLOCK_SIZE,
):
AbstractFileSystem.__init__(self)
self.account_name = account_name
@@ -312,6 +314,7 @@ def __init__(
self.connection_string = connection_string
self.socket_timeout = socket_timeout
self.token_credential = token_credential
self.blocksize = blocksize
self.do_connect()

@classmethod
@@ -545,7 +548,7 @@ def _open(
fs=self,
path=path,
mode=mode,
-            block_size=block_size,
+            block_size=block_size or self.blocksize,
autocommit=autocommit,
cache_options=cache_options,
**kwargs,
@@ -590,8 +593,28 @@ def _fetch_range(self, start, end, **kwargs):
)
return blob.content

+    def _initiate_upload(self):
+        self._block_list = []
+        if self.fs.blob_fs.exists(self.container_name, self.blob):
+            self.fs.blob_fs.delete_blob(self.container_name, self.blob)
+        return super()._initiate_upload()
+
     def _upload_chunk(self, final=False, **kwargs):
         data = self.buffer.getvalue()
-        self.fs.blob_fs.create_blob_from_bytes(
-            container_name=self.container_name, blob_name=self.blob, blob=data
-        )
+        block_id = len(self._block_list)
+        block_id = f"{block_id:07d}"
+        self.fs.blob_fs.put_block(
+            container_name=self.container_name,
+            blob_name=self.blob,
+            block=data,
+            block_id=block_id,
+        )
+        self._block_list.append(block_id)
+
+        if final:
+            block_list = [BlobBlock(_id) for _id in self._block_list]
+            self.fs.blob_fs.put_block_list(
+                container_name=self.container_name,
+                blob_name=self.blob,
+                block_list=block_list,
+            )
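
With the blocksize hook above, callers can tune how large each staged block is; any write through fs.open that exceeds the buffer size is uploaded in multiple blocks. A minimal usage sketch, assuming hypothetical credentials and names:

import adlfs

# blocksize of 4 MiB instead of the BlockBlobService.MAX_BLOCK_SIZE default.
fs = adlfs.AzureBlobFileSystem(
    account_name="myaccount",
    account_key="<key>",
    blocksize=4 * 1024 * 1024,
)

fs.mkdir("mycontainer")
with fs.open("mycontainer/large.bin", "wb") as f:
    # 50 MB of data gets staged as several blocks and committed on close.
    f.write(b"0" * 50_000_000)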
72 changes: 71 additions & 1 deletion adlfs/tests/test_core.py
@@ -1,7 +1,8 @@
-import adlfs
 import docker
 import pytest

+import adlfs


@pytest.fixture(scope="session", autouse=True)
def spawn_azurite():
@@ -205,3 +206,72 @@ def test_mkdir_rmdir(storage):
fs.rmdir("new-container")

assert "new-container/" not in fs.ls("")


def test_large_blob(storage):
import tempfile
import hashlib
import io
import shutil
from pathlib import Path

fs = adlfs.AzureBlobFileSystem(
storage.account_name,
storage.account_key,
custom_domain=f"http://{storage.primary_endpoint}",
)

    # create a 20 MB byte array; make sure it is larger than the block sizes to
    # force a chunked upload
blob_size = 20_000_000
assert blob_size > fs.blocksize
assert blob_size > adlfs.AzureBlobFile.DEFAULT_BLOCK_SIZE

data = b"1" * blob_size
_hash = hashlib.md5(data)
expected = _hash.hexdigest()

# create container
fs.mkdir("chunk-container")

# upload the data using fs.open
path = "chunk-container/large-blob.bin"
with fs.open(path, "wb") as dst:
dst.write(data)

assert fs.exists(path)
assert fs.size(path) == blob_size

del data

# download with fs.open
bio = io.BytesIO()
with fs.open(path, "rb") as src:
shutil.copyfileobj(src, bio)

# read back the data and calculate md5
bio.seek(0)
data = bio.read()
_hash = hashlib.md5(data)
result = _hash.hexdigest()

assert expected == result

# do the same but using upload/download and a tempdir
    path = "chunk-container/large_blob2.bin"
with tempfile.TemporaryDirectory() as td:
local_blob: Path = Path(td) / "large_blob2.bin"
with local_blob.open("wb") as fo:
fo.write(data)
assert local_blob.exists()
assert local_blob.stat().st_size == blob_size

fs.upload(str(local_blob), path)
assert fs.exists(path)
assert fs.size(path) == blob_size

# download now
local_blob.unlink()
fs.download(path, str(local_blob))
assert local_blob.exists()
assert local_blob.stat().st_size == blob_size
