diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java index 3529349588ee..30c93c30b290 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java @@ -29,6 +29,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.InternalRow.FieldGetter; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.io.DataInputView; import org.apache.paimon.io.DataOutputView; import org.apache.paimon.memory.MemorySegment; @@ -213,6 +214,19 @@ private static FieldWriter createFieldWriter(DataType fieldType) { (writer, pos, value) -> writer.writeTimestamp((Timestamp) value, timestampPrecision); break; + case VARIANT: + fieldWriter = + (writer, pos, value) -> { + Variant variant = (Variant) value; + byte[] bytes; + try { + bytes = VariantSerializer.INSTANCE.serializeToBytes(variant); + } catch (IOException e) { + throw new RuntimeException(e); + } + writer.writeBinary(bytes); + }; + break; case ARRAY: Serializer arraySerializer = InternalSerializers.create(fieldType); fieldWriter = @@ -301,6 +315,17 @@ private static FieldReader createFieldReader(DataType fieldType) { final int timestampPrecision = getPrecision(fieldType); fieldReader = (reader, pos) -> reader.readTimestamp(timestampPrecision); break; + case VARIANT: + fieldReader = + (reader, pos) -> { + byte[] bytes = reader.readBinary(); + try { + return VariantSerializer.INSTANCE.deserializeFromBytes(bytes); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + break; case ARRAY: fieldReader = (reader, pos) -> reader.readArray(); break; diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/Serializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/Serializer.java index ebcb02d70453..5e35569a3376 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/Serializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/Serializer.java @@ -19,8 +19,12 @@ package org.apache.paimon.data.serializer; import org.apache.paimon.io.DataInputView; +import org.apache.paimon.io.DataInputViewStreamWrapper; import org.apache.paimon.io.DataOutputView; +import org.apache.paimon.io.DataOutputViewStreamWrapper; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; @@ -66,6 +70,31 @@ public interface Serializer extends Serializable { */ T deserialize(DataInputView source) throws IOException; + /** + * Serializes the given record to byte array. + * + * @param record The record to serialize. + * @return The serialized element. + */ + default byte[] serializeToBytes(T record) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + serialize(record, view); + return out.toByteArray(); + } + + /** + * De-serializes a record from byte array. + * + * @param bytes The byte array to de-serialize. + * @return The deserialized element. + */ + default T deserializeFromBytes(byte[] bytes) throws IOException { + ByteArrayInputStream in = new ByteArrayInputStream(bytes); + DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in); + return deserialize(view); + } + /** * Serializes the given record to string. * diff --git a/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java b/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java index 919ff4f9dc45..6e03a5699916 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.variant.GenericVariant; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -178,6 +179,25 @@ private static RowCompactedSerializer getRowSerializer() { } } + static final class VariantTypesTest extends RowCompactedSerializerTest { + public VariantTypesTest() { + super(getRowSerializer(), getData()); + } + + private static InternalRow[] getData() { + return new GenericRow[] { + GenericRow.of(null, null), + GenericRow.of( + GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}"), + GenericVariant.fromJson("{\"age\":29,\"city\":\"Shanghai\"}")) + }; + } + + private static RowCompactedSerializer getRowSerializer() { + return new RowCompactedSerializer(RowType.of(DataTypes.VARIANT(), DataTypes.VARIANT())); + } + } + static final class NestedInternalRowTest extends RowCompactedSerializerTest { private static final RowType NESTED_DATA_TYPE = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupVariantTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupVariantTableTest.java new file mode 100644 index 000000000000..7dfe6d74a252 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupVariantTableTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lookup; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.variant.GenericVariant; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.disk.IOManagerImpl; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VariantType; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link LookupTable} with variant type. */ +public class LookupVariantTableTest extends TableTestBase { + + @TempDir java.nio.file.Path tempDir; + private IOManager ioManager; + + @BeforeEach + public void before() throws IOException { + this.ioManager = new IOManagerImpl(tempDir.toString()); + } + + @Test + public void testRemoteFile() throws Exception { + Options options = new Options(); + options.set(CoreOptions.BUCKET, 1); + options.set(CoreOptions.DELETION_VECTORS_ENABLED, true); + options.set(CoreOptions.LOOKUP_REMOTE_FILE_ENABLED, true); + options.set(CoreOptions.MERGE_ENGINE, PARTIAL_UPDATE); + Identifier identifier = new Identifier("default", "t"); + Schema schema = + new Schema( + RowType.of(new IntType(), new IntType(), new VariantType()).getFields(), + Collections.emptyList(), + Collections.singletonList("f0"), + options.toMap(), + null); + catalog.createTable(identifier, schema, false); + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + + // first write + try (BatchTableWrite write = writeBuilder.newWrite().withIOManager(ioManager); + BatchTableCommit commit = writeBuilder.newCommit()) { + write.write( + GenericRow.of( + 1, null, GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}"))); + commit.commit(write.prepareCommit()); + } + + // second write to trigger ser and deser + try (BatchTableWrite write = writeBuilder.newWrite().withIOManager(ioManager); + BatchTableCommit commit = writeBuilder.newCommit()) { + write.write(GenericRow.of(1, 1, null)); + commit.commit(write.prepareCommit()); + } + + // read to assert + ReadBuilder readBuilder = table.newReadBuilder(); + List result = new ArrayList<>(); + readBuilder + .newRead() + .createReader(readBuilder.newScan().plan()) + .forEachRemaining( + row -> { + result.add( + row.getInt(0) + + "-" + + row.getInt(1) + + "-" + + row.getVariant(2).toJson()); + }); + assertThat(result).containsOnly("1-1-{\"age\":27,\"city\":\"Beijing\"}"); + } +}