Merged
@@ -18,6 +18,7 @@

package org.apache.hudi.client.model;

import org.apache.hudi.adapter.DataTypeAdapter;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.util.ValidationUtils;

@@ -169,6 +170,6 @@ protected String getMetaColumnVal(int ordinal) {
protected abstract int rebaseOrdinal(int ordinal);

  public Variant getVariant(int i) {
-   throw new UnsupportedOperationException("Variant is not supported yet.");
+   return DataTypeAdapter.getVariant(row, rebaseOrdinal(i));
  }
}
@@ -18,6 +18,8 @@

package org.apache.hudi.client.model;

import org.apache.hudi.adapter.DataTypeAdapter;

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.MapData;
@@ -149,7 +151,7 @@ private <T> T getValue(int pos, Function<Integer, T> getter) {
return getter.apply(pos);
}

- public Variant getVariant(int i) {
-   throw new UnsupportedOperationException("Variant is not supported yet.");
+ public Variant getVariant(int pos) {
+   return DataTypeAdapter.getVariant(row, pos);
  }
}
@@ -18,6 +18,8 @@

package org.apache.hudi.io.storage.row.parquet;

import org.apache.hudi.adapter.DataTypeAdapter;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.Pair;

import lombok.extern.slf4j.Slf4j;
@@ -155,6 +157,15 @@ public static RowType.RowField convertToRowField(Type parquetType) {
new MapType(
convertToRowField(keyValueType.getLeft()).getType().copy(true),
convertToRowField(keyValueType.getRight()).getType()));
} else if (hasVariantAnnotation(logicalType)) {
if (isShreddedVariant(groupType)) {
throw new UnsupportedOperationException(
"Shredded Variant is not supported in Flink. "
+ "The Parquet group '" + groupType.getName() + "' contains a '"
+ HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD
+ "' field indicating a shredded layout.");
}
dataType = DataTypeAdapter.createVariantType();
} else {
dataType =
DataTypes.of(new RowType(
@@ -190,6 +201,39 @@ public static MessageType convertToParquetMessageType(String name, RowType rowTy
return new MessageType(name, types);
}

/**
* Checks whether the group carries the Parquet {@code VARIANT} logical type annotation.
* Uses class-name matching so this compiles against parquet-java versions that predate the
* {@code VariantLogicalTypeAnnotation} class (< 1.15.2).
*/
private static boolean hasVariantAnnotation(LogicalTypeAnnotation logicalType) {
Collaborator:
Since the writer does not attach the variant logical annotation (TODO at line 227), won't a Flink-written variant column read back as a plain row, so that HoodieSchemaConverter.convertToSchema() loses the VARIANT type?

Collaborator:
private def isVariantGroup(group: GroupType): Boolean = {
group.containsField("value") &&
group.containsField("metadata") &&
group.getType("value").isPrimitive &&
group.getType("metadata").isPrimitive &&
group.getType("value").asPrimitiveType().getPrimitiveTypeName == PrimitiveType.PrimitiveTypeName.BINARY &&
group.getType("metadata").asPrimitiveType().getPrimitiveTypeName == PrimitiveType.PrimitiveTypeName.BINARY
}

Contributor:
Yeah, based on my understanding: to make sure a Flink-written variant field (or any variant field written without the Parquet variant annotation — which is all current Hudi writes, given the TODO you pointed out) is re-read as a variant in Flink, we either need a follow-up PR like #18539 that uses HoodieSchema for detection, or we need to actually attach the annotation at write time.
Maybe we can add a TODO comment here so readers know the current state of Flink variant integration (in case we don't fully fix this by 1.3)?

Contributor Author:
Yeah, added a TODO to attach the variant annotation on the writer side first.

// TODO: ensure the writer attaches the variant annotation in 1.3.
return logicalType != null
&& logicalType.getClass().getSimpleName().equals("VariantLogicalTypeAnnotation");
}
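
For context, if the structural fallback discussed in the thread above were ever adopted, a Java port of the Scala check might look like the sketch below. This is illustrative only — looksLikeVariantGroup is a hypothetical helper, not part of this PR; annotation-based detection remains the actual mechanism.

private static boolean looksLikeVariantGroup(GroupType group) {
  // Shape-based detection for variant groups written without the VARIANT
  // annotation: exactly two binary fields named "value" and "metadata".
  return group.containsField("value")
      && group.containsField("metadata")
      && group.getType("value").isPrimitive()
      && group.getType("metadata").isPrimitive()
      && group.getType("value").asPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY
      && group.getType("metadata").asPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY;
}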

/**
* Checks whether a variant group contains a {@code typed_value} field, indicating a shredded
* layout. Called only after {@link #hasVariantAnnotation} returns true.
*/
private static boolean isShreddedVariant(GroupType groupType) {
return groupType.containsField(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD);
}

/**
* Converts a Variant column to the canonical unshredded Parquet layout:
* a group with required binary {@code metadata} and required binary {@code value}.
*/
private static Type convertVariantToParquetType(String name, Type.Repetition repetition) {
// TODO: add .as(LogicalTypeAnnotation.variantType()) once parquet-java is bumped to 1.16.0
return Types.buildGroup(repetition)
.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
.named(HoodieSchema.Variant.VARIANT_METADATA_FIELD))
.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
.named(HoodieSchema.Variant.VARIANT_VALUE_FIELD))
.named(name);
}
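
For reference, this helper yields the canonical unshredded Variant layout from the Parquet spec; for an optional column named v (a hypothetical name) the resulting schema is:

optional group v {
  required binary metadata;
  required binary value;
}

Once parquet-java is bumped to 1.16.0 and the TODO is resolved, the group would additionally carry the VARIANT logical type annotation.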

private static Type convertToParquetType(
String name, LogicalType type, Type.Repetition repetition) {
switch (type.getTypeRoot()) {
@@ -304,6 +348,9 @@ private static Type convertToParquetType(
.addField(convertToParquetType(field.getName(), field.getType(), field.getType().isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED)));
return builder.named(name);
default:
if (DataTypeAdapter.isVariantType(type)) {
return convertVariantToParquetType(name, repetition);
}
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
@@ -18,6 +18,8 @@

package org.apache.hudi.util;

import org.apache.hudi.adapter.DataTypeAdapter;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.avro.generic.GenericFixed;
@@ -155,6 +157,9 @@ public static AvroToRowDataConverter createConverter(LogicalType type, boolean u
case MULTISET:
return createMapConverter(type, utcTimezone);
default:
if (DataTypeAdapter.isVariantType(type)) {
return createVariantConverter();
}
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
@@ -212,6 +217,18 @@ private static AvroToRowDataConverter createMapConverter(LogicalType type, boole
};
}

/**
* Creates a converter for Flink 2.1+ VARIANT LogicalType. The converter receives an Avro
* GenericRecord carrying metadata/value binary fields and produces a Flink
* {@code BinaryVariant}.
*/
private static AvroToRowDataConverter createVariantConverter() {
Contributor:
🤖 nit: record.get(0) and record.get(1) are opaque — could you cast to GenericRecord and use HoodieSchema.Variant.VARIANT_METADATA_FIELD / VARIANT_VALUE_FIELD for named access? The same constants are already used in ParquetSchemaConverter in this PR.


return avroObject -> {
IndexedRecord record = (IndexedRecord) avroObject;
return DataTypeAdapter.createVariant(convertToBytes(record.get(1)), convertToBytes(record.get(0)));
};
}
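
If the nit above is applied, the converter body might read as in this sketch (assuming the record's fields are named by the HoodieSchema.Variant constants, which matches how RowDataToAvroConverters writes them in this PR):

return avroObject -> {
  GenericRecord record = (GenericRecord) avroObject;
  // Named access instead of positional get(0)/get(1).
  return DataTypeAdapter.createVariant(
      convertToBytes(record.get(HoodieSchema.Variant.VARIANT_VALUE_FIELD)),
      convertToBytes(record.get(HoodieSchema.Variant.VARIANT_METADATA_FIELD)));
};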

private static AvroToRowDataConverter createTimestampConverter(int precision, boolean utcTimezone) {
final ChronoUnit chronoUnit;
if (precision <= 3) {
@@ -19,6 +19,7 @@

package org.apache.hudi.util;

import org.apache.hudi.adapter.DataTypeAdapter;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
@@ -222,6 +223,10 @@ public static HoodieSchema convertToSchema(LogicalType logicalType, String rowNa

case RAW:
default:
if (DataTypeAdapter.isVariantType(logicalType)) {
schema = HoodieSchema.createVariant();
break;
}
throw new UnsupportedOperationException(
"Unsupported type for HoodieSchema conversion: " + logicalType);
}
@@ -553,23 +558,24 @@ private static DataType convertUnion(HoodieSchema schema) {
}

  /**
-  * Converts a Variant schema to Flink's ROW type.
-  * Variant is represented as ROW<`metadata` BYTES, `value` BYTES> in Flink.
+  * Converts a Variant HoodieSchema to the native Flink {@code VariantType} DataType.
+  * Requires Flink 2.1+ at runtime; throws {@link UnsupportedOperationException} on older versions.
   *
   * @param schema HoodieSchema to convert (must be a VARIANT type)
-  * @return DataType representing the Variant as a ROW with binary fields
+  * @return native VariantType DataType
+  * @throws UnsupportedOperationException if the Flink runtime is pre-2.1 or the variant is shredded
   */
  private static DataType convertVariant(HoodieSchema schema) {
    if (schema.getType() != HoodieSchemaType.VARIANT) {
      throw new IllegalStateException("Expected HoodieSchema.Variant but got: " + schema.getClass());
    }

-   // Variant is stored as a struct with two binary fields: metadata and value.
-   // Field order follows the Parquet spec and Iceberg convention (metadata first, value second).
-   return DataTypes.ROW(
-       DataTypes.FIELD("metadata", DataTypes.BYTES().notNull()),
-       DataTypes.FIELD("value", DataTypes.BYTES().notNull())
-   ).notNull();
+   if (((HoodieSchema.Variant) schema).isShredded()) {
+     throw new UnsupportedOperationException(
+         "Shredded Variant is not yet supported in Flink. Use unshredded Variant instead.");
+   }
+
+   return DataTypeAdapter.createVariantType().notNull();
  }

/**
@@ -18,6 +18,7 @@

package org.apache.hudi.util;

import org.apache.hudi.adapter.DataTypeAdapter;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
@@ -239,6 +240,10 @@ public Object convert(HoodieSchema schema, Object object) {
break;
case RAW:
default:
if (DataTypeAdapter.isVariantType(type)) {
converter = createVariantConverter();
break;
}
throw new UnsupportedOperationException("Unsupported type: " + type);
}

@@ -357,5 +362,28 @@ public Object convert(HoodieSchema schema, Object object) {
}
};
}

/**
* Creates a converter for Flink 2.1+ VARIANT LogicalType. The converter receives a Flink
* {@code Variant} object at runtime and extracts the raw metadata/value byte arrays,
* then packs them into an Avro GenericRecord with the Variant schema.
*
* <p>No shredded-variant check is needed here: {@code HoodieSchemaConverter.convertVariant()}
* already rejects shredded variants before a Flink type or converter is ever constructed,
* and Flink 2.1 itself only supports unshredded variants (FLIP-521).
*/
private static RowDataToAvroConverter createVariantConverter() {
return new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;

@Override
Contributor:
🤖 nit: the string literals "metadata" and "value" here could use HoodieSchema.Variant.VARIANT_METADATA_FIELD / VARIANT_VALUE_FIELD to stay consistent with how ParquetSchemaConverter references the same fields in this PR.


public Object convert(HoodieSchema schema, Object object) {
final GenericRecord record = new GenericData.Record(schema.toAvroSchema());
record.put(HoodieSchema.Variant.VARIANT_METADATA_FIELD, ByteBuffer.wrap(DataTypeAdapter.getVariantMetadata(object)));
record.put(HoodieSchema.Variant.VARIANT_VALUE_FIELD, ByteBuffer.wrap(DataTypeAdapter.getVariantValue(object)));
return record;
}
};
}
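
Taken together with AvroToRowDataConverters.createVariantConverter(), the two converters form a round trip. A rough usage sketch follows — rowDataToAvro and avroToRowData are hypothetical handles to the two converters, and it assumes a Flink 2.1+ runtime where DataTypeAdapter is functional:

// Flink variant -> Avro record -> Flink variant.
Object flinkVariant = DataTypeAdapter.createVariant(valueBytes, metadataBytes);
GenericRecord avroRecord = (GenericRecord) rowDataToAvro.convert(variantSchema, flinkVariant);
Object roundTripped = avroToRowData.convert(avroRecord);
// roundTripped carries the same metadata/value bytes as flinkVariant.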
}

@@ -35,6 +35,7 @@
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
@@ -638,6 +639,7 @@ public void testBlobInNestedStructures() {
}

@Test
@Disabled("Disabled; re-enable these tests for 1.3")
public void testVariantTypeConversion() {
// Test direct Variant conversion
HoodieSchema variantSchema = HoodieSchema.createVariant();
@@ -654,6 +656,7 @@ public void testVariantTypeConversion() {
}

@Test
@Disabled("Disabled; re-enable these tests for 1.3")
public void testVariantInRecordConversion() {
// Test Variant field within a record
HoodieSchema recordWithVariant = HoodieSchema.createRecord(
@@ -27,6 +27,7 @@
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
@@ -114,6 +115,7 @@ private void verifyFlinkCanReadSparkVariantTable(String tablePath, String tableT
}

@Test
@Disabled("Disabled; re-enable these tests for 1.3")
public void testFlinkReadSparkVariantCOWTable() throws Exception {
// Test that Flink can read a COW table with Variant data written by Spark 4.0
Path cowTargetDir = tempDir.resolve("cow");
@@ -123,6 +125,7 @@ public void testFlinkReadSparkVariantCOWTable() throws Exception {
}

@Test
@Disabled("Disabled; re-enable these tests for 1.3")
public void testFlinkReadSparkVariantMORTableWithAvro() throws Exception {
// Test that Flink can read a MOR table with AVRO record type and Variant data written by Spark 4.0
Path morAvroTargetDir = tempDir.resolve("mor_avro");
@@ -132,6 +135,7 @@ public void testFlinkReadSparkVariantMORTableWithAvro() throws Exception {
}

@Test
@Disabled("Disabled; re-enable these tests for 1.3")
public void testFlinkReadSparkVariantMORTableWithSpark() throws Exception {
// Test that Flink can read a MOR table with SPARK record type and Variant data written by Spark 4.0
Path morSparkTargetDir = tempDir.resolve("mor_spark");
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.variant.Variant;

/**
* Adapter utilities for version-specific Flink {@code DataType} and Variant operations.
*/
public class DataTypeAdapter {
public static Variant getVariant(RowData rowData, int pos) {
throw new UnsupportedOperationException("Variant is not supported yet.");
}

public static Object createVariant(byte[] value, byte[] metadata) {
throw new UnsupportedOperationException("Variant is not supported yet.");
}

public static boolean isVariantType(LogicalType logicalType) {
return false;
}

public static DataType createVariantType() {
throw new UnsupportedOperationException("Variant is not supported yet.");
}

public static byte[] getVariantMetadata(Object obj) {
throw new UnsupportedOperationException("Variant is not supported yet.");
}

public static byte[] getVariantValue(Object obj) {
throw new UnsupportedOperationException("Variant is not supported yet.");
}
}
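
This default adapter serves pre-2.1 Flink profiles, so every Variant operation fails fast. A Flink 2.1+ profile would override it along the lines of the sketch below — based on FLIP-521, but the exact API surface (RowData.getVariant, BinaryVariant and its value/metadata accessors, DataTypes.VARIANT(), LogicalTypeRoot.VARIANT) is an assumption that should be verified against the Flink 2.1 sources:

public class DataTypeAdapter {
  public static Variant getVariant(RowData rowData, int pos) {
    return rowData.getVariant(pos); // assumed: added to RowData by FLIP-521
  }

  public static Object createVariant(byte[] value, byte[] metadata) {
    return new BinaryVariant(value, metadata); // assumed constructor order (value, metadata)
  }

  public static boolean isVariantType(LogicalType logicalType) {
    return logicalType.getTypeRoot() == LogicalTypeRoot.VARIANT; // assumed enum constant
  }

  public static DataType createVariantType() {
    return DataTypes.VARIANT(); // assumed factory per FLIP-521
  }

  public static byte[] getVariantMetadata(Object obj) {
    return ((BinaryVariant) obj).getMetadata(); // assumed accessor
  }

  public static byte[] getVariantValue(Object obj) {
    return ((BinaryVariant) obj).getValue(); // assumed accessor
  }
}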