Skip to content

Commit

Permalink
[BEAM-9424] Allow grouping by LogicalType
Browse files Browse the repository at this point in the history
  • Loading branch information
Fernando Diaz authored and fediazgon committed Mar 3, 2020
1 parent d924f72 commit 73576ac
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
Expand Up @@ -133,6 +133,8 @@ public static <T> Coder<T> coderForFieldType(FieldType fieldType) {
MapCoder.of(
coderForFieldType(fieldType.getMapKeyType()),
coderForFieldType(fieldType.getMapValueType()));
case LOGICAL_TYPE:
return coderForFieldType(fieldType.getLogicalType().getBaseType());
default:
return (Coder<T>) CODER_MAP.get(fieldType.getTypeName());
}
Expand Down
Expand Up @@ -373,6 +373,41 @@ public void testNullInnerRow() {
pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}

@Test
public void testLogicalTypes() {
DateTime dateTime = DateTime.parse("2020-02-02T00:00:00");

Schema inputRowSchema =
Schema.builder()
.addField("timeTypeField", FieldType.logicalType(new DummySqlTimeType()))
.addField("dateTypeField", FieldType.logicalType(new DummySqlDateType()))
.build();

Row row =
Row.withSchema(inputRowSchema)
.addValues(dateTime.getMillis(), dateTime.getMillis())
.build();

Schema outputRowSchema =
Schema.builder()
.addField("timeTypeField", FieldType.DATETIME)
.addNullableField("dateTypeField", FieldType.DATETIME)
.build();

PCollection<Row> outputRow =
pipeline
.apply(Create.of(row))
.setRowSchema(outputRowSchema)
.apply(
SqlTransform.query(
"SELECT timeTypeField, dateTypeField FROM PCOLLECTION GROUP BY timeTypeField, dateTypeField"));

PAssert.that(outputRow)
.containsInAnyOrder(Row.withSchema(outputRowSchema).addValues(dateTime, dateTime).build());

pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}

private static class DummySqlTimeType implements Schema.LogicalType<Long, Instant> {
@Override
public String getIdentifier() {
Expand Down

0 comments on commit 73576ac

Please sign in to comment.