In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
import findspark
findspark.init()

In [70]:
from pyspark.sql import SparkSession
# Build (or reuse) the notebook's Spark session: local master, app name
# "Colab", Spark UI bound to port 4050.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '4050')
    .appName("Colab")
    .master("local")
    .getOrCreate()
)

In [71]:
# Read the nested orders document. 'multiline' lets a single JSON record
# span several lines of the file.
# NOTE(review): 'inferSchema' is documented as a CSV reader option and is
# presumably ignored by the JSON source - confirm before relying on it.
ordersDf = spark.read.load(
    '/content/NestedJson.json',
    format='json',
    inferSchema='true',
    multiline='true',
)

In [72]:
# Print the raw frame without truncating cell contents, so the full nested
# `datasets` value is visible next to `filename`.
ordersDf.show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|datasets                                                                                                                                                                                                                                                                                                                       |filename    |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------

In [73]:
# Show the DataFrame's repr (column names and types) rather than its rows.
display(ordersDf)

DataFrame[datasets: array<struct<customerId:string,orderDate:string,orderDetails:array<struct<productId:string,quantity:bigint,sequence:bigint,totalPrice:struct<gross:bigint,net:bigint,tax:bigint>>>,orderId:string,shipmentDetails:struct<city:string,country:string,postalCode:string,state:string,street:string>>>, filename: string]

In [74]:
# Inspect the inferred schema as a tree before flattening: `datasets` is an
# array of order structs, each holding a nested `orderDetails` array.
ordersDf.printSchema()

