@@ -28,6 +28,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBSplitter;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
import org.slf4j.Logger;
@@ -39,6 +40,7 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Properties;

/**
@@ -128,6 +130,15 @@ public Connection createConnection() {
return getConnection();
}

@Override
protected DBSplitter getSplitter(int sqlDataType) {
// Use SafeBigDecimalSplitter for high-precision NUMERIC and DECIMAL columns
if (sqlDataType == Types.NUMERIC || sqlDataType == Types.DECIMAL) {
return new SafeBigDecimalSplitter();
}
return super.getSplitter(sqlDataType);
}

@Override
public RecordReader createDBRecordReader(DBInputSplit split, Configuration conf) throws IOException {
final RecordReader dbRecordReader = super.createDBRecordReader(split, conf);
@@ -0,0 +1,55 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.db.source;

import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter;
import java.math.BigDecimal;
import java.math.RoundingMode;

/**
* Safe implementation of {@link BigDecimalSplitter} to ensure precise division of BigDecimal values while calculating
* split points for NUMERIC and DECIMAL types.
*
* <p>Problem: The default {@link BigDecimalSplitter} implementation may return 0 when the numerator is smaller than
* the denominator (e.g., 1 / 3 rounds to 0), because no explicit scale is defined for the division. Since the
* result (0) is smaller than {@link BigDecimalSplitter#MIN_INCREMENT} (i.e. {@code 10000 * Double.MIN_VALUE}), the
* split size defaults to {@code MIN_INCREMENT}, leading to an excessive number of splits (~10M) and potential OOM
* errors.</p>
*
* <p>Fix: This implementation derives the scale from the operands' scales (which reflect the column metadata), adds
* a buffer of 5 decimal places, and uses {@link RoundingMode#HALF_UP} as the rounding mode.</p>
*
* <p>Note: This class is used by {@link DataDrivenETLDBInputFormat}.</p>
*/
public class SafeBigDecimalSplitter extends BigDecimalSplitter {

/** An additional buffer of 5 decimal digits applied to preserve accuracy during division. */
public static final int SCALE_BUFFER = 5;

/**
* Performs safe division with correct scale handling.
*
* @param numerator the dividend (BigDecimal)
* @param denominator the divisor (BigDecimal)
* @return quotient with derived scale
* @throws ArithmeticException if denominator is zero
*/
@Override
protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
// Determine the required scale for the division and add a buffer to ensure accuracy
int effectiveScale = Math.max(numerator.scale(), denominator.scale()) + SCALE_BUFFER;
return numerator.divide(denominator, effectiveScale, RoundingMode.HALF_UP);
}
}
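
A minimal standalone sketch (editorial illustration, not part of this change) of the scale behaviour the Javadoc above describes, assuming the stock splitter's fallback rounds at the numerator's scale; the class name TryDivideScaleDemo and its variables are hypothetical:

import java.math.BigDecimal;
import java.math.RoundingMode;

public class TryDivideScaleDemo {
  public static void main(String[] args) {
    BigDecimal range = BigDecimal.ONE;            // maxVal - minVal
    BigDecimal numSplits = new BigDecimal("3");

    // Rounding at the numerator's scale (0 here) collapses the split size to 0.
    BigDecimal defaultStyle = range.divide(numSplits, RoundingMode.HALF_UP);

    // SafeBigDecimalSplitter-style division: max(operand scales) + 5 keeps the quotient non-zero.
    int scale = Math.max(range.scale(), numSplits.scale()) + 5;
    BigDecimal safeStyle = range.divide(numSplits, scale, RoundingMode.HALF_UP);

    System.out.println(defaultStyle); // 0
    System.out.println(safeStyle);    // 0.33333
  }
}

Rounding at max(operand scales) + 5 keeps enough fractional digits for the split-point arithmetic without producing unbounded scales.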
@@ -0,0 +1,98 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.db.source;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter;
import org.junit.Test;

import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Test class for {@link SafeBigDecimalSplitter}
*/
public class SafeBigDecimalSplitterTest {
private final SafeBigDecimalSplitter splitter = new SafeBigDecimalSplitter();

@Test
public void testSmallRangeDivision() {
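// 1 / 4 at scale max(0, 0) + 5 = 5 yields 0.25000 (same value, explicit scale of 5).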
BigDecimal result = splitter.tryDivide(BigDecimal.ONE, new BigDecimal("4"));
assertEquals(new BigDecimal("0.25000"), result);
}

@Test
public void testLargePrecision() {
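// A high-precision numerator should still produce a strictly positive quotient rather than rounding to 0.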
BigDecimal numerator = new BigDecimal("1.0000000000000000001");
BigDecimal denominator = new BigDecimal("3");
BigDecimal result = splitter.tryDivide(numerator, denominator);
assertTrue(result.compareTo(BigDecimal.ZERO) > 0);
}

@Test
public void testDivisionByZero() {
assertThrows(ArithmeticException.class, () ->
splitter.tryDivide(BigDecimal.ONE, BigDecimal.ZERO));
}

@Test
public void testDivisionWithZeroNumerator() {
// when minVal == maxVal
BigDecimal result = splitter.tryDivide(BigDecimal.ZERO, BigDecimal.ONE);
assertEquals(0, result.compareTo(BigDecimal.ZERO));
}

@Test
public void testSplits() throws SQLException {
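// Range [1, 2] with 4 requested splits: split size 0.25, so exactly numSplits splits are expected.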
BigDecimal minVal = BigDecimal.valueOf(1);
BigDecimal maxVal = BigDecimal.valueOf(2);
int numSplits = 4;
ResultSet resultSet = mock(ResultSet.class);
Configuration conf = mock(Configuration.class);
when(conf.getInt("mapreduce.job.maps", 1)).thenReturn(numSplits);
when(resultSet.getBigDecimal(1)).thenReturn(minVal);
when(resultSet.getBigDecimal(2)).thenReturn(maxVal);
BigDecimalSplitter bigDecimalSplitter = new SafeBigDecimalSplitter();
List<InputSplit> actualSplits = bigDecimalSplitter.split(conf, resultSet, "id");
assertEquals(numSplits, actualSplits.size());
}

@Test
public void testSplitsWithMinValueEqualToMaxValue() throws SQLException {
// when minVal == maxVal
BigDecimal minVal = BigDecimal.valueOf(1);
BigDecimal maxVal = BigDecimal.valueOf(1);
int numSplits = 1;
ResultSet resultSet = mock(ResultSet.class);
Configuration conf = mock(Configuration.class);
when(conf.getInt("mapreduce.job.maps", 1)).thenReturn(numSplits);
when(resultSet.getBigDecimal(1)).thenReturn(minVal);
when(resultSet.getBigDecimal(2)).thenReturn(maxVal);
BigDecimalSplitter bigDecimalSplitter = new SafeBigDecimalSplitter();
List<InputSplit> actualSplits = bigDecimalSplitter.split(conf, resultSet, "id");
assertEquals(numSplits, actualSplits.size());
}
}
2 changes: 1 addition & 1 deletion pom.xml
@@ -78,7 +78,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- version properties -->
<cdap.version>6.11.0</cdap.version>
<cdap.plugin.version>2.13.1-SNAPSHOT</cdap.plugin.version>
<cdap.plugin.version>2.13.1</cdap.plugin.version>
<guava.version>13.0.1</guava.version>
<hadoop.version>3.3.6</hadoop.version>
<hsql.version>2.2.4</hsql.version>
@@ -16,7 +16,7 @@ errorMessageBlankPassword=SQL error while getting query schema: The server reque
errorMessageInvalidPassword=SQL error while getting query schema: FATAL: password authentication failed for user
errorMessageInvalidSourceHost=SQL error while getting query schema: The connection attempt failed.
errorMessageInvalidTableName=Table 'table' does not exist. Ensure table '"table"' is set correctly and that the
errorMessageInvalidSinkDatabase=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '08004', errorCode: '0', errorMessage: SQL Exception occurred: [Message='The server requested SCRAM-based authentication
errorMessageInvalidSinkDatabase=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '3D000', errorCode: '0', errorMessage: SQL Exception occurred
errorMessageInvalidHost=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '08001', errorCode: '0', errorMessage: SQL Exception occurred: [Message='The connection attempt failed.', SQLState='08001', ErrorCode='0'].'
errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: Stage 'PostgreSQL' encountered : \
java.io.IOException: The column index is out of range: 1, number of columns: 0.. Please check the system logs for more details.