Skip to content

Commit d56bb1b

Browse files
zhilinli123zhilinli
andauthored
[Improve][Connector-V2][Jdbc-Source] Support for Decimal types as splict keys (#4634)
* [Improve][Connector-V2][Jdbc-Source]Support Compatible Mysql bigint(20) used as a partition_column #4634 Co-authored-by: zhilinli <lzl15844876351@163.com>
1 parent 4d429ca commit d56bb1b

File tree

10 files changed

+141
-88
lines changed

10 files changed

+141
-88
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.lang.reflect.ParameterizedType;
2727
import java.lang.reflect.Type;
28+
import java.math.BigDecimal;
2829
import java.time.Duration;
2930
import java.util.List;
3031
import java.util.Map;
@@ -74,6 +75,10 @@ public TypedOptionBuilder<Integer> intType() {
7475
public TypedOptionBuilder<Long> longType() {
7576
return new TypedOptionBuilder<>(key, new TypeReference<Long>() {});
7677
}
78+
/** Defines that the value of the option should be of {@link BigDecimal} type. */
79+
public TypedOptionBuilder<BigDecimal> bigDecimalType() {
80+
return new TypedOptionBuilder<>(key, new TypeReference<BigDecimal>() {});
81+
}
7782

7883
/** Defines that the value of the option should be of {@link Float} type. */
7984
public TypedOptionBuilder<Float> floatType() {

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
2222

23+
import java.math.BigDecimal;
2324
import java.util.List;
2425

2526
@SuppressWarnings("checkstyle:MagicNumber")
@@ -122,14 +123,14 @@ public interface JdbcOptions {
122123
.noDefaultValue()
123124
.withDescription("partition column");
124125

125-
Option<Long> PARTITION_UPPER_BOUND =
126+
Option<BigDecimal> PARTITION_UPPER_BOUND =
126127
Options.key("partition_upper_bound")
127-
.longType()
128+
.bigDecimalType()
128129
.noDefaultValue()
129130
.withDescription("partition upper bound");
130-
Option<Long> PARTITION_LOWER_BOUND =
131+
Option<BigDecimal> PARTITION_LOWER_BOUND =
131132
Options.key("partition_lower_bound")
132-
.longType()
133+
.bigDecimalType()
133134
.noDefaultValue()
134135
.withDescription("partition lower bound");
135136
Option<Integer> PARTITION_NUM =

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import lombok.Data;
2424

2525
import java.io.Serializable;
26+
import java.math.BigDecimal;
2627
import java.util.Optional;
2728

2829
@Data
@@ -33,8 +34,8 @@ public class JdbcSourceConfig implements Serializable {
3334
private JdbcConnectionConfig jdbcConnectionConfig;
3435
public String query;
3536
private String partitionColumn;
36-
private Long partitionUpperBound;
37-
private Long partitionLowerBound;
37+
private BigDecimal partitionUpperBound;
38+
private BigDecimal partitionLowerBound;
3839
private int fetchSize;
3940
private Integer partitionNumber;
4041

@@ -60,11 +61,11 @@ public Optional<String> getPartitionColumn() {
6061
return Optional.ofNullable(partitionColumn);
6162
}
6263

63-
public Optional<Long> getPartitionUpperBound() {
64+
public Optional<BigDecimal> getPartitionUpperBound() {
6465
return Optional.ofNullable(partitionUpperBound);
6566
}
6667

67-
public Optional<Long> getPartitionLowerBound() {
68+
public Optional<BigDecimal> getPartitionLowerBound() {
6869
return Optional.ofNullable(partitionLowerBound);
6970
}
7071

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split;
1919

2020
import java.io.Serializable;
21+
import java.math.BigDecimal;
22+
import java.math.RoundingMode;
2123

2224
import static com.google.common.base.Preconditions.checkArgument;
2325
import static com.google.common.base.Preconditions.checkState;
@@ -39,8 +41,8 @@
3941
*/
4042
public class JdbcNumericBetweenParametersProvider implements JdbcParameterValuesProvider {
4143

42-
private final long minVal;
43-
private final long maxVal;
44+
private final BigDecimal minVal;
45+
private final BigDecimal maxVal;
4446

4547
private long batchSize;
4648
private int batchNum;
@@ -51,8 +53,8 @@ public class JdbcNumericBetweenParametersProvider implements JdbcParameterValues
5153
* @param minVal the lower bound of the produced "from" values
5254
* @param maxVal the upper bound of the produced "to" values
5355
*/
54-
public JdbcNumericBetweenParametersProvider(long minVal, long maxVal) {
55-
checkArgument(minVal <= maxVal, "minVal must not be larger than maxVal");
56+
public JdbcNumericBetweenParametersProvider(BigDecimal minVal, BigDecimal maxVal) {
57+
checkArgument(minVal.compareTo(maxVal) <= 0, "minVal must not be larger than maxVal");
5658
this.minVal = minVal;
5759
this.maxVal = maxVal;
5860
}
@@ -64,8 +66,9 @@ public JdbcNumericBetweenParametersProvider(long minVal, long maxVal) {
6466
* @param minVal the lower bound of the produced "from" values
6567
* @param maxVal the upper bound of the produced "to" values
6668
*/
67-
public JdbcNumericBetweenParametersProvider(long fetchSize, long minVal, long maxVal) {
68-
checkArgument(minVal <= maxVal, "minVal must not be larger than maxVal");
69+
public JdbcNumericBetweenParametersProvider(
70+
long fetchSize, BigDecimal minVal, BigDecimal maxVal) {
71+
checkArgument(minVal.compareTo(maxVal) <= 0, "minVal must not be larger than maxVal");
6972
this.minVal = minVal;
7073
this.maxVal = maxVal;
7174
ofBatchSize(fetchSize);
@@ -74,24 +77,33 @@ public JdbcNumericBetweenParametersProvider(long fetchSize, long minVal, long ma
7477
public JdbcNumericBetweenParametersProvider ofBatchSize(long batchSize) {
7578
checkArgument(batchSize > 0, "Batch size must be positive");
7679

77-
long maxElemCount = (maxVal - minVal) + 1;
78-
if (batchSize > maxElemCount) {
79-
batchSize = maxElemCount;
80+
BigDecimal maxElemCount = (maxVal.subtract(minVal)).add(BigDecimal.valueOf(1));
81+
if (BigDecimal.valueOf(batchSize).compareTo(maxElemCount) > 0) {
82+
batchSize = maxElemCount.longValue();
8083
}
8184
this.batchSize = batchSize;
82-
this.batchNum = new Double(Math.ceil((double) maxElemCount / batchSize)).intValue();
85+
this.batchNum =
86+
new Double(
87+
Math.ceil(
88+
(maxElemCount.divide(BigDecimal.valueOf(batchSize)))
89+
.doubleValue()))
90+
.intValue();
8391
return this;
8492
}
8593

8694
public JdbcNumericBetweenParametersProvider ofBatchNum(int batchNum) {
8795
checkArgument(batchNum > 0, "Batch number must be positive");
8896

89-
long maxElemCount = (maxVal - minVal) + 1;
90-
if (batchNum > maxElemCount) {
91-
batchNum = (int) maxElemCount;
97+
BigDecimal maxElemCount = (maxVal.subtract(minVal)).add(BigDecimal.valueOf(1));
98+
if (BigDecimal.valueOf(batchNum).compareTo(maxElemCount) > 0) {
99+
batchNum = maxElemCount.intValue();
92100
}
93101
this.batchNum = batchNum;
94-
this.batchSize = new Double(Math.ceil((double) maxElemCount / batchNum)).longValue();
102+
// For the presence of a decimal we take the integer up
103+
this.batchSize =
104+
(maxElemCount.divide(BigDecimal.valueOf(batchNum), 2, RoundingMode.HALF_UP))
105+
.setScale(0, RoundingMode.CEILING)
106+
.longValue();
95107
return this;
96108
}
97109

@@ -101,15 +113,24 @@ public Serializable[][] getParameterValues() {
101113
batchSize > 0,
102114
"Batch size and batch number must be positive. Have you called `ofBatchSize` or `ofBatchNum`?");
103115

104-
long maxElemCount = (maxVal - minVal) + 1;
105-
long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
116+
BigDecimal maxElemCount = (maxVal.subtract(minVal)).add(BigDecimal.valueOf(1));
117+
BigDecimal bigBatchNum =
118+
maxElemCount
119+
.subtract(BigDecimal.valueOf(batchSize - 1))
120+
.multiply(BigDecimal.valueOf(batchNum));
106121

107122
Serializable[][] parameters = new Serializable[batchNum][2];
108-
long start = minVal;
123+
BigDecimal start = minVal;
109124
for (int i = 0; i < batchNum; i++) {
110-
long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
111-
parameters[i] = new Long[] {start, end};
112-
start = end + 1;
125+
BigDecimal end =
126+
start.add(BigDecimal.valueOf(batchSize))
127+
.subtract(BigDecimal.valueOf(1))
128+
.subtract(
129+
BigDecimal.valueOf(i).compareTo(bigBatchNum) >= 0
130+
? BigDecimal.ONE
131+
: BigDecimal.ZERO);
132+
parameters[i] = new BigDecimal[] {start, end};
133+
start = end.add(BigDecimal.valueOf(1));
113134
}
114135
return parameters;
115136
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.seatunnel.api.table.factory.TableFactoryContext;
3030
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
3131
import org.apache.seatunnel.api.table.type.BasicType;
32+
import org.apache.seatunnel.api.table.type.DecimalType;
3233
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3334
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
3435
import org.apache.seatunnel.common.constants.PluginType;
@@ -45,6 +46,7 @@
4546
import lombok.extern.slf4j.Slf4j;
4647

4748
import java.io.Serializable;
49+
import java.math.BigDecimal;
4850
import java.sql.Connection;
4951
import java.sql.ResultSet;
5052
import java.sql.SQLException;
@@ -135,8 +137,8 @@ public static Optional<PartitionParameter> createPartitionParameter(
135137

136138
static PartitionParameter createPartitionParameter(
137139
JdbcSourceConfig config, String columnName, Connection connection) {
138-
long max = Long.MAX_VALUE;
139-
long min = Long.MIN_VALUE;
140+
BigDecimal max = null;
141+
BigDecimal min = null;
140142
if (config.getPartitionLowerBound().isPresent()
141143
&& config.getPartitionUpperBound().isPresent()) {
142144
max = config.getPartitionUpperBound().get();
@@ -155,11 +157,11 @@ static PartitionParameter createPartitionParameter(
155157
max =
156158
config.getPartitionUpperBound().isPresent()
157159
? config.getPartitionUpperBound().get()
158-
: rs.getLong(1);
160+
: rs.getBigDecimal(1);
159161
min =
160162
config.getPartitionLowerBound().isPresent()
161163
? config.getPartitionLowerBound().get()
162-
: rs.getLong(2);
164+
: rs.getBigDecimal(2);
163165
}
164166
} catch (SQLException e) {
165167
throw new PrepareFailException("jdbc", PluginType.SOURCE, e.toString());
@@ -200,7 +202,18 @@ static void validationPartitionColumn(String partitionColumn, SeaTunnelRowType r
200202
}
201203

202204
private static boolean isNumericType(SeaTunnelDataType<?> type) {
203-
return type.equals(BasicType.INT_TYPE) || type.equals(BasicType.LONG_TYPE);
205+
int scale = 1;
206+
if (type instanceof DecimalType) {
207+
scale = ((DecimalType) type).getScale() == 0 ? 0 : ((DecimalType) type).getScale();
208+
if (scale != 0) {
209+
throw new JdbcConnectorException(
210+
CommonErrorCode.ILLEGAL_ARGUMENT,
211+
String.format(
212+
"The current field is DecimalType containing decimals: %d Unable to support",
213+
scale));
214+
}
215+
}
216+
return type.equals(BasicType.INT_TYPE) || type.equals(BasicType.LONG_TYPE) || scale == 0;
204217
}
205218

206219
@Override

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@
2121
import lombok.Data;
2222

2323
import java.io.Serializable;
24+
import java.math.BigDecimal;
2425

2526
@Data
2627
@AllArgsConstructor
2728
public class PartitionParameter implements Serializable {
2829

2930
String partitionColumnName;
30-
long minValue;
31-
long maxValue;
31+
BigDecimal minValue;
32+
BigDecimal maxValue;
3233
Integer partitionNumber;
3334
}

0 commit comments

Comments
 (0)