[FLINK-37429][transform] Map each column name to a new name in generated expression#3939
[FLINK-37429][transform] Map each column name to a new name in generated expression#3939leonardBang merged 5 commits intoapache:masterfrom
Conversation
yuxiqian
left a comment
There was a problem hiding this comment.
Thanks for Shawn's quick fix! Just left some minor comments.
| public TransformFilter( | ||
| String expression, | ||
| String scriptExpression, | ||
| List<String> columnNames, | ||
| Map<String, String> columnNameMap) { |
There was a problem hiding this comment.
IIUC TransformFilter is static and will not be refreshed after schema changes, but columnNameMap might be changed by altering columns. Will inserting columns in the middle affect the numbering system?
ProjectionColumn looks fine (since projection stuff should be recreated every time when schema changes), but still need to be verified by an IT case.
There was a problem hiding this comment.
Every TransformExpressionKey has its own columnNameMap. If the expression script is not changed, the columnNameMap will not change. I have added a test for the schema change scenario.
| public static Map<String, String> generateColumnNameMap(List<String> originalColumnNames) { | ||
| int i = 0; | ||
| Map<String, String> columnNameMap = new HashMap<>(); | ||
| for (String columnName : originalColumnNames) { | ||
| if (!columnNameMap.containsKey(columnName)) { | ||
| columnNameMap.put(columnName, MAPPED_COLUMN_NAME_PREFIX + i); | ||
| i++; | ||
| } | ||
| } | ||
| return columnNameMap; | ||
| } |
There was a problem hiding this comment.
Minor concern: though it's easy and simple to use field\d as column name, it would produce cryptic error message if Janino throws an exception about generated expressions.
Some options might be:
- We can leave "legal" names as is without mangling them, and only map names that are not valid Java identifiers.
- Try to extract valid characters from original column name, and append them as a hint. For example,
invalid-namecould be mapped tocolumn1_invalidname(recognizable, at least). - Always provide the column name map information when exceptions are thrown.
There was a problem hiding this comment.
I prefer option 3.
For option 1, seems it is not very easy to judge a column name is legal or not considering every java keyword is illegal.
For option 2, a corner case is there are two columns named a-b and a.b and we still cannot recognize them after mapping.
Have added column name map information to the exception log.
...rc/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
Show resolved
Hide resolved
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
Outdated
Show resolved
Hide resolved
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
Show resolved
Hide resolved
yuxiqian
left a comment
There was a problem hiding this comment.
Looks good! Covering this case in FlinkPipelineTransformITCase would be nice, but it's not a must for now.
...src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
Outdated
Show resolved
Hide resolved
leonardBang
left a comment
There was a problem hiding this comment.
Thanks @Shawn-Hx for the contribution, LGTM
...src/main/java/org/apache/flink/cdc/connectors/values/source/TimestampTypeMetadataColumn.java
Outdated
Show resolved
Hide resolved
…line-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/TimestampTypeMetadataColumn.java Co-authored-by: Leonard Xu <leonard@apache.org>
…ted expression This closes apache#3939 Co-authored-by: Leonard Xu <leonard@apache.org>
This close FLINK-37429, FLINK-37326