Skip to content

Commit

Permalink
[FLINK-28929][table] Add built-in datediff function.
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyongvs committed Sep 5, 2022
1 parent df0bc11 commit e23772a
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 1 deletion.
3 changes: 3 additions & 0 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ temporal:
- sql: TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)
table: timestampDiff(TIMEPOINTUNIT, TIMEPOINT1, TIMEPOINT2)
description: 'Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.'
- sql: DATEDIFF(datetime1, datetime2)
table: dateDiff(datetime1, datetime2)
description: Returns the (signed) number of days between datetime1 and datetime2.
- sql: CONVERT_TZ(string1, string2, string3)
description: Converts a datetime string1 (with default ISO timestamp format 'yyyy-MM-dd HH:mm:ss') from time zone string2 to time zone string3. The format of time zone should be either an abbreviation such as "PST", a full name such as "America/Los_Angeles", or a custom ID such as "GMT-08:00". E.g., CONVERT_TZ('1970-01-01 00:00:00', 'UTC', 'America/Los_Angeles') returns '1969-12-31 16:00:00'.
- sql: FROM_UNIXTIME(numeric[, string])
Expand Down
4 changes: 4 additions & 0 deletions docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,10 @@ temporal:
description: |
返回 timepoint1 和 timepoint2 之间时间间隔。间隔的单位由第一个参数给出,它应该是以下值之一:
SECOND,MINUTE,HOUR,DAY,MONTH 或 YEAR。
- sql: DATEDIFF(datetime1, datetime2)
table: dateDiff(datetime1, datetime2)
description: |
返回 datetime1 和 datetime2 之间的天数.
- sql: CONVERT_TZ(string1, string2, string3)
description: |
将日期时间 string1(具有默认 ISO 时间戳格式 'yyyy-MM-dd HH:mm:ss')从时区 string2 转换为时区 string3 的值。
Expand Down
15 changes: 15 additions & 0 deletions flink-python/pyflink/table/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,21 @@ def date_format(timestamp, format) -> Expression:
return _binary_op("dateFormat", timestamp, format)


def date_diff(datetime1, datetime2) -> Expression:
"""
Returns the (signed) number of days between datetime1 and datetime2.
For example,
`date_diff(lit("2007-12-31 23:59:59").to_timestamp, lit("2007-12-30").to_date`
leads to 1.
:param datetime1: The first date in time.
:param datetime2: The second date in time.
:return: The number of intervals as integer value.
"""
return _binary_op("dateDiff", datetime1, datetime2)


