<a href="https://colab.research.google.com/github/akshathaprabhu22/UMD-SQL-Softball-database-creation-project/blob/main/SparkSQL%20vs%20Pandas%20vs%20PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Project Goal:**

The project aims to compare the syntax and performance differences among Spark SQL, PySpark, and Pandas when handling big data. It involves writing equivalent code in each of these tools to perform a specific data processing task and understanding differences in syntaxes and code efficiences.

Data source: Bike Store Relational Database | SQL
Sample database from sqlservertutorial.net for a retail bike store.
https://www.kaggle.com/datasets/dillonmyrick/bike-store-sample-database

# **Importing Libraries:**

In [117]:
# install PySpark
!pip install -q pyspark

In [118]:
# create a SparkSession from pyspark.sql, which is the entry point to Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Python_Explore').getOrCreate()

In [119]:
#installing pandas
import pandas as pd


Loading Datasets in Apache Spark,Pandas and PySpark

In [120]:
# Read the text file into a DataFrame
spark.read.option("header", "true").csv("brands.csv").createOrReplaceTempView("Brands")
spark.read.option("header", "true").csv("categories.csv").createOrReplaceTempView("Categories")
spark.read.option("header", "true").csv("customers.csv").createOrReplaceTempView("Customers")
spark.read.option("header", "true").csv("order_items.csv").createOrReplaceTempView("Order_Items")
spark.read.option("header", "true").csv("orders.csv").createOrReplaceTempView("Orders")
spark.read.option("header", "true").csv("products.csv").createOrReplaceTempView("Products")
spark.read.option("header", "true").csv("staffs.csv").createOrReplaceTempView("Staffs")
spark.read.option("header", "true").csv("stocks.csv").createOrReplaceTempView("Stocks")
spark.read.option("header", "true").csv("stores.csv").createOrReplaceTempView("Stores")


In [121]:
#704

df_brands = pd.read_csv('brands.csv', header=0)
df_categories = pd.read_csv('categories.csv', header=0)
df_customers = pd.read_csv('customers.csv', header=0)
df_order_items = pd.read_csv('order_items.csv', header=0)
df_orders = pd.read_csv('orders.csv', header=0)
df_products = pd.read_csv('products.csv', header=0)
df_staffs = pd.read_csv('staffs.csv', header=0)
df_stocks = pd.read_csv('stocks.csv', header=0)
df_stores = pd.read_csv('stores.csv', header=0)


In [122]:
# Read CSV files into Spark DataFrames
sp_brands = spark.read.option("header", "true").csv("brands.csv")
sp_categories = spark.read.option("header", "true").csv("categories.csv")
sp_customers = spark.read.option("header", "true").csv("customers.csv")
sp_order_items = spark.read.option("header", "true").csv("order_items.csv")
sp_orders = spark.read.option("header", "true").csv("orders.csv")
sp_products = spark.read.option("header", "true").csv("products.csv")
sp_staffs = spark.read.option("header", "true").csv("staffs.csv")
sp_stocks = spark.read.option("header", "true").csv("stocks.csv")
sp_stores = spark.read.option("header", "true").csv("stores.csv")

# **Dataset Exploration:**

1. How many units are ordered per month per year

In [123]:
#Spark
spark.sql('''SELECT
    CONCAT(date_format(ORDER_DATE, 'MMM'), "-", YEAR(ORDER_DATE)) as Month_Year,
    COUNT(ORDER_ID) AS Order_Count
    FROM ORDERS
    GROUP BY 1
    ORDER BY 1 ASC''').show()

+----------+-----------+
|Month_Year|Order_Count|
+----------+-----------+
|  Apr-2016|         43|
|  Apr-2017|         57|
|  Apr-2018|        125|
|  Aug-2016|         63|
|  Aug-2017|         65|
|  Aug-2018|          2|
|  Dec-2016|         55|
|  Dec-2017|         47|
|  Dec-2018|          1|
|  Feb-2016|         49|
|  Feb-2017|         57|
|  Feb-2018|         35|
|  Jan-2016|         50|
|  Jan-2017|         50|
|  Jan-2018|         52|
|  Jul-2016|         50|
|  Jul-2017|         52|
|  Jul-2018|          4|
|  Jun-2016|         45|
|  Jun-2017|         63|
+----------+-----------+
only showing top 20 rows



