From d4718f33314eb6f6ea6c4ccb96253344e5f980b2 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Sat, 10 Sep 2022 18:30:53 -0400 Subject: [PATCH] Fix Java Runner V2 cannot encode/decode datetime type in zetasql * Specify correct coder for millis_instant * [DO NOT MERGE] test only --- .../beam/sdk/schemas/SchemaCoderHelpers.java | 47 +++++++++++++++---- .../org/apache/beam/sdk/io/jdbc/JdbcUtil.java | 13 ----- 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoderHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoderHelpers.java index c661a59926f2..68ce55010a1d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoderHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoderHelpers.java @@ -23,6 +23,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.sdk.coders.BigDecimalCoder; import org.apache.beam.sdk.coders.BigEndianShortCoder; import org.apache.beam.sdk.coders.BooleanCoder; @@ -47,6 +49,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.joda.time.ReadableInstant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @SuppressWarnings({ "nullness", // TODO(https://github.com/apache/beam/issues/20497) @@ -69,6 +73,14 @@ class SchemaCoderHelpers { .put(TypeName.BOOLEAN, BooleanCoder.of()) .build(); + private static final String URN_BEAM_LOGICAL_MILLIS_INSTANT = + SchemaApi.LogicalTypes.Enum.MILLIS_INSTANT + .getValueDescriptor() + .getOptions() + .getExtension(RunnerApi.beamUrn); + + private static final Logger LOG = LoggerFactory.getLogger(SchemaCoderHelpers.class); + private static class LogicalTypeCoder extends Coder { private final LogicalType logicalType; private final Coder baseTypeCoder; @@ -77,20 +89,29 @@ private static class LogicalTypeCoder extends Coder { LogicalTypeCoder(LogicalType logicalType, Coder baseTypeCoder) { this.logicalType = logicalType; this.baseTypeCoder = baseTypeCoder; - this.isDateTime = logicalType.getBaseType().equals(FieldType.DATETIME); + // the MILLIS_INSTANT logical type decodes/encodes joda.time.Instant + this.isDateTime = + logicalType.getBaseType().equals(FieldType.DATETIME) + || logicalType.getIdentifier().equals(URN_BEAM_LOGICAL_MILLIS_INSTANT); } @Override public void encode(InputT value, OutputStream outStream) throws CoderException, IOException { - BaseT baseType = logicalType.toBaseType(value); - if (isDateTime) { - baseType = (BaseT) ((ReadableInstant) baseType).toInstant(); + try { + BaseT baseType = logicalType.toBaseType(value); + LOG.info("Encoding type {}", logicalType.getIdentifier()); + if (isDateTime) { + baseType = (BaseT) ((ReadableInstant) baseType).toInstant(); + } + baseTypeCoder.encode(baseType, outStream); + } catch (Exception e) { + throw new RuntimeException("logical type identifier: " + logicalType.getIdentifier(), e); } - baseTypeCoder.encode(baseType, outStream); } @Override public InputT decode(InputStream inStream) throws CoderException, IOException { + LOG.info("Decoding type {}", logicalType.getIdentifier()); BaseT baseType = baseTypeCoder.decode(inStream); return logicalType.toInputType(baseType); } @@ -155,10 +176,18 @@ public static Coder coderForFieldType(FieldType fieldType) { coderForFieldType(fieldType.getMapValueType())); break; case LOGICAL_TYPE: - coder = - new LogicalTypeCoder( - fieldType.getLogicalType(), - coderForFieldType(fieldType.getLogicalType().getBaseType())); + // The millis_instant logical type decodes/encodes joda.time.Instant + // TODO(yathu): Implement CoderLogicalType and make millis_instant as CoderLogicalType + // then assign the corresponding coder here. + LogicalType logicalType = fieldType.getLogicalType(); + if (logicalType.getIdentifier().equals(URN_BEAM_LOGICAL_MILLIS_INSTANT)) { + coder = (Coder) InstantCoder.of(); + } else { + coder = + new LogicalTypeCoder( + fieldType.getLogicalType(), + coderForFieldType(fieldType.getLogicalType().getBaseType())); + } break; default: coder = (Coder) CODER_MAP.get(fieldType.getTypeName()); diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java index 644d3d801a5d..f4a5169c77bc 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java @@ -39,7 +39,6 @@ import org.apache.beam.sdk.io.jdbc.JdbcIO.ReadWithPartitions; import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.Row; @@ -190,18 +189,6 @@ static JdbcIO.PreparedStatementSetCaller getPreparedStatementSetCaller( } String logicalTypeName = fieldType.getLogicalType().getIdentifier(); - - if (logicalTypeName.equals(MicrosInstant.IDENTIFIER)) { - // Process timestamp of MicrosInstant kind, which should only be passed from other type - // systems such as SQL and other Beam SDKs. - return (element, ps, i, fieldWithIndex) -> { - // MicrosInstant uses native java.time.Instant instead of joda.Instant. - java.time.Instant value = - element.getLogicalTypeValue(fieldWithIndex.getIndex(), java.time.Instant.class); - ps.setTimestamp(i + 1, value == null ? null : new Timestamp(value.toEpochMilli())); - }; - } - JDBCType jdbcType = JDBCType.valueOf(logicalTypeName); switch (jdbcType) { case DATE: