Student Name: Ashwathy Ashokan

Student ID: C0935859

Subject: Big Data Framework 01

Assignment: RDDs and Spark SQL

Section A: RDD Operations

Q1: RDD Creation and Basic Operations

In [2]:
from pyspark.sql import SparkSession

# One SparkSession for the whole notebook; the RDD API is reached through its SparkContext.
spark = SparkSession.builder.appName("RDD Assignment").getOrCreate()
sc = spark.sparkContext

# Read sales.csv as raw text lines. The first line is taken as the header and every
# line equal to it is dropped, leaving only the data lines.
sales_rdd = sc.textFile("sales.csv")
header = sales_rdd.first()
sales_data = sales_rdd.filter(lambda line: line != header)

# Peek at the first five data lines.
first_rows = sales_data.take(5)
print(first_rows)

# One data line per transaction.
n_transactions = sales_data.count()
print(f"Total transactions: {n_transactions}")

# (product_id, price): field 1 as-is and field 4 converted to float.
prod_price = (
    sales_data
    .map(lambda line: line.split(","))
    .map(lambda cols: (cols[1], float(cols[4])))
)
print(prod_price.take(5))

['1,P1,U1,1,10.0', '2,P2,U2,2,20.0', '3,P3,U3,3,15.0', '4,P4,U1,2,30.0', '5,P2,U2,1,25.0']
Total transactions: 20
[('P1', 10.0), ('P2', 20.0), ('P3', 15.0), ('P4', 30.0), ('P2', 25.0)]


Q2: Transformations and Actions

In [3]:
# Column layout assumed for sales.csv: transaction_id, product_id, user_id, quantity, price
# (relies on `sales_data` — header-free text lines — from the Q1 cell).

# Split every line into its fields once and reuse the result below.
sales_fields = sales_data.map(lambda line: line.split(","))

# Total revenue: sum of quantity * price over all transactions.
revenue_rdd = sales_fields.map(lambda cols: float(cols[3]) * float(cols[4]))
print(f"Total revenue: {revenue_rdd.sum()}")

# How many distinct product_ids appear in the sales data.
product_ids = sales_fields.map(lambda cols: cols[1]).distinct()
print(f"Unique products: {product_ids.count()}")

# Raw sales lines whose quantity is greater than 1.
filtered = sales_data.filter(lambda line: int(line.split(",")[3]) > 1)
print(filtered.take(5))

# Per-transaction (product_id, revenue), then summed per product_id.
prod_rev = sales_fields.map(lambda cols: (cols[1], float(cols[3]) * float(cols[4])))
total_rev_per_prod = prod_rev.reduceByKey(lambda a, b: a + b)
print(total_rev_per_prod.take(5))

Total revenue: 780.0
Unique products: 4
['2,P2,U2,2,20.0', '3,P3,U3,3,15.0', '4,P4,U1,2,30.0', '6,P1,U3,4,10.0', '7,P4,U2,2,35.0']
[('P1', 100.0), ('P2', 275.0), ('P3', 135.0), ('P4', 270.0)]


Q3: Working with Multiple RDDs

In [4]:
def _drop_header(rdd):
    """Return *rdd* without its header line.

    The header is taken to be the RDD's first line, and every line equal to it
    is filtered out. This matches how the Q1 cell strips the sales header and
    avoids depending on an exact, hard-coded header string. An empty RDD is
    returned unchanged (``first()`` would raise on it).
    """
    if rdd.isEmpty():
        return rdd
    header_line = rdd.first()
    return rdd.filter(lambda line: line != header_line)


products_rdd = _drop_header(sc.textFile("products.csv"))
users_rdd = _drop_header(sc.textFile("users.csv"))

# products.csv fields are assumed to be product_id, product_name, category;
# users.csv fields user_id, user_name, location (per the original header strings).

# (product_id, product_name)
products_kv = (
    products_rdd
    .map(lambda line: line.split(","))
    .map(lambda cols: (cols[0], cols[1]))
)
# (product_id, (transaction_id, revenue)), where revenue = quantity * price
sales_kv = (
    sales_data
    .map(lambda line: line.split(","))
    .map(lambda cols: (cols[1], (cols[0], float(cols[3]) * float(cols[4]))))
)

# Join sales with products -> (transaction_id, product_name, revenue)
joined_sales = sales_kv.join(products_kv)
result = joined_sales.map(lambda kv: (kv[1][0][0], kv[1][1], kv[1][0][1]))

