Skip to content

Commit

Permalink
feat: add PARSE_DATE and FORMAT_DATE functions (#7733)
Browse files Browse the repository at this point in the history
* feat: add PARSE_DATE and FORMAT_DATE functions

* address review comments

* add check for time fields in parser

* whitespace
  • Loading branch information
Zara Lim committed Jul 1, 2021
1 parent 79d14fb commit 5a64ed7
Show file tree
Hide file tree
Showing 16 changed files with 1,058 additions and 0 deletions.
25 changes: 25 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,8 @@ ksqlDB Server instances.

Since: -

Deprecated since 0.20.0 (use FORMAT_DATE)

```sql
DATETOSTRING(START_DATE, 'yyyy-MM-dd')
```
Expand All @@ -1121,6 +1123,8 @@ The integer represents days since epoch matching the encoding used by

Since: -

Deprecated since 0.20.0 (use PARSE_DATE)

```sql
STRINGTODATE(col1, 'yyyy-MM-dd')
```
Expand Down Expand Up @@ -1203,6 +1207,27 @@ TIMEZONE is an optional parameter and it is a `java.util.TimeZone` ID format, fo
"America/Los_Angeles", "PDT", "Europe/London". For more information on timestamp formats, see
[DateTimeFormatter](https://cnfl.io/java-dtf).

### `FORMAT_DATE`

```sql
FORMAT_DATE(date, 'yyyy-MM-dd')
```

Converts a DATE value into a string that represents the date in the given format.
You can escape single-quote characters in the timestamp format by using two successive single
quotes, `''`, for example: `'yyyy-MM-dd''T'''`.

### `PARSE_DATE`

```sql
PARSE_DATE(col1, 'yyyy-MM-dd')
```

Converts a string representation of a date in the
given format into a DATE value. You can escape
single-quote characters in the timestamp format by using two successive single
quotes, `''`, for example: `'yyyy-MM-dd''T'''`.

### `FORMAT_TIME`

Since: 0.20
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.List;
Expand Down Expand Up @@ -57,6 +58,7 @@ class UdafTypes {
.add(Struct.class)
.add(List.class)
.add(Map.class)
.add(Date.class)
.add(Time.class)
.add(Timestamp.class)
.add(TimeUnit.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.sql.Date;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@UdfDescription(
name = "format_date",
category = FunctionCategory.DATE_TIME,
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Converts a DATE value to a string"
+ " using the given format pattern. The format pattern should be"
+ " in the format expected by java.time.format.DateTimeFormatter."
)
public class FormatDate {

private final LoadingCache<String, DateTimeFormatter> formatters =
CacheBuilder.newBuilder()
.maximumSize(1000)
.build(CacheLoader.from(DateTimeFormatter::ofPattern));

@Udf(description = "Converts the number of days since 1970-01-01 00:00:00 UTC/GMT to a date "
+ "string using the given format pattern. The format pattern should be in the format"
+ " expected by java.time.format.DateTimeFormatter")
public String formatDate(
@UdfParameter(
description = "The date to convert") final Date date,
@UdfParameter(
description = "The format pattern should be in the format expected by"
+ " java.time.format.DateTimeFormatter.") final String formatPattern) {
if (date == null) {
return null;
}
try {
final DateTimeFormatter formatter = formatters.get(formatPattern);
return LocalDate.ofEpochDay(TimeUnit.MILLISECONDS.toDays(date.getTime())).format(formatter);
} catch (final ExecutionException | RuntimeException e) {
throw new KsqlFunctionException("Failed to format date " + date
+ " with formatter '" + formatPattern
+ "': " + e.getMessage(), e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.sql.Date;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalAccessor;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@UdfDescription(
name = "parse_date",
category = FunctionCategory.DATE_TIME,
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Converts a string representation of a date in the given format"
+ " into a DATE value. The format pattern should be in the format expected by"
+ " java.time.format.DateTimeFormatter"
)
public class ParseDate {

private final LoadingCache<String, DateTimeFormatter> formatters =
CacheBuilder.newBuilder()
.maximumSize(1000)
.build(CacheLoader.from(DateTimeFormatter::ofPattern));

@Udf(description = "Converts a string representation of a date in the given format"
+ " into a DATE value.")
public Date parseDate(
@UdfParameter(
description = "The string representation of a date.") final String formattedDate,
@UdfParameter(
description = "The format pattern should be in the format expected by"
+ " java.time.format.DateTimeFormatter.") final String formatPattern) {
try {

final TemporalAccessor ta = formatters.get(formatPattern).parse(formattedDate);
final Optional<ChronoField> timeField = Arrays.stream(ChronoField.values())
.filter(field -> field.isTimeBased())
.filter(field -> ta.isSupported(field))
.findFirst();

if (timeField.isPresent()) {
throw new KsqlFunctionException("Date format contains time field.");
}

return new Date(
TimeUnit.DAYS.toMillis(LocalDate.from(ta).toEpochDay()));
} catch (final ExecutionException | RuntimeException e) {
throw new KsqlFunctionException("Failed to parse date '" + formattedDate
+ "' with formatter '" + formatPattern
+ "': " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;

import io.confluent.ksql.function.KsqlFunctionException;
import java.sql.Date;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Test;

public class FormatDateTest {

private FormatDate udf;

@Before
public void setUp() {
udf = new FormatDate();
}

@Test
public void shouldConvertDateToString() {
// When:
final String result = udf.formatDate(Date.valueOf("2014-11-09"), "yyyy-MM-dd");

// Then:
assertThat(result, is("2014-11-09"));
}

@Test
public void shouldThrowOnUnsupportedFields() {
// When:
final Exception e = assertThrows(
KsqlFunctionException.class,
() -> udf.formatDate(Date.valueOf("2014-11-09"), "yyyy-MM-dd HH:mm"));

// Then:
assertThat(e.getMessage(), is("Failed to format date 2014-11-09 with formatter 'yyyy-MM-dd HH:mm': Unsupported field: HourOfDay"));
}

@Test
public void shouldRoundTripWithStringToDate() {
final String format = "dd/MM/yyyy'Freya'";
final ParseDate parseDate = new ParseDate();
IntStream.range(-10_000, 20_000)
.parallel()
.forEach(idx -> {
final String result = udf.formatDate(new Date(idx * 86400000L), format);
final Date date = parseDate.parseDate(result, format);
assertThat(date.getTime(), is(idx * 86400000L));
});
}

@Test
public void shouldSupportEmbeddedChars() {
// When:
final Object result = udf.formatDate(Date.valueOf("2014-11-09"), "yyyy-dd-MM'Fred'");

// Then:
assertThat(result, is("2014-09-11Fred"));
}

@Test
public void shouldThrowIfFormatInvalid() {
// When:
final Exception e = assertThrows(
KsqlFunctionException.class,
() -> udf.formatDate(Date.valueOf("2014-11-09"), "invalid")
);

// Then:
assertThat(e.getMessage(), containsString("Failed to format date 2014-11-09 with formatter 'invalid'"));
}

@Test
public void shouldByThreadSafeAndWorkWithManyDifferentFormatters() {
IntStream.range(0, 10_000)
.parallel()
.forEach(idx -> {
try {
final String pattern = "yyyy-MM-dd'X" + idx + "'";
final String result = udf.formatDate(Date.valueOf("2021-05-18"), pattern);
assertThat(result, is("2021-05-18X" + idx));
} catch (final Exception e) {
fail(e.getMessage());
}
});
}

}

0 comments on commit 5a64ed7

Please sign in to comment.