diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java index 8b0901ca8854..57c56f6f507f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java @@ -25,13 +25,16 @@ import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.flink.dataevolution.DataEvolutionPartialWriteOperator; import org.apache.paimon.flink.dataevolution.FirstRowIdAssigner; +import org.apache.paimon.flink.dataevolution.MergeIntoUpdateChecker; import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.flink.sink.CommittableTypeInfo; import org.apache.paimon.flink.sink.CommitterOperatorFactory; import org.apache.paimon.flink.sink.NoopCommittableStateManager; import org.apache.paimon.flink.sink.StoreCommitter; import org.apache.paimon.flink.sorter.SortOperator; +import org.apache.paimon.flink.utils.FlinkCalciteClasses; import org.apache.paimon.flink.utils.InternalTypeInfo; +import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.DataField; @@ -41,7 +44,6 @@ import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Preconditions; -import org.apache.paimon.utils.StringUtils; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.dag.Transformation; @@ -49,8 +51,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; -import 
org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; -import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.api.operators.StreamFlatMap; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; @@ -69,10 +70,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Function; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; @@ -95,7 +95,6 @@ public class DataEvolutionMergeIntoAction extends TableActionBase { private static final Logger LOG = LoggerFactory.getLogger(DataEvolutionMergeIntoAction.class); - public static final String IDENTIFIER_QUOTE = "`"; private final CoreOptions coreOptions; @@ -120,6 +119,7 @@ public class DataEvolutionMergeIntoAction extends TableActionBase { // merge condition private String mergeCondition; + private MergeConditionParser mergeConditionParser; // set statement private String matchedUpdateSet; @@ -137,6 +137,17 @@ public DataEvolutionMergeIntoAction( table.getClass().getName())); } + Long latestSnapshotId = ((FileStoreTable) table).snapshotManager().latestSnapshotId(); + if (latestSnapshotId == null) { + throw new UnsupportedOperationException( + "merge-into action doesn't support updating an empty table."); + } + table = + table.copy( + Collections.singletonMap( + CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), + latestSnapshotId.toString())); + this.coreOptions = ((FileStoreTable) table).coreOptions(); if (!coreOptions.dataEvolutionEnabled()) { @@ -168,6 +179,12 @@ public DataEvolutionMergeIntoAction withTargetAlias(String targetAlias) { public DataEvolutionMergeIntoAction withMergeCondition(String mergeCondition) { this.mergeCondition = 
mergeCondition; + try { + this.mergeConditionParser = new MergeConditionParser(mergeCondition); + } catch (Exception e) { + LOG.error("Failed to parse merge condition: {}", mergeCondition, e); + throw new RuntimeException("Failed to parse merge condition " + mergeCondition, e); + } return this; } @@ -196,7 +213,12 @@ public TableResult runInternal() { DataStream written = writePartialColumns(shuffled, sourceWithType.f1, sinkParallelism); // 4. commit - DataStream committed = commit(written); + Set updatedColumns = + sourceWithType.f1.getFields().stream() + .map(DataField::name) + .filter(name -> !SpecialFields.ROW_ID.name().equals(name)) + .collect(Collectors.toSet()); + DataStream committed = commit(written, updatedColumns); // execute internal Transformation transformations = @@ -219,8 +241,7 @@ public Tuple2, RowType> buildSource() { List project; if (matchedUpdateSet.equals("*")) { // if sourceName is qualified like 'default.S', we should build a project like S.* - String[] splits = sourceTable.split("\\."); - project = Collections.singletonList(splits[splits.length - 1] + ".*"); + project = Collections.singletonList(sourceTableName() + ".*"); } else { // validate upsert changes Map changes = parseCommaSeparatedKeyValues(matchedUpdateSet); @@ -245,16 +266,38 @@ public Tuple2, RowType> buildSource() { .collect(Collectors.toList()); } - // use join to find matched rows and assign row id for each source row. - // _ROW_ID is the first field of joined table. 
- String query = - String.format( - "SELECT %s, %s FROM %s INNER JOIN %s AS RT ON %s", - "`RT`.`_ROW_ID` as `_ROW_ID`", - String.join(",", project), - escapedSourceName(), - escapedRowTrackingTargetName(), - rewriteMergeCondition(mergeCondition)); + String query; + Optional sourceRowIdField; + try { + sourceRowIdField = mergeConditionParser.extractRowIdFieldFromSource(targetTableName()); + } catch (Exception e) { + LOG.error("Error happened when extract row id field from source table.", e); + throw new RuntimeException( + "Error happened when extract row id field from source table.", e); + } + + // if source table already contains _ROW_ID field, we could avoid join + if (sourceRowIdField.isPresent()) { + query = + String.format( + // cast _ROW_ID to BIGINT + "SELECT CAST(`%s`.`%s` AS BIGINT) AS `_ROW_ID`, %s FROM %s", + sourceTableName(), + sourceRowIdField.get(), + String.join(",", project), + escapedSourceName()); + } else { + // use join to find matched rows and assign row id for each source row. + // _ROW_ID is the first field of joined table. 
+ query = + String.format( + "SELECT %s, %s FROM %s INNER JOIN %s AS RT ON %s", + "`RT`.`_ROW_ID` as `_ROW_ID`", + String.join(",", project), + escapedSourceName(), + escapedRowTrackingTargetName(), + rewriteMergeCondition(mergeCondition)); + } LOG.info("Source query: {}", query); @@ -286,11 +329,15 @@ public DataStream> shuffleByFirstRowId( Preconditions.checkState( !firstRowIds.isEmpty(), "Should not MERGE INTO an empty target table."); + // if firstRowIds is not empty, there must be a valid nextRowId + long maxRowId = table.latestSnapshot().get().nextRowId() - 1; + OneInputTransformation> assignedFirstRowId = new OneInputTransformation<>( sourceTransformation, "ASSIGN FIRST_ROW_ID", - new StreamMap<>(new FirstRowIdAssigner(firstRowIds, sourceType)), + new StreamFlatMap<>( + new FirstRowIdAssigner(firstRowIds, maxRowId, sourceType)), new TupleTypeInfo<>( BasicTypeInfo.LONG_TYPE_INFO, sourceTransformation.getOutputType()), sourceTransformation.getParallelism(), @@ -334,9 +381,20 @@ public DataStream writePartialColumns( .setParallelism(sinkParallelism); } - public DataStream commit(DataStream written) { + public DataStream commit( + DataStream written, Set updatedColumns) { FileStoreTable storeTable = (FileStoreTable) table; - OneInputStreamOperatorFactory committerOperator = + + // Check if some global-indexed columns are updated + DataStream checked = + written.transform( + "Updated Column Check", + new CommittableTypeInfo(), + new MergeIntoUpdateChecker(storeTable, updatedColumns)) + .setParallelism(1) + .setMaxParallelism(1); + + CommitterOperatorFactory committerOperator = new CommitterOperatorFactory<>( false, true, @@ -348,7 +406,7 @@ public DataStream commit(DataStream written) { context), new NoopCommittableStateManager()); - return written.transform("COMMIT OPERATOR", new CommittableTypeInfo(), committerOperator) + return checked.transform("COMMIT OPERATOR", new CommittableTypeInfo(), committerOperator) .setParallelism(1) .setMaxParallelism(1); } @@ 
-382,28 +440,13 @@ private DataStream toDataStream(Table source) { */ @VisibleForTesting public String rewriteMergeCondition(String mergeCondition) { - // skip single and double-quoted chunks - String skipQuoted = "'(?:''|[^'])*'" + "|\"(?:\"\"|[^\"])*\""; - String targetTableRegex = - "(?i)(?:\\b" - + Pattern.quote(targetTableName()) - + "\\b|`" - + Pattern.quote(targetTableName()) - + "`)\\s*\\."; - - Pattern pattern = Pattern.compile(skipQuoted + "|(" + targetTableRegex + ")"); - Matcher matcher = pattern.matcher(mergeCondition); - - StringBuffer sb = new StringBuffer(); - while (matcher.find()) { - if (matcher.group(1) != null) { - matcher.appendReplacement(sb, Matcher.quoteReplacement("`RT`.")); - } else { - matcher.appendReplacement(sb, Matcher.quoteReplacement(matcher.group(0))); - } + try { + Object rewrittenNode = mergeConditionParser.rewriteSqlNode(targetTableName(), "RT"); + return rewrittenNode.toString(); + } catch (Exception e) { + LOG.error("Failed to rewrite merge condition: {}", mergeCondition, e); + throw new RuntimeException("Failed to rewrite merge condition " + mergeCondition, e); } - matcher.appendTail(sb); - return sb.toString(); } /** @@ -432,7 +475,8 @@ private void checkSchema(Table source) { foundRowIdColumn = true; Preconditions.checkState( flinkColumn.getDataType().getLogicalType().getTypeRoot() - == LogicalTypeRoot.BIGINT); + == LogicalTypeRoot.BIGINT, + "_ROW_ID field should be BIGINT type."); } else { DataField targetField = targetFields.get(flinkColumn.getName()); if (targetField == null) { @@ -497,6 +541,11 @@ private String targetTableName() { return targetAlias == null ? 
identifier.getObjectName() : targetAlias; } + private String sourceTableName() { + String[] splits = sourceTable.split("\\."); + return splits[splits.length - 1]; + } + private String escapedSourceName() { return Arrays.stream(sourceTable.split("\\.")) .map(s -> String.format("`%s`", s)) @@ -514,28 +563,108 @@ private String escapedRowTrackingTargetName() { catalogName, identifier.getDatabaseName(), identifier.getObjectName()); } - private List normalizeFieldName(List fieldNames) { - return fieldNames.stream().map(this::normalizeFieldName).collect(Collectors.toList()); - } + /** The parser to parse merge condition through calcite sql parser. */ + static class MergeConditionParser { + + private final FlinkCalciteClasses calciteClasses; + private final Object sqlNode; + + MergeConditionParser(String mergeCondition) throws Exception { + this.calciteClasses = new FlinkCalciteClasses(); + this.sqlNode = initializeSqlNode(mergeCondition); + } - private String normalizeFieldName(String fieldName) { - if (StringUtils.isNullOrWhitespaceOnly(fieldName) || fieldName.endsWith(IDENTIFIER_QUOTE)) { - return fieldName; + private Object initializeSqlNode(String mergeCondition) throws Exception { + Object config = + calciteClasses + .configDelegate() + .withLex( + calciteClasses.sqlParserDelegate().config(), + calciteClasses.lexDelegate().java()); + Object sqlParser = calciteClasses.sqlParserDelegate().create(mergeCondition, config); + return calciteClasses.sqlParserDelegate().parseExpression(sqlParser); } - String[] splitFieldNames = fieldName.split("\\."); - if (!targetFieldNames.contains(splitFieldNames[splitFieldNames.length - 1])) { - return fieldName; + /** + * Rewrite the SQL node, replacing all references from the 'from' table to the 'to' table. 
+ */ + public Object rewriteSqlNode(String from, String to) throws Exception { + return rewriteNode(sqlNode, from, to); } - return String.join( - ".", - Arrays.stream(splitFieldNames) - .map( - part -> - part.endsWith(IDENTIFIER_QUOTE) - ? part - : IDENTIFIER_QUOTE + part + IDENTIFIER_QUOTE) - .toArray(String[]::new)); + private Object rewriteNode(Object node, String from, String to) throws Exception { + // It's a SqlBasicCall, recursively rewrite children operands + if (calciteClasses.sqlBasicCallDelegate().instanceOfSqlBasicCall(node)) { + List operandList = calciteClasses.sqlBasicCallDelegate().getOperandList(node); + List newNodes = new java.util.ArrayList<>(); + for (Object operand : operandList) { + newNodes.add(rewriteNode(operand, from, to)); + } + + Object operator = calciteClasses.sqlBasicCallDelegate().getOperator(node); + Object parserPos = calciteClasses.sqlBasicCallDelegate().getParserPosition(node); + Object functionQuantifier = + calciteClasses.sqlBasicCallDelegate().getFunctionQuantifier(node); + return calciteClasses + .sqlBasicCallDelegate() + .create(operator, newNodes, parserPos, functionQuantifier); + } else if (calciteClasses.sqlIndentifierDelegate().instanceOfSqlIdentifier(node)) { + // It's a sql identifier, try to replace the table name + List names = calciteClasses.sqlIndentifierDelegate().getNames(node); + Preconditions.checkState( + names.size() >= 2, "Please specify the table name for the column: " + node); + int nameLen = names.size(); + if (names.get(nameLen - 2).equals(from)) { + return calciteClasses.sqlIndentifierDelegate().setName(node, nameLen - 2, to); + } + return node; + } else { + return node; + } + } + + /** + * Find the row id field in source table. This method looks for an equality condition like + * `target_table._ROW_ID = source_table.some_field` or `source_table.some_field = + * target_table._ROW_ID`, and returns the field name that is paired with _ROW_ID. 
+ */ + public Optional extractRowIdFieldFromSource(String targetTable) throws Exception { + Object operator = calciteClasses.sqlBasicCallDelegate().getOperator(sqlNode); + Object kind = calciteClasses.sqlOperatorDelegate().getKind(operator); + + if (kind == calciteClasses.sqlKindDelegate().equals()) { + List operandList = calciteClasses.sqlBasicCallDelegate().getOperandList(sqlNode); + + Object left = operandList.get(0); + Object right = operandList.get(1); + + if (calciteClasses.sqlIndentifierDelegate().instanceOfSqlIdentifier(left) + && calciteClasses.sqlIndentifierDelegate().instanceOfSqlIdentifier(right)) { + + List leftNames = calciteClasses.sqlIndentifierDelegate().getNames(left); + List rightNames = + calciteClasses.sqlIndentifierDelegate().getNames(right); + Preconditions.checkState( + leftNames.size() >= 2, + "Please specify the table name for the column: " + left); + Preconditions.checkState( + rightNames.size() >= 2, + "Please specify the table name for the column: " + right); + + if (leftNames.get(leftNames.size() - 1).equals(SpecialFields.ROW_ID.name()) + && leftNames.get(leftNames.size() - 2).equals(targetTable)) { + return Optional.of(rightNames.get(rightNames.size() - 1)); + } else if (rightNames + .get(rightNames.size() - 1) + .equals(SpecialFields.ROW_ID.name()) + && rightNames.get(rightNames.size() - 2).equals(targetTable)) { + return Optional.of(leftNames.get(leftNames.size() - 1)); + } + return Optional.empty(); + } + } + + return Optional.empty(); + } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java index 4f9ec8c64397..f09bdefe2818 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java @@ -35,7 +35,6 @@ import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.InnerTableRead; -import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CloseableIterator; import org.apache.paimon.utils.CommitIncrement; @@ -72,7 +71,6 @@ public class DataEvolutionPartialWriteOperator // dataType private final RowType dataType; - private final InternalRow.FieldGetter[] fieldGetters; private final int rowIdIndex; // data type excludes of _ROW_ID field. @@ -101,11 +99,6 @@ public DataEvolutionPartialWriteOperator(FileStoreTable table, RowType dataType) this.dataType = SpecialFields.rowTypeWithRowId(table.rowType()).project(dataType.getFieldNames()); this.rowIdIndex = this.dataType.getFieldIndex(SpecialFields.ROW_ID.name()); - this.fieldGetters = new InternalRow.FieldGetter[dataType.getFieldCount()]; - List fields = this.dataType.getFields(); - for (int i = 0; i < fields.size(); i++) { - this.fieldGetters[i] = InternalRow.createFieldGetter(fields.get(i).type(), i); - } } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java index ad87c5b016d9..bf1c41a871e2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java @@ -24,30 +24,38 @@ import org.apache.paimon.utils.Preconditions; import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.functions.RichMapFunction; +import 
org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; import java.util.List; -/** Assign first row id for each row through binary search. */ -public class FirstRowIdAssigner extends RichMapFunction> { +/** + * Assign first row id for each row through binary search. Rows with invalid row ids are filtered + * out. + */ +public class FirstRowIdAssigner extends RichFlatMapFunction> { private final FirstRowIdLookup firstRowIdLookup; + private final long maxRowId; private final int rowIdFieldIndex; - public FirstRowIdAssigner(List firstRowIds, RowType rowType) { + public FirstRowIdAssigner(List firstRowIds, long maxRowId, RowType rowType) { this.firstRowIdLookup = new FirstRowIdLookup(firstRowIds); + this.maxRowId = maxRowId; this.rowIdFieldIndex = rowType.getFieldNames().indexOf(SpecialFields.ROW_ID.name()); Preconditions.checkState(this.rowIdFieldIndex >= 0, "Do not found _ROW_ID column."); } @Override - public Tuple2 map(RowData value) throws Exception { + public void flatMap(RowData value, Collector> out) throws Exception { long rowId = value.getLong(rowIdFieldIndex); - return new Tuple2<>(firstRowIdLookup.lookup(rowId), value); + if (rowId >= 0 && rowId <= maxRowId) { + out.collect(new Tuple2<>(firstRowIdLookup.lookup(rowId), value)); + } } /** The Key Selector to get firstRowId from tuple2. 
*/ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java new file mode 100644 index 000000000000..94392a84ecde --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.dataevolution; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.utils.BoundedOneInputOperator; +import org.apache.paimon.index.GlobalIndexMeta; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Preconditions; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * The checker for merge into update result. It will check each committable to see if some + * global-indexed columns are updated. It will take some actions according to {@link + * CoreOptions#GLOBAL_INDEX_COLUMN_UPDATE_ACTION}. 
+ */ +public class MergeIntoUpdateChecker extends BoundedOneInputOperator { + + private static final Logger LOG = LoggerFactory.getLogger(MergeIntoUpdateChecker.class); + + private final FileStoreTable table; + private final Set updatedColumns; + + private transient Set affectedPartitions; + + public MergeIntoUpdateChecker(FileStoreTable table, Set updatedColumns) { + this.table = table; + this.updatedColumns = updatedColumns; + } + + @Override + public void open() throws Exception { + super.open(); + affectedPartitions = new HashSet<>(); + + Preconditions.checkState( + getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks() == 1, + "Parallelism of MergeIntoUpdateChecker must be 1."); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + affectedPartitions.add(element.getValue().commitMessage().partition()); + output.collect(element); + } + + @Override + public void endInput() throws Exception { + checkUpdatedColumns(); + } + + private void checkUpdatedColumns() { + Optional latestSnapshot = table.latestSnapshot(); + RowType rowType = table.rowType(); + Preconditions.checkState(latestSnapshot.isPresent()); + + List affectedEntries = + table.store() + .newIndexFileHandler() + .scan( + latestSnapshot.get(), + entry -> { + GlobalIndexMeta globalIndexMeta = + entry.indexFile().globalIndexMeta(); + if (globalIndexMeta != null) { + String fieldName = + rowType.getField(globalIndexMeta.indexFieldId()) + .name(); + return updatedColumns.contains(fieldName) + && affectedPartitions.contains(entry.partition()); + } + return false; + }); + + if (!affectedEntries.isEmpty()) { + CoreOptions.GlobalIndexColumnUpdateAction updateAction = + table.coreOptions().globalIndexColumnUpdateAction(); + switch (updateAction) { + case THROW_ERROR: + Set conflictedColumns = + affectedEntries.stream() + .map(file -> file.indexFile().globalIndexMeta().indexFieldId()) + .map(id -> rowType.getField(id).name()) + .collect(Collectors.toSet()); + + throw 
new RuntimeException( + String.format( + "MergeInto: update columns contain globally indexed columns, not supported now.\n" + + "Updated columns: %s\nConflicted columns: %s\n", + updatedColumns, conflictedColumns)); + case DROP_PARTITION_INDEX: + Map> entriesByParts = + affectedEntries.stream() + .collect( + Collectors.groupingBy( + IndexManifestEntry::partition, + Collectors.mapping( + IndexManifestEntry::indexFile, + Collectors.toList()))); + + for (Map.Entry> entry : + entriesByParts.entrySet()) { + LOG.debug( + "Dropping index files {} due to indexed fields update.", + entry.getValue()); + + CommitMessage commitMessage = + new CommitMessageImpl( + entry.getKey(), + 0, + null, + DataIncrement.deleteIndexIncrement(entry.getValue()), + CompactIncrement.emptyIncrement()); + + Committable committable = new Committable(Long.MAX_VALUE, commitMessage); + + output.collect(new StreamRecord<>(committable)); + } + break; + default: + throw new UnsupportedOperationException("Unsupported option: " + updateAction); + } + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java index 5410dc3533a4..0ae93fdde6e2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java @@ -74,7 +74,7 @@ public class FlinkCalciteClasses { } } - public FlinkCalciteClasses() throws ClassNotFoundException { + public FlinkCalciteClasses() throws ClassNotFoundException, NoSuchFieldException { sqlNodeListDelegate = new SqlNodeListDelegate(); sqlLiteralDelegate = new SqlLiteralDelegate(); sqlBasicCallDelegate = new SqlBasicCallDelegate(); @@ -232,8 +232,11 @@ public static class SqlBasicCallDelegate { static final String CLASS_NAME = "org.apache.calcite.sql.SqlBasicCall"; 
private final Class clazz; + private final Class sqlParserPosClass; + public SqlBasicCallDelegate() throws ClassNotFoundException { this.clazz = loadCalciteClass(CLASS_NAME); + this.sqlParserPosClass = loadCalciteClass("org.apache.calcite.sql.parser.SqlParserPos"); } public Object getOperator(Object basicCall) throws Exception { @@ -244,6 +247,32 @@ public List getOperandList(Object basicCall) throws Exception { return (List) invokeMethod(clazz, basicCall, "getOperandList", new Class[0], new Object[0]); } + + public boolean instanceOfSqlBasicCall(Object sqlNode) throws Exception { + return clazz.isAssignableFrom(sqlNode.getClass()); + } + + public Object getParserPosition(Object basicCall) throws Exception { + return invokeMethod(clazz, basicCall, "getParserPosition", new Class[0], new Object[0]); + } + + public Object getFunctionQuantifier(Object basicCall) throws Exception { + return invokeMethod( + clazz, basicCall, "getFunctionQuantifier", new Class[0], new Object[0]); + } + + public Object create( + Object operator, List operands, Object parserPos, Object functionQuantifier) + throws Exception { + java.lang.reflect.Constructor constructor = + clazz.getConstructor( + loadCalciteClass(SqlOperatorDelegate.SQL_OPERATOR), + List.class, + sqlParserPosClass, + loadCalciteClass("org.apache.calcite.sql.SqlLiteral")); + constructor.setAccessible(true); + return constructor.newInstance(operator, operands, parserPos, functionQuantifier); + } } /** Accessing org.apache.calcite.sql.SqlOperator by Reflection. 
*/ @@ -288,14 +317,29 @@ public boolean instanceOfSqlPrefixOperator(Object operator) throws Exception { public static class SqlIdentifierDelegate { private static final String SQL_IDENTIFIER = "org.apache.calcite.sql.SqlIdentifier"; private final Class identifierClazz; + private final Field namesField; - public SqlIdentifierDelegate() throws ClassNotFoundException { + public SqlIdentifierDelegate() throws ClassNotFoundException, NoSuchFieldException { this.identifierClazz = loadCalciteClass(SQL_IDENTIFIER); + this.namesField = identifierClazz.getField("names"); } public boolean instanceOfSqlIdentifier(Object sqlNode) throws Exception { return identifierClazz.isAssignableFrom(sqlNode.getClass()); } + + public List getNames(Object sqlIdentifier) throws IllegalAccessException { + return (List) namesField.get(sqlIdentifier); + } + + public Object setName(Object sqlIdentifier, int i, String name) throws Exception { + return invokeMethod( + identifierClazz, + sqlIdentifier, + "setName", + new Class[] {int.class, String.class}, + new Object[] {i, name}); + } } /** Accessing org.apache.calcite.sql.SqlNodeList by Reflection. 
*/ diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java index 18fa729787b7..e5fac565c683 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java @@ -20,6 +20,8 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.table.FileStoreTable; import org.apache.flink.types.Row; import org.junit.jupiter.api.Assertions; @@ -36,7 +38,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Optional; import java.util.stream.Stream; import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow; @@ -48,7 +50,11 @@ import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.sEnv; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; /** ITCase for {@link DataEvolutionMergeIntoAction}. 
*/ public class DataEvolutionMergeIntoActionITCase extends ActionITCaseBase { @@ -146,7 +152,7 @@ public void testUpdateMultipleColumns(boolean inDefault, String invoker) throws DataEvolutionMergeIntoActionBuilder builder = builder(warehouse, targetDb, "T") .withMergeCondition("T.id=S.id") - .withMatchedUpdateSet("T.name=S.name,T.value=S.`value`") + .withMatchedUpdateSet("T.value=S.`value`,T.name=S.name") .withSourceTable("S") .withSinkParallelism(2); @@ -418,30 +424,191 @@ public void testSqls(boolean inDefault, String invoker) throws Exception { expected); } + @ParameterizedTest(name = "use default db = {0}, invoker - {1}") + @MethodSource("testArguments") + public void testRowIdColumnContainedInSource(boolean inDefault, String invoker) + throws Exception { + String targetDb = inDefault ? database : "test_db"; + if (!inDefault) { + // create target table in a new database + sEnv.executeSql("DROP TABLE T"); + sEnv.executeSql("CREATE DATABASE test_db"); + sEnv.executeSql("USE test_db"); + bEnv.executeSql("USE test_db"); + prepareTargetTable(); + } + + List expected = + Arrays.asList( + changelogRow("+I", 2, "new_name1", 100.1), + changelogRow("+I", 3, "name3", 0.3), + changelogRow("+I", 4, "name4", 0.4), + changelogRow("+I", 8, "new_name7", null), + changelogRow("+I", 9, "name9", 0.9), + changelogRow("+I", 12, "new_name11", 101.1), + changelogRow("+I", 16, null, 101.1), + changelogRow("+I", 19, "new_name18", 101.8)); + + if (invoker.equals("action")) { + DataEvolutionMergeIntoActionBuilder builder = + builder(warehouse, targetDb, "T") + .withMergeCondition("T._ROW_ID=S.id") + .withMatchedUpdateSet("T.name=S.name,T.value=S.`value`") + .withSourceTable("S") + .withSinkParallelism(2); + + builder.build().run(); + } else { + String procedureStatement = + String.format( + "CALL sys.data_evolution_merge_into('%s.T', '', '', 'S', 'T._ROW_ID=S.id', 'name=S.name,value=S.`value`', 2)", + targetDb); + + executeSQL(procedureStatement, false, true); + } + + testBatchRead( + 
"SELECT id, name, `value` FROM T$row_tracking where _ROW_ID in (1, 2, 3, 7, 8, 11, 15, 18)", + expected); + } + @Test - public void testRewriteMergeCondition() throws Exception { - Map<String, String> config = new HashMap<>(); - config.put("warehouse", warehouse); - DataEvolutionMergeIntoAction action = - new DataEvolutionMergeIntoAction(database, "T", config); + public void testUpdateAction() throws Exception { + + // create index on 01-22 partition + executeSQL( + "CALL sys.create_global_index(`table` => 'default.T', index_column => 'id', index_type => 'btree')", + false, + true); + + assertTrue(indexFileExists("T")); + + // 1. update indexed columns should throw an error by default + assertThatThrownBy( + () -> + executeSQL( + String.format( + "CALL sys.data_evolution_merge_into('%s.T', '', '', 'S', 'T._ROW_ID=S.id', 'name=S.name,id=1', 2)", + database), + false, + true)) + .rootCause() + .hasMessageContaining( + "MergeInto: update columns contain globally indexed columns, not supported now."); + + insertInto( + "T", + "(31, 'name31', 3.1, '01-23')", + "(32, 'name32', 3.2, '01-23')", + "(33, 'name33', 3.3, '01-23')", + "(34, 'name34', 3.4, '01-23')", + "(35, 'name35', 3.5, '01-23')", + "(36, 'name36', 3.6, '01-23')", + "(37, 'name37', 3.7, '01-23')", + "(38, 'name38', 3.8, '01-23')", + "(39, 'name39', 3.9, '01-23')", + "(40, 'name30', 3.0, '01-23')"); + + insertInto("S", "(35, 'new_name25', 125.1)"); + + // 2. updating unindexed partitions is not affected + assertDoesNotThrow( + () -> + executeSQL( + String.format( + "CALL sys.data_evolution_merge_into('%s.T', 'TempT', " + + "'CREATE TEMPORARY VIEW SS AS SELECT id, name, `value` FROM S WHERE id > 20'," + + " 'SS', 'TempT.id=SS.id', 'id=SS.id,value=SS.`value`', 2)", + database), + false, + true)); + + // 3. 
alter table's UpdateAction option to DROP_INDEX + executeSQL( + "ALTER TABLE T SET ('global-index.column-update-action' = 'DROP_PARTITION_INDEX')", + false, + true); + + assertDoesNotThrow( + () -> + executeSQL( + String.format( + "CALL sys.data_evolution_merge_into('%s.T', '', '', 'S', 'T._ROW_ID=S.id', 'name=S.name,id=1', 2)", + database), + false, + true)); + + assertFalse(indexFileExists("T")); + } + + private boolean indexFileExists(String tableName) throws Exception { + FileStoreTable table = getFileStoreTable(tableName); - String mergeCondition = "T.id=S.id"; - assertEquals("`RT`.id=S.id", action.rewriteMergeCondition(mergeCondition)); + List<IndexManifestEntry> entries = table.store().newIndexFileHandler().scan("btree"); - mergeCondition = "`T`.id=S.id"; - assertEquals("`RT`.id=S.id", action.rewriteMergeCondition(mergeCondition)); + return !entries.isEmpty(); + } - mergeCondition = "t.id = s.id AND T.pt = s.pt"; + @Test + public void mergeConditionParserTest() throws Exception { + // 1. test rewrite table names + // basic rewrite + String mergeCondition = "T.id = S.id"; + DataEvolutionMergeIntoAction.MergeConditionParser parser = createParser(mergeCondition); + assertEquals("`RT`.`id` = `S`.`id`", parser.rewriteSqlNode("T", "RT").toString()); + + // should recognize quotes + mergeCondition = "T.id = s.id AND T.pt = s.pt"; + parser = createParser(mergeCondition); assertEquals( - "`RT`.id = s.id AND `RT`.pt = s.pt", action.rewriteMergeCondition(mergeCondition)); + "`RT`.`id` = `s`.`id` AND `RT`.`pt` = `s`.`pt`", + parser.rewriteSqlNode("T", "RT").toString()); - mergeCondition = "TT.id = 1 AND T.id = 2"; - assertEquals("TT.id = 1 AND `RT`.id = 2", action.rewriteMergeCondition(mergeCondition)); + // should not rewrite column names + mergeCondition = "`T`.id = `T1`.`T` and T.`T.T` = S.a"; + parser = createParser(mergeCondition); + assertEquals( + "`RT`.`id` = `T1`.`T` AND `RT`.`T.T` = `S`.`a`", + parser.rewriteSqlNode("T", "RT").toString()); - mergeCondition = "TT.id = 'T.id' AND 
T.id = \"T.id\""; + // should not rewrite literals + mergeCondition = "T.id = S.id AND S.str_col = 'T.id' AND T.`value` = 1"; + parser = createParser(mergeCondition); assertEquals( - "TT.id = 'T.id' AND `RT`.id = \"T.id\"", - action.rewriteMergeCondition(mergeCondition)); + "`RT`.`id` = `S`.`id` AND `S`.`str_col` = 'T.id' AND `RT`.`value` = 1", + parser.rewriteSqlNode("T", "RT").toString()); + + // 2. test extract row id condition + Optional<String> rowIdColumn; + + mergeCondition = "T._ROW_ID = S.id"; + parser = createParser(mergeCondition); + rowIdColumn = parser.extractRowIdFieldFromSource("T"); + assertTrue(rowIdColumn.isPresent()); + assertEquals("id", rowIdColumn.get()); + + mergeCondition = "S.id = T._ROW_ID"; + parser = createParser(mergeCondition); + rowIdColumn = parser.extractRowIdFieldFromSource("T"); + assertTrue(rowIdColumn.isPresent()); + assertEquals("id", rowIdColumn.get()); + + // target table not matches + mergeCondition = "S.id = T._ROW_ID"; + parser = createParser(mergeCondition); + rowIdColumn = parser.extractRowIdFieldFromSource("S"); + assertFalse(rowIdColumn.isPresent()); + + // for simplicity, compounded condition is not considered now. + mergeCondition = "S.id = T._ROW_ID AND T.id = 1"; + parser = createParser(mergeCondition); + rowIdColumn = parser.extractRowIdFieldFromSource("T"); + assertFalse(rowIdColumn.isPresent()); + } + + private DataEvolutionMergeIntoAction.MergeConditionParser createParser(String mergeCondition) + throws Exception { + return new DataEvolutionMergeIntoAction.MergeConditionParser(mergeCondition); } @Test