# Highest-Grossing Items  
**Amazon SQL Interview Question**

---

### Question  
Assume you're given a table containing data on Amazon customers and their spending on products in different categories. Write a query to identify the top two highest-grossing products within each category in the year **2022**. The output should include the **category**, **product**, and **total spend**.

---

### Table: `product_spend`

| Column Name        | Type       |
|--------------------|------------|
| category           | string     |
| product            | string     |
| user_id            | integer    |
| spend              | decimal    |
| transaction_date   | timestamp  |

---

### Example Input:

| category     | product           | user_id | spend   | transaction_date     |
|--------------|-------------------|---------|---------|-----------------------|
| appliance    | refrigerator       | 165     | 246.00  | 12/26/2021 12:00:00  |
| appliance    | refrigerator       | 123     | 299.99  | 03/02/2022 12:00:00  |
| appliance    | washing machine    | 123     | 219.80  | 03/02/2022 12:00:00  |
| electronics  | vacuum             | 178     | 152.00  | 04/05/2022 12:00:00  |
| electronics  | wireless headset   | 156     | 249.90  | 07/08/2022 12:00:00  |
| electronics  | vacuum             | 145     | 189.00  | 07/15/2022 12:00:00  |

---

### Example Output:

| category     | product           | total_spend |
|--------------|-------------------|-------------|
| appliance    | refrigerator       | 299.99      |
| appliance    | washing machine    | 219.80      |
| electronics  | vacuum             | 341.00      |
| electronics  | wireless headset   | 249.90      |

---

### Explanation  
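Only transactions dated in 2022 count, so the refrigerator purchase from 12/26/2021 is excluded.  
Within the "appliance" category, the top two highest-grossing products are "refrigerator" (299.99) and "washing machine" (219.80).  
In the "electronics" category, the top two highest-grossing products are "vacuum" (152.00 + 189.00 = 341.00) and "wireless headset" (249.90).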
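
The arithmetic above can be checked without Spark. The following is a minimal plain-Python sketch, not the intended interview answer; the helper name `top_two_by_category` and the `rows` list are illustrative assumptions, with the values copied from the example input.

```python
from collections import defaultdict
from datetime import datetime

# Rows copied from the example input: (category, product, user_id, spend, transaction_date)
rows = [
    ("appliance", "refrigerator", 165, 246.00, "12/26/2021 12:00:00"),
    ("appliance", "refrigerator", 123, 299.99, "03/02/2022 12:00:00"),
    ("appliance", "washing machine", 123, 219.80, "03/02/2022 12:00:00"),
    ("electronics", "vacuum", 178, 152.00, "04/05/2022 12:00:00"),
    ("electronics", "wireless headset", 156, 249.90, "07/08/2022 12:00:00"),
    ("electronics", "vacuum", 145, 189.00, "07/15/2022 12:00:00"),
]


def top_two_by_category(rows, year=2022):
    """Hypothetical helper: total spend per product for one year, top two per category."""
    totals = defaultdict(float)
    for category, product, _user_id, spend, ts in rows:
        # Keep only transactions from the requested year
        if datetime.strptime(ts, "%m/%d/%Y %H:%M:%S").year == year:
            totals[(category, product)] += spend

    # Regroup the per-product totals under their category
    by_category = defaultdict(list)
    for (category, product), total in totals.items():
        by_category[category].append((product, round(total, 2)))

    result = []
    for category in sorted(by_category):
        # Highest total first, then keep at most two products
        ranked = sorted(by_category[category], key=lambda item: item[1], reverse=True)
        result.extend((category, product, total) for product, total in ranked[:2])
    return result


result = top_two_by_category(rows)
for row in result:
    print(row)
```

The three steps (filter by year, sum per product, rank within category) are the same ones the PySpark and SQL solutions below perform.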

---


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime

# Create Spark session
spark = SparkSession.builder.master('local[1]').appName("HighestGrossingItems").getOrCreate()

# Define schema
schema = StructType([
    StructField("category", StringType(), True),
    StructField("product", StringType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("spend", FloatType(), True),
    StructField("transaction_date", TimestampType(), True)
])

# Sample data
data = [
    ("appliance", "refrigerator", 165, 246.00, datetime.strptime("12/26/2021 12:00:00", "%m/%d/%Y %H:%M:%S")),
    ("appliance", "refrigerator", 123, 299.99, datetime.strptime("03/02/2022 12:00:00", "%m/%d/%Y %H:%M:%S")),
    ("appliance", "washing machine", 123, 219.80, datetime.strptime("03/02/2022 12:00:00", "%m/%d/%Y %H:%M:%S")),
    ("electronics", "vacuum", 178, 152.00, datetime.strptime("04/05/2022 12:00:00", "%m/%d/%Y %H:%M:%S")),
    ("electronics", "wireless headset", 156, 249.90, datetime.strptime("07/08/2022 12:00:00", "%m/%d/%Y %H:%M:%S")),
    ("electronics", "vacuum", 145, 189.00, datetime.strptime("07/15/2022 12:00:00", "%m/%d/%Y %H:%M:%S")),
]

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Show DataFrame
df.show(truncate=False)


+-----------+----------------+-------+------+-------------------+
|category   |product         |user_id|spend |transaction_date   |
+-----------+----------------+-------+------+-------------------+
|appliance  |refrigerator    |165    |246.0 |2021-12-26 12:00:00|
|appliance  |refrigerator    |123    |299.99|2022-03-02 12:00:00|
|appliance  |washing machine |123    |219.8 |2022-03-02 12:00:00|
|electronics|vacuum          |178    |152.0 |2022-04-05 12:00:00|
|electronics|wireless headset|156    |249.9 |2022-07-08 12:00:00|
|electronics|vacuum          |145    |189.0 |2022-07-15 12:00:00|
+-----------+----------------+-------+------+-------------------+



In [2]:
from pyspark.sql.window import Window

# Rank products within each category by total spend, highest first
winspec = Window.partitionBy('category').orderBy(col('total_spend').desc())

# Keep 2022 transactions, total the spend per product, then keep the top two per category
df.where(year('transaction_date') == 2022)\
  .groupBy('category', 'product').agg(round(sum('spend'), 2).alias('total_spend'))\
  .withColumn('rnk', row_number().over(winspec))\
  .where('rnk < 3')\
  .drop('rnk').show()

+-----------+----------------+-----------+
|   category|         product|total_spend|
+-----------+----------------+-----------+
|  appliance|    refrigerator|     299.99|
|  appliance| washing machine|      219.8|
|electronics|          vacuum|      341.0|
|electronics|wireless headset|      249.9|
+-----------+----------------+-----------+



In [3]:
df.createOrReplaceTempView('product_spend')

spark.sql('''
WITH cte AS (
    SELECT category,
           product,
           ROUND(SUM(spend), 2) AS total_spend,
           ROW_NUMBER() OVER (PARTITION BY category ORDER BY SUM(spend) DESC) AS rnk
    FROM product_spend
    WHERE EXTRACT(YEAR FROM transaction_date) = 2022
    GROUP BY category, product
)
SELECT category, product, total_spend
FROM cte
WHERE rnk < 3''').show()

+-----------+----------------+-----------+
|   category|         product|total_spend|
+-----------+----------------+-----------+
|  appliance|    refrigerator|     299.99|
|  appliance| washing machine|      219.8|
|electronics|          vacuum|      341.0|
|electronics|wireless headset|      249.9|
+-----------+----------------+-----------+
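
Both solutions rank with `row_number()`, which always returns exactly two rows per category and picks arbitrarily between products whose totals tie. If tied products should all be kept, `dense_rank()` is one alternative. The sketch below illustrates the difference using Python's built-in `sqlite3` module rather than Spark, assuming the bundled SQLite is version 3.25 or later (required for window functions). The `totals` table and the "speaker" row are hypothetical and are not part of the example data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE totals (category TEXT, product TEXT, total_spend REAL)")
conn.executemany(
    "INSERT INTO totals VALUES (?, ?, ?)",
    [
        ("electronics", "vacuum", 341.00),
        ("electronics", "wireless headset", 249.90),
        ("electronics", "speaker", 249.90),  # hypothetical row that ties for second place
    ],
)

query = """
SELECT product,
       ROW_NUMBER() OVER (PARTITION BY category ORDER BY total_spend DESC) AS row_num,
       DENSE_RANK() OVER (PARTITION BY category ORDER BY total_spend DESC) AS dense_rnk
FROM totals
"""
ranks = {product: (row_num, dense_rnk) for product, row_num, dense_rnk in conn.execute(query)}

# ROW_NUMBER keeps exactly two products; which tied product survives is not guaranteed
kept_by_row_number = sorted(p for p, (row_num, _) in ranks.items() if row_num < 3)

# DENSE_RANK keeps every product whose total is among the two highest distinct totals
kept_by_dense_rank = sorted(p for p, (_, dense_rnk) in ranks.items() if dense_rnk < 3)

print(kept_by_row_number)
print(kept_by_dense_rank)
```

Which behaviour is wanted depends on how the question defines "top two"; the expected output in this problem contains no ties, so either function produces the same result on the example data.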

