
[SPARK-6123] [SPARK-6775] [SPARK-6776] [SQL] Refactors Parquet read path for interoperability and backwards-compatibility #7231

Closed
wants to merge 25 commits

Conversation

liancheng
Contributor

This PR is a follow-up to #6617 and is part of SPARK-6774, which aims to ensure interoperability and backwards-compatibility for Spark SQL Parquet support. This one fixes the read path: Spark SQL should now be able to read legacy Parquet data files generated by most (if not all) common libraries/tools such as parquet-thrift, parquet-avro, and parquet-hive. The write path still needs to be refactored to write standard Parquet LISTs and MAPs (SPARK-8848).

Major changes

  1. CatalystConverter class hierarchy refactoring

    • Replaces CatalystConverter trait with a much simpler ParentContainerUpdater.

      Now, instead of extending the original CatalystConverter trait, every converter class accepts an updater that is responsible for propagating the converted value to some parent container: for example, appending array elements to a parent array buffer, appending key-value pairs to a parent mutable map, or setting a converted value on a specific field of a parent row. The root converter doesn't have a parent and thus uses a NoopUpdater.

      This simplifies the design, since converters no longer need to know anything about their parent converters (see the sketch after this list).

    • Unifies CatalystRootConverter, CatalystGroupConverter and CatalystPrimitiveRowConverter into CatalystRowConverter

      Specifically, now all row objects are represented by SpecificMutableRow during conversion.

    • Refactors CatalystArrayConverter, and removes CatalystArrayContainsNullConverter and CatalystNativeArrayConverter

      CatalystNativeArrayConverter was probably designed to avoid boxing costs; however, the way it uses Scala generics doesn't actually achieve this goal.

      The new CatalystArrayConverter handles both nullable and non-nullable array elements in a consistent way.

    • Implements backwards-compatibility rules in CatalystArrayConverter

      By the time Parquet records are converted, the schema of the Parquet files has already been verified, so we only need to care about the structure of the Parquet schema rather than its field names. Since map objects written by legacy systems have the same structure as the standard one (see the backwards-compatibility rules for MAP), we only need to deal with LIST (namely arrays) in CatalystArrayConverter.

  2. Requested columns handling

    When specifying requested columns in RowReadSupport, we used to use a Parquet MessageType converted from a Catalyst StructType containing all requested columns. This is undesirable when taking compatibility and interoperability into consideration, because the actual Parquet file may have a physical structure different from the converted schema.

    In this PR, the schema for requested columns is constructed using the following method:

    • For a column that exists in the target Parquet file, we extract the column type by name from the full file schema, and construct a single-field MessageType for that column.
    • For a column that doesn't exist in the target Parquet file, we create a single-field StructType and convert it to a MessageType using CatalystSchemaConverter.
    • We union all single-field MessageTypes into a full schema containing all requested fields.

    With this change, we also fix SPARK-6123 by validating the global schema against each individual Parquet part-file.
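
To make the new converter design concrete, here is a minimal, self-contained sketch of the updater pattern described in item 1 above. The names mirror the ones introduced by this PR, but the bodies are illustrative rather than the actual implementation:

```
import scala.collection.mutable.ArrayBuffer

// Each converter pushes its converted value into its parent container through
// an updater, so it never needs to know what kind of parent it has.
trait ParentContainerUpdater {
  def set(value: Any): Unit = ()
  def setInt(value: Int): Unit = set(value)
  def setLong(value: Long): Unit = set(value)
}

// Used by the root converter, which has no parent container to update.
object NoopUpdater extends ParentContainerUpdater

// Appends converted array elements to a parent buffer (illustrative; the real
// converters write into Catalyst's mutable rows, array buffers, and maps).
class ArrayElementUpdater(buffer: ArrayBuffer[Any]) extends ParentContainerUpdater {
  override def set(value: Any): Unit = buffer += value
}

// A primitive converter only ever talks to its updater, regardless of whether
// its value ends up in a row field, an array element, or a map entry.
class IntConverter(updater: ParentContainerUpdater) {
  def addInt(value: Int): Unit = updater.setInt(value)
}
```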

Testing

This PR also adds compatibility tests for parquet-avro, parquet-thrift, and parquet-hive. Please refer to README.md under sql/core/src/test for more information about these tests. To avoid build-time code generation and extra complexity in the build system, the Java code generated from the testing Thrift schema and Avro IDL is also checked in.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

// Reads the Julian day portion of the INT96 value and, together with the
// nanos-of-day already read from the same buffer, converts it into Catalyst's
// internal timestamp representation via DateTimeUtils.fromJulianDay.
val julianDay = buf.getInt
updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos))
}
}

Contributor Author

Although INT96 and nanosecond timestamps are being deprecated in Parquet, this PR doesn't implement TIMESTAMP_MICROS, because parquet-mr 1.7.0 (the version we are currently using) doesn't have TIMESTAMP_MICROS yet.

Contributor Author

(Should probably add a TODO comment there.)

Done
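
For reference, the INT96 decoding in the snippet above boils down to roughly the following. This is a standalone sketch, not Spark code: `int96ToMicros` and the epoch constant are illustrative names, and it assumes the usual 12-byte little-endian INT96 layout (8 bytes of nanos-of-day followed by a 4-byte Julian day) as written by Impala and Hive:

```
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical helper: decode an INT96 timestamp value into microseconds
// since the Unix epoch.
def int96ToMicros(bytes: Array[Byte]): Long = {
  require(bytes.length == 12, "INT96 timestamps must be exactly 12 bytes")
  val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
  val timeOfDayNanos = buf.getLong   // nanoseconds within the day
  val julianDay = buf.getInt         // Julian day number
  val julianDayOfEpoch = 2440588     // Julian day of 1970-01-01 (assumed constant)
  (julianDay - julianDayOfEpoch) * 24L * 60 * 60 * 1000 * 1000 + timeOfDayNanos / 1000
}
```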

@SparkQA

SparkQA commented Jul 6, 2015

Test build #36553 has started for PR 7231 at commit 3581497.

@SparkQA

SparkQA commented Jul 6, 2015

Test build #36553 has finished for PR 7231 at commit 3581497.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented Jul 6, 2015

Test build #36554 has started for PR 7231 at commit 9b87903.

@SparkQA

SparkQA commented Jul 6, 2015

Test build #36554 has finished for PR 7231 at commit 9b87903.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented Jul 6, 2015

Test build #36557 has started for PR 7231 at commit 8be4723.

@SparkQA

SparkQA commented Jul 6, 2015

Test build #36557 has finished for PR 7231 at commit 8be4723.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test PASSed.

@liancheng
Contributor Author

@rdblue As usual, you're always welcome to help review this :-) As you suggested, the main part of this PR (CatalystRowConverter) takes parquet-avro as the reference and mostly resembles AvroIndexedRecordConverter.

@scwf It would be nice if you could help test this PR against your legacy Hive data. It should fix the Parquet compatibility issue you mentioned to me. Thanks!

@Sephiroth-Lin I'd also like to know whether this fixes your issue behind SPARK-8811. Thanks!

@rdblue
Contributor

rdblue commented Jul 6, 2015

@liancheng, thanks! I'll make some time to look at it this week. Most likely Wednesday.

@liancheng liancheng changed the title [SPARK-6776] [SPARK-8811] [SQL] Refactors Parquet read path for interoperability and backwards-compatibility [SPARK-6123] [SPARK-6775] [SPARK-6776] [SQL] Refactors Parquet read path for interoperability and backwards-compatibility Jul 6, 2015
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented Jul 6, 2015

Test build #36593 has started for PR 7231 at commit 8216e95.

@SparkQA

SparkQA commented Jul 6, 2015

Test build #36593 has finished for PR 7231 at commit 8216e95.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Build triggered.

@AmplabJenkins

Build started.

@SparkQA

SparkQA commented Jul 7, 2015

Test build #36672 has started for PR 7231 at commit e94cb1c.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented Jul 8, 2015

Test build #36767 has started for PR 7231 at commit 598c3e8.

@SparkQA

SparkQA commented Jul 8, 2015

Test build #36767 has finished for PR 7231 at commit 598c3e8.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented Jul 8, 2015

Test build #36769 has started for PR 7231 at commit c6fbc06.

@SparkQA

SparkQA commented Jul 8, 2015

Test build #36769 has finished for PR 7231 at commit c6fbc06.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented Jul 8, 2015

Test build #36819 has started for PR 7231 at commit 360fe18.

@liancheng
Contributor Author

Will probably merge this soon after Jenkins says OK, as it blocks other work. More compatibility tests need to be done, but I'd like to leave them to follow-up PRs (ordered by importance):

  • Compatibility tests for older versions of Spark SQL
  • More patterns of nested complex types
  • Avro enum
  • Possibly compatibility tests for parquet-protobuf

@SparkQA

SparkQA commented Jul 8, 2015

Test build #36819 has finished for PR 7231 at commit 360fe18.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test PASSed.

@liancheng
Contributor Author

@yhuai Thanks for the review! I'm merging this to master now.

@rdblue Please feel free to comment from the perspective of the Parquet spec and compatibility. Thanks!

@asfgit asfgit closed this in 4ffc27c Jul 8, 2015
@liancheng liancheng deleted the spark-6776 branch July 8, 2015 22:52
asfgit pushed a commit that referenced this pull request Jul 10, 2015
…ndencies

These two dependencies were introduced in #7231 to help test Parquet compatibility with `parquet-thrift`. However, they somehow crash the Scala compiler in Maven builds.

This PR fixes this issue by:

1. Removing these two dependencies, and
2. Instead of generating the testing Parquet file programmatically, checking in an actual testing Parquet file generated by `parquet-thrift` as a test resource.

This is just a quick fix to bring back Maven builds. We still need to figure out the root cause, as binary Parquet files are harder to maintain.

Author: Cheng Lian <lian@databricks.com>

Closes #7330 from liancheng/spark-8959 and squashes the following commits:

cf69512 [Cheng Lian] Brings back Maven builds
asfgit pushed a commit that referenced this pull request Sep 1, 2015
This PR can be quite challenging to review.  I'm trying to give a detailed description of the problem as well as its solution here.

When reading Parquet files, we need to specify a potentially nested Parquet schema (of type `MessageType`) as the requested schema for column pruning.  This Parquet schema is translated from a Catalyst schema (of type `StructType`), which is generated by the query planner and represents all requested columns.  However, this translation can be fairly complicated for several reasons:

1.  Requested schema must conform to the real schema of the physical file to be read.

    This means we have to tailor the actual file schema of every individual physical Parquet file to be read according to the given Catalyst schema.  Fortunately, we are already doing this in Spark 1.5 by pushing requested-schema conversion to the executor side in PR #7231.

1.  Support for schema merging.

    A single Parquet dataset may consist of multiple physical Parquet files that come with different but compatible schemas.  This means we may request a column path that doesn't exist in a particular physical Parquet file.  All requested column paths can be nested.  For example, for a Parquet file schema

    ```
    message root {
      required group f0 {
        required group f00 {
          required int32 f000;
          required binary f001 (UTF8);
        }
      }
    }
    ```

    we may request for column paths defined in the following schema:

    ```
    message root {
      required group f0 {
        required group f00 {
          required binary f001 (UTF8);
          required float f002;
        }
      }

      optional double f1;
    }
    ```

    Notice that we pruned column path `f0.f00.f000`, but added `f0.f00.f002` and `f1`.

    The good news is that Parquet handles non-existing column paths properly and always returns null for them.

1.  The mapping from `StructType` to `MessageType` is one-to-many.

    This is the most unfortunate part.

    Due to historical reasons (dark histories!), schemas of Parquet files generated by different libraries have different "flavors".  For example, to handle a schema with a single non-nullable column, whose type is an array of non-nullable integers, parquet-protobuf generates the following Parquet schema:

    ```
    message m0 {
      repeated int32 f;
    }
    ```

    while parquet-avro generates another version:

    ```
    message m1 {
      required group f (LIST) {
        repeated int32 array;
      }
    }
    ```

    and parquet-thrift spills this:

    ```
    message m2 {
      required group f (LIST) {
        repeated int32 f_tuple;
      }
    }
    ```

    All of them can be mapped to the following _unique_ Catalyst schema:

    ```
    StructType(
      StructField(
        "f",
        ArrayType(IntegerType, containsNull = false),
        nullable = false))
    ```

    This greatly complicates Parquet requested schema construction, since the path of a given column varies in different cases.  To read the array elements from files with the above schemas, we must use `f` for `m0`, `f.array` for `m1`, and `f.f_tuple` for `m2`.
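
For the read path, the practical consequence is that a reader has to decide, per file, whether the repeated field is itself the element type or merely a wrapper group around it. The backwards-compatibility rules for LIST in the parquet-format spec reduce to roughly the following check, sketched here over a simplified field model rather than the actual parquet-mr types:

```
// Simplified field model for illustration only (not parquet-mr's GroupType).
case class Field(name: String, isGroup: Boolean, children: Seq[Field] = Nil)

// Given the repeated field nested inside a LIST-annotated group (whose name is
// listFieldName), decide whether that repeated field is itself the element type,
// or a wrapper group containing the element as its single child.
def repeatedFieldIsElement(repeated: Field, listFieldName: String): Boolean =
  !repeated.isGroup ||                         // a repeated primitive is the element
  repeated.children.size > 1 ||                // a repeated group with several fields is the element
  repeated.name == "array" ||                  // legacy parquet-avro naming
  repeated.name == s"${listFieldName}_tuple"   // legacy parquet-thrift naming
```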

In earlier Spark versions, we didn't try to fix this issue properly.  Spark 1.4 and prior versions simply translate the Catalyst schema in a way more or less compatible with parquet-hive and parquet-avro, but this is broken in many other cases.  Earlier revisions of Spark 1.5 only try to tailor the Parquet file schema at the first level and ignore nested ones.  This caused [SPARK-10301] [spark-10301] as well as [SPARK-10005] [spark-10005].  In PR #8228, I tried to avoid the hard part of the problem and made a minimal change in `CatalystRowConverter` to fix SPARK-10005.  However, when taking SPARK-10301 into consideration, continuing to hack `CatalystRowConverter` doesn't seem to be a good idea.  So this PR is an attempt to fix the problem in a proper way.

For a given physical Parquet file with schema `ps` and a compatible Catalyst requested schema `cs`, we use the following algorithm to tailor `ps` into the resulting Parquet requested schema `ps'`:

For a leaf column path `c` in `cs`:

- if a corresponding Parquet column path `c'` can be found in `ps`, `c'` should be included in `ps'`;
- otherwise, we convert `c` to a Parquet column path `c"` using `CatalystSchemaConverter`, and include `c"` in `ps'`;
- no other column paths should exist in `ps'`.

Then comes the most tedious part:

> Given `cs`, `ps`, and `c`, how to locate `c'` in `ps`?

Unfortunately, there's no quick answer, and we have to enumerate all possible structures defined in the parquet-format spec.  They are:

1.  the standard structure of nested types, and
1.  cases defined in all backwards-compatibility rules for `LIST` and `MAP`.

The core part of this PR is `CatalystReadSupport.clipParquetType()`, which tailors a given Parquet file schema according to a requested schema in its Catalyst form.  Backwards-compatibility rules of `LIST` and `MAP` are covered in `clipParquetListType()` and `clipParquetMapType()` respectively.  The column path selection algorithm is implemented in `clipParquetGroupFields()`.
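
At a high level, the field-selection part of the algorithm looks roughly like the following. This is a sketch over simplified field models; `ParquetField`, `CatalystField`, and `toStandardParquetField` are illustrative stand-ins, while the real implementation works on parquet-mr `GroupType`s and Catalyst `StructType`s and additionally handles the LIST/MAP compatibility cases:

```
// Simplified models for illustration; not the real parquet-mr or Catalyst types.
case class ParquetField(name: String, children: Seq[ParquetField] = Nil)
case class CatalystField(name: String, children: Seq[CatalystField] = Nil)

// Hypothetical stand-in for CatalystSchemaConverter: build a Parquet field for a
// requested column path that the physical file doesn't contain, using the
// standard (non-legacy) layout, so that Parquet returns nulls for it.
def toStandardParquetField(requested: CatalystField): ParquetField =
  ParquetField(requested.name, requested.children.map(toStandardParquetField))

// Sketch of the column-path selection rule behind clipParquetGroupFields():
// keep the file's own definition of every requested column path that exists,
// recursing into nested groups, and synthesize the missing paths from the
// requested Catalyst schema.
def clipFields(fileFields: Seq[ParquetField],
               requestedFields: Seq[CatalystField]): Seq[ParquetField] =
  requestedFields.map { requested =>
    fileFields.find(_.name == requested.name) match {
      case Some(existing) if requested.children.nonEmpty =>
        existing.copy(children = clipFields(existing.children, requested.children))
      case Some(existing) => existing
      case None => toStandardParquetField(requested)
    }
  }
```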

With this PR, we no longer need to do schema tailoring in `CatalystReadSupport` and `CatalystRowConverter`.  Another benefit is that now we can also read Parquet datasets consisting of files with different physical Parquet schemas but the same logical schema, for example, files generated by different Parquet libraries.  This situation is illustrated by [this test case] [test-case].

[spark-10301]: https://issues.apache.org/jira/browse/SPARK-10301
[spark-10005]: https://issues.apache.org/jira/browse/SPARK-10005
[test-case]: liancheng@38644d8#diff-a9b98e28ce3ae30641829dffd1173be2R26

Author: Cheng Lian <lian@databricks.com>

Closes #8509 from liancheng/spark-10301/fix-parquet-requested-schema.