# Apache Spark for Complex Queries
 
## Data Management Project
 
In this assignment, we use [Apache Spark](https://spark.apache.org/) to translate and execute selected queries from the TPCx-BB big data benchmark. We use Python and Spark DataFrames to implement the queries.

### Import PySpark


 

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *

# With master "local", Spark runs as a single (master) node in its own JVM;
# no cluster manager is created.
session_builder = SparkSession.builder.master("local").appName("Homework 07")
spark = session_builder.getOrCreate()

spark

## Helper Function

Load a table from the TPCx-BB dataset as a Spark DataFrame.

In [32]:
def get_table(table):
    """Load one table of the TPCx-BB dataset as a Spark DataFrame.

    Parameters
    ----------
    table : str
        Table name; the data is read from ``TPCx-BB-dataset/{table}.ptxt``,
        relative to the notebook's working directory.

    Returns
    -------
    pyspark.sql.DataFrame
        The DataFrame produced by Spark's Parquet reader for that path.
    """
    # NOTE(review): "header" is normally a CSV-reader option. It is kept here
    # unchanged; presumably it has no effect on a Parquet read -- confirm
    # before removing it.
    return spark.read.option("header", "true").parquet(
        f"TPCx-BB-dataset/{table}.ptxt"
    )

### Query 0
Find the number of items sold per category.  
Only items in certain categories that were sold in specific stores are considered.


In [37]:
# Tables needed for this query (every available table lives in the
# TPCx-BB-dataset/ directory).
s = get_table("store_sales")
i = get_table("item")

# Query parameters: item categories and stores to take into account.
q01_i_category_id_IN = [1, 2, 3]
q01_ss_store_sk_IN = [10, 20, 33, 40, 50]

# Pair every sales row with its item, then keep only the requested
# categories and stores.
sales_with_items = s.join(i, s.ss_item_sk == i.i_item_sk)
sales_in_scope = sales_with_items.where(
    i.i_category_id.isin(q01_i_category_id_IN)
).where(s.ss_store_sk.isin(q01_ss_store_sk_IN))

# One row per category with the number of matching sales rows.
per_category = sales_in_scope.groupBy(i.i_category).count()
query0_solution = per_category.select("i_category", "count")

query0_solution.show()

+--------------+-----+
|    i_category|count|
+--------------+-----+
|Home & Kitchen| 1975|
|         Books|14455|
|         Music|25060|
+--------------+-----+



# PROJECT QUERIES

Implementing assigned queries using Spark DataFrames.

## 1) Query 07
High-price items sold per state

In [297]:
# Input tables
a = get_table("customer_address")
c = get_table("customer")
s = get_table("store_sales")
i = get_table("item")
d = get_table("date_dim")

# Query parameters
# NOTE(review): the stored output of this cell does not match the contents of
# queries/q07/results/q07-result printed in the following cell (e.g. TX 384 vs
# 396). Confirm the parameter values and the dataset against the reference run.
q07_YEAR = 2001
q07_MONTH = 1
q07_HIGHER_PRICE_RATIO = 1.20
q07_HAVING_COUNT_GE = 10
q07_LIMIT = 10

# Step 1: items whose current price exceeds q07_HIGHER_PRICE_RATIO times the
# average current price of their own category.
category_avg = i.groupBy("i_category").agg(
    F.avg("i_current_price").alias("avg_price")
)
items_with_threshold = i.join(category_avg, "i_category").withColumn(
    "threshold", F.col("avg_price") * q07_HIGHER_PRICE_RATIO
)
high_price_items = (
    items_with_threshold.where(F.col("i_current_price") > F.col("threshold"))
    .select("i_item_sk")
    .distinct()
)

# Step 2: date keys that fall in the requested month of the requested year.
in_requested_month = (d.d_year == q07_YEAR) & (d.d_moy == q07_MONTH)
valid_dates = d.where(in_requested_month).select("d_date_sk")

# Step 3: restrict the sales to high-price items sold on those dates, attach
# the buyer's current address, and count the sales rows per (non-null) state.
sales_in_scope = (
    s.join(c, s.ss_customer_sk == c.c_customer_sk)
    .join(a, c.c_current_addr_sk == a.ca_address_sk)
    .join(high_price_items, s.ss_item_sk == high_price_items.i_item_sk)
    .join(valid_dates, s.ss_sold_date_sk == valid_dates.d_date_sk)
    .where(a.ca_state.isNotNull())
)
sales_per_state = sales_in_scope.groupBy(a.ca_state).agg(
    F.count("*").alias("cnt")
)

# Keep states with at least q07_HAVING_COUNT_GE sales; highest counts first,
# ties ordered by state; at most q07_LIMIT rows.
query7_solution = (
    sales_per_state.where(F.col("cnt") >= q07_HAVING_COUNT_GE)
    .orderBy(F.desc("cnt"), a.ca_state)
    .limit(q07_LIMIT)
)

query7_solution.show()

+--------+---+
|ca_state|cnt|
+--------+---+
|      TX|384|
|      VA|215|
|      KS|190|
|      MN|188|
|      AR|186|
|      GA|182|
|      KY|175|
|      NE|166|
|      IL|160|
|      WI|151|
+--------+---+



In [7]:
# Print the stored result file for query 07 (presumably the expected output)
# so it can be compared by eye with what query7_solution.show() displayed.
!cat queries/q07/results/q07-result

TX,396
GA,247
VA,233
IL,205
KY,176
KS,170
NC,164
IA,163
MO,156
AL,139


## 3) Query 09
Filtered store sales aggregation

In [357]:
# Load tables
ss1 = get_table("store_sales")
d = get_table("date_dim")
ca1 = get_table("customer_address")
s = get_table("store")
cd = get_table("customer_demographics")

# Parameters
#
# These values are the single source of truth for the three filter branches
# built below. They are the values the filters were actually executed with.
# NOTE(review): the values previously declared in this cell were never used
# and disagreed with the executed filters (e.g. "USA" vs "United States",
# "High School" vs "Secondary"); confirm the intended values against the
# assignment's parameter file.
q09_year = 2001

# Part 1 filters
part1_marital_status = "S"
part1_education_status = "Secondary"
part1_sales_price_min = 10
part1_sales_price_max = 110
part1_ca_country = "United States"
part1_ca_state_IN = ["TX", "CA", "NY"]
part1_net_profit_min = 100
part1_net_profit_max = 1000

# Part 2 filters
part2_marital_status = "S"
part2_education_status = "College"
part2_sales_price_min = 20
part2_sales_price_max = 200
part2_ca_country = "United States"
part2_ca_state_IN = ["FL", "GA"]
part2_net_profit_min = 50
part2_net_profit_max = 500

# Part 3 filters
part3_marital_status = "M"
part3_education_status = "2 yr Degree"
part3_sales_price_min = 30
part3_sales_price_max = 300
part3_ca_country = "United States"
part3_ca_state_IN = ["IL", "OH"]
part3_net_profit_min = 20
part3_net_profit_max = 267


def _q09_part_condition(
    marital_status,
    education_status,
    sales_price_min,
    sales_price_max,
    ca_country,
    ca_state_in,
    net_profit_min,
    net_profit_max,
):
    """Build the boolean Column for one OR-branch of the query 09 filter.

    A row satisfies the branch when the demographics match
    ``marital_status`` and ``education_status``, the sales price lies in
    ``[sales_price_min, sales_price_max]``, the address is in ``ca_country``
    and one of the states in ``ca_state_in``, and the net profit lies in
    ``[net_profit_min, net_profit_max]``. Both ranges are inclusive
    (``Column.between``).

    The columns are taken from the ``cd``, ``ss1`` and ``ca1`` DataFrames
    loaded at the top of this cell.
    """
    return (
        (cd.cd_marital_status == marital_status)
        & (cd.cd_education_status == education_status)
        & (ss1.ss_sales_price.between(sales_price_min, sales_price_max))
        & (ca1.ca_country == ca_country)
        & (ca1.ca_state.isin(ca_state_in))
        & (ss1.ss_net_profit.between(net_profit_min, net_profit_max))
    )


# Step 1: Join the sales with their dimensions and restrict to the query year
joined = (
    ss1.join(d, ss1.ss_sold_date_sk == d.d_date_sk)
    .join(ca1, ss1.ss_addr_sk == ca1.ca_address_sk)
    .join(s, ss1.ss_store_sk == s.s_store_sk)
    .join(cd, ss1.ss_cdemo_sk == cd.cd_demo_sk)
    .filter(d.d_year == q09_year)
)

# Step 2: Build the three filter branches from the parameters above
part1_condition = _q09_part_condition(
    part1_marital_status,
    part1_education_status,
    part1_sales_price_min,
    part1_sales_price_max,
    part1_ca_country,
    part1_ca_state_IN,
    part1_net_profit_min,
    part1_net_profit_max,
)

part2_condition = _q09_part_condition(
    part2_marital_status,
    part2_education_status,
    part2_sales_price_min,
    part2_sales_price_max,
    part2_ca_country,
    part2_ca_state_IN,
    part2_net_profit_min,
    part2_net_profit_max,
)

part3_condition = _q09_part_condition(
    part3_marital_status,
    part3_education_status,
    part3_sales_price_min,
    part3_sales_price_max,
    part3_ca_country,
    part3_ca_state_IN,
    part3_net_profit_min,
    part3_net_profit_max,
)

# Step 3: Keep rows matching any branch and sum their quantities
query9_solution = (
    joined.filter(part1_condition | part2_condition | part3_condition)
    .agg(F.sum(F.col("ss_quantity").cast("long")).alias("totalSales"))
)
query9_solution.show()

+----------+
|totalSales|
+----------+
|      5908|
+----------+



## 4) Query 20
Customer order-return ratios

In [291]:
# Input tables
s = get_table("store_sales")
r = get_table("store_returns")


def _ratio_or_zero(returned_name, ordered_name):
    """Return the Column ``returned / ordered`` rounded to 7 decimals.

    The value is 0.0 instead when either input column or the quotient
    itself is NULL.
    """
    returned = F.col(returned_name)
    ordered = F.col(ordered_name)
    quotient = returned / ordered
    unusable = returned.isNull() | ordered.isNull() | quotient.isNull()
    return F.round(F.when(unusable, 0.0).otherwise(quotient), 7)


# Step 1: per-customer order totals (distinct tickets, item rows, money paid)
orders = s.groupBy("ss_customer_sk").agg(
    F.countDistinct("ss_ticket_number").alias("orders_count"),
    F.count("ss_item_sk").alias("orders_items"),
    F.sum("ss_net_paid").alias("orders_money"),
)

# Step 2: per-customer return totals (distinct tickets, item rows, money returned)
returns = r.groupBy("sr_customer_sk").agg(
    F.countDistinct("sr_ticket_number").alias("returns_count"),
    F.count("sr_item_sk").alias("returns_items"),
    F.sum("sr_return_amt").alias("returns_money"),
)

# Step 3: keep every ordering customer (left join) and derive the ratios;
# customers without a matching returns row get NULL return columns, which the
# expressions below turn into 0.0.
orders_with_returns = orders.join(
    returns, orders.ss_customer_sk == returns.sr_customer_sk, how="left"
)

returns_count = F.col("returns_count")
frequency = F.round(
    F.when(returns_count.isNull(), 0.0).otherwise(returns_count), 0
)

segmentation = orders_with_returns.select(
    orders.ss_customer_sk.alias("user_sk"),
    _ratio_or_zero("returns_count", "orders_count").alias("orderRatio"),
    _ratio_or_zero("returns_items", "orders_items").alias("itemsRatio"),
    _ratio_or_zero("returns_money", "orders_money").alias("monetaryRatio"),
    frequency.alias("frequency"),
).orderBy("user_sk")

segmentation.show()

+-------+----------+----------+-------------+---------+
|user_sk|orderRatio|itemsRatio|monetaryRatio|frequency|
+-------+----------+----------+-------------+---------+
|      2|       0.1| 0.0595745|     0.051983|      3.0|
|      3|       0.0|       0.0|          0.0|      0.0|
|      4|       0.0|       0.0|          0.0|      0.0|
|      7| 0.3333333|      0.04|    0.0175497|      1.0|
|     12|       0.0|       0.0|          0.0|      0.0|
|     16|       0.0|       0.0|          0.0|      0.0|
|     17|       1.0| 0.5714286|    0.2275411|      1.0|
|     19|       0.0|       0.0|          0.0|      0.0|
|     22|       0.0|       0.0|          0.0|      0.0|
|     23|       0.5| 0.1666667|    0.1366843|      1.0|
|     30|       0.0|       0.0|          0.0|      0.0|
|     31|       1.0|      0.25|    0.4410076|      1.0|
|     32|       0.0|       0.0|          0.0|      0.0|
|     37|       0.0|       0.0|          0.0|      0.0|
|     38|       0.0|       0.0|          0.0|   