# (user_id, (user_name, location))
users_kv = (
    users_rdd
    .map(lambda line: line.split(","))
    .map(lambda cols: (cols[0], (cols[1], cols[2])))
)
# (user_id, (transaction_id, revenue))
sales_users_kv = (
    sales_data
    .map(lambda line: line.split(","))
    .map(lambda cols: (cols[2], (cols[0], float(cols[3]) * float(cols[4]))))
)

# Join sales with users -> (transaction_id, user_name, location, revenue)
final_join = sales_users_kv.join(users_kv).map(
    lambda kv: (kv[1][0][0], kv[1][1][0], kv[1][1][1], kv[1][0][1])
)

# RDD transformations are lazy: without an action the joins above never run,
# so trigger them and show a sample of each result.
print("Sales with product names:", result.take(5))
print("Sales with user details:", final_join.take(5))


Q4: Advanced RDD Operations

In [5]:
# Top 5 products by revenue: order the (product_id, revenue) pairs from Q2 by
# descending revenue and keep the first five.
top5_products = total_rev_per_prod.takeOrdered(5, key=lambda kv: -kv[1])
print("Top 5 products by revenue:", top5_products)

# Total spending per user: key each transaction by user_id with value
# quantity * price, then add the values up per key.
user_spend = (
    sales_data
    .map(lambda line: line.split(","))
    .map(lambda cols: (cols[2], float(cols[3]) * float(cols[4])))
)
spend_by_user = user_spend.reduceByKey(lambda a, b: a + b)
print(spend_by_user.take(5))

Top 5 products by revenue: [('P2', 275.0), ('P4', 270.0), ('P3', 135.0), ('P1', 100.0)]
[('U3', 300.0), ('U1', 220.0), ('U2', 260.0)]


Section B: DataFrames and SQL

Q5: DataFrame Creation and Exploration

In [6]:
# Load each CSV as a DataFrame, using the first line as column names and letting
# Spark infer column types.
csv_options = {"header": True, "inferSchema": True}
sales_df = spark.read.csv("sales.csv", **csv_options)
products_df = spark.read.csv("products.csv", **csv_options)
users_df = spark.read.csv("users.csv", **csv_options)

all_frames = (sales_df, products_df, users_df)

# Print all three schemas first...
for frame in all_frames:
    frame.printSchema()

# ...then a five-row preview of each table.
for frame in all_frames:
    frame.show(5)

root
 |-- transaction_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)

root
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- location: string (nullable = true)

+--------------+----------+-------+--------+-----+
|transaction_id|product_id|user_id|quantity|price|
+--------------+----------+-------+--------+-----+
|             1|        P1|     U1|       1| 10.0|
|             2|        P2|     U2|       2| 20.0|
|             3|        P3|     U3|       3| 15.0|
|             4|        P4|     U1|       2| 30.0|
|             5|        P2|     U2|       1| 25.0|
+--------------+----------+-------+--------+-----+
only showing top 5 rows

+----------+------------+----------+
|product_

Q6: SQL Queries

In [7]:
# Expose the DataFrames to Spark SQL under short table names.
sales_df.createOrReplaceTempView("sales")
products_df.createOrReplaceTempView("products")
users_df.createOrReplaceTempView("users")

# Total revenue across all transactions (quantity * price, summed).
spark.sql("SELECT SUM(quantity * price) AS total_revenue FROM sales").show()

# Top 5 users by total spend. Group by user_id as well as user_name so that
# two different users who share a display name are not merged into one row.
spark.sql("""
    SELECT u.user_name, SUM(s.quantity * s.price) AS total_spent
    FROM sales s JOIN users u ON s.user_id = u.user_id
    GROUP BY u.user_id, u.user_name
    ORDER BY total_spent DESC
    LIMIT 5
""").show()

# Count of products sold by category.
# NOTE: COUNT(s.product_id) counts sales rows (transactions), not units;
# use SUM(s.quantity) instead if "products sold" should mean units.
spark.sql("""
    SELECT p.category, COUNT(s.product_id) AS product_sold
    FROM sales s JOIN products p ON s.product_id = p.product_id
    GROUP BY p.category
""").show()


+-------------+
|total_revenue|
+-------------+
|        780.0|
+-------------+

+---------+-----------+
|user_name|total_spent|
+---------+-----------+
|  Charlie|      300.0|
|      Bob|      260.0|
|    Alice|      220.0|
+---------+-----------+

+----------+------------+
|  category|product_sold|
+----------+------------+
|Stationery|          20|
+----------+------------+



Q7: Joins and Aggregations

