From d6996c02119628deafcca755773a84c2f0c31c29 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Tue, 7 May 2024 17:05:05 +0200 Subject: [PATCH 01/15] feat(cubestore): Support HLL from Apache DataSketches (for DataBricks) --- rust/cubestore/Cargo.lock | 194 +++++++++++++++++- rust/cubestore/Cargo.toml | 1 + rust/cubestore/cubedatasketch/Cargo.toml | 10 + rust/cubestore/cubedatasketch/README.md | 1 + rust/cubestore/cubedatasketch/src/error.rs | 49 +++++ rust/cubestore/cubedatasketch/src/lib.rs | 86 ++++++++ .../cubestore-sql-tests/src/tests.rs | 75 +++++++ rust/cubestore/cubestore/Cargo.toml | 1 + rust/cubestore/cubestore/src/import/mod.rs | 7 + rust/cubestore/cubestore/src/lib.rs | 6 + rust/cubestore/cubestore/src/metastore/mod.rs | 16 +- .../cubestore/src/queryplanner/hll.rs | 82 ++++++-- .../cubestore/src/queryplanner/udfs.rs | 8 +- rust/cubestore/cubestore/src/sql/mod.rs | 5 + .../cubestore/src/sql/table_creator.rs | 1 + 15 files changed, 515 insertions(+), 27 deletions(-) create mode 100644 rust/cubestore/cubedatasketch/Cargo.toml create mode 100644 rust/cubestore/cubedatasketch/README.md create mode 100644 rust/cubestore/cubedatasketch/src/error.rs create mode 100644 rust/cubestore/cubedatasketch/src/lib.rs diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index 03669b14e970a..39d4f27aa82b5 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -678,6 +678,21 @@ dependencies = [ "libloading", ] +[[package]] +name = "clap" +version = "2.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" +dependencies = [ + "ansi_term", + "atty", + "bitflags 1.3.2", + "strsim", + "textwrap 0.11.0", + "unicode-width", + "vec_map", +] + [[package]] name = "clap" version = "3.2.23" @@ -687,7 +702,7 @@ dependencies = [ "bitflags 1.3.2", "clap_lex", "indexmap 1.7.0", - "textwrap", + "textwrap 0.16.0", ] [[package]] @@ -739,6 +754,16 @@ dependencies = [ "cc", ] +[[package]] +name = "codespan-reporting" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" +dependencies = [ + "termcolor", + "unicode-width", +] + [[package]] name = "colored" version = "2.1.0" @@ -822,7 +847,7 @@ dependencies = [ "atty", "cast", "ciborium", - "clap", + "clap 3.2.23", "criterion-plot", "futures", "itertools 0.10.1", @@ -1001,6 +1026,13 @@ dependencies = [ "syn 1.0.107", ] +[[package]] +name = "cubedatasketch" +version = "0.1.0" +dependencies = [ + "dsrs", +] + [[package]] name = "cubehll" version = "0.1.0" @@ -1057,6 +1089,7 @@ dependencies = [ "cloud-storage", "csv", "ctor", + "cubedatasketch", "cubehll", "cuberockstore", "cuberpc", @@ -1159,6 +1192,50 @@ dependencies = [ "protobuf", ] +[[package]] +name = "cxx" +version = "1.0.121" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21db378d04296a84d8b7d047c36bb3954f0b46529db725d7e62fb02f9ba53ccc" +dependencies = [ + "cc", + "cxxbridge-flags", + "cxxbridge-macro", + "link-cplusplus", +] + +[[package]] +name = "cxx-build" +version = "1.0.97" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c0c11acd0e63bae27dcd2afced407063312771212b7a823b4fd72d633be30fb" +dependencies = [ + "cc", + "codespan-reporting", + "once_cell", + "proc-macro2", + "quote", + "scratch", + "syn 2.0.58", +] + +[[package]] +name = "cxxbridge-flags" +version = "1.0.121" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be8dcadd2e2fb4a501e1d9e93d6e88e6ea494306d8272069c92d5a9edf8855c0" + +[[package]] +name = "cxxbridge-macro" +version = "1.0.121" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad08a837629ad949b73d032c637653d069e909cffe4ee7870b02301939ce39cc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.58", +] + [[package]] name = "data-encoding" version = "2.4.0" @@ -1340,6 +1417,20 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d" +[[package]] +name = "dsrs" +version = "0.6.1" +source = "git+https://github.com/cube-js/datasketches-rs.git?branch=cubestore#bc14eb6ee8bf8aef1911fe97aedfba04c1572d55" +dependencies = [ + "base64 0.13.0", + "bstr", + "cxx", + "cxx-build", + "memchr", + "structopt", + "thin-dst", +] + [[package]] name = "duct" version = "0.13.6" @@ -2337,6 +2428,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "link-cplusplus" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d240c6f7e1ba3a28b0249f774e6a9dd0175054b52dfbb61b16eb8505c3785c9" +dependencies = [ + "cc", +] + [[package]] name = "linux-raw-sys" version = "0.4.13" @@ -3389,6 +3489,30 @@ dependencies = [ "toml", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.107", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -4074,6 +4198,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "scratch" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3cf7c11c38cb994f3d40e8a8cde3bbd1f72a435e4c49e85d6553d8312306152" + [[package]] name = "security-framework" version = "2.3.1" @@ -4391,6 +4521,36 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + +[[package]] +name = "structopt" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10" +dependencies = [ + "clap 2.34.0", + "lazy_static", + "structopt-derive", +] + +[[package]] +name = "structopt-derive" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.107", +] + [[package]] name = "strum" version = "0.21.0" @@ -4534,12 +4694,36 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + [[package]] name = "textwrap" version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" +[[package]] +name = "thin-dst" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db3c46be180f1af9673ebb27bc1235396f61ef6965b3fe0dbb2e624deb604f0e" + [[package]] name = "thiserror" version = "1.0.38" @@ -5041,6 +5225,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + [[package]] name = "version_check" version = "0.9.3" diff --git a/rust/cubestore/Cargo.toml b/rust/cubestore/Cargo.toml index 2e925776f955c..d5797f646f9be 100644 --- a/rust/cubestore/Cargo.toml +++ b/rust/cubestore/Cargo.toml @@ -4,6 +4,7 @@ members = [ "cubestore", "cubestore-sql-tests", "cubehll", + "cubedatasketch", "cubezetasketch", "cuberpc", "cuberockstore", diff --git a/rust/cubestore/cubedatasketch/Cargo.toml b/rust/cubestore/cubedatasketch/Cargo.toml new file mode 100644 index 0000000000000..62c644f9bf63b --- /dev/null +++ b/rust/cubestore/cubedatasketch/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "cubedatasketch" +version = "0.1.0" +authors = ["Cube Dev, Inc."] +edition = "2018" +license = "Apache-2.0" +description = "HyperLogLog++ implementation ported from ZetaSketch" + +[dependencies] +dsrs = { git = "https://github.com/cube-js/datasketches-rs.git", branch = "cubestore" } diff --git a/rust/cubestore/cubedatasketch/README.md b/rust/cubestore/cubedatasketch/README.md new file mode 100644 index 0000000000000..07dd0c5c77035 --- /dev/null +++ b/rust/cubestore/cubedatasketch/README.md @@ -0,0 +1 @@ +# Overview diff --git a/rust/cubestore/cubedatasketch/src/error.rs b/rust/cubestore/cubedatasketch/src/error.rs new file mode 100644 index 0000000000000..1ba8e67063f20 --- /dev/null +++ b/rust/cubestore/cubedatasketch/src/error.rs @@ -0,0 +1,49 @@ +/* + * Copyright 2024 Cube Dev, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::fmt::{Display, Formatter}; + +pub type Result = std::result::Result; + +#[derive(Debug)] +pub struct DataSketchesError { + pub message: String, +} + +impl Display for DataSketchesError { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{}", self.message) + } +} + +impl DataSketchesError { + pub fn new(message: Str) -> Self { + return Self { + message: message.to_string(), + }; + } +} + +impl From for DataSketchesError { + fn from(err: std::io::Error) -> Self { + return DataSketchesError::new(err); + } +} + +impl From for DataSketchesError { + fn from(err: dsrs::DataSketchesError) -> Self { + return DataSketchesError::new(err); + } +} diff --git a/rust/cubestore/cubedatasketch/src/lib.rs b/rust/cubestore/cubedatasketch/src/lib.rs new file mode 100644 index 0000000000000..47ff97593999f --- /dev/null +++ b/rust/cubestore/cubedatasketch/src/lib.rs @@ -0,0 +1,86 @@ +/* + * Copyright 2024 Cube Dev, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +mod error; + +pub use error::DataSketchesError; +pub use error::Result; +use std::fmt::{Debug, Formatter}; + +use dsrs::{HLLSketch, HLLType, HLLUnion}; + +pub struct HLLDataSketch { + pub(crate) instance: HLLSketch, +} + +unsafe impl Send for HLLDataSketch {} +unsafe impl Sync for HLLDataSketch {} + +impl Debug for HLLDataSketch { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +impl HLLDataSketch { + pub fn read(data: &[u8]) -> Result { + return Ok(Self { + instance: HLLSketch::deserialize(data)?, + }); + } + + pub fn cardinality(&self) -> u64 { + return self.instance.estimate().round() as u64; + } + + pub fn write(&self) -> Vec { + // TODO(ovr): Better way? + self.instance.serialize().as_ref().iter().copied().collect() + } +} + +pub struct HLLUnionDataSketch { + pub(crate) instance: HLLUnion, +} + +impl Debug for HLLUnionDataSketch { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +unsafe impl Send for HLLUnionDataSketch {} +unsafe impl Sync for HLLUnionDataSketch {} + +impl HLLUnionDataSketch { + pub fn new(lg_max_k: u8) -> Self { + Self { + instance: HLLUnion::new(lg_max_k), + } + } + + pub fn write(&self) -> Vec { + let sketch = self.instance.sketch(HLLType::HLL_4); + // TODO(ovr): Better way? + sketch.serialize().as_ref().iter().copied().collect() + } + + pub fn merge_with(&mut self, other: HLLDataSketch) -> Result<()> { + let mut union = HLLUnion::new(12); + union.merge(other.instance); + + Ok(()) + } +} diff --git a/rust/cubestore/cubestore-sql-tests/src/tests.rs b/rust/cubestore/cubestore-sql-tests/src/tests.rs index aa76aa2405782..10c68aed64f67 100644 --- a/rust/cubestore/cubestore-sql-tests/src/tests.rs +++ b/rust/cubestore/cubestore-sql-tests/src/tests.rs @@ -133,6 +133,11 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> { t("hyperloglog_inplace_group_by", hyperloglog_inplace_group_by), t("hyperloglog_postgres", hyperloglog_postgres), t("hyperloglog_snowflake", hyperloglog_snowflake), + t("hyperloglog_databricks", hyperloglog_databricks), + t( + "aggregate_index_hll_databricks", + aggregate_index_hll_databricks, + ), t("physical_plan_flags", physical_plan_flags), t("planning_inplace_aggregate", planning_inplace_aggregate), t("planning_hints", planning_hints), @@ -2775,6 +2780,76 @@ async fn hyperloglog_snowflake(service: Box) { .unwrap_err(); } +async fn hyperloglog_databricks(service: Box) { + service.exec_query("CREATE SCHEMA s").await.unwrap(); + service + .exec_query("CREATE TABLE s.hlls(id int, hll HLL_DATASKETCHES)") + .await + .unwrap(); + + service.exec_query("INSERT INTO s.hlls(id, hll) VALUES \ + (1, X'0201070c03000408067365047b65c3a608c39b17c29a0ac383c2b0380400000000000000000000000000000000'), \ + (2, X'0201070c03000408c39b17c29a0ac383c2b03804067365047b65c3a60800000000000000000000000000000000'), \ + (3, X'0301070c05000009140000000000000021c3b23905c2a1c38d490ac283c2b711071bc2a1c3961200000000000000000000000008c29bc39904497ac39908000000002bc3b2c3bb062c45670ac3adc29e24074bc298c2a6086f2c7f050000000000000000c392c295c2900dc3b3c28bc38106c38dc3884607c2b50dc3b70600000000c3b762c28207c398c393350f00000000000000001b27c2b20b00000000c29dc28a7210000000003fc3b95b0f')" + ).await.unwrap(); + + let r = service + .exec_query("SELECT id, cardinality(hll) FROM s.hlls ORDER BY id") + .await + .unwrap(); + assert_eq!(to_rows(&r), rows(&[(1, 4), (2, 4), (3, 20)])); +} + +async fn aggregate_index_hll_databricks(service: Box) { + service.exec_query("CREATE SCHEMA s").await.unwrap(); + service + .exec_query( + "CREATE TABLE s.Orders(a int, b int, a_hll HLL_DATASKETCHES) + AGGREGATIONS(merge(a_hll)) + AGGREGATE INDEX aggr_index (a, b) + ", + ) + .await + .unwrap(); + service + .exec_query( + "INSERT INTO s.Orders (a, b, a_hll) VALUES \ + (1, 10, X'0201070c03000408067365047b65c3a608c39b17c29a0ac383c2b0380400000000000000000000000000000000'), \ + (1, 20, X'0201070c03000408067365047b65c3a608c39b17c29a0ac383c2b0380400000000000000000000000000000000'), \ + (1, 10, X'0201070c03000408067365047b65c3a608c39b17c29a0ac383c2b0380400000000000000000000000000000000'), \ + (1, 20, X'0201070c03000408067365047b65c3a608c39b17c29a0ac383c2b0380400000000000000000000000000000000') + ", + ) + .await + .unwrap(); + + let res = service + .exec_query("SELECT a, b, cardinality(merge(a_hll)) as hll FROM s.Orders GROUP BY 1, 2 ORDER BY 1, 2") + .await + .unwrap(); + assert_eq!( + to_rows(&res), + [ + [TableValue::Int(1), TableValue::Int(10), TableValue::Int(0)], + [TableValue::Int(1), TableValue::Int(20), TableValue::Int(0)], + ] + ); + + let res = service + .exec_query("SELECT a, cardinality(merge(a_hll)) as hll FROM s.Orders WHERE b = 20 GROUP BY 1 ORDER BY 1") + .await + .unwrap(); + assert_eq!(to_rows(&res), [[TableValue::Int(1), TableValue::Int(0)],]); + + let res = service + .exec_query( + "SELECT a, cardinality(merge(a_hll)) as hll FROM s.Orders GROUP BY 1 ORDER BY 1", + ) + .await + .unwrap(); + assert_eq!(to_rows(&res), [[TableValue::Int(1), TableValue::Int(0)],]); +} + async fn physical_plan_flags(service: Box) { service.exec_query("CREATE SCHEMA s").await.unwrap(); service diff --git a/rust/cubestore/cubestore/Cargo.toml b/rust/cubestore/cubestore/Cargo.toml index f29a93b6c7a96..7227cefd4b810 100644 --- a/rust/cubestore/cubestore/Cargo.toml +++ b/rust/cubestore/cubestore/Cargo.toml @@ -26,6 +26,7 @@ serde_bytes = "0.11.5" cuberockstore = { path = "../cuberockstore" } cubehll = { path = "../cubehll" } cubezetasketch = { path = "../cubezetasketch" } +cubedatasketch = { path = "../cubedatasketch" } cuberpc = { path = "../cuberpc" } parquet = { git = "https://github.com/cube-js/arrow-rs", branch = "cube", features = ["arrow"] } arrow = { git = "https://github.com/cube-js/arrow-rs", branch = "cube" } diff --git a/rust/cubestore/cubestore/src/import/mod.rs b/rust/cubestore/cubestore/src/import/mod.rs index 35340756fe60b..5e1212375053b 100644 --- a/rust/cubestore/cubestore/src/import/mod.rs +++ b/rust/cubestore/cubestore/src/import/mod.rs @@ -31,6 +31,7 @@ use crate::import::limits::ConcurrencyLimits; use crate::metastore::table::Table; use crate::metastore::{is_valid_plain_binary_hll, HllFlavour, IdRow}; use crate::metastore::{Column, ColumnType, ImportFormat, MetaStore}; +use crate::queryplanner::hll::Hll::DataSketches; use crate::queryplanner::trace_data_loaded::DataLoadedSize; use crate::remotefs::RemoteFs; use crate::sql::timestamp_from_string; @@ -43,6 +44,7 @@ use crate::util::decimal::{Decimal, Decimal96}; use crate::util::int96::Int96; use crate::util::maybe_owned::MaybeOwnedStr; use crate::CubeError; +use cubedatasketch::HLLDataSketch; use datafusion::cube_ext::ordfloat::OrdF64; use tokio::time::{sleep, Duration}; @@ -210,6 +212,11 @@ impl ImportFormat { is_valid_plain_binary_hll(&data, *f)?; TableValue::Bytes(data) } + ColumnType::HyperLogLog(HllFlavour::DataSketches) => { + let data = parse_binary_data(value)?; + let hll = HLLDataSketch::read(&data)?; + TableValue::Bytes(hll.write()) + } ColumnType::Timestamp => TableValue::Timestamp(timestamp_from_string(value)?), ColumnType::Float => TableValue::Float(OrdF64(value.parse::()?)), ColumnType::Boolean => { diff --git a/rust/cubestore/cubestore/src/lib.rs b/rust/cubestore/cubestore/src/lib.rs index 015fba8235226..99b1bdb39c098 100644 --- a/rust/cubestore/cubestore/src/lib.rs +++ b/rust/cubestore/cubestore/src/lib.rs @@ -443,6 +443,12 @@ impl From for CubeError { } } +impl From for CubeError { + fn from(v: cubedatasketch::DataSketchesError) -> Self { + return CubeError::from_error(v); + } +} + impl From for CubeError { fn from(v: ZetaError) -> Self { return CubeError::from_error(v); diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 2e915ae3f281c..7ad1b5e9d2586 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -88,6 +88,7 @@ use crate::cachestore::{ CacheItem, QueueItem, QueueItemPayload, QueueItemStatus, QueueResult, QueueResultAckEvent, }; use crate::remotefs::LocalDirRemoteFs; +use cubedatasketch::HLLDataSketch; use deepsize::DeepSizeOf; use snapshot_info::SnapshotInfo; use std::time::{Duration, SystemTime}; @@ -342,10 +343,11 @@ impl DataFrameValue for Option> { #[derive(Clone, Copy, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, DeepSizeOf)] pub enum HllFlavour { - Airlift, // Compatible with Presto, Athena, etc. - Snowflake, // Same storage as Airlift, imports from Snowflake JSON. - Postgres, // Same storage as Airlift, imports from HLL Storage Specification. - ZetaSketch, // Compatible with BigQuery. + Airlift, // Compatible with Presto, Athena, etc. + Snowflake, // Same storage as Airlift, imports from Snowflake JSON. + Postgres, // Same storage as Airlift, imports from HLL Storage Specification. + ZetaSketch, // Compatible with BigQuery. + DataSketches, // Compatible with DataBricks. } pub fn is_valid_plain_binary_hll(data: &[u8], f: HllFlavour) -> Result<(), CubeError> { @@ -360,6 +362,9 @@ pub fn is_valid_plain_binary_hll(data: &[u8], f: HllFlavour) -> Result<(), CubeE HllFlavour::Postgres | HllFlavour::Snowflake => { panic!("string formats should be handled separately") } + HllFlavour::DataSketches => { + HLLDataSketch::read(data)?; + } } return Ok(()); } @@ -391,6 +396,7 @@ impl Display for ColumnType { ColumnType::HyperLogLog(HllFlavour::ZetaSketch) => "hyperloglogpp", ColumnType::HyperLogLog(HllFlavour::Postgres) => "hll_postgres", ColumnType::HyperLogLog(HllFlavour::Snowflake) => "hll_snowflake", + ColumnType::HyperLogLog(HllFlavour::DataSketches) => "hll_datasketches", ColumnType::Timestamp => "timestamp", ColumnType::Float => "float", ColumnType::Boolean => "boolean", @@ -436,6 +442,7 @@ impl ColumnType { "hyperloglogpp" => Ok(ColumnType::HyperLogLog(HllFlavour::ZetaSketch)), "hll_postgres" => Ok(ColumnType::HyperLogLog(HllFlavour::Postgres)), "hll_snowflake" => Ok(ColumnType::HyperLogLog(HllFlavour::Snowflake)), + "hll_datasketches" => Ok(ColumnType::HyperLogLog(HllFlavour::DataSketches)), "timestamp" => Ok(ColumnType::Timestamp), "float" => Ok(ColumnType::Float), "boolean" => Ok(ColumnType::Boolean), @@ -597,6 +604,7 @@ impl fmt::Display for Column { ColumnType::HyperLogLog(HllFlavour::ZetaSketch) => "HYPERLOGLOGPP".to_string(), ColumnType::HyperLogLog(HllFlavour::Postgres) => "HLL_POSTGRES".to_string(), ColumnType::HyperLogLog(HllFlavour::Snowflake) => "HLL_SNOWFLAKE".to_string(), + ColumnType::HyperLogLog(HllFlavour::DataSketches) => "HLL_DATASKETCHES".to_string(), ColumnType::Float => "FLOAT".to_string(), }; f.write_fmt(format_args!("{} {}", self.name, column_type)) diff --git a/rust/cubestore/cubestore/src/queryplanner/hll.rs b/rust/cubestore/cubestore/src/queryplanner/hll.rs index 14fc0cb0721d3..d395cd74f7ca3 100644 --- a/rust/cubestore/cubestore/src/queryplanner/hll.rs +++ b/rust/cubestore/cubestore/src/queryplanner/hll.rs @@ -1,4 +1,5 @@ use crate::CubeError; +use cubedatasketch::{HLLDataSketch, HLLUnionDataSketch}; use cubehll::HllSketch; use cubezetasketch::HyperLogLogPlusPlus; @@ -6,6 +7,7 @@ use cubezetasketch::HyperLogLogPlusPlus; pub enum Hll { Airlift(HllSketch), // Compatible with Athena, Presto, etc. ZetaSketch(HyperLogLogPlusPlus), // Compatible with BigQuery. + DataSketches(HLLDataSketch), // Compatible with DataBricks } impl Hll { @@ -15,10 +17,20 @@ impl Hll { "invalid serialized HLL (empty data)".to_string(), )); } - // The first byte: + + const DS_LIST_PREINTS: u8 = 2; + const DS_HASH_SET_PREINTSS: u8 = 3; + const DS_SER_VER: u8 = 1; + const DS_FAMILY_ID: u8 = 7; + // - must larger than 3 due to how protos are encoded in ZetaSketch. // - represents the data format version and is <= 3 in AirLift. - if data[0] <= 3 { + if (data[0] == DS_LIST_PREINTS || data[0] == DS_HASH_SET_PREINTSS) + && data[1] == DS_SER_VER + && data[2] == DS_FAMILY_ID + { + return Ok(Hll::DataSketches(HLLDataSketch::read(data)?)); + } else if data[0] <= 3 { return Ok(Hll::Airlift(HllSketch::read(data)?)); } else { return Ok(Hll::ZetaSketch(HyperLogLogPlusPlus::read(data)?)); @@ -27,16 +39,9 @@ impl Hll { pub fn write(&self) -> Vec { match self { - Hll::Airlift(h) => h.write(), - Hll::ZetaSketch(h) => h.write(), - } - } - - pub fn is_compatible(&self, other: &Hll) -> bool { - match (self, other) { - (Hll::Airlift(l), Hll::Airlift(r)) => l.index_bit_len() == r.index_bit_len(), - (Hll::ZetaSketch(l), Hll::ZetaSketch(r)) => l.is_compatible(r), - _ => return false, + Self::Airlift(h) => h.write(), + Self::ZetaSketch(h) => h.write(), + Self::DataSketches(h) => h.write(), } } @@ -44,18 +49,61 @@ impl Hll { match self { Hll::Airlift(h) => h.cardinality(), Hll::ZetaSketch(h) => h.cardinality(), + Hll::DataSketches(h) => h.cardinality(), + } + } +} + +#[derive(Debug)] +pub enum HllUnion { + Airlift(HllSketch), + ZetaSketch(HyperLogLogPlusPlus), + DataSketches(HLLUnionDataSketch), +} + +impl HllUnion { + pub fn new(hll: Hll) -> Result { + match hll { + Hll::Airlift(h) => Ok(Self::Airlift(h)), + Hll::ZetaSketch(h) => Ok(Self::ZetaSketch(h)), + Hll::DataSketches(h) => { + let mut union = HLLUnionDataSketch::new(12); + union.merge_with(h)?; + + Ok(Self::DataSketches(union)) + } + } + } + + pub fn write(&self) -> Vec { + match self { + Self::Airlift(h) => h.write(), + Self::ZetaSketch(h) => h.write(), + Self::DataSketches(h) => h.write(), + } + } + + pub fn is_compatible(&self, other: &Hll) -> bool { + match (self, other) { + (Self::Airlift(l), Hll::Airlift(r)) => l.index_bit_len() == r.index_bit_len(), + (Self::ZetaSketch(l), Hll::ZetaSketch(r)) => l.is_compatible(r), + (Self::DataSketches(_), Hll::DataSketches(_)) => true, + _ => return false, } } /// Clients are responsible for calling `is_compatible` before running this function. /// On error, `self` may end up in inconsistent state and must be discarded. - pub fn merge_with(&mut self, other: &Hll) -> Result<(), CubeError> { - debug_assert!(self.is_compatible(other)); + pub fn merge_with(&mut self, other: Hll) -> Result<(), CubeError> { + debug_assert!(self.is_compatible(&other)); + match (self, other) { - (Hll::Airlift(l), Hll::Airlift(r)) => l.merge_with(r), - (Hll::ZetaSketch(l), Hll::ZetaSketch(r)) => l.merge_with(r)?, - _ => panic!("incompatible HLL types"), + (Self::Airlift(l), Hll::Airlift(r)) => l.merge_with(&r), + (Self::ZetaSketch(l), Hll::ZetaSketch(r)) => l.merge_with(&r)?, + (Self::DataSketches(l), Hll::DataSketches(r)) => l.merge_with(r)?, + _ => return Err(CubeError::internal("incompatible HLL types".to_string())), } + return Ok(()); } } diff --git a/rust/cubestore/cubestore/src/queryplanner/udfs.rs b/rust/cubestore/cubestore/src/queryplanner/udfs.rs index 95a16bfd960f3..017b996c70fd9 100644 --- a/rust/cubestore/cubestore/src/queryplanner/udfs.rs +++ b/rust/cubestore/cubestore/src/queryplanner/udfs.rs @@ -1,5 +1,5 @@ use crate::queryplanner::coalesce::{coalesce, SUPPORTED_COALESCE_TYPES}; -use crate::queryplanner::hll::Hll; +use crate::queryplanner::hll::{Hll, HllUnion}; use crate::CubeError; use arrow::array::{Array, BinaryArray, TimestampNanosecondArray, UInt64Builder}; use arrow::datatypes::{DataType, IntervalUnit, TimeUnit}; @@ -353,7 +353,7 @@ impl CubeAggregateUDF for HllMergeUDF { struct HllMergeAccumulator { // TODO: store sketch for empty set from the start. // this requires storing index_bit_len in the type. - acc: Option, + acc: Option, } impl Accumulator for HllMergeAccumulator { @@ -421,7 +421,7 @@ impl Accumulator for HllMergeAccumulator { impl HllMergeAccumulator { fn merge_sketch(&mut self, s: Hll) -> Result<(), DataFusionError> { if self.acc.is_none() { - self.acc = Some(s); + self.acc = Some(HllUnion::new(s)?); return Ok(()); } else if let Some(acc_s) = &mut self.acc { if !acc_s.is_compatible(&s) { @@ -430,7 +430,7 @@ impl HllMergeAccumulator { ) .into()); } - acc_s.merge_with(&s)?; + acc_s.merge_with(s)?; } else { unreachable!("impossible"); } diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 3895d521d8761..d795b2b60b906 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -1263,6 +1263,11 @@ fn parse_hyper_log_log<'a>( is_valid_plain_binary_hll(bytes, f)?; Ok(bytes) } + HllFlavour::DataSketches => { + let bytes = parse_binary_string(buffer, v)?; + is_valid_plain_binary_hll(bytes, f)?; + Ok(bytes) + } } } diff --git a/rust/cubestore/cubestore/src/sql/table_creator.rs b/rust/cubestore/cubestore/src/sql/table_creator.rs index 9c4a743daa1bc..a39db27f3b9f9 100644 --- a/rust/cubestore/cubestore/src/sql/table_creator.rs +++ b/rust/cubestore/cubestore/src/sql/table_creator.rs @@ -576,6 +576,7 @@ pub fn convert_columns_type(columns: &Vec) -> Result, Cub "hyperloglogpp" => ColumnType::HyperLogLog(HllFlavour::ZetaSketch), "hll_snowflake" => ColumnType::HyperLogLog(HllFlavour::Snowflake), "hll_postgres" => ColumnType::HyperLogLog(HllFlavour::Postgres), + "hll_datasketches" => ColumnType::HyperLogLog(HllFlavour::DataSketches), _ => { return Err(CubeError::user(format!( "Custom type '{}' is not supported", From 7612a1dc1c38b0a22ce02462ba2001ec07e982b0 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Tue, 7 May 2024 21:10:20 +0200 Subject: [PATCH 02/15] chore: naming --- rust/cubestore/Cargo.lock | 4 ++-- rust/cubestore/Cargo.toml | 2 +- rust/cubestore/Dockerfile | 1 + .../{cubedatasketch => cubedatasketches}/Cargo.toml | 6 +++--- .../{cubedatasketch => cubedatasketches}/README.md | 0 .../{cubedatasketch => cubedatasketches}/src/error.rs | 0 .../{cubedatasketch => cubedatasketches}/src/lib.rs | 0 rust/cubestore/cubestore/Cargo.toml | 2 +- rust/cubestore/cubestore/src/import/mod.rs | 2 +- rust/cubestore/cubestore/src/lib.rs | 4 ++-- rust/cubestore/cubestore/src/metastore/mod.rs | 2 +- rust/cubestore/cubestore/src/queryplanner/hll.rs | 2 +- 12 files changed, 13 insertions(+), 12 deletions(-) rename rust/cubestore/{cubedatasketch => cubedatasketches}/Cargo.toml (62%) rename rust/cubestore/{cubedatasketch => cubedatasketches}/README.md (100%) rename rust/cubestore/{cubedatasketch => cubedatasketches}/src/error.rs (100%) rename rust/cubestore/{cubedatasketch => cubedatasketches}/src/lib.rs (100%) diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index 39d4f27aa82b5..62aa248e3de00 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -1027,7 +1027,7 @@ dependencies = [ ] [[package]] -name = "cubedatasketch" +name = "cubedatasketches" version = "0.1.0" dependencies = [ "dsrs", @@ -1089,7 +1089,7 @@ dependencies = [ "cloud-storage", "csv", "ctor", - "cubedatasketch", + "cubedatasketches", "cubehll", "cuberockstore", "cuberpc", diff --git a/rust/cubestore/Cargo.toml b/rust/cubestore/Cargo.toml index d5797f646f9be..7f8199f52b73d 100644 --- a/rust/cubestore/Cargo.toml +++ b/rust/cubestore/Cargo.toml @@ -4,7 +4,7 @@ members = [ "cubestore", "cubestore-sql-tests", "cubehll", - "cubedatasketch", + "cubedatasketches", "cubezetasketch", "cuberpc", "cuberockstore", diff --git a/rust/cubestore/Dockerfile b/rust/cubestore/Dockerfile index f2de85b818236..153884120eac5 100644 --- a/rust/cubestore/Dockerfile +++ b/rust/cubestore/Dockerfile @@ -26,6 +26,7 @@ COPY Cargo.lock . COPY cuberockstore cuberockstore COPY cubehll cubehll COPY cubezetasketch cubezetasketch +COPY cubedatasketches cubedatasketches COPY cuberpc cuberpc COPY cubestore-sql-tests cubestore-sql-tests COPY cubestore/Cargo.toml cubestore/Cargo.toml diff --git a/rust/cubestore/cubedatasketch/Cargo.toml b/rust/cubestore/cubedatasketches/Cargo.toml similarity index 62% rename from rust/cubestore/cubedatasketch/Cargo.toml rename to rust/cubestore/cubedatasketches/Cargo.toml index 62c644f9bf63b..235071083f9bd 100644 --- a/rust/cubestore/cubedatasketch/Cargo.toml +++ b/rust/cubestore/cubedatasketches/Cargo.toml @@ -1,10 +1,10 @@ [package] -name = "cubedatasketch" +name = "cubedatasketches" version = "0.1.0" authors = ["Cube Dev, Inc."] -edition = "2018" +edition = "2021" license = "Apache-2.0" -description = "HyperLogLog++ implementation ported from ZetaSketch" +description = "Implementation of HLL from Apache DataSketches" [dependencies] dsrs = { git = "https://github.com/cube-js/datasketches-rs.git", branch = "cubestore" } diff --git a/rust/cubestore/cubedatasketch/README.md b/rust/cubestore/cubedatasketches/README.md similarity index 100% rename from rust/cubestore/cubedatasketch/README.md rename to rust/cubestore/cubedatasketches/README.md diff --git a/rust/cubestore/cubedatasketch/src/error.rs b/rust/cubestore/cubedatasketches/src/error.rs similarity index 100% rename from rust/cubestore/cubedatasketch/src/error.rs rename to rust/cubestore/cubedatasketches/src/error.rs diff --git a/rust/cubestore/cubedatasketch/src/lib.rs b/rust/cubestore/cubedatasketches/src/lib.rs similarity index 100% rename from rust/cubestore/cubedatasketch/src/lib.rs rename to rust/cubestore/cubedatasketches/src/lib.rs diff --git a/rust/cubestore/cubestore/Cargo.toml b/rust/cubestore/cubestore/Cargo.toml index 7227cefd4b810..aeb18250c81b4 100644 --- a/rust/cubestore/cubestore/Cargo.toml +++ b/rust/cubestore/cubestore/Cargo.toml @@ -26,7 +26,7 @@ serde_bytes = "0.11.5" cuberockstore = { path = "../cuberockstore" } cubehll = { path = "../cubehll" } cubezetasketch = { path = "../cubezetasketch" } -cubedatasketch = { path = "../cubedatasketch" } +cubedatasketches = { path = "../cubedatasketches" } cuberpc = { path = "../cuberpc" } parquet = { git = "https://github.com/cube-js/arrow-rs", branch = "cube", features = ["arrow"] } arrow = { git = "https://github.com/cube-js/arrow-rs", branch = "cube" } diff --git a/rust/cubestore/cubestore/src/import/mod.rs b/rust/cubestore/cubestore/src/import/mod.rs index 5e1212375053b..119924fb1250f 100644 --- a/rust/cubestore/cubestore/src/import/mod.rs +++ b/rust/cubestore/cubestore/src/import/mod.rs @@ -44,7 +44,7 @@ use crate::util::decimal::{Decimal, Decimal96}; use crate::util::int96::Int96; use crate::util::maybe_owned::MaybeOwnedStr; use crate::CubeError; -use cubedatasketch::HLLDataSketch; +use cubedatasketches::HLLDataSketch; use datafusion::cube_ext::ordfloat::OrdF64; use tokio::time::{sleep, Duration}; diff --git a/rust/cubestore/cubestore/src/lib.rs b/rust/cubestore/cubestore/src/lib.rs index 99b1bdb39c098..63d5fd55948fa 100644 --- a/rust/cubestore/cubestore/src/lib.rs +++ b/rust/cubestore/cubestore/src/lib.rs @@ -443,8 +443,8 @@ impl From for CubeError { } } -impl From for CubeError { - fn from(v: cubedatasketch::DataSketchesError) -> Self { +impl From for CubeError { + fn from(v: cubedatasketches::DataSketchesError) -> Self { return CubeError::from_error(v); } } diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 7ad1b5e9d2586..96a7d2237532d 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -88,7 +88,7 @@ use crate::cachestore::{ CacheItem, QueueItem, QueueItemPayload, QueueItemStatus, QueueResult, QueueResultAckEvent, }; use crate::remotefs::LocalDirRemoteFs; -use cubedatasketch::HLLDataSketch; +use cubedatasketches::HLLDataSketch; use deepsize::DeepSizeOf; use snapshot_info::SnapshotInfo; use std::time::{Duration, SystemTime}; diff --git a/rust/cubestore/cubestore/src/queryplanner/hll.rs b/rust/cubestore/cubestore/src/queryplanner/hll.rs index d395cd74f7ca3..216e79735b825 100644 --- a/rust/cubestore/cubestore/src/queryplanner/hll.rs +++ b/rust/cubestore/cubestore/src/queryplanner/hll.rs @@ -1,5 +1,5 @@ use crate::CubeError; -use cubedatasketch::{HLLDataSketch, HLLUnionDataSketch}; +use cubedatasketches::{HLLDataSketch, HLLUnionDataSketch}; use cubehll::HllSketch; use cubezetasketch::HyperLogLogPlusPlus; From 3f463ae8d51176aa72ccd24c6d6e130dd56a0d08 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 8 May 2024 15:44:05 +0200 Subject: [PATCH 03/15] chore: naming --- rust/cubestore/cubestore/src/queryplanner/hll.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rust/cubestore/cubestore/src/queryplanner/hll.rs b/rust/cubestore/cubestore/src/queryplanner/hll.rs index 216e79735b825..4967114c85a21 100644 --- a/rust/cubestore/cubestore/src/queryplanner/hll.rs +++ b/rust/cubestore/cubestore/src/queryplanner/hll.rs @@ -10,6 +10,11 @@ pub enum Hll { DataSketches(HLLDataSketch), // Compatible with DataBricks } +const DS_LIST_PREINTS: u8 = 2; +const DS_HASH_SET_PREINTS: u8 = 3; +const DS_SER_VER: u8 = 1; +const DS_FAMILY_ID: u8 = 7; + impl Hll { pub fn read(data: &[u8]) -> Result { if data.is_empty() { @@ -18,14 +23,9 @@ impl Hll { )); } - const DS_LIST_PREINTS: u8 = 2; - const DS_HASH_SET_PREINTSS: u8 = 3; - const DS_SER_VER: u8 = 1; - const DS_FAMILY_ID: u8 = 7; - // - must larger than 3 due to how protos are encoded in ZetaSketch. // - represents the data format version and is <= 3 in AirLift. - if (data[0] == DS_LIST_PREINTS || data[0] == DS_HASH_SET_PREINTSS) + if (data[0] == DS_LIST_PREINTS || data[0] == DS_HASH_SET_PREINTS) && data[1] == DS_SER_VER && data[2] == DS_FAMILY_ID { From 69706bf7e752f4cb24dd571882b7a5bde8649124 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 8 May 2024 15:45:14 +0200 Subject: [PATCH 04/15] chore: comment --- rust/cubestore/cubestore/src/queryplanner/hll.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/cubestore/cubestore/src/queryplanner/hll.rs b/rust/cubestore/cubestore/src/queryplanner/hll.rs index 4967114c85a21..add89ba62ed47 100644 --- a/rust/cubestore/cubestore/src/queryplanner/hll.rs +++ b/rust/cubestore/cubestore/src/queryplanner/hll.rs @@ -25,6 +25,7 @@ impl Hll { // - must larger than 3 due to how protos are encoded in ZetaSketch. // - represents the data format version and is <= 3 in AirLift. + // - checking first 3 bytes for figure out HLL from Apache DataSketches if (data[0] == DS_LIST_PREINTS || data[0] == DS_HASH_SET_PREINTS) && data[1] == DS_SER_VER && data[2] == DS_FAMILY_ID From 915e09bc925f3f46478dcbde1364081aace6fbdb Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 8 May 2024 18:38:18 +0200 Subject: [PATCH 05/15] chore: fix cardinality and step --- rust/cubestore/Cargo.lock | 14 +++++++------- rust/cubestore/cubedatasketches/src/lib.rs | 7 +++++-- rust/cubestore/cubestore-sql-tests/src/tests.rs | 8 ++++---- rust/cubestore/cubestore/src/import/mod.rs | 1 - rust/cubestore/cubestore/src/queryplanner/hll.rs | 2 +- 5 files changed, 17 insertions(+), 15 deletions(-) diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index 62aa248e3de00..a05cb941e1cfb 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -1194,9 +1194,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.121" +version = "1.0.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21db378d04296a84d8b7d047c36bb3954f0b46529db725d7e62fb02f9ba53ccc" +checksum = "bb497fad022245b29c2a0351df572e2d67c1046bcef2260ebc022aec81efea82" dependencies = [ "cc", "cxxbridge-flags", @@ -1221,15 +1221,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.121" +version = "1.0.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be8dcadd2e2fb4a501e1d9e93d6e88e6ea494306d8272069c92d5a9edf8855c0" +checksum = "688c799a4a846f1c0acb9f36bb9c6272d9b3d9457f3633c7753c6057270df13c" [[package]] name = "cxxbridge-macro" -version = "1.0.121" +version = "1.0.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad08a837629ad949b73d032c637653d069e909cffe4ee7870b02301939ce39cc" +checksum = "928bc249a7e3cd554fd2e8e08a426e9670c50bbfc9a621653cfa9accc9641783" dependencies = [ "proc-macro2", "quote", @@ -1420,7 +1420,7 @@ checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d" [[package]] name = "dsrs" version = "0.6.1" -source = "git+https://github.com/cube-js/datasketches-rs.git?branch=cubestore#bc14eb6ee8bf8aef1911fe97aedfba04c1572d55" +source = "git+https://github.com/cube-js/datasketches-rs.git?branch=cubestore#8ccba2f46be111702e553a807a2b0cc3d91be27c" dependencies = [ "base64 0.13.0", "bstr", diff --git a/rust/cubestore/cubedatasketches/src/lib.rs b/rust/cubestore/cubedatasketches/src/lib.rs index 47ff97593999f..d596d8374f813 100644 --- a/rust/cubestore/cubedatasketches/src/lib.rs +++ b/rust/cubestore/cubedatasketches/src/lib.rs @@ -45,6 +45,10 @@ impl HLLDataSketch { return self.instance.estimate().round() as u64; } + pub fn get_lg_config_k(&self) -> u8 { + return self.instance.get_lg_config_k(); + } + pub fn write(&self) -> Vec { // TODO(ovr): Better way? self.instance.serialize().as_ref().iter().copied().collect() @@ -78,8 +82,7 @@ impl HLLUnionDataSketch { } pub fn merge_with(&mut self, other: HLLDataSketch) -> Result<()> { - let mut union = HLLUnion::new(12); - union.merge(other.instance); + self.instance.merge(other.instance); Ok(()) } diff --git a/rust/cubestore/cubestore-sql-tests/src/tests.rs b/rust/cubestore/cubestore-sql-tests/src/tests.rs index 10c68aed64f67..8d918cd7926c8 100644 --- a/rust/cubestore/cubestore-sql-tests/src/tests.rs +++ b/rust/cubestore/cubestore-sql-tests/src/tests.rs @@ -2830,8 +2830,8 @@ async fn aggregate_index_hll_databricks(service: Box) { assert_eq!( to_rows(&res), [ - [TableValue::Int(1), TableValue::Int(10), TableValue::Int(0)], - [TableValue::Int(1), TableValue::Int(20), TableValue::Int(0)], + [TableValue::Int(1), TableValue::Int(10), TableValue::Int(4)], + [TableValue::Int(1), TableValue::Int(20), TableValue::Int(4)], ] ); @@ -2839,7 +2839,7 @@ async fn aggregate_index_hll_databricks(service: Box) { .exec_query("SELECT a, cardinality(merge(a_hll)) as hll FROM s.Orders WHERE b = 20 GROUP BY 1 ORDER BY 1") .await .unwrap(); - assert_eq!(to_rows(&res), [[TableValue::Int(1), TableValue::Int(0)],]); + assert_eq!(to_rows(&res), [[TableValue::Int(1), TableValue::Int(4)],]); let res = service .exec_query( @@ -2847,7 +2847,7 @@ async fn aggregate_index_hll_databricks(service: Box) { ) .await .unwrap(); - assert_eq!(to_rows(&res), [[TableValue::Int(1), TableValue::Int(0)],]); + assert_eq!(to_rows(&res), [[TableValue::Int(1), TableValue::Int(4)],]); } async fn physical_plan_flags(service: Box) { diff --git a/rust/cubestore/cubestore/src/import/mod.rs b/rust/cubestore/cubestore/src/import/mod.rs index 119924fb1250f..7673a8a9847bf 100644 --- a/rust/cubestore/cubestore/src/import/mod.rs +++ b/rust/cubestore/cubestore/src/import/mod.rs @@ -31,7 +31,6 @@ use crate::import::limits::ConcurrencyLimits; use crate::metastore::table::Table; use crate::metastore::{is_valid_plain_binary_hll, HllFlavour, IdRow}; use crate::metastore::{Column, ColumnType, ImportFormat, MetaStore}; -use crate::queryplanner::hll::Hll::DataSketches; use crate::queryplanner::trace_data_loaded::DataLoadedSize; use crate::remotefs::RemoteFs; use crate::sql::timestamp_from_string; diff --git a/rust/cubestore/cubestore/src/queryplanner/hll.rs b/rust/cubestore/cubestore/src/queryplanner/hll.rs index add89ba62ed47..0b83586f23c62 100644 --- a/rust/cubestore/cubestore/src/queryplanner/hll.rs +++ b/rust/cubestore/cubestore/src/queryplanner/hll.rs @@ -68,7 +68,7 @@ impl HllUnion { Hll::Airlift(h) => Ok(Self::Airlift(h)), Hll::ZetaSketch(h) => Ok(Self::ZetaSketch(h)), Hll::DataSketches(h) => { - let mut union = HLLUnionDataSketch::new(12); + let mut union = HLLUnionDataSketch::new(h.get_lg_config_k()); union.merge_with(h)?; Ok(Self::DataSketches(union)) From f4acc456caeffcbd1a793938b7ee50be4c696303 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 8 May 2024 19:50:48 +0200 Subject: [PATCH 06/15] chore: check sketches for compatiblity --- rust/cubestore/cubedatasketches/src/lib.rs | 14 ++++++++++++-- rust/cubestore/cubestore/src/queryplanner/hll.rs | 4 +++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/rust/cubestore/cubedatasketches/src/lib.rs b/rust/cubestore/cubedatasketches/src/lib.rs index d596d8374f813..55c11864304de 100644 --- a/rust/cubestore/cubedatasketches/src/lib.rs +++ b/rust/cubestore/cubedatasketches/src/lib.rs @@ -30,7 +30,10 @@ unsafe impl Sync for HLLDataSketch {} impl Debug for HLLDataSketch { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - todo!() + f.debug_struct("HLLDataSketch") + .field("instance", &""); + + Ok(()) } } @@ -61,7 +64,10 @@ pub struct HLLUnionDataSketch { impl Debug for HLLUnionDataSketch { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - todo!() + f.debug_struct("HLLUnionDataSketch") + .field("instance", &""); + + Ok(()) } } @@ -75,6 +81,10 @@ impl HLLUnionDataSketch { } } + pub fn get_lg_config_k(&self) -> u8 { + return self.instance.get_lg_config_k(); + } + pub fn write(&self) -> Vec { let sketch = self.instance.sketch(HLLType::HLL_4); // TODO(ovr): Better way? diff --git a/rust/cubestore/cubestore/src/queryplanner/hll.rs b/rust/cubestore/cubestore/src/queryplanner/hll.rs index 0b83586f23c62..da7a4fcff4a53 100644 --- a/rust/cubestore/cubestore/src/queryplanner/hll.rs +++ b/rust/cubestore/cubestore/src/queryplanner/hll.rs @@ -88,7 +88,9 @@ impl HllUnion { match (self, other) { (Self::Airlift(l), Hll::Airlift(r)) => l.index_bit_len() == r.index_bit_len(), (Self::ZetaSketch(l), Hll::ZetaSketch(r)) => l.is_compatible(r), - (Self::DataSketches(_), Hll::DataSketches(_)) => true, + (Self::DataSketches(l), Hll::DataSketches(r)) => { + l.get_lg_config_k() == r.get_lg_config_k() + } _ => return false, } } From 2a6f3541a8aefcae7acc8643d01650cec019a1d5 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 8 May 2024 21:39:44 +0200 Subject: [PATCH 07/15] fix: support HLL_PREINTS --- rust/cubestore/cubestore/src/queryplanner/hll.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/cubestore/cubestore/src/queryplanner/hll.rs b/rust/cubestore/cubestore/src/queryplanner/hll.rs index da7a4fcff4a53..404f771fca73e 100644 --- a/rust/cubestore/cubestore/src/queryplanner/hll.rs +++ b/rust/cubestore/cubestore/src/queryplanner/hll.rs @@ -12,6 +12,7 @@ pub enum Hll { const DS_LIST_PREINTS: u8 = 2; const DS_HASH_SET_PREINTS: u8 = 3; +const DS_HLL_PREINTS: u8 = 10; const DS_SER_VER: u8 = 1; const DS_FAMILY_ID: u8 = 7; @@ -26,7 +27,7 @@ impl Hll { // - must larger than 3 due to how protos are encoded in ZetaSketch. // - represents the data format version and is <= 3 in AirLift. // - checking first 3 bytes for figure out HLL from Apache DataSketches - if (data[0] == DS_LIST_PREINTS || data[0] == DS_HASH_SET_PREINTS) + if (data[0] == DS_LIST_PREINTS || data[0] == DS_HASH_SET_PREINTS || data[0] == DS_HLL_PREINTS) && data[1] == DS_SER_VER && data[2] == DS_FAMILY_ID { From f9bc0ece4a0ea8c9d394a74f8b641e558e1bb2f6 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 8 May 2024 21:42:59 +0200 Subject: [PATCH 08/15] chore: fmt --- rust/cubestore/cubestore/src/queryplanner/hll.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/cubestore/cubestore/src/queryplanner/hll.rs b/rust/cubestore/cubestore/src/queryplanner/hll.rs index 404f771fca73e..e41122dd3a8c5 100644 --- a/rust/cubestore/cubestore/src/queryplanner/hll.rs +++ b/rust/cubestore/cubestore/src/queryplanner/hll.rs @@ -27,7 +27,9 @@ impl Hll { // - must larger than 3 due to how protos are encoded in ZetaSketch. // - represents the data format version and is <= 3 in AirLift. // - checking first 3 bytes for figure out HLL from Apache DataSketches - if (data[0] == DS_LIST_PREINTS || data[0] == DS_HASH_SET_PREINTS || data[0] == DS_HLL_PREINTS) + if (data[0] == DS_LIST_PREINTS + || data[0] == DS_HASH_SET_PREINTS + || data[0] == DS_HLL_PREINTS) && data[1] == DS_SER_VER && data[2] == DS_FAMILY_ID { From 5555b442980a3ab66061f0f8d734b335ea60e184 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 8 May 2024 21:49:34 +0200 Subject: [PATCH 09/15] chore: add prefix Protobuf errors --- rust/cubestore/cubezetasketch/src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/cubestore/cubezetasketch/src/error.rs b/rust/cubestore/cubezetasketch/src/error.rs index cfa34371de714..13901306b1a8c 100644 --- a/rust/cubestore/cubezetasketch/src/error.rs +++ b/rust/cubestore/cubezetasketch/src/error.rs @@ -46,7 +46,7 @@ impl From for ZetaError { impl From for ZetaError { fn from(err: ProtobufError) -> Self { - return ZetaError::new(err); + return ZetaError::new(format!("Protobuf: {}", err)); } } From a21a4d55670e199926f261ea0c34f52adc891d0c Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 8 May 2024 23:46:12 +0200 Subject: [PATCH 10/15] chore: update cc --- rust/cubestore/Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index a05cb941e1cfb..911f9ec589e5c 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -574,9 +574,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.90" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" +checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" dependencies = [ "jobserver", "libc", From cc953cf46ebcd8a50f0d5fcca55bc5c46f3c87fe Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 8 May 2024 23:49:41 +0200 Subject: [PATCH 11/15] chore: update cmake --- rust/cubestore/Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index 911f9ec589e5c..b2286f9699a81 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -747,9 +747,9 @@ dependencies = [ [[package]] name = "cmake" -version = "0.1.46" +version = "0.1.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7b858541263efe664aead4a5209a4ae5c5d2811167d4ed4ee0944503f8d2089" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" dependencies = [ "cc", ] From 9285dc3252b16c6e05ce28e138da087ea842ffba Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 13 May 2024 13:38:04 +0200 Subject: [PATCH 12/15] chore: add prefix for hll errors Signed-off-by: Dmitry Patsura --- rust/cubestore/cubedatasketches/src/error.rs | 2 +- rust/cubestore/cubehll/src/error.rs | 2 +- rust/cubestore/cubezetasketch/src/error.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/cubestore/cubedatasketches/src/error.rs b/rust/cubestore/cubedatasketches/src/error.rs index 1ba8e67063f20..bf168a797880c 100644 --- a/rust/cubestore/cubedatasketches/src/error.rs +++ b/rust/cubestore/cubedatasketches/src/error.rs @@ -24,7 +24,7 @@ pub struct DataSketchesError { impl Display for DataSketchesError { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{}", self.message) + write!(f, "DataSketchesError: {}", self.message) } } diff --git a/rust/cubestore/cubehll/src/error.rs b/rust/cubestore/cubehll/src/error.rs index 4c77c3d5077e0..428a00639ed0d 100644 --- a/rust/cubestore/cubehll/src/error.rs +++ b/rust/cubestore/cubehll/src/error.rs @@ -8,7 +8,7 @@ pub struct HllError { impl Display for HllError { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{}", self.message) + write!(f, "HllError: {}", self.message) } } diff --git a/rust/cubestore/cubezetasketch/src/error.rs b/rust/cubestore/cubezetasketch/src/error.rs index 13901306b1a8c..988c94c068789 100644 --- a/rust/cubestore/cubezetasketch/src/error.rs +++ b/rust/cubestore/cubezetasketch/src/error.rs @@ -26,7 +26,7 @@ pub struct ZetaError { impl Display for ZetaError { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{}", self.message) + write!(f, "ZetaError: {}", self.message) } } From e49454b3856523215c2f62cab85b3eed49e71449 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 13 May 2024 13:59:53 +0200 Subject: [PATCH 13/15] chore: unimplemented on windows --- rust/cubestore/cubedatasketches/Cargo.toml | 2 + rust/cubestore/cubedatasketches/src/lib.rs | 89 ++--------------- rust/cubestore/cubedatasketches/src/native.rs | 97 +++++++++++++++++++ .../cubedatasketches/src/unsupported.rs | 68 +++++++++++++ .../cubestore/src/queryplanner/hll.rs | 2 +- 5 files changed, 177 insertions(+), 81 deletions(-) create mode 100644 rust/cubestore/cubedatasketches/src/native.rs create mode 100644 rust/cubestore/cubedatasketches/src/unsupported.rs diff --git a/rust/cubestore/cubedatasketches/Cargo.toml b/rust/cubestore/cubedatasketches/Cargo.toml index 235071083f9bd..d32ccadbc0b66 100644 --- a/rust/cubestore/cubedatasketches/Cargo.toml +++ b/rust/cubestore/cubedatasketches/Cargo.toml @@ -7,4 +7,6 @@ license = "Apache-2.0" description = "Implementation of HLL from Apache DataSketches" [dependencies] + +[target.'cfg(not(target_os = "windows"))'.dependencies] dsrs = { git = "https://github.com/cube-js/datasketches-rs.git", branch = "cubestore" } diff --git a/rust/cubestore/cubedatasketches/src/lib.rs b/rust/cubestore/cubedatasketches/src/lib.rs index 55c11864304de..aadf5963cbf16 100644 --- a/rust/cubestore/cubedatasketches/src/lib.rs +++ b/rust/cubestore/cubedatasketches/src/lib.rs @@ -15,85 +15,14 @@ */ mod error; -pub use error::DataSketchesError; -pub use error::Result; -use std::fmt::{Debug, Formatter}; - -use dsrs::{HLLSketch, HLLType, HLLUnion}; - -pub struct HLLDataSketch { - pub(crate) instance: HLLSketch, -} - -unsafe impl Send for HLLDataSketch {} -unsafe impl Sync for HLLDataSketch {} - -impl Debug for HLLDataSketch { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("HLLDataSketch") - .field("instance", &""); - - Ok(()) - } -} - -impl HLLDataSketch { - pub fn read(data: &[u8]) -> Result { - return Ok(Self { - instance: HLLSketch::deserialize(data)?, - }); - } - - pub fn cardinality(&self) -> u64 { - return self.instance.estimate().round() as u64; - } - - pub fn get_lg_config_k(&self) -> u8 { - return self.instance.get_lg_config_k(); - } +#[cfg(target_os = "windows")] +#[path = "unsupported.rs"] +mod imp; - pub fn write(&self) -> Vec { - // TODO(ovr): Better way? - self.instance.serialize().as_ref().iter().copied().collect() - } -} +// TODO(ovr): https://github.com/cube-js/datasketches-rs/pull/2 +#[cfg(not(target_os = "windows"))] +#[path = "native.rs"] +mod imp; -pub struct HLLUnionDataSketch { - pub(crate) instance: HLLUnion, -} - -impl Debug for HLLUnionDataSketch { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("HLLUnionDataSketch") - .field("instance", &""); - - Ok(()) - } -} - -unsafe impl Send for HLLUnionDataSketch {} -unsafe impl Sync for HLLUnionDataSketch {} - -impl HLLUnionDataSketch { - pub fn new(lg_max_k: u8) -> Self { - Self { - instance: HLLUnion::new(lg_max_k), - } - } - - pub fn get_lg_config_k(&self) -> u8 { - return self.instance.get_lg_config_k(); - } - - pub fn write(&self) -> Vec { - let sketch = self.instance.sketch(HLLType::HLL_4); - // TODO(ovr): Better way? - sketch.serialize().as_ref().iter().copied().collect() - } - - pub fn merge_with(&mut self, other: HLLDataSketch) -> Result<()> { - self.instance.merge(other.instance); - - Ok(()) - } -} +pub use error::DataSketchesError; +pub use imp::{HLLDataSketch, HLLUnionDataSketch}; diff --git a/rust/cubestore/cubedatasketches/src/native.rs b/rust/cubestore/cubedatasketches/src/native.rs new file mode 100644 index 0000000000000..723c9a2f03dea --- /dev/null +++ b/rust/cubestore/cubedatasketches/src/native.rs @@ -0,0 +1,97 @@ +/* + * Copyright 2024 Cube Dev, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub use crate::error::Result; +use std::fmt::{Debug, Formatter}; + +use dsrs::{HLLSketch, HLLType, HLLUnion}; + +pub struct HLLDataSketch { + pub(crate) instance: HLLSketch, +} + +unsafe impl Send for HLLDataSketch {} +unsafe impl Sync for HLLDataSketch {} + +impl Debug for HLLDataSketch { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HLLDataSketch") + .field("instance", &""); + + Ok(()) + } +} + +impl HLLDataSketch { + pub fn read(data: &[u8]) -> Result { + return Ok(Self { + instance: HLLSketch::deserialize(data)?, + }); + } + + pub fn cardinality(&self) -> u64 { + return self.instance.estimate().round() as u64; + } + + pub fn get_lg_config_k(&self) -> u8 { + return self.instance.get_lg_config_k(); + } + + pub fn write(&self) -> Vec { + // TODO(ovr): Better way? + self.instance.serialize().as_ref().iter().copied().collect() + } +} + +pub struct HLLUnionDataSketch { + pub(crate) instance: HLLUnion, +} + +impl Debug for HLLUnionDataSketch { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HLLUnionDataSketch") + .field("instance", &""); + + Ok(()) + } +} + +unsafe impl Send for HLLUnionDataSketch {} +unsafe impl Sync for HLLUnionDataSketch {} + +impl HLLUnionDataSketch { + pub fn new(lg_max_k: u8) -> Result { + Ok(Self { + instance: HLLUnion::new(lg_max_k), + }) + } + + pub fn get_lg_config_k(&self) -> u8 { + return self.instance.get_lg_config_k(); + } + + pub fn write(&self) -> Vec { + let sketch = self.instance.sketch(HLLType::HLL_4); + // TODO(ovr): Better way? + sketch.serialize().as_ref().iter().copied().collect() + } + + pub fn merge_with(&mut self, other: HLLDataSketch) -> Result<()> { + self.instance.merge(other.instance); + + Ok(()) + } +} diff --git a/rust/cubestore/cubedatasketches/src/unsupported.rs b/rust/cubestore/cubedatasketches/src/unsupported.rs new file mode 100644 index 0000000000000..cfc82bd91a711 --- /dev/null +++ b/rust/cubestore/cubedatasketches/src/unsupported.rs @@ -0,0 +1,68 @@ +/* + * Copyright 2024 Cube Dev, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub use crate::error::DataSketchesError; +pub use crate::error::Result; +use std::fmt::Debug; + +#[derive(Debug)] +pub struct HLLDataSketch {} + +unsafe impl Send for HLLDataSketch {} + +unsafe impl Sync for HLLDataSketch {} + +impl HLLDataSketch { + pub fn read(_data: &[u8]) -> Result { + Err(DataSketchesError::new("Not supported on Windows")) + } + + pub fn cardinality(&self) -> u64 { + unimplemented!(); + } + + pub fn get_lg_config_k(&self) -> u8 { + unimplemented!(); + } + + pub fn write(&self) -> Vec { + unimplemented!(); + } +} + +#[derive(Debug)] +pub struct HLLUnionDataSketch {} + +unsafe impl Send for HLLUnionDataSketch {} +unsafe impl Sync for HLLUnionDataSketch {} + +impl HLLUnionDataSketch { + pub fn new(_lg_max_k: u8) -> Result { + Err(DataSketchesError::new("Not supported on Windows")) + } + + pub fn get_lg_config_k(&self) -> u8 { + unimplemented!(); + } + + pub fn write(&self) -> Vec { + unimplemented!(); + } + + pub fn merge_with(&mut self, _other: HLLDataSketch) -> Result<()> { + unimplemented!(); + } +} diff --git a/rust/cubestore/cubestore/src/queryplanner/hll.rs b/rust/cubestore/cubestore/src/queryplanner/hll.rs index e41122dd3a8c5..6b77fef39e824 100644 --- a/rust/cubestore/cubestore/src/queryplanner/hll.rs +++ b/rust/cubestore/cubestore/src/queryplanner/hll.rs @@ -71,7 +71,7 @@ impl HllUnion { Hll::Airlift(h) => Ok(Self::Airlift(h)), Hll::ZetaSketch(h) => Ok(Self::ZetaSketch(h)), Hll::DataSketches(h) => { - let mut union = HLLUnionDataSketch::new(h.get_lg_config_k()); + let mut union = HLLUnionDataSketch::new(h.get_lg_config_k())?; union.merge_with(h)?; Ok(Self::DataSketches(union)) From 8cc16e883b1570c72ec455bc681e35f6bb910bbb Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 13 May 2024 14:37:52 +0200 Subject: [PATCH 14/15] chore: fix windows --- rust/cubestore/cubedatasketches/src/error.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/cubestore/cubedatasketches/src/error.rs b/rust/cubestore/cubedatasketches/src/error.rs index bf168a797880c..1459d86f3dbaf 100644 --- a/rust/cubestore/cubedatasketches/src/error.rs +++ b/rust/cubestore/cubedatasketches/src/error.rs @@ -42,6 +42,7 @@ impl From for DataSketchesError { } } +#[cfg(not(target_os = "windows"))] impl From for DataSketchesError { fn from(err: dsrs::DataSketchesError) -> Self { return DataSketchesError::new(err); From 16b1a15137def97c921f06a55ddfa433743ace4c Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 13 May 2024 14:38:12 +0200 Subject: [PATCH 15/15] chore: some tests --- .../cubestore/src/queryplanner/hll.rs | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/rust/cubestore/cubestore/src/queryplanner/hll.rs b/rust/cubestore/cubestore/src/queryplanner/hll.rs index 6b77fef39e824..32e3f29743baa 100644 --- a/rust/cubestore/cubestore/src/queryplanner/hll.rs +++ b/rust/cubestore/cubestore/src/queryplanner/hll.rs @@ -113,3 +113,31 @@ impl HllUnion { return Ok(()); } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn hll_detect_sketches_success() -> Result<(), CubeError> { + { + let mut sketch = Hll::read(&hex::decode(&"0201070C03080108320B1F05")?)?; + assert!(matches!(sketch, Hll::DataSketches(_))); + assert_eq!(sketch.cardinality(), 1); + } + + { + let mut sketch = Hll::read(&hex::decode(&"0301070C050800010800000064987A05A5456B06E9EBD51A4C116B08307C4E047258E51293723306176C6E06")?)?; + assert!(matches!(sketch, Hll::DataSketches(_))); + assert_eq!(sketch.cardinality(), 8); + } + + { + let mut sketch = Hll::read(&hex::decode(&"0A01070C0008000274799646F46B824000000040101BAD400000000000000000DC0D0000000000000000000600000001000100000000200000000000100100000000000000000000000000000000000000007000000000000000010000030000500000000050000000000002200003202000002000040000000000001003000010000000020000000000030000000000000000010000000000000010002206000000500000000001000000130000000003000000000020000000270000020000002000201000000100000000010001090000020000002000010120000000000000000000000000000010000004000000001000000010000000000000000000700000000000000000000100000001000011000000000000000000000000100000000000000010001000000000001002000000000100000000000000000000000004000000000000000000002000000000011000000000002020000000050000000000000000000010000000000000000001100050000000000220000000000000000000000010300000041000000000000000108000000000104000000001303410010001000002000000000000000000000001002000000002020000030001000013000000000010000000000010020000000005100000000020000000900000000000003100100000000000020000000000000000100000000000000004000200300000001000001110000000000000000000103005010020060000010000000000010000010000010002000000000000000010100000000000010001000003000000000030020110000000000000010020000000000000000100000000000000010003000005002000000400000000000000100200100000000000100000300000000000100000300000000000010200000000002000502000020000000000000000000000303000000000000000000000040000000040000000000020000010001000003010005100002001000000000000000000000000003000000000103000005000012000100000010B00000000000001030020000000000000000000020100002003001000000000002000000000210410000001000050000000000120000000000010000001000202000100000001002000000200001000000100100000050200000000000030001000020000001002402230001000010010010000110200010001100000000100001010000100010000000100000000100040020000106000020000000000000000001001000000000000000101000000010000131000000000000010010000000400000000000000010001000000000000000000000000000000000000001302001000020000000000001300000000000000000000000002000000200000000000004000101000100000000000000300000000020000000000100001000001000000330000000000000000000400000400000000000000003000000000000000000000000000000000100000000100000000000000000001000000000020010000000000020000524000020701000000100000000000001000000000020000000021000002210000030003000000000000000000000000001010000010101100000100030000001101000000031100010000000000000000000000032000010000000000000200010000000000000000020000030010301000000030001000000000000000100000000040000004001002200012020000000000000110000000000020000001000000000000000000000000000000021000022000000000000000100000000000000010000000010000000000000020000000000100001002000000000400000000000203010000000022000000000010000000000004000100000000010000000000000012000000010200000000000002010200000204000000002300000100002010000000000000000000000000200000000002020002000000001000000100000001000000000010000000001000104000400000600001000000100000000000000000003000100000150000010020000031000000000000000000003012002000001000000100000000000300030000000401000000000000000020010000000000000102000000000000000001000000000000000500002000000000300502000000020000010000000000000011000000000000000003000000000000010000000000020000100000100001000000000000000010000000020000001020000000000000000000001000000010002002000000000000300000000020000000000000000000040000000000000010221000000001000006000000000000000000010020010001000000401000000000000030000000000001000000110000000000000120000000000000010000400300010000010100000001002000001000000600001000000000000000000021001000000003000000000000000000002000020000000500000000000010000050040002000000000000000000000000100000000004000000030200000000050000100000000000000300000001020000000000010000000300000000000003000000500000010000000000000000000000000000000300000000000000004000000000000000520000000000000000010000002000200000000300200020000002000000000000002000000000100000000000000050000100111700000010000000000000000000000030001000000002000000000000002100000000000000000000000000000100000004300000000003000000000303001000000300300000002000001000013010000000002000000000000000000010020000400000100000001000100300000000000000105300000000000000000000000000000010303000")?)?; + assert!(matches!(sketch, Hll::DataSketches(_))); + assert_eq!(sketch.cardinality(), 589); + } + + Ok(()) + } +}