Skip to content

Commit

Permalink
PARQUET-968 add proto flag to enable writing using specs-compliant sc…
Browse files Browse the repository at this point in the history
…hemas (#2)

* PARQUET-968 Add flag to write using specs compliant schemas

For users that require backward compatibility with parquet 1.9.0 and
older, the flag "parquet.proto.writeSpecsCompliant" is introduced to
allow writing collections using the old style (using repeated fields
without the LIST and MAP wrappers that are recommended by the parquet
specs).

* PARQUET-968 Add InputOutputFormat tests to validate read/write
  • Loading branch information
BenoitHanotte authored and costimuraru committed Apr 13, 2018
1 parent a8bd704 commit 16eafcb
Show file tree
Hide file tree
Showing 9 changed files with 920 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,13 @@ public void add(Object value) {
};
}

OriginalType originalType = parquetType.getOriginalType() == null ? OriginalType.UTF8 : parquetType.getOriginalType();
switch (originalType) {
case LIST: return new ListConverter(parentBuilder, fieldDescriptor, parquetType);
case MAP: return new MapConverter(parentBuilder, fieldDescriptor, parquetType);
default: return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType);
if (OriginalType.LIST == parquetType.getOriginalType()) {
return new ListConverter(parentBuilder, fieldDescriptor, parquetType);
}
if (OriginalType.MAP == parquetType.getOriginalType()) {
return new MapConverter(parentBuilder, fieldDescriptor, parquetType);
}
return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType);
}

