diff --git a/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java b/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java index 4a7d979c6..584c7bb3f 100644 --- a/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java +++ b/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java @@ -59,7 +59,7 @@ public final class DBUtils { private static final Logger LOG = LoggerFactory.getLogger(DBUtils.class); - private static final Calendar PURE_GREGORIAN_CALENDAR = createPureGregorianCalender(); + public static final Calendar PURE_GREGORIAN_CALENDAR = createPureGregorianCalender(); // Java by default uses October 15, 1582 as a Gregorian cut over date. // Any timestamp created with time less than this cut over date is treated as Julian date. diff --git a/postgresql-plugin/src/e2e-test/resources/pluginParameters.properties b/postgresql-plugin/src/e2e-test/resources/pluginParameters.properties index 00c4aa0ae..8b25c884c 100644 --- a/postgresql-plugin/src/e2e-test/resources/pluginParameters.properties +++ b/postgresql-plugin/src/e2e-test/resources/pluginParameters.properties @@ -42,7 +42,7 @@ datatypesSchema=[{"key":"id","value":"string"},{"key":"col1","value":"string"},{ {"key":"col10","value":"decimal"},{"key":"col11","value":"decimal"},{"key":"col12","value":"float"},\ {"key":"col13","value":"double"},{"key":"col14","value":"string"},{"key":"col15","value":"string"},\ {"key":"col16","value":"string"},{"key":"col17","value":"double"},{"key":"col18","value":"decimal"},\ - {"key":"col22","value":"timestamp"},{"key":"col23","value":"timestamp"},{"key":"col24","value":"time"},\ + {"key":"col22","value":"datetime"},{"key":"col23","value":"timestamp"},{"key":"col24","value":"time"},\ {"key":"col25","value":"string"},{"key":"col26","value":"string"},{"key":"col27","value":"date"},\ {"key":"col28","value":"string"},{"key":"col29","value":"string"},{"key":"col30","value":"string"},\ 
{"key":"col31","value":"string"},{"key":"col32","value":"string"},{"key":"col33","value":"string"},\
diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresDBRecord.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresDBRecord.java
index e489ab54b..d0d7157ae 100644
--- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresDBRecord.java
+++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresDBRecord.java
@@ -22,6 +22,7 @@
 import io.cdap.plugin.db.DBRecord;
 import io.cdap.plugin.db.Operation;
 import io.cdap.plugin.db.SchemaReader;
+import io.cdap.plugin.util.DBUtils;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -29,6 +30,11 @@
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.List;
 
 /**
@@ -51,13 +57,53 @@ public PostgresDBRecord() {
   @Override
   protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
                              int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
+    String columnTypeName = resultSet.getMetaData().getColumnTypeName(columnIndex);
     if (isUseSchema(resultSet.getMetaData(), columnIndex)) {
       setFieldAccordingToSchema(resultSet, recordBuilder, field, columnIndex);
+    } else if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamp")) {
+      // A "timestamp" column is written as DATETIME, or as TIMESTAMP when the output schema still
+      // declares the older type; the helper below picks the representation from the field schema.
+      Timestamp timestamp = resultSet.getTimestamp(columnIndex, DBUtils.PURE_GREGORIAN_CALENDAR);
+      if (timestamp != null) {
+        // Convert through the Timestamp's own instant. Pairing toLocalDateTime() with the offset in force
+        // "now" shifts values whenever the JVM zone had a different UTC offset (DST) on the stored date.
+        ZonedDateTime zonedDateTime = timestamp.toInstant().atZone(ZoneId.of("UTC"));
+        Schema nonNullableSchema = field.getSchema().isNullable() ?
+          field.getSchema().getNonNullable() : field.getSchema();
+        setZonedDateTimeBasedOnOutputSchema(recordBuilder, nonNullableSchema.getLogicalType(),
+                                            field.getName(), zonedDateTime);
+      } else {
+        recordBuilder.set(field.getName(), null);
+      }
+    } else if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamptz")) {
+      // A "timestamptz" value is read as an OffsetDateTime and stored as the same instant expressed in UTC.
+      OffsetDateTime timestamp = resultSet.getObject(columnIndex, OffsetDateTime.class);
+      if (timestamp != null) {
+        recordBuilder.setTimestamp(field.getName(), timestamp.atZoneSameInstant(ZoneId.of("UTC")));
+      } else {
+        recordBuilder.set(field.getName(), null);
+      }
     } else {
       setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
     }
   }
 
+  /**
+   * Writes {@code zonedDateTime} into the record in the form required by the field's logical type:
+   * DATETIME receives the local date-time part, TIMESTAMP_MICROS receives the zoned value.
+   * Any other logical type (including {@code null}) leaves the field unset.
+   */
+  private void setZonedDateTimeBasedOnOutputSchema(StructuredRecord.Builder recordBuilder,
+                                                   Schema.LogicalType logicalType,
+                                                   String fieldName,
+                                                   ZonedDateTime zonedDateTime) {
+    if (Schema.LogicalType.DATETIME.equals(logicalType)) {
+      recordBuilder.setDateTime(fieldName, zonedDateTime.toLocalDateTime());
+    } else if (Schema.LogicalType.TIMESTAMP_MICROS.equals(logicalType)) {
+      recordBuilder.setTimestamp(fieldName, zonedDateTime);
+    }
+  }
+
   private static boolean isUseSchema(ResultSetMetaData metadata, int columnIndex) throws SQLException {
     switch (metadata.getColumnTypeName(columnIndex)) {
       case "bit":
diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresFieldsValidator.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresFieldsValidator.java
index b3b8cac62..ad70a6abc 100644
--- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresFieldsValidator.java
+++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresFieldsValidator.java
@@ -30,8 +30,8 @@ public class PostgresFieldsValidator extends CommonFieldsValidator {
 
   @Override
   public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata, int index) throws SQLException {
-    Schema.Type fieldType = field.getSchema().isNullable() ? field.getSchema().getNonNullable().getType()
-      : field.getSchema().getType();
+    Schema schema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema();
+    Schema.Type fieldType = schema.getType();
 
     String colTypeName = metadata.getColumnTypeName(index);
     int columnType = metadata.getColumnType(index);
@@ -46,6 +46,13 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
       }
     }
 
+    // A PostgreSQL "timestamp" column is compatible with a DATETIME field. The constant-first equals()
+    // keeps this null-safe: getLogicalType() returns null for fields that have no logical type.
+    if (colTypeName.equalsIgnoreCase("timestamp")
+      && Schema.LogicalType.DATETIME.equals(schema.getLogicalType())) {
+      return true;
+    }
+
     return super.isFieldCompatible(field, metadata, index);
   }
 }
diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java
index 685f4ffc6..5ba9d8cc2 100644
--- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java
+++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java
@@ -58,6 +58,10 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
       return Schema.of(Schema.Type.STRING);
     }
 
+    if (typeName.equalsIgnoreCase("timestamp")) {
+      return Schema.of(Schema.LogicalType.DATETIME);
+    }
+
     return super.getSchema(metadata, index);
   }
 
diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java
index 3be7cdb30..507392382 100644
--- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java
+++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java
@@ -23,6 +23,7 @@
 import io.cdap.cdap.api.annotation.MetadataProperty;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.batch.BatchSource;
 import
io.cdap.cdap.etl.api.batch.BatchSourceContext;
@@ -137,5 +138,20 @@ public void validate(FailureCollector collector) {
       ConfigUtil.validateConnection(this, useConnection, connection, collector);
       super.validate(collector);
     }
+
+    @Override
+    protected void validateField(FailureCollector collector, Schema.Field field,
+                                 Schema actualFieldSchema, Schema expectedFieldSchema) {
+      // Backward compatibility for pipelines created before this change. A PostgreSQL "timestamp" column
+      // has no time zone information, so it is now read as CDAP DATETIME, whereas it used to be mapped to
+      // CDAP TIMESTAMP. An upgraded pipeline may therefore still declare TIMESTAMP in its output schema
+      // while the field's schema is now identified as DATETIME; that one combination is accepted here
+      // without delegating to the default check.
+      if (Schema.LogicalType.TIMESTAMP_MICROS.equals(expectedFieldSchema.getLogicalType())
+        && Schema.LogicalType.DATETIME.equals(actualFieldSchema.getLogicalType())) {
+        return;
+      }
+      super.validateField(collector, field, actualFieldSchema, expectedFieldSchema);
+    }
   }
 }
