diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index cbd2e3086f7d..7775422fc888 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -52,6 +52,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
@@ -295,6 +296,28 @@ public Optional<List<IndexFile>> indexFiles() {
         return hasIndexFile ? Optional.of(indexFiles) : Optional.empty();
     }
 
+    public Optional<DataSplit> filterDataFile(Predicate<DataFileMeta> filter) {
+        List<DataFileMeta> filtered = new ArrayList<>();
+        List<DeletionFile> filteredDeletion = dataDeletionFiles == null ? null : new ArrayList<>();
+        for (int i = 0; i < dataFiles.size(); i++) {
+            DataFileMeta file = dataFiles.get(i);
+            if (filter.test(file)) {
+                filtered.add(file);
+                if (filteredDeletion != null) {
+                    filteredDeletion.add(dataDeletionFiles.get(i));
+                }
+            }
+        }
+        if (filtered.isEmpty()) {
+            return Optional.empty();
+        }
+        DataSplit split = new DataSplit();
+        split.assign(this);
+        split.dataFiles = filtered;
+        split.dataDeletionFiles = filteredDeletion;
+        return Optional.of(split);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index 545bf36fc1be..3b1a1ec49a4e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -28,7 +28,6 @@ import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
 import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
 import org.apache.paimon.spark.catalyst.analysis.PaimonUpdateTable.toColumn
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
-import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
@@ -97,45 +96,6 @@ case class MergeIntoPaimonDataEvolutionTable(
     columns.toSet
   }
 
-  private val firstRowIds: immutable.IndexedSeq[Long] = table
-    .store()
-    .newScan()
-    .withManifestEntryFilter(
-      entry =>
-        entry.file().firstRowId() != null && (!isBlobFile(
-          entry
-            .file()
-            .fileName())))
-    .plan()
-    .files()
-    .asScala
-    .map(file => file.file().firstRowId().asInstanceOf[Long])
-    .distinct
-    .sorted
-    .toIndexedSeq
-
-  private val firstRowIdToBlobFirstRowIds = {
-    val map = new mutable.HashMap[Long, List[Long]]()
-    val files = table
-      .store()
-      .newScan()
-      .withManifestEntryFilter(entry => isBlobFile(entry.file().fileName()))
-      .plan()
-      .files()
-      .asScala
-      .sortBy(f => f.file().firstRowId())
-
-    for (file <- files) {
-      val firstRowId = file.file().firstRowId().asInstanceOf[Long]
-      val firstIdInNormalFile = floorBinarySearch(firstRowIds, firstRowId)
-      map.update(
-        firstIdInNormalFile,
-        map.getOrElseUpdate(firstIdInNormalFile, List.empty[Long]) :+ firstRowId
-      )
-    }
-    map
-  }
-
   /**
    * Self-Merge pattern:
    * {{{
@@ -171,27 +131,63 @@ case class MergeIntoPaimonDataEvolutionTable(
     "NOT MATCHED BY SOURCE are not supported."
   )
 
-  lazy val targetRelation: DataSourceV2Relation = PaimonRelation.getPaimonRelation(targetTable)
+  private lazy val targetRelation: DataSourceV2Relation =
+    PaimonRelation.getPaimonRelation(targetTable)
 
   lazy val tableSchema: StructType = v2Table.schema
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // Avoid that more than one source rows match the same target row.
     val commitMessages = invokeMergeInto(sparkSession)
-    writer.commit(commitMessages.toSeq)
+    writer.commit(commitMessages)
     Seq.empty[Row]
   }
 
   private def invokeMergeInto(sparkSession: SparkSession): Seq[CommitMessage] = {
+    val tableSplits: Seq[DataSplit] = table
+      .newSnapshotReader()
+      .read()
+      .splits()
+      .asScala
+      .map(_.asInstanceOf[DataSplit])
+      .toSeq
+
+    val firstRowIds: immutable.IndexedSeq[Long] = tableSplits
+      .flatMap(_.dataFiles().asScala)
+      .filter(file => file.firstRowId() != null && !isBlobFile(file.fileName()))
+      .map(file => file.firstRowId().asInstanceOf[Long])
+      .distinct
+      .sorted
+      .toIndexedSeq
+
+    val firstRowIdToBlobFirstRowIds: Map[Long, List[Long]] = {
+      val map = new mutable.HashMap[Long, List[Long]]()
+      val files = tableSplits
+        .flatMap(_.dataFiles().asScala)
+        .filter(file => isBlobFile(file.fileName()))
+        .sortBy(f => f.firstRowId())
+
+      for (file <- files) {
+        val firstRowId = file.firstRowId().asInstanceOf[Long]
+        val firstIdInNormalFile = floorBinarySearch(firstRowIds, firstRowId)
+        map.update(
+          firstIdInNormalFile,
+          map.getOrElseUpdate(firstIdInNormalFile, List.empty[Long]) :+ firstRowId
+        )
+      }
+      map.toMap
+    }
+
     // step 1: find the related data split, make it target file plan
-    val dataSplits: Seq[DataSplit] = targetRelatedSplits(sparkSession)
+    val dataSplits: Seq[DataSplit] =
+      targetRelatedSplits(sparkSession, tableSplits, firstRowIds, firstRowIdToBlobFirstRowIds)
     val touchedFileTargetRelation =
-      createNewScanPlan(dataSplits.toSeq, targetRelation)
+      createNewScanPlan(dataSplits, targetRelation)
 
     // step 2: invoke update action
     val updateCommit = if (matchedActions.nonEmpty) {
-      val updateResult = updateActionInvoke(sparkSession, touchedFileTargetRelation)
+      val updateResult = updateActionInvoke(sparkSession, touchedFileTargetRelation, firstRowIds)
       checkUpdateResult(updateResult)
     } else Nil
@@ -204,17 +200,15 @@ case class MergeIntoPaimonDataEvolutionTable(
     updateCommit ++ insertCommit
   }
 
-  private def targetRelatedSplits(sparkSession: SparkSession): Seq[DataSplit] = {
+  private def targetRelatedSplits(
+      sparkSession: SparkSession,
+      tableSplits: Seq[DataSplit],
+      firstRowIds: immutable.IndexedSeq[Long],
+      firstRowIdToBlobFirstRowIds: Map[Long, List[Long]]): Seq[DataSplit] = {
     // Self-Merge shortcut:
     // In Self-Merge mode, every row in the table may be updated, so we scan all splits.
     if (isSelfMergeOnRowId) {
-      return table
-        .newSnapshotReader()
-        .read()
-        .splits()
-        .asScala
-        .map(_.asInstanceOf[DataSplit])
-        .toSeq
+      return tableSplits
     }
 
     val sourceDss = createDataset(sparkSession, sourceTable)
@@ -222,7 +216,12 @@
     val firstRowIdsTouched = extractSourceRowIdMapping match {
       case Some(sourceRowIdAttr) =>
         // Shortcut: Directly get _FIRST_ROW_IDs from the source table.
-        findRelatedFirstRowIds(sourceDss, sparkSession, sourceRowIdAttr.name).toSet
+        findRelatedFirstRowIds(
+          sourceDss,
+          sparkSession,
+          firstRowIds,
+          firstRowIdToBlobFirstRowIds,
+          sourceRowIdAttr.name).toSet
 
       case None =>
         // Perform the full join to find related _FIRST_ROW_IDs.
@@ -230,25 +229,25 @@
         findRelatedFirstRowIds(
           targetDss.alias("_left").join(sourceDss, toColumn(matchedCondition), "inner"),
           sparkSession,
-          "_left." + ROW_ID_NAME).toSet
+          firstRowIds,
+          firstRowIdToBlobFirstRowIds,
+          "_left." + ROW_ID_NAME
+        ).toSet
     }
 
-    table
-      .newSnapshotReader()
-      .withManifestEntryFilter(
-        entry =>
-          entry.file().firstRowId() != null && firstRowIdsTouched.contains(
-            entry.file().firstRowId()))
-      .read()
-      .splits()
-      .asScala
-      .map(_.asInstanceOf[DataSplit])
-      .toSeq
+    tableSplits
+      .map(
+        split =>
+          split.filterDataFile(
+            file => file.firstRowId() != null && firstRowIdsTouched.contains(file.firstRowId())))
+      .filter(optional => optional.isPresent)
+      .map(_.get())
   }
 
   private def updateActionInvoke(
       sparkSession: SparkSession,
-      touchedFileTargetRelation: DataSourceV2Relation): Seq[CommitMessage] = {
+      touchedFileTargetRelation: DataSourceV2Relation,
+      firstRowIds: immutable.IndexedSeq[Long]): Seq[CommitMessage] = {
     val mergeFields = extractFields(matchedCondition)
     val allFields = mutable.SortedSet.empty[AttributeReference](
       (o1, o2) => {
@@ -343,7 +342,7 @@ case class MergeIntoPaimonDataEvolutionTable(
         child = readPlan
       )
 
-      val withFirstRowId = addFirstRowId(sparkSession, mergeRows)
+      val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds)
       assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2)
       withFirstRowId
     } else {
@@ -381,7 +380,7 @@ case class MergeIntoPaimonDataEvolutionTable(
         output = output,
         child = joinPlan
       )
-      val withFirstRowId = addFirstRowId(sparkSession, mergeRows)
+      val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds)
       assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2)
       withFirstRowId
         .repartition(col(FIRST_ROW_ID_NAME))
@@ -496,7 +495,6 @@ case class MergeIntoPaimonDataEvolutionTable(
       .newIndexFileHandler()
       .scan(latestSnapshot.get(), filter)
       .asScala
-      .toSeq
 
     if (affectedIndexEntries.isEmpty) {
       updateCommit
@@ -533,19 +531,19 @@ case class MergeIntoPaimonDataEvolutionTable(
   private def findRelatedFirstRowIds(
       dataset: Dataset[Row],
       sparkSession: SparkSession,
+      firstRowIds: immutable.IndexedSeq[Long],
+      firstRowIdToBlobFirstRowIds: Map[Long, List[Long]],
       identifier: String): Array[Long] = {
     import sparkSession.implicits._
-    val firstRowIdsFinal = firstRowIds
-    val firstRowIdToBlobFirstRowIdsFinal = firstRowIdToBlobFirstRowIds
-    val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIdsFinal, rowId))
+    val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIds, rowId))
     dataset
       .select(firstRowIdUdf(col(identifier)))
      .distinct()
      .as[Long]
      .flatMap(
        f => {
-          if (firstRowIdToBlobFirstRowIdsFinal.contains(f)) {
-            firstRowIdToBlobFirstRowIdsFinal(f)
+          if (firstRowIdToBlobFirstRowIds.contains(f)) {
+            firstRowIdToBlobFirstRowIds(f)
           } else {
             Seq(f)
           }
@@ -572,38 +570,38 @@ case class MergeIntoPaimonDataEvolutionTable(
   private def attribute(name: String, plan: LogicalPlan) =
     plan.output.find(attr => resolver(name, attr.name)).get
 
-  private def addFirstRowId(sparkSession: SparkSession, plan: LogicalPlan): Dataset[Row] = {
+  private def addFirstRowId(
+      sparkSession: SparkSession,
+      plan: LogicalPlan,
+      firstRowIds: immutable.IndexedSeq[Long]): Dataset[Row] = {
     assert(plan.output.exists(_.name.equals(ROW_ID_NAME)))
-    val firstRowIdsFinal = firstRowIds
-    val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIdsFinal, rowId))
+    val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIds, rowId))
     val firstRowIdColumn = firstRowIdUdf(col(ROW_ID_NAME))
     createDataset(sparkSession, plan).withColumn(FIRST_ROW_ID_NAME, firstRowIdColumn)
   }
 }
 
 object MergeIntoPaimonDataEvolutionTable {
+
   final private val ROW_FROM_SOURCE = "__row_from_source"
   final private val ROW_FROM_TARGET = "__row_from_target"
   final private val ROW_ID_NAME = "_ROW_ID"
   final private val FIRST_ROW_ID_NAME = "_FIRST_ROW_ID";
 
-  final private val redundantColumns =
-    Seq(PaimonMetadataColumn.ROW_ID.toAttribute)
-
-  def floorBinarySearch(indexed: immutable.IndexedSeq[Long], value: Long): Long = {
+  private def floorBinarySearch(indexed: immutable.IndexedSeq[Long], value: Long): Long = {
     if (indexed.isEmpty) {
       throw new IllegalArgumentException("The input sorted sequence is empty.")
     }
     indexed.search(value) match {
       case Found(foundIndex) => indexed(foundIndex)
-      case InsertionPoint(insertionIndex) => {
+      case InsertionPoint(insertionIndex) =>
        if (insertionIndex == 0) {
          throw new IllegalArgumentException(
            s"Value $value is less than the first element in the sorted sequence.")
        } else {
          indexed(insertionIndex - 1)
        }
-      }
     }
   }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 36feb56f824a..e227604759b2 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -18,9 +18,7 @@
 
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.CoreOptions
 import org.apache.paimon.Snapshot.CommitKind
-import org.apache.paimon.format.FileFormat
 import org.apache.paimon.spark.PaimonSparkTestBase
 
 import org.apache.spark.sql.Row