private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
Expand Down Expand Up @@ -363,38 +364,38 @@ public void addBinary(Binary binary) {
* <p>
* A LIST wrapper is created in parquet for the above mentioned protobuf schema:
* message SimpleList {
* required group first_array (LIST) = 1 {
* repeated int32 element;
* optional group first_array (LIST) = 1 {
* repeated group list {
* optional int32 element;
* }
* }
* }
* <p>
* The LIST wrappers are used by 3rd party tools, such as Hive, to read parquet arrays. The wrapper contains
* one only one field: either a primitive field (like in the example above, where we have an array of ints) or
* another group (array of messages).
* a repeated group named 'list', itself containing only one field called 'element' of the type of the repeated
* object (can be a primitive as in this example or a group in case of a repeated message in protobuf).
*/
final class ListConverter extends GroupConverter {
private final Converter converter;
private final boolean listOfMessage;

/**
 * Builds the converter chain for a parquet LIST-annotated group.
 * <p>
 * Expects the specs-compliant three-level layout:
 * a group annotated with LIST, containing a repeated group named "list",
 * itself containing a single field named "element" holding the value type.
 * <p>
 * Note: reconstructed from an interleaved diff view — the removed
 * (pre-PARQUET-968) lookup of a bare "list" child has been dropped in
 * favor of the new strict validation path.
 *
 * @param parentBuilder   protobuf builder receiving the decoded values
 * @param fieldDescriptor protobuf descriptor of the repeated field
 * @param parquetType     parquet type that must be a LIST-annotated group
 * @throws ParquetDecodingException if the type is not a well-formed LIST wrapper
 */
public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
  OriginalType originalType = parquetType.getOriginalType();
  if (originalType != OriginalType.LIST || parquetType.isPrimitive()) {
    throw new ParquetDecodingException("Expected LIST wrapper. Found: " + originalType + " instead.");
  }

  listOfMessage = fieldDescriptor.getJavaType() == JavaType.MESSAGE;

  // The wrapper must contain a repeated *group* named "list".
  GroupType rootWrapperType = parquetType.asGroupType();
  if (!rootWrapperType.containsField("list") || rootWrapperType.getType("list").isPrimitive()) {
    // fixed typo in the original message: "wrapperr" -> "wrapper"
    throw new ParquetDecodingException("Expected repeated 'list' group inside LIST wrapper but got: " + rootWrapperType);
  }

  // The repeated "list" group must contain exactly the "element" field.
  GroupType listType = rootWrapperType.getType("list").asGroupType();
  if (!listType.containsField("element")) {
    throw new ParquetDecodingException("Expected 'element' inside repeated list group but got: " + listType);
  }

  Type elementType = listType.getType("element");
  converter = newMessageConverter(parentBuilder, fieldDescriptor, elementType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@
public class ProtoSchemaConverter {

private static final Logger LOG = LoggerFactory.getLogger(ProtoSchemaConverter.class);
private final boolean parquetSpecsCompliant;

/**
 * Creates a converter that uses the backward-compatible (non specs-compliant)
 * schema style by default; see {@link #ProtoSchemaConverter(boolean)}.
 */
public ProtoSchemaConverter() {
  this(false);
}

/**
 * Instantiates a schema converter to get the parquet schema corresponding to protobuf classes.
 * @param parquetSpecsCompliant If set to false, the parquet schema generated will be using the old
 *                              schema style (prior to PARQUET-968) to provide backward-compatibility
 *                              but which does not use LIST and MAP wrappers around collections as required
 *                              by the parquet specifications. If set to true, specs compliant schemas are used.
 */
public ProtoSchemaConverter(boolean parquetSpecsCompliant) {
  this.parquetSpecsCompliant = parquetSpecsCompliant;
}

public MessageType convert(Class<? extends Message> protobufClass) {
LOG.debug("Converting protocol buffer class \"" + protobufClass + "\" to parquet schema.");
Expand Down Expand Up @@ -86,7 +102,8 @@ private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addF
}

ParquetType parquetType = getParquetType(descriptor);
if (descriptor.isRepeated()) {
if (descriptor.isRepeated() && parquetSpecsCompliant) {
// the old schema style did not include the LIST wrapper around repeated fields
return addRepeatedPrimitive(descriptor, parquetType.primitiveType, parquetType.originalType, builder);
}

Expand All @@ -98,7 +115,7 @@ private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addR
OriginalType originalType,
final GroupBuilder<T> builder) {
return builder
.group(Type.Repetition.REQUIRED).as(OriginalType.LIST)
.group(Type.Repetition.OPTIONAL).as(OriginalType.LIST)
.group(Type.Repetition.REPEATED)
.primitive(primitiveType, Type.Repetition.REQUIRED).as(originalType)
.named("element")
Expand All @@ -108,7 +125,7 @@ private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addR
private <T> GroupBuilder<GroupBuilder<T>> addRepeatedMessage(FieldDescriptor descriptor, GroupBuilder<T> builder) {
GroupBuilder<GroupBuilder<GroupBuilder<GroupBuilder<T>>>> result =
builder
.group(Type.Repetition.REQUIRED).as(OriginalType.LIST)
.group(Type.Repetition.OPTIONAL).as(OriginalType.LIST)
.group(Type.Repetition.REPEATED)
.group(Type.Repetition.OPTIONAL);

Expand All @@ -118,9 +135,12 @@ private <T> GroupBuilder<GroupBuilder<T>> addRepeatedMessage(FieldDescriptor des
}

private <T> GroupBuilder<GroupBuilder<T>> addMessageField(FieldDescriptor descriptor, final GroupBuilder<T> builder) {
if (descriptor.isMapField()) {
if (descriptor.isMapField() && parquetSpecsCompliant) {
// the old schema style did not include the MAP wrapper around map groups
return addMapField(descriptor, builder);
} else if (descriptor.isRepeated()) {
}
if (descriptor.isRepeated() && parquetSpecsCompliant) {
// the old schema style did not include the LIST wrapper around repeated messages
return addRepeatedMessage(descriptor, builder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<

private static final Logger LOG = LoggerFactory.getLogger(ProtoWriteSupport.class);
public static final String PB_CLASS_WRITE = "parquet.proto.writeClass";
// PARQUET-968 introduces changes to allow writing specs compliant schemas with parquet-protobuf.
// In the past, collections were not written using the LIST and MAP wrappers and thus were not compliant
// with the parquet specs. This flag, when set to true, allows writing using specs-compliant schemas;
// it is set to false by default to keep backward compatibility.
public static final String PB_SPECS_COMPLIANT_WRITE = "parquet.proto.writeSpecsCompliant";

private boolean writeSpecsCompliant = false;
private RecordConsumer recordConsumer;
private Class<? extends Message> protoMessage;
private MessageWriter messageWriter;
Expand All @@ -68,6 +74,16 @@ public static void setSchema(Configuration configuration, Class<? extends Messag
configuration.setClass(PB_CLASS_WRITE, protoClass, Message.class);
}

/**
 * Make parquet-protobuf use the LIST and MAP wrappers for collections. Set to false if you need backward
 * compatibility with parquet before PARQUET-968 (1.9.0 and older).
 * @param configuration The hadoop configuration
 * @param writeSpecsCompliant If set to true, the specs-compliant schema style (with the LIST and MAP
 *                            wrappers) will be used; if set to false, the old wrapper-less style is kept.
 */
public static void setWriteSpecsCompliant(Configuration configuration, boolean writeSpecsCompliant) {
  configuration.setBoolean(PB_SPECS_COMPLIANT_WRITE, writeSpecsCompliant);
}

/**
* Writes Protocol buffer to parquet file.
* @param record instance of Message.Builder or Message.
Expand Down Expand Up @@ -105,7 +121,8 @@ public WriteContext init(Configuration configuration) {
}
}

MessageType rootSchema = new ProtoSchemaConverter().convert(protoMessage);
writeSpecsCompliant = configuration.getBoolean(PB_SPECS_COMPLIANT_WRITE, writeSpecsCompliant);
MessageType rootSchema = new ProtoSchemaConverter(writeSpecsCompliant).convert(protoMessage);
Descriptor messageDescriptor = Protobufs.getMessageDescriptor(protoMessage);
validatedMapping(messageDescriptor, rootSchema);

Expand All @@ -114,6 +131,7 @@ public WriteContext init(Configuration configuration) {
Map<String, String> extraMetaData = new HashMap<String, String>();
extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName());
extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(protoMessage));
extraMetaData.put(PB_SPECS_COMPLIANT_WRITE, String.valueOf(writeSpecsCompliant));
return new WriteContext(rootSchema, extraMetaData);
}

Expand Down Expand Up @@ -158,8 +176,12 @@ class MessageWriter extends FieldWriter {
Type type = schema.getType(name);
FieldWriter writer = createWriter(fieldDescriptor, type);

if(fieldDescriptor.isRepeated() && !fieldDescriptor.isMapField()) {
writer = new ArrayWriter(writer);
if(writeSpecsCompliant && fieldDescriptor.isRepeated() && !fieldDescriptor.isMapField()) {
writer = new ArrayWriter(writer);
}
else if (!writeSpecsCompliant && fieldDescriptor.isRepeated()) {
// the old schema style used to write maps as repeated fields instead of wrapping them in a LIST
writer = new RepeatedWriter(writer);
}

writer.setFieldName(name);
Expand Down Expand Up @@ -187,7 +209,7 @@ private FieldWriter createWriter(FieldDescriptor fieldDescriptor, Type type) {
}

private FieldWriter createMessageWriter(FieldDescriptor fieldDescriptor, Type type) {
if (fieldDescriptor.isMapField()) {
if (fieldDescriptor.isMapField() && writeSpecsCompliant) {
return createMapWriter(fieldDescriptor, type);
}

Expand Down Expand Up @@ -235,16 +257,16 @@ void writeTopLevelMessage(Object value) {
/** Writes message as part of repeated field. It cannot start field*/
@Override
final void writeRawValue(Object value) {
  // Emit the message as a bare group; the caller is responsible for
  // starting/ending the enclosing parquet field.
  recordConsumer.startGroup();
  writeAllFields((MessageOrBuilder) value);
  recordConsumer.endGroup();
}

/** Used for writing nonrepeated (optional, required) fields*/
@Override
final void writeField(Object value) {
  // Open the field, delegate the group emission to writeRawValue, close the field.
  recordConsumer.startField(fieldName, index);
  writeRawValue(value);
  recordConsumer.endField(fieldName, index);
}

Expand Down Expand Up @@ -288,21 +310,11 @@ final void writeField(Object value) {
recordConsumer.startField("list", 0); // This is the wrapper group for the array field
for (Object listEntry: list) {
recordConsumer.startGroup();

recordConsumer.startField("element", 0); // This is the mandatory inner field

if (!isPrimitive(listEntry)) {
recordConsumer.startGroup();
}

fieldWriter.writeRawValue(listEntry);

if (!isPrimitive(listEntry)) {
recordConsumer.endGroup();
}

recordConsumer.endField("element", 0);

recordConsumer.endGroup();
}
recordConsumer.endField("list", 0);
Expand All @@ -312,8 +324,33 @@ final void writeField(Object value) {
}
}

private boolean isPrimitive(Object listEntry) {
return !(listEntry instanceof Message);
/**
 * Writes collection entries (lists and maps) in the pre-PARQUET-968 layout:
 * each element is emitted directly into a repeated field, with no LIST or MAP
 * wrapper group around the values.
 */
class RepeatedWriter extends FieldWriter {
  final FieldWriter fieldWriter;

  RepeatedWriter(FieldWriter fieldWriter) {
    this.fieldWriter = fieldWriter;
  }

  @Override
  final void writeRawValue(Object value) {
    // A repeated field has no single raw value; elements are written via writeField.
    throw new UnsupportedOperationException("Array has no raw value");
  }

  @Override
  final void writeField(Object value) {
    // Emit every element of the collection inside a single startField/endField pair.
    recordConsumer.startField(fieldName, index);
    for (Object element : (List<?>) value) {
      fieldWriter.writeRawValue(element);
    }
    recordConsumer.endField(fieldName, index);
  }
}

/** validates mapping between protobuffer fields and parquet fields.*/
Expand Down Expand Up @@ -440,5 +477,4 @@ private String serializeDescriptor(Class<? extends Message> protoClass) {
DescriptorProtos.DescriptorProto asProto = descriptor.toProto();
return TextFormat.printToString(asProto);
}

}

0 comments on commit 16eafcb

Please sign in to comment.