In [1]:
import findspark
findspark.init(spark_home="/home/prabhakar/mybin/spark-3.0.2-bin-hadoop2.7-hive1.2")


from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("handling complex data types").getOrCreate()


configs = spark.sparkContext.getConf().getAll()
for key, value in configs:
    print(f"{key}: {value}")

# Get the Spark UI URL
spark_ui_url = spark.sparkContext.uiWebUrl

print(f"Spark Job URL: {spark_ui_url}")

25/09/06 08:38:12 WARN Utils: Your hostname, prabhu resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/09/06 08:38:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/09/06 08:38:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


spark.app.id: local-1757147897498
spark.app.startTime: 1757147896025
spark.rdd.compress: True
spark.driver.port: 42127
spark.serializer.objectStreamReset: 100
spark.master: local[*]
spark.submit.pyFiles: 
spark.executor.id: driver
spark.submit.deployMode: client
spark.driver.host: 10.255.255.254
spark.app.name: handling complex data types
spark.ui.showConsoleProgress: true
Spark Job URL: http://10.255.255.254:4040


### Explode Function

In [2]:
from pyspark.sql.functions import explode
from pyspark.sql.types import *

# Sample data
data = [
    {
        "order_id": "ORD001",
        "customer_id": "CUST1001",
        "order_date": "2023-10-01",
        "items": [
            {"product_id": "P100", "product_name": "Laptop", "quantity": 1, "price": 1000},
            {"product_id": "P200", "product_name": "Mouse", "quantity": 2, "price": 25}
        ]
    },
    {
        "order_id": "ORD002",
        "customer_id": "CUST1002",
        "order_date": "2023-10-02",
        "items": [
            {"product_id": "P300", "product_name": "Monitor", "quantity": 1, "price": 300}
        ]
    },
    {
        "order_id": "ORD003",
        "customer_id": "CUST1003",
        "order_date": "2023-10-03",
        "items": []
    }
]

# Define schema
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_date", StringType(), True),
    StructField("items", ArrayType(
        StructType([
            StructField("product_id", StringType(), True),
            StructField("product_name", StringType(), True),
            StructField("quantity", IntegerType(), True),
            StructField("price", IntegerType(), True)
        ])
    ))
])

# Create DataFrame
df = spark.createDataFrame(data, schema)

df.show(truncate=False)



                                                                                

+--------+-----------+----------+-----------------------------------------------+
|order_id|customer_id|order_date|items                                          |
+--------+-----------+----------+-----------------------------------------------+
|ORD001  |CUST1001   |2023-10-01|[[P100, Laptop, 1, 1000], [P200, Mouse, 2, 25]]|
|ORD002  |CUST1002   |2023-10-02|[[P300, Monitor, 1, 300]]                      |
|ORD003  |CUST1003   |2023-10-03|[]                                             |
+--------+-----------+----------+-----------------------------------------------+



In [10]:
df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id: string (nullable = true)
 |    |    |-- product_name: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- price: integer (nullable = true)



### Q: What is the total quantity per day

In [4]:
from pyspark.sql.functions import col
# df2 = df.withColumn("exp_items", explode(col("items")))
# df2 = df.withColumn("exp_items", explode(df["items"]))
df2 = df.withColumn("exp_items", explode(df.items))

In [5]:
df2.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id: string (nullable = true)
 |    |    |-- product_name: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- price: integer (nullable = true)
 |-- exp_items: struct (nullable = true)
 |    |-- product_id: string (nullable = true)
 |    |-- product_name: string (nullable = true)
 |    |-- quantity: integer (nullable = true)
 |    |-- price: integer (nullable = true)



In [6]:
df.show(truncate=False)
df2.show(truncate=False)

+--------+-----------+----------+-----------------------------------------------+
|order_id|customer_id|order_date|items                                          |
+--------+-----------+----------+-----------------------------------------------+
|ORD001  |CUST1001   |2023-10-01|[[P100, Laptop, 1, 1000], [P200, Mouse, 2, 25]]|
|ORD002  |CUST1002   |2023-10-02|[[P300, Monitor, 1, 300]]                      |
|ORD003  |CUST1003   |2023-10-03|[]                                             |
+--------+-----------+----------+-----------------------------------------------+

+--------+-----------+----------+-----------------------------------------------+-----------------------+
|order_id|customer_id|order_date|items                                          |exp_items              |
+--------+-----------+----------+-----------------------------------------------+-----------------------+
|ORD001  |CUST1001   |2023-10-01|[[P100, Laptop, 1, 1000], [P200, Mouse, 2, 25]]|[P100, Laptop, 1, 1000]|
|

In [7]:
df2.select(col("order_date"), col("exp_items.quantity")).show()

import pyspark.sql.functions as F
df2.groupBy(col("order_date")).agg(F.sum("exp_items.quantity")).show()

+----------+--------+
|order_date|quantity|
+----------+--------+
|2023-10-01|       1|
|2023-10-01|       2|
|2023-10-02|       1|
+----------+--------+

+----------+-----------------------+
|order_date|sum(exp_items.quantity)|
+----------+-----------------------+
|2023-10-02|                      1|
|2023-10-01|                      3|
+----------+-----------------------+