In [8]:
# Attach user and product attributes to every sale: inner joins on the shared
# key columns (user_id first, then product_id).
enriched_df = (
    sales_df
    .join(users_df, "user_id")
    .join(products_df, "product_id")
)
# Per-transaction revenue.
enriched_df = enriched_df.withColumn("revenue", enriched_df.quantity * enriched_df.price)

preview_columns = [
    "transaction_id", "user_name", "location",
    "product_name", "category", "quantity", "price", "revenue",
]
enriched_df.select(*preview_columns).show(5)

# Revenue per location
enriched_df.groupBy("location").sum("revenue").show()

# Avg quantity per category
enriched_df.groupBy("category").avg("quantity").show()


+--------------+---------+----------+------------+----------+--------+-----+-------+
|transaction_id|user_name|  location|product_name|  category|quantity|price|revenue|
+--------------+---------+----------+------------+----------+--------+-----+-------+
|             1|    Alice|  New York|         Pen|Stationery|       1| 10.0|   10.0|
|             2|      Bob|California|    Notebook|Stationery|       2| 20.0|   40.0|
|             3|  Charlie|     Texas|      Pencil|Stationery|       3| 15.0|   45.0|
|             4|    Alice|  New York|      Marker|Stationery|       2| 30.0|   60.0|
|             5|      Bob|California|    Notebook|Stationery|       1| 25.0|   25.0|
+--------------+---------+----------+------------+----------+--------+-----+-------+
only showing top 5 rows

+----------+------------+
|  location|sum(revenue)|
+----------+------------+
|     Texas|       300.0|
|California|       260.0|
|  New York|       220.0|
+----------+------------+

+----------+-------------+


Q8: Window Functions and Ranking

In [9]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, sum as _sum

# --- Users ranked by total spend within their location -------------------
user_spend_df = (
    enriched_df
    .groupBy("user_id", "user_name", "location")
    .agg(_sum("revenue").alias("total_spent"))
)
windowSpec = (
    Window
    .partitionBy("location")
    .orderBy(user_spend_df["total_spent"].desc())
)
ranked_users = user_spend_df.withColumn("rank", rank().over(windowSpec))
ranked_users.show()

# --- Highest-revenue product in each category -----------------------------
# rank() gives tied products the same rank, so a category can show more than one row.
prod_rev_df = (
    enriched_df
    .groupBy("category", "product_name")
    .agg(_sum("revenue").alias("total_revenue"))
)
windowSpec2 = (
    Window
    .partitionBy("category")
    .orderBy(prod_rev_df["total_revenue"].desc())
)
ranked_products = prod_rev_df.withColumn("rank", rank().over(windowSpec2))
top_products = ranked_products.filter(ranked_products["rank"] == 1)
top_products.show()


+-------+---------+----------+-----------+----+
|user_id|user_name|  location|total_spent|rank|
+-------+---------+----------+-----------+----+
|     U2|      Bob|California|      260.0|   1|
|     U1|    Alice|  New York|      220.0|   1|
|     U3|  Charlie|     Texas|      300.0|   1|
+-------+---------+----------+-----------+----+

+----------+------------+-------------+----+
|  category|product_name|total_revenue|rank|
+----------+------------+-------------+----+
|Stationery|    Notebook|        275.0|   1|
+----------+------------+-------------+----+



Bonus: Caching and Parquet

In [10]:
# Output location; Spark writes a directory of Parquet part files at this path.
parquet_path = "output/enriched_data.parquet"

# Cache enriched_df. The Parquet write below is the first action on it, so it fills
# the cache, and the row-count check afterwards is served from memory instead of
# re-running the joins. Previously the cache was filled but never read again and
# never released.
enriched_df.cache()
try:
    # Write to Parquet, replacing output from any previous run.
    enriched_df.write.mode("overwrite").parquet(parquet_path)

    # Read the files back and make sure the round trip kept every row.
    parquet_df = spark.read.parquet(parquet_path)
    written_rows = enriched_df.count()
    read_rows = parquet_df.count()
    if read_rows != written_rows:
        raise RuntimeError(
            f"Parquet round trip mismatch: wrote {written_rows} rows, read {read_rows}"
        )

    # Revenue per location, computed from the Parquet copy.
    parquet_df.groupBy("location").sum("revenue").show()
finally:
    # Release the cached blocks; later uses of enriched_df simply recompute it.
    enriched_df.unpersist()


+----------+------------+
|  location|sum(revenue)|
+----------+------------+
|     Texas|       300.0|
|California|       260.0|
|  New York|       220.0|
+----------+------------+

