
Commit

Use LZ4 block compression for compatibility with parquet-cpp (#314) (#315)

* Use LZ4 block compression for compatibility with parquet-cpp (#314)

This removes LZ4 frame compression in favour of LZ4 block
compression to ensure the LZ4 compression used is compatible
with the parquet-cpp implementation.

Because the lz4.block decompressor needs to know the size
of the uncompressed data, this change also requires:

    - propagation of page_header.uncompressed_page_size
      through to decompress_data
    - the decompression functions for all compressors
      to handle the uncompressed_size argument

Where a decompressor doesn't need to know the uncompressed
data size, we simply ignore it.

* Clarify wording of exception messages in compression.py

* Don't raise an exception if store_size or write_content_size is True
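
The LZ4 incompatibility described above is easy to demonstrate with the lz4 Python bindings. Below is a minimal round-trip sketch (assuming the lz4 package is installed; the payload is illustrative): the frame format is a self-describing container, while the block format that parquet-cpp expects is raw compressed bytes whose original length must be supplied at decompression time.

```python
import lz4.block
import lz4.frame

payload = b"abcd" * 1000

# Frame format: a self-describing container, so no size hint is needed.
framed = lz4.frame.compress(payload)
assert lz4.frame.decompress(framed) == payload

# Block format (what parquet-cpp reads and writes): raw compressed bytes.
# store_size=False omits lz4's own 4-byte size prefix, so the caller must
# supply the uncompressed size when decompressing. This is why
# page_header.uncompressed_page_size has to be propagated through fastparquet.
block = lz4.block.compress(payload, store_size=False)
assert lz4.block.decompress(block, uncompressed_size=len(payload)) == payload
```

The two formats are not interchangeable: feeding block output to the frame decoder (or vice versa) raises an error, which is why frame-compressed files could not be read by parquet-cpp.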
jonathanunderwood authored and martindurant committed Mar 11, 2018
1 parent 59036fc commit dbb6b22
Showing 4 changed files with 43 additions and 42 deletions.
49 changes: 27 additions & 22 deletions fastparquet/compression.py
@@ -9,7 +9,7 @@
     'UNCOMPRESSED': lambda x: x
 }
 decompressions = {
-    'UNCOMPRESSED': lambda x: x
+    'UNCOMPRESSED': lambda x, y: x
 }
 
 # Gzip is present regardless
@@ -24,7 +24,7 @@ def gzip_compress_v2(data, compresslevel=COMPRESSION_LEVEL):
         f.write(data)
         f.close()
         return bio.getvalue()
-    def gzip_decompress_v2(data):
+    def gzip_decompress_v2(data, uncompressed_size):
         import zlib
         return zlib.decompress(data,
                                16+15)
@@ -33,50 +33,55 @@ def gzip_decompress_v2(data):
 else:
     def gzip_compress_v3(data, compresslevel=COMPRESSION_LEVEL):
         return gzip.compress(data, compresslevel=compresslevel)
+    def gzip_decompress(data, uncompressed_size):
+        return gzip.decompress(data)
     compressions['GZIP'] = gzip_compress_v3
-    decompressions['GZIP'] = gzip.decompress
+    decompressions['GZIP'] = gzip_decompress
 
 try:
     import snappy
+    def snappy_decompress(data, uncompressed_size):
+        return snappy.decompress(data)
     compressions['SNAPPY'] = snappy.compress
-    decompressions['SNAPPY'] = snappy.decompress
+    decompressions['SNAPPY'] = snappy_decompress
 except ImportError:
     pass
 try:
     import lzo
+    def lzo_decompress(data, uncompressed_size):
+        return lzo.decompress(data)
     compressions['LZO'] = lzo.compress
-    decompressions['LZO'] = lzo.decompress
+    decompressions['LZO'] = lzo_decompress
 except ImportError:
     pass
 try:
     import brotli
+    def brotli_decompress(data, uncompressed_size):
+        return brotli.decompress(data)
     compressions['BROTLI'] = brotli.compress
-    decompressions['BROTLI'] = brotli.decompress
+    decompressions['BROTLI'] = brotli_decompress
 except ImportError:
     pass
 try:
-    import lz4.frame
-    compressions['LZ4'] = lz4.frame.compress
-    decompressions['LZ4'] = lz4.frame.decompress
+    import lz4.block
+    def lz4_compress(data, **kwargs):
+        kwargs['store_size'] = False
+        return lz4.block.compress(data, **kwargs)
+    def lz4_decompress(data, uncompressed_size):
+        return lz4.block.decompress(data, uncompressed_size=uncompressed_size)
+    compressions['LZ4'] = lz4_compress
+    decompressions['LZ4'] = lz4_decompress
 except ImportError:
     pass
 try:
     import zstd
     def zstd_compress(data, **kwargs):
-        # For the ZstdDecompressor to work, the compressed data must include
-        # the uncompressed size, so we raise an exception if the user tries to
-        # set this to False. We also set it to True if it's not specified
-        # (since the default is False, weirdly).
-        try:
-            if kwargs['write_content_size'] == False:
-                raise RuntimeError('write_content_size cannot be false for the ZSTD compressor')
-        except KeyError:
-            kwargs['write_content_size'] = True
+        kwargs['write_content_size'] = False
         cctx = zstd.ZstdCompressor(**kwargs)
         return cctx.compress(data, allow_empty=True)
-    def zstd_decompress(data):
+    def zstd_decompress(data, uncompressed_size):
         dctx = zstd.ZstdDecompressor()
-        return dctx.decompress(data)
+        return dctx.decompress(data, max_output_size=uncompressed_size)
     compressions['ZSTD'] = zstd_compress
     decompressions['ZSTD'] = zstd_decompress
 except ImportError:
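
ZSTD needs the same accommodation: with write_content_size forced to False, the compressed frame no longer embeds its decompressed length, so the decompressor has to be given an explicit output bound. A round-trip sketch using the python-zstandard API (the diff writes `import zstd`; the import alias below is an assumption, as the module name has varied across package versions):

```python
import zstandard as zstd  # the diff imports this module as plain `zstd`

payload = b"some page bytes" * 100

cctx = zstd.ZstdCompressor(write_content_size=False)
compressed = cctx.compress(payload)

# With no embedded content size, decompress() requires max_output_size;
# a Parquet reader takes this bound from page_header.uncompressed_page_size.
dctx = zstd.ZstdDecompressor()
assert dctx.decompress(compressed, max_output_size=len(payload)) == payload
```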
@@ -113,10 +118,10 @@ def compress_data(data, compression='gzip'):
         raise ValueError("args dict entry is not a dict")
     return compressions[algorithm.upper()](data, **args)
 
-def decompress_data(data, algorithm='gzip'):
+def decompress_data(data, uncompressed_size, algorithm='gzip'):
     if isinstance(algorithm, int):
         algorithm = rev_map[algorithm]
     if algorithm.upper() not in decompressions:
         raise RuntimeError("Decompression '%s' not available. Options: %s" %
                            (algorithm.upper(), sorted(decompressions)))
-    return decompressions[algorithm.upper()](data)
+    return decompressions[algorithm.upper()](data, uncompressed_size)
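
After this hunk, every registered decompressor shares the (data, uncompressed_size) signature, so decompress_data can pass the size through unconditionally and codecs that recover the size themselves simply ignore the extra argument. A usage sketch of the new call shape (illustrative; the 'LZ4' entry only exists when the lz4 package is importable):

```python
from fastparquet.compression import compress_data, decompress_data

data = b"x" * 4096

# The expected output size is now the second positional argument.
compressed = compress_data(data, compression="lz4")
assert decompress_data(compressed, len(data), algorithm="lz4") == data

# GZIP recovers the size on its own and ignores the argument, but the
# uniform signature means the caller never has to special-case codecs.
assert decompress_data(compress_data(data, "gzip"), len(data), "gzip") == data
```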
6 changes: 5 additions & 1 deletion fastparquet/core.py
@@ -23,7 +23,11 @@ def _read_page(file_obj, page_header, column_metadata):
     """Read the data page from the given file-object and convert it to raw,
     uncompressed bytes (if necessary)."""
     raw_bytes = file_obj.read(page_header.compressed_page_size)
-    raw_bytes = decompress_data(raw_bytes, column_metadata.codec)
+    raw_bytes = decompress_data(
+        raw_bytes,
+        page_header.uncompressed_page_size,
+        column_metadata.codec,
+    )
 
     assert len(raw_bytes) == page_header.uncompressed_page_size, \
         "found {0} raw bytes (expected {1})".format(
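Condensed, the read path after this change looks roughly as follows (a schematic of the hunk above, not verbatim fastparquet code): the Thrift PageHeader records both sizes, which is what makes the size-hungry block codecs workable here.

```python
from fastparquet.compression import decompress_data

def read_page(file_obj, page_header, column_metadata):
    # compressed_page_size says how many bytes to pull off the stream;
    # uncompressed_page_size is the output bound the block codecs require.
    raw_bytes = file_obj.read(page_header.compressed_page_size)
    raw_bytes = decompress_data(raw_bytes,
                                page_header.uncompressed_page_size,
                                column_metadata.codec)
    assert len(raw_bytes) == page_header.uncompressed_page_size
    return raw_bytes
```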
12 changes: 4 additions & 8 deletions fastparquet/test/test_api.py
@@ -431,7 +431,7 @@ def test_compression_zstandard(tempdir):
                 "threads": 0,
                 "write_checksum": True,
                 "write_dict_id": True,
-                "write_content_size": True,
+                "write_content_size": False,
             }
         },
         "_default": {
@@ -470,12 +470,8 @@ def test_compression_lz4(tempdir):
         "y": {
             "type": "lz4",
             "args": {
-                "compression_level": 5,
-                "content_checksum": True,
-                "block_size": 0,
-                "block_checksum": True,
-                "block_linked": True,
-                "store_size": True,
+                "compression": 5,
+                "store_size": False,
             }
         },
         "_default": {
@@ -564,4 +560,4 @@ def test_only_partition_columns(tempdir):
 
     with pytest.raises(ValueError):
         # because this leaves no data to write
-        write(tempdir, df[['b']], file_scheme='hive', partition_on=['b'])
+        write(tempdir, df[['b']], file_scheme='hive', partition_on=['b'])
18 changes: 7 additions & 11 deletions fastparquet/test/test_compression.py
@@ -13,7 +13,7 @@ def test_compress_decompress_roundtrip(fmt):
     else:
         assert len(compressed) < len(data)
 
-    decompressed = decompress_data(compressed, algorithm=fmt)
+    decompressed = decompress_data(compressed, len(data), algorithm=fmt)
     assert data == decompressed


@@ -30,7 +30,7 @@ def test_compress_decompress_roundtrip_args_gzip():
     )
     assert len(compressed) < len(data)
 
-    decompressed = decompress_data(compressed, algorithm="gzip")
+    decompressed = decompress_data(compressed, len(data), algorithm="gzip")
     assert data == decompressed
 
 def test_compress_decompress_roundtrip_args_lz4():
@@ -41,18 +41,14 @@ def test_compress_decompress_roundtrip_args_lz4():
         compression={
             "type": "lz4",
             "args": {
-                "compression_level": 5,
-                "content_checksum": True,
-                "block_size": 0,
-                "block_checksum": True,
-                "block_linked": True,
-                "store_size": True,
+                "compression": 5,
+                "store_size": False,
             }
         }
     )
     assert len(compressed) < len(data)
 
-    decompressed = decompress_data(compressed, algorithm="lz4")
+    decompressed = decompress_data(compressed, len(data), algorithm="lz4")
     assert data == decompressed
 
 def test_compress_decompress_roundtrip_args_zstd():
@@ -67,13 +63,13 @@ def test_compress_decompress_roundtrip_args_zstd():
                 "threads": 0,
                 "write_checksum": True,
                 "write_dict_id": True,
-                "write_content_size": True,
+                "write_content_size": False,
             }
         }
     )
     assert len(compressed) < len(data)
 
-    decompressed = decompress_data(compressed, algorithm="zstd")
+    decompressed = decompress_data(compressed, len(data), algorithm="zstd")
     assert data == decompressed
 
 def test_errors():
