In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import datetime

spark = SparkSession.builder.getOrCreate()

# ------------------- Customers -------------------
customers_data = [
    (1, "Alice", "alice@example.com", "USA"),
    (2, "Bob", "bob@example.com", "UK"),
    (3, "Charlie", None, "India"),
    (4, "David", "david@example.com", "Canada"),
    (5, "Eva", "eva@example.com", None),
    (6, "Frank", "frank@example.com", "Germany"),
    (7, "Grace", "grace@example.com", "France"),
    (8, "Henry", "henry@example.com", "Canada"),
    (9, "Ivy", "ivy@example.com", "Russia"),
    (10, "John", None, "Australia"),
]
customers_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("country", StringType(), True),
])
customers_df = spark.createDataFrame(customers_data, schema=customers_schema)
customers_df.createOrReplaceTempView("customers")

# ------------------- Orders -------------------
orders_data = [
    (201, 1, datetime.date(2023, 1, 5), "Shipped"),
    (202, 2, datetime.date(2023, 1, 6), "Processing"),
    (203, 3, datetime.date(2023, 1, 7), "Delivered"),
    (204, 4, datetime.date(2023, 1, 8), "Cancelled"),
    (205, 5, datetime.date(2023, 1, 9), "Shipped"),
    (206, 6, datetime.date(2023, 1, 10), "Delivered"),
    (207, 7, datetime.date(2023, 1, 11), "Shipped"),
    (208, 8, datetime.date(2023, 1, 12), "Processing"),
    (209, 9, datetime.date(2023, 1, 13), "Shipped"),
    (210, 10, datetime.date(2023, 1, 14), "Delivered"),
]
orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_date", DateType(), True),
    StructField("status", StringType(), True),
])
orders_df = spark.createDataFrame(orders_data, schema=orders_schema)
orders_df.createOrReplaceTempView("orders")

# ------------------- Order Items -------------------
order_items_data = [
    (301, 201, 101, 2),
    (302, 201, 102, 1),
    (303, 202, 103, 1),
    (304, 203, 104, 3),
    (305, 204, 105, 1),
    (306, 205, 106, 2),
    (307, 206, 107, 1),
    (308, 207, 108, 4),
    (309, 208, 109, 2),
    (310, 209, 110, 1),
]
order_items_schema = StructType([
    StructField("item_id", IntegerType(), True),
    StructField("order_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
])
order_items_df = spark.createDataFrame(order_items_data, schema=order_items_schema)
order_items_df.createOrReplaceTempView("order_items")

# ------------------- Products -------------------
products_data = [
    (101, "Laptop", "Electronics", 999.99),
    (102, "Mouse", "Electronics", 19.99),
    (103, "Keyboard", "Electronics", 49.99),
    (104, "Monitor", "Electronics", 199.99),
    (105, "Phone", "Electronics", 699.99),
    (106, "Tablet", "Electronics", 299.99),
    (107, "Printer", "Electronics", 149.99),
    (108, "Headphones", "Electronics", 89.99),
    (109, "Webcam", "Electronics", 39.99),
    (110, "Charger", "Accessories", None),
]
products_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("price", FloatType(), True),
])
products_df = spark.createDataFrame(products_data, schema=products_schema)
products_df.createOrReplaceTempView("products")

# ------------------- Payments -------------------
payments_data = [
    (401, 201, "Credit Card", 1039.97),
    (402, 202, "PayPal", 49.99),
    (403, 203, "Credit Card", 599.97),
    (404, 204, "Debit Card", 699.99),
    (405, 205, "PayPal", 599.98),
    (406, 206, "UPI", 149.99),
    (407, 207, "Credit Card", 359.96),
    (408, 208, "Debit Card", 79.98),
    (409, 209, "Credit Card", 39.99),
    (410, 210, "PayPal", 89.99),
]
payments_schema = StructType([
    StructField("payment_id", IntegerType(), True),
    StructField("order_id", IntegerType(), True),
    StructField("payment_method", StringType(), True),
    StructField("amount", FloatType(), True),
])
payments_df = spark.createDataFrame(payments_data, schema=payments_schema)
payments_df.createOrReplaceTempView("payments")

