
[SPARK-45022][SQL] Provide context for dataset API errors #43334

Closed
wants to merge 34 commits

Conversation

Member

@MaxGekk commented Oct 11, 2023

What changes were proposed in this pull request?

This PR captures the dataset APIs used by the user code and the call site in the user code and provides better error messages.

E.g. consider the following Spark app SimpleApp.scala:

   1  import org.apache.spark.sql.SparkSession
   2  import org.apache.spark.sql.functions._
   3
   4  object SimpleApp {
   5    def main(args: Array[String]) {
   6      val spark = SparkSession.builder.appName("Simple Application").config("spark.sql.ansi.enabled", true).getOrCreate()
   7      import spark.implicits._
   8
   9      val c = col("a") / col("b")
  10
  11      Seq((1, 0)).toDF("a", "b").select(c).show()
  12
  13      spark.stop()
  14    }
  15  }

After this PR the error message contains the error context (which Spark Dataset API is called from where in the user code) in the following form:

Exception in thread "main" org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error.
== Dataset ==
"div" was called from SimpleApp$.main(SimpleApp.scala:9)

	at org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:201)
	at org.apache.spark.sql.catalyst.expressions.DivModLike.eval(arithmetic.scala:672)
...

which is similar to the already provided context in case of SQL queries:

org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error.
== SQL(line 1, position 1) ==
a / b
^^^^^

	at org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:201)
	at org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)
...

Please note that the stack trace in spark-shell doesn't contain meaningful elements:

scala> Thread.currentThread().getStackTrace.foreach(println)
java.base/java.lang.Thread.getStackTrace(Thread.java:1602)
$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:23)
$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:27)
$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:29)
$line15.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:31)
$line15.$read$$iw$$iw$$iw$$iw.<init>(<console>:33)
$line15.$read$$iw$$iw$$iw.<init>(<console>:35)
$line15.$read$$iw$$iw.<init>(<console>:37)
$line15.$read$$iw.<init>(<console>:39)
$line15.$read.<init>(<console>:41)
$line15.$read$.<init>(<console>:45)
$line15.$read$.<clinit>(<console>)
$line15.$eval$.$print$lzycompute(<console>:7)
$line15.$eval$.$print(<console>:6)
$line15.$eval.$print(<console>)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
...

so this change doesn't help with that use case.
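For readers curious about the mechanism: each Dataset API entry point is wrapped in a withOrigin block that grabs the current stack trace, skips over Spark-internal frames, and records the API frame together with the first user frame. A minimal sketch of the idea (the Origin stand-in and the prefix-based frame filter are simplifications; the real implementation attaches the captured origin to the expressions and plans being built rather than returning it):

```scala
object WithOriginSketch {
  // Simplified stand-in for Spark's Origin; the real one carries more fields.
  case class Origin(stackTrace: Option[Array[StackTraceElement]] = None)

  // Heuristic frame filter: treat Spark/Scala classes as library code
  // (illustrative prefixes, not Spark's exact rule).
  private def sparkCode(ste: StackTraceElement): Boolean =
    ste.getClassName.startsWith("org.apache.spark.") ||
      ste.getClassName.startsWith("scala.")

  // Run `f`, remembering which Dataset API was called and from where in user code.
  def withOrigin[T](f: => T): (T, Origin) = {
    val st = Thread.currentThread().getStackTrace
    var i = 3 // skip getStackTrace itself and the wrapper frames (see the review below)
    while (i < st.length && sparkCode(st(i))) i += 1
    // st(i - 1) is the last Spark frame (the API name), st(i) is the user call site.
    val origin = Origin(stackTrace = Some(st.slice(i - 1, i + 1)))
    (f, origin)
  }
}
```

This also explains the spark-shell caveat above: the first non-Spark frames there are the REPL's synthetic $iw wrappers, so there is no meaningful user location to report.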

Why are the changes needed?

To provide more user-friendly errors.

Does this PR introduce any user-facing change?

Yes.

How was this patch tested?

Added new UTs to QueryExecutionAnsiErrorsSuite.

Was this patch authored or co-authored using generative AI tooling?

No.

@@ -45,4 +48,13 @@ public interface QueryContext {

// The corresponding fragment of the query which throws the exception.
String fragment();

// The Spark code (API) that caused throwing the exception.
String code();
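As a side note on how this surfaces to applications: Spark errors implement SparkThrowable, whose getQueryContext() returns the attached QueryContext values, and per the review below the Dataset API name ends up being exposed through fragment(). A hedged consumer-side sketch, assuming an active SparkSession named spark (with spark.implicits._ in scope and ANSI mode enabled, as in the example app above):

```scala
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions.col
import spark.implicits._

try {
  Seq((1, 0)).toDF("a", "b").select(col("a") / col("b")).show()
} catch {
  case e: Throwable with SparkThrowable =>
    // One entry per attached context: a SQL fragment or, after this PR, a Dataset API name.
    e.getQueryContext.foreach(ctx => println(s"failed at: ${ctx.fragment()}"))
    throw e
}
```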
Contributor

shall we reuse the fragment function to return the code fragment?

Member Author

done

def to(schema: StructType): DataFrame = withPlan {
val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
Project.matchSchema(logicalPlan, replaced, sparkSession.sessionState.conf)
def to(schema: StructType): DataFrame = withOrigin() {
Contributor

I think it's good enough to attach the expression call site for ANSI mode; we can attach the plan call site later.

Member Author

Could you elaborate a bit more on why ANSI mode is important here?

Member Author

When I revert the changes in methods like this (DataFrame -> DataFrame), the fragment becomes blurry, for example:
Before: select
After: anonfun$select$4

Member Author

@cloud-fan For example, with withOrigin in select, we stop at index 3:
(screenshot of the stack trace)

Without withOrigin in select, the picture is different: we are in withOrigin in Column's constructor and stop at index 5:
(screenshot of the stack trace)

} else {
  val st = Thread.currentThread().getStackTrace
  var i = framesToDrop + 3
  while (sparkCode(st(i))) i += 1
Contributor

since we have this loop here, why do we still need framesToDrop?

Member Author

We set framesToDrop = 1 in a few places:

  • Column.fn
  • withExpr
  • repartitionByExpression
  • repartitionByRange
  • withAggregateFunction
  • createLambda

So, there are two options: either

  • the function sparkCode doesn't work properly and we forcibly skip 1 frame,
  • or it is a premature optimization.

I will check that after all tests have passed.

Contributor

@peter-toth Oct 26, 2023

I wanted to use it only for optimization; at certain places we simply know at least how many frames deep we are in Spark's code. sparkCode() uses a regex, so it can be a bit slow...

Member Author

sparkCode() uses a regex, so it can be a bit slow.

It shouldn't be that slow, I think, especially since it's just one pattern match. I'll remove the optimization for now.
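For context, the predicate under discussion boils down to one pattern match per frame; a rough sketch (the package prefixes are illustrative, not the exact regex Spark uses):

```scala
object FrameFilter {
  // Illustrative regex: classify Spark- and Scala-internal frames as library code.
  private val sparkCodePattern = """(org\.apache\.spark\.|scala\.)""".r

  def sparkCode(ste: StackTraceElement): Boolean =
    sparkCodePattern.findFirstIn(ste.getClassName).isDefined
}
```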

@heyihong
Contributor

heyihong commented Oct 30, 2023

One more question: does this feature work with spark connect scala client? If not, we probably should disable this feature for spark connect for now since customers may get confused if they see contexts for dataset API errors (likely in the spark connect planner) in the error message.

@MaxGekk
Member Author

MaxGekk commented Oct 30, 2023

If not, we probably should disable this feature for spark connect for now since customers may get confused if they see contexts for dataset API errors (likely in the spark connect planner) in the error message.

I don't think we should disable it even if it doesn't work. We have enough time to implement it before Spark 4.0.

/**
 * The type of {@link QueryContext}.
 *
 * @since 3.5.0
Contributor

4.0.0

Member Author

Updated

override val objectType = originObjectType.getOrElse("")
override val objectName = originObjectName.getOrElse("")
override val startIndex = originStartIndex.getOrElse(-1)
override val stopIndex = originStopIndex.getOrElse(-1)
Contributor

why remove override?

Member Author

I will revert it.

@Evolving
public enum QueryContextType {
  SQL,
  Dataset
Contributor

There is no Dataset in PySpark; shall we use the name DataFrame? It also exists in Scala as a type alias of Dataset[Row], and DataFrame is a more common name in the industry.

Member Author

ok, I will rename it.


builder ++= fragment
builder ++= "\""
builder ++= " was called from "
Contributor

shall we add a \n before the call site?

Member Author

I'm not sure about this. Now it looks like:

== Dataset ==
"col" was called from org.apache.spark.sql.DatasetSuite.$anonfun$new$621(DatasetSuite.scala:2673)

but what you propose would be:

== Dataset ==
"col" was called from
org.apache.spark.sql.DatasetSuite.$anonfun$new$621(DatasetSuite.scala:2673)

@cloud-fan Are you sure?

Contributor

OK, let's leave it as it is.
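For readers following along, the agreed-upon single-line format is assembled roughly like this (a sketch only; the exact formatting lives in the DataFrameQueryContext-style implementation):

```scala
// Sketch: build the Dataset error-context summary in the agreed single-line form.
def datasetSummary(fragment: String, callSite: String): String = {
  val builder = new StringBuilder("== Dataset ==\n")
  builder ++= "\""
  builder ++= fragment
  builder ++= "\" was called from "
  builder ++= callSite
  builder.toString
}

// datasetSummary("col", "org.apache.spark.sql.DatasetSuite.$anonfun$new$621(DatasetSuite.scala:2673)")
// produces the two-line summary shown in the example above.
```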

  f
} else {
  val st = Thread.currentThread().getStackTrace
  var i = 3
Contributor

can we add a comment to explain this magic number?

Member Author

Regarding the magic number: we always have those 3 elements at the beginning of the input array:
(screenshot of the leading stack-trace elements)
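Since the screenshot isn't reproduced here, an easy way to inspect those leading entries yourself (illustrative only):

```scala
// Illustrative: print the first few stack-trace elements seen from inside a helper,
// i.e. Thread.getStackTrace itself plus the enclosing wrapper frames.
def showLeadingFrames(n: Int = 4): Unit = {
  val st = Thread.currentThread().getStackTrace
  st.take(n).zipWithIndex.foreach { case (frame, idx) => println(s"$idx: $frame") }
}
```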

Contributor

This has been discussed at #42740 (comment).

Contributor

@cloud-fan left a comment

LGTM except for some minor comments

@MaxGekk
Member Author

MaxGekk commented Nov 1, 2023

Merging to master. Thank you @peter-toth for the original PR, and @cloud-fan and @heyihong for the review.

@@ -1572,7 +1589,9 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)
def select(col: String, cols: String*): DataFrame = withOrigin {
Contributor

I don't think this is helpful; the underlying select already has a withOrigin call, no?

Contributor

We are reverting it in #44501

var i = 3
while (i < st.length && sparkCode(st(i))) i += 1
val origin =
  Origin(stackTrace = Some(Thread.currentThread().getStackTrace.slice(i - 1, i + 1)))
Contributor

Isn't this super expensive, calling currentThread().getStackTrace in a loop? Can't we grab the stack trace only once and filter it as needed?

Member Author

Which loop do you mean?
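For reference, the concern above is the second currentThread().getStackTrace call inside Origin(...); the suggested fix amounts to reusing the st array that was already captured, e.g. relative to the snippet shown above:

```scala
// Reuse the already-captured `st` instead of calling getStackTrace a second time.
val origin = Origin(stackTrace = Some(st.slice(i - 1, i + 1)))
```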

@itholic
Contributor

itholic commented Apr 1, 2024

If not, we probably should disable this feature for spark connect for now since customers may get confused if they see contexts for dataset API errors (likely in the spark connect planner) in the error message.

I don't think we should disable it even if it doesn't work. We have enough time to implement it before Spark 4.0.

Do we happen to have any specific plan or timeline for supporting this feature for Spark Connect? It seems like it is not working on either the Spark Connect Scala or the Python client for now.

@cloud-fan
Contributor

I don't think so... We are still waiting for people who are familiar with Spark Connect to pick it up.

cloud-fan pushed a commit that referenced this pull request Apr 11, 2024
…taFrame API errors

### What changes were proposed in this pull request?

This PR introduces an enhancement to the error messages generated by PySpark's DataFrame API, adding detailed context about the location within the user's PySpark code where the error occurred.

This directly adds PySpark user call site information into the `DataFrameQueryContext` added in #43334, aiming to provide PySpark users with the same level of detailed error context for better usability and debugging efficiency with the DataFrame APIs.

This PR also introduces `QueryContext.pysparkCallSite` and `QueryContext.pysparkFragment` to get the PySpark information from the query context easily.

This PR also enhances the functionality of `check_error` so that it can test the query context if it exists.

### Why are the changes needed?

To improve debuggability. Errors originating from PySpark operations can be difficult to debug with only limited context in the error messages. While improvements on the JVM side have been made to offer detailed error contexts, PySpark errors often lack this level of detail.

### Does this PR introduce _any_ user-facing change?

No API changes, but error messages will include a reference to the exact line of user code that triggered the error, in addition to the existing descriptive error message.

For example, consider the following PySpark code snippet that triggers a `DIVIDE_BY_ZERO` error:

```python
1  spark.conf.set("spark.sql.ansi.enabled", True)
2
3  df = spark.range(10)
4  df.select(df.id / 0).show()
```

**Before:**
```
pyspark.errors.exceptions.captured.ArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22012
== DataFrame ==
"divide" was called from
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
```

**After:**
```
pyspark.errors.exceptions.captured.ArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22012
== DataFrame ==
"divide" was called from
/.../spark/python/test_pyspark_error.py:4
```

Now the error message points to the exact problematic line of user code, with its file name and line number.

## Points to the actual problem site instead of the site where the action was called

Even when the action is called after multiple mixed transform operations, the exact problematic site can be reported to the user:

**In:**

```python
  1 spark.conf.set("spark.sql.ansi.enabled", True)
  2 df = spark.range(10)
  3
  4 df1 = df.withColumn("div_ten", df.id / 10)
  5 df2 = df1.withColumn("plus_four", df.id + 4)
  6
  7 # This is the problematic divide operation that causes DIVIDE_BY_ZERO.
  8 df3 = df2.withColumn("div_zero", df.id / 0)
  9 df4 = df3.withColumn("minus_five", df.id / 5)
 10
 11 df4.collect()
```

**Out:**

```
pyspark.errors.exceptions.captured.ArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22012
== DataFrame ==
"divide" was called from
/.../spark/python/test_pyspark_error.py:8
```

### How was this patch tested?

Added UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45377 from itholic/error_context_for_dataframe_api.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>