feat(cubestore): HyperLogLog++ support for BigQuery (#1872)
* chore(cubestore): port ZetaSketch to Rust

This is a direct port of the
[ZetaSketch](https://github.com/google/zetasketch) library.

See README.md for details.

* feat(cubestore): Support HyperLogLog++ sketches

This allows consuming HyperLogLog++ sketches produced by BigQuery.
ilya-biryukov committed Jan 25, 2021
1 parent 74ffacd commit 357ecef
Showing 27 changed files with 2,636 additions and 27 deletions.
15 changes: 15 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions rust/Cargo.toml
@@ -2,6 +2,7 @@
members = [
"cubestore",
"cubehll",
"cubezetasketch",
]

[patch.crates-io]
1 change: 1 addition & 0 deletions rust/Dockerfile
@@ -17,6 +17,7 @@ WORKDIR /build/cubestore
COPY Cargo.toml .
COPY Cargo.lock .
COPY cubehll cubehll
COPY cubezetasketch cubezetasketch
COPY cubestore/Cargo.toml cubestore/Cargo.toml
RUN mkdir -p cubestore/src/bin && \
echo "fn main() {print!(\"Dummy main\");} // dummy file" > cubestore/src/bin/cubestored.rs
1 change: 1 addition & 0 deletions rust/cubestore/Cargo.toml
@@ -18,6 +18,7 @@ sqlparser = "0.7.0"
serde_derive = "1.0.115"
serde = "1.0.115"
cubehll = { path = "../cubehll" }
cubezetasketch = { path = "../cubezetasketch" }
parquet = { git = 'https://github.com/cube-js/arrow', branch = 'cubestore-2021-01-02', version = "3.0.0-SNAPSHOT" }
arrow = { git = 'https://github.com/cube-js/arrow', branch = 'cubestore-2021-01-02', version = "3.0.0-SNAPSHOT" }
arrow-flight = { git = 'https://github.com/cube-js/arrow', branch = 'cubestore-2021-01-02', version = "3.0.0-SNAPSHOT" }
2 changes: 1 addition & 1 deletion rust/cubestore/src/import/mod.rs
@@ -90,7 +90,7 @@ impl ImportFormat
.map(|d| TableValue::Decimal(d.to_string()))
.unwrap_or(TableValue::Null),
ColumnType::Bytes => unimplemented!(),
ColumnType::HyperLogLog => unimplemented!(),
ColumnType::HyperLogLog(_) => unimplemented!(),
ColumnType::Timestamp => timestamp_from_string(value)?,
ColumnType::Float => {
TableValue::Float(value.parse::<f64>()?.to_string())
7 changes: 7 additions & 0 deletions rust/cubestore/src/lib.rs
@@ -14,6 +14,7 @@ use crate::remotefs::queue::RemoteFsOpResult;
use arrow::error::ArrowError;
use core::fmt;
use cubehll::HllError;
use cubezetasketch::ZetaError;
use flexbuffers::{DeserializationError, ReaderError};
use log::SetLoggerError;
use parquet::errors::ParquetError;
@@ -323,6 +324,12 @@ impl From<HllError> for CubeError {
}
}

impl From<ZetaError> for CubeError {
fn from(v: ZetaError) -> Self {
return CubeError::from_error(v);
}
}

impl From<cloud_storage::Error> for CubeError {
fn from(v: cloud_storage::Error) -> Self {
return CubeError::from_error(v);
15 changes: 11 additions & 4 deletions rust/cubestore/src/metastore/mod.rs
@@ -276,12 +276,18 @@ impl DataFrameValue<String> for Option<Row> {
}
}

#[derive(Clone, Copy, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
pub enum HllFlavour {
Airlift, // Compatible with Presto, Athena, etc.
ZetaSketch, // Compatible with BigQuery.
}

#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
pub enum ColumnType {
String,
Int,
Bytes,
HyperLogLog, // HLL Sketches, compatible with presto.
HyperLogLog(HllFlavour), // HLL Sketches, compatible with presto.
Timestamp,
Decimal { scale: i32, precision: i32 },
Float,
@@ -329,7 +335,7 @@ impl From<&Column> for parquet::schema::types::Type {
.build()
.unwrap()
}
crate::metastore::ColumnType::Bytes | ColumnType::HyperLogLog => {
crate::metastore::ColumnType::Bytes | ColumnType::HyperLogLog(_) => {
types::Type::primitive_type_builder(&column.get_name(), Type::BYTE_ARRAY)
.with_logical_type(LogicalType::NONE)
.with_repetition(Repetition::OPTIONAL)
@@ -380,7 +386,7 @@ impl Into<Field> for Column {
DataType::Int64Decimal(self.column_type.target_scale() as usize)
}
ColumnType::Bytes => DataType::Binary,
ColumnType::HyperLogLog => DataType::Binary,
ColumnType::HyperLogLog(_) => DataType::Binary,
ColumnType::Float => DataType::Float64,
},
false,
@@ -399,7 +405,8 @@ impl fmt::Display for Column {
format!("DECIMAL({}, {})", precision, scale)
}
ColumnType::Bytes => "BYTES".to_string(),
ColumnType::HyperLogLog => "HYPERLOGLOG".to_string(),
ColumnType::HyperLogLog(HllFlavour::Airlift) => "HYPERLOGLOG".to_string(),
ColumnType::HyperLogLog(HllFlavour::ZetaSketch) => "HYPERLOGLOGPP".to_string(),
ColumnType::Float => "FLOAT".to_string(),
};
f.write_fmt(format_args!("{} {}", self.name, column_type))
2 changes: 1 addition & 1 deletion rust/cubestore/src/mysql/mod.rs
@@ -67,7 +67,7 @@ impl<W: io::Write + Send> AsyncMysqlShim<W> for Backend {
metastore::ColumnType::Decimal { .. } => ColumnType::MYSQL_TYPE_DECIMAL,
metastore::ColumnType::Boolean => ColumnType::MYSQL_TYPE_STRING,
metastore::ColumnType::Bytes => ColumnType::MYSQL_TYPE_STRING,
metastore::ColumnType::HyperLogLog => ColumnType::MYSQL_TYPE_STRING,
metastore::ColumnType::HyperLogLog(_) => ColumnType::MYSQL_TYPE_STRING,
metastore::ColumnType::Float => ColumnType::MYSQL_TYPE_STRING,
},
colflags: ColumnFlags::empty(),
61 changes: 61 additions & 0 deletions rust/cubestore/src/queryplanner/hll.rs
@@ -0,0 +1,61 @@
use crate::CubeError;
use cubehll::HllSketch;
use cubezetasketch::HyperLogLogPlusPlus;

#[derive(Debug)]
pub enum Hll {
Airlift(HllSketch), // Compatible with Athena, Presto, etc.
ZetaSketch(HyperLogLogPlusPlus), // Compatible with BigQuery.
}

impl Hll {
pub fn read(data: &[u8]) -> Result<Hll, CubeError> {
if data.is_empty() {
return Err(CubeError::internal(
"invalid serialized HLL (empty data)".to_string(),
));
}
// The first byte:
// - must be larger than 3 due to how protos are encoded in ZetaSketch.
// - represents the data format version and is <= 3 in Airlift.
if data[0] <= 3 {
return Ok(Hll::Airlift(HllSketch::read(data)?));
} else {
return Ok(Hll::ZetaSketch(HyperLogLogPlusPlus::read(data)?));
}
}

pub fn write(&self) -> Vec<u8> {
match self {
Hll::Airlift(h) => h.write(),
Hll::ZetaSketch(h) => h.write(),
}
}

pub fn is_compatible(&self, other: &Hll) -> bool {
match (self, other) {
(Hll::Airlift(l), Hll::Airlift(r)) => l.index_bit_len() == r.index_bit_len(),
(Hll::ZetaSketch(l), Hll::ZetaSketch(r)) => l.is_compatible(r),
_ => return false,
}
}

pub fn cardinality(&self) -> u64 {
match self {
Hll::Airlift(h) => h.cardinality(),
Hll::ZetaSketch(h) => h.cardinality(),
}
}

/// Clients are responsible for calling `is_compatible` before running this function.
/// On error, `self` may end up in an inconsistent state and must be discarded.
pub fn merge_with(&mut self, other: &Hll) -> Result<(), CubeError> {
debug_assert!(self.is_compatible(other));
match (self, other) {
(Hll::Airlift(l), Hll::Airlift(r)) => l.merge_with(r),
(Hll::ZetaSketch(l), Hll::ZetaSketch(r)) => l.merge_with(r)?,
_ => panic!("incompatible HLL types"),
}
return Ok(());
}
}
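
The `Hll` wrapper above dispatches on the first byte of the serialized data to choose between the Airlift and ZetaSketch representations. A minimal usage sketch, assuming code inside the cubestore crate; `merged_cardinality`, `left`, and `right` are hypothetical names, not part of this commit:

```rust
use crate::queryplanner::hll::Hll;
use crate::CubeError;

// Merge two serialized sketches (Airlift or ZetaSketch format, detected by
// `Hll::read` from the first byte) and return the estimated cardinality of
// their union.
fn merged_cardinality(left: &[u8], right: &[u8]) -> Result<u64, CubeError> {
    let mut l = Hll::read(left)?;
    let r = Hll::read(right)?;
    // `merge_with` requires callers to check compatibility first.
    if !l.is_compatible(&r) {
        return Err(CubeError::internal(
            "cannot merge incompatible HLL sketches".to_string(),
        ));
    }
    l.merge_with(&r)?;
    Ok(l.cardinality())
}
```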
1 change: 1 addition & 0 deletions rust/cubestore/src/queryplanner/mod.rs
@@ -1,3 +1,4 @@
pub mod hll;
pub mod query_executor;
pub mod serialized_plan;
pub mod udfs;
22 changes: 10 additions & 12 deletions rust/cubestore/src/queryplanner/udfs.rs
@@ -1,7 +1,7 @@
use crate::queryplanner::hll::Hll;
use crate::CubeError;
use arrow::array::{Array, BinaryArray, UInt64Builder};
use arrow::datatypes::DataType;
use cubehll::HllSketch;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::functions::Signature;
use datafusion::physical_plan::udaf::AggregateUDF;
@@ -126,7 +126,7 @@ impl CubeAggregateUDF for HllMergeUDF {
struct HllMergeAccumulator {
// TODO: store sketch for empty set from the start.
// this requires storing index_bit_len in the type.
acc: Option<HllSketch>,
acc: Option<Hll>,
}

impl Accumulator for HllMergeAccumulator {
@@ -183,27 +183,25 @@ impl Accumulator for HllMergeAccumulator {
}

impl HllMergeAccumulator {
fn merge_sketch(&mut self, s: HllSketch) -> Result<(), DataFusionError> {
fn merge_sketch(&mut self, s: Hll) -> Result<(), DataFusionError> {
if self.acc.is_none() {
self.acc = Some(s);
return Ok(());
} else if let Some(acc_s) = &mut self.acc {
if acc_s.index_bit_len() != s.index_bit_len() {
return Err(CubeError::internal(format!(
"cannot merge two HLL sketches with different number of buckets: {} and {}",
s.num_buckets(),
acc_s.num_buckets()
))
if !acc_s.is_compatible(&s) {
return Err(CubeError::internal(
"cannot merge two incompatible HLL sketches".to_string(),
)
.into());
}
acc_s.merge_with(&s);
acc_s.merge_with(&s)?;
} else {
unreachable!("impossible");
}
return Ok(());
}
}

fn read_sketch(data: &[u8]) -> Result<HllSketch, DataFusionError> {
return HllSketch::read(&data).map_err(|e| DataFusionError::Execution(e.message));
fn read_sketch(data: &[u8]) -> Result<Hll, DataFusionError> {
return Hll::read(&data).map_err(|e| DataFusionError::Execution(e.message));
}
21 changes: 14 additions & 7 deletions rust/cubestore/src/sql/mod.rs
@@ -7,7 +7,8 @@ use sqlparser::ast::*;
use sqlparser::dialect::Dialect;

use crate::metastore::{
table::Table, IdRow, ImportFormat, Index, IndexDef, MetaStoreTable, RowKey, Schema, TableId,
table::Table, HllFlavour, IdRow, ImportFormat, Index, IndexDef, MetaStoreTable, RowKey, Schema,
TableId,
};
use crate::table::{Row, TableValue, TimestampValue};
use crate::CubeError;
@@ -490,7 +491,8 @@ fn convert_columns_type(columns: &Vec<ColumnDef>) -> Result<Vec<Column>, CubeErr
match custom_type_name.as_str() {
"mediumint" => ColumnType::Int,
"varbinary" => ColumnType::Bytes,
"hyperloglog" => ColumnType::HyperLogLog,
"hyperloglog" => ColumnType::HyperLogLog(HllFlavour::Airlift),
"hyperloglogpp" => ColumnType::HyperLogLog(HllFlavour::ZetaSketch),
_ => {
return Err(CubeError::user(format!(
"Custom type '{}' is not supported",
@@ -543,11 +545,16 @@ fn decode_byte(s: &str) -> Option<u8> {
return Some(v0 * 16 + v1);
}

fn parse_hyper_log_log(v: &Value) -> Result<Vec<u8>, CubeError> {
fn parse_hyper_log_log(v: &Value, f: HllFlavour) -> Result<Vec<u8>, CubeError> {
let bytes = parse_binary_string(v)?;
// TODO: check without memory allocations; this runs on the hot path.
if let Err(e) = cubehll::HllSketch::read(&bytes) {
return Err(e.into());
match f {
HllFlavour::Airlift => {
cubehll::HllSketch::read(&bytes)?;
}
HllFlavour::ZetaSketch => {
cubezetasketch::HyperLogLogPlusPlus::read(&bytes)?;
}
}
return Ok(bytes);
}
@@ -633,10 +640,10 @@ fn extract_data(cell: &Expr, column: &Vec<&Column>, i: usize) -> Result<TableVal
};
return Ok(TableValue::Bytes(val?));
}
ColumnType::HyperLogLog => {
&ColumnType::HyperLogLog(f) => {
let val;
if let Expr::Value(v) = cell {
val = parse_hyper_log_log(v)
val = parse_hyper_log_log(v, f)
} else {
return Err(CubeError::user("Corrupted data in query.".to_string()));
};
4 changes: 2 additions & 2 deletions rust/cubestore/src/table/parquet.rs
@@ -194,7 +194,7 @@ impl<'a> RowParquetReader<'a> {
c.get_index(),
match c.get_column_type() {
ColumnType::String => ColumnAccessor::Bytes(vec![ByteArray::new(); 16384]),
ColumnType::Bytes | ColumnType::HyperLogLog => {
ColumnType::Bytes | ColumnType::HyperLogLog(_) => {
ColumnAccessor::Bytes(vec![ByteArray::new(); 16384])
}
ColumnType::Int => ColumnAccessor::Int(vec![0; 16384]),
@@ -309,7 +309,7 @@ impl<'a> RowParquetReader<'a> {
}
}
}
ColumnType::Bytes | ColumnType::HyperLogLog => {
ColumnType::Bytes | ColumnType::HyperLogLog(_) => {
if let ColumnAccessor::Bytes(buffer) = &column_accessor {
for i in 0..values_read {
if levels[i] == 1 {
11 changes: 11 additions & 0 deletions rust/cubezetasketch/Cargo.toml
@@ -0,0 +1,11 @@
[package]
name = "cubezetasketch"
version = "0.1.0"
authors = ["Cube Dev, Inc."]
edition = "2018"
license = "Apache-2.0"
description = "HyperLogLog++ implementation ported from ZetaSketch"

[dependencies]
itertools = "0.10.0"
protobuf = "2.20.0"
14 changes: 14 additions & 0 deletions rust/cubezetasketch/README.md
@@ -0,0 +1,14 @@
# Overview

A Rust implementation of HyperLogLog++, ported directly from the Java code in [ZetaSketch](https://github.com/google/zetasketch).
Based on commit `a2f2692fae8cf61103330f9f70e696c4ba8b94b0`.

This library allows direct interoperation with sketches produced by `ZetaSketch`.
Only a portion of the code is ported. In particular, we currently support:
- reading and writing sketches in the binary proto format,
- computing set cardinality estimates,
- merging sketches.

The major unsupported bits are:
- adding values to the sketches,
- mixing sketches of different precisions.
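
For illustration, a minimal sketch of reading and merging two serialized HLL++ sketches with this crate. Assumptions: `read` and `merge_with` return `Result<_, ZetaError>`, `a` and `b` hold bytes produced elsewhere (e.g. exported from BigQuery), and `union_cardinality` is a hypothetical helper, not part of the crate:

```rust
use cubezetasketch::{HyperLogLogPlusPlus, ZetaError};

// Estimate the cardinality of the union of two serialized HLL++ sketches.
// Note that adding individual values is not supported by this port.
fn union_cardinality(a: &[u8], b: &[u8]) -> Result<u64, ZetaError> {
    let mut left = HyperLogLogPlusPlus::read(a)?;
    let right = HyperLogLogPlusPlus::read(b)?;
    // Sketches of different precisions cannot be mixed, so check first.
    assert!(left.is_compatible(&right), "incompatible sketches");
    left.merge_with(&right)?;
    Ok(left.cardinality())
}
```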
