Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/content/spark/quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,11 @@ All Spark's data types are available in package `org.apache.spark.sql.types`.
<td><code>VarBinaryType</code>, <code>BinaryType</code></td>
<td>true</td>
</tr>
<tr>
<td><code>VariantType</code> (Spark 4.0+)</td>
<td><code>VariantType</code></td>
<td>true</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ public BaseVariantReader reader() {
}

/**
 * Builds the default shredding schema for the given data type and tags its top-level
 * fields with variant metadata so writers recognize it as a shredding schema.
 *
 * @param dataType the logical type to derive the shredding schema from
 * @return the shredding schema row type with variant metadata attached
 */
public static RowType variantShreddingSchema(DataType dataType) {
    // Delegate to the three-arg overload with the default flags, then mark the
    // result so downstream writers can detect the shredded layout.
    return VariantMetadataUtils.addVariantMetadata(
            variantShreddingSchema(dataType, true, false));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
*/
public interface Variant {

byte VARIANT_SPEC_VERSION = (byte) 1;

String METADATA = "metadata";

String VALUE = "value";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ public static boolean isVariantRowType(DataType dataType) {
return true;
}

/**
 * Adds metadata to the top-level fields to mark the row type as a shredding schema for
 * writers.
 *
 * @param rowType the shredding schema whose fields should be tagged
 * @return a copy of {@code rowType} whose fields carry {@code METADATA_KEY} as description
 */
public static RowType addVariantMetadata(RowType rowType) {
    // Presize to avoid resizing; fields are copied because DataField is immutable.
    List<DataField> fields = new ArrayList<>(rowType.getFields().size());
    for (DataField f : rowType.getFields()) {
        fields.add(f.newDescription(METADATA_KEY));
    }
    return rowType.copy(fields);
}

/** Extract the path from variant metadata description. */
public static String path(String description) {
return splitDescription(description)[0];
Expand Down
1 change: 0 additions & 1 deletion paimon-format/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ under the License.
<include>org.apache.parquet:parquet-format-structures</include>
<include>org.apache.parquet:parquet-jackson</include>
<include>commons-pool:commons-pool</include>
<include>commons-pool:commons-pool</include>
<include>org.locationtech.jts:jts-core</include>

<!-- compress -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.format.parquet;

import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.data.variant.VariantMetadataUtils;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
Expand Down Expand Up @@ -213,20 +214,30 @@ public static Type convertToParquetType(String name, DataType type, int fieldId,
.withId(fieldId);
case ROW:
RowType rowType = (RowType) type;
return new GroupType(repetition, name, convertToParquetTypes(rowType))
Types.GroupBuilder<GroupType> groupTypeBuilder = Types.buildGroup(repetition);
if (VariantMetadataUtils.isVariantRowType(rowType)) {
groupTypeBuilder.as(
LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION));
}
return groupTypeBuilder
.addFields(convertToParquetTypes(rowType))
.named(name)
.withId(fieldId);
case VARIANT:
return Types.buildGroup(repetition)
.as(LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION))
.addField(
Types.primitive(
PrimitiveType.PrimitiveTypeName.BINARY,
Type.Repetition.REQUIRED)
.named(Variant.VALUE))
.named(Variant.VALUE)
.withId(0))
.addField(
Types.primitive(
PrimitiveType.PrimitiveTypeName.BINARY,
Type.Repetition.REQUIRED)
.named(Variant.METADATA))
.named(Variant.METADATA)
.withId(1))
.named(name)
.withId(fieldId);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public static RowType variantFileType(Type fileType) {
return (RowType) ParquetSchemaConverter.convertToPaimonField(fileType).type();
} else {
List<DataField> dataFields = new ArrayList<>();
dataFields.add(new DataField(0, VALUE, DataTypes.BYTES()));
dataFields.add(new DataField(1, METADATA, DataTypes.BYTES()));
dataFields.add(new DataField(0, VALUE, DataTypes.BYTES().notNull()));
dataFields.add(new DataField(1, METADATA, DataTypes.BYTES().notNull()));
return new RowType(dataFields);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,14 @@
import org.apache.paimon.types.RowType;

import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -545,9 +549,43 @@ protected void verifyShreddingSchema(RowType... expectShreddedTypes) throws IOEx
fileIO, file, fileIO.getFileSize(file), new Options())) {
MessageType schema = reader.getFooter().getFileMetaData().getSchema();
for (int i = 0; i < expectShreddedTypes.length; i++) {
assertThat(VariantUtils.variantFileType(schema.getType(i)))
assertThat(
VariantMetadataUtils.addVariantMetadata(
VariantUtils.variantFileType(schema.getType(i))))
.isEqualTo(variantShreddingSchema(expectShreddedTypes[i]));
}
}
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testVariantTypeAnnotation(boolean inferShredding) throws Exception {
    // Regardless of whether shredding is inferred, the variant column must carry
    // the Parquet VARIANT logical type annotation in the written file's schema.
    Options options = new Options();
    options.set(
            CoreOptions.VARIANT_INFER_SHREDDING_SCHEMA.key(), String.valueOf(inferShredding));

    RowType writeType = DataTypes.ROW(DataTypes.FIELD(0, "v", DataTypes.VARIANT()));
    FormatWriterFactory factory = createFormat(options).createWriterFactory(writeType);
    writeRows(
            factory,
            GenericRow.of(GenericVariant.fromJson("{\"name\":\"Alice\"}")),
            GenericRow.of(GenericVariant.fromJson("{\"name\":\"Bob\"}")));

    try (ParquetFileReader reader =
            ParquetUtil.getParquetReader(
                    fileIO, file, fileIO.getFileSize(file), new Options())) {
        MessageType fileSchema = reader.getFooter().getFileMetaData().getSchema();
        Type variantColumn = fileSchema.getType(0);

        // A variant column is serialized as a group, never as a primitive.
        assertThat(variantColumn.isPrimitive()).isFalse();

        // The group's logical type must be the Parquet variant annotation.
        assertThat(variantColumn.getLogicalTypeAnnotation())
                .isInstanceOf(LogicalTypeAnnotation.VariantLogicalTypeAnnotation.class);
    }
}
}