diff --git a/postgresql-plugin/src/test/java/io/cdap/plugin/postgres/PostgresDBRecordUnitTest.java b/postgresql-plugin/src/test/java/io/cdap/plugin/postgres/PostgresDBRecordUnitTest.java
new file mode 100644
index 000000000..a1bde2dab
--- /dev/null
+++ b/postgresql-plugin/src/test/java/io/cdap/plugin/postgres/PostgresDBRecordUnitTest.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.postgres;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.util.DBUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit Test class for the PostgresDBRecord
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class PostgresDBRecordUnitTest {
+  /**
+   * A "timestamp" column must populate a DATETIME field and, for pipelines that still declare the
+   * older output type, a TIMESTAMP field; both must represent the same instant as the stubbed value.
+   */
+  @Test
+  public void validateTimestampType() throws SQLException {
+    OffsetDateTime offsetDateTime = OffsetDateTime.of(2023, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC);
+    ResultSetMetaData metaData = Mockito.mock(ResultSetMetaData.class);
+    when(metaData.getColumnTypeName(eq(0))).thenReturn("timestamp");
+
+    ResultSet resultSet = Mockito.mock(ResultSet.class);
+    when(resultSet.getMetaData()).thenReturn(metaData);
+    when(resultSet.getTimestamp(eq(0), eq(DBUtils.PURE_GREGORIAN_CALENDAR)))
+      .thenReturn(Timestamp.from(offsetDateTime.toInstant()));
+
+    Schema.Field field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.DATETIME));
+    Schema schema = Schema.recordOf(
+      "dbRecord",
+      field1
+    );
+    StructuredRecord.Builder builder = StructuredRecord.builder(schema);
+
+    PostgresDBRecord dbRecord = new PostgresDBRecord(null, null, null, null);
+    dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
+    StructuredRecord record = builder.build();
+    Assert.assertNotNull(record);
+    Assert.assertNotNull(record.getDateTime("field1"));
+    Assert.assertEquals(offsetDateTime.toInstant(), record.getDateTime("field1").toInstant(ZoneOffset.UTC));
+
+    // Validate backward compatibility
+
+    field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS));
+    schema = Schema.recordOf(
+      "dbRecord",
+      field1
+    );
+    builder = StructuredRecord.builder(schema);
+    dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
+    record = builder.build();
+    Assert.assertNotNull(record);
+    Assert.assertNotNull(record.getTimestamp("field1"));
+    Assert.assertEquals(offsetDateTime.toInstant(), record.getTimestamp("field1").toInstant());
+  }
+
+  /**
+   * A "timestamptz" column must populate a TIMESTAMP field with the same instant as the stubbed value.
+   */
+  @Test
+  public void validateTimestampTZType() throws SQLException {
+    OffsetDateTime offsetDateTime = OffsetDateTime.of(2023, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC);
+    ResultSetMetaData metaData = Mockito.mock(ResultSetMetaData.class);
+    when(metaData.getColumnTypeName(eq(0))).thenReturn("timestamptz");
+
+    ResultSet resultSet = Mockito.mock(ResultSet.class);
+    when(resultSet.getMetaData()).thenReturn(metaData);
+    when(resultSet.getObject(eq(0), eq(OffsetDateTime.class))).thenReturn(offsetDateTime);
+
+    Schema.Field field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS));
+    Schema schema = Schema.recordOf(
+      "dbRecord",
+      field1
+    );
+    StructuredRecord.Builder builder = StructuredRecord.builder(schema);
+
+    PostgresDBRecord dbRecord = new PostgresDBRecord(null, null, null, null);
+    dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
+    StructuredRecord record = builder.build();
+    Assert.assertNotNull(record);
+    Assert.assertNotNull(record.getTimestamp("field1", ZoneId.of("UTC")));
+    Assert.assertEquals(offsetDateTime.toInstant(), record.getTimestamp("field1", ZoneId.of("UTC")).toInstant());
+  }
+}