From 8cdd3fc8d6529d2ad6d399685d0d86659e496d67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 27 May 2026 19:29:29 +0800 Subject: [PATCH 1/6] [core] support adding blob column through comments --- .../java/org/apache/paimon/CoreOptions.java | 1 + .../apache/paimon/schema/BlobSchemaUtils.java | 146 ++++++++++++++++++ .../apache/paimon/schema/SchemaManager.java | 32 +++- .../paimon/schema/BlobSchemaUtilsTest.java | 114 ++++++++++++++ .../paimon/table/SchemaEvolutionTest.java | 134 ++++++++++++++++ .../paimon/flink/SchemaChangeITCase.java | 21 +++ .../spark/SparkSchemaEvolutionITCase.java | 34 ++++ 7 files changed, 479 insertions(+), 3 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 6b0ef75ff823..dc5755e1ba07 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2308,6 +2308,7 @@ public InlineElement getDescription() { .noDefaultValue() .withDescription("Format table commit hive sync uri."); + @Immutable public static final ConfigOption BLOB_FIELD = key("blob-field") .stringType() diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java b/paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java new file mode 100644 index 000000000000..5cc8fc91ffed --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.schema; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.options.FallbackKey; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.StringUtils; + +import javax.annotation.Nullable; + +import java.util.Map; + +/** Utilities for BLOB-related schema evolution (ALTER TABLE ADD COLUMN comment directives). */ +public final class BlobSchemaUtils { + + public static final String BLOB_FIELD_DIRECTIVE = "__BLOB_FIELD"; + public static final String BLOB_DESCRIPTOR_FIELD_DIRECTIVE = "__BLOB_DESCRIPTOR_FIELD"; + + private BlobSchemaUtils() {} + + /** + * Parses the comment of an {@code ALTER TABLE ADD COLUMN} statement. Returns {@code null} when + * the comment is a regular user comment; returns a {@link ParsedDirective} when the comment + * begins with a supported BLOB directive. Throws {@link IllegalArgumentException} when the + * comment begins with {@code __BLOB} but is not one of the supported directives. + */ + @Nullable + public static ParsedDirective parseAddColumnComment(@Nullable String comment) { + if (comment == null || !comment.startsWith("__BLOB")) { + return null; + } + comment = StringUtils.trim(comment); + String optionKey = matchDirective(comment, BLOB_DESCRIPTOR_FIELD_DIRECTIVE); + String marker = BLOB_DESCRIPTOR_FIELD_DIRECTIVE; + if (optionKey == null) { + optionKey = matchDirective(comment, BLOB_FIELD_DIRECTIVE); + marker = BLOB_FIELD_DIRECTIVE; + } + Preconditions.checkArgument( + optionKey != null, + "Unsupported BLOB directive in column comment: '%s'. Supported directives are " + + "'%s' and '%s'.", + comment, + BLOB_FIELD_DIRECTIVE, + BLOB_DESCRIPTOR_FIELD_DIRECTIVE); + String realComment = + comment.length() == marker.length() + ? null + : comment.substring(marker.length() + 1).trim(); + if (realComment != null && realComment.isEmpty()) { + realComment = null; + } + return new ParsedDirective(optionKey, realComment); + } + + @Nullable + private static String matchDirective(String comment, String marker) { + if (!comment.startsWith(marker)) { + return null; + } + if (comment.length() == marker.length()) { + return optionKeyFor(marker); + } + return comment.charAt(marker.length()) == ';' ? optionKeyFor(marker) : null; + } + + private static String optionKeyFor(String marker) { + if (BLOB_FIELD_DIRECTIVE.equals(marker)) { + return CoreOptions.BLOB_FIELD.key(); + } else if (BLOB_DESCRIPTOR_FIELD_DIRECTIVE.equals(marker)) { + return CoreOptions.BLOB_DESCRIPTOR_FIELD.key(); + } else { + throw new IllegalArgumentException("Unsupported BLOB directive: " + marker); + } + } + + /** + * Modify blob options, ensure the `blob-field`, `blob-descriptor-field` is consistent with + * actual schema. If the canonical key is empty but a fallback key holds the value (e.g. legacy + * {@code blob.stored-descriptor-fields}), the fallback value is migrated to the canonical key + * before appending so old entries are not shadowed. + */ + public static void modifyBlobOptions( + String blobKey, String fieldName, Map options) { + ConfigOption option; + if (CoreOptions.BLOB_FIELD.key().equals(blobKey)) { + option = CoreOptions.BLOB_FIELD; + } else if (CoreOptions.BLOB_DESCRIPTOR_FIELD.key().equals(blobKey)) { + option = CoreOptions.BLOB_DESCRIPTOR_FIELD; + } else { + throw new IllegalArgumentException("Unsupported BLOB directive: " + blobKey); + } + + String existing = options.get(blobKey); + if (existing == null || existing.isEmpty()) { + // migrate legacy fallback keys to current canonical key + for (FallbackKey fk : option.fallbackKeys()) { + String fallbackValue = options.remove(fk.getKey()); + if (fallbackValue != null && !fallbackValue.isEmpty()) { + existing = fallbackValue; + break; + } + } + } + String newValue = existing == null ? fieldName : existing + "," + fieldName; + options.put(blobKey, newValue); + } + + /** Parsed BLOB directive: the option key to update and the user-facing comment. */ + public static final class ParsedDirective { + private final String optionKey; + @Nullable private final String realComment; + + private ParsedDirective(String optionKey, @Nullable String realComment) { + this.optionKey = optionKey; + this.realComment = realComment; + } + + public String optionKey() { + return optionKey; + } + + @Nullable + public String realComment() { + return realComment; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 670960889d5c..08b36bfaf28f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -39,6 +39,7 @@ import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.SchemaModification; import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BlobType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeCasts; @@ -342,8 +343,34 @@ public static TableSchema generateTableSchema( "Column %s cannot specify NOT NULL in the %s table.", String.join(".", addColumn.fieldNames()), lazyIdentifier.get().getFullName()); + + BlobSchemaUtils.ParsedDirective blobDirective = + BlobSchemaUtils.parseAddColumnComment(addColumn.description()); + DataType requestedDataType = addColumn.dataType(); + String effectiveComment = addColumn.description(); + // try convert to blob type + if (blobDirective != null) { + Preconditions.checkArgument( + addColumn.fieldNames().length == 1, + "BLOB directive cannot be used on a nested column %s.", + String.join(".", addColumn.fieldNames())); + DataTypeRoot root = requestedDataType.getTypeRoot(); + Preconditions.checkArgument( + root == DataTypeRoot.VARBINARY || root == DataTypeRoot.BINARY, + "Column %s declared with a BLOB directive must be of BYTES or " + + "BINARY type, but was %s.", + addColumn.fieldNames()[0], + requestedDataType); + requestedDataType = new BlobType(requestedDataType.isNullable()); + effectiveComment = blobDirective.realComment(); + + BlobSchemaUtils.modifyBlobOptions( + blobDirective.optionKey(), addColumn.fieldNames()[0], newOptions); + } + int id = highestFieldId.incrementAndGet(); - DataType dataType = ReassignFieldId.reassign(addColumn.dataType(), highestFieldId); + DataType dataType = ReassignFieldId.reassign(requestedDataType, highestFieldId); + String storedComment = effectiveComment; new NestedColumnModifier(addColumn.fieldNames(), lazyIdentifier) { @Override protected void updateLastColumn( @@ -352,8 +379,7 @@ protected void updateLastColumn( Catalog.ColumnNotExistException { assertColumnNotExists(newFields, fieldName, lazyIdentifier); - DataField dataField = - new DataField(id, fieldName, dataType, addColumn.description()); + DataField dataField = new DataField(id, fieldName, dataType, storedComment); // key: name ; value : index Map map = new HashMap<>(); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java new file mode 100644 index 000000000000..13c434626acb --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.schema; + +import org.apache.paimon.CoreOptions; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link BlobSchemaUtils}. */ +public class BlobSchemaUtilsTest { + + @Test + public void testParseAddColumnComment() { + // null and non-directive comments are passthrough. + assertThat(BlobSchemaUtils.parseAddColumnComment(null)).isNull(); + assertThat(BlobSchemaUtils.parseAddColumnComment("")).isNull(); + assertThat(BlobSchemaUtils.parseAddColumnComment("normal user comment")).isNull(); + // case-sensitive: lowercase is not a directive. + assertThat(BlobSchemaUtils.parseAddColumnComment("__blob_field; x")).isNull(); + + // bare BLOB_FIELD directive + BlobSchemaUtils.ParsedDirective bareBlob = + BlobSchemaUtils.parseAddColumnComment(BlobSchemaUtils.BLOB_FIELD_DIRECTIVE); + assertThat(bareBlob.optionKey()).isEqualTo(CoreOptions.BLOB_FIELD.key()); + assertThat(bareBlob.realComment()).isNull(); + + // BLOB_FIELD with trailing semicolon only — still no real comment + BlobSchemaUtils.ParsedDirective trailingSemi = + BlobSchemaUtils.parseAddColumnComment(BlobSchemaUtils.BLOB_FIELD_DIRECTIVE + ";"); + assertThat(trailingSemi.realComment()).isNull(); + + // BLOB_FIELD with real comment (note inner whitespace trimmed). + BlobSchemaUtils.ParsedDirective withComment = + BlobSchemaUtils.parseAddColumnComment( + BlobSchemaUtils.BLOB_FIELD_DIRECTIVE + "; profile picture "); + assertThat(withComment.optionKey()).isEqualTo(CoreOptions.BLOB_FIELD.key()); + assertThat(withComment.realComment()).isEqualTo("profile picture"); + + // BLOB_DESCRIPTOR_FIELD directive + BlobSchemaUtils.ParsedDirective descriptor = + BlobSchemaUtils.parseAddColumnComment( + BlobSchemaUtils.BLOB_DESCRIPTOR_FIELD_DIRECTIVE + "; desc text"); + assertThat(descriptor.optionKey()).isEqualTo(CoreOptions.BLOB_DESCRIPTOR_FIELD.key()); + assertThat(descriptor.realComment()).isEqualTo("desc text"); + } + + @Test + public void testParseRejectsUnknownDirective() { + assertThatThrownBy(() -> BlobSchemaUtils.parseAddColumnComment("__BLOB_VIEW_FIELD; x")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported BLOB directive"); + assertThatThrownBy(() -> BlobSchemaUtils.parseAddColumnComment("__BLOB_UNKNOWN")) + .isInstanceOf(IllegalArgumentException.class); + // a __BLOB_FIELD prefix without a `;` boundary is not a valid directive. + assertThatThrownBy(() -> BlobSchemaUtils.parseAddColumnComment("__BLOB_FIELDX")) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testModifyBlobOptions() { + Map opts = new HashMap<>(); + BlobSchemaUtils.modifyBlobOptions(CoreOptions.BLOB_FIELD.key(), "a", opts); + assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "a"); + + BlobSchemaUtils.modifyBlobOptions(CoreOptions.BLOB_FIELD.key(), "b", opts); + assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "a,b"); + + BlobSchemaUtils.modifyBlobOptions(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "c", opts); + assertThat(opts).containsEntry(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "c"); + + assertThatThrownBy( + () -> + BlobSchemaUtils.modifyBlobOptions( + "not-a-blob-option", "x", new HashMap<>())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported BLOB directive"); + } + + @Test + public void testModifyBlobOptionsMigratesLegacyFallbackKey() { + // legacy option holds the descriptor field; canonical key absent. + Map opts = new HashMap<>(); + opts.put("blob.stored-descriptor-fields", "legacy_col"); + + BlobSchemaUtils.modifyBlobOptions(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "new_col", opts); + + // fallback value is migrated to canonical key, fallback key removed to avoid stale data. + assertThat(opts) + .containsEntry(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "legacy_col,new_col"); + assertThat(opts).doesNotContainKey("blob.stored-descriptor-fields"); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index 58d82b0bc97d..81606279c570 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -41,8 +41,10 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.LazyField; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; @@ -188,6 +190,138 @@ public void testAddDuplicateField() throws Exception { columnName, identifier.getFullName()); } + @Test + public void testAddBlobColumnViaCommentDirective() throws Exception { + // create table with one pre-existing BLOB column registered in blob-field, so we can + // also verify that ADD COLUMN appends to (rather than overwrites) the existing value. + Map options = blobEnabledOptions(); + options.put(CoreOptions.BLOB_FIELD.key(), "existing_col"); + schemaManager.createTable( + new Schema( + RowType.of( + new DataField[] { + new DataField(0, "k", DataTypes.INT()), + new DataField( + 1, "existing_col", DataTypes.BLOB().copy(true)) + }) + .getFields(), + Collections.emptyList(), + Collections.emptyList(), + options, + "")); + + // bare directive — no user comment, appended to existing blob-field value. + // directive + user comment — BLOB_DESCRIPTOR_FIELD newly registered. + schemaManager.commitChanges( + ImmutableList.of( + SchemaChange.addColumn("picture", DataTypes.BYTES(), "__BLOB_FIELD", null), + SchemaChange.addColumn( + "desc_col", + DataTypes.BYTES(), + "__BLOB_DESCRIPTOR_FIELD; descriptor comment", + null))); + + TableSchema latest = schemaManager.latest().get(); + + DataField picture = + latest.fields().stream().filter(f -> f.name().equals("picture")).findFirst().get(); + assertThat(picture.type().getTypeRoot()).isEqualTo(DataTypeRoot.BLOB); + assertThat(picture.description()).isNull(); + + DataField desc = + latest.fields().stream().filter(f -> f.name().equals("desc_col")).findFirst().get(); + assertThat(desc.type().getTypeRoot()).isEqualTo(DataTypeRoot.BLOB); + assertThat(desc.description()).isEqualTo("descriptor comment"); + + assertThat(latest.options().get(CoreOptions.BLOB_FIELD.key())) + .isEqualTo("existing_col,picture"); + assertThat(latest.options().get(CoreOptions.BLOB_DESCRIPTOR_FIELD.key())) + .isEqualTo("desc_col"); + } + + @Test + public void testAddBlobColumnErrors() throws Exception { + schemaManager.createTable( + new Schema( + RowType.of( + new DataField[] { + new DataField(0, "k", DataTypes.INT()), + new DataField( + 1, + "nested", + DataTypes.ROW( + new DataField(2, "a", DataTypes.INT()))) + }) + .getFields(), + Collections.emptyList(), + Collections.emptyList(), + new HashMap<>(), + "")); + + // non-BYTES/BINARY type rejected. + assertThatThrownBy( + () -> + schemaManager.commitChanges( + Collections.singletonList( + SchemaChange.addColumn( + "bad", + DataTypes.INT(), + "__BLOB_FIELD", + null)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must be of BYTES or BINARY type"); + + // nested column rejected. + assertThatThrownBy( + () -> + schemaManager.commitChanges( + Collections.singletonList( + SchemaChange.addColumn( + new String[] {"nested", "blob"}, + DataTypes.BYTES(), + "__BLOB_FIELD", + null)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("nested column"); + + // unknown __BLOB directive rejected (e.g. __BLOB_VIEW_FIELD is not supported). + assertThatThrownBy( + () -> + schemaManager.commitChanges( + Collections.singletonList( + SchemaChange.addColumn( + "x", + DataTypes.BYTES(), + "__BLOB_VIEW_FIELD", + null)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported BLOB directive"); + + // SET OPTION on blob-field is rejected (the option is @Immutable). + TableSchema oldSchema = schemaManager.latest().get(); + LazyField hasSnapshots = new LazyField<>(() -> true); + LazyField lazyId = new LazyField<>(() -> identifier); + assertThatThrownBy( + () -> + SchemaManager.generateTableSchema( + oldSchema, + Collections.singletonList( + SchemaChange.setOption( + CoreOptions.BLOB_FIELD.key(), "k")), + hasSnapshots, + lazyId)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining(CoreOptions.BLOB_FIELD.key()); + } + + private static Map blobEnabledOptions() { + Map options = new HashMap<>(); + options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + options.put(CoreOptions.BUCKET.key(), "-1"); + return options; + } + @Test public void testUpdateFieldType() throws Exception { Schema schema = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index d51c1d006ac2..8e6ae0b123ea 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1839,4 +1839,25 @@ public void testDropPrimaryKeyOnNonEmptyTable() { UnsupportedOperationException.class, "Cannot drop primary keys on a non-empty table.")); } + + private static final String BLOB_TABLE_OPTIONS = + "'row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'bucket'='-1'"; + + @Test + public void testAddBlobColumnViaCommentDirective() { + sql("CREATE TABLE T (id INT, data STRING) WITH (" + BLOB_TABLE_OPTIONS + ")"); + + // bare directive — no user comment + sql("ALTER TABLE T ADD desc_col BYTES COMMENT '__BLOB_DESCRIPTOR_FIELD'"); + // directive + user comment + sql("ALTER TABLE T ADD picture BYTES COMMENT '__BLOB_FIELD; profile picture'"); + + String createSql = sql("SHOW CREATE TABLE T").get(0).toString(); + assertThat(createSql).doesNotContain("__BLOB"); + assertThat(createSql).contains("`desc_col`"); + assertThat(createSql).contains("`picture`"); + assertThat(createSql).contains("COMMENT 'profile picture'"); + assertThat(createSql).contains("'blob-field' = 'picture'"); + assertThat(createSql).contains("'blob-descriptor-field' = 'desc_col'"); + } } diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java index fc8b4adb6ebd..7afe3c76cf78 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java @@ -1080,4 +1080,38 @@ public void testUpdateNestedColumnTypeInMap(String formatType) { .containsExactlyInAnyOrder( "[1,APPLE,1000000000000]", "[2,cat,200]", "[3,FLOWER,3000000000000]"); } + + private static final String BLOB_TABLE_PROPS = + "'row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'bucket'='-1'"; + + @Test + public void testAddBlobColumnViaCommentDirective() { + String table = "paimon.default.blob_add_col"; + spark.sql( + "CREATE TABLE " + + table + + " (id INT, data STRING) TBLPROPERTIES (" + + BLOB_TABLE_PROPS + + ")"); + + // bare directive — no user comment + spark.sql( + "ALTER TABLE " + + table + + " ADD COLUMN desc_col BINARY COMMENT '__BLOB_DESCRIPTOR_FIELD'"); + // directive + user comment + spark.sql( + "ALTER TABLE " + + table + + " ADD COLUMN picture BINARY COMMENT '__BLOB_FIELD; profile picture'"); + + String createSql = + spark.sql("SHOW CREATE TABLE " + table).collectAsList().get(0).toString(); + assertThat(createSql).doesNotContain("__BLOB"); + assertThat(createSql).contains("desc_col"); + assertThat(createSql).contains("picture"); + assertThat(createSql).contains("profile picture"); + assertThat(createSql).contains("'blob-field' = 'picture'"); + assertThat(createSql).contains("'blob-descriptor-field' = 'desc_col'"); + } } From 9e82f3c200739c20adccee5942d42e9d6ae09217 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 27 May 2026 19:50:05 +0800 Subject: [PATCH 2/6] add code for update/drop table --- .../apache/paimon/schema/BlobSchemaUtils.java | 45 ++++++++++++ .../apache/paimon/schema/SchemaManager.java | 27 +++++++ .../paimon/schema/BlobSchemaUtilsTest.java | 20 +++++ .../paimon/table/SchemaEvolutionTest.java | 73 +++++++++++++++++++ 4 files changed, 165 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java b/paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java index 5cc8fc91ffed..18e388eccafa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/BlobSchemaUtils.java @@ -124,6 +124,51 @@ public static void modifyBlobOptions( options.put(blobKey, newValue); } + /** + * Removes {@code fieldName} from every BLOB-related comma-separated option (and the legacy + * fallback key for {@code blob-descriptor-field}). When the resulting csv becomes empty the + * option key is dropped entirely. Used when a BLOB column is being dropped. + */ + public static void removeFromBlobOptions(String fieldName, Map options) { + ConfigOption[] keys = + new ConfigOption[] { + CoreOptions.BLOB_FIELD, + CoreOptions.BLOB_DESCRIPTOR_FIELD, + CoreOptions.BLOB_VIEW_FIELD, + CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD + }; + for (ConfigOption option : keys) { + removeFromCsvOption(option.key(), fieldName, options); + for (FallbackKey fk : option.fallbackKeys()) { + removeFromCsvOption(fk.getKey(), fieldName, options); + } + } + } + + private static void removeFromCsvOption( + String key, String fieldName, Map options) { + String existing = options.get(key); + if (existing == null || existing.isEmpty()) { + return; + } + StringBuilder sb = new StringBuilder(); + for (String v : existing.split(",")) { + String trimmed = v.trim(); + if (trimmed.isEmpty() || trimmed.equals(fieldName)) { + continue; + } + if (sb.length() > 0) { + sb.append(','); + } + sb.append(trimmed); + } + if (sb.length() == 0) { + options.remove(key); + } else { + options.put(key, sb.toString()); + } + } + /** Parsed BLOB directive: the option key to update and the user-facing comment. */ public static final class ParsedDirective { private final String optionKey; diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 08b36bfaf28f..da2c909a566c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -461,6 +461,9 @@ protected void updateLastColumn( } else if (change instanceof DropColumn) { DropColumn drop = (DropColumn) change; dropColumnValidation(oldTableSchema, drop); + if (drop.fieldNames().length == 1) { + BlobSchemaUtils.removeFromBlobOptions(drop.fieldNames()[0], newOptions); + } new NestedColumnModifier(drop.fieldNames(), lazyIdentifier) { @Override protected void updateLastColumn( @@ -477,6 +480,8 @@ protected void updateLastColumn( UpdateColumnType update = (UpdateColumnType) change; assertNotUpdatingPartitionKeys(oldTableSchema, update.fieldNames(), "update"); assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update"); + assertNotChangingBlobColumnType( + newFields, update.fieldNames(), update.newDataType()); updateNestedColumn( newFields, update.fieldNames(), @@ -949,6 +954,28 @@ private static void assertNotRenamingBlobColumn(List fields, String[] } } + private static void assertNotChangingBlobColumnType( + List fields, String[] fieldNames, DataType newType) { + if (fieldNames.length > 1) { + return; + } + String fieldName = fieldNames[0]; + for (DataField field : fields) { + if (!field.name().equals(fieldName)) { + continue; + } + boolean wasBlob = field.type().is(DataTypeRoot.BLOB); + boolean willBeBlob = newType.is(DataTypeRoot.BLOB); + if (wasBlob || willBeBlob) { + throw new UnsupportedOperationException( + String.format( + "Cannot change column type involving BLOB: [%s] %s -> %s", + fieldName, field.type(), newType)); + } + return; + } + } + private abstract static class NestedColumnModifier { private final String[] updateFieldNames; diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java index 13c434626acb..1813fe2ecf3d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/BlobSchemaUtilsTest.java @@ -111,4 +111,24 @@ public void testModifyBlobOptionsMigratesLegacyFallbackKey() { .containsEntry(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "legacy_col,new_col"); assertThat(opts).doesNotContainKey("blob.stored-descriptor-fields"); } + + @Test + public void testRemoveFromBlobOptions() { + // pre-populated with all 4 canonical keys + the legacy fallback for descriptor. + Map opts = new HashMap<>(); + opts.put(CoreOptions.BLOB_FIELD.key(), "a,b"); + opts.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "b,c"); + opts.put(CoreOptions.BLOB_VIEW_FIELD.key(), "b"); + opts.put(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), "b"); + opts.put("blob.stored-descriptor-fields", "b,legacy"); + + BlobSchemaUtils.removeFromBlobOptions("b", opts); + + // b removed from every csv; keys whose csv becomes empty are dropped. + assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "a"); + assertThat(opts).containsEntry(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "c"); + assertThat(opts).doesNotContainKey(CoreOptions.BLOB_VIEW_FIELD.key()); + assertThat(opts).doesNotContainKey(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()); + assertThat(opts).containsEntry("blob.stored-descriptor-fields", "legacy"); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index 81606279c570..ebb9a889002e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -314,6 +314,79 @@ public void testAddBlobColumnErrors() throws Exception { .hasMessageContaining(CoreOptions.BLOB_FIELD.key()); } + @Test + public void testDropBlobColumnCleansOptions() throws Exception { + // table with one descriptor BLOB col registered in both blob-descriptor-field and + // blob-external-storage-field (subset rule), and one normal blob col in blob-field. + Map options = blobEnabledOptions(); + options.put(CoreOptions.BLOB_FIELD.key(), "pic"); + options.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "ext"); + options.put(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), "ext"); + options.put(CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key(), "/tmp/blob-ext"); + schemaManager.createTable( + new Schema( + RowType.of( + new DataField[] { + new DataField(0, "k", DataTypes.INT()), + new DataField(1, "pic", DataTypes.BLOB().copy(true)), + new DataField(2, "ext", DataTypes.BLOB().copy(true)) + }) + .getFields(), + Collections.emptyList(), + Collections.emptyList(), + options, + "")); + + // drop the descriptor BLOB column — it must vanish from both descriptor-field and + // external-storage-field; the other BLOB column is untouched. + schemaManager.commitChanges(Collections.singletonList(SchemaChange.dropColumn("ext"))); + + TableSchema latest = schemaManager.latest().get(); + assertThat(latest.options().get(CoreOptions.BLOB_FIELD.key())).isEqualTo("pic"); + assertThat(latest.options()).doesNotContainKey(CoreOptions.BLOB_DESCRIPTOR_FIELD.key()); + assertThat(latest.options()) + .doesNotContainKey(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()); + } + + @Test + public void testUpdateColumnTypeOnBlobIsRejected() throws Exception { + Map options = blobEnabledOptions(); + options.put(CoreOptions.BLOB_FIELD.key(), "pic"); + schemaManager.createTable( + new Schema( + RowType.of( + new DataField[] { + new DataField(0, "k", DataTypes.INT()), + new DataField(1, "pic", DataTypes.BLOB().copy(true)), + new DataField(2, "raw", DataTypes.BYTES()) + }) + .getFields(), + Collections.emptyList(), + Collections.emptyList(), + options, + "")); + + // BLOB -> BYTES rejected. + assertThatThrownBy( + () -> + schemaManager.commitChanges( + Collections.singletonList( + SchemaChange.updateColumnType( + "pic", DataTypes.BYTES())))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("BLOB"); + + // BYTES -> BLOB rejected (must be added via ADD COLUMN directive instead). + assertThatThrownBy( + () -> + schemaManager.commitChanges( + Collections.singletonList( + SchemaChange.updateColumnType( + "raw", DataTypes.BLOB())))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("BLOB"); + } + private static Map blobEnabledOptions() { Map options = new HashMap<>(); options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); From 36a40735140e3a3eb39f63f5d71994008b81bca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 27 May 2026 20:00:24 +0800 Subject: [PATCH 3/6] forbid directly adding blob type column --- .../java/org/apache/paimon/schema/SchemaManager.java | 10 ++++++++++ .../org/apache/paimon/table/SchemaEvolutionTest.java | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index da2c909a566c..66fe6374d1c3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -366,6 +366,16 @@ public static TableSchema generateTableSchema( BlobSchemaUtils.modifyBlobOptions( blobDirective.optionKey(), addColumn.fieldNames()[0], newOptions); + } else if (requestedDataType.is(DataTypeRoot.BLOB)) { + // We do not permit directly adding blob type column, + // since we don't know the storage mode i.e. native blob or descriptor blob. + throw new UnsupportedOperationException( + String.format( + "Adding BLOB column %s requires a comment directive ('%s' " + + "or '%s') so the storage mode is explicit.", + String.join(".", addColumn.fieldNames()), + BlobSchemaUtils.BLOB_FIELD_DIRECTIVE, + BlobSchemaUtils.BLOB_DESCRIPTOR_FIELD_DIRECTIVE)); } int id = highestFieldId.incrementAndGet(); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index ebb9a889002e..3cfee7364195 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -297,6 +297,17 @@ public void testAddBlobColumnErrors() throws Exception { .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Unsupported BLOB directive"); + // raw BlobType without any directive rejected — SDK callers must go through the + // directive path so the storage mode (blob-field vs blob-descriptor-field) is explicit. + assertThatThrownBy( + () -> + schemaManager.commitChanges( + Collections.singletonList( + SchemaChange.addColumn( + "raw_blob", DataTypes.BLOB(), null, null)))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("requires a comment directive"); + // SET OPTION on blob-field is rejected (the option is @Immutable). TableSchema oldSchema = schemaManager.latest().get(); LazyField hasSnapshots = new LazyField<>(() -> true); From 59b4eb94a93e4f8e7e102284a2b374c51c034d25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 27 May 2026 20:08:31 +0800 Subject: [PATCH 4/6] minor modification --- .../java/org/apache/paimon/schema/SchemaManager.java | 10 ++++++---- .../org/apache/paimon/table/SchemaEvolutionTest.java | 7 ++++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 66fe6374d1c3..09ddeb14b288 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -356,9 +356,11 @@ public static TableSchema generateTableSchema( String.join(".", addColumn.fieldNames())); DataTypeRoot root = requestedDataType.getTypeRoot(); Preconditions.checkArgument( - root == DataTypeRoot.VARBINARY || root == DataTypeRoot.BINARY, - "Column %s declared with a BLOB directive must be of BYTES or " - + "BINARY type, but was %s.", + root == DataTypeRoot.VARBINARY + || root == DataTypeRoot.BINARY + || root == DataTypeRoot.BLOB, + "Column %s declared with a BLOB directive must be of BYTES, " + + "BINARY or BLOB type, but was %s.", addColumn.fieldNames()[0], requestedDataType); requestedDataType = new BlobType(requestedDataType.isNullable()); @@ -367,7 +369,7 @@ public static TableSchema generateTableSchema( BlobSchemaUtils.modifyBlobOptions( blobDirective.optionKey(), addColumn.fieldNames()[0], newOptions); } else if (requestedDataType.is(DataTypeRoot.BLOB)) { - // We do not permit directly adding blob type column, + // We do not permit adding blob type column without comment hint, // since we don't know the storage mode i.e. native blob or descriptor blob. throw new UnsupportedOperationException( String.format( diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index 3cfee7364195..a27c1a125138 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -211,13 +211,14 @@ public void testAddBlobColumnViaCommentDirective() throws Exception { "")); // bare directive — no user comment, appended to existing blob-field value. - // directive + user comment — BLOB_DESCRIPTOR_FIELD newly registered. + // directive + user comment, SDK caller passes a BlobType directly (allowed when + // accompanied by a directive so the storage mode is explicit). schemaManager.commitChanges( ImmutableList.of( SchemaChange.addColumn("picture", DataTypes.BYTES(), "__BLOB_FIELD", null), SchemaChange.addColumn( "desc_col", - DataTypes.BYTES(), + DataTypes.BLOB(), "__BLOB_DESCRIPTOR_FIELD; descriptor comment", null))); @@ -269,7 +270,7 @@ public void testAddBlobColumnErrors() throws Exception { "__BLOB_FIELD", null)))) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("must be of BYTES or BINARY type"); + .hasMessageContaining("must be of BYTES, BINARY or BLOB type"); // nested column rejected. assertThatThrownBy( From c346e539c443cf51c08259739a7257e41d49820b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 27 May 2026 20:19:57 +0800 Subject: [PATCH 5/6] add docs --- docs/docs/append-table/blob.mdx | 50 +++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/docs/docs/append-table/blob.mdx b/docs/docs/append-table/blob.mdx index dfe78709b2c9..a3d4eedca5e5 100644 --- a/docs/docs/append-table/blob.mdx +++ b/docs/docs/append-table/blob.mdx @@ -268,6 +268,56 @@ CREATE TABLE image_table ( +### Adding a Blob Column to an Existing Table + +A BLOB column can be added to an existing blob-enabled table with a single `ALTER TABLE ADD COLUMN` statement. Because most SQL engines do not have a `BLOB` syntax, the new column is declared as `BYTES` or `BINARY` and the BLOB storage mode is selected via a **comment directive**: + +- `__BLOB_FIELD` — store the column as a default blob (raw bytes written to `.blob` files, equivalent to `blob-field`). +- `__BLOB_DESCRIPTOR_FIELD` — store the column as a descriptor-only blob inline in data files (equivalent to `blob-descriptor-field`). + +Anything after the optional `;` separator is preserved as the column's real comment. + + + + + +```sql +-- Add a blob-field column (no extra user comment) +ALTER TABLE image_table ADD picture BYTES COMMENT '__BLOB_FIELD'; + +-- Add a descriptor-field column with a real user comment +ALTER TABLE image_table + ADD video BYTES COMMENT '__BLOB_DESCRIPTOR_FIELD; promotional video'; +``` + + + + + +```sql +-- Add a blob-field column (no extra user comment) +ALTER TABLE image_table ADD COLUMN picture BINARY COMMENT '__BLOB_FIELD'; + +-- Add a descriptor-field column with a real user comment +ALTER TABLE image_table + ADD COLUMN video BINARY COMMENT '__BLOB_DESCRIPTOR_FIELD; promotional video'; +``` + + + + + +Paimon converts the declared `BYTES`/`BINARY` type to `BLOB`, appends the new column to the corresponding option (`blob-field` or `blob-descriptor-field`), and stores the trimmed real comment on the column. The whole operation is atomic — no need to `SET` an option first and then `ADD COLUMN`. + +#### Limitations + +1. **Storage mode must be explicit.** Only `__BLOB_FIELD` and `__BLOB_DESCRIPTOR_FIELD` are accepted. `blob-view-field` and `blob-external-storage-field` cannot be added this way; they must be configured at table creation time. +2. **An unknown `__BLOB`-prefixed directive is rejected** so typos do not silently fall through as a regular comment. +3. **Column type must be `BYTES` / `BINARY` (or `BLOB` when calling the Java API directly).** Other types with a BLOB directive are rejected. +4. **Raw BLOB without directive is rejected.** When calling the Java SDK and passing `DataTypes.BLOB()` to `SchemaChange.addColumn`, the directive is still required so the storage mode (default vs descriptor) is unambiguous. +5. **Existing columns cannot be converted to/from BLOB.** `ALTER TABLE ... CHANGE`/`ALTER COLUMN TYPE` between BLOB and any other type is rejected — both directions break already-written data. +6. **Dropping a BLOB column cleans the options automatically.** When you `ALTER TABLE ... DROP COLUMN`, Paimon removes the dropped name from `blob-field` / `blob-descriptor-field` / `blob-view-field` / `blob-external-storage-field` so the remaining options stay consistent with the schema. + ### Inserting Blob Data From 1f16c56a800c16069fbabeadb12916e015d8f49b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 27 May 2026 20:45:42 +0800 Subject: [PATCH 6/6] fix tests --- .../java/org/apache/paimon/append/MultipleBlobTableTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java index 1cbaf903f0e4..f230216a58a6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java @@ -422,7 +422,8 @@ public void testAddBlobColumnThenProjectBothBlobs() throws Exception { // Add new blob column f3 catalog.alterTable( identifier(), - Collections.singletonList(SchemaChange.addColumn("f3", DataTypes.BLOB())), + Collections.singletonList( + SchemaChange.addColumn("f3", DataTypes.BLOB(), "__BLOB_FIELD", null)), false); // Write more data with both f2 and f3