Skip to content

Commit

Permalink
fix: align linear_regression to PromQL's behavior (#2879)
Browse files Browse the repository at this point in the history
* fix: accept f64 and i64 as predict_linear's param

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use second instead of millisecond

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add test to linear_regression

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia committed Dec 8, 2023
1 parent 09aa4b7 commit 58183fe
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 14 deletions.
28 changes: 23 additions & 5 deletions src/promql/src/functions.rs
Expand Up @@ -97,7 +97,7 @@ pub(crate) fn linear_regression(
const_y = false;
}
count += 1.0;
let x = time - intercept_time as f64 / 1e3;
let x = (time - intercept_time as f64) / 1e3f64;
(sum_x, comp_x) = compensated_sum_inc(x, sum_x, comp_x);
(sum_y, comp_y) = compensated_sum_inc(value, sum_y, comp_y);
(sum_xy, comp_xy) = compensated_sum_inc(x * value, sum_xy, comp_xy);
Expand Down Expand Up @@ -188,8 +188,12 @@ mod test {
0.0, 10.0, 20.0, 30.0, 40.0, 0.0, 10.0, 20.0, 30.0, 40.0, 50.0,
]);
let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));
assert_eq!(slope, Some(0.010606060606060607));
assert_eq!(intercept, Some(6.818181818181818));
assert_eq!(slope, Some(10.606060606060607));
assert_eq!(intercept, Some(6.818181818181815));

let (slope, intercept) = linear_regression(&ts_array, &values_array, 3000);
assert_eq!(slope, Some(10.606060606060607));
assert_eq!(intercept, Some(38.63636363636364));
}

#[test]
Expand Down Expand Up @@ -219,8 +223,8 @@ mod test {
.into_iter()
.collect();
let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));
assert_eq!(slope, Some(0.010606060606060607));
assert_eq!(intercept, Some(6.818181818181818));
assert_eq!(slope, Some(10.606060606060607));
assert_eq!(intercept, Some(6.818181818181815));
}

#[test]
Expand All @@ -231,4 +235,18 @@ mod test {
assert_eq!(slope, None);
assert_eq!(intercept, None);
}

// Ported from prometheus `promql/functions_test.go`, case `TestKahanSum`.
#[test]
fn test_kahan_sum() {
    // 1 + 1e100 + 1 - 1e100: the two small terms only survive if the
    // compensation term is carried along with the running sum.
    let huge = 10.0f64.powf(100.0);
    let samples = vec![1.0, huge, 1.0, -1.0 * huge];

    // Thread the (running sum, compensation) pair through every sample.
    let (total, compensation) = samples
        .into_iter()
        .fold((0.0, 0f64), |(acc, comp), sample| {
            compensated_sum_inc(sample, acc, comp)
        });

    assert_eq!(total + compensation, 2.0)
}
}
29 changes: 28 additions & 1 deletion src/promql/src/functions/deriv.rs
Expand Up @@ -42,6 +42,8 @@ pub fn deriv(times: &TimestampMillisecondArray, values: &Float64Array) -> Option

#[cfg(test)]
mod test {
use std::sync::Arc;

use super::*;
use crate::functions::test_util::simple_range_udf_runner;

Expand Down Expand Up @@ -73,7 +75,32 @@ mod test {
Deriv::scalar_udf(),
ts_array,
value_array,
vec![Some(0.010606060606060607), None],
vec![Some(10.606060606060607), None],
);
}

// Ported from prometheus `promql/functions_test.go`, case `TestDeriv`.
#[test]
fn complicate_deriv() {
    let start = 1_493_712_816_939;
    let interval = 30 * 1000;

    // Fifteen timestamps spaced `interval` apart, each paired with the constant value 1.0.
    // NOTE: `12 * i % 2` parses as `(12 * i) % 2`, which is 0 for every `i`;
    // the expression is kept verbatim so the generated timestamps stay unchanged.
    let ts_data: Vec<_> = (0..15)
        .map(|i| {
            let jitter = 12 * i % 2;
            Some(start + interval * i + jitter)
        })
        .collect();
    let val_data = vec![Some(1.0); 15];

    // A single range covering all fifteen samples.
    let range = [(0, 15)];
    let ts_range_array =
        RangeArray::from_ranges(Arc::new(TimestampMillisecondArray::from_iter(ts_data)), range)
            .unwrap();
    let value_range_array =
        RangeArray::from_ranges(Arc::new(Float64Array::from_iter(val_data)), range).unwrap();

    // Constant values are expected to yield a derivative of exactly zero.
    simple_range_udf_runner(
        Deriv::scalar_udf(),
        ts_range_array,
        value_range_array,
        vec![Some(0.0)],
    );
}
}
16 changes: 9 additions & 7 deletions src/promql/src/functions/predict_linear.rs
Expand Up @@ -30,6 +30,7 @@ use crate::functions::{extract_array, linear_regression};
use crate::range_array::RangeArray;

/// Carries the scalar argument `t` of `predict_linear(v range-vector, t scalar)`.
pub struct PredictLinear {
/// Duration. The second param of (`predict_linear(v range-vector, t scalar)`).
/// NOTE(review): presumably expressed in seconds, matching PromQL — confirm against the caller.
t: i64,
}

Expand Down Expand Up @@ -147,8 +148,9 @@ fn predict_linear_impl(
return None;
}

let intercept_time = timestamps.value(0);
let (slope, intercept) = linear_regression(timestamps, values, intercept_time);
// last timestamp is evaluation timestamp
let evaluate_ts = timestamps.value(timestamps.len() - 1);
let (slope, intercept) = linear_regression(timestamps, values, evaluate_ts);

if slope.is_none() || intercept.is_none() {
return None;
Expand Down Expand Up @@ -210,7 +212,7 @@ mod test {
ts_array,
value_array,
// value at t = 0
vec![Some(6.818181818181818)],
vec![Some(38.63636363636364)],
);
}

Expand All @@ -222,7 +224,7 @@ mod test {
ts_array,
value_array,
// value at t = 3000
vec![Some(38.63636363636364)],
vec![Some(31856.818181818187)],
);
}

Expand All @@ -234,7 +236,7 @@ mod test {
ts_array,
value_array,
// value at t = 4200
vec![Some(51.36363636363637)],
vec![Some(44584.09090909091)],
);
}

Expand All @@ -246,7 +248,7 @@ mod test {
ts_array,
value_array,
// value at t = 6600
vec![Some(76.81818181818181)],
vec![Some(70038.63636363638)],
);
}

Expand All @@ -258,7 +260,7 @@ mod test {
ts_array,
value_array,
// value at t = 7800
vec![Some(89.54545454545455)],
vec![Some(82765.9090909091)],
);
}
}
3 changes: 2 additions & 1 deletion src/promql/src/planner.rs
Expand Up @@ -1017,7 +1017,8 @@ impl PromPlanner {
}
"predict_linear" => {
let t_expr = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Time64Microsecond(Some(t)))) => t,
Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t as i64,
Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t,
other => UnexpectedPlanExprSnafu {
desc: format!("expect i64 literal as t, but found {:?}", other),
}
Expand Down

0 comments on commit 58183fe

Please sign in to comment.