Skip to content

Commit

Permalink
[INLONG-7882][Sort] Oracle CDC reduces the number of session connecti…
Browse files Browse the repository at this point in the history
…ons (#8406)
  • Loading branch information
e-mhui committed Jul 5, 2023
1 parent 691c3dd commit e460264
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,7 @@ public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
@Override
public OracleSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
final OracleConnection jdbcConnection =
createOracleConnection(taskSourceConfig.getDbzConfiguration());
return new OracleSourceFetchTaskContext(taskSourceConfig, this, jdbcConnection);
return new OracleSourceFetchTaskContext(taskSourceConfig, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import java.time.Instant;
import java.util.Map;

import static org.apache.inlong.sort.cdc.oracle.source.utils.OracleConnectionUtils.createOracleConnection;

/** The context for fetch task that fetching data of snapshot split from Oracle data source.
* Copy from com.ververica:flink-connector-oracle-cdc:2.3.0
*/
Expand All @@ -83,11 +85,9 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
private OracleErrorHandler errorHandler;

public OracleSourceFetchTaskContext(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dataSourceDialect,
OracleConnection connection) {
JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
super(sourceConfig, dataSourceDialect);
this.connection = connection;
this.connection = createOracleConnection(sourceConfig.getDbzConfiguration());
this.metadataProvider = new OracleEventMetadataProvider();
}

Expand All @@ -101,7 +101,7 @@ public void configure(SourceSplitBase sourceSplitBase) {
.getDbzConfiguration()
.getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
sourceSplitBase.getTableSchemas().values());
this.databaseSchema = OracleUtils.createOracleDatabaseSchema(connectorConfig);
this.databaseSchema = OracleUtils.createOracleDatabaseSchema(connectorConfig, connection);
// todo logMiner or xStream
this.offsetContext =
loadStartingOffsetState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@
*/
public class OracleConnectionUtils {

private static final Logger LOG = LoggerFactory.getLogger(
com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.class);
private static final Logger LOG = LoggerFactory.getLogger(OracleConnectionUtils.class);

/** Returned by column metadata in Oracle if no scale is set. */
private static final int ORACLE_UNSET_SCALE = -127;
Expand All @@ -56,7 +55,7 @@ public static OracleConnection createOracleConnection(Configuration dbzConfigura
Configuration configuration = dbzConfiguration.subset(DATABASE_CONFIG_PREFIX, true);
return new OracleConnection(
configuration.isEmpty() ? dbzConfiguration : configuration,
com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.class::getClassLoader);
OracleConnectionUtils.class::getClassLoader);
}

/** Fetch current redoLog offsets in Oracle Server. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

import org.apache.inlong.sort.cdc.oracle.source.meta.offset.RedoLogOffset;

import com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
import com.ververica.cdc.connectors.oracle.source.utils.OracleTypeUtils;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
Expand Down Expand Up @@ -267,12 +265,9 @@ public static RowType getSplitType(Table table) {

/** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */
public static OracleDatabaseSchema createOracleDatabaseSchema(
OracleConnectorConfig dbzOracleConfig) {
OracleConnectorConfig dbzOracleConfig, OracleConnection oracleConnection) {
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(dbzOracleConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
OracleConnection oracleConnection =
OracleConnectionUtils.createOracleConnection(dbzOracleConfig.getJdbcConfig());
// OracleConnectionUtils.createOracleConnection((Configuration) dbzOracleConfig);
OracleValueConverters oracleValueConverters =
new OracleValueConverters(dbzOracleConfig, oracleConnection);
StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity =
Expand All @@ -285,27 +280,6 @@ public static OracleDatabaseSchema createOracleDatabaseSchema(
tableNameCaseSensitivity);
}

/** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */
public static OracleDatabaseSchema createOracleDatabaseSchema(
OracleConnectorConfig dbzOracleConfig, boolean tableIdCaseInsensitive) {
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(dbzOracleConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
OracleConnection oracleConnection =
OracleConnectionUtils.createOracleConnection((Configuration) dbzOracleConfig);
OracleValueConverters oracleValueConverters =
new OracleValueConverters(dbzOracleConfig, oracleConnection);
StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity =
tableIdCaseInsensitive
? StreamingAdapter.TableNameCaseSensitivity.SENSITIVE
: StreamingAdapter.TableNameCaseSensitivity.INSENSITIVE;
return new OracleDatabaseSchema(
dbzOracleConfig,
oracleValueConverters,
schemaNameAdjuster,
topicSelector,
tableNameCaseSensitivity);
}

public static RedoLogOffset getRedoLogPosition(SourceRecord dataRecord) {
return getRedoLogPosition(dataRecord.sourceOffset());
}
Expand Down

0 comments on commit e460264

Please sign in to comment.