Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Oracle] Fix Oracle CDC cannot capture newly added tables during task running #2078

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.ververica.cdc.connectors.base.relational;

import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isMysqlConnector;

import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
Expand Down Expand Up @@ -188,14 +190,16 @@ private Struct schemaChangeRecordKey(SchemaChangeEvent event) {
}

private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOException {
Struct sourceInfo = event.getSource();
Map<String, Object> source = new HashMap<>();
String fileName = sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY);
Long pos = sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY);
Long serverId = sourceInfo.getInt64(SERVER_ID_KEY);
source.put(SERVER_ID_KEY, serverId);
source.put(BINLOG_FILENAME_OFFSET_KEY, fileName);
source.put(BINLOG_POSITION_OFFSET_KEY, pos);
if (isMysqlConnector(event.getSource())) {
Struct sourceInfo = event.getSource();
String fileName = sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY);
Long pos = sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY);
Long serverId = sourceInfo.getInt64(SERVER_ID_KEY);
source.put(SERVER_ID_KEY, serverId);
source.put(BINLOG_FILENAME_OFFSET_KEY, fileName);
source.put(BINLOG_POSITION_OFFSET_KEY, pos);
}
HistoryRecord historyRecord =
new HistoryRecord(
source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ public class SourceRecordUtils {

private SourceRecordUtils() {}

public static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
"io.debezium.connector.mysql.SchemaChangeKey";
public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME =
"io.debezium.connector.common.Heartbeat";
private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader();
public static final String MYSQL_SCHEMA_CHANGE_EVENT_KEY_NAME = "io.debezium.connector.mysql.SchemaChangeKey";
public static final String ORACLE_SCHEMA_CHANGE_EVENT_KEY_NAME = "io.debezium.connector.oracle.SchemaChangeKey";
public static final String CONNECTOR = "connector";
public static final String MYSQL_CONNECTOR = "mysql";

/** Converts a {@link ResultSet} row to an array of Objects. */
public static Object[] rowToArray(ResultSet rs, int size) throws SQLException {
Expand Down Expand Up @@ -95,7 +97,8 @@ public static Long getFetchTimestamp(SourceRecord record) {

public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) {
Schema keySchema = sourceRecord.keySchema();
return keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
return keySchema != null && (MYSQL_SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name())
|| ORACLE_SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name()));
}

public static boolean isDataChangeRecord(SourceRecord record) {
Expand Down Expand Up @@ -199,4 +202,14 @@ public static HistoryRecord getHistoryRecord(SourceRecord schemaRecord) throws I
String historyRecordStr = value.getString(HISTORY_RECORD_FIELD);
return new HistoryRecord(DOCUMENT_READER.read(historyRecordStr));
}

/**
* Whether the source belong Mysql Connector
* @param source
* @return true if the source belong Mysql Connector
*/
public static boolean isMysqlConnector(Struct source) {
String connector = source.getString(CONNECTOR);
return MYSQL_CONNECTOR.equalsIgnoreCase(connector);
}
}