# ------------------- Reviews -------------------
reviews_data = [
    (501, 101, 1, 5, "Excellent product!"),
    (502, 102, 2, 4, "Good value."),
    (503, 103, 3, 3, "Average quality."),
    (504, 104, 4, None, "No comment."),
    (505, 105, 5, 2, "Not satisfied."),
    (506, 106, 6, 4, "Very useful."),
    (507, 107, 7, None, None),
    (508, 108, 8, 5, "Great screen."),
    (509, 109, 9, 4, "Works well."),
    (510, 110, 10, 3, "OK for the price."),
]
reviews_schema = StructType([
    StructField("review_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("comment", StringType(), True),
])
reviews_df = spark.createDataFrame(reviews_data, schema=reviews_schema)
reviews_df.createOrReplaceTempView("reviews")

# ------------------- Shipping -------------------
shipping_data = [
    (601, 201, "123 Main St, USA", datetime.date(2023, 1, 6), "FedEx"),
    (602, 202, "456 Baker St, UK", datetime.date(2023, 1, 7), "DHL"),
    (603, 203, "789 Hill Rd, India", None, "BlueDart"),
    (604, 204, "12 King Ave, Canada", datetime.date(2023, 1, 9), None),
    (605, 205, "99 Ocean Blvd, USA", datetime.date(2023, 1, 10), "USPS"),
    (606, 206, "17 West Dr, Germany", None, "DHL"),
    (607, 207, "22 East St, France", datetime.date(2023, 1, 12), "FedEx"),
    (608, 208, "5 North Rd, Canada", datetime.date(2023, 1, 13), "UPS"),
    (609, 209, "1 Central St, Russia", None, "RussianPost"),
    (610, 210, "8 Lake View, Australia", datetime.date(2023, 1, 15), "AustraliaPost"),
]
shipping_schema = StructType([
    StructField("shipping_id", IntegerType(), True),
    StructField("order_id", IntegerType(), True),
    StructField("shipping_address", StringType(), True),
    StructField("shipping_date", DateType(), True),
    StructField("carrier", StringType(), True),
])
shipping_df = spark.createDataFrame(shipping_data, schema=shipping_schema)
shipping_df.createOrReplaceTempView("shipping")

# ------------------- Final Join Query -------------------
query = """
SELECT
    c.customer_id, c.name AS customer_name, c.email, c.country,
    o.order_id, o.order_date, o.status,
    oi.item_id, oi.product_id, oi.quantity,
    p.product_name, p.category, p.price,
    pay.payment_id, pay.payment_method, pay.amount,
    r.review_id, r.rating, r.comment,
    s.shipping_id, s.shipping_address, s.shipping_date, s.carrier
FROM customers c
FULL OUTER JOIN orders o ON c.customer_id = o.customer_id
FULL OUTER JOIN order_items oi ON o.order_id = oi.order_id
FULL OUTER JOIN products p ON oi.product_id = p.product_id
FULL OUTER JOIN payments pay ON o.order_id = pay.order_id
FULL OUTER JOIN reviews r ON r.customer_id = c.customer_id AND r.product_id = p.product_id
FULL OUTER JOIN shipping s ON o.order_id = s.order_id
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)



+-----------+-------------+-----------------+---------+--------+----------+----------+-------+----------+--------+------------+-----------+------+----------+--------------+-------+---------+------+------------------+-----------+----------------------+-------------+-------------+
|customer_id|customer_name|email            |country  |order_id|order_date|status    |item_id|product_id|quantity|product_name|category   |price |payment_id|payment_method|amount |review_id|rating|comment           |shipping_id|shipping_address      |shipping_date|carrier      |
+-----------+-------------+-----------------+---------+--------+----------+----------+-------+----------+--------+------------+-----------+------+----------+--------------+-------+---------+------+------------------+-----------+----------------------+-------------+-------------+
|NULL       |NULL         |NULL             |NULL     |NULL    |NULL      |NULL      |NULL   |NULL      |NULL    |NULL        |NULL       |NULL  |NULL      |NUL



```
Join Breakdown:
customers to orders: customer_id (present in both customers and orders)

orders to order_items: order_id (present in both orders and order_items)

order_items to products: product_id (present in both order_items and products)

orders to payments: order_id (present in both orders and payments)

customers to reviews: customer_id (present in both customers and reviews)

products to reviews: product_id (present in both products and reviews)

orders to shipping: order_id (present in both orders and shipping)

```



In [2]:
#Find all customers who are from Canada.
query = """
select *
from customers
where country = "Canada"
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)


+-----------+-----+-----------------+-------+
|customer_id|name |email            |country|
+-----------+-----+-----------------+-------+
|4          |David|david@example.com|Canada |
|8          |Henry|henry@example.com|Canada |
+-----------+-----+-----------------+-------+



