Skip to content

Commit

Permalink
Split out interner
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jul 28, 2022
1 parent e9b527a commit 8be38af
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 64 deletions.
111 changes: 48 additions & 63 deletions parquet/src/encodings/encoding/dict_encoder.rs
Expand Up @@ -19,18 +19,49 @@
// Dictionary encoding

use crate::basic::{Encoding, Type};
use crate::data_type::private::ParquetValueType;
use crate::data_type::{AsBytes, DataType};
use crate::encodings::encoding::{Encoder, PlainEncoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use crate::util::memory::ByteBufferPtr;
use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;
use std::hash::{BuildHasher, Hash, Hasher};
use std::io::Write;
use crate::data_type::private::ParquetValueType;

#[derive(Debug)]
struct KeyStorage<T: DataType> {
uniques: Vec<T::T>,

size_in_bytes: usize,

type_length: usize,
}

impl<T: DataType> Storage for KeyStorage<T> {
type Key = u64;
type Value = T::T;

fn get(&self, idx: Self::Key) -> &Self::Value {
&self.uniques[idx as usize]
}

fn push(&mut self, value: &Self::Value) -> Self::Key {
let (base_size, num_elements) = value.dict_encoding_size();

let unique_size = match T::get_physical_type() {
Type::BYTE_ARRAY => base_size + num_elements,
Type::FIXED_LEN_BYTE_ARRAY => self.type_length,
_ => base_size,
};
self.size_in_bytes += unique_size;

let key = self.uniques.len() as u64;
self.uniques.push(value.clone());
key
}
}

/// Dictionary encoder.
/// The dictionary encoding builds a dictionary of values encountered in a given column.
Expand All @@ -46,35 +77,25 @@ pub struct DictEncoder<T: DataType> {
/// Descriptor for the column to be encoded.
desc: ColumnDescPtr,

state: ahash::RandomState,

/// Used to provide a lookup from value to unique value
///
/// Note: `u64`'s hash implementation is not used, instead the raw entry
/// API is used to store keys w.r.t the hash of the strings themselves
///
dedup: HashMap<u64, (), ()>,

/// The unique observed values.
uniques: Vec<T::T>,
interner: Interner<KeyStorage<T>>,

/// The buffered indices
indices: Vec<u64>,

/// Size in bytes needed to encode this dictionary.
uniques_size_in_bytes: usize,
}

impl<T: DataType> DictEncoder<T> {
/// Creates new dictionary encoder.
pub fn new(desc: ColumnDescPtr) -> Self {
let storage = KeyStorage {
uniques: vec![],
size_in_bytes: 0,
type_length: desc.type_length() as usize,
};

Self {
desc,
state: Default::default(),
dedup: HashMap::with_hasher(()),
uniques: vec![],
interner: Interner::new(storage),
indices: vec![],
uniques_size_in_bytes: 0,
}
}

Expand All @@ -86,19 +107,19 @@ impl<T: DataType> DictEncoder<T> {

/// Returns number of unique values (keys) in the dictionary.
pub fn num_entries(&self) -> usize {
self.uniques.len()
self.interner.storage().uniques.len()
}

/// Returns size of unique values (keys) in the dictionary, in bytes.
pub fn dict_encoded_size(&self) -> usize {
self.uniques_size_in_bytes
self.interner.storage().size_in_bytes
}

/// Writes out the dictionary values with PLAIN encoding in a byte buffer, and return
/// the result.
pub fn write_dict(&self) -> Result<ByteBufferPtr> {
let mut plain_encoder = PlainEncoder::<T>::new(self.desc.clone(), vec![]);
plain_encoder.put(&self.uniques)?;
plain_encoder.put(&self.interner.storage().uniques)?;
plain_encoder.flush_buffer()
}

Expand All @@ -121,49 +142,13 @@ impl<T: DataType> DictEncoder<T> {
Ok(ByteBufferPtr::new(encoder.consume()?))
}

fn compute_hash(state: &ahash::RandomState, value: &T::T) -> u64 {
let mut hasher = state.build_hasher();
value.as_bytes().hash(&mut hasher);
hasher.finish()
}

fn put_one(&mut self, value: &T::T) {
let hash = Self::compute_hash(&self.state, value);

let entry = self
.dedup
.raw_entry_mut()
.from_hash(hash, |index| value == &self.uniques[*index as usize]);

let index = match entry {
RawEntryMut::Occupied(entry) => *entry.into_key(),
RawEntryMut::Vacant(entry) => {
let index = self.uniques.len() as u64;
self.uniques.push(value.clone());

let (base_size, num_elements) = value.dict_encoding_size();

let unique_size = match T::get_physical_type() {
Type::BYTE_ARRAY => base_size + num_elements,
Type::FIXED_LEN_BYTE_ARRAY => self.desc.type_length() as usize,
_ => base_size,
};
self.uniques_size_in_bytes += unique_size;

*entry
.insert_with_hasher(hash, index, (), |index| {
Self::compute_hash(&self.state, &self.uniques[*index as usize])
})
.0
}
};

self.indices.push(index);
self.indices.push(self.interner.intern(value));
}

#[inline]
fn bit_width(&self) -> u8 {
let num_entries = self.uniques.len();
let num_entries = self.num_entries();
if num_entries <= 1 {
num_entries as u8
} else {
Expand Down
77 changes: 77 additions & 0 deletions parquet/src/util/interner.rs
@@ -0,0 +1,77 @@
use crate::data_type::AsBytes;
use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;
use std::hash::Hash;

/// Storage trait for [`Interner`]
pub trait Storage {
type Key: Copy;

type Value: AsBytes + PartialEq + ?Sized;

/// Gets an element by its key
fn get(&self, idx: Self::Key) -> &Self::Value;

/// Adds a new element, returning the key
fn push(&mut self, value: &Self::Value) -> Self::Key;
}

/// A generic value interner supporting various different [`Storage`]
pub struct Interner<S: Storage> {
state: ahash::RandomState,

/// Used to provide a lookup from value to unique value
///
/// Note: `S::Key`'s hash implementation is not used, instead the raw entry
/// API is used to store keys w.r.t the hash of the strings themselves
///
dedup: HashMap<S::Key, (), ()>,

storage: S,
}

impl<S: Storage> Interner<S> {
/// Create a new `Interner` with the provided storage
pub fn new(storage: S) -> Self {
Self {
state: Default::default(),
dedup: Default::default(),
storage,
}
}

/// Intern the value, returning the interned key, and if this was a new value
pub fn intern(&mut self, value: &S::Value) -> S::Key {
let hash = compute_hash(&self.state, value);

let entry = self
.dedup
.raw_entry_mut()
.from_hash(hash, |index| value == self.storage.get(*index));

match entry {
RawEntryMut::Occupied(entry) => *entry.into_key(),
RawEntryMut::Vacant(entry) => {
let key = self.storage.push(value);

*entry
.insert_with_hasher(hash, key, (), |key| {
compute_hash(&self.state, self.storage.get(*key))
})
.0
}
}
}

/// Returns the storage for this interner
pub fn storage(&self) -> &S {
&self.storage
}
}

fn compute_hash<T: AsBytes + ?Sized>(state: &ahash::RandomState, value: &T) -> u64 {
use std::hash::{BuildHasher, Hasher};
let mut hasher = state.build_hasher();
value.as_bytes().hash(&mut hasher);
hasher.finish()
}
3 changes: 2 additions & 1 deletion parquet/src/util/mod.rs
Expand Up @@ -21,9 +21,10 @@ pub mod memory;
pub mod bit_util;
mod bit_packing;
pub mod cursor;
pub(crate) mod interner;
pub(crate) mod page_util;
#[cfg(any(test, feature = "test_common"))]
pub(crate) mod test_common;
pub(crate)mod page_util;

#[cfg(any(test, feature = "test_common"))]
pub use self::test_common::page_util::{
Expand Down

0 comments on commit 8be38af

Please sign in to comment.