Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
public final class DBUtils {
private static final Logger LOG = LoggerFactory.getLogger(DBUtils.class);

private static final Calendar PURE_GREGORIAN_CALENDAR = createPureGregorianCalender();
public static final Calendar PURE_GREGORIAN_CALENDAR = createPureGregorianCalender();

// Java by default uses October 15, 1582 as a Gregorian cut over date.
// Any timestamp created with time less than this cut over date is treated as Julian date.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ datatypesSchema=[{"key":"id","value":"string"},{"key":"col1","value":"string"},{
{"key":"col10","value":"decimal"},{"key":"col11","value":"decimal"},{"key":"col12","value":"float"},\
{"key":"col13","value":"double"},{"key":"col14","value":"string"},{"key":"col15","value":"string"},\
{"key":"col16","value":"string"},{"key":"col17","value":"double"},{"key":"col18","value":"decimal"},\
{"key":"col22","value":"timestamp"},{"key":"col23","value":"timestamp"},{"key":"col24","value":"time"},\
{"key":"col22","value":"datetime"},{"key":"col23","value":"timestamp"},{"key":"col24","value":"time"},\
{"key":"col25","value":"string"},{"key":"col26","value":"string"},{"key":"col27","value":"date"},\
{"key":"col28","value":"string"},{"key":"col29","value":"string"},{"key":"col30","value":"string"},\
{"key":"col31","value":"string"},{"key":"col32","value":"string"},{"key":"col33","value":"string"},\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@
import io.cdap.plugin.db.DBRecord;
import io.cdap.plugin.db.Operation;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.util.DBUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;

/**
Expand All @@ -51,13 +57,46 @@ public PostgresDBRecord() {
/**
 * Reads the column at {@code columnIndex} from {@code resultSet} and stores it in {@code recordBuilder}.
 *
 * <ul>
 *   <li>Columns for which {@code isUseSchema} returns true are set according to the field schema.</li>
 *   <li>{@code timestamp} columns are read with the pure Gregorian calendar, converted to UTC and stored
 *       either as DATETIME or as TIMESTAMP_MICROS, depending on the logical type of the output field.</li>
 *   <li>{@code timestamptz} columns are read as {@link OffsetDateTime} and stored as a UTC timestamp.</li>
 *   <li>Everything else is delegated to {@code setField}.</li>
 * </ul>
 *
 * @param resultSet result set positioned on the row being read
 * @param recordBuilder builder of the record that receives the value
 * @param field output schema field the column is mapped to
 * @param columnIndex index of the column in the result set
 * @param sqlType JDBC type of the column, as defined in {@link Types}
 * @param sqlPrecision precision of the column
 * @param sqlScale scale of the column
 * @throws SQLException if the value or the column metadata cannot be read
 */
@Override
protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
                           int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
  ResultSetMetaData metadata = resultSet.getMetaData();
  String columnTypeName = metadata.getColumnTypeName(columnIndex);
  if (isUseSchema(metadata, columnIndex)) {
    setFieldAccordingToSchema(resultSet, recordBuilder, field, columnIndex);
  } else if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamp")) {
    Timestamp timestamp = resultSet.getTimestamp(columnIndex, DBUtils.PURE_GREGORIAN_CALENDAR);
    if (timestamp != null) {
      // Resolve the offset from the JVM zone rules at the timestamp's own date-time. Using the offset that is
      // in effect "now" would shift values by the DST difference whenever the row's date and the current date
      // fall on different sides of a daylight-saving transition.
      ZonedDateTime zonedDateTime = timestamp.toLocalDateTime()
        .atZone(ZoneId.systemDefault())
        .withZoneSameInstant(ZoneId.of("UTC"));
      Schema nonNullableSchema = field.getSchema().isNullable() ?
        field.getSchema().getNonNullable() : field.getSchema();
      setZonedDateTimeBasedOnOuputSchema(recordBuilder, nonNullableSchema.getLogicalType(),
                                         field.getName(), zonedDateTime);
    } else {
      recordBuilder.set(field.getName(), null);
    }
  } else if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamptz")) {
    OffsetDateTime timestamp = resultSet.getObject(columnIndex, OffsetDateTime.class);
    if (timestamp != null) {
      // The value carries its own offset, so it only needs to be normalised to UTC.
      recordBuilder.setTimestamp(field.getName(), timestamp.atZoneSameInstant(ZoneId.of("UTC")));
    } else {
      recordBuilder.set(field.getName(), null);
    }
  } else {
    setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
  }
}

