diff --git a/azure-storage-blob/ChangeLog.md b/azure-storage-blob/ChangeLog.md
index 8d0ed281..c07f9453 100644
--- a/azure-storage-blob/ChangeLog.md
+++ b/azure-storage-blob/ChangeLog.md
@@ -2,6 +2,10 @@
 
 > See [BreakingChanges](BreakingChanges.md) for a detailed list of API breaks.
 
+## Version XX.XX.XX:
+
+- Optimized page blob upload for create_blob_from_* methods by skipping empty chunks.
+
 ## Version 0.37.1:
 
 - Enabling MD5 validation no longer uses the memory-efficient algorithm for large block blobs, since computing the MD5 hash requires reading the entire block into memory.
diff --git a/azure-storage-blob/azure/storage/blob/_upload_chunking.py b/azure-storage-blob/azure/storage/blob/_upload_chunking.py
index 65c022f8..e1e069b0 100644
--- a/azure-storage-blob/azure/storage/blob/_upload_chunking.py
+++ b/azure-storage-blob/azure/storage/blob/_upload_chunking.py
@@ -196,19 +196,19 @@ def get_chunk_streams(self):
                     data = self.padder.update(data)
                 if self.encryptor:
                     data = self.encryptor.update(data)
-                yield index, BytesIO(data)
+                yield index, data
             else:
                 if self.padder:
                     data = self.padder.update(data) + self.padder.finalize()
                 if self.encryptor:
                     data = self.encryptor.update(data) + self.encryptor.finalize()
                 if len(data) > 0:
-                    yield index, BytesIO(data)
+                    yield index, data
                 break
             index += len(data)
 
     def process_chunk(self, chunk_data):
-        chunk_bytes = chunk_data[1].read()
+        chunk_bytes = chunk_data[1]
         chunk_offset = chunk_data[0]
         return self._upload_chunk_with_progress(chunk_offset, chunk_bytes)
 
@@ -290,24 +290,39 @@ def _upload_substream_block(self, block_id, block_stream):
 
 
 class _PageBlobChunkUploader(_BlobChunkUploader):
+    EMPTY_PAGE = bytearray(512)
+
+    def _is_chunk_empty(self, chunk_data):
+        # read until non-zero data is encountered
+        # if reached the end without returning, then chunk_data is all 0's
+        data = BytesIO(chunk_data)
+        page = data.read(512)
+        while page != b'':
+            if page != self.EMPTY_PAGE:
+                return False
+            page = data.read(512)
+        return True
+
     def _upload_chunk(self, chunk_start, chunk_data):
-        chunk_end = chunk_start + len(chunk_data) - 1
-        resp = self.blob_service._update_page(
-            self.container_name,
-            self.blob_name,
-            chunk_data,
-            chunk_start,
-            chunk_end,
-            validate_content=self.validate_content,
-            lease_id=self.lease_id,
-            if_match=self.if_match,
-            timeout=self.timeout,
-        )
+        # avoid uploading the empty pages
+        if not self._is_chunk_empty(chunk_data):
+            chunk_end = chunk_start + len(chunk_data) - 1
+            resp = self.blob_service._update_page(
+                self.container_name,
+                self.blob_name,
+                chunk_data,
+                chunk_start,
+                chunk_end,
+                validate_content=self.validate_content,
+                lease_id=self.lease_id,
+                if_match=self.if_match,
+                timeout=self.timeout,
+            )
 
-        if not self.parallel:
-            self.if_match = resp.etag
+            if not self.parallel:
+                self.if_match = resp.etag
 
-        self.set_response_properties(resp)
+            self.set_response_properties(resp)
 
 
 class _AppendBlobChunkUploader(_BlobChunkUploader):
