Skip to content

Commit

Permalink
feat(cubestore): support reading of postgres-hll sketches
Browse files Browse the repository at this point in the history
Default configuration should work, but there are some limitations.
The log2m parameter is limited to 16 rather than 31.
The precision parameter is limited to 6 rather than 8.
  • Loading branch information
ilya-biryukov committed Sep 14, 2021
1 parent 35e1eab commit 72c38ba
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 19 deletions.
6 changes: 6 additions & 0 deletions rust/cubehll/src/sketch.rs
Expand Up @@ -51,6 +51,12 @@ impl HllSketch {
});
}

/// Read a sketch serialized in the HLL storage specification format
/// (the binary layout used by the postgres-hll extension).
pub fn read_hll_storage_spec(data: &[u8]) -> Result<HllSketch> {
    // Delegate parsing to the instance-level reader; `?` propagates any
    // decode error, and a successful parse is wrapped into a sketch.
    let instance = HllInstance::read_hll_storage_spec(data)?;
    Ok(HllSketch { instance })
}

/// Read from the snowflake JSON format, i.e. result of HLL_EXPORT serialized to string.
pub fn read_snowflake(s: &str) -> Result<HllSketch> {
return Ok(HllSketch {
Expand Down
21 changes: 21 additions & 0 deletions rust/cubestore-sql-tests/src/tests.rs
Expand Up @@ -77,6 +77,7 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
t("hyperloglog_empty_group_by", hyperloglog_empty_group_by),
t("hyperloglog_inserts", hyperloglog_inserts),
t("hyperloglog_inplace_group_by", hyperloglog_inplace_group_by),
t("hyperloglog_postgres", hyperloglog_postgres),
t("hyperloglog_snowflake", hyperloglog_snowflake),
t("planning_inplace_aggregate", planning_inplace_aggregate),
t("planning_hints", planning_hints),
Expand Down Expand Up @@ -1730,6 +1731,26 @@ async fn hyperloglog_inplace_group_by(service: Box<dyn SqlClient>) {
)
}

/// End-to-end test for ingesting postgres-hll sketches: creates a table with
/// an `HLL_POSTGRES` column, inserts hex-encoded sketches of increasing
/// density, and checks the cardinality estimates returned by `cardinality()`.
async fn hyperloglog_postgres(service: Box<dyn SqlClient>) {
    service.exec_query("CREATE SCHEMA s").await.unwrap();
    service
        .exec_query("CREATE TABLE s.hlls(id int, hll HLL_POSTGRES)")
        .await
        .unwrap();
    // The X'…' literals are raw postgres-hll storage-spec bytes:
    // id=1 an empty sketch, id=2 an explicit (single-value) sketch,
    // id=3 a sparse sketch, id=4 a dense sketch.
service.exec_query("INSERT INTO s.hlls(id, hll) VALUES \
(1, X'118b7f'),\
(2, X'128b7fee22c470691a8134'),\
(3, X'138b7f04a10642078507c308e309230a420ac10c2510a2114511611363138116811848188218a119411a821ae11f0122e223a125a126632685276327a328e2296129e52b812fe23081320132c133e335a53641368236a23721374237e1382138e13a813c243e6140e341854304434148a24a034f8150c1520152e254e155a1564157e158e35ac25b265b615c615fc1620166a368226a416a626c016c816d677163728275817a637a817ac37b617c247c427d677f6180e18101826382e1846184e18541858287e1880189218a418b818bc38e018ea290a19244938295e4988198c299e29b239b419c419ce49da1a1e1a321a381a4c1aa61acc2ae01b0a1b101b142b161b443b801bd02bd61bf61c263c4a3c501c7a1caa1cb03cd03cf03cf42d123d4c3d662d744d901dd01df81e001e0a2e641e7e3edc1f0a2f1c1f203f484f5c4f763fc84fdc1fe02fea1'),\
(4, X'148b7f21083288a4320a12086719c65108c1088422884511063388232904418c8520484184862886528c65198832106328c83114e6214831108518d03208851948511884188441908119083388661842818c43190c320ce4210a50948221083084a421c8328c632104221c4120d01284e20902318ca5214641942319101294641906228483184e128c43188e308882204a538c8328903288642102220c64094631086330c832106320c46118443886329062118a230c63108a320c23204a11852419c6528c85210a318c6308c41088842086308ce7110a418864190650884210ca631064108642a1022186518c8509862109020a0a4318671144150842400e5090631a0811848320c821888120c81114a220880290622906310d0220c83090a118c433106128c221902210cc23106029044114841104409862190c43188111063104c310c6728c8618c62290441102310c23214440882438ca2110a32908548c432110329462188a43946328842114640944320884190c928c442084228863318a2190a318c6618ca3114651886618c44190c5108e2110612144319062284641908428882314862106419883310421988619ca420cc511442104633888218c4428465288651910730c81118821088218c6418c45108452106519ce410d841904218863308622086211483198c710c83104a328c620906218864118623086418c8711423094632186420c4620c41104620a441108e40882628c6311c212046428c8319021104672888428ca320c431984418c4209043084451886510c641108310c4c20c66188472146310ca71084820c621946218c8228822190e2410861904411c27288621144328c6440c6311063190813086228ca710c2218c4718865188c2114850888608864404a3194e22882310ce53088619ca31904519503188e1118c4214cb2948110c6119c2818c843108520c43188c5204821186528c871908311086214c630c4218c8418cc3298a31888210c63110a121042198622886531082098c419c4210c6210c8338c25294610944518c442104610884104424206310c8311462288873102308c2440c451082228824310440982220c4240c622084310c642850118c641148430d0128c8228c2120c221884428863208c21a0a4190a4404c21186548865204633906308ca32086211c8319ce22146520c6120803318a518c840084519461208c21908538cc428c2110844384e40906320c44014a3204e62042408c8328c632146318c812004310c41318e3208a5308a511827104a4188c51048421446090a7088631102231484104473084318c41210860906919083190652906129c4628c45310652848221443114420084500865184a618c81198c32906418c63190
e320c231882728484184671888309465188a320c83208632144318c6331c642988108c61218812144328d022844021022184a31908328c6218c2328c4528cc541428190641046418c84108443146230c6419483214232184411863290a210824318c220868194631106618c43188821048230c4128c6310c0330462094241106330c42188c321043118863046438823110a041464108e3190e4209a11902439c43188631104321008090441106218c6419064294a229463594622244320cc71184510902924421908218c62308641044328ca328882111012884120ca52882428c62184442086718c4221c8211082208a321023115270086218c4218c6528ce400482310a520c43104a520c44210811884118c4310864198263942331822')"
).await.unwrap();

    // Expected estimates: empty sketch → 0; the sparse/dense cases match the
    // cardinalities produced by the reference postgres-hll implementation.
    let r = service
        .exec_query("SELECT id, cardinality(hll) FROM s.hlls ORDER BY id")
        .await
        .unwrap();
    assert_eq!(to_rows(&r), rows(&[(1, 0), (2, 1), (3, 164), (4, 9722)]));
}

async fn hyperloglog_snowflake(service: Box<dyn SqlClient>) {
service.exec_query("CREATE SCHEMA s").await.unwrap();
service
Expand Down
14 changes: 10 additions & 4 deletions rust/cubestore/src/import/mod.rs
Expand Up @@ -22,7 +22,7 @@ use crate::config::injection::DIService;
use crate::config::ConfigObj;
use crate::import::limits::ConcurrencyLimits;
use crate::metastore::table::Table;
use crate::metastore::{is_valid_binary_hll_input, HllFlavour, IdRow};
use crate::metastore::{is_valid_plain_binary_hll, HllFlavour, IdRow};
use crate::metastore::{Column, ColumnType, ImportFormat, MetaStore};
use crate::remotefs::RemoteFs;
use crate::sql::timestamp_from_string;
Expand Down Expand Up @@ -129,10 +129,16 @@ impl ImportFormat {
let hll = HllSketch::read_snowflake(value)?;
TableValue::Bytes(hll.write())
}
ColumnType::HyperLogLog(f) => {
assert!(f.imports_from_binary());
ColumnType::HyperLogLog(HllFlavour::Postgres) => {
let data = base64::decode(value)?;
is_valid_binary_hll_input(&data, *f)?;
let hll = HllSketch::read_hll_storage_spec(&data)?;
TableValue::Bytes(hll.write())
}
ColumnType::HyperLogLog(
f @ (HllFlavour::Airlift | HllFlavour::ZetaSketch),
) => {
let data = base64::decode(value)?;
is_valid_plain_binary_hll(&data, *f)?;
TableValue::Bytes(data)
}
ColumnType::Timestamp => {
Expand Down
15 changes: 4 additions & 11 deletions rust/cubestore/src/metastore/mod.rs
Expand Up @@ -300,19 +300,11 @@ impl DataFrameValue<String> for Option<Row> {
pub enum HllFlavour {
    /// Compatible with Presto, Athena, etc.
    Airlift,
    /// Same storage as Airlift, imports from Snowflake JSON.
    Snowflake,
    /// Same storage as Airlift, imports from the HLL Storage Specification.
    Postgres,
    /// Compatible with BigQuery.
    ZetaSketch,
}

impl HllFlavour {
pub fn imports_from_binary(&self) -> bool {
match self {
HllFlavour::Airlift | HllFlavour::ZetaSketch => true,
HllFlavour::Snowflake => false,
}
}
}

pub fn is_valid_binary_hll_input(data: &[u8], f: HllFlavour) -> Result<(), CubeError> {
pub fn is_valid_plain_binary_hll(data: &[u8], f: HllFlavour) -> Result<(), CubeError> {
// TODO: do no memory allocations for better performance, this is run on hot path.
match f {
HllFlavour::Airlift => {
Expand All @@ -321,7 +313,7 @@ pub fn is_valid_binary_hll_input(data: &[u8], f: HllFlavour) -> Result<(), CubeE
HllFlavour::ZetaSketch => {
HyperLogLogPlusPlus::read(data)?;
}
HllFlavour::Snowflake => {
HllFlavour::Postgres | HllFlavour::Snowflake => {
panic!("string formats should be handled separately")
}
}
Expand Down Expand Up @@ -453,6 +445,7 @@ impl fmt::Display for Column {
ColumnType::Bytes => "BYTES".to_string(),
ColumnType::HyperLogLog(HllFlavour::Airlift) => "HYPERLOGLOG".to_string(),
ColumnType::HyperLogLog(HllFlavour::ZetaSketch) => "HYPERLOGLOGPP".to_string(),
ColumnType::HyperLogLog(HllFlavour::Postgres) => "HLL_POSTGRES".to_string(),
ColumnType::HyperLogLog(HllFlavour::Snowflake) => "HLL_SNOWFLAKE".to_string(),
ColumnType::Float => "FLOAT".to_string(),
};
Expand Down
14 changes: 10 additions & 4 deletions rust/cubestore/src/sql/mod.rs
Expand Up @@ -35,7 +35,7 @@ use crate::import::limits::ConcurrencyLimits;
use crate::import::Ingestion;
use crate::metastore::job::JobType;
use crate::metastore::{
is_valid_binary_hll_input, table::Table, HllFlavour, IdRow, ImportFormat, Index, IndexDef,
is_valid_plain_binary_hll, table::Table, HllFlavour, IdRow, ImportFormat, Index, IndexDef,
MetaStoreTable, RowKey, Schema, TableId,
};
use crate::queryplanner::query_executor::{batch_to_dataframe, QueryExecutor};
Expand Down Expand Up @@ -763,6 +763,7 @@ fn convert_columns_type(columns: &Vec<ColumnDef>) -> Result<Vec<Column>, CubeErr
"hyperloglog" => ColumnType::HyperLogLog(HllFlavour::Airlift),
"hyperloglogpp" => ColumnType::HyperLogLog(HllFlavour::ZetaSketch),
"hll_snowflake" => ColumnType::HyperLogLog(HllFlavour::Snowflake),
"hll_postgres" => ColumnType::HyperLogLog(HllFlavour::Postgres),
_ => {
return Err(CubeError::user(format!(
"Custom type '{}' is not supported",
Expand Down Expand Up @@ -834,10 +835,15 @@ fn parse_hyper_log_log<'a>(
*buffer = hll.write();
Ok(buffer)
}
f => {
assert!(f.imports_from_binary());
HllFlavour::Postgres => {
let bytes = parse_binary_string(buffer, v)?;
is_valid_binary_hll_input(bytes, f)?;
let hll = HllSketch::read_hll_storage_spec(bytes)?;
*buffer = hll.write();
Ok(buffer)
}
HllFlavour::Airlift | HllFlavour::ZetaSketch => {
let bytes = parse_binary_string(buffer, v)?;
is_valid_plain_binary_hll(bytes, f)?;
Ok(bytes)
}
}
Expand Down

0 comments on commit 72c38ba

Please sign in to comment.