In [None]:
!pip install pyspark

In [None]:
# @title Initialise SparkSession
from pyspark.sql import SparkSession
import json, re

spark = SparkSession.builder \
            .appName("WordCount") \
            .master("local[*]") \
            .getOrCreate()
sc = spark.sparkContext

In [None]:
# @title Creating input files
# Word Count input
with open("input.txt", "w") as f:
    f.write("hello world\nhello spark\nhello pyspark world")

# CSV files (users & purchases)
with open("users.csv", "w") as f:
    f.write("user_id,name\n1,Alice\n2,Bob\n3,Charlie")

with open("purchases.csv", "w") as f:
    f.write("user_id,product,amount\n1,Book,100.0\n2,Laptop,800.5\n1,Pen,50.0\n3,Phone,200.0")

# JSON file
with open("people.json", "w") as f:
    f.write('{"name": "Alice", "age": 30}\n')
    f.write('{"name": "Bob", "age": 40}\n')
    f.write('{"name": "Eve", "age": 29}\n')
    f.write('{"name": "Charlie", "age": 40}\n')

# Event file
with open("events.jsonl", "w") as f:
    f.write('{"event": "login", "user": "Alice"}\n')
    f.write('{"event": "purchase", "user": "Bob", "amount": 100.0}\n')
    f.write('{"event": "login", "user": "Charlie"}\n')
    f.write('{"event": "purchase", "user": "Alice", "amount": 50.0}\n')
    f.write('{"event": "login", "user": "Bob"}\n')
    f.write('{"event": "purchase", "user": "Charlie", "amount": 200.0}\n')
    f.write('{"event": "purchase", "user": "Alice", "amount": 300.0}\n')
    f.write('{"event": "login", "user": "Charlie"}\n')
    f.write('{"event": "purchase", "user": "Bob", "amount": 400.0}\n')
    f.write('{"event": "login", "user": "Alice"}\n')

# Apache log file
with open("apache.access.log", "w") as f:
    f.write('127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326\n')
    f.write('127.0.0.1 - - [10/Oct/2000:13:55:56 -0700] "GET /products HTTP/1.0" 200 1234\n')
    f.write('127.0.0.1 - - [10/Oct/2000:13:56:01 -0700] "GET /index.html HTTP/1.0" 200 2326\n')
    f.write('127.0.0.1 - - [10/Oct/2000:13:56:16 -0700] "GET /login HTTP/1.0" 200 543\n')
    f.write('127.0.0.1 - - [10/Oct/2000:13:56:36 -0700] "GET /index.html HTTP/1.0" 200 2326\n')
    f.write('127.0.0.1 - - [10/Oct/2000:13:56:56 -0700] "GET /products HTTP/1.0" 200 1234\n')

In [None]:
# @title 6.1 - Word Count (DataFrame / SQL)
print("\n=== Word Count (DataFrame / SQL) ===")
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, col
import sys

spark = SparkSession.builder.appName("Exp7_WordCount_SQL").getOrCreate()
input_path = "./input.txt"

df = spark.read.text(input_path)
words = df.select(explode(split(col("value"), "\\s+")).alias("word"))
words = words.withColumn("word", lower(col("word"))).filter(col("word") != "")
counts = words.groupBy("word").count().orderBy(col("count").desc())
counts.show(50, truncate=False)

spark.stop()

In [None]:
# @title 6.2 — CSV Join & Aggregation (DataFrame / SQL)
print("\n=== CSV Join & Aggregation (DataFrame / SQL) ===")
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as _sum
import sys

spark = SparkSession.builder.appName("Exp7_Join_SQL").getOrCreate()
people_path = "./users.csv"
purchases_path = "./purchases.csv"

people = spark.read.csv(people_path, header=True, inferSchema=True)
purchases = spark.read.csv(purchases_path, header=True, inferSchema=True)

joined = people.join(purchases, on="user_id", how="inner")
result = joined.groupBy("name").agg(_sum("amount").alias("total_amount")).orderBy("name")
result.show(truncate=False)

spark.stop()

In [None]:
# @title 6.3 - JSONL processing (DataFrame)
print("\n=== JSON Processing (DataFrame) ===")
from pyspark.sql import SparkSession
import sys

spark = SparkSession.builder.appName("Exp7_Events_SQL").getOrCreate()
path = "./events.jsonl"

df = spark.read.json(path)
df.groupBy("event").count().show()

spark.stop()

In [None]:
# @title 6.4 - Apache Log parsing (DataFrame + SQL)
print("\n=== Apache Log Parsing (DataFrame + SQL) ===")
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col
import sys

spark = SparkSession.builder.appName("Exp7_Apache_SQL").getOrCreate()
path = "./apache.access.log"

df = spark.read.text(path)
# regex to extract request part and then path
# group 1: ip, group 2: datetime, group 3: request, group 4: status, group 5: size
log_regex = r'(\S+) - - \[(.*?)\] "(.*?)" (\d{3}) (\S+)'
df2 = df.withColumn("request", regexp_extract(col("value"), log_regex, 3))
# extract path from request (split by space)
from pyspark.sql.functions import split
df3 = df2.withColumn("path", split(col("request"), " ").getItem(1))
df3.groupBy("path").count().orderBy(col("count").desc()).show(truncate=False)

spark.stop()

In [None]:
# @title Modify the join to use broadcast join in SQL for small people.csv and compare shuffle metrics (spark.sql.autoBroadcastJoinThreshold).
print("\n=== Modified CSV Join & Aggregation (DataFrame / SQL) ===")
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as _sum
import sys

spark = SparkSession.builder.appName("Exp7_Join_SQL").getOrCreate()

# Set the autoBroadcastJoinThreshold for broadcast join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10*1024*1024) # Set to 10MB

people_path = "./users.csv"
purchases_path = "./purchases.csv"

people = spark.read.csv(people_path, header=True, inferSchema=True)
purchases = spark.read.csv(purchases_path, header=True, inferSchema=True)

joined = people.join(purchases, on="user_id", how="inner")
result = joined.groupBy("name").agg(_sum("amount").alias("total_amount")).orderBy("name")
result.show(truncate=False)

# To compare shuffle metrics, you would typically need to inspect the Spark UI.
# However, programmatically we can try to trigger the execution and see the execution plan.
print("\n=== Execution Plan ===")
result.explain()

spark.stop()