Skip to content

Commit

Permalink
Faster parquet DictEncoder (~20%) (#2123)
Browse files Browse the repository at this point in the history
* Faster parquet DictEncoder

* Reserve dictionary capacity

* Split out interner

* Fix RAT
  • Loading branch information
tustvold committed Jul 29, 2022
1 parent 985760f commit 6ce4c4e
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 386 deletions.
2 changes: 2 additions & 0 deletions parquet/Cargo.toml
Expand Up @@ -30,6 +30,7 @@ edition = "2021"
rust-version = "1.62"

[dependencies]
ahash = "0.7"
parquet-format = { version = "4.0.0", default-features = false }
bytes = { version = "1.1", default-features = false, features = ["std"] }
byteorder = { version = "1", default-features = false }
Expand All @@ -49,6 +50,7 @@ serde_json = { version = "1.0", default-features = false, features = ["std"], op
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] }
hashbrown = { version = "0.12", default-features = false }

[dev-dependencies]
base64 = { version = "0.13", default-features = false, features = ["std"] }
Expand Down
185 changes: 185 additions & 0 deletions parquet/src/encodings/encoding/dict_encoder.rs
@@ -0,0 +1,185 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// ----------------------------------------------------------------------
// Dictionary encoding

use crate::basic::{Encoding, Type};
use crate::data_type::private::ParquetValueType;
use crate::data_type::{AsBytes, DataType};
use crate::encodings::encoding::{Encoder, PlainEncoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use crate::util::memory::ByteBufferPtr;
use std::io::Write;

#[derive(Debug)]
struct KeyStorage<T: DataType> {
uniques: Vec<T::T>,

size_in_bytes: usize,

type_length: usize,
}

impl<T: DataType> Storage for KeyStorage<T> {
type Key = u64;
type Value = T::T;

fn get(&self, idx: Self::Key) -> &Self::Value {
&self.uniques[idx as usize]
}

fn push(&mut self, value: &Self::Value) -> Self::Key {
let (base_size, num_elements) = value.dict_encoding_size();

let unique_size = match T::get_physical_type() {
Type::BYTE_ARRAY => base_size + num_elements,
Type::FIXED_LEN_BYTE_ARRAY => self.type_length,
_ => base_size,
};
self.size_in_bytes += unique_size;

let key = self.uniques.len() as u64;
self.uniques.push(value.clone());
key
}
}

/// Dictionary encoder.
/// The dictionary encoding builds a dictionary of values encountered in a given column.
/// The dictionary page is written first, before the data pages of the column chunk.
///
/// Dictionary page format: the entries in the dictionary - in dictionary order -
/// using the plain encoding.
///
/// Data page format: the bit width used to encode the entry ids stored as 1 byte
/// (max bit width = 32), followed by the values encoded using RLE/Bit packed described
/// above (with the given bit width).
pub struct DictEncoder<T: DataType> {
/// Descriptor for the column to be encoded.
desc: ColumnDescPtr,

interner: Interner<KeyStorage<T>>,

/// The buffered indices
indices: Vec<u64>,
}

impl<T: DataType> DictEncoder<T> {
/// Creates new dictionary encoder.
pub fn new(desc: ColumnDescPtr) -> Self {
let storage = KeyStorage {
uniques: vec![],
size_in_bytes: 0,
type_length: desc.type_length() as usize,
};

Self {
desc,
interner: Interner::new(storage),
indices: vec![],
}
}

/// Returns true if dictionary entries are sorted, false otherwise.
pub fn is_sorted(&self) -> bool {
// Sorting is not supported currently.
false
}

/// Returns number of unique values (keys) in the dictionary.
pub fn num_entries(&self) -> usize {
self.interner.storage().uniques.len()
}

/// Returns size of unique values (keys) in the dictionary, in bytes.
pub fn dict_encoded_size(&self) -> usize {
self.interner.storage().size_in_bytes
}

/// Writes out the dictionary values with PLAIN encoding in a byte buffer, and return
/// the result.
pub fn write_dict(&self) -> Result<ByteBufferPtr> {
let mut plain_encoder = PlainEncoder::<T>::new(self.desc.clone(), vec![]);
plain_encoder.put(&self.interner.storage().uniques)?;
plain_encoder.flush_buffer()
}

/// Writes out the dictionary values with RLE encoding in a byte buffer, and return
/// the result.
pub fn write_indices(&mut self) -> Result<ByteBufferPtr> {
let buffer_len = self.estimated_data_encoded_size();
let mut buffer = vec![0; buffer_len];
buffer[0] = self.bit_width() as u8;

// Write bit width in the first byte
buffer.write_all((self.bit_width() as u8).as_bytes())?;
let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer, 1);
for index in &self.indices {
if !encoder.put(*index as u64)? {
return Err(general_err!("Encoder doesn't have enough space"));
}
}
self.indices.clear();
Ok(ByteBufferPtr::new(encoder.consume()?))
}

fn put_one(&mut self, value: &T::T) {
self.indices.push(self.interner.intern(value));
}

#[inline]
fn bit_width(&self) -> u8 {
let num_entries = self.num_entries();
if num_entries <= 1 {
num_entries as u8
} else {
num_required_bits(num_entries as u64 - 1)
}
}
}

impl<T: DataType> Encoder<T> for DictEncoder<T> {
fn put(&mut self, values: &[T::T]) -> Result<()> {
self.indices.reserve(values.len());
for i in values {
self.put_one(i)
}
Ok(())
}

// Performance Note:
// As far as can be seen these functions are rarely called and as such we can hint to the
// compiler that they dont need to be folded into hot locations in the final output.
fn encoding(&self) -> Encoding {
Encoding::PLAIN_DICTIONARY
}

fn estimated_data_encoded_size(&self) -> usize {
let bit_width = self.bit_width();
1 + RleEncoder::min_buffer_size(bit_width)
+ RleEncoder::max_buffer_size(bit_width, self.indices.len())
}

fn flush_buffer(&mut self) -> Result<ByteBufferPtr> {
self.write_indices()
}
}

0 comments on commit 6ce4c4e

Please sign in to comment.