Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7717 Move LogPositionValidator outside JdbcConnection #5436

Merged
merged 1 commit into from Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -136,7 +136,7 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co

MySqlOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();

validateAndLoadSchemaHistory(connectorConfig, connection, previousOffsets, schema, snapshotter);
validateAndLoadSchemaHistory(connectorConfig, connection::validateLogPosition, previousOffsets, schema, snapshotter);

LOGGER.info("Reconnecting after finishing schema recovery");

Expand Down
Expand Up @@ -61,7 +61,6 @@ public AbstractConnectorConnection(ConnectionConfiguration configuration, MySqlF
super(configuration.config(), configuration.factory(), QUOTED_CHARACTER, QUOTED_CHARACTER);
this.connectionConfig = configuration;
this.fieldReader = fieldReader;
this.logPositionValidator = this::validateLogPosition;
}

@Override
Expand Down
Expand Up @@ -97,7 +97,6 @@ public OracleConnection(JdbcConfiguration config, ConnectionFactory connectionFa
public OracleConnection(JdbcConfiguration config, ConnectionFactory connectionFactory, boolean showVersion) {
super(config, connectionFactory, QUOTED_CHARACTER, QUOTED_CHARACTER);
LOGGER.trace("JDBC connection string: " + connectionString(config));
this.logPositionValidator = this::validateLogPosition;
this.databaseVersion = resolveOracleDatabaseVersion();
if (showVersion) {
LOGGER.info("Database Version: {}", databaseVersion.getBanner());
Expand All @@ -107,7 +106,6 @@ public OracleConnection(JdbcConfiguration config, ConnectionFactory connectionFa
public OracleConnection(JdbcConfiguration config, boolean showVersion) {
super(config, resolveConnectionFactory(config), QUOTED_CHARACTER, QUOTED_CHARACTER);
LOGGER.trace("JDBC connection string: " + connectionString(config));
this.logPositionValidator = this::validateLogPosition;
this.databaseVersion = resolveOracleDatabaseVersion();
if (showVersion) {
LOGGER.info("Database Version: {}", databaseVersion.getBanner());
Expand Down
Expand Up @@ -98,7 +98,7 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(

OracleOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();

validateAndLoadSchemaHistory(connectorConfig, jdbcConnection, previousOffsets, schema, snapshotterService.getSnapshotter());
validateAndLoadSchemaHistory(connectorConfig, jdbcConnection::validateLogPosition, previousOffsets, schema, snapshotterService.getSnapshotter());

taskContext = new OracleTaskContext(connectorConfig, schema);

Expand Down
Expand Up @@ -135,7 +135,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
beanRegistryJdbcConnection.username(), e);
}

validateAndLoadSchemaHistory(connectorConfig, jdbcConnection, previousOffsets, schema, snapshotter);
validateAndLoadSchemaHistory(connectorConfig, jdbcConnection::validateLogPosition, previousOffsets, schema, snapshotter);

LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
try {
Expand Down
Expand Up @@ -105,8 +105,6 @@ public class PostgresConnection extends JdbcConnection {
public PostgresConnection(JdbcConfiguration config, PostgresValueConverterBuilder valueConverterBuilder, String connectionUsage) {
super(addDefaultSettings(config, connectionUsage), FACTORY, PostgresConnection::validateServerVersion, "\"", "\"");

this.logPositionValidator = this::validateLogPosition;

if (Objects.isNull(valueConverterBuilder)) {
this.typeRegistry = null;
this.defaultValueConverter = null;
Expand All @@ -131,7 +129,6 @@ public PostgresConnection(PostgresConnectorConfig config, TypeRegistry typeRegis
PostgresConnection::validateServerVersion,
"\"", "\"");

this.logPositionValidator = this::validateLogPosition;
if (Objects.isNull(typeRegistry)) {
this.typeRegistry = null;
this.defaultValueConverter = null;
Expand Down
Expand Up @@ -140,7 +140,6 @@ public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConver
boolean useSingleDatabase) {
super(config.getJdbcConfig(), createConnectionFactory(config.getJdbcConfig(), useSingleDatabase), OPENING_QUOTING_CHARACTER, CLOSING_QUOTING_CHARACTER);

this.logPositionValidator = this::validateLogPosition;
defaultValueConverter = new SqlServerDefaultValueConverter(this::connection, valueConverters);
this.queryFetchSize = config.getQueryFetchSize();

Expand Down
Expand Up @@ -105,7 +105,7 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>

final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);

validateAndLoadSchemaHistory(connectorConfig, metadataConnection, offsets, schema,
validateAndLoadSchemaHistory(connectorConfig, metadataConnection::validateLogPosition, offsets, schema,
snapshotterService.getSnapshotter());

taskContext = new SqlServerTaskContext(connectorConfig, schema);
Expand Down
Expand Up @@ -35,7 +35,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.function.LogPositionValidator;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.notification.channels.NotificationChannel;
import io.debezium.pipeline.signal.channels.SignalChannelReader;
Expand Down Expand Up @@ -68,7 +68,8 @@ public abstract class BaseSourceTask<P extends Partition, O extends OffsetContex
private static final Duration MAX_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.HOURS.toMillis(1));
private Configuration config;

protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, JdbcConnection jdbcConnection, Offsets<P, O> previousOffsets, DatabaseSchema schema,
protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, LogPositionValidator logPositionValidator, Offsets<P, O> previousOffsets,
DatabaseSchema schema,
Snapshotter snapshotter) {

for (Map.Entry<P, O> previousOffset : previousOffsets) {
Expand Down Expand Up @@ -99,7 +100,7 @@ protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, JdbcCo
}
else {

boolean logPositionAvailable = jdbcConnection.isLogPositionAvailable(offset, config);
boolean logPositionAvailable = isLogPositionAvailable(logPositionValidator, offset, config);

if (schema.isHistorized() && !((HistorizedDatabaseSchema) schema).historyExists()) {

Expand Down Expand Up @@ -146,6 +147,15 @@ protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, JdbcCo
}
}

public boolean isLogPositionAvailable(LogPositionValidator logPositionValidator, OffsetContext offsetContext, CommonConnectorConfig config) {

if (logPositionValidator == null) {
LOGGER.warn("Current JDBC connection implementation is not providing a log position validator implementation. The check will always be 'true'");
return true;
}
return logPositionValidator.validate(offsetContext, config);
}

public enum State {
RESTARTING,
RUNNING,
Expand Down
@@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.function;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.pipeline.spi.OffsetContext;

@FunctionalInterface
public interface LogPositionValidator {

/**
* Validate the stored offset with the position available in the db log.
* @param offsetContext The current stored offset.
* @param config Connector configuration.
*/
boolean validate(OffsetContext offsetContext, CommonConnectorConfig config);
}
23 changes: 0 additions & 23 deletions debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java
Expand Up @@ -58,7 +58,6 @@
import io.debezium.config.Field;
import io.debezium.pipeline.source.snapshot.incremental.ChunkQueryBuilder;
import io.debezium.pipeline.source.snapshot.incremental.DefaultChunkQueryBuilder;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Attribute;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
Expand Down Expand Up @@ -161,17 +160,6 @@ public interface ResultSetExtractor<T> {
T apply(ResultSet rs) throws SQLException;
}

@FunctionalInterface
public interface LogPositionValidator {

/**
* Validate the stored offset with the position available in the db log.
* @param offsetContext The current stored offset.
* @param config Connector configuration.
*/
boolean validate(OffsetContext offsetContext, CommonConnectorConfig config);
}

/**
* Create a {@link ConnectionFactory} that replaces variables in the supplied URL pattern. Variables include:
* <ul>
Expand Down Expand Up @@ -340,8 +328,6 @@ private static String findAndReplace(String url, String name, Properties props,
private final String closingQuoteCharacter;
private volatile Connection conn;

protected LogPositionValidator logPositionValidator;

/**
* Create a new instance with the given configuration and connection factory.
*
Expand Down Expand Up @@ -1653,13 +1639,4 @@ protected Map<String, Object> reselectColumns(String query, TableId tableId, Lis
});
return results;
}

public boolean isLogPositionAvailable(OffsetContext offsetContext, CommonConnectorConfig config) {

if (logPositionValidator == null) {
LOGGER.warn("Current JDBC connection implementation is not providing a log position validator implementation. The check will always be 'true'");
return true;
}
return logPositionValidator.validate(offsetContext, config);
}
}