Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.memory.MemorySegment;
Expand Down Expand Up @@ -213,6 +214,19 @@ private static FieldWriter createFieldWriter(DataType fieldType) {
(writer, pos, value) ->
writer.writeTimestamp((Timestamp) value, timestampPrecision);
break;
case VARIANT:
fieldWriter =
(writer, pos, value) -> {
Variant variant = (Variant) value;
byte[] bytes;
try {
bytes = VariantSerializer.INSTANCE.serializeToBytes(variant);
} catch (IOException e) {
throw new RuntimeException(e);
}
writer.writeBinary(bytes);
};
break;
case ARRAY:
Serializer<InternalArray> arraySerializer = InternalSerializers.create(fieldType);
fieldWriter =
Expand Down Expand Up @@ -301,6 +315,17 @@ private static FieldReader createFieldReader(DataType fieldType) {
final int timestampPrecision = getPrecision(fieldType);
fieldReader = (reader, pos) -> reader.readTimestamp(timestampPrecision);
break;
case VARIANT:
fieldReader =
(reader, pos) -> {
byte[] bytes = reader.readBinary();
try {
return VariantSerializer.INSTANCE.deserializeFromBytes(bytes);
} catch (IOException e) {
throw new RuntimeException(e);
}
};
break;
case ARRAY:
fieldReader = (reader, pos) -> reader.readArray();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
package org.apache.paimon.data.serializer;

import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.io.DataOutputViewStreamWrapper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;

Expand Down Expand Up @@ -66,6 +70,31 @@ public interface Serializer<T> extends Serializable {
*/
T deserialize(DataInputView source) throws IOException;

/**
 * Serializes the given record into a newly allocated byte array.
 *
 * <p>The record is written via {@code serialize} into an in-memory buffer whose content is then
 * returned.
 *
 * @param record The record to serialize.
 * @return The serialized element.
 * @throws IOException Thrown if the underlying {@code serialize} call fails.
 */
default byte[] serializeToBytes(T record) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    serialize(record, new DataOutputViewStreamWrapper(buffer));
    return buffer.toByteArray();
}

/**
 * De-serializes a record from the given byte array.
 *
 * <p>The bytes are exposed as an in-memory input view and handed to {@code deserialize}.
 *
 * @param bytes The byte array to de-serialize.
 * @return The deserialized element.
 * @throws IOException Thrown if the underlying {@code deserialize} call fails.
 */
default T deserializeFromBytes(byte[] bytes) throws IOException {
    DataInputView source = new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes));
    return deserialize(source);
}

/**
* Serializes the given record to string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -178,6 +179,25 @@ private static RowCompactedSerializer getRowSerializer() {
}
}

/** Supplies a serializer for two VARIANT columns, plus sample rows, to the base test. */
static final class VariantTypesTest extends RowCompactedSerializerTest {

    public VariantTypesTest() {
        super(getRowSerializer(), getData());
    }

    /** Builds the serializer for a row type made of two VARIANT fields. */
    private static RowCompactedSerializer getRowSerializer() {
        RowType twoVariants = RowType.of(DataTypes.VARIANT(), DataTypes.VARIANT());
        return new RowCompactedSerializer(twoVariants);
    }

    /** Returns one row with both fields null and one row with both fields parsed from JSON. */
    private static InternalRow[] getData() {
        GenericRow nullFields = GenericRow.of(null, null);
        GenericRow jsonFields =
                GenericRow.of(
                        GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}"),
                        GenericVariant.fromJson("{\"age\":29,\"city\":\"Shanghai\"}"));
        return new GenericRow[] {nullFields, jsonFields};
    }
}

static final class NestedInternalRowTest extends RowCompactedSerializerTest {

private static final RowType NESTED_DATA_TYPE =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lookup;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VariantType;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link LookupTable} with variant type. */
public class LookupVariantTableTest extends TableTestBase {

    @TempDir java.nio.file.Path tempDir;
    private IOManager ioManager;

    /** Creates a fresh IO manager rooted at the per-test temporary directory. */
    @BeforeEach
    public void before() throws IOException {
        this.ioManager = new IOManagerImpl(tempDir.toString());
    }

    /**
     * Closes the IO manager created in {@link #before()} so it is not leaked across tests.
     *
     * <p>The method name is deliberately specific so that it does not override a lifecycle
     * method of the base class.
     */
    @AfterEach
    public void closeLookupVariantIoManager() throws Exception {
        if (ioManager != null) {
            ioManager.close();
            ioManager = null;
        }
    }

    /**
     * Writes a variant value and then a partial update for the same primary key, and checks that
     * the merged row read back still carries the original variant.
     */
    @Test
    public void testRemoteFile() throws Exception {
        Options options = new Options();
        options.set(CoreOptions.BUCKET, 1);
        options.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
        options.set(CoreOptions.LOOKUP_REMOTE_FILE_ENABLED, true);
        options.set(CoreOptions.MERGE_ENGINE, PARTIAL_UPDATE);
        Identifier identifier = new Identifier("default", "t");
        // Columns are (f0 INT, f1 INT, f2 VARIANT); f0 is the only primary key, no partitions.
        Schema schema =
                new Schema(
                        RowType.of(new IntType(), new IntType(), new VariantType()).getFields(),
                        Collections.emptyList(),
                        Collections.singletonList("f0"),
                        options.toMap(),
                        null);
        catalog.createTable(identifier, schema, false);
        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();

        // first write: key 1 with a null int column and a variant value
        try (BatchTableWrite write = writeBuilder.newWrite().withIOManager(ioManager);
                BatchTableCommit commit = writeBuilder.newCommit()) {
            write.write(
                    GenericRow.of(
                            1, null, GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}")));
            commit.commit(write.prepareCommit());
        }

        // second write to trigger ser and deser: same key, int column set, variant left null
        try (BatchTableWrite write = writeBuilder.newWrite().withIOManager(ioManager);
                BatchTableCommit commit = writeBuilder.newCommit()) {
            write.write(GenericRow.of(1, 1, null));
            commit.commit(write.prepareCommit());
        }

        // read to assert: render every row as "<f0>-<f1>-<variant json>"
        ReadBuilder readBuilder = table.newReadBuilder();
        List<String> result = new ArrayList<>();
        readBuilder
                .newRead()
                .createReader(readBuilder.newScan().plan())
                .forEachRemaining(
                        row -> {
                            result.add(
                                    row.getInt(0)
                                            + "-"
                                            + row.getInt(1)
                                            + "-"
                                            + row.getVariant(2).toJson());
                        });
        assertThat(result).containsOnly("1-1-{\"age\":27,\"city\":\"Beijing\"}");
    }
}