Skip to content

Commit

Permalink
support compression for IPC
Browse files Browse the repository at this point in the history
  • Loading branch information
liukun4515 committed Jun 13, 2022
1 parent 486118c commit e79ecf8
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 0 deletions.
2 changes: 2 additions & 0 deletions arrow/Cargo.toml
Expand Up @@ -38,6 +38,8 @@ path = "src/lib.rs"
bench = false

[dependencies]
lz4 = { version = "1.23", optional = true }
zstd = { version = "0.11.1", optional = true, default-features = false }
serde = { version = "1.0" }
serde_derive = "1.0"
serde_json = { version = "1.0", features = ["preserve_order"] }
Expand Down
16 changes: 16 additions & 0 deletions arrow/src/ipc/compression/compression.rs
@@ -0,0 +1,16 @@

pub fn lz4_compress() {

}

pub fn lz4_decompress() {

}

pub fn zstd_compress() {

}

pub fn zstd_decompress() {

}
1 change: 1 addition & 0 deletions arrow/src/ipc/compression/mod.rs
@@ -0,0 +1 @@
mod compression;
1 change: 1 addition & 0 deletions arrow/src/ipc/mod.rs
Expand Up @@ -29,6 +29,7 @@ pub mod writer;
#[allow(clippy::redundant_field_names)]
#[allow(non_camel_case_types)]
pub mod gen;
pub mod compression;

pub use self::gen::File::*;
pub use self::gen::Message::*;
Expand Down
12 changes: 12 additions & 0 deletions arrow/src/ipc/writer.rs
Expand Up @@ -37,6 +37,7 @@ use crate::record_batch::RecordBatch;
use crate::util::bit_util;

use ipc::CONTINUATION_MARKER;
use crate::ipc::{BodyCompressionMethod, CompressionType};

/// IPC write options used to control the behaviour of the writer
#[derive(Debug, Clone)]
Expand All @@ -55,6 +56,8 @@ pub struct IpcWriteOptions {
/// version 2.0.0: V4, with legacy format enabled
/// version 4.0.0: V5
metadata_version: ipc::MetadataVersion,
/// TODO comments
batch_compression_type: CompressionType
}

impl IpcWriteOptions {
Expand Down Expand Up @@ -91,6 +94,7 @@ impl IpcWriteOptions {
alignment,
write_legacy_ipc_format,
metadata_version,
batch_compression_type: CompressionType::NO_COMPRESSION,
})
}
}
Expand All @@ -105,6 +109,7 @@ impl Default for IpcWriteOptions {
alignment: 8,
write_legacy_ipc_format: false,
metadata_version: ipc::MetadataVersion::V5,
batch_compression_type: CompressionType::NO_COMPRESSION,
}
}
}
Expand Down Expand Up @@ -326,6 +331,7 @@ impl IpcDataGenerator {
let mut buffers: Vec<ipc::Buffer> = vec![];
let mut arrow_data: Vec<u8> = vec![];
let mut offset = 0;
// TODO: add compression type
for array in batch.columns() {
let array_data = array.data();
offset = write_array_data(
Expand All @@ -342,12 +348,18 @@ impl IpcDataGenerator {
// write data
let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);
// TODO check the compression type
let mut bodyCompressionBuilder = ipc::BodyCompressionBuilder::new(&mut fbb);
bodyCompressionBuilder.add_method(BodyCompressionMethod::BUFFER);
bodyCompressionBuilder.add_codec(CompressionType::LZ4_FRAME);

let root = {
let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
batch_builder.add_length(batch.num_rows() as i64);
batch_builder.add_nodes(nodes);
batch_builder.add_buffers(buffers);
// TODO check the compression type
batch_builder.add_compression(bodyCompressionBuilder.finish());
let b = batch_builder.finish();
b.as_union_value()
};
Expand Down

0 comments on commit e79ecf8

Please sign in to comment.