Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ public LogicalType visit(VariantType variantType) {
@Override
public LogicalType visit(BlobType blobType) {
    // Flink SQL has no native BLOB type yet, so a Paimon blob column is
    // surfaced as an unbounded VARBINARY. Using VarBinaryType.MAX_LENGTH
    // (rather than a fixed blob default size) keeps the converted schema
    // equal to a user-declared BYTES column, which resolves to
    // VARBINARY(MAX_LENGTH) on the Flink side.
    // TODO introduce blob type in Flink SQL?
    return new org.apache.flink.table.types.logical.VarBinaryType(
            org.apache.flink.table.types.logical.VarBinaryType.MAX_LENGTH);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.UriReaderFactory;

import org.apache.flink.types.Row;
Expand All @@ -39,6 +41,7 @@
import java.util.Random;
import java.util.stream.Stream;

import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.assertj.core.api.Assertions.assertThat;

/** Test write and read table with blob type. */
Expand Down Expand Up @@ -251,6 +254,50 @@ public void testExternalStorageBlobMultipleWrites() throws Exception {
}
}

@Test
public void testBlobTypeSchemaEquals() throws Exception {
    // Create a Paimon table whose 'picture' column is backed by blob storage.
    tEnv.executeSql(
            "CREATE TABLE blob_schema_test ("
                    + "id INT, "
                    + "name STRING, "
                    + "picture BYTES"
                    + ") WITH ("
                    + "'row-tracking.enabled'='true',"
                    + "'data-evolution.enabled'='true',"
                    + "'blob-field'='picture'"
                    + ")");

    // Resolve the Paimon-side schema of the table just created.
    FileStoreTable table = paimonTable("blob_schema_test");
    RowType paimonSchema = table.rowType();

    // Declare a Flink-only table carrying the equivalent physical columns
    // (BYTES in place of the blob field).
    tEnv.executeSql(
            "CREATE TEMPORARY TABLE flink_temp_table ("
                    + "id INT, "
                    + "name STRING, "
                    + "picture BYTES"
                    + ") WITH ("
                    + "'row-tracking.enabled'='true',"
                    + "'data-evolution.enabled'='true',"
                    + "'connector'='blackhole'"
                    + ")");

    // Extract the physical row type Flink resolves for the temporary table.
    org.apache.flink.table.types.logical.LogicalType physicalType =
            tEnv.from("flink_temp_table")
                    .getResolvedSchema()
                    .toPhysicalRowDataType()
                    .getLogicalType();
    org.apache.flink.table.types.logical.RowType flinkSchema =
            (org.apache.flink.table.types.logical.RowType) physicalType;

    // Converting the Paimon row type must yield a row type that
    // schemaEquals considers identical to the Flink-declared one.
    org.apache.flink.table.types.logical.RowType converted = toLogicalType(paimonSchema);
    assertThat(AbstractFlinkTableFactory.schemaEquals(converted, flinkSchema)).isTrue();
}

private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();

public static String bytesToHex(byte[] bytes) {
Expand Down