Skip to content

Commit

Permalink
Added support for LZ4_RAW compression. (#1604) (#2943)
Browse files Browse the repository at this point in the history
* Added support for LZ4_RAW compression. (#1604)

* This adds the implementation of LZ4_RAW codec by using lz4 block compression algorithm. (#1604)
* This commit uses the formula from https://stackoverflow.com/questions/25740471/lz4-library-decompressed-data-upper-bound-size-estimation to estimate the uncompressed size. As noted in that thread, this algorithm over-estimates the size, but it is probably the best we can get with the current decompress API, since the size of an arrow LZ4_RAW block is not prepended to the block.
* Another option would be to take the C++ approach and bypass the API (https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/compression_lz4.cc#L343). This approach consists of relying on the output_buffer capacity to guess the uncompress_size. This works because `serialized_reader.rs` already knows the uncompressed_size, as it reads it from the page header, and allocates the output_buffer with a capacity equal to the uncompress_size (https://github.com/marioloko/arrow-rs/blob/master/parquet/src/file/serialized_reader.rs#L417). I did not follow this approach because:
    1. It is too hacky.
    2. It will limit the use cases of the `decompress` API, as the caller will need to know the right uncompressed_size to allocate.
    3. It is not compatible with the current set of tests. However, new test can be created.

* Clippy

* Add integration test

Co-authored-by: Adrián Gallego Castellanos <kugoad@gmail.com>
Co-authored-by: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
  • Loading branch information
3 people committed Oct 27, 2022
1 parent 880c4d9 commit 4e1247e
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 0 deletions.
31 changes: 31 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2390,4 +2390,35 @@ mod tests {
assert_eq!(full.column(idx), projected.column(0));
}
}

#[test]
fn test_read_lz4_raw() {
    // Read a reference file compressed with LZ4_RAW and verify its contents.
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{}/lz4_raw_compressed.parquet", testdata);
    let file = File::open(&path).unwrap();

    let reader = ParquetRecordBatchReader::try_new(file, 1024).unwrap();
    let batches = reader.collect::<ArrowResult<Vec<_>>>().unwrap();
    assert_eq!(1, batches.len());
    let batch = &batches[0];

    assert_eq!(3, batch.num_columns());
    assert_eq!(4, batch.num_rows());

    // Expected values come from https://github.com/apache/parquet-testing/pull/18
    let c0: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
    assert_eq!(
        c0.values(),
        &[1593604800, 1593604800, 1593604801, 1593604801]
    );

    let c1: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
    let binary_values: Vec<_> = c1.iter().flatten().collect();
    assert_eq!(binary_values, &[b"abc", b"def", b"abc", b"def"]);

    let c2: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
    assert_eq!(c2.values(), &[42.0, 7.7, 42.125, 7.7]);
}
}
4 changes: 4 additions & 0 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ pub enum Encoding {

/// Supported compression algorithms.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(non_camel_case_types)]
pub enum Compression {
UNCOMPRESSED,
SNAPPY,
Expand All @@ -290,6 +291,7 @@ pub enum Compression {
BROTLI,
LZ4,
ZSTD,
LZ4_RAW,
}

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -826,6 +828,7 @@ impl TryFrom<parquet::CompressionCodec> for Compression {
parquet::CompressionCodec::BROTLI => Compression::BROTLI,
parquet::CompressionCodec::LZ4 => Compression::LZ4,
parquet::CompressionCodec::ZSTD => Compression::ZSTD,
parquet::CompressionCodec::LZ4_RAW => Compression::LZ4_RAW,
_ => {
return Err(general_err!(
"unexpected parquet compression codec: {}",
Expand All @@ -846,6 +849,7 @@ impl From<Compression> for parquet::CompressionCodec {
Compression::BROTLI => parquet::CompressionCodec::BROTLI,
Compression::LZ4 => parquet::CompressionCodec::LZ4,
Compression::ZSTD => parquet::CompressionCodec::ZSTD,
Compression::LZ4_RAW => parquet::CompressionCodec::LZ4_RAW,
}
}
}
Expand Down
64 changes: 64 additions & 0 deletions parquet/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ pub fn create_codec(codec: CodecType) -> Result<Option<Box<dyn Codec>>> {
CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
#[cfg(any(feature = "zstd", test))]
CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))),
#[cfg(any(feature = "lz4", test))]
CodecType::LZ4_RAW => Ok(Some(Box::new(LZ4RawCodec::new()))),
CodecType::UNCOMPRESSED => Ok(None),
_ => Err(nyi_err!("The codec type {} is not supported yet", codec)),
}
Expand Down Expand Up @@ -325,6 +327,63 @@ mod zstd_codec {
#[cfg(any(feature = "zstd", test))]
pub use zstd_codec::*;

#[cfg(any(feature = "lz4", test))]
mod lz4_raw_codec {
    use crate::compression::Codec;
    use crate::errors::Result;

    /// Codec for the LZ4_RAW compression algorithm: raw LZ4 block format,
    /// without the LZ4 frame wrapper used by the `LZ4` codec.
    pub struct LZ4RawCodec {}

    impl LZ4RawCodec {
        /// Creates new LZ4 Raw compression codec.
        pub(crate) fn new() -> Self {
            Self {}
        }
    }

    /// Upper bound on the decompressed size of an LZ4 block of
    /// `compressed_size` bytes, i.e. `255 * n - 2526`.
    ///
    /// Formula from
    /// https://stackoverflow.com/questions/25740471/lz4-library-decompressed-data-upper-bound-size-estimation
    /// An LZ4_RAW parquet block does not prepend its uncompressed length, so an
    /// over-estimate is the best we can do with the current `decompress` API.
    ///
    /// The subtraction is saturating: for tiny inputs (fewer than ~10 bytes)
    /// the naive formula underflows `usize` and would panic; saturating to 0
    /// turns that case into an ordinary decompression error instead.
    fn max_uncompressed_size(compressed_size: usize) -> usize {
        ((compressed_size << 8) - compressed_size).saturating_sub(2526)
    }

    impl Codec for LZ4RawCodec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
        ) -> Result<usize> {
            // Append decompressed bytes after any existing content.
            let offset = output_buf.len();
            let required_len = max_uncompressed_size(input_buf.len());
            output_buf.resize(offset + required_len, 0);
            // `decompress_to_buffer` takes an i32 size hint; a size estimate
            // exceeding i32::MAX cannot come from a conforming parquet page.
            let required_len: i32 = required_len
                .try_into()
                .expect("LZ4 uncompressed size estimate exceeds i32::MAX");
            match lz4::block::decompress_to_buffer(
                input_buf,
                Some(required_len),
                &mut output_buf[offset..],
            ) {
                Ok(n) => {
                    // Drop the unused tail of the over-allocated buffer.
                    output_buf.truncate(offset + n);
                    Ok(n)
                }
                Err(e) => Err(e.into()),
            }
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            // Append compressed bytes after any existing content.
            let offset = output_buf.len();
            let required_len = lz4::block::compress_bound(input_buf.len())?;
            output_buf.resize(offset + required_len, 0);
            // `None` mode selects the default compression level; `false`
            // omits the length prefix (raw block, per the LZ4_RAW spec).
            match lz4::block::compress_to_buffer(input_buf, None, false, &mut output_buf[offset..])
            {
                Ok(n) => {
                    output_buf.truncate(offset + n);
                    Ok(())
                }
                Err(e) => Err(e.into()),
            }
        }
    }
}
#[cfg(any(feature = "lz4", test))]
pub use lz4_raw_codec::*;

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -416,4 +475,9 @@ mod tests {
fn test_codec_zstd() {
test_codec(CodecType::ZSTD);
}

#[test]
fn test_codec_lz4_raw() {
    // Round-trip compress/decompress through the LZ4_RAW (lz4 block) codec
    // using the shared `test_codec` harness.
    test_codec(CodecType::LZ4_RAW);
}
}

0 comments on commit 4e1247e

Please sign in to comment.