/**
 * Stores {@code zonedDateTime} on the builder in the representation requested by the output schema.
 *
 * <p>For TIMESTAMP_MICROS the zoned value is stored as is; for DATETIME only its local date-time part is
 * stored. For any other logical type (or none) the builder is left untouched.
 *
 * @param recordBuilder builder of the record that receives the value
 * @param logicalType logical type of the (non-nullable) output field schema
 * @param fieldName name of the field to set
 * @param zonedDateTime value to store
 */
private void setZonedDateTimeBasedOnOuputSchema(StructuredRecord.Builder recordBuilder,
                                                Schema.LogicalType logicalType,
                                                String fieldName,
                                                ZonedDateTime zonedDateTime) {
  if (Schema.LogicalType.TIMESTAMP_MICROS.equals(logicalType)) {
    recordBuilder.setTimestamp(fieldName, zonedDateTime);
    return;
  }

  if (Schema.LogicalType.DATETIME.equals(logicalType)) {
    recordBuilder.setDateTime(fieldName, zonedDateTime.toLocalDateTime());
  }
}

private static boolean isUseSchema(ResultSetMetaData metadata, int columnIndex) throws SQLException {
switch (metadata.getColumnTypeName(columnIndex)) {
case "bit":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public class PostgresFieldsValidator extends CommonFieldsValidator {

@Override
public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata, int index) throws SQLException {
Schema.Type fieldType = field.getSchema().isNullable() ? field.getSchema().getNonNullable().getType()
: field.getSchema().getType();
Schema schema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema();
Schema.Type fieldType = schema.getType();

String colTypeName = metadata.getColumnTypeName(index);
int columnType = metadata.getColumnType(index);
Expand All @@ -46,6 +46,11 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
}
}

// A PostgreSQL "timestamp" column is compatible with a DATETIME field. The comparison is written
// constant-first so that a field schema without a logical type (getLogicalType() == null) simply
// falls through to the generic check instead of throwing a NullPointerException.
if (colTypeName.equalsIgnoreCase("timestamp")
  && Schema.LogicalType.DATETIME.equals(schema.getLogicalType())) {
  return true;
}

return super.isFieldCompatible(field, metadata, index);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
return Schema.of(Schema.Type.STRING);
}

if (typeName.equalsIgnoreCase("timestamp")) {
return Schema.of(Schema.LogicalType.DATETIME);
}

return super.getSchema(metadata, index);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
Expand Down Expand Up @@ -137,5 +138,20 @@ public void validate(FailureCollector collector) {
ConfigUtil.validateConnection(this, useConnection, connection, collector);
super.validate(collector);
}

/**
 * Validates one output-schema field, tolerating the legacy TIMESTAMP mapping of PostgreSQL
 * {@code timestamp} columns.
 *
 * @param collector collector that receives validation failures
 * @param field field being validated
 * @param actualFieldSchema actual schema of the field (presumably derived from the database column)
 * @param expectedFieldSchema schema of the field as configured in the output schema
 */
