diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java index fbdf17c95..3ed85b009 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java @@ -114,7 +114,8 @@ protected DBConnectorPath getDBConnectorPath(String path) { @Override protected SchemaReader getSchemaReader(String sessionID) { return new OracleSourceSchemaReader(sessionID, config.getTreatAsOldTimestamp(), - config.getTreatPrecisionlessNumAsDeci()); + config.getTreatPrecisionlessNumAsDeci(), + config.getTreatTimestampLTZAsTimestamp()); } @Override diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java index cbc1e5ed2..79d14215b 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java @@ -42,13 +42,14 @@ public OracleConnectorConfig(String host, int port, String user, String password public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName, String connectionArguments, String connectionType, String database) { this(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, null, null, null, - null); + null, null); } public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName, String connectionArguments, String connectionType, String database, String role, Boolean useSSL, @Nullable Boolean treatAsOldTimestamp, - @Nullable Boolean treatPrecisionlessNumAsDeci) { + @Nullable Boolean treatPrecisionlessNumAsDeci, + @Nullable Boolean treatTimestampLTZAsTimestamp) { this.host = host; this.port = port; @@ -62,6 +63,7 @@ public OracleConnectorConfig(String host, int port, String user, String password this.useSSL = useSSL; this.treatAsOldTimestamp = treatAsOldTimestamp; this.treatPrecisionlessNumAsDeci = treatPrecisionlessNumAsDeci; + this.treatTimestampLTZAsTimestamp = treatTimestampLTZAsTimestamp; } @Override @@ -98,6 +100,11 @@ public String getConnectionString() { @Nullable public Boolean treatPrecisionlessNumAsDeci; + @Name(OracleConstants.TREAT_TIMESTAMP_LTZ_AS_TIMESTAMP) + @Description("A hidden field to handle mapping of Oracle Timestamp_LTZ data type to BQ Timestamp.") + @Nullable + public Boolean treatTimestampLTZAsTimestamp; + @Override protected int getDefaultPort() { return 1521; @@ -128,6 +135,10 @@ public Boolean getTreatPrecisionlessNumAsDeci() { return Boolean.TRUE.equals(treatPrecisionlessNumAsDeci); } + public Boolean getTreatTimestampLTZAsTimestamp() { + return Boolean.TRUE.equals(treatTimestampLTZAsTimestamp); + } + @Override public Properties getConnectionArgumentsProperties() { Properties prop = super.getConnectionArgumentsProperties(); diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java index 7d86ec820..a3560e969 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java @@ -46,6 +46,7 @@ private OracleConstants() { public static final String USE_SSL = "useSSL"; public static final String TREAT_AS_OLD_TIMESTAMP = "treatAsOldTimestamp"; public static final String TREAT_PRECISIONLESSNUM_AS_DECI = "treatPrecisionlessNumAsDeci"; + public static final String TREAT_TIMESTAMP_LTZ_AS_TIMESTAMP = "treatTimestampLTZAsTimestamp"; /** * Constructs the Oracle connection string based on the provided connection type, host, port, and database. diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index 068ffa28c..38e610855 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -68,8 +68,10 @@ protected SchemaReader getSchemaReader() { // handle schema to make it backward compatible. boolean treatAsOldTimestamp = oracleSourceConfig.getConnection().getTreatAsOldTimestamp(); boolean treatPrecisionlessNumAsDeci = oracleSourceConfig.getConnection().getTreatPrecisionlessNumAsDeci(); + boolean treatTimestampLTZAsTimestamp = oracleSourceConfig.getConnection().getTreatTimestampLTZAsTimestamp(); - return new OracleSourceSchemaReader(null, treatAsOldTimestamp, treatPrecisionlessNumAsDeci); + return new OracleSourceSchemaReader(null, treatAsOldTimestamp, treatPrecisionlessNumAsDeci, + treatTimestampLTZAsTimestamp); } @Override @@ -137,10 +139,10 @@ public OracleSourceConfig(String host, int port, String user, String password, S int defaultBatchValue, int defaultRowPrefetch, String importQuery, Integer numSplits, int fetchSize, String boundingQuery, String splitBy, Boolean useSSL, Boolean treatAsOldTimestamp, - Boolean treatPrecisionlessNumAsDeci) { + Boolean treatPrecisionlessNumAsDeci, Boolean treatTimestampLTZAsTimestamp) { this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, role, useSSL, treatAsOldTimestamp, - treatPrecisionlessNumAsDeci); + treatPrecisionlessNumAsDeci, treatTimestampLTZAsTimestamp); this.defaultBatchValue = defaultBatchValue; this.defaultRowPrefetch = defaultRowPrefetch; this.fetchSize = fetchSize; diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java index dd17d2e84..208b70410 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableSet; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.db.CommonSchemaReader; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,15 +69,17 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { private final String sessionID; private final Boolean isTimestampOldBehavior; private final Boolean isPrecisionlessNumAsDecimal; + private final Boolean isTimestampLtzFieldTimestamp; public OracleSourceSchemaReader() { - this(null, false, false); + this(null, false, false, false); } public OracleSourceSchemaReader(@Nullable String sessionID, boolean isTimestampOldBehavior, - boolean isPrecisionlessNumAsDecimal) { + boolean isPrecisionlessNumAsDecimal, boolean isTimestampLtzFieldTimestamp) { this.sessionID = sessionID; this.isTimestampOldBehavior = isTimestampOldBehavior; this.isPrecisionlessNumAsDecimal = isPrecisionlessNumAsDecimal; + this.isTimestampLtzFieldTimestamp = isTimestampLtzFieldTimestamp; } @Override @@ -87,8 +90,7 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti case TIMESTAMP_TZ: return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); case TIMESTAMP_LTZ: - return isTimestampOldBehavior ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) - : Schema.of(Schema.LogicalType.DATETIME); + return getTimestampLtzSchema(); case Types.TIMESTAMP: return isTimestampOldBehavior ? super.getSchema(metadata, index) : Schema.of(Schema.LogicalType.DATETIME); case BINARY_FLOAT: @@ -139,6 +141,12 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti } } + private @NotNull Schema getTimestampLtzSchema() { + return isTimestampOldBehavior || isTimestampLtzFieldTimestamp + ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) + : Schema.of(Schema.LogicalType.DATETIME); + } + @Override public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException { if (sessionID == null) { diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java new file mode 100644 index 000000000..1ff77c533 --- /dev/null +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java @@ -0,0 +1,94 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import com.google.common.collect.Lists; +import io.cdap.cdap.api.data.schema.Schema; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.List; + +public class OracleSchemaReaderTest { + + @Test + public void getSchema_timestampLTZFieldTrue_returnTimestamp() throws SQLException { + OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader(null, false, false, true); + + ResultSet resultSet = Mockito.mock(ResultSet.class); + ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + + Mockito.when(resultSet.getMetaData()).thenReturn(metadata); + + Mockito.when(metadata.getColumnCount()).thenReturn(2); + // -101 is for TIMESTAMP_TZ + Mockito.when(metadata.getColumnType(1)).thenReturn(-101); + Mockito.when(metadata.getColumnName(1)).thenReturn("column1"); + + // -102 is for TIMESTAMP_LTZ + Mockito.when(metadata.getColumnType(2)).thenReturn(-102); + Mockito.when(metadata.getColumnName(2)).thenReturn("column2"); + + List expectedSchemaFields = Lists.newArrayList(); + expectedSchemaFields.add(Schema.Field.of("column1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))); + expectedSchemaFields.add(Schema.Field.of("column2", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))); + + List actualSchemaFields = schemaReader.getSchemaFields(resultSet); + + Assert.assertEquals(expectedSchemaFields.get(0).getName(), actualSchemaFields.get(0).getName()); + Assert.assertEquals(expectedSchemaFields.get(0).getSchema(), actualSchemaFields.get(0).getSchema()); + Assert.assertEquals(expectedSchemaFields.get(1).getName(), actualSchemaFields.get(1).getName()); + Assert.assertEquals(expectedSchemaFields.get(1).getSchema(), actualSchemaFields.get(1).getSchema()); + + } + + @Test + public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLException { + OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader(null, false, false, false); + + ResultSet resultSet = Mockito.mock(ResultSet.class); + ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + + Mockito.when(resultSet.getMetaData()).thenReturn(metadata); + + Mockito.when(metadata.getColumnCount()).thenReturn(2); + // -101 is for TIMESTAMP_TZ + Mockito.when(metadata.getColumnType(1)).thenReturn(-101); + Mockito.when(metadata.getColumnName(1)).thenReturn("column1"); + + // -102 is for TIMESTAMP_LTZ + Mockito.when(metadata.getColumnType(2)).thenReturn(-102); + Mockito.when(metadata.getColumnName(2)).thenReturn("column2"); + + List expectedSchemaFields = Lists.newArrayList(); + expectedSchemaFields.add(Schema.Field.of("column1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))); + expectedSchemaFields.add(Schema.Field.of("column2", Schema.of(Schema.LogicalType.DATETIME))); + + List actualSchemaFields = schemaReader.getSchemaFields(resultSet); + + Assert.assertEquals(expectedSchemaFields.get(0).getName(), actualSchemaFields.get(0).getName()); + Assert.assertEquals(expectedSchemaFields.get(0).getSchema(), actualSchemaFields.get(0).getSchema()); + Assert.assertEquals(expectedSchemaFields.get(1).getName(), actualSchemaFields.get(1).getName()); + Assert.assertEquals(expectedSchemaFields.get(1).getSchema(), actualSchemaFields.get(1).getSchema()); + } +} diff --git a/oracle-plugin/widgets/Oracle-batchsource.json b/oracle-plugin/widgets/Oracle-batchsource.json index f8da352a2..37adc011d 100644 --- a/oracle-plugin/widgets/Oracle-batchsource.json +++ b/oracle-plugin/widgets/Oracle-batchsource.json @@ -158,6 +158,25 @@ ] } }, + { + "widget-type": "hidden", + "label": "Treat Timestamp_LTZ as Timestamp", + "name": "treatTimestampLTZAsTimestamp", + "widget-attributes": { + "layout": "inline", + "default": "false", + "options": [ + { + "id": "true", + "label": "true" + }, + { + "id": "false", + "label": "false" + } + ] + } + }, { "name": "connectionType", "label": "Connection Type", diff --git a/oracle-plugin/widgets/Oracle-connector.json b/oracle-plugin/widgets/Oracle-connector.json index 4335b66c4..413aa4b7a 100644 --- a/oracle-plugin/widgets/Oracle-connector.json +++ b/oracle-plugin/widgets/Oracle-connector.json @@ -167,6 +167,25 @@ } ] } + }, + { + "widget-type": "hidden", + "label": "Treat Timestamp_LTZ as Timestamp", + "name": "treatTimestampLTZAsTimestamp", + "widget-attributes": { + "layout": "inline", + "default": "false", + "options": [ + { + "id": "true", + "label": "true" + }, + { + "id": "false", + "label": "false" + } + ] + } } ] },