diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java b/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java
index 3416360a9..1c347dd03 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBSplitter;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
 import org.slf4j.Logger;
@@ -39,6 +40,7 @@
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Types;
 import java.util.Properties;
 
 /**
@@ -128,6 +130,15 @@ public Connection createConnection() {
     return getConnection();
   }
 
+  @Override
+  protected DBSplitter getSplitter(int sqlDataType) {
+    // Use SafeBigDecimalSplitter for high-precision DECIMAL and NUMERIC split columns
+    if (sqlDataType == Types.NUMERIC || sqlDataType == Types.DECIMAL) {
+      return new SafeBigDecimalSplitter();
+    }
+    return super.getSplitter(sqlDataType);
+  }
+
   @Override
   public RecordReader createDBRecordReader(DBInputSplit split, Configuration conf) throws IOException {
     final RecordReader dbRecordReader = super.createDBRecordReader(split, conf);
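The override above narrows only the NUMERIC and DECIMAL path; every other JDBC type still falls through to the splitter that Hadoop's DataDrivenDBInputFormat selects. A minimal sketch of how the routing could be checked from a same-package test (hypothetical class, not part of this change; it assumes DataDrivenETLDBInputFormat can be instantiated with a no-arg constructor, and it can call the protected getSplitter(int) because it sits in the same package):

package io.cdap.plugin.db.source;

import java.sql.Types;

import org.apache.hadoop.mapreduce.lib.db.DBSplitter;
import org.junit.Assert;
import org.junit.Test;

public class GetSplitterRoutingSketch {

  @Test
  public void numericAndDecimalUseSafeSplitter() {
    DataDrivenETLDBInputFormat format = new DataDrivenETLDBInputFormat();
    // The override routes both decimal-like JDBC types to the safe splitter.
    Assert.assertTrue(format.getSplitter(Types.NUMERIC) instanceof SafeBigDecimalSplitter);
    Assert.assertTrue(format.getSplitter(Types.DECIMAL) instanceof SafeBigDecimalSplitter);
    // Any other type is delegated to the parent's selection logic.
    DBSplitter other = format.getSplitter(Types.INTEGER);
    Assert.assertFalse(other instanceof SafeBigDecimalSplitter);
  }
}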
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/SafeBigDecimalSplitter.java b/database-commons/src/main/java/io/cdap/plugin/db/source/SafeBigDecimalSplitter.java
new file mode 100644
index 000000000..8649515e8
--- /dev/null
+++ b/database-commons/src/main/java/io/cdap/plugin/db/source/SafeBigDecimalSplitter.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db.source;
+
+import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+
+/**
+ * Safe implementation of {@link BigDecimalSplitter} that ensures precise division of BigDecimal values while
+ * calculating split points for NUMERIC and DECIMAL types.
+ *
+ * <p>Problem: the default {@link BigDecimalSplitter} may return 0 when the numerator is smaller than the denominator
+ * and the exact quotient is non-terminating (e.g., 1 / 3 = 0), because its fallback division inherits the numerator's
+ * scale. Since the result (0) is smaller than {@code MIN_INCREMENT} (i.e. {@code 10000 * Double.MIN_VALUE}), the split
+ * size defaults to {@code MIN_INCREMENT}, leading to an excessive number of splits (~10M) and potential OOM errors.
+ *
+ * <p>Fix: this implementation derives the scale from the operands themselves, adds a buffer of 5 decimal places, and
+ * uses {@link RoundingMode#HALF_UP} as the rounding mode.
+ *
+ * <p>Note: this class is used by {@link DataDrivenETLDBInputFormat}.
+ */
+public class SafeBigDecimalSplitter extends BigDecimalSplitter {
+
+  /** An additional buffer of 5 decimal digits is applied to preserve accuracy during division. */
+  public static final int SCALE_BUFFER = 5;
+  /**
+   * Performs safe division with correct scale handling.
+   *
+   * @param numerator the dividend
+   * @param denominator the divisor
+   * @return the quotient at the derived scale
+   * @throws ArithmeticException if the denominator is zero
+   */
+  @Override
+  protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
+    // Determine the required scale for the division and add a buffer to preserve accuracy
+    int effectiveScale = Math.max(numerator.scale(), denominator.scale()) + SCALE_BUFFER;
+    return numerator.divide(denominator, effectiveScale, RoundingMode.HALF_UP);
+  }
+}
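The scale behavior the class javadoc describes comes straight from the BigDecimal API: the two-argument divide(divisor, roundingMode) keeps the dividend's scale, while the three-argument overload takes an explicit scale. A standalone illustration (sketch only, not part of this change):

import java.math.BigDecimal;
import java.math.RoundingMode;

public class ScaleDemo {
  public static void main(String[] args) {
    BigDecimal range = new BigDecimal("1");      // e.g. maxVal - minVal
    BigDecimal numSplits = new BigDecimal("3");

    // The two-argument divide keeps the dividend's scale (0 here), so the
    // fractional quotient 0.333... collapses to 0, the failure mode above.
    System.out.println(range.divide(numSplits, RoundingMode.HALF_UP)); // 0

    // With an explicit scale of max(0, 0) + SCALE_BUFFER, as tryDivide
    // computes it, the fractional part survives.
    System.out.println(range.divide(numSplits, 5, RoundingMode.HALF_UP)); // 0.33333
  }
}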
diff --git a/database-commons/src/test/java/io/cdap/plugin/db/source/SafeBigDecimalSplitterTest.java b/database-commons/src/test/java/io/cdap/plugin/db/source/SafeBigDecimalSplitterTest.java
new file mode 100644
index 000000000..4aff4eac2
--- /dev/null
+++ b/database-commons/src/test/java/io/cdap/plugin/db/source/SafeBigDecimalSplitterTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db.source;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test class for {@link SafeBigDecimalSplitter}.
+ */
+public class SafeBigDecimalSplitterTest {
+  private final SafeBigDecimalSplitter splitter = new SafeBigDecimalSplitter();
+
+  @Test
+  public void testSmallRangeDivision() {
+    BigDecimal result = splitter.tryDivide(BigDecimal.ONE, new BigDecimal("4"));
+    assertEquals(new BigDecimal("0.25000"), result);
+  }
+
+  @Test
+  public void testLargePrecision() {
+    BigDecimal numerator = new BigDecimal("1.0000000000000000001");
+    BigDecimal denominator = new BigDecimal("3");
+    BigDecimal result = splitter.tryDivide(numerator, denominator);
+    assertTrue(result.compareTo(BigDecimal.ZERO) > 0);
+  }
+
+  @Test
+  public void testDivisionByZero() {
+    assertThrows(ArithmeticException.class, () ->
+      splitter.tryDivide(BigDecimal.ONE, BigDecimal.ZERO));
+  }
+
+  @Test
+  public void testDivisionWithZeroNumerator() {
+    // when minVal == maxVal
+    BigDecimal result = splitter.tryDivide(BigDecimal.ZERO, BigDecimal.ONE);
+    assertEquals(0, result.compareTo(BigDecimal.ZERO));
+  }
+
+  @Test
+  public void testSplits() throws SQLException {
+    BigDecimal minVal = BigDecimal.valueOf(1);
+    BigDecimal maxVal = BigDecimal.valueOf(2);
+    int numSplits = 4;
+    ResultSet resultSet = mock(ResultSet.class);
+    Configuration conf = mock(Configuration.class);
+    when(conf.getInt("mapreduce.job.maps", 1)).thenReturn(numSplits);
+    when(resultSet.getBigDecimal(1)).thenReturn(minVal);
+    when(resultSet.getBigDecimal(2)).thenReturn(maxVal);
+    BigDecimalSplitter bigDecimalSplitter = new SafeBigDecimalSplitter();
+    List<InputSplit> actualSplits = bigDecimalSplitter.split(conf, resultSet, "id");
+    assertEquals(numSplits, actualSplits.size());
+  }
+
+  @Test
+  public void testSplitsWithMinValueEqualToMaxValue() throws SQLException {
+    // when minVal == maxVal
+    BigDecimal minVal = BigDecimal.valueOf(1);
+    BigDecimal maxVal = BigDecimal.valueOf(1);
+    int numSplits = 1;
+    ResultSet resultSet = mock(ResultSet.class);
+    Configuration conf = mock(Configuration.class);
+    when(conf.getInt("mapreduce.job.maps", 1)).thenReturn(numSplits);
+    when(resultSet.getBigDecimal(1)).thenReturn(minVal);
+    when(resultSet.getBigDecimal(2)).thenReturn(maxVal);
+    BigDecimalSplitter bigDecimalSplitter = new SafeBigDecimalSplitter();
+    List<InputSplit> actualSplits = bigDecimalSplitter.split(conf, resultSet, "id");
+    assertEquals(numSplits, actualSplits.size());
+  }
+}
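For intuition on what testSplits exercises: with the scale-aware division, the [1, 2] range and four requested splits yield an increment of 0.25000, so the splitter walks 1, 1.25, 1.5, 1.75, 2 and emits four bounded ranges instead of MIN_INCREMENT-sized ones. A small sketch of that arithmetic (hypothetical snippet, assumed to live in io.cdap.plugin.db.source so the protected tryDivide is accessible, exactly as the test above relies on):

package io.cdap.plugin.db.source;

import java.math.BigDecimal;

public class SplitSizeSketch {
  public static void main(String[] args) {
    SafeBigDecimalSplitter splitter = new SafeBigDecimalSplitter();
    // splitSize = (maxVal - minVal) / numSplits, mirroring BigDecimalSplitter
    BigDecimal splitSize = splitter.tryDivide(
      new BigDecimal("2").subtract(new BigDecimal("1")), new BigDecimal("4"));
    // 0.25000 is far above MIN_INCREMENT, so split points land on
    // 1, 1.25, 1.5, 1.75 and 2, producing the four splits the test asserts.
    System.out.println(splitSize); // 0.25000
  }
}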
diff --git a/pom.xml b/pom.xml
index 77d70dfd5..54e6ef09e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,7 +78,7 @@
     UTF-8
     6.11.0
-    2.13.1-SNAPSHOT
+    2.13.1
     13.0.1
     3.3.6
     2.2.4
diff --git a/postgresql-plugin/src/e2e-test/resources/errorMessage.properties b/postgresql-plugin/src/e2e-test/resources/errorMessage.properties
index 4889c9d16..a6c500f1d 100644
--- a/postgresql-plugin/src/e2e-test/resources/errorMessage.properties
+++ b/postgresql-plugin/src/e2e-test/resources/errorMessage.properties
@@ -16,7 +16,7 @@ errorMessageBlankPassword=SQL error while getting query schema: The server reque
 errorMessageInvalidPassword=SQL error while getting query schema: FATAL: password authentication failed for user
 errorMessageInvalidSourceHost=SQL error while getting query schema: The connection attempt failed.
 errorMessageInvalidTableName=Table 'table' does not exist. Ensure table '"table"' is set correctly and that the
-errorMessageInvalidSinkDatabase=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '08004', errorCode: '0', errorMessage: SQL Exception occurred: [Message='The server requested SCRAM-based authentication
+errorMessageInvalidSinkDatabase=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '3D000', errorCode: '0', errorMessage: SQL Exception occurred
 errorMessageInvalidHost=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '08001', errorCode: '0', errorMessage: SQL Exception occurred: [Message='The connection attempt failed.', SQLState='08001', ErrorCode='0'].'
 errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: Stage 'PostgreSQL' encountered : \
   java.io.IOException: The column index is out of range: 1, number of columns: 0.. Please check the system logs for more details.
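Context for the errorMessage.properties change: pointing the sink at a database that does not exist makes the PostgreSQL server reject the connection with SQLSTATE 3D000 (invalid_catalog_name), which is why the expected sqlState moves from the authentication-related 08004 to 3D000. A hedged illustration of how the JDBC driver surfaces it (URL, credentials and database name are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class SqlStateDemo {
  public static void main(String[] args) {
    // Placeholder connection details; assumes a reachable PostgreSQL server
    // with no database named "no_such_db".
    try (Connection conn = DriverManager.getConnection(
        "jdbc:postgresql://localhost:5432/no_such_db", "user", "password")) {
      // Unreachable when the database is missing.
    } catch (SQLException e) {
      System.out.println(e.getSQLState()); // 3D000 (invalid_catalog_name)
    }
  }
}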