Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/docs/append-table/blob.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,18 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields,
This option must not overlap with <code>blob-descriptor-field</code>.
</td>
</tr>
<tr>
<td><h5>blob-view.resolve.enabled</h5></td>
<td>No</td>
<td style={{wordWrap: "break-word"}}>true</td>
<td>Boolean</td>
<td>
Controls whether <code>blob-view-field</code> values are resolved to the upstream BLOB
content at read time. Set this dynamic option to <code>false</code> when forwarding blob view
references from one blob view table to another, so that the target table keeps referencing
the original upstream BLOB instead of receiving the resolved content.
</td>
</tr>
<tr>
<td><h5>blob-external-storage-field</h5></td>
<td>No</td>
Expand Down Expand Up @@ -373,6 +385,36 @@ FROM `image_table$row_tracking`;

Reads from `image_view_table.image_ref` return the referenced BLOB bytes in the same way as normal blob fields. The referenced upstream table and row must remain available for the view to be resolved.

#### Forward Blob View References

By default, reading a blob view field resolves the `BlobViewStruct` and returns the upstream BLOB
content. If you want to import data from one blob view table into another blob view table without
copying the BLOB bytes, read the source table with `blob-view.resolve.enabled=false` and write the
result into a target field configured by `blob-view-field`.

With this option disabled, Paimon preserves the serialized `BlobViewStruct` during reads. When the
preserved value is written to another blob view field, the target table stores the same upstream
reference instead of creating a chained view reference.

For example, if table `T1` contains blob view references to BLOBs in table `T0`, importing `T1` into
`T2` with `blob-view.resolve.enabled=false` makes `T2` keep referencing `T0` directly.

```sql
CREATE TABLE t2 (
id INT,
image_ref BYTES
) WITH (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'blob-view-field' = 'image_ref'
);

-- Flink SQL example: the source table is read with blob view resolution disabled.
INSERT INTO t2
SELECT id, image_ref
FROM t1 /*+ OPTIONS('blob-view.resolve.enabled'='false') */;
```

### MERGE INTO Support

For Data Evolution writes in Flink and Spark:
Expand Down
6 changes: 6 additions & 0 deletions docs/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
<td>String</td>
<td>Comma-separated field names to treat as BLOB fields and store as serialized BlobViewStruct bytes inline in data files and resolve from upstream tables at read time.</td>
</tr>
<tr>
<td><h5>blob-view.resolve.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to resolve blob-view-field values from upstream tables at read time. Set to false to preserve BlobViewStruct references when forwarding blob view values to another blob-view table.</td>
</tr>
<tr>
<td><h5>blob-write-null-on-missing-file</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
14 changes: 14 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2338,6 +2338,15 @@ public InlineElement getDescription() {
+ "as serialized BlobViewStruct bytes inline in data files and "
+ "resolve from upstream tables at read time.");

/**
 * Boolean option {@code blob-view.resolve.enabled}, defaulting to {@code true}. Its value is
 * exposed through {@link #blobViewResolveEnabled()}.
 */
public static final ConfigOption<Boolean> BLOB_VIEW_RESOLVE_ENABLED =
key("blob-view.resolve.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to resolve blob-view-field values from upstream tables at "
+ "read time. Set to false to preserve BlobViewStruct references "
+ "when forwarding blob view values to another blob-view table.");

public static final ConfigOption<Boolean> BLOB_AS_DESCRIPTOR =
key("blob-as-descriptor")
.booleanType()
Expand Down Expand Up @@ -3011,6 +3020,11 @@ public Set<String> blobViewField() {
return parseCommaSeparatedSet(BLOB_VIEW_FIELD);
}

/**
 * Returns the configured value of {@link #BLOB_VIEW_RESOLVE_ENABLED}, i.e. whether blob view
 * references should be resolved at read time.
 */
public boolean blobViewResolveEnabled() {
    final boolean resolveAtReadTime = options.get(BLOB_VIEW_RESOLVE_ENABLED);
    return resolveAtReadTime;
}

/** Resolve blob fields that are stored inline in normal data files. */
public Set<String> blobInlineField() {
Set<String> fields = new HashSet<>(blobDescriptorField());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
}

