Skip to content

Commit

Permalink
DBZ-5119 Move Heartbeat creation to ConnectorConfig & descendants
Browse files Browse the repository at this point in the history
  • Loading branch information
Naros authored and jpechane committed May 25, 2022
1 parent 7ff7826 commit 7d8c948
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 94 deletions.
Expand Up @@ -21,7 +21,6 @@
import io.debezium.connector.mysql.MySqlConnection.MySqlConnectionConfiguration;
import io.debezium.connector.mysql.MySqlConnectorConfig.BigIntUnsignedHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.heartbeat.HeartbeatFactory;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
Expand Down Expand Up @@ -146,8 +145,7 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
DataChangeEvent::new,
null,
metadataProvider,
new HeartbeatFactory<>(
connectorConfig,
connectorConfig.createHeartbeat(
topicSelector,
schemaNameAdjuster,
() -> new MySqlConnection(new MySqlConnectionConfiguration(heartbeatConfig), connectorConfig.useCursorFetch()
Expand Down
Expand Up @@ -19,7 +19,6 @@
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.oracle.StreamingAdapter.TableNameCaseSensitivity;
import io.debezium.heartbeat.HeartbeatFactory;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
Expand Down Expand Up @@ -97,8 +96,7 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(
connectorConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new,
metadataProvider,
new HeartbeatFactory<>(
connectorConfig,
connectorConfig.createHeartbeat(
topicSelector,
schemaNameAdjuster,
() -> getHeartbeatConnection(connectorConfig, jdbcConfig),
Expand Down
Expand Up @@ -30,7 +30,6 @@
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.heartbeat.HeartbeatFactory;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
Expand Down Expand Up @@ -177,8 +176,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
DataChangeEvent::new,
PostgresChangeRecordEmitter::updateSchema,
metadataProvider,
new HeartbeatFactory<>(
connectorConfig,
connectorConfig.createHeartbeat(
topicSelector,
schemaNameAdjuster,
() -> new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL),
Expand Down
Expand Up @@ -12,7 +12,7 @@

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.postgresql.connection.LogicalDecodingMessage;
import io.debezium.heartbeat.HeartbeatFactory;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.EventDispatcher;
Expand Down Expand Up @@ -41,24 +41,24 @@ public PostgresEventDispatcher(PostgresConnectorConfig connectorConfig, TopicSel
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, SchemaNameAdjuster schemaNameAdjuster) {
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider,
new HeartbeatFactory<>(connectorConfig, topicSelector, schemaNameAdjuster), schemaNameAdjuster, null);
connectorConfig.createHeartbeat(topicSelector, schemaNameAdjuster, null, null), schemaNameAdjuster, null);
}

public PostgresEventDispatcher(PostgresConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider,
HeartbeatFactory<T> heartbeatFactory, SchemaNameAdjuster schemaNameAdjuster) {
Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster) {
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider,
heartbeatFactory, schemaNameAdjuster, null);
heartbeat, schemaNameAdjuster, null);
}

public PostgresEventDispatcher(PostgresConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<PostgresPartition, T> inconsistentSchemaHandler,
EventMetadataProvider metadataProvider, HeartbeatFactory<T> heartbeatFactory, SchemaNameAdjuster schemaNameAdjuster,
EventMetadataProvider metadataProvider, Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster,
JdbcConnection jdbcConnection) {
super(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, inconsistentSchemaHandler, metadataProvider,
heartbeatFactory, schemaNameAdjuster);
heartbeat, schemaNameAdjuster);
this.queue = queue;
this.logicalDecodingMessageMonitor = new LogicalDecodingMessageMonitor(connectorConfig, this::enqueueLogicalDecodingMessage);
this.messageFilter = connectorConfig.getMessageFilter();
Expand Down
Expand Up @@ -34,9 +34,13 @@
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.Operation;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatConnectionProvider;
import io.debezium.heartbeat.HeartbeatErrorHandler;
import io.debezium.heartbeat.HeartbeatImpl;
import io.debezium.relational.CustomConverterRegistry;
import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.converter.ConvertedField;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.util.SchemaNameAdjuster;
Expand Down Expand Up @@ -952,4 +956,12 @@ public Optional<String> customRetriableException() {
public String getTaskId() {
return taskId;
}

public Heartbeat createHeartbeat(TopicSelector<? extends DataCollectionId> topicSelector, SchemaNameAdjuster schemaNameAdjuster,
HeartbeatConnectionProvider connectionProvider, HeartbeatErrorHandler errorHandler) {
if (getHeartbeatInterval().isZero()) {
return Heartbeat.DEFAULT_NOOP_HEARTBEAT;
}
return new HeartbeatImpl(getHeartbeatInterval(), topicSelector.getHeartbeatTopic(), getLogicalName(), schemaNameAdjuster);
}
}
Expand Up @@ -39,8 +39,8 @@ public class DatabaseHeartbeatImpl extends HeartbeatImpl {
private final JdbcConnection jdbcConnection;
private final HeartbeatErrorHandler errorHandler;

DatabaseHeartbeatImpl(Duration heartbeatInterval, String topicName, String key, JdbcConnection jdbcConnection, String heartBeatActionQuery,
HeartbeatErrorHandler errorHandler, SchemaNameAdjuster schemaNameAdjuster) {
public DatabaseHeartbeatImpl(Duration heartbeatInterval, String topicName, String key, JdbcConnection jdbcConnection, String heartBeatActionQuery,
HeartbeatErrorHandler errorHandler, SchemaNameAdjuster schemaNameAdjuster) {
super(heartbeatInterval, topicName, key, schemaNameAdjuster);

this.heartBeatActionQuery = heartBeatActionQuery;
Expand Down
Expand Up @@ -8,8 +8,7 @@
import io.debezium.jdbc.JdbcConnection;

/**
* Defines a contract for providing a connection to the {@link DatabaseHeartbeatImpl}
* if the {@link HeartbeatFactory} elects to use that implementation.
* Defines a contract for providing a connection to the {@link DatabaseHeartbeatImpl}.
*
* @author Chris Cranford
*/
Expand Down

This file was deleted.

Expand Up @@ -27,7 +27,7 @@
* Default implementation of Heartbeat
*
*/
class HeartbeatImpl implements Heartbeat {
public class HeartbeatImpl implements Heartbeat {

private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatImpl.class);

Expand All @@ -53,7 +53,7 @@ class HeartbeatImpl implements Heartbeat {

private volatile Timer heartbeatTimeout;

HeartbeatImpl(Duration heartbeatInterval, String topicName, String key, SchemaNameAdjuster schemaNameAdjuster) {
public HeartbeatImpl(Duration heartbeatInterval, String topicName, String key, SchemaNameAdjuster schemaNameAdjuster) {
this.topicName = topicName;
this.key = key;
this.heartbeatInterval = heartbeatInterval;
Expand Down
Expand Up @@ -26,7 +26,6 @@
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.Operation;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatFactory;
import io.debezium.pipeline.signal.Signal;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
Expand Down Expand Up @@ -92,21 +91,21 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> t
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, SchemaNameAdjuster schemaNameAdjuster) {
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider,
new HeartbeatFactory<>(connectorConfig, topicSelector, schemaNameAdjuster), schemaNameAdjuster);
connectorConfig.createHeartbeat(topicSelector, schemaNameAdjuster, null, null), schemaNameAdjuster);
}

public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider,
HeartbeatFactory<T> heartbeatFactory, SchemaNameAdjuster schemaNameAdjuster) {
Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster) {
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider,
heartbeatFactory, schemaNameAdjuster);
heartbeat, schemaNameAdjuster);
}

public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<P, T> inconsistentSchemaHandler,
EventMetadataProvider metadataProvider, HeartbeatFactory<T> heartbeatFactory, SchemaNameAdjuster schemaNameAdjuster) {
EventMetadataProvider metadataProvider, Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster) {
this.tableChangesSerializer = new ConnectTableChangeSerializer(schemaNameAdjuster);
this.connectorConfig = connectorConfig;
this.topicSelector = topicSelector;
Expand All @@ -124,7 +123,7 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> t
this.transactionMonitor = new TransactionMonitor(connectorConfig, metadataProvider, schemaNameAdjuster,
this::enqueueTransactionMessage);
this.signal = new Signal<>(connectorConfig, this);
this.heartbeat = heartbeatFactory.createHeartbeat();
this.heartbeat = heartbeat;

schemaChangeKeySchema = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector." + connectorConfig.getConnectorName() + ".SchemaChangeKey"))
Expand Down
Expand Up @@ -25,6 +25,9 @@
import io.debezium.config.Field;
import io.debezium.config.Field.ValidationOutput;
import io.debezium.heartbeat.DatabaseHeartbeatImpl;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatConnectionProvider;
import io.debezium.heartbeat.HeartbeatErrorHandler;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
Expand All @@ -35,6 +38,10 @@
import io.debezium.relational.Tables.ColumnNameFilterFactory;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;

/**
* Configuration options shared across the relational CDC connectors.
Expand Down Expand Up @@ -808,6 +815,22 @@ public Map<TableId, String> getSnapshotSelectOverridesByTable() {
return Collections.unmodifiableMap(snapshotSelectOverridesByTable);
}

@Override
public Heartbeat createHeartbeat(TopicSelector<? extends DataCollectionId> topicSelector, SchemaNameAdjuster schemaNameAdjuster,
HeartbeatConnectionProvider connectionProvider, HeartbeatErrorHandler errorHandler) {
if (!Strings.isNullOrBlank(getHeartbeatActionQuery()) && !getHeartbeatInterval().isZero()) {
return new DatabaseHeartbeatImpl(
getHeartbeatInterval(),
topicSelector.getHeartbeatTopic(),
getLogicalName(),
connectionProvider.get(),
getHeartbeatActionQuery(),
errorHandler,
schemaNameAdjuster);
}
return super.createHeartbeat(topicSelector, schemaNameAdjuster, connectionProvider, errorHandler);
}

private static int validateSchemaBlacklist(Configuration config, Field field, Field.ValidationOutput problems) {
String whitelist = config.getFallbackStringPropertyWithWarning(SCHEMA_INCLUDE_LIST, SCHEMA_WHITELIST);
String blacklist = config.getFallbackStringPropertyWithWarning(SCHEMA_EXCLUDE_LIST, SCHEMA_BLACKLIST);
Expand Down

0 comments on commit 7d8c948

Please sign in to comment.