Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-4339][Sort] Rollback debezium-core to 1.5.4 #4340

Merged
merged 2 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion inlong-sort/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
<module>sort-dist</module>
</modules>
<properties>
<debezium.version>1.6.4.Final</debezium.version>
<debezium.version>1.5.4.Final</debezium.version>
<kafka.clients.version>2.7.0</kafka.clients.version>
</properties>
<dependencyManagement>
Expand Down
4 changes: 0 additions & 4 deletions inlong-sort/sort-connectors/jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@
<artifactId>clickhouse-jdbc</artifactId>
</dependency>
<!--for postgresql-->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.inlong.sort.cdc.debezium.internal;

import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void initializeStorage() {
}

@Override
public boolean storeOnlyCapturedTables() {
public boolean storeOnlyMonitoredTables() {
return storeOnlyMonitoredTablesDdl;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void initializeStorage() {
}

@Override
public boolean storeOnlyCapturedTables() {
public boolean storeOnlyMonitoredTables() {
return storeOnlyMonitoredTablesDdl;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void dispatchSchemaChangeEvent(
T dataCollectionId, SchemaChangeEventEmitter schemaChangeEventEmitter)
throws InterruptedException {
if (dataCollectionId != null && !filter.isIncluded(dataCollectionId)) {
if (historizedSchema == null || historizedSchema.storeOnlyCapturedTables()) {
if (historizedSchema == null || historizedSchema.storeOnlyMonitoredTables()) {
LOG.trace("Filtering schema change event for {}", dataCollectionId);
return;
}
Expand All @@ -158,7 +158,7 @@ public void dispatchSchemaChangeEvent(
}
}
if (!anyNonfilteredEvent) {
if (historizedSchema == null || historizedSchema.storeOnlyCapturedTables()) {
if (historizedSchema == null || historizedSchema.storeOnlyMonitoredTables()) {
LOG.trace("Filtering schema change event for {}", dataCollectionIds);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
executor.submit(
() -> {
try {
binlogSplitReadTask.execute(new BinlogSplitChangeEventSourceContextImpl(),
statefulTaskContext.getOffsetContext());
binlogSplitReadTask.execute(new BinlogSplitChangeEventSourceContextImpl());
} catch (Exception e) {
currentTaskRunning = false;
LOG.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
final SnapshotSplitChangeEventSourceContextImpl sourceContext =
new SnapshotSplitChangeEventSourceContextImpl();
SnapshotResult snapshotResult =
splitSnapshotReadTask.execute(sourceContext,
statefulTaskContext.getOffsetContext());
splitSnapshotReadTask.execute(sourceContext);
final MySqlBinlogSplit backfillBinlogSplit =
createBackfillBinlogSplit(sourceContext);
// optimization that skip the binlog read when the low watermark equals high
Expand All @@ -130,14 +129,8 @@ public void submitSplit(MySqlSplit mySqlSplit) {
if (snapshotResult.isCompletedOrSkipped()) {
final MySqlBinlogSplitReadTask backfillBinlogReadTask =
createBackfillBinlogReadTask(backfillBinlogSplit);
final MySqlOffsetContext.Loader loader =
new MySqlOffsetContext.Loader(
statefulTaskContext.getConnectorConfig());
final MySqlOffsetContext mySqlOffsetContext =
loader.load(
backfillBinlogSplit.getStartingOffset().getOffset());
backfillBinlogReadTask.execute(
new SnapshotBinlogSplitChangeEventSourceContextImpl(), mySqlOffsetContext);
new SnapshotBinlogSplitChangeEventSourceContextImpl());
} else {
readException =
new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import io.debezium.util.Clock;
import org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.EventDispatcherImpl;
import org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher;
import org.apache.inlong.sort.cdc.mysql.debezium.reader.SnapshotSplitReader;
import org.apache.inlong.sort.cdc.mysql.debezium.reader.SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl;
import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit;
import org.slf4j.Logger;
Expand Down Expand Up @@ -70,6 +70,7 @@ public MySqlBinlogSplitReadTask(
MySqlBinlogSplit binlogSplit) {
super(
connectorConfig,
offsetContext,
connection,
dispatcher,
errorHandler,
Expand All @@ -86,15 +87,14 @@ public MySqlBinlogSplitReadTask(
}

@Override
public void execute(ChangeEventSourceContext context, MySqlOffsetContext offsetContext)
throws InterruptedException {
public void execute(ChangeEventSourceContext context) throws InterruptedException {
this.context = context;
super.execute(context, offsetContext);
super.execute(context);
}

@Override
protected void handleEvent(MySqlOffsetContext offsetContext, Event event) {
super.handleEvent(offsetContext, event);
protected void handleEvent(Event event) {
super.handleEvent(event);
// check do we need to stop for read binlog for snapshot split.
if (isBoundedRead()) {
final BinlogOffset currentBinlogOffset = getBinlogPosition(offsetContext.getOffset());
Expand All @@ -112,7 +112,7 @@ protected void handleEvent(MySqlOffsetContext offsetContext, Event event) {
new DebeziumException("Error processing binlog signal event", e));
}
// tell reader the binlog task finished
((SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl) context).finished();
((SnapshotBinlogSplitChangeEventSourceContextImpl) context).finished();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
Expand Down Expand Up @@ -60,16 +61,12 @@

import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.currentBinlogOffset;

/**
* Task to read snapshot split of table.
*/
public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSource<MySqlOffsetContext> {
/** Task to read snapshot split of table. */
public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSource {

private static final Logger LOG = LoggerFactory.getLogger(MySqlSnapshotSplitReadTask.class);

/**
* Interval for showing a log statement with the progress while scanning a single table.
*/
/** Interval for showing a log statement with the progress while scanning a single table. */
private static final Duration LOG_INTERVAL = Duration.ofMillis(10_000);

private final MySqlConnectorConfig connectorConfig;
Expand All @@ -92,7 +89,7 @@ public MySqlSnapshotSplitReadTask(
TopicSelector<TableId> topicSelector,
Clock clock,
MySqlSnapshotSplit snapshotSplit) {
super(connectorConfig, snapshotProgressListener);
super(connectorConfig, previousOffset, snapshotProgressListener);
this.offsetContext = previousOffset;
this.connectorConfig = connectorConfig;
this.databaseSchema = databaseSchema;
Expand All @@ -105,8 +102,7 @@ public MySqlSnapshotSplitReadTask(
}

@Override
public SnapshotResult<MySqlOffsetContext> execute(ChangeEventSourceContext context,
MySqlOffsetContext previousOffset) throws InterruptedException {
public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset);
final SnapshotContext ctx;
try {
Expand All @@ -116,7 +112,7 @@ public SnapshotResult<MySqlOffsetContext> execute(ChangeEventSourceContext conte
throw new RuntimeException(e);
}
try {
return doExecute(context, previousOffset, ctx, snapshottingTask);
return doExecute(context, ctx, snapshottingTask);
} catch (InterruptedException e) {
LOG.warn("Snapshot was interrupted before completion");
throw e;
Expand All @@ -125,21 +121,18 @@ public SnapshotResult<MySqlOffsetContext> execute(ChangeEventSourceContext conte
}
}

protected SnapshotResult<MySqlOffsetContext> doExecute(
@Override
protected SnapshotResult doExecute(
ChangeEventSourceContext context,
MySqlOffsetContext previousOffset,
SnapshotContext<MySqlOffsetContext> snapshotContext,
SnapshotContext snapshotContext,
SnapshottingTask snapshottingTask)
throws Exception {
final RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlOffsetContext>
ctx =
(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<
MySqlOffsetContext>)
snapshotContext;
ctx.offset = previousOffset;
SignalEventDispatcher signalEventDispatcher =
final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
(RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
ctx.offset = offsetContext;
final SignalEventDispatcher signalEventDispatcher =
new SignalEventDispatcher(
previousOffset.getPartition(),
offsetContext.getPartition(),
topicSelector.topicNameFor(snapshotSplit.getTableId()),
dispatcher.getQueue());

Expand Down Expand Up @@ -170,7 +163,7 @@ protected SnapshotResult<MySqlOffsetContext> doExecute(
}

@Override
protected SnapshottingTask getSnapshottingTask(MySqlOffsetContext mySqlOffsetContext) {
protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
return new SnapshottingTask(false, true);
}

Expand All @@ -180,6 +173,14 @@ protected SnapshotContext prepare(ChangeEventSourceContext changeEventSourceCont
return new MySqlSnapshotContext();
}

private static class MySqlSnapshotContext
extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext {

public MySqlSnapshotContext() throws SQLException {
super("");
gong marked this conversation as resolved.
Show resolved Hide resolved
}
}

private void createDataEvents(
RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
TableId tableId)
Expand All @@ -192,9 +193,7 @@ private void createDataEvents(
snapshotReceiver.completeSnapshot();
}

/**
* Dispatches the data change events for the records of a single table.
*/
/** Dispatches the data change events for the records of a single table. */
private void createDataEventsForTable(
RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
EventDispatcher.SnapshotReceiver snapshotReceiver,
Expand All @@ -217,16 +216,16 @@ private void createDataEventsForTable(
selectSql);

try (PreparedStatement selectStatement =
StatementUtils.readTableSplitDataStatement(
jdbcConnection,
selectSql,
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null,
snapshotSplit.getSplitStart(),
snapshotSplit.getSplitEnd(),
snapshotSplit.getSplitKeyType().getFieldCount(),
connectorConfig.getQueryFetchSize());
ResultSet rs = selectStatement.executeQuery()) {
StatementUtils.readTableSplitDataStatement(
jdbcConnection,
selectSql,
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null,
snapshotSplit.getSplitStart(),
snapshotSplit.getSplitEnd(),
snapshotSplit.getSplitKeyType().getFieldCount(),
connectorConfig.getQueryFetchSize());
ResultSet rs = selectStatement.executeQuery()) {

ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
long rows = 0;
Expand Down Expand Up @@ -353,20 +352,12 @@ private Object readTimestampField(ResultSet rs, int fieldNo, Column column, Tabl

try {
return MySqlValueConverters.containsZeroValuesInDatePart(
(new String(b.getBytes(1, (int) (b.length())), "UTF-8")), column, table)
(new String(b.getBytes(1, (int) (b.length())), "UTF-8")), column, table)
? null
: rs.getTimestamp(fieldNo, Calendar.getInstance());
} catch (UnsupportedEncodingException e) {
LOG.error("Could not read MySQL TIME value as UTF-8");
throw new RuntimeException(e);
}
}

private static class MySqlSnapshotContext
extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext {

public MySqlSnapshotContext() throws SQLException {
super("");
}
}
}
Loading