
Parsing a column containing invalid json into StructureType with schema throws an Exception. #10891

Open
Feng-Jiang28 opened this issue May 24, 2024 · 2 comments
Labels
bug Something isn't working

Comments

@Feng-Jiang28
Collaborator

Feng-Jiang28 commented May 24, 2024

Parsing a column containing invalid json into StructureType with schema throws an Exception.

Repro:

:~$ $SPARK_HOME/bin/spark-shell --master local[*] --jars ${SPARK_RAPIDS_PLUGIN_JAR} \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.rapids.sql.enabled=true \
  --conf spark.rapids.sql.explain=ALL \
  --driver-java-options '-ea -Duser.timezone=UTC ' \
  --conf spark.rapids.sql.expression.JsonTuple=true \
  --conf spark.rapids.sql.expression.GetJsonObject=true \
  --conf spark.rapids.sql.expression.JsonToStructs=true \
  --conf spark.rapids.sql.expression.StructsToJson=true

GPU:

scala> import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.types.{IntegerType, StructType}

scala> val df = Seq("""{"a" 1}""").toDS()
df: org.apache.spark.sql.Dataset[String] = [value: string]

scala> df.write.mode("OVERWRITE").parquet("TEMP")
24/05/24 09:51:35 WARN GpuOverrides: 
*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHadoopFsRelationCommand> will run on GPU
  ! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
    @Expression <AttributeReference> value#1 could run on GPU


scala> val df2 = spark.read.parquet("TEMP")
df2: org.apache.spark.sql.DataFrame = [value: string]

scala> val schema = new StructType().add("a", IntegerType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,true))

scala> var parsed = df2.select(from_json($"value", schema))
parsed: org.apache.spark.sql.DataFrame = [from_json(value): struct<a: int>]

scala> parsed.show()
...
24/05/24 09:51:39 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.IllegalStateException: No empty row count provided and the table read has no row count or columns
	at ai.rapids.cudf.Table.gatherJSONColumns(Table.java:1204)
	at ai.rapids.cudf.Table.readJSON(Table.java:1446)
	at ai.rapids.cudf.Table.readJSON(Table.java:1428)
	at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$doColumnar$2(GpuJsonToStructs.scala:179)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$doColumnar$1(GpuJsonToStructs.scala:177)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)

CPU:


scala> import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.types.{IntegerType, StructType}

scala> val df = Seq("""{"a" 1}""").toDS()
df: org.apache.spark.sql.Dataset[String] = [value: string]

scala> df.write.mode("OVERWRITE").parquet("TEMP")
                                                                                
scala> val df2 = spark.read.parquet("TEMP")
df2: org.apache.spark.sql.DataFrame = [value: string]

scala> val schema = new StructType().add("a", IntegerType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,true))

scala> var parsed = df2.select(from_json($"value", schema))
parsed: org.apache.spark.sql.DataFrame = [from_json(value): struct<a: int>]

scala> parsed.show()
+----------------+
|from_json(value)|
+----------------+
|          {null}|
+----------------+
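The CPU result above matches Spark's default PERMISSIVE parse mode: a malformed record yields a struct of null fields rather than an exception. A stdlib-only Scala sketch of that contract for the one-field integer schema used in this repro (the parser here is deliberately simplified and is not Spark's implementation):

```scala
// Illustrative only: mimics PERMISSIVE from_json semantics for a {"a": <int>}
// schema -- malformed input maps to a null field, never an exception.
def parseA(json: String): Option[Int] = {
  val Pattern = """\{\s*"a"\s*:\s*(\d+)\s*\}""".r
  json match {
    case Pattern(v) => Some(v.toInt)
    case _          => None // invalid JSON -> null, matching the CPU output above
  }
}
```

With this sketch, the repro's input `{"a" 1}` (missing colon) parses to `None`, i.e. the null field shown in the CPU output, while well-formed `{"a": 1}` parses to `Some(1)`.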

@Feng-Jiang28 Feng-Jiang28 changed the title from "from_json invalid json" to "Parsing a column containing invalid json into StructureType with schema throws an Exception." May 24, 2024
@revans2 revans2 self-assigned this May 24, 2024
@revans2
Collaborator

revans2 commented May 24, 2024

@Feng-Jiang28 Thanks for submitting this. I put some fixes into CUDF, but it still requires a small change to the plugin to fully address it. I thought I had put up a PR for it, but I guess I forgot to. If you want to give it a try, you just need to change

cudf.Table.readJSON(cudfSchema, jsonOptions, ds)

so that it passes numRows as the last argument after ds. This lets the CUDF code know that, when it hits a situation where no columns could be returned (a limitation in the current CUDF JSON parser), it can create null columns for the schema passed in. If you do want to do it, let me know; otherwise I'll try to get to it next week.
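The fallback being described, synthesizing all-null columns sized by the caller-supplied row count when the JSON reader yields no columns at all, can be sketched in plain Scala. The names below are illustrative assumptions, not cuDF's or the plugin's actual internals:

```scala
// Illustrative sketch only: when the JSON reader returns zero columns for a
// batch (e.g. every row was invalid JSON), synthesize an all-null column per
// requested schema field using the row count the caller passed in -- the role
// numRows plays in the suggested fix.
def gatherColumns(
    parsed: Map[String, Seq[Option[Int]]], // columns the reader produced
    schemaFields: Seq[String],             // fields requested by the schema
    numRows: Int                           // row count supplied by the caller
): Map[String, Seq[Option[Int]]] =
  if (parsed.nonEmpty) parsed
  else schemaFields.map(f => f -> Seq.fill(numRows)(Option.empty[Int])).toMap
```

Without the row count, a zero-column read is ambiguous (no rows? or rows that all failed?), which is exactly the `IllegalStateException` in the stack trace above.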

@Feng-Jiang28
Collaborator Author

No problem Bobby, I will put up a PR for it.

@Feng-Jiang28 Feng-Jiang28 self-assigned this May 28, 2024
@revans2 revans2 removed their assignment May 28, 2024
@revans2 revans2 added bug Something isn't working ? - Needs Triage Need team to review and classify labels May 28, 2024
@mattahrens mattahrens removed the ? - Needs Triage Need team to review and classify label May 28, 2024
@Feng-Jiang28 Feng-Jiang28 removed their assignment Jun 11, 2024