In [3]:
# Get the first 5 products with the lowest price.
query = """
select *
from products
order by price asc
limit 5
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+----------+------------+-----------+-----+
|product_id|product_name|category   |price|
+----------+------------+-----------+-----+
|110       |Charger     |Accessories|NULL |
|102       |Mouse       |Electronics|19.99|
|109       |Webcam      |Electronics|39.99|
|103       |Keyboard    |Electronics|49.99|
|108       |Headphones  |Electronics|89.99|
+----------+------------+-----------+-----+



In [4]:
#Get the 3 latest orders.
query = """
select *
from orders
order by order_date desc
limit 3
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+--------+-----------+----------+----------+
|order_id|customer_id|order_date|status    |
+--------+-----------+----------+----------+
|210     |10         |2023-01-14|Delivered |
|209     |9          |2023-01-13|Shipped   |
|208     |8          |2023-01-12|Processing|
+--------+-----------+----------+----------+



In [5]:
#Find all orders that are 'Shipped'.

query = """
select *
from orders
where status = 'Shipped'
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+--------+-----------+----------+-------+
|order_id|customer_id|order_date|status |
+--------+-----------+----------+-------+
|201     |1          |2023-01-05|Shipped|
|205     |5          |2023-01-09|Shipped|
|207     |7          |2023-01-11|Shipped|
|209     |9          |2023-01-13|Shipped|
+--------+-----------+----------+-------+



In [6]:
#Show all products whose price is exactly 49.99
query = """
select *
from products
where price ='49.99'
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+----------+------------+-----------+-----+
|product_id|product_name|category   |price|
+----------+------------+-----------+-----+
|103       |Keyboard    |Electronics|49.99|
+----------+------------+-----------+-----+



In [7]:
#Find all shipping records with carrier as 'DHL'.
query = """
select *
from shipping
where carrier = 'DHL'
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+-----------+--------+-------------------+-------------+-------+
|shipping_id|order_id|shipping_address   |shipping_date|carrier|
+-----------+--------+-------------------+-------------+-------+
|602        |202     |456 Baker St, UK   |2023-01-07   |DHL    |
|606        |206     |17 West Dr, Germany|NULL         |DHL    |
+-----------+--------+-------------------+-------------+-------+



In [8]:
#Find products with names starting with 'L'.
query = """
select *
from products
where product_name like 'L%'
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+----------+------------+-----------+------+
|product_id|product_name|category   |price |
+----------+------------+-----------+------+
|101       |Laptop      |Electronics|999.99|
+----------+------------+-----------+------+



In [9]:
#SELECT * FROM customers WHERE country IN ('Canada', 'USA', 'UK');
query = """
select *
from customers
where country in('Canada', 'USA', 'UK');
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+-----------+-----+-----------------+-------+
|customer_id|name |email            |country|
+-----------+-----+-----------------+-------+
|1          |Alice|alice@example.com|USA    |
|2          |Bob  |bob@example.com  |UK     |
|4          |David|david@example.com|Canada |
|8          |Henry|henry@example.com|Canada |
+-----------+-----+-----------------+-------+



In [10]:
#Show all orders with order_id between 202 and 206.
query = """
select *
from orders
where order_id between 202 and 206
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+--------+-----------+----------+----------+
|order_id|customer_id|order_date|status    |
+--------+-----------+----------+----------+
|202     |2          |2023-01-06|Processing|
|203     |3          |2023-01-07|Delivered |
|204     |4          |2023-01-08|Cancelled |
|205     |5          |2023-01-09|Shipped   |
|206     |6          |2023-01-10|Delivered |
+--------+-----------+----------+----------+



In [11]:
#List products with category as 'Electronics' and price over 300.
query = """
select product_name , category,price
from products
where category = 'Electronics' and price > 300
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+------------+-----------+------+
|product_name|category   |price |
+------------+-----------+------+
|Laptop      |Electronics|999.99|
|Phone       |Electronics|699.99|
+------------+-----------+------+



In [12]:
#Find customers whose country starts with 'C' or ends with 'a'.
query = """
select *
from customers
where country like "C%" or country like "%a"
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+-----------+-------+-----------------+---------+
|customer_id|name   |email            |country  |
+-----------+-------+-----------------+---------+
|3          |Charlie|NULL             |India    |
|4          |David  |david@example.com|Canada   |
|8          |Henry  |henry@example.com|Canada   |
|9          |Ivy    |ivy@example.com  |Russia   |
|10         |John   |NULL             |Australia|
+-----------+-------+-----------------+---------+



In [13]:
#Get all customer names with their order IDs
query = """
select c.name, o.order_id
from orders o
join customers c on o.customer_id = c.customer_id
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+-------+--------+
|name   |order_id|
+-------+--------+
|Alice  |201     |
|Bob    |202     |
|Charlie|203     |
|David  |204     |
|Eva    |205     |
|Frank  |206     |
|Grace  |207     |
|Henry  |208     |
|Ivy    |209     |
|John   |210     |
+-------+--------+



In [14]:
#Find customers who have never placed an order.
query = """
select *
from orders o
join customers c on c.customer_id = o.customer_id
where order_id is null
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+--------+-----------+----------+------+-----------+----+-----+-------+
|order_id|customer_id|order_date|status|customer_id|name|email|country|
+--------+-----------+----------+------+-----------+----+-----+-------+
+--------+-----------+----------+------+-----------+----+-----+-------+



