diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index 03669b14e970a..b2286f9699a81 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -574,9 +574,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.90" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" +checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" dependencies = [ "jobserver", "libc", @@ -678,6 +678,21 @@ dependencies = [ "libloading", ] +[[package]] +name = "clap" +version = "2.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" +dependencies = [ + "ansi_term", + "atty", + "bitflags 1.3.2", + "strsim", + "textwrap 0.11.0", + "unicode-width", + "vec_map", +] + [[package]] name = "clap" version = "3.2.23" @@ -687,7 +702,7 @@ dependencies = [ "bitflags 1.3.2", "clap_lex", "indexmap 1.7.0", - "textwrap", + "textwrap 0.16.0", ] [[package]] @@ -732,13 +747,23 @@ dependencies = [ [[package]] name = "cmake" -version = "0.1.46" +version = "0.1.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7b858541263efe664aead4a5209a4ae5c5d2811167d4ed4ee0944503f8d2089" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" dependencies = [ "cc", ] +[[package]] +name = "codespan-reporting" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" +dependencies = [ + "termcolor", + "unicode-width", +] + [[package]] name = "colored" version = "2.1.0" @@ -822,7 +847,7 @@ dependencies = [ "atty", "cast", "ciborium", - "clap", + "clap 3.2.23", "criterion-plot", "futures", "itertools 0.10.1", @@ -1001,6 +1026,13 @@ dependencies = [ "syn 1.0.107", ] +[[package]] +name = "cubedatasketches" +version = "0.1.0" +dependencies = [ + "dsrs", +] + [[package]] name = "cubehll" version = "0.1.0" @@ -1057,6 +1089,7 @@ dependencies = [ "cloud-storage", "csv", "ctor", + "cubedatasketches", "cubehll", "cuberockstore", "cuberpc", @@ -1159,6 +1192,50 @@ dependencies = [ "protobuf", ] +[[package]] +name = "cxx" +version = "1.0.122" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb497fad022245b29c2a0351df572e2d67c1046bcef2260ebc022aec81efea82" +dependencies = [ + "cc", + "cxxbridge-flags", + "cxxbridge-macro", + "link-cplusplus", +] + +[[package]] +name = "cxx-build" +version = "1.0.97" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c0c11acd0e63bae27dcd2afced407063312771212b7a823b4fd72d633be30fb" +dependencies = [ + "cc", + "codespan-reporting", + "once_cell", + "proc-macro2", + "quote", + "scratch", + "syn 2.0.58", +] + +[[package]] +name = "cxxbridge-flags" +version = "1.0.122" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688c799a4a846f1c0acb9f36bb9c6272d9b3d9457f3633c7753c6057270df13c" + +[[package]] +name = "cxxbridge-macro" +version = "1.0.122" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "928bc249a7e3cd554fd2e8e08a426e9670c50bbfc9a621653cfa9accc9641783" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.58", +] + [[package]] name = "data-encoding" version = "2.4.0" @@ -1340,6 +1417,20 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d" +[[package]] +name = "dsrs" +version = "0.6.1" +source = "git+https://github.com/cube-js/datasketches-rs.git?branch=cubestore#8ccba2f46be111702e553a807a2b0cc3d91be27c" +dependencies = [ + "base64 0.13.0", + "bstr", + "cxx", + "cxx-build", + "memchr", + "structopt", + "thin-dst", +] + [[package]] name = "duct" version = "0.13.6" @@ -2337,6 +2428,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "link-cplusplus" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d240c6f7e1ba3a28b0249f774e6a9dd0175054b52dfbb61b16eb8505c3785c9" +dependencies = [ + "cc", +] + [[package]] name = "linux-raw-sys" version = "0.4.13" @@ -3389,6 +3489,30 @@ dependencies = [ "toml", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.107", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -4074,6 +4198,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "scratch" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3cf7c11c38cb994f3d40e8a8cde3bbd1f72a435e4c49e85d6553d8312306152" + [[package]] name = "security-framework" version = "2.3.1" @@ -4391,6 +4521,36 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + +[[package]] +name = "structopt" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10" +dependencies = [ + "clap 2.34.0", + "lazy_static", + "structopt-derive", +] + +[[package]] +name = "structopt-derive" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.107", +] + [[package]] name = "strum" version = "0.21.0" @@ -4534,12 +4694,36 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + [[package]] name = "textwrap" version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" +[[package]] +name = "thin-dst" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db3c46be180f1af9673ebb27bc1235396f61ef6965b3fe0dbb2e624deb604f0e" + [[package]] name = "thiserror" version = "1.0.38" @@ -5041,6 +5225,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + [[package]] name = "version_check" version = "0.9.3" diff --git a/rust/cubestore/Cargo.toml b/rust/cubestore/Cargo.toml index 2e925776f955c..7f8199f52b73d 100644 --- a/rust/cubestore/Cargo.toml +++ b/rust/cubestore/Cargo.toml @@ -4,6 +4,7 @@ members = [ "cubestore", "cubestore-sql-tests", "cubehll", + "cubedatasketches", "cubezetasketch", "cuberpc", "cuberockstore", diff --git a/rust/cubestore/Dockerfile b/rust/cubestore/Dockerfile index f2de85b818236..153884120eac5 100644 --- a/rust/cubestore/Dockerfile +++ b/rust/cubestore/Dockerfile @@ -26,6 +26,7 @@ COPY Cargo.lock . COPY cuberockstore cuberockstore COPY cubehll cubehll COPY cubezetasketch cubezetasketch +COPY cubedatasketches cubedatasketches COPY cuberpc cuberpc COPY cubestore-sql-tests cubestore-sql-tests COPY cubestore/Cargo.toml cubestore/Cargo.toml diff --git a/rust/cubestore/cubedatasketches/Cargo.toml b/rust/cubestore/cubedatasketches/Cargo.toml new file mode 100644 index 0000000000000..d32ccadbc0b66 --- /dev/null +++ b/rust/cubestore/cubedatasketches/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "cubedatasketches" +version = "0.1.0" +authors = ["Cube Dev, Inc."] +edition = "2021" +license = "Apache-2.0" +description = "Implementation of HLL from Apache DataSketches" + +[dependencies] + +[target.'cfg(not(target_os = "windows"))'.dependencies] +dsrs = { git = "https://github.com/cube-js/datasketches-rs.git", branch = "cubestore" } diff --git a/rust/cubestore/cubedatasketches/README.md b/rust/cubestore/cubedatasketches/README.md new file mode 100644 index 0000000000000..07dd0c5c77035 --- /dev/null +++ b/rust/cubestore/cubedatasketches/README.md @@ -0,0 +1 @@ +# Overview diff --git a/rust/cubestore/cubedatasketches/src/error.rs b/rust/cubestore/cubedatasketches/src/error.rs new file mode 100644 index 0000000000000..1459d86f3dbaf --- /dev/null +++ b/rust/cubestore/cubedatasketches/src/error.rs @@ -0,0 +1,50 @@ +/* + * Copyright 2024 Cube Dev, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::fmt::{Display, Formatter}; + +pub type Result = std::result::Result; + +#[derive(Debug)] +pub struct DataSketchesError { + pub message: String, +} + +impl Display for DataSketchesError { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "DataSketchesError: {}", self.message) + } +} + +impl DataSketchesError { + pub fn new(message: Str) -> Self { + return Self { + message: message.to_string(), + }; + } +} + +impl From for DataSketchesError { + fn from(err: std::io::Error) -> Self { + return DataSketchesError::new(err); + } +} + +#[cfg(not(target_os = "windows"))] +impl From for DataSketchesError { + fn from(err: dsrs::DataSketchesError) -> Self { + return DataSketchesError::new(err); + } +} diff --git a/rust/cubestore/cubedatasketches/src/lib.rs b/rust/cubestore/cubedatasketches/src/lib.rs new file mode 100644 index 0000000000000..aadf5963cbf16 --- /dev/null +++ b/rust/cubestore/cubedatasketches/src/lib.rs @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Cube Dev, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +mod error; + +#[cfg(target_os = "windows")] +#[path = "unsupported.rs"] +mod imp; + +// TODO(ovr): https://github.com/cube-js/datasketches-rs/pull/2 +#[cfg(not(target_os = "windows"))] +#[path = "native.rs"] +mod imp; + +pub use error::DataSketchesError; +pub use imp::{HLLDataSketch, HLLUnionDataSketch}; diff --git a/rust/cubestore/cubedatasketches/src/native.rs b/rust/cubestore/cubedatasketches/src/native.rs new file mode 100644 index 0000000000000..723c9a2f03dea --- /dev/null +++ b/rust/cubestore/cubedatasketches/src/native.rs @@ -0,0 +1,97 @@ +/* + * Copyright 2024 Cube Dev, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub use crate::error::Result; +use std::fmt::{Debug, Formatter}; + +use dsrs::{HLLSketch, HLLType, HLLUnion}; + +pub struct HLLDataSketch { + pub(crate) instance: HLLSketch, +} + +unsafe impl Send for HLLDataSketch {} +unsafe impl Sync for HLLDataSketch {} + +impl Debug for HLLDataSketch { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HLLDataSketch") + .field("instance", &""); + + Ok(()) + } +} + +impl HLLDataSketch { + pub fn read(data: &[u8]) -> Result { + return Ok(Self { + instance: HLLSketch::deserialize(data)?, + }); + } + + pub fn cardinality(&self) -> u64 { + return self.instance.estimate().round() as u64; + } + + pub fn get_lg_config_k(&self) -> u8 { + return self.instance.get_lg_config_k(); + } + + pub fn write(&self) -> Vec { + // TODO(ovr): Better way? + self.instance.serialize().as_ref().iter().copied().collect() + } +} + +pub struct HLLUnionDataSketch { + pub(crate) instance: HLLUnion, +} + +impl Debug for HLLUnionDataSketch { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HLLUnionDataSketch") + .field("instance", &""); + + Ok(()) + } +} + +unsafe impl Send for HLLUnionDataSketch {} +unsafe impl Sync for HLLUnionDataSketch {} + +impl HLLUnionDataSketch { + pub fn new(lg_max_k: u8) -> Result { + Ok(Self { + instance: HLLUnion::new(lg_max_k), + }) + } + + pub fn get_lg_config_k(&self) -> u8 { + return self.instance.get_lg_config_k(); + } + + pub fn write(&self) -> Vec { + let sketch = self.instance.sketch(HLLType::HLL_4); + // TODO(ovr): Better way? + sketch.serialize().as_ref().iter().copied().collect() + } + + pub fn merge_with(&mut self, other: HLLDataSketch) -> Result<()> { + self.instance.merge(other.instance); + + Ok(()) + } +} diff --git a/rust/cubestore/cubedatasketches/src/unsupported.rs b/rust/cubestore/cubedatasketches/src/unsupported.rs new file mode 100644 index 0000000000000..cfc82bd91a711 --- /dev/null +++ b/rust/cubestore/cubedatasketches/src/unsupported.rs @@ -0,0 +1,68 @@ +/* + * Copyright 2024 Cube Dev, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub use crate::error::DataSketchesError; +pub use crate::error::Result; +use std::fmt::Debug; + +#[derive(Debug)] +pub struct HLLDataSketch {} + +unsafe impl Send for HLLDataSketch {} + +unsafe impl Sync for HLLDataSketch {} + +impl HLLDataSketch { + pub fn read(_data: &[u8]) -> Result { + Err(DataSketchesError::new("Not supported on Windows")) + } + + pub fn cardinality(&self) -> u64 { + unimplemented!(); + } + + pub fn get_lg_config_k(&self) -> u8 { + unimplemented!(); + } + + pub fn write(&self) -> Vec { + unimplemented!(); + } +} + +#[derive(Debug)] +pub struct HLLUnionDataSketch {} + +unsafe impl Send for HLLUnionDataSketch {} +unsafe impl Sync for HLLUnionDataSketch {} + +impl HLLUnionDataSketch { + pub fn new(_lg_max_k: u8) -> Result { + Err(DataSketchesError::new("Not supported on Windows")) + } + + pub fn get_lg_config_k(&self) -> u8 { + unimplemented!(); + } + + pub fn write(&self) -> Vec { + unimplemented!(); + } + + pub fn merge_with(&mut self, _other: HLLDataSketch) -> Result<()> { + unimplemented!(); + } +} diff --git a/rust/cubestore/cubehll/src/error.rs b/rust/cubestore/cubehll/src/error.rs index 4c77c3d5077e0..428a00639ed0d 100644 --- a/rust/cubestore/cubehll/src/error.rs +++ b/rust/cubestore/cubehll/src/error.rs @@ -8,7 +8,7 @@ pub struct HllError { impl Display for HllError { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{}", self.message) + write!(f, "HllError: {}", self.message) } } diff --git a/rust/cubestore/cubestore-sql-tests/src/tests.rs b/rust/cubestore/cubestore-sql-tests/src/tests.rs index aa76aa2405782..8d918cd7926c8 100644 --- a/rust/cubestore/cubestore-sql-tests/src/tests.rs +++ b/rust/cubestore/cubestore-sql-tests/src/tests.rs @@ -133,6 +133,11 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> { t("hyperloglog_inplace_group_by", hyperloglog_inplace_group_by), t("hyperloglog_postgres", hyperloglog_postgres), t("hyperloglog_snowflake", hyperloglog_snowflake), + t("hyperloglog_databricks", hyperloglog_databricks), + t( + "aggregate_index_hll_databricks", + aggregate_index_hll_databricks, + ), t("physical_plan_flags", physical_plan_flags), t("planning_inplace_aggregate", planning_inplace_aggregate), t("planning_hints", planning_hints), @@ -2775,6 +2780,76 @@ async fn hyperloglog_snowflake(service: Box) { .unwrap_err(); } +async fn hyperloglog_databricks(service: Box) { + service.exec_query("CREATE SCHEMA s").await.unwrap(); + service + .exec_query("CREATE TABLE s.hlls(id int, hll HLL_DATASKETCHES)") + .await + .unwrap(); + + service.exec_query("INSERT INTO s.hlls(id, hll) VALUES \ + (1, X'0201070c03000408067365047b65c3a608c39b17c29a0ac383c2b0380400000000000000000000000000000000'), \ + (2, X'0201070c03000408c39b17c29a0ac383c2b03804067365047b65c3a60800000000000000000000000000000000'), \ + (3, X'0301070c05000009140000000000000021c3b23905c2a1c38d490ac283c2b711071bc2a1c3961200000000000000000000000008c29bc39904497ac39908000000002bc3b2c3bb062c45670ac3adc29e24074bc298c2a6086f2c7f050000000000000000c392c295c2900dc3b3c28bc38106c38dc3884607c2b50dc3b70600000000c3b762c28207c398c393350f00000000000000001b27c2b20b00000000c29dc28a7210000000003fc3b95b0f')" + ).await.unwrap(); + + let r = service + .exec_query("SELECT id, cardinality(hll) FROM s.hlls ORDER BY id") + .await + .unwrap(); + assert_eq!(to_rows(&r), rows(&[(1, 4), (2, 4), (3, 20)])); +} + +async fn aggregate_index_hll_databricks(service: Box) { + service.exec_query("CREATE SCHEMA s").await.unwrap(); + service + .exec_query( + "CREATE TABLE s.Orders(a int, b int, a_hll HLL_DATASKETCHES) + AGGREGATIONS(merge(a_hll)) + AGGREGATE INDEX aggr_index (a, b) + ", + ) + .await + .unwrap(); + service + .exec_query( + "INSERT INTO s.Orders (a, b, a_hll) VALUES \ + (1, 10, X'0201070c03000408067365047b65c3a608c39b17c29a0ac383c2b0380400000000000000000000000000000000'), \ + (1, 20, X'0201070c03000408067365047b65c3a608c39b17c29a0ac383c2b0380400000000000000000000000000000000'), \ + (1, 10, X'0201070c03000408067365047b65c3a608c39b17c29a0ac383c2b0380400000000000000000000000000000000'), \ + (1, 20, X'0201070c03000408067365047b65c3a608c39b17c29a0ac383c2b0380400000000000000000000000000000000') + ", + ) + .await + .unwrap(); + + let res = service + .exec_query("SELECT a, b, cardinality(merge(a_hll)) as hll FROM s.Orders GROUP BY 1, 2 ORDER BY 1, 2") + .await + .unwrap(); + assert_eq!( + to_rows(&res), + [ + [TableValue::Int(1), TableValue::Int(10), TableValue::Int(4)], + [TableValue::Int(1), TableValue::Int(20), TableValue::Int(4)], + ] + ); + + let res = service + .exec_query("SELECT a, cardinality(merge(a_hll)) as hll FROM s.Orders WHERE b = 20 GROUP BY 1 ORDER BY 1") + .await + .unwrap(); + assert_eq!(to_rows(&res), [[TableValue::Int(1), TableValue::Int(4)],]); + + let res = service + .exec_query( + "SELECT a, cardinality(merge(a_hll)) as hll FROM s.Orders GROUP BY 1 ORDER BY 1", + ) + .await + .unwrap(); + assert_eq!(to_rows(&res), [[TableValue::Int(1), TableValue::Int(4)],]); +} + async fn physical_plan_flags(service: Box) { service.exec_query("CREATE SCHEMA s").await.unwrap(); service diff --git a/rust/cubestore/cubestore/Cargo.toml b/rust/cubestore/cubestore/Cargo.toml index f29a93b6c7a96..aeb18250c81b4 100644 --- a/rust/cubestore/cubestore/Cargo.toml +++ b/rust/cubestore/cubestore/Cargo.toml @@ -26,6 +26,7 @@ serde_bytes = "0.11.5" cuberockstore = { path = "../cuberockstore" } cubehll = { path = "../cubehll" } cubezetasketch = { path = "../cubezetasketch" } +cubedatasketches = { path = "../cubedatasketches" } cuberpc = { path = "../cuberpc" } parquet = { git = "https://github.com/cube-js/arrow-rs", branch = "cube", features = ["arrow"] } arrow = { git = "https://github.com/cube-js/arrow-rs", branch = "cube" } diff --git a/rust/cubestore/cubestore/src/import/mod.rs b/rust/cubestore/cubestore/src/import/mod.rs index 35340756fe60b..7673a8a9847bf 100644 --- a/rust/cubestore/cubestore/src/import/mod.rs +++ b/rust/cubestore/cubestore/src/import/mod.rs @@ -43,6 +43,7 @@ use crate::util::decimal::{Decimal, Decimal96}; use crate::util::int96::Int96; use crate::util::maybe_owned::MaybeOwnedStr; use crate::CubeError; +use cubedatasketches::HLLDataSketch; use datafusion::cube_ext::ordfloat::OrdF64; use tokio::time::{sleep, Duration}; @@ -210,6 +211,11 @@ impl ImportFormat { is_valid_plain_binary_hll(&data, *f)?; TableValue::Bytes(data) } + ColumnType::HyperLogLog(HllFlavour::DataSketches) => { + let data = parse_binary_data(value)?; + let hll = HLLDataSketch::read(&data)?; + TableValue::Bytes(hll.write()) + } ColumnType::Timestamp => TableValue::Timestamp(timestamp_from_string(value)?), ColumnType::Float => TableValue::Float(OrdF64(value.parse::()?)), ColumnType::Boolean => { diff --git a/rust/cubestore/cubestore/src/lib.rs b/rust/cubestore/cubestore/src/lib.rs index 015fba8235226..63d5fd55948fa 100644 --- a/rust/cubestore/cubestore/src/lib.rs +++ b/rust/cubestore/cubestore/src/lib.rs @@ -443,6 +443,12 @@ impl From for CubeError { } } +impl From for CubeError { + fn from(v: cubedatasketches::DataSketchesError) -> Self { + return CubeError::from_error(v); + } +} + impl From for CubeError { fn from(v: ZetaError) -> Self { return CubeError::from_error(v); diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 2e915ae3f281c..96a7d2237532d 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -88,6 +88,7 @@ use crate::cachestore::{ CacheItem, QueueItem, QueueItemPayload, QueueItemStatus, QueueResult, QueueResultAckEvent, }; use crate::remotefs::LocalDirRemoteFs; +use cubedatasketches::HLLDataSketch; use deepsize::DeepSizeOf; use snapshot_info::SnapshotInfo; use std::time::{Duration, SystemTime}; @@ -342,10 +343,11 @@ impl DataFrameValue for Option> { #[derive(Clone, Copy, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, DeepSizeOf)] pub enum HllFlavour { - Airlift, // Compatible with Presto, Athena, etc. - Snowflake, // Same storage as Airlift, imports from Snowflake JSON. - Postgres, // Same storage as Airlift, imports from HLL Storage Specification. - ZetaSketch, // Compatible with BigQuery. + Airlift, // Compatible with Presto, Athena, etc. + Snowflake, // Same storage as Airlift, imports from Snowflake JSON. + Postgres, // Same storage as Airlift, imports from HLL Storage Specification. + ZetaSketch, // Compatible with BigQuery. + DataSketches, // Compatible with DataBricks. } pub fn is_valid_plain_binary_hll(data: &[u8], f: HllFlavour) -> Result<(), CubeError> { @@ -360,6 +362,9 @@ pub fn is_valid_plain_binary_hll(data: &[u8], f: HllFlavour) -> Result<(), CubeE HllFlavour::Postgres | HllFlavour::Snowflake => { panic!("string formats should be handled separately") } + HllFlavour::DataSketches => { + HLLDataSketch::read(data)?; + } } return Ok(()); } @@ -391,6 +396,7 @@ impl Display for ColumnType { ColumnType::HyperLogLog(HllFlavour::ZetaSketch) => "hyperloglogpp", ColumnType::HyperLogLog(HllFlavour::Postgres) => "hll_postgres", ColumnType::HyperLogLog(HllFlavour::Snowflake) => "hll_snowflake", + ColumnType::HyperLogLog(HllFlavour::DataSketches) => "hll_datasketches", ColumnType::Timestamp => "timestamp", ColumnType::Float => "float", ColumnType::Boolean => "boolean", @@ -436,6 +442,7 @@ impl ColumnType { "hyperloglogpp" => Ok(ColumnType::HyperLogLog(HllFlavour::ZetaSketch)), "hll_postgres" => Ok(ColumnType::HyperLogLog(HllFlavour::Postgres)), "hll_snowflake" => Ok(ColumnType::HyperLogLog(HllFlavour::Snowflake)), + "hll_datasketches" => Ok(ColumnType::HyperLogLog(HllFlavour::DataSketches)), "timestamp" => Ok(ColumnType::Timestamp), "float" => Ok(ColumnType::Float), "boolean" => Ok(ColumnType::Boolean), @@ -597,6 +604,7 @@ impl fmt::Display for Column { ColumnType::HyperLogLog(HllFlavour::ZetaSketch) => "HYPERLOGLOGPP".to_string(), ColumnType::HyperLogLog(HllFlavour::Postgres) => "HLL_POSTGRES".to_string(), ColumnType::HyperLogLog(HllFlavour::Snowflake) => "HLL_SNOWFLAKE".to_string(), + ColumnType::HyperLogLog(HllFlavour::DataSketches) => "HLL_DATASKETCHES".to_string(), ColumnType::Float => "FLOAT".to_string(), }; f.write_fmt(format_args!("{} {}", self.name, column_type)) diff --git a/rust/cubestore/cubestore/src/queryplanner/hll.rs b/rust/cubestore/cubestore/src/queryplanner/hll.rs index 14fc0cb0721d3..32e3f29743baa 100644 --- a/rust/cubestore/cubestore/src/queryplanner/hll.rs +++ b/rust/cubestore/cubestore/src/queryplanner/hll.rs @@ -1,4 +1,5 @@ use crate::CubeError; +use cubedatasketches::{HLLDataSketch, HLLUnionDataSketch}; use cubehll::HllSketch; use cubezetasketch::HyperLogLogPlusPlus; @@ -6,8 +7,15 @@ use cubezetasketch::HyperLogLogPlusPlus; pub enum Hll { Airlift(HllSketch), // Compatible with Athena, Presto, etc. ZetaSketch(HyperLogLogPlusPlus), // Compatible with BigQuery. + DataSketches(HLLDataSketch), // Compatible with DataBricks } +const DS_LIST_PREINTS: u8 = 2; +const DS_HASH_SET_PREINTS: u8 = 3; +const DS_HLL_PREINTS: u8 = 10; +const DS_SER_VER: u8 = 1; +const DS_FAMILY_ID: u8 = 7; + impl Hll { pub fn read(data: &[u8]) -> Result { if data.is_empty() { @@ -15,10 +23,18 @@ impl Hll { "invalid serialized HLL (empty data)".to_string(), )); } - // The first byte: + // - must larger than 3 due to how protos are encoded in ZetaSketch. // - represents the data format version and is <= 3 in AirLift. - if data[0] <= 3 { + // - checking first 3 bytes for figure out HLL from Apache DataSketches + if (data[0] == DS_LIST_PREINTS + || data[0] == DS_HASH_SET_PREINTS + || data[0] == DS_HLL_PREINTS) + && data[1] == DS_SER_VER + && data[2] == DS_FAMILY_ID + { + return Ok(Hll::DataSketches(HLLDataSketch::read(data)?)); + } else if data[0] <= 3 { return Ok(Hll::Airlift(HllSketch::read(data)?)); } else { return Ok(Hll::ZetaSketch(HyperLogLogPlusPlus::read(data)?)); @@ -27,16 +43,9 @@ impl Hll { pub fn write(&self) -> Vec { match self { - Hll::Airlift(h) => h.write(), - Hll::ZetaSketch(h) => h.write(), - } - } - - pub fn is_compatible(&self, other: &Hll) -> bool { - match (self, other) { - (Hll::Airlift(l), Hll::Airlift(r)) => l.index_bit_len() == r.index_bit_len(), - (Hll::ZetaSketch(l), Hll::ZetaSketch(r)) => l.is_compatible(r), - _ => return false, + Self::Airlift(h) => h.write(), + Self::ZetaSketch(h) => h.write(), + Self::DataSketches(h) => h.write(), } } @@ -44,18 +53,91 @@ impl Hll { match self { Hll::Airlift(h) => h.cardinality(), Hll::ZetaSketch(h) => h.cardinality(), + Hll::DataSketches(h) => h.cardinality(), + } + } +} + +#[derive(Debug)] +pub enum HllUnion { + Airlift(HllSketch), + ZetaSketch(HyperLogLogPlusPlus), + DataSketches(HLLUnionDataSketch), +} + +impl HllUnion { + pub fn new(hll: Hll) -> Result { + match hll { + Hll::Airlift(h) => Ok(Self::Airlift(h)), + Hll::ZetaSketch(h) => Ok(Self::ZetaSketch(h)), + Hll::DataSketches(h) => { + let mut union = HLLUnionDataSketch::new(h.get_lg_config_k())?; + union.merge_with(h)?; + + Ok(Self::DataSketches(union)) + } + } + } + + pub fn write(&self) -> Vec { + match self { + Self::Airlift(h) => h.write(), + Self::ZetaSketch(h) => h.write(), + Self::DataSketches(h) => h.write(), + } + } + + pub fn is_compatible(&self, other: &Hll) -> bool { + match (self, other) { + (Self::Airlift(l), Hll::Airlift(r)) => l.index_bit_len() == r.index_bit_len(), + (Self::ZetaSketch(l), Hll::ZetaSketch(r)) => l.is_compatible(r), + (Self::DataSketches(l), Hll::DataSketches(r)) => { + l.get_lg_config_k() == r.get_lg_config_k() + } + _ => return false, } } /// Clients are responsible for calling `is_compatible` before running this function. /// On error, `self` may end up in inconsistent state and must be discarded. - pub fn merge_with(&mut self, other: &Hll) -> Result<(), CubeError> { - debug_assert!(self.is_compatible(other)); + pub fn merge_with(&mut self, other: Hll) -> Result<(), CubeError> { + debug_assert!(self.is_compatible(&other)); + match (self, other) { - (Hll::Airlift(l), Hll::Airlift(r)) => l.merge_with(r), - (Hll::ZetaSketch(l), Hll::ZetaSketch(r)) => l.merge_with(r)?, - _ => panic!("incompatible HLL types"), + (Self::Airlift(l), Hll::Airlift(r)) => l.merge_with(&r), + (Self::ZetaSketch(l), Hll::ZetaSketch(r)) => l.merge_with(&r)?, + (Self::DataSketches(l), Hll::DataSketches(r)) => l.merge_with(r)?, + _ => return Err(CubeError::internal("incompatible HLL types".to_string())), } + return Ok(()); } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn hll_detect_sketches_success() -> Result<(), CubeError> { + { + let mut sketch = Hll::read(&hex::decode(&"0201070C03080108320B1F05")?)?; + assert!(matches!(sketch, Hll::DataSketches(_))); + assert_eq!(sketch.cardinality(), 1); + } + + { + let mut sketch = Hll::read(&hex::decode(&"0301070C050800010800000064987A05A5456B06E9EBD51A4C116B08307C4E047258E51293723306176C6E06")?)?; + assert!(matches!(sketch, Hll::DataSketches(_))); + assert_eq!(sketch.cardinality(), 8); + } + + { + let mut sketch = Hll::read(&hex::decodeassert!(matches!(sketch, Hll::DataSketches(_))); + assert_eq!(sketch.cardinality(), 589); + } + + Ok(()) + } +} diff --git a/rust/cubestore/cubestore/src/queryplanner/udfs.rs b/rust/cubestore/cubestore/src/queryplanner/udfs.rs index 95a16bfd960f3..017b996c70fd9 100644 --- a/rust/cubestore/cubestore/src/queryplanner/udfs.rs +++ b/rust/cubestore/cubestore/src/queryplanner/udfs.rs @@ -1,5 +1,5 @@ use crate::queryplanner::coalesce::{coalesce, SUPPORTED_COALESCE_TYPES}; -use crate::queryplanner::hll::Hll; +use crate::queryplanner::hll::{Hll, HllUnion}; use crate::CubeError; use arrow::array::{Array, BinaryArray, TimestampNanosecondArray, UInt64Builder}; use arrow::datatypes::{DataType, IntervalUnit, TimeUnit}; @@ -353,7 +353,7 @@ impl CubeAggregateUDF for HllMergeUDF { struct HllMergeAccumulator { // TODO: store sketch for empty set from the start. // this requires storing index_bit_len in the type. - acc: Option, + acc: Option, } impl Accumulator for HllMergeAccumulator { @@ -421,7 +421,7 @@ impl Accumulator for HllMergeAccumulator { impl HllMergeAccumulator { fn merge_sketch(&mut self, s: Hll) -> Result<(), DataFusionError> { if self.acc.is_none() { - self.acc = Some(s); + self.acc = Some(HllUnion::new(s)?); return Ok(()); } else if let Some(acc_s) = &mut self.acc { if !acc_s.is_compatible(&s) { @@ -430,7 +430,7 @@ impl HllMergeAccumulator { ) .into()); } - acc_s.merge_with(&s)?; + acc_s.merge_with(s)?; } else { unreachable!("impossible"); } diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 3895d521d8761..d795b2b60b906 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -1263,6 +1263,11 @@ fn parse_hyper_log_log<'a>( is_valid_plain_binary_hll(bytes, f)?; Ok(bytes) } + HllFlavour::DataSketches => { + let bytes = parse_binary_string(buffer, v)?; + is_valid_plain_binary_hll(bytes, f)?; + Ok(bytes) + } } } diff --git a/rust/cubestore/cubestore/src/sql/table_creator.rs b/rust/cubestore/cubestore/src/sql/table_creator.rs index 9c4a743daa1bc..a39db27f3b9f9 100644 --- a/rust/cubestore/cubestore/src/sql/table_creator.rs +++ b/rust/cubestore/cubestore/src/sql/table_creator.rs @@ -576,6 +576,7 @@ pub fn convert_columns_type(columns: &Vec) -> Result, Cub "hyperloglogpp" => ColumnType::HyperLogLog(HllFlavour::ZetaSketch), "hll_snowflake" => ColumnType::HyperLogLog(HllFlavour::Snowflake), "hll_postgres" => ColumnType::HyperLogLog(HllFlavour::Postgres), + "hll_datasketches" => ColumnType::HyperLogLog(HllFlavour::DataSketches), _ => { return Err(CubeError::user(format!( "Custom type '{}' is not supported", diff --git a/rust/cubestore/cubezetasketch/src/error.rs b/rust/cubestore/cubezetasketch/src/error.rs index cfa34371de714..988c94c068789 100644 --- a/rust/cubestore/cubezetasketch/src/error.rs +++ b/rust/cubestore/cubezetasketch/src/error.rs @@ -26,7 +26,7 @@ pub struct ZetaError { impl Display for ZetaError { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{}", self.message) + write!(f, "ZetaError: {}", self.message) } } @@ -46,7 +46,7 @@ impl From for ZetaError { impl From for ZetaError { fn from(err: ProtobufError) -> Self { - return ZetaError::new(err); + return ZetaError::new(format!("Protobuf: {}", err)); } }