def timestamp_diff(time_point_unit: TimePointUnit, time_point1, time_point2) -> Expression:
"""
Returns the (signed) number of :class:`~pyflink.table.expression.TimePointUnit` between
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,20 @@ public static ApiExpression dateFormat(Object timestamp, Object format) {
return apiCall(BuiltInFunctionDefinitions.DATE_FORMAT, timestamp, format);
}

/**
* Returns the (signed) number of days between datetime1 and datetime2.
*
* <p>For example, {@code dateDiff(lit("2007-12-31 23:59:59").toTimestamp(),
* lit("2007-12-30").toDate())} leads to 1.
*
* @param datetime1 The first date in time.
* @param datetime2 The second date in time.
* @return The number of intervals as integer value.
*/
public static ApiExpression dateDiff(Object datetime1, Object datetime2) {
return apiCall(BuiltInFunctionDefinitions.DATEDIFF, datetime1, datetime2);
}

/**
* Returns the (signed) number of {@link TimePointUnit} between timePoint1 and timePoint2.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,23 @@ trait ImplicitExpressionConversions {
Expressions.dateFormat(timestamp, format)
}

/**
* Returns the (signed) number of days between datetime1 and datetime2.
*
* <p>For example, {@code dateDiff(lit("2007-12-31 23:59:59").toTimestamp(),
* lit("2007-12-30").toDate())} leads to 1.
*
* @param datetime1
* The first date in time.
* @param datetime2
* The second date in time.
* @return
* The number of intervals as integer value.
*/
def dateDiff(datetime1: Expression, datetime2: Expression): Expression = {
Expressions.dateDiff(datetime1, datetime2)
}

/**
* Returns the (signed) number of [[TimePointUnit]] between timePoint1 and timePoint2.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,19 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.outputTypeStrategy(nullableIfArgs(explicit(STRING())))
.build();

public static final BuiltInFunctionDefinition DATEDIFF =
BuiltInFunctionDefinition.newBuilder()
.name("DATEDIFF")
.kind(SCALAR)
.inputTypeStrategy(
sequence(
logical(LogicalTypeFamily.DATETIME),
logical(LogicalTypeFamily.DATETIME)))
.outputTypeStrategy(nullableIfArgs(explicit(INT())))
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.DateDiffFunction")
.build();

public static final BuiltInFunctionDefinition TIMESTAMP_DIFF =
BuiltInFunctionDefinition.newBuilder()
.name("timestampDiff")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.dateDiff;
import static org.apache.flink.table.api.Expressions.temporalOverlaps;

/** Test time-related built-in functions. */
Expand All @@ -53,7 +54,8 @@ Stream<TestSetSpec> getTestSetSpecs() {
extractTestCases(),
temporalOverlapsTestCases(),
ceilTestCases(),
floorTestCases())
floorTestCases(),
dateDiffTestCases())
.flatMap(s -> s);
}

Expand Down Expand Up @@ -734,4 +736,46 @@ private Stream<TestSetSpec> floorTestCases() {
LocalDateTime.of(2001, 1, 1, 0, 0),
TIMESTAMP().nullable()));
}

private Stream<TestSetSpec> dateDiffTestCases() {
return Stream.of(
TestSetSpec.forFunction(BuiltInFunctionDefinitions.DATEDIFF)
.onFieldsWithData(
LocalDateTime.of(2007, 12, 31, 23, 59, 59),
LocalDate.of(2007, 12, 30),
LocalDateTime.of(2010, 11, 30, 23, 59, 59),
LocalDate.of(2010, 12, 31),
null)
.andDataTypes(
TIMESTAMP().notNull(),
DATE().notNull(),
TIMESTAMP().notNull(),
DATE().notNull(),
TIMESTAMP().nullable())
.testResult(
dateDiff($("f0"), $("f1")), "DATEDIFF (f0, f1)", 1, INT().notNull())
.testResult(
dateDiff($("f2"), $("f3")),
"DATEDIFF (f2, f3)",
-31,
INT().notNull())
.testResult(
dateDiff($("f0"), $("f0")), "DATEDIFF (f0, f0)", 0, INT().notNull())
.testResult(
dateDiff($("f0"), $("f2")),
"DATEDIFF (f0, f2)",
-1065,
INT().notNull())
.testResult(
dateDiff($("f3"), $("f1")),
"DATEDIFF (f3, f1)",
1097,
INT().notNull())
// arg is null.
.testResult(
dateDiff($("f0"), $("f4")),
"DATEDIFF (f0, f4)",
null,
INT().nullable()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.functions.scalar;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;

import javax.annotation.Nullable;

import static org.apache.flink.table.utils.DateTimeUtils.timestampMillisToDate;

/** Implementation of {@link BuiltInFunctionDefinitions#DATEDIFF}. */
@Internal
public class DateDiffFunction extends BuiltInScalarFunction {

public DateDiffFunction(SpecializedContext context) {
super(BuiltInFunctionDefinitions.DATEDIFF, context);
}

public @Nullable Object eval(TimestampData time1, TimestampData time2) {
if (time1 == null || time2 == null) {
return null;
}
return timestampMillisToDate(time1.getMillisecond())
- timestampMillisToDate(time2.getMillisecond());
}

public @Nullable Object eval(TimestampData time1, Integer time2) {
if (time1 == null || time2 == null) {
return null;
}
return timestampMillisToDate(time1.getMillisecond()) - time2;
}

public @Nullable Object eval(Integer time1, TimestampData time2) {
if (time1 == null || time2 == null) {
return null;
}
return time1 - timestampMillisToDate(time2.getMillisecond());
}

public @Nullable Object eval(Integer time1, Integer time2) {
if (time1 == null || time2 == null) {
return null;
}
return time1 - time2;
}
}

0 comments on commit e23772a

Please sign in to comment.