In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
# Reuse the active SparkSession if there is one; otherwise build a new one
# without any extra builder configuration.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [6]:
# Explicit schema: columns are read with these types instead of being inferred.
# `dob` is kept as a plain string; it could be declared as DateType instead.
schema = StructType([
    StructField("id", IntegerType(), nullable=True),
    StructField("name", StringType(), nullable=True),
    StructField("department", StringType(), nullable=True),
    StructField("salary", IntegerType(), nullable=True),
    StructField("dob", StringType(), nullable=True),
    StructField("city", StringType(), nullable=True),
])

# NOTE(review): absolute local path; consider a configurable data directory
employees_csv = r"D:\MyPythonProject\spark_datasets\employees.csv"

# First line of the file holds column names
df = spark.read.csv(employees_csv, header=True, schema=schema)

df.show()

+---+-------+----------+------+----------+---------+
| id|   name|department|salary|       dob|     city|
+---+-------+----------+------+----------+---------+
|  1|  Alice|        HR|  5000|1990-01-01|Bangalore|
|  2|    Bob|   Finance|  6000|1985-07-23|   Mumbai|
|  3|Charlie|        IT|  7000|1992-03-15|    Delhi|
|  4|  David|        IT|  6500|1993-11-30|  Chennai|
|  5|    Eva|        HR|  4800|1991-09-17|     Pune|
+---+-------+----------+------+----------+---------+



In [13]:
# Add yearly pay (monthly salary x 12), drop the location column,
# and give `dob` a more descriptive name.
df_1 = (
    df
    .withColumn("annual_salary", col("salary") * 12)
    .drop("city")
    .withColumnRenamed("dob", "date_of_birth")
)

df_1.show()

+---+-------+----------+------+-------------+-------------+
| id|   name|department|salary|date_of_birth|annual_salary|
+---+-------+----------+------+-------------+-------------+
|  1|  Alice|        HR|  5000|   1990-01-01|        60000|
|  2|    Bob|   Finance|  6000|   1985-07-23|        72000|
|  3|Charlie|        IT|  7000|   1992-03-15|        84000|
|  4|  David|        IT|  6500|   1993-11-30|        78000|
|  5|    Eva|        HR|  4800|   1991-09-17|        57600|
+---+-------+----------+------+-------------+-------------+



In [9]:
# Same result as df_1, expressed as SQL snippets: `city` is simply not
# selected, `dob` is aliased, and annual pay is computed inline.
df_2 = df.selectExpr(
    "id",
    "name",
    "department",
    "salary",
    "dob as date_of_birth",
    "salary * 12 as annual_salary",
)

df_2.show()

+---+-------+----------+------+-------------+-------------+
| id|   name|department|salary|date_of_birth|annual_salary|
+---+-------+----------+------+-------------+-------------+
|  1|  Alice|        HR|  5000|   1990-01-01|        60000|
|  2|    Bob|   Finance|  6000|   1985-07-23|        72000|
|  3|Charlie|        IT|  7000|   1992-03-15|        84000|
|  4|  David|        IT|  6500|   1993-11-30|        78000|
|  5|    Eva|        HR|  4800|   1991-09-17|        57600|
+---+-------+----------+------+-------------+-------------+



In [13]:
# Keep annual salaries in the closed range [50000, 75000]; both bounds are
# inclusive, matching Column.between.
df_3 = df_2.filter(
    (col("annual_salary") >= 50000) & (col("annual_salary") <= 75000)
)

df_3.show()

+---+-----+----------+------+-------------+-------------+
| id| name|department|salary|date_of_birth|annual_salary|
+---+-----+----------+------+-------------+-------------+
|  1|Alice|        HR|  5000|   1990-01-01|        60000|
|  2|  Bob|   Finance|  6000|   1985-07-23|        72000|
|  5|  Eva|        HR|  4800|   1991-09-17|        57600|
+---+-----+----------+------+-------------+-------------+