In [15]:
#Get all orders with their payment method and amount.

query = """
select order_id , payment_method, amount
from payments
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+--------+--------------+-------+
|order_id|payment_method|amount |
+--------+--------------+-------+
|201     |Credit Card   |1039.97|
|202     |PayPal        |49.99  |
|203     |Credit Card   |599.97 |
|204     |Debit Card    |699.99 |
|205     |PayPal        |599.98 |
|206     |UPI           |149.99 |
|207     |Credit Card   |359.96 |
|208     |Debit Card    |79.98  |
|209     |Credit Card   |39.99  |
|210     |PayPal        |89.99  |
+--------+--------------+-------+



In [16]:
#List customer name, order date, and shipping carrier.
query = """
select c.name,o.order_date,s.carrier
from customers c
join orders o on c.customer_id = o.customer_id
join shipping s on s.order_id = o.order_id

"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+-------+----------+-------------+
|name   |order_date|carrier      |
+-------+----------+-------------+
|Eva    |2023-01-09|USPS         |
|Bob    |2023-01-06|DHL          |
|Charlie|2023-01-07|BlueDart     |
|David  |2023-01-08|NULL         |
|Alice  |2023-01-05|FedEx        |
|John   |2023-01-14|AustraliaPost|
|Ivy    |2023-01-13|RussianPost  |
|Frank  |2023-01-10|DHL          |
|Grace  |2023-01-11|FedEx        |
|Henry  |2023-01-12|UPS          |
+-------+----------+-------------+



In [17]:
#Get a list of customers and their product reviews.


query = """
select c.name , r.product_id,r.rating
from customers c
join reviews r on c.customer_id = r.customer_id

"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+-------+----------+------+
|name   |product_id|rating|
+-------+----------+------+
|Alice  |101       |5     |
|Bob    |102       |4     |
|Charlie|103       |3     |
|David  |104       |NULL  |
|Eva    |105       |2     |
|Frank  |106       |4     |
|Grace  |107       |NULL  |
|Henry  |108       |5     |
|Ivy    |109       |4     |
|John   |110       |3     |
+-------+----------+------+



In [18]:
#Get all customers and their shipping addresses.
query = """
select c.name , s.shipping_address
from customers c
join orders o on c.customer_id = o.customer_id
join shipping s on o.order_id = s.order_id

"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+-------+----------------------+
|name   |shipping_address      |
+-------+----------------------+
|Eva    |99 Ocean Blvd, USA    |
|Bob    |456 Baker St, UK      |
|Charlie|789 Hill Rd, India    |
|David  |12 King Ave, Canada   |
|Alice  |123 Main St, USA      |
|John   |8 Lake View, Australia|
|Ivy    |1 Central St, Russia  |
|Frank  |17 West Dr, Germany   |
|Grace  |22 East St, France    |
|Henry  |5 North Rd, Canada    |
+-------+----------------------+



In [19]:
#Get all payment methods used by customers from 'India'.
query = """
select p.payment_method , c.country
from payments p
join orders o on o.order_id = p.order_id
join customers c on c.customer_id = o.customer_id
where country = 'India'

"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+--------------+-------+
|payment_method|country|
+--------------+-------+
|Credit Card   |India  |
+--------------+-------+



In [20]:
#List customer names and their total spending (sum of payments).

query = """
select c.name, p.amount
from payments p
join orders o on o.order_id = p.order_id
join customers c on c.customer_id = o.customer_id

"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+-------+-------+
|name   |amount |
+-------+-------+
|Alice  |1039.97|
|Charlie|599.97 |
|Eva    |599.98 |
|David  |699.99 |
|Bob    |49.99  |
|Frank  |149.99 |
|Ivy    |39.99  |
|Henry  |79.98  |
|Grace  |359.96 |
|John   |89.99  |
+-------+-------+



