diff --git a/native/common/src/error.rs b/native/common/src/error.rs index 1e2d7db9c4..4453d76d8c 100644 --- a/native/common/src/error.rs +++ b/native/common/src/error.rs @@ -215,6 +215,15 @@ pub enum SparkError { spark_type: String, }, + /// A per-file read failure (corrupt footer/page, truncated/empty file, deleted file) raised by + /// the native parquet reader / object_store. Classified by typed `DataFusionError` variant (no + /// message matching) and translated by the JVM shim into Spark's `FAILED_READ_FILE` + /// (`QueryExecutionErrors.cannotReadFilesError`). `file_path` may be empty when the underlying + /// error doesn't carry it (only `object_store::Error::NotFound` does); the JVM side then fills + /// it from the per-task file list. + #[error("Encountered error while reading file {file_path}: {message}")] + CannotReadFile { file_path: String, message: String }, + #[error("ArrowError: {0}.")] Arrow(Arc), @@ -291,6 +300,7 @@ impl SparkError { SparkError::DuplicateFieldByFieldId { .. } => "DuplicateFieldByFieldId", SparkError::ParquetMissingFieldIds => "ParquetMissingFieldIds", SparkError::ParquetSchemaConvert { .. } => "ParquetSchemaConvert", + SparkError::CannotReadFile { .. } => "CannotReadFile", SparkError::Arrow(_) => "Arrow", SparkError::Internal(_) => "Internal", } @@ -528,6 +538,12 @@ impl SparkError { "sparkType": spark_type, }) } + SparkError::CannotReadFile { file_path, message } => { + serde_json::json!({ + "filePath": file_path, + "message": message, + }) + } SparkError::Arrow(e) => { serde_json::json!({ "message": e.to_string(), @@ -617,6 +633,10 @@ impl SparkError { "org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException" } + // CannotReadFile - converted to a FAILED_READ_FILE SparkException by the shim + // (QueryExecutionErrors.cannotReadFilesError). + SparkError::CannotReadFile { .. } => "org/apache/spark/SparkException", + // Generic errors SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException", } @@ -707,6 +727,10 @@ impl SparkError { // SparkException error class, so no error class is exposed here. SparkError::ParquetSchemaConvert { .. } => None, + // CannotReadFile — the JVM shim wraps it via cannotReadFilesError, which supplies the + // FAILED_READ_FILE error class, so none is exposed here. + SparkError::CannotReadFile { .. } => None, + // Generic errors (no error class) SparkError::Arrow(_) | SparkError::Internal(_) => None, } diff --git a/native/jni-bridge/src/errors.rs b/native/jni-bridge/src/errors.rs index 7bf4073c8d..1971d79c41 100644 --- a/native/jni-bridge/src/errors.rs +++ b/native/jni-bridge/src/errors.rs @@ -490,7 +490,7 @@ fn throw_exception(env: &mut Env, error: &CometError, backtrace: Option) // Handle DataFusion errors containing SparkError or SparkErrorWithContext CometError::DataFusion { msg: _, - source: DataFusionError::External(e), + source: df_error @ DataFusionError::External(e), } => { if let Some(spark_error_with_ctx) = e.downcast_ref::() { let json_message = spark_error_with_ctx.to_json(); @@ -504,45 +504,10 @@ fn throw_exception(env: &mut Env, error: &CometError, backtrace: Option) jni::jni_str!("org/apache/comet/exceptions/CometQueryExecutionException"), JNIString::new(json_message), ) - } else { - // Check for file-not-found errors from object store - let error_msg = e.to_string(); - if error_msg.contains("not found") - && error_msg.contains("No such file or directory") - { - let spark_error = SparkError::FileNotFound { message: error_msg }; - throw_spark_error_as_json(env, &spark_error) - } else { - // Not a SparkError, use generic exception - let exception = error.to_exception(); - match backtrace { - Some(backtrace_string) => env.throw_new( - JNIString::new(exception.class), - JNIString::new( - to_stacktrace_string(exception.msg, backtrace_string).unwrap(), - ), - ), - _ => env.throw_new( - JNIString::new(exception.class), - JNIString::new(exception.msg), - ), - } - } - } - } - // Handle direct SparkError - serialize to JSON - CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error), - _ => { - let error_msg = error.to_string(); - // Check for file-not-found errors that may arrive through other wrapping paths - if error_msg.contains("not found") - && error_msg.contains("No such file or directory") - { - let spark_error = SparkError::FileNotFound { message: error_msg }; - throw_spark_error_as_json(env, &spark_error) - } else if let Some(spark_error) = try_convert_duplicate_field_error(&error_msg) { + } else if let Some(spark_error) = try_classify_file_read_error(df_error) { throw_spark_error_as_json(env, &spark_error) } else { + // Not a SparkError, use generic exception let exception = error.to_exception(); match backtrace { Some(backtrace_string) => env.throw_new( @@ -558,10 +523,52 @@ fn throw_exception(env: &mut Env, error: &CometError, backtrace: Option) } } } + // Typed file-read errors (corrupt/truncated parquet, object_store) raised by the native + // scan -- classified by DataFusionError variant, not message text -- surfaced as + // FAILED_READ_FILE / FileNotFound via the structured SparkError channel. Anything else + // falls back to generic handling. + CometError::DataFusion { msg: _, source } => { + if let Some(spark_error) = try_classify_file_read_error(source) { + throw_spark_error_as_json(env, &spark_error) + } else { + throw_generic_exception(env, error, backtrace) + } + } + // Handle direct SparkError - serialize to JSON + CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error), + _ => throw_generic_exception(env, error, backtrace), }; } } +/// Generic fallback throw for an error that isn't a structured `SparkError`. Recognises a +/// file-not-found arriving through non-typed wrapping paths and duplicate-field errors; otherwise +/// throws the error's natural JVM exception (with the captured backtrace when available). +fn throw_generic_exception( + env: &mut Env, + error: &CometError, + backtrace: Option, +) -> jni::errors::Result<()> { + let error_msg = error.to_string(); + // A file-not-found that arrived through a non-typed wrapping path (the typed classification + // is handled by `try_classify_file_read_error`). + if error_msg.contains("not found") && error_msg.contains("No such file or directory") { + let spark_error = SparkError::FileNotFound { message: error_msg }; + throw_spark_error_as_json(env, &spark_error) + } else if let Some(spark_error) = try_convert_duplicate_field_error(&error_msg) { + throw_spark_error_as_json(env, &spark_error) + } else { + let exception = error.to_exception(); + match backtrace { + Some(backtrace_string) => env.throw_new( + JNIString::new(exception.class), + JNIString::new(to_stacktrace_string(exception.msg, backtrace_string).unwrap()), + ), + _ => env.throw_new(JNIString::new(exception.class), JNIString::new(exception.msg)), + } + } +} + /// Throws a CometQueryExecutionException with JSON-encoded SparkError fn throw_spark_error_as_json(env: &mut Env, spark_error: &SparkError) -> jni::errors::Result<()> { // Serialize error to JSON @@ -574,6 +581,106 @@ fn throw_spark_error_as_json(env: &mut Env, spark_error: &SparkError) -> jni::er ) } +/// Classify a `DataFusionError` as a per-file read failure by TYPED variant (not message text), +/// returning `SparkError::CannotReadFile` if so. This is the structured replacement for the +/// previous JVM-side substring matching on error prose. +/// +/// A file-read failure is any of: +/// - `ParquetError` (corrupt footer/page, EOF, "failed to fill whole buffer", etc.) +/// - `ObjectStore` (truncated/empty/deleted file, range errors) -- `NotFound` carries the path +/// - `ArrowError`, when it wraps a `ParquetError` (the parquet reader surfaces some failures as +/// `ArrowError::ParquetError`) +/// - `IoError` (filesystem read failures) +/// +/// `Context`/`Shared` wrappers are unwrapped recursively. Note we do NOT match `Execution`/ +/// `Internal`/`External`-string or `object_store::Error::Generic`: those also carry non-file +/// errors (e.g. "Hdfs support is not enabled in this build") that must surface as-is. +/// +/// `file_path` is populated from `object_store::Error::NotFound { path, .. }` when available; +/// otherwise it is left empty and the JVM side fills it from the per-task file list. +fn try_classify_file_read_error(error: &DataFusionError) -> Option { + use datafusion::common::DataFusionError as DFE; + match error { + // A genuinely-missing file (object_store NotFound) is distinct from a corrupt/truncated + // one: Spark surfaces it as `readCurrentFileNotFoundError` ("It is possible the underlying + // files have been updated."), not `cannotReadFilesError`. The NotFound may arrive directly + // (`DFE::ObjectStore`) or wrapped by the parquet reader as `ParquetError::External(..)`, so + // inspect the source chain. Delta's CDC-after-VACUUM read depends on this distinction. + DFE::ParquetError(pe) if source_chain_has_object_store_not_found(pe.as_ref()) => { + Some(SparkError::FileNotFound { + message: error.to_string(), + }) + } + // NB: only ParquetError / ObjectStore / ArrowError(ParquetError) are treated as file reads. + // A bare `IoError` is intentionally NOT classified here: a scan surfaces read failures as a + // typed ParquetError or ObjectStore error, whereas an `IoError` can also originate from + // non-scan paths (spill, shuffle), which must not be relabelled FAILED_READ_FILE. + DFE::ParquetError(_) => Some(SparkError::CannotReadFile { + file_path: String::new(), + message: cannot_read_file_message(error), + }), + DFE::ObjectStore(e) => match e.as_ref() { + datafusion::object_store::Error::NotFound { .. } => Some(SparkError::FileNotFound { + message: error.to_string(), + }), + _ => Some(SparkError::CannotReadFile { + file_path: String::new(), + message: cannot_read_file_message(error), + }), + }, + // The parquet reader sometimes surfaces a failure as ArrowError::ParquetError. + DFE::ArrowError(e, _) => match e.as_ref() { + ArrowError::ParquetError(_) if source_chain_has_object_store_not_found(e.as_ref()) => { + Some(SparkError::FileNotFound { + message: error.to_string(), + }) + } + ArrowError::ParquetError(_) => Some(SparkError::CannotReadFile { + file_path: String::new(), + message: cannot_read_file_message(error), + }), + _ => None, + }, + // Unwrap context/shared wrappers and re-classify the inner error. + DFE::Context(_, inner) => try_classify_file_read_error(inner), + DFE::Shared(inner) => try_classify_file_read_error(inner), + _ => None, + } +} + +/// True if `err` or any error in its `source()` chain is an `object_store` `NotFound` -- i.e. a +/// genuinely-missing file. Used to tell a missing file apart from a corrupt/truncated one: the +/// parquet reader wraps the object_store error as `ParquetError::External(..)`, so the typed +/// `NotFound` is only reachable by walking the source chain (we match the typed variant, never the +/// message text). +fn source_chain_has_object_store_not_found(err: &(dyn std::error::Error + 'static)) -> bool { + let mut current: Option<&(dyn std::error::Error + 'static)> = Some(err); + while let Some(e) = current { + if let Some(os) = e.downcast_ref::() { + if matches!(os, datafusion::object_store::Error::NotFound { .. }) { + return true; + } + } + current = e.source(); + } + false +} + +/// Build the message for a `CannotReadFile` error. parquet-rs reports a bad magic / unreadable +/// footer as `"Invalid Parquet file. Corrupt footer"`, whereas Spark's own reader (and Spark's +/// `ParquetQuerySuite`) phrase it as `" is not a Parquet file"`. Append Spark's phrasing so +/// the cause carries it; the outer `cannotReadFilesError` wrapper ("Encountered error while reading +/// file …") is unchanged, so this composes with Spark's tests without changing the FAILED_READ_FILE +/// wrapping. Other read failures (corrupt pages, EOF, IO) keep their native message verbatim. +fn cannot_read_file_message(error: &DataFusionError) -> String { + let msg = error.to_string(); + if msg.contains("Invalid Parquet file") && !msg.contains("is not a Parquet file") { + format!("{msg} (file is not a Parquet file)") + } else { + msg + } +} + /// Try to convert a DataFusion "Unable to get field named" error into a SparkError. /// DataFusion produces this error when reading Parquet files with duplicate field names /// in case-insensitive mode. For example, if a Parquet file has columns "B" and "b", @@ -1101,4 +1208,172 @@ mod tests { // first line. assert_starts_with!(msg_rust, expected_message); } + + // --- try_classify_file_read_error: typed classification of file-read DataFusionErrors --- + // These guard the variant matching that replaced JVM-side error-message string matching. They + // need no JVM (pure DataFusionError -> Option), so they also run under miri. + + use datafusion::common::DataFusionError; + + fn file_path_of(err: &SparkError) -> &str { + match err { + SparkError::CannotReadFile { file_path, .. } => file_path, + other => panic!("expected CannotReadFile, got {other:?}"), + } + } + + #[test] + fn classify_parquet_error_is_file_read() { + let e = DataFusionError::ParquetError(Box::new(parquet::errors::ParquetError::General( + "corrupt footer".to_string(), + ))); + let classified = try_classify_file_read_error(&e); + assert!( + classified.is_some(), + "ParquetError should classify as file-read" + ); + // No path available from a bare ParquetError; JVM fills it from the per-task list. + assert_eq!(file_path_of(&classified.unwrap()), ""); + } + + #[test] + fn classify_arrow_parquet_error_is_file_read() { + // arrow-rs surfaces some parquet read failures as ArrowError::ParquetError. + let e = DataFusionError::ArrowError( + Box::new(ArrowError::ParquetError( + "failed to fill whole buffer".to_string(), + )), + None, + ); + assert!( + try_classify_file_read_error(&e).is_some(), + "ArrowError(ParquetError) should classify as file-read" + ); + } + + #[test] + fn classify_object_store_not_found_is_file_not_found() { + // A genuinely-missing file must classify as FileNotFound (-> readCurrentFileNotFoundError + // on the JVM side), NOT CannotReadFile (-> cannotReadFilesError). The path is carried in + // the message; the JVM shim extracts it. + let e = DataFusionError::ObjectStore(Box::new(datafusion::object_store::Error::NotFound { + path: "file:/tmp/data/part-3.parquet".to_string(), + source: "missing".into(), + })); + match try_classify_file_read_error(&e) { + Some(SparkError::FileNotFound { message }) => { + assert!(message.contains("part-3.parquet"), "message was: {message}") + } + other => panic!("expected FileNotFound, got {other:?}"), + } + } + + #[test] + fn classify_parquet_error_wrapping_not_found_is_file_not_found() { + // The parquet reader surfaces a missing data file as ParquetError::External wrapping an + // object_store NotFound (e.g. a Delta CDC file removed by VACUUM). The NotFound is only + // reachable through the source chain, but it must still classify as FileNotFound. + let os = datafusion::object_store::Error::NotFound { + path: "file:/tmp/t/_change_data/cdc.parquet".to_string(), + source: "missing".into(), + }; + let e = DataFusionError::ParquetError(Box::new(parquet::errors::ParquetError::External( + Box::new(os), + ))); + match try_classify_file_read_error(&e) { + Some(SparkError::FileNotFound { .. }) => {} + other => { + panic!("expected FileNotFound for a NotFound-wrapping ParquetError, got {other:?}") + } + } + } + + #[test] + fn classify_corrupt_parquet_error_stays_cannot_read_file() { + // A corrupt/truncated file (no NotFound in the chain) must remain CannotReadFile + // (-> FAILED_READ_FILE), unchanged by the FileNotFound carve-out. + let e = DataFusionError::ParquetError(Box::new(parquet::errors::ParquetError::General( + "corrupt footer".to_string(), + ))); + match try_classify_file_read_error(&e) { + Some(SparkError::CannotReadFile { .. }) => {} + other => panic!("expected CannotReadFile for a corrupt parquet error, got {other:?}"), + } + } + + #[test] + fn classify_invalid_parquet_file_carries_spark_wording() { + // parquet-rs reports a bad magic / unreadable footer as "Invalid Parquet file. Corrupt + // footer"; Spark's ParquetQuerySuite asserts the cause says "is not a Parquet file". The + // CannotReadFile message must carry that phrasing. + let e = DataFusionError::ParquetError(Box::new(parquet::errors::ParquetError::General( + "Invalid Parquet file. Corrupt footer".to_string(), + ))); + match try_classify_file_read_error(&e) { + Some(SparkError::CannotReadFile { message, .. }) => assert!( + message.contains("is not a Parquet file"), + "message was: {message}" + ), + other => panic!("expected CannotReadFile, got {other:?}"), + } + } + + #[test] + fn classify_other_parquet_error_keeps_native_message() { + // A non-magic parquet failure (e.g. corrupt page data) keeps its native message verbatim + // -- the "is not a Parquet file" phrasing is only added for the magic/footer case. + let e = DataFusionError::ParquetError(Box::new(parquet::errors::ParquetError::General( + "could not decode page header".to_string(), + ))); + match try_classify_file_read_error(&e) { + Some(SparkError::CannotReadFile { message, .. }) => assert!( + !message.contains("is not a Parquet file"), + "message should not be augmented: {message}" + ), + other => panic!("expected CannotReadFile, got {other:?}"), + } + } + + #[test] + fn classify_bare_io_error_is_not_file_read() { + // A bare IoError is not a scan read failure (scans surface ParquetError/ObjectStore); it + // can come from spill/shuffle, so it must NOT be classified as FAILED_READ_FILE. + let e = DataFusionError::IoError(io::Error::new(io::ErrorKind::UnexpectedEof, "eof")); + assert!(try_classify_file_read_error(&e).is_none()); + } + + #[test] + fn classify_unwraps_context_and_shared() { + let inner = DataFusionError::ParquetError(Box::new( + parquet::errors::ParquetError::General("corrupt".to_string()), + )); + let ctx = DataFusionError::Context("reading file".to_string(), Box::new(inner)); + assert!( + try_classify_file_read_error(&ctx).is_some(), + "Context-wrapped ParquetError should classify" + ); + let shared = DataFusionError::Shared(Arc::new(DataFusionError::ObjectStore(Box::new( + datafusion::object_store::Error::NotFound { + path: "p".to_string(), + source: "x".into(), + }, + )))); + assert!( + try_classify_file_read_error(&shared).is_some(), + "Shared-wrapped ObjectStore error should classify" + ); + } + + #[test] + fn classify_non_file_errors_are_not_file_read() { + // Execution / Internal errors (and object_store Generic config errors, which arrive as + // Execution strings) must NOT be masked as file-read failures. + assert!(try_classify_file_read_error(&DataFusionError::Execution( + "Hdfs support is not enabled in this build".to_string() + )) + .is_none()); + assert!( + try_classify_file_read_error(&DataFusionError::Internal("bug".to_string())).is_none() + ); + } } diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 6140eca553..dd37b679d1 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -68,7 +68,8 @@ class CometExecIterator( partitionIndex: Int, broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None, encryptedFilePaths: Seq[String] = Seq.empty, - shuffleBlockIterators: Map[Int, CometShuffleBlockIterator] = Map.empty) + shuffleBlockIterators: Map[Int, CometShuffleBlockIterator] = Map.empty, + taskFilePaths: Seq[String] = Seq.empty) extends Iterator[ColumnarBatch] with Logging { @@ -169,29 +170,14 @@ class CometExecIterator( // Handle CometQueryExecutionException with JSON payload first case e: CometQueryExecutionException => logError(s"Native execution for task $taskAttemptId failed", e) - throw SparkErrorConverter.convertToSparkException(e) + throw SparkErrorConverter.convertToSparkException(e, taskFilePaths) case e: CometNativeException => // it is generally considered bad practice to log and then rethrow an // exception, but it really helps debugging to be able to see which task // threw the exception, so we log the exception with taskAttemptId here logError(s"Native execution for task $taskAttemptId failed", e) - - val parquetError: scala.util.matching.Regex = - """^Parquet error: (?:.*)$""".r - e.getMessage match { - case parquetError() => - // See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError - // See org.apache.parquet.hadoop.ParquetFileReader for error message. - // _LEGACY_ERROR_TEMP_2254 has no message placeholders; Spark 4 strict-checks - // parameters and raises INTERNAL_ERROR if any are passed. - throw new SparkException( - errorClass = "_LEGACY_ERROR_TEMP_2254", - messageParameters = Map.empty, - cause = new SparkException("File is not a Parquet file.", e)) - case _ => - throw e - } + throw e case e: Throwable => throw e } diff --git a/spark/src/main/scala/org/apache/comet/SparkErrorConverter.scala b/spark/src/main/scala/org/apache/comet/SparkErrorConverter.scala index 36059684c9..a6bc21aca7 100644 --- a/spark/src/main/scala/org/apache/comet/SparkErrorConverter.scala +++ b/spark/src/main/scala/org/apache/comet/SparkErrorConverter.scala @@ -69,7 +69,9 @@ object SparkErrorConverter extends ShimSparkErrorConverter { * @return * the corresponding Spark exception, or the original exception if parsing fails */ - def convertToSparkException(e: CometQueryExecutionException): Throwable = { + def convertToSparkException( + e: CometQueryExecutionException, + taskFilePaths: Seq[String] = Seq.empty): Throwable = { try { if (!e.isJsonMessage()) { // Not JSON, return original exception @@ -83,7 +85,18 @@ object SparkErrorConverter extends ShimSparkErrorConverter { val json = parse(e.getMessage) val errorJson = json.extract[ErrorJson] - val params = errorJson.params.getOrElse(Map.empty) + val rawParams = errorJson.params.getOrElse(Map.empty) + // CannotReadFile carries the offending file path natively only for the object_store NotFound + // case; for corrupt/truncated parquet the native error has no path, so fall back to the + // per-task file list threaded in from CometExecIterator. + val params = + if (errorJson.errorType == "CannotReadFile" + && rawParams.get("filePath").forall(p => p == null || p.toString.isEmpty) + && taskFilePaths.nonEmpty) { + rawParams + ("filePath" -> taskFilePaths.mkString(",")) + } else { + rawParams + } val errorClass = errorJson.errorClass.map(_.trim).filter(_.nonEmpty).getOrElse(UNKNOWN_ERROR) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala index 47eda98a11..1b728aa3a0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala @@ -36,7 +36,8 @@ import org.apache.comet.serde.OperatorOuterClass private[spark] class CometExecPartition( override val index: Int, val inputPartitions: Array[Partition], - val planDataByKey: Map[String, Array[Byte]]) + val planDataByKey: Map[String, Array[Byte]], + val filePaths: Seq[String] = Seq.empty) extends Partition /** @@ -66,7 +67,8 @@ private[spark] class CometExecRDD( subqueries: Seq[ScalarSubquery], broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None, encryptedFilePaths: Seq[String] = Seq.empty, - shuffleScanIndices: Set[Int] = Set.empty) + shuffleScanIndices: Set[Int] = Set.empty, + @transient perPartitionFilePaths: Array[Seq[String]] = Array.empty) extends RDD[ColumnarBatch](sc, inputRDDs.map(rdd => new OneToOneDependency(rdd))) { // Determine partition count: from inputs if available, otherwise from parameter @@ -90,7 +92,9 @@ private[spark] class CometExecRDD( (0 until numPartitions).map { idx => val inputParts = inputRDDs.map(_.partitions(idx)).toArray val planData = perPartitionByKey.map { case (key, arr) => key -> arr(idx) } - new CometExecPartition(idx, inputParts, planData) + val fp = + if (perPartitionFilePaths.length > idx) perPartitionFilePaths(idx) else Seq.empty[String] + new CometExecPartition(idx, inputParts, planData, fp) }.toArray } @@ -130,7 +134,8 @@ private[spark] class CometExecRDD( partition.index, broadcastedHadoopConfForEncryption, encryptedFilePaths, - shuffleBlockIters) + shuffleBlockIters, + taskFilePaths = partition.filePaths) // Register ScalarSubqueries so native code can look them up subqueries.foreach(sub => CometScalarSubquery.setSubquery(it.id, sub)) @@ -179,7 +184,8 @@ object CometExecRDD { subqueries: Seq[ScalarSubquery], broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None, encryptedFilePaths: Seq[String] = Seq.empty, - shuffleScanIndices: Set[Int] = Set.empty): CometExecRDD = { + shuffleScanIndices: Set[Int] = Set.empty, + perPartitionFilePaths: Array[Seq[String]] = Array.empty): CometExecRDD = { // scalastyle:on new CometExecRDD( @@ -194,6 +200,7 @@ object CometExecRDD { subqueries, broadcastedHadoopConfForEncryption, encryptedFilePaths, - shuffleScanIndices) + shuffleScanIndices, + perPartitionFilePaths) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index b9fc47c5c8..0ce8547563 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -154,7 +154,8 @@ case class CometNativeScanExec( * all files for all partitions in the driver, we serialize only common metadata (once) and each * partition's files (lazily, as tasks are scheduled). */ - @transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = { + @transient private lazy val serializedPartitionData + : (Array[Byte], Array[Array[Byte]], Array[Seq[String]]) = { // Outer partitionFilters (wrapper) DPP is resolved by Spark's standard // prepare -> waitForSubqueries lifecycle, triggered explicitly via // CometLeafExec.ensureSubqueriesResolved called from @@ -225,13 +226,20 @@ case class CometNativeScanExec( partitionNativeScan.toByteArray }.toArray - (commonBytes, perPartitionBytes) + // File paths per partition -- threaded through CometExecRDD to CometExecIterator so a native + // CannotReadFile error that lacks a path (corrupt/truncated parquet) can be surfaced as + // FAILED_READ_FILE naming the actual file (see SparkErrorConverter.convertToSparkException). + val perPartitionPaths = filePartitions.map(_.files.map(_.filePath.toString).toSeq).toArray + + (commonBytes, perPartitionBytes, perPartitionPaths) } def commonData: Array[Byte] = serializedPartitionData._1 def perPartitionData: Array[Array[Byte]] = serializedPartitionData._2 + def perPartitionFilePaths: Array[Seq[String]] = serializedPartitionData._3 + override def doExecuteColumnar(): RDD[ColumnarBatch] = { val nativeMetrics = CometMetricNode.fromCometPlan(this) val serializedPlan = CometExec.serializeNativePlan(nativeOp) @@ -259,7 +267,8 @@ case class CometNativeScanExec( nativeMetrics, Seq.empty, broadcastedHadoopConfForEncryption, - encryptedFilePaths) { + encryptedFilePaths, + perPartitionFilePaths = perPartitionFilePaths) { override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { val res = super.compute(split, context) diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 5d261493fb..09ac063cd2 100644 --- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -341,6 +341,16 @@ trait ShimSparkErrorConverter { QueryExecutionErrors.readCurrentFileNotFoundError( new FileNotFoundException(s"File $path does not exist"))) + case "CannotReadFile" => + // A per-file read failure of a readable-but-broken file (corrupt/truncated parquet, + // object_store, IO) classified by typed DataFusionError variant on the native side. Wrap + // in the FAILED_READ_FILE SparkException Spark itself produces when its own parquet reader + // fails. (A genuinely-missing file is "FileNotFound" above.) `filePath` is filled by + // SparkErrorConverter from the per-task file list when the native error carried none. + val message = params.get("message").map(_.toString).getOrElse("") + val filePath = params.get("filePath").map(_.toString).getOrElse("") + Some(QueryExecutionErrors.cannotReadFilesError(new SparkException(message), filePath)) + case _ => None } diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 2c803cab6d..c502e4d55d 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -336,6 +336,16 @@ trait ShimSparkErrorConverter { QueryExecutionErrors.readCurrentFileNotFoundError( new FileNotFoundException(s"File $path does not exist"))) + case "CannotReadFile" => + // A per-file read failure (corrupt/truncated/deleted parquet, object_store, IO) classified + // by typed DataFusionError variant on the native side. Wrap in the FAILED_READ_FILE + // SparkException Spark itself produces when its own parquet reader fails. `filePath` is + // supplied by the native object_store NotFound error or, when empty, filled by + // SparkErrorConverter from the per-task file list. + val message = params.get("message").map(_.toString).getOrElse("") + val filePath = params.get("filePath").map(_.toString).getOrElse("") + Some(QueryExecutionErrors.cannotReadFilesError(new SparkException(message), filePath)) + case _ => None } diff --git a/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index ad5481c377..874a6af97c 100644 --- a/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -351,6 +351,16 @@ trait ShimSparkErrorConverter { QueryExecutionErrors .fileNotExistError(path, new FileNotFoundException(s"File $path does not exist"))) + case "CannotReadFile" => + // A per-file read failure (corrupt/truncated/deleted parquet, object_store, IO) classified + // by typed DataFusionError variant on the native side. Wrap in the FAILED_READ_FILE + // SparkException Spark itself produces when its own parquet reader fails. `filePath` is + // supplied by the native object_store NotFound error or, when empty, filled by + // SparkErrorConverter from the per-task file list. + val message = params.get("message").map(_.toString).getOrElse("") + val filePath = params.get("filePath").map(_.toString).getOrElse("") + Some(QueryExecutionErrors.cannotReadFilesError(new SparkException(message), filePath)) + case _ => // Unknown error type - return None to trigger fallback None diff --git a/spark/src/test/scala/org/apache/comet/SparkErrorConverterSuite.scala b/spark/src/test/scala/org/apache/comet/SparkErrorConverterSuite.scala index d3e2c2c64b..3b81a76ead 100644 --- a/spark/src/test/scala/org/apache/comet/SparkErrorConverterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/SparkErrorConverterSuite.scala @@ -22,6 +22,51 @@ package org.apache.comet import org.scalatest.funsuite.AnyFunSuite class SparkErrorConverterSuite extends AnyFunSuite { + + test("CannotReadFile converts to a FAILED_READ_FILE SparkException naming the file") { + val ex = SparkErrorConverter + .convertErrorType( + "CannotReadFile", + "", + Map( + "filePath" -> "file:/tmp/data/part-0.parquet", + "message" -> "Parquet error: bad footer"), + Array.empty, + null) + .getOrElse(fail("Expected CannotReadFile to be converted to a Spark exception")) + // `cannotReadFilesError` IS the FAILED_READ_FILE path. Assert on the version-stable message + // ("Encountered error while reading file ...") rather than the `FAILED_READ_FILE` literal, + // which only Spark 4.x prepends to getMessage as the error-class tag (3.4/3.5 do not). + assert(ex.getMessage.contains("Encountered error while reading file")) + assert(ex.getMessage.contains("part-0.parquet")) + } + + test("CannotReadFile with empty native path falls back to the per-task file list") { + // The native error (e.g. corrupt parquet) carries no path; convertToSparkException must fill + // it from the per-task file list threaded in from CometExecIterator. + val json = + """{"errorType":"CannotReadFile","errorClass":"",""" + + """"params":{"filePath":"","message":"Parquet error: bad footer"}}""" + val ex = SparkErrorConverter.convertToSparkException( + new org.apache.comet.exceptions.CometQueryExecutionException(json), + taskFilePaths = Seq("file:/tmp/data/part-7.parquet")) + // Version-stable assertion (see above): only Spark 4.x renders the FAILED_READ_FILE class tag. + assert(ex.getMessage.contains("Encountered error while reading file")) + assert(ex.getMessage.contains("part-7.parquet")) + } + + test("CannotReadFile prefers the native path over the per-task file list") { + // When the native error supplied a path, keep it rather than the fallback list. + val json = + """{"errorType":"CannotReadFile","errorClass":"",""" + + """"params":{"filePath":"file:/tmp/data/native.parquet","message":"Parquet error: corrupt footer"}}""" + val ex = SparkErrorConverter.convertToSparkException( + new org.apache.comet.exceptions.CometQueryExecutionException(json), + taskFilePaths = Seq("file:/tmp/data/fallback.parquet")) + assert(ex.getMessage.contains("native.parquet")) + assert(!ex.getMessage.contains("fallback.parquet")) + } + private def castOverflowError(fromType: String, value: String): Throwable = { SparkErrorConverter .convertErrorType( diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index a1460427c0..9499577e47 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3974,6 +3974,78 @@ class CometExecSuite extends CometTestBase { } } + test("native parquet read failure surfaces as FAILED_READ_FILE with the file path") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "corrupt.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, 1000) + // Corrupt column/page data in the middle of the file while leaving the footer intact, so + // Spark's JVM-side footer pre-check passes during planning and the native DataFusion reader + // fails during execution -- the path CometExecIterator must wrap as FAILED_READ_FILE. + val f = new java.io.File(new java.net.URI(path.toString)) + val raf = new java.io.RandomAccessFile(f, "rw") + val len = raf.length() + raf.seek(8) // after the "PAR1" magic header, before the footer + raf.write(Array.fill[Byte](math.min(2048, (len / 2).toInt))(0xff.toByte)) + raf.close() + + withSQLConf(CometConf.COMET_ENABLED.key -> "true") { + val e = intercept[Throwable] { + spark.read.parquet(path.toString).collect() + } + // Spark reports its own per-file read failures as FAILED_READ_FILE carrying the path. + // Comet's native scan must do the same instead of leaking a raw CometNativeException. + val messages = Iterator + .iterate(e: Throwable)(_.getCause) + .takeWhile(_ != null) + .map(t => s"${t.getClass.getName}: ${t.getMessage}") + .toList + val chain = messages.mkString("\n ") + // `cannotReadFilesError` is the FAILED_READ_FILE path. Its message is version-stable + // ("Encountered error while reading file ..."); only Spark 4.x prepends the + // `[FAILED_READ_FILE.NO_HINT]` error-class tag, so assert on the stable substring. + assert( + messages.exists(m => m.contains("Encountered error while reading file")), + s"Expected a FAILED_READ_FILE (cannotReadFilesError) in the cause chain, but got:\n $chain") + assert( + messages.exists(m => m.contains("corrupt.parquet")), + s"Expected the offending file path in the cause chain, but got:\n $chain") + } + } + } + + test("native parquet read of a missing file surfaces readCurrentFileNotFoundError") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "missing.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, 1000) + val f = new java.io.File(new java.net.URI(path.toString)) + + withSQLConf(CometConf.COMET_ENABLED.key -> "true") { + // Read the schema (footer) while the file exists, then delete it so it is MISSING at + // execution time -- mirroring a file vacuumed/removed between planning and the scan + // (e.g. Delta's CDC-after-VACUUM read). A missing file is distinct from a corrupt one: + // Spark surfaces it as `readCurrentFileNotFoundError` ("It is possible the underlying + // files have been updated."), NOT `cannotReadFilesError`/`FAILED_READ_FILE`. Comet's + // native scan must classify the object_store NotFound the same way. + val df = spark.read.parquet(path.toString) + df.queryExecution.executedPlan // force planning (footer read) before deletion + assert(f.delete(), s"failed to delete $f") + + val e = intercept[Throwable] { + df.collect() + } + val messages = Iterator + .iterate(e: Throwable)(_.getCause) + .takeWhile(_ != null) + .map(t => s"${t.getClass.getName}: ${t.getMessage}") + .toList + val chain = messages.mkString("\n ") + assert( + messages.exists(m => m.contains("It is possible the underlying files have been updated")), + s"Expected readCurrentFileNotFoundError for a missing file, but got:\n $chain") + } + } + } + } case class BucketedTableTestSpec(