In [17]:
# Membership test: keep only employees in the IT or HR departments.
df_4 = df.where(col("department").isin(["IT", "HR"]))

df_4.show()

+---+-------+----------+------+----------+---------+
| id|   name|department|salary|       dob|     city|
+---+-------+----------+------+----------+---------+
|  1|  Alice|        HR|  5000|1990-01-01|Bangalore|
|  3|Charlie|        IT|  7000|1992-03-15|    Delhi|
|  4|  David|        IT|  6500|1993-11-30|  Chennai|
|  5|    Eva|        HR|  4800|1991-09-17|     Pune|
+---+-------+----------+------+----------+---------+



In [21]:
# SQL LIKE pattern: `%` matches any suffix, so "A%" keeps names starting with "A".
df_like = df.where(col("name").like("A%"))
df_like.show()

# Java regular expression: "e$" keeps names that end with "e".
df_rgex = df.where(col("name").rlike("e$"))
df_rgex.show()

+---+-----+----------+------+----------+---------+
| id| name|department|salary|       dob|     city|
+---+-----+----------+------+----------+---------+
|  1|Alice|        HR|  5000|1990-01-01|Bangalore|
+---+-----+----------+------+----------+---------+

+---+-------+----------+------+----------+---------+
| id|   name|department|salary|       dob|     city|
+---+-------+----------+------+----------+---------+
|  1|  Alice|        HR|  5000|1990-01-01|Bangalore|
|  3|Charlie|        IT|  7000|1992-03-15|    Delhi|
+---+-------+----------+------+----------+---------+



In [5]:
# Array-column demo: each person has a (possibly empty) list of hobbies.
data = [
    (1, "Alice", ["Reading", "Cycling", "Painting"]),
    (2, "Bob", ["Gaming", "Running"]),
    (3, "Charlie", []),
]

# Column names only (types are inferred). Kept under their own names so the
# StructType `schema` and the employees `df` from earlier cells are not
# overwritten with unrelated objects.
hobby_columns = ["id", "name", "hobbies"]

people_df = spark.createDataFrame(data, hobby_columns)

# explode emits one output row per array element. A row whose array is empty
# (Charlie) produces no output at all; use explode_outer instead to keep such
# rows with a null `hobby`.
df_flat = people_df.withColumn("hobby", explode("hobbies"))

df_flat.show()

