Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.sdk.coders.BigDecimalCoder;
import org.apache.beam.sdk.coders.BigEndianShortCoder;
import org.apache.beam.sdk.coders.BooleanCoder;
Expand All @@ -47,6 +49,8 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings({
"nullness", // TODO(https://github.com/apache/beam/issues/20497)
Expand All @@ -69,6 +73,14 @@ class SchemaCoderHelpers {
.put(TypeName.BOOLEAN, BooleanCoder.of())
.build();

private static final String URN_BEAM_LOGICAL_MILLIS_INSTANT =
SchemaApi.LogicalTypes.Enum.MILLIS_INSTANT
.getValueDescriptor()
.getOptions()
.getExtension(RunnerApi.beamUrn);

private static final Logger LOG = LoggerFactory.getLogger(SchemaCoderHelpers.class);

private static class LogicalTypeCoder<InputT, BaseT> extends Coder<InputT> {
private final LogicalType<InputT, BaseT> logicalType;
private final Coder<BaseT> baseTypeCoder;
Expand All @@ -77,20 +89,29 @@ private static class LogicalTypeCoder<InputT, BaseT> extends Coder<InputT> {
LogicalTypeCoder(LogicalType<InputT, BaseT> logicalType, Coder baseTypeCoder) {
this.logicalType = logicalType;
this.baseTypeCoder = baseTypeCoder;
this.isDateTime = logicalType.getBaseType().equals(FieldType.DATETIME);
// the MILLIS_INSTANT logical type decodes/encodes joda.time.Instant
this.isDateTime =
logicalType.getBaseType().equals(FieldType.DATETIME)
|| logicalType.getIdentifier().equals(URN_BEAM_LOGICAL_MILLIS_INSTANT);
}

@Override
public void encode(InputT value, OutputStream outStream) throws CoderException, IOException {
BaseT baseType = logicalType.toBaseType(value);
if (isDateTime) {
baseType = (BaseT) ((ReadableInstant) baseType).toInstant();
try {
BaseT baseType = logicalType.toBaseType(value);
LOG.info("Encoding type {}", logicalType.getIdentifier());
if (isDateTime) {
baseType = (BaseT) ((ReadableInstant) baseType).toInstant();
}
baseTypeCoder.encode(baseType, outStream);
} catch (Exception e) {
throw new RuntimeException("logical type identifier: " + logicalType.getIdentifier(), e);
}
baseTypeCoder.encode(baseType, outStream);
}

@Override
public InputT decode(InputStream inStream) throws CoderException, IOException {
LOG.info("Decoding type {}", logicalType.getIdentifier());
BaseT baseType = baseTypeCoder.decode(inStream);
return logicalType.toInputType(baseType);
}
Expand Down Expand Up @@ -155,10 +176,18 @@ public static <T> Coder<T> coderForFieldType(FieldType fieldType) {
coderForFieldType(fieldType.getMapValueType()));
break;
case LOGICAL_TYPE:
coder =
new LogicalTypeCoder(
fieldType.getLogicalType(),
coderForFieldType(fieldType.getLogicalType().getBaseType()));
// The millis_instant logical type decodes/encodes joda.time.Instant
// TODO(yathu): Implement CoderLogicalType and make millis_instant as CoderLogicalType
// then assign the corresponding coder here.
LogicalType logicalType = fieldType.getLogicalType();
if (logicalType.getIdentifier().equals(URN_BEAM_LOGICAL_MILLIS_INSTANT)) {
coder = (Coder<T>) InstantCoder.of();
} else {
coder =
new LogicalTypeCoder(
fieldType.getLogicalType(),
coderForFieldType(fieldType.getLogicalType().getBaseType()));
}
break;
default:
coder = (Coder<T>) CODER_MAP.get(fieldType.getTypeName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.beam.sdk.io.jdbc.JdbcIO.ReadWithPartitions;
import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
Expand Down Expand Up @@ -190,18 +189,6 @@ static JdbcIO.PreparedStatementSetCaller getPreparedStatementSetCaller(
}

String logicalTypeName = fieldType.getLogicalType().getIdentifier();

if (logicalTypeName.equals(MicrosInstant.IDENTIFIER)) {
// Process timestamp of MicrosInstant kind, which should only be passed from other type
// systems such as SQL and other Beam SDKs.
return (element, ps, i, fieldWithIndex) -> {
// MicrosInstant uses native java.time.Instant instead of joda.Instant.
java.time.Instant value =
element.getLogicalTypeValue(fieldWithIndex.getIndex(), java.time.Instant.class);
ps.setTimestamp(i + 1, value == null ? null : new Timestamp(value.toEpochMilli()));
};
}

JDBCType jdbcType = JDBCType.valueOf(logicalTypeName);
switch (jdbcType) {
case DATE:
Expand Down