private int[] blobViewFields(RowType rowType) {
Set<String> blobViewFieldNames = CoreOptions.fromMap(schema().options()).blobViewField();
CoreOptions options = CoreOptions.fromMap(schema().options());
if (!options.blobViewResolveEnabled()) {
return new int[0];
}

Set<String> blobViewFieldNames = options.blobViewField();
if (blobViewFieldNames.isEmpty()) {
return new int[0];
}
Expand Down
128 changes: 128 additions & 0 deletions paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,134 @@ public void testBlobViewE2E() throws Exception {
});
}

@Test
public void testForwardBlobViewReference() throws Exception {
    // Phase 1: create the upstream table that physically stores the BLOB bytes and write two
    // rows into it.
    String upstreamTableName = "UpstreamBlobForward";
    Schema.Builder upstreamSchema = Schema.newBuilder();
    upstreamSchema.column("id", DataTypes.INT());
    upstreamSchema.column("image", DataTypes.BLOB());
    upstreamSchema.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
    upstreamSchema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
    upstreamSchema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
    catalog.createTable(identifier(upstreamTableName), upstreamSchema.build(), true);

    FileStoreTable upstreamTable = getTable(identifier(upstreamTableName));
    byte[] imageBytes1 = randomBytes();
    byte[] imageBytes2 = randomBytes();
    writeRows(
            upstreamTable,
            Arrays.asList(
                    GenericRow.of(1, new BlobData(imageBytes1)),
                    GenericRow.of(2, new BlobData(imageBytes2))));

    // Phase 2: read (id, row id) pairs through the row-tracking wrapper so that view structs
    // can point at concrete upstream rows. The projection keeps two columns; the second one
    // is read as a long and used as the row id.
    int imageFieldId = upstreamTable.rowType().getField("image").id();
    RowTrackingTable upstreamRowTracking = new RowTrackingTable(upstreamTable);
    ReadBuilder rowIdReader =
            upstreamRowTracking.newReadBuilder().withProjection(new int[] {0, 2});
    Map<Integer, Long> idToRowId = new HashMap<>();
    rowIdReader
            .newRead()
            .createReader(rowIdReader.newScan().plan())
            .forEachRemaining(row -> idToRowId.put(row.getInt(0), row.getLong(1)));
    // Both ids are looked up below; fail here with a clear message instead of an unboxing
    // NullPointerException later.
    assertThat(idToRowId).containsOnlyKeys(1, 2);

    // Phase 3: create the first blob view table and store references to the upstream rows.
    String firstViewTableName = "FirstBlobView";
    Schema.Builder firstViewSchema = Schema.newBuilder();
    firstViewSchema.column("id", DataTypes.INT());
    firstViewSchema.column("image", DataTypes.BLOB());
    firstViewSchema.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
    firstViewSchema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
    firstViewSchema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
    firstViewSchema.option(CoreOptions.BLOB_FIELD.key(), "image");
    firstViewSchema.option(CoreOptions.BLOB_VIEW_FIELD.key(), "image");
    catalog.createTable(identifier(firstViewTableName), firstViewSchema.build(), true);

    String upstreamFullName = database + "." + upstreamTableName;
    BlobViewStruct viewStruct1 =
            new BlobViewStruct(
                    Identifier.fromString(upstreamFullName), imageFieldId, idToRowId.get(1));
    BlobViewStruct viewStruct2 =
            new BlobViewStruct(
                    Identifier.fromString(upstreamFullName), imageFieldId, idToRowId.get(2));
    FileStoreTable firstViewTable = getTable(identifier(firstViewTableName));
    writeRows(
            firstViewTable,
            Arrays.asList(
                    GenericRow.of(1, Blob.fromView(viewStruct1)),
                    GenericRow.of(2, Blob.fromView(viewStruct2))));

    // Phase 4: create the second blob view table that will receive the forwarded references.
    String secondViewTableName = "SecondBlobView";
    Schema.Builder secondViewSchema = Schema.newBuilder();
    secondViewSchema.column("id", DataTypes.INT());
    secondViewSchema.column("image", DataTypes.BLOB());
    secondViewSchema.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
    secondViewSchema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
    secondViewSchema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
    secondViewSchema.option(CoreOptions.BLOB_FIELD.key(), "image");
    secondViewSchema.option(CoreOptions.BLOB_VIEW_FIELD.key(), "image");
    catalog.createTable(identifier(secondViewTableName), secondViewSchema.build(), true);

    // Phase 5: read the first view table with resolution disabled; the rows must carry
    // unresolved BlobView instances holding the original view structs.
    Map<String, String> preserveBlobViewOptions = new HashMap<>();
    preserveBlobViewOptions.put(CoreOptions.BLOB_VIEW_RESOLVE_ENABLED.key(), "false");
    FileStoreTable firstViewWithoutResolve = firstViewTable.copy(preserveBlobViewOptions);
    ReadBuilder preserveReadBuilder = firstViewWithoutResolve.newReadBuilder();
    List<InternalRow> rowsToForward = new ArrayList<>();
    InternalRowSerializer firstViewSerializer =
            new InternalRowSerializer(firstViewWithoutResolve.rowType());
    try (RecordReader<InternalRow> preserveReader =
            preserveReadBuilder.newRead().createReader(preserveReadBuilder.newScan().plan())) {
        // Rows are copied before being retained beyond the reader callback.
        preserveReader.forEachRemaining(
                row -> rowsToForward.add(firstViewSerializer.copy(row)));
    }
    assertThat(rowsToForward).hasSize(2);

    rowsToForward.sort((a, b) -> Integer.compare(a.getInt(0), b.getInt(0)));
    Blob preservedBlob1 = rowsToForward.get(0).getBlob(1);
    Blob preservedBlob2 = rowsToForward.get(1).getBlob(1);
    assertThat(preservedBlob1).isInstanceOf(BlobView.class);
    assertThat(preservedBlob2).isInstanceOf(BlobView.class);
    assertThat(((BlobView) preservedBlob1).isResolved()).isFalse();
    assertThat(((BlobView) preservedBlob2).isResolved()).isFalse();
    assertThat(((BlobView) preservedBlob1).viewStruct()).isEqualTo(viewStruct1);
    assertThat(((BlobView) preservedBlob2).viewStruct()).isEqualTo(viewStruct2);

    // Phase 6: forward the preserved rows into the second view table.
    FileStoreTable secondViewTable = getTable(identifier(secondViewTableName));
    writeRows(secondViewTable, rowsToForward);

    // Phase 7: with resolution disabled, the second view table must expose the very same
    // view structs, i.e. it references the upstream table directly.
    FileStoreTable secondViewWithoutResolve = secondViewTable.copy(preserveBlobViewOptions);
    ReadBuilder verifyReferenceBuilder = secondViewWithoutResolve.newReadBuilder();
    List<InternalRow> secondViewRawRows = new ArrayList<>();
    InternalRowSerializer secondViewSerializer =
            new InternalRowSerializer(secondViewWithoutResolve.rowType());
    try (RecordReader<InternalRow> verifyReferenceReader =
            verifyReferenceBuilder
                    .newRead()
                    .createReader(verifyReferenceBuilder.newScan().plan())) {
        verifyReferenceReader.forEachRemaining(
                row -> secondViewRawRows.add(secondViewSerializer.copy(row)));
    }
    assertThat(secondViewRawRows).hasSize(2);

    secondViewRawRows.sort((a, b) -> Integer.compare(a.getInt(0), b.getInt(0)));
    assertThat(((BlobView) secondViewRawRows.get(0).getBlob(1)).viewStruct())
            .isEqualTo(viewStruct1);
    assertThat(((BlobView) secondViewRawRows.get(1).getBlob(1)).viewStruct())
            .isEqualTo(viewStruct2);

    // Phase 8: with the default options, the second view table must resolve to the upstream
    // bytes. The visited ids are recorded so the check cannot pass without visiting both rows.
    Map<Integer, byte[]> idToBlob = new HashMap<>();
    idToBlob.put(1, imageBytes1);
    idToBlob.put(2, imageBytes2);
    List<Integer> resolvedIds = new ArrayList<>();
    ReadBuilder secondViewReadBuilder = secondViewTable.newReadBuilder();
    secondViewReadBuilder
            .newRead()
            .createReader(secondViewReadBuilder.newScan().plan())
            .forEachRemaining(
                    row -> {
                        int id = row.getInt(0);
                        Blob blob = row.getBlob(1);
                        assertThat(blob).isInstanceOf(BlobView.class);
                        assertThat(((BlobView) blob).isResolved()).isTrue();
                        assertThat(blob.toData()).isEqualTo(idToBlob.get(id));
                        resolvedIds.add(id);
                    });
    assertThat(resolvedIds).containsExactlyInAnyOrder(1, 2);
}

