
[Feature-2932][Pipeline] Flink CDC pipeline supports transform #2937

Merged: 5 commits, Apr 2, 2024

Conversation

@aiwenmo (Contributor) commented on Dec 27, 2023

This is a draft pull request; the functionality has not been completed yet.
Please discuss the details, implementation, and optimization under this PR.
I will complete the remaining code. Thanks.

@aiwenmo (Contributor, Author) commented on Dec 28, 2023

What is the most appropriate way to obtain the field names and data types of dataChangeEvent.after()?
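One approach, which later commits in this PR take, is to cache each table's schema from its CreateTableEvent and resolve fields by table ID when a change event arrives. A minimal Java sketch (names are illustrative, not the PR's final code):

// Cache the schema delivered by CreateTableEvent, then look up column
// names and data types for subsequent DataChangeEvents by table ID.
private final Map<TableId, Schema> schemaMap = new HashMap<>();

private void cacheSchema(CreateTableEvent event) {
    schemaMap.put(event.tableId(), event.getSchema());
}

private List<Column> columnsOf(DataChangeEvent event) {
    return schemaMap.get(event.tableId()).getColumns();
}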

@aiwenmo (Contributor, Author) commented on Dec 28, 2023

Change the parameter definition of transform to the following:

transform:
  - source-table: mydb.app_order_.*
    projection: id, order_id, TO_UPPER(product_name)
    filter: id > 10 AND order_id > 100
    description: project fields from source table
  - source-table: mydb.web_order_.*
    projection: CONCAT(id, order_id) as uniq_id, *
    filter: uniq_id > 10
    description: add new uniq_id for each row

The github-actions bot added the cli label on Dec 28, 2023.
TableId tableId = dataChangeEvent.tableId();
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
if (after == null) {
    return false;
}
Contributor:

If we filter out all the deletion events, then the data of the target table and the source table will be inconsistent.
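A minimal sketch of the fix in the filter path (let deletes through rather than dropping them):

// A delete event has no after-image; pass it through so the target table
// stays consistent with the source instead of silently dropping deletes.
if (after == null) {
    return true;
}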

Contributor Author:

Thanks. I will fix it.

Contributor Author:

I have fixed it.

@lvyanquan (Contributor) left a review:

Thanks for this great contribution. I left some comments; we need to clearly declare the supported functions and data types.
We also need to consider the situation of restarting from a failure.

@@ -16,9 +16,78 @@

package com.ververica.cdc.composer.definition;

import java.util.Objects;
import java.util.Optional;

/**
* Definition of transformation.
*
* <p>Transformation will be implemented later, therefore we left the class blank.
Contributor:

This comment can be removed.

new ArrayList<>(
        Arrays.asList(
                new Column[createTableEvent.getSchema().getColumns().size()]));
Collections.copy(sourceColumn, createTableEvent.getSchema().getColumns());
Contributor:

Can we simply use createTableEvent.getSchema().getColumns() to build sourceColumn?
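That is, something along these lines (a sketch, keeping a copy only if mutation is needed):

// Build sourceColumn directly from the schema instead of pre-sizing a
// list and copying into it.
List<Column> sourceColumn = new ArrayList<>(createTableEvent.getSchema().getColumns());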

Projector projector = route.f1;
BinaryRecordData data =
        projector.generateRecordData(after, sourceColumnMap.get(tableId));
return DataChangeEvent.setAfter(dataChangeEvent, data);
Contributor:

We should modify both the before and after records here, because sinks like Kafka and Pulsar need both to build the Debezium JSON format.
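A sketch of what that could look like, reusing the projector on the before-image (DataChangeEvent.setBefore is discussed later in this review; treat the exact API as an assumption):

// Project the before-image as well, so sinks that build Debezium JSON
// (Kafka, Pulsar) see the transformed schema on both sides of the change.
BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
if (before != null) {
    dataChangeEvent =
            DataChangeEvent.setBefore(
                    dataChangeEvent,
                    projector.generateRecordData(before, sourceColumnMap.get(tableId)));
}
return DataChangeEvent.setAfter(
        dataChangeEvent, projector.generateRecordData(after, sourceColumnMap.get(tableId)));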

// skip delete event
if (after == null) {
    return event;
}
Contributor:

Delete events should be processed too, to stay consistent with the source table.

private final List<Tuple3<String, String, String>> filterRules = new ArrayList<>();

public Builder addFilter(String tableInclusions, String projection, String filter) {
    filterRules.add(Tuple3.of(tableInclusions, projection, filter));
Contributor:

It looks like projection is unused in FilterFunction; can we remove it?
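That would shrink the rule to a pair, e.g. (a sketch):

// Filter rules only need the table pattern and the filter expression;
// the projection string was carried along but never read.
private final List<Tuple2<String, String>> filterRules = new ArrayList<>();

public Builder addFilter(String tableInclusions, String filter) {
    filterRules.add(Tuple2.of(tableInclusions, filter));
    return this;
}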

return true;
}

private void transformCreateTableEvent(CreateTableEvent createTableEvent) {
@lvyanquan (Contributor) commented on Jan 1, 2024:

CreateTableEvent is not transformed here; we should use a more appropriate method name.

new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA);
ProjectionFunction transform =
        ProjectionFunction.newBuilder()
                .addProjection(CUSTOMERS_TABLEID.identifier(), "*, col1 + col2 col12")
Contributor:

A doc or more tests are necessary to show what kind of projection we support after this PR. For example, can we support the CAST function now?
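For instance, it is currently unclear whether a rule like the following would be accepted (illustrative config only, mirroring the transform example above):

transform:
  - source-table: mydb.app_order_.*
    projection: id, CAST(order_id AS STRING) AS order_id_str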

for (int i = 0; i < columns.size(); i++) {
    originalValueMap.put(
            columns.get(i).getName(),
            fromDataType(after.getString(i), columns.get(i).getType()));
@lvyanquan (Contributor) commented on Jan 2, 2024:

There are more data types in BinaryRecordData than strings, so we may need to keep a list of FieldGetter to read the correct objects, like createFieldGetter does.
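A sketch of the suggestion, creating the getters once per schema (createFieldGetters and getFieldOrNull both appear elsewhere in this review; the wiring here is illustrative):

// Use per-column FieldGetters instead of after.getString(i), so
// non-string columns (numerics, decimals, timestamps) are read as the
// correct objects.
List<RecordData.FieldGetter> fieldGetters = createFieldGetters(columns);
for (int i = 0; i < columns.size(); i++) {
    originalValueMap.put(
            columns.get(i).getName(), fieldGetters.get(i).getFieldOrNull(after));
}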

        return DecimalData.fromBigDecimal(
                bigDecimalValue, bigDecimalValue.precision(), bigDecimalValue.scale());
    default:
        return BinaryStringData.fromString(value.toString());
Contributor:

We need to support more types, such as some time types and float types.
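For example, the switch above could gain branches along these lines (a sketch; the internal wrapper class for time types is an assumption):

// Float types map to plain Java objects; time types need an internal
// wrapper analogous to DecimalData above.
case FLOAT:
    return Float.valueOf(value.toString());
case DOUBLE:
    return Double.valueOf(value.toString());
case TIMESTAMP_WITHOUT_TIME_ZONE:
    return TimestampData.fromTimestamp(java.sql.Timestamp.valueOf(value.toString()));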

@aiwenmo (Contributor, Author) commented on Jan 2, 2024

@lvyanquan Thanks for your review. I will fix these issues tonight.

@aiwenmo (Contributor, Author) commented on Jan 3, 2024

There are only two issues left unfixed; I will address them today:
1. A doc or more tests are necessary to show what kind of projection we support after this PR. For example, can we support the CAST function now?
2. We need to support more types, such as some time types and float types.

@PatrickRen (Contributor) left a review:

@aiwenmo Thanks for the great work! I looked through the PR and left some comments. I'm not quite familiar with Calcite, so please correct me if I'm wrong.

* create a list of {@link RecordData.FieldGetter} from given {@link Column} to get Object from
* RecordData.
*/
public static List<RecordData.FieldGetter> createFieldGetters(List<Column> columns) {
Contributor:

Looks like the logic here is the same as createFieldGetters(Schema). What about letting createFieldGetters(Schema) reuse this one:

public static List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
    return createFieldGetters(schema.getColumns());
}

Contributor Author:

Thanks. I will optimize it.

private final DataType dataType;
private final SqlBasicCall transform;
private final JexlExpression expression;
private static final Engine jexlEngine = new Engine();
Contributor:

I'm a rookie with Calcite and JEXL, so please correct me if I'm wrong. It looks like projection and filter expressions are first interpreted as SQL and parsed by Calcite in Projector, then evaluated as JEXL expressions at runtime. Are these two syntaxes compatible with each other?

Contributor Author:

The original intention of using Calcite is to provide parsing capability for Flink function calls. JEXL is a temporary solution that can handle most scenarios, and I will add some unit tests to validate it. Of course, if you have any other good suggestions, we can discuss them together.

return true;
}

private void catchSchema(CreateTableEvent createTableEvent) {
Contributor:

Do you mean cache?

Contributor Author:

Yes.

Comment on lines 45 to 48
DataStream<Event> projectionOutput =
        input.map(projectionFunctionBuilder.build(), new EventTypeInfo())
                .name("Transform:Projection");
return projectionOutput.filter(filterFunctionBuilder.build()).name("Transform:Filter");
Contributor:

Ideally the order of filter and projection can be adjusted dynamically. If there's no filter on computed columns, filter can be moved before projection. It's totally acceptable to me to hard-code the order for now and we can optimize it in the future.
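A sketch of that future optimization (the flag is hypothetical, not part of this PR):

// When the filter references only source columns, filtering first cuts
// records before the more expensive projection runs.
if (!filterUsesComputedColumns) {
    return input.filter(filterFunctionBuilder.build())
            .name("Transform:Filter")
            .map(projectionFunctionBuilder.build(), new EventTypeInfo())
            .name("Transform:Projection");
}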

Contributor Author:

We could introduce new parameters to determine the execution order of projection and filter.

Contributor Author:

Supported.

* Updates the before of a {@link DataChangeEvent} instance that describes the event with meta
* info.
*/
public static DataChangeEvent setBefore(DataChangeEvent dataChangeEvent, RecordData before) {
Contributor:

What about resetBefore/After?

Contributor Author:

It's a good idea.

Comment on lines 551 to 589
// rename column
/*Map<String, String> nameMapping = new HashMap<>();
nameMapping.put("col2", "newCol2");
nameMapping.put("col3", "newCol3");
RenameColumnEvent renameColumnEvent = new RenameColumnEvent(TABLE_1, nameMapping);
split1.add(renameColumnEvent);*/

// drop column
/*DropColumnEvent dropColumnEvent =
        new DropColumnEvent(
                TABLE_1,
                Collections.singletonList(
                        Column.physicalColumn("newCol2", DataTypes.STRING())));
split1.add(dropColumnEvent);*/

// delete
/*split1.add(
        DataChangeEvent.deleteEvent(
                TABLE_1,
                generator.generate(
                        new Object[] {
                            BinaryStringData.fromString("1"),
                            BinaryStringData.fromString("1")
                        })));*/

// update
/*split1.add(
        DataChangeEvent.updateEvent(
                TABLE_1,
                generator.generate(
                        new Object[] {
                            BinaryStringData.fromString("2"),
                            BinaryStringData.fromString("")
                        }),
                generator.generate(
                        new Object[] {
                            BinaryStringData.fromString("2"),
                            BinaryStringData.fromString("x")
                        })));*/
Contributor:

Dead code

Contributor Author:

I want to add unit tests for various scenarios, so this code has not been deleted yet. I will add the unit tests over the weekend.

@aiwenmo marked this pull request as ready for review on January 6, 2024 at 13:24.
@lvyanquan (Contributor) left a review:

Left some comments. I have two points to discuss:
The first is the compatibility between JEXL and Calcite. One reference point is that Flink uses Janino; would that be better?
The second is how to determine the type of a computed column if we want to support operations on mixed types, like an int value + a long value.
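For the second point, a minimal sketch of binary-arithmetic type widening (a hypothetical helper, not in this PR):

// Widen the result type of a mixed-type arithmetic expression, e.g.
// INT + BIGINT -> BIGINT, INT + DOUBLE -> DOUBLE.
private static DataType inferArithmeticType(DataType left, DataType right) {
    List<DataType> widening =
            Arrays.asList(
                    DataTypes.INT(), DataTypes.BIGINT(), DataTypes.FLOAT(), DataTypes.DOUBLE());
    int l = widening.indexOf(left);
    int r = widening.indexOf(right);
    if (l < 0 || r < 0) {
        throw new IllegalArgumentException(
                "Unsupported operand types: " + left + " and " + right);
    }
    return widening.get(Math.max(l, r));
}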

if (transform.isValidFilter()) {
    containFilter = true;
    filterFunctionBuilder.addFilter(
            transform.getSourceTable(), transform.getFilter().get());
Contributor:

It seems somewhat inconsistent; do we still need to make the filter optional?

Contributor Author:

Yes. Filter is optional.

return schemaChangeEvent;
}

public BinaryRecordData generateRecordData(BinaryRecordData after, List<Column> columns) {
Contributor:

Naming the parameter recordData would be better: BinaryRecordData recordData.

    return true;
}
if (!(event instanceof DataChangeEvent)) {
    return true;
Contributor:

Delete DataChangeEvents should also have the filter applied.

for (int i = 0; i < columns.size(); i++) {
    jexlContext.set(
            columns.get(i).getName(),
            fromDataType(after.getString(i), columns.get(i).getType()));
Contributor:

We should keep a sourceColumnMap like ProjectionFunction does, to get the correct values.

    }
}
columnTransformList.add(
        ColumnTransform.of(columnName, DataTypes.STRING(), transform));
Contributor:

Setting all columns to string type will cause inaccuracies. We need to add type inference, or simply take the data type from the relevant columns.

    return recordDataGenerator;
}

public boolean isVaild() {
Contributor:

Typo: this should be isValid.

try {
    return (Boolean) expressionEvaluator.evaluate(params.toArray());
} catch (InvocationTargetException e) {
    e.printStackTrace();
Contributor:

Use LOG instead of printStackTrace.
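For example (a sketch; the logger placement and the fallback return value are illustrative):

// Log through SLF4J instead of printing to stderr, and decide explicitly
// what the filter returns when evaluation fails.
private static final Logger LOG = LoggerFactory.getLogger(RowFilter.class);

try {
    return (Boolean) expressionEvaluator.evaluate(params.toArray());
} catch (InvocationTargetException e) {
    LOG.error("Failed to evaluate the filter expression.", e);
    return false;
}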

import java.util.stream.Collectors;

/** A data process function that applies user-defined transform logics. */
public class TransformDataFunction extends AbstractStreamOperator<Event>
Contributor:

TransformDataOperator?

.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.physicalColumn("col3", DataTypes.STRING())
.physicalColumn("col12", DataTypes.STRING())
Contributor:

Shouldn't the actual column order be the following?

.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.physicalColumn("col12", DataTypes.STRING())
.physicalColumn("col3", DataTypes.STRING())

}
}

private TransformSchemaFunction(List<Tuple2<String, String>> transformRules) {
Contributor:

Just a discussion: should we chain this operator with the source?

SqlNode validateSqlNode = validator.validate(sqlNode);
HepProgramBuilder builder = new HepProgramBuilder();
HepPlanner planner = new HepPlanner(builder.build());
RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(factory));
Contributor:

It looks like some of these classes are unrelated to the columns and sqlNode; can we reuse them across calls?

} else if (event instanceof DataChangeEvent) {
    Optional<DataChangeEvent> dataChangeEventOptional =
            applyDataChangeEvent(((DataChangeEvent) event));
    if (dataChangeEventOptional.isPresent()) {
Contributor:

Under what condition would dataChangeEvent be empty, since we don't filter here?

if (columnNames.contains(columns.get(i).getName())) {
    params.add(
            DataTypeConverter.convertToOriginal(
                    after.getString(i), columns.get(i).getType()));
Contributor:

tableInfo.getFieldGetters()[i].getFieldOrNull(after)

@lvyanquan (Contributor) commented:
Hi @aiwenmo, I hit "Invalid signature file digest for Manifest main" when testing this transform in the E2E environment.
Can you help fix it when building the flink-cdc-dist jar?

transformFunctionBuilder.addTransform(
        transform.getSourceTable(),
        transform.getProjection().get(),
        transform.getFilter().get());
Contributor:

Even though transform.isValidProjection() is true, transform.getFilter().get() can still throw a NullPointerException.
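A null-safe sketch (assuming addTransform tolerates a null filter):

// Avoid Optional.get() on an absent filter; pass null when no filter
// was configured for this rule.
transformFunctionBuilder.addTransform(
        transform.getSourceTable(),
        transform.getProjection().orElse(null),
        transform.getFilter().orElse(null));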

params.add(tableInfo.getTableName());
}

ExpressionEvaluator expressionEvaluator =
Contributor:

We should cache the expressionEvaluator, like RowFilter does.
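For example, a compile-once cache keyed by the expression (TransformExpressionCompiler appears elsewhere in this PR; the cache shape is an assumption):

// Compile each distinct expression once and reuse the evaluator per record.
private static final Map<TransformExpressionKey, ExpressionEvaluator> EVALUATOR_CACHE =
        new ConcurrentHashMap<>();

ExpressionEvaluator expressionEvaluator =
        EVALUATOR_CACHE.computeIfAbsent(
                transformExpressionKey, TransformExpressionCompiler::compileExpression);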

private final String expression;
private final String scriptExpression;
private final List<String> columnNames;
private ExpressionEvaluator expressionEvaluator;
Contributor:

Got the error "A different class with the same name was previously loaded by 'app' (org.codehaus.janino.ExpressionEvaluator is in unnamed module of loader 'app')" when submitting the job to a session cluster.

@leonardBang (Contributor) left a review:

Thanks @aiwenmo and @lvyanquan for the great work; I only left two comments.

TransformExpressionCompiler.compileExpression(transformExpressionKey);
try {
    return (Boolean)
            expressionEvaluator.evaluate(generateParams(after, tableInfo, transformFilter));
Contributor:

Could one TransformFilterProcessor instance hold an expressionEvaluator built in its constructor? Then we could offer a method like private boolean process(BinaryRecordData after), so we avoid compiling the expression per record.
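A sketch of the suggested shape (constructor arguments and the key accessor are assumptions):

// Compile once in the constructor, evaluate per record.
public class TransformFilterProcessor {

    private final TableInfo tableInfo;
    private final TransformFilter transformFilter;
    private final ExpressionEvaluator expressionEvaluator;

    public TransformFilterProcessor(TableInfo tableInfo, TransformFilter transformFilter) {
        this.tableInfo = tableInfo;
        this.transformFilter = transformFilter;
        this.expressionEvaluator =
                TransformExpressionCompiler.compileExpression(
                        transformFilter.getExpressionKey()); // hypothetical accessor
    }

    private boolean process(BinaryRecordData after) throws InvocationTargetException {
        return (Boolean)
                expressionEvaluator.evaluate(generateParams(after, tableInfo, transformFilter));
    }
}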

}

public static BinaryRecordData processData(
        BinaryRecordData after, TableInfo tableInfo, TransformProjection transformProjection) {
Contributor:

The static method makes the class look like a utility class. Could we instantiate TransformProjectionProcessor transformProjectionProcessor = new TransformProjectionProcessor(xx, xx) and then call transformProjectionProcessor.process()?

@leonardBang (Contributor) left a review:

Thanks @aiwenmo for the update. I think this PR is ready to merge; I only left two minor comments.


private static class BoundedMap<K, V> extends LinkedHashMap<K, V> {

private static final long serialVersionUID = -211630219014422361L;
Contributor:

Use 1L as the serialVersionUID.

@@ -0,0 +1,120 @@
/*
* Copyright 2023 Ververica Inc.
Contributor:

Please rebase to fix the copyright header.

@leonardBang (Contributor) left a review:

Thanks @aiwenmo for the great work, LGTM. I think we can improve the test coverage as well as the documentation, but we can do that later.

@aiwenmo (Contributor, Author) commented on Apr 2, 2024

> Thanks @aiwenmo for the great work, LGTM. I think we can improve the test coverage as well as the documentation, but we can do that later.

Thanks @leonardBang, I will improve them in a new pull request.

@leonardBang merged commit 1d05abf into apache:master on Apr 2, 2024. 13 checks passed.
wuzhenhua01 pushed a commit to wuzhenhua01/flink-cdc-connectors that referenced this pull request on Aug 4, 2024.