Skip to content

Commit

Permalink
fix: add time types to Java client (#8091)
Browse files Browse the repository at this point in the history
* fix: add time types to Java client

* add integration test
  • Loading branch information
Zara Lim committed Sep 2, 2021
1 parent cce585b commit fd9faf2
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
*/
public interface ColumnType {

enum Type { STRING, INTEGER, BIGINT, DOUBLE, BOOLEAN, DECIMAL, BYTES, ARRAY, MAP, STRUCT }
enum Type { STRING, INTEGER, BIGINT, DOUBLE, BOOLEAN, DECIMAL, BYTES, ARRAY, MAP, STRUCT,
TIMESTAMP, DATE, TIME }

/**
* Returns the type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1816,6 +1816,9 @@ private static void verifyRowWithIndex(final Row row, final int index) {
assertThat(values.getKsqlObject(8), is(row.getKsqlObject("f_map")));
assertThat(values.getKsqlObject(9), is(row.getKsqlObject("f_struct")));
assertThat(values.getValue(10), is(nullValue()));
assertThat(values.getValue(11), is(row.getString("f_timestamp")));
assertThat(values.getValue(12), is(row.getString("f_date")));
assertThat(values.getValue(13), is(row.getString("f_time")));
assertThat(values.toJsonString(), is((new JsonArray(values.getList())).toString()));
assertThat(values.toString(), is(values.toJsonString()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ public class RowImplTest {

private static final List<String> COLUMN_NAMES =
ImmutableList.of("f_str", "f_int", "f_long", "f_double", "f_bool", "f_decimal", "f_bytes",
"f_array", "f_map", "f_struct", "f_null");
"f_array", "f_map", "f_struct", "f_null", "f_timestamp", "f_date", "f_time");
private static final List<ColumnType> COLUMN_TYPES = RowUtil.columnTypesFromStrings(
ImmutableList.of("STRING", "INTEGER", "BIGINT", "DOUBLE", "BOOLEAN", "DECIMAL", "BYTES",
"ARRAY", "MAP", "STRUCT", "INTEGER"));
"ARRAY", "MAP", "STRUCT", "INTEGER", "TIMESTAMP", "DATE", "TIME"));
private static final Map<String, Integer> COLUMN_NAME_TO_INDEX = RowUtil.valueToIndexMap(COLUMN_NAMES);
private static final JsonArray VALUES = new JsonArray()
.add("foo")
Expand All @@ -57,7 +57,10 @@ public class RowImplTest {
.add(new JsonArray("[\"e1\",\"e2\"]"))
.add(new JsonObject("{\"k1\":\"v1\",\"k2\":\"v2\"}"))
.add(new JsonObject("{\"f1\":\"baz\",\"f2\":12}"))
.addNull();
.addNull()
.add("2020-01-01T04:40:34.789") // server endpoint returns timestamp/date/time as strings
.add("2020-01-01")
.add("04:40:34.789");

private RowImpl row;

Expand All @@ -83,6 +86,9 @@ public void shouldOneIndexColumnNames() {
assertThat(row.getValue(9), is(new JsonObject("{\"k1\":\"v1\",\"k2\":\"v2\"}")));
assertThat(row.getValue(10), is(new JsonObject("{\"f1\":\"baz\",\"f2\":12}")));
assertThat(row.getValue(11), is(nullValue()));
assertThat(row.getValue(12), is("2020-01-01T04:40:34.789"));
assertThat(row.getValue(13), is("2020-01-01"));
assertThat(row.getValue(14), is("04:40:34.789"));
}

@Test
Expand Down Expand Up @@ -175,6 +181,9 @@ public void shouldGetAsObject() {
assertThat(obj.getKsqlArray("f_array"), is(new KsqlArray(ImmutableList.of("e1", "e2"))));
assertThat(obj.getKsqlObject("f_map"), is(new KsqlObject(ImmutableMap.of("k1", "v1", "k2", "v2"))));
assertThat(obj.getKsqlObject("f_struct"), is(new KsqlObject(ImmutableMap.of("f1", "baz", "f2", 12))));
assertThat(obj.getString("f_timestamp"), is("2020-01-01T04:40:34.789"));
assertThat(obj.getString("f_date"), is("2020-01-01"));
assertThat(obj.getString("f_time"), is("04:40:34.789"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SqlTimeTypes;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
Expand All @@ -97,6 +98,9 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -136,10 +140,11 @@ public class ClientIntegrationTest {
private static final String TEST_STREAM = TEST_DATA_PROVIDER.sourceName();
private static final int TEST_NUM_ROWS = TEST_DATA_PROVIDER.data().size();
private static final List<String> TEST_COLUMN_NAMES =
ImmutableList.of("K", "STR", "LONG", "DEC", "BYTES_", "ARRAY", "MAP", "STRUCT", "COMPLEX");
ImmutableList.of("K", "STR", "LONG", "DEC", "BYTES_", "ARRAY", "MAP", "STRUCT", "COMPLEX",
"TIMESTAMP", "DATE", "TIME");
private static final List<ColumnType> TEST_COLUMN_TYPES =
RowUtil.columnTypesFromStrings(ImmutableList.of("STRUCT", "STRING", "BIGINT", "DECIMAL",
"BYTES", "ARRAY", "MAP", "STRUCT", "STRUCT"));
"BYTES", "ARRAY", "MAP", "STRUCT", "STRUCT", "TIMESTAMP", "DATE", "TIME"));
private static final List<KsqlArray> TEST_EXPECTED_ROWS =
convertToClientRows(TEST_DATA_PROVIDER.data());

Expand Down Expand Up @@ -623,7 +628,10 @@ public void shouldInsertInto() throws Exception {
.put("ARRAY", new KsqlArray().add("v1").add("v2"))
.put("MAP", new KsqlObject().put("some_key", "a_value").put("another_key", ""))
.put("STRUCT", new KsqlObject().put("f1", 12)) // Nested field names are case-insensitive
.put("COMPLEX", COMPLEX_FIELD_VALUE);
.put("COMPLEX", COMPLEX_FIELD_VALUE)
.put("TIMESTAMP", "1970-01-01T00:00:00.001")
.put("DATE", "1970-01-01")
.put("TIME", "00:00:01");

// When
client.insertInto(EMPTY_TEST_STREAM.toLowerCase(), insertRow).get(); // Stream name is case-insensitive
Expand All @@ -643,6 +651,9 @@ public void shouldInsertInto() throws Exception {
assertThat(rows.get(0).getKsqlObject("MAP"), is(new KsqlObject().put("some_key", "a_value").put("another_key", "")));
assertThat(rows.get(0).getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 12)));
assertThat(rows.get(0).getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE));
assertThat(rows.get(0).getString("TIMESTAMP"), is("1970-01-01T00:00:00.001"));
assertThat(rows.get(0).getString("DATE"), is("1970-01-01"));
assertThat(rows.get(0).getString("TIME"), is("00:00:01"));
}

@Test
Expand Down Expand Up @@ -680,7 +691,10 @@ public void shouldStreamQueryWithProperties() throws Exception {
.put("ARRAY", new KsqlArray().add("v1_shouldStreamQueryWithProperties").add("v2_shouldStreamQueryWithProperties"))
.put("MAP", new KsqlObject().put("test_name", "shouldStreamQueryWithProperties"))
.put("STRUCT", new KsqlObject().put("F1", 4))
.put("COMPLEX", COMPLEX_FIELD_VALUE);
.put("COMPLEX", COMPLEX_FIELD_VALUE)
.put("TIMESTAMP", "1970-01-01T00:00:00.001")
.put("DATE", "1970-01-01")
.put("TIME", "00:00:00");

// When
final StreamedQueryResult queryResult = client.streamQuery(sql, properties).get();
Expand All @@ -705,6 +719,9 @@ public void shouldStreamQueryWithProperties() throws Exception {
assertThat(row.getKsqlObject("MAP"), is(new KsqlObject().put("test_name", "shouldStreamQueryWithProperties")));
assertThat(row.getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 4)));
assertThat(row.getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE));
assertThat(row.getString("TIMESTAMP"), is("1970-01-01T00:00:00.001"));
assertThat(row.getString("DATE"), is("1970-01-01"));
assertThat(row.getString("TIME"), is("00:00"));
}

@Test
Expand All @@ -723,7 +740,10 @@ public void shouldExecuteQueryWithProperties() {
.put("ARRAY", new KsqlArray().add("v1_shouldExecuteQueryWithProperties").add("v2_shouldExecuteQueryWithProperties"))
.put("MAP", new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties"))
.put("STRUCT", new KsqlObject().put("F1", 4))
.put("COMPLEX", COMPLEX_FIELD_VALUE);
.put("COMPLEX", COMPLEX_FIELD_VALUE)
.put("TIMESTAMP", "1970-01-01T00:00:00.001")
.put("DATE", "1970-01-01")
.put("TIME", "00:00:00");

// When
final BatchedQueryResult queryResult = client.executeQuery(sql, properties);
Expand Down Expand Up @@ -763,6 +783,9 @@ public void shouldExecuteQueryWithProperties() {
assertThat(row.getKsqlObject("MAP"), is(new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties")));
assertThat(row.getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 4)));
assertThat(row.getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE));
assertThat(row.getString("TIMESTAMP"), is("1970-01-01T00:00:00.001"));
assertThat(row.getString("DATE"), is("1970-01-01"));
assertThat(row.getString("TIME"), is("00:00"));
}

@Test
Expand All @@ -787,7 +810,10 @@ public void shouldStreamInserts() throws Exception {
.put("BYTES_", new byte[]{0, 1, 2})
.put("ARRAY", new KsqlArray().add("v_" + i))
.put("MAP", new KsqlObject().put("k_" + i, "v_" + i))
.put("COMPLEX", COMPLEX_FIELD_VALUE));
.put("COMPLEX", COMPLEX_FIELD_VALUE)
.put("TIMESTAMP", "1970-01-01T00:00:00.001")
.put("DATE", "1970-01-01")
.put("TIME", "00:00"));
}

// Then
Expand Down Expand Up @@ -816,6 +842,9 @@ public void shouldStreamInserts() throws Exception {
assertThat(rows.get(i).getKsqlArray("ARRAY"), is(new KsqlArray().add("v_" + i)));
assertThat(rows.get(i).getKsqlObject("MAP"), is(new KsqlObject().put("k_" + i, "v_" + i)));
assertThat(rows.get(i).getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE));
assertThat(rows.get(i).getString("TIMESTAMP"), is("1970-01-01T00:00:00.001"));
assertThat(rows.get(i).getString("DATE"), is("1970-01-01"));
assertThat(rows.get(i).getString("TIME"), is("00:00"));
}

// When: end connection
Expand Down Expand Up @@ -1067,7 +1096,7 @@ public void shouldDescribeSource() throws Exception {
+ "COMPLEX STRUCT<`DECIMAL` DECIMAL(2, 1), STRUCT STRUCT<F1 STRING, F2 INTEGER>, "
+ "ARRAY_ARRAY ARRAY<ARRAY<STRING>>, ARRAY_STRUCT ARRAY<STRUCT<F1 STRING>>, "
+ "ARRAY_MAP ARRAY<MAP<STRING, INTEGER>>, MAP_ARRAY MAP<STRING, ARRAY<STRING>>, "
+ "MAP_MAP MAP<STRING, MAP<STRING, INTEGER>>, MAP_STRUCT MAP<STRING, STRUCT<F1 STRING>>>) "
+ "MAP_MAP MAP<STRING, MAP<STRING, INTEGER>>, MAP_STRUCT MAP<STRING, STRUCT<F1 STRING>>>, TIMESTAMP TIMESTAMP, DATE DATE, TIME TIME) "
+ "WITH (KAFKA_TOPIC='STRUCTURED_TYPES_TOPIC', KEY_FORMAT='JSON', VALUE_FORMAT='JSON');"));
}

Expand Down Expand Up @@ -1430,6 +1459,12 @@ private static void addObjectToKsqlArray(final KsqlArray array, final Object val
// Can't use expectedRow.add((BigDecimal) value) directly since client serializes BigDecimal as string,
// whereas this method builds up the expected result (unrelated to serialization)
array.addAll(new KsqlArray(Collections.singletonList(value)));
} else if (value instanceof Timestamp) {
array.add(SqlTimeTypes.formatTimestamp((Timestamp) value));
} else if (value instanceof Date) {
array.add(SqlTimeTypes.formatDate((Date) value));
} else if (value instanceof Time) {
array.add(SqlTimeTypes.formatTime((Time) value));
} else {
array.add(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ public void shouldGetColumnTypesFromStrings() {
"ARRAY<STRING>",
"MAP<STRING, STRING>",
"DECIMAL(4, 2)",
"STRUCT<`F1` STRING, `F2` INTEGER>"
"STRUCT<`F1` STRING, `F2` INTEGER>",
"TIMESTAMP",
"DATE",
"TIME"
);

// When
Expand All @@ -58,7 +61,10 @@ public void shouldGetColumnTypesFromStrings() {
"ARRAY",
"MAP",
"DECIMAL",
"STRUCT"
"STRUCT",
"TIMESTAMP",
"DATE",
"TIME"
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import io.confluent.ksql.serde.connect.ConnectSchemas;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -74,6 +76,9 @@ public class StructuredTypesDataProvider extends TestDataProvider {
))
.build()
)
.valueColumn(ColumnName.of("TIMESTAMP"), SqlTypes.TIMESTAMP)
.valueColumn(ColumnName.of("DATE"), SqlTypes.DATE)
.valueColumn(ColumnName.of("TIME"), SqlTypes.TIME)
.build();

private static final PhysicalSchema PHYSICAL_SCHEMA = PhysicalSchema
Expand All @@ -89,18 +94,24 @@ public class StructuredTypesDataProvider extends TestDataProvider {
private static final Multimap<GenericKey, GenericRow> ROWS = ImmutableListMultimap
.<GenericKey, GenericRow>builder()
.put(genericKey(generateStructKey("a")), genericRow("FOO", 1L, new BigDecimal("1.11"), new byte[]{1},
Collections.singletonList("a"), Collections.singletonMap("k1", "v1"), generateSimpleStructValue(2), generateComplexStructValue(0)))
Collections.singletonList("a"), Collections.singletonMap("k1", "v1"), generateSimpleStructValue(2), generateComplexStructValue(0),
new Timestamp(1), new Date(86400000), new Time(0)))
.put(genericKey(generateStructKey("b")), genericRow("BAR", 2L, new BigDecimal("2.22"), new byte[]{2},
Collections.emptyList(), Collections.emptyMap(), generateSimpleStructValue(3), generateComplexStructValue(1)))
Collections.emptyList(), Collections.emptyMap(), generateSimpleStructValue(3), generateComplexStructValue(1),
new Timestamp(2), new Date(86400000 * 2), new Time(1)))
.put(genericKey(generateStructKey("c")), genericRow("BAZ", 3L, new BigDecimal("30.33"), new byte[]{3},
Collections.singletonList("b"), Collections.emptyMap(), generateSimpleStructValue(null), generateComplexStructValue(2)))
Collections.singletonList("b"), Collections.emptyMap(), generateSimpleStructValue(null), generateComplexStructValue(2),
new Timestamp(3), new Date(86400000 * 3), new Time(2)))
.put(genericKey(generateStructKey("d")), genericRow("BUZZ", 4L, new BigDecimal("40.44"), new byte[]{4},
ImmutableList.of("c", "d"), Collections.emptyMap(), generateSimpleStructValue(88), generateComplexStructValue(3)))
ImmutableList.of("c", "d"), Collections.emptyMap(), generateSimpleStructValue(88), generateComplexStructValue(3),
new Timestamp(4), new Date(86400000 * 4), new Time(3)))
// Additional entries for repeated keys
.put(genericKey(generateStructKey("c")), genericRow("BAZ", 5L, new BigDecimal("12.0"), new byte[]{15},
ImmutableList.of("e"), ImmutableMap.of("k1", "v1", "k2", "v2"), generateSimpleStructValue(0), generateComplexStructValue(4)))
ImmutableList.of("e"), ImmutableMap.of("k1", "v1", "k2", "v2"), generateSimpleStructValue(0), generateComplexStructValue(4),
new Timestamp(11), new Date(86400000 * 11), new Time(11)))
.put(genericKey(generateStructKey("d")), genericRow("BUZZ", 6L, new BigDecimal("10.1"), new byte[]{6},
ImmutableList.of("f", "g"), Collections.emptyMap(), generateSimpleStructValue(null), generateComplexStructValue(5)))
ImmutableList.of("f", "g"), Collections.emptyMap(), generateSimpleStructValue(null), generateComplexStructValue(5),
new Timestamp(12), new Date(86400000 * 12), new Time(12)))
.build();

public StructuredTypesDataProvider() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ public class BaseApiTest {

protected static final JsonArray DEFAULT_COLUMN_NAMES = new JsonArray().add("f_str").add("f_int")
.add("f_bool").add("f_long").add("f_double").add("f_decimal").add("f_bytes")
.add("f_array").add("f_map").add("f_struct").add("f_null");
.add("f_array").add("f_map").add("f_struct").add("f_null").add("f_timestamp")
.add("f_date").add("f_time");
protected static final JsonArray DEFAULT_COLUMN_TYPES = new JsonArray().add("STRING").add("INTEGER")
.add("BOOLEAN").add("BIGINT").add("DOUBLE").add("DECIMAL(4, 2)").add("BYTES")
.add("ARRAY<STRING>").add("MAP<STRING, STRING>").add("STRUCT<`F1` STRING, `F2` INTEGER>").add("INTEGER");
.add("ARRAY<STRING>").add("MAP<STRING, STRING>").add("STRUCT<`F1` STRING, `F2` INTEGER>").add("INTEGER")
.add("TIMESTAMP").add("DATE").add("TIME");
protected static final Schema F_STRUCT_SCHEMA = SchemaBuilder.struct()
.field("F1", Schema.OPTIONAL_STRING_SCHEMA)
.field("F2", Schema.OPTIONAL_INT32_SCHEMA)
Expand Down Expand Up @@ -267,7 +269,10 @@ private static GenericRow rowWithIndex(final int index) {
ImmutableList.of("s" + index, "t" + index),
ImmutableMap.of("k" + index, "v" + index),
structField,
null
null,
"2020-01-01T04:40:34.789",
"2020-01-01",
"04:40:34.789"
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,16 @@ public void shouldExecutePushQueryWithLimit() {
assertThat(response.rows, hasSize(2));
assertThat(response.responseObject.getJsonArray("columnNames"), is(
new JsonArray().add("K").add("STR").add("LONG").add("DEC").add("BYTES_").add("ARRAY")
.add("MAP").add("STRUCT").add("COMPLEX")));
.add("MAP").add("STRUCT").add("COMPLEX").add("TIMESTAMP").add("DATE").add("TIME")));
assertThat(response.responseObject.getJsonArray("columnTypes"), is(
new JsonArray().add("STRUCT<`F1` ARRAY<STRING>>").add("STRING").add("BIGINT")
.add("DECIMAL(4, 2)").add("BYTES").add("ARRAY<STRING>").add("MAP<STRING, STRING>")
.add("STRUCT<`F1` INTEGER>")
.add("STRUCT<`DECIMAL` DECIMAL(2, 1), `STRUCT` STRUCT<`F1` STRING, `F2` INTEGER>, "
+ "`ARRAY_ARRAY` ARRAY<ARRAY<STRING>>, `ARRAY_STRUCT` ARRAY<STRUCT<`F1` STRING>>, "
+ "`ARRAY_MAP` ARRAY<MAP<STRING, INTEGER>>, `MAP_ARRAY` MAP<STRING, ARRAY<STRING>>, "
+ "`MAP_MAP` MAP<STRING, MAP<STRING, INTEGER>>, `MAP_STRUCT` MAP<STRING, STRUCT<`F1` STRING>>>")));
+ "`MAP_MAP` MAP<STRING, MAP<STRING, INTEGER>>, `MAP_STRUCT` MAP<STRING, STRUCT<`F1` STRING>>>")
.add("TIMESTAMP").add("DATE").add("TIME")));
assertThat(response.responseObject.getString("queryId"), is(notNullValue()));
}

Expand Down

0 comments on commit fd9faf2

Please sign in to comment.