@@ -52,6 +52,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
@@ -295,6 +296,28 @@ public Optional<List<IndexFile>> indexFiles() {
return hasIndexFile ? Optional.of(indexFiles) : Optional.empty();
}

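/**
 * Filters the data files of this split with the given predicate, keeping the corresponding
 * deletion files aligned by index. Returns a copy of this split that contains only the
 * matching files, or {@link Optional#empty()} if no file matches.
 */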
public Optional<DataSplit> filterDataFile(Predicate<DataFileMeta> filter) {
List<DataFileMeta> filtered = new ArrayList<>();
List<DeletionFile> filteredDeletion = dataDeletionFiles == null ? null : new ArrayList<>();
for (int i = 0; i < dataFiles.size(); i++) {
DataFileMeta file = dataFiles.get(i);
if (filter.test(file)) {
filtered.add(file);
if (filteredDeletion != null) {
filteredDeletion.add(dataDeletionFiles.get(i));
}
}
}
if (filtered.isEmpty()) {
return Optional.empty();
}
DataSplit split = new DataSplit();
split.assign(this);
split.dataFiles = filtered;
split.dataDeletionFiles = filteredDeletion;
return Optional.of(split);
}

@Override
public boolean equals(Object o) {
if (this == o) {

@@ -28,7 +28,6 @@ import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
import org.apache.paimon.spark.catalyst.analysis.PaimonUpdateTable.toColumn
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
@@ -97,45 +96,6 @@ case class MergeIntoPaimonDataEvolutionTable(
columns.toSet
}

private val firstRowIds: immutable.IndexedSeq[Long] = table
.store()
.newScan()
.withManifestEntryFilter(
entry =>
entry.file().firstRowId() != null && (!isBlobFile(
entry
.file()
.fileName())))
.plan()
.files()
.asScala
.map(file => file.file().firstRowId().asInstanceOf[Long])
.distinct
.sorted
.toIndexedSeq

private val firstRowIdToBlobFirstRowIds = {
val map = new mutable.HashMap[Long, List[Long]]()
val files = table
.store()
.newScan()
.withManifestEntryFilter(entry => isBlobFile(entry.file().fileName()))
.plan()
.files()
.asScala
.sortBy(f => f.file().firstRowId())

for (file <- files) {
val firstRowId = file.file().firstRowId().asInstanceOf[Long]
val firstIdInNormalFile = floorBinarySearch(firstRowIds, firstRowId)
map.update(
firstIdInNormalFile,
map.getOrElseUpdate(firstIdInNormalFile, List.empty[Long]) :+ firstRowId
)
}
map
}

/**
* Self-Merge pattern:
* {{{
@@ -171,27 +131,63 @@ case class MergeIntoPaimonDataEvolutionTable(
"NOT MATCHED BY SOURCE are not supported."
)

lazy val targetRelation: DataSourceV2Relation = PaimonRelation.getPaimonRelation(targetTable)
private lazy val targetRelation: DataSourceV2Relation =
PaimonRelation.getPaimonRelation(targetTable)

lazy val tableSchema: StructType = v2Table.schema

override def run(sparkSession: SparkSession): Seq[Row] = {
// Ensure that no more than one source row matches the same target row.
val commitMessages = invokeMergeInto(sparkSession)
writer.commit(commitMessages.toSeq)
writer.commit(commitMessages)
Seq.empty[Row]
}

private def invokeMergeInto(sparkSession: SparkSession): Seq[CommitMessage] = {
val tableSplits: Seq[DataSplit] = table
.newSnapshotReader()
.read()
.splits()
.asScala
.map(_.asInstanceOf[DataSplit])
.toSeq

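// Sorted, distinct first-row-ids of all non-blob data files in the current snapshot.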
val firstRowIds: immutable.IndexedSeq[Long] = tableSplits
.flatMap(_.dataFiles().asScala)
.filter(file => file.firstRowId() != null && !isBlobFile(file.fileName()))
.map(file => file.firstRowId().asInstanceOf[Long])
.distinct
.sorted
.toIndexedSeq

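// Maps the first-row-id of each normal data file to the first-row-ids of the blob files whose
// rows it covers, e.g. normal ids [0, 100] and blob ids [0, 40, 100] give {0 -> [0, 40], 100 -> [100]}.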
val firstRowIdToBlobFirstRowIds: Map[Long, List[Long]] = {
val map = new mutable.HashMap[Long, List[Long]]()
val files = tableSplits
.flatMap(_.dataFiles().asScala)
.filter(file => isBlobFile(file.fileName()))
.sortBy(f => f.firstRowId())

for (file <- files) {
val firstRowId = file.firstRowId().asInstanceOf[Long]
val firstIdInNormalFile = floorBinarySearch(firstRowIds, firstRowId)
map.update(
firstIdInNormalFile,
map.getOrElseUpdate(firstIdInNormalFile, List.empty[Long]) :+ firstRowId
)
}
map.toMap
}

// step 1: find the related data splits and make them the target file plan
val dataSplits: Seq[DataSplit] = targetRelatedSplits(sparkSession)
val dataSplits: Seq[DataSplit] =
targetRelatedSplits(sparkSession, tableSplits, firstRowIds, firstRowIdToBlobFirstRowIds)
val touchedFileTargetRelation =
createNewScanPlan(dataSplits.toSeq, targetRelation)
createNewScanPlan(dataSplits, targetRelation)

// step 2: invoke update action
val updateCommit =
if (matchedActions.nonEmpty) {
val updateResult = updateActionInvoke(sparkSession, touchedFileTargetRelation)
val updateResult = updateActionInvoke(sparkSession, touchedFileTargetRelation, firstRowIds)
checkUpdateResult(updateResult)
} else Nil

@@ -204,51 +200,54 @@ case class MergeIntoPaimonDataEvolutionTable(
updateCommit ++ insertCommit
}

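// Returns only the splits (and the data files within them) whose first-row-ids may be touched
// by this merge.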
private def targetRelatedSplits(sparkSession: SparkSession): Seq[DataSplit] = {
private def targetRelatedSplits(
sparkSession: SparkSession,
tableSplits: Seq[DataSplit],
firstRowIds: immutable.IndexedSeq[Long],
firstRowIdToBlobFirstRowIds: Map[Long, List[Long]]): Seq[DataSplit] = {
// Self-Merge shortcut:
// In Self-Merge mode, every row in the table may be updated, so we scan all splits.
if (isSelfMergeOnRowId) {
return table
.newSnapshotReader()
.read()
.splits()
.asScala
.map(_.asInstanceOf[DataSplit])
.toSeq
return tableSplits
}

val sourceDss = createDataset(sparkSession, sourceTable)

val firstRowIdsTouched = extractSourceRowIdMapping match {
case Some(sourceRowIdAttr) =>
// Shortcut: Directly get _FIRST_ROW_IDs from the source table.
findRelatedFirstRowIds(sourceDss, sparkSession, sourceRowIdAttr.name).toSet
findRelatedFirstRowIds(
sourceDss,
sparkSession,
firstRowIds,
firstRowIdToBlobFirstRowIds,
sourceRowIdAttr.name).toSet

case None =>
// Perform the full join to find related _FIRST_ROW_IDs.
val targetDss = createDataset(sparkSession, targetRelation)
findRelatedFirstRowIds(
targetDss.alias("_left").join(sourceDss, toColumn(matchedCondition), "inner"),
sparkSession,
"_left." + ROW_ID_NAME).toSet
firstRowIds,
firstRowIdToBlobFirstRowIds,
"_left." + ROW_ID_NAME
).toSet
}

table
.newSnapshotReader()
.withManifestEntryFilter(
entry =>
entry.file().firstRowId() != null && firstRowIdsTouched.contains(
entry.file().firstRowId()))
.read()
.splits()
.asScala
.map(_.asInstanceOf[DataSplit])
.toSeq
tableSplits
.map(
split =>
split.filterDataFile(
file => file.firstRowId() != null && firstRowIdsTouched.contains(file.firstRowId())))
.filter(optional => optional.isPresent)
.map(_.get())
}

private def updateActionInvoke(
sparkSession: SparkSession,
touchedFileTargetRelation: DataSourceV2Relation): Seq[CommitMessage] = {
touchedFileTargetRelation: DataSourceV2Relation,
firstRowIds: immutable.IndexedSeq[Long]): Seq[CommitMessage] = {
val mergeFields = extractFields(matchedCondition)
val allFields = mutable.SortedSet.empty[AttributeReference](
(o1, o2) => {
@@ -343,7 +342,7 @@ case class MergeIntoPaimonDataEvolutionTable(
child = readPlan
)

val withFirstRowId = addFirstRowId(sparkSession, mergeRows)
val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds)
assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2)
withFirstRowId
} else {
@@ -381,7 +380,7 @@ case class MergeIntoPaimonDataEvolutionTable(
output = output,
child = joinPlan
)
val withFirstRowId = addFirstRowId(sparkSession, mergeRows)
val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds)
assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2)
withFirstRowId
.repartition(col(FIRST_ROW_ID_NAME))
@@ -496,7 +495,6 @@ case class MergeIntoPaimonDataEvolutionTable(
.newIndexFileHandler()
.scan(latestSnapshot.get(), filter)
.asScala
.toSeq

if (affectedIndexEntries.isEmpty) {
updateCommit
@@ -533,19 +531,19 @@ case class MergeIntoPaimonDataEvolutionTable(
private def findRelatedFirstRowIds(
dataset: Dataset[Row],
sparkSession: SparkSession,
firstRowIds: immutable.IndexedSeq[Long],
firstRowIdToBlobFirstRowIds: Map[Long, List[Long]],
identifier: String): Array[Long] = {
import sparkSession.implicits._
val firstRowIdsFinal = firstRowIds
val firstRowIdToBlobFirstRowIdsFinal = firstRowIdToBlobFirstRowIds
val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIdsFinal, rowId))
val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIds, rowId))
dataset
.select(firstRowIdUdf(col(identifier)))
.distinct()
.as[Long]
.flatMap(
f => {
if (firstRowIdToBlobFirstRowIdsFinal.contains(f)) {
firstRowIdToBlobFirstRowIdsFinal(f)
if (firstRowIdToBlobFirstRowIds.contains(f)) {
firstRowIdToBlobFirstRowIds(f)
} else {
Seq(f)
}
@@ -572,38 +570,38 @@ case class MergeIntoPaimonDataEvolutionTable(
private def attribute(name: String, plan: LogicalPlan) =
plan.output.find(attr => resolver(name, attr.name)).get

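// Adds a _FIRST_ROW_ID column computed from _ROW_ID via a floor binary search over the sorted
// first-row-ids of the data files.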
private def addFirstRowId(sparkSession: SparkSession, plan: LogicalPlan): Dataset[Row] = {
private def addFirstRowId(
sparkSession: SparkSession,
plan: LogicalPlan,
firstRowIds: immutable.IndexedSeq[Long]): Dataset[Row] = {
assert(plan.output.exists(_.name.equals(ROW_ID_NAME)))
val firstRowIdsFinal = firstRowIds
val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIdsFinal, rowId))
val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIds, rowId))
val firstRowIdColumn = firstRowIdUdf(col(ROW_ID_NAME))
createDataset(sparkSession, plan).withColumn(FIRST_ROW_ID_NAME, firstRowIdColumn)
}
}

object MergeIntoPaimonDataEvolutionTable {

final private val ROW_FROM_SOURCE = "__row_from_source"
final private val ROW_FROM_TARGET = "__row_from_target"
final private val ROW_ID_NAME = "_ROW_ID"
final private val FIRST_ROW_ID_NAME = "_FIRST_ROW_ID";
final private val redundantColumns =
Seq(PaimonMetadataColumn.ROW_ID.toAttribute)

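/**
 * Returns the greatest element of the sorted sequence that is less than or equal to `value`,
 * e.g. floorBinarySearch(IndexedSeq(0L, 100L, 250L), 180L) == 100L. Throws if the sequence is
 * empty or `value` is smaller than its first element.
 */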
def floorBinarySearch(indexed: immutable.IndexedSeq[Long], value: Long): Long = {
private def floorBinarySearch(indexed: immutable.IndexedSeq[Long], value: Long): Long = {
if (indexed.isEmpty) {
throw new IllegalArgumentException("The input sorted sequence is empty.")
}

indexed.search(value) match {
case Found(foundIndex) => indexed(foundIndex)
case InsertionPoint(insertionIndex) => {
case InsertionPoint(insertionIndex) =>
if (insertionIndex == 0) {
throw new IllegalArgumentException(
s"Value $value is less than the first element in the sorted sequence.")
} else {
indexed(insertionIndex - 1)
}
}
}
}
}

@@ -18,9 +18,7 @@

package org.apache.paimon.spark.sql

import org.apache.paimon.CoreOptions
import org.apache.paimon.Snapshot.CommitKind
import org.apache.paimon.format.FileFormat
import org.apache.paimon.spark.PaimonSparkTestBase

import org.apache.spark.sql.Row