Skip to content

Commit

Permalink
feat(cubestore): Improve support for the binary data type (#1759)
Browse files Browse the repository at this point in the history
* fix(cubestore): improve the binary type support

- allow queries to return binary data.
- support conversion from string and hexstring literals (e.g. X'deadbeef').
- fix errors when using binary type in computations.
- add a simple test.

This is a preparatory step for the upcoming HyperLogLog UDFs.

* fix(cubestore): output binary values in hex encoding

This matches MySQL behavior and is more practical.
  • Loading branch information
ilya-biryukov committed Jan 14, 2021
1 parent baa769f commit 925f813
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 17 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion rust/cubestore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ ipc-channel = "0.14.1"
reqwest = { version = "0.10.8", features = ["json", "rustls-tls"], default-features = false }
nanoid = "0.3.0"
rand = "0.8.0"
parquet-format = "=2.6.1"
parquet-format = "=2.6.1"
hex = "0.4.2"
6 changes: 6 additions & 0 deletions rust/cubestore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,9 @@ impl From<std::num::ParseFloatError> for CubeError {
CubeError::from_error(v)
}
}

impl From<hex::FromHexError> for CubeError {
fn from(v: hex::FromHexError) -> Self {
CubeError::from_error(v)
}
}
2 changes: 1 addition & 1 deletion rust/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ impl From<&Column> for parquet::schema::types::Type {
}
crate::metastore::ColumnType::Bytes => {
types::Type::primitive_type_builder(&column.get_name(), Type::BYTE_ARRAY)
.with_logical_type(LogicalType::LIST)
.with_logical_type(LogicalType::NONE)
.with_repetition(Repetition::OPTIONAL)
.build()
.unwrap()
Expand Down
7 changes: 5 additions & 2 deletions rust/cubestore/src/mysql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use crate::sql::SqlService;
use crate::table::TableValue;
use crate::{metastore, CubeError};
use async_trait::async_trait;
use hex::ToHex;
use itertools::Itertools;
use log::{error, info, warn};
use msql_srv::*;
use std::io;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::net::TcpListener;
use itertools::Itertools;

struct Backend {
sql_service: Arc<dyn SqlService>,
Expand Down Expand Up @@ -83,7 +84,9 @@ impl<W: io::Write + Send> AsyncMysqlShim<W> for Backend {
TableValue::Decimal(v) => rw.write_col(v.to_string())?,
TableValue::Boolean(v) => rw.write_col(v.to_string())?,
TableValue::Float(v) => rw.write_col(v.to_string())?,
TableValue::Bytes(b) => rw.write_col(b.iter().map(|v| v.to_string()).join(" "))?,
TableValue::Bytes(b) => {
rw.write_col(format!("0x{}", b.encode_hex_upper::<String>()))?
}
TableValue::Null => rw.write_col(Option::<String>::None)?,
}
}
Expand Down
25 changes: 19 additions & 6 deletions rust/cubestore/src/queryplanner/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use crate::store::DataFrame;
use crate::table::{Row, TableValue, TimestampValue};
use crate::CubeError;
use arrow::array::{
Array, BooleanArray, Float64Array, Int64Array, Int64Decimal0Array, Int64Decimal10Array,
Int64Decimal1Array, Int64Decimal2Array, Int64Decimal3Array, Int64Decimal4Array,
Int64Decimal5Array, StringArray, TimestampMicrosecondArray, TimestampNanosecondArray,
UInt64Array,
Array, BinaryArray, BooleanArray, Float64Array, Int64Array, Int64Decimal0Array,
Int64Decimal10Array, Int64Decimal1Array, Int64Decimal2Array, Int64Decimal3Array,
Int64Decimal4Array, Int64Decimal5Array, StringArray, TimestampMicrosecondArray,
TimestampNanosecondArray, UInt64Array,
};
use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit};
use arrow::ipc::reader::StreamReader;
Expand Down Expand Up @@ -712,6 +712,15 @@ impl TableProvider for CubeTable {
}
}

macro_rules! convert_array_cast_native {
($V: expr, (Vec<u8>)) => {{
$V.to_vec()
}};
($V: expr, $T: ty) => {{
$V as $T
}};
}

