From f84938441be49c665595c936ac631c3e5f171bf9 Mon Sep 17 00:00:00 2001 From: Constantin Muraru Date: Thu, 26 Apr 2018 08:48:08 -0400 Subject: [PATCH] PARQUET-968 Add Hive/Presto support in ProtoParquet MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR adds Hive (https://github.com/apache/hive) and Presto (https://github.com/prestodb/presto) support for parquet messages written with ProtoParquetWriter. Hive and other tools, such as Presto (used by AWS Athena), rely on specific LIST/MAP wrappers (as defined in the parquet spec: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md). These wrappers are currently missing from the ProtoParquet schema. AvroParquet works just fine, because it adds these wrappers when it deals with arrays and maps. This PR brings these wrappers in parquet-proto, providing the same functionality that already exists in parquet-avro. This is backward compatible. Messages written without the extra LIST/MAP wrappers are still being read successfully using the updated ProtoParquetReader. Regarding the change. Given the following protobuf schema: ``` message ListOfPrimitives { repeated int64 my_repeated_id = 1; } ``` Old parquet schema was: ``` message ListOfPrimitives { repeated int64 my_repeated_id = 1; } ``` New parquet schema is: ``` message ListOfPrimitives { required group my_repeated_id (LIST) = 1 { repeated group list { required int64 element; } } } ``` --- For list of messages, the changes look like this: Protobuf schema: ``` message ListOfMessages { string top_field = 1; repeated MyInnerMessage first_array = 2; } message MyInnerMessage { int32 inner_field = 1; } ``` Old parquet schema was: ``` message TestProto3.ListOfMessages { optional binary top_field (UTF8) = 1; repeated group first_array = 2 { optional int32 inner_field = 1; } } ``` The expected parquet schema, compatible with Hive (and similar to parquet-avro) is the following (notice the LIST wrapper): ``` message TestProto3.ListOfMessages { optional binary top_field (UTF8) = 1; required group first_array (LIST) = 2 { repeated group list { optional group element { optional int32 inner_field = 1; } } } } ``` --- Similar for maps. Protobuf schema: ``` message TopMessage { map myMap = 1; } message MyInnerMessage { int32 inner_field = 1; } ``` Old parquet schema: ``` message TestProto3.TopMessage { repeated group myMap = 1 { optional int64 key = 1; optional group value = 2 { optional int32 inner_field = 1; } } } ``` New parquet schema (notice the `MAP` wrapper): ``` message TestProto3.TopMessage { required group myMap (MAP) = 1 { repeated group key_value { required int64 key; optional group value { optional int32 inner_field = 1; } } } } ``` Jira: https://issues.apache.org/jira/browse/PARQUET-968 Author: Constantin Muraru Author: Benoît Hanotte Closes #411 from costimuraru/PARQUET-968 and squashes the following commits: 16eafcb6 [Benoît Hanotte] PARQUET-968 add proto flag to enable writing using specs-compliant schemas (#2) a8bd7041 [Constantin Muraru] Pick up commit from @andredasilvapinto 5cf92487 [Constantin Muraru] PARQUET-968 Add Hive support in ProtoParquet --- .../parquet/proto/ProtoMessageConverter.java | 126 +++- .../parquet/proto/ProtoSchemaConverter.java | 169 ++++-- .../parquet/proto/ProtoWriteSupport.java | 190 +++++- .../proto/ProtoInputOutputFormatTest.java | 120 ++++ .../proto/ProtoSchemaConverterTest.java | 219 ++++++- .../parquet/proto/ProtoWriteSupportTest.java | 565 +++++++++++++++++- .../parquet/proto/utils/WriteUsingMR.java | 10 +- .../src/test/resources/TestProto3.proto | 8 + .../src/test/resources/TestProtobuf.proto | 8 + 9 files changed, 1331 insertions(+), 84 deletions(-) diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index 890f16c5f6..979d78ea71 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -24,12 +24,14 @@ import com.twitter.elephantbird.util.Protobufs; import org.apache.parquet.column.Dictionary; import org.apache.parquet.io.InvalidRecordException; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.IncompatibleSchemaModificationException; +import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.Type; import java.util.HashMap; @@ -126,10 +128,15 @@ public void add(Object value) { }; } + if (OriginalType.LIST == parquetType.getOriginalType()) { + return new ListConverter(parentBuilder, fieldDescriptor, parquetType); + } + if (OriginalType.MAP == parquetType.getOriginalType()) { + return new MapConverter(parentBuilder, fieldDescriptor, parquetType); + } return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType); } - private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { JavaType javaType = fieldDescriptor.getJavaType(); @@ -342,4 +349,121 @@ public void addBinary(Binary binary) { } } + + /** + * This class unwraps the additional LIST wrapper and makes it possible to read the underlying data and then convert + * it to protobuf. + *

+ * Consider the following protobuf schema: + * message SimpleList { + * repeated int64 first_array = 1; + * } + *

+ * A LIST wrapper is created in parquet for the above mentioned protobuf schema: + * message SimpleList { + * optional group first_array (LIST) = 1 { + * repeated group list { + * optional int32 element; + * } + * } + * } + *

+ * The LIST wrappers are used by 3rd party tools, such as Hive, to read parquet arrays. The wrapper contains + * a repeated group named 'list', itself containing only one field called 'element' of the type of the repeated + * object (can be a primitive as in this example or a group in case of a repeated message in protobuf). + */ + final class ListConverter extends GroupConverter { + private final Converter converter; + + public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + OriginalType originalType = parquetType.getOriginalType(); + if (originalType != OriginalType.LIST || parquetType.isPrimitive()) { + throw new ParquetDecodingException("Expected LIST wrapper. Found: " + originalType + " instead."); + } + + GroupType rootWrapperType = parquetType.asGroupType(); + if (!rootWrapperType.containsField("list") || rootWrapperType.getType("list").isPrimitive()) { + throw new ParquetDecodingException("Expected repeated 'list' group inside LIST wrapperr but got: " + rootWrapperType); + } + + GroupType listType = rootWrapperType.getType("list").asGroupType(); + if (!listType.containsField("element")) { + throw new ParquetDecodingException("Expected 'element' inside repeated list group but got: " + listType); + } + + Type elementType = listType.getType("element"); + converter = newMessageConverter(parentBuilder, fieldDescriptor, elementType); + } + + @Override + public Converter getConverter(int fieldIndex) { + if (fieldIndex > 0) { + throw new ParquetDecodingException("Unexpected multiple fields in the LIST wrapper"); + } + + return new GroupConverter() { + @Override + public Converter getConverter(int fieldIndex) { + return converter; + } + + @Override + public void start() { + + } + + @Override + public void end() { + + } + }; + } + + @Override + public void start() { + + } + + @Override + public void end() { + + } + } + + + final class MapConverter extends GroupConverter { + private final Converter converter; + + public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + OriginalType originalType = parquetType.getOriginalType(); + if (originalType != OriginalType.MAP) { + throw new ParquetDecodingException("Expected MAP wrapper. Found: " + originalType + " instead."); + } + + Type parquetSchema; + if (parquetType.asGroupType().containsField("key_value")){ + parquetSchema = parquetType.asGroupType().getType("key_value"); + } else { + throw new ParquetDecodingException("Expected map but got: " + parquetType); + } + + converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema); + } + + @Override + public Converter getConverter(int fieldIndex) { + if (fieldIndex > 0) { + throw new ParquetDecodingException("Unexpected multiple fields in the MAP wrapper"); + } + return converter; + } + + @Override + public void start() { + } + + @Override + public void end() { + } + } } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java index 64668c0255..0e1aa20100 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,36 +18,49 @@ */ package org.apache.parquet.proto; -import static org.apache.parquet.schema.OriginalType.ENUM; -import static org.apache.parquet.schema.OriginalType.UTF8; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; - -import java.util.List; - +import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.Descriptors.FieldDescriptor.JavaType; +import com.google.protobuf.Message; +import com.twitter.elephantbird.util.Protobufs; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; import org.apache.parquet.schema.Types.Builder; import org.apache.parquet.schema.Types.GroupBuilder; - -import com.google.protobuf.Descriptors; -import com.google.protobuf.Descriptors.FieldDescriptor.JavaType; -import com.google.protobuf.Message; -import com.twitter.elephantbird.util.Protobufs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + +import static org.apache.parquet.schema.OriginalType.ENUM; +import static org.apache.parquet.schema.OriginalType.UTF8; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*; + /** * Converts a Protocol Buffer Descriptor into a Parquet schema. */ public class ProtoSchemaConverter { private static final Logger LOG = LoggerFactory.getLogger(ProtoSchemaConverter.class); + private final boolean parquetSpecsCompliant; + + public ProtoSchemaConverter() { + this(false); + } + + /** + * Instanciate a schema converter to get the parquet schema corresponding to protobuf classes. + * @param parquetSpecsCompliant If set to false, the parquet schema generated will be using the old + * schema style (prior to PARQUET-968) to provide backward-compatibility + * but which does not use LIST and MAP wrappers around collections as required + * by the parquet specifications. If set to true, specs compliant schemas are used. + */ + public ProtoSchemaConverter(boolean parquetSpecsCompliant) { + this.parquetSpecsCompliant = parquetSpecsCompliant; + } public MessageType convert(Class protobufClass) { LOG.debug("Converting protocol buffer class \"" + protobufClass + "\" to parquet schema."); @@ -60,8 +73,8 @@ public MessageType convert(Class protobufClass) { } /* Iterates over list of fields. **/ - private GroupBuilder convertFields(GroupBuilder groupBuilder, List fieldDescriptors) { - for (Descriptors.FieldDescriptor fieldDescriptor : fieldDescriptors) { + private GroupBuilder convertFields(GroupBuilder groupBuilder, List fieldDescriptors) { + for (FieldDescriptor fieldDescriptor : fieldDescriptors) { groupBuilder = addField(fieldDescriptor, groupBuilder) .id(fieldDescriptor.getNumber()) @@ -70,7 +83,7 @@ private GroupBuilder convertFields(GroupBuilder groupBuilder, List Builder>, GroupBuilder> addField(Descriptors.FieldDescriptor descriptor, GroupBuilder builder) { - Type.Repetition repetition = getRepetition(descriptor); - JavaType javaType = descriptor.getJavaType(); + private Builder>, GroupBuilder> addField(FieldDescriptor descriptor, final GroupBuilder builder) { + if (descriptor.getJavaType() == JavaType.MESSAGE) { + return addMessageField(descriptor, builder); + } + + ParquetType parquetType = getParquetType(descriptor); + if (descriptor.isRepeated() && parquetSpecsCompliant) { + // the old schema style did not include the LIST wrapper around repeated fields + return addRepeatedPrimitive(descriptor, parquetType.primitiveType, parquetType.originalType, builder); + } + + return builder.primitive(parquetType.primitiveType, getRepetition(descriptor)).as(parquetType.originalType); + } + + private Builder>, GroupBuilder> addRepeatedPrimitive(FieldDescriptor descriptor, + PrimitiveTypeName primitiveType, + OriginalType originalType, + final GroupBuilder builder) { + return builder + .group(Type.Repetition.OPTIONAL).as(OriginalType.LIST) + .group(Type.Repetition.REPEATED) + .primitive(primitiveType, Type.Repetition.REQUIRED).as(originalType) + .named("element") + .named("list"); + } + + private GroupBuilder> addRepeatedMessage(FieldDescriptor descriptor, GroupBuilder builder) { + GroupBuilder>>> result = + builder + .group(Type.Repetition.OPTIONAL).as(OriginalType.LIST) + .group(Type.Repetition.REPEATED) + .group(Type.Repetition.OPTIONAL); + + convertFields(result, descriptor.getMessageType().getFields()); + + return result.named("element").named("list"); + } + + private GroupBuilder> addMessageField(FieldDescriptor descriptor, final GroupBuilder builder) { + if (descriptor.isMapField() && parquetSpecsCompliant) { + // the old schema style did not include the MAP wrapper around map groups + return addMapField(descriptor, builder); + } + if (descriptor.isRepeated() && parquetSpecsCompliant) { + // the old schema style did not include the LIST wrapper around repeated messages + return addRepeatedMessage(descriptor, builder); + } + + // Plain message + GroupBuilder> group = builder.group(getRepetition(descriptor)); + convertFields(group, descriptor.getMessageType().getFields()); + return group; + } + + private GroupBuilder> addMapField(FieldDescriptor descriptor, final GroupBuilder builder) { + List fields = descriptor.getMessageType().getFields(); + if (fields.size() != 2) { + throw new UnsupportedOperationException("Expected two fields for the map (key/value), but got: " + fields); + } + + ParquetType mapKeyParquetType = getParquetType(fields.get(0)); + + GroupBuilder>> group = builder + .group(Type.Repetition.OPTIONAL).as(OriginalType.MAP) // only optional maps are allowed in Proto3 + .group(Type.Repetition.REPEATED) // key_value wrapper + .primitive(mapKeyParquetType.primitiveType, Type.Repetition.REQUIRED).as(mapKeyParquetType.originalType).named("key"); + + return addField(fields.get(1), group).named("value") + .named("key_value"); + } + + private ParquetType getParquetType(FieldDescriptor fieldDescriptor) { + + JavaType javaType = fieldDescriptor.getJavaType(); switch (javaType) { - case BOOLEAN: return builder.primitive(BOOLEAN, repetition); - case INT: return builder.primitive(INT32, repetition); - case LONG: return builder.primitive(INT64, repetition); - case FLOAT: return builder.primitive(FLOAT, repetition); - case DOUBLE: return builder.primitive(DOUBLE, repetition); - case BYTE_STRING: return builder.primitive(BINARY, repetition); - case STRING: return builder.primitive(BINARY, repetition).as(UTF8); - case MESSAGE: { - GroupBuilder> group = builder.group(repetition); - convertFields(group, descriptor.getMessageType().getFields()); - return group; - } - case ENUM: return builder.primitive(BINARY, repetition).as(ENUM); + case INT: return ParquetType.of(INT32); + case LONG: return ParquetType.of(INT64); + case DOUBLE: return ParquetType.of(DOUBLE); + case BOOLEAN: return ParquetType.of(BOOLEAN); + case FLOAT: return ParquetType.of(FLOAT); + case STRING: return ParquetType.of(BINARY, UTF8); + case ENUM: return ParquetType.of(BINARY, ENUM); + case BYTE_STRING: return ParquetType.of(BINARY); default: throw new UnsupportedOperationException("Cannot convert Protocol Buffer: unknown type " + javaType); } } + private static class ParquetType { + PrimitiveTypeName primitiveType; + OriginalType originalType; + + private ParquetType(PrimitiveTypeName primitiveType, OriginalType originalType) { + this.primitiveType = primitiveType; + this.originalType = originalType; + } + + public static ParquetType of(PrimitiveTypeName primitiveType, OriginalType originalType) { + return new ParquetType(primitiveType, originalType); + } + + public static ParquetType of(PrimitiveTypeName primitiveType) { + return of(primitiveType, null); + } + } + } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java index 19b07066ac..59c236f312 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java @@ -18,12 +18,9 @@ */ package org.apache.parquet.proto; -import com.google.protobuf.ByteString; -import com.google.protobuf.DescriptorProtos; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.MessageOrBuilder; -import com.google.protobuf.TextFormat; +import com.google.protobuf.*; +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.Descriptors.FieldDescriptor; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.hadoop.BadConfigurationException; @@ -31,14 +28,13 @@ import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.IncompatibleSchemaModificationException; -import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.*; import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Array; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -50,7 +46,13 @@ public class ProtoWriteSupport extends WriteSupport< private static final Logger LOG = LoggerFactory.getLogger(ProtoWriteSupport.class); public static final String PB_CLASS_WRITE = "parquet.proto.writeClass"; + // PARQUET-968 introduces changes to allow writing specs compliant schemas with parquet-protobuf. + // In the past, collection were not written using the LIST and MAP wrappers and thus were not compliant + // with the parquet specs. This flag, is set to true, allows to write using spec compliant schemas + // but is set to false by default to keep backward compatibility + public static final String PB_SPECS_COMPLIANT_WRITE = "parquet.proto.writeSpecsCompliant"; + private boolean writeSpecsCompliant = false; private RecordConsumer recordConsumer; private Class protoMessage; private MessageWriter messageWriter; @@ -71,6 +73,16 @@ public static void setSchema(Configuration configuration, Class extraMetaData = new HashMap(); extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName()); extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(protoMessage)); + extraMetaData.put(PB_SPECS_COMPLIANT_WRITE, String.valueOf(writeSpecsCompliant)); return new WriteContext(rootSchema, extraMetaData); } @@ -152,17 +166,21 @@ class MessageWriter extends FieldWriter { final FieldWriter[] fieldWriters; @SuppressWarnings("unchecked") - MessageWriter(Descriptors.Descriptor descriptor, GroupType schema) { - List fields = descriptor.getFields(); + MessageWriter(Descriptor descriptor, GroupType schema) { + List fields = descriptor.getFields(); fieldWriters = (FieldWriter[]) Array.newInstance(FieldWriter.class, fields.size()); - for (Descriptors.FieldDescriptor fieldDescriptor: fields) { + for (FieldDescriptor fieldDescriptor: fields) { String name = fieldDescriptor.getName(); Type type = schema.getType(name); FieldWriter writer = createWriter(fieldDescriptor, type); - if(fieldDescriptor.isRepeated()) { - writer = new ArrayWriter(writer); + if(writeSpecsCompliant && fieldDescriptor.isRepeated() && !fieldDescriptor.isMapField()) { + writer = new ArrayWriter(writer); + } + else if (!writeSpecsCompliant && fieldDescriptor.isRepeated()) { + // the old schemas style used to write maps as repeated fields instead of wrapping them in a LIST + writer = new RepeatedWriter(writer); } writer.setFieldName(name); @@ -172,11 +190,11 @@ class MessageWriter extends FieldWriter { } } - private FieldWriter createWriter(Descriptors.FieldDescriptor fieldDescriptor, Type type) { + private FieldWriter createWriter(FieldDescriptor fieldDescriptor, Type type) { switch (fieldDescriptor.getJavaType()) { case STRING: return new StringWriter() ; - case MESSAGE: return new MessageWriter(fieldDescriptor.getMessageType(), type.asGroupType()); + case MESSAGE: return createMessageWriter(fieldDescriptor, type); case INT: return new IntWriter(); case LONG: return new LongWriter(); case FLOAT: return new FloatWriter(); @@ -189,6 +207,47 @@ private FieldWriter createWriter(Descriptors.FieldDescriptor fieldDescriptor, Ty return unknownType(fieldDescriptor);//should not be executed, always throws exception. } + private FieldWriter createMessageWriter(FieldDescriptor fieldDescriptor, Type type) { + if (fieldDescriptor.isMapField() && writeSpecsCompliant) { + return createMapWriter(fieldDescriptor, type); + } + + return new MessageWriter(fieldDescriptor.getMessageType(), getGroupType(type)); + } + + private GroupType getGroupType(Type type) { + if (type.getOriginalType() == OriginalType.LIST) { + return type.asGroupType().getType("list").asGroupType().getType("element").asGroupType(); + } + + if (type.getOriginalType() == OriginalType.MAP) { + return type.asGroupType().getType("key_value").asGroupType().getType("value").asGroupType(); + } + + return type.asGroupType(); + } + + private MapWriter createMapWriter(FieldDescriptor fieldDescriptor, Type type) { + List fields = fieldDescriptor.getMessageType().getFields(); + if (fields.size() != 2) { + throw new UnsupportedOperationException("Expected two fields for the map (key/value), but got: " + fields); + } + + // KeyFieldWriter + FieldDescriptor keyProtoField = fields.get(0); + FieldWriter keyWriter = createWriter(keyProtoField, type); + keyWriter.setFieldName(keyProtoField.getName()); + keyWriter.setIndex(0); + + // ValueFieldWriter + FieldDescriptor valueProtoField = fields.get(1); + FieldWriter valueWriter = createWriter(valueProtoField, type); + valueWriter.setFieldName(valueProtoField.getName()); + valueWriter.setIndex(1); + + return new MapWriter(keyWriter, valueWriter); + } + /** Writes top level message. It cannot call startGroup() */ void writeTopLevelMessage(Object value) { writeAllFields((MessageOrBuilder) value); @@ -206,18 +265,16 @@ final void writeRawValue(Object value) { @Override final void writeField(Object value) { recordConsumer.startField(fieldName, index); - recordConsumer.startGroup(); - writeAllFields((MessageOrBuilder) value); - recordConsumer.endGroup(); + writeRawValue(value); recordConsumer.endField(fieldName, index); } private void writeAllFields(MessageOrBuilder pb) { //returns changed fields with values. Map is ordered by id. - Map changedPbFields = pb.getAllFields(); + Map changedPbFields = pb.getAllFields(); - for (Map.Entry entry : changedPbFields.entrySet()) { - Descriptors.FieldDescriptor fieldDescriptor = entry.getKey(); + for (Map.Entry entry : changedPbFields.entrySet()) { + FieldDescriptor fieldDescriptor = entry.getKey(); if(fieldDescriptor.isExtension()) { // Field index of an extension field might overlap with a base field. @@ -243,6 +300,45 @@ final void writeRawValue(Object value) { throw new UnsupportedOperationException("Array has no raw value"); } + @Override + final void writeField(Object value) { + recordConsumer.startField(fieldName, index); + recordConsumer.startGroup(); + List list = (List) value; + + recordConsumer.startField("list", 0); // This is the wrapper group for the array field + for (Object listEntry: list) { + recordConsumer.startGroup(); + recordConsumer.startField("element", 0); // This is the mandatory inner field + + fieldWriter.writeRawValue(listEntry); + + recordConsumer.endField("element", 0); + recordConsumer.endGroup(); + } + recordConsumer.endField("list", 0); + + recordConsumer.endGroup(); + recordConsumer.endField(fieldName, index); + } + } + + /** + * The RepeatedWriter is used to write collections (lists and maps) using the old style (without LIST and MAP + * wrappers). + */ + class RepeatedWriter extends FieldWriter { + final FieldWriter fieldWriter; + + RepeatedWriter(FieldWriter fieldWriter) { + this.fieldWriter = fieldWriter; + } + + @Override + final void writeRawValue(Object value) { + throw new UnsupportedOperationException("Array has no raw value"); + } + @Override final void writeField(Object value) { recordConsumer.startField(fieldName, index); @@ -257,10 +353,10 @@ final void writeField(Object value) { } /** validates mapping between protobuffer fields and parquet fields.*/ - private void validatedMapping(Descriptors.Descriptor descriptor, GroupType parquetSchema) { - List allFields = descriptor.getFields(); + private void validatedMapping(Descriptor descriptor, GroupType parquetSchema) { + List allFields = descriptor.getFields(); - for (Descriptors.FieldDescriptor fieldDescriptor: allFields) { + for (FieldDescriptor fieldDescriptor: allFields) { String fieldName = fieldDescriptor.getName(); int fieldIndex = fieldDescriptor.getIndex(); int parquetIndex = parquetSchema.getFieldIndex(fieldName); @@ -295,6 +391,41 @@ final void writeRawValue(Object value) { } } + class MapWriter extends FieldWriter { + + private final FieldWriter keyWriter; + private final FieldWriter valueWriter; + + public MapWriter(FieldWriter keyWriter, FieldWriter valueWriter) { + super(); + this.keyWriter = keyWriter; + this.valueWriter = valueWriter; + } + + @Override + final void writeRawValue(Object value) { + recordConsumer.startGroup(); + + recordConsumer.startField("key_value", 0); // This is the wrapper group for the map field + for (Message msg : (Collection) value) { + recordConsumer.startGroup(); + + final Descriptor descriptorForType = msg.getDescriptorForType(); + final FieldDescriptor keyDesc = descriptorForType.findFieldByName("key"); + final FieldDescriptor valueDesc = descriptorForType.findFieldByName("value"); + + keyWriter.writeField(msg.getField(keyDesc)); + valueWriter.writeField(msg.getField(valueDesc)); + + recordConsumer.endGroup(); + } + + recordConsumer.endField("key_value", 0); + + recordConsumer.endGroup(); + } + } + class FloatWriter extends FieldWriter { @Override final void writeRawValue(Object value) { @@ -333,7 +464,7 @@ final void writeRawValue(Object value) { } } - private FieldWriter unknownType(Descriptors.FieldDescriptor fieldDescriptor) { + private FieldWriter unknownType(FieldDescriptor fieldDescriptor) { String exceptionMsg = "Unknown type with descriptor \"" + fieldDescriptor + "\" and type \"" + fieldDescriptor.getJavaType() + "\"."; throw new InvalidRecordException(exceptionMsg); @@ -341,9 +472,8 @@ private FieldWriter unknownType(Descriptors.FieldDescriptor fieldDescriptor) { /** Returns message descriptor as JSON String*/ private String serializeDescriptor(Class protoClass) { - Descriptors.Descriptor descriptor = Protobufs.getMessageDescriptor(protoClass); + Descriptor descriptor = Protobufs.getMessageDescriptor(protoClass); DescriptorProtos.DescriptorProto asProto = descriptor.toProto(); return TextFormat.printToString(asProto); } - } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java index 6c01d7b8db..5544dc6887 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java @@ -19,6 +19,7 @@ package org.apache.parquet.proto; import com.google.protobuf.Message; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.proto.test.TestProto3; import org.apache.parquet.proto.test.TestProtobuf; @@ -193,6 +194,125 @@ public void testProto3CustomProtoClass() throws Exception { assertEquals("writtenString", stringValue); } + @Test + public void testRepeatedIntMessageClass() throws Exception { + TestProtobuf.RepeatedIntMessage msgEmpty = TestProtobuf.RepeatedIntMessage.newBuilder().build(); + TestProtobuf.RepeatedIntMessage msgNonEmpty = TestProtobuf.RepeatedIntMessage.newBuilder() + .addRepeatedInt(1).addRepeatedInt(2) + .build(); + + Path outputPath = new WriteUsingMR().write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.RepeatedIntMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testRepeatedIntMessageClassSchemaCompliant() throws Exception { + TestProtobuf.RepeatedIntMessage msgEmpty = TestProtobuf.RepeatedIntMessage.newBuilder().build(); + TestProtobuf.RepeatedIntMessage msgNonEmpty = TestProtobuf.RepeatedIntMessage.newBuilder() + .addRepeatedInt(1).addRepeatedInt(2) + .build(); + + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + + Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.RepeatedIntMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testMapIntMessageClass() throws Exception { + TestProtobuf.MapIntMessage msgEmpty = TestProtobuf.MapIntMessage.newBuilder().build(); + TestProtobuf.MapIntMessage msgNonEmpty = TestProtobuf.MapIntMessage.newBuilder() + .putMapInt(1, 123).putMapInt(2, 234) + .build(); + + Path outputPath = new WriteUsingMR().write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.MapIntMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testMapIntMessageClassSchemaCompliant() throws Exception { + TestProtobuf.MapIntMessage msgEmpty = TestProtobuf.MapIntMessage.newBuilder().build(); + TestProtobuf.MapIntMessage msgNonEmpty = TestProtobuf.MapIntMessage.newBuilder() + .putMapInt(1, 123).putMapInt(2, 234) + .build(); + + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + + Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.MapIntMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testRepeatedInnerMessageClass() throws Exception { + TestProtobuf.RepeatedInnerMessage msgEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder().build(); + TestProtobuf.RepeatedInnerMessage msgNonEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder() + .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setOne("one").build()) + .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setTwo("two").build()) + .build(); + + Path outputPath = new WriteUsingMR().write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.RepeatedInnerMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + + @Test + public void testRepeatedInnerMessageClassSchemaCompliant() throws Exception { + TestProtobuf.RepeatedInnerMessage msgEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder().build(); + TestProtobuf.RepeatedInnerMessage msgNonEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder() + .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setOne("one").build()) + .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setTwo("two").build()) + .build(); + + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + + Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty); + ReadUsingMR readUsingMR = new ReadUsingMR(); + String customClass = TestProtobuf.RepeatedInnerMessage.class.getName(); + ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); + List result = readUsingMR.read(outputPath); + + assertEquals(2, result.size()); + assertEquals(msgEmpty, result.get(0)); + assertEquals(msgNonEmpty, result.get(1)); + } + /** * Runs job that writes input to file and then job reading data back. */ diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java index 6f5ff53b69..4ca82ac740 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java @@ -32,14 +32,17 @@ public class ProtoSchemaConverterTest { /** * Converts given pbClass to parquet schema and compares it with expected parquet schema. */ - private void testConversion(Class pbClass, String parquetSchemaString) throws + private void testConversion(Class pbClass, String parquetSchemaString, boolean parquetSpecsCompliant) throws Exception { - ProtoSchemaConverter protoSchemaConverter = new ProtoSchemaConverter(); + ProtoSchemaConverter protoSchemaConverter = new ProtoSchemaConverter(parquetSpecsCompliant); MessageType schema = protoSchemaConverter.convert(pbClass); MessageType expectedMT = MessageTypeParser.parseMessageType(parquetSchemaString); assertEquals(expectedMT.toString(), schema.toString()); } + private void testConversion(Class pbClass, String parquetSchemaString) throws Exception { + testConversion(pbClass, parquetSchemaString, true); + } /** * Tests that all protocol buffer datatypes are converted to correct parquet datatypes. @@ -103,10 +106,12 @@ public void testProto3ConvertAllDatatypes() throws Exception { " optional binary optionalEnum (ENUM) = 18;" + " optional int32 someInt32 = 19;" + " optional binary someString (UTF8) = 20;" + - " repeated group optionalMap = 21 {\n" + - " optional int64 key = 1;\n" + - " optional group value = 2 {\n" + - " optional int32 someId = 3;\n" + + " optional group optionalMap (MAP) = 21 {\n" + + " repeated group key_value {\n" + + " required int64 key;\n" + + " optional group value {\n" + + " optional int32 someId = 3;\n" + + " }\n" + " }\n" + " }\n" + "}"; @@ -120,16 +125,24 @@ public void testConvertRepetition() throws Exception { "message TestProtobuf.SchemaConverterRepetition {\n" + " optional int32 optionalPrimitive = 1;\n" + " required int32 requiredPrimitive = 2;\n" + - " repeated int32 repeatedPrimitive = 3;\n" + + " optional group repeatedPrimitive (LIST) = 3 {\n" + + " repeated group list {\n" + + " required int32 element;\n" + + " }\n" + + " }\n" + " optional group optionalMessage = 7 {\n" + " optional int32 someId = 3;\n" + " }\n" + - " required group requiredMessage = 8 {" + + " required group requiredMessage = 8 {\n" + " optional int32 someId= 3;\n" + " }\n" + - " repeated group repeatedMessage = 9 {" + - " optional int32 someId = 3;\n" + - " }\n" + + " optional group repeatedMessage (LIST) = 9 {\n" + + " repeated group list {\n" + + " optional group element {\n" + + " optional int32 someId = 3;\n" + + " }\n" + + " }\n" + + " }" + "}"; testConversion(TestProtobuf.SchemaConverterRepetition.class, expectedSchema); @@ -140,15 +153,193 @@ public void testProto3ConvertRepetition() throws Exception { String expectedSchema = "message TestProto3.SchemaConverterRepetition {\n" + " optional int32 optionalPrimitive = 1;\n" + - " repeated int32 repeatedPrimitive = 3;\n" + + " optional group repeatedPrimitive (LIST) = 3 {\n" + + " repeated group list {\n" + + " required int32 element;\n" + + " }\n" + + " }\n" + " optional group optionalMessage = 7 {\n" + " optional int32 someId = 3;\n" + " }\n" + - " repeated group repeatedMessage = 9 {" + - " optional int32 someId = 3;\n" + + " optional group repeatedMessage (LIST) = 9 {\n" + + " repeated group list {\n" + + " optional group element {\n" + + " optional int32 someId = 3;\n" + + " }\n" + + " }\n" + " }\n" + "}"; testConversion(TestProto3.SchemaConverterRepetition.class, expectedSchema); } + + @Test + public void testConvertRepeatedIntMessage() throws Exception { + String expectedSchema = + "message TestProtobuf.RepeatedIntMessage {\n" + + " optional group repeatedInt (LIST) = 1 {\n" + + " repeated group list {\n" + + " required int32 element;\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.RepeatedIntMessage.class, expectedSchema); + } + + @Test + public void testConvertRepeatedIntMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProtobuf.RepeatedIntMessage {\n" + + " repeated int32 repeatedInt = 1;\n" + + "}"; + + testConversion(TestProtobuf.RepeatedIntMessage.class, expectedSchema, false); + } + + @Test + public void testProto3ConvertRepeatedIntMessage() throws Exception { + String expectedSchema = + "message TestProto3.RepeatedIntMessage {\n" + + " optional group repeatedInt (LIST) = 1 {\n" + + " repeated group list {\n" + + " required int32 element;\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProto3.RepeatedIntMessage.class, expectedSchema); + } + + @Test + public void testProto3ConvertRepeatedIntMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProto3.RepeatedIntMessage {\n" + + " repeated int32 repeatedInt = 1;\n" + + "}"; + + testConversion(TestProto3.RepeatedIntMessage.class, expectedSchema, false); + } + + @Test + public void testConvertRepeatedInnerMessage() throws Exception { + String expectedSchema = + "message TestProtobuf.RepeatedInnerMessage {\n" + + " optional group repeatedInnerMessage (LIST) = 1 {\n" + + " repeated group list {\n" + + " optional group element {\n" + + " optional binary one (UTF8) = 1;\n" + + " optional binary two (UTF8) = 2;\n" + + " optional binary three (UTF8) = 3;\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.RepeatedInnerMessage.class, expectedSchema); + } + + @Test + public void testConvertRepeatedInnerMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProtobuf.RepeatedInnerMessage {\n" + + " repeated group repeatedInnerMessage = 1 {\n" + + " optional binary one (UTF8) = 1;\n" + + " optional binary two (UTF8) = 2;\n" + + " optional binary three (UTF8) = 3;\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.RepeatedInnerMessage.class, expectedSchema, false); + } + + @Test + public void testProto3ConvertRepeatedInnerMessage() throws Exception { + String expectedSchema = + "message TestProto3.RepeatedInnerMessage {\n" + + " optional group repeatedInnerMessage (LIST) = 1 {\n" + + " repeated group list {\n" + + " optional group element {\n" + + " optional binary one (UTF8) = 1;\n" + + " optional binary two (UTF8) = 2;\n" + + " optional binary three (UTF8) = 3;\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProto3.RepeatedInnerMessage.class, expectedSchema); + } + + @Test + public void testProto3ConvertRepeatedInnerMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProto3.RepeatedInnerMessage {\n" + + " repeated group repeatedInnerMessage = 1 {\n" + + " optional binary one (UTF8) = 1;\n" + + " optional binary two (UTF8) = 2;\n" + + " optional binary three (UTF8) = 3;\n" + + " }\n" + + "}"; + + testConversion(TestProto3.RepeatedInnerMessage.class, expectedSchema, false); + } + + @Test + public void testConvertMapIntMessage() throws Exception { + String expectedSchema = + "message TestProtobuf.MapIntMessage {\n" + + " optional group mapInt (MAP) = 1 {\n" + + " repeated group key_value {\n" + + " required int32 key;\n" + + " optional int32 value;\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.MapIntMessage.class, expectedSchema); + } + + @Test + public void testConvertMapIntMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProtobuf.MapIntMessage {\n" + + " repeated group mapInt = 1 {\n" + + " optional int32 key = 1;\n" + + " optional int32 value = 2;\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.MapIntMessage.class, expectedSchema, false); + } + + @Test + public void testProto3ConvertMapIntMessage() throws Exception { + String expectedSchema = + "message TestProto3.MapIntMessage {\n" + + " optional group mapInt (MAP) = 1 {\n" + + " repeated group key_value {\n" + + " required int32 key;\n" + + " optional int32 value;\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProto3.MapIntMessage.class, expectedSchema); + } + + @Test + public void testProto3ConvertMapIntMessageNonSpecsCompliant() throws Exception { + String expectedSchema = + "message TestProto3.MapIntMessage {\n" + + " repeated group mapInt = 1 {\n" + + " optional int32 key = 1;\n" + + " optional int32 value = 2;\n" + + " }\n" + + "}"; + + testConversion(TestProto3.MapIntMessage.class, expectedSchema, false); + } } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java index b937618c3b..f71229c222 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java @@ -31,8 +31,12 @@ public class ProtoWriteSupportTest { private ProtoWriteSupport createReadConsumerInstance(Class cls, RecordConsumer readConsumerMock) { + return createReadConsumerInstance(cls, readConsumerMock, new Configuration()); + } + + private ProtoWriteSupport createReadConsumerInstance(Class cls, RecordConsumer readConsumerMock, Configuration conf) { ProtoWriteSupport support = new ProtoWriteSupport(cls); - support.init(new Configuration()); + support.init(conf); support.prepareForWrite(readConsumerMock); return support; } @@ -79,6 +83,45 @@ public void testProto3SimplestMessage() throws Exception { Mockito.verifyNoMoreInteractions(readConsumerMock); } + @Test + public void testRepeatedIntMessageSpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock, conf); + + TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + msg.addRepeatedInt(1323); + msg.addRepeatedInt(54469); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("repeatedInt", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("list", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("element", 0); + inOrder.verify(readConsumerMock).addInteger(1323); + inOrder.verify(readConsumerMock).endField("element", 0); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("element", 0); + inOrder.verify(readConsumerMock).addInteger(54469); + inOrder.verify(readConsumerMock).endField("element", 0); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("list", 0); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("repeatedInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + @Test public void testRepeatedIntMessage() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); @@ -101,6 +144,79 @@ public void testRepeatedIntMessage() throws Exception { Mockito.verifyNoMoreInteractions(readConsumerMock); } + @Test + public void testRepeatedIntMessageEmptySpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock, conf); + + TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testRepeatedIntMessageEmpty() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock); + + TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedIntMessageSpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.RepeatedIntMessage.class, readConsumerMock, conf); + + TestProto3.RepeatedIntMessage.Builder msg = TestProto3.RepeatedIntMessage.newBuilder(); + msg.addRepeatedInt(1323); + msg.addRepeatedInt(54469); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("repeatedInt", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("list", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("element", 0); + inOrder.verify(readConsumerMock).addInteger(1323); + inOrder.verify(readConsumerMock).endField("element", 0); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("element", 0); + inOrder.verify(readConsumerMock).addInteger(54469); + inOrder.verify(readConsumerMock).endField("element", 0); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("list", 0); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("repeatedInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + @Test public void testProto3RepeatedIntMessage() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); @@ -123,6 +239,268 @@ public void testProto3RepeatedIntMessage() throws Exception { Mockito.verifyNoMoreInteractions(readConsumerMock); } + @Test + public void testProto3RepeatedIntMessageEmptySpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock, conf); + + TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedIntMessageEmpty() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock); + + TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder(); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testMapIntMessageSpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock, conf); + + TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder(); + msg.putMapInt(123, 1); + msg.putMapInt(234, 2); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("mapInt", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key_value", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(123); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(1); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(234); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(2); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("key_value", 0); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("mapInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testMapIntMessage() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock); + + TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder(); + msg.putMapInt(123, 1); + msg.putMapInt(234, 2); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("mapInt", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(123); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(1); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(234); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(2); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("mapInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testMapIntMessageEmptySpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock, conf); + + TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder(); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testMapIntMessageEmpty() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock); + + TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder(); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3MapIntMessageSpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock, conf); + + TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder(); + msg.putMapInt(123, 1); + msg.putMapInt(234, 2); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("mapInt", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key_value", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(123); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(1); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(234); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(2); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("key_value", 0); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("mapInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3MapIntMessage() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock); + + TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder(); + msg.putMapInt(123, 1); + msg.putMapInt(234, 2); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("mapInt", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(123); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(1); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("key", 0); + inOrder.verify(readConsumerMock).addInteger(234); + inOrder.verify(readConsumerMock).endField("key", 0); + inOrder.verify(readConsumerMock).startField("value", 1); + inOrder.verify(readConsumerMock).addInteger(2); + inOrder.verify(readConsumerMock).endField("value", 1); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("mapInt", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3MapIntMessageEmptySpecsCompliant() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock, conf); + + TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder(); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3MapIntMessageEmpty() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock); + + TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder(); + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + @Test public void testRepeatedInnerMessageMessage_message() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); @@ -137,6 +515,7 @@ public void testRepeatedInnerMessageMessage_message() throws Exception { inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("inner", 0); + inOrder.verify(readConsumerMock).startGroup(); inOrder.verify(readConsumerMock).startField("one", 0); inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); @@ -145,14 +524,54 @@ public void testRepeatedInnerMessageMessage_message() throws Exception { inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); inOrder.verify(readConsumerMock).endField("two", 1); inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("inner", 0); inOrder.verify(readConsumerMock).endMessage(); Mockito.verifyNoMoreInteractions(readConsumerMock); } @Test - public void testProto3RepeatedInnerMessageMessage_message() throws Exception { + public void testRepeatedInnerMessageSpecsCompliantMessage_message() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock, conf); + + TestProtobuf.TopMessage.Builder msg = TestProtobuf.TopMessage.newBuilder(); + msg.addInnerBuilder().setOne("one").setTwo("two"); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("inner", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("list", 0); + + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("element", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("one", 0); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); + inOrder.verify(readConsumerMock).endField("one", 0); + inOrder.verify(readConsumerMock).startField("two", 1); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); + inOrder.verify(readConsumerMock).endField("two", 1); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("element", 0); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("list", 0); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("inner", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedInnerMessageMessage_message() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class);; ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.TopMessage.class, readConsumerMock); TestProto3.TopMessage.Builder msg = TestProto3.TopMessage.newBuilder(); @@ -164,6 +583,7 @@ public void testProto3RepeatedInnerMessageMessage_message() throws Exception { inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("inner", 0); + inOrder.verify(readConsumerMock).startGroup(); inOrder.verify(readConsumerMock).startField("one", 0); inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); @@ -172,6 +592,96 @@ public void testProto3RepeatedInnerMessageMessage_message() throws Exception { inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); inOrder.verify(readConsumerMock).endField("two", 1); inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("inner", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + @Test + public void testProto3RepeatedInnerMessageSpecsCompliantMessage_message() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.TopMessage.class, readConsumerMock, conf); + + TestProto3.TopMessage.Builder msg = TestProto3.TopMessage.newBuilder(); + msg.addInnerBuilder().setOne("one").setTwo("two"); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("inner", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("list", 0); + inOrder.verify(readConsumerMock).startGroup(); + + inOrder.verify(readConsumerMock).startField("element", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("one", 0); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); + inOrder.verify(readConsumerMock).endField("one", 0); + inOrder.verify(readConsumerMock).startField("two", 1); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); + inOrder.verify(readConsumerMock).endField("two", 1); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("element", 0); + + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("list", 0); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("inner", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + + + @Test + public void testRepeatedInnerMessageSpecsCompliantMessage_scalar() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock, conf); + + TestProtobuf.TopMessage.Builder msg = TestProtobuf.TopMessage.newBuilder(); + msg.addInnerBuilder().setOne("one"); + msg.addInnerBuilder().setTwo("two"); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("inner", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("list", 0); + + //first inner message + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("element", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("one", 0); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); + inOrder.verify(readConsumerMock).endField("one", 0); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("element", 0); + inOrder.verify(readConsumerMock).endGroup(); + + //second inner message + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("element", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("two", 1); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); + inOrder.verify(readConsumerMock).endField("two", 1); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("element", 0); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("list", 0); + inOrder.verify(readConsumerMock).endGroup(); inOrder.verify(readConsumerMock).endField("inner", 0); inOrder.verify(readConsumerMock).endMessage(); Mockito.verifyNoMoreInteractions(readConsumerMock); @@ -192,6 +702,7 @@ public void testRepeatedInnerMessageMessage_scalar() throws Exception { inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("inner", 0); + //first inner message inOrder.verify(readConsumerMock).startGroup(); inOrder.verify(readConsumerMock).startField("one", 0); @@ -226,6 +737,7 @@ public void testProto3RepeatedInnerMessageMessage_scalar() throws Exception { inOrder.verify(readConsumerMock).startMessage(); inOrder.verify(readConsumerMock).startField("inner", 0); + //first inner message inOrder.verify(readConsumerMock).startGroup(); inOrder.verify(readConsumerMock).startField("one", 0); @@ -245,6 +757,55 @@ public void testProto3RepeatedInnerMessageMessage_scalar() throws Exception { Mockito.verifyNoMoreInteractions(readConsumerMock); } + @Test + public void testProto3RepeatedInnerMessageSpecsCompliantMessage_scalar() throws Exception { + RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); + Configuration conf = new Configuration(); + ProtoWriteSupport.setWriteSpecsCompliant(conf, true); + ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.TopMessage.class, readConsumerMock, conf); + + TestProto3.TopMessage.Builder msg = TestProto3.TopMessage.newBuilder(); + msg.addInnerBuilder().setOne("one"); + msg.addInnerBuilder().setTwo("two"); + + instance.write(msg.build()); + + InOrder inOrder = Mockito.inOrder(readConsumerMock); + + inOrder.verify(readConsumerMock).startMessage(); + inOrder.verify(readConsumerMock).startField("inner", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("list", 0); + + //first inner message + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("element", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("one", 0); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes())); + inOrder.verify(readConsumerMock).endField("one", 0); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("element", 0); + inOrder.verify(readConsumerMock).endGroup(); + + //second inner message + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("element", 0); + inOrder.verify(readConsumerMock).startGroup(); + inOrder.verify(readConsumerMock).startField("two", 1); + inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes())); + inOrder.verify(readConsumerMock).endField("two", 1); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("element", 0); + inOrder.verify(readConsumerMock).endGroup(); + + inOrder.verify(readConsumerMock).endField("list", 0); + inOrder.verify(readConsumerMock).endGroup(); + inOrder.verify(readConsumerMock).endField("inner", 0); + inOrder.verify(readConsumerMock).endMessage(); + Mockito.verifyNoMoreInteractions(readConsumerMock); + } + @Test public void testOptionalInnerMessage() throws Exception { RecordConsumer readConsumerMock = Mockito.mock(RecordConsumer.class); diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java index d18076a642..55f9237ec5 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java @@ -46,10 +46,18 @@ public class WriteUsingMR { private static final Logger LOG = LoggerFactory.getLogger(WriteUsingMR.class); - Configuration conf = new Configuration(); + private final Configuration conf; private static List inputMessages; Path outputPath; + public WriteUsingMR() { + this(new Configuration()); + } + + public WriteUsingMR(Configuration conf) { + this.conf = new Configuration(); + } + public Configuration getConfiguration() { return conf; } diff --git a/parquet-protobuf/src/test/resources/TestProto3.proto b/parquet-protobuf/src/test/resources/TestProto3.proto index 1896445306..e49eef5727 100644 --- a/parquet-protobuf/src/test/resources/TestProto3.proto +++ b/parquet-protobuf/src/test/resources/TestProto3.proto @@ -124,6 +124,14 @@ message RepeatedIntMessage { repeated int32 repeatedInt = 1; } +message RepeatedInnerMessage { + repeated InnerMessage repeatedInnerMessage = 1; +} + +message MapIntMessage { + map mapInt = 1; +} + message HighIndexMessage { repeated int32 repeatedInt = 50000; } diff --git a/parquet-protobuf/src/test/resources/TestProtobuf.proto b/parquet-protobuf/src/test/resources/TestProtobuf.proto index d7cdf03a91..d4ab4c7dcd 100644 --- a/parquet-protobuf/src/test/resources/TestProtobuf.proto +++ b/parquet-protobuf/src/test/resources/TestProtobuf.proto @@ -122,6 +122,14 @@ message RepeatedIntMessage { repeated int32 repeatedInt = 1; } +message RepeatedInnerMessage { + repeated InnerMessage repeatedInnerMessage = 1; +} + +message MapIntMessage { + map mapInt = 1; +} + message HighIndexMessage { repeated int32 repeatedInt = 50000; }