In [124]:
#Pandas DataFrame APIs without Spark
df_orders.head()
df_orders['order_date'] = pd.to_datetime(df_orders['order_date'])

df_orders['Month_Year'] = df_orders['order_date'].dt.strftime('%b') + '-' + \
df_orders['order_date'].dt.strftime('%Y').astype(str)

order_quant = df_orders.groupby('Month_Year', as_index = False)\
    .agg({'order_id': 'nunique'})\
    .rename(columns = {'order_id' : 'Order Count'})\
    .sort_values('Month_Year')
# tot_quant['Total Ordered Quantity'] = tot_quant['Total Ordered Quantity'].astype(int)
order_quant.head(12)

Unnamed: 0,Month_Year,Order Count
0,Apr-2016,43
1,Apr-2017,57
2,Apr-2018,125
3,Aug-2016,63
4,Aug-2017,65
5,Aug-2018,2
6,Dec-2016,55
7,Dec-2017,47
8,Dec-2018,1
9,Feb-2016,49


In [125]:
#Spark DataFrame and SQL APIs
from pyspark.sql.functions import count, concat, date_format, year,expr

result_df = sp_orders \
    .groupBy(expr("concat(date_format(order_date, 'MMM'), '-', year(order_date))").alias("Month_Year")) \
    .agg(count('order_id').alias("Order_Count")) \
    .orderBy("Month_Year")

# Show the result
result_df.show()


+----------+-----------+
|Month_Year|Order_Count|
+----------+-----------+
|  Apr-2016|         43|
|  Apr-2017|         57|
|  Apr-2018|        125|
|  Aug-2016|         63|
|  Aug-2017|         65|
|  Aug-2018|          2|
|  Dec-2016|         55|
|  Dec-2017|         47|
|  Dec-2018|          1|
|  Feb-2016|         49|
|  Feb-2017|         57|
|  Feb-2018|         35|
|  Jan-2016|         50|
|  Jan-2017|         50|
|  Jan-2018|         52|
|  Jul-2016|         50|
|  Jul-2017|         52|
|  Jul-2018|          4|
|  Jun-2016|         45|
|  Jun-2017|         63|
+----------+-----------+
only showing top 20 rows



2. Which products are most frequently ordered and what categories do they belong to?

In [126]:
spark.sql('''
    SELECT
        P.PRODUCT_ID,
        P.PRODUCT_NAME,
        C.CATEGORY_NAME,
        COUNT(O.ORDER_ID) as ORDER_COUNT
    FROM
        Products p
    INNER JOIN
        Order_items o
    ON
        p.product_id = o.product_id
    INNER JOIN CATEGORIES C ON
        p.category_id = C.category_id
        GROUP BY 1,2,3
        ORDER BY 4 DESC
        LIMIT 5''').show()

+----------+--------------------+-----------------+-----------+
|PRODUCT_ID|        PRODUCT_NAME|    CATEGORY_NAME|ORDER_COUNT|
+----------+--------------------+-----------------+-----------+
|         6|Surly Ice Cream T...|   Mountain Bikes|        110|
|        12|Electra Townie Or...|Cruisers Bicycles|        104|
|        13|Electra Cruiser 1...|Cruisers Bicycles|        103|
|         7|Trek Slash 8 27.5...|   Mountain Bikes|        101|
|         9|Trek Conduit+ - 2016|   Electric Bikes|        101|
+----------+--------------------+-----------------+-----------+



In [127]:
result_df = pd.merge(df_products, df_order_items, on="product_id", how="inner") \
    .merge(df_categories, on="category_id", how="inner") \
    .groupby(["product_id", "product_name", "category_name"], as_index=False) \
    .size() \
    .rename(columns={"product_id": "PRODUCT_ID", "product_name": "PRODUCT_NAME", "category_name": "CATEGORY_NAME", "size": "ORDER_COUNT"}) \
    .sort_values(by="ORDER_COUNT", ascending=False) \
    .head(5)

