From 72c38badec1ee2ffc64218299653af1897042671 Mon Sep 17 00:00:00 2001
From: Ilya Biryukov
Date: Tue, 14 Sep 2021 15:15:26 +0300
Subject: [PATCH] feat(cubestore): support reading of postgres-hll sketches

Default configuration should work, but there are some limitations.
The log2m parameter is limited to 16 rather than 31.
The precision parameter is limited to 6 rather than 8.
---
 rust/cubehll/src/sketch.rs            |  6 ++++++
 rust/cubestore-sql-tests/src/tests.rs | 21 +++++++++++++++++++++
 rust/cubestore/src/import/mod.rs      | 14 ++++++++++----
 rust/cubestore/src/metastore/mod.rs   | 15 ++++-----------
 rust/cubestore/src/sql/mod.rs         | 14 ++++++++++----
 5 files changed, 51 insertions(+), 19 deletions(-)

diff --git a/rust/cubehll/src/sketch.rs b/rust/cubehll/src/sketch.rs
index 54a6d259cb6b..bfcfe7c802ee 100644
--- a/rust/cubehll/src/sketch.rs
+++ b/rust/cubehll/src/sketch.rs
@@ -51,6 +51,12 @@ impl HllSketch {
         });
     }
 
+    pub fn read_hll_storage_spec(data: &[u8]) -> Result<HllSketch, HllError> {
+        return Ok(HllSketch {
+            instance: HllInstance::read_hll_storage_spec(data)?,
+        });
+    }
+
     /// Read from the snowflake JSON format, i.e. result of HLL_EXPORT serialized to string.
     pub fn read_snowflake(s: &str) -> Result<HllSketch, HllError> {
         return Ok(HllSketch {
diff --git a/rust/cubestore-sql-tests/src/tests.rs b/rust/cubestore-sql-tests/src/tests.rs
index 6afcbcbecc46..47c72632ca9e 100644
--- a/rust/cubestore-sql-tests/src/tests.rs
+++ b/rust/cubestore-sql-tests/src/tests.rs
@@ -77,6 +77,7 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
         t("hyperloglog_empty_group_by", hyperloglog_empty_group_by),
         t("hyperloglog_inserts", hyperloglog_inserts),
         t("hyperloglog_inplace_group_by", hyperloglog_inplace_group_by),
+        t("hyperloglog_postgres", hyperloglog_postgres),
         t("hyperloglog_snowflake", hyperloglog_snowflake),
         t("planning_inplace_aggregate", planning_inplace_aggregate),
         t("planning_hints", planning_hints),
@@ -1730,6 +1731,26 @@ async fn hyperloglog_inplace_group_by(service: Box<dyn SqlClient>) {
     )
 }
 
+async fn hyperloglog_postgres(service: Box<dyn SqlClient>) {
+    service.exec_query("CREATE SCHEMA s").await.unwrap();
+    service
+        .exec_query("CREATE TABLE s.hlls(id int, hll HLL_POSTGRES)")
+        .await
+        .unwrap();
+    service.exec_query("INSERT INTO s.hlls(id, hll) VALUES \
+        (1, X'118b7f'),\
+        (2, X'128b7fee22c470691a8134'),\
+        (3, X'138b7f04a10642078507c308e309230a420ac10c2510a2114511611363138116811848188218a119411a821ae11f0122e223a125a126632685276327a328e2296129e52b812fe23081320132c133e335a53641368236a23721374237e1382138e13a813c243e6140e341854304434148a24a034f8150c1520152e254e155a1564157e158e35ac25b265b615c615fc1620166a368226a416a626c016c816d677163728275817a637a817ac37b617c247c427d677f6180e18101826382e1846184e18541858287e1880189218a418b818bc38e018ea290a19244938295e4988198c299e29b239b419c419ce49da1a1e1a321a381a4c1aa61acc2ae01b0a1b101b142b161b443b801bd02bd61bf61c263c4a3c501c7a1caa1cb03cd03cf03cf42d123d4c3d662d744d901dd01df81e001e0a2e641e7e3edc1f0a2f1c1f203f484f5c4f763fc84fdc1fe02fea1'),\
+        (4, X'148b7f21083288a4320a12086719c65108c1088422884511063388232904418c8520484184862886528c65198832106328c83114e6214831108518d03208851948511884188441908119083388661842818c43190c320ce4210a50948221083084a421c8328c632104221c4120d01284e20902318ca5214641942319101294641906228483184e128c43188e308882204a538c8328903288642102220c64094631086330c832106320c46118443886329062118a230c63108a320c23204a11852419c6528c85210a318c6308c41088842086308ce7110a418864190650884210ca631064108642a1022186518c8509862109020a0a4318671144150842400e5090631a0811848320c821888120c81114a220880290622906310d0220c83090a118c433106128c221902210cc23106029044114841104409862190c43188111063104c310c6728c8618c62290441102310c23214440882438ca2110a32908548c432110329462188a43946328842114640944320884190c928c442084228863318a2190a318c6618ca3114651886618c44190c5108e2110612144319062284641908428882314862106419883310421988619ca420cc511442104633888218c4428465288651910730c81118821088218c6418c45108452106519ce410d841904218863308622086211483198c710c83104a328c620906218864118623086418c8711423094632186420c4620c41104620a441108e40882628c6311c212046428c8319021104672888428ca320c431984418c4209043084451886510c641108310c4c20c66188472146310ca71084820c621946218c8228822190e2410861904411c27288621144328c6440c6311063190813086228ca710c2218c4718865188c2114850888608864404a3194e22882310ce53088619ca31904519503188e1118c4214cb2948110c6119c2818c843108520c43188c5204821186528c871908311086214c630c4218c8418cc3298a31888210c63110a121042198622886531082098c419c4210c6210c8338c25294610944518c442104610884104424206310c8311462288873102308c2440c451082228824310440982220c4240c622084310c642850118c641148430d0128c8228c2120c221884428863208c21a0a4190a4404c21186548865204633906308ca32086211c8319ce22146520c6120803318a518c840084519461208c21908538cc428c2110844384e40906320c44014a3204e62042408c8328c632146318c812004310c41318e3208a5308a511827104a4188c51048421446090a7088631102231484104473084318c41210860906919083190652906129c4628c45310652848221443114420084500865184a618c81198c32906418c63190e320c231882728484184671888309465188a320c83208632144318c6331c642988108c61218812144328d022844021022184a31908328c6218c2328c4528cc541428190641046418c84108443146230c6419483214232184411863290a210824318c220868194631106618c43188821048230c4128c6310c0330462094241106330c42188c321043118863046438823110a041464108e3190e4209a11902439c43188631104321008090441106218c6419064294a229463594622244320cc71184510902924421908218c62308641044328ca328882111012884120ca52882428c62184442086718c4221c8211082208a321023115270086218c4218c6528ce400482310a520c43104a520c44210811884118c4310864198263942331822')"
+    ).await.unwrap();
+
+    let r = service
+        .exec_query("SELECT id, cardinality(hll) FROM s.hlls ORDER BY id")
+        .await
+        .unwrap();
+    assert_eq!(to_rows(&r), rows(&[(1, 0), (2, 1), (3, 164), (4, 9722)]));
+}
+
 async fn hyperloglog_snowflake(service: Box<dyn SqlClient>) {
     service.exec_query("CREATE SCHEMA s").await.unwrap();
     service
diff --git a/rust/cubestore/src/import/mod.rs b/rust/cubestore/src/import/mod.rs
index 70f02fc3af8f..8e5a187c00ad 100644
--- a/rust/cubestore/src/import/mod.rs
+++ b/rust/cubestore/src/import/mod.rs
@@ -22,7 +22,7 @@ use crate::config::injection::DIService;
 use crate::config::ConfigObj;
 use crate::import::limits::ConcurrencyLimits;
 use crate::metastore::table::Table;
-use crate::metastore::{is_valid_binary_hll_input, HllFlavour, IdRow};
+use crate::metastore::{is_valid_plain_binary_hll, HllFlavour, IdRow};
 use crate::metastore::{Column, ColumnType, ImportFormat, MetaStore};
 use crate::remotefs::RemoteFs;
 use crate::sql::timestamp_from_string;
@@ -129,10 +129,16 @@ impl ImportFormat {
                     let hll = HllSketch::read_snowflake(value)?;
                     TableValue::Bytes(hll.write())
                 }
-                ColumnType::HyperLogLog(f) => {
-                    assert!(f.imports_from_binary());
+                ColumnType::HyperLogLog(HllFlavour::Postgres) => {
                     let data = base64::decode(value)?;
-                    is_valid_binary_hll_input(&data, *f)?;
+                    let hll = HllSketch::read_hll_storage_spec(&data)?;
+                    TableValue::Bytes(hll.write())
+                }
+                ColumnType::HyperLogLog(
+                    f @ (HllFlavour::Airlift | HllFlavour::ZetaSketch),
+                ) => {
+                    let data = base64::decode(value)?;
+                    is_valid_plain_binary_hll(&data, *f)?;
                     TableValue::Bytes(data)
                 }
                 ColumnType::Timestamp => {
diff --git a/rust/cubestore/src/metastore/mod.rs b/rust/cubestore/src/metastore/mod.rs
index e25ab6183425..3a7342785a34 100644
--- a/rust/cubestore/src/metastore/mod.rs
+++ b/rust/cubestore/src/metastore/mod.rs
@@ -300,19 +300,11 @@ impl DataFrameValue for Option {
 pub enum HllFlavour {
     Airlift,    // Compatible with Presto, Athena, etc.
     Snowflake,  // Same storage as Airlift, imports from Snowflake JSON.
+    Postgres,   // Same storage as Airlift, imports from HLL Storage Specification.
     ZetaSketch, // Compatible with BigQuery.
 }
 
-impl HllFlavour {
-    pub fn imports_from_binary(&self) -> bool {
-        match self {
-            HllFlavour::Airlift | HllFlavour::ZetaSketch => true,
-            HllFlavour::Snowflake => false,
-        }
-    }
-}
-
-pub fn is_valid_binary_hll_input(data: &[u8], f: HllFlavour) -> Result<(), CubeError> {
+pub fn is_valid_plain_binary_hll(data: &[u8], f: HllFlavour) -> Result<(), CubeError> {
     // TODO: do no memory allocations for better performance, this is run on hot path.
     match f {
         HllFlavour::Airlift => {
@@ -321,7 +313,7 @@ pub fn is_valid_binary_hll_input(data: &[u8], f: HllFlavour) -> Result<(), CubeE
         HllFlavour::ZetaSketch => {
             HyperLogLogPlusPlus::read(data)?;
         }
-        HllFlavour::Snowflake => {
+        HllFlavour::Postgres | HllFlavour::Snowflake => {
             panic!("string formats should be handled separately")
         }
     }
@@ -453,6 +445,7 @@ impl fmt::Display for Column {
             ColumnType::Bytes => "BYTES".to_string(),
             ColumnType::HyperLogLog(HllFlavour::Airlift) => "HYPERLOGLOG".to_string(),
             ColumnType::HyperLogLog(HllFlavour::ZetaSketch) => "HYPERLOGLOGPP".to_string(),
+            ColumnType::HyperLogLog(HllFlavour::Postgres) => "HLL_POSTGRES".to_string(),
             ColumnType::HyperLogLog(HllFlavour::Snowflake) => "HLL_SNOWFLAKE".to_string(),
             ColumnType::Float => "FLOAT".to_string(),
         };
diff --git a/rust/cubestore/src/sql/mod.rs b/rust/cubestore/src/sql/mod.rs
index 42228e8ea19f..008f00aa5066 100644
--- a/rust/cubestore/src/sql/mod.rs
+++ b/rust/cubestore/src/sql/mod.rs
@@ -35,7 +35,7 @@ use crate::import::limits::ConcurrencyLimits;
 use crate::import::Ingestion;
 use crate::metastore::job::JobType;
 use crate::metastore::{
-    is_valid_binary_hll_input, table::Table, HllFlavour, IdRow, ImportFormat, Index, IndexDef,
+    is_valid_plain_binary_hll, table::Table, HllFlavour, IdRow, ImportFormat, Index, IndexDef,
     MetaStoreTable, RowKey, Schema, TableId,
 };
 use crate::queryplanner::query_executor::{batch_to_dataframe, QueryExecutor};
@@ -763,6 +763,7 @@ fn convert_columns_type(columns: &Vec<ColumnDef>) -> Result<Vec<Column>, CubeErr
             "hyperloglog" => ColumnType::HyperLogLog(HllFlavour::Airlift),
             "hyperloglogpp" => ColumnType::HyperLogLog(HllFlavour::ZetaSketch),
             "hll_snowflake" => ColumnType::HyperLogLog(HllFlavour::Snowflake),
+            "hll_postgres" => ColumnType::HyperLogLog(HllFlavour::Postgres),
             _ => {
                 return Err(CubeError::user(format!(
                     "Custom type '{}' is not supported",
@@ -834,10 +835,15 @@ fn parse_hyper_log_log<'a>(
             *buffer = hll.write();
             Ok(buffer)
         }
-        f => {
-            assert!(f.imports_from_binary());
+        HllFlavour::Postgres => {
             let bytes = parse_binary_string(buffer, v)?;
-            is_valid_binary_hll_input(bytes, f)?;
+            let hll = HllSketch::read_hll_storage_spec(bytes)?;
+            *buffer = hll.write();
+            Ok(buffer)
+        }
+        HllFlavour::Airlift | HllFlavour::ZetaSketch => {
+            let bytes = parse_binary_string(buffer, v)?;
+            is_valid_plain_binary_hll(bytes, f)?;
             Ok(bytes)
         }
     }
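
For reference, the blobs in the hyperloglog_postgres test above use the postgresql-hll "HLL Storage Specification" layout that read_hll_storage_spec parses: byte 0 packs the schema version (high nibble) and the sketch type (low nibble: 1=EMPTY, 2=EXPLICIT, 3=SPARSE, 4=FULL), and byte 1 packs the register width minus one (high 3 bits) together with log2m (low 5 bits), which is where the log2m and precision limits from the commit message apply. The sketch below only decodes that header; the struct and function names are illustrative assumptions, not cubehll's implementation.

// Illustrative decoder for the first two bytes of a postgresql-hll blob
// (HLL Storage Specification). Sketch only: the names here are made up, and
// cubehll's real parser is HllInstance::read_hll_storage_spec, which also
// enforces the limits from the commit message (log2m up to 16, register
// width/precision up to 6).

#[derive(Debug)]
struct HllStorageHeader {
    schema_version: u8, // high nibble of byte 0; the spec defines version 1
    hll_type: u8,       // low nibble of byte 0: 1=EMPTY, 2=EXPLICIT, 3=SPARSE, 4=FULL
    regwidth: u8,       // high 3 bits of byte 1, stored as (width - 1)
    log2m: u8,          // low 5 bits of byte 1, log2 of the register count
}

fn read_storage_header(data: &[u8]) -> Result<HllStorageHeader, String> {
    if data.len() < 2 {
        return Err("postgres-hll blob is shorter than its 2-byte header".to_string());
    }
    let header = HllStorageHeader {
        schema_version: data[0] >> 4,
        hll_type: data[0] & 0x0f,
        regwidth: (data[1] >> 5) + 1,
        log2m: data[1] & 0x1f,
    };
    if header.schema_version != 1 {
        return Err(format!("unsupported schema version {}", header.schema_version));
    }
    Ok(header)
}

fn main() {
    // X'118b7f' from the test: version 1, EMPTY sketch, regwidth 5, log2m 11
    // (0x8b = 0b100_01011); the remaining 0x7f byte carries the cutoff/flag
    // settings defined by the spec.
    println!("{:?}", read_storage_header(&[0x11, 0x8b, 0x7f]));
}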