### explode_outer

In [8]:
from pyspark.sql.functions import col, explode_outer
# df3 = df.withColumn("exp_items", explode(col("items")))
# df3 = df.withColumn("exp_items", explode(df["items"]))
df3 = df.withColumn("exp_items", explode_outer(df.items)) # ---> explode_outer

In [13]:
df.show(truncate=False)
df2.show(truncate=False)
df3.show(10, False)

+--------+-----------+----------+-----------------------------------------------+
|order_id|customer_id|order_date|items                                          |
+--------+-----------+----------+-----------------------------------------------+
|ORD001  |CUST1001   |2023-10-01|[[P100, Laptop, 1, 1000], [P200, Mouse, 2, 25]]|
|ORD002  |CUST1002   |2023-10-02|[[P300, Monitor, 1, 300]]                      |
|ORD003  |CUST1003   |2023-10-03|[]                                             |
+--------+-----------+----------+-----------------------------------------------+

+--------+-----------+----------+-----------------------------------------------+-----------------------+
|order_id|customer_id|order_date|items                                          |exp_items              |
+--------+-----------+----------+-----------------------------------------------+-----------------------+
|ORD001  |CUST1001   |2023-10-01|[[P100, Laptop, 1, 1000], [P200, Mouse, 2, 25]]|[P100, Laptop, 1, 1000]|
|

### Flattening

In [None]:
df3.select(col("exp_items")).show(truncate=False)
df3.select(col("exp_items.*")).show(truncate=False) ## ----> Flattening the sturcture

+-----------------------+
|exp_items              |
+-----------------------+
|[P100, Laptop, 1, 1000]|
|[P200, Mouse, 2, 25]   |
|[P300, Monitor, 1, 300]|
|null                   |
+-----------------------+

+----------+------------+--------+-----+
|product_id|product_name|quantity|price|
+----------+------------+--------+-----+
|P100      |Laptop      |1       |1000 |
|P200      |Mouse       |2       |25   |
|P300      |Monitor     |1       |300  |
|null      |null        |null    |null |
+----------+------------+--------+-----+



In [None]:
df.show()
df.show(10)
df.show(truncate=False)
df.show(10, False)

+--------+-----------+----------+--------------------+
|order_id|customer_id|order_date|               items|
+--------+-----------+----------+--------------------+
|  ORD001|   CUST1001|2023-10-01|[[P100, Laptop, 1...|
|  ORD002|   CUST1002|2023-10-02|[[P300, Monitor, ...|
|  ORD003|   CUST1003|2023-10-03|                  []|
+--------+-----------+----------+--------------------+

+--------+-----------+----------+--------------------+
|order_id|customer_id|order_date|               items|
+--------+-----------+----------+--------------------+
|  ORD001|   CUST1001|2023-10-01|[[P100, Laptop, 1...|
|  ORD002|   CUST1002|2023-10-02|[[P300, Monitor, ...|
|  ORD003|   CUST1003|2023-10-03|                  []|
+--------+-----------+----------+--------------------+

+--------+-----------+----------+-----------------------------------------------+
|order_id|customer_id|order_date|items                                          |
+--------+-----------+----------+-------------------------------

### Pivot

In [17]:
# Sample data
data_pivot_example = [
    ("North", "Jan", 1000),
    ("North", "Feb", 1100),
    ("South", "Jan", 800),
    ("South", "Feb", 950),
    ("East", "Jan", 1200),
    ("East", "Feb", 1300)
]

columns = ["region", "month", "sales"]

df_pivot_source = spark.createDataFrame(data_pivot_example, columns)
df_pivot_source.show()



+------+-----+-----+
|region|month|sales|
+------+-----+-----+
| North|  Jan| 1000|
| North|  Feb| 1100|
| South|  Jan|  800|
| South|  Feb|  950|
|  East|  Jan| 1200|
|  East|  Feb| 1300|
+------+-----+-----+



In [20]:
# Pivot: Convert months to columns
pivot_df = df_pivot_source.groupBy("region").pivot("month").sum("sales")

pivot_df.show()

+------+----+----+
|region| Feb| Jan|
+------+----+----+
| South| 950| 800|
|  East|1300|1200|
| North|1100|1000|
+------+----+----+



In [None]:
# Pivot: Convert months to columns
pivot_region_df = df_pivot_source.groupBy("month").pivot("region").sum("sales")

pivot_region_df.show()


                                                                                

+-----+----+-----+-----+
|month|East|North|South|
+-----+----+-----+-----+
|  Feb|1300| 1100|  950|
|  Jan|1200| 1000|  800|
+-----+----+-----+-----+



### Unpivot

In [24]:
# Unpivot: Convert columns Jan, Feb back to rows
unpivot_df = pivot_df.selectExpr(
    "region",
    "stack(2, 'Jan', Jan, 'Feb', Feb) as (month, sales)"
)

unpivot_df.show()


+------+-----+-----+
|region|month|sales|
+------+-----+-----+
| South|  Jan|  800|
| South|  Feb|  950|
|  East|  Jan| 1200|
|  East|  Feb| 1300|
| North|  Jan| 1000|
| North|  Feb| 1100|
+------+-----+-----+

