Native Protocol Buffer support #264

Merged
69 commits merged on Feb 28, 2014

Conversation

4 participants
Contributor

lukasnalezenec commented Jan 1, 2014

No description provided.

lukasnalezenec added some commits Oct 23, 2013

@lukasnalezenec lukasnalezenec Initial commit 8edc893
@lukasnalezenec lukasnalezenec initial commit 5c46f05
@lukasnalezenec lukasnalezenec Merge branch 'master' of https://github.com/lukasnalezenec/parquet-pr… dfa27b4
@lukasnalezenec lukasnalezenec Delete todo.txt dd536a4
@lukasnalezenec lukasnalezenec Protobuf conversion over Java types e337bd2
@lukasnalezenec lukasnalezenec Merge branch 'master' of https://github.com/lukasnalezenec/parquet-pr… 55a451c
@lukasnalezenec lukasnalezenec Copyrights in converters 65ca5ed
@lukasnalezenec lukasnalezenec Specification of written protobuffer class in output format a7de264
@lukasnalezenec lukasnalezenec Code cleanup 2e78704
@lukasnalezenec lukasnalezenec Code Cleanup 090a2a4
@lukasnalezenec lukasnalezenec Projections in read support 1bec97f
@lukasnalezenec lukasnalezenec artifact version changed to 1.2.5, unused dependencies removed. 40ae3fb
@lukasnalezenec lukasnalezenec Wrong merge 402e96d
@lukasnalezenec lukasnalezenec Loading correct pbClass to ProtoSchemaConverter e2d819c
@lukasnalezenec lukasnalezenec Depricated init override removed 08a204d
@lukasnalezenec lukasnalezenec pom.xml version 1.2.10-SNAPSHOT 83f0646
@lukasnalezenec lukasnalezenec TestUtils refactoring 0517253
@lukasnalezenec lukasnalezenec Obsolete test removed c590038
@lukasnalezenec lukasnalezenec ProtoSchemaConverterUnitTest 978e396
@lukasnalezenec lukasnalezenec Removing hadoop-core dependency conflict 861016b
@lukasnalezenec lukasnalezenec tests for Input and Output Formats dba65be
@lukasnalezenec lukasnalezenec ProtoSchemaConverter Code Style 16b2f73
@lukasnalezenec lukasnalezenec CodeStyle 1394236
@lukasnalezenec lukasnalezenec Merge pull request #1 from Parquet/master
update
82b889c
@lukasnalezenec lukasnalezenec junit test for enum schema conversion 1f75813
@lukasnalezenec lukasnalezenec remove old package info 51ca71a
@lukasnalezenec lukasnalezenec remove commented code 52ffcfe
@lukasnalezenec lukasnalezenec correct byte[] storage f7a9023
@lukasnalezenec lukasnalezenec #projection test 5997bf5
@lukasnalezenec lukasnalezenec #projection test - fix - cannot use inner class as mapper 96f2300
@lukasnalezenec lukasnalezenec Code cleanup 985002e
@lukasnalezenec lukasnalezenec ConverterTest b273684
@lukasnalezenec lukasnalezenec new root directory 99b7e52
@lukasnalezenec lukasnalezenec delete .idea directory 94b2ec0
@lukasnalezenec lukasnalezenec merge a717bbf
@lukasnalezenec lukasnalezenec parquet-protobuf added to root pom.xml d708c7d
@lukasnalezenec lukasnalezenec Consistent naming protoXYZ 919db0b
@lukasnalezenec lukasnalezenec pom - latest version c8188f3
@lukasnalezenec lukasnalezenec Code cleanup 1f4a9db
@lukasnalezenec lukasnalezenec Repeated Messages test c7c39c3
@lukasnalezenec lukasnalezenec Method ProtoParquetInputFormat.setRequestedProjection signature 47cd572
@lukasnalezenec lukasnalezenec Code cleanup 7c0d290
@lukasnalezenec lukasnalezenec Code cleanup - Enum comparsions 63b710d
@lukasnalezenec lukasnalezenec Unnecessary unboxing 8ed45d0
@lukasnalezenec lukasnalezenec Url to main parquet repo 31e4b06
Owner

julienledem commented Jan 1, 2014

Happy new year @lukasnalezenec!

Contributor

dvryaboy commented Jan 3, 2014

@lukasnalezenec thanks for putting this together. I'll start reviewing over the weekend. The Travis CI build failed for some reason, looks like it was pulling an old version of the parent pom or something -- I restarted the build, but if it fails again, might be worth checking out what's going on. Also, Aniket just merged EB 4.4 into master, so you may need to re-sync up.

Contributor

lukasnalezenec commented Jan 5, 2014

Hi, I have committed a few patches. The build is now green.

There are still a few known issues:

When writing a protobuf to the Parquet file metadata, I am using its Java class name. I am not sure that is the best solution. Writing the real protobuf name sounds like a better solution, but I am not sure there is a way to create a protobuf from its real name.

I am not writing Pig metadata. I think neither the Thrift nor the Avro project does it now; they did when I started this project.

There are some String constants, such as ProtoWriteSupport.PB_CLASS_WRITE, that need review.

Note:
The code is based on the Avro project. I guess it would be better if it were based on the Thrift project.

@dvryaboy dvryaboy commented on the diff Jan 6, 2014

parquet-protobuf/README.md
@@ -0,0 +1,4 @@
+parquet-protobuf
+================
+
+Protocol Buffer support for Parquet columnar format
@dvryaboy

dvryaboy Jan 6, 2014

Contributor

let's add some sample code?
and a blurb in the main readme directing people here (and updating the table of features, which says protobuf support is coming)

@dvryaboy dvryaboy commented on the diff Jan 6, 2014

...main/java/parquet/proto/ProtoParquetOutputFormat.java
+ * <p/>
+ * <pre>
+ * {@code
+ * final Job job = new Job(conf, "Parquet writing job");
+ * job.setOutputFormatClass(ProtoParquetOutputFormat.class);
+ * ProtoParquetOutputFormat.setOutputPath(job, parquetPath);
+ * ProtoParquetOutputFormat.setProtobufClass(job, YourProtocolbuffer.class);
+ * }
+ * </pre>
+ *
+ * @author Lukas Nalezenec
+ */
+public class ProtoParquetOutputFormat<T extends MessageOrBuilder> extends ParquetOutputFormat<T> {
+
+ public static void setProtobufClass(Job job, Class<? extends Message> protoClass) {
+ ProtoWriteSupport.setSchema(ContextUtil.getConfiguration(job), protoClass);
@dvryaboy

dvryaboy Jan 6, 2014

Contributor

A neat trick could be serializing the Descriptor for the protocol buffer message -- itself a well-known protobuf (https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/Descriptors.Descriptor). This would allow one to have reflection-based access to the messages without having the actual compiled protobuf class.
getFullName on said descriptor would return the class, so we could check in the beginning if we have the actual compiled class, and only fall back to reflection.

I would suggest doing the first part of that -- serializing the descriptor -- now, but leaving the ability to use protobuf via reflection on the descriptor for later, as it's a lot of work and what you have so far is already likely to be super useful.
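The fallback dvryaboy describes can be sketched with the stdlib alone: try to load the compiled class by the descriptor's full name, and treat failure as the signal to use reflection over the serialized descriptor instead. The class and method names below are illustrative, not part of the PR:

```java
// Sketch of the lookup order suggested above: prefer the compiled protobuf
// class, and fall back to descriptor-based reflection when it is absent.
public class CompiledClassLookup {

    /**
     * Returns the compiled class for the given full name, or null to signal
     * "use reflection on the serialized descriptor" (e.g. DynamicMessage).
     */
    public static Class<?> findCompiledClass(String fullName) {
        try {
            return Class.forName(fullName);
        } catch (ClassNotFoundException e) {
            return null; // caller falls back to reflection-based access
        }
    }
}
```

A reader would call this with the descriptor's `getFullName()` and only construct a reflection-based materializer when it returns null.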

@lukasnalezenec

lukasnalezenec Jan 12, 2014

Contributor

It does not work for messages that contain message fields. :(

DescriptorProtos.DescriptorProto descriptorProto = inputDescriptor.toProto();
//....
Descriptors.Descriptor outputDescriptor = Protobufs.makeMessageDescriptor(descriptorProto);

@lukasnalezenec

lukasnalezenec Jan 14, 2014

Contributor

@dvryaboy I need help. When you serialize a Descriptor to protobuf, the result does not contain the descriptions of inner messages.

@dvryaboy

dvryaboy Jan 14, 2014

Contributor

Could you put the code demonstrating the problem in a gist? Hard to debug from the message above. Ideally, something with a minimum of dependencies (no parquet, etc).

@lukasnalezenec

lukasnalezenec Jan 14, 2014

Contributor

Here it is:
https://gist.github.com/lukasnalezenec/8427423
It is not completely standalone; you need to compile the protobufs.

@lukasnalezenec

lukasnalezenec Jan 25, 2014

Contributor

Hi @dvryaboy,
this is the last significant problem. Should I send you a Maven project with a demonstration?

@dvryaboy

dvryaboy Jan 27, 2014

Contributor

your latest rewrite fixes this issue, right?

@lukasnalezenec

lukasnalezenec Jan 28, 2014

Contributor

No, not yet. I am not sure how to do it. BTW, I think this is nice to have, not a big issue.
Do you mean serializing the Descriptor class directly using Java serialization, or the DescriptorProto using protobuf serialization? I don't think using Java serialization is a good idea.

@dvryaboy

dvryaboy Jan 29, 2014

Contributor

Yeah I mean protobuf serialization of course.

@lukasnalezenec

lukasnalezenec Feb 2, 2014

Contributor

It won't work; we would need to serialize not only the DescriptorProtos.DescriptorProto but the entire DescriptorProtos.FileDescriptorProto, and then find the corresponding DescriptorProtos.DescriptorProto by name.

I think we could recreate the protobuf descriptor from the Parquet schema. We would need to keep the original field number for each protobuf field.
Could we add a field "int originalIndex" to parquet.schema.Type, @julienledem? I guess it could also be useful for other frameworks (Thrift).

@julienledem

julienledem Feb 4, 2014

Owner

That's a good idea, but it cannot be added in this pull request, as it requires a change to parquet-format.
In the meantime I would recommend keeping the information in the "extra metadata map";
parquet-thrift does this with a JSON structure representing the original Thrift schema.
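As a rough illustration of the suggestion above, the original protobuf field numbers could be carried in the extra metadata map as a small JSON string. The key name and JSON shape here are invented for the sketch, not what parquet-thrift actually writes:

```java
import java.util.Map;

// Illustrative only: serialize field-name -> original proto field number
// as a tiny JSON object, suitable for the file's extra metadata map.
public class FieldNumberMetadata {

    public static String toJson(Map<String, Integer> fieldNumbers) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Integer> e : fieldNumbers.entrySet()) {
            if (!first) sb.append(",");
            sb.append("\"").append(e.getKey()).append("\":").append(e.getValue());
            first = false;
        }
        return sb.append("}").toString();
    }
}
```

The writer would then do something like `extraMetaData.put("proto.fieldNumbers", FieldNumberMetadata.toJson(numbers))` (the key is hypothetical), and a reader could rebuild a descriptor without any parquet-format change.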

@dvryaboy dvryaboy commented on an outdated diff Jan 6, 2014

...f/src/main/java/parquet/proto/ProtoParquetWriter.java
+ /**
+ * Create a new {@link ProtoParquetWriter}.
+ *
+ * @param file The file name to write to.
+ * @param compressionCodecName Compression code to use, or CompressionCodecName.UNCOMPRESSED
+ * @param blockSize HDFS block size
+ * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes.
+ * @param enableDictionary Whether to use a dictionary to compress columns.
+ * @throws IOException
+ */
+ public ProtoParquetWriter(Path file, Class<? extends Message> protoMessage,
+ CompressionCodecName compressionCodecName, int blockSize,
+ int pageSize, boolean enableDictionary) throws IOException {
+ super(file, (WriteSupport<T>)
+ new ProtoWriteSupport(protoMessage),
+ compressionCodecName, blockSize, pageSize, enableDictionary, false);
@dvryaboy

dvryaboy Jan 6, 2014

Contributor

why no validation?
(and when hardcoding boolean values, let's drop in a comment saying what we are setting to true/false, to make code more readable)

@dvryaboy dvryaboy commented on the diff Jan 6, 2014

...f/src/main/java/parquet/proto/ProtoParquetWriter.java
+ CompressionCodecName compressionCodecName, int blockSize,
+ int pageSize, boolean enableDictionary) throws IOException {
+ super(file, (WriteSupport<T>)
+ new ProtoWriteSupport(protoMessage),
+ compressionCodecName, blockSize, pageSize, enableDictionary, false);
+ }
+
+ /**
+ * Create a new {@link ProtoParquetWriter}. The default block size is 50 MB.The default
+ * page size is 1 MB. Default compression is no compression. (Inherited from {@link ParquetWriter})
+ *
+ * @param file The file name to write to.
+ * @throws IOException
+ */
+ public ProtoParquetWriter(Path file, Class<? extends Message> protoMessage) throws IOException {
+ this(file, protoMessage, CompressionCodecName.UNCOMPRESSED,
@dvryaboy

dvryaboy Jan 6, 2014

Contributor

hm we should probably add a default field for that in ParquetWriter to keep things cleaner. ( @aniket486 and @tsdeng do you agree?)

@aniket486

aniket486 Jan 6, 2014

Contributor

We are planning to move these to ParquetProperties to make this interface cleaner.

@dvryaboy dvryaboy commented on an outdated diff Jan 6, 2014

...uf/src/main/java/parquet/proto/ProtoWriteSupport.java
+ Class<? extends Message> pbClass = configuration.getClass(PB_CLASS_WRITE, null, Message.class);
+ if (pbClass != null) {
+ protoMessage = pbClass;
+ rootSchema = new ProtoSchemaConverter().convert(pbClass);
+ } else {
+ String msg = "Protocol buffer class not specified.";
+ String hint = " Please use method ProtoParquetOutputFormat.setProtobufClass(...) or other similar method.";
+ throw new RuntimeException(msg + hint);
+ }
+ }
+
+ Map<String, String> extraMetaData = new HashMap<String, String>();
+ extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName());
+ extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(protoMessage));
+
+ // TODO add also pig descriptor
@dvryaboy

dvryaboy Jan 6, 2014

Contributor

my preference here is to not do what thrift does but rather to allow end users to compose metadata writers, and create a bunch of them people can toss in, along the lines of what was suggested in #185 .

(which is a round-about way of saying I think it should be possible to drop the todo and yet have users who want to use pig, hive, etc, have a successful integration here without needing to change this package)

@dvryaboy dvryaboy and 2 others commented on an outdated diff Jan 6, 2014

...uf/src/main/java/parquet/proto/ProtoWriteSupport.java
+ }
+
+ private void writeMessage(GroupType schema, MessageOrBuilder message) {
+ recordConsumer.startGroup();
+ writeRecordFields(schema, message);
+ recordConsumer.endGroup();
+ }
+
+ private void writeRecordFields(GroupType parquetSchema, MessageOrBuilder record) {
+ List<Type> fields = parquetSchema.getFields();
+
+ Map<Descriptors.FieldDescriptor, Object> pbFields = record.getAllFields();
+
+ for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : pbFields.entrySet()) {
+
+ Descriptors.FieldDescriptor fieldDescriptor = entry.getKey();
@dvryaboy

dvryaboy Jan 6, 2014

Contributor

Is there a reason you are not using a ParquetProtocol, like what was done in https://github.com/Parquet/parquet-mr/blob/master/parquet-thrift/src/main/java/parquet/thrift/ParquetWriteProtocol.java ? I suspect that approach could be significantly faster than doing this iteration and branching on every record.

@lukasnalezenec

lukasnalezenec Jan 6, 2014

Contributor

The only reason is that the code is based on older Avro code, not on Thrift code. I will look at this.

@julienledem

julienledem Jan 6, 2014

Owner

FYI: The main reason for the TProtocol implementation of parquet-thrift is to make it work with both Thrift and Scrooge.

Definitely here I would avoid looking up fields by their names and try to make everything run by index.

@dvryaboy

dvryaboy Jan 6, 2014

Contributor

@julienledem it seems like one should be able to build a state machine given a descriptor that would avoid all kinds of unnecessary branching points, field lookups, etc. For some reason I thought we were doing that for Thrift but I can see we aren't.. obviously this is beyond the scope of this PR, but am I totally wrong on that?
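dvryaboy's state-machine idea, in miniature: branch on field types once while compiling the schema into an array of per-field writers, then keep the per-record path a plain indexed dispatch. The types below are stand-ins, not Parquet APIs:

```java
import java.util.List;

public class CompiledWriters {

    interface FieldWriter {
        void write(Object value, StringBuilder out);
    }

    // Built once per schema: all type branching happens here, not per record.
    static FieldWriter[] compile(List<Class<?>> fieldTypes) {
        FieldWriter[] writers = new FieldWriter[fieldTypes.size()];
        for (int i = 0; i < writers.length; i++) {
            Class<?> t = fieldTypes.get(i);
            if (t == Integer.class) {
                writers[i] = (v, out) -> out.append("int:").append(v).append(';');
            } else if (t == String.class) {
                writers[i] = (v, out) -> out.append("str:").append(v).append(';');
            } else {
                throw new IllegalArgumentException("Unsupported type: " + t);
            }
        }
        return writers;
    }

    // Per-record hot path: no name lookups, no instanceof, just array indexing.
    static String writeRecord(FieldWriter[] writers, Object[] record) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < writers.length; i++) {
            writers[i].write(record[i], out);
        }
        return out.toString();
    }
}
```

The same shape applies to a protobuf Descriptor: iterate its fields once, build one writer per field index, and reuse that array for every message.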

@dvryaboy dvryaboy and 1 other commented on an outdated diff Jan 6, 2014

...uf/src/main/java/parquet/proto/ProtoWriteSupport.java
+ String arrayType = schema.getName();
+ recordConsumer.startField(arrayType, 0);
+ for (T elt : array) {
+ writeScalarValue((schema.getType(0)), fieldDescriptor, elt);
+ }
+ recordConsumer.endField(arrayType, 0);
+ }
+ recordConsumer.endGroup();
+ }
+
+
+ private void writeScalarValue(Type type, Descriptors.FieldDescriptor fieldDescriptor, Object value) {
+
+ JavaType javaType = fieldDescriptor.getJavaType();
+
+ if (javaType == JavaType.STRING) {
@dvryaboy

dvryaboy Jan 6, 2014

Contributor

switch?

@julienledem

julienledem Jan 6, 2014

Owner

switch for enums please
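What the reviewers are asking for, in a stdlib-only sketch: a switch over a JavaType-style enum instead of a chain of if/else comparisons. The enum below is a stand-in for com.google.protobuf.Descriptors.FieldDescriptor.JavaType, and the mapping on the right is illustrative:

```java
public class JavaTypeSwitch {

    // Stand-in for the protobuf JavaType enum.
    enum JavaType { STRING, INT, LONG, FLOAT, DOUBLE, BOOLEAN }

    // A switch makes exhaustiveness easy to audit, and the compiler can emit
    // a tableswitch instead of a chain of reference comparisons.
    static String parquetPrimitive(JavaType javaType) {
        switch (javaType) {
            case STRING:  return "binary (utf8)";
            case INT:     return "int32";
            case LONG:    return "int64";
            case FLOAT:   return "float";
            case DOUBLE:  return "double";
            case BOOLEAN: return "boolean";
            default:
                throw new IllegalArgumentException("Unsupported type: " + javaType);
        }
    }
}
```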

@dvryaboy dvryaboy commented on an outdated diff Jan 6, 2014

...va/parquet/proto/converters/ParentValueContainer.java
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.proto.converters;
+
+public abstract class ParentValueContainer {
@dvryaboy

dvryaboy Jan 6, 2014

Contributor

does this actually need to be public?
consider the same for "public" classes that are not really intended for use outside of the internal workings of this sub-module. We are adding semver checking in #265 and limiting public interfaces will let us be more flexible with regards to changing things around without needlessly bumping major version numbers.

@dvryaboy dvryaboy and 2 others commented on an outdated diff Jan 6, 2014

...a/parquet/proto/converters/ProtoBooleanConverter.java
+
+package parquet.proto.converters;
+
+import parquet.io.api.PrimitiveConverter;
+
+public final class ProtoBooleanConverter extends PrimitiveConverter {
+
+ final ParentValueContainer parent;
+
+ public ProtoBooleanConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addBoolean(boolean value) {
+ parent.add(value ? Boolean.TRUE : Boolean.FALSE);
@dvryaboy

dvryaboy Jan 6, 2014

Contributor
  1. why not just parent.add(value) ?
  2. this points to the boxing issue.. staying in primitives when possible is a big time and gc saver. Take a look at https://github.com/wesleypeck/parquet-mr/blob/custom_file_metadata_support/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java
@lukasnalezenec

lukasnalezenec Jan 6, 2014

Contributor
  1. I wanted to bypass autoboxing this way, but I think there is no real difference; the JIT will optimize it.
  2. I am using objects here because, AFAIK, there is no support for primitives in protocol buffers.
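The point being debated is easy to check with the stdlib: the explicit ternary and autoboxing both resolve to the same cached Boolean constants, so neither allocates; only an explicit `new Boolean(...)` would.

```java
public class BooleanBoxing {

    // Explicit ternary: returns the interned constants, no allocation.
    static Boolean viaTernary(boolean value) {
        return value ? Boolean.TRUE : Boolean.FALSE;
    }

    // What autoboxing compiles to; also returns the interned constants.
    static Boolean viaValueOf(boolean value) {
        return Boolean.valueOf(value);
    }
}
```

The real boxing cost in converters comes from Integer, Long, Float, and Double outside their small caches, which is why primitive-preserving converter hooks pay off.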
Contributor

dvryaboy commented Jan 6, 2014

I'm done with a cursory pass -- the only big things I see that block an initial import are the visibility level of internal interfaces/classes, and the class vs serialized descriptor as file metadata. The perf stuff can be changed as needed.

Other things to consider:

  • projection pushdown (expected schema is a subset of serialized schema -- what happens?)
  • evolved schema (you are reading two files, one has a new version of the protobuf schema, which is backwards compatible -- what happens?)

@tsdeng might have some thoughts too, he's had to work on the Thrift side quite a bit.

@lukasnalezenec lukasnalezenec commented on an outdated diff Jan 6, 2014

...uf/src/main/java/parquet/proto/ProtoWriteSupport.java
+ writeMessage(type.asGroupType(), msg);
+ } else if (javaType == JavaType.INT) {
+ recordConsumer.addInteger((Integer) value);
+ } else if (javaType == JavaType.LONG) {
+ recordConsumer.addLong((Long) value);
+ } else if (javaType == JavaType.FLOAT) {
+ recordConsumer.addFloat((Float) value);
+ } else if (javaType == JavaType.DOUBLE) {
+ recordConsumer.addDouble((Double) value);
+ } else if (javaType == JavaType.ENUM) {
+ Descriptors.EnumValueDescriptor enumDescriptor = (Descriptors.EnumValueDescriptor) value;
+ recordConsumer.addBinary(Binary.fromString(enumDescriptor.getName()));
+ } else if (javaType == JavaType.BOOLEAN) {
+ recordConsumer.addBoolean((Boolean) value);
+ } else if (javaType == JavaType.BYTE_STRING) {
+ ByteString byteString = (ByteString) value;
@lukasnalezenec

lukasnalezenec Jan 6, 2014

Contributor

This could be faster

@lukasnalezenec lukasnalezenec and 1 other commented on an outdated diff Jan 6, 2014

...va/parquet/proto/converters/ProtoBinaryConverter.java
+
+import com.google.protobuf.ByteString;
+import parquet.io.api.Binary;
+import parquet.io.api.PrimitiveConverter;
+
+public final class ProtoBinaryConverter extends PrimitiveConverter {
+
+ final ParentValueContainer parent;
+
+ public ProtoBinaryConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBinary(Binary binary) {
+ ByteString byteString = ByteString.copyFrom(binary.toByteBuffer());
@lukasnalezenec

lukasnalezenec Jan 6, 2014

Contributor

This could be faster :(

@julienledem

julienledem Jan 6, 2014

Owner

how does the ByteString work?
I'm happy to help modifying the Binary class if that makes things easier

@lukasnalezenec

lukasnalezenec Jan 30, 2014

Contributor

It makes two or three allocations (a ByteString plus a byte[], and sometimes a defensive copy of the byte[]); I am talking generally about the conversion between ByteString and Binary and back.
I think we cannot do much about it.
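The defensive copy being discussed can be mimicked with java.nio alone: extracting bytes from a buffer into a fresh array is one unavoidable allocation whenever an API must not leak a mutable backing array. This is a generic sketch, not the ByteString implementation:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DefensiveCopy {

    // Copy a buffer's remaining bytes into a fresh array (one allocation),
    // mirroring what a ByteString.copyFrom(ByteBuffer)-style call must do.
    static byte[] copyOut(ByteBuffer buffer) {
        byte[] copy = new byte[buffer.remaining()];
        buffer.duplicate().get(copy); // duplicate() leaves the caller's position untouched
        return copy;
    }

    public static void main(String[] args) {
        byte[] original = "someText".getBytes(StandardCharsets.UTF_8);
        byte[] copy = copyOut(ByteBuffer.wrap(original));

        original[0] = 'X';                  // mutate the source array...
        System.out.println((char) copy[0]); // ...the copy still holds 's'
    }
}
```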

@julienledem julienledem commented on an outdated diff Jan 6, 2014

...f/src/main/java/parquet/proto/ProtoParquetReader.java
+
+import com.google.protobuf.MessageOrBuilder;
+import org.apache.hadoop.fs.Path;
+import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.ParquetReader;
+import parquet.hadoop.api.ReadSupport;
+
+import java.io.IOException;
+
+/**
+ * Read Protobuf records from a Parquet file.
+ */
+public class ProtoParquetReader<T extends MessageOrBuilder> extends ParquetReader<T> {
+
+ public ProtoParquetReader(Path file) throws IOException {
+ super(file, (ReadSupport<T>) new ProtoReadSupport());
@julienledem

julienledem Jan 6, 2014

Owner

should the bounds on ProtoReadSupport's type parameter be changed?
This cast seems unnecessary.

@julienledem julienledem commented on the diff Jan 6, 2014

...buf/src/main/java/parquet/proto/ProtoReadSupport.java
+ MessageType requestedProjection = getSchemaForRead(context.getFileSchema(), requestedProjectionString);
+ LOG.debug("Reading data with projection " + requestedProjection);
+ return new ReadContext(requestedProjection);
+ } else {
+ MessageType fileSchema = context.getFileSchema();
+ LOG.debug("Reading data with schema " + fileSchema);
+ return new ReadContext(fileSchema);
+ }
+ }
+
+ @Override
+ public RecordMaterializer<T> prepareForRead(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
+ String strProtoClass = keyValueMetaData.get(PB_CLASS);
+
+ if (strProtoClass == null) {
+ throw new RuntimeException("I Need parameter " + PB_CLASS + " with Protocol Buffer class");
@julienledem

julienledem Jan 6, 2014

Owner

look at Prevalidator.nonNull()

@julienledem julienledem commented on an outdated diff Jan 6, 2014

...src/main/java/parquet/proto/ProtoRecordConverter.java
+
+
+ @Override
+ public void start() {
+ reusedBuilder.clear();
+ super.start();
+ }
+
+ @Override
+ public void end() {
+ // do nothing, dont call ParentValueContainer at top level.
+ }
+
+ T getCurrentRecord() {
+ if (buildBefore) {
+ return (T) this.reusedBuilder.build();
@julienledem

julienledem Jan 6, 2014

Owner

does Message.Builder have a type parameter? It looks like we can get rid of that cast

@julienledem julienledem commented on an outdated diff Jan 6, 2014

...src/main/java/parquet/proto/ProtoSchemaConverter.java
+ return messageType;
+ }
+
+ /* Iterates over list of fields. **/
+ private List<Type> convertFields(List<Descriptors.FieldDescriptor> fieldDescriptors) {
+ List<Type> types = new ArrayList<Type>();
+
+ for (Descriptors.FieldDescriptor fieldDescriptor : fieldDescriptors) {
+
+ String fieldName = fieldDescriptor.getName();
+ Type.Repetition repetition = getRepetition(fieldDescriptor);
+
+ Type type;
+ if (fieldDescriptor.isRepeated()) {
+ Type nestedType = convertScalarField(fieldName + "_tuple", fieldDescriptor, Type.Repetition.REPEATED);
+ type = ConversionPatterns.listType(Type.Repetition.OPTIONAL, fieldName, nestedType);
@julienledem

julienledem Jan 6, 2014

Owner

for protocol buffers you don't need to use the listType pattern.
This adds unnecessary layers to the schema.
It can just be a repeated field.

@julienledem julienledem commented on an outdated diff Jan 6, 2014

...src/main/java/parquet/proto/ProtoSchemaConverter.java
+ private Type.Repetition getRepetition(Descriptors.FieldDescriptor descriptor) {
+ Type.Repetition repetition;
+ if (descriptor.isRequired()) {
+ repetition = Type.Repetition.REQUIRED;
+ } else if (descriptor.isRepeated()) {
+ repetition = Type.Repetition.REPEATED;
+ } else {
+ repetition = Type.Repetition.OPTIONAL;
+ }
+ return repetition;
+ }
+
+ private Type convertScalarField(String fieldName, Descriptors.FieldDescriptor descriptor, Type.Repetition repetition) {
+ JavaType javaType = descriptor.getJavaType();
+
+ if (javaType == JavaType.BOOLEAN) {
@julienledem

julienledem Jan 6, 2014

Owner

if javaType is an enum, please use a switch here

@julienledem julienledem and 1 other commented on an outdated diff Jan 6, 2014

...uf/src/main/java/parquet/proto/ProtoWriteSupport.java
+ rootSchema = new ProtoSchemaConverter().convert(protoMessage);
+ }
+
+ @Override
+ public WriteContext init(Configuration configuration) {
+
+ // if no protobuf descriptor was given in constructor, load descriptor from configuration (set with setProtobufClass)
+ if (protoMessage == null) {
+ Class<? extends Message> pbClass = configuration.getClass(PB_CLASS_WRITE, null, Message.class);
+ if (pbClass != null) {
+ protoMessage = pbClass;
+ rootSchema = new ProtoSchemaConverter().convert(pbClass);
+ } else {
+ String msg = "Protocol buffer class not specified.";
+ String hint = " Please use method ProtoParquetOutputFormat.setProtobufClass(...) or other similar method.";
+ throw new RuntimeException(msg + hint);
@julienledem

julienledem Jan 6, 2014

Owner

Please use a more specific exception type. (I believe we have a parquet InvalidConfigurationException)

@lukasnalezenec

lukasnalezenec Jan 6, 2014

Contributor

found BadConfigurationException

@julienledem julienledem commented on an outdated diff Jan 6, 2014

...uf/src/main/java/parquet/proto/ProtoWriteSupport.java
+
+ Object value = entry.getValue();
+
+ if (value != null) {
+
+ int parquetIndex = parquetSchema.getFieldIndex(fieldDescriptor.getName());
+
+ if (fieldDescriptor.isRepeated()) {
+ recordConsumer.startField(fieldType.getName(), parquetIndex);
+ writeArray(fieldType.asGroupType(), fieldDescriptor, (List<?>) value);
+ recordConsumer.endField(fieldType.getName(), parquetIndex);
+ } else {
+ recordConsumer.startField(fieldType.getName(), parquetIndex);
+ writeScalarValue(fieldType, fieldDescriptor, value);
+ recordConsumer.endField(fieldType.getName(), parquetIndex);
+ }
@julienledem

julienledem Jan 6, 2014

Owner

some of the code is the same in both branches.
also if you remove the list extra layer you can simplify a little

@julienledem julienledem commented on an outdated diff Jan 6, 2014

...uf/src/main/java/parquet/proto/ProtoWriteSupport.java
+ writeRecordFields(schema, message);
+ recordConsumer.endGroup();
+ }
+
+ private void writeRecordFields(GroupType parquetSchema, MessageOrBuilder record) {
+ List<Type> fields = parquetSchema.getFields();
+
+ Map<Descriptors.FieldDescriptor, Object> pbFields = record.getAllFields();
+
+ for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : pbFields.entrySet()) {
+
+ Descriptors.FieldDescriptor fieldDescriptor = entry.getKey();
+ int protoIndex = fieldDescriptor.getIndex();
+ Type fieldType = fields.get(protoIndex);
+
+ Object value = entry.getValue();
@julienledem

julienledem Jan 6, 2014

Owner

you probably could avoid the value boxing here by moving the getValue() to the writeValue switch

@julienledem julienledem commented on an outdated diff Jan 6, 2014

...va/parquet/proto/converters/ProtoDoubleConverter.java
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.proto.converters;
+
+import parquet.io.api.PrimitiveConverter;
+
+public final class ProtoDoubleConverter extends PrimitiveConverter {
@julienledem

julienledem Jan 6, 2014

Owner

You can put all the primitive converters in the same class if you want.

@julienledem julienledem and 1 other commented on an outdated diff Jan 6, 2014

...java/parquet/proto/converters/ProtoEnumConverter.java
+ lookupStructure.put(Binary.fromString(name), enumType.findValueByName(name));
+ }
+
+ return lookupStructure;
+ }
+
+ @Override
+ final public void addBinary(Binary binaryValue) {
+ Descriptors.EnumValueDescriptor protoValue = enumLookup.get(binaryValue);
+
+ if (protoValue == null) {
+ Set<Binary> knownValues = enumLookup.keySet();
+ String msg = "Illegal enum value \"" + binaryValue + "\""
+ + " in protocol buffer \"" + fieldType.getFullName() + "\""
+ + " legal values are: \"" + knownValues + "\"";
+ throw new RuntimeException(msg);
@julienledem

julienledem Jan 6, 2014

Owner

please use a more specific exception (one that can extend ParquetRuntimeException)

@lukasnalezenec

lukasnalezenec Jan 6, 2014

Contributor

Which is better, ParquetDecodingException or InvalidRecordException?

@julienledem

julienledem Jan 6, 2014

Owner

ParquetDecodingException is fine
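Combining the two review points, a per-field precomputed lookup plus a specific decoding exception might look like this stdlib-only sketch (DecodingException stands in for ParquetDecodingException):

```java
import java.util.HashMap;
import java.util.Map;

public class EnumLookup<E extends Enum<E>> {

    // Local stand-in for parquet's ParquetDecodingException.
    public static class DecodingException extends RuntimeException {
        DecodingException(String message) { super(message); }
    }

    private final Map<String, E> byName = new HashMap<>();

    // Built once per enum field, so decoding each value is a single map lookup.
    public EnumLookup(Class<E> enumType) {
        for (E value : enumType.getEnumConstants()) {
            byName.put(value.name(), value);
        }
    }

    public E decode(String name) {
        E value = byName.get(name);
        if (value == null) {
            throw new DecodingException("Illegal enum value \"" + name
                + "\"; legal values are: " + byName.keySet());
        }
        return value;
    }
}
```

In the converter the map would be keyed by Binary rather than String to avoid decoding the bytes before the lookup, as the PR already does.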

@julienledem julienledem and 1 other commented on an outdated diff Jan 6, 2014

...java/parquet/proto/converters/ProtoEnumConverter.java
+ Descriptors.EnumDescriptor enumType = enumFieldType.getEnumType();
+ Map<Binary, Descriptors.EnumValueDescriptor> lookupStructure = new HashMap<Binary, Descriptors.EnumValueDescriptor>();
+
+ List<Descriptors.EnumValueDescriptor> enumValues = enumType.getValues();
+
+ for (Descriptors.EnumValueDescriptor value : enumValues) {
+ String name = value.getName();
+ lookupStructure.put(Binary.fromString(name), enumType.findValueByName(name));
+ }
+
+ return lookupStructure;
+ }
+
+ @Override
+ final public void addBinary(Binary binaryValue) {
+ Descriptors.EnumValueDescriptor protoValue = enumLookup.get(binaryValue);

@julienledem julienledem and 1 other commented on an outdated diff Jan 6, 2014

...a/parquet/proto/converters/ProtoMessageConverter.java
+ public ProtoMessageConverter(ParentValueContainer pvc, Class<? extends Message> protoClass, GroupType parquetSchema) {
+ this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema);
+ }
+
+
+ // For usage in message arrays
+ public ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema) {
+
+ int schemaSize = parquetSchema.getFieldCount();
+ converters = new Converter[schemaSize];
+
+ this.parent = pvc;
+ int parquetFieldIndex = 1;
+
+ if (pvc == null) {
+ throw new IllegalStateException("Missing parent value container");
@julienledem

julienledem Jan 6, 2014

Owner

Prevalidators

@lukasnalezenec

lukasnalezenec Jan 18, 2014

Contributor

Preconditions ;)

@julienledem julienledem commented on an outdated diff Jan 6, 2014

...a/parquet/proto/converters/ProtoMessageConverter.java
+
+ if (pvc == null) {
+ throw new IllegalStateException("Missing parent value container");
+ }
+
+ myBuilder = builder;
+
+ Descriptors.Descriptor protoDescriptor = builder.getDescriptorForType();
+
+ for (Type parquetField : parquetSchema.getFields()) {
+ Descriptors.FieldDescriptor protoField = protoDescriptor.findFieldByName(parquetField.getName());
+
+ if (parquetField.isRepetition(Type.Repetition.REPEATED)) {
+ GroupType groupType = parquetField.asGroupType();
+ if (groupType.getFieldCount() != 1) {
+ throw new RuntimeException("One field expected but found " + groupType);
@julienledem

julienledem Jan 6, 2014

Owner

better exception type

@julienledem julienledem commented on an outdated diff Jan 6, 2014

...a/parquet/proto/converters/ProtoMessageConverter.java
+ @Override
+ public void add(Object value) {
+ parentBuilder.addRepeatedField(fieldDescriptor, value);
+ }
+ };
+ } else {
+ parent = new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parentBuilder.setField(fieldDescriptor, value);
+ }
+ };
+ }
+
+ if (isRepeated) {
+ parquetType = parquetType.asGroupType().getType(0);
@julienledem

julienledem Jan 6, 2014

Owner

this is unnecessary if you remove the extra list layer.

@julienledem julienledem commented on the diff Jan 6, 2014

...test/java/parquet/proto/ProtoRecordConverterTest.java
+import static parquet.proto.TestUtils.testData;
+import static parquet.proto.test.TestProtobuf.SchemaConverterAllDatatypes;
+
+public class ProtoRecordConverterTest {
+
+ @Test
+ public void testAllTypes() throws Exception {
+ SchemaConverterAllDatatypes.Builder data;
+ data = SchemaConverterAllDatatypes.newBuilder();
+
+ data.setOptionalBool(true);
+ data.setOptionalBytes(ByteString.copyFrom("someText", "UTF-8"));
+ data.setOptionalDouble(0.577);
+ data.setOptionalFloat(3.1415f);
+ data.setOptionalEnum(SchemaConverterAllDatatypes.TestEnum.FIRST);
+ data.setOptionalFixed32(1000 * 1000 * 1);
@julienledem

julienledem Jan 6, 2014

Owner

can't we use those type-specific apis in the converters?

@lukasnalezenec

lukasnalezenec Jan 6, 2014

Contributor

We could, but I think that most of the tests would be trivial.

Owner

julienledem commented Jan 6, 2014

Great job, @lukasnalezenec
That's a great contribution.
I've made a couple of comments.
Thank you!

Contributor

dvryaboy commented Jan 6, 2014

I clearly suck at code reviews. Thanks @julienledem. :)

Owner

julienledem commented Jan 6, 2014

@dvryaboy we just have different sensibilities. :)

Contributor

lukasnalezenec commented Jan 27, 2014

@dvryaboy @julienledem
I have addressed most of the code review comments.
ProtoWriteSupport was rewritten almost from scratch.

@dvryaboy dvryaboy commented on the diff Jan 27, 2014

...et-column/src/main/java/parquet/io/api/Converter.java
@@ -27,11 +27,11 @@
abstract public boolean isPrimitive();
public PrimitiveConverter asPrimitiveConverter() {
- throw new ClassCastException(getClass().getName());
+ throw new ClassCastException("Expected instance of primitive converter but got \"" + getClass().getName() + "\"");
@dvryaboy

dvryaboy Jan 27, 2014

Contributor

good call, thanks.

@dvryaboy dvryaboy commented on the diff Jan 27, 2014

parquet-protobuf/pom.xml
+ <archive>
+ <manifestEntries>
+ <git-SHA-1>${buildNumber}</git-SHA-1>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
@dvryaboy

dvryaboy Jan 27, 2014

Contributor

I don't see jackson in the dependencies list -- is it included from somewhere else? Or are you just inheriting the transitive dependency via one of your other deps?
If you do need it, the current "best practice" for depending on jackson is to depend on parquet-jackson, which lets us only include the rewritten files once, via the p-j module.
It's definitely better to be explicit about needing something like jackson in your code than to rely on a transitive dep to bring it in (a change in a transitive dep could otherwise break your code even if it is API-compatible).

@lukasnalezenec

lukasnalezenec Jan 28, 2014

Contributor

I don't use parquet-jackson in the protobuf subproject. Do I need it? Protocol Buffers has some JSON support built in.
parquet-avro does not reference p-j either.

@julienledem

julienledem Jan 29, 2014

Owner

If you don't use jackson, you should remove the shading config right below.

@julienledem julienledem and 1 other commented on an outdated diff Jan 29, 2014

parquet-protobuf/pom.xml
@@ -0,0 +1,166 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet</artifactId>
+ <relativePath>../pom.xml</relativePath>
+ <version>1.3.2-SNAPSHOT</version>
@julienledem

julienledem Jan 29, 2014

Owner

you need to update to whatever is current in master

@lukasnalezenec

lukasnalezenec Jan 29, 2014

Contributor

I am sorry, fixed.

@julienledem julienledem commented on the diff Feb 4, 2014

...rc/main/java/parquet/proto/ProtoMessageConverter.java
+ }
+ }
+
+ }
+
+ final class ProtoBinaryConverter extends PrimitiveConverter {
+
+ final ParentValueContainer parent;
+
+ public ProtoBinaryConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBinary(Binary binary) {
+ ByteString byteString = ByteString.copyFrom(binary.toByteBuffer());
@julienledem

julienledem Feb 4, 2014

Owner

possibly we are copying the bytes twice here.
It could be fine, just a reminder to check here if perf improvements are needed in the future.
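The "copying twice" concern can be illustrated with plain JDK types: if `Binary.toByteBuffer()` materializes a fresh buffer rather than returning a zero-copy view, then `ByteString.copyFrom(ByteBuffer)` adds a second copy on top of it. A hedged sketch (the two methods below only simulate those library calls, named in the comments):

```java
import java.nio.ByteBuffer;

public class CopyCountDemo {
    // First potential copy: simulates a Binary.toByteBuffer() implementation
    // that materializes a fresh buffer instead of wrapping the backing array.
    static ByteBuffer toByteBuffer(byte[] raw) {
        return ByteBuffer.wrap(raw.clone());
    }

    // Second copy: ByteString.copyFrom(ByteBuffer) always copies the
    // remaining bytes into a new array, roughly like this.
    static byte[] copyFrom(ByteBuffer buffer) {
        byte[] out = new byte[buffer.remaining()];
        buffer.duplicate().get(out);
        return out;
    }
}
```

If profiling later shows this path is hot, the fix would be making the first step a zero-copy view so only the `ByteString` copy remains.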

@julienledem julienledem commented on the diff Feb 4, 2014

...uf/src/main/java/parquet/proto/ProtoWriteSupport.java
+
+ class FieldWriter {
+ String fieldName;
+ int index = -1;
+
+ void setFieldName(String fieldName) {
+ this.fieldName = fieldName;
+ }
+
+ /** sets index of field inside parquet message.*/
+ void setIndex(int index) {
+ this.index = index;
+ }
+
+ /** Used for writing repeated fields*/
+ void writeRawValue(Object value) {
@julienledem

julienledem Feb 4, 2014

Owner

having Object here will force boxing primitives and add overhead (memory, GC, ...)

@julienledem julienledem commented on the diff Feb 4, 2014

...uf/src/main/java/parquet/proto/ProtoWriteSupport.java
+ final void writeRawValue(Object value) {
+ recordConsumer.addInteger((Integer) value);
+ }
+ }
+
+ class LongWriter extends FieldWriter {
+
+ @Override
+ final void writeRawValue(Object value) {
+ recordConsumer.addLong((Long) value);
+ }
+ }
+
+ class FloatWriter extends FieldWriter {
+ @Override
+ final void writeRawValue(Object value) {
@julienledem

julienledem Feb 4, 2014

Owner

if we define a writeRawFloat(float value) we can use a type specific API
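A sketch of the type-specific API this comment suggests: adding a primitive `writeRawValue(float)` overload lets a float-valued call path avoid boxing entirely, while the `Object` overload remains for generic callers. All names below are illustrative, not the actual parquet-protobuf classes; the `last` field stands in for `recordConsumer.addFloat(value)`:

```java
public class WriterSketch {
    static abstract class FieldWriter {
        // Generic path: forces primitives to be boxed.
        abstract void writeRawValue(Object value);

        // Type-specific path: default delegates (and boxes), but subclasses
        // can override it to stay primitive end-to-end.
        void writeRawValue(float value) {
            writeRawValue((Object) value);
        }
    }

    static class FloatWriter extends FieldWriter {
        float last; // stands in for recordConsumer.addFloat(value)

        @Override
        void writeRawValue(Object value) {
            writeRawValue(((Float) value).floatValue()); // unbox once, then primitive path
        }

        @Override
        void writeRawValue(float value) {
            last = value; // no boxing on this call path
        }
    }
}
```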

@julienledem julienledem commented on the diff Feb 4, 2014

...uf/src/main/java/parquet/proto/ProtoWriteSupport.java
+ final void writeField(Object value) {
+ recordConsumer.startField(fieldName, index);
+ recordConsumer.startGroup();
+ writeAllFields((MessageOrBuilder) value);
+ recordConsumer.endGroup();
+ recordConsumer.endField(fieldName, index);
+ }
+
+ private void writeAllFields(MessageOrBuilder pb) {
+ //returns changed fields with values. Map is ordered by id.
+ Map<Descriptors.FieldDescriptor, Object> changedPbFields = pb.getAllFields();
+
+ for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : changedPbFields.entrySet()) {
+ Descriptors.FieldDescriptor fieldDescriptor = entry.getKey();
+ int fieldIndex = fieldDescriptor.getIndex();
+ fieldWriters[fieldIndex].writeField(entry.getValue());
@julienledem

julienledem Feb 4, 2014

Owner

this is where we need a different call for each primitive type to avoid autoboxing.

@lukasnalezenec

lukasnalezenec Feb 9, 2014

Contributor

Are you sure we can avoid autoboxing in this case? The values are already autoboxed inside the map.

@julienledem

julienledem Feb 10, 2014

Owner

Right, not in the map case, but in the other cases.

@lukasnalezenec

lukasnalezenec Feb 27, 2014

Contributor

IMHO we can't avoid auto/unboxing entirely: (open source) Protocol Buffers has no API that works with primitive values, so we will always receive an autoboxed value and need to unbox it at least once.
We could do other optimizations, such as devirtualizing method calls.

Owner

julienledem commented Feb 4, 2014

Thanks a lot @lukasnalezenec
This is great work.
I think we have 2 remaining comments:

  • how to keep information in the metadata about the original protobuf schema (field ids)
  • improve converters to avoid autoboxing (maybe some refactoring involved)

I would be OK with addressing those in a later pull request.

Contributor

lukasnalezenec commented Feb 9, 2014

I am working on the metadata storage now. I am serializing the descriptor to JSON; it is a little bit difficult for GeneratedMessages. I hope I will commit it in the next few days. We could open a new pull request for the autoboxing.

Owner

julienledem commented Feb 10, 2014

This sounds good to me. If you are using Jackson, add the dependency on parquet-jackson and follow how dependencies and shading are configured in parquet-thrift. parquet-jackson is meant to avoid bundling a shaded jackson more than once.
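For reference, the dependency described here would look roughly like this in parquet-protobuf/pom.xml (a sketch; the `groupId` is taken from the parent POM shown earlier in this thread, and the version property is assumed to be inherited, mirroring how parquet-thrift pulls in the shaded jackson):

```xml
<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>parquet-jackson</artifactId>
  <version>${project.version}</version>
</dependency>
```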

Owner

julienledem commented Feb 20, 2014

@lukasnalezenec should I merge this pull request?
Which one do you prefer?

  • add more changes in this PR.
  • merge now and create other PRs for changes discussed.
Contributor

lukasnalezenec commented Feb 27, 2014

Hi,
I am sorry, I have been a little busy. You can merge the request now; I will send the next pull request soon. There are two ways to store the metadata, and I need to discuss them.

julienledem merged commit d356578 into Parquet:master Feb 28, 2014

1 check passed

default The Travis CI build passed
Owner

julienledem commented Feb 28, 2014

Thanks a lot @lukasnalezenec !

@julienledem julienledem pushed a commit to julienledem/old-parquet-mr that referenced this pull request Jul 30, 2016

@rdblue rdblue PARQUET-364: Fix compatibility for Avro lists of lists.
This fixes lists of lists that have been written with Avro's 2-level
representation. The conversion setup logic missed the case where the
inner field is repeated and cannot be the element in a 3-level list.

This also fixes the schema conversion for cases where an unknown
writer used a 2-level list of lists.

This is based on @liancheng's #264 but fixes the problem in a slightly different way.

Author: Ryan Blue <blue@apache.org>

Closes #272 from rdblue/PARQUET-364-fix-avro-lists-of-lists and squashes the following commits:

41a70e0 [Ryan Blue] PARQUET-364: Fix compatibility for Avro lists of lists.
440882c