In [21]:
query = """
select *
from payments
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+----------+--------+--------------+-------+
|payment_id|order_id|payment_method|amount |
+----------+--------+--------------+-------+
|401       |201     |Credit Card   |1039.97|
|402       |202     |PayPal        |49.99  |
|403       |203     |Credit Card   |599.97 |
|404       |204     |Debit Card    |699.99 |
|405       |205     |PayPal        |599.98 |
|406       |206     |UPI           |149.99 |
|407       |207     |Credit Card   |359.96 |
|408       |208     |Debit Card    |79.98  |
|409       |209     |Credit Card   |39.99  |
|410       |210     |PayPal        |89.99  |
+----------+--------+--------------+-------+



In [22]:
#Get customer name, product, and payment amount for each order.
query = """
select c.name , p.product_name, pa.amount
from customers c
join orders o on o.customer_id = c.customer_id
join order_items oi on oi.order_id  = o.order_id
join products p on p.product_id = oi.product_id
join payments pa on pa.order_id = o.order_id;
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+-------+------------+-------+
|name   |product_name|amount |
+-------+------------+-------+
|Eva    |Tablet      |599.98 |
|Bob    |Keyboard    |49.99  |
|Charlie|Monitor     |599.97 |
|David  |Phone       |699.99 |
|Alice  |Mouse       |1039.97|
|Alice  |Laptop      |1039.97|
|Ivy    |Charger     |39.99  |
|Frank  |Printer     |149.99 |
|Grace  |Headphones  |359.96 |
|Henry  |Webcam      |79.98  |
+-------+------------+-------+



In [23]:
#Find products ordered by customers from 'Canada'.
query = """
select *
from orders o
join customers c on c.customer_id = o.customer_id
join order_items oi on oi.order_id = o.order_id
join products p on p.product_id = oi.product_id
where country = 'Canada'
"""
joined_df = spark.sql(query)
joined_df.show(truncate=False)

+--------+-----------+----------+----------+-----------+-----+-----------------+-------+-------+--------+----------+--------+----------+------------+-----------+------+
|order_id|customer_id|order_date|status    |customer_id|name |email            |country|item_id|order_id|product_id|quantity|product_id|product_name|category   |price |
+--------+-----------+----------+----------+-----------+-----+-----------------+-------+-------+--------+----------+--------+----------+------------+-----------+------+
|204     |4          |2023-01-08|Cancelled |4          |David|david@example.com|Canada |305    |204     |105       |1       |105       |Phone       |Electronics|699.99|
|208     |8          |2023-01-12|Processing|8          |Henry|henry@example.com|Canada |309    |208     |109       |2       |109       |Webcam      |Electronics|39.99 |
+--------+-----------+----------+----------+-----------+-----+-----------------+-------+-------+--------+----------+--------+----------+------------+------

In [24]:
query = """
SELECT c.name, p.product_name, SUM(pay.amount) AS total_spent
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
JOIN payments pay ON o.order_id = pay.order_id
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
GROUP BY c.name, p.product_name;

"""
joined_df = spark.sql(query)
joined_df.show()

+-------+------------+------------------+
|   name|product_name|       total_spent|
+-------+------------+------------------+
|  Grace|  Headphones| 359.9599914550781|
|  David|       Phone|  699.989990234375|
|Charlie|     Monitor|  599.969970703125|
|    Ivy|     Charger|  39.9900016784668|
|  Alice|       Mouse| 1039.969970703125|
|  Frank|     Printer|149.99000549316406|
|  Alice|      Laptop| 1039.969970703125|
|    Eva|      Tablet|   599.97998046875|
|  Henry|      Webcam|  79.9800033569336|
|    Bob|    Keyboard|  49.9900016784668|
+-------+------------+------------------+



In [25]:
#Find product names ordered by customers who also reviewed them with rating > 4
query = """
select
p.product_name,
c.name,
r.rating
from products p
join order_items oi on oi.product_id = p.product_id
join orders o on o.order_id = oi.order_id
join customers c on c.customer_id = o.customer_id
join reviews r on r.product_id = p.product_id
WHERE r.rating > 4
"""
joined_df = spark.sql(query)
joined_df.show()

+------------+-----+------+
|product_name| name|rating|
+------------+-----+------+
|      Laptop|Alice|     5|
|  Headphones|Grace|     5|
+------------+-----+------+



In [26]:
#Find orders where all products in the order were rated 5 by the customer.
query = """
select
p.product_name,
c.name,
r.rating
from products p
join order_items oi on oi.product_id = p.product_id
join orders o on o.order_id = oi.order_id
join customers c on c.customer_id = o.customer_id
join reviews r on r.product_id = p.product_id
WHERE r.rating = 5
"""
joined_df = spark.sql(query)
joined_df.show()

+------------+-----+------+
|product_name| name|rating|
+------------+-----+------+
|      Laptop|Alice|     5|
|  Headphones|Grace|     5|
+------------+-----+------+



In [27]:
#Find top 3 customers who spent the most.
query = """
select c.name , p.amount
from customers c
join orders o on c.customer_id  = o.customer_id
join payments p on p.order_id = o.order_id
order by p.amount desc
limit 3
"""
joined_df = spark.sql(query)
joined_df.show()

