Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.dataevolution;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.Committable;
Expand Down Expand Up @@ -56,6 +57,7 @@
import java.util.stream.Collectors;

import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.types.VectorType.isVectorStoreFile;

/**
* The Flink Batch Operator to process sorted new rows for data-evolution partial write. It assumes
Expand Down Expand Up @@ -90,7 +92,8 @@ public class DataEvolutionPartialWriteOperator
private transient Writer writer;

public DataEvolutionPartialWriteOperator(FileStoreTable table, RowType dataType) {
this.table = table;
this.table =
table.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G"));
List<String> fieldNames =
dataType.getFieldNames().stream()
.filter(name -> !SpecialFields.ROW_ID.name().equals(name))
Expand All @@ -116,7 +119,8 @@ public void open() throws Exception {
.withManifestEntryFilter(
entry ->
entry.file().firstRowId() != null
&& !isBlobFile(entry.file().fileName()))
&& !isBlobFile(entry.file().fileName())
&& !isVectorStoreFile(entry.file().fileName()))
.plan()
.files();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,61 @@ public void testUpdateRawBlobColumnThrowsError() throws Exception {
"Expected error about raw-data BLOB column but got: " + t.getMessage());
}

@Test
public void testUpdateNonBlobColumnOnRawBlobTableWithSplitFiles() throws Exception {
sEnv.executeSql(
buildDdl(
"RAW_BLOB_SPLIT_T",
Arrays.asList("id INT", "name STRING", "picture BYTES"),
Collections.emptyList(),
Collections.emptyList(),
new HashMap<String, String>() {
{
put(ROW_TRACKING_ENABLED.key(), "true");
put(DATA_EVOLUTION_ENABLED.key(), "true");
put("blob-field", "picture");
put(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "1 b");
}
}));
insertInto(
"RAW_BLOB_SPLIT_T",
"(1, 'name1', X'48656C6C6F')",
"(2, 'name2', X'5945')",
"(3, 'name3', X'414243')");
testBatchRead(
"SELECT COUNT(*) FROM `RAW_BLOB_SPLIT_T$files` "
+ "WHERE file_path NOT LIKE '%.blob'",
Collections.singletonList(changelogRow("+I", 1L)));
testBatchRead(
"SELECT COUNT(*) > 1 FROM `RAW_BLOB_SPLIT_T$files` "
+ "WHERE file_path LIKE '%.blob'",
Collections.singletonList(changelogRow("+I", true)));

sEnv.executeSql(
buildDdl(
"RAW_BLOB_SPLIT_S",
Arrays.asList("id INT", "name STRING"),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyMap()));
insertInto("RAW_BLOB_SPLIT_S", "(1, 'updated_name1')");

builder(warehouse, database, "RAW_BLOB_SPLIT_T")
.withMergeCondition("RAW_BLOB_SPLIT_T.id=RAW_BLOB_SPLIT_S.id")
.withMatchedUpdateSet("RAW_BLOB_SPLIT_T.name=RAW_BLOB_SPLIT_S.name")
.withSourceTable("RAW_BLOB_SPLIT_S")
.withSinkParallelism(1)
.build()
.run();

List<Row> expected =
Arrays.asList(
changelogRow("+I", 1, "updated_name1"),
changelogRow("+I", 2, "name2"),
changelogRow("+I", 3, "name3"));
testBatchRead("SELECT id, name FROM RAW_BLOB_SPLIT_T ORDER BY id", expected);
}

@Test
public void testUpdateNonBlobColumnOnDescriptorBlobTableSucceeds() throws Exception {
// Create a table with descriptor BLOB column.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package org.apache.paimon.spark.commands

import org.apache.paimon.CoreOptions
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile
import org.apache.paimon.spark.write.{DataEvolutionTableDataWrite, WriteHelper, WriteTaskResult}
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink._
import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.types.DataType
import org.apache.paimon.types.DataTypeRoot.BLOB
import org.apache.paimon.types.VectorType.isVectorStoreFile

import org.apache.spark.sql._

Expand All @@ -44,6 +46,7 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se
split
.dataFiles()
.asScala
.filter(file => !isBlobFile(file.fileName()) && !isVectorStoreFile(file.fileName()))
.foreach(
file =>
firstRowIdToPartitionMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.types.VectorType.isVectorStoreFile

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.PaimonUtils._
Expand Down Expand Up @@ -150,7 +151,12 @@ case class MergeIntoPaimonDataEvolutionTable(

val firstRowIds: immutable.IndexedSeq[Long] = tableSplits
.flatMap(_.dataFiles().asScala)
.filter(file => file.firstRowId() != null && !isBlobFile(file.fileName()))
.filter {
file =>
file.firstRowId() != null &&
!isBlobFile(file.fileName()) &&
!isVectorStoreFile(file.fileName())
}
.map(file => file.firstRowId().asInstanceOf[Long])
.distinct
.sorted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,61 @@ class BlobTestBase extends PaimonSparkTestBase {
}
}

test("Blob: merge-into updates non-blob column on raw blob table with split blob files") {
withTable("s", "t") {
sql(
"CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " +
"('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " +
"'blob-field'='picture', 'blob.target-file-size'='1 b')")
sql(
"INSERT INTO t VALUES " +
"(1, 'name1', X'48656C6C6F'), " +
"(2, 'name2', X'5945'), " +
"(3, 'name3', X'414243')")

sql("CREATE TABLE s (id INT, name STRING)")
sql("INSERT INTO s VALUES (1, 'updated_name1')")

sql("""
|MERGE INTO t
|USING s
|ON t.id = s.id
|WHEN MATCHED THEN UPDATE SET t.name = s.name
|""".stripMargin)

checkAnswer(
sql("SELECT id, name FROM t ORDER BY id"),
Seq(Row(1, "updated_name1"), Row(2, "name2"), Row(3, "name3"))
)
}
}

test("Blob: self merge reads raw blob column to update non-blob column") {
withTable("t") {
sql(
"CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " +
"('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " +
"'blob-field'='picture', 'blob.target-file-size'='1 b')")
sql(
"INSERT INTO t VALUES " +
"(1, 'name1', X'48656C6C6F'), " +
"(2, 'name2', X'5945'), " +
"(3, 'name3', X'414243')")

sql("""
|MERGE INTO t
|USING t AS source
|ON t._ROW_ID = source._ROW_ID
|WHEN MATCHED THEN UPDATE SET t.name = CAST(length(source.picture) AS STRING)
|""".stripMargin)

checkAnswer(
sql("SELECT id, name FROM t ORDER BY id"),
Seq(Row(1, "5"), Row(2, "2"), Row(3, "3"))
)
}
}

test("Blob: merge-into updates non-blob column on descriptor blob table") {
withTable("s", "t") {
sql(
Expand Down