Skip to content

Commit

Permalink
chore(cubestore): reading skecthes from HLL Storage Specification
Browse files Browse the repository at this point in the history
See https://github.com/aggregateknowledge/hll-storage-spec for the
binary format specification.

While here, also update invalid constant (`MAX_BUCKETS`).
  • Loading branch information
ilya-biryukov committed Sep 14, 2021
1 parent e86b3fa commit 35e1eab
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 4 deletions.
213 changes: 209 additions & 4 deletions rust/cubehll/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use crate::bias_correction;
use crate::error::HllError;
use crate::error::Result;
use crate::instance::HllInstance::{Dense, Sparse};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
use itertools::Itertools;
use serde_derive::Deserialize;
use std::cmp::max;
use std::cmp::{max, min};
use std::collections::HashSet;
use std::convert::TryInto;
use std::io::{Cursor, Read};
Expand All @@ -32,7 +32,7 @@ pub enum HllInstance {

/// Implementation of HyperLogLog compatible ported from [airlift](https://github.com/airlift/airlift/blob/master/stats/src/main/java/io/airlift/stats/cardinality/HyperLogLog.java).
/// This implementation has to be binary compatible.
pub const MAX_BUCKETS: u32 = 65546;
pub const MAX_BUCKETS: u32 = 65536;
impl HllInstance {
pub fn new(num_buckets: u32) -> Result<HllInstance> {
assert!(num_buckets <= MAX_BUCKETS);
Expand Down Expand Up @@ -95,6 +95,141 @@ impl HllInstance {
}
}

/// Reads v1 of https://github.com/aggregateknowledge/hll-storage-spec and converts it to the
/// Airlift representation of HLL. This means extra limitations on input and can produce
/// different estimates due to implementation differences.
pub fn read_hll_storage_spec(data: &[u8]) -> Result<HllInstance> {
if data.len() < 3 {
return Err(HllError::new(
"Not enough bytes to read HLL header".to_string(),
));
}
// Read HLL header first.
let v = (data[0] & 0xF0) >> 4;
if v != 1 {
return Err(HllError::new(format!(
"Unknown HLL schema version ({}), can only read version 1.",
v
)));
}
const ENC_EMPTY: u8 = 1;
const ENC_EXPLICIT: u8 = 2;
const ENC_SPARSE: u8 = 3;
const ENC_FULL: u8 = 4;
let encoding = match data[0] & 0x0F {
0 => {
return Err(HllError::new(
"Cannot read HLL with undefined encoding".to_string(),
))
}
n if 1 <= n && n <= 4 => n,
n => {
return Err(HllError::new(format!(
"Unknown HLL encoding ordinal: {}",
n
)))
}
};
let reg_width = 1 + ((data[1] & 0b11100000) >> 5);
if reg_width < 1 || 6 < reg_width {
return Err(HllError::new(format!(
"Register width must be between 1 and 6, got {}",
reg_width
)));
}
let log_num_buckets = data[1] & 0b00011111;
// Note: the upper limit in storage spec is 31, but our implementation is limited to 16.
if log_num_buckets < 4 || 16 < log_num_buckets {
return Err(HllError::new(format!(
"Log2m must be between 4 and 16, got {}",
log_num_buckets
)));
}
let num_buckets = 1u32 << log_num_buckets as u32;
let _cutoff = data[2]; // Not used.

// Read the HLL body.
let data = &data[3..];
match encoding {
ENC_EMPTY => {
if !data.is_empty() {
return Err(HllError::new(format!(
"HLL with encdoing EMPTY has data {} bytes",
data.len()
)));
}
return HllInstance::new(num_buckets);
}
ENC_EXPLICIT => {
if data.len() % 8 != 0 {
return Err(HllError::new(format!(
"Size of EXPLICIT encoding is not a multiple of 8: {}",
data.len()
)));
}
let num_hashes = data.len() / 8;
if 256 * 8 < data.len() {
return Err(HllError::new(format!(
"EXPLICIT encoding has {} hashes, expected no more than 256 ",
num_hashes
)));
}
// Convert to sparse representation in Airlift.
let mut indices = Vec::with_capacity(num_hashes);
let mut values = Vec::with_capacity(num_hashes);
let mut data = Cursor::new(data);
while !data.is_empty() {
let hash = data.read_u64::<BigEndian>().unwrap();
indices.push(compute_index(hash, log_num_buckets));
values.push(compute_value(hash, log_num_buckets));
}

return Ok(HllInstance::Sparse(SparseHll::new_from_indices_and_values(
log_num_buckets,
indices,
&values,
)?));
}
ENC_SPARSE => {
let mut cursor = BitCursor::new(data);
let entry_len = (log_num_buckets + reg_width) as usize;
let mut indices = Vec::new();
let mut values = Vec::new();
while let Some(e) = cursor.read_bits(entry_len) {
indices.push((e >> reg_width) as u32);
values.push((e & ((1 << reg_width) - 1)) as u8);
}
return Ok(HllInstance::Sparse(SparseHll::new_from_indices_and_values(
log_num_buckets,
indices,
&values,
)?));
}
ENC_FULL => {
let expected_bits = num_buckets * reg_width as u32;
let expected_len = expected_bits / 8 + (expected_bits % 8 != 0) as u32;
if data.len() != expected_len as usize {
return Err(HllError::new(format!(
"Expected {} data bytes for encoding FULL with log2m={}, got {}",
expected_len,
log_num_buckets,
data.len()
)));
}
let mut values = Vec::with_capacity(num_buckets as usize);
let mut cursor = BitCursor::new(data);
for _ in 0..num_buckets {
values.push(cursor.read_bits(reg_width as usize).unwrap() as u8)
}
return Ok(HllInstance::Dense(DenseHll::new_from_entries(
log_num_buckets,
values,
)?));
}
enc => panic!("Unhandled encoding ordinal {}", enc),
}
}

pub fn read_snowflake(s: &str) -> Result<HllInstance> {
#[derive(Deserialize)]
struct SerializedHll {
Expand Down Expand Up @@ -219,16 +354,22 @@ impl SparseHll {
if values.len() != indices.len() {
return Err(HllError::new("values and indices are or different lengths"));
}

// Turn indices into the entries array inplace.
let mut entries = indices;
for i in 0..values.len() {
for i in 0..entries.len() {
// TODO: validate range of index values.
// High bits are bucket index, followed by zeros and the actual value.
// Airlift stores actual bits of the hash in the available bits, but inputs of this
// function do not have this information, so we set those bits to 0.
let bucket = entries[i];
entries[i] = (bucket << (32 - index_bit_len)) | (values[i] as u32);
}

// Sort by bucket index.
entries
.sort_unstable_by(|l, r| (l >> (32 - index_bit_len)).cmp(&(r >> (32 - index_bit_len))));

Ok(SparseHll {
index_bit_len,
entries,
Expand Down Expand Up @@ -1025,6 +1166,47 @@ const TAG_DENSE_V1: u8 = 1;
const TAG_SPARSE_V2: u8 = 2;
const TAG_DENSE_V2: u8 = 3;

struct BitCursor<'a> {
input: &'a [u8],
pos: usize,
bit_pos: usize,
}

impl BitCursor<'_> {
pub fn new(input: &[u8]) -> BitCursor {
BitCursor {
input,
pos: 0,
bit_pos: 0,
}
}

/// This will return [None] if we managed to read less than [num_bits] and hit the end of the
/// input buffer. This allows to ignore padding bytes.
pub fn read_bits(&mut self, mut num_bits: usize) -> Option<u64> {
debug_assert!(num_bits <= 64);
let mut r = 0;
while num_bits != 0 {
if self.pos == self.input.len() {
return None;
}
let read_bits = min(num_bits, 8 - self.bit_pos);

let high_zeros = (1u64 << (8 - self.bit_pos)) - 1;
let b = (self.input[self.pos] as u64 & high_zeros) >> (8 - self.bit_pos - read_bits);
r |= (b as u64) << (num_bits - read_bits);
num_bits -= read_bits;

self.bit_pos += read_bits;
if self.bit_pos == 8 {
self.pos += 1;
self.bit_pos = 0;
}
}
return Some(r);
}
}

#[cfg(test)]
mod tests {
use crate::instance::{compute_index, compute_value, number_of_buckets};
Expand Down Expand Up @@ -1122,6 +1304,29 @@ mod tests {
HllInstance::read_snowflake(r#"{ "precision": 1, "dense": [0, 0], "sparse": {"indices":[], "maxLzCounts":[]}, "version": 4 }"#).unwrap_err();
HllInstance::read_snowflake(r#"{ "precision": 1, "version": 4 }"#).unwrap_err();
}

#[test]
fn test_hll_storage_spec() {
let read = |s: &str| HllInstance::read_hll_storage_spec(&hex::decode(s).unwrap());

// Empty encoding.
let h = read("118b7f").unwrap();
assert_eq!(h.index_bit_len(), 11);
assert_eq!(h.cardinality(), 0);

// Explicit encoding, 1 value.
let h = read("128b7fee22c470691a8134").unwrap();
assert_eq!(h.cardinality(), 1);

// Sparse encoding, 169 values.
// TODO: the estimate is off, fix calculation in sparse mode.
let h = read("138b7f04a10642078507c308e309230a420ac10c2510a2114511611363138116811848188218a119411a821ae11f0122e223a125a126632685276327a328e2296129e52b812fe23081320132c133e335a53641368236a23721374237e1382138e13a813c243e6140e341854304434148a24a034f8150c1520152e254e155a1564157e158e35ac25b265b615c615fc1620166a368226a416a626c016c816d677163728275817a637a817ac37b617c247c427d677f6180e18101826382e1846184e18541858287e1880189218a418b818bc38e018ea290a19244938295e4988198c299e29b239b419c419ce49da1a1e1a321a381a4c1aa61acc2ae01b0a1b101b142b161b443b801bd02bd61bf61c263c4a3c501c7a1caa1cb03cd03cf03cf42d123d4c3d662d744d901dd01df81e001e0a2e641e7e3edc1f0a2f1c1f203f484f5c4f763fc84fdc1fe02fea1").unwrap();
assert_eq!(h.cardinality(), 164);

// Full (dense) encoding, 10k values.
let h = read("148b7f21083288a4320a12086719c65108c1088422884511063388232904418c8520484184862886528c65198832106328c83114e6214831108518d03208851948511884188441908119083388661842818c43190c320ce4210a50948221083084a421c8328c632104221c4120d01284e20902318ca5214641942319101294641906228483184e128c43188e308882204a538c8328903288642102220c64094631086330c832106320c46118443886329062118a230c63108a320c23204a11852419c6528c85210a318c6308c41088842086308ce7110a418864190650884210ca631064108642a1022186518c8509862109020a0a4318671144150842400e5090631a0811848320c821888120c81114a220880290622906310d0220c83090a118c433106128c221902210cc23106029044114841104409862190c43188111063104c310c6728c8618c62290441102310c23214440882438ca2110a32908548c432110329462188a43946328842114640944320884190c928c442084228863318a2190a318c6618ca3114651886618c44190c5108e2110612144319062284641908428882314862106419883310421988619ca420cc511442104633888218c4428465288651910730c81118821088218c6418c45108452106519ce410d841904218863308622086211483198c710c83104a328c620906218864118623086418c8711423094632186420c4620c41104620a441108e40882628c6311c212046428c8319021104672888428ca320c431984418c4209043084451886510c641108310c4c20c66188472146310ca71084820c621946218c8228822190e2410861904411c27288621144328c6440c6311063190813086228ca710c2218c4718865188c2114850888608864404a3194e22882310ce53088619ca31904519503188e1118c4214cb2948110c6119c2818c843108520c43188c5204821186528c871908311086214c630c4218c8418cc3298a31888210c63110a121042198622886531082098c419c4210c6210c8338c25294610944518c442104610884104424206310c8311462288873102308c2440c451082228824310440982220c4240c622084310c642850118c641148430d0128c8228c2120c221884428863208c21a0a4190a4404c21186548865204633906308ca32086211c8319ce22146520c6120803318a518c840084519461208c21908538cc428c2110844384e40906320c44014a3204e62042408c8328c632146318c812004310c41318e3208a5308a511827104a4188c51048421446090a7088631102231484104473084318c41210860906919083190652906129c4628c45310652848221443114420084500865184a618c81198c32906418c63190e320c231882728484184671888309465188a320c83208632144318c6331c642988108c61218812144328d022844021022184a31908328c6218c2328c4528cc541428190641046418c84108443146230c6419483214232184411863290a210824318c220868194631106618c43188821048230c4128c6310c0330462094241106330c42188c321043118863046438823110a041464108e3190e4209a11902439c43188631104321008090441106218c6419064294a229463594622244320cc71184510902924421908218c62308641044328ca328882111012884120ca52882428c62184442086718c4221c8211082208a321023115270086218c4218c6528ce400482310a520c43104a520c44210811884118c4310864198263942331822").unwrap();
assert_eq!(h.cardinality(), 9722);
}
}

mod dense {
Expand Down
1 change: 1 addition & 0 deletions rust/cubehll/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![feature(cursor_remaining)]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down

0 comments on commit 35e1eab

Please sign in to comment.