Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.sink.TableFilter;
Expand All @@ -38,10 +37,8 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
Expand Down Expand Up @@ -219,21 +216,14 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
tablePrefix,
tableSuffix,
tableMapping);
Set<String> createdTables;
try {
createdTables = new HashSet<>(catalog.listTables(database));
} catch (Catalog.DatabaseNotExistException e) {
throw new RuntimeException(e);
}
return () ->
new RichCdcMultiplexRecordEventParser(
schemaBuilder,
tblIncludingPattern,
tblExcludingPattern,
dbIncludingPattern,
dbExcludingPattern,
tableNameConverter,
createdTables);
tableNameConverter);
}

protected abstract boolean requirePrimaryKeys();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeHint;
Expand Down Expand Up @@ -103,12 +104,16 @@ public void processElement(T raw, Context context, Collector<Void> collector) th
schema -> {
Identifier identifier = new Identifier(database, tableName);
try {
catalog.createTable(identifier, schema, true);
} catch (Exception e) {
LOG.error(
"Cannot create newly added Paimon table {}",
identifier.getFullName(),
e);
Table ignore = catalog.getTable(identifier);
} catch (Catalog.TableNotExistException e) {
try {
catalog.createTable(identifier, schema, true);
} catch (Exception ex) {
LOG.error(
"Cannot create newly added Paimon table {}",
identifier.getFullName(),
ex);
}
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
@Nullable private final Pattern dbIncludingPattern;
@Nullable private final Pattern dbExcludingPattern;
private final TableNameConverter tableNameConverter;
private final Set<String> createdTables;

private final Map<String, RichEventParser> parsers = new HashMap<>();
private final Set<String> includedTables = new HashSet<>();
private final Set<String> excludedTables = new HashSet<>();
Expand All @@ -66,7 +64,7 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
private RichEventParser currentParser;

public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
this(null, null, null, null, null, new TableNameConverter(caseSensitive), new HashSet<>());
this(null, null, null, null, null, new TableNameConverter(caseSensitive));
}

public RichCdcMultiplexRecordEventParser(
Expand All @@ -75,15 +73,13 @@ public RichCdcMultiplexRecordEventParser(
@Nullable Pattern tblExcludingPattern,
@Nullable Pattern dbIncludingPattern,
@Nullable Pattern dbExcludingPattern,
TableNameConverter tableNameConverter,
Set<String> createdTables) {
TableNameConverter tableNameConverter) {
this.schemaBuilder = schemaBuilder;
this.tblIncludingPattern = tblIncludingPattern;
this.tblExcludingPattern = tblExcludingPattern;
this.dbIncludingPattern = dbIncludingPattern;
this.dbExcludingPattern = dbExcludingPattern;
this.tableNameConverter = tableNameConverter;
this.createdTables = createdTables;
}

@Override
Expand Down Expand Up @@ -201,8 +197,6 @@ private boolean shouldSynchronizeCurrentTable() {
}

private boolean shouldCreateCurrentTable() {
return shouldSynchronizeCurrentTable
&& !record.cdcSchema().fields().isEmpty()
&& createdTables.add(parseTableName());
return shouldSynchronizeCurrentTable && !record.cdcSchema().fields().isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ public void testCatalogAndTableConfig() {
}

@Test
@Timeout(60)
@Timeout(2400)
public void testCaseInsensitive() throws Exception {
final String topic = "case-insensitive";
createTestTopic(topic, 1, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testIncludingAndExcludingTables() throws Exception {
}

@Test
@Timeout(60)
@Timeout(2400)
public void testCaseInsensitive() throws Exception {
testCaseInsensitive(OGG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ public void testNewlyAddedTables() throws Exception {
}

@Test
@Timeout(600)
@Timeout(1200)
public void testNewlyAddedTableSingleTable() throws Exception {
testNewlyAddedTable(1, false, false, "paimon_sync_database_newly_added_tables_1");
}
Expand Down
Loading