Skip to content

Commit

Permalink
Revert "Merge branch 'cjalmeida-fix-chuncked-upload'"
Browse files Browse the repository at this point in the history
This reverts commit cd53a8c, reversing
changes made to 1f08c4c.
  • Loading branch information
hayesgb committed Feb 6, 2020
1 parent cd53a8c commit 7bca626
Showing 1 changed file with 7 additions and 30 deletions.
37 changes: 7 additions & 30 deletions adlfs/core.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function
from __future__ import print_function, division, absolute_import

import logging
from os.path import join


from azure.datalake.store import lib, AzureDLFileSystem
from azure.datalake.store.core import AzureDLPath, AzureDLFile
from azure.storage.blob import BlockBlobService, BlobPrefix, Container, BlobBlock
from azure.storage.blob import BlockBlobService, BlobPrefix, Container
from azure.storage.common._constants import SERVICE_HOST_BASE, DEFAULT_PROTOCOL
from fsspec import AbstractFileSystem
from fsspec.spec import AbstractBufferedFile
Expand Down Expand Up @@ -267,9 +269,6 @@ class AzureBlobFileSystem(AbstractFileSystem):
token_credential:
A token credential used to authenticate HTTPS requests. The token value
should be updated before its expiration.
blocksize:
The block size to use for download/upload operations. Defaults to the value of
``BlockBlobService.MAX_BLOCK_SIZE``
Examples
--------
Expand Down Expand Up @@ -300,7 +299,6 @@ def __init__(
connection_string: str = None,
socket_timeout=None,
token_credential=None,
blocksize=BlockBlobService.MAX_BLOCK_SIZE,
):
AbstractFileSystem.__init__(self)
self.account_name = account_name
Expand All @@ -314,7 +312,6 @@ def __init__(
self.connection_string = connection_string
self.socket_timeout = socket_timeout
self.token_credential = token_credential
self.blocksize = blocksize
self.do_connect()

@classmethod
Expand Down Expand Up @@ -548,7 +545,7 @@ def _open(
fs=self,
path=path,
mode=mode,
block_size=block_size or self.blocksize,
block_size=block_size,
autocommit=autocommit,
cache_options=cache_options,
**kwargs,
Expand Down Expand Up @@ -593,28 +590,8 @@ def _fetch_range(self, start, end, **kwargs):
)
return blob.content

def _initiate_upload(self):
self._block_list = []
if self.fs.blob_fs.exists(self.container_name, self.path):
self.fs.blob_fs.delete_blob(self.container_name, self.path)
return super()._initiate_upload()

def _upload_chunk(self, final=False, **kwargs):
data = self.buffer.getvalue()
block_id = len(self._block_list)
block_id = f"{block_id:07d}"
self.fs.blob_fs.put_block(
container_name=self.container_name,
blob_name=self.path,
block=data,
block_id=block_id,
self.fs.blob_fs.create_blob_from_bytes(
container_name=self.container_name, blob_name=self.blob, blob=data
)
self._block_list.append(block_id)

if final:
block_list = [BlobBlock(_id) for _id in self._block_list]
self.fs.blob_fs.put_block_list(
container_name=self.container_name,
blob_name=self.path,
block_list=block_list,
)

0 comments on commit 7bca626

Please sign in to comment.