Skip to content

Commit 58a8cb4

Browse files
committed
fix(cubestore): 'unsorted data' assertion with high-precision timestamps
CubeStore used to truncate timestamps to millisecond precision when writing to parquet, but sort the data with nanosecond precision. This led to 'unsorted data in merge' assertions. Ensure we truncate before we sort the data. Increasing the storage precision is another option, but that involves backward and forward compatibility issues and requires more planning. So stick with the current behavior for now. If you see 'unmerged data' assertion in the logs, you have to manually drop the tables where this happens, e.g. by rebuilding the rollups in CubeJS.
1 parent 9c0d4fe commit 58a8cb4

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

rust/cubestore-sql-tests/src/tests.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
101101
t("now", now),
102102
t("dump", dump),
103103
t("unsorted_merge_assertion", unsorted_merge_assertion),
104+
t("unsorted_data_timestamps", unsorted_data_timestamps),
104105
];
105106

106107
fn t<F>(name: &'static str, f: fn(Box<dyn SqlClient>) -> F) -> (&'static str, TestFn)
@@ -3332,6 +3333,42 @@ async fn unsorted_merge_assertion(service: Box<dyn SqlClient>) {
33323333
assert_eq!(to_rows(&r), rows(&[(3, 2, 2), (2, 3, 2), (1, 4, 2)]));
33333334
}
33343335

3336+
async fn unsorted_data_timestamps(service: Box<dyn SqlClient>) {
3337+
service.exec_query("CREATE SCHEMA s").await.unwrap();
3338+
service
3339+
.exec_query("CREATE TABLE s.data(t timestamp, n string)")
3340+
.await
3341+
.unwrap();
3342+
service
3343+
.exec_query(
3344+
"INSERT INTO s.data(t, n) VALUES \
3345+
('2020-01-01T00:00:00.000000005Z', 'a'), \
3346+
('2020-01-01T00:00:00.000000001Z', 'b'), \
3347+
('2020-01-01T00:00:00.000000002Z', 'c')",
3348+
)
3349+
.await
3350+
.unwrap();
3351+
3352+
// CubeStore currently truncs timestamps to millisecond precision.
3353+
// This checks we sort trunced precisions on inserts. We rely on implementation details of
3354+
// CubeStore here.
3355+
let r = service.exec_query("SELECT t, n FROM s.data").await.unwrap();
3356+
3357+
let t = timestamp_from_string("2020-01-01T00:00:00Z").unwrap();
3358+
assert_eq!(to_rows(&r), rows(&[(t, "a"), (t, "b"), (t, "c")]));
3359+
3360+
// This ends up using MergeSortExec, make sure we see no assertions.
3361+
let r = service
3362+
.exec_query(
3363+
"SELECT t, n FROM (SELECT * FROM s.data UNION ALL SELECT * FROM s.data) data \
3364+
GROUP BY 1, 2 \
3365+
ORDER BY 1, 2",
3366+
)
3367+
.await
3368+
.unwrap();
3369+
assert_eq!(to_rows(&r), rows(&[(t, "a"), (t, "b"), (t, "c")]));
3370+
}
3371+
33353372
async fn now(service: Box<dyn SqlClient>) {
33363373
let r = service.exec_query("SELECT now()").await.unwrap();
33373374
assert_eq!(r.get_rows().len(), 1);

rust/cubestore/src/table/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ pub struct TimestampValue {
2929
}
3030

3131
impl TimestampValue {
32-
pub fn new(unix_nano: i64) -> TimestampValue {
32+
pub fn new(mut unix_nano: i64) -> TimestampValue {
33+
// This is a hack to workaround a mismatch between on-disk and in-memory representations.
34+
// We use millisecond precision on-disk.
35+
unix_nano -= unix_nano % 1000;
3336
TimestampValue { unix_nano }
3437
}
3538

0 commit comments

Comments
 (0)