# Show the result
result_df

Unnamed: 0,PRODUCT_ID,PRODUCT_NAME,CATEGORY_NAME,ORDER_COUNT
4,6,Surly Ice Cream Truck Frameset - 2016,Mountain Bikes,110
10,12,Electra Townie Original 21D - 2016,Cruisers Bicycles,104
11,13,Electra Cruiser 1 (24-Inch) - 2016,Cruisers Bicycles,103
5,7,Trek Slash 8 27.5 - 2016,Mountain Bikes,101
7,9,Trek Conduit+ - 2016,Electric Bikes,101


In [128]:
#Pyspark
from pyspark.sql.functions import count, desc
sp_products.join(sp_order_items, "product_id", "inner")\
    .join(sp_categories, "category_id", "inner") \
    .groupBy("product_id", "product_name", "category_name") \
    .agg(count("*").alias("order_count")) \
    .orderBy(desc("order_count")) \
    .limit(5).show()

+----------+--------------------+-----------------+-----------+
|product_id|        product_name|    category_name|order_count|
+----------+--------------------+-----------------+-----------+
|         6|Surly Ice Cream T...|   Mountain Bikes|        110|
|        12|Electra Townie Or...|Cruisers Bicycles|        104|
|        13|Electra Cruiser 1...|Cruisers Bicycles|        103|
|         7|Trek Slash 8 27.5...|   Mountain Bikes|        101|
|         9|Trek Conduit+ - 2016|   Electric Bikes|        101|
+----------+--------------------+-----------------+-----------+



3. What are the top 5 cities by order percentage?



In [129]:
spark.sql('''
    SELECT
        C.CITY,
        C.STATE,
        COUNT(O.ORDER_ID) as ORDER_COUNT,
        ROUND(COUNT(O.ORDER_ID) * 100.0 / SUM(COUNT(O.ORDER_ID)) OVER (),2) AS ORDER_PERCENTAGE
    FROM
        Customers C
    INNER JOIN
        Orders O
    ON
        C.customer_id = O.customer_id
    GROUP BY
        C.CITY,C.STATE
    ORDER BY
        ORDER_COUNT DESC
    LIMIT 5''').show()

+------------+-----+-----------+----------------+
|        CITY|STATE|ORDER_COUNT|ORDER_PERCENTAGE|
+------------+-----+-----------+----------------+
|Mount Vernon|   NY|         20|            1.24|
|   Scarsdale|   NY|         20|            1.24|
|Ballston Spa|   NY|         17|            1.05|
| Canandaigua|   NY|         16|            0.99|
|  San Angelo|   TX|         16|            0.99|
+------------+-----+-----------+----------------+



In [130]:
#Pandas DataFrame APIs without Spark
result_df = (
    pd.merge(df_customers, df_orders, on='customer_id')\
    .groupby(['city', 'state'], as_index=False)\
    .agg(ORDER_COUNT=('order_id', 'count'))\
    .assign(ORDER_PERCENTAGE=lambda x: (x['ORDER_COUNT'] * 100.0 / x['ORDER_COUNT'].sum()).round(2))\
    .sort_values(by='ORDER_COUNT', ascending=False)\
    .head(5)
)

result_df

Unnamed: 0,city,state,ORDER_COUNT,ORDER_PERCENTAGE
159,Scarsdale,NY,20,1.24
102,Mount Vernon,NY,20,1.24
12,Ballston Spa,NY,17,1.05
23,Canandaigua,NY,16,0.99
148,San Angelo,TX,16,0.99


In [131]:
from pyspark.sql import Window

window_spec = Window.partitionBy()
sp_customers.join(sp_orders, on='customer_id') \
            .groupBy('CITY', 'STATE') \
            .agg(count('ORDER_ID').alias('ORDER_COUNT')) \
            .withColumn('ORDER_PERCENTAGE', round(col('ORDER_COUNT') * 100.0 / sum('ORDER_COUNT').over(window_spec), 2)) \
            .orderBy(desc('ORDER_COUNT')) \
            .limit(5).show(truncate = False)