diff --git a/azure-storage-blob/azure/storage/blob/pageblobservice.py b/azure-storage-blob/azure/storage/blob/pageblobservice.py
index 9b199517..27521734 100644
--- a/azure-storage-blob/azure/storage/blob/pageblobservice.py
+++ b/azure-storage-blob/azure/storage/blob/pageblobservice.py
@@ -77,12 +77,12 @@ class PageBlobService(BaseBlobService):
     and a range that align to 512-byte page boundaries. A write to a page blob
     can overwrite just one page, some pages, or up to 4 MB of the page blob.
     Writes to page blobs happen in-place and are immediately committed to the
-    blob. The maximum size for a page blob is 1 TB.
The maximum size for a page blob is 1 TB. + blob. The maximum size for a page blob is 8 TB. :ivar int MAX_PAGE_SIZE: The size of the pages put by create_blob_from_* methods. Smaller pages may be put if there is less data provided. The maximum page size the service - supports is 4MB. + supports is 4MB. When using the create_blob_from_* methods, empty pages are skipped. ''' MAX_PAGE_SIZE = 4 * 1024 * 1024 @@ -802,6 +802,7 @@ def create_blob_from_path( ''' Creates a new blob from a file path, or updates the content of an existing blob, with automatic chunking and progress notifications. + Empty chunks are skipped, while non-emtpy ones(even if only partly filled) are uploaded. :param str container_name: Name of existing container. @@ -895,6 +896,7 @@ def create_blob_from_stream( ''' Creates a new blob from a file/stream, or updates the content of an existing blob, with automatic chunking and progress notifications. + Empty chunks are skipped, while non-emtpy ones(even if only partly filled) are uploaded. :param str container_name: Name of existing container. @@ -1028,7 +1030,7 @@ def create_blob_from_bytes( ''' Creates a new blob from an array of bytes, or updates the content of an existing blob, with automatic chunking and progress - notifications. + notifications. Empty chunks are skipped, while non-emtpy ones(even if only partly filled) are uploaded. :param str container_name: Name of existing container. diff --git a/tests/blob/test_page_blob.py b/tests/blob/test_page_blob.py index 3e9b9024..c3a6b531 100644 --- a/tests/blob/test_page_blob.py +++ b/tests/blob/test_page_blob.py @@ -628,6 +628,38 @@ def test_create_blob_from_stream(self): self.assertEqual(blob.properties.etag, create_resp.etag) self.assertEqual(blob.properties.last_modified, create_resp.last_modified) + def test_create_blob_from_stream_with_empty_pages(self): + # parallel tests introduce random order of requests, can only run live + if TestMode.need_recording_file(self.test_mode): + return + + # Arrange + # data is almost all empty (0s) except two ranges + blob_name = self._get_blob_reference() + data = bytearray(LARGE_BLOB_SIZE) + data[512: 1024] = self.get_random_bytes(512) + data[8192: 8196] = self.get_random_bytes(4) + with open(FILE_PATH, 'wb') as stream: + stream.write(data) + + # Act + blob_size = len(data) + with open(FILE_PATH, 'rb') as stream: + create_resp = self.bs.create_blob_from_stream(self.container_name, blob_name, stream, blob_size) + blob = self.bs.get_blob_properties(self.container_name, blob_name) + + # Assert + # the uploader should have skipped the empty ranges + self.assertBlobEqual(self.container_name, blob_name, data[:blob_size]) + page_ranges = list(self.bs.get_page_ranges(self.container_name, blob_name)) + self.assertEqual(len(page_ranges), 2) + self.assertEqual(page_ranges[0].start, 0) + self.assertEqual(page_ranges[0].end, 4095) + self.assertEqual(page_ranges[1].start, 8192) + self.assertEqual(page_ranges[1].end, 12287) + self.assertEqual(blob.properties.etag, create_resp.etag) + self.assertEqual(blob.properties.last_modified, create_resp.last_modified) + def test_create_blob_from_stream_non_seekable(self): # parallel tests introduce random order of requests, can only run live if TestMode.need_recording_file(self.test_mode):