root
 |-- datasets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- customerId: string (nullable = true)
 |    |    |-- orderDate: string (nullable = true)
 |    |    |-- orderDetails: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- productId: string (nullable = true)
 |    |    |    |    |-- quantity: long (nullable = true)
 |    |    |    |    |-- sequence: long (nullable = true)
 |    |    |    |    |-- totalPrice: struct (nullable = true)
 |    |    |    |    |    |-- gross: long (nullable = true)
 |    |    |    |    |    |-- net: long (nullable = true)
 |    |    |    |    |    |-- tax: long (nullable = true)
 |    |    |-- orderId: string (nullable = true)
 |    |    |-- shipmentDetails: struct (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- postalCode: string (nullable = true)
 |    |    

In [75]:
from pyspark.sql.functions import explode

In [76]:
# One output row per element of the `datasets` array; the element (an order
# struct) lands in a new `Orders` column.
df1 = ordersDf.withColumn(
    colName="Orders",
    col=explode(ordersDf["datasets"]),
)

In [77]:
# The array column is no longer needed once each element lives in `Orders`.
df1 = df1.drop('datasets')

In [78]:
# Print the exploded frame with untruncated cells: one row per order.
df1.show(truncate=False)

+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|filename    |Orders                                                                                                                                                        |
+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|orderDetails|{cust5001, 2021-12-24 00.00.00.000, [{prd9001, 2, 1, {550, 500, 50}}, {prd9002, 3, 2, {300, 240, 60}}], ord1001, {Delhi, India, 110040, New Delhi, M.G.Road}} |
|orderDetails|{cust5002, 2021-12-25 00.00.00.000, [{prd9001, 1, 1, {275, 250, 25}}, {prd9004, 4, 2, {1000, 900, 100}}], ord1002, {Mumbai, India, 400064, Maharastra, Malad}}|
+------------+--------------------------------------------------------------------------------------------------------------------

In [79]:
# Confirm that `Orders` is now a struct (no longer wrapped in an array).
df1.printSchema()

root
 |-- filename: string (nullable = true)
 |-- Orders: struct (nullable = true)
 |    |-- customerId: string (nullable = true)
 |    |-- orderDate: string (nullable = true)
 |    |-- orderDetails: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- productId: string (nullable = true)
 |    |    |    |-- quantity: long (nullable = true)
 |    |    |    |-- sequence: long (nullable = true)
 |    |    |    |-- totalPrice: struct (nullable = true)
 |    |    |    |    |-- gross: long (nullable = true)
 |    |    |    |    |-- net: long (nullable = true)
 |    |    |    |    |-- tax: long (nullable = true)
 |    |-- orderId: string (nullable = true)
 |    |-- shipmentDetails: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- postalCode: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- street: string (nullable = true)



In [80]:
# Promote every field of the `Orders` struct to a top-level column and drop
# the struct itself by selecting only the flattened columns.
# Field names are spelled exactly as they appear in the schema
# (`customerId`, `orderId`); the previous spellings only resolved because
# Spark's analyzer is case-insensitive by default.
df1 = df1.select(
    'filename',
    df1.Orders.getItem('customerId').alias('CustomerId'),
    df1.Orders.getItem('orderDate').alias('OrderDate'),
    df1.Orders.getItem('orderDetails').alias('OrderDetails'),
    df1.Orders.getItem('orderId').alias('OrderId'),
    df1.Orders.getItem('shipmentDetails').alias('ShipmentDetails'),
)
df1.show()

+------------+----------+--------------------+--------------------+-------+--------------------+
|    filename|CustomerId|           OrderDate|        OrderDetails|OrderId|     ShipmentDetails|
+------------+----------+--------------------+--------------------+-------+--------------------+
|orderDetails|  cust5001|2021-12-24 00.00....|[{prd9001, 2, 1, ...|ord1001|{Delhi, India, 11...|
|orderDetails|  cust5002|2021-12-25 00.00....|[{prd9001, 1, 1, ...|ord1002|{Mumbai, India, 4...|
+------------+----------+--------------------+--------------------+-------+--------------------+



In [82]:
# Lift the address fields out of the `ShipmentDetails` struct into their own
# columns, then discard the struct.
df1 = (
    df1
    .withColumn('City', df1['ShipmentDetails']['city'])
    .withColumn('Country', df1['ShipmentDetails']['country'])
    .withColumn('PostalCode', df1['ShipmentDetails']['postalCode'])
    .withColumn('State', df1['ShipmentDetails']['state'])
    .withColumn('Street', df1['ShipmentDetails']['street'])
    .drop('ShipmentDetails')
)
df1.show()

+------------+----------+--------------------+--------------------+-------+------+-------+----------+----------+--------+
|    filename|CustomerId|           OrderDate|        OrderDetails|OrderId|  City|Country|PostalCode|     State|  Street|
+------------+----------+--------------------+--------------------+-------+------+-------+----------+----------+--------+
|orderDetails|  cust5001|2021-12-24 00.00....|[{prd9001, 2, 1, ...|ord1001| Delhi|  India|    110040| New Delhi|M.G.Road|
|orderDetails|  cust5002|2021-12-25 00.00....|[{prd9001, 1, 1, ...|ord1002|Mumbai|  India|    400064|Maharastra|   Malad|
+------------+----------+--------------------+--------------------+-------+------+-------+----------+----------+--------+



In [83]:
# Replace the `OrderDetails` array with its elements: one row per order line.
df1 = df1.withColumn(
    colName='OrderDetails',
    col=explode(df1['OrderDetails']),
)
df1.show()

+------------+----------+--------------------+--------------------+-------+------+-------+----------+----------+--------+
|    filename|CustomerId|           OrderDate|        OrderDetails|OrderId|  City|Country|PostalCode|     State|  Street|
+------------+----------+--------------------+--------------------+-------+------+-------+----------+----------+--------+
|orderDetails|  cust5001|2021-12-24 00.00....|{prd9001, 2, 1, {...|ord1001| Delhi|  India|    110040| New Delhi|M.G.Road|
|orderDetails|  cust5001|2021-12-24 00.00....|{prd9002, 3, 2, {...|ord1001| Delhi|  India|    110040| New Delhi|M.G.Road|
|orderDetails|  cust5002|2021-12-25 00.00....|{prd9001, 1, 1, {...|ord1002|Mumbai|  India|    400064|Maharastra|   Malad|
|orderDetails|  cust5002|2021-12-25 00.00....|{prd9004, 4, 2, {...|ord1002|Mumbai|  India|    400064|Maharastra|   Malad|
+------------+----------+--------------------+--------------------+-------+------+-------+----------+----------+--------+



In [84]:
# Check that `OrderDetails` is now a single struct per row.
df1.printSchema()

root
 |-- filename: string (nullable = true)
 |-- CustomerId: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- OrderDetails: struct (nullable = true)
 |    |-- productId: string (nullable = true)
 |    |-- quantity: long (nullable = true)
 |    |-- sequence: long (nullable = true)
 |    |-- totalPrice: struct (nullable = true)
 |    |    |-- gross: long (nullable = true)
 |    |    |-- net: long (nullable = true)
 |    |    |-- tax: long (nullable = true)
 |-- OrderId: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- PostalCode: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Street: string (nullable = true)



In [86]:
# Lift the order-line fields out of the `OrderDetails` struct, then drop it.
# `productId` is spelled exactly as in the schema; the previous "ProductId"
# lookup only resolved because Spark's analyzer is case-insensitive by default.
df1 = (
    df1
    .withColumn('ProductId', df1.OrderDetails.getItem('productId'))
    .withColumn('Quantity', df1.OrderDetails.getItem('quantity'))
    .withColumn('Sequence', df1.OrderDetails.getItem('sequence'))
    .withColumn('TotalPrice', df1.OrderDetails.getItem('totalPrice'))
    .drop('OrderDetails')
)
df1.show(truncate=False)

+------------+----------+-----------------------+-------+------+-------+----------+----------+--------+---------+--------+--------+----------------+
|filename    |CustomerId|OrderDate              |OrderId|City  |Country|PostalCode|State     |Street  |ProductId|Quantity|Sequence|TotalPrice      |
+------------+----------+-----------------------+-------+------+-------+----------+----------+--------+---------+--------+--------+----------------+
|orderDetails|cust5001  |2021-12-24 00.00.00.000|ord1001|Delhi |India  |110040    |New Delhi |M.G.Road|prd9001  |2       |1       |{550, 500, 50}  |
|orderDetails|cust5001  |2021-12-24 00.00.00.000|ord1001|Delhi |India  |110040    |New Delhi |M.G.Road|prd9002  |3       |2       |{300, 240, 60}  |
|orderDetails|cust5002  |2021-12-25 00.00.00.000|ord1002|Mumbai|India  |400064    |Maharastra|Malad   |prd9001  |1       |1       |{275, 250, 25}  |
|orderDetails|cust5002  |2021-12-25 00.00.00.000|ord1002|Mumbai|India  |400064    |Maharastra|Malad   |prd

In [87]:
# Only `TotalPrice` should remain as a nested struct at this point.
df1.printSchema()

root
 |-- filename: string (nullable = true)
 |-- CustomerId: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- OrderId: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- PostalCode: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Sequence: long (nullable = true)
 |-- TotalPrice: struct (nullable = true)
 |    |-- gross: long (nullable = true)
 |    |-- net: long (nullable = true)
 |    |-- tax: long (nullable = true)



In [95]:
# Final flattening step: split the `TotalPrice` struct into gross / net / tax
# columns and remove the struct, leaving a fully flat frame in `df`.
df = (
    df1
    .withColumn('GrossPrice', df1['TotalPrice']['gross'])
    .withColumn('NetPrice', df1['TotalPrice']['net'])
    .withColumn('TaxPrice', df1['TotalPrice']['tax'])
    .drop('TotalPrice')
)
df.show(truncate=False)

+------------+----------+-----------------------+-------+------+-------+----------+----------+--------+---------+--------+--------+----------+--------+--------+
|filename    |CustomerId|OrderDate              |OrderId|City  |Country|PostalCode|State     |Street  |ProductId|Quantity|Sequence|GrossPrice|NetPrice|TaxPrice|
+------------+----------+-----------------------+-------+------+-------+----------+----------+--------+---------+--------+--------+----------+--------+--------+
|orderDetails|cust5001  |2021-12-24 00.00.00.000|ord1001|Delhi |India  |110040    |New Delhi |M.G.Road|prd9001  |2       |1       |550       |500     |50      |
|orderDetails|cust5001  |2021-12-24 00.00.00.000|ord1001|Delhi |India  |110040    |New Delhi |M.G.Road|prd9002  |3       |2       |300       |240     |60      |
|orderDetails|cust5002  |2021-12-25 00.00.00.000|ord1002|Mumbai|India  |400064    |Maharastra|Malad   |prd9001  |1       |1       |275       |250     |25      |
|orderDetails|cust5002  |2021-12-2