Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flow): shared in-memory state for dataflow operator #3508

Merged
merged 12 commits into from Mar 19, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/flow/Cargo.toml
Expand Up @@ -25,6 +25,7 @@ num-traits = "0.2"
serde.workspace = true
servers.workspace = true
session.workspace = true
smallvec.workspace = true
snafu.workspace = true
tokio.workspace = true
tonic.workspace = true
Expand Down
1 change: 1 addition & 0 deletions src/flow/src/lib.rs
Expand Up @@ -19,3 +19,4 @@ mod adapter;
mod expr;
mod plan;
mod repr;
mod utils;
89 changes: 68 additions & 21 deletions src/flow/src/repr.rs
Expand Up @@ -31,7 +31,7 @@ pub(crate) use relation::{RelationDesc, RelationType};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;

use crate::expr::error::{CastValueSnafu, EvalError};
use crate::expr::error::{CastValueSnafu, EvalError, InvalidArgumentSnafu};

/// System-wide Record count difference type. Useful for capture data change
///
Expand All @@ -45,11 +45,20 @@ pub type Timestamp = i64;
/// Default type for a repr of changes to a collection.
pub type DiffRow = (Row, Timestamp, Diff);

pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);

/// Convert a value that is or can be converted to Datetime to internal timestamp
pub fn value_to_internal_ts(value: Value) -> Result<Timestamp, EvalError> {
let is_supported_time_type = |arg: &Value| {
let ty = arg.data_type();
matches!(ty, ConcreteDataType::Date(..))
| matches!(ty, ConcreteDataType::DateTime(..))
| matches!(ty, ConcreteDataType::Timestamp(..))
discord9 marked this conversation as resolved.
Show resolved Hide resolved
};
match value {
Value::DateTime(ts) => Ok(ts.val()),
arg => {
Value::Int64(ts) => Ok(ts),
arg if is_supported_time_type(&arg) => {
let arg_ty = arg.data_type();
let res = cast(arg, &ConcreteDataType::datetime_datatype()).context({
CastValueSnafu {
Expand All @@ -63,6 +72,10 @@ pub fn value_to_internal_ts(value: Value) -> Result<Timestamp, EvalError> {
unreachable!()
}
}
_ => InvalidArgumentSnafu {
reason: format!("Expect a time type or i64, got {:?}", value.data_type()),
}
.fail(),
}
}

Expand Down Expand Up @@ -145,24 +158,58 @@ impl From<Row> for ProtoRow {
ProtoRow { values }
}
}
#[cfg(test)]
mod test {
use common_time::{Date, DateTime};

use super::*;

#[test]
fn test_row() {
let row = Row::empty();
let row_1 = Row::new(vec![]);
assert_eq!(row, row_1);
let mut row_2 = Row::new(vec![Value::Int32(1), Value::Int32(2)]);
assert_eq!(row_2.get(0), Some(&Value::Int32(1)));
row_2.clear();
assert_eq!(row_2.get(0), None);
row_2
.packer()
.extend(vec![Value::Int32(1), Value::Int32(2)]);
assert_eq!(row_2.get(0), Some(&Value::Int32(1)));
row_2.extend(vec![Value::Int32(1), Value::Int32(2)]);
assert_eq!(row_2.len(), 4);
let row_3 = Row::pack(row_2.into_iter());
assert_eq!(row_3.len(), 4);
let row_4 = Row::pack(row_3.iter().cloned());
assert_eq!(row_3, row_4);
}

#[test]
fn test_row() {
let row = Row::empty();
let row_1 = Row::new(vec![]);
assert_eq!(row, row_1);
let mut row_2 = Row::new(vec![Value::Int32(1), Value::Int32(2)]);
assert_eq!(row_2.get(0), Some(&Value::Int32(1)));
row_2.clear();
assert_eq!(row_2.get(0), None);
row_2
.packer()
.extend(vec![Value::Int32(1), Value::Int32(2)]);
assert_eq!(row_2.get(0), Some(&Value::Int32(1)));
row_2.extend(vec![Value::Int32(1), Value::Int32(2)]);
assert_eq!(row_2.len(), 4);
let row_3 = Row::pack(row_2.into_iter());
assert_eq!(row_3.len(), 4);
let row_4 = Row::pack(row_3.iter().cloned());
assert_eq!(row_3, row_4);
#[test]
fn test_cast_to_internal_ts() {
{
let a = Value::from(1i32);
let b = Value::from(1i64);
let c = Value::DateTime(DateTime::new(1i64));
let d = Value::from(1.0);

assert!(value_to_internal_ts(a).is_err());
assert_eq!(value_to_internal_ts(b).unwrap(), 1i64);
assert_eq!(value_to_internal_ts(c).unwrap(), 1i64);
assert!(value_to_internal_ts(d).is_err());
}

{
// time related type
let a = Value::Date(Date::new(1));
assert_eq!(value_to_internal_ts(a).unwrap(), 86400 * 1000i64);
let b = Value::Timestamp(common_time::Timestamp::new_second(1));
assert_eq!(value_to_internal_ts(b).unwrap(), 1000i64);
let c = Value::Time(common_time::time::Time::new_second(1));
assert!(matches!(
value_to_internal_ts(c),
Err(EvalError::InvalidArgument { .. })
));
}
}
}