@Override
protected void validateField(FailureCollector collector, Schema.Field field,
                             Schema actualFieldSchema, Schema expectedFieldSchema) {
  // Keeps upgraded pipelines working. Older versions mapped a PostgreSQL timestamp to the CDAP TIMESTAMP
  // type. Because a PostgreSQL timestamp carries no time zone information it is now mapped to CDAP DATETIME,
  // so a pipeline created before the upgrade still declares TIMESTAMP in its output schema while the field
  // is internally identified as DATETIME. That particular combination is accepted without further checks.
  boolean legacyTimestampMapping =
    Schema.LogicalType.TIMESTAMP_MICROS.equals(expectedFieldSchema.getLogicalType())
      && Schema.LogicalType.DATETIME.equals(actualFieldSchema.getLogicalType());

  if (!legacyTimestampMapping) {
    super.validateField(collector, field, actualFieldSchema, expectedFieldSchema);
  }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright © 2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.postgres;

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.util.DBUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@code PostgresDBRecord#handleField}, covering how PostgreSQL {@code timestamp} and
 * {@code timestamptz} columns are written into a {@link StructuredRecord}.
 */
@RunWith(MockitoJUnitRunner.class)
public class PostgresDBRecordUnitTest {

  /**
   * A {@code timestamp} column must be readable both as DATETIME (the current mapping) and as
   * TIMESTAMP_MICROS (the mapping used by pipelines created before the change).
   */
  @Test
  public void validateTimestampType() throws SQLException {
    OffsetDateTime offsetDateTime = OffsetDateTime.of(2023, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC);
    ResultSetMetaData metaData = Mockito.mock(ResultSetMetaData.class);
    when(metaData.getColumnTypeName(eq(0))).thenReturn("timestamp");

    ResultSet resultSet = Mockito.mock(ResultSet.class);
    when(resultSet.getMetaData()).thenReturn(metaData);
    when(resultSet.getTimestamp(eq(0), eq(DBUtils.PURE_GREGORIAN_CALENDAR)))
      .thenReturn(Timestamp.from(offsetDateTime.toInstant()));

    Schema.Field field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.DATETIME));
    Schema schema = Schema.recordOf(
      "dbRecord",
      field1
    );
    StructuredRecord.Builder builder = StructuredRecord.builder(schema);

    PostgresDBRecord dbRecord = new PostgresDBRecord(null, null, null, null);
    dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
    StructuredRecord record = builder.build();
    Assert.assertNotNull(record);
    Assert.assertNotNull(record.getDateTime("field1"));
    // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
    Assert.assertEquals(offsetDateTime.toInstant(), record.getDateTime("field1").toInstant(ZoneOffset.UTC));

    // Validate backward compatibility: the same column read into a TIMESTAMP_MICROS field.

    field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS));
    schema = Schema.recordOf(
      "dbRecord",
      field1
    );
    builder = StructuredRecord.builder(schema);
    dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
    record = builder.build();
    Assert.assertNotNull(record);
    Assert.assertNotNull(record.getTimestamp("field1"));
    Assert.assertEquals(offsetDateTime.toInstant(), record.getTimestamp("field1").toInstant());
  }

  /**
   * A {@code timestamptz} column is read as an {@link OffsetDateTime} and stored as a TIMESTAMP_MICROS
   * value representing the same instant.
   */
  @Test
  public void validateTimestampTZType() throws SQLException {
    OffsetDateTime offsetDateTime = OffsetDateTime.of(2023, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC);
    ResultSetMetaData metaData = Mockito.mock(ResultSetMetaData.class);
    when(metaData.getColumnTypeName(eq(0))).thenReturn("timestamptz");

    ResultSet resultSet = Mockito.mock(ResultSet.class);
    when(resultSet.getMetaData()).thenReturn(metaData);
    when(resultSet.getObject(eq(0), eq(OffsetDateTime.class))).thenReturn(offsetDateTime);

    Schema.Field field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS));
    Schema schema = Schema.recordOf(
      "dbRecord",
      field1
    );
    StructuredRecord.Builder builder = StructuredRecord.builder(schema);

    PostgresDBRecord dbRecord = new PostgresDBRecord(null, null, null, null);
    dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
    StructuredRecord record = builder.build();
    Assert.assertNotNull(record);
    Assert.assertNotNull(record.getTimestamp("field1", ZoneId.of("UTC")));
    Assert.assertEquals(offsetDateTime.toInstant(), record.getTimestamp("field1", ZoneId.of("UTC")).toInstant());
  }
}