Skip to content

Commit

Permalink
fix: make formattimestamp and parsetimestamp default to utc (#6954)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zara Lim committed Feb 11, 2021
1 parent 4b3bc96 commit a5ea98a
Show file tree
Hide file tree
Showing 15 changed files with 854 additions and 34 deletions.
Expand Up @@ -55,7 +55,7 @@ public String formatTimestamp(
@UdfParameter(
description = "The format pattern should be in the format expected by"
+ " java.time.format.DateTimeFormatter.") final String formatPattern) {
return formatTimestamp(timestamp, formatPattern, ZoneId.systemDefault().getId());
return formatTimestamp(timestamp, formatPattern, ZoneId.of("GMT").getId());
}

@Udf(description = "Converts a TIMESTAMP value into the"
Expand Down
Expand Up @@ -53,7 +53,7 @@ public Timestamp parseTimestamp(
@UdfParameter(
description = "The format pattern should be in the format expected by"
+ " java.time.format.DateTimeFormatter.") final String formatPattern) {
return parseTimestamp(formattedTimestamp, formatPattern, ZoneId.systemDefault().getId());
return parseTimestamp(formattedTimestamp, formatPattern, ZoneId.of("GMT").getId());
}

@Udf(description = "Converts a string representation of a date at the given time zone"
Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -44,10 +45,11 @@ public void shouldConvertTimestampToString() {
// When:
final String result = udf.formatTimestamp(new Timestamp(1638360611123L),
"yyyy-MM-dd HH:mm:ss.SSS");
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));

// Then:
final String expectedResult = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
.format(new Date(1638360611123L));
final String expectedResult = sdf.format(new Date(1638360611123L));
assertThat(result, is(expectedResult));
}

Expand All @@ -71,21 +73,6 @@ public void testPSTTimeZone() {
assertThat(result, is("2018-08-15 10:10:43"));
}

@Test
public void testTimeZoneInLocalTime() {
// Given:
final Timestamp timestamp = new Timestamp(1534353043000L);

// When:
final String localTime = udf.formatTimestamp(timestamp, "yyyy-MM-dd HH:mm:ss zz");

// Then:
final String expected = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss zz")
.format(timestamp);

assertThat(localTime, is(expected));
}

@Test
public void testTimeZoneInPacificTime() {
// Given:
Expand Down Expand Up @@ -134,8 +121,9 @@ public void shouldSupportEmbeddedChars() {
"yyyy-MM-dd'T'HH:mm:ss.SSS'Fred'");

// Then:
final String expectedResult = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Fred'")
.format(new Date(1638360611123L));
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Fred'");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final String expectedResult = sdf.format(new Date(1638360611123L));
assertThat(result, is(expectedResult));
}

Expand Down Expand Up @@ -170,7 +158,9 @@ public void shouldWorkWithManyDifferentFormatters() {
final String pattern = "yyyy-MM-dd HH:mm:ss.SSS'X" + idx + "'";
final Timestamp timestamp = new Timestamp(1538361611123L + idx);
final String result = udf.formatTimestamp(timestamp, pattern);
final String expectedResult = new SimpleDateFormat(pattern).format(timestamp);
final SimpleDateFormat sdf = new SimpleDateFormat(pattern);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final String expectedResult = sdf.format(timestamp);
assertThat(result, is(expectedResult));
} catch (final Exception e) {
fail(e.getMessage());
Expand All @@ -187,7 +177,9 @@ public void shouldRoundTripWithParseTimestamp() {
.forEach(idx -> {
final Timestamp timestamp = new Timestamp(1538361611123L + idx);
final String result = udf.formatTimestamp(timestamp, pattern);
final String expectedResult = new SimpleDateFormat(pattern).format(timestamp);
final SimpleDateFormat sdf = new SimpleDateFormat(pattern);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final String expectedResult = sdf.format(timestamp);
assertThat(result, is(expectedResult));

final Timestamp roundtripTimestamp = parseTimestamp.parseTimestamp(result, pattern);
Expand Down Expand Up @@ -225,7 +217,9 @@ public void shouldBehaveLikeSimpleDateFormat() {
}

private void assertLikeSimpleDateFormat(final String format) {
final String expected = new SimpleDateFormat(format).format(1538361611123L);
final SimpleDateFormat sdf = new SimpleDateFormat(format);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final String expected = sdf.format(1538361611123L);
final Object result = new FormatTimestamp()
.formatTimestamp(new Timestamp(1538361611123L), format);
assertThat(result, is(expected));
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -43,10 +44,11 @@ public void shouldParseTimestamp() throws ParseException {
// When:
final Object result = udf.parseTimestamp("2021-12-01 12:10:11.123",
"yyyy-MM-dd HH:mm:ss.SSS");
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));

// Then:
final Timestamp expectedResult = Timestamp.from(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
.parse("2021-12-01 12:10:11.123").toInstant());
final Timestamp expectedResult = Timestamp.from(sdf.parse("2021-12-01 12:10:11.123").toInstant());
assertThat(result, is(expectedResult));
}

Expand All @@ -56,9 +58,11 @@ public void shouldSupportEmbeddedChars() throws ParseException {
final Object result = udf.parseTimestamp("2021-12-01T12:10:11.123Fred",
"yyyy-MM-dd'T'HH:mm:ss.SSS'Fred'");

final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Fred'");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));

// Then:
final Timestamp expectedResult = Timestamp.from(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Fred'")
.parse("2021-12-01T12:10:11.123Fred").toInstant());
final Timestamp expectedResult = Timestamp.from(sdf.parse("2021-12-01T12:10:11.123Fred").toInstant());
assertThat(result, is(expectedResult));
}

Expand Down Expand Up @@ -142,7 +146,9 @@ public void shouldWorkWithManyDifferentFormatters() {
final String sourceDate = "2018-12-01 10:12:13.456X" + idx;
final String pattern = "yyyy-MM-dd HH:mm:ss.SSS'X" + idx + "'";
final Timestamp result = udf.parseTimestamp(sourceDate, pattern);
final Timestamp expectedResult = Timestamp.from(new SimpleDateFormat(pattern).parse(sourceDate).toInstant());
final SimpleDateFormat sdf = new SimpleDateFormat(pattern);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final Timestamp expectedResult = Timestamp.from(sdf.parse(sourceDate).toInstant());
assertThat(result, is(expectedResult));
} catch (final Exception e) {
fail(e.getMessage());
Expand Down Expand Up @@ -175,7 +181,9 @@ public void shouldBehaveLikeSimpleDateFormat() throws Exception {
}

private void assertLikeSimpleDateFormat(final String value, final String format) throws Exception {
final Timestamp expected = Timestamp.from(new SimpleDateFormat(format).parse(value).toInstant());
final SimpleDateFormat sdf = new SimpleDateFormat(format);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final Timestamp expected = Timestamp.from(sdf.parse(value).toInstant());
final Object result = new ParseTimestamp().parseTimestamp(value, format);
assertThat(result, is(expected));
}
Expand Down
@@ -0,0 +1,145 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (K STRING KEY, ID BIGINT, NAME STRING, TIME STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`K` STRING KEY, `ID` BIGINT, `NAME` STRING, `TIME` STRING",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TS AS SELECT\n TEST.K K,\n TEST.ID ID,\n PARSE_TIMESTAMP(TEST.TIME, 'yyyy-MM-dd''T''HH:mm:ss') TS\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TS",
"schema" : "`K` STRING KEY, `ID` BIGINT, `TS` TIMESTAMP",
"topicName" : "TS",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "TS",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "TS"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"sourceSchema" : "`K` STRING KEY, `ID` BIGINT, `NAME` STRING, `TIME` STRING"
},
"keyColumnNames" : [ "K" ],
"selectExpressions" : [ "ID AS ID", "PARSE_TIMESTAMP(TIME, 'yyyy-MM-dd''T''HH:mm:ss') AS TS" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"topicName" : "TS"
},
"queryId" : "CSAS_TS_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.suppress.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}

0 comments on commit a5ea98a

Please sign in to comment.