macro_rules! convert_array {
($ARRAY:expr, $NUM_ROWS:expr, $ROWS:expr, $ARRAY_TYPE: ident, Decimal, $SCALE: expr, $CUT_TRAILING_ZEROS: expr) => {{
let a = $ARRAY.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
Expand All @@ -728,13 +737,13 @@ macro_rules! convert_array {
});
}
}};
($ARRAY:expr, $NUM_ROWS:expr, $ROWS:expr, $ARRAY_TYPE: ident, $TABLE_TYPE: ident, $NATIVE: ty) => {{
($ARRAY:expr, $NUM_ROWS:expr, $ROWS:expr, $ARRAY_TYPE: ident, $TABLE_TYPE: ident, $NATIVE: tt) => {{
let a = $ARRAY.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
for i in 0..$NUM_ROWS {
$ROWS[i].push(if a.is_null(i) {
TableValue::Null
} else {
TableValue::$TABLE_TYPE(a.value(i) as $NATIVE)
TableValue::$TABLE_TYPE(convert_array_cast_native!(a.value(i), $NATIVE))
});
}
}};
Expand Down Expand Up @@ -874,6 +883,9 @@ pub fn batch_to_dataframe(batches: &Vec<RecordBatch>) -> Result<DataFrame, CubeE
});
}
}
DataType::Binary => {
convert_array!(array, num_rows, rows, BinaryArray, Bytes, (Vec<u8>))
}
DataType::Utf8 => {
let a = array.as_any().downcast_ref::<StringArray>().unwrap();
for i in 0..num_rows {
Expand Down Expand Up @@ -904,6 +916,7 @@ pub fn batch_to_dataframe(batches: &Vec<RecordBatch>) -> Result<DataFrame, CubeE

pub fn arrow_to_column_type(arrow_type: DataType) -> Result<ColumnType, CubeError> {
match arrow_type {
DataType::Binary => Ok(ColumnType::Bytes),
DataType::Utf8 | DataType::LargeUtf8 => Ok(ColumnType::String),
DataType::Timestamp(_, _) => Ok(ColumnType::Timestamp),
DataType::Float16 | DataType::Float64 => Ok(ColumnType::Float),
Expand Down
63 changes: 56 additions & 7 deletions rust/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::sql::parser::CubeStoreParser;
use datafusion::physical_plan::datetime_expressions::string_to_timestamp_nanos;
use datafusion::sql::parser::Statement as DFStatement;
use parser::Statement as CubeStoreStatement;
use hex::FromHex;

#[async_trait]
pub trait SqlService: Send + Sync {
Expand Down Expand Up @@ -506,6 +507,18 @@ fn parse_chunk(chunk: &[Vec<Expr>], column: &Vec<&Column>) -> Result<DataFrame,
))
}

fn parse_binary_string(v: &Value) -> Result<Vec<u8>, CubeError> {
match v {
// TODO: unescape the string
Value::SingleQuotedString(s) | Value::Number(s) => Ok(s.as_bytes().to_vec()),
Value::HexStringLiteral(s) => Ok(Vec::from_hex(s.as_bytes())?),
_ => Err(CubeError::user(format!(
"cannot convert value to binary string: {}",
v
))),
}
}

fn extract_data(cell: &Expr, column: &Vec<&Column>, i: usize) -> Result<TableValue, CubeError> {
if let Expr::Value(Value::Null) = cell {
return Ok(TableValue::Null);
Expand Down Expand Up @@ -557,16 +570,13 @@ fn extract_data(cell: &Expr, column: &Vec<&Column>, i: usize) -> Result<TableVal
}
ColumnType::Bytes => {
// TODO What we need to do with Bytes, now it just convert each element of string to u8 item of Vec<u8>
let val = if let Expr::Value(Value::Number(v)) = cell {
v
let val;
if let Expr::Value(v) = cell {
val = parse_binary_string(v)
} else {
return Err(CubeError::user("Corrupted data in query.".to_string()));
};
let main_vec: Vec<u8> = val
.split("") // split string into words by whitespace
.filter_map(|w| w.parse::<u8>().ok()) // calling ok() turns Result to Option so that filter_map can discard None values
.collect();
TableValue::Bytes(main_vec)
return Ok(TableValue::Bytes(val?));
}
ColumnType::Timestamp => match cell {
Expr::Value(Value::SingleQuotedString(v)) => {
Expand Down Expand Up @@ -1770,6 +1780,45 @@ mod tests {
}).await;
}

#[tokio::test]
async fn bytes() {
Config::run_test("bytes", async move |services| {
let service = services.sql_service;

let _ = service
.exec_query("CREATE SCHEMA IF NOT EXISTS s")
.await
.unwrap();
let _ = service
.exec_query("CREATE TABLE s.Bytes (id int, data bytea)")
.await
.unwrap();
let _ = service
.exec_query(
"INSERT INTO s.Bytes(id, data) VALUES (1, '123'), (2, X'deADbeef'), (3, 456)",
)
.await
.unwrap();

let result = service.exec_query("SELECT * from s.Bytes").await.unwrap();
let r = result.get_rows();
assert_eq!(r.len(), 3);
assert_eq!(
r[0].values()[1],
TableValue::Bytes("123".as_bytes().to_vec())
);
assert_eq!(
r[1].values()[1],
TableValue::Bytes(vec![0xde, 0xad, 0xbe, 0xef])
);
assert_eq!(
r[2].values()[1],
TableValue::Bytes("456".as_bytes().to_vec())
);
})
.await;
}

#[tokio::test]
async fn compaction() {
Config::test("compaction").update_config(|mut config| {
Expand Down

0 comments on commit 925f813

Please sign in to comment.