Commit d827c70

[improve][CDC base] Implement Sample-based Sharding Strategy with Configurable Sampling Rate (#4856)
1 parent 9a2efa5 commit d827c70

15 files changed

+277
-5
lines changed

docs/en/connector-v2/source/MySQL-CDC.md

Lines changed: 19 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -41,8 +41,10 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL
4141
| connect.timeout.ms | Duration | No | 30000 |
4242
| connect.max-retries | Integer | No | 3 |
4343
| connection.pool.size | Integer | No | 20 |
44-
| chunk-key.even-distribution.factor.upper-bound | Double | No | 1000 |
44+
| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 |
4545
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 |
46+
| sample-sharding.threshold | Integer | No | 1000 |
47+
| inverse-sampling.rate | Integer | No | 1000 |
4648
| debezium.* | config | No | - |
4749
| format | Enum | No | DEFAULT |
4850
| common-options | | no | - |
@@ -124,6 +126,22 @@ of table.
124126

125127
The maximum fetch size per poll when reading a table snapshot.
126128

129+
### chunk-key.even-distribution.factor.upper-bound [Double]
130+
131+
The upper bound of the chunk key distribution factor. The distribution factor, calculated as (MAX(id) - MIN(id) + 1) / row count, is used to determine whether the table data is evenly distributed. If the factor is less than or equal to this upper bound, the table chunks are optimized for even distribution. Otherwise, the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 100.0.
132+
133+
### chunk-key.even-distribution.factor.lower-bound [Double]
134+
135+
The lower bound of the chunk key distribution factor. The distribution factor, calculated as (MAX(id) - MIN(id) + 1) / row count, is used to determine whether the table data is evenly distributed. If the factor is greater than or equal to this lower bound, the table chunks are optimized for even distribution. Otherwise, the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 0.05.
136+
137+
### sample-sharding.threshold [Integer]
138+
139+
This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards.
140+
141+
### inverse-sampling.rate [Integer]
142+
143+
The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000.
144+
127145
### server-id [String]
128146

129147
A numeric ID or a numeric ID range of this database client. The numeric ID syntax is like '5400', the numeric ID range

docs/en/connector-v2/source/SqlServer-CDC.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq
4040
| connect.timeout | Duration | No | 30s |
4141
| connect.max-retries | Integer | No | 3 |
4242
| connection.pool.size | Integer | No | 20 |
43-
| chunk-key.even-distribution.factor.upper-bound | Double | No | 1000 |
43+
| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 |
4444
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 |
45+
| sample-sharding.threshold | Integer | No | 1000 |
46+
| inverse-sampling.rate | Integer | No | 1000 |
4547
| debezium.* | config | No | - |
4648
| format | Enum | No | DEFAULT |
4749
| common-options | | no | - |
@@ -123,6 +125,22 @@ of table.
123125

124126
The maximum fetch size per poll when reading a table snapshot.
125127

128+
### chunk-key.even-distribution.factor.upper-bound [Double]
129+
130+
The upper bound of the chunk key distribution factor. The distribution factor, calculated as (MAX(id) - MIN(id) + 1) / row count, is used to determine whether the table data is evenly distributed. If the factor is less than or equal to this upper bound, the table chunks are optimized for even distribution. Otherwise, the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 100.0.
131+
132+
### chunk-key.even-distribution.factor.lower-bound [Double]
133+
134+
The lower bound of the chunk key distribution factor. The distribution factor, calculated as (MAX(id) - MIN(id) + 1) / row count, is used to determine whether the table data is evenly distributed. If the factor is greater than or equal to this lower bound, the table chunks are optimized for even distribution. Otherwise, the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 0.05.
135+
136+
### sample-sharding.threshold [Integer]
137+
138+
This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards.
139+
140+
### inverse-sampling.rate [Integer]
141+
142+
The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000.
143+
126144
### server-time-zone [String]
127145

128146
The session time zone in the database server.
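
The option descriptions added to both connector docs describe a three-way decision: split evenly when the distribution factor lies within the bounds, otherwise use sample-based sharding when the estimated shard count exceeds `sample-sharding.threshold`, otherwise fall back to query-based splitting. The following is a minimal sketch of that decision, not the splitter code from this commit. The class name `ShardingStrategySketch`, the `Strategy` enum, and the method names are hypothetical, and the chunk size of 8096 is only an example value.

```java
public final class ShardingStrategySketch {

    /** Hypothetical labels for the three splitting paths described in the docs. */
    enum Strategy { EVEN_SPLIT, SAMPLE_BASED, QUERY_BASED }

    /** Distribution factor as documented: (MAX(id) - MIN(id) + 1) / row count. */
    static double distributionFactor(long minId, long maxId, long rowCount) {
        if (rowCount <= 0) {
            // Assumption: treat an empty table as maximally sparse.
            return Double.MAX_VALUE;
        }
        // Convert to double first so a wide key range cannot overflow long arithmetic.
        return ((double) maxId - (double) minId + 1.0d) / rowCount;
    }

    /** Picks a strategy from the documented bounds and threshold. */
    static Strategy choose(
            long minId,
            long maxId,
            long approximateRowCount,
            int chunkSize,
            double lowerBound,
            double upperBound,
            int sampleShardingThreshold) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        double factor = distributionFactor(minId, maxId, approximateRowCount);
        if (factor >= lowerBound && factor <= upperBound) {
            return Strategy.EVEN_SPLIT;
        }
        // Estimated shard count, per the docs: approximate row count / chunk size.
        long estimatedShardCount = approximateRowCount / chunkSize;
        return estimatedShardCount > sampleShardingThreshold
                ? Strategy.SAMPLE_BASED
                : Strategy.QUERY_BASED;
    }

    public static void main(String[] args) {
        // A sparse key range over ten million rows, using the documented defaults
        // for the bounds (0.05, 100.0) and the threshold (1000).
        System.out.println(
                choose(1L, 10_000_000_000L, 10_000_000L, 8096, 0.05d, 100.0d, 1000));
    }
}
```

With the new default upper bound of 100, a table whose key range is 1000 times its row count no longer qualifies as evenly distributed, so it reaches the threshold check instead of the even-split path.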

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public abstract class BaseSourceConfig implements SourceConfig {
3737

3838
@Getter protected final double distributionFactorUpper;
3939
@Getter protected final double distributionFactorLower;
40+
@Getter protected final int sampleShardingThreshold;
41+
@Getter protected final int inverseSamplingRate;
4042

4143
// --------------------------------------------------------------------------------------------
4244
// Debezium Configurations
@@ -49,12 +51,16 @@ public BaseSourceConfig(
4951
int splitSize,
5052
double distributionFactorUpper,
5153
double distributionFactorLower,
54+
int sampleShardingThreshold,
55+
int inverseSamplingRate,
5256
Properties dbzProperties) {
5357
this.startupConfig = startupConfig;
5458
this.stopConfig = stopConfig;
5559
this.splitSize = splitSize;
5660
this.distributionFactorUpper = distributionFactorUpper;
5761
this.distributionFactorLower = distributionFactorLower;
62+
this.sampleShardingThreshold = sampleShardingThreshold;
63+
this.inverseSamplingRate = inverseSamplingRate;
5864
this.dbzProperties = dbzProperties;
5965
}
6066

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public JdbcSourceConfig(
5151
int splitSize,
5252
double distributionFactorUpper,
5353
double distributionFactorLower,
54+
int sampleShardingThreshold,
55+
int inverseSamplingRate,
5456
Properties dbzProperties,
5557
String driverClassName,
5658
String hostname,
@@ -69,6 +71,8 @@ public JdbcSourceConfig(
6971
splitSize,
7072
distributionFactorUpper,
7173
distributionFactorLower,
74+
sampleShardingThreshold,
75+
inverseSamplingRate,
7276
dbzProperties);
7377
this.driverClassName = driverClassName;
7478
this.hostname = hostname;

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,13 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory<Jd
4242
protected StartupConfig startupConfig;
4343
protected StopConfig stopConfig;
4444
protected boolean includeSchemaChanges = false;
45-
protected double distributionFactorUpper = 1000.0d;
46-
protected double distributionFactorLower = 0.05d;
45+
protected double distributionFactorUpper =
46+
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue();
47+
protected double distributionFactorLower =
48+
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
49+
protected int sampleShardingThreshold =
50+
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD.defaultValue();
51+
protected int inverseSamplingRate = JdbcSourceOptions.INVERSE_SAMPLING_RATE.defaultValue();
4752
protected int splitSize = SourceOptions.SNAPSHOT_SPLIT_SIZE.defaultValue();
4853
protected int fetchSize = SourceOptions.SNAPSHOT_FETCH_SIZE.defaultValue();
4954
protected String serverTimeZone = JdbcSourceOptions.SERVER_TIME_ZONE.defaultValue();
@@ -139,6 +144,34 @@ public JdbcSourceConfigFactory distributionFactorLower(double distributionFactor
139144
return this;
140145
}
141146

147+
/**
148+
* The threshold of estimated shard count to trigger the sample-based sharding strategy. When
149+
* the distribution factor is outside the upper and lower bounds and the estimated shard count
150+
* (approximate row count / chunk size) exceeds this threshold, the sample-based sharding
151+
* strategy will be used. This can help to handle large datasets in a more efficient manner.
152+
*
153+
* @param sampleShardingThreshold The threshold of estimated shard count.
154+
* @return This JdbcSourceConfigFactory.
155+
*/
156+
public JdbcSourceConfigFactory sampleShardingThreshold(int sampleShardingThreshold) {
157+
this.sampleShardingThreshold = sampleShardingThreshold;
158+
return this;
159+
}
160+
161+
/**
162+
* The inverse of the sampling rate to be used for data sharding based on sampling. The actual
163+
* sampling rate is 1 / inverseSamplingRate. For instance, if inverseSamplingRate is 1000, then
164+
* the sampling rate is 1/1000, meaning every 1000th record will be included in the sample used
165+
* for sharding.
166+
*
167+
* @param inverseSamplingRate The value representing the inverse of the desired sampling rate.
168+
* @return this JdbcSourceConfigFactory instance.
169+
*/
170+
public JdbcSourceConfigFactory inverseSamplingRate(int inverseSamplingRate) {
171+
this.inverseSamplingRate = inverseSamplingRate;
172+
return this;
173+
}
174+
142175
/** The maximum fetch size per poll when reading a table snapshot. */
143176
public JdbcSourceConfigFactory fetchSize(int fetchSize) {
144177
this.fetchSize = fetchSize;
@@ -201,6 +234,8 @@ public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
201234
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
202235
this.distributionFactorLower =
203236
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
237+
this.sampleShardingThreshold = config.get(JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD);
238+
this.inverseSamplingRate = config.get(JdbcSourceOptions.INVERSE_SAMPLING_RATE);
204239
this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
205240
this.fetchSize = config.get(SourceOptions.SNAPSHOT_FETCH_SIZE);
206241
this.serverTimeZone = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public class JdbcSourceOptions extends SourceOptions {
100100
public static final Option<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
101101
Options.key("chunk-key.even-distribution.factor.upper-bound")
102102
.doubleType()
103-
.defaultValue(1000.0d)
103+
.defaultValue(100.0d)
104104
.withDescription(
105105
"The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
106106
+ " table is evenly distributed or not."
@@ -118,4 +118,25 @@ public class JdbcSourceOptions extends SourceOptions {
118118
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
119119
+ " and the query for splitting would happen when it is uneven."
120120
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
121+
122+
public static final Option<Integer> SAMPLE_SHARDING_THRESHOLD =
123+
Options.key("sample-sharding.threshold")
124+
.intType()
125+
.defaultValue(1000) // 1000 shards
126+
.withDescription(
127+
"The threshold of estimated shard count to trigger the sample sharding strategy. "
128+
+ "When the distribution factor is outside the upper and lower bounds, "
129+
+ "and if the estimated shard count (approximateRowCnt/chunkSize) exceeds this threshold, "
130+
+ "the sample sharding strategy will be used. "
131+
+ "This strategy can help to handle large datasets more efficiently. "
132+
+ "The default value is 1000 shards.");
133+
public static final Option<Integer> INVERSE_SAMPLING_RATE =
134+
Options.key("inverse-sampling.rate")
135+
.intType()
136+
.defaultValue(1000) // 1/1000 sampling rate
137+
.withDescription(
138+
"The inverse of the sampling rate for the sample sharding strategy. "
139+
+ "The value represents the denominator of the sampling rate fraction. "
140+
+ "For example, a value of 1000 means a sampling rate of 1/1000. "
141+
+ "This parameter is used when the sample sharding strategy is triggered.");
121142
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,22 @@ Object queryMin(
6262
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
6363
throws SQLException;
6464

65+
/**
66+
* Performs a sampling operation on the specified column of a table in a JDBC-connected
67+
* database.
68+
*
69+
* @param jdbc The JDBC connection object used to connect to the database.
70+
* @param tableId The ID of the table in which the column resides.
71+
* @param columnName The name of the column to be sampled.
72+
* @param samplingRate The inverse of the fraction of the data to be sampled from
73+
* the column. For example, a value of 1000 would mean 1/1000 of the data will be sampled.
74+
* @return An array of sampled data from the specified column.
75+
* @throws SQLException If an SQL error occurs during the sampling operation.
76+
*/
77+
Object[] sampleDataFromColumn(
78+
JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
79+
throws SQLException;
80+
6581
/**
6682
* Query the maximum value of the next chunk, and the next chunk must be greater than or equal
6783
* to <code>includedLowerBound</code> value [min_1, max_1), [min_2, max_2),... [min_n, null).
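
To illustrate what a caller might do with the values returned by `sampleDataFromColumn`, here is a hedged, in-memory sketch: it keeps every `inverseSamplingRate`-th value of an already sorted key column and then picks evenly spaced sample values as chunk boundaries. The interface method in this commit queries the database through JDBC. The class `SampleShardingSketch`, its methods, and the boundary-selection rule below are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

public final class SampleShardingSketch {

    /** Keeps every inverseSamplingRate-th value of an already sorted key array. */
    static long[] sample(long[] sortedKeys, int inverseSamplingRate) {
        if (inverseSamplingRate <= 0) {
            throw new IllegalArgumentException("inverseSamplingRate must be positive");
        }
        // Round up so a partial trailing window still contributes one sample.
        int sampleCount = (sortedKeys.length + inverseSamplingRate - 1) / inverseSamplingRate;
        long[] sampled = new long[sampleCount];
        for (int i = 0; i < sampleCount; i++) {
            sampled[i] = sortedKeys[i * inverseSamplingRate];
        }
        return sampled;
    }

    /** Picks up to shardCount - 1 strictly increasing boundaries from the sample. */
    static List<Long> boundaries(long[] sampled, int shardCount) {
        List<Long> result = new ArrayList<>();
        if (sampled.length == 0 || shardCount <= 1) {
            return result;
        }
        double step = (double) sampled.length / shardCount;
        for (int i = 1; i < shardCount; i++) {
            int index = (int) Math.min(sampled.length - 1, Math.floor(i * step));
            long candidate = sampled[index];
            // Skip duplicates so that no chunk ends up empty.
            if (result.isEmpty() || result.get(result.size() - 1) < candidate) {
                result.add(candidate);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] keys = new long[10_000];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = i + 1L;
        }
        long[] sampled = sample(keys, 1000);
        System.out.println(boundaries(sampled, 5));
    }
}
```

A larger `inverse-sampling.rate` yields fewer sampled values, which makes sampling cheaper but gives coarser candidate boundaries.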

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public MySqlSourceConfig(
4343
int splitSize,
4444
double distributionFactorUpper,
4545
double distributionFactorLower,
46+
int sampleShardingThreshold,
47+
int inverseSamplingRate,
4648
Properties dbzProperties,
4749
String driverClassName,
4850
String hostname,
@@ -63,6 +65,8 @@ public MySqlSourceConfig(
6365
splitSize,
6466
distributionFactorUpper,
6567
distributionFactorLower,
68+
sampleShardingThreshold,
69+
inverseSamplingRate,
6670
dbzProperties,
6771
driverClassName,
6872
hostname,

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ public MySqlSourceConfig create(int subtaskId) {
117117
splitSize,
118118
distributionFactorUpper,
119119
distributionFactorLower,
120+
sampleShardingThreshold,
121+
inverseSamplingRate,
120122
props,
121123
driverClassName,
122124
hostname,
