
Commit

support compression for IPC
liukun4515 committed Jun 15, 2022
1 parent 486118c commit 7dbd4d6
Showing 6 changed files with 291 additions and 20 deletions.
3 changes: 3 additions & 0 deletions arrow/Cargo.toml
@@ -38,6 +38,9 @@ path = "src/lib.rs"
bench = false

[dependencies]
byteorder = "1"
lz4 = "1.23"
zstd = "0.11.1"
serde = { version = "1.0" }
serde_derive = "1.0"
serde_json = { version = "1.0", features = ["preserve_order"] }
84 changes: 84 additions & 0 deletions arrow/src/ipc/compression/compression.rs
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::io::{Read, Write};
use crate::error::{ArrowError, Result};
use crate::ipc::CompressionType;

#[derive(Clone, Copy, PartialEq)]
pub enum CompressionCodecType {
NoCompression,
Lz4Frame,
ZSTD,
}

impl From<CompressionType> for CompressionCodecType {
fn from(compression_type: CompressionType) -> Self {
match compression_type {
CompressionType::ZSTD => CompressionCodecType::ZSTD,
CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
_ => CompressionCodecType::NoCompression
}
}
}

impl Into<CompressionType> for CompressionCodecType {
fn into(self) -> CompressionType {
match self {
CompressionCodecType::NoCompression => CompressionType(-1),
CompressionCodecType::Lz4Frame => CompressionType::LZ4_FRAME,
CompressionCodecType::ZSTD => CompressionType::ZSTD
}
}
}

impl CompressionCodecType {
pub fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
match self {
CompressionCodecType::Lz4Frame => {
let mut encoder = lz4::EncoderBuilder::new().build(output)?;
encoder.write_all(input)?;
// finish the frame so the lz4 end mark is written to the output
encoder.finish().1?;
Ok(())
}
CompressionCodecType::ZSTD => {
Err(ArrowError::NotYetImplemented("ZSTD compression is not yet supported".to_string()))
}
_ => {
Ok(())
}
}
}

pub fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<usize> {
let result = match self {
CompressionCodecType::Lz4Frame => {
let mut decoder = lz4::Decoder::new(input)?;
decoder.read_to_end(output)
}
CompressionCodecType::ZSTD => {
let mut decoder = zstd::Decoder::new(input)?;
decoder.read_to_end(output)
}
_ => {
Ok(input.len())
}
};
result.map_err(|e| {
ArrowError::from(e)
})
}
}
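
For orientation, here is a minimal round-trip sketch of the new codec API (not part of this commit). The module is pub(crate), so such a check would have to live inside the arrow crate, e.g. as a unit test; the test name and buffer contents are made up for illustration:

use crate::error::Result;
use crate::ipc::compression::compression::CompressionCodecType;

#[test]
fn lz4_round_trip() -> Result<()> {
    let codec = CompressionCodecType::Lz4Frame;
    let input = b"some ipc buffer bytes".to_vec();

    // compress the raw bytes into an output buffer
    let mut compressed = Vec::new();
    codec.compress(&input, &mut compressed)?;

    // decompress and check that the original bytes come back
    let mut decompressed = Vec::new();
    codec.decompress(&compressed, &mut decompressed)?;
    assert_eq!(input, decompressed);
    Ok(())
}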
21 changes: 21 additions & 0 deletions arrow/src/ipc/compression/mod.rs
@@ -0,0 +1,21 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub(crate) mod compression;
pub(crate) const LENGTH_EMPTY_COMPRESSED_DATA: i64 = 0;
pub(crate) const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
pub(crate) const LENGTH_OF_PREFIX_DATA: i64 = 8;
1 change: 1 addition & 0 deletions arrow/src/ipc/mod.rs
@@ -29,6 +29,7 @@ pub mod writer;
#[allow(clippy::redundant_field_names)]
#[allow(non_camel_case_types)]
pub mod gen;
mod compression;

pub use self::gen::File::*;
pub use self::gen::Message::*;
99 changes: 88 additions & 11 deletions arrow/src/ipc/reader.rs
@@ -23,6 +23,7 @@
use std::collections::HashMap;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use byteorder::{ByteOrder, LittleEndian};

use crate::array::*;
use crate::buffer::Buffer;
@@ -34,13 +35,62 @@ use crate::record_batch::{RecordBatch, RecordBatchReader};

use ipc::CONTINUATION_MARKER;
use DataType::*;
use crate::ipc::{CompressionType};
use crate::ipc::compression::compression::CompressionCodecType;
use crate::ipc::compression::{LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA};


/// Read a buffer based on offset and length
fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
/// From https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58
/// Each constituent buffer is first compressed with the indicated
/// compressor, and then written with the uncompressed length in the first 8
/// bytes as a 64-bit little-endian signed integer followed by the compressed
/// buffer bytes (and then padding as required by the protocol). The
/// uncompressed length may be set to -1 to indicate that the data that
/// follows is not compressed, which can be useful for cases where
/// compression does not yield appreciable savings.
fn read_buffer(buf: &ipc::Buffer, a_data: &[u8], compression_codec: &CompressionCodecType) -> Buffer {
let start_offset = buf.offset() as usize;
let end_offset = start_offset + buf.length() as usize;
let buf_data = &a_data[start_offset..end_offset];
Buffer::from(&buf_data)
match compression_codec {
CompressionCodecType::NoCompression => {
Buffer::from(buf_data)
}
CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
// layout: 8-byte uncompressed length followed by the body
// read the first 8 bytes: if the body is compressed, decompress it,
// otherwise return the remaining bytes as-is.
let decompressed_length = read_uncompressed_size(buf_data);
if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
// empty buffer
let empty = Vec::<u8>::new();
Buffer::from(empty)
} else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
// not compressed
let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..(end_offset - start_offset)];
Buffer::from(data)
} else {
// decompress data using the codec
let mut uncompressed_buffer = Vec::new();
let input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..(end_offset - start_offset)];
// TODO: propagate the error instead of unwrapping
compression_codec.decompress(input_data, &mut uncompressed_buffer).unwrap();
Buffer::from(uncompressed_buffer)
}
}
}
}

/// Get the uncompressed length
/// Notes:
/// -1: indicates that the data that follows is not compressed
/// 0: indicates that there is no data
/// positive number: indicates the uncompressed length of the following data
fn read_uncompressed_size(buffer: &[u8]) -> i64 {
let len_buffer = &buffer[0..8];
// 64-bit little-endian signed integer
LittleEndian::read_i64(len_buffer)
}
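
The writer side of this framing is not part of this hunk; as a hedged sketch (the helper name is hypothetical), a compressed body would be laid out so that read_uncompressed_size and read_buffer above can consume it:

use byteorder::{ByteOrder, LittleEndian};

/// Prepend the 8-byte little-endian uncompressed-length prefix to a body.
/// Passing -1 as the length marks the body that follows as not compressed.
fn frame_compressed_buffer(uncompressed_len: i64, body: &[u8]) -> Vec<u8> {
    // 64-bit little-endian signed integer, as described in Message.fbs
    let mut out = vec![0u8; 8];
    LittleEndian::write_i64(&mut out, uncompressed_len);
    out.extend_from_slice(body);
    out
}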

/// Coordinates reading arrays based on data types.
Expand All @@ -60,6 +110,7 @@ fn create_array(
dictionaries_by_id: &HashMap<i64, ArrayRef>,
mut node_index: usize,
mut buffer_index: usize,
compression_codec: &CompressionCodecType
) -> Result<(ArrayRef, usize, usize)> {
use DataType::*;
let data_type = field.data_type();
@@ -70,7 +121,7 @@ fn create_array(
data_type,
buffers[buffer_index..buffer_index + 3]
.iter()
.map(|buf| read_buffer(buf, data))
.map(|buf| read_buffer(buf, data, compression_codec))
.collect(),
);
node_index += 1;
@@ -83,7 +134,7 @@ fn create_array(
data_type,
buffers[buffer_index..buffer_index + 2]
.iter()
.map(|buf| read_buffer(buf, data))
.map(|buf| read_buffer(buf, data, compression_codec))
.collect(),
);
node_index += 1;
@@ -94,7 +145,7 @@ fn create_array(
let list_node = &nodes[node_index];
let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
.iter()
.map(|buf| read_buffer(buf, data))
.map(|buf| read_buffer(buf, data, compression_codec))
.collect();
node_index += 1;
buffer_index += 2;
@@ -106,6 +157,7 @@ fn create_array(
dictionaries_by_id,
node_index,
buffer_index,
compression_codec
)?;
node_index = triple.1;
buffer_index = triple.2;
@@ -116,7 +168,7 @@ fn create_array(
let list_node = &nodes[node_index];
let list_buffers: Vec<Buffer> = buffers[buffer_index..=buffer_index]
.iter()
.map(|buf| read_buffer(buf, data))
.map(|buf| read_buffer(buf, data, compression_codec))
.collect();
node_index += 1;
buffer_index += 1;
@@ -128,6 +180,7 @@ fn create_array(
dictionaries_by_id,
node_index,
buffer_index,
compression_codec
)?;
node_index = triple.1;
buffer_index = triple.2;
@@ -136,7 +189,7 @@ fn create_array(
}
Struct(struct_fields) => {
let struct_node = &nodes[node_index];
let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data);
let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data, compression_codec);
node_index += 1;
buffer_index += 1;

@@ -153,6 +206,7 @@ fn create_array(
dictionaries_by_id,
node_index,
buffer_index,
compression_codec
)?;
node_index = triple.1;
buffer_index = triple.2;
@@ -172,7 +226,7 @@ fn create_array(
let index_node = &nodes[node_index];
let index_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
.iter()
.map(|buf| read_buffer(buf, data))
.map(|buf| read_buffer(buf, data, compression_codec))
.collect();

let dict_id = field.dict_id().ok_or_else(|| {
@@ -202,13 +256,13 @@ fn create_array(
let len = union_node.length() as usize;

let type_ids: Buffer =
read_buffer(&buffers[buffer_index], data)[..len].into();
read_buffer(&buffers[buffer_index], data, compression_codec)[..len].into();

buffer_index += 1;

let value_offsets = match mode {
UnionMode::Dense => {
let buffer = read_buffer(&buffers[buffer_index], data);
let buffer = read_buffer(&buffers[buffer_index], data, compression_codec);
buffer_index += 1;
Some(buffer[..len * 4].into())
}
@@ -226,6 +280,7 @@ fn create_array(
dictionaries_by_id,
node_index,
buffer_index,
compression_codec
)?;

node_index = triple.1;
@@ -264,7 +319,7 @@ fn create_array(
data_type,
buffers[buffer_index..buffer_index + 2]
.iter()
.map(|buf| read_buffer(buf, data))
.map(|buf| read_buffer(buf, data, compression_codec))
.collect(),
);
node_index += 1;
@@ -589,6 +644,26 @@ pub fn read_record_batch(
let field_nodes = batch.nodes().ok_or_else(|| {
ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
})?;
// TODO: check the compression body logic
let compression_codec = match batch.compression() {
None => {
CompressionCodecType::NoCompression
}
Some(compression) => {
match compression.codec() {
CompressionType::ZSTD => {
CompressionCodecType::ZSTD
},
CompressionType::LZ4_FRAME => {
CompressionCodecType::Lz4Frame
}
_ => {
CompressionCodecType::NoCompression
}
}
}
};
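
This match mirrors the From<CompressionType> impl added in compression.rs; an equivalent, more compact form (a sketch, not what the commit does) would be:

let compression_codec = batch
    .compression()
    .map(|body| CompressionCodecType::from(body.codec()))
    .unwrap_or(CompressionCodecType::NoCompression);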

// keep track of buffer and node index, the functions that create arrays mutate these
let mut buffer_index = 0;
let mut node_index = 0;
@@ -607,6 +682,7 @@ pub fn read_record_batch(
dictionaries_by_id,
node_index,
buffer_index,
&compression_codec,
)?;
node_index = triple.1;
buffer_index = triple.2;
@@ -640,6 +716,7 @@ pub fn read_record_batch(
dictionaries_by_id,
node_index,
buffer_index,
&compression_codec,
)?;
node_index = triple.1;
buffer_index = triple.2;
