Skip to content

Commit

Permalink
feat(cubestore): allow to import base64-encoded bytes in CSV (#1891)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilya-biryukov committed Jan 26, 2021
1 parent 849f3d8 commit 2f43afa
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 13 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/cubestore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ repository = "https://github.com/cube-js/cube.js"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
base64 = "0.13.0"
tokio = { version = "1.0", features = ["full", "rt"] }
warp = "0.2"
sqlparser = "0.7.0"
Expand Down
12 changes: 10 additions & 2 deletions rust/cubestore/src/import/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::config::ConfigObj;
use crate::metastore::is_valid_hll;
use crate::metastore::{Column, ColumnType, ImportFormat, MetaStore};
use crate::sql::timestamp_from_string;
use crate::store::{DataFrame, WALDataStore};
Expand Down Expand Up @@ -106,8 +107,15 @@ impl ImportFormat {
.map(|d| TableValue::Decimal(d.to_string()))
.unwrap_or(TableValue::Null)
}
ColumnType::Bytes => unimplemented!(),
ColumnType::HyperLogLog(_) => unimplemented!(),
ColumnType::Bytes => {
TableValue::Bytes(base64::decode(value)?)
}
ColumnType::HyperLogLog(f) => {
let data = base64::decode(value)?;
is_valid_hll(&data, *f)?;

TableValue::Bytes(data)
}
ColumnType::Timestamp => {
timestamp_from_string(value.as_str())?
}
Expand Down
6 changes: 6 additions & 0 deletions rust/cubestore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,9 @@ impl From<cloud_storage::Error> for CubeError {
return CubeError::from_error(v);
}
}

/// Converts a base64 decoding failure into a `CubeError` by delegating to
/// `CubeError::from_error`, so callers can propagate `base64::DecodeError`
/// with the `?` operator.
impl From<base64::DecodeError> for CubeError {
    fn from(err: base64::DecodeError) -> Self {
        Self::from_error(err)
    }
}
15 changes: 15 additions & 0 deletions rust/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use arrow::datatypes::{DataType, Field};
use chrono::{DateTime, Utc};
use chunks::ChunkRocksTable;
use core::{fmt, mem};
use cubehll::HllSketch;
use cubezetasketch::HyperLogLogPlusPlus;
use futures::future::join_all;
use futures_timer::Delay;
use index::{IndexRocksIndex, IndexRocksTable};
Expand Down Expand Up @@ -282,6 +284,19 @@ pub enum HllFlavour {
ZetaSketch, // Compatible with BigQuery.
}

/// Checks that `data` can be read as a HyperLogLog sketch of flavour `f`.
///
/// The bytes are handed to the reader that corresponds to the flavour
/// (`HllSketch::read` for `Airlift`, `HyperLogLogPlusPlus::read` for
/// `ZetaSketch`). The parsed sketch itself is discarded; only success or
/// failure of the read matters.
///
/// # Errors
///
/// Returns the reader's error, converted into a `CubeError`, when the read fails.
pub fn is_valid_hll(data: &[u8], f: HllFlavour) -> Result<(), CubeError> {
    // TODO: validate without allocating memory; this runs on a hot path.
    match f {
        HllFlavour::Airlift => {
            let _ = HllSketch::read(data)?;
        }
        HllFlavour::ZetaSketch => {
            let _ = HyperLogLogPlusPlus::read(data)?;
        }
    }
    Ok(())
}

#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
pub enum ColumnType {
String,
Expand Down
15 changes: 4 additions & 11 deletions rust/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use sqlparser::ast::*;
use sqlparser::dialect::Dialect;

use crate::metastore::{
table::Table, HllFlavour, IdRow, ImportFormat, Index, IndexDef, MetaStoreTable, RowKey, Schema,
TableId,
is_valid_hll, table::Table, HllFlavour, IdRow, ImportFormat, Index, IndexDef, MetaStoreTable,
RowKey, Schema, TableId,
};
use crate::table::{Row, TableValue, TimestampValue};
use crate::CubeError;
Expand Down Expand Up @@ -547,15 +547,8 @@ fn decode_byte(s: &str) -> Option<u8> {

/// Parses `v` into raw bytes and checks that they form a valid HyperLogLog
/// sketch of flavour `f`.
///
/// The bytes come from `parse_binary_string`; validation is delegated to
/// `is_valid_hll` so the same check is shared with other callers instead of
/// being repeated inline here (validating twice would parse the sketch twice).
///
/// # Errors
///
/// Propagates the error from `parse_binary_string` if `v` cannot be parsed,
/// and the error from `is_valid_hll` if the bytes are not accepted as a
/// sketch of the requested flavour.
fn parse_hyper_log_log(v: &Value, f: HllFlavour) -> Result<Vec<u8>, CubeError> {
    let bytes = parse_binary_string(v)?;
    // Validation parses the sketch (see the TODO on `is_valid_hll`), so run it exactly once.
    is_valid_hll(&bytes, f)?;

    Ok(bytes)
}

Expand Down

0 comments on commit 2f43afa

Please sign in to comment.