Skip to content

Commit

Permalink
feat(flow): shared in-memory state for dataflow operator (#3508)
Browse files Browse the repository at this point in the history
* feat: Arrangement shared state

* feat: arrange&tests

* docs: detailed&tests for get

* chore: license

* refactor: opt out ts expr&tests: internal ts

* docs: remove some TODOs

* feat: use smallvec size of 2

* refactor: per review

* chore: per review

* chore: per review

* chore: remove redundant clone

* feat: return max expire time&docs: more explain cur expire config
  • Loading branch information
discord9 committed Mar 19, 2024
1 parent 6415926 commit 2c115bc
Show file tree
Hide file tree
Showing 5 changed files with 611 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/flow/Cargo.toml
Expand Up @@ -25,6 +25,7 @@ num-traits = "0.2"
serde.workspace = true
servers.workspace = true
session.workspace = true
smallvec.workspace = true
snafu.workspace = true
tokio.workspace = true
tonic.workspace = true
Expand Down
1 change: 1 addition & 0 deletions src/flow/src/lib.rs
Expand Up @@ -19,3 +19,4 @@ mod adapter;
mod expr;
mod plan;
mod repr;
mod utils;
97 changes: 75 additions & 22 deletions src/flow/src/repr.rs
Expand Up @@ -31,25 +31,40 @@ pub(crate) use relation::{RelationDesc, RelationType};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;

use crate::expr::error::{CastValueSnafu, EvalError};
use crate::expr::error::{CastValueSnafu, EvalError, InvalidArgumentSnafu};

/// System-wide record count difference type. Useful for capturing data changes.
///
/// i.e. +1 means insert one record, -1 means remove,
/// and +/-n means insert/remove multiple duplicate records.
pub type Diff = i64;

/// System-wide default timestamp type
/// System-wide default timestamp type, in milliseconds
pub type Timestamp = i64;

/// System-wide default duration type, in milliseconds
pub type Duration = i64;

/// Default type for representing a change to a collection.
pub type DiffRow = (Row, Timestamp, Diff);

pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);

/// Convert a value that is, or can be converted to, a `DateTime` into an internal timestamp.
pub fn value_to_internal_ts(value: Value) -> Result<Timestamp, EvalError> {
let is_supported_time_type = |arg: &Value| {
let ty = arg.data_type();
matches!(
ty,
ConcreteDataType::Date(..)
| ConcreteDataType::DateTime(..)
| ConcreteDataType::Timestamp(..)
)
};
match value {
Value::DateTime(ts) => Ok(ts.val()),
arg => {
Value::Int64(ts) => Ok(ts),
arg if is_supported_time_type(&arg) => {
let arg_ty = arg.data_type();
let res = cast(arg, &ConcreteDataType::datetime_datatype()).context({
CastValueSnafu {
Expand All @@ -63,6 +78,10 @@ pub fn value_to_internal_ts(value: Value) -> Result<Timestamp, EvalError> {
unreachable!()
}
}
_ => InvalidArgumentSnafu {
reason: format!("Expect a time type or i64, got {:?}", value.data_type()),
}
.fail(),
}
}

Expand Down Expand Up @@ -145,24 +164,58 @@ impl From<Row> for ProtoRow {
ProtoRow { values }
}
}
#[cfg(test)]
mod test {
use common_time::{Date, DateTime};

use super::*;

/// Exercises the basic `Row` operations used in this module: construction,
/// indexed lookup, clearing, refilling via `packer()`, extending and packing.
#[test]
fn test_row() {
// A row from `Row::empty()` must compare equal to one built from an empty vector.
let row = Row::empty();
let row_1 = Row::new(vec![]);
assert_eq!(row, row_1);
// Indexed lookup returns a reference to the first stored value.
let mut row_2 = Row::new(vec![Value::Int32(1), Value::Int32(2)]);
assert_eq!(row_2.get(0), Some(&Value::Int32(1)));
// After `clear()`, index 0 no longer holds a value.
row_2.clear();
assert_eq!(row_2.get(0), None);
// Refill the cleared row through `packer()`; the first value is readable again.
row_2
.packer()
.extend(vec![Value::Int32(1), Value::Int32(2)]);
assert_eq!(row_2.get(0), Some(&Value::Int32(1)));
// Extending adds two more values to the two already present, giving four.
row_2.extend(vec![Value::Int32(1), Value::Int32(2)]);
assert_eq!(row_2.len(), 4);
// Packing from the consumed row keeps all four values.
let row_3 = Row::pack(row_2.into_iter());
assert_eq!(row_3.len(), 4);
// Packing from cloned values produces a row equal to the source row.
let row_4 = Row::pack(row_3.iter().cloned());
assert_eq!(row_3, row_4);
}

#[test]
fn test_cast_to_internal_ts() {
{
let a = Value::from(1i32);
let b = Value::from(1i64);
let c = Value::DateTime(DateTime::new(1i64));
let d = Value::from(1.0);

assert!(value_to_internal_ts(a).is_err());
assert_eq!(value_to_internal_ts(b).unwrap(), 1i64);
assert_eq!(value_to_internal_ts(c).unwrap(), 1i64);
assert!(value_to_internal_ts(d).is_err());
}

#[test]
fn test_row() {
let row = Row::empty();
let row_1 = Row::new(vec![]);
assert_eq!(row, row_1);
let mut row_2 = Row::new(vec![Value::Int32(1), Value::Int32(2)]);
assert_eq!(row_2.get(0), Some(&Value::Int32(1)));
row_2.clear();
assert_eq!(row_2.get(0), None);
row_2
.packer()
.extend(vec![Value::Int32(1), Value::Int32(2)]);
assert_eq!(row_2.get(0), Some(&Value::Int32(1)));
row_2.extend(vec![Value::Int32(1), Value::Int32(2)]);
assert_eq!(row_2.len(), 4);
let row_3 = Row::pack(row_2.into_iter());
assert_eq!(row_3.len(), 4);
let row_4 = Row::pack(row_3.iter().cloned());
assert_eq!(row_3, row_4);
{
// time related type
let a = Value::Date(Date::new(1));
assert_eq!(value_to_internal_ts(a).unwrap(), 86400 * 1000i64);
let b = Value::Timestamp(common_time::Timestamp::new_second(1));
assert_eq!(value_to_internal_ts(b).unwrap(), 1000i64);
let c = Value::Time(common_time::time::Time::new_second(1));
assert!(matches!(
value_to_internal_ts(c),
Err(EvalError::InvalidArgument { .. })
));
}
}
}

0 comments on commit 2c115bc

Please sign in to comment.