@Test
public void testBlobProjectionExcludesBlobColumn() throws Exception {
createTableDefault();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink;

import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobView;
import org.apache.paimon.data.InternalRow;

import java.util.Set;
Expand All @@ -40,6 +41,9 @@ public FlinkRowDataWithBlob(
public byte[] getBinary(int pos) {
if (blobFields.contains(pos)) {
Blob blob = row.getBlob(pos);
if (blob instanceof BlobView && !((BlobView) blob).isResolved()) {
return Blob.serializeBlob(blob);
}
return blobAsDescriptor ? blob.toDescriptor().serialize() : blob.toData();
} else {
return row.getBinary(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.BlobRef;
import org.apache.paimon.data.BlobViewStruct;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -324,6 +325,68 @@ public void testWriteBlobViewWithBuiltInFunction() throws Exception {
assertThat((byte[]) result.get(1).getField(2)).isEqualTo(new byte[] {89, 69});
}

@Test
public void testForwardBlobViewReferenceWithDynamicOption() throws Exception {
    // Upstream table holding the raw BLOB bytes.
    tEnv.executeSql(
            "CREATE TABLE upstream_blob_view_forward (id INT, name STRING, picture BYTES)"
                    + " WITH ('row-tracking.enabled'='true',"
                    + " 'data-evolution.enabled'='true',"
                    + " 'blob-field'='picture')");
    batchSql("INSERT INTO upstream_blob_view_forward VALUES (1, 'row1', X'48656C6C6F')");
    batchSql("INSERT INTO upstream_blob_view_forward VALUES (2, 'row2', X'5945')");

    String upstreamFullTableName = tEnv.getCurrentDatabase() + ".upstream_blob_view_forward";

    // Both downstream tables share the same definition apart from their name.
    final String viewTableDdl =
            "CREATE TABLE %s (id INT, label STRING, image_ref BYTES)"
                    + " WITH ('row-tracking.enabled'='true',"
                    + " 'data-evolution.enabled'='true',"
                    + " 'blob-view-field'='image_ref')";

    // First hop: build view references that point at the upstream rows.
    tEnv.executeSql(String.format(viewTableDdl, "first_downstream_blob_view"));
    String populateFirstView =
            String.format(
                    "INSERT INTO first_downstream_blob_view"
                            + " SELECT id, name, sys.blob_view('%s', 'picture', _ROW_ID)"
                            + " FROM `upstream_blob_view_forward$row_tracking`",
                    upstreamFullTableName);
    batchSql(populateFirstView);

    // Second hop: copy the references while resolution is switched off on the source read.
    tEnv.executeSql(String.format(viewTableDdl, "second_downstream_blob_view"));
    String forwardReferences =
            "INSERT INTO second_downstream_blob_view"
                    + " SELECT id, label, image_ref"
                    + " FROM first_downstream_blob_view"
                    + " /*+ OPTIONS('blob-view.resolve.enabled'='false') */";
    batchSql(forwardReferences);

    // A default read of the second view table returns the upstream bytes.
    List<Row> resolvedRows = batchSql("SELECT * FROM second_downstream_blob_view ORDER BY id");
    assertThat(resolvedRows)
            .containsExactly(
                    Row.of(1, "row1", new byte[] {72, 101, 108, 108, 111}),
                    Row.of(2, "row2", new byte[] {89, 69}));

    // Reading either view table with resolution disabled yields the serialized references.
    final String rawReferenceQuery =
            "SELECT image_ref"
                    + " FROM %s"
                    + " /*+ OPTIONS('blob-view.resolve.enabled'='false') */"
                    + " ORDER BY id";
    List<Row> originalReferences =
            batchSql(String.format(rawReferenceQuery, "first_downstream_blob_view"));
    List<Row> forwardedReferences =
            batchSql(String.format(rawReferenceQuery, "second_downstream_blob_view"));

    assertThat(forwardedReferences).hasSize(originalReferences.size());
    for (int idx = 0; idx < forwardedReferences.size(); idx++) {
        byte[] expectedBytes = (byte[]) originalReferences.get(idx).getField(0);
        byte[] actualBytes = (byte[]) forwardedReferences.get(idx).getField(0);
        // The forwarded reference is byte-identical to the original one ...
        assertThat(actualBytes).isEqualTo(expectedBytes);
        // ... and still names the upstream table rather than the first view table.
        BlobViewStruct forwardedStruct = BlobViewStruct.deserialize(actualBytes);
        assertThat(forwardedStruct.identifier().getFullName())
                .isEqualTo(upstreamFullTableName);
    }
}

@Test
public void testBlobViewRejectsUnqualifiedTableName() {
assertThatThrownBy(
Expand Down
Loading
Loading