+-----+-------+
| name| amount|
+-----+-------+
|Alice|1039.97|
|David| 699.99|
|  Eva| 599.98|
+-----+-------+



In [28]:
#Find the month-wise revenue generated.
query = """
SELECT
    date_format(o.order_date, 'yyyy-MM') AS month,
    SUM(oi.quantity * p.price) AS total_revenue
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
GROUP BY date_format(o.order_date, 'yyyy-MM')
ORDER BY month;


"""
joined_df = spark.sql(query)
joined_df.show()

+-------+-----------------+
|  month|    total_revenue|
+-------+-----------------+
|2023-01|4559.829984664917|
+-------+-----------------+



In [29]:
#Show the product with the highest average review rating.

query = """
select p.product_name, avg(r.rating) as avg_rating_score
from products p
join reviews r on  r.product_id = p.product_id
group by p.product_name
order by avg_rating_score desc
limit 1
"""
joined_df = spark.sql(query)
joined_df.show()

+------------+----------------+
|product_name|avg_rating_score|
+------------+----------------+
|      Laptop|             5.0|
+------------+----------------+



In [30]:
#List customers who placed orders using only 'Credit Card
query = """
select c.name , p.payment_method
from customers c
join orders o on o.customer_id = c.customer_id
join payments p on o.order_id = p.order_id
where payment_method= 'Credit Card'
"""
joined_df = spark.sql(query)
joined_df.show()

+-------+--------------+
|   name|payment_method|
+-------+--------------+
|Charlie|   Credit Card|
|  Alice|   Credit Card|
|    Ivy|   Credit Card|
|  Grace|   Credit Card|
+-------+--------------+



In [31]:
#Find the total revenue per country.
query = """
select c.country, sum(p.amount) as total_rev
from customers c
join orders o on c.customer_id = o.customer_id
join payments p on p.order_id = o.order_id
group by c.country
order by total_rev desc

"""
joined_df = spark.sql(query)
joined_df.show()

+---------+------------------+
|  country|         total_rev|
+---------+------------------+
|      USA| 1039.969970703125|
|   Canada| 779.9699935913086|
|     NULL|   599.97998046875|
|    India|  599.969970703125|
|   France| 359.9599914550781|
|  Germany|149.99000549316406|
|Australia| 89.98999786376953|
|       UK|  49.9900016784668|
|   Russia|  39.9900016784668|
+---------+------------------+



In [32]:
#Which product has the highest average rating from customers in Canada?
query = """
select p.product_name , avg(r.rating) AS highest_rating_avg
from reviews r
join customers c on c.customer_id = r.customer_id
join products p on p.product_id  = r.product_id
where c.country = 'Canada' and r.rating is not null
group by p.product_name
"""
joined_df = spark.sql(query)
joined_df.show()

+------------+------------------+
|product_name|highest_rating_avg|
+------------+------------------+
|  Headphones|               5.0|
+------------+------------------+



In [33]:
#Find the total revenue per country.
query = """
select *
from products

"""
joined_df = spark.sql(query)
joined_df.show()

+----------+------------+-----------+------+
|product_id|product_name|   category| price|
+----------+------------+-----------+------+
|       101|      Laptop|Electronics|999.99|
|       102|       Mouse|Electronics| 19.99|
|       103|    Keyboard|Electronics| 49.99|
|       104|     Monitor|Electronics|199.99|
|       105|       Phone|Electronics|699.99|
|       106|      Tablet|Electronics|299.99|
|       107|     Printer|Electronics|149.99|
|       108|  Headphones|Electronics| 89.99|
|       109|      Webcam|Electronics| 39.99|
|       110|     Charger|Accessories|  NULL|
+----------+------------+-----------+------+



Show all orders with their corresponding customer names and email addresses.

List all products that have been ordered, including products that haven't been ordered.

Display all customers along with their payment methods (if any).

Find all orders that have shipping information, showing order date and shipping date.

List all reviews with the corresponding product names and customer names.

In [34]:
#Show all orders with their corresponding customer names and email addresses.
query = """
SELECT o.order_id, c.name, c.email
from orders o
join customers c on c.customer_id = o.customer_id
where email is not null
"""
joined_df = spark.sql(query)
joined_df.show()

+--------+-----+-----------------+
|order_id| name|            email|
+--------+-----+-----------------+
|     201|Alice|alice@example.com|
|     202|  Bob|  bob@example.com|
|     204|David|david@example.com|
|     205|  Eva|  eva@example.com|
|     206|Frank|frank@example.com|
|     207|Grace|grace@example.com|
|     208|Henry|henry@example.com|
|     209|  Ivy|  ivy@example.com|
+--------+-----+-----------------+



