Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions docs/docs/append-table/blob.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,56 @@ CREATE TABLE image_table (

</Tabs>

### Adding a Blob Column to an Existing Table

A BLOB column can be added to an existing blob-enabled table with a single `ALTER TABLE ADD COLUMN` statement. Because most SQL engines do not have a `BLOB` syntax, the new column is declared as `BYTES` or `BINARY` and the BLOB storage mode is selected via a **comment directive**:

- `__BLOB_FIELD` — store the column as a default blob (raw bytes written to `.blob` files, equivalent to `blob-field`).
- `__BLOB_DESCRIPTOR_FIELD` — store the column as a descriptor-only blob inline in data files (equivalent to `blob-descriptor-field`).

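The directive must be at the very start of the comment. To attach a real comment, put a `;` immediately after the directive (no space in between); anything after the `;` separator is trimmed and preserved as the column's real comment.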

<Tabs groupId="blob-add-column">

<TabItem value="flink-sql" label="Flink SQL">

```sql
-- Add a blob-field column (no extra user comment)
ALTER TABLE image_table ADD picture BYTES COMMENT '__BLOB_FIELD';

-- Add a descriptor-field column with a real user comment
ALTER TABLE image_table
ADD video BYTES COMMENT '__BLOB_DESCRIPTOR_FIELD; promotional video';
```

</TabItem>

<TabItem value="spark-sql" label="Spark SQL">

```sql
-- Add a blob-field column (no extra user comment)
ALTER TABLE image_table ADD COLUMN picture BINARY COMMENT '__BLOB_FIELD';

-- Add a descriptor-field column with a real user comment
ALTER TABLE image_table
ADD COLUMN video BINARY COMMENT '__BLOB_DESCRIPTOR_FIELD; promotional video';
```

</TabItem>

</Tabs>

Paimon converts the declared `BYTES`/`BINARY` type to `BLOB`, appends the new column to the corresponding option (`blob-field` or `blob-descriptor-field`), and stores the trimmed real comment on the column. The whole operation is atomic — no need to `SET` an option first and then `ADD COLUMN`.

#### Limitations

1. **Storage mode must be explicit.** Only `__BLOB_FIELD` and `__BLOB_DESCRIPTOR_FIELD` are accepted. `blob-view-field` and `blob-external-storage-field` cannot be added this way; they must be configured at table creation time.
2. **An unknown `__BLOB`-prefixed directive is rejected** so typos do not silently fall through as a regular comment.
3. **Column type must be `BYTES` / `BINARY` (or `BLOB` when calling the Java API directly).** Other types with a BLOB directive are rejected.
4. **Raw BLOB without directive is rejected.** When calling the Java SDK and passing `DataTypes.BLOB()` to `SchemaChange.addColumn`, the directive is still required so the storage mode (default vs descriptor) is unambiguous.
5. **Existing columns cannot be converted to/from BLOB.** `ALTER TABLE ... CHANGE`/`ALTER COLUMN TYPE` between BLOB and any other type is rejected — both directions break already-written data.
6. **Dropping a BLOB column cleans the options automatically.** When you `ALTER TABLE ... DROP COLUMN`, Paimon removes the dropped name from `blob-field` / `blob-descriptor-field` / `blob-view-field` / `blob-external-storage-field` so the remaining options stay consistent with the schema.

### Inserting Blob Data

<Tabs groupId="blob-insert">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2308,6 +2308,7 @@ public InlineElement getDescription() {
.noDefaultValue()
.withDescription("Format table commit hive sync uri.");

@Immutable
public static final ConfigOption<String> BLOB_FIELD =
key("blob-field")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.schema;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.FallbackKey;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;

import java.util.Map;

/**
 * Utilities for BLOB-related schema evolution.
 *
 * <p>Covers two concerns: parsing the comment directives accepted by {@code ALTER TABLE ADD
 * COLUMN}, and keeping the comma-separated BLOB options of a table consistent with the columns
 * that are added or dropped.
 */
public final class BlobSchemaUtils {

    /** Comment directive that maps to the {@link CoreOptions#BLOB_FIELD} option. */
    public static final String BLOB_FIELD_DIRECTIVE = "__BLOB_FIELD";

    /** Comment directive that maps to the {@link CoreOptions#BLOB_DESCRIPTOR_FIELD} option. */
    public static final String BLOB_DESCRIPTOR_FIELD_DIRECTIVE = "__BLOB_DESCRIPTOR_FIELD";

    /** Any comment starting with this prefix is treated as a (possibly invalid) directive. */
    private static final String DIRECTIVE_PREFIX = "__BLOB";

    /** Character separating the directive from the real user comment. */
    private static final char COMMENT_SEPARATOR = ';';

    private BlobSchemaUtils() {}

    /**
     * Parses the comment of an {@code ALTER TABLE ADD COLUMN} statement.
     *
     * <p>Surrounding whitespace is ignored before the directive is looked up, so a comment such
     * as {@code " __BLOB_FIELD"} is handled as a directive instead of silently falling through as
     * a regular comment.
     *
     * @param comment the raw column comment, may be {@code null}
     * @return {@code null} when the comment is {@code null} or a regular user comment; a {@link
     *     ParsedDirective} when the comment begins with a supported BLOB directive
     * @throws IllegalArgumentException when the comment begins with {@code __BLOB} but is not one
     *     of the supported directives, optionally followed by {@code ;} and a real comment
     */
    @Nullable
    public static ParsedDirective parseAddColumnComment(@Nullable String comment) {
        if (comment == null) {
            return null;
        }
        // Trim BEFORE the prefix check: otherwise leading whitespace hides the directive.
        String trimmed = StringUtils.trim(comment);
        if (!trimmed.startsWith(DIRECTIVE_PREFIX)) {
            return null;
        }

        String marker = null;
        if (matchesDirective(trimmed, BLOB_DESCRIPTOR_FIELD_DIRECTIVE)) {
            marker = BLOB_DESCRIPTOR_FIELD_DIRECTIVE;
        } else if (matchesDirective(trimmed, BLOB_FIELD_DIRECTIVE)) {
            marker = BLOB_FIELD_DIRECTIVE;
        }
        Preconditions.checkArgument(
                marker != null,
                "Unsupported BLOB directive in column comment: '%s'. Supported directives are "
                        + "'%s' and '%s'.",
                trimmed,
                BLOB_FIELD_DIRECTIVE,
                BLOB_DESCRIPTOR_FIELD_DIRECTIVE);

        String realComment = null;
        if (trimmed.length() > marker.length()) {
            // Skip the directive and the separator that immediately follows it.
            String rest = trimmed.substring(marker.length() + 1).trim();
            if (!rest.isEmpty()) {
                realComment = rest;
            }
        }
        return new ParsedDirective(optionKeyFor(marker), realComment);
    }

    /**
     * Returns whether {@code comment} is exactly {@code marker}, or {@code marker} immediately
     * followed by the {@code ;} separator.
     */
    private static boolean matchesDirective(String comment, String marker) {
        if (!comment.startsWith(marker)) {
            return false;
        }
        return comment.length() == marker.length()
                || comment.charAt(marker.length()) == COMMENT_SEPARATOR;
    }

    /** Maps a directive to the key of the table option it updates. */
    private static String optionKeyFor(String marker) {
        if (BLOB_FIELD_DIRECTIVE.equals(marker)) {
            return CoreOptions.BLOB_FIELD.key();
        } else if (BLOB_DESCRIPTOR_FIELD_DIRECTIVE.equals(marker)) {
            return CoreOptions.BLOB_DESCRIPTOR_FIELD.key();
        } else {
            throw new IllegalArgumentException("Unsupported BLOB directive: " + marker);
        }
    }

    /**
     * Appends {@code fieldName} to the comma-separated option identified by {@code blobKey} so
     * that {@code blob-field} / {@code blob-descriptor-field} stay consistent with the actual
     * schema.
     *
     * <p>If the canonical key is absent or empty, the fallback keys of the option are removed
     * from {@code options} one by one until a non-empty value is found; that value is migrated to
     * the canonical key before appending so old entries are not shadowed.
     *
     * @param blobKey key of {@link CoreOptions#BLOB_FIELD} or {@link
     *     CoreOptions#BLOB_DESCRIPTOR_FIELD}
     * @param fieldName name of the column being added
     * @param options mutable table options, updated in place
     * @throws IllegalArgumentException when {@code blobKey} is neither of the two supported keys
     */
    public static void modifyBlobOptions(
            String blobKey, String fieldName, Map<String, String> options) {
        ConfigOption<String> option;
        if (CoreOptions.BLOB_FIELD.key().equals(blobKey)) {
            option = CoreOptions.BLOB_FIELD;
        } else if (CoreOptions.BLOB_DESCRIPTOR_FIELD.key().equals(blobKey)) {
            option = CoreOptions.BLOB_DESCRIPTOR_FIELD;
        } else {
            throw new IllegalArgumentException("Unsupported BLOB directive: " + blobKey);
        }

        String existing = options.get(blobKey);
        if (existing == null || existing.isEmpty()) {
            // migrate legacy fallback keys to current canonical key
            for (FallbackKey fk : option.fallbackKeys()) {
                String fallbackValue = options.remove(fk.getKey());
                if (fallbackValue != null && !fallbackValue.isEmpty()) {
                    existing = fallbackValue;
                    break;
                }
            }
        }
        // An empty value counts as absent; otherwise the result would start with a stray ','.
        String newValue =
                existing == null || existing.isEmpty() ? fieldName : existing + "," + fieldName;
        options.put(blobKey, newValue);
    }

    /**
     * Removes {@code fieldName} from every BLOB-related comma-separated option and from the
     * fallback keys of those options. When the resulting csv becomes empty the option key is
     * dropped entirely. Used when a BLOB column is being dropped.
     *
     * @param fieldName name of the column being dropped
     * @param options mutable table options, updated in place
     */
    public static void removeFromBlobOptions(String fieldName, Map<String, String> options) {
        removeFromOptionAndFallbacks(CoreOptions.BLOB_FIELD, fieldName, options);
        removeFromOptionAndFallbacks(CoreOptions.BLOB_DESCRIPTOR_FIELD, fieldName, options);
        removeFromOptionAndFallbacks(CoreOptions.BLOB_VIEW_FIELD, fieldName, options);
        removeFromOptionAndFallbacks(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD, fieldName, options);
    }

    /** Removes {@code fieldName} from the canonical key of {@code option}, then its fallbacks. */
    private static void removeFromOptionAndFallbacks(
            ConfigOption<String> option, String fieldName, Map<String, String> options) {
        removeFromCsvOption(option.key(), fieldName, options);
        for (FallbackKey fk : option.fallbackKeys()) {
            removeFromCsvOption(fk.getKey(), fieldName, options);
        }
    }

    /**
     * Rewrites the csv stored under {@code key} without {@code fieldName}. Entries are trimmed and
     * blank entries are discarded; the key is removed when nothing is left. A missing or empty
     * value is left untouched.
     */
    private static void removeFromCsvOption(
            String key, String fieldName, Map<String, String> options) {
        String existing = options.get(key);
        if (existing == null || existing.isEmpty()) {
            return;
        }
        StringBuilder sb = new StringBuilder();
        for (String v : existing.split(",")) {
            String trimmed = v.trim();
            if (trimmed.isEmpty() || trimmed.equals(fieldName)) {
                continue;
            }
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(trimmed);
        }
        if (sb.length() == 0) {
            options.remove(key);
        } else {
            options.put(key, sb.toString());
        }
    }

    /** Parsed BLOB directive: the option key to update and the user-facing comment. */
    public static final class ParsedDirective {
        private final String optionKey;
        @Nullable private final String realComment;

        private ParsedDirective(String optionKey, @Nullable String realComment) {
            this.optionKey = optionKey;
            this.realComment = realComment;
        }

        /** Returns the key of the table option the new column must be appended to. */
        public String optionKey() {
            return optionKey;
        }

        /**
         * Returns the trimmed text following the {@code ;} separator, or {@code null} when the
         * comment carried no real comment.
         */
        @Nullable
        public String realComment() {
            return realComment;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.SchemaModification;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeCasts;
Expand Down Expand Up @@ -342,8 +343,46 @@ public static TableSchema generateTableSchema(
"Column %s cannot specify NOT NULL in the %s table.",
String.join(".", addColumn.fieldNames()),
lazyIdentifier.get().getFullName());

BlobSchemaUtils.ParsedDirective blobDirective =
BlobSchemaUtils.parseAddColumnComment(addColumn.description());
DataType requestedDataType = addColumn.dataType();
String effectiveComment = addColumn.description();
// try convert to blob type
if (blobDirective != null) {
Preconditions.checkArgument(
addColumn.fieldNames().length == 1,
"BLOB directive cannot be used on a nested column %s.",
String.join(".", addColumn.fieldNames()));
DataTypeRoot root = requestedDataType.getTypeRoot();
Preconditions.checkArgument(
root == DataTypeRoot.VARBINARY
|| root == DataTypeRoot.BINARY
|| root == DataTypeRoot.BLOB,
"Column %s declared with a BLOB directive must be of BYTES, "
+ "BINARY or BLOB type, but was %s.",
addColumn.fieldNames()[0],
requestedDataType);
requestedDataType = new BlobType(requestedDataType.isNullable());
effectiveComment = blobDirective.realComment();

BlobSchemaUtils.modifyBlobOptions(
blobDirective.optionKey(), addColumn.fieldNames()[0], newOptions);
} else if (requestedDataType.is(DataTypeRoot.BLOB)) {
// We do not permit adding blob type column without comment hint,
// since we don't know the storage mode i.e. native blob or descriptor blob.
throw new UnsupportedOperationException(
String.format(
"Adding BLOB column %s requires a comment directive ('%s' "
+ "or '%s') so the storage mode is explicit.",
String.join(".", addColumn.fieldNames()),
BlobSchemaUtils.BLOB_FIELD_DIRECTIVE,
BlobSchemaUtils.BLOB_DESCRIPTOR_FIELD_DIRECTIVE));
}

int id = highestFieldId.incrementAndGet();
DataType dataType = ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);
DataType dataType = ReassignFieldId.reassign(requestedDataType, highestFieldId);
String storedComment = effectiveComment;
new NestedColumnModifier(addColumn.fieldNames(), lazyIdentifier) {
@Override
protected void updateLastColumn(
Expand All @@ -352,8 +391,7 @@ protected void updateLastColumn(
Catalog.ColumnNotExistException {
assertColumnNotExists(newFields, fieldName, lazyIdentifier);

DataField dataField =
new DataField(id, fieldName, dataType, addColumn.description());
DataField dataField = new DataField(id, fieldName, dataType, storedComment);

// key: name ; value : index
Map<String, Integer> map = new HashMap<>();
Expand Down Expand Up @@ -435,6 +473,9 @@ protected void updateLastColumn(
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
dropColumnValidation(oldTableSchema, drop);
if (drop.fieldNames().length == 1) {
BlobSchemaUtils.removeFromBlobOptions(drop.fieldNames()[0], newOptions);
}
new NestedColumnModifier(drop.fieldNames(), lazyIdentifier) {
@Override
protected void updateLastColumn(
Expand All @@ -451,6 +492,8 @@ protected void updateLastColumn(
UpdateColumnType update = (UpdateColumnType) change;
assertNotUpdatingPartitionKeys(oldTableSchema, update.fieldNames(), "update");
assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update");
assertNotChangingBlobColumnType(
newFields, update.fieldNames(), update.newDataType());
updateNestedColumn(
newFields,
update.fieldNames(),
Expand Down Expand Up @@ -923,6 +966,28 @@ private static void assertNotRenamingBlobColumn(List<DataField> fields, String[]
}
}

/**
 * Rejects a type change on a top-level column when either its current type or the requested
 * type has the BLOB type root. Nested paths (more than one name) are not checked, and a column
 * name that matches no field is ignored.
 *
 * @param fields fields to search for the column
 * @param fieldNames path of the column being updated
 * @param newType the requested new type
 * @throws UnsupportedOperationException when the change converts to or from BLOB
 */
private static void assertNotChangingBlobColumnType(
        List<DataField> fields, String[] fieldNames, DataType newType) {
    if (fieldNames.length > 1) {
        return;
    }
    String target = fieldNames[0];

    // Only the first field carrying the requested name is considered.
    DataField matched = null;
    for (DataField candidate : fields) {
        if (candidate.name().equals(target)) {
            matched = candidate;
            break;
        }
    }
    if (matched == null) {
        return;
    }

    boolean currentlyBlob = matched.type().is(DataTypeRoot.BLOB);
    boolean requestedBlob = newType.is(DataTypeRoot.BLOB);
    if (!currentlyBlob && !requestedBlob) {
        return;
    }
    throw new UnsupportedOperationException(
            String.format(
                    "Cannot change column type involving BLOB: [%s] %s -> %s",
                    target, matched.type(), newType));
}

private abstract static class NestedColumnModifier {

private final String[] updateFieldNames;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,8 @@ public void testAddBlobColumnThenProjectBothBlobs() throws Exception {
// Add new blob column f3
catalog.alterTable(
identifier(),
Collections.singletonList(SchemaChange.addColumn("f3", DataTypes.BLOB())),
Collections.singletonList(
SchemaChange.addColumn("f3", DataTypes.BLOB(), "__BLOB_FIELD", null)),
false);

// Write more data with both f2 and f3
Expand Down
Loading
Loading