In [1]:
# Initialise findspark before pyspark is imported in the next cell
# (presumably so the local Spark installation can be located — confirm for your environment).
import findspark
findspark.init()

In [2]:
# Import the Spark entry point, then build (or reuse) the session used by every cell below.
from pyspark.sql import SparkSession,types

spark = (
    SparkSession.builder
    .master("local")
    .appName('Json complex')
    .getOrCreate()
)

In [3]:
# Load dummy2.csv: header row enabled, double quote as the escape character,
# and the multiline option switched on.
inp = (
    spark.read
    .option("header", True)
    .option("escape", "\"")
    .option("multiline", "true")
    .csv("dummy2.csv")
)

In [4]:
# Show the column names and types of the loaded CSV.
inp.printSchema()

root
 |-- PartitionDate: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- request: string (nullable = true)



In [5]:
# Preview the loaded rows; `request` holds the JSON payload as a plain string.
inp.show()

+-------------+--------------+--------------------+
|PartitionDate|        Status|             request|
+-------------+--------------+--------------------+
|   2020-06-30|Internal Error|{"Response":{"Mes...|
|   2020-06-30|       Success|{"Response":{"Mes...|
+-------------+--------------+--------------------+



In [7]:
from pyspark.sql.functions import col,json_tuple,to_json,from_json

Method 1: Using json_tuple

In [8]:
# Extract the top-level "Response" field from the JSON string in `request`.
# No alias is given, so the new column keeps its generated name.
(
    inp
    .select("*", json_tuple("request", "Response"))
    .show()
)

+-------------+--------------+--------------------+--------------------+
|PartitionDate|        Status|             request|                  c0|
+-------------+--------------+--------------------+--------------------+
|   2020-06-30|Internal Error|{"Response":{"Mes...| {"MessageId":15432}|
|   2020-06-30|       Success|{"Response":{"Mes...|{"MessageId":1543...|
+-------------+--------------+--------------------+--------------------+



In [11]:
# Two-step extraction with json_tuple:
#   1. pull the nested "Response" object out of `request` as a JSON string,
#   2. pull the individual fields out of that string.
# The intermediate column is aliased explicitly so the second step and the
# final drop do not depend on Spark's default naming of generator output ("c0").
(
    inp
    .select("*", json_tuple("request", "Response").alias("Response"))
    .drop("request")
    .select(
        "*",
        json_tuple("Response", "MessageId", "Latitude", "longitude")
        .alias("MessageId", "Latitude", "longitude"),
    )
    # Remove the intermediate JSON string once its fields have been extracted.
    .drop("Response")
    .show()
)

+-------------+--------------+---------+---------+---------+
|PartitionDate|        Status|MessageId| Latitude|longitude|
+-------------+--------------+---------+---------+---------+
|   2020-06-30|Internal Error|    15432|     null|     null|
|   2020-06-30|       Success|    15432|-176.2989|   7.3614|
+-------------+--------------+---------+---------+---------+



Method 2: Using from_json

In [13]:
# Collect the raw JSON strings from `request` so their structure can be inspected.
(
    inp
    .select(col('request').alias("jsoncol"))
    .rdd
    .map(lambda row: row.jsoncol)
    .collect()
)

['{"Response":{"MessageId" : 15432 }}',
 '{"Response":{"MessageId" : 15432,"Latitude":"-176.2989","longitude":"7.3614" }}']

In [14]:
# Infer the schema of the JSON payload by feeding the request strings
# (as an RDD of strings) to the JSON reader and keeping only the resulting schema.
in_sch = spark.read.json(
    inp.select(col('request').alias("jsoncol")).rdd.map(lambda row: row.jsoncol)
).schema

In [15]:
# Display the inferred schema.
in_sch

StructType(List(StructField(Response,StructType(List(StructField(Latitude,StringType,true),StructField(MessageId,LongType,true),StructField(longitude,StringType,true))),true)))

In [16]:
# Parse each `request` string with the inferred schema into a struct column named `jsonstr`.
inp_json = inp.select(
    "*",
    from_json("request", in_sch).alias("jsonstr"),
)

In [17]:
# Confirm that `jsonstr` is now a nested struct rather than a string.
inp_json.printSchema()

root
 |-- PartitionDate: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- request: string (nullable = true)
 |-- jsonstr: struct (nullable = true)
 |    |-- Response: struct (nullable = true)
 |    |    |-- Latitude: string (nullable = true)
 |    |    |-- MessageId: long (nullable = true)
 |    |    |-- longitude: string (nullable = true)



In [19]:
# Name of the first field nested directly under `jsonstr`.
col1 = inp_json.schema['jsonstr'].dataType.names[0]
# Star-expansion path that selects every field beneath that nested struct.
chk = "".join(["jsonstr.", col1, ".*"])
chk

'jsonstr.Response.*'

In [20]:
# Expand the nested fields into top-level columns, then drop the raw string
# and the parsed struct so only the flattened columns remain.
(
    inp_json
    .select("*", col(chk))
    .drop("request", "jsonstr")
    .show()
)

+-------------+--------------+---------+---------+---------+
|PartitionDate|        Status| Latitude|MessageId|longitude|
+-------------+--------------+---------+---------+---------+
|   2020-06-30|Internal Error|     null|    15432|     null|
|   2020-06-30|       Success|-176.2989|    15432|   7.3614|
+-------------+--------------+---------+---------+---------+



Usage of to_json:

In [21]:
# Reverse direction: expand `jsonstr` to expose the Response struct,
# then serialise that struct back into a JSON string.
(
    inp_json
    .select(col("jsonstr.*"))
    .select(to_json(col("Response")))
    .show()
)

+-----------------------+
|structstojson(Response)|
+-----------------------+
|    {"MessageId":15432}|
|   {"Latitude":"-176...|
+-----------------------+

