Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ci/jobs/scripts/check_style/aspell-ignore/en/aspell-dict.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3044,10 +3044,12 @@ throwif
timeDiff
timeSeriesData
timeSeriesDeltaToGrid
timeSeriesDerivToGrid
timeSeriesInstantDeltaToGrid
timeSeriesInstantRateToGrid
timeSeriesLastTwoSamples
timeSeriesMetrics
timeSeriesPredictLinearToGrid
timeSeriesRateToGrid
timeSeriesResampleToGridWithStaleness
timeSeriesTags
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
---
description: 'Aggregate function that calculates PromQL-like derivative over time series data on the specified grid.'
sidebar_position: 227
slug: /sql-reference/aggregate-functions/reference/timeSeriesDerivToGrid
title: 'timeSeriesDerivToGrid'
---

Aggregate function that takes time series data as pairs of timestamps and values and calculates [PromQL-like derivative](https://prometheus.io/docs/prometheus/latest/querying/functions/#deriv) from this data on a regular time grid described by start timestamp, end timestamp and step. For each point on the grid the samples for calculating `deriv` are considered within the specified time window.

Parameters:
- `start timestamp` - specifies start of the grid
- `end timestamp` - specifies end of the grid
- `grid step` - specifies step of the grid in seconds
- `staleness` - specifies the maximum "staleness" in seconds of the considered samples

Arguments:
- `timestamp` - timestamp of the sample
- `value` - value of the time series corresponding to the `timestamp`

Return value:
`deriv` values on the specified grid as an `Array(Nullable(Float64))`. The returned array contains one value for each time grid point. The value is NULL if there are not enough samples within the window to calculate the derivative value for a particular grid point.

Example:
The following query calculates `deriv` values on the grid [90, 105, 120, 135, 150, 165, 180, 195, 210]:

```sql
WITH
-- NOTE: the gap between 140 and 190 is to show how values are filled for ts = 150, 165, 180 according to window parameter
[110, 120, 130, 140, 190, 200, 210, 220, 230]::Array(DateTime) AS timestamps,
[1, 1, 3, 4, 5, 5, 8, 12, 13]::Array(Float32) AS values, -- array of values corresponding to timestamps above
90 AS start_ts, -- start of timestamp grid
90 + 120 AS end_ts, -- end of timestamp grid
15 AS step_seconds, -- step of timestamp grid
45 AS window_seconds -- "staleness" window
SELECT timeSeriesDerivToGrid(start_ts, end_ts, step_seconds, window_seconds)(timestamp, value)
FROM
(
-- This subquery converts arrays of timestamps and values into rows of `timestamp`, `value`
SELECT
arrayJoin(arrayZip(timestamps, values)) AS ts_and_val,
ts_and_val.1 AS timestamp,
ts_and_val.2 AS value
);
```

Response:

```response
┌─timeSeriesDerivToGrid(start_ts, end_ts, step_seconds, window_seconds)(timestamp, value)─┐
1. │ [NULL,NULL,0,0.1,0.11,0.15,NULL,NULL,0.15] │
└─────────────────────────────────────────────────────────────────────────────────────────┘
```

Also it is possible to pass multiple samples of timestamps and values as Arrays of equal size. The same query with array arguments:

```sql
WITH
[110, 120, 130, 140, 190, 200, 210, 220, 230]::Array(DateTime) AS timestamps,
[1, 1, 3, 4, 5, 5, 8, 12, 13]::Array(Float32) AS values,
90 AS start_ts,
90 + 120 AS end_ts,
15 AS step_seconds,
45 AS window_seconds
SELECT timeSeriesDerivToGrid(start_ts, end_ts, step_seconds, window_seconds)(timestamps, values);
```

:::note
This function is experimental, enable it by setting `allow_experimental_ts_to_grid_aggregate_function=true`.
:::
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
---
description: 'Aggregate function that calculates PromQL-like linear prediction over time series data on the specified grid.'
sidebar_position: 228
slug: /sql-reference/aggregate-functions/reference/timeSeriesPredictLinearToGrid
title: 'timeSeriesPredictLinearToGrid'
---

Aggregate function that takes time series data as pairs of timestamps and values and calculates a [PromQL-like linear prediction](https://prometheus.io/docs/prometheus/latest/querying/functions/#predict_linear) with a specified prediction timestamp offset from this data on a regular time grid described by start timestamp, end timestamp and step. For each point on the grid the samples for calculating `predict_linear` are considered within the specified time window.

Parameters:
- `start timestamp` - specifies start of the grid
- `end timestamp` - specifies end of the grid
- `grid step` - specifies step of the grid in seconds
- `staleness` - specifies the maximum "staleness" in seconds of the considered samples
- `predict_offset` - specifies number of seconds of offset to add to prediction time

Arguments:
- `timestamp` - timestamp of the sample
- `value` - value of the time series corresponding to the `timestamp`

Return value:
`predict_linear` values on the specified grid as an `Array(Nullable(Float64))`. The returned array contains one value for each time grid point. The value is NULL if there are not enough samples within the window to calculate the rate value for a particular grid point.

Example:
The following query calculates `predict_linear` values on the grid [90, 105, 120, 135, 150, 165, 180, 195, 210] with a 60 second offset:

```sql
WITH
-- NOTE: the gap between 140 and 190 is to show how values are filled for ts = 150, 165, 180 according to window paramater
[110, 120, 130, 140, 190, 200, 210, 220, 230]::Array(DateTime) AS timestamps,
[1, 1, 3, 4, 5, 5, 8, 12, 13]::Array(Float32) AS values, -- array of values corresponding to timestamps above
90 AS start_ts, -- start of timestamp grid
90 + 120 AS end_ts, -- end of timestamp grid
15 AS step_seconds, -- step of timestamp grid
45 AS window_seconds, -- "staleness" window
60 AS predict_offset -- prediction time offset
SELECT timeSeriesPredictLinearToGrid(start_ts, end_ts, step_seconds, window_seconds, predict_offset)(timestamp, value)
FROM
(
-- This subquery converts arrays of timestamps and values into rows of `timestamp`, `value`
SELECT
arrayJoin(arrayZip(timestamps, values)) AS ts_and_val,
ts_and_val.1 AS timestamp,
ts_and_val.2 AS value
);
```

Response:

```response
┌─timeSeriesPredictLinearToGrid(start_ts, end_ts, step_seconds, window_seconds, predict_offset)(timestamp, value)─┐
1. │ [NULL,NULL,1,9.166667,11.6,16.916666,NULL,NULL,16.5] │
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```

Also it is possible to pass multiple samples of timestamps and values as Arrays of equal size. The same query with array arguments:

```sql
WITH
[110, 120, 130, 140, 190, 200, 210, 220, 230]::Array(DateTime) AS timestamps,
[1, 1, 3, 4, 5, 5, 8, 12, 13]::Array(Float32) AS values,
90 AS start_ts,
90 + 120 AS end_ts,
15 AS step_seconds,
45 AS window_seconds,
60 AS predict_offset
SELECT timeSeriesPredictLinearToGrid(start_ts, end_ts, step_seconds, window_seconds, predict_offset)(timestamps, values);
```

:::note
This function is experimental, enable it by setting `allow_experimental_ts_to_grid_aggregate_function=true`.
:::
4 changes: 2 additions & 2 deletions src/AggregateFunctions/AggregateFunctionTimeseriesBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class AggregateFunctionTimeseriesBase :

void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
if (Traits::array_agruments)
if (Traits::array_arguments)
{
addBatchSinglePlace(row_num, row_num + 1, place, columns, arena, -1);
}
Expand Down Expand Up @@ -221,7 +221,7 @@ class AggregateFunctionTimeseriesBase :
const IColumn ** columns,
const UInt8 * flags_data) const
{
if (Traits::array_agruments)
if (Traits::array_arguments)
{
const auto & timestamp_column = typeid_cast<const ColumnArray &>(*columns[0]);
const auto & value_column = typeid_cast<const ColumnArray &>(*columns[1]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
namespace DB
{

template <bool array_agruments_, typename TimestampType_, typename IntervalType_, typename ValueType_, bool is_rate_>
template <bool array_arguments_, typename TimestampType_, typename IntervalType_, typename ValueType_, bool is_rate_>
struct AggregateFunctionTimeseriesExtrapolatedValueTraits
{
static constexpr bool array_agruments = array_agruments_;
static constexpr bool array_arguments = array_arguments_;
static constexpr bool is_rate = is_rate_;

using TimestampType = TimestampType_;
Expand Down
99 changes: 81 additions & 18 deletions src/AggregateFunctions/AggregateFunctionTimeseriesHelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <AggregateFunctions/AggregateFunctionTimeseriesInstantValue.h>
#include <AggregateFunctions/AggregateFunctionTimeseriesExtrapolatedValue.h>
#include <AggregateFunctions/AggregateFunctionTimeseriesToGridSparse.h>
#include <AggregateFunctions/AggregateFunctionTimeseriesLinearRegression.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeArray.h>
Expand Down Expand Up @@ -67,7 +68,7 @@ Decimal64 normalizeParameter(const std::string & function_name, const std::strin
}
}

UInt64 extractIntParamater(const std::string & function_name, const std::string & parameter_name, const Field & parameter_field)
UInt64 extractIntParameter(const std::string & function_name, const std::string & parameter_name, const Field & parameter_field)
{
if (UInt64 int_value = 0; parameter_field.tryGet(int_value))
{
Expand All @@ -91,6 +92,38 @@ UInt64 extractIntParamater(const std::string & function_name, const std::string
}
}

Float64 extractFloatParameter(const std::string & function_name, const std::string & parameter_name, const Field & parameter_field)
{
if (Float64 float_value = 0; parameter_field.tryGet(float_value))
{
return float_value;
}
else if (Int64 int_value = 0; parameter_field.tryGet(int_value))
{
return static_cast<Float64>(int_value);
}
else if (UInt64 uint_value = 0; parameter_field.tryGet(uint_value))
{
return static_cast<Float64>(uint_value);
}
else if (String string_value; parameter_field.tryGet(string_value))
{
Float64 value{};
ReadBufferFromString buf(string_value);
if (tryReadFloatText(value, buf))
return value;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot parse {} parameter for aggregate function {}", parameter_name, function_name);
}
else
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of {} parameter for aggregate function {}",
parameter_field.getTypeName(), parameter_name, function_name);
}
}


namespace Setting
{
Expand All @@ -102,6 +135,7 @@ namespace

template <
bool is_rate,
bool is_predict,
bool array_arguments,
typename ValueType,
template <bool, typename, typename, typename, bool> class FunctionTraits,
Expand All @@ -111,14 +145,19 @@ AggregateFunctionPtr createWithValueType(const std::string & name, const DataTyp
{
const auto & timestamp_type = array_arguments ? typeid_cast<const DataTypeArray *>(argument_types[0].get())->getNestedType() : argument_types[0];

if (parameters.size() != 4)
if (!is_predict && parameters.size() != 4)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Aggregate function {} requires 4 parameters: start_timestamp, end_timestamp, step, window", name);

if (is_predict && parameters.size() != 5)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Aggregate function {} requires 5 parameters: start_timestamp, end_timestamp, step, window, predict_offset", name);

const Field & start_timestamp_param = parameters[0];
const Field & end_timestamp_param = parameters[1];
const Field & step_param = parameters[2];
const Field & window_param = parameters[3];
const Field & predict_offset_param = is_predict ? parameters[4] : Field();

AggregateFunctionPtr res;
if (isDateTime64(timestamp_type))
Expand All @@ -132,18 +171,36 @@ AggregateFunctionPtr createWithValueType(const std::string & name, const DataTyp
DateTime64 step = normalizeParameter(name, "step", step_param, target_scale);
DateTime64 window = normalizeParameter(name, "window", window_param, target_scale);

res = std::make_shared<Function<FunctionTraits<array_arguments, DateTime64, Int64, ValueType, is_rate>>>
(argument_types, start_timestamp, end_timestamp, step, window, target_scale);
if constexpr (is_predict)
{
Float64 predict_offset = extractFloatParameter(name, "predict_offset", predict_offset_param) * DecimalUtils::scaleMultiplier<Int64>(target_scale);
res = std::make_shared<Function<FunctionTraits<array_arguments, DateTime64, Int64, ValueType, is_predict>>>
(argument_types, start_timestamp, end_timestamp, step, window, target_scale, predict_offset);
}
else
{
res = std::make_shared<Function<FunctionTraits<array_arguments, DateTime64, Int64, ValueType, is_rate>>>
(argument_types, start_timestamp, end_timestamp, step, window, target_scale);
}
}
else if (isDateTime(timestamp_type) || isUInt32(timestamp_type))
{
UInt64 start_timestamp = extractIntParamater(name, "start", start_timestamp_param);
UInt64 end_timestamp = extractIntParamater(name, "end", end_timestamp_param);
Int64 step = extractIntParamater(name, "step", step_param);
Int64 window = extractIntParamater(name, "window", window_param);
UInt64 start_timestamp = extractIntParameter(name, "start", start_timestamp_param);
UInt64 end_timestamp = extractIntParameter(name, "end", end_timestamp_param);
Int64 step = extractIntParameter(name, "step", step_param);
Int64 window = extractIntParameter(name, "window", window_param);

res = std::make_shared<Function<FunctionTraits<array_arguments, UInt32, Int32, ValueType, is_rate>>>
if constexpr (is_predict)
{
Float64 predict_offset = extractFloatParameter(name, "predict_offset", predict_offset_param);
res = std::make_shared<Function<FunctionTraits<array_arguments, UInt32, Int32, ValueType, is_predict>>>
(argument_types, start_timestamp, end_timestamp, step, window, 0, predict_offset);
}
else
{
res = std::make_shared<Function<FunctionTraits<array_arguments, UInt32, Int32, ValueType, is_rate>>>
(argument_types, start_timestamp, end_timestamp, step, window, 0);
}
}

if (!res)
Expand All @@ -155,6 +212,7 @@ AggregateFunctionPtr createWithValueType(const std::string & name, const DataTyp

template <
bool is_rate,
bool is_predict,
template <bool, typename, typename, typename, bool> class FunctionTraits,
template <typename> class Function
>
Expand All @@ -180,16 +238,16 @@ AggregateFunctionPtr createAggregateFunctionTimeseries(const std::string & name,
if (value_type->getTypeId() == TypeIndex::Float64)
{
if (array_arguments)
res = createWithValueType<is_rate, true, Float64, FunctionTraits, Function>(name, argument_types, parameters);
res = createWithValueType<is_rate, is_predict, true, Float64, FunctionTraits, Function>(name, argument_types, parameters);
else
res = createWithValueType<is_rate, false, Float64, FunctionTraits, Function>(name, argument_types, parameters);
res = createWithValueType<is_rate, is_predict, false, Float64, FunctionTraits, Function>(name, argument_types, parameters);
}
else if (value_type->getTypeId() == TypeIndex::Float32)
{
if (array_arguments)
res = createWithValueType<is_rate, true, Float32, FunctionTraits, Function>(name, argument_types, parameters);
res = createWithValueType<is_rate, is_predict, true, Float32, FunctionTraits, Function>(name, argument_types, parameters);
else
res = createWithValueType<is_rate, false, Float32, FunctionTraits, Function>(name, argument_types, parameters);
res = createWithValueType<is_rate, is_predict, false, Float32, FunctionTraits, Function>(name, argument_types, parameters);
}
else
{
Expand All @@ -205,17 +263,22 @@ AggregateFunctionPtr createAggregateFunctionTimeseries(const std::string & name,
void registerAggregateFunctionTimeseries(AggregateFunctionFactory & factory)
{
factory.registerFunction("timeSeriesRateToGrid",
createAggregateFunctionTimeseries<true, AggregateFunctionTimeseriesExtrapolatedValueTraits, AggregateFunctionTimeseriesExtrapolatedValue>);
createAggregateFunctionTimeseries<true, false, AggregateFunctionTimeseriesExtrapolatedValueTraits, AggregateFunctionTimeseriesExtrapolatedValue>);
factory.registerFunction("timeSeriesDeltaToGrid",
createAggregateFunctionTimeseries<false, AggregateFunctionTimeseriesExtrapolatedValueTraits, AggregateFunctionTimeseriesExtrapolatedValue>);
createAggregateFunctionTimeseries<false, false, AggregateFunctionTimeseriesExtrapolatedValueTraits, AggregateFunctionTimeseriesExtrapolatedValue>);

factory.registerFunction("timeSeriesInstantRateToGrid",
createAggregateFunctionTimeseries<true, AggregateFunctionTimeseriesInstantValueTraits, AggregateFunctionTimeseriesInstantValue>);
createAggregateFunctionTimeseries<true, false, AggregateFunctionTimeseriesInstantValueTraits, AggregateFunctionTimeseriesInstantValue>);
factory.registerFunction("timeSeriesInstantDeltaToGrid",
createAggregateFunctionTimeseries<false, AggregateFunctionTimeseriesInstantValueTraits, AggregateFunctionTimeseriesInstantValue>);
createAggregateFunctionTimeseries<false, false, AggregateFunctionTimeseriesInstantValueTraits, AggregateFunctionTimeseriesInstantValue>);

factory.registerFunction("timeSeriesDerivToGrid",
createAggregateFunctionTimeseries<false, false, AggregateFunctionTimeseriesLinearRegressionTraits, AggregateFunctionTimeseriesLinearRegression>);
factory.registerFunction("timeSeriesPredictLinearToGrid",
createAggregateFunctionTimeseries<false, true, AggregateFunctionTimeseriesLinearRegressionTraits, AggregateFunctionTimeseriesLinearRegression>);

factory.registerFunction("timeSeriesResampleToGridWithStaleness",
createAggregateFunctionTimeseries<false, AggregateFunctionTimeseriesToGridSparseTraits, AggregateFunctionTimeseriesToGridSparse>);
createAggregateFunctionTimeseries<false, false, AggregateFunctionTimeseriesToGridSparseTraits, AggregateFunctionTimeseriesToGridSparse>);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
}

template <bool array_agruments_, typename TimestampType_, typename IntervalType_, typename ValueType_, bool is_rate_>
template <bool array_arguments_, typename TimestampType_, typename IntervalType_, typename ValueType_, bool is_rate_>
struct AggregateFunctionTimeseriesInstantValueTraits
{
static constexpr bool array_agruments = array_agruments_;
static constexpr bool array_arguments = array_arguments_;
static constexpr bool is_rate = is_rate_;

using TimestampType = TimestampType_;
Expand Down
Loading
Loading