Skip to content

Commit

Permalink
feat: Add timestamp arithmetic functionality (#6901)
Browse files Browse the repository at this point in the history
* timestamp math functions

* fix compilation errors after merge conflict

* fix merge

* separate the unit and the interval

* fix compilation errors
  • Loading branch information
Zara Lim committed Mar 4, 2021
1 parent dcbe16e commit e2c06dc
Show file tree
Hide file tree
Showing 49 changed files with 1,798 additions and 12 deletions.
22 changes: 22 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,28 @@ FROM_UNIXTIME(milliseconds)

Converts a BIGINT millisecond timestamp value into a TIMESTAMP value.

### TIMESTAMPADD

Since: 0.17

```sql
TIMESTAMPADD(unit, interval, COL0)
```

Adds an interval to a timestamp. Intervals are defined by an integer value and a supported
[time unit](../../reference/sql/time.md#Time units).

### TIMESTAMPSUB

Since: 0.17

```sql
TIMESTAMPSUB(unit, interval, COL0)
```

Subtracts an interval from a timestamp. Intervals are defined by an integer value and a supported
[time unit](../../reference/sql/time.md#Time units).

## URLs

!!! note
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/sql/time.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ keywords: time, datetime, timestamp, format, window
### Time units

The following list shows valid time units for the `SIZE`, `ADVANCE BY`,
`SESSION`, and `WITHIN` clauses.
`SESSION`, and `WITHIN` clauses, or to pass as time unit parameters in functions.

- `DAY`, `DAYS`
- `HOUR`, `HOURS`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.types;

public class IntervalUnitType extends ObjectType {

public static final IntervalUnitType INSTANCE = new IntervalUnitType();

@Override
public int hashCode() {
return 8;
}

@Override
public boolean equals(final Object obj) {
return obj instanceof IntervalUnitType;
}

@Override
public String toString() {
return "INTERVAL UNIT";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ private ParamTypes() {
public static final LongType LONG = LongType.INSTANCE;
public static final ParamType DECIMAL = DecimalType.INSTANCE;
public static final TimestampType TIMESTAMP = TimestampType.INSTANCE;
public static final IntervalUnitType INTERVALUNIT = IntervalUnitType.INSTANCE;

public static boolean areCompatible(final SqlArgument actual, final ParamType declared) {
return areCompatible(actual, declared, false);
Expand Down Expand Up @@ -86,6 +87,12 @@ public static boolean areCompatible(
}
}

if (argument.getSqlIntervalUnit().isPresent() && declared instanceof IntervalUnitType) {
return true;
} else if (argument.getSqlIntervalUnit().isPresent() || declared instanceof IntervalUnitType) {
return false;
}

final SqlType argumentSqlType = argument.getSqlTypeOrThrow();
if (argumentSqlType.baseType() == SqlBaseType.ARRAY && declared instanceof ArrayType) {
return areCompatible(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public final class DurationParser {
private DurationParser() {
}

public static Duration buildDuration(final long size, final String timeUnitName) {
final TimeUnit timeUnit = parseTimeUnit(timeUnitName.toUpperCase());
return Duration.ofNanos(timeUnit.toNanos(size));
}

public static Duration parse(final String text) {
try {
final String[] parts = text.split("\\s");
Expand All @@ -35,9 +40,7 @@ public static Duration parse(final String text) {
}

final long size = parseNumeric(parts[0]);
final TimeUnit timeUnit = parseTimeUnit(parts[1].toUpperCase());

return Duration.ofNanos(timeUnit.toNanos(size));
return buildDuration(size, parts[1]);
} catch (final Exception e) {
throw new IllegalArgumentException("Invalid duration: '" + text + "'. " + e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.schema.ksql.SqlArgument;
import io.confluent.ksql.schema.ksql.types.SqlArray;
import io.confluent.ksql.schema.ksql.types.SqlIntervalUnit;
import io.confluent.ksql.schema.ksql.types.SqlLambdaResolved;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
Expand All @@ -47,6 +48,7 @@ public class UdfIndexTest {
private static final ParamType STRUCT2 = StructType.builder().field("b", INT).build();
private static final ParamType MAP1 = MapType.of(STRING, STRING);
private static final ParamType MAP2 = MapType.of(INT, INT);
private static final ParamType INTERVALUNIT = ParamTypes.INTERVALUNIT;
private static final ParamType LAMBDA_KEY_FUNCTION = LambdaType.of(ImmutableList.of(GenericType.of("A")), GenericType.of("C"));
private static final ParamType LAMBDA_VALUE_FUNCTION = LambdaType.of(ImmutableList.of(GenericType.of("B")), GenericType.of("D"));
private static final ParamType LAMBDA_BI_FUNCTION = LambdaType.of(ImmutableList.of(GenericType.of("A"), GenericType.of("B")), GenericType.of("C"));
Expand Down Expand Up @@ -242,6 +244,21 @@ public void shouldChooseCorrectMap() {
assertThat(fun.name(), equalTo(EXPECTED));
}

@Test
public void shouldChooseIntervalUnit() {
// Given:
givenFunctions(
function(EXPECTED, false, INTERVALUNIT)
);

// When:
final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(SqlArgument.of(
SqlIntervalUnit.INSTANCE)));

// Then:
assertThat(fun.name(), equalTo(EXPECTED));
}

@Test
public void shouldChooseCorrectLambdaFunction() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.schema.ksql.SqlArgument;
import io.confluent.ksql.schema.ksql.types.SqlIntervalUnit;
import io.confluent.ksql.schema.ksql.types.SqlLambda;
import io.confluent.ksql.schema.ksql.types.SqlLambdaResolved;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
Expand Down Expand Up @@ -82,6 +83,12 @@ public void shouldPassCompatibleSchemas() {
false),
is(true));

assertThat(ParamTypes.areCompatible(
SqlArgument.of(SqlIntervalUnit.INSTANCE),
ParamTypes.INTERVALUNIT,
true),
is(true));

assertThat(
ParamTypes.areCompatible(
SqlArgument.of(SqlTypes.array(SqlTypes.INTEGER)),
Expand Down Expand Up @@ -142,6 +149,7 @@ public void shouldNotPassInCompatibleSchemasWithImplicitCasting() {
assertThat(ParamTypes.areCompatible(SqlArgument.of(SqlTypes.DOUBLE), ParamTypes.LONG, true), is(false));

assertThat(ParamTypes.areCompatible(SqlArgument.of(SqlTypes.DOUBLE), ParamTypes.DECIMAL, true), is(false));
assertThat(ParamTypes.areCompatible(SqlArgument.of(SqlTypes.INTEGER), ParamTypes.INTERVALUNIT, true), is(false));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,9 @@ public void shouldSupportHours() {
public void shouldSupportDays() {
assertThat(DurationParser.parse("98 Day"), is(Duration.ofDays(98)));
}

@Test
public void shouldBuildDuration() {
assertThat(DurationParser.buildDuration(20, "DAYS"), is(Duration.ofDays(20)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.confluent.ksql.execution.expression.tree.InListExpression;
import io.confluent.ksql.execution.expression.tree.InPredicate;
import io.confluent.ksql.execution.expression.tree.IntegerLiteral;
import io.confluent.ksql.execution.expression.tree.IntervalUnit;
import io.confluent.ksql.execution.expression.tree.IsNotNullPredicate;
import io.confluent.ksql.execution.expression.tree.IsNullPredicate;
import io.confluent.ksql.execution.expression.tree.LambdaFunctionCall;
Expand Down Expand Up @@ -504,6 +505,11 @@ public Expression visitLambdaVariable(final LambdaVariable node, final C context
return plugin.apply(node, new Context<>(context, this)).orElse(node);
}

@Override
public Expression visitIntervalUnit(final IntervalUnit node, final C context) {
return plugin.apply(node, new Context<>(context, this)).orElse(node);
}

@Override
public Expression visitNullLiteral(final NullLiteral node, final C context) {
return plugin.apply(node, new Context<>(context, this)).orElse(node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
Expand All @@ -56,6 +57,7 @@ class UdafTypes {
.add(List.class)
.add(Map.class)
.add(Timestamp.class)
.add(TimeUnit.class)
.add(Function.class)
.add(BiFunction.class)
.add(TriFunction.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;

@UdfDescription(
name = "timestampadd",
category = FunctionCategory.DATE_TIME,
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Adds a duration to a TIMESTAMP value."
)
public class TimestampAdd {

@Udf(description = "Adds a duration to a timestamp")
public Timestamp timestampAdd(
@UdfParameter(description = "A unit of time, for example DAY or HOUR") final TimeUnit unit,
@UdfParameter(description = "An integer number of intervals to add") final int interval,
@UdfParameter(description = "A TIMESTAMP value.") final Timestamp timestamp
) {
return new Timestamp(timestamp.getTime() + unit.toMillis(interval));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;

@UdfDescription(
name = "timestampsub",
category = FunctionCategory.DATE_TIME,
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Subtracts a duration from a TIMESTAMP value."
)
public class TimestampSub {

@Udf(description = "Subtracts a duration from a timestamp")
public Timestamp timestampSub(
@UdfParameter(description = "A unit of time, for example DAY or HOUR") final TimeUnit unit,
@UdfParameter(description = "An integer number of intervals to subtract") final int interval,
@UdfParameter(description = "A TIMESTAMP value.") final Timestamp timestamp
) {
return new Timestamp(timestamp.getTime() - unit.toMillis(interval));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.confluent.ksql.execution.expression.tree.InListExpression;
import io.confluent.ksql.execution.expression.tree.InPredicate;
import io.confluent.ksql.execution.expression.tree.IntegerLiteral;
import io.confluent.ksql.execution.expression.tree.IntervalUnit;
import io.confluent.ksql.execution.expression.tree.IsNotNullPredicate;
import io.confluent.ksql.execution.expression.tree.IsNullPredicate;
import io.confluent.ksql.execution.expression.tree.LikePredicate;
Expand Down Expand Up @@ -78,6 +79,7 @@
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -746,6 +748,13 @@ public void shouldRewriteTypeUsingPlugin() {
shouldRewriteUsingPlugin(type);
}

@Test
public void shouldRewriteIntervalUnitUsingPlugin() {
// Given:
final IntervalUnit expression = new IntervalUnit(TimeUnit.DAYS);
shouldRewriteUsingPlugin(expression);
}

@SuppressWarnings("unchecked")
private <T extends Expression> T parseExpression(final String asText) {
final String ksql = String.format("SELECT %s FROM test1;", asText);
Expand Down

0 comments on commit e2c06dc

Please sign in to comment.