
Invalid/empty header with write_csv #81

Closed · mgsnuno opened this issue Aug 12, 2020 · 11 comments


mgsnuno commented Aug 12, 2020

What happened:
Using dask's .to_csv("abfs://...") on a dataframe with several partitions causes random invalid/empty header errors.

The value for one of the HTTP headers is not in the correct format.
RequestId: ...
Time: .....
ErrorCode: InvalidHeaderValue
Error:None
HeaderName:Content-length
HeaderValue:0

Minimal Complete Verifiable Example:

import os

_ = df.to_csv(
    os.path.join(f"abfs://{output_path_csv}", table_name, "part_*.csv"),
    index=False,
    sep="|",
    storage_options=storage_options,
)

Hard to reproduce: all partitions write successfully except one, and the failing partition has the same dtypes and size as the others (the code that fails is above). The error seems to indicate a length-0 partition, but len(df.get_partition(x)) returns the same size as for the other partitions, and the partition that fails is not the last one.

Help on how to debug this?
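
A quick sanity check (a sketch, assuming the dask dataframe is df) is to compute the length of every partition in one go, to rule out a hidden empty partition:

# One row count per partition; any 0 here would explain a zero-length upload.
partition_lengths = df.map_partitions(len).compute()
print(partition_lengths.tolist())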

Environment:

  • Dask version: 2.22.0
  • adlfs version: 0.3.2
  • Python version: 3.8.5
  • Operating System: linux arch 64 bit
  • Install method (conda, pip, source): conda

mgsnuno commented Sep 7, 2020

@hayesgb any pointers on how to debug this? Thanks


hayesgb commented Sep 10, 2020

Does repartitioning the dataframe before writing it resolve the problem?

Are you still seeing the issue with either 0.4.x or 0.5.0? If so, can you provide me with a small example that replicates the problem?

Something like this:

import numpy as np
import pandas as pd
import dask.dataframe as dd

storage_options = {}  # I don't need these
A = np.random.randint(0, 100, size=(10000, 4))
df2 = pd.DataFrame(data=A, columns=list("ABCD"))
ddf2 = dd.from_pandas(df2, npartitions=4)
ddf2.to_csv("abfs://container/path_to_my.csv/*.csv", storage_options=storage_options)

What is the exact expected result of the to_csv operation?


mgsnuno commented Sep 16, 2020

@hayesgb repartitioning the dataframe beforehand, with partition_size to make sure we have evenly sized partitions and none empty, didn't fix the issue with 0.4.x.
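
Roughly what that looked like (a sketch; the target size below is only an illustrative value, not the one actually used):

# Rebalance into evenly sized, non-empty partitions before writing.
df = df.repartition(partition_size="100MB")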

I've tried 0.5.0 briefly, need more time to see if it's working.

P.S. What I notice with 0.5.0 (you're using asyncio, correct?) is that some delete operations that used to be fast are now sometimes really slow and sometimes really fast. I use Python Interactive in VS Code a lot and sometimes it just gets stuck. As reported by other people, VS Code's Python Interactive doesn't seem to play well with asyncio and multiple processes.


hayesgb commented Sep 16, 2020

v0.5.x uses asyncio and caching for filesystem operations (not for the AzureBlobFile class) to speed up operations.
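
If the listing cache is suspected of causing the slow or stuck calls, it can be cleared manually (a sketch; fs is assumed to be the AzureBlobFileSystem instance):

# Drop all cached directory listings so the next operation re-lists from Azure;
# passing a path argument invalidates only that entry.
fs.invalidate_cache()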


mgsnuno commented Sep 18, 2020

@hayesgb quick way to cause this issue:

file = open('sample.txt', 'wb')  # create an empty file
file.close()
abfs.put("./sample.txt", "<blob_name>/sample.txt")
Traceback:

~/miniconda3/envs/pipelines/lib/python3.8/site-packages/adlfs/spec.py in put(self, lpath, rpath, recursive, **kwargs)
   1121
   1122         for lpath, rpath in zip(lpaths, rpaths):
-> 1123             self.put_file(lpath, rpath, **kwargs)
   1124
   1125     def upload(self, lpath, rpath, recursive=False, **kwargs):

~/miniconda3/envs/pipelines/lib/python3.8/site-packages/fsspec/spec.py in put_file(self, lpath, rpath, **kwargs)
    677             while data:
    678                 data = f1.read(self.blocksize)
--> 679                 f2.write(data)
    680
    681     def put(self, lpath, rpath, recursive=False, **kwargs):

~/miniconda3/envs/pipelines/lib/python3.8/site-packages/adlfs/spec.py in __exit__(self, *args)
   1608
   1609     def __exit__(self, *args):
-> 1610         self.close()
   1611
   1612     def __del__(self):

~/miniconda3/envs/pipelines/lib/python3.8/site-packages/adlfs/spec.py in close(self)
   1585         else:
   1586             if not self.forced:
-> 1587                 self.flush(force=True)
   1588         if self.fs is not None:
   1589             self.fs.invalidate_cache(self.path)

~/miniconda3/envs/pipelines/lib/python3.8/site-packages/adlfs/spec.py in flush(self, force)
   1460             self._initiate_upload()
   1461
-> 1462         if self._upload_chunk(final=force) is not False:
   1463             self.offset += self.buffer.seek(0, 2)
   1464             self.buffer = io.BytesIO()

~/miniconda3/envs/pipelines/lib/python3.8/site-packages/adlfs/spec.py in _upload_chunk(self, final, **kwargs)
   1422             block_id = len(self._block_list)
   1423             block_id = f"{block_id:07d}"
-> 1424             self.blob_client.stage_block(block_id=block_id, data=data, length=length)
   1425             self._block_list.append(block_id)
   1426

~/miniconda3/envs/pipelines/lib/python3.8/site-packages/azure/core/tracing/decorator.py in wrapper_use_tracer(*args, **kwargs)
    81     span_impl_type = settings.tracing_implementation()
    82     if span_impl_type is None:
--> 83         return func(*args, **kwargs)
    84
    85     # Merge span is parameter is set, but only if no explicit parent are passed

~/miniconda3/envs/pipelines/lib/python3.8/site-packages/azure/storage/blob/_blob_client.py in stage_block(self, block_id, data, length, **kwargs)
   2015             return self._client.block_blob.stage_block(**options)
   2016         except StorageErrorException as error:
-> 2017             process_storage_error(error)
   2018
   2019     def _stage_block_from_url_options(

~/miniconda3/envs/pipelines/lib/python3.8/site-packages/azure/storage/blob/_shared/response_handlers.py in process_storage_error(storage_error)
    145     error.error_code = error_code
    146     error.additional_info = additional_data
--> 147     raise error
    148
    149
HttpResponseError: The value for one of the HTTP headers is not in the correct format.
RequestId:8d688c63-601e-000a-3e91-8db477000000
Time:2020-09-18T07:56:45.3665244Z
ErrorCode:InvalidHeaderValue
Error:None
HeaderName:Content-Length
HeaderValue:0

With or without repartitioning the issue still occurs, and even more frequently in v0.5.x. Something like the below fails with v0.5.x:

s3.get(path_s3, path_local, recursive=True)
abfs.put(path_local, path_abfs, recursive=True)

I believe it is tied to recursive=True. I use this code to copy folders of parquet tables from S3 to Azure. This code was working on v0.3.3 and now gives those empty header/value errors.
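
A possible workaround sketch (untested here) is to avoid recursive=True and put the files one at a time; path_local and path_abfs are the same variables as above:

import os

# Walk the local folder and upload each file individually instead of
# relying on recursive=True.
for root, _dirs, files in os.walk(path_local):
    for name in files:
        lpath = os.path.join(root, name)
        rel = os.path.relpath(lpath, path_local).replace(os.sep, "/")
        abfs.put(lpath, f"{path_abfs.rstrip('/')}/{rel}")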

Versions:
adlfs - v0.5.3
fsspec - 0.8.2
azure.storage.blob - 12.5.0


hayesgb commented Sep 23, 2020

I tried to replicate your issue (unsuccessfully) as follows:

from adlfs import AzureBlobFileSystem

fs = AzureBlobFileSystem(account_name=storage.account_name, connection_string=CONN_STR)

# Create a directory for the sample file
fs.mkdir("test")
with open('sample.txt', 'wb') as f:
    f.write(b"test of put method")
fs.put("./sample.txt", "test/sample.txt")
fs.get("test/sample.txt", "sample2.txt")
with open("./sample.txt", 'rb') as f:
    f1 = f.read()
with open("./sample2.txt", 'rb') as f:  # compare against the downloaded copy
    f2 = f.read()
assert f1 == f2

This passes with:
adlfs -- master branch
fsspec -- 0.8.2
azure.storage.blob -- 12.5.0
azure-common -- 1.1.24
azure-core -- 1.8.0
azure-identity -- 1.3.1
azure-storage-common -- 2.1.0
aiohttp -- 3.6.2

Azure storage account:
Access Tier: standard/hot
Replication: RA-GRS


mgsnuno commented Sep 23, 2020

@hayesgb like I mentioned, sample.txt has to be empty: file = open('sample.txt', 'wb')  # empty file.

As the error points towards, it's an empty (zero-content) file.

Your test, modified to cause the error (I just got the error using the same versions as you, apart from adlfs==0.5.3):

fs = AzureBlobFileSystem(account_name=storage.account_name, connection_string=CONN_STR)

fs.mkdir("test")
open('sample.txt', 'wb').close()  # create an empty file
fs.put("./sample.txt", "test/sample.txt")  # will raise HttpResponseError

I think the question is: why are empty files being sent by adlfs when writing a dataframe with to_csv that has no empty partitions (verified)?


mgsnuno commented Sep 23, 2020

Just caused the issue in another way. Same versions as you; download the latest Ubuntu desktop and server images:

wget https://releases.ubuntu.com/20.04.1/ubuntu-20.04.1-desktop-amd64.iso
wget https://releases.ubuntu.com/20.04.1/ubuntu-20.04.1-live-server-amd64.iso

And then:

fs = AzureBlobFileSystem(account_name=storage.account_name, connection_string=CONN_STR)
fs.mkdir("test")
file_name = "ubuntu-20.04.1-desktop-amd64.iso"  # 2.6 GB
fs.put(f"{file_name}", f"test/{file_name}")  # raises empty header error at some point
file_name = "ubuntu-20.04.1-live-server-amd64.iso"  # 914 MB
fs.put(f"{file_name}", f"test/{file_name}")  # no error

Can it be that adlfs is doing some automatic chunking under the hood and we end up with empty chunks at some point?
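
For illustration (a sketch of the generic copy loop visible in the fsspec frame of the traceback above, not adlfs internals verbatim), the chunked upload does end with one zero-length write:

import io

# Mimic the put_file loop: read blocksize-sized chunks until read() returns b"".
# The final write() is called with an empty chunk, which becomes a
# Content-Length: 0 block upload on the Azure side.
src = io.BytesIO(b"x" * 10)
blocksize = 4
data = True
while data:
    data = src.read(blocksize)
    print(f"writing chunk of {len(data)} bytes")  # 4, 4, 2, then 0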


hayesgb commented Sep 23, 2020

#108 Adds a fix for the put operation of an empty file.
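
For illustration, the kind of guard such a fix typically adds (a hypothetical sketch only, not the actual code of #108), based on the _upload_chunk frame in the traceback above:

def stage_block_if_nonempty(blob_client, block_list, data):
    # Hypothetical helper: skip zero-length chunks so Azure never sees a
    # Content-Length: 0 block upload (illustrative, not the real adlfs change).
    if not data:
        return
    block_id = f"{len(block_list):07d}"
    blob_client.stage_block(block_id=block_id, data=data, length=len(data))
    block_list.append(block_id)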


mgsnuno commented Sep 24, 2020

Thanks a lot.
Any ideas why putting a single file fails if the file is above a certain size, like in the case above of the Ubuntu desktop vs. Ubuntu live server image? Is there chunking being done somewhere?
Thank you again


hayesgb commented Sep 26, 2020

#115 fixes both. I tested it with the Ubuntu image and with a 5 GB zipfile I downloaded from AWS. The latter took nearly 90 minutes to write, but it was successful. I'll implement the same on get_file next.

hayesgb closed this as completed on Sep 26, 2020