In [35]:
#List all products that have been ordered, including products that haven't been ordered.
query = """
select o.order_id,p.product_name , o.status
from products p
join order_items oi on oi.product_id = p.product_id
join orders o on o.order_id = oi.order_id
where o.status = 'Shipped'

"""
joined_df = spark.sql(query)
joined_df.show()

+--------+------------+-------+
|order_id|product_name| status|
+--------+------------+-------+
|     205|      Tablet|Shipped|
|     201|       Mouse|Shipped|
|     201|      Laptop|Shipped|
|     209|     Charger|Shipped|
|     207|  Headphones|Shipped|
+--------+------------+-------+



In [36]:
#Display all customers along with their payment methods (if any).
query = """
select c.name , p.payment_method
from payments p
join orders o on o.order_id = p.order_id
join customers c on c.customer_id = o.customer_id

"""
joined_df = spark.sql(query)
joined_df.show()

+-------+--------------+
|   name|payment_method|
+-------+--------------+
|  Alice|   Credit Card|
|Charlie|   Credit Card|
|    Eva|        PayPal|
|  David|    Debit Card|
|    Bob|        PayPal|
|  Frank|           UPI|
|    Ivy|   Credit Card|
|  Henry|    Debit Card|
|  Grace|   Credit Card|
|   John|        PayPal|
+-------+--------------+



In [37]:
#Find all orders that have shipping information, showing order date and shipping date.
query = """
SELECT o.order_date, s.shipping_date
FROM shipping s
join orders o on o.order_id = s.order_id
"""
joined_df = spark.sql(query)
joined_df.show()

+----------+-------------+
|order_date|shipping_date|
+----------+-------------+
|2023-01-05|   2023-01-06|
|2023-01-06|   2023-01-07|
|2023-01-07|         NULL|
|2023-01-08|   2023-01-09|
|2023-01-09|   2023-01-10|
|2023-01-10|         NULL|
|2023-01-11|   2023-01-12|
|2023-01-12|   2023-01-13|
|2023-01-13|         NULL|
|2023-01-14|   2023-01-15|
+----------+-------------+



In [38]:
query = """
SELECT *
FROM shipping
"""
joined_df = spark.sql(query)
joined_df.show()

+-----------+--------+--------------------+-------------+-------------+
|shipping_id|order_id|    shipping_address|shipping_date|      carrier|
+-----------+--------+--------------------+-------------+-------------+
|        601|     201|    123 Main St, USA|   2023-01-06|        FedEx|
|        602|     202|    456 Baker St, UK|   2023-01-07|          DHL|
|        603|     203|  789 Hill Rd, India|         NULL|     BlueDart|
|        604|     204| 12 King Ave, Canada|   2023-01-09|         NULL|
|        605|     205|  99 Ocean Blvd, USA|   2023-01-10|         USPS|
|        606|     206| 17 West Dr, Germany|         NULL|          DHL|
|        607|     207|  22 East St, France|   2023-01-12|        FedEx|
|        608|     208|  5 North Rd, Canada|   2023-01-13|          UPS|
|        609|     209|1 Central St, Russia|         NULL|  RussianPost|
|        610|     210|8 Lake View, Aust...|   2023-01-15|AustraliaPost|
+-----------+--------+--------------------+-------------+-------

In [39]:
# List all reviews with the corresponding product names and customer names.
query = """
select p.product_name, c.name, r.rating
from reviews r
join products p on p.product_id = r.product_id
join customers c on c.customer_id = r.customer_id
where rating is not null
"""
joined_df = spark.sql(query)
joined_df.show()

+------------+-------+------+
|product_name|   name|rating|
+------------+-------+------+
|      Laptop|  Alice|     5|
|    Keyboard|Charlie|     3|
|       Phone|    Eva|     2|
|       Mouse|    Bob|     4|
|      Tablet|  Frank|     4|
|      Webcam|    Ivy|     4|
|  Headphones|  Henry|     5|
|     Charger|   John|     3|
+------------+-------+------+



Show the total quantity ordered for each product (include products with zero orders).

Find customers who haven't placed any orders.

Display orders with their total payment amounts (some orders might not have payments).

List products that have never been ordered.

Show all customers and their most recent order date (if any).

In [40]:
#Find customers who haven't placed any orders.
query = """
select o.order_id,c.name,o.status
from orders o
join customers c on c.customer_id = o.customer_id
where order_id is null
"""
joined_df = spark.sql(query)
joined_df.show()

+--------+----+------+
|order_id|name|status|
+--------+----+------+
+--------+----+------+



In [41]:
#Display orders with their total payment amounts (some orders might not have payments).
query = """
select o.order_id, p.amount
from payments p
join orders o on o.order_id = p.order_id

"""
joined_df = spark.sql(query)
joined_df.show()

