Skip to content

Commit

Permalink
[SPARK-36935][SQL] Extend ParquetSchemaConverter to compute Parquet r…
Browse files Browse the repository at this point in the history
…epetition & definition level

### What changes were proposed in this pull request?

This PR includes the following:
1. adds a new class `ParquetColumn`, which is a wrapper on a Spark `DataType` with additional Parquet column information, including its max repetition level & definition level, path from the root schema, column descriptor if the node is a leaf, the children nodes if it is a non-leaf, etc. This is needed to support complex types in the vectorized path, where we need to assemble a column vector of complex type using this information.
2. extends `ParquetSchemaConverter` to convert from a Parquet `MessageType` to a `ParquetColumn`, mostly by piggy-backing on the existing logic. A new method `convertParquetColumn` is added for this purpose, and the existing `convert` is changed to simply call the former.

### Why are the changes needed?

In order to support complex type for the vectorized Parquet reader (see SPARK-34863 for more info), we'll need to capture Parquet specific information such as max repetition/definition level for Spark's `DataType`, so that we can assemble primitive column vectors into ones with complex type (e.g., struct, map, array).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Extended the test cases in `ParquetSchemaSuite`

Closes #34199 from sunchao/SPARK-36935-column-io.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
sunchao authored and dongjoon-hyun committed Nov 2, 2021
1 parent 293c085 commit d246010
Show file tree
Hide file tree
Showing 6 changed files with 1,461 additions and 86 deletions.
40 changes: 40 additions & 0 deletions sql/core/src/main/java/org/apache/parquet/io/ColumnIOUtil.java
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.parquet.io;

/**
 * Static forwarders for {@link ColumnIO} accessors that are not public. This class is declared
 * in the same package as {@link ColumnIO} and simply delegates each call to the corresponding
 * method on the given column.
 *
 * TODO(SPARK-36511): we should remove this once PARQUET-2050 and PARQUET-2083 are released with
 *   Parquet 1.13.
 */
public class ColumnIOUtil {
  /** Not instantiable: this class only hosts static helpers. */
  private ColumnIOUtil() {}

  /**
   * @param column the column to inspect
   * @return the result of {@code column.getRepetitionLevel()}
   */
  public static int getRepetitionLevel(ColumnIO column) {
    final int repetitionLevel = column.getRepetitionLevel();
    return repetitionLevel;
  }

  /**
   * @param column the column to inspect
   * @return the result of {@code column.getDefinitionLevel()}
   */
  public static int getDefinitionLevel(ColumnIO column) {
    final int definitionLevel = column.getDefinitionLevel();
    return definitionLevel;
  }

  /**
   * @param column the column to inspect
   * @return the result of {@code column.getFieldPath()}
   */
  public static String[] getFieldPath(ColumnIO column) {
    final String[] fieldPath = column.getFieldPath();
    return fieldPath;
  }
}
Expand Up @@ -152,6 +152,7 @@ protected void initialize(String path, List<String> columns) throws IOException
Configuration config = new Configuration();
config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);

this.file = new Path(path);
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
Expand Down Expand Up @@ -199,6 +200,7 @@ protected void initialize(
Configuration config = new Configuration();
config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema);
this.totalRowCount = totalRowCount;
}
Expand Down
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.parquet.column.ColumnDescriptor
import org.apache.parquet.io.ColumnIOUtil
import org.apache.parquet.io.GroupColumnIO
import org.apache.parquet.io.PrimitiveColumnIO
import org.apache.parquet.schema.Type.Repetition

import org.apache.spark.sql.types.DataType

/**
 * A Spark SQL [[DataType]] paired with the Parquet-specific information of the column it was
 * converted from.
 *
 * @param sparkType the Spark SQL type of this column
 * @param descriptor the Parquet column descriptor; only set when this is a primitive column
 * @param repetitionLevel the repetition level of this column
 * @param definitionLevel the definition level of this column
 * @param required whether the Parquet type of this column has `REQUIRED` repetition
 * @param path the field path of this column
 * @param children the child columns; empty for a primitive column
 */
case class ParquetColumn(
    sparkType: DataType,
    descriptor: Option[ColumnDescriptor],
    repetitionLevel: Int,
    definitionLevel: Int,
    required: Boolean,
    path: Seq[String],
    children: Seq[ParquetColumn]) {

  /** True exactly when a column descriptor is present. */
  def isPrimitive: Boolean = descriptor.isDefined
}

/**
 * Factory methods that build a [[ParquetColumn]] from a Parquet column IO node, reading the
 * repetition level, definition level and field path through `ColumnIOUtil`.
 */
object ParquetColumn {

  /**
   * Builds the column for a primitive node: the descriptor is taken from `io` and there are
   * no children.
   *
   * @param sparkType the Spark SQL type of the column
   * @param io the Parquet primitive column IO to read the column information from
   */
  def apply(sparkType: DataType, io: PrimitiveColumnIO): ParquetColumn = {
    val columnDescriptor = Some(io.getColumnDescriptor)
    new ParquetColumn(
      sparkType = sparkType,
      descriptor = columnDescriptor,
      repetitionLevel = ColumnIOUtil.getRepetitionLevel(io),
      definitionLevel = ColumnIOUtil.getDefinitionLevel(io),
      required = io.getType.isRepetition(Repetition.REQUIRED),
      path = ColumnIOUtil.getFieldPath(io),
      children = Seq.empty)
  }

  /**
   * Builds the column for a group node: no descriptor is set and the given `children` are
   * attached as-is.
   *
   * @param sparkType the Spark SQL type of the column
   * @param io the Parquet group column IO to read the column information from
   * @param children the already-built child columns
   */
  def apply(
      sparkType: DataType,
      io: GroupColumnIO,
      children: Seq[ParquetColumn]): ParquetColumn = {
    new ParquetColumn(
      sparkType = sparkType,
      descriptor = None,
      repetitionLevel = ColumnIOUtil.getRepetitionLevel(io),
      definitionLevel = ColumnIOUtil.getDefinitionLevel(io),
      required = io.getType.isRepetition(Repetition.REQUIRED),
      path = ColumnIOUtil.getFieldPath(io),
      children = children)
  }
}
Expand Up @@ -25,8 +25,9 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.ColumnIOFactory
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.{GroupType, Type}
import org.apache.parquet.schema.{GroupType, Type, Types}
import org.apache.parquet.schema.LogicalTypeAnnotation._
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, FIXED_LEN_BYTE_ARRAY, INT32, INT64, INT96}

Expand Down Expand Up @@ -609,7 +610,13 @@ private[parquet] class ParquetRowConverter(
//
// If the element type does not match the Catalyst type and the underlying repeated type
// does not belong to the legacy LIST type, then it is case 1; otherwise, it is case 2.
val guessedElementType = schemaConverter.convertField(repeatedType)
//
// Since `convertField` method requires a Parquet `ColumnIO` as input, here we first create
// a dummy message type which wraps the given repeated type, and then convert it to the
// `ColumnIO` using Parquet API.
val messageType = Types.buildMessage().addField(repeatedType).named("foo")
val column = new ColumnIOFactory().getColumnIO(messageType)
val guessedElementType = schemaConverter.convertField(column.getChild(0)).sparkType
val isLegacy = schemaConverter.isElementType(repeatedType, parquetSchema.getName)

if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) || isLegacy) {
Expand Down

0 comments on commit d246010

Please sign in to comment.