Skip to content

Commit c0422db

Browse files
authored
[Imporve] [CDC Base] Add a fast sampling method that supports character types (#5179)
1 parent 4280291 commit c0422db

File tree

6 files changed

+136
-4
lines changed

6 files changed

+136
-4
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,19 @@ private List<ChunkRange> splitTableIntoChunks(
112112
final int chunkSize = sourceConfig.getSplitSize();
113113
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
114114
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
115+
final int sampleShardingThreshold = sourceConfig.getSampleShardingThreshold();
116+
117+
log.info(
118+
"Splitting table {} into chunks, split column: {}, min: {}, max: {}, chunk size: {}, "
119+
+ "distribution factor upper: {}, distribution factor lower: {}, sample sharding threshold: {}",
120+
tableId,
121+
splitColumnName,
122+
min,
123+
max,
124+
chunkSize,
125+
distributionFactorUpper,
126+
distributionFactorLower,
127+
sampleShardingThreshold);
115128

116129
if (isEvenlySplitColumn(splitColumn)) {
117130
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
@@ -130,7 +143,7 @@ private List<ChunkRange> splitTableIntoChunks(
130143
} else {
131144
int shardCount = (int) (approximateRowCnt / chunkSize);
132145
int inverseSamplingRate = sourceConfig.getInverseSamplingRate();
133-
if (sourceConfig.getSampleShardingThreshold() < shardCount) {
146+
if (sampleShardingThreshold < shardCount) {
134147
// It is necessary to ensure that the number of data rows sampled by the
135148
// sampling rate is greater than the number of shards.
136149
// Otherwise, if the sampling rate is too low, it may result in an insufficient
@@ -144,9 +157,17 @@ private List<ChunkRange> splitTableIntoChunks(
144157
chunkSize);
145158
inverseSamplingRate = chunkSize;
146159
}
160+
log.info(
161+
"Use sampling sharding for table {}, the sampling rate is {}",
162+
tableId,
163+
inverseSamplingRate);
147164
Object[] sample =
148165
sampleDataFromColumn(
149166
jdbc, tableId, splitColumnName, inverseSamplingRate);
167+
log.info(
168+
"Sample data from table {} end, the sample size is {}",
169+
tableId,
170+
sample.length);
150171
return efficientShardingThroughSampling(
151172
tableId, sample, approximateRowCnt, shardCount);
152173
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public OptionRule optionRule() {
6868
JdbcSourceOptions.CONNECTION_POOL_SIZE,
6969
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
7070
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
71-
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD)
71+
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
72+
JdbcSourceOptions.INVERSE_SAMPLING_RATE)
7273
.optional(MySqlSourceOptions.STARTUP_MODE, MySqlSourceOptions.STOP_MODE)
7374
.conditional(
7475
MySqlSourceOptions.STARTUP_MODE,

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
import io.debezium.jdbc.JdbcConnection;
2929
import io.debezium.relational.Column;
3030
import io.debezium.relational.TableId;
31+
import lombok.extern.slf4j.Slf4j;
3132

3233
import java.sql.SQLException;
3334

3435
/** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */
36+
@Slf4j
3537
public class MySqlChunkSplitter extends AbstractJdbcSourceChunkSplitter {
3638

3739
public MySqlChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
@@ -55,7 +57,7 @@ public Object queryMin(
5557
public Object[] sampleDataFromColumn(
5658
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
5759
throws SQLException {
58-
return MySqlUtils.sampleDataFromColumn(jdbc, tableId, columnName, inverseSamplingRate);
60+
return MySqlUtils.skipReadAndSortSampleData(jdbc, tableId, columnName, inverseSamplingRate);
5961
}
6062

6163
@Override

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@
3636
import io.debezium.relational.TableId;
3737
import io.debezium.schema.TopicSelector;
3838
import io.debezium.util.SchemaNameAdjuster;
39+
import lombok.extern.slf4j.Slf4j;
3940

4041
import java.sql.Connection;
4142
import java.sql.PreparedStatement;
4243
import java.sql.ResultSet;
4344
import java.sql.SQLException;
45+
import java.sql.Statement;
4446
import java.util.ArrayList;
4547
import java.util.Arrays;
4648
import java.util.HashMap;
@@ -52,6 +54,7 @@
5254
import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.rowToArray;
5355

5456
/** Utils to prepare MySQL SQL statement. */
57+
@Slf4j
5558
public class MySqlUtils {
5659

5760
private MySqlUtils() {}
@@ -142,6 +145,56 @@ public static Object[] sampleDataFromColumn(
142145
});
143146
}
144147

148+
public static Object[] skipReadAndSortSampleData(
149+
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
150+
throws SQLException {
151+
final String sampleQuery =
152+
String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));
153+
154+
Statement stmt = null;
155+
ResultSet rs = null;
156+
157+
List<Object> results = new ArrayList<>();
158+
try {
159+
stmt =
160+
jdbc.connection()
161+
.createStatement(
162+
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
163+
164+
stmt.setFetchSize(Integer.MIN_VALUE);
165+
rs = stmt.executeQuery(sampleQuery);
166+
167+
int count = 0;
168+
while (rs.next()) {
169+
count++;
170+
if (count % 100000 == 0) {
171+
log.info("Processing row index: {}", count);
172+
}
173+
if (count % inverseSamplingRate == 0) {
174+
results.add(rs.getObject(1));
175+
}
176+
}
177+
} finally {
178+
if (rs != null) {
179+
try {
180+
rs.close();
181+
} catch (SQLException e) {
182+
log.error("Failed to close ResultSet", e);
183+
}
184+
}
185+
if (stmt != null) {
186+
try {
187+
stmt.close();
188+
} catch (SQLException e) {
189+
log.error("Failed to close Statement", e);
190+
}
191+
}
192+
}
193+
Object[] resultsArray = results.toArray();
194+
Arrays.sort(resultsArray);
195+
return resultsArray;
196+
}
197+
145198
public static Object queryNextChunkMax(
146199
JdbcConnection jdbc,
147200
TableId tableId,

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ public Object queryMin(
5757
public Object[] sampleDataFromColumn(
5858
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
5959
throws SQLException {
60-
return SqlServerUtils.sampleDataFromColumn(jdbc, tableId, columnName, inverseSamplingRate);
60+
return SqlServerUtils.skipReadAndSortSampleData(
61+
jdbc, tableId, columnName, inverseSamplingRate);
6162
}
6263

6364
@Override

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,13 @@
3939
import io.debezium.relational.TableId;
4040
import io.debezium.schema.TopicSelector;
4141
import io.debezium.util.SchemaNameAdjuster;
42+
import lombok.extern.slf4j.Slf4j;
4243

4344
import java.sql.Connection;
4445
import java.sql.PreparedStatement;
46+
import java.sql.ResultSet;
4547
import java.sql.SQLException;
48+
import java.sql.Statement;
4649
import java.util.ArrayList;
4750
import java.util.Arrays;
4851
import java.util.HashMap;
@@ -52,6 +55,7 @@
5255
import java.util.Optional;
5356

5457
/** The utils for SqlServer data source. */
58+
@Slf4j
5559
public class SqlServerUtils {
5660

5761
public SqlServerUtils() {}
@@ -145,6 +149,56 @@ public static Object[] sampleDataFromColumn(
145149
});
146150
}
147151

152+
public static Object[] skipReadAndSortSampleData(
153+
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
154+
throws SQLException {
155+
final String sampleQuery =
156+
String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));
157+
158+
Statement stmt = null;
159+
ResultSet rs = null;
160+
161+
List<Object> results = new ArrayList<>();
162+
try {
163+
stmt =
164+
jdbc.connection()
165+
.createStatement(
166+
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
167+
168+
stmt.setFetchSize(Integer.MIN_VALUE);
169+
rs = stmt.executeQuery(sampleQuery);
170+
171+
int count = 0;
172+
while (rs.next()) {
173+
count++;
174+
if (count % 100000 == 0) {
175+
log.info("Processing row index: {}", count);
176+
}
177+
if (count % inverseSamplingRate == 0) {
178+
results.add(rs.getObject(1));
179+
}
180+
}
181+
} finally {
182+
if (rs != null) {
183+
try {
184+
rs.close();
185+
} catch (SQLException e) {
186+
log.error("Failed to close ResultSet", e);
187+
}
188+
}
189+
if (stmt != null) {
190+
try {
191+
stmt.close();
192+
} catch (SQLException e) {
193+
log.error("Failed to close Statement", e);
194+
}
195+
}
196+
}
197+
Object[] resultsArray = results.toArray();
198+
Arrays.sort(resultsArray);
199+
return resultsArray;
200+
}
201+
148202
/**
149203
* Returns the next LSN to be read from the database. This is the LSN of the last record that
150204
* was read from the database.

0 commit comments

Comments
 (0)