
[SPARK-23822][SQL] Improve error message for Parquet schema mismatches

## What changes were proposed in this pull request?

This pull request improves the error message Spark produces when reading Parquet files with mismatched schemas, e.g. one file with a STRING column and another with an INT column. A new SchemaColumnConvertNotSupportedException is added to replace the old UnsupportedOperationException. The exception is then wrapped in FileScanRDD.scala and rethrown as a more general QueryExecutionException that includes the name of the Parquet file that triggered the error.
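Downstream code that wants to detect this condition can walk the exception's cause chain. The sketch below is not part of this patch and uses internal, `Unstable` classes; it is based on the wrapping done in FileScanRDD and the accessors on the new exception, and assumes a `spark` session is in scope (e.g. in spark-shell):

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException

try {
  spark.read.parquet("/path/with/mixed/schemas").collect()  // illustrative path
} catch {
  case e: SparkException => e.getCause match {
    case qe: QueryExecutionException => qe.getCause match {
      case c: SchemaColumnConvertNotSupportedException =>
        // Column path, physical type found in the file, and the type the reader expected.
        println(s"Column ${c.getColumn}: found ${c.getPhysicalType}, expected ${c.getLogicalType}")
      case _ => throw e
    }
    case _ => throw e
  }
}
```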

## How was this patch tested?

Unit tests added to check the new exception and verify the error messages.

Also manually tested with two Parquet files with different schemas to check the error message, as sketched below.
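A minimal sketch of such a manual reproduction (the path and column values are illustrative and mirror the new unit test rather than the exact files used):

```scala
import spark.implicits._  // available by default in spark-shell

// Write two Parquet files with conflicting types for column "a" into the same directory;
// reading the directory back forces the reader to reconcile the schemas and fail.
Seq(("bcd", 2)).toDF("a", "b").coalesce(1)
  .write.mode("overwrite").parquet("/tmp/parquet_mismatch")
Seq((1, "abc")).toDF("a", "b").coalesce(1)
  .write.mode("append").parquet("/tmp/parquet_mismatch")

// With this patch, the vectorized reader fails with a message along the lines of:
//   Parquet column cannot be converted in file <file>.
//   Column: [a], Expected: IntegerType, Found: BINARY
spark.read.parquet("/tmp/parquet_mismatch").collect()
```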

<img width="1125" alt="screen shot 2018-03-30 at 4 03 04 pm" src="https://user-images.githubusercontent.com/37087310/38156580-dd58a140-3433-11e8-973a-b816d859fbe1.png">

Author: Yuchen Huo <yuchen.huo@databricks.com>

Closes #20953 from yuchenhuo/SPARK-23822.

(cherry picked from commit 9452401)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
yuchenhuo authored and gatorsmile committed Apr 6, 2018
1 parent f93667f commit ccc4a20453bbbaf1f3e5e46fb7c0277f1e6c65b9
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources;

import org.apache.spark.annotation.InterfaceStability;

/**
* Exception thrown when the parquet reader finds column type mismatches.
*/
@InterfaceStability.Unstable
public class SchemaColumnConvertNotSupportedException extends RuntimeException {

/**
* Name of the column which cannot be converted.
*/
private String column;
/**
* Physical column type in the actual parquet file.
*/
private String physicalType;
/**
* Logical column type in the parquet schema the parquet reader uses to parse all files.
*/
private String logicalType;

public String getColumn() {
return column;
}

public String getPhysicalType() {
return physicalType;
}

public String getLogicalType() {
return logicalType;
}

public SchemaColumnConvertNotSupportedException(
String column,
String physicalType,
String logicalType) {
super();
this.column = column;
this.physicalType = physicalType;
this.logicalType = logicalType;
}
}
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;
import java.util.Arrays;
import java.util.TimeZone;

import org.apache.parquet.bytes.BytesUtils;
@@ -31,6 +32,7 @@
import org.apache.parquet.schema.PrimitiveType;

import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;
@@ -231,6 +233,18 @@ private boolean shouldConvertTimestamps() {
return convertTz != null && !convertTz.equals(UTC);
}

/**
* Helper function to construct exception for parquet schema mismatch.
*/
private SchemaColumnConvertNotSupportedException constructConvertNotSupportedException(
ColumnDescriptor descriptor,
WritableColumnVector column) {
return new SchemaColumnConvertNotSupportedException(
Arrays.toString(descriptor.getPath()),
descriptor.getType().toString(),
column.dataType().toString());
}

/**
* Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
*/
@@ -261,7 +275,7 @@ private void decodeDictionaryIds(
}
}
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
throw constructConvertNotSupportedException(descriptor, column);
}
break;

@@ -282,7 +296,7 @@ private void decodeDictionaryIds(
}
}
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
throw constructConvertNotSupportedException(descriptor, column);
}
break;

@@ -321,7 +335,7 @@ private void decodeDictionaryIds(
}
}
} else {
throw new UnsupportedOperationException();
throw constructConvertNotSupportedException(descriptor, column);
}
break;
case BINARY:
@@ -360,7 +374,7 @@ private void decodeDictionaryIds(
}
}
} else {
throw new UnsupportedOperationException();
throw constructConvertNotSupportedException(descriptor, column);
}
break;

@@ -375,7 +389,9 @@ private void decodeDictionaryIds(
*/

private void readBooleanBatch(int rowId, int num, WritableColumnVector column) {
assert(column.dataType() == DataTypes.BooleanType);
if (column.dataType() != DataTypes.BooleanType) {
throw constructConvertNotSupportedException(descriptor, column);
}
defColumn.readBooleans(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
}
@@ -394,7 +410,7 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) {
defColumn.readShorts(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
throw constructConvertNotSupportedException(descriptor, column);
}
}

@@ -414,7 +430,7 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) {
}
}
} else {
throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
throw constructConvertNotSupportedException(descriptor, column);
}
}

@@ -425,7 +441,7 @@ private void readFloatBatch(int rowId, int num, WritableColumnVector column) {
defColumn.readFloats(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
throw constructConvertNotSupportedException(descriptor, column);
}
}

@@ -436,7 +452,7 @@ private void readDoubleBatch(int rowId, int num, WritableColumnVector column) {
defColumn.readDoubles(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
throw constructConvertNotSupportedException(descriptor, column);
}
}

@@ -471,7 +487,7 @@ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) {
}
}
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
throw constructConvertNotSupportedException(descriptor, column);
}
}

@@ -510,7 +526,7 @@ private void readFixedLenByteArrayBatch(
}
}
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
throw constructConvertNotSupportedException(descriptor, column);
}
}

@@ -17,4 +17,5 @@

package org.apache.spark.sql.execution

class QueryExecutionException(message: String) extends Exception(message)
class QueryExecutionException(message: String, cause: Throwable = null)
extends Exception(message, cause)
@@ -21,11 +21,14 @@ import java.io.{FileNotFoundException, IOException}

import scala.collection.mutable

import org.apache.parquet.io.ParquetDecodingException

import org.apache.spark.{Partition => RDDPartition, TaskContext, TaskKilledException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.NextIterator

@@ -179,7 +182,23 @@ class FileScanRDD(
currentIterator = readCurrentFile()
}

hasNext
try {
hasNext
} catch {
case e: SchemaColumnConvertNotSupportedException =>
val message = "Parquet column cannot be converted in " +
s"file ${currentFile.filePath}. Column: ${e.getColumn}, " +
s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
throw new QueryExecutionException(message, e)
case e: ParquetDecodingException =>
if (e.getMessage.contains("Can not read value at")) {
val message = "Encounter error while reading parquet files. " +
"One possible cause: Parquet column cannot be converted in the " +
"corresponding files. Details: "
throw new QueryExecutionException(message, e)
}
throw e
}
} else {
currentFile = null
InputFileBlockHolder.unset()
@@ -20,10 +20,13 @@ package org.apache.spark.sql.execution.datasources.parquet
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

import org.apache.parquet.io.ParquetDecodingException
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -382,6 +385,58 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
}
}

// =======================================
// Tests for parquet schema mismatch error
// =======================================
def testSchemaMismatch(path: String, vectorizedReaderEnabled: Boolean): SparkException = {
import testImplicits._

var e: SparkException = null
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReaderEnabled.toString) {
// Create two parquet files with different schemas in the same folder
Seq(("bcd", 2)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet(s"$path/parquet")
Seq((1, "abc")).toDF("a", "b").coalesce(1).write.mode("append").parquet(s"$path/parquet")

e = intercept[SparkException] {
spark.read.parquet(s"$path/parquet").collect()
}
}
e
}

test("schema mismatch failure error message for parquet reader") {
withTempPath { dir =>
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false)
val expectedMessage = "Encounter error while reading parquet files. " +
"One possible cause: Parquet column cannot be converted in the corresponding " +
"files. Details:"
assert(e.getCause.isInstanceOf[QueryExecutionException])
assert(e.getCause.getCause.isInstanceOf[ParquetDecodingException])
assert(e.getCause.getMessage.startsWith(expectedMessage))
}
}

test("schema mismatch failure error message for parquet vectorized reader") {
withTempPath { dir =>
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
assert(e.getCause.isInstanceOf[QueryExecutionException])
assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])

// Check if the physical type is reporting correctly
val errMsg = e.getCause.getMessage
assert(errMsg.startsWith("Parquet column cannot be converted in file"))
val file = errMsg.substring("Parquet column cannot be converted in file ".length,
errMsg.indexOf(". "))
val col = spark.read.parquet(file).schema.fields.filter(_.name.equals("a"))
assert(col.length == 1)
if (col(0).dataType == StringType) {
assert(errMsg.contains("Column: [a], Expected: IntegerType, Found: BINARY"))
} else {
assert(errMsg.endsWith("Column: [a], Expected: StringType, Found: INT32"))
}
}
}

// =======================================================
// Tests for converting Parquet LIST to Catalyst ArrayType
// =======================================================
