Skip to content

Commit

Permalink
fix(cubestore): switch from string to float in table value (#2175)
Browse files Browse the repository at this point in the history
This significantly improves performance of import and other operations.
  • Loading branch information
ilya-biryukov committed Feb 21, 2021
1 parent e162662 commit 05dc7d2
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 15 deletions.
3 changes: 2 additions & 1 deletion rust/cubestore/src/import/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::store::{ChunkDataStore, DataFrame};
use crate::sys::malloc::trim_allocs;
use crate::table::{Row, TableValue};
use crate::util::maybe_owned::MaybeOwnedStr;
use crate::util::ordfloat::OrdF64;
use crate::CubeError;
use async_compression::tokio::bufread::GzipDecoder;
use async_std::io::SeekFrom;
Expand Down Expand Up @@ -142,7 +143,7 @@ impl ImportFormat {
}
ColumnType::Timestamp => timestamp_from_string(value)?,
ColumnType::Float => {
TableValue::Float(value.parse::<f64>()?.to_string())
TableValue::Float(OrdF64(value.parse::<f64>()?))
}
ColumnType::Boolean => {
TableValue::Boolean(value.to_lowercase() == "true")
Expand Down
1 change: 1 addition & 0 deletions rust/cubestore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#![feature(box_patterns)]
#![feature(slice_internals)]
#![feature(raw)]
#![feature(total_cmp)]
// #![feature(trace_macros)]

// trace_macros!(true);
Expand Down
2 changes: 1 addition & 1 deletion rust/cubestore/src/queryplanner/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ pub fn batch_to_dataframe(batches: &Vec<RecordBatch>) -> Result<DataFrame, CubeE
TableValue::Null
} else {
let decimal = a.value(i) as f64;
TableValue::Float(decimal.to_string())
TableValue::Float(decimal.into())
});
}
}
Expand Down
10 changes: 5 additions & 5 deletions rust/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ fn extract_data(cell: &Expr, column: &Vec<&Column>, i: usize) -> Result<TableVal
},
ColumnType::Float => {
let decimal_val = parse_decimal(cell)?;
TableValue::Float(decimal_val.to_string())
TableValue::Float(decimal_val.into())
}
}
};
Expand Down Expand Up @@ -999,21 +999,21 @@ mod tests {
.await
.unwrap();

assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Decimal("160.61".to_string()), TableValue::Float("5.892".to_string())]));
assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Decimal("160.61".to_string()), TableValue::Float(5.892.into())]));

let result = service
.exec_query("SELECT sum(dec_value), sum(dec_value_1) / 10 from foo.values where dec_value_1 < 10")
.await
.unwrap();

assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Decimal("-132.99".to_string()), TableValue::Float("0.45".to_string())]));
assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Decimal("-132.99".to_string()), TableValue::Float(0.45.into())]));

let result = service
.exec_query("SELECT sum(dec_value), sum(dec_value_1) / 10 from foo.values where dec_value_1 < '10'")
.await
.unwrap();

assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Decimal("-132.99".to_string()), TableValue::Float("0.45".to_string())]));
assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Decimal("-132.99".to_string()), TableValue::Float(0.45.into())]));
})
.await;
}
Expand Down Expand Up @@ -1116,7 +1116,7 @@ mod tests {
"SELECT SUM(decimal_value) FROM foo.decimal_group"
).await.unwrap();

assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Float("7456503871042.786".to_string())])]);
assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Float(7456503871042.786.into())])]);
}).await;
}

Expand Down
3 changes: 2 additions & 1 deletion rust/cubestore/src/table/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::util::ordfloat::OrdF64;
use crate::CubeError;
use chrono::{SecondsFormat, TimeZone, Utc};
use serde::{Deserialize, Serialize};
Expand All @@ -11,7 +12,7 @@ pub enum TableValue {
String(String),
Int(i64),
Decimal(String), // TODO bincode is incompatible with BigDecimal
Float(String), // TODO Eq
Float(OrdF64),
Bytes(Vec<u8>),
Timestamp(TimestampValue),
Boolean(bool),
Expand Down
15 changes: 8 additions & 7 deletions rust/cubestore/src/table/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ impl<'a> RowParquetReader<'a> {
for i in 0..values_read {
if levels[i] == 1 {
let value = buffer[cur_value_index];
vec_result[i].push(TableValue::Float(value.to_string()));
vec_result[i].push(TableValue::Float(value.into()));
cur_value_index += 1;
} else {
vec_result[i].push(TableValue::Null);
Expand Down Expand Up @@ -694,24 +694,25 @@ impl RowParquetWriter {
[column_index]
{
TableValue::Float(val) => match column.get_column_type() {
ColumnType::Float => Ok(val.parse::<f64>()?),
ColumnType::Float => Ok(val),
x => panic!("Unexpected type: {:?}", x),
},
x => panic!("Unsupported value: {:?}", x),
}
})
.map(|res_val| {
if res_val.is_err() {
return res_val;
if let Err(e) = res_val {
return Err(e);
}
// We must use OrdF64 here!
let v = res_val.unwrap();
if min.is_none() || v < min.unwrap() {
min = Some(v)
}
if max.is_none() || max.unwrap() < v {
max = Some(v)
}
return Ok(v);
return Ok(v.0);
})
.collect::<Result<Vec<f64>, _>>()?;
let def_levels = self.get_def_levels(
Expand All @@ -725,8 +726,8 @@ impl RowParquetWriter {
&column_values,
def_levels.as_ref().map(|b| b.as_slice()),
None,
&min,
&max,
&min.map(|f| f.0),
&max.map(|f| f.0),
None,
None,
)?;
Expand Down
1 change: 1 addition & 0 deletions rust/cubestore/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod maybe_owned;
pub mod ordfloat;

use crate::CubeError;
use log::error;
Expand Down
39 changes: 39 additions & 0 deletions rust/cubestore/src/util/ordfloat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use serde::{Deserialize, Serialize};
use smallvec::alloc::fmt::Formatter;
use std::cmp::Ordering;
use std::fmt;

#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
#[repr(transparent)]
pub struct OrdF64(pub f64);

impl PartialEq for OrdF64 {
fn eq(&self, other: &Self) -> bool {
return self.cmp(other) == Ordering::Equal;
}
}
impl Eq for OrdF64 {}

impl PartialOrd for OrdF64 {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
return Some(self.cmp(other));
}
}

impl Ord for OrdF64 {
fn cmp(&self, other: &Self) -> Ordering {
return self.0.total_cmp(&other.0);
}
}

impl fmt::Display for OrdF64 {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), fmt::Error> {
self.0.fmt(f)
}
}

impl From<f64> for OrdF64 {
fn from(v: f64) -> Self {
return Self(v);
}
}

0 comments on commit 05dc7d2

Please sign in to comment.