In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Week3_Day2_PySpark_Joins_Aggregations") \
    .getOrCreate()

print("✅ Spark session created")

✅ Spark session created


In [2]:
# Customers data
customers = spark.createDataFrame([
    (1, "Alice", "Pune"),
    (2, "Bob", "Mumbai"),
    (3, "Charlie", "Delhi"),
    (4, "David", "Pune")
], ["customer_id", "name", "city"])

# Orders data
orders = spark.createDataFrame([
    (101, 1, 5000),
    (102, 2, 1500),
    (103, 1, 2500),
    (104, 3, 2000),
    (105, 5, 3000)
], ["order_id", "customer_id", "amount"])

print("✅ Sample Data Created")
customers.show()
orders.show()

✅ Sample Data Created
+-----------+-------+------+
|customer_id|   name|  city|
+-----------+-------+------+
|          1|  Alice|  Pune|
|          2|    Bob|Mumbai|
|          3|Charlie| Delhi|
|          4|  David|  Pune|
+-----------+-------+------+

+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|     101|          1|  5000|
|     102|          2|  1500|
|     103|          1|  2500|
|     104|          3|  2000|
|     105|          5|  3000|
+--------+-----------+------+



In [3]:
inner_join_df = customers.join(orders, "customer_id", "inner")
inner_join_df.show()

+-----------+-------+------+--------+------+
|customer_id|   name|  city|order_id|amount|
+-----------+-------+------+--------+------+
|          1|  Alice|  Pune|     101|  5000|
|          1|  Alice|  Pune|     103|  2500|
|          2|    Bob|Mumbai|     102|  1500|
|          3|Charlie| Delhi|     104|  2000|
+-----------+-------+------+--------+------+



In [4]:
left_join_df = customers.join(orders, "customer_id", "left")
left_join_df.show()

+-----------+-------+------+--------+------+
|customer_id|   name|  city|order_id|amount|
+-----------+-------+------+--------+------+
|          1|  Alice|  Pune|     103|  2500|
|          1|  Alice|  Pune|     101|  5000|
|          2|    Bob|Mumbai|     102|  1500|
|          3|Charlie| Delhi|     104|  2000|
|          4|  David|  Pune|    NULL|  NULL|
+-----------+-------+------+--------+------+



In [5]:
right_join_df = customers.join(orders, "customer_id", "right")
right_join_df.show()

+-----------+-------+------+--------+------+
|customer_id|   name|  city|order_id|amount|
+-----------+-------+------+--------+------+
|          1|  Alice|  Pune|     101|  5000|
|          2|    Bob|Mumbai|     102|  1500|
|          1|  Alice|  Pune|     103|  2500|
|          3|Charlie| Delhi|     104|  2000|
|          5|   NULL|  NULL|     105|  3000|
+-----------+-------+------+--------+------+



In [6]:
full_join_df = customers.join(orders, "customer_id", "outer")
full_join_df.show()

+-----------+-------+------+--------+------+
|customer_id|   name|  city|order_id|amount|
+-----------+-------+------+--------+------+
|          1|  Alice|  Pune|     101|  5000|
|          1|  Alice|  Pune|     103|  2500|
|          2|    Bob|Mumbai|     102|  1500|
|          3|Charlie| Delhi|     104|  2000|
|          4|  David|  Pune|    NULL|  NULL|
|          5|   NULL|  NULL|     105|  3000|
+-----------+-------+------+--------+------+



In [7]:
from pyspark.sql.functions import broadcast

optimized_join = orders.join(broadcast(customers), "customer_id", "inner")
optimized_join.explain(True)
optimized_join.show()

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [customer_id])
:- LogicalRDD [order_id#6L, customer_id#7L, amount#8L], false
+- ResolvedHint (strategy=broadcast)
   +- LogicalRDD [customer_id#0L, name#1, city#2], false

== Analyzed Logical Plan ==
customer_id: bigint, order_id: bigint, amount: bigint, name: string, city: string
Project [customer_id#7L, order_id#6L, amount#8L, name#1, city#2]
+- Join Inner, (customer_id#7L = customer_id#0L)
   :- LogicalRDD [order_id#6L, customer_id#7L, amount#8L], false
   +- ResolvedHint (strategy=broadcast)
      +- LogicalRDD [customer_id#0L, name#1, city#2], false

== Optimized Logical Plan ==
Project [customer_id#7L, order_id#6L, amount#8L, name#1, city#2]
+- Join Inner, (customer_id#7L = customer_id#0L), rightHint=(strategy=broadcast)
   :- Filter isnotnull(customer_id#7L)
   :  +- LogicalRDD [order_id#6L, customer_id#7L, amount#8L], false
   +- Filter isnotnull(customer_id#0L)
      +- LogicalRDD [customer_id#0L, name#1, city#2], false

== Phys

In [8]:
from pyspark.sql.functions import sum, avg, count

agg_df = optimized_join.groupBy("city").agg(
    count("order_id").alias("total_orders"),
    sum("amount").alias("total_amount"),
    avg("amount").alias("avg_amount")
)
agg_df.show()

+------+------------+------------+----------+
|  city|total_orders|total_amount|avg_amount|
+------+------------+------------+----------+
|  Pune|           2|        7500|    3750.0|
|Mumbai|           1|        1500|    1500.0|
| Delhi|           1|        2000|    2000.0|
+------+------------+------------+----------+



In [9]:
agg_df.explain(True)

== Parsed Logical Plan ==
'Aggregate ['city], ['city, count('order_id) AS total_orders#175, sum('amount) AS total_amount#177, avg('amount) AS avg_amount#179]
+- Project [customer_id#7L, order_id#6L, amount#8L, name#1, city#2]
   +- Join Inner, (customer_id#7L = customer_id#0L)
      :- LogicalRDD [order_id#6L, customer_id#7L, amount#8L], false
      +- ResolvedHint (strategy=broadcast)
         +- LogicalRDD [customer_id#0L, name#1, city#2], false

== Analyzed Logical Plan ==
city: string, total_orders: bigint, total_amount: bigint, avg_amount: double
Aggregate [city#2], [city#2, count(order_id#6L) AS total_orders#175L, sum(amount#8L) AS total_amount#177L, avg(amount#8L) AS avg_amount#179]
+- Project [customer_id#7L, order_id#6L, amount#8L, name#1, city#2]
   +- Join Inner, (customer_id#7L = customer_id#0L)
      :- LogicalRDD [order_id#6L, customer_id#7L, amount#8L], false
      +- ResolvedHint (strategy=broadcast)
         +- LogicalRDD [customer_id#0L, name#1, city#2], false

== Opt

In [10]:
agg_df.write.mode("overwrite").parquet("output/week3_day2_city_sales")
print("✅ Output written to Parquet format")

✅ Output written to Parquet format
