Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

from_json with predefined schema when converts json with missing fields, throwing an exception. #10922

Open
Feng-Jiang28 opened this issue May 28, 2024 · 0 comments
Assignees
Labels
bug Something isn't working

Comments

@Feng-Jiang28
Copy link
Collaborator

Feng-Jiang28 commented May 28, 2024

from_json with predefined schema when converts json with misssing fields, throwing an exception.

Reproduce:

CPU:

scala> import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.{SparkSession, Row}

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> 

scala> val spark = SparkSession.builder.appName("Test").getOrCreate()
24/05/28 15:43:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@58f6dab9

scala> import spark.implicits._
import spark.implicits._

scala> val input = """{
     |   |  "a": 1,
     |   |  "c": "foo"
     |   |}
     |   |""".stripMargin
input: String =
"{
  "a": 1,
  "c": "foo"
}
"

scala> val jsonSchema = new StructType().add("a", LongType, nullable = false).add("b", StringType, nullable = false).add("c", StringType, nullable = false)
jsonSchema: org.apache.spark.sql.types.StructType = StructType(StructField(a,LongType,false),StructField(b,StringType,false),StructField(c,StringType,false))

scala> val jsonData = Seq(input).toDS()
jsonData: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val df = jsonData.toDF("jsonString")
df: org.apache.spark.sql.DataFrame = [jsonString: string]

scala> 

scala> df.write.mode("OVERWRITE").parquet("TEMP")
                                                                                
scala> val df2 = spark.read.parquet("TEMP")
df2: org.apache.spark.sql.DataFrame = [jsonString: string]

scala> val parsedDF = df2.select(from_json(col("jsonString"), jsonSchema).as("data"))
parsedDF: org.apache.spark.sql.DataFrame = [data: struct<a: bigint, b: string ... 1 more field>]

scala>   .select("data.*")
res1: org.apache.spark.sql.DataFrame = [a: bigint, b: string ... 1 more field]

scala> parsedDF.show(false)
+--------------+                                                                
|data          |
+--------------+
|{1, null, foo}|
+--------------+


scala> parsedDF.printSchema()
root
 |-- data: struct (nullable = true)
 |    |-- a: long (nullable = true)
 |    |-- b: string (nullable = true)
 |    |-- c: string (nullable = true)


scala> spark.stop()

GPU:

:~$ $SPARK_HOME/bin/spark-shell --master local[*] --jars ${SPARK_RAPIDS_PLUGIN_JAR} 
--conf spark.plugins=com.nvidia.spark.SQLPlugin 
--conf spark.rapids.sql.enabled=true 
--conf spark.rapids.sql.explain=ALL 
--driver-java-options '-ea -Duser.timezone=UTC ' 
--conf spark.rapids.sql.expression.JsonTuple=true 
--conf spark.rapids.sql.expression.GetJsonObject=true 
--conf spark.rapids.sql.expression.JsonToStructs=true 
--conf spark.rapids.sql.expression.StructsToJson=true
scala> import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.{SparkSession, Row}

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> 

scala> val spark = SparkSession.builder.appName("Test").getOrCreate()
24/05/28 07:45:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/05/28 07:45:26 WARN RapidsPluginUtils: RAPIDS Accelerator is enabled, to disable GPU support set `spark.rapids.sql.enabled` to false.
24/05/28 07:45:26 WARN RapidsPluginUtils: spark.rapids.sql.explain is set to `ALL`. Set it to 'NONE' to suppress the diagnostics logging about the query placement on the GPU.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@64a97889

scala> import spark.implicits._
import spark.implicits._

scala> val input = """{
     |   |  "a": 1,
     |   |  "c": "foo"
     |   |}
     |   |""".stripMargin
input: String =
"{
  "a": 1,
  "c": "foo"
}
"

scala> val jsonSchema = new StructType().add("a", LongType, nullable = false).add("b", StringType, nullable = false).add("c", StringType, nullable = false)
jsonSchema: org.apache.spark.sql.types.StructType = StructType(StructField(a,LongType,false),StructField(b,StringType,false),StructField(c,StringType,false))

scala> val jsonData = Seq(input).toDS()
jsonData: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val df = jsonData.toDF("jsonString")
df: org.apache.spark.sql.DataFrame = [jsonString: string]

scala> 

scala> df.write.mode("OVERWRITE").parquet("TEMP")
24/05/28 07:45:29 WARN GpuOverrides: 
*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHadoopFsRelationCommand> will run on GPU
  ! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
    @Expression <AttributeReference> jsonString#93 could run on GPU


scala> val df2 = spark.read.parquet("TEMP")
df2: org.apache.spark.sql.DataFrame = [jsonString: string]

scala> val parsedDF = df2.select(from_json(col("jsonString"), jsonSchema).as("data"))
parsedDF: org.apache.spark.sql.DataFrame = [data: struct<a: bigint, b: string ... 1 more field>]

scala>   .select("data.*")
res20: org.apache.spark.sql.DataFrame = [a: bigint, b: string ... 1 more field]

scala> parsedDF.show(false)
24/05/28 07:45:29 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> cast(from_json(StructField(a,LongType,false), StructField(b,StringType,false), StructField(c,StringType,false), jsonString#96, Some(UTC)) as string) AS data#108 will run on GPU
      *Expression <Cast> cast(from_json(StructField(a,LongType,false), StructField(b,StringType,false), StructField(c,StringType,false), jsonString#96, Some(UTC)) as string) will run on GPU
        *Expression <JsonToStructs> from_json(StructField(a,LongType,false), StructField(b,StringType,false), StructField(c,StringType,false), jsonString#96, Some(UTC)) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

24/05/28 07:45:29 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.IllegalArgumentException: We cannot currently support parsing JSON that contains a line separator in it
	at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$checkForNewline$3(GpuJsonToStructs.scala:144)
	at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$checkForNewline$3$adapted(GpuJsonToStructs.scala:141)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$checkForNewline$2(GpuJsonToStructs.scala:141)
	at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$checkForNewline$2$adapted(GpuJsonToStructs.scala:140)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$checkForNewline$1(GpuJsonToStructs.scala:140)
	at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$checkForNewline$1$adapted(GpuJsonToStructs.scala:139)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at org.apache.spark.sql.rapids.GpuJsonToStructs.checkForNewline(GpuJsonToStructs.scala:139)
	at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$cleanAndConcat$12(GpuJsonToStructs.scala:116)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$cleanAndConcat$11(GpuJsonToStructs.scala:115)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)

@Feng-Jiang28 Feng-Jiang28 added the bug Something isn't working label May 28, 2024
@Feng-Jiang28 Feng-Jiang28 changed the title from_json missing fields from_json with predefined schema when converts json with misssing fields, throwing an exception. May 28, 2024
@revans2 revans2 self-assigned this May 28, 2024
@revans2 revans2 added the ? - Needs Triage Need team to review and classify label May 28, 2024
@mattahrens mattahrens removed the ? - Needs Triage Need team to review and classify label May 28, 2024
@GaryShen2008 GaryShen2008 changed the title from_json with predefined schema when converts json with misssing fields, throwing an exception. from_json with predefined schema when converts json with missing fields, throwing an exception. Jun 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

3 participants