
Conversation

panbingkun
Contributor

What changes were proposed in this pull request?

This PR aims to add `Codegen` support for `schema_of_json`.

Why are the changes needed?

  • Improve codegen coverage.
  • Simplify the code.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Pass GA and existing UTs (e.g. JsonFunctionsSuite#`*schema_of_json*`).

Was this patch authored or co-authored using generative AI tooling?

No.

@panbingkun
Contributor Author

The difference between this PR and #48452 is that here the `Invoke` target object is implemented in Scala.
Scala (`Invoke` object): #48473
Java (`Invoke` object): #48452

 def evalTypeExpr(exp: Expression): DataType = {
   if (exp.foldable) {
-    exp.eval() match {
+    prepareForEval(exp).eval() match {
@panbingkun
Contributor Author

We must route `evalTypeExpr` through `prepareForEval` here; otherwise the following test fails:

test("SPARK-33270: infers schema for JSON field with spaces and pass them to from_json") {
val in = Seq("""{"a b": 1}""").toDS()
val out = in.select(from_json($"value", schema_of_json("""{"a b": 100}""")) as "parsed")
val expected = new StructType().add("parsed", new StructType().add("a b", LongType))
assert(out.schema == expected)
}

[INTERNAL_ERROR] Cannot evaluate expression: schema_of_json({"a b": 100}) SQLSTATE: XX000
org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: schema_of_json({"a b": 100}) SQLSTATE: XX000
	at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
	at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:65)
	at org.apache.spark.sql.catalyst.expressions.RuntimeReplaceable.eval(Expression.scala:424)
	at org.apache.spark.sql.catalyst.expressions.RuntimeReplaceable.eval$(Expression.scala:423)
	at org.apache.spark.sql.catalyst.expressions.SchemaOfJson.eval(jsonExpressions.scala:878)
	at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalTypeExpr(ExprUtils.scala:39)
	at org.apache.spark.sql.catalyst.expressions.JsonToStructs.<init>(jsonExpressions.scala:659)
	at org.apache.spark.sql.catalyst.expressions.JsonToStructs.<init>(jsonExpressions.scala:664)
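
The reason, in short: once `SchemaOfJson` becomes `RuntimeReplaceable`, its `eval()` throws the internal error above, so the foldable schema expression has to be rewritten into its evaluable replacement (a `StaticInvoke`, which `InvokeLike.invoke` evaluates via reflection, as the stack traces below show) before calling `eval()`. A rough sketch of what `prepareForEval` needs to do — the helper's exact body in the PR may differ, and `EvalPrep` is just an illustrative wrapper:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, RuntimeReplaceable}

object EvalPrep {
  // Rewrite every RuntimeReplaceable node into its `replacement` so that the
  // resulting tree can actually be evaluated with eval().
  def prepareForEval(expr: Expression): Expression = expr.transform {
    case r: RuntimeReplaceable => r.replacement
  }
}
```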

@panbingkun
Contributor Author

If we implement the `Invoke` target object in Java, it must be written as shown here:
https://github.com/apache/spark/pull/48452/files#diff-5dcf9919b53bb052a961f4143ed75bc8567d917b016e83ce0175fe918c7800caR124-R126
Otherwise, compilation fails.

@panbingkun
Contributor Author

panbingkun commented Oct 15, 2024

If we implement it in Java, the error behavior differs from the original logic: the Java version wraps the `JsonParseException` in a `java.lang.RuntimeException`, whereas the original logic surfaces the `JsonParseException` directly, as shown below:

select schema_of_json('{"f1":"abc","f2":{"f3":"a", "f4" "b"}}');
  • Java version
spark-sql (default)> select schema_of_json('{"f1":"abc","f2":{"f3":"a", "f4" "b"}}');
24/10/15 17:18:51 ERROR SparkSQLDriver: Failed in [select schema_of_json('{"f1":"abc","f2":{"f3":"a", "f4" "b"}}')]
java.lang.RuntimeException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('"' (code 34)): was expecting a colon to separate field name and value
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 34]
	at org.apache.spark.sql.catalyst.expressions.json.JsonExpressionUtils.schemaOfJson(JsonExpressionUtils.java:125)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.invoke(objects.scala:178)
	at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.invoke$(objects.scala:163)
	at org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke.invoke(objects.scala:279)
	at org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke.eval(objects.scala:305)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.org$apache$spark$sql$catalyst$optimizer$ConstantFolding$$constantFolding(expressions.scala:82)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.$anonfun$constantFolding$4(expressions.scala:102)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1231)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1230)
	at org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:553)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.org$apache$spark$sql$catalyst$optimizer$ConstantFolding$$constantFolding(expressions.scala:102)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$$anonfun$apply$1.$anonfun$applyOrElse$1(expressions.scala:106)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:221)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:221)
  • The original logic
spark-sql (default)> select schema_of_json('{"f1":"abc","f2":{"f3":"a", "f4" "b"}}');
24/10/15 17:12:42 ERROR SparkSQLDriver: Failed in [select schema_of_json('{"f1":"abc","f2":{"f3":"a", "f4" "b"}}')]
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('"' (code 34)): was expecting a colon to separate field name and value
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 34]
	at com.fasterxml.jackson.core.JsonParser._constructReadException(JsonParser.java:2660)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:741)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipColon2(ReaderBasedJsonParser.java:2356)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipColon(ReaderBasedJsonParser.java:2335)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:712)
	at org.apache.spark.sql.catalyst.json.JacksonUtils$.nextUntil(JacksonUtils.scala:32)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema.inferField(JsonInferSchema.scala:199)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema.inferField(JsonInferSchema.scala:152)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema.inferField(JsonInferSchema.scala:202)
	at org.apache.spark.sql.catalyst.expressions.SchemaOfJson.$anonfun$eval$4(jsonExpressions.scala:926)
	at org.apache.spark.util.SparkErrorUtils.tryWithResource(SparkErrorUtils.scala:48)
	at org.apache.spark.util.SparkErrorUtils.tryWithResource$(SparkErrorUtils.scala:46)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:99)
	at org.apache.spark.sql.catalyst.expressions.SchemaOfJson.eval(jsonExpressions.scala:923)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.org$apache$spark$sql$catalyst$optimizer$ConstantFolding$$constantFolding(expressions.scala:82)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.$anonfun$constantFolding$4(expressions.scala:102)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1231)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1230)
	at org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:553)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.org$apache$spark$sql$catalyst$optimizer$ConstantFolding$$constantFolding(expressions.scala:102)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$$anonfun$apply$1.$anonfun$applyOrElse$1(expressions.scala:106)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:221)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:221)
  • Scala version
spark-sql (default)> select schema_of_json('{"f1":"abc","f2":{"f3":"a", "f4" "b"}}');
24/10/15 17:25:52 ERROR SparkSQLDriver: Failed in [select schema_of_json('{"f1":"abc","f2":{"f3":"a", "f4" "b"}}')]
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('"' (code 34)): was expecting a colon to separate field name and value
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 34]
	at com.fasterxml.jackson.core.JsonParser._constructReadException(JsonParser.java:2660)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:741)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipColon2(ReaderBasedJsonParser.java:2356)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipColon(ReaderBasedJsonParser.java:2335)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:712)
	at org.apache.spark.sql.catalyst.json.JacksonUtils$.nextUntil(JacksonUtils.scala:32)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema.inferField(JsonInferSchema.scala:199)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema.inferField(JsonInferSchema.scala:152)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema.inferField(JsonInferSchema.scala:202)
	at org.apache.spark.sql.catalyst.expressions.json.JsonExpressionEvalUtils$.$anonfun$schemaOfJson$2(JsonExpressionEvalUtils.scala:37)
	at org.apache.spark.util.SparkErrorUtils.tryWithResource(SparkErrorUtils.scala:48)
	at org.apache.spark.util.SparkErrorUtils.tryWithResource$(SparkErrorUtils.scala:46)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:99)
	at org.apache.spark.sql.catalyst.expressions.json.JsonExpressionEvalUtils$.schemaOfJson(JsonExpressionEvalUtils.scala:34)
	at org.apache.spark.sql.catalyst.expressions.json.JsonExpressionEvalUtils.schemaOfJson(JsonExpressionEvalUtils.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.invoke(objects.scala:178)
	at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.invoke$(objects.scala:163)
	at org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke.invoke(objects.scala:279)
	at org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke.eval(objects.scala:305)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.org$apache$spark$sql$catalyst$optimizer$ConstantFolding$$constantFolding(expressions.scala:82)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.$anonfun$constantFolding$4(expressions.scala:102)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1231)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1230)
	at org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:553)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.org$apache$spark$sql$catalyst$optimizer$ConstantFolding$$constantFolding(expressions.scala:102)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$$anonfun$apply$1.$anonfun$applyOrElse$1(expressions.scala:106)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:221)

Conclusion: the behavior of the Scala version is consistent with the original logic.
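
For reference, a minimal sketch of this Scala `StaticInvoke` pattern. The names `JsonSchemaHelper` and `SchemaOfJsonSketch` are illustrative placeholders: in the PR the helper role is played by `JsonExpressionEvalUtils.schemaOfJson` (visible in the Scala stack trace above), the real expression keeps the existing `SchemaOfJson` name and its options handling, and the stubbed body stands in for the actual schema inference.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, RuntimeReplaceable, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical static helper; the stub stands in for the real
// JsonInferSchema-based inference that returns the schema as a DDL string.
object JsonSchemaHelper {
  def schemaOfJson(json: UTF8String): UTF8String =
    throw new UnsupportedOperationException("sketch only")
}

// The expression stays RuntimeReplaceable; its replacement is a StaticInvoke of
// the Scala helper, so both interpreted eval and codegen come from StaticInvoke.
case class SchemaOfJsonSketch(child: Expression)
  extends UnaryExpression with RuntimeReplaceable {

  override def replacement: Expression = StaticInvoke(
    JsonSchemaHelper.getClass,
    StringType,
    "schemaOfJson",
    Seq(child),
    returnNullable = false) // follow-up #48987 keeps the original non-nullable output

  override def prettyName: String = "schema_of_json"

  override protected def withNewChildInternal(newChild: Expression): SchemaOfJsonSketch =
    copy(child = newChild)
}
```

Because the Scala helper can let the `JsonParseException` propagate as-is (while the Java helper has to wrap the checked exception in a `RuntimeException`), the Scala version matches the original error behavior shown above.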

@panbingkun panbingkun marked this pull request as ready for review October 15, 2024 10:40
@panbingkun
Contributor Author

cc @MaxGekk @cloud-fan

@MaxGekk
Member

MaxGekk commented Oct 15, 2024

+1, LGTM. Merging to master.
Thank you, @panbingkun.

@MaxGekk MaxGekk closed this in 1269b35 Oct 15, 2024
himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024
…imeReplaceable)

### What changes were proposed in this pull request?
The pr aims to add `Codegen` Support for `schema_of_json`.

### Why are the changes needed?
- Improve codegen coverage.
- Simplify the code.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA and existing UTs (e.g. JsonFunctionsSuite#`*schema_of_json*`).

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#48473 from panbingkun/SPARK-49954_scala.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
cloud-fan pushed a commit that referenced this pull request Nov 28, 2024
…lable as false

### What changes were proposed in this pull request?
This PR follows up [schema_of_json](#48473), [schema_of_xml](#48594) and [schema_of_csv](#48595) to set `returnNullable` to `false`.

### Why are the changes needed?
Per `cloud-fan`'s comment https://github.com/apache/spark/pull/48594/files#r1860534460, we should follow the original logic; otherwise it would be a regression.

https://github.com/apache/spark/blob/1a502d32ef5a69739e10b827be4c9063b2a20493/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala#L846

https://github.com/apache/spark/blob/1a502d32ef5a69739e10b827be4c9063b2a20493/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala#L166

https://github.com/apache/spark/blob/1a502d32ef5a69739e10b827be4c9063b2a20493/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala#L141

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48987 from panbingkun/SPARK-50066_FOLLOWUP.

Authored-by: panbingkun <panbingkun@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan added a commit that referenced this pull request Dec 16, 2024
### What changes were proposed in this pull request?

This is a follow-up of #48473.

We should follow the same approach as `schema_of_csv`: create a `SchemaOfJsonEvaluator` to keep the state, so that the `schema_of_json` function can leverage it via `Invoke`.

### Why are the changes needed?

code cleanup

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #49184 from cloud-fan/json.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
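
A minimal sketch of the evaluator-plus-`Invoke` pattern described in the commit message above. `JsonSchemaEvaluatorSketch` and `SchemaOfJsonReplacement` are illustrative names (the follow-up's real class is `SchemaOfJsonEvaluator`), and the stubbed body stands in for the actual schema inference.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.types.{ObjectType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical stateful evaluator: constructed once per expression so it can
// hold the parsed options (and, in the real class, the parser/inference state)
// instead of rebuilding them on every call.
class JsonSchemaEvaluatorSketch(options: Map[String, String]) extends Serializable {
  def evaluate(json: UTF8String): UTF8String =
    throw new UnsupportedOperationException("sketch only")
}

object SchemaOfJsonReplacement {
  // The RuntimeReplaceable expression's `replacement` becomes an Invoke on a
  // Literal that wraps the evaluator instance with an ObjectType.
  def replacementFor(child: Expression, options: Map[String, String]): Expression = {
    val evaluator = new JsonSchemaEvaluatorSketch(options)
    Invoke(
      Literal.create(evaluator, ObjectType(classOf[JsonSchemaEvaluatorSketch])),
      "evaluate",
      StringType,
      Seq(child),
      Seq(StringType),
      returnNullable = false)
  }
}
```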