Skip to content

Commit 0d08b20

Browse files
authored
[Improve][Connector-v2] Optimize the count table rows for jdbc-oracle and oracle-cdc (#7248)
1 parent cc59499 commit 0d08b20

File tree

23 files changed

+594
-179
lines changed

23 files changed

+594
-179
lines changed

docs/en/connector-v2/source/Jdbc.md

Lines changed: 56 additions & 142 deletions
Large diffs are not rendered by default.

docs/en/connector-v2/source/Oracle-CDC.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ exit;
244244
| sample-sharding.threshold | Integer | No | 1000 | This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards. |
245245
| inverse-sampling.rate | Integer | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. |
246246
| exactly_once | Boolean | No | false | Enable exactly once semantic. |
247+
| use_select_count | Boolean | No | false | Use select count for table count rather then other methods in full stage.In this scenario, select count directly is used when it is faster to update statistics using sql from analysis table |
248+
| skip_analyze | Boolean | No | false | Skip the analysis of table count in full stage.In this scenario, you schedule analysis table sql to update related table statistics periodically or your table data does not change frequently |
247249
| format | Enum | No | DEFAULT | Optional output format for Oracle CDC, valid enumerations are `DEFAULT``COMPATIBLE_DEBEZIUM_JSON`. |
248250
| debezium | Config | No | - | Pass-through [Debezium's properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/oracle.adoc#connector-properties) to Debezium Embedded Engine which is used to capture data changes from Oracle server. |
249251
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details |
@@ -270,6 +272,44 @@ source {
270272
}
271273
```
272274

275+
> Use the select count(*) instead of analysis table for count table rows in full stage
276+
>
277+
> ```conf
278+
> source {
279+
> # This is a example source plugin **only for test and demonstrate the feature source plugin**
280+
> Oracle-CDC {
281+
> result_table_name = "customers"
282+
> use_select_count = true
283+
> username = "system"
284+
> password = "oracle"
285+
> database-names = ["XE"]
286+
> schema-names = ["DEBEZIUM"]
287+
> table-names = ["XE.DEBEZIUM.FULL_TYPES"]
288+
> base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
289+
> source.reader.close.timeout = 120000
290+
> }
291+
> }
292+
> ```
293+
>
294+
> Use the select NUM_ROWS from all_tables for the table rows but skip the analyze table.
295+
>
296+
> ```conf
297+
> source {
298+
> # This is a example source plugin **only for test and demonstrate the feature source plugin**
299+
> Oracle-CDC {
300+
> result_table_name = "customers"
301+
> skip_analyze = true
302+
> username = "system"
303+
> password = "oracle"
304+
> database-names = ["XE"]
305+
> schema-names = ["DEBEZIUM"]
306+
> table-names = ["XE.DEBEZIUM.FULL_TYPES"]
307+
> base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
308+
> source.reader.close.timeout = 120000
309+
> }
310+
> }
311+
> ```
312+
273313
### Support custom primary key for table
274314
275315
```

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.debezium.config.Configuration;
2525
import io.debezium.connector.oracle.OracleConnectorConfig;
2626
import io.debezium.relational.RelationalTableFilters;
27+
import lombok.Getter;
2728

2829
import java.util.List;
2930
import java.util.Properties;
@@ -32,11 +33,17 @@
3233
* Describes the connection information of the Oracle database and the configuration information for
3334
* performing snapshotting and streaming reading, such as splitSize.
3435
*/
36+
@Getter
3537
public class OracleSourceConfig extends JdbcSourceConfig {
3638

3739
private static final long serialVersionUID = 1L;
3840

41+
private final Boolean useSelectCount;
42+
private final Boolean skipAnalyze;
43+
3944
public OracleSourceConfig(
45+
Boolean useSelectCount,
46+
Boolean skipAnalyze,
4047
StartupConfig startupConfig,
4148
StopConfig stopConfig,
4249
List<String> databaseList,
@@ -82,6 +89,8 @@ public OracleSourceConfig(
8289
connectMaxRetries,
8390
connectionPoolSize,
8491
exactlyOnce);
92+
this.useSelectCount = useSelectCount;
93+
this.skipAnalyze = skipAnalyze;
8594
}
8695

8796
@Override

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ public class OracleSourceConfigFactory extends JdbcSourceConfigFactory {
3838
private static final String DRIVER_CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
3939

4040
private List<String> schemaList;
41+
42+
private Boolean useSelectCount;
43+
44+
private Boolean skipAnalyze;
4145
/**
4246
* An optional list of regular expressions that match schema names to be monitored; any schema
4347
* name not included in the whitelist will be excluded from monitoring. By default all
@@ -48,6 +52,16 @@ public JdbcSourceConfigFactory schemaList(List<String> schemaList) {
4852
return this;
4953
}
5054

55+
public JdbcSourceConfigFactory useSelectCount(Boolean useSelectCount) {
56+
this.useSelectCount = useSelectCount;
57+
return this;
58+
}
59+
60+
public JdbcSourceConfigFactory skipAnalyze(Boolean skipAnalyze) {
61+
this.skipAnalyze = skipAnalyze;
62+
return this;
63+
}
64+
5165
/** Creates a new {@link OracleSourceConfig} for the given subtask {@code subtaskId}. */
5266
public OracleSourceConfig create(int subtask) {
5367

@@ -123,6 +137,8 @@ public OracleSourceConfig create(int subtask) {
123137
}
124138

125139
return new OracleSourceConfig(
140+
useSelectCount,
141+
skipAnalyze,
126142
startupConfig,
127143
stopConfig,
128144
databaseList,

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ public SourceConfig.Factory<JdbcSourceConfig> createSourceConfigFactory(Readonly
8888
configFactory.startupOptions(startupConfig);
8989
configFactory.stopOptions(stopConfig);
9090
configFactory.schemaList(config.get(OracleSourceOptions.SCHEMA_NAMES));
91+
configFactory.useSelectCount(config.get(OracleSourceOptions.USE_SELECT_COUNT));
92+
configFactory.skipAnalyze(config.get(OracleSourceOptions.SKIP_ANALYZE));
9193
configFactory.originUrl(config.get(JdbcCatalogOptions.BASE_URL));
9294
return configFactory;
9395
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public OptionRule optionRule() {
6161
JdbcCatalogOptions.BASE_URL,
6262
JdbcSourceOptions.DATABASE_NAMES,
6363
OracleSourceOptions.SCHEMA_NAMES,
64+
OracleSourceOptions.USE_SELECT_COUNT,
65+
OracleSourceOptions.SKIP_ANALYZE,
6466
JdbcSourceOptions.SERVER_TIME_ZONE,
6567
JdbcSourceOptions.CONNECT_TIMEOUT_MS,
6668
JdbcSourceOptions.CONNECT_MAX_RETRIES,

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleSourceOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,16 @@ public class OracleSourceOptions {
5353
.listType()
5454
.noDefaultValue()
5555
.withDescription("Schema name of the database to monitor.");
56+
57+
public static final Option<Boolean> USE_SELECT_COUNT =
58+
Options.key("use_select_count")
59+
.booleanType()
60+
.defaultValue(false)
61+
.withDescription("Use select count for table count in full stage");
62+
63+
public static final Option<Boolean> SKIP_ANALYZE =
64+
Options.key("skip_analyze")
65+
.booleanType()
66+
.defaultValue(false)
67+
.withDescription("Skip the analysis of table count in full stage");
5668
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
2424
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter;
2525
import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;
26+
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
2627
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleTypeUtils;
2728
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleUtils;
2829

@@ -41,8 +42,11 @@
4142
@Slf4j
4243
public class OracleChunkSplitter extends AbstractJdbcSourceChunkSplitter {
4344

45+
private final OracleSourceConfig oracleSourceConfig;
46+
4447
public OracleChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
4548
super(sourceConfig, dialect);
49+
this.oracleSourceConfig = (OracleSourceConfig) sourceConfig;
4650
}
4751

4852
@Override
@@ -80,7 +84,7 @@ public Object queryNextChunkMax(
8084

8185
@Override
8286
public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException {
83-
return OracleUtils.queryApproximateRowCnt(jdbc, tableId);
87+
return OracleUtils.queryApproximateRowCnt(oracleSourceConfig, jdbc, tableId);
8488
}
8589

8690
@Override

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2222
import org.apache.seatunnel.common.utils.SeaTunnelException;
2323
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
24+
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
2425
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.offset.RedoLogOffset;
2526

2627
import org.apache.kafka.connect.source.SourceRecord;
@@ -81,27 +82,41 @@ public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String
8182
});
8283
}
8384

84-
public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
85+
public static long queryApproximateRowCnt(
86+
OracleSourceConfig oracleSourceConfig, JdbcConnection jdbc, TableId tableId)
8587
throws SQLException {
86-
final String analyzeTable =
87-
String.format(
88-
"analyze table %s compute statistics for table",
89-
quoteSchemaAndTable(tableId));
90-
final String rowCountQuery =
91-
String.format(
92-
"select NUM_ROWS from all_tables where TABLE_NAME = '%s'", tableId.table());
93-
return jdbc.execute(analyzeTable)
94-
.queryAndMap(
95-
rowCountQuery,
96-
rs -> {
97-
if (!rs.next()) {
98-
throw new SQLException(
99-
String.format(
100-
"No result returned after running query [%s]",
101-
rowCountQuery));
102-
}
103-
return rs.getLong(1);
104-
});
88+
Boolean useSelectCount = oracleSourceConfig.getUseSelectCount();
89+
String rowCountQuery;
90+
if (useSelectCount) {
91+
rowCountQuery = String.format("select count(*) from %s", quoteSchemaAndTable(tableId));
92+
} else {
93+
rowCountQuery =
94+
String.format(
95+
"select NUM_ROWS from all_tables where TABLE_NAME = '%s'",
96+
tableId.table());
97+
Boolean skipAnalyze = oracleSourceConfig.getSkipAnalyze();
98+
if (!skipAnalyze) {
99+
final String analyzeTable =
100+
String.format(
101+
"analyze table %s compute statistics for table",
102+
quoteSchemaAndTable(tableId));
103+
// not skip analyze
104+
log.info("analyze table sql: {}", analyzeTable);
105+
jdbc.execute(analyzeTable);
106+
}
107+
}
108+
log.info("row count query: {}", rowCountQuery);
109+
return jdbc.queryAndMap(
110+
rowCountQuery,
111+
rs -> {
112+
if (!rs.next()) {
113+
throw new SQLException(
114+
String.format(
115+
"No result returned after running query [%s]",
116+
rowCountQuery));
117+
}
118+
return rs.getLong(1);
119+
});
105120
}
106121

107122
public static Object queryMin(

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,16 @@ public interface JdbcSourceOptions {
9393
+ "The value represents the denominator of the sampling rate fraction. "
9494
+ "For example, a value of 1000 means a sampling rate of 1/1000. "
9595
+ "This parameter is used when the sample sharding strategy is triggered.");
96+
97+
Option<Boolean> USE_SELECT_COUNT =
98+
Options.key("use_select_count")
99+
.booleanType()
100+
.defaultValue(false)
101+
.withDescription("Use select count for table count");
102+
103+
Option<Boolean> SKIP_ANALYZE =
104+
Options.key("skip_analyze")
105+
.booleanType()
106+
.defaultValue(false)
107+
.withDescription("Skip the analysis of table count");
96108
}

0 commit comments

Comments
 (0)