+---+-----+--------------------+--------+
| id| name|             hobbies|   hobby|
+---+-----+--------------------+--------+
|  1|Alice|[Reading, Cycling...| Reading|
|  1|Alice|[Reading, Cycling...| Cycling|
|  1|Alice|[Reading, Cycling...|Painting|
|  2|  Bob|   [Gaming, Running]|  Gaming|
|  2|  Bob|   [Gaming, Running]| Running|
+---+-----+--------------------+--------+



In [6]:
# Nested Row objects become a struct column (`address`). Dotted paths pick
# individual struct fields, and each result column is named after the field.
# NOTE(review): `Row` is only in scope via the wildcard imports at the top;
# an explicit `from pyspark.sql import Row` would make that obvious.
data = [
    Row(id=1, name="Alice", address=Row(city="Bangalore", zip=560001)),
    Row(id=2, name="Bob", address=Row(city="Delhi", zip=110001)),
]

df = spark.createDataFrame(data)

flat_view = df.select("id", "name", "address.city", "address.zip")
flat_view.show()

+---+-----+---------+------+
| id| name|     city|   zip|
+---+-----+---------+------+
|  1|Alice|Bangalore|560001|
|  2|  Bob|    Delhi|110001|
+---+-----+---------+------+



In [7]:
# Map-column demo: each id carries a dict of subject -> score; id 3 has no "math" key.
data = [
    (1, {"math": 90, "science": 85}),
    (2, {"math": 75, "science": 88}),
    (3, {"science": 80}),  # missing math
]

# Column names only; kept separate so the StructType `schema` from the
# employees cell is not rebound to a plain list.
score_columns = ["id", "scores"]

df = spark.createDataFrame(data, score_columns)

# Key lookup on the map column; the absent "math" key for id 3 shows up as NULL.
df.select("id", col("scores")["math"]).show()

+---+------------+
| id|scores[math]|
+---+------------+
|  1|          90|
|  2|          75|
|  3|        NULL|
+---+------------+



In [9]:
# glom() gathers each RDD partition's rows into one list, revealing how the
# rows are spread across partitions (most partitions are empty for this tiny frame).
partition_contents = df.rdd.glom().collect()
partition_contents


[[],
 [],
 [Row(id=1, scores={'science': 85, 'math': 90})],
 [],
 [],
 [Row(id=2, scores={'science': 88, 'math': 75})],
 [],
 [Row(id=3, scores={'science': 80})]]

In [12]:
# NOTE(review): absolute path into a Downloads folder; consider a configurable data directory
txn_json = r"C:\Users\user\Downloads\txn.json"

# multiLine allows a single JSON record to span several lines (e.g. pretty-printed files)
df = spark.read.json(txn_json, multiLine=True)

# orderBy is a total sort across all partitions; largest amounts come first
df_sorted = df.orderBy("total_amount", ascending=False)
df_sorted.show()

+--------------------+------------+------+-------------------+------------+--------------+-------+
|               items|payment_mode|region|          timestamp|total_amount|transaction_id|user_id|
+--------------------+------------+------+-------------------+------------+--------------+-------+
|     [laptop, mouse]|        card| north|2025-08-25T10:30:00|      1500.0|         tx001|    u01|
|             [phone]|         UPI| south|2025-08-24T18:20:00|       800.0|         tx002|    u02|
|[headphones, char...|         COD| north|2025-08-26T09:00:00|       250.0|         tx003|    u03|
+--------------------+------------+------+-------------------+------------+--------------+-------+



In [13]:
# Oldest transaction first. NOTE(review): `timestamp` is read as a string
# (values look like 2025-08-25T10:30:00), so this is a lexicographic sort. It
# matches chronological order only while every value shares that fixed-width
# ISO-8601 layout.
df.orderBy(col("timestamp").asc()).show()

+--------------------+------------+------+-------------------+------------+--------------+-------+
|               items|payment_mode|region|          timestamp|total_amount|transaction_id|user_id|
+--------------------+------------+------+-------------------+------------+--------------+-------+
|             [phone]|         UPI| south|2025-08-24T18:20:00|       800.0|         tx002|    u02|
|     [laptop, mouse]|        card| north|2025-08-25T10:30:00|      1500.0|         tx001|    u01|
|[headphones, char...|         COD| north|2025-08-26T09:00:00|       250.0|         tx003|    u03|
+--------------------+------------+------+-------------------+------------+--------------+-------+



In [14]:
# Hash-partition the rows by region into 8 partitions, then sort each
# partition on its own by total_amount (ascending).
# There is no global order: show() walks partitions in turn, so a smaller
# amount in one region can print after a larger one from another region.
df_partitioned = df.repartition(8, col("region"))
(
    df_partitioned
    .sortWithinPartitions(col("total_amount").asc())
    .show()
)

+--------------------+------------+------+-------------------+------------+--------------+-------+
|               items|payment_mode|region|          timestamp|total_amount|transaction_id|user_id|
+--------------------+------------+------+-------------------+------------+--------------+-------+
|[headphones, char...|         COD| north|2025-08-26T09:00:00|       250.0|         tx003|    u03|
|     [laptop, mouse]|        card| north|2025-08-25T10:30:00|      1500.0|         tx001|    u01|
|             [phone]|         UPI| south|2025-08-24T18:20:00|       800.0|         tx002|    u02|
+--------------------+------------+------+-------------------+------------+--------------+-------+

