diff --git a/docs/docs/append-table/blob.mdx b/docs/docs/append-table/blob.mdx index 7f0afc964e7d..dfe78709b2c9 100644 --- a/docs/docs/append-table/blob.mdx +++ b/docs/docs/append-table/blob.mdx @@ -147,6 +147,18 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields, This option must not overlap with blob-descriptor-field. + +
blob-view.resolve.enabled
+ No + true + Boolean + + Controls whether blob-view-field values are resolved to the upstream BLOB + content at read time. Set this dynamic option to false when forwarding blob view + references from one view table to another view table and you want the target table to keep + referencing the original upstream BLOB. + +
blob-external-storage-field
No @@ -373,6 +385,36 @@ FROM `image_table$row_tracking`; Reads from `image_view_table.image_ref` return the referenced BLOB bytes in the same way as normal blob fields. The referenced upstream table and row must remain available for the view to be resolved. +#### Forward Blob View References + +By default, reading a blob view field resolves the `BlobViewStruct` and returns the upstream BLOB +content. If you want to import data from one blob view table into another blob view table without +copying the BLOB bytes, read the source table with `blob-view.resolve.enabled=false` and write the +result into a target field configured by `blob-view-field`. + +With this option disabled, Paimon preserves the serialized `BlobViewStruct` during reads. When the +preserved value is written to another blob view field, the target table stores the same upstream +reference instead of creating a chained view reference. + +For example, if table `T1` contains blob view references to BLOBs in table `T0`, importing `T1` into +`T2` with `blob-view.resolve.enabled=false` makes `T2` keep referencing `T0` directly. + +```sql +CREATE TABLE t2 ( + id INT, + image_ref BYTES +) WITH ( + 'row-tracking.enabled' = 'true', + 'data-evolution.enabled' = 'true', + 'blob-view-field' = 'image_ref' +); + +-- Flink SQL example: the source table is read with blob view resolution disabled. +INSERT INTO t2 +SELECT id, image_ref +FROM t1 /*+ OPTIONS('blob-view.resolve.enabled'='false') */; +``` + ### MERGE INTO Support For Data Evolution writes in Flink and Spark: diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index 481369245597..6c3d2fabded8 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -74,6 +74,12 @@ String Comma-separated field names to treat as BLOB fields and store as serialized BlobViewStruct bytes inline in data files and resolve from upstream tables at read time. + +
blob-view.resolve.enabled
+ true + Boolean + Whether to resolve blob-view-field values from upstream tables at read time. Set to false to preserve BlobViewStruct references when forwarding blob view values to another blob-view table. +
blob-write-null-on-missing-file
false diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 8ef13bef2045..6cf63236afac 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2338,6 +2338,15 @@ public InlineElement getDescription() { + "as serialized BlobViewStruct bytes inline in data files and " + "resolve from upstream tables at read time."); + public static final ConfigOption BLOB_VIEW_RESOLVE_ENABLED = + key("blob-view.resolve.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to resolve blob-view-field values from upstream tables at " + + "read time. Set to false to preserve BlobViewStruct references " + + "when forwarding blob view values to another blob-view table."); + public static final ConfigOption BLOB_AS_DESCRIPTOR = key("blob-as-descriptor") .booleanType() @@ -3011,6 +3020,11 @@ public Set blobViewField() { return parseCommaSeparatedSet(BLOB_VIEW_FIELD); } + /** Whether to resolve blob view references at read time. */ + public boolean blobViewResolveEnabled() { + return options.get(BLOB_VIEW_RESOLVE_ENABLED); + } + /** Resolve blob fields that are stored inline in normal data files. */ public Set blobInlineField() { Set fields = new HashSet<>(blobDescriptorField()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java index 35aabd38184e..bf41ff70b62f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java @@ -80,7 +80,12 @@ public RecordReader createReader(Split split) throws IOException { } private int[] blobViewFields(RowType rowType) { - Set blobViewFieldNames = CoreOptions.fromMap(schema().options()).blobViewField(); + CoreOptions options = CoreOptions.fromMap(schema().options()); + if (!options.blobViewResolveEnabled()) { + return new int[0]; + } + + Set blobViewFieldNames = options.blobViewField(); if (blobViewFieldNames.isEmpty()) { return new int[0]; } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java index d8fd0cce9b52..145cf0b04d33 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java @@ -887,6 +887,134 @@ public void testBlobViewE2E() throws Exception { }); } + @Test + public void testForwardBlobViewReference() throws Exception { + String upstreamTableName = "UpstreamBlobForward"; + Schema.Builder upstreamSchema = Schema.newBuilder(); + upstreamSchema.column("id", DataTypes.INT()); + upstreamSchema.column("image", DataTypes.BLOB()); + upstreamSchema.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); + upstreamSchema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + upstreamSchema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + catalog.createTable(identifier(upstreamTableName), upstreamSchema.build(), true); + + FileStoreTable upstreamTable = getTable(identifier(upstreamTableName)); + byte[] imageBytes1 = randomBytes(); + byte[] imageBytes2 = randomBytes(); + writeRows( + upstreamTable, + Arrays.asList( + GenericRow.of(1, new BlobData(imageBytes1)), + GenericRow.of(2, new BlobData(imageBytes2)))); + + int imageFieldId = upstreamTable.rowType().getField("image").id(); + RowTrackingTable upstreamRowTracking = new RowTrackingTable(upstreamTable); + ReadBuilder rowIdReader = + upstreamRowTracking.newReadBuilder().withProjection(new int[] {0, 2}); + Map idToRowId = new HashMap<>(); + rowIdReader + .newRead() + .createReader(rowIdReader.newScan().plan()) + .forEachRemaining(row -> idToRowId.put(row.getInt(0), row.getLong(1))); + assertThat(idToRowId.size()).isEqualTo(2); + + String firstViewTableName = "FirstBlobView"; + Schema.Builder firstViewSchema = Schema.newBuilder(); + firstViewSchema.column("id", DataTypes.INT()); + firstViewSchema.column("image", DataTypes.BLOB()); + firstViewSchema.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); + firstViewSchema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + firstViewSchema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + firstViewSchema.option(CoreOptions.BLOB_FIELD.key(), "image"); + firstViewSchema.option(CoreOptions.BLOB_VIEW_FIELD.key(), "image"); + catalog.createTable(identifier(firstViewTableName), firstViewSchema.build(), true); + + String upstreamFullName = database + "." + upstreamTableName; + BlobViewStruct viewStruct1 = + new BlobViewStruct( + Identifier.fromString(upstreamFullName), imageFieldId, idToRowId.get(1)); + BlobViewStruct viewStruct2 = + new BlobViewStruct( + Identifier.fromString(upstreamFullName), imageFieldId, idToRowId.get(2)); + FileStoreTable firstViewTable = getTable(identifier(firstViewTableName)); + writeRows( + firstViewTable, + Arrays.asList( + GenericRow.of(1, Blob.fromView(viewStruct1)), + GenericRow.of(2, Blob.fromView(viewStruct2)))); + + String secondViewTableName = "SecondBlobView"; + Schema.Builder secondViewSchema = Schema.newBuilder(); + secondViewSchema.column("id", DataTypes.INT()); + secondViewSchema.column("image", DataTypes.BLOB()); + secondViewSchema.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); + secondViewSchema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + secondViewSchema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + secondViewSchema.option(CoreOptions.BLOB_FIELD.key(), "image"); + secondViewSchema.option(CoreOptions.BLOB_VIEW_FIELD.key(), "image"); + catalog.createTable(identifier(secondViewTableName), secondViewSchema.build(), true); + + Map preserveBlobViewOptions = new HashMap<>(); + preserveBlobViewOptions.put(CoreOptions.BLOB_VIEW_RESOLVE_ENABLED.key(), "false"); + FileStoreTable firstViewWithoutResolve = firstViewTable.copy(preserveBlobViewOptions); + ReadBuilder preserveReadBuilder = firstViewWithoutResolve.newReadBuilder(); + RecordReader preserveReader = + preserveReadBuilder.newRead().createReader(preserveReadBuilder.newScan().plan()); + List rowsToForward = new ArrayList<>(); + InternalRowSerializer firstViewSerializer = + new InternalRowSerializer(firstViewWithoutResolve.rowType()); + preserveReader.forEachRemaining(row -> rowsToForward.add(firstViewSerializer.copy(row))); + preserveReader.close(); + + rowsToForward.sort((a, b) -> Integer.compare(a.getInt(0), b.getInt(0))); + Blob preservedBlob1 = rowsToForward.get(0).getBlob(1); + Blob preservedBlob2 = rowsToForward.get(1).getBlob(1); + assertThat(preservedBlob1).isInstanceOf(BlobView.class); + assertThat(preservedBlob2).isInstanceOf(BlobView.class); + assertThat(((BlobView) preservedBlob1).isResolved()).isFalse(); + assertThat(((BlobView) preservedBlob2).isResolved()).isFalse(); + assertThat(((BlobView) preservedBlob1).viewStruct()).isEqualTo(viewStruct1); + assertThat(((BlobView) preservedBlob2).viewStruct()).isEqualTo(viewStruct2); + + FileStoreTable secondViewTable = getTable(identifier(secondViewTableName)); + writeRows(secondViewTable, rowsToForward); + + FileStoreTable secondViewWithoutResolve = secondViewTable.copy(preserveBlobViewOptions); + ReadBuilder verifyReferenceBuilder = secondViewWithoutResolve.newReadBuilder(); + RecordReader verifyReferenceReader = + verifyReferenceBuilder + .newRead() + .createReader(verifyReferenceBuilder.newScan().plan()); + List secondViewRawRows = new ArrayList<>(); + InternalRowSerializer secondViewSerializer = + new InternalRowSerializer(secondViewWithoutResolve.rowType()); + verifyReferenceReader.forEachRemaining( + row -> secondViewRawRows.add(secondViewSerializer.copy(row))); + verifyReferenceReader.close(); + + secondViewRawRows.sort((a, b) -> Integer.compare(a.getInt(0), b.getInt(0))); + assertThat(((BlobView) secondViewRawRows.get(0).getBlob(1)).viewStruct()) + .isEqualTo(viewStruct1); + assertThat(((BlobView) secondViewRawRows.get(1).getBlob(1)).viewStruct()) + .isEqualTo(viewStruct2); + + Map idToBlob = new HashMap<>(); + idToBlob.put(1, imageBytes1); + idToBlob.put(2, imageBytes2); + ReadBuilder secondViewReadBuilder = secondViewTable.newReadBuilder(); + secondViewReadBuilder + .newRead() + .createReader(secondViewReadBuilder.newScan().plan()) + .forEachRemaining( + row -> { + int id = row.getInt(0); + Blob blob = row.getBlob(1); + assertThat(blob).isInstanceOf(BlobView.class); + assertThat(((BlobView) blob).isResolved()).isTrue(); + assertThat(blob.toData()).isEqualTo(idToBlob.get(id)); + }); + } + @Test public void testBlobProjectionExcludesBlobColumn() throws Exception { createTableDefault(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java index dc38588a7ce4..bbb84fd77cfd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink; import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobView; import org.apache.paimon.data.InternalRow; import java.util.Set; @@ -40,6 +41,9 @@ public FlinkRowDataWithBlob( public byte[] getBinary(int pos) { if (blobFields.contains(pos)) { Blob blob = row.getBlob(pos); + if (blob instanceof BlobView && !((BlobView) blob).isResolved()) { + return Blob.serializeBlob(blob); + } return blobAsDescriptor ? blob.toDescriptor().serialize() : blob.toData(); } else { return row.getBinary(pos); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java index b2238ce04cc8..e89a24132f4b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.Blob; import org.apache.paimon.data.BlobDescriptor; import org.apache.paimon.data.BlobRef; +import org.apache.paimon.data.BlobViewStruct; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; @@ -324,6 +325,68 @@ public void testWriteBlobViewWithBuiltInFunction() throws Exception { assertThat((byte[]) result.get(1).getField(2)).isEqualTo(new byte[] {89, 69}); } + @Test + public void testForwardBlobViewReferenceWithDynamicOption() throws Exception { + tEnv.executeSql( + "CREATE TABLE upstream_blob_view_forward (id INT, name STRING, picture BYTES)" + + " WITH ('row-tracking.enabled'='true'," + + " 'data-evolution.enabled'='true'," + + " 'blob-field'='picture')"); + batchSql("INSERT INTO upstream_blob_view_forward VALUES (1, 'row1', X'48656C6C6F')"); + batchSql("INSERT INTO upstream_blob_view_forward VALUES (2, 'row2', X'5945')"); + + String upstreamFullTableName = tEnv.getCurrentDatabase() + ".upstream_blob_view_forward"; + tEnv.executeSql( + "CREATE TABLE first_downstream_blob_view (id INT, label STRING, image_ref BYTES)" + + " WITH ('row-tracking.enabled'='true'," + + " 'data-evolution.enabled'='true'," + + " 'blob-view-field'='image_ref')"); + batchSql( + String.format( + "INSERT INTO first_downstream_blob_view" + + " SELECT id, name, sys.blob_view('%s', 'picture', _ROW_ID)" + + " FROM `upstream_blob_view_forward$row_tracking`", + upstreamFullTableName)); + + tEnv.executeSql( + "CREATE TABLE second_downstream_blob_view (id INT, label STRING, image_ref BYTES)" + + " WITH ('row-tracking.enabled'='true'," + + " 'data-evolution.enabled'='true'," + + " 'blob-view-field'='image_ref')"); + batchSql( + "INSERT INTO second_downstream_blob_view" + + " SELECT id, label, image_ref" + + " FROM first_downstream_blob_view" + + " /*+ OPTIONS('blob-view.resolve.enabled'='false') */"); + + assertThat(batchSql("SELECT * FROM second_downstream_blob_view ORDER BY id")) + .containsExactly( + Row.of(1, "row1", new byte[] {72, 101, 108, 108, 111}), + Row.of(2, "row2", new byte[] {89, 69})); + + List originalReferences = + batchSql( + "SELECT image_ref" + + " FROM first_downstream_blob_view" + + " /*+ OPTIONS('blob-view.resolve.enabled'='false') */" + + " ORDER BY id"); + List forwardedReferences = + batchSql( + "SELECT image_ref" + + " FROM second_downstream_blob_view" + + " /*+ OPTIONS('blob-view.resolve.enabled'='false') */" + + " ORDER BY id"); + + assertThat(forwardedReferences).hasSize(originalReferences.size()); + for (int i = 0; i < forwardedReferences.size(); i++) { + byte[] originalReference = (byte[]) originalReferences.get(i).getField(0); + byte[] forwardedReference = (byte[]) forwardedReferences.get(i).getField(0); + assertThat(forwardedReference).isEqualTo(originalReference); + assertThat(BlobViewStruct.deserialize(forwardedReference).identifier().getFullName()) + .isEqualTo(upstreamFullTableName); + } + } + @Test public void testBlobViewRejectsUnqualifiedTableName() { assertThatThrownBy( diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala index 46158ed4dc8d..9c912e027343 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala @@ -20,7 +20,7 @@ package org.apache.paimon.spark.sql import org.apache.paimon.CoreOptions import org.apache.paimon.catalog.CatalogContext -import org.apache.paimon.data.{Blob, BlobDescriptor} +import org.apache.paimon.data.{Blob, BlobDescriptor, BlobViewStruct} import org.apache.paimon.fs.{IsolatedDirectoryFileIO, Path} import org.apache.paimon.fs.local.LocalFileIO import org.apache.paimon.options.Options @@ -237,6 +237,81 @@ class BlobTestBase extends PaimonSparkTestBase { } } + test("Blob: forward blob view reference with dynamic option") { + withTable( + "upstream_blob_view_forward", + "first_downstream_blob_view", + "second_downstream_blob_view") { + sql( + "CREATE TABLE upstream_blob_view_forward (id INT, name STRING, picture BINARY) " + + "TBLPROPERTIES (" + + "'row-tracking.enabled'='true', " + + "'data-evolution.enabled'='true', " + + "'blob-field'='picture')") + sql( + "INSERT INTO upstream_blob_view_forward VALUES " + + "(1, 'row1', X'48656C6C6F'), " + + "(2, 'row2', X'5945')") + + val upstreamFullName = s"$dbName0.upstream_blob_view_forward" + sql( + "CREATE TABLE first_downstream_blob_view (id INT, label STRING, image_ref BINARY) " + + "TBLPROPERTIES (" + + "'row-tracking.enabled'='true', " + + "'data-evolution.enabled'='true', " + + "'blob-field'='image_ref', " + + "'blob-view-field'='image_ref')") + sql( + s"INSERT INTO first_downstream_blob_view " + + s"SELECT id, name, sys.blob_view('$upstreamFullName', 'picture', _ROW_ID) " + + s"FROM `upstream_blob_view_forward$$row_tracking`") + + sql( + "CREATE TABLE second_downstream_blob_view (id INT, label STRING, image_ref BINARY) " + + "TBLPROPERTIES (" + + "'row-tracking.enabled'='true', " + + "'data-evolution.enabled'='true', " + + "'blob-field'='image_ref', " + + "'blob-view-field'='image_ref')") + + val preserveFirstReferenceOption = + s"spark.paimon.paimon.$dbName0.first_downstream_blob_view." + + CoreOptions.BLOB_VIEW_RESOLVE_ENABLED.key() + val preserveSecondReferenceOption = + s"spark.paimon.paimon.$dbName0.second_downstream_blob_view." + + CoreOptions.BLOB_VIEW_RESOLVE_ENABLED.key() + withSparkSQLConf(preserveFirstReferenceOption -> "false") { + sql( + "INSERT INTO second_downstream_blob_view " + + "SELECT id, label, image_ref FROM first_downstream_blob_view") + + checkAnswer( + sql("SELECT * FROM second_downstream_blob_view ORDER BY id"), + Seq( + Row(1, "row1", Array[Byte](72, 101, 108, 108, 111)), + Row(2, "row2", Array[Byte](89, 69))) + ) + + withSparkSQLConf(preserveSecondReferenceOption -> "false") { + val originalReferences = + sql("SELECT image_ref FROM first_downstream_blob_view ORDER BY id").collect() + val forwardedReferences = + sql("SELECT image_ref FROM second_downstream_blob_view ORDER BY id").collect() + + assert(forwardedReferences.length == originalReferences.length) + forwardedReferences.zip(originalReferences).foreach { + case (forwarded, original) => + val originalReference = original.getAs[Array[Byte]](0) + val forwardedReference = forwarded.getAs[Array[Byte]](0) + assert(util.Arrays.equals(originalReference, forwardedReference)) + assert(BlobViewStruct.deserialize(forwardedReference).identifier().getFullName == + upstreamFullName) + } + } + } + } + } + test("Blob: test write blob descriptor from external storage") { val catalogName = "isolated_paimon" val databaseName = "external_blob_db" diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala index 6c1dbd9d2120..118d71b10730 100644 --- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.data +import org.apache.paimon.data.{Blob, BlobView} import org.apache.paimon.types.RowType import org.apache.paimon.utils.InternalRowUtils.copyInternalRow @@ -28,10 +29,14 @@ class Spark3InternalRowWithBlob(rowType: RowType, blobFields: Set[Int], blobAsDe override def getBinary(ordinal: Int): Array[Byte] = { if (blobFields.contains(ordinal)) { - if (blobAsDescriptor) { - row.getBlob(ordinal).toDescriptor.serialize() - } else { - row.getBlob(ordinal).toData + val blob = row.getBlob(ordinal) + blob match { + case blobView: BlobView if !blobView.isResolved => + Blob.serializeBlob(blobView) + case _ if blobAsDescriptor => + blob.toDescriptor.serialize() + case _ => + blob.toData } } else { super.getBinary(ordinal) diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala index 2a120e5b4c2a..31d29baf5dbb 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.data +import org.apache.paimon.data.{Blob, BlobView} import org.apache.paimon.spark.AbstractSparkInternalRow import org.apache.paimon.types.RowType import org.apache.paimon.utils.InternalRowUtils.copyInternalRow @@ -30,10 +31,14 @@ class Spark4InternalRowWithBlob(rowType: RowType, blobFields: Set[Int], blobAsDe override def getBinary(ordinal: Int): Array[Byte] = { if (blobFields.contains(ordinal)) { - if (blobAsDescriptor) { - row.getBlob(ordinal).toDescriptor.serialize() - } else { - row.getBlob(ordinal).toData + val blob = row.getBlob(ordinal) + blob match { + case blobView: BlobView if !blobView.isResolved => + Blob.serializeBlob(blobView) + case _ if blobAsDescriptor => + blob.toDescriptor.serialize() + case _ => + blob.toData } } else { super.getBinary(ordinal)