Skip to content

Commit

Permalink
PARQUET-2037: Write INT96 with parquet-avro (#901)
Browse files Browse the repository at this point in the history
  • Loading branch information
gszadovszky committed May 12, 2021
1 parent a3a1ad4 commit c72862b
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 22 deletions.
2 changes: 2 additions & 0 deletions parquet-avro/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Apache Avro integration
| `parquet.avro.read.schema` | `String` | The Avro schema to be used for reading. It shall be compatible with the file schema. The file schema will be used directly if not set. |
| `parquet.avro.projection` | `String` | The Avro schema to be used for projection. |
| `parquet.avro.compatible` | `boolean` | Flag for compatibility mode. `true` for materializing Avro `IndexedRecord` objects, `false` for materializing the related objects for either generic, specific, or reflect records.<br/>The default value is `true`. |
| `parquet.avro.readInt96AsFixed` | `boolean` | Flag for handling `INT96` Parquet types. `true` for converting them to the `fixed` Avro type, `false` for not handling `INT96` types (throwing an exception).<br/>The default value is `false`.<br/>**NOTE: The `INT96` Parquet type is deprecated. This option is only to support old data.** |

### Configuration for writing

Expand All @@ -42,3 +43,4 @@ Apache Avro integration
| `parquet.avro.write-old-list-structure` | `boolean` | Flag whether to write list structures in the old way (2 levels) or the new one (3 levels). When writing at 2 levels no null values are available at the element level.<br/>The default value is `true` |
| `parquet.avro.add-list-element-records` | `boolean` | Flag whether to assume that any repeated element in the schema is a list element.<br/>The default value is `true`. |
| `parquet.avro.write-parquet-uuid` | `boolean` | Flag whether to write the [Parquet UUID logical type](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#uuid) in case an [Avro UUID type](https://avro.apache.org/docs/current/spec.html#UUID) is present.<br/>The default value is `false`. |
| `parquet.avro.writeFixedAsInt96` | `String` | Comma separated list of paths pointing to Avro schema elements which are to be converted to `INT96` Parquet types.<br/>The path is a `'.'` separated list of field names and does not contain the name of the schema nor the namespace. The type of the referenced schema elements must be `fixed` with the size of 12 bytes.<br/>**NOTE: The `INT96` Parquet type is deprecated. This option is only to support old data.** |
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,20 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static java.util.Optional.empty;
import static java.util.Optional.of;
import static org.apache.avro.JsonProperties.NULL_VALUE;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED_DEFAULT;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_PARQUET_UUID;
Expand Down Expand Up @@ -77,6 +81,7 @@ public class AvroSchemaConverter {
private final boolean writeOldListStructure;
private final boolean writeParquetUUID;
private final boolean readInt96AsFixed;
private final Set<String> pathsToInt96;

public AvroSchemaConverter() {
this(ADD_LIST_ELEMENT_RECORDS_DEFAULT);
Expand All @@ -93,6 +98,7 @@ public AvroSchemaConverter() {
this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT;
this.readInt96AsFixed = READ_INT96_AS_FIXED_DEFAULT;
this.pathsToInt96 = Collections.emptySet();
}

public AvroSchemaConverter(Configuration conf) {
Expand All @@ -102,6 +108,7 @@ public AvroSchemaConverter(Configuration conf) {
WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT);
this.writeParquetUUID = conf.getBoolean(WRITE_PARQUET_UUID, WRITE_PARQUET_UUID_DEFAULT);
this.readInt96AsFixed = conf.getBoolean(READ_INT96_AS_FIXED, READ_INT96_AS_FIXED_DEFAULT);
this.pathsToInt96 = new HashSet<>(Arrays.asList(conf.getStrings(WRITE_FIXED_AS_INT96, new String[0])));
}

/**
Expand Down Expand Up @@ -134,26 +141,26 @@ public MessageType convert(Schema avroSchema) {
if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
throw new IllegalArgumentException("Avro schema must be a record.");
}
return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields()));
return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields(), ""));
}

private List<Type> convertFields(List<Schema.Field> fields) {
private List<Type> convertFields(List<Schema.Field> fields, String schemaPath) {
List<Type> types = new ArrayList<Type>();
for (Schema.Field field : fields) {
if (field.schema().getType().equals(Schema.Type.NULL)) {
continue; // Avro nulls are not encoded, unless they are null unions
}
types.add(convertField(field));
types.add(convertField(field, appendPath(schemaPath, field.name())));
}
return types;
}

private Type convertField(String fieldName, Schema schema) {
return convertField(fieldName, schema, Type.Repetition.REQUIRED);
private Type convertField(String fieldName, Schema schema, String schemaPath) {
return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath);
}

@SuppressWarnings("deprecation")
private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) {
private Type convertField(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) {
Types.PrimitiveBuilder<PrimitiveType> builder;
Schema.Type type = schema.getType();
LogicalType logicalType = schema.getLogicalType();
Expand All @@ -177,26 +184,33 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet
builder = Types.primitive(BINARY, repetition).as(stringType());
}
} else if (type.equals(Schema.Type.RECORD)) {
return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
return new GroupType(repetition, fieldName, convertFields(schema.getFields(), schemaPath));
} else if (type.equals(Schema.Type.ENUM)) {
builder = Types.primitive(BINARY, repetition).as(enumType());
} else if (type.equals(Schema.Type.ARRAY)) {
if (writeOldListStructure) {
return ConversionPatterns.listType(repetition, fieldName,
convertField("array", schema.getElementType(), REPEATED));
convertField("array", schema.getElementType(), REPEATED, schemaPath));
} else {
return ConversionPatterns.listOfElements(repetition, fieldName,
convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType()));
convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType(), schemaPath));
}
} else if (type.equals(Schema.Type.MAP)) {
Type valType = convertField("value", schema.getValueType());
Type valType = convertField("value", schema.getValueType(), schemaPath);
// avro map key type is always string
return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType);
} else if (type.equals(Schema.Type.FIXED)) {
builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.length(schema.getFixedSize());
if (pathsToInt96.contains(schemaPath)) {
if (schema.getFixedSize() != 12) {
throw new IllegalArgumentException(
"The size of the fixed type field " + schemaPath + " must be 12 bytes for INT96 conversion");
}
builder = Types.primitive(PrimitiveTypeName.INT96, repetition);
} else {
builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(schema.getFixedSize());
}
} else if (type.equals(Schema.Type.UNION)) {
return convertUnion(fieldName, schema, repetition);
return convertUnion(fieldName, schema, repetition, schemaPath);
} else {
throw new UnsupportedOperationException("Cannot convert Avro type " + type);
}
Expand All @@ -218,7 +232,7 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet
return builder.named(fieldName);
}

private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) {
List<Schema> nonNullSchemas = new ArrayList<Schema>(schema.getTypes().size());
// Found any schemas in the union? Required for the edge case, where the union contains only a single type.
boolean foundNullSchema = false;
Expand All @@ -239,25 +253,26 @@ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repet
throw new UnsupportedOperationException("Cannot convert Avro union of only nulls");

case 1:
return foundNullSchema ? convertField(fieldName, nonNullSchemas.get(0), repetition) :
convertUnionToGroupType(fieldName, repetition, nonNullSchemas);
return foundNullSchema ? convertField(fieldName, nonNullSchemas.get(0), repetition, schemaPath) :
convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath);

default: // complex union type
return convertUnionToGroupType(fieldName, repetition, nonNullSchemas);
return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath);
}
}

private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List<Schema> nonNullSchemas) {
private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List<Schema> nonNullSchemas,
String schemaPath) {
List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
int index = 0;
for (Schema childSchema : nonNullSchemas) {
unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL));
unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL, schemaPath));
}
return new GroupType(repetition, fieldName, unionTypes);
}

private Type convertField(Schema.Field field) {
return convertField(field.name(), field.schema());
private Type convertField(Schema.Field field, String schemaPath) {
return convertField(field.name(), field.schema(), schemaPath);
}

public Schema convert(MessageType parquetSchema) {
Expand Down Expand Up @@ -314,7 +329,7 @@ public Schema convertINT96(PrimitiveTypeName primitiveTypeName) {
return Schema.createFixed("INT96", "INT96 represented as byte[12]", null, 12);
}
throw new IllegalArgumentException(
"INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.");
"INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.");
}
@Override
public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) {
Expand Down Expand Up @@ -524,4 +539,11 @@ private static Schema optional(Schema original) {
Schema.create(Schema.Type.NULL),
original));
}

/**
 * Joins {@code fieldName} onto the {@code '.'}-separated {@code path}.
 * A {@code null} or empty path yields the field name on its own, so the
 * resulting paths never carry a leading separator.
 */
private static String appendPath(String path, String fieldName) {
  return (path == null || path.isEmpty()) ? fieldName : path + '.' + fieldName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public static void setAvroDataSupplier(
public static final String WRITE_PARQUET_UUID = "parquet.avro.write-parquet-uuid";
static final boolean WRITE_PARQUET_UUID_DEFAULT = false;

// Support writing Parquet INT96 from a 12-byte Avro fixed.
public static final String WRITE_FIXED_AS_INT96 = "parquet.avro.writeFixedAsInt96";

private static final String MAP_REPEATED_NAME = "key_value";
private static final String MAP_KEY_NAME = "key";
private static final String MAP_VALUE_NAME = "value";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.apache.parquet.avro.AvroTestUtil.optionalField;
import static org.apache.parquet.avro.AvroTestUtil.primitive;
import static org.apache.parquet.avro.AvroTestUtil.record;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.apache.parquet.schema.OriginalType.DATE;
import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
Expand Down Expand Up @@ -824,6 +825,37 @@ public void testUUIDTypeWithParquetUUID() throws Exception {
"}\n");
}

// Verifies the parquet.avro.writeFixedAsInt96 option: Avro fixed(12) fields
// whose '.'-separated path (field names only, no schema name/namespace) is
// listed in WRITE_FIXED_AS_INT96 are converted to the Parquet INT96 type,
// while unlisted fixed fields stay FIXED_LEN_BYTE_ARRAY. Covers a top-level
// field, a field nested in a record, an array element and a map value.
@Test
public void testAvroFixed12AsParquetInt96Type() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("fixedToInt96.avsc").openStream());

// Paths selected for INT96 conversion; "notanint96" is deliberately left
// out to show that listing is opt-in per field path.
Configuration conf = new Configuration();
conf.setStrings(WRITE_FIXED_AS_INT96, "int96", "mynestedrecord.int96inrecord", "mynestedrecord.myarrayofoptional",
"mynestedrecord.mymap");
testAvroToParquetConversion(conf, schema, "message org.apache.parquet.avro.fixedToInt96 {\n"
+ " required int96 int96;\n"
+ " required fixed_len_byte_array(12) notanint96;\n"
+ " required group mynestedrecord {\n"
+ " required int96 int96inrecord;\n"
+ " required group myarrayofoptional (LIST) {\n"
+ " repeated int96 array;\n"
+ " }\n"
+ " required group mymap (MAP) {\n"
+ " repeated group key_value (MAP_KEY_VALUE) {\n"
+ " required binary key (STRING);\n"
+ " required int96 value;\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " required fixed_len_byte_array(1) onebytefixed;\n"
+ "}");

// A listed path whose fixed type is not exactly 12 bytes must be rejected
// at schema-conversion time, before any data is written.
conf.setStrings(WRITE_FIXED_AS_INT96, "onebytefixed");
assertThrows("Exception should be thrown for fixed types to be converted to INT96 where the size is not 12 bytes",
IllegalArgumentException.class, () -> new AvroSchemaConverter(conf).convert(schema));
}

public static Schema optional(Schema original) {
return Schema.createUnion(Lists.newArrayList(
Schema.create(Schema.Type.NULL),
Expand Down
80 changes: 80 additions & 0 deletions parquet-avro/src/test/resources/fixedToInt96.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
{
"name": "fixedToInt96",
"namespace": "org.apache.parquet.avro",
"type": "record",
"fields": [
{
"name": "int96",
"type": {
"type": "fixed",
"name": "ignored1",
"namespace": "",
"size": 12
}
},
{
"name": "notanint96",
"type": {
"type": "fixed",
"name": "ignored2",
"namespace": "",
"size": 12
}
},
{
"name": "mynestedrecord",
"type": {
"type": "record",
"name": "ignored3",
"namespace": "",
"fields": [
{
"name": "int96inrecord",
"type": {
"type": "fixed",
"name": "ignored4",
"namespace": "",
"size": 12
}
},
{
"name": "myarrayofoptional",
"type": {
"type": "array",
"items": [
"null",
{
"type": "fixed",
"name": "ignored5",
"namespace": "",
"size": 12
}
]
}
},
{
"name": "mymap",
"type": {
"type": "map",
"values": {
"type": "fixed",
"name": "ignored6",
"namespace": "",
"size": 12
}
}
}
]
}
},
{
"name": "onebytefixed",
"type": {
"type": "fixed",
"name": "ignored7",
"namespace": "",
"size": 1
}
}
]
}

0 comments on commit c72862b

Please sign in to comment.