PARQUET-1441: SchemaParseException: Can't redefine: list in AvroIndexedRecordConverter #560

Merged
merged 2 commits into apache:master on May 28, 2019

Conversation

@nandorKollar
Contributor

nandorKollar commented Nov 29, 2018

The Parquet Avro reader can't convert Parquet schemas where a group field name is reused
in an inner structure. The converter creates an Avro record schema in this case,
but in Avro, record types must have unique names, so the result is an invalid Avro
schema. This patch fixes the issue by adding a namespace to the record if the name was
defined before, making the record names unique.
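
To illustrate the problem, here is a minimal, standalone sketch (not the converter code itself; the record names, namespaces, and fields below are only examples) showing why the full names have to be unique before Avro will serialize the schema:

import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class NamespaceSketch {
    public static void main(String[] args) {
        // Two structurally different records that a conversion could both name
        // "element"; distinct namespaces make the full names unique
        // ("ns1.element" and "ns2.element").
        Schema first = SchemaBuilder.record("element").namespace("ns1")
            .fields().optionalString("user").endRecord();
        Schema second = SchemaBuilder.record("element").namespace("ns2")
            .fields().optionalLong("numEvents").endRecord();

        Schema root = Schema.createRecord("spark_schema", null, null, false);
        root.setFields(Arrays.asList(
            new Schema.Field("history", Schema.createArray(first), null, (Object) null),
            new Schema.Field("campaigns", Schema.createArray(second), null, (Object) null)));

        // If both records carried the bare name "element" (no namespace), this
        // call would throw org.apache.avro.SchemaParseException: Can't redefine: element
        System.out.println(root.toString(true));
    }
}

The patch applies the same idea during conversion: when a record name has already been used, a namespace is added so that the generated full names stay unique.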

@gszadovszky
Contributor

gszadovszky left a comment
LGTM

@liorchaga

Really looking forward to this patch!

So I've tested the patch locally, and it fails on the following schema for the second nested Parquet column:

optional group pv_acceptedLanguages (LIST) {
  repeated group list {
    optional binary element (STRING);
  }
}

This is the GroupType that represents the following column in the Parquet file:

{
   "name":"pv_acceptedLanguages",
   "type":{
      "type":"array",
      "elementType":"string",
      "containsNull":true
   },
   "nullable":true,
   "metadata":{
   }
}

The exception I'm getting is:

Exception in thread "main" java.lang.ClassCastException: optional binary element (STRING) is not a group
	at org.apache.parquet.schema.Type.asGroupType(Type.java:227)
	at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:168)
	at org.apache.parquet.avro.AvroIndexedRecordConverter.access$200(AvroIndexedRecordConverter.java:48)
	at org.apache.parquet.avro.AvroIndexedRecordConverter$AvroArrayConverter$ElementConverter.<init>(AvroIndexedRecordConverter.java:381)
	at org.apache.parquet.avro.AvroIndexedRecordConverter$AvroArrayConverter.<init>(AvroIndexedRecordConverter.java:344)
	at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:172)
	at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
	at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:66)
	at org.apache.parquet.avro.AvroCompatRecordMaterializer.<init>(AvroCompatRecordMaterializer.java:34)
	at org.apache.parquet.avro.AvroReadSupport.newCompatMaterializer(AvroReadSupport.java:144)
	at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:136)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
	at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)

The exception is triggered just after successfully processing the following Parquet column (the first nested column it encountered):

{
   "name":"pv_events",
   "type":{
      "type":"array",
      "elementType":{
         "type":"struct",
         "fields":[
            {
               "name":"eventTime",
               "type":"long",
               "nullable":true,
               "metadata":{}
            },
            {
               "name":"name",
               "type":"string",
               "nullable":true,
               "metadata":{}
            },
            {
               "name":"type",
               "type":"string",
               "nullable":true,
               "metadata":{}
            }
         ]
      },
      "containsNull":true
   },
   "nullable":true,
   "metadata":{}
}

@nandorKollar
Contributor Author

nandorKollar commented Dec 4, 2018

@liorchaga is this something that used to work without this patch? It would be unfortunate to cause a regression. Would you mind demonstrating the failure with a unit test too, or helping me reproduce it? The schema you provided (the one named pv_events) doesn't look familiar to me; what kind of schema is it?

@liorchaga

@nandorKollar No, this is not something that used to work. I tried making it work last week with parquet-mr 1.8.2 but got SchemaParseException: Can't redefine: list.
Setting parquet.avro.add-list-element-records to false just changed it to SchemaParseException: Can't redefine: element. So this patch really comes at the right time.

The schema I provided is just a column we have in production data; pv_events is its name.

I will provide a test or reproduction steps tomorrow... Thanks!

@liorchaga

@nandorKollar
Attached is a sample Parquet file containing two columns, availableLanguages and events. Both are nested columns.

I used the following code snippet to get the Avro schema:

AvroReadSupport readSupport = new AvroReadSupport();
ParquetReader<GenericData.Record> avroParquetReader =
        AvroParquetReader.builder(readSupport,
             new Path("file:///path/to/file/part-00000-a13d1582-160e-4e18-9be3-9b178641db76-c000.snappy.parquet"))
            .build();
GenericData.Record record = avroParquetReader.read();
record.getSchema();

part-00000-a13d1582-160e-4e18-9be3-9b178641db76-c000.snappy.parquet.zip

@nandorKollar
Contributor Author

@liorchaga in this particular case I think you should explicitly indicate that the file schema is written with the 3-level list structure, by setting ADD_LIST_ELEMENT_RECORDS to false in the Configuration object and building the reader with it:

    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(false);
    conf.set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false");
    AvroReadSupport readSupport = new AvroReadSupport();
    ParquetReader<GenericData.Record> avroParquetReader =
      AvroParquetReader.builder(readSupport,
        new Path("file:////Users/nkollar/Downloads/part-00000-a13d1582-160e-4e18-9be3-9b178641db76-c000.snappy.parquet"))
        .withConf(conf)
        .build();
    GenericData.Record record = avroParquetReader.read();
    record.getSchema();

@liorchaga

liorchaga commented Dec 5, 2018

Thanks @nandorKollar. I was under the impression that the patch obviates the need for this flag.
Anyway, it now fails on nested arrays within nested arrays.

org.apache.avro.SchemaParseException: Can't redefine: element2.element

Parquet schema example:

message spark_schema {
  optional group history (LIST) {
    repeated group list {
      optional group element {
        optional binary user (UTF8);
        optional group historyDepthByUserHistoryType (LIST) {
          repeated group list {
            optional group element {
              optional binary historyType (UTF8);
              optional int64 numEvents;
            }
          }
        }
      }
    }
  }
}

part-00000-83d942f8-dec0-4420-aaaa-4e3e33ba340a-c000.snappy.parquet.zip

@nandorKollar
Contributor Author

Strange, I tried, but I didn't get the exception you mentioned.

@liorchaga

Well, just to make sure, this is the code I executed. Now I see it actually failed on avroParquetReader.read(), not on the schema.

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetReader;

public class Tester {
    public static void main(String[] args) throws IOException {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false");
        AvroReadSupport readSupport = new AvroReadSupport();
        ParquetReader<GenericData.Record> avroParquetReader =
                AvroParquetReader.builder(readSupport,
                        new Path("file:///path/to/file/part-00001-ea373355-e5b6-4372-b7cc-f707e94111fb-c000.snappy.parquet"))
                        .withConf(conf)
                        .build();
        GenericData.Record record = avroParquetReader.read();
        final Schema schema = record.getSchema();
    }
}

@liorchaga

Ok, my bad. It failed on another file with a wider schema that also contains this field. I'll check again myself (I must be out of focus today).

@liorchaga

Ok, so once I have two top-level arrays and one of them contains nested arrays, it actually generates the schema, but it fails on schema.toString() with SchemaParseException: Can't redefine: element2.element

schema:

message spark_schema {
  optional group history (LIST) {
    repeated group list {
      optional group element {
        optional binary user (UTF8);
        optional group historyDepthByUserHistoryType (LIST) {
          repeated group list {
            optional group element {
              optional binary historyType (UTF8);
              optional int64 numEvents;
            }
          }
        }
      }
    }
  }
  optional group campaigns (LIST) {
    repeated group list {
      optional group element {
        optional double calibratedRecsHistory;
        optional int64 campaignId;
        optional int64 eventTime;
        optional int64 recsHistory;
      }
    }
  }
}

file attached.
part-00000-4b659403-e0ec-4f55-b15c-09c25894e4f1-c000.snappy.parquet.zip

Full stack trace:

Exception in thread "main" org.apache.avro.SchemaParseException: Can't redefine: element2.element
	at org.apache.avro.Schema$Names.put(Schema.java:1128)
	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema.toString(Schema.java:324)
	at org.apache.avro.Schema.toString(Schema.java:314)
	at java.lang.String.valueOf(String.java:2994)
	at java.io.PrintStream.println(PrintStream.java:821)
	at Tester.main(Tester.java:24)

@liorchaga

Also, when I run on the full schema, the same exception occurs on avroParquetReader.read().
I'll try to isolate it and provide the relevant schema.

@liorchaga

Well, it turns out that if I just switch the order of the columns in the schema, it fails on read. Attached file here:
part-00000-471d7fc7-dea3-4198-9db3-3a968b1a075d-c000.snappy.parquet.zip

Stack trace:

org.apache.avro.SchemaParseException: Can't redefine: element2.element
	at org.apache.avro.Schema$Names.put(Schema.java:1128)
	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema.toString(Schema.java:324)
	at org.apache.avro.Schema.toString(Schema.java:314)
	at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:305)
	at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:277)
	at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:231)
	at org.slf4j.helpers.MessageFormatter.format(MessageFormatter.java:152)
	at org.slf4j.impl.Log4jLoggerAdapter.debug(Log4jLoggerAdapter.java:251)
	at org.apache.avro.SchemaCompatibility$ReaderWriterCompatiblityChecker.getCompatibility(SchemaCompatibility.java:228)
	at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:61)
	at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
	at org.apache.parquet.avro.AvroIndexedRecordConverter$AvroArrayConverter.<init>(AvroIndexedRecordConverter.java:333)
	at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:172)
	at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
	at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:66)
	at org.apache.parquet.avro.AvroCompatRecordMaterializer.<init>(AvroCompatRecordMaterializer.java:34)
	at org.apache.parquet.avro.AvroReadSupport.newCompatMaterializer(AvroReadSupport.java:144)
	at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:136)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
	at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
	at Tester.main(Tester.java:22)

@nandorKollar
Contributor Author

@liorchaga thanks for catching this, I think I found the problem: I used Map#merge incorrectly. I pushed a new commit; it should fix this case.
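
For reference, the bookkeeping is essentially a per-name counter. A rough sketch of the intended Map#merge usage (illustrative only, not the actual patch code; the class and method names are made up):

import java.util.HashMap;
import java.util.Map;

public class NameCounterSketch {
    private final Map<String, Integer> seenNames = new HashMap<>();

    // Map#merge stores the given value for an absent key, otherwise applies the
    // remapping function to (oldValue, value), and returns the updated count.
    public String namespaceFor(String recordName) {
        int count = seenNames.merge(recordName, 1, Integer::sum);
        return count == 1 ? null : recordName + count; // e.g. "element2"
    }

    public static void main(String[] args) {
        NameCounterSketch names = new NameCounterSketch();
        System.out.println(names.namespaceFor("element")); // null -> plain "element"
        System.out.println(names.namespaceFor("element")); // element2 -> "element2.element"
    }
}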

@liorchaga

Superb. I'll test it on the full schema again soon.

@liorchaga

@nandorKollar the fix works like magic. Thanks!
I think it's worth adding a unit test to cover it.

@heuermh

heuermh commented Mar 10, 2019

Any updates on the state of this pull request?

This affects us downstream of Apache Spark, which is at the moment still going through the RC process for a 2.4.1 release. (This issue broke us as of Spark 2.4.0.)

https://issues.apache.org/jira/browse/SPARK-25588
bigdatagenomics/adam#2058
bigdatagenomics/adam#2056

@nandorKollar
Contributor Author

@rdblue what do you think: is using an Avro namespace the right approach to solve this problem? It looks like name collisions in the Avro schema (for example when converting Parquet lists, where the group name is the same for the embedded list) cause problems like this, and I think Avro namespaces would be a good way to solve such issues.

@liorchaga

I personally think this fix is superb. We already use it in production with a shaded jar that includes this patch, reading and writing a very complex Parquet schema.
+1 for merging

@rdblue
Contributor

rdblue commented Mar 18, 2019

I think it is okay to use the namespace to solve this problem. The idea is to use the path within the schema as the namespace and the last component as the record name, right? So it would produce list_one.element and list_two.element instead of two records named element?

We fixed this in Iceberg by moving to IDs. You could consider doing something similar when there is a field ID defined on the Parquet type. We use r12 for ID 12, for example.

@nandorKollar
Contributor Author

@rdblue something similar, but the namespace is not the entire path to the record, just the record name plus a counter. So in your example the namespaces would be element1 and element2, and the two record names element1.element and element2.element. Namespaces are used only when actually required, that is, when the record name has been encountered before; if there's only one list with element, no namespace is used. Would you prefer the entire path to the record as the namespace?

By IDs you mean Parquet field IDs? In that case the IDs would provide the namespaces for the Avro records, right? That also sounds promising to me; however, the client would then have to modify the Parquet schema by adding unique field IDs, since AFAIK IDs are optional.

@rdblue
Contributor

rdblue commented Mar 18, 2019

Sounds reasonable to me. I don't think it matters whether it is a path or a counter.

We used field IDs to name the record itself without using namespaces.

@heuermh

heuermh commented Mar 20, 2019

Might it be possible to get this merged and into a bugfix release 1.10.2 soon? If the Spark 2.4.1 RC process drags out a bit longer, it would be nice to get this in.

@rdblue
Contributor

rdblue commented Mar 20, 2019

I don't think it is necessary to get this into Spark 2.4.1. Spark doesn't use parquet-avro, except in tests.

@heuermh

heuermh commented Mar 20, 2019

I don't think it is necessary to get this into Spark 2.4.1. Spark doesn't use parquet-avro, except in tests.

Sorry if I sound frustrated; I feel like I keep having to re-explain myself. The versions of Parquet and Avro in Spark are incompatible with each other, and downstream we don't have any way to work around the conflict. Per

https://issues.apache.org/jira/browse/SPARK-25588

We have been struggling with Spark's conflicting Parquet and Avro dependencies for many versions. Our most recent workaround is to pin parquet-avro to version 1.8.1 and exclude all of its transitive dependencies. This workaround worked for 2.3.2, so I gave the last Spark RC a non-binding +1.

https://github.com/bigdatagenomics/adam/blob/master/pom.xml#L520

That workaround does not work for 2.4.0, as this pinned version 1.8.1 conflicts at runtime with version 1.10.0 brought in by Spark.

$ mvn test
...
*** RUN ABORTED ***
  java.lang.NoSuchFieldError: BROTLI
  at org.apache.parquet.hadoop.metadata.CompressionCodecName.<clinit>(CompressionCodecName.java:31)
  at org.bdgenomics.adam.rdd.JavaSaveArgs$.$lessinit$greater$default$4(GenomicRDD.scala:78)

Removing the pinned version and dependency exclusions, bringing the build dependency version to 1.10.0, results in the error reported here in our unit tests under Spark version 2.4.0.

bigdatagenomics/adam#2056

I believe this pull request will fix the SchemaParseException error, and once the Parquet dependency in Spark is bumped to a release containing this fix, downstream projects such as ours will no longer be blocked from moving to Spark 2.4.x.

@rdblue
Contributor

rdblue commented Mar 21, 2019

@heuermh, you should be able to shade parquet-avro to avoid that conflict. Have you tried that?

@heuermh

heuermh commented Mar 24, 2019

you should be able to shade parquet-avro to avoid that conflict. Have you tried that?

I've not yet found any combination of dependency exclusions, version overrides, and dependency shading that works, and not for lack of trying.

Spark 2.4.1 RC8 has failed, and there might be a chance to get a PR into RC9 or a later release candidate that updates the Parquet dependency to include this fix. How heavyweight is the release process here for a 1.10.2 release?

@rdblue
Contributor

rdblue commented Mar 25, 2019

@heuermh, have you tried setting the schemas manually instead of letting Parquet convert them?

Unfortunately, I think this needs to wait to be in the 1.11.0 release. 1.10.2 would be a patch release and we want to keep those to just bug fixes. While this is a bug, it can break existing code because it changes the names of structures in the Avro schema.

This could cause a problem when reading with a projection schema. Inside of unions, types are matched by name. Because this changes the full name of types, it could cause existing code to not be able to read unions when projecting with an Avro schema.

@heuermh

heuermh commented Mar 25, 2019

This could cause a problem when reading with a projection schema. Inside of unions, types are matched by name. Because this changes the full name of types, it could cause existing code to not be able to read unions when projecting with an Avro schema.

Ah right, good point.

have you tried setting the schemas manually instead of letting Parquet convert them?

It is Spark SQL generating the schemas when saving a Dataset or DataFrame as Parquet; I'm not sure there is a hook to specify an Avro-generated schema there. Will look into it.

In any case, I generated and deployed SNAPSHOT builds from this branch all the way up our stack (Parquet → Spark → bdg-utils → ADAM) and it seems to work for us.

@zivanfi merged commit 1e5fda5 into apache:master on May 28, 2019
@heuermh

heuermh commented May 28, 2019

@zivanfi Thank you for the merge! Which Parquet version will this fix be released in?

The workaround we had to put in place is very brittle: it involves a copy of the parquet-avro source with modifications in our build, shading of the external parquet-avro classes, and a custom maven-shade-plugin Shader that excludes the external parquet-avro copies of the modified classes from our build.

@zivanfi
Contributor

zivanfi commented May 28, 2019

@heuermh Sorry, I can't tell; unfortunately, I don't have much influence on the release process.

@elkhand

elkhand commented Jul 19, 2019

While trying to read with a build from the master branch, no exceptions are thrown, but the values of repeated array types are set to null. I verified via parquet-tools and PySpark that the values do exist in the Parquet file:

optional group trip_uuids (LIST) {
      repeated binary array (UTF8);
}

Corresponding Avro schema:

{
              "name": "trip_uuids",
              "type": [
                "null",
                {
                  "type": "array",
                  "items": "string"
                }
              ],
              "default": null
}

I have this Hadoop configuration:

this.hadoopConf = new Configuration(false);
this.hadoopConf.setBoolean("parquet.avro.add-list-element-records", false);
this.hadoopConf.setBoolean("parquet.avro.write-old-list-structure", false);

@heuermh, is there any property I am missing in the config?

@elkhand

elkhand commented Jul 19, 2019

The issue was that the Parquet file was written with Parquet 1.8.1 and Avro 1.8.2, while I was trying to read it with a build from master (1.12), which causes extra issues. Maybe backward compatibility is broken.
The Hadoop conf I used was just the default one:

this.hadoopConf = new Configuration();
