-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Description
Hi folks!
We're working on adding support for ProtoParquet to work with Hive / AWS Athena (Presto) [1]. The problem we've encountered appears whenever we declare a repeated field (array) or a map in the protobuf schema and we then try to convert it to parquet. The conversion works fine, but when we try to query the data with Hive/Presto, we get some freaky errors.
We've noticed though that AvroToParquet works great, even when we declare such fields (arrays, maps)!
Comparing the parquet schema generated by protobuf vs avro, we've noticed a few differences.
Take the simple schema below (protobuf):
message ListOfList {
string top_field = 1;
repeated MyInnerMessage first_array = 2;
}
message MyInnerMessage {
int32 inner_field = 1;
repeated int32 second_array = 2;
}After using ProtoParquetWriter, the resulting parquet schema is the following:
message TestProtobuf.ListOfList {
optional binary top_field (UTF8);
repeated group first_array {
optional int32 inner_field;
repeated int32 second_array;
}
}When we try to query this data, we get parsing errors from Hive/Athena. The parsing errors are related to the array/map fields.
However, if we create a similar avro schema, the parquet result of the AvroParquetWriter is the following:
message TestProtobuf.ListOfList {
required binary top_field (UTF8);
required group first_array (LIST) {
repeated group array {
required int32 inner_field;
required group second_array (LIST) {
repeated int32 array;
}
}
}
}This works beautifully with Hive/Athena. Too bad our systems are stuck with protobuf :-) .
You can see the additional wrappers which are missing from protobuf: required group first_array (LIST).
Our goal is to make the ProtoParquetWriter generate a parquet schema similar to what Avro is doing. We basically want to add these wrappers around lists/maps.
Everything seemed to work great, until we've bumped into an issue. We tuned ProtoParquetWriter to generate the same parquet schema as AvroParquetWriter. However, one difference between protobuf and avro is that in protobuf we can have a bunch of Optional fields.
message TestProtobuf.ListOfList {
optional binary top_field (UTF8);
required group first_array (LIST) {
repeated group array {
optional int32 inner_field;
required group second_array (LIST) {
repeated int32 array;
}
}
}
}Notice the: optional int32 inner_field (for avro that was required).
When testing with some real proto-parquet data, we get an error every time inner_field is not populated, but the second_array is.
parquet-tools cat /tmp/test23.parquet
org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/test23.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:223)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:122)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:126)
at org.apache.parquet.tools.command.CatCommand.execute(CatCommand.java:79)
at org.apache.parquet.proto.tools.Main.main(Main.java:214)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.parquet.io.ParquetDecodingException: totalValueCount '0' <= 0
at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:349)
at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:82)
at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:77)
at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:272)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:145)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:107)
at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:155)
at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:107)
at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:194)
... 9 more
org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/test23.parquet
Process finished with exit code 1
Basically this errors occurs whenever the first_array.inner_field is not populated, but first_array.second_array is.
I'm attaching the code used to generate the parquet files (though keep in mind that we're working on a fork atm).
Going through the code, I've noticed that the errors stop and everything seems to work fine, once I change this condition in ColumnReaderImpl:
From:
if (totalValueCount <= 0) {
throw new ParquetDecodingException("totalValueCount '" + totalValueCount + "' <= 0");
}To:
if (totalValueCount < 0) {
throw new ParquetDecodingException("totalValueCount '" + totalValueCount + "' < 0");
}--->
parquet-tools cat /tmp/test24.parquet
[main] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - RecordReader initialized will read a total of 10 records.
[main] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - at row 0. reading next block
[main] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 27 ms. row count = 10
top_field = top_field
first_array:
.array:
..second_array:
...array = 20
top_field = top_field
first_array:
.array:
..second_array:
...array = 20
I am wondering what are your thoughts on this? Should we change this condition to if (totalValueCount < 0)?
Any feedback is gladly appreciated! Let me know if I missed some information.
Thanks,
Costi
[1] https://aws.amazon.com/athena/
Reporter: Constantin Muraru / @costimuraru
Original Issue Attachments:
Note: This issue was originally created as PARQUET-964. Please see the migration documentation for further details.