[FLINK-34653][cdc][route] Support table merging with route rules
This closes  #3129.
PatrickRen committed Apr 16, 2024
1 parent 9d150c0 commit 6017b16
Showing 17 changed files with 1,332 additions and 373 deletions.
File: org.apache.flink.cdc.common.utils.ChangeEventUtils (renamed from org.apache.flink.cdc.runtime.operators.route.RouteFunction)
@@ -15,107 +15,20 @@
  * limitations under the License.
  */

-package org.apache.flink.cdc.runtime.operators.route;
+package org.apache.flink.cdc.common.utils;

-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
-import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.DropColumnEvent;
-import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.schema.Selectors;
-import org.apache.flink.configuration.Configuration;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
-
-/** A map function that applies user-defined routing logics. */
-public class RouteFunction extends RichMapFunction<Event, Event> {
-    private final List<Tuple2<String, TableId>> routingRules;
-    private transient List<Tuple2<Selectors, TableId>> routes;
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /** Builder of {@link RouteFunction}. */
-    public static class Builder {
-        private final List<Tuple2<String, TableId>> routingRules = new ArrayList<>();
-
-        public Builder addRoute(String tableInclusions, TableId replaceBy) {
-            routingRules.add(Tuple2.of(tableInclusions, replaceBy));
-            return this;
-        }
-
-        public RouteFunction build() {
-            return new RouteFunction(routingRules);
-        }
-    }
-
-    private RouteFunction(List<Tuple2<String, TableId>> routingRules) {
-        this.routingRules = routingRules;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        routes =
-                routingRules.stream()
-                        .map(
-                                tuple2 -> {
-                                    String tableInclusions = tuple2.f0;
-                                    TableId replaceBy = tuple2.f1;
-                                    Selectors selectors =
-                                            new Selectors.SelectorsBuilder()
-                                                    .includeTables(tableInclusions)
-                                                    .build();
-                                    return new Tuple2<>(selectors, replaceBy);
-                                })
-                        .collect(Collectors.toList());
-    }
-
-    @Override
-    public Event map(Event event) throws Exception {
-        checkState(
-                event instanceof ChangeEvent,
-                String.format(
-                        "The input event of the route is not a ChangeEvent but with type \"%s\"",
-                        event.getClass().getCanonicalName()));
-        ChangeEvent changeEvent = (ChangeEvent) event;
-        TableId tableId = changeEvent.tableId();
-
-        for (Tuple2<Selectors, TableId> route : routes) {
-            Selectors selectors = route.f0;
-            TableId replaceBy = route.f1;
-            if (selectors.isMatch(tableId)) {
-                return recreateChangeEvent(changeEvent, replaceBy);
-            }
-        }
-        return event;
-    }
-
-    private ChangeEvent recreateChangeEvent(ChangeEvent event, TableId tableId) {
-        if (event instanceof DataChangeEvent) {
-            return recreateDataChangeEvent(((DataChangeEvent) event), tableId);
-        }
-        if (event instanceof SchemaChangeEvent) {
-            return recreateSchemaChangeEvent(((SchemaChangeEvent) event), tableId);
-        }
-        throw new UnsupportedOperationException(
-                String.format(
-                        "Unsupported change event with type \"%s\"",
-                        event.getClass().getCanonicalName()));
-    }
-
-    private DataChangeEvent recreateDataChangeEvent(
+/** Utilities for handling {@link org.apache.flink.cdc.common.event.ChangeEvent}s. */
+public class ChangeEventUtils {
+    public static DataChangeEvent recreateDataChangeEvent(
             DataChangeEvent dataChangeEvent, TableId tableId) {
         switch (dataChangeEvent.op()) {
             case INSERT:
@@ -141,7 +54,7 @@ private DataChangeEvent recreateDataChangeEvent(
         }
     }

-    private SchemaChangeEvent recreateSchemaChangeEvent(
+    public static SchemaChangeEvent recreateSchemaChangeEvent(
             SchemaChangeEvent schemaChangeEvent, TableId tableId) {
         if (schemaChangeEvent instanceof CreateTableEvent) {
             CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;
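For orientation, a minimal sketch of calling the relocated utility. Only ChangeEventUtils and its recreate* methods come from this commit; the Schema and DataTypes builders are assumed from flink-cdc-common, and the table names are illustrative:

import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;

class ChangeEventUtilsSketch {
    SchemaChangeEvent routeCreateTable() {
        // Illustrative schema for a sharded source table.
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT())
                        .physicalColumn("amount", DataTypes.DOUBLE())
                        .build();
        CreateTableEvent original =
                new CreateTableEvent(TableId.parse("mydb.orders_001"), schema);
        // Rebind the event to the routed sink table; the schema payload is unchanged.
        return ChangeEventUtils.recreateSchemaChangeEvent(
                original, TableId.parse("mydb.orders"));
    }
}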
File: (change-event assertion test utility; full path not shown)
@@ -40,7 +40,8 @@ public SELF hasTableId(TableId tableId) {
             failWithActualExpectedAndMessage(
                     actual.tableId(),
                     tableId,
-                    "Table ID of the DataChangeEvent is not as expected");
+                    "Table ID of the %s is not as expected",
+                    actual.getClass().getSimpleName());
         }
         return myself;
     }
File: org.apache.flink.cdc.composer.flink.FlinkPipelineComposer
@@ -32,7 +32,6 @@
 import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
 import org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator;
 import org.apache.flink.cdc.composer.flink.translator.PartitioningTranslator;
-import org.apache.flink.cdc.composer.flink.translator.RouteTranslator;
 import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
 import org.apache.flink.cdc.composer.flink.translator.TransformTranslator;
 import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
@@ -104,6 +103,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
         // Build TransformSchemaOperator for processing Schema Event
         TransformTranslator transformTranslator = new TransformTranslator();
         stream = transformTranslator.translateSchema(stream, pipelineDef.getTransforms());
+
+        // Schema operator
         SchemaOperatorTranslator schemaOperatorTranslator =
                 new SchemaOperatorTranslator(
                         pipelineDef
@@ -121,16 +122,12 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
                         schemaOperatorIDGenerator.generate(),
                         pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));

-        // Build Router used to route Event
-        RouteTranslator routeTranslator = new RouteTranslator();
-        stream = routeTranslator.translate(stream, pipelineDef.getRoute());
-
         // Build DataSink in advance as schema operator requires MetadataApplier
         DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());

         stream =
                 schemaOperatorTranslator.translate(
-                        stream, parallelism, dataSink.getMetadataApplier());
+                        stream, parallelism, dataSink.getMetadataApplier(), pipelineDef.getRoute());

         // Build Partitioner used to shuffle Event
         PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
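With RouteTranslator gone, the schema operator owns the routing rules and can coordinate schema evolution for tables that are merged into one sink table. The sketch below shows the selector matching those rules rely on, reusing the Selectors and TableId APIs visible in the removed RouteFunction above; the pattern and table names are hypothetical:

import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Selectors;

class RouteMatchingSketch {
    // Rewrite any table matching the source pattern to a single sink table,
    // which is what merges the matching shards downstream.
    TableId route(TableId tableId) {
        Selectors selectors =
                new Selectors.SelectorsBuilder().includeTables("mydb.orders_\\.*").build();
        return selectors.isMatch(tableId) ? TableId.parse("mydb.orders") : tableId;
    }
}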

This file was deleted.

File: org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator
@@ -17,17 +17,23 @@

 package org.apache.flink.cdc.composer.flink.translator;

+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.composer.definition.RouteDef;
 import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
 import org.apache.flink.cdc.runtime.operators.schema.SchemaOperatorFactory;
 import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.util.ArrayList;
+import java.util.List;

 /** Translator used to build {@link SchemaOperator} for schema event process. */
 @Internal
 public class SchemaOperatorTranslator {
@@ -41,10 +47,13 @@ public SchemaOperatorTranslator(
     }

     public DataStream<Event> translate(
-            DataStream<Event> input, int parallelism, MetadataApplier metadataApplier) {
+            DataStream<Event> input,
+            int parallelism,
+            MetadataApplier metadataApplier,
+            List<RouteDef> routes) {
         switch (schemaChangeBehavior) {
             case EVOLVE:
-                return addSchemaOperator(input, parallelism, metadataApplier);
+                return addSchemaOperator(input, parallelism, metadataApplier, routes);
             case IGNORE:
                 return dropSchemaChangeEvent(input, parallelism);
             case EXCEPTION:
@@ -61,12 +70,20 @@ public String getSchemaOperatorUid() {
     }

     private DataStream<Event> addSchemaOperator(
-            DataStream<Event> input, int parallelism, MetadataApplier metadataApplier) {
+            DataStream<Event> input,
+            int parallelism,
+            MetadataApplier metadataApplier,
+            List<RouteDef> routes) {
+        List<Tuple2<String, TableId>> routingRules = new ArrayList<>();
+        for (RouteDef route : routes) {
+            routingRules.add(
+                    Tuple2.of(route.getSourceTable(), TableId.parse(route.getSinkTable())));
+        }
         SingleOutputStreamOperator<Event> stream =
                 input.transform(
                         "SchemaOperator",
                         new EventTypeInfo(),
-                        new SchemaOperatorFactory(metadataApplier));
+                        new SchemaOperatorFactory(metadataApplier, routingRules));
         stream.uid(schemaOperatorUid).setParallelism(parallelism);
         return stream;
     }
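A sketch of driving the new four-argument translate. The RouteDef constructor used here, (sourceTable, sinkTable, description), and the table names are assumptions for illustration; the translate signature itself comes from the diff above:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.composer.definition.RouteDef;
import org.apache.flink.streaming.api.datastream.DataStream;

class RoutedSchemaOperatorSketch {
    DataStream<Event> apply(
            SchemaOperatorTranslator translator,
            DataStream<Event> stream,
            int parallelism,
            MetadataApplier metadataApplier) {
        // One rule: merge every table matching the pattern into mydb.orders.
        List<RouteDef> routes = new ArrayList<>();
        routes.add(new RouteDef("mydb.orders_\\.*", "mydb.orders", "merge order shards"));
        return translator.translate(stream, parallelism, metadataApplier, routes);
    }
}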
(The remaining 12 changed files are not shown in this view.)