+------------+-----+-----------+----------------+
|CITY        |STATE|ORDER_COUNT|ORDER_PERCENTAGE|
+------------+-----+-----------+----------------+
|Mount Vernon|NY   |20         |1.24            |
|Scarsdale   |NY   |20         |1.24            |
|Ballston Spa|NY   |17         |1.05            |
|Canandaigua |NY   |16         |0.99            |
|San Angelo  |TX   |16         |0.99            |
+------------+-----+-----------+----------------+



4. What are the top 5 brands by revenue?


In [132]:
#SparkSQL
spark.sql("""
    SELECT product_name, brand_name, ROUND(SUM(item_revenue), 2) AS item_revenue
    FROM (
        SELECT p.product_name, b.brand_name, (oi.quantity * (oi.list_price * (1 - oi.discount))) AS item_revenue
        FROM order_items oi
        LEFT JOIN products p ON oi.product_id = p.product_id
        LEFT JOIN brands b ON p.brand_id = b.brand_id
    ) AS subquery
    GROUP BY product_name, brand_name
    ORDER BY item_revenue DESC
    LIMIT 5
""").show()

+--------------------+----------+------------+
|        product_name|brand_name|item_revenue|
+--------------------+----------+------------+
|Trek Slash 8 27.5...|      Trek|   555558.61|
|Trek Conduit+ - 2016|      Trek|    389248.7|
|Trek Fuel EX 8 29...|      Trek|   368472.73|
|Surly Straggler 6...|     Surly|   226765.55|
|Trek Domane SLR 6...|      Trek|   211584.62|
+--------------------+----------+------------+



In [133]:
#Pandas
result = df_order_items.merge(df_products, on='product_id', how='left') \
                       .merge(df_brands, on='brand_id', how='left') \
                       .assign(effective_price=lambda x: x['list_price_x'] * (1 - x['discount'])) \
                       .assign(item_revenue=lambda x: x['quantity'] * x['effective_price']) \
                       .groupby(['product_name', 'brand_name']) \
                       .agg(item_revenue=('item_revenue', 'sum')) \
                       .reset_index() \
                       .sort_values(by='item_revenue', ascending=False) \
                       .head(5)
result['item_revenue'] = result['item_revenue'].round(2)
result

Unnamed: 0,product_name,brand_name,item_revenue
261,Trek Slash 8 27.5 - 2016,Trek,555558.61
160,Trek Conduit+ - 2016,Trek,389248.7
212,Trek Fuel EX 8 29 - 2016,Trek,368472.73
146,Surly Straggler 650b - 2016,Surly,226765.55
193,Trek Domane SLR 6 Disc - 2017,Trek,211584.62


In [134]:
#Pyspark
from pyspark.sql.functions import col, sum, round

sp_order_items.join(sp_products.withColumnRenamed('list_price', 'product_list_price'), on='product_id', how='left') \
              .join(sp_brands, on='brand_id', how='left') \
              .withColumn('effective_price', (col('product_list_price') * (1 - col('discount')))) \
              .withColumn('item_revenue', (col('quantity') * col('effective_price'))) \
              .groupBy('product_name', 'brand_name') \
              .agg(sum('item_revenue').alias('item_revenue')) \
              .withColumn('item_revenue', round('item_revenue', 2)) \
              .orderBy('item_revenue', ascending=False) \
              .limit(5) \
              .show()

+--------------------+----------+------------+
|        product_name|brand_name|item_revenue|
+--------------------+----------+------------+
|Trek Slash 8 27.5...|      Trek|   555558.61|
|Trek Conduit+ - 2016|      Trek|    389248.7|
|Trek Fuel EX 8 29...|      Trek|   368472.73|
|Surly Straggler 6...|     Surly|   226765.55|
|Trek Domane SLR 6...|      Trek|   211584.62|
+--------------------+----------+------------+

