From 8a2439050ef9c76782df44ebfb9dcb3853ec310e Mon Sep 17 00:00:00 2001
From: sgarg-CS
Date: Fri, 5 Sep 2025 16:42:54 +0530
Subject: [PATCH] PLUGIN-1925: Introduce SafeBigDecimal Splitter
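
The stock BigDecimalSplitter first attempts an exact division and, when
the quotient is non-terminating, falls back to a division that reuses
the numerator's scale. For a whole-number range (scale 0) the fallback
rounds a fractional split size to 0, the splitter substitutes
MIN_INCREMENT, and the job generates an excessive number of splits.

A minimal sketch of the failure and of the widened-scale division this
patch introduces (values are illustrative; the fallback call below
approximates the stock behaviour rather than quoting it):

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    class ScaleDemo {
      public static void main(String[] args) {
        BigDecimal range = BigDecimal.ONE;          // maxVal - minVal, scale 0
        BigDecimal numSplits = new BigDecimal("3");

        // Stock fallback: the result keeps the numerator's scale (0),
        // so 1 / 3 rounds to 0 and the split size collapses.
        System.out.println(range.divide(numSplits, RoundingMode.HALF_UP)); // 0

        // SafeBigDecimalSplitter: widen the scale before dividing.
        int scale = Math.max(range.scale(), numSplits.scale()) + 5;
        System.out.println(range.divide(numSplits, scale, RoundingMode.HALF_UP)); // 0.33333
      }
    }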
---
.../db/source/DataDrivenETLDBInputFormat.java | 11 +++
.../db/source/SafeBigDecimalSplitter.java | 55 +++++++++++
.../db/source/SafeBigDecimalSplitterTest.java | 98 +++++++++++++++++++
pom.xml | 2 +-
.../resources/errorMessage.properties | 2 +-
5 files changed, 166 insertions(+), 2 deletions(-)
create mode 100644 database-commons/src/main/java/io/cdap/plugin/db/source/SafeBigDecimalSplitter.java
create mode 100644 database-commons/src/test/java/io/cdap/plugin/db/source/SafeBigDecimalSplitterTest.java
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java b/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java
index 3416360a9..1c347dd03 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBSplitter;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
import org.slf4j.Logger;
@@ -39,6 +40,7 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Types;
import java.util.Properties;
/**
@@ -128,6 +130,15 @@ public Connection createConnection() {
return getConnection();
}
+ @Override
+ protected DBSplitter getSplitter(int sqlDataType) {
+ // Use SafeBigDecimalSplitter for high-precision NUMERIC and DECIMAL columns
+ if (sqlDataType == Types.NUMERIC || sqlDataType == Types.DECIMAL) {
+ return new SafeBigDecimalSplitter();
+ }
+ return super.getSplitter(sqlDataType);
+ }
+
@Override
public RecordReader createDBRecordReader(DBInputSplit split, Configuration conf) throws IOException {
final RecordReader dbRecordReader = super.createDBRecordReader(split, conf);
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/SafeBigDecimalSplitter.java b/database-commons/src/main/java/io/cdap/plugin/db/source/SafeBigDecimalSplitter.java
new file mode 100644
index 000000000..8649515e8
--- /dev/null
+++ b/database-commons/src/main/java/io/cdap/plugin/db/source/SafeBigDecimalSplitter.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db.source;
+
+import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+
+/**
+ * Safe implementation of {@link BigDecimalSplitter} to ensure precise division of BigDecimal values while calculating
+ * split points for NUMERIC and DECIMAL types.
+ *
+ * Problem: The default {@link BigDecimalSplitter} implementation may return 0 when the numerator is smaller than
+ * the denominator, since its fallback division reuses the numerator's scale (e.g., 1 / 3 rounds to 0 at scale 0).
+ * The result (0) is smaller than {@link BigDecimalSplitter#MIN_INCREMENT} (i.e. {@code 10000 * Double.MIN_VALUE}),
+ * so the split size defaults to {@code MIN_INCREMENT}, leading to an excessive number of splits (~10M) and potential OOM errors.
+ *
+ * Fix: This implementation derives the division scale from the scales of the numerator and denominator, adds a
+ * buffer of 5 decimal places, and uses {@link RoundingMode#HALF_UP} as the rounding mode.
+ * Note: This class is used by {@link DataDrivenETLDBInputFormat}.
+ */
+public class SafeBigDecimalSplitter extends BigDecimalSplitter {
+
+ /** An additional buffer of 5 decimal digits is applied to preserve accuracy during division. */
+ public static final int SCALE_BUFFER = 5;
+ /**
+ * Performs safe division with correct scale handling.
+ *
+ * @param numerator the dividend (BigDecimal)
+ * @param denominator the divisor (BigDecimal)
+ * @return quotient with derived scale
+ * @throws ArithmeticException if denominator is zero
+ */
+ @Override
+ protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
+ // Determine the required scale for the division and add a buffer to ensure accuracy
+ int effectiveScale = Math.max(numerator.scale(), denominator.scale()) + SCALE_BUFFER;
+ return numerator.divide(denominator, effectiveScale, RoundingMode.HALF_UP);
+ }
+}
diff --git a/database-commons/src/test/java/io/cdap/plugin/db/source/SafeBigDecimalSplitterTest.java b/database-commons/src/test/java/io/cdap/plugin/db/source/SafeBigDecimalSplitterTest.java
new file mode 100644
index 000000000..4aff4eac2
--- /dev/null
+++ b/database-commons/src/test/java/io/cdap/plugin/db/source/SafeBigDecimalSplitterTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db.source;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test class for {@link SafeBigDecimalSplitter}
+ */
+public class SafeBigDecimalSplitterTest {
+ private final SafeBigDecimalSplitter splitter = new SafeBigDecimalSplitter();
+
+ @Test
+ public void testSmallRangeDivision() {
+ BigDecimal result = splitter.tryDivide(BigDecimal.ONE, new BigDecimal("4"));
+ assertEquals(new BigDecimal("0.25000"), result);
+ }
+
+ @Test
+ public void testLargePrecision() {
+ BigDecimal numerator = new BigDecimal("1.0000000000000000001");
+ BigDecimal denominator = new BigDecimal("3");
+ BigDecimal result = splitter.tryDivide(numerator, denominator);
+ assertTrue(result.compareTo(BigDecimal.ZERO) > 0);
+ }
+
+ @Test
+ public void testDivisionByZero() {
+ assertThrows(ArithmeticException.class, () ->
+ splitter.tryDivide(BigDecimal.ONE, BigDecimal.ZERO));
+ }
+
+ @Test
+ public void testDivisionWithZeroNumerator() {
+ // numerator (maxVal - minVal) is zero when minVal == maxVal
+ BigDecimal result = splitter.tryDivide(BigDecimal.ZERO, BigDecimal.ONE);
+ assertEquals(0, result.compareTo(BigDecimal.ZERO));
+ }
+
+ @Test
+ public void testSplits() throws SQLException {
+ BigDecimal minVal = BigDecimal.valueOf(1);
+ BigDecimal maxVal = BigDecimal.valueOf(2);
+ int numSplits = 4;
+ ResultSet resultSet = mock(ResultSet.class);
+ Configuration conf = mock(Configuration.class);
+ when(conf.getInt("mapreduce.job.maps", 1)).thenReturn(numSplits);
+ when(resultSet.getBigDecimal(1)).thenReturn(minVal);
+ when(resultSet.getBigDecimal(2)).thenReturn(maxVal);
+ BigDecimalSplitter bigDecimalSplitter = new SafeBigDecimalSplitter();
+ List<InputSplit> actualSplits = bigDecimalSplitter.split(conf, resultSet, "id");
+ assertEquals(numSplits, actualSplits.size());
+ }
+
+ @Test
+ public void testSplitsWithMinValueEqualToMaxValue() throws SQLException {
+ // when minVal == maxVal
+ BigDecimal minVal = BigDecimal.valueOf(1);
+ BigDecimal maxVal = BigDecimal.valueOf(1);
+ int numSplits = 1;
+ ResultSet resultSet = mock(ResultSet.class);
+ Configuration conf = mock(Configuration.class);
+ when(conf.getInt("mapreduce.job.maps", 1)).thenReturn(numSplits);
+ when(resultSet.getBigDecimal(1)).thenReturn(minVal);
+ when(resultSet.getBigDecimal(2)).thenReturn(maxVal);
+ BigDecimalSplitter bigDecimalSplitter = new SafeBigDecimalSplitter();
+ List<InputSplit> actualSplits = bigDecimalSplitter.split(conf, resultSet, "id");
+ assertEquals(numSplits, actualSplits.size());
+ }
+}
diff --git a/pom.xml b/pom.xml
index 77d70dfd5..54e6ef09e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,7 +78,7 @@
UTF-8
6.11.0
- 2.13.1-SNAPSHOT
+ 2.13.1
13.0.1
3.3.6
2.2.4
diff --git a/postgresql-plugin/src/e2e-test/resources/errorMessage.properties b/postgresql-plugin/src/e2e-test/resources/errorMessage.properties
index 4889c9d16..a6c500f1d 100644
--- a/postgresql-plugin/src/e2e-test/resources/errorMessage.properties
+++ b/postgresql-plugin/src/e2e-test/resources/errorMessage.properties
@@ -16,7 +16,7 @@ errorMessageBlankPassword=SQL error while getting query schema: The server reque
errorMessageInvalidPassword=SQL error while getting query schema: FATAL: password authentication failed for user
errorMessageInvalidSourceHost=SQL error while getting query schema: The connection attempt failed.
errorMessageInvalidTableName=Table 'table' does not exist. Ensure table '"table"' is set correctly and that the
-errorMessageInvalidSinkDatabase=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '08004', errorCode: '0', errorMessage: SQL Exception occurred: [Message='The server requested SCRAM-based authentication
+errorMessageInvalidSinkDatabase=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '3D000', errorCode: '0', errorMessage: SQL Exception occurred
errorMessageInvalidHost=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '08001', errorCode: '0', errorMessage: SQL Exception occurred: [Message='The connection attempt failed.', SQLState='08001', ErrorCode='0'].'
errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: Stage 'PostgreSQL' encountered : \
java.io.IOException: The column index is out of range: 1, number of columns: 0.. Please check the system logs for more details.