From 6017b165289d8e6f40db396cd07c62285be7fca9 Mon Sep 17 00:00:00 2001 From: Qingsheng Ren Date: Tue, 16 Apr 2024 22:48:41 +0800 Subject: [PATCH] [FLINK-34653][cdc][route] Support table merging with route rules This closes #3129. --- .../cdc/common/utils/ChangeEventUtils.java | 97 +---- .../assertions/ChangeEventAssert.java | 3 +- .../composer/flink/FlinkPipelineComposer.java | 9 +- .../flink/translator/RouteTranslator.java | 43 --- .../translator/SchemaOperatorTranslator.java | 25 +- .../flink/FlinkPipelineComposerITCase.java | 271 +++++++++++++ .../operators/schema/SchemaOperator.java | 250 +++++++++++- .../schema/SchemaOperatorFactory.java | 13 +- .../schema/coordinator/SchemaDerivation.java | 315 +++++++++++++++ .../schema/coordinator/SchemaRegistry.java | 25 +- .../coordinator/SchemaRegistryProvider.java | 28 +- .../SchemaRegistryRequestHandler.java | 44 ++- .../schema/event/SchemaChangeResponse.java | 16 +- .../operators/route/RouteFunctionTest.java | 195 ---------- .../operators/schema/SchemaOperatorTest.java | 2 +- .../coordinator/SchemaDerivationTest.java | 365 ++++++++++++++++++ .../operators/EventOperatorTestHarness.java | 4 +- 17 files changed, 1332 insertions(+), 373 deletions(-) rename flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/route/RouteFunction.java => flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java (51%) delete mode 100644 flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/RouteTranslator.java create mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java delete mode 100644 flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/route/RouteFunctionTest.java create mode 100644 flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/route/RouteFunction.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java similarity index 51% rename from flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/route/RouteFunction.java rename to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java index cc40796c30..1825dd2623 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/route/RouteFunction.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java @@ -15,107 +15,20 @@ * limitations under the License. 
*/ -package org.apache.flink.cdc.runtime.operators.route; +package org.apache.flink.cdc.common.utils; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; -import org.apache.flink.cdc.common.event.ChangeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; -import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.common.schema.Selectors; -import org.apache.flink.configuration.Configuration; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -import static org.apache.flink.cdc.common.utils.Preconditions.checkState; - -/** A map function that applies user-defined routing logics. */ -public class RouteFunction extends RichMapFunction { - private final List> routingRules; - private transient List> routes; - - public static Builder newBuilder() { - return new Builder(); - } - - /** Builder of {@link RouteFunction}. */ - public static class Builder { - private final List> routingRules = new ArrayList<>(); - - public Builder addRoute(String tableInclusions, TableId replaceBy) { - routingRules.add(Tuple2.of(tableInclusions, replaceBy)); - return this; - } - - public RouteFunction build() { - return new RouteFunction(routingRules); - } - } - - private RouteFunction(List> routingRules) { - this.routingRules = routingRules; - } - - @Override - public void open(Configuration parameters) throws Exception { - routes = - routingRules.stream() - .map( - tuple2 -> { - String tableInclusions = tuple2.f0; - TableId replaceBy = tuple2.f1; - Selectors selectors = - new Selectors.SelectorsBuilder() - .includeTables(tableInclusions) - .build(); - return new Tuple2<>(selectors, replaceBy); - }) - .collect(Collectors.toList()); - } - - @Override - public Event map(Event event) throws Exception { - checkState( - event instanceof ChangeEvent, - String.format( - "The input event of the route is not a ChangeEvent but with type \"%s\"", - event.getClass().getCanonicalName())); - ChangeEvent changeEvent = (ChangeEvent) event; - TableId tableId = changeEvent.tableId(); - - for (Tuple2 route : routes) { - Selectors selectors = route.f0; - TableId replaceBy = route.f1; - if (selectors.isMatch(tableId)) { - return recreateChangeEvent(changeEvent, replaceBy); - } - } - return event; - } - - private ChangeEvent recreateChangeEvent(ChangeEvent event, TableId tableId) { - if (event instanceof DataChangeEvent) { - return recreateDataChangeEvent(((DataChangeEvent) event), tableId); - } - if (event instanceof SchemaChangeEvent) { - return recreateSchemaChangeEvent(((SchemaChangeEvent) event), tableId); - } - throw new UnsupportedOperationException( - String.format( - "Unsupported change event with type \"%s\"", - event.getClass().getCanonicalName())); - } - - private DataChangeEvent recreateDataChangeEvent( +/** Utilities for handling {@link org.apache.flink.cdc.common.event.ChangeEvent}s. 
*/ +public class ChangeEventUtils { + public static DataChangeEvent recreateDataChangeEvent( DataChangeEvent dataChangeEvent, TableId tableId) { switch (dataChangeEvent.op()) { case INSERT: @@ -141,7 +54,7 @@ private DataChangeEvent recreateDataChangeEvent( } } - private SchemaChangeEvent recreateSchemaChangeEvent( + public static SchemaChangeEvent recreateSchemaChangeEvent( SchemaChangeEvent schemaChangeEvent, TableId tableId) { if (schemaChangeEvent instanceof CreateTableEvent) { CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent; diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/testutils/assertions/ChangeEventAssert.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/testutils/assertions/ChangeEventAssert.java index 53f4ec57b1..02ef468233 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/testutils/assertions/ChangeEventAssert.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/testutils/assertions/ChangeEventAssert.java @@ -40,7 +40,8 @@ public SELF hasTableId(TableId tableId) { failWithActualExpectedAndMessage( actual.tableId(), tableId, - "Table ID of the DataChangeEvent is not as expected"); + "Table ID of the %s is not as expected", + actual.getClass().getSimpleName()); } return myself; } diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index ceb285f853..30ce1ff5e6 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -32,7 +32,6 @@ import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator; import org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator; import org.apache.flink.cdc.composer.flink.translator.PartitioningTranslator; -import org.apache.flink.cdc.composer.flink.translator.RouteTranslator; import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator; import org.apache.flink.cdc.composer.flink.translator.TransformTranslator; import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils; @@ -104,6 +103,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) { // Build TransformSchemaOperator for processing Schema Event TransformTranslator transformTranslator = new TransformTranslator(); stream = transformTranslator.translateSchema(stream, pipelineDef.getTransforms()); + + // Schema operator SchemaOperatorTranslator schemaOperatorTranslator = new SchemaOperatorTranslator( pipelineDef @@ -121,16 +122,12 @@ public PipelineExecution compose(PipelineDef pipelineDef) { schemaOperatorIDGenerator.generate(), pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE)); - // Build Router used to route Event - RouteTranslator routeTranslator = new RouteTranslator(); - stream = routeTranslator.translate(stream, pipelineDef.getRoute()); - // Build DataSink in advance as schema operator requires MetadataApplier DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig()); stream = schemaOperatorTranslator.translate( - stream, parallelism, dataSink.getMetadataApplier()); + stream, parallelism, dataSink.getMetadataApplier(), pipelineDef.getRoute()); // Build Partitioner used to shuffle Event PartitioningTranslator partitioningTranslator = new PartitioningTranslator(); diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/RouteTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/RouteTranslator.java deleted file mode 100644 index 0ad0c0dd3a..0000000000 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/RouteTranslator.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.composer.flink.translator; - -import org.apache.flink.cdc.common.event.Event; -import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.composer.definition.RouteDef; -import org.apache.flink.cdc.runtime.operators.route.RouteFunction; -import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; - -import java.util.List; - -/** Translator used to build {@link RouteFunction}. */ -public class RouteTranslator { - - public DataStream translate(DataStream input, List routes) { - if (routes.isEmpty()) { - return input; - } - RouteFunction.Builder routeFunctionBuilder = RouteFunction.newBuilder(); - for (RouteDef route : routes) { - routeFunctionBuilder.addRoute( - route.getSourceTable(), TableId.parse(route.getSinkTable())); - } - return input.map(routeFunctionBuilder.build(), new EventTypeInfo()).name("Route"); - } -} diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java index bb434581bd..9513c03496 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java @@ -17,17 +17,23 @@ package org.apache.flink.cdc.composer.flink.translator; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.composer.definition.RouteDef; import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; import org.apache.flink.cdc.runtime.operators.schema.SchemaOperatorFactory; import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import java.util.ArrayList; 
+import java.util.List; + /** Translator used to build {@link SchemaOperator} for schema event process. */ @Internal public class SchemaOperatorTranslator { @@ -41,10 +47,13 @@ public SchemaOperatorTranslator( } public DataStream translate( - DataStream input, int parallelism, MetadataApplier metadataApplier) { + DataStream input, + int parallelism, + MetadataApplier metadataApplier, + List routes) { switch (schemaChangeBehavior) { case EVOLVE: - return addSchemaOperator(input, parallelism, metadataApplier); + return addSchemaOperator(input, parallelism, metadataApplier, routes); case IGNORE: return dropSchemaChangeEvent(input, parallelism); case EXCEPTION: @@ -61,12 +70,20 @@ public String getSchemaOperatorUid() { } private DataStream addSchemaOperator( - DataStream input, int parallelism, MetadataApplier metadataApplier) { + DataStream input, + int parallelism, + MetadataApplier metadataApplier, + List routes) { + List> routingRules = new ArrayList<>(); + for (RouteDef route : routes) { + routingRules.add( + Tuple2.of(route.getSourceTable(), TableId.parse(route.getSinkTable()))); + } SingleOutputStreamOperator stream = input.transform( "SchemaOperator", new EventTypeInfo(), - new SchemaOperatorFactory(metadataApplier)); + new SchemaOperatorFactory(metadataApplier, routingRules)); stream.uid(schemaOperatorUid).setParallelism(parallelism); return stream; } diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index ed7eaee3c9..b59f4ead64 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -18,9 +18,21 @@ package org.apache.flink.cdc.composer.flink; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.composer.PipelineExecution; import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.cdc.composer.definition.RouteDef; import org.apache.flink.cdc.composer.definition.SinkDef; import org.apache.flink.cdc.composer.definition.SourceDef; import org.apache.flink.cdc.composer.definition.TransformDef; @@ -30,14 +42,17 @@ import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions; import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper; import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.junit5.MiniClusterExtension; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import 
org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import java.io.ByteArrayOutputStream; import java.io.PrintStream; @@ -386,4 +401,260 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=()}"); } + + @Test + void testOneToOneRouting() throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup route + TableId routedTable1 = TableId.tableId("default_namespace", "default_schema", "routed1"); + TableId routedTable2 = TableId.tableId("default_namespace", "default_schema", "routed2"); + List routeDef = + Arrays.asList( + new RouteDef(TABLE_1.toString(), routedTable1.toString(), null), + new RouteDef(TABLE_2.toString(), routedTable2.toString(), null)); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, sinkDef, routeDef, Collections.emptyList(), pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check result in ValuesDatabase + List routed1Results = ValuesDatabase.getResults(routedTable1); + assertThat(routed1Results) + .contains( + "default_namespace.default_schema.routed1:col1=2;newCol3=x", + "default_namespace.default_schema.routed1:col1=3;newCol3="); + List routed2Results = ValuesDatabase.getResults(routedTable2); + assertThat(routed2Results) + .contains( + "default_namespace.default_schema.routed2:col1=1;col2=1", + "default_namespace.default_schema.routed2:col1=2;col2=2", + "default_namespace.default_schema.routed2:col1=3;col2=3"); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.routed1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.routed2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[3, 3], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.routed1, 
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed2, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed2, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed2, before=[], after=[3, 3], op=INSERT, meta=()}", + "RenameColumnEvent{tableId=default_namespace.default_schema.routed1, nameMapping={col2=newCol2, col3=newCol3}}", + "DropColumnEvent{tableId=default_namespace.default_schema.routed1, droppedColumnNames=[newCol2]}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[1, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}"); + } + + @Test + void testMergingWithRoute() throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); + + TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1"); + TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2"); + Schema table1Schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(); + Schema table2Schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT()) + .physicalColumn("name", DataTypes.VARCHAR(255)) + .physicalColumn("age", DataTypes.TINYINT()) + .physicalColumn("description", DataTypes.STRING()) + .primaryKey("id") + .build(); + + // Create test dataset: + // Create table 1 [id, name, age] + // Table 1: +I[1, Alice, 18] + // Table 1: +I[2, Bob, 20] + // Table 1: -U[2, Bob, 20] +U[2, Bob, 30] + // Create table 2 [id, name, age] + // Table 2: +I[3, Charlie, 15, student] + // Table 2: +I[4, Donald, 25, student] + // Table 2: -D[4, Donald, 25, student] + // Rename column for table 1: name -> last_name + // Add column for table 2: gender + // Table 1: +I[5, Eliza, 24] + // Table 2: +I[6, Frank, 30, student, male] + List events = new ArrayList<>(); + BinaryRecordDataGenerator table1dataGenerator = + new BinaryRecordDataGenerator( + table1Schema.getColumnDataTypes().toArray(new DataType[0])); + BinaryRecordDataGenerator table2dataGenerator = + new BinaryRecordDataGenerator( + table2Schema.getColumnDataTypes().toArray(new DataType[0])); + events.add(new CreateTableEvent(myTable1, table1Schema)); + events.add( + DataChangeEvent.insertEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {1, BinaryStringData.fromString("Alice"), 18}))); + events.add( + DataChangeEvent.insertEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 20}))); + events.add( + DataChangeEvent.updateEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 20}), + table1dataGenerator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 30}))); + events.add(new CreateTableEvent(myTable2, table2Schema)); + events.add( + DataChangeEvent.insertEvent( + myTable2, + table2dataGenerator.generate( + new Object[] { + 3L, + BinaryStringData.fromString("Charlie"), + (byte) 15, + 
BinaryStringData.fromString("student") + }))); + events.add( + DataChangeEvent.insertEvent( + myTable2, + table2dataGenerator.generate( + new Object[] { + 4L, + BinaryStringData.fromString("Donald"), + (byte) 25, + BinaryStringData.fromString("student") + }))); + events.add( + DataChangeEvent.deleteEvent( + myTable2, + table2dataGenerator.generate( + new Object[] { + 4L, + BinaryStringData.fromString("Donald"), + (byte) 25, + BinaryStringData.fromString("student") + }))); + events.add(new RenameColumnEvent(myTable1, ImmutableMap.of("name", "last_name"))); + events.add( + new AddColumnEvent( + myTable2, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("gender", DataTypes.STRING()))))); + events.add( + DataChangeEvent.insertEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {5, BinaryStringData.fromString("Eliza"), 24}))); + events.add( + DataChangeEvent.insertEvent( + myTable2, + new BinaryRecordDataGenerator( + new DataType[] { + DataTypes.BIGINT(), + DataTypes.VARCHAR(255), + DataTypes.TINYINT(), + DataTypes.STRING(), + DataTypes.STRING() + }) + .generate( + new Object[] { + 6L, + BinaryStringData.fromString("Frank"), + (byte) 30, + BinaryStringData.fromString("student"), + BinaryStringData.fromString("male") + }))); + + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup route + TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged"); + List routeDef = + Collections.singletonList( + new RouteDef( + "default_namespace.default_schema.mytable[0-9]", + mergedTable.toString(), + null)); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, sinkDef, routeDef, Collections.emptyList(), pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + Schema mergedTableSchema = ValuesDatabase.getTableSchema(mergedTable); + assertThat(mergedTableSchema) + .isEqualTo( + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.BIGINT()) + .physicalColumn("description", DataTypes.STRING()) + .physicalColumn("last_name", DataTypes.STRING()) + .physicalColumn("gender", DataTypes.STRING()) + .primaryKey("id") + .build()); + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.merged, 
addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, nameMapping={age=BIGINT, id=BIGINT}}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, student], after=[], op=DELETE, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`last_name` STRING, position=LAST, existedColumnName=null}]}", + "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`gender` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, null, 24, null, Eliza, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, student, null, male], op=INSERT, meta=()}"); + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index ee10552789..3648a1e7af 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -17,17 +17,30 @@ package org.apache.flink.cdc.runtime.operators.schema; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.StringData; +import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.FlushEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.schema.Selectors; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeFamily; +import org.apache.flink.cdc.common.types.DataTypeRoot; +import org.apache.flink.cdc.common.utils.ChangeEventUtils; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils; import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest; import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse; +import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; @@ -40,10 +53,21 @@ import 
org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.SerializedValue; +import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; /** * The operator will evolve schemas in {@link SchemaRegistry} for incoming {@link @@ -56,10 +80,17 @@ public class SchemaOperator extends AbstractStreamOperator private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class); + private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1); + private final List> routingRules; + + private transient List> routes; private transient TaskOperatorEventGateway toCoordinator; + private transient SchemaEvolutionClient schemaEvolutionClient; + private transient LoadingCache cachedSchemas; - public SchemaOperator() { + public SchemaOperator(List> routingRules) { + this.routingRules = routingRules; this.chainingStrategy = ChainingStrategy.ALWAYS; } @@ -70,6 +101,30 @@ public void setup( Output> output) { super.setup(containingTask, config, output); this.toCoordinator = containingTask.getEnvironment().getOperatorCoordinatorEventGateway(); + routes = + routingRules.stream() + .map( + tuple2 -> { + String tableInclusions = tuple2.f0; + TableId replaceBy = tuple2.f1; + Selectors selectors = + new Selectors.SelectorsBuilder() + .includeTables(tableInclusions) + .build(); + return new Tuple2<>(selectors, replaceBy); + }) + .collect(Collectors.toList()); + schemaEvolutionClient = new SchemaEvolutionClient(toCoordinator, getOperatorID()); + cachedSchemas = + CacheBuilder.newBuilder() + .expireAfterAccess(CACHE_EXPIRE_DURATION) + .build( + new CacheLoader() { + @Override + public Schema load(TableId tableId) { + return getLatestSchema(tableId); + } + }); } /** @@ -78,29 +133,130 @@ public void setup( @Override public void processElement(StreamRecord streamRecord) { Event event = streamRecord.getValue(); + // Schema changes if (event instanceof SchemaChangeEvent) { TableId tableId = ((SchemaChangeEvent) event).tableId(); LOG.info( "Table {} received SchemaChangeEvent and start to be blocked.", tableId.toString()); handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event); + // Update caches + cachedSchemas.put(tableId, getLatestSchema(tableId)); + getRoutedTable(tableId) + .ifPresent(routed -> cachedSchemas.put(routed, getLatestSchema(routed))); return; } - output.collect(streamRecord); + + // Data changes + DataChangeEvent dataChangeEvent = (DataChangeEvent) event; + TableId tableId = dataChangeEvent.tableId(); + Optional optionalRoutedTable = getRoutedTable(tableId); + if (optionalRoutedTable.isPresent()) { + output.collect( + new StreamRecord<>( + maybeFillInNullForEmptyColumns( + dataChangeEvent, optionalRoutedTable.get()))); + } else { + output.collect(streamRecord); + } } // ---------------------------------------------------------------------------------- + private DataChangeEvent maybeFillInNullForEmptyColumns( + DataChangeEvent originalEvent, TableId routedTableId) { + try { + Schema originalSchema = cachedSchemas.get(originalEvent.tableId()); + Schema 
routedTableSchema = cachedSchemas.get(routedTableId); + if (originalSchema.equals(routedTableSchema)) { + return ChangeEventUtils.recreateDataChangeEvent(originalEvent, routedTableId); + } + switch (originalEvent.op()) { + case INSERT: + return DataChangeEvent.insertEvent( + routedTableId, + regenerateRecordData( + originalEvent.after(), originalSchema, routedTableSchema), + originalEvent.meta()); + case UPDATE: + return DataChangeEvent.updateEvent( + routedTableId, + regenerateRecordData( + originalEvent.before(), originalSchema, routedTableSchema), + regenerateRecordData( + originalEvent.after(), originalSchema, routedTableSchema), + originalEvent.meta()); + case DELETE: + return DataChangeEvent.deleteEvent( + routedTableId, + regenerateRecordData( + originalEvent.before(), originalSchema, routedTableSchema), + originalEvent.meta()); + case REPLACE: + return DataChangeEvent.replaceEvent( + routedTableId, + regenerateRecordData( + originalEvent.after(), originalSchema, routedTableSchema), + originalEvent.meta()); + default: + throw new IllegalArgumentException( + String.format( + "Unrecognized operation type \"%s\"", originalEvent.op())); + } + } catch (Exception e) { + throw new IllegalStateException("Unable to fill null for empty columns", e); + } + } + + private RecordData regenerateRecordData( + RecordData recordData, Schema originalSchema, Schema routedTableSchema) { + // Regenerate record data + List fieldGetters = new ArrayList<>(); + for (Column column : routedTableSchema.getColumns()) { + String columnName = column.getName(); + int columnIndex = originalSchema.getColumnNames().indexOf(columnName); + if (columnIndex == -1) { + fieldGetters.add(new NullFieldGetter()); + } else { + RecordData.FieldGetter fieldGetter = + RecordData.createFieldGetter( + originalSchema.getColumn(columnName).get().getType(), columnIndex); + // Check type compatibility + if (originalSchema.getColumn(columnName).get().getType().equals(column.getType())) { + fieldGetters.add(fieldGetter); + } else { + fieldGetters.add(new TypeCoercionFieldGetter(column.getType(), fieldGetter)); + } + } + } + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator( + routedTableSchema.getColumnDataTypes().toArray(new DataType[0])); + return recordDataGenerator.generate( + fieldGetters.stream() + .map(fieldGetter -> fieldGetter.getFieldOrNull(recordData)) + .toArray()); + } + + private Optional getRoutedTable(TableId originalTableId) { + for (Tuple2 route : routes) { + if (route.f0.isMatch(originalTableId)) { + return Optional.of(route.f1); + } + } + return Optional.empty(); + } + private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) { // The request will need to send a FlushEvent or block until flushing finished SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent); - if (response.isShouldSendFlushEvent()) { + if (!response.getSchemaChangeEvents().isEmpty()) { LOG.info( "Sending the FlushEvent for table {} in subtask {}.", tableId, getRuntimeContext().getIndexOfThisSubtask()); output.collect(new StreamRecord<>(new FlushEvent(tableId))); - output.collect(new StreamRecord<>(schemaChangeEvent)); + response.getSchemaChangeEvents().forEach(e -> output.collect(new StreamRecord<>(e))); // The request will block until flushing finished in each sink writer requestReleaseUpstream(); } @@ -127,4 +283,90 @@ RESPONSE sendRequestToCoordinator(REQUEST request) { "Failed to send request to coordinator: " + request.toString(), e); } } + + private Schema 
getLatestSchema(TableId tableId) { + try { + Optional optionalSchema = schemaEvolutionClient.getLatestSchema(tableId); + if (!optionalSchema.isPresent()) { + throw new IllegalStateException( + String.format("Schema doesn't exist for table \"%s\"", tableId)); + } + return optionalSchema.get(); + } catch (Exception e) { + throw new IllegalStateException( + String.format("Unable to get latest schema for table \"%s\"", tableId)); + } + } + + private static class NullFieldGetter implements RecordData.FieldGetter { + @Nullable + @Override + public Object getFieldOrNull(RecordData recordData) { + return null; + } + } + + private static class TypeCoercionFieldGetter implements RecordData.FieldGetter { + private final DataType destinationType; + private final RecordData.FieldGetter originalFieldGetter; + + public TypeCoercionFieldGetter( + DataType destinationType, RecordData.FieldGetter originalFieldGetter) { + this.destinationType = destinationType; + this.originalFieldGetter = originalFieldGetter; + } + + @Nullable + @Override + public Object getFieldOrNull(RecordData recordData) { + Object originalField = originalFieldGetter.getFieldOrNull(recordData); + if (originalField == null) { + return null; + } + if (destinationType.is(DataTypeRoot.BIGINT)) { + if (originalField instanceof Byte) { + // TINYINT + return ((Byte) originalField).longValue(); + } else if (originalField instanceof Short) { + // SMALLINT + return ((Short) originalField).longValue(); + } else if (originalField instanceof Integer) { + // INT + return ((Integer) originalField).longValue(); + } else { + throw new IllegalArgumentException( + String.format( + "Cannot fit type \"%s\" into a BIGINT column. " + + "Currently only TINYINT / SMALLINT / INT can be accepted by a BIGINT column", + originalField.getClass())); + } + } else if (destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) { + if (originalField instanceof Float) { + // FLOAT + return ((Float) originalField).doubleValue(); + } else { + throw new IllegalArgumentException( + String.format( + "Cannot fit type \"%s\" into a DOUBLE column. " + + "Currently only FLOAT can be accepted by a DOUBLE column", + originalField.getClass())); + } + } else if (destinationType.is(DataTypeRoot.VARCHAR)) { + if (originalField instanceof StringData) { + return originalField; + } else { + throw new IllegalArgumentException( + String.format( + "Cannot fit type \"%s\" into a STRING column. 
" + + "Currently only CHAR / VARCHAR can be accepted by a STRING column", + originalField.getClass())); + } + } else { + throw new IllegalArgumentException( + String.format( + "Column type \"%s\" doesn't support type coercion", + destinationType)); + } + } + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java index 7e691019a3..7ee348e798 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java @@ -17,8 +17,10 @@ package org.apache.flink.cdc.runtime.operators.schema; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -27,6 +29,8 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import java.util.List; + /** Factory to create {@link SchemaOperator}. */ @Internal public class SchemaOperatorFactory extends SimpleOperatorFactory @@ -35,15 +39,18 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory private static final long serialVersionUID = 1L; private final MetadataApplier metadataApplier; + private final List> routingRules; - public SchemaOperatorFactory(MetadataApplier metadataApplier) { - super(new SchemaOperator()); + public SchemaOperatorFactory( + MetadataApplier metadataApplier, List> routingRules) { + super(new SchemaOperator(routingRules)); this.metadataApplier = metadataApplier; + this.routingRules = routingRules; } @Override public OperatorCoordinator.Provider getCoordinatorProvider( String operatorName, OperatorID operatorID) { - return new SchemaRegistryProvider(operatorID, operatorName, metadataApplier); + return new SchemaRegistryProvider(operatorID, operatorName, metadataApplier, routingRules); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java new file mode 100644 index 0000000000..baf1ad66c7 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.schema.coordinator; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.schema.Selectors; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeFamily; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.utils.ChangeEventUtils; +import org.apache.flink.cdc.runtime.serializer.TableIdSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** Derive schema changes based on the routing rules. */ +public class SchemaDerivation { + private final SchemaManager schemaManager; + private final List> routes; + private final Map> derivationMapping; + + public SchemaDerivation( + SchemaManager schemaManager, + List> routes, + Map> derivationMapping) { + this.schemaManager = schemaManager; + this.routes = routes; + this.derivationMapping = derivationMapping; + } + + public List applySchemaChange(SchemaChangeEvent schemaChangeEvent) { + for (Tuple2 route : routes) { + TableId originalTable = schemaChangeEvent.tableId(); + + // Check routing table + if (!route.f0.isMatch(originalTable)) { + continue; + } + + // Matched a routing rule + TableId derivedTable = route.f1; + Set originalTables = + derivationMapping.computeIfAbsent(derivedTable, t -> new HashSet<>()); + originalTables.add(originalTable); + + if (originalTables.size() == 1) { + // 1-to-1 mapping. 
Replace the table ID directly + SchemaChangeEvent derivedSchemaChangeEvent = + ChangeEventUtils.recreateSchemaChangeEvent(schemaChangeEvent, derivedTable); + schemaManager.applySchemaChange(derivedSchemaChangeEvent); + return Collections.singletonList(derivedSchemaChangeEvent); + } + + // Many-to-1 mapping (merging tables) + Schema derivedTableSchema = schemaManager.getLatestSchema(derivedTable).get(); + if (schemaChangeEvent instanceof CreateTableEvent) { + return handleCreateTableEvent( + (CreateTableEvent) schemaChangeEvent, derivedTableSchema, derivedTable); + } else if (schemaChangeEvent instanceof AddColumnEvent) { + return handleAddColumnEvent( + (AddColumnEvent) schemaChangeEvent, derivedTableSchema, derivedTable); + } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) { + return handleAlterColumnTypeEvent( + (AlterColumnTypeEvent) schemaChangeEvent, derivedTableSchema, derivedTable); + } else if (schemaChangeEvent instanceof DropColumnEvent) { + return Collections.emptyList(); + } else if (schemaChangeEvent instanceof RenameColumnEvent) { + return handleRenameColumnEvent( + (RenameColumnEvent) schemaChangeEvent, derivedTableSchema, derivedTable); + } else { + throw new IllegalStateException( + String.format( + "Unrecognized SchemaChangeEvent type: %s", schemaChangeEvent)); + } + } + + // No routes are matched + return Collections.singletonList(schemaChangeEvent); + } + + public Map> getDerivationMapping() { + return derivationMapping; + } + + public static void serializeDerivationMapping( + SchemaDerivation schemaDerivation, DataOutputStream out) throws IOException { + TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE; + // Serialize derivation mapping in SchemaDerivation + Map> derivationMapping = schemaDerivation.getDerivationMapping(); + out.write(derivationMapping.size()); + for (Map.Entry> entry : derivationMapping.entrySet()) { + // Routed table ID + TableId routedTableId = entry.getKey(); + tableIdSerializer.serialize(routedTableId, new DataOutputViewStreamWrapper(out)); + // Original table IDs + Set originalTableIds = entry.getValue(); + out.writeInt(originalTableIds.size()); + for (TableId originalTableId : originalTableIds) { + tableIdSerializer.serialize(originalTableId, new DataOutputViewStreamWrapper(out)); + } + } + } + + public static Map> deserializerDerivationMapping(DataInputStream in) + throws IOException { + TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE; + int derivationMappingSize = in.readInt(); + Map> derivationMapping = new HashMap<>(derivationMappingSize); + for (int i = 0; i < derivationMappingSize; i++) { + // Routed table ID + TableId routedTableId = + tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in)); + // Original table IDs + int numOriginalTables = in.readInt(); + Set originalTableIds = new HashSet<>(numOriginalTables); + for (int j = 0; j < numOriginalTables; j++) { + TableId originalTableId = + tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in)); + originalTableIds.add(originalTableId); + } + derivationMapping.put(routedTableId, originalTableIds); + } + return derivationMapping; + } + + private List handleRenameColumnEvent( + RenameColumnEvent renameColumnEvent, Schema derivedTableSchema, TableId derivedTable) { + List newColumns = new ArrayList<>(); + renameColumnEvent + .getNameMapping() + .forEach( + (before, after) -> { + if (derivedTableSchema.getColumn(after).isPresent()) { + return; + } + Column existedColumn = derivedTableSchema.getColumn(before).get(); + 
newColumns.add( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn( + after, + existedColumn.getType(), + existedColumn.getComment()))); + }); + List schemaChangeEvents = new ArrayList<>(); + if (!newColumns.isEmpty()) { + AddColumnEvent derivedSchemaChangeEvent = new AddColumnEvent(derivedTable, newColumns); + schemaChangeEvents.add(derivedSchemaChangeEvent); + } + schemaChangeEvents.forEach(schemaManager::applySchemaChange); + return schemaChangeEvents; + } + + private List handleAlterColumnTypeEvent( + AlterColumnTypeEvent alterColumnTypeEvent, + Schema derivedTableSchema, + TableId derivedTable) { + Map typeDifference = new HashMap<>(); + alterColumnTypeEvent + .getTypeMapping() + .forEach( + (columnName, dataType) -> { + Column existedColumnInDerivedTable = + derivedTableSchema.getColumn(columnName).get(); + if (!existedColumnInDerivedTable.getType().equals(dataType)) { + // Check type compatibility + DataType widerType = + getWiderType( + existedColumnInDerivedTable.getType(), dataType); + if (!widerType.equals(existedColumnInDerivedTable.getType())) { + typeDifference.put( + existedColumnInDerivedTable.getName(), widerType); + } + } + }); + List schemaChangeEvents = new ArrayList<>(); + if (!typeDifference.isEmpty()) { + AlterColumnTypeEvent derivedSchemaChangeEvent = + new AlterColumnTypeEvent(derivedTable, typeDifference); + schemaChangeEvents.add(derivedSchemaChangeEvent); + } + schemaChangeEvents.forEach(schemaManager::applySchemaChange); + return schemaChangeEvents; + } + + private List handleAddColumnEvent( + AddColumnEvent addColumnEvent, Schema derivedTableSchema, TableId derivedTable) { + List newColumns = new ArrayList<>(); + Map newTypeMapping = new HashMap<>(); + // Check if new column already existed in the derived table + for (AddColumnEvent.ColumnWithPosition addedColumn : addColumnEvent.getAddedColumns()) { + Optional optionalColumnInDerivedTable = + derivedTableSchema.getColumn(addedColumn.getAddColumn().getName()); + if (!optionalColumnInDerivedTable.isPresent()) { + // Non-existed column. Use AddColumn + newColumns.add(new AddColumnEvent.ColumnWithPosition(addedColumn.getAddColumn())); + } else { + // Existed column. Check type compatibility + Column existedColumnInDerivedTable = optionalColumnInDerivedTable.get(); + if (!existedColumnInDerivedTable + .getType() + .equals(addedColumn.getAddColumn().getType())) { + DataType widerType = + getWiderType( + existedColumnInDerivedTable.getType(), + addedColumn.getAddColumn().getType()); + if (!widerType.equals(existedColumnInDerivedTable.getType())) { + newTypeMapping.put(existedColumnInDerivedTable.getName(), widerType); + } + } + } + } + + List schemaChangeEvents = new ArrayList<>(); + if (!newColumns.isEmpty()) { + schemaChangeEvents.add(new AddColumnEvent(derivedTable, newColumns)); + } + if (!newTypeMapping.isEmpty()) { + schemaChangeEvents.add(new AlterColumnTypeEvent(derivedTable, newTypeMapping)); + } + schemaChangeEvents.forEach(schemaManager::applySchemaChange); + return schemaChangeEvents; + } + + private List handleCreateTableEvent( + CreateTableEvent createTableEvent, Schema derivedTableSchema, TableId derivedTable) { + List newColumns = new ArrayList<>(); + Map newTypeMapping = new HashMap<>(); + // Check if there is any columns that doesn't exist in the derived table + // and perform add-column for non-existed columns. 
+ for (Column column : createTableEvent.getSchema().getColumns()) { + Optional optionalColumnInDerivedTable = + derivedTableSchema.getColumn(column.getName()); + if (!optionalColumnInDerivedTable.isPresent()) { + // Non-existed column. Use AddColumn + newColumns.add(new AddColumnEvent.ColumnWithPosition(column)); + } else { + // Existed column. Check type compatibility + Column existedColumnInDerivedTable = optionalColumnInDerivedTable.get(); + if (!existedColumnInDerivedTable.getType().equals(column.getType())) { + DataType widerType = + getWiderType(existedColumnInDerivedTable.getType(), column.getType()); + if (!widerType.equals(existedColumnInDerivedTable.getType())) { + newTypeMapping.put(existedColumnInDerivedTable.getName(), widerType); + } + } + } + } + + List schemaChangeEvents = new ArrayList<>(); + if (!newColumns.isEmpty()) { + schemaChangeEvents.add(new AddColumnEvent(derivedTable, newColumns)); + } + if (!newTypeMapping.isEmpty()) { + schemaChangeEvents.add(new AlterColumnTypeEvent(derivedTable, newTypeMapping)); + } + schemaChangeEvents.forEach(schemaManager::applySchemaChange); + return schemaChangeEvents; + } + + private DataType getWiderType(DataType thisType, DataType thatType) { + if (thisType.equals(thatType)) { + return thisType; + } + if (thisType.is(DataTypeFamily.INTEGER_NUMERIC) + && thatType.is(DataTypeFamily.INTEGER_NUMERIC)) { + return DataTypes.BIGINT(); + } + if (thisType.is(DataTypeFamily.CHARACTER_STRING) + && thatType.is(DataTypeFamily.CHARACTER_STRING)) { + return DataTypes.STRING(); + } + if (thisType.is(DataTypeFamily.APPROXIMATE_NUMERIC) + && thatType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) { + return DataTypes.DOUBLE(); + } + throw new IllegalStateException( + String.format("Incompatible types: \"%s\" and \"%s\"", thisType, thatType)); + } +} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java index 57a82b603a..82ae21b867 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java @@ -17,7 +17,9 @@ package org.apache.flink.cdc.runtime.operators.schema.coordinator; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent; @@ -43,7 +45,9 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap; @@ -82,22 +86,30 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH /** Metadata applier for applying schema changes to external system. */ private final MetadataApplier metadataApplier; + private final List> routes; + /** The request handler that handle all requests and events. */ private SchemaRegistryRequestHandler requestHandler; /** Schema manager for tracking schemas of all tables. 
*/ private SchemaManager schemaManager = new SchemaManager(); + private SchemaDerivation schemaDerivation; + public SchemaRegistry( String operatorName, OperatorCoordinator.Context context, - MetadataApplier metadataApplier) { + MetadataApplier metadataApplier, + List> routes) { this.context = context; this.operatorName = operatorName; this.failedReasons = new HashMap<>(); this.metadataApplier = metadataApplier; + this.routes = routes; schemaManager = new SchemaManager(); - requestHandler = new SchemaRegistryRequestHandler(metadataApplier, schemaManager); + schemaDerivation = new SchemaDerivation(schemaManager, routes, new HashMap<>()); + requestHandler = + new SchemaRegistryRequestHandler(metadataApplier, schemaManager, schemaDerivation); } @Override @@ -141,6 +153,8 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture r byte[] serializedSchemaManager = SchemaManager.SERIALIZER.serialize(schemaManager); out.writeInt(serializedSchemaManager.length); out.write(serializedSchemaManager); + // Serialize SchemaDerivation mapping + SchemaDerivation.serializeDerivationMapping(schemaDerivation, out); resultFuture.complete(baos.toByteArray()); } } @@ -181,7 +195,12 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData schemaManager = SchemaManager.SERIALIZER.deserialize( schemaManagerSerializerVersion, serializedSchemaManager); - requestHandler = new SchemaRegistryRequestHandler(metadataApplier, schemaManager); + Map> derivationMapping = + SchemaDerivation.deserializerDerivationMapping(in); + schemaDerivation = new SchemaDerivation(schemaManager, routes, derivationMapping); + requestHandler = + new SchemaRegistryRequestHandler( + metadataApplier, schemaManager, schemaDerivation); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java index 1536f8a826..1f6e7aaf57 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java @@ -17,11 +17,17 @@ package org.apache.flink.cdc.runtime.operators.schema.coordinator; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import java.util.List; +import java.util.stream.Collectors; + /** Provider of {@link SchemaRegistry}. 
*/ @Internal public class SchemaRegistryProvider implements OperatorCoordinator.Provider { @@ -30,12 +36,17 @@ public class SchemaRegistryProvider implements OperatorCoordinator.Provider { private final OperatorID operatorID; private final String operatorName; private final MetadataApplier metadataApplier; + private final List> routingRules; public SchemaRegistryProvider( - OperatorID operatorID, String operatorName, MetadataApplier metadataApplier) { + OperatorID operatorID, + String operatorName, + MetadataApplier metadataApplier, + List> routingRules) { this.operatorID = operatorID; this.operatorName = operatorName; this.metadataApplier = metadataApplier; + this.routingRules = routingRules; } @Override @@ -45,6 +56,19 @@ public OperatorID getOperatorId() { @Override public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception { - return new SchemaRegistry(operatorName, context, metadataApplier); + List> routes = + routingRules.stream() + .map( + tuple2 -> { + String tableInclusions = tuple2.f0; + TableId replaceBy = tuple2.f1; + Selectors selectors = + new Selectors.SelectorsBuilder() + .includeTables(tableInclusions) + .build(); + return new Tuple2<>(selectors, replaceBy); + }) + .collect(Collectors.toList()); + return new SchemaRegistry(operatorName, context, metadataApplier, routes); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 8b11e9509b..f9dfdb781c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -33,6 +33,7 @@ import javax.annotation.concurrent.NotThreadSafe; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -55,6 +56,8 @@ public class SchemaRegistryRequestHandler { /** Schema manager holding schema for all tables. */ private final SchemaManager schemaManager; + private final SchemaDerivation schemaDerivation; + /** * Not applied SchemaChangeRequest before receiving all flush success events for its table from * sink writers. 
@@ -64,12 +67,15 @@ public class SchemaRegistryRequestHandler { private final Set flushedSinkWriters; public SchemaRegistryRequestHandler( - MetadataApplier metadataApplier, SchemaManager schemaManager) { + MetadataApplier metadataApplier, + SchemaManager schemaManager, + SchemaDerivation schemaDerivation) { this.metadataApplier = metadataApplier; this.activeSinkWriters = new HashSet<>(); this.flushedSinkWriters = new HashSet<>(); this.pendingSchemaChanges = new LinkedList<>(); this.schemaManager = schemaManager; + this.schemaDerivation = schemaDerivation; } /** @@ -96,13 +102,22 @@ public CompletableFuture handleSchemaChangeRequest( request.getTableId().toString()); if (request.getSchemaChangeEvent() instanceof CreateTableEvent && schemaManager.schemaExists(request.getTableId())) { - return CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(false))); + return CompletableFuture.completedFuture( + wrap(new SchemaChangeResponse(Collections.emptyList()))); } - CompletableFuture response = - CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(true))); schemaManager.applySchemaChange(request.getSchemaChangeEvent()); - pendingSchemaChanges.add(new PendingSchemaChange(request, response)); - pendingSchemaChanges.get(0).startToWaitForReleaseRequest(); + List derivedSchemaChangeEvents = + schemaDerivation.applySchemaChange(request.getSchemaChangeEvent()); + CompletableFuture response = + CompletableFuture.completedFuture( + wrap(new SchemaChangeResponse(derivedSchemaChangeEvents))); + if (!derivedSchemaChangeEvents.isEmpty()) { + PendingSchemaChange pendingSchemaChange = + new PendingSchemaChange(request, response); + pendingSchemaChange.derivedSchemaChangeEvents = derivedSchemaChangeEvents; + pendingSchemaChanges.add(pendingSchemaChange); + pendingSchemaChanges.get(0).startToWaitForReleaseRequest(); + } return response; } else { LOG.info("There are already processing requests. Wait for processing."); @@ -147,7 +162,8 @@ public void flushSuccess(TableId tableId, int sinkSubtask) { "All sink subtask have flushed for table {}. 
Start to apply schema change.", tableId.toString()); PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0); - applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent()); + waitFlushSuccess.derivedSchemaChangeEvents.forEach( + schemaChangeEvent -> applySchemaChange(tableId, schemaChangeEvent)); waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse())); if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) { @@ -166,21 +182,27 @@ private void startNextSchemaChangeRequest() { && schemaManager.schemaExists(request.getTableId())) { pendingSchemaChange .getResponseFuture() - .complete(wrap(new SchemaChangeResponse(false))); + .complete(wrap(new SchemaChangeResponse(Collections.emptyList()))); pendingSchemaChanges.remove(0); } else { schemaManager.applySchemaChange(request.getSchemaChangeEvent()); + List derivedSchemaChangeEvents = + schemaDerivation.applySchemaChange(request.getSchemaChangeEvent()); pendingSchemaChange .getResponseFuture() - .complete(wrap(new SchemaChangeResponse(true))); - pendingSchemaChange.startToWaitForReleaseRequest(); - break; + .complete(wrap(new SchemaChangeResponse(derivedSchemaChangeEvents))); + if (!derivedSchemaChangeEvents.isEmpty()) { + pendingSchemaChange.derivedSchemaChangeEvents = derivedSchemaChangeEvents; + pendingSchemaChange.startToWaitForReleaseRequest(); + break; + } } } } private static class PendingSchemaChange { private final SchemaChangeRequest changeRequest; + private List derivedSchemaChangeEvents; private CompletableFuture responseFuture; private RequestStatus status; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java index 1e609fc701..142de431ed 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java @@ -17,10 +17,12 @@ package org.apache.flink.cdc.runtime.operators.schema.event; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import java.util.List; import java.util.Objects; /** @@ -34,14 +36,14 @@ public class SchemaChangeResponse implements CoordinationResponse { * Whether the SchemaOperator need to buffer data and the SchemaOperatorCoordinator need to wait * for flushing. 
*/ - private final boolean shouldSendFlushEvent; + private final List schemaChangeEvents; - public SchemaChangeResponse(boolean shouldSendFlushEvent) { - this.shouldSendFlushEvent = shouldSendFlushEvent; + public SchemaChangeResponse(List schemaChangeEvents) { + this.schemaChangeEvents = schemaChangeEvents; } - public boolean isShouldSendFlushEvent() { - return shouldSendFlushEvent; + public List getSchemaChangeEvents() { + return schemaChangeEvents; } @Override @@ -53,11 +55,11 @@ public boolean equals(Object o) { return false; } SchemaChangeResponse response = (SchemaChangeResponse) o; - return shouldSendFlushEvent == response.shouldSendFlushEvent; + return schemaChangeEvents.equals(response.schemaChangeEvents); } @Override public int hashCode() { - return Objects.hash(shouldSendFlushEvent); + return Objects.hash(schemaChangeEvents); } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/route/RouteFunctionTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/route/RouteFunctionTest.java deleted file mode 100644 index 1f3652a506..0000000000 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/route/RouteFunctionTest.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.cdc.runtime.operators.route; - -import org.apache.flink.cdc.common.data.binary.BinaryStringData; -import org.apache.flink.cdc.common.event.AddColumnEvent; -import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; -import org.apache.flink.cdc.common.event.CreateTableEvent; -import org.apache.flink.cdc.common.event.DataChangeEvent; -import org.apache.flink.cdc.common.event.DropColumnEvent; -import org.apache.flink.cdc.common.event.OperationType; -import org.apache.flink.cdc.common.event.RenameColumnEvent; -import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.common.schema.Column; -import org.apache.flink.cdc.common.schema.PhysicalColumn; -import org.apache.flink.cdc.common.schema.Schema; -import org.apache.flink.cdc.common.types.DataType; -import org.apache.flink.cdc.common.types.DataTypes; -import org.apache.flink.cdc.common.types.RowType; -import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; -import org.apache.flink.configuration.Configuration; - -import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; - -import org.junit.jupiter.api.Test; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static org.apache.flink.cdc.common.testutils.assertions.EventAssertions.assertThat; - -class RouteFunctionTest { - private static final TableId CUSTOMERS = - TableId.tableId("my_company", "my_branch", "customers"); - private static final TableId NEW_CUSTOMERS = - TableId.tableId("my_new_company", "my_new_branch", "customers"); - private static final Schema CUSTOMERS_SCHEMA = - Schema.newBuilder() - .physicalColumn("id", DataTypes.INT()) - .physicalColumn("name", DataTypes.STRING()) - .physicalColumn("phone", DataTypes.BIGINT()) - .primaryKey("id") - .build(); - - @Test - void testDataChangeEventRouting() throws Exception { - RouteFunction router = - RouteFunction.newBuilder() - .addRoute("my_company.\\.+.customers", NEW_CUSTOMERS) - .build(); - router.open(new Configuration()); - BinaryRecordDataGenerator recordDataGenerator = - new BinaryRecordDataGenerator(((RowType) CUSTOMERS_SCHEMA.toRowDataType())); - - // Insert - DataChangeEvent insertEvent = - DataChangeEvent.insertEvent( - CUSTOMERS, - recordDataGenerator.generate( - new Object[] {1, new BinaryStringData("Alice"), 12345678L})); - assertThat(router.map(insertEvent)) - .asDataChangeEvent() - .hasTableId(NEW_CUSTOMERS) - .hasOperationType(OperationType.INSERT) - .withAfterRecordData() - .hasArity(3) - .withSchema(CUSTOMERS_SCHEMA) - .hasFields(1, new BinaryStringData("Alice"), 12345678L); - - // Update - DataChangeEvent updateEvent = - DataChangeEvent.updateEvent( - CUSTOMERS, - recordDataGenerator.generate( - new Object[] {1, new BinaryStringData("Alice"), 12345678L}), - recordDataGenerator.generate( - new Object[] {1, new BinaryStringData("Alice"), 87654321L})); - DataChangeEvent mappedUpdateEvent = (DataChangeEvent) router.map(updateEvent); - assertThat(mappedUpdateEvent) - .hasTableId(NEW_CUSTOMERS) - .hasOperationType(OperationType.UPDATE); - assertThat(mappedUpdateEvent.before()) - .withSchema(CUSTOMERS_SCHEMA) - .hasFields(1, new BinaryStringData("Alice"), 12345678L); - assertThat(mappedUpdateEvent.after()) - .withSchema(CUSTOMERS_SCHEMA) - .hasFields(1, new BinaryStringData("Alice"), 87654321L); - - // Replace - DataChangeEvent replaceEvent = - DataChangeEvent.replaceEvent( - CUSTOMERS, - recordDataGenerator.generate( - new Object[] {1, new BinaryStringData("Bob"), 87654321L})); - 
assertThat(router.map(replaceEvent)) - .asDataChangeEvent() - .hasTableId(NEW_CUSTOMERS) - .hasOperationType(OperationType.REPLACE) - .withAfterRecordData() - .hasArity(3) - .withSchema(CUSTOMERS_SCHEMA) - .hasFields(1, new BinaryStringData("Bob"), 87654321L); - - // Delete - DataChangeEvent deleteEvent = - DataChangeEvent.deleteEvent( - CUSTOMERS, - recordDataGenerator.generate( - new Object[] {1, new BinaryStringData("Bob"), 87654321L})); - assertThat(router.map(deleteEvent)) - .asDataChangeEvent() - .hasTableId(NEW_CUSTOMERS) - .hasOperationType(OperationType.DELETE) - .withBeforeRecordData() - .hasArity(3) - .withSchema(CUSTOMERS_SCHEMA) - .hasFields(1, new BinaryStringData("Bob"), 87654321L); - } - - @Test - void testSchemaChangeEventRouting() throws Exception { - RouteFunction router = - RouteFunction.newBuilder() - .addRoute("\\.+_company.\\.+_branch.customers", NEW_CUSTOMERS) - .build(); - router.open(new Configuration()); - - // CreateTableEvent - CreateTableEvent createTableEvent = new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA); - assertThat(router.map(createTableEvent)) - .asSchemaChangeEvent() - .hasTableId(NEW_CUSTOMERS) - .asCreateTableEvent() - .hasSchema(CUSTOMERS_SCHEMA); - - // AddColumnEvent - AddColumnEvent.ColumnWithPosition newColumn = - new AddColumnEvent.ColumnWithPosition( - Column.physicalColumn("address", DataTypes.STRING()), - AddColumnEvent.ColumnPosition.LAST, - null); - AddColumnEvent addColumnEvent = - new AddColumnEvent(CUSTOMERS, Collections.singletonList(newColumn)); - assertThat(router.map(addColumnEvent)) - .asSchemaChangeEvent() - .asAddColumnEvent() - .hasTableId(NEW_CUSTOMERS) - .containsAddedColumns(newColumn); - - // DropColumnEvent - PhysicalColumn droppedColumn = Column.physicalColumn("address", DataTypes.STRING()); - List droppedColumns = Collections.singletonList(droppedColumn.getName()); - DropColumnEvent dropColumnEvent = new DropColumnEvent(CUSTOMERS, droppedColumns); - assertThat(router.map(dropColumnEvent)) - .asSchemaChangeEvent() - .asDropColumnEvent() - .containsDroppedColumns(droppedColumn.getName()) - .hasTableId(NEW_CUSTOMERS); - - // RenameColumnEvent - Map columnRenaming = ImmutableMap.of("phone", "mobile"); - RenameColumnEvent renameColumnEvent = new RenameColumnEvent(CUSTOMERS, columnRenaming); - assertThat(router.map(renameColumnEvent)) - .asSchemaChangeEvent() - .asRenameColumnEvent() - .containsNameMapping(columnRenaming) - .hasTableId(NEW_CUSTOMERS); - - // AlterColumnTypeEvent - Map typeMapping = ImmutableMap.of("mobile", DataTypes.STRING()); - AlterColumnTypeEvent alterColumnTypeEvent = - new AlterColumnTypeEvent(CUSTOMERS, typeMapping); - assertThat(router.map(alterColumnTypeEvent)) - .asSchemaChangeEvent() - .asAlterColumnTypeEvent() - .containsTypeMapping(typeMapping) - .hasTableId(NEW_CUSTOMERS); - } -} diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java index ad8c8b39b3..24ee75f483 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java @@ -97,7 +97,7 @@ private OneInputStreamOperatorTestHarness createTestHarness( int maxParallelism, int parallelism, int subtaskIndex, OperatorID opID) throws Exception { return new OneInputStreamOperatorTestHarness<>( - new SchemaOperator(), + new 
SchemaOperator(new ArrayList<>()), maxParallelism, parallelism, subtaskIndex, diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java new file mode 100644 index 0000000000..670286cb68 --- /dev/null +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.schema.coordinator; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.schema.Selectors; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; + +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.cdc.common.testutils.assertions.EventAssertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit test for {@link SchemaDerivation}. 
*/ +class SchemaDerivationTest { + + private static final TableId TABLE_1 = TableId.tableId("mydb", "myschema", "mytable1"); + private static final TableId TABLE_2 = TableId.tableId("mydb", "myschema", "mytable2"); + private static final TableId MERGED_TABLE = TableId.tableId("mydb", "myschema", "mytables"); + + private static final Schema SCHEMA = + Schema.newBuilder() + .column(Column.physicalColumn("id", DataTypes.BIGINT())) + .column(Column.physicalColumn("name", DataTypes.STRING())) + .column(Column.physicalColumn("age", DataTypes.INT())) + .build(); + + private static final Schema COMPATIBLE_SCHEMA = + Schema.newBuilder() + .column(Column.physicalColumn("id", DataTypes.BIGINT())) + .column(Column.physicalColumn("name", DataTypes.STRING())) + .column(Column.physicalColumn("age", DataTypes.BIGINT())) + .column(Column.physicalColumn("gender", DataTypes.STRING())) + .build(); + + private static final Schema INCOMPATIBLE_SCHEMA = + Schema.newBuilder() + .column(Column.physicalColumn("id", DataTypes.BIGINT())) + .column(Column.physicalColumn("name", DataTypes.STRING())) + .column(Column.physicalColumn("age", DataTypes.STRING())) + .column(Column.physicalColumn("gender", DataTypes.STRING())) + .build(); + + private static final List> ROUTES = + Collections.singletonList( + Tuple2.of( + new Selectors.SelectorsBuilder() + .includeTables("mydb.myschema.mytable[0-9]") + .build(), + MERGED_TABLE)); + + @Test + void testOneToOneMapping() { + SchemaDerivation schemaDerivation = + new SchemaDerivation(new SchemaManager(), ROUTES, new HashMap<>()); + + // Create table + List derivedChangesAfterCreateTable = + schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_1, SCHEMA)); + assertThat(derivedChangesAfterCreateTable).hasSize(1); + assertThat(derivedChangesAfterCreateTable.get(0)) + .asCreateTableEvent() + .hasTableId(MERGED_TABLE) + .hasSchema(SCHEMA); + + // Add column + AddColumnEvent.ColumnWithPosition newCol1 = + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("new_col1", DataTypes.STRING(), null)); + AddColumnEvent.ColumnWithPosition newCol2 = + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("new_col2", DataTypes.STRING(), null)); + List newColumns = Arrays.asList(newCol1, newCol2); + List derivedChangesAfterAddColumn = + schemaDerivation.applySchemaChange(new AddColumnEvent(TABLE_1, newColumns)); + assertThat(derivedChangesAfterAddColumn).hasSize(1); + assertThat(derivedChangesAfterAddColumn.get(0)) + .asAddColumnEvent() + .hasTableId(MERGED_TABLE) + .containsAddedColumns(newCol1, newCol2); + + // Alter column type + ImmutableMap typeMapping = ImmutableMap.of("age", DataTypes.BIGINT()); + List derivedChangesAfterAlterTableType = + schemaDerivation.applySchemaChange(new AlterColumnTypeEvent(TABLE_1, typeMapping)); + assertThat(derivedChangesAfterAlterTableType).hasSize(1); + assertThat(derivedChangesAfterAlterTableType.get(0)) + .asAlterColumnTypeEvent() + .hasTableId(MERGED_TABLE) + .containsTypeMapping(typeMapping); + + // Drop column + List droppedColumns = Arrays.asList("new_col1", "new_col2"); + List derivedChangesAfterDropColumn = + schemaDerivation.applySchemaChange(new DropColumnEvent(TABLE_1, droppedColumns)); + assertThat(derivedChangesAfterDropColumn).hasSize(1); + assertThat(derivedChangesAfterDropColumn.get(0)) + .asDropColumnEvent() + .hasTableId(MERGED_TABLE) + .containsDroppedColumns("new_col1", "new_col2"); + + // Rename column + Map renamedColumns = ImmutableMap.of("name", "last_name"); + List derivedChangesAfterRenameColumn = + 
schemaDerivation.applySchemaChange(new RenameColumnEvent(TABLE_1, renamedColumns)); + assertThat(derivedChangesAfterRenameColumn).hasSize(1); + assertThat(derivedChangesAfterRenameColumn.get(0)) + .asRenameColumnEvent() + .hasTableId(MERGED_TABLE) + .containsNameMapping(renamedColumns); + } + + @Test + void testMergingTablesWithExactSameSchema() { + SchemaDerivation schemaDerivation = + new SchemaDerivation(new SchemaManager(), ROUTES, new HashMap<>()); + + // Create table 1 + List derivedChangesAfterCreateTable = + schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_1, SCHEMA)); + assertThat(derivedChangesAfterCreateTable).hasSize(1); + assertThat(derivedChangesAfterCreateTable.get(0)) + .asCreateTableEvent() + .hasTableId(MERGED_TABLE) + .hasSchema(SCHEMA); + // Create table 2 + assertThat(schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_2, SCHEMA))) + .isEmpty(); + + // Add column for table 1 + AddColumnEvent.ColumnWithPosition newCol1 = + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("new_col1", DataTypes.STRING(), null)); + AddColumnEvent.ColumnWithPosition newCol2 = + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("new_col2", DataTypes.STRING(), null)); + List newColumns = Arrays.asList(newCol1, newCol2); + List derivedChangesAfterAddColumn = + schemaDerivation.applySchemaChange(new AddColumnEvent(TABLE_1, newColumns)); + assertThat(derivedChangesAfterAddColumn).hasSize(1); + assertThat(derivedChangesAfterAddColumn.get(0)) + .asAddColumnEvent() + .hasTableId(MERGED_TABLE) + .containsAddedColumns(newCol1, newCol2); + // Add column for table 2 + assertThat(schemaDerivation.applySchemaChange(new AddColumnEvent(TABLE_2, newColumns))) + .isEmpty(); + + // Alter column type for table 1 + ImmutableMap typeMapping = ImmutableMap.of("age", DataTypes.BIGINT()); + List derivedChangesAfterAlterColumnType = + schemaDerivation.applySchemaChange(new AlterColumnTypeEvent(TABLE_1, typeMapping)); + assertThat(derivedChangesAfterAlterColumnType).hasSize(1); + assertThat(derivedChangesAfterAlterColumnType.get(0)) + .asAlterColumnTypeEvent() + .hasTableId(MERGED_TABLE) + .containsTypeMapping(typeMapping); + // Alter column type for table 2 + assertThat( + schemaDerivation.applySchemaChange( + new AlterColumnTypeEvent(TABLE_2, typeMapping))) + .isEmpty(); + + // Drop column for table 1 + List droppedColumns = Arrays.asList("new_col1", "new_col2"); + assertThat(schemaDerivation.applySchemaChange(new DropColumnEvent(TABLE_1, droppedColumns))) + .isEmpty(); + // Drop column for table 2 + assertThat(schemaDerivation.applySchemaChange(new DropColumnEvent(TABLE_2, droppedColumns))) + .isEmpty(); + + // Rename column for table 1 + Map renamedColumns = ImmutableMap.of("name", "last_name"); + List derivedChangesAfterRenameColumn = + schemaDerivation.applySchemaChange(new RenameColumnEvent(TABLE_1, renamedColumns)); + assertThat(derivedChangesAfterRenameColumn).hasSize(1); + assertThat(derivedChangesAfterRenameColumn.get(0)) + .asAddColumnEvent() + .hasTableId(MERGED_TABLE) + .containsAddedColumns( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("last_name", DataTypes.STRING(), null))); + // Rename column for table 2 + assertThat( + schemaDerivation.applySchemaChange( + new RenameColumnEvent(TABLE_2, renamedColumns))) + .isEmpty(); + } + + @Test + void testMergingTableWithDifferentSchemas() { + SchemaManager schemaManager = new SchemaManager(); + SchemaDerivation schemaDerivation = + new SchemaDerivation(schemaManager, ROUTES, new HashMap<>()); + // 
Create table 1 + List derivedChangesAfterCreateTable = + schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_1, SCHEMA)); + assertThat(derivedChangesAfterCreateTable).hasSize(1); + assertThat(derivedChangesAfterCreateTable.get(0)) + .asCreateTableEvent() + .hasTableId(MERGED_TABLE) + .hasSchema(SCHEMA); + // Create table 2 + List derivedChangesAfterCreateTable2 = + schemaDerivation.applySchemaChange( + new CreateTableEvent(TABLE_2, COMPATIBLE_SCHEMA)); + assertThat(derivedChangesAfterCreateTable2).hasSize(2); + assertThat(derivedChangesAfterCreateTable2) + .containsExactlyInAnyOrder( + new AddColumnEvent( + MERGED_TABLE, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn( + "gender", DataTypes.STRING(), null)))), + new AlterColumnTypeEvent( + MERGED_TABLE, ImmutableMap.of("age", DataTypes.BIGINT()))); + + // Add column for table 1 + AddColumnEvent.ColumnWithPosition newCol1 = + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("new_col1", DataTypes.VARCHAR(255), null)); + AddColumnEvent.ColumnWithPosition newCol2 = + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("new_col2", DataTypes.VARCHAR(255), null)); + List newColumns = Arrays.asList(newCol1, newCol2); + List derivedChangesAfterAddColumn = + schemaDerivation.applySchemaChange(new AddColumnEvent(TABLE_1, newColumns)); + assertThat(derivedChangesAfterAddColumn).hasSize(1); + assertThat(derivedChangesAfterAddColumn.get(0)) + .asAddColumnEvent() + .hasTableId(MERGED_TABLE) + .containsAddedColumns(newCol1, newCol2); + // Add column for table 2 + List derivedChangesAfterAddColumnForTable2 = + schemaDerivation.applySchemaChange( + new AddColumnEvent( + TABLE_2, + Arrays.asList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn( + "new_col1", DataTypes.STRING(), null)), + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn( + "new_col2", DataTypes.STRING(), null))))); + assertThat(derivedChangesAfterAddColumnForTable2).hasSize(1); + assertThat(derivedChangesAfterAddColumnForTable2.get(0)) + .asAlterColumnTypeEvent() + .containsTypeMapping( + ImmutableMap.of( + "new_col1", DataTypes.STRING(), "new_col2", DataTypes.STRING())); + + // Alter column type for table 1 + ImmutableMap typeMapping = ImmutableMap.of("age", DataTypes.BIGINT()); + List derivedChangesAfterAlterColumnType = + schemaDerivation.applySchemaChange(new AlterColumnTypeEvent(TABLE_1, typeMapping)); + assertThat(derivedChangesAfterAlterColumnType).isEmpty(); + // Alter column type for table 2 + List derivedChangesAfterAlterColumnTypeForTable2 = + schemaDerivation.applySchemaChange( + new AlterColumnTypeEvent( + TABLE_2, ImmutableMap.of("age", DataTypes.TINYINT()))); + assertThat(derivedChangesAfterAlterColumnTypeForTable2).isEmpty(); + + // Drop column for table 1 + List droppedColumns = Arrays.asList("new_col1", "new_col2"); + assertThat(schemaDerivation.applySchemaChange(new DropColumnEvent(TABLE_1, droppedColumns))) + .isEmpty(); + // Drop column for table 2 + assertThat(schemaDerivation.applySchemaChange(new DropColumnEvent(TABLE_2, droppedColumns))) + .isEmpty(); + + // Rename column for table 1 + Map renamedColumns = ImmutableMap.of("name", "last_name"); + List derivedChangesAfterRenameColumn = + schemaDerivation.applySchemaChange(new RenameColumnEvent(TABLE_1, renamedColumns)); + assertThat(derivedChangesAfterRenameColumn).hasSize(1); + assertThat(derivedChangesAfterRenameColumn.get(0)) + .asAddColumnEvent() + .hasTableId(MERGED_TABLE) + .containsAddedColumns( + new 
AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("last_name", DataTypes.STRING(), null))); + // Rename column for table 2 + List derivedChangesAfterRenameColumnForTable2 = + schemaDerivation.applySchemaChange( + new RenameColumnEvent(TABLE_2, ImmutableMap.of("name", "first_name"))); + assertThat(derivedChangesAfterRenameColumnForTable2).hasSize(1); + assertThat(derivedChangesAfterRenameColumnForTable2.get(0)) + .asAddColumnEvent() + .hasTableId(MERGED_TABLE) + .containsAddedColumns( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("first_name", DataTypes.STRING(), null))); + + assertThat(schemaManager.getLatestSchema(MERGED_TABLE)) + .contains( + Schema.newBuilder() + .column(Column.physicalColumn("id", DataTypes.BIGINT())) + .column(Column.physicalColumn("name", DataTypes.STRING())) + .column(Column.physicalColumn("age", DataTypes.BIGINT())) + .column(Column.physicalColumn("gender", DataTypes.STRING())) + .column(Column.physicalColumn("new_col1", DataTypes.STRING())) + .column(Column.physicalColumn("new_col2", DataTypes.STRING())) + .column(Column.physicalColumn("last_name", DataTypes.STRING())) + .column(Column.physicalColumn("first_name", DataTypes.STRING())) + .build()); + } + + @Test + void testIncompatibleTypes() { + SchemaDerivation schemaDerivation = + new SchemaDerivation(new SchemaManager(), ROUTES, new HashMap<>()); + // Create table 1 + List derivedChangesAfterCreateTable = + schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_1, SCHEMA)); + assertThat(derivedChangesAfterCreateTable).hasSize(1); + assertThat(derivedChangesAfterCreateTable.get(0)) + .asCreateTableEvent() + .hasTableId(MERGED_TABLE) + .hasSchema(SCHEMA); + + // Create table 2 + assertThatThrownBy( + () -> + schemaDerivation.applySchemaChange( + new CreateTableEvent(TABLE_2, INCOMPATIBLE_SCHEMA))) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Incompatible types: \"INT\" and \"STRING\""); + } +} diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java index 25c7ed9e5c..240eccadad 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java @@ -41,6 +41,7 @@ import org.apache.flink.streaming.util.MockStreamConfig; import org.apache.flink.util.OutputTag; +import java.util.Collections; import java.util.LinkedList; /** @@ -72,7 +73,8 @@ public EventOperatorTestHarness(OP operator, int numOutputs) { "SchemaOperator", new MockOperatorCoordinatorContext( SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()), - new CollectingMetadataApplier()); + new CollectingMetadataApplier(), + Collections.emptyList()); schemaRegistryGateway = new TestingSchemaRegistryGateway(schemaRegistry); }
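
For readers tracing the new table-merging behaviour, here is a minimal usage sketch of SchemaDerivation, modelled directly on the SchemaDerivationTest added above. It is illustrative only: the class name SchemaDerivationExample, the table and column names, and the assumption that it sits in the same package as the test (org.apache.flink.cdc.runtime.operators.schema.coordinator, so SchemaDerivation and SchemaManager are visible without imports) are all the editor's, not part of this patch.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;

// Hypothetical example class; assumes it lives next to SchemaDerivationTest.
class SchemaDerivationExample {
    public static void main(String[] args) {
        // One route rule: every "mydb.myschema.mytable[0-9]" source table is merged
        // into a single sink table "mydb.myschema.mytables".
        TableId mergedTable = TableId.tableId("mydb", "myschema", "mytables");
        List<Tuple2<Selectors, TableId>> routes =
                Collections.singletonList(
                        Tuple2.of(
                                new Selectors.SelectorsBuilder()
                                        .includeTables("mydb.myschema.mytable[0-9]")
                                        .build(),
                                mergedTable));

        SchemaManager schemaManager = new SchemaManager();
        SchemaDerivation derivation =
                new SchemaDerivation(schemaManager, routes, new HashMap<>());

        // First source table: its CreateTableEvent is rewritten against the merged table.
        Schema schema1 =
                Schema.newBuilder()
                        .column(Column.physicalColumn("id", DataTypes.BIGINT()))
                        .column(Column.physicalColumn("age", DataTypes.INT()))
                        .build();
        List<SchemaChangeEvent> first =
                derivation.applySchemaChange(
                        new CreateTableEvent(
                                TableId.tableId("mydb", "myschema", "mytable1"), schema1));
        // first contains a single CreateTableEvent for "mytables" with schema1.

        // Second source table with a wider "age" column and an extra "gender" column:
        // the derivation emits an AddColumnEvent plus an AlterColumnTypeEvent so the
        // merged schema covers both sources (INT and BIGINT widen to BIGINT).
        Schema schema2 =
                Schema.newBuilder()
                        .column(Column.physicalColumn("id", DataTypes.BIGINT()))
                        .column(Column.physicalColumn("age", DataTypes.BIGINT()))
                        .column(Column.physicalColumn("gender", DataTypes.STRING()))
                        .build();
        List<SchemaChangeEvent> second =
                derivation.applySchemaChange(
                        new CreateTableEvent(
                                TableId.tableId("mydb", "myschema", "mytable2"), schema2));
        // second contains AddColumnEvent(mytables, gender STRING) and
        // AlterColumnTypeEvent(mytables, age -> BIGINT), as asserted in
        // SchemaDerivationTest#testMergingTableWithDifferentSchemas.
    }
}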