[SPARK-47564][SQL] Always throw FAILED_READ_FILE error when fail to read files

### What changes were proposed in this pull request?

This is a follow-up of #44953 to refine the newly added `FAILED_READ_FILE` error. It's better to always throw the `FAILED_READ_FILE` error if anything goes wrong during file reading: this is more predictable and makes error handling easier for users. This PR adds sub error classes to `FAILED_READ_FILE` so that users can see more quickly what went wrong.
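As a rough illustration of the intended user experience (the path and the handling code below are hypothetical, not part of this PR), callers can now match on the `FAILED_READ_FILE` prefix, or on a specific sub error class, instead of matching on assorted low-level exception types:

```scala
import org.apache.spark.SparkException

// Hedged sketch of user-side error handling; the input path is illustrative.
try {
  spark.read.parquet("/data/events").collect()
} catch {
  case e: SparkException if e.getErrorClass != null &&
      e.getErrorClass.startsWith("FAILED_READ_FILE") =>
    // getErrorClass returns the full class, e.g. "FAILED_READ_FILE.FILE_NOT_EXIST"
    // or "FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH".
    println(s"File read failed (${e.getErrorClass}): ${e.getMessage}")
}
```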

### Why are the changes needed?

Better error reporting.

### Does this PR introduce _any_ user-facing change?

No, `FAILED_READ_FILE` has not been released yet.

### How was this patch tested?

Existing tests.
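As a hedged illustration only (not a test added by this PR; helper names such as `withTempPath` follow Spark's shared test utilities), a sub-class-specific check in the style of the updated suites could look like the sketch below, with the data files removed after the DataFrame is planned so the failure happens at read time:

```scala
import org.apache.spark.SparkException

// Illustrative sketch, assuming SharedSparkSession-style test helpers are in scope.
withTempPath { dir =>
  spark.range(10).write.parquet(dir.getCanonicalPath)
  val df = spark.read.parquet(dir.getCanonicalPath)
  dir.listFiles().foreach(_.delete())  // drop the underlying files after planning
  val ex = intercept[SparkException] { df.collect() }
  assert(ex.getErrorClass == "FAILED_READ_FILE.FILE_NOT_EXIST")
}
```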

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45723 from cloud-fan/error.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2 people authored and MaxGekk committed Mar 27, 2024
1 parent 87449c3 commit d10dbaa
Showing 29 changed files with 272 additions and 215 deletions.
50 changes: 25 additions & 25 deletions common/utils/src/main/resources/error/error-classes.json
@@ -304,14 +304,6 @@
],
"sqlState" : "22007"
},
"CANNOT_READ_FILE_FOOTER" : {
"message" : [
"Could not read footer for file: <file>. Please ensure that the file is in either ORC or Parquet format.",
"If not, please convert it to a valid format. If the file is in the valid format, please check if it is corrupt.",
"If it is, you can choose to either ignore it or fix the corruption."
],
"sqlState" : "KD001"
},
"CANNOT_RECOGNIZE_HIVE_TYPE" : {
"message" : [
"Cannot recognize hive type string: <fieldType>, column: <fieldName>. The specified data type for the field cannot be recognized by Spark SQL. Please check the data type of the specified field and ensure that it is a valid Spark SQL data type. Refer to the Spark SQL documentation for a list of valid data types and their format. If the data type is correct, please ensure that you are using a supported version of Spark SQL."
@@ -1257,6 +1249,31 @@
"message" : [
"Encountered error while reading file <path>."
],
"subClass" : {
"CANNOT_READ_FILE_FOOTER" : {
"message" : [
"Could not read footer. Please ensure that the file is in either ORC or Parquet format.",
"If not, please convert it to a valid format. If the file is in the valid format, please check if it is corrupt.",
"If it is, you can choose to either ignore it or fix the corruption."
]
},
"FILE_NOT_EXIST" : {
"message" : [
"File does not exist. It is possible the underlying files have been updated.",
"You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved."
]
},
"NO_HINT" : {
"message" : [
""
]
},
"PARQUET_COLUMN_DATA_TYPE_MISMATCH" : {
"message" : [
"Data type mismatches when reading Parquet column <column>. Expected Spark type <expectedType>, actual Parquet type <actualType>."
]
}
},
"sqlState" : "KD001"
},
"FAILED_REGISTER_CLASS_WITH_KRYO" : {
@@ -6119,12 +6136,6 @@
"buildReader is not supported for <format>."
]
},
"_LEGACY_ERROR_TEMP_2055" : {
"message" : [
"<message>",
"It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved."
]
},
"_LEGACY_ERROR_TEMP_2056" : {
"message" : [
"Unable to clear output directory <staticPrefixPath> prior to writing to it."
@@ -6157,17 +6168,6 @@
"No records should be returned from EmptyDataReader."
]
},
"_LEGACY_ERROR_TEMP_2062" : {
"message" : [
"<message>",
"It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by recreating the Dataset/DataFrame involved."
]
},
"_LEGACY_ERROR_TEMP_2063" : {
"message" : [
"Parquet column cannot be converted in file <filePath>. Column: <column>, Expected: <logicalType>, Found: <physicalType>."
]
},
"_LEGACY_ERROR_TEMP_2065" : {
"message" : [
"Cannot create columnar reader."
@@ -82,7 +82,14 @@ private[avro] class AvroOutputWriter(

override def write(row: InternalRow): Unit = {
val key = new AvroKey(serializer.serialize(row).asInstanceOf[GenericRecord])
recordWriter.write(key, NullWritable.get())
try {
recordWriter.write(key, NullWritable.get())
} catch {
// Unwrap the Avro `AppendWriteException` which is only used to work around the Java API
// signature (DataFileWriter#write) that only allows to throw `IOException`.
case e: org.apache.avro.file.DataFileWriter.AppendWriteException =>
throw e.getCause
}
}

override def close(): Unit = recordWriter.close(context)
@@ -436,7 +436,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
val ex = intercept[SparkException] {
spark.read.format("avro").load(s"$dir.avro").collect()
}
assert(ex.getErrorClass == "FAILED_READ_FILE")
assert(ex.getErrorClass.startsWith("FAILED_READ_FILE"))
checkError(
exception = ex.getCause.asInstanceOf[SparkArithmeticException],
errorClass = "NUMERIC_VALUE_OUT_OF_RANGE",
@@ -864,7 +864,7 @@ abstract class AvroSuite
val ex = intercept[SparkException] {
spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect()
}
assert(ex.getErrorClass == "FAILED_READ_FILE")
assert(ex.getErrorClass.startsWith("FAILED_READ_FILE"))
checkError(
exception = ex.getCause.asInstanceOf[AnalysisException],
errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
@@ -909,7 +909,7 @@ abstract class AvroSuite
val ex = intercept[SparkException] {
spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
}
assert(ex.getErrorClass == "FAILED_READ_FILE")
assert(ex.getErrorClass.startsWith("FAILED_READ_FILE"))
checkError(
exception = ex.getCause.asInstanceOf[AnalysisException],
errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
@@ -946,7 +946,7 @@ abstract class AvroSuite
val ex = intercept[SparkException] {
spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
}
assert(ex.getErrorClass == "FAILED_READ_FILE")
assert(ex.getErrorClass.startsWith("FAILED_READ_FILE"))
checkError(
exception = ex.getCause.asInstanceOf[AnalysisException],
errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
52 changes: 52 additions & 0 deletions docs/sql-error-conditions-failed-read-file-error-class.md
@@ -0,0 +1,52 @@
---
layout: global
title: FAILED_READ_FILE error class
displayTitle: FAILED_READ_FILE error class
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

<!--
DO NOT EDIT THIS FILE.
It was generated automatically by `org.apache.spark.SparkThrowableSuite`.
-->

SQLSTATE: KD001

Encountered error while reading file `<path>`.

This error class has the following derived error classes:

## CANNOT_READ_FILE_FOOTER

Could not read footer. Please ensure that the file is in either ORC or Parquet format.
If not, please convert it to a valid format. If the file is in the valid format, please check if it is corrupt.
If it is, you can choose to either ignore it or fix the corruption.

## FILE_NOT_EXIST

File does not exist. It is possible the underlying files have been updated.
You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

## NO_HINT



## PARQUET_COLUMN_DATA_TYPE_MISMATCH

Data type mismatches when reading Parquet column `<column>`. Expected Spark type `<expectedType>`, actual Parquet type `<actualType>`.


12 changes: 3 additions & 9 deletions docs/sql-error-conditions.md
@@ -267,14 +267,6 @@ Error parsing descriptor bytes into Protobuf FileDescriptorSet.

`<message>`. If necessary set `<ansiConfig>` to "false" to bypass this error.

### CANNOT_READ_FILE_FOOTER

SQLSTATE: KD001

Could not read footer for file: `<file>`. Please ensure that the file is in either ORC or Parquet format.
If not, please convert it to a valid format. If the file is in the valid format, please check if it is corrupt.
If it is, you can choose to either ignore it or fix the corruption.

### CANNOT_RECOGNIZE_HIVE_TYPE

[SQLSTATE: 429BB](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -732,12 +724,14 @@ For more details see [FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.

Failed parsing struct: `<raw>`.

### FAILED_READ_FILE
### [FAILED_READ_FILE](sql-error-conditions-failed-read-file-error-class.html)

SQLSTATE: KD001

Encountered error while reading file `<path>`.

For more details see [FAILED_READ_FILE](sql-error-conditions-failed-read-file-error-class.html)

### FAILED_REGISTER_CLASS_WITH_KRYO

SQLSTATE: KD000
@@ -17,7 +17,7 @@

package org.apache.spark.sql.errors

import java.io.{File, FileNotFoundException, IOException}
import java.io.{File, IOException}
import java.lang.reflect.InvocationTargetException
import java.net.{URISyntaxException, URL}
import java.time.DateTimeException
@@ -32,7 +32,7 @@ import org.codehaus.commons.compiler.{CompileException, InternalCompilerExceptio
import org.apache.spark._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.memory.SparkOutOfMemoryError
import org.apache.spark.sql.{AnalysisException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
@@ -747,12 +747,6 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
cause = cause)
}

def readCurrentFileNotFoundError(e: FileNotFoundException): SparkFileNotFoundException = {
new SparkFileNotFoundException(
errorClass = "_LEGACY_ERROR_TEMP_2055",
messageParameters = Map("message" -> e.getMessage))
}

def saveModeUnsupportedError(saveMode: Any, pathExists: Boolean): Throwable = {
val errorSubClass = if (pathExists) "EXISTENT_PATH" else "NON_EXISTENT_PATH"
new SparkIllegalArgumentException(
@@ -805,33 +799,34 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
cause = null)
}

def fileNotFoundError(e: FileNotFoundException): SparkFileNotFoundException = {
new SparkFileNotFoundException(
errorClass = "_LEGACY_ERROR_TEMP_2062",
messageParameters = Map("message" -> e.getMessage))
def fileNotExistError(path: String, e: Exception): Throwable = {
new SparkException(
errorClass = "FAILED_READ_FILE.FILE_NOT_EXIST",
messageParameters = Map("path" -> path),
cause = e)
}

def unsupportedSchemaColumnConvertError(
filePath: String,
def parquetColumnDataTypeMismatchError(
path: String,
column: String,
logicalType: String,
physicalType: String,
expectedType: String,
actualType: String,
e: Exception): Throwable = {
new SparkException(
errorClass = "_LEGACY_ERROR_TEMP_2063",
errorClass = "FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH",
messageParameters = Map(
"filePath" -> filePath,
"path" -> path,
"column" -> column,
"logicalType" -> logicalType,
"physicalType" -> physicalType),
"expectedType" -> expectedType,
"actualType" -> actualType),
cause = e)
}

def cannotReadFilesError(
e: Throwable,
path: String): Throwable = {
new SparkException(
errorClass = "FAILED_READ_FILE",
errorClass = "FAILED_READ_FILE.NO_HINT",
messageParameters = Map("path" -> path),
cause = e)
}
@@ -1014,8 +1009,8 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE

def cannotReadFooterForFileError(file: Path, e: Exception): Throwable = {
new SparkException(
errorClass = "CANNOT_READ_FILE_FOOTER",
messageParameters = Map("file" -> file.toString()),
errorClass = "FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER",
messageParameters = Map("path" -> file.toString()),
cause = e)
}

@@ -83,14 +83,7 @@ abstract class FileFormatDataWriter(
if (count % CustomMetrics.NUM_ROWS_PER_UPDATE == 0) {
CustomMetrics.updateMetrics(currentMetricsValues.toImmutableArraySeq, customMetrics)
}
try {
write(record)
} catch {
// Unwrap the Avro `AppendWriteException` which is only used to work around the Java API
// signature (DataFileWriter#write) that only allows to throw `IOException`.
case e: org.apache.avro.file.DataFileWriter.AppendWriteException =>
throw e.getCause
}
write(record)
}

/** Write an iterator of records. */
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, JoinedRow, Literal, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.types.PhysicalDataType
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.FileFormat._
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.execution.vectorized.{ColumnVectorUtils, ConstantColumnVector}
@@ -224,12 +223,7 @@ class FileScanRDD(
}

private def readCurrentFile(): Iterator[InternalRow] = {
try {
readFunction(currentFile)
} catch {
case e: FileNotFoundException =>
throw QueryExecutionErrors.readCurrentFileNotFoundError(e)
}
readFunction(currentFile)
}

/** Advances to the next file. Returns true if a new non-empty iterator is available. */
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.v2

import java.io.FileNotFoundException
import java.util

import scala.jdk.CollectionConverters._
@@ -26,7 +27,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkException, SparkFileNotFoundException, SparkUpgradeException}
import org.apache.spark.{SparkException, SparkUpgradeException}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
@@ -124,22 +125,21 @@

def attachFilePath(filePath: => String, ex: Throwable): Throwable = {
ex match {
// This is not a file issue, throw it directly and ask users to handle the upgrade issue.
case sue: SparkUpgradeException =>
throw sue
// The error is already FAILED_READ_FILE, throw it directly. To be consistent, schema
// inference code path throws `FAILED_READ_FILE`, but the file reading code path can reach
// that code path as well and we should not double-wrap the error.
case e: SparkException if e.getErrorClass == "FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER" =>
throw e
case e: SchemaColumnConvertNotSupportedException =>
throw QueryExecutionErrors.unsupportedSchemaColumnConvertError(
throw QueryExecutionErrors.parquetColumnDataTypeMismatchError(
filePath, e.getColumn, e.getLogicalType, e.getPhysicalType, e)
case sue: SparkUpgradeException => throw sue
// the following exceptions already contains file path, we don't need to wrap it with
// `QueryExecutionErrors.cannotReadFilesError` to provide the file path.
case e: SparkFileNotFoundException => throw e
case e: SparkException if e.getErrorClass == "CANNOT_READ_FILE_FOOTER" => throw e
case e: FileNotFoundException =>
throw QueryExecutionErrors.fileNotExistError(filePath, e)
case NonFatal(e) =>
// TODO: do we need to check the cause?
e.getCause match {
case sue: SparkUpgradeException => throw sue
case e: SparkFileNotFoundException => throw e
case e: SparkException if e.getErrorClass == "CANNOT_READ_FILE_FOOTER" => throw e
case _ => throw QueryExecutionErrors.cannotReadFilesError(e, filePath)
}
throw QueryExecutionErrors.cannotReadFilesError(e, filePath)
}
}
}
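For context, a minimal sketch of the call-site pattern this centralizes (hypothetical: `doReadFile` and the path are placeholders, not code from this PR): any exception escaping a per-file read is routed through `attachFilePath`, so it surfaces as a `SparkException` carrying a `FAILED_READ_FILE.*` error class with the file path attached.

```scala
// Hypothetical caller sketch; doReadFile and the path are illustrative placeholders.
val path = "hdfs://nn/warehouse/t/part-00000.parquet"
try {
  doReadFile(path)
} catch {
  case e: Throwable =>
    throw FileDataSourceV2.attachFilePath(path, e)
}
```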