From 8be38af9ad1c26c921d5de5082e490b76c3bc1ed Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 28 Jul 2022 15:52:28 +0100 Subject: [PATCH] Split out interner --- .../src/encodings/encoding/dict_encoder.rs | 111 ++++++++---------- parquet/src/util/interner.rs | 77 ++++++++++++ parquet/src/util/mod.rs | 3 +- 3 files changed, 127 insertions(+), 64 deletions(-) create mode 100644 parquet/src/util/interner.rs diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs index 6185406f97b..7bf98325466 100644 --- a/parquet/src/encodings/encoding/dict_encoder.rs +++ b/parquet/src/encodings/encoding/dict_encoder.rs @@ -19,18 +19,49 @@ // Dictionary encoding use crate::basic::{Encoding, Type}; +use crate::data_type::private::ParquetValueType; use crate::data_type::{AsBytes, DataType}; use crate::encodings::encoding::{Encoder, PlainEncoder}; use crate::encodings::rle::RleEncoder; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::num_required_bits; +use crate::util::interner::{Interner, Storage}; use crate::util::memory::ByteBufferPtr; -use hashbrown::hash_map::RawEntryMut; -use hashbrown::HashMap; -use std::hash::{BuildHasher, Hash, Hasher}; use std::io::Write; -use crate::data_type::private::ParquetValueType; + +#[derive(Debug)] +struct KeyStorage { + uniques: Vec, + + size_in_bytes: usize, + + type_length: usize, +} + +impl Storage for KeyStorage { + type Key = u64; + type Value = T::T; + + fn get(&self, idx: Self::Key) -> &Self::Value { + &self.uniques[idx as usize] + } + + fn push(&mut self, value: &Self::Value) -> Self::Key { + let (base_size, num_elements) = value.dict_encoding_size(); + + let unique_size = match T::get_physical_type() { + Type::BYTE_ARRAY => base_size + num_elements, + Type::FIXED_LEN_BYTE_ARRAY => self.type_length, + _ => base_size, + }; + self.size_in_bytes += unique_size; + + let key = self.uniques.len() as u64; + self.uniques.push(value.clone()); + key + } +} /// Dictionary encoder. /// The dictionary encoding builds a dictionary of values encountered in a given column. @@ -46,35 +77,25 @@ pub struct DictEncoder { /// Descriptor for the column to be encoded. desc: ColumnDescPtr, - state: ahash::RandomState, - - /// Used to provide a lookup from value to unique value - /// - /// Note: `u64`'s hash implementation is not used, instead the raw entry - /// API is used to store keys w.r.t the hash of the strings themselves - /// - dedup: HashMap, - - /// The unique observed values. - uniques: Vec, + interner: Interner>, /// The buffered indices indices: Vec, - - /// Size in bytes needed to encode this dictionary. - uniques_size_in_bytes: usize, } impl DictEncoder { /// Creates new dictionary encoder. pub fn new(desc: ColumnDescPtr) -> Self { + let storage = KeyStorage { + uniques: vec![], + size_in_bytes: 0, + type_length: desc.type_length() as usize, + }; + Self { desc, - state: Default::default(), - dedup: HashMap::with_hasher(()), - uniques: vec![], + interner: Interner::new(storage), indices: vec![], - uniques_size_in_bytes: 0, } } @@ -86,19 +107,19 @@ impl DictEncoder { /// Returns number of unique values (keys) in the dictionary. pub fn num_entries(&self) -> usize { - self.uniques.len() + self.interner.storage().uniques.len() } /// Returns size of unique values (keys) in the dictionary, in bytes. pub fn dict_encoded_size(&self) -> usize { - self.uniques_size_in_bytes + self.interner.storage().size_in_bytes } /// Writes out the dictionary values with PLAIN encoding in a byte buffer, and return /// the result. pub fn write_dict(&self) -> Result { let mut plain_encoder = PlainEncoder::::new(self.desc.clone(), vec![]); - plain_encoder.put(&self.uniques)?; + plain_encoder.put(&self.interner.storage().uniques)?; plain_encoder.flush_buffer() } @@ -121,49 +142,13 @@ impl DictEncoder { Ok(ByteBufferPtr::new(encoder.consume()?)) } - fn compute_hash(state: &ahash::RandomState, value: &T::T) -> u64 { - let mut hasher = state.build_hasher(); - value.as_bytes().hash(&mut hasher); - hasher.finish() - } - fn put_one(&mut self, value: &T::T) { - let hash = Self::compute_hash(&self.state, value); - - let entry = self - .dedup - .raw_entry_mut() - .from_hash(hash, |index| value == &self.uniques[*index as usize]); - - let index = match entry { - RawEntryMut::Occupied(entry) => *entry.into_key(), - RawEntryMut::Vacant(entry) => { - let index = self.uniques.len() as u64; - self.uniques.push(value.clone()); - - let (base_size, num_elements) = value.dict_encoding_size(); - - let unique_size = match T::get_physical_type() { - Type::BYTE_ARRAY => base_size + num_elements, - Type::FIXED_LEN_BYTE_ARRAY => self.desc.type_length() as usize, - _ => base_size, - }; - self.uniques_size_in_bytes += unique_size; - - *entry - .insert_with_hasher(hash, index, (), |index| { - Self::compute_hash(&self.state, &self.uniques[*index as usize]) - }) - .0 - } - }; - - self.indices.push(index); + self.indices.push(self.interner.intern(value)); } #[inline] fn bit_width(&self) -> u8 { - let num_entries = self.uniques.len(); + let num_entries = self.num_entries(); if num_entries <= 1 { num_entries as u8 } else { diff --git a/parquet/src/util/interner.rs b/parquet/src/util/interner.rs new file mode 100644 index 00000000000..ad206aaa08e --- /dev/null +++ b/parquet/src/util/interner.rs @@ -0,0 +1,77 @@ +use crate::data_type::AsBytes; +use hashbrown::hash_map::RawEntryMut; +use hashbrown::HashMap; +use std::hash::Hash; + +/// Storage trait for [`Interner`] +pub trait Storage { + type Key: Copy; + + type Value: AsBytes + PartialEq + ?Sized; + + /// Gets an element by its key + fn get(&self, idx: Self::Key) -> &Self::Value; + + /// Adds a new element, returning the key + fn push(&mut self, value: &Self::Value) -> Self::Key; +} + +/// A generic value interner supporting various different [`Storage`] +pub struct Interner { + state: ahash::RandomState, + + /// Used to provide a lookup from value to unique value + /// + /// Note: `S::Key`'s hash implementation is not used, instead the raw entry + /// API is used to store keys w.r.t the hash of the strings themselves + /// + dedup: HashMap, + + storage: S, +} + +impl Interner { + /// Create a new `Interner` with the provided storage + pub fn new(storage: S) -> Self { + Self { + state: Default::default(), + dedup: Default::default(), + storage, + } + } + + /// Intern the value, returning the interned key, and if this was a new value + pub fn intern(&mut self, value: &S::Value) -> S::Key { + let hash = compute_hash(&self.state, value); + + let entry = self + .dedup + .raw_entry_mut() + .from_hash(hash, |index| value == self.storage.get(*index)); + + match entry { + RawEntryMut::Occupied(entry) => *entry.into_key(), + RawEntryMut::Vacant(entry) => { + let key = self.storage.push(value); + + *entry + .insert_with_hasher(hash, key, (), |key| { + compute_hash(&self.state, self.storage.get(*key)) + }) + .0 + } + } + } + + /// Returns the storage for this interner + pub fn storage(&self) -> &S { + &self.storage + } +} + +fn compute_hash(state: &ahash::RandomState, value: &T) -> u64 { + use std::hash::{BuildHasher, Hasher}; + let mut hasher = state.build_hasher(); + value.as_bytes().hash(&mut hasher); + hasher.finish() +} diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs index 96644822489..01ac39116dc 100644 --- a/parquet/src/util/mod.rs +++ b/parquet/src/util/mod.rs @@ -21,9 +21,10 @@ pub mod memory; pub mod bit_util; mod bit_packing; pub mod cursor; +pub(crate) mod interner; +pub(crate) mod page_util; #[cfg(any(test, feature = "test_common"))] pub(crate) mod test_common; -pub(crate)mod page_util; #[cfg(any(test, feature = "test_common"))] pub use self::test_common::page_util::{