MAJOR_ASSIGNMENT 1 [BDAWAS]

 Initialize Spark and Load Data


(a) Initialize a Spark Session with Hive Support


In [2]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("E-commerce Sales Analysis") \
    .config("spark.sql.catalogImplementation", "hive") \
    .getOrCreate()


(b) Read the CSV File into a DataFrame



In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Initialize Spark Session
spark = SparkSession.builder.appName("E-commerce Sales Analysis").getOrCreate()

# Define Schema
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("category", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("date", StringType(), True)
])

# Sample Data
data = [
    (101, 1001, 2001, "Electronics", 3, 1500.0, "2025-03-10"),
    (102, 1002, 2002, "Clothing", 1, 500.0, "2025-03-11"),
    (103, 1003, 2003, "Electronics", 2, 1200.0, "2025-03-12"),
    (104, 1004, 2004, "Home Decor", 4, 300.0, "2025-03-13"),
    (105, 1005, 2005, "Clothing", 2, 800.0, "2025-03-14"),
    (106, 1006, 2006, "Electronics", 1, 900.0, "2025-03-15"),
    (107, 1007, 2007, "Furniture", 2, 2500.0, "2025-03-16"),
    (108, 1008, 2008, "Clothing", 3, 600.0, "2025-03-17"),
    (109, 1009, 2009, "Home Decor", 1, 350.0, "2025-03-18"),
    (110, 1010, 2010, "Electronics", 2, 1100.0, "2025-03-19")
]

# Create DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show DataFrame
df.show()


+--------+-----------+----------+-----------+--------+------+----------+
|order_id|customer_id|product_id|   category|quantity| price|      date|
+--------+-----------+----------+-----------+--------+------+----------+
|     101|       1001|      2001|Electronics|       3|1500.0|2025-03-10|
|     102|       1002|      2002|   Clothing|       1| 500.0|2025-03-11|
|     103|       1003|      2003|Electronics|       2|1200.0|2025-03-12|
|     104|       1004|      2004| Home Decor|       4| 300.0|2025-03-13|
|     105|       1005|      2005|   Clothing|       2| 800.0|2025-03-14|
|     106|       1006|      2006|Electronics|       1| 900.0|2025-03-15|
|     107|       1007|      2007|  Furniture|       2|2500.0|2025-03-16|
|     108|       1008|      2008|   Clothing|       3| 600.0|2025-03-17|
|     109|       1009|      2009| Home Decor|       1| 350.0|2025-03-18|
|     110|       1010|      2010|Electronics|       2|1100.0|2025-03-19|
+--------+-----------+----------+-----------+------

In [4]:
spark = SparkSession.builder.appName("E-commerce Sales Analysis").getOrCreate()
df = spark.createDataFrame(data, schema=schema)


Q2: Create a Temporary View and Filter High-Value Orders


In [5]:
df.createOrReplaceTempView("sales_data")


(b) Write a Spark SQL Query to Filter High-Value Orders


In [6]:
high_value_orders = spark.sql("""
    SELECT * FROM sales_data
    WHERE price > 1000 AND quantity >= 2
""")

# Display Filtered Rows
high_value_orders.show()


+--------+-----------+----------+-----------+--------+------+----------+
|order_id|customer_id|product_id|   category|quantity| price|      date|
+--------+-----------+----------+-----------+--------+------+----------+
|     101|       1001|      2001|Electronics|       3|1500.0|2025-03-10|
|     103|       1003|      2003|Electronics|       2|1200.0|2025-03-12|
|     107|       1007|      2007|  Furniture|       2|2500.0|2025-03-16|
|     110|       1010|      2010|Electronics|       2|1100.0|2025-03-19|
+--------+-----------+----------+-----------+--------+------+----------+



Q3: Aggregate Sales by Category


(a) Using Spark SQL to Calculate Aggregates

In [7]:
category_sales_summary = spark.sql("""
    SELECT category,
           SUM(quantity * price) AS total_revenue,
           AVG(price) AS average_price,
           COUNT(order_id) AS total_orders
    FROM sales_data
    GROUP BY category
""")

# Display Aggregated Results
category_sales_summary.show()


+-----------+-------------+-----------------+------------+
|   category|total_revenue|    average_price|total_orders|
+-----------+-------------+-----------------+------------+
|Electronics|      10000.0|           1175.0|           4|
|   Clothing|       3900.0|633.3333333333334|           3|
| Home Decor|       1550.0|            325.0|           2|
|  Furniture|       5000.0|           2500.0|           1|
+-----------+-------------+-----------------+------------+



Q4: Save Filtered and Aggregated Results to CSV


In [8]:
# Save High-Value Orders
high_value_orders.write.csv("high_value_orders.csv", header=True, mode="overwrite")

# Save Aggregated Sales Data
category_sales_summary.write.csv("category_sales_summary.csv", header=True, mode="overwrite")
