Skip to content

Commit

Permalink
chore(cubestore): non-rolling aggregates in rolling window
Browse files Browse the repository at this point in the history
See DataFusion commit and tests for description and examples.
  • Loading branch information
ilya-biryukov committed Aug 27, 2021
1 parent 3d75d6b commit 61026b2
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 3 deletions.
4 changes: 2 additions & 2 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 128 additions & 0 deletions rust/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
"rolling_window_query_timestamps",
rolling_window_query_timestamps,
),
t(
"rolling_window_extra_aggregate",
rolling_window_extra_aggregate,
),
t("decimal_index", decimal_index),
t("float_index", float_index),
t("date_add", date_add),
Expand Down Expand Up @@ -3093,6 +3097,130 @@ async fn rolling_window_query_timestamps(service: Box<dyn SqlClient>) {
);
}

async fn rolling_window_extra_aggregate(service: Box<dyn SqlClient>) {
service.exec_query("CREATE SCHEMA s").await.unwrap();
service
.exec_query("CREATE TABLE s.Data(day int, name text, n int)")
.await
.unwrap();
service
.exec_query(
"INSERT INTO s.Data(day, name, n) VALUES (1, 'john', 10), \
(1, 'sara', 7), \
(3, 'sara', 3), \
(3, 'john', 9), \
(3, 'john', 11), \
(5, 'timmy', 5)",
)
.await
.unwrap();

let r = service
.exec_query(
"SELECT day, ROLLING(SUM(n) RANGE 1 PRECEDING), SUM(n) \
FROM (SELECT day, SUM(n) as n FROM s.Data GROUP BY 1) \
ROLLING_WINDOW DIMENSION day \
GROUP BY DIMENSION day \
FROM 1 TO 5 EVERY 1 \
ORDER BY 1",
)
.await
.unwrap();
assert_eq!(
to_rows(&r),
rows(&[
(1, 17, Some(17)),
(2, 17, None),
(3, 23, Some(23)),
(4, 23, None),
(5, 5, Some(5))
])
);

// We could also distribute differently.
let r = service
.exec_query(
"SELECT day, ROLLING(SUM(n) RANGE 1 PRECEDING), SUM(n) \
FROM (SELECT day, SUM(n) as n FROM s.Data GROUP BY 1) \
ROLLING_WINDOW DIMENSION day \
GROUP BY DIMENSION CASE WHEN day <= 3 THEN 1 ELSE 5 END \
FROM 1 TO 5 EVERY 1 \
ORDER BY 1",
)
.await
.unwrap();
assert_eq!(
to_rows(&r),
rows(&[
(1, 17, Some(40)),
(2, 17, None),
(3, 23, None),
(4, 23, None),
(5, 5, Some(5))
])
);

// Putting everything into an out-of-range dimension.
let r = service
.exec_query(
"SELECT day, ROLLING(SUM(n) RANGE 1 PRECEDING), SUM(n) \
FROM (SELECT day, SUM(n) as n FROM s.Data GROUP BY 1) \
ROLLING_WINDOW DIMENSION day \
GROUP BY DIMENSION 6 \
FROM 1 TO 5 EVERY 1 \
ORDER BY 1",
)
.await
.unwrap();
assert_eq!(
to_rows(&r),
rows(&[
(1, 17, NULL),
(2, 17, NULL),
(3, 23, NULL),
(4, 23, NULL),
(5, 5, NULL)
])
);

// Check errors.
// Mismatched types.
service
.exec_query(
"SELECT day, ROLLING(SUM(n) RANGE 1 PRECEDING), SUM(n) \
FROM (SELECT day, SUM(n) as n FROM s.Data GROUP BY 1) \
ROLLING_WINDOW DIMENSION day \
GROUP BY DIMENSION 'aaa' \
FROM 1 TO 5 EVERY 1 \
ORDER BY 1",
)
.await
.unwrap_err();
// Aggregate without GROUP BY DIMENSION.
service
.exec_query(
"SELECT day, ROLLING(SUM(n) RANGE 1 PRECEDING), SUM(n) \
FROM (SELECT day, SUM(n) as n FROM s.Data GROUP BY 1) \
ROLLING_WINDOW DIMENSION day \
FROM 1 TO 5 EVERY 1 \
ORDER BY 1",
)
.await
.unwrap_err();
// GROUP BY DIMENSION without aggregates.
service
.exec_query(
"SELECT day, ROLLING(SUM(n) RANGE 1 PRECEDING) \
FROM (SELECT day, SUM(n) as n FROM s.Data GROUP BY 1) \
ROLLING_WINDOW DIMENSION day \
GROUP BY DIMENSION 0 \
FROM 1 TO 5 EVERY 1 \
ORDER BY 1",
)
.await
.unwrap_err();
}

async fn decimal_index(service: Box<dyn SqlClient>) {
service.exec_query("CREATE SCHEMA s").await.unwrap();
service
Expand Down
2 changes: 1 addition & 1 deletion rust/cubestore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ base64 = "0.13.0"
bumpalo = "3.6.1"
tokio = { version = "1.0", features = ["full", "rt"] }
warp = { git = 'https://github.com/seanmonstar/warp', version = "0.3.0" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "6008dfab082a3455c54b023be878d92ec9acef43" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "2fcd06f7354e8c85f170b49a08fc018749289a40" }
serde_derive = "1.0.115"
serde = "1.0.115"
serde_bytes = "0.11.5"
Expand Down
11 changes: 11 additions & 0 deletions rust/cubestore/src/queryplanner/serialized_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ pub enum SerializedLogicalPlan {
to: SerializedExpr,
every: SerializedExpr,
rolling_aggs: Vec<SerializedExpr>,
group_by_dimension: Option<SerializedExpr>,
aggs: Vec<SerializedExpr>,
},
}

Expand Down Expand Up @@ -384,6 +386,8 @@ impl SerializedLogicalPlan {
to,
every,
rolling_aggs,
group_by_dimension,
aggs,
} => LogicalPlan::Extension {
node: Arc::new(RollingWindowAggregate {
schema: schema.clone(),
Expand All @@ -394,6 +398,8 @@ impl SerializedLogicalPlan {
every: every.expr(),
partition_by: partition_by.clone(),
rolling_aggs: exprs(&rolling_aggs),
group_by_dimension: group_by_dimension.as_ref().map(|d| d.expr()),
aggs: exprs(&aggs),
}),
},
})
Expand Down Expand Up @@ -802,6 +808,11 @@ impl SerializedPlan {
to: Self::serialized_expr(&r.to),
every: Self::serialized_expr(&r.every),
rolling_aggs: Self::serialized_exprs(&r.rolling_aggs),
group_by_dimension: r
.group_by_dimension
.as_ref()
.map(|d| Self::serialized_expr(d)),
aggs: Self::serialized_exprs(&r.aggs),
}
} else {
panic!("unknown extension");
Expand Down

0 comments on commit 61026b2

Please sign in to comment.