In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

In [None]:
# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("Retail-Data-PySpark")
    .getOrCreate()
)

# Root directory used to build the CSV input paths.
INPUT_ROOT = "/content"

In [None]:
# Explicit schemas for the six retail CSV inputs.
# Every column is nullable (StructType.add defaults to nullable=True).

orders_schema = (
    StructType()
    .add("order_id", IntegerType())
    .add("order_date", TimestampType())
    .add("order_customer_id", IntegerType())
    .add("order_status", StringType())
)

order_items_schema = (
    StructType()
    .add("order_item_id", IntegerType())
    .add("order_item_order_id", IntegerType())
    .add("order_item_product_id", IntegerType())
    .add("order_item_quantity", IntegerType())
    .add("order_item_subtotal", DoubleType())
    .add("order_item_product_price", DoubleType())
)

customers_schema = (
    StructType()
    .add("customer_id", IntegerType())
    .add("customer_fname", StringType())
    .add("customer_lname", StringType())
    .add("customer_email", StringType())
    .add("customer_password", StringType())
    .add("customer_street", StringType())
    .add("customer_city", StringType())
    .add("customer_state", StringType())
    .add("customer_zipcode", StringType())
)

categories_schema = (
    StructType()
    .add("category_id", IntegerType())
    .add("category_department_id", IntegerType())
    .add("category_name", StringType())
)

products_schema = (
    StructType()
    .add("product_id", IntegerType())
    .add("product_category_id", IntegerType())
    .add("product_name", StringType())
    .add("product_description", StringType())
    .add("product_price", DoubleType())
    .add("product_image", StringType())
)

departments_schema = (
    StructType()
    .add("department_id", IntegerType())
    .add("department_name", StringType())
)

In [None]:
# Load each header-less CSV file with its explicit schema.
orders = spark.read.csv(
    f"{INPUT_ROOT}/orders.csv", schema=orders_schema, header=False
)
order_items = spark.read.csv(
    f"{INPUT_ROOT}/order_items.csv", schema=order_items_schema, header=False
)
customers = spark.read.csv(
    f"{INPUT_ROOT}/customers.csv", schema=customers_schema, header=False
)
categories = spark.read.csv(
    f"{INPUT_ROOT}/categories.csv", schema=categories_schema, header=False
)
products = spark.read.csv(
    f"{INPUT_ROOT}/products.csv", schema=products_schema, header=False
)
departments = spark.read.csv(
    f"{INPUT_ROOT}/departments.csv", schema=departments_schema, header=False
)


In [None]:
# Register every DataFrame as a temp view named after its table so it can
# also be queried through Spark SQL.
for _view_name, _frame in (
    ("orders", orders),
    ("order_items", order_items),
    ("customers", customers),
    ("categories", categories),
    ("products", products),
    ("departments", departments),
):
    _frame.createOrReplaceTempView(_view_name)

# Retail SQL → Spark SQL Queries

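Each query is explained with a Markdown cell and implemented with the PySpark DataFrame API; the source tables are also registered as temp views, so the same queries can be run with `spark.sql()`.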

### Distinct order status
Taking distinct order statuses from the orders table and sorting them.

In [None]:
# Unique order statuses, sorted ascending.
(
    orders
    .select(F.col('order_status'))
    .distinct()
    .sort('order_status')
    .show()
)

### Orders with COMPLETE status

In [None]:
# Keep only the orders whose status is exactly COMPLETE.
orders.where(orders.order_status == 'COMPLETE').show()

### Orders with CLOSED status

In [None]:
# Keep only the orders whose status is exactly CLOSED.
orders.where(orders['order_status'] == 'CLOSED').show()

### Orders with CLOSED or COMPLETE status

In [None]:
# Keep the orders whose status is one of CLOSED / COMPLETE.
(
    orders
    .where(F.col('order_status').isin(['CLOSED', 'COMPLETE']))
    .show()
)

### Count of orders

In [None]:
# Total number of rows in orders, reported as a single `order_count` column.
(
    orders
    .groupBy()  # no grouping keys: aggregate over the whole DataFrame
    .agg(F.count('*').alias('order_count'))
    .show()
)

### Revenue per order

In [None]:
# Sum the item subtotals of each order (rounded to 2 decimals), listed by
# order id.
(
    order_items
    .groupBy(F.col('order_item_order_id'))
    .agg(
        F.round(F.sum(F.col('order_item_subtotal')), 2).alias('order_revenue')
    )
    .sort('order_item_order_id')
    .show()
)

### Orders with revenue >=2000

In [None]:
# Revenue per order (item subtotals summed and rounded to 2 decimals),
# keeping only orders whose revenue is at least 2000.
(
    order_items.groupBy('order_item_order_id')
    .agg(F.round(F.sum('order_item_subtotal'), 2).alias('order_revenue'))
    # Post-aggregation filter (the HAVING step). Without it this cell simply
    # repeats the unfiltered revenue-per-order query.
    .filter(F.col('order_revenue') >= 2000)
    .orderBy('order_item_order_id')
).show()

### Inner Join Orders + Order Items

In [None]:
# Inner join: only orders that have at least one matching order item.
(
    orders.alias('o')
    .join(
        order_items.alias('oi'),
        on=F.col('o.order_id') == F.col('oi.order_item_order_id'),
        how='inner',
    )
    .select(
        'o.order_date',
        'oi.order_item_product_id',
        'oi.order_item_subtotal',
    )
    .show()
)

### Left Outer Join Orders + Order Items

In [None]:
# Left outer join: every order is kept; an order with no matching order
# items appears once with NULLs in the order-item columns.
(
    orders.alias('o')
    # 'left' (not 'inner') so that orders without items are not dropped.
    .join(order_items.alias('oi'), F.col('o.order_id') == F.col('oi.order_item_order_id'), 'left')
    .select('o.order_id', 'o.order_date', 'oi.order_item_id', 'oi.order_item_product_id', 'oi.order_item_subtotal')
    .orderBy('o.order_id')
).show()

In [None]:
# Daily revenue: for COMPLETE/CLOSED orders, sum the item subtotals per
# calendar day (rounded to 2 decimals).
daily_revenue = (
    orders.alias('o')
    .join(
        order_items.alias('oi'),
        on=F.col('o.order_id') == F.col('oi.order_item_order_id'),
        how='inner',
    )
    .where(F.col('o.order_status').isin(['COMPLETE', 'CLOSED']))
    # Truncate the order timestamp to a date so rows group per day.
    .groupBy(F.to_date('o.order_date').alias('order_date'))
    .agg(
        F.round(F.sum('oi.order_item_subtotal'), 2).alias('order_revenue')
    )
)

# Expose the result as a temp view, then display it in date order.
daily_revenue.createOrReplaceTempView('daily_revenue')
daily_revenue.sort('order_date').show()

### Daily Product Revenue temp view

In [None]:
from pyspark.sql.window import Window

# Revenue per (day, product) for COMPLETE/CLOSED orders, rounded to
# 2 decimals.
daily_product_revenue = (
    orders.alias('o')
    .join(
        order_items.alias('oi'),
        on=F.col('o.order_id') == F.col('oi.order_item_order_id'),
        how='inner',
    )
    .where(F.col('o.order_status').isin(['COMPLETE', 'CLOSED']))
    .groupBy(
        F.to_date('o.order_date').alias('order_date'),
        F.col('oi.order_item_product_id'),
    )
    .agg(
        F.round(F.sum('oi.order_item_subtotal'), 2).alias('order_revenue')
    )
)
daily_product_revenue.createOrReplaceTempView('daily_product_revenue')

# Rank products within each day, highest revenue first.
w = Window.partitionBy('order_date').orderBy(F.desc('order_revenue'))

# For each day of January 2014, show the products whose dense rank by
# revenue is 5 or better, together with both rank variants.
(
    daily_product_revenue
    .where(F.date_format('order_date', 'yyyy-MM') == '2014-01')
    .withColumn('rnk', F.rank().over(w))
    .withColumn('drnk', F.dense_rank().over(w))
    .where(F.col('drnk') <= 5)
    .sort(F.col('order_date').asc(), F.desc('order_revenue'))
    .show()
)