+--------+-------+
|order_id| amount|
+--------+-------+
|     201|1039.97|
|     202|  49.99|
|     203| 599.97|
|     204| 699.99|
|     205| 599.98|
|     206| 149.99|
|     207| 359.96|
|     208|  79.98|
|     209|  39.99|
|     210|  89.99|
+--------+-------+



In [42]:
query = """
select *
from orders
"""
joined_df = spark.sql(query)
joined_df.show()

+--------+-----------+----------+----------+
|order_id|customer_id|order_date|    status|
+--------+-----------+----------+----------+
|     201|          1|2023-01-05|   Shipped|
|     202|          2|2023-01-06|Processing|
|     203|          3|2023-01-07| Delivered|
|     204|          4|2023-01-08| Cancelled|
|     205|          5|2023-01-09|   Shipped|
|     206|          6|2023-01-10| Delivered|
|     207|          7|2023-01-11|   Shipped|
|     208|          8|2023-01-12|Processing|
|     209|          9|2023-01-13|   Shipped|
|     210|         10|2023-01-14| Delivered|
+--------+-----------+----------+----------+



In [43]:
query = """
SELECT
    c.customer_id,
    c.name,
    c.email,
    COUNT(o.order_id) AS order_count
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name, c.email
ORDER BY order_count DESC
"""
joined_df = spark.sql(query)
joined_df.show()

+-----------+-------+-----------------+-----------+
|customer_id|   name|            email|order_count|
+-----------+-------+-----------------+-----------+
|          1|  Alice|alice@example.com|          1|
|          2|    Bob|  bob@example.com|          1|
|          4|  David|david@example.com|          1|
|          5|    Eva|  eva@example.com|          1|
|          6|  Frank|frank@example.com|          1|
|          7|  Grace|grace@example.com|          1|
|          8|  Henry|henry@example.com|          1|
|          9|    Ivy|  ivy@example.com|          1|
|          3|Charlie|             NULL|          1|
|         10|   John|             NULL|          1|
+-----------+-------+-----------------+-----------+



In [44]:
#Missing Information Analysis Identify orders that have payments but are missing shipping information.
#List products that have never been ordered.
query = """
select p.amount , s.shipping_date
from shipping s
join orders o on o.order_id = s.order_id
join payments p on p.order_id = o.order_id
where shipping_date is null
"""
joined_df = spark.sql(query)
joined_df.show()

+------+-------------+
|amount|shipping_date|
+------+-------------+
|599.97|         NULL|
| 39.99|         NULL|
|149.99|         NULL|
+------+-------------+



In [45]:
query = """
select *
from products
"""
joined_df = spark.sql(query)
joined_df.show()

+----------+------------+-----------+------+
|product_id|product_name|   category| price|
+----------+------------+-----------+------+
|       101|      Laptop|Electronics|999.99|
|       102|       Mouse|Electronics| 19.99|
|       103|    Keyboard|Electronics| 49.99|
|       104|     Monitor|Electronics|199.99|
|       105|       Phone|Electronics|699.99|
|       106|      Tablet|Electronics|299.99|
|       107|     Printer|Electronics|149.99|
|       108|  Headphones|Electronics| 89.99|
|       109|      Webcam|Electronics| 39.99|
|       110|     Charger|Accessories|  NULL|
+----------+------------+-----------+------+



In [50]:
#Product Popularity Show products with their total quantities sold, including products never ordered.
query = """
select  count(product_id)
from products
"""
joined_df = spark.sql(query)
joined_df.show()

+-----------------+
|count(product_id)|
+-----------------+
|               10|
+-----------------+



Customer-Order Matching
Find all customers with their order counts, including those who never ordered.

Missing Information Analysis
Identify orders that have payments but are missing shipping information.

Product Popularity
Show products with their total quantities sold, including products never ordered.

International Customers
List customers from Canada with their order details and payment methods.

Review Engagement
Find customers who placed orders but never left product reviews.

Shipping Performance
Calculate average shipping time (shipping_date - order_date) per carrier.

High-Value Orders
Identify orders where the total amount paid was more than $500.

Category Preferences
Show each customer's most frequently purchased product category.

Payment Method Trends
Compare usage counts of each payment method by country.

Incomplete Orders
Find orders marked as "Processing" with no shipping date after 3+ days.

Premium Customers
Identify customers who spent more than $1000 in total.

Product-Rating Analysis
Show products with their average ratings, including products without reviews.

Order Value Discrepancies
Find orders where payment amount doesn't match sum(product_price × quantity).

Customer Retention
Find customers who placed more than one order in January 2023.

Abandoned Checkouts
Identify customers who added items to orders but have no payment records.