In [None]:
# Step 1: Clean installation sequence
!apt-get purge openjdk-* -qq > /dev/null  # Remove existing Java
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip uninstall pyspark -y -qq > /dev/null
!pip install pyspark==3.5.0 -qq



In [None]:
# Step 2: Configure environment properly
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.11/dist-packages/pyspark"

In [None]:
# Step 3: Initialize Spark with proper paths
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("ColabSparkFix") \
    .config("spark.executor.memory", "1g") \
    .config("spark.driver.memory", "1g") \
    .config("spark.sql.debug.maxToStringFields", "100") \
    .getOrCreate()

In [None]:
# Step 4: Create a database
from pyspark.sql.functions import col
from datetime import date, timedelta
import random

def random_date(start_date=date(2023, 1, 1), end_date=date(2023, 12, 31)):
    time_between_dates = end_date - start_date
    days_between_dates = time_between_dates.days
    random_number_of_days = random.randrange(days_between_dates)
    return start_date + timedelta(days=random_number_of_days)

# Create CATEGORIES DataFrame
categories_data = [
    (1, 'Electronics', 'Electronic devices'),
    (2, 'Clothing', 'Apparel and accessories'),
    (3, 'Home Goods', 'Household items'),
    (4, 'Sports', 'Sporting equipment'),
    (5, 'Books', 'Literature and educational materials'),
    (6, 'Kitchenware', 'Cooking utensils and appliances'),
    (7, 'Beauty', 'Cosmetics and personal care'),
    (8, 'Outdoor', 'Camping and outdoor gear')
]

categories_df = spark.createDataFrame(
    categories_data,
    ['id', 'name', 'description']
)

# Create PRODUCTS DataFrame
products_data = []
product_names = [
    ('Smartphone', 1), ('Laptop', 1), ('Headphones', 1),
    ('Tablet', 1), ('Smartwatch', 1), ('Power Bank', 1),
    ('T-Shirt', 2), ('Jeans', 2), ('Shoes', 2),
    ('Jacket', 2), ('Hat', 2), ('Scarf', 2),
    ('Sofa', 3), ('Bed', 3), ('Chair', 3),
    ('Table', 3), ('Desk', 3), ('Bookshelf', 3),
    ('Basketball', 4), ('Football', 4), ('Tennis Racket', 4),
    ('Running Shoes', 4), ('Golf Clubs', 4), ('Skates', 4),
    ('Helmet', 4)  # Added 25th product
]

prices = {
    1: [599.99, 999.99, 149.99, 299.99, 199.99, 29.99],
    2: [29.99, 79.99, 89.99, 99.99, 39.99, 19.99],
    3: [899.99, 599.99, 299.99, 499.99, 799.99, 399.99],
    4: [49.99, 79.99, 99.99, 129.99, 299.99, 199.99]
}

for i, (name, category_id) in enumerate(product_names, 1):
    price = random.choice(prices[category_id])
    status = 'active' if random.random() > 0.1 else 'inactive'
    products_data.append((i, name, category_id, price, status))

products_df = spark.createDataFrame(
    products_data,
    ['id', 'name', 'category_id', 'base_price', 'status']
)

# Create CUSTOMERS DataFrame
countries = ['USA', 'Canada', 'UK', 'Australia', 'Germany', 'France', 'Japan', 'India']
customer_names = [
    ('John Smith', 'john.smith@email.com'),
    ('Jane Doe', 'jane.doe@email.com'),
    ('Bob Wilson', 'bob.wilson@email.com'),
    ('Alice Brown', 'alice.brown@email.com'),
    ('Mike Davis', 'mike.davis@email.com')
]

customers_data = []
for i in range(1, 26):
    base_name = customer_names[i % len(customer_names)]
    email = f"{base_name[1]}_{i}"
    country = random.choice(countries)
    status = 'active' if random.random() > 0.1 else 'inactive'
    join_date = random_date(date(2023, 1, 1), date(2023, 12, 31))
    customers_data.append((i, f"{base_name[0]}_{i}", country, status, join_date))

customers_df = spark.createDataFrame(
    customers_data,
    ['id', 'name', 'country', 'status', 'join_date']
)

# Create ORDERS DataFrame
orders_data = []
order_statuses = ['completed', 'pending', 'cancelled']

for i in range(1, 51):
    customer_id = random.randint(1, 25)
    order_date = random_date(date(2023, 1, 1), date(2023, 12, 31))
    total_amount = round(random.uniform(50.0, 5000.0), 2)
    status = random.choice(order_statuses)
    orders_data.append((i, customer_id, order_date, total_amount, status))

orders_df = spark.createDataFrame(
    orders_data,
    ['id', 'customer_id', 'order_date', 'total_amount', 'status']
)

# Create ORDER_ITEMS DataFrame with valid product IDs
valid_product_ids = [row.id for row in products_df.select('id').collect()]

order_items_data = []
for i in range(1, 101):
    order_id = random.randint(1, 50)
    product_id = random.choice(valid_product_ids)
    quantity = random.randint(1, 5)

    product_row = products_df.filter(col('id') == product_id).first()
    if product_row:
        price = product_row['base_price']
    else:
        price = 0.0  # Fallback value or handle differently
        continue      # Skip invalid items

    order_items_data.append((i, order_id, product_id, quantity, price))

order_items_df = spark.createDataFrame(
    order_items_data,
    ['id', 'order_id', 'product_id', 'quantity', 'price']
)

# Show record counts for each table
print("\nDataset Record Counts:")
print(f"CATEGORIES: {categories_df.count()} records")
print(f"PRODUCTS: {products_df.count()} records")
print(f"CUSTOMERS: {customers_df.count()} records")
print(f"ORDERS: {orders_df.count()} records")
print(f"ORDER_ITEMS: {order_items_df.count()} records")

# Save datasets to Parquet format locally
!mkdir -p data
# Save datasets to Parquet format with overwrite mode
categories_df.write.mode('overwrite').parquet("data/categories_expanded")
products_df.write.mode('overwrite').parquet("data/products_expanded")
customers_df.write.mode('overwrite').parquet("data/customers_expanded")
orders_df.write.mode('overwrite').parquet("data/orders_expanded")
order_items_df.write.mode('overwrite').parquet("data/order_items_expanded")

# Show sample records from each table
print("\nSample Categories:")
categories_df.show()

print("\nSample Products:")
products_df.show()

print("\nSample Customers:")
customers_df.show()

print("\nSample Orders:")
orders_df.show()

print("\nSample Order Items:")
order_items_df.show()

# List saved files
print("\nSaved files:")
!ls -lh data


Dataset Record Counts:
CATEGORIES: 8 records
PRODUCTS: 25 records
CUSTOMERS: 25 records
ORDERS: 50 records
ORDER_ITEMS: 100 records

Sample Categories:
+---+-----------+--------------------+
| id|       name|         description|
+---+-----------+--------------------+
|  1|Electronics|  Electronic devices|
|  2|   Clothing|Apparel and acces...|
|  3| Home Goods|     Household items|
|  4|     Sports|  Sporting equipment|
|  5|      Books|Literature and ed...|
|  6|Kitchenware|Cooking utensils ...|
|  7|     Beauty|Cosmetics and per...|
|  8|    Outdoor|Camping and outdo...|
+---+-----------+--------------------+


Sample Products:
+---+----------+-----------+----------+--------+
| id|      name|category_id|base_price|  status|
+---+----------+-----------+----------+--------+
|  1|Smartphone|          1|    199.99|  active|
|  2|    Laptop|          1|    599.99|  active|
|  3|Headphones|          1|    199.99|  active|
|  4|    Tablet|          1|    149.99|  active|
|  5|Smartwatch| 

### Questionaire:

In [None]:
# 1. Create DataFrame with selected columns
cust_basic = customers_df.select('id', 'name', 'country')
cust_basic.show(5)

+---+-------------+---------+
| id|         name|  country|
+---+-------------+---------+
|  1|   Jane Doe_1|  Germany|
|  2| Bob Wilson_2|Australia|
|  3|Alice Brown_3|    India|
|  4| Mike Davis_4|      USA|
|  5| John Smith_5|       UK|
+---+-------------+---------+
only showing top 5 rows



In [None]:
# 2. Read Parquet files
df_cust = spark.read.parquet("data/customers_expanded")
df_cust.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- status: string (nullable = true)
 |-- join_date: date (nullable = true)



In [None]:
# 3. Show schema
products_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- category_id: long (nullable = true)
 |-- base_price: double (nullable = true)
 |-- status: string (nullable = true)



In [None]:
# 4. Select and sort columns
from pyspark.sql.functions import asc
orders_df.select('order_date', 'customer_id', 'total_amount') \
         .orderBy(asc('order_date')).show()

+----------+-----------+------------+
|order_date|customer_id|total_amount|
+----------+-----------+------------+
|2023-02-08|         20|     2182.79|
|2023-02-22|          9|      3053.4|
|2023-02-23|         17|      776.12|
|2023-03-05|         19|     3047.48|
|2023-03-06|         19|     3419.64|
|2023-03-13|         15|     3761.67|
|2023-03-16|         17|     4959.08|
|2023-03-17|         15|     2853.24|
|2023-03-21|         13|     3723.45|
|2023-03-28|          8|     2499.95|
|2023-03-29|         25|     2596.07|
|2023-04-11|         21|     2655.56|
|2023-04-12|          7|      599.36|
|2023-04-12|         19|     1831.24|
|2023-04-18|          9|      497.07|
|2023-05-03|         22|     3903.87|
|2023-05-11|         10|     4033.01|
|2023-05-18|         24|     4265.84|
|2023-05-27|         22|      418.62|
|2023-06-01|          1|     2722.47|
+----------+-----------+------------+
only showing top 20 rows



In [None]:
# 5. Filter active US customers
from pyspark.sql.functions import year
customers_df.filter((col('status') == 'active') &
                   (col('country') == 'USA') &
                   (year(col('join_date')) == 2023)) \
            .select('id', 'name', 'join_date').show()

+---+-------------+----------+
| id|         name| join_date|
+---+-------------+----------+
|  4| Mike Davis_4|2023-08-08|
| 17|Bob Wilson_17|2023-10-15|
+---+-------------+----------+



In [None]:
# 6. Group by product
order_items_df.groupBy('product_id') \
              .agg({'quantity': 'sum'}) \
              .withColumnRenamed('sum(quantity)', 'total_sold').show()

+----------+----------+
|product_id|total_sold|
+----------+----------+
|        19|        16|
|        22|         5|
|         7|         5|
|        25|        10|
|         6|        18|
|         9|         8|
|         1|         9|
|        10|        10|
|         3|        11|
|        12|        16|
|         8|        13|
|        11|        10|
|         2|        21|
|         4|         9|
|        13|         6|
|        18|        17|
|        14|        28|
|        21|        18|
|        15|        11|
|        23|        17|
+----------+----------+
only showing top 20 rows



In [None]:
# 7. Join orders and customers
orders_df.join(customers_df, orders_df.customer_id == customers_df.id) \
         .select(customers_df.name, orders_df.total_amount).show()

+-------------+------------+
|         name|total_amount|
+-------------+------------+
|   Jane Doe_1|     2722.47|
| Bob Wilson_2|      446.72|
| Bob Wilson_2|     4170.33|
| Bob Wilson_2|      573.36|
| Mike Davis_4|      962.03|
| John Smith_5|     3858.49|
| John Smith_5|     1754.04|
|   Jane Doe_6|      899.91|
| Bob Wilson_7|      599.36|
|Alice Brown_8|      3641.5|
|Alice Brown_8|     2499.95|
| Mike Davis_9|      3053.4|
| Mike Davis_9|      497.07|
|John Smith_10|     4444.31|
|John Smith_10|     4033.01|
|  Jane Doe_11|      390.74|
|  Jane Doe_11|     1017.94|
|  Jane Doe_11|     3947.05|
|  Jane Doe_11|     1850.26|
|Bob Wilson_12|     4052.35|
+-------------+------------+
only showing top 20 rows



In [None]:
# 8. Rename column
orders_renamed = orders_df.withColumnRenamed('total_amount', 'order_total')
orders_renamed.show()

+---+-----------+----------+-----------+---------+
| id|customer_id|order_date|order_total|   status|
+---+-----------+----------+-----------+---------+
|  1|         19|2023-04-12|    1831.24|completed|
|  2|         21|2023-04-11|    2655.56|  pending|
|  3|          5|2023-08-03|    3858.49|cancelled|
|  4|         22|2023-05-03|    3903.87|  pending|
|  5|         11|2023-09-26|     390.74|  pending|
|  6|         19|2023-03-05|    3047.48|completed|
|  7|          6|2023-12-07|     899.91|completed|
|  8|          4|2023-06-25|     962.03|cancelled|
|  9|          2|2023-06-02|     446.72|  pending|
| 10|          9|2023-02-22|     3053.4|  pending|
| 11|          2|2023-11-29|    4170.33|cancelled|
| 12|         22|2023-11-13|    2007.06|completed|
| 13|         15|2023-03-17|    2853.24|  pending|
| 14|          1|2023-06-01|    2722.47|completed|
| 15|         24|2023-05-18|    4265.84|completed|
| 16|         17|2023-12-27|     2634.2|  pending|
| 17|         19|2023-03-06|   

In [None]:
# 9. Handle nulls
products_clean = products_df.fillna({
    'base_price': 0,
    'status': 'unknown',
    'name': 'unknown_product'
})
products_clean.show()

+---+----------+-----------+----------+--------+
| id|      name|category_id|base_price|  status|
+---+----------+-----------+----------+--------+
|  1|Smartphone|          1|    199.99|  active|
|  2|    Laptop|          1|    599.99|  active|
|  3|Headphones|          1|    199.99|  active|
|  4|    Tablet|          1|    149.99|  active|
|  5|Smartwatch|          1|    199.99|  active|
|  6|Power Bank|          1|    299.99|inactive|
|  7|   T-Shirt|          2|     19.99|inactive|
|  8|     Jeans|          2|     79.99|  active|
|  9|     Shoes|          2|     79.99|  active|
| 10|    Jacket|          2|     29.99|  active|
| 11|       Hat|          2|     79.99|  active|
| 12|     Scarf|          2|     19.99|  active|
| 13|      Sofa|          3|    799.99|  active|
| 14|       Bed|          3|    599.99|inactive|
| 15|     Chair|          3|    499.99|  active|
| 16|     Table|          3|    599.99|  active|
| 17|      Desk|          3|    799.99|  active|
| 18| Bookshelf|    

In [None]:
# 10. Add order year
from pyspark.sql.functions import year
orders_with_year = orders_df.withColumn('order_year', year('order_date'))

In [None]:
# 11. Window functions
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as _sum
windowSpec = Window.partitionBy('customer_id').orderBy('order_date').rowsBetween(Window.unboundedPreceding, 0)
orders_df.withColumn('running_total', _sum('total_amount').over(windowSpec)).show()

+---+-----------+----------+------------+---------+------------------+
| id|customer_id|order_date|total_amount|   status|     running_total|
+---+-----------+----------+------------+---------+------------------+
| 14|          1|2023-06-01|     2722.47|completed|           2722.47|
|  9|          2|2023-06-02|      446.72|  pending|            446.72|
| 49|          2|2023-11-27|      573.36|cancelled|           1020.08|
| 11|          2|2023-11-29|     4170.33|cancelled|           5190.41|
|  8|          4|2023-06-25|      962.03|cancelled|            962.03|
|  3|          5|2023-08-03|     3858.49|cancelled|           3858.49|
| 39|          5|2023-08-04|     1754.04|completed|           5612.53|
|  7|          6|2023-12-07|      899.91|completed|            899.91|
| 47|          7|2023-04-12|      599.36|completed|            599.36|
| 50|          8|2023-03-28|     2499.95|  pending|           2499.95|
| 36|          8|2023-10-11|      3641.5|completed|           6141.45|
| 10| 

In [None]:
# 12. Monthly stats
from pyspark.sql.functions import month,count
orders_df.groupBy(month('order_date').alias('month')) \
         .agg(_sum('total_amount').alias('total_sales'),
              count('*').alias('order_count')).show()

+-----+------------------+-----------+
|month|       total_sales|order_count|
+-----+------------------+-----------+
|   12|3534.1099999999997|          2|
|    6| 7705.679999999999|          5|
|    3|26860.579999999998|          8|
|    5|          12621.34|          4|
|    9|5635.4800000000005|          4|
|    4|5583.2300000000005|          4|
|    8|11410.579999999998|          5|
|    7|          12671.45|          4|
|   10|14868.960000000001|          5|
|   11|12212.329999999998|          6|
|    2|6012.3099999999995|          3|
+-----+------------------+-----------+



In [None]:
# 13. Category-product counts
from pyspark.sql.functions import collect_list
(products_df.groupBy('category_id')
            .agg(count('*').alias('product_count'),collect_list('name').alias('product_names'))
            .withColumnRenamed('category_id', 'id')
            .join(categories_df, 'id', "inner")
            .select('name', 'product_count','product_names')).show()

+-----------+-------------+--------------------+
|       name|product_count|       product_names|
+-----------+-------------+--------------------+
|Electronics|            6|[Smartphone, Lapt...|
| Home Goods|            6|[Sofa, Bed, Chair...|
|   Clothing|            6|[T-Shirt, Jeans, ...|
|     Sports|            7|[Basketball, Foot...|
+-----------+-------------+--------------------+



In [None]:
orders_df.show()
order_items_df.show()
products_df.show()

+---+-----------+----------+------------+---------+
| id|customer_id|order_date|total_amount|   status|
+---+-----------+----------+------------+---------+
|  1|         19|2023-04-12|     1831.24|completed|
|  2|         21|2023-04-11|     2655.56|  pending|
|  3|          5|2023-08-03|     3858.49|cancelled|
|  4|         22|2023-05-03|     3903.87|  pending|
|  5|         11|2023-09-26|      390.74|  pending|
|  6|         19|2023-03-05|     3047.48|completed|
|  7|          6|2023-12-07|      899.91|completed|
|  8|          4|2023-06-25|      962.03|cancelled|
|  9|          2|2023-06-02|      446.72|  pending|
| 10|          9|2023-02-22|      3053.4|  pending|
| 11|          2|2023-11-29|     4170.33|cancelled|
| 12|         22|2023-11-13|     2007.06|completed|
| 13|         15|2023-03-17|     2853.24|  pending|
| 14|          1|2023-06-01|     2722.47|completed|
| 15|         24|2023-05-18|     4265.84|completed|
| 16|         17|2023-12-27|      2634.2|  pending|
| 17|       

In [None]:
# 14. Pivot table
from pyspark.sql import functions as F

# Join orders, order_items, and products to link orders to categories
orders_with_categories = orders_df.alias("o") \
    .join(order_items_df.alias("oi"), col("o.id") == col("oi.order_id")) \
    .join(products_df.alias("p"), col("oi.product_id") == col("p.id")) \
    .select("o.*", "p.category_id")

orders_with_categories.show()


+---+-----------+----------+------------+---------+-----------+
| id|customer_id|order_date|total_amount|   status|category_id|
+---+-----------+----------+------------+---------+-----------+
| 18|         12|2023-10-10|     4052.35|completed|          2|
| 45|         19|2023-08-28|      997.64|cancelled|          2|
| 30|         11|2023-08-15|     1017.94|  pending|          1|
| 34|         10|2023-07-19|     4444.31|cancelled|          1|
|  3|          5|2023-08-03|     3858.49|cancelled|          1|
| 48|         17|2023-03-16|     4959.08|cancelled|          1|
| 33|         20|2023-11-07|     2833.69|  pending|          1|
|  9|          2|2023-06-02|      446.72|  pending|          1|
| 48|         17|2023-03-16|     4959.08|cancelled|          2|
|  5|         11|2023-09-26|      390.74|  pending|          2|
| 22|         25|2023-03-29|     2596.07|completed|          2|
| 23|         17|2023-08-02|     3782.47|cancelled|          1|
| 44|         13|2023-03-21|     3723.45

In [None]:
# 15. Cross-tab
orders_with_customers = orders_df.alias("o") \
    .join(customers_df.alias("c"), col("o.customer_id") == col("c.id"))
orders_with_customers.stat.crosstab('c.country', 'o.status').show()

+------------------+---------+---------+-------+
|c.country_o.status|cancelled|completed|pending|
+------------------+---------+---------+-------+
|           Germany|        0|        1|      0|
|            France|        1|        4|      4|
|             India|        2|        2|      1|
|               USA|        3|        1|      3|
|                UK|        1|        2|      0|
|            Canada|        0|        1|      5|
|             Japan|        5|        5|      4|
|         Australia|        3|        0|      2|
+------------------+---------+---------+-------+



In [None]:
#16. Optimize memory usage when joining orders with order_items tables for large datasets.
orders_df.repartition(4, "customer_id").createOrReplaceTempView("orders")
order_items_df.repartition(4, "order_id").createOrReplaceTempView("order_items")

spark.sql("""
    SELECT /*+ COALESCE(4) */ o.*, oi.*
    FROM orders o
    JOIN order_items oi ON o.id = oi.order_id
""").show()

+---+-----------+----------+------------+---------+---+--------+----------+--------+------+
| id|customer_id|order_date|total_amount|   status| id|order_id|product_id|quantity| price|
+---+-----------+----------+------------+---------+---+--------+----------+--------+------+
| 29|         20|2023-02-08|     2182.79|cancelled| 52|      29|        23|       3|129.99|
| 29|         20|2023-02-08|     2182.79|cancelled| 89|      29|         4|       2|149.99|
| 19|         20|2023-09-22|     3619.82|  pending| 72|      19|         2|       2|599.99|
| 34|         10|2023-07-19|     4444.31|cancelled| 13|      34|        14|       3|599.99|
| 34|         10|2023-07-19|     4444.31|cancelled| 77|      34|        23|       1|129.99|
| 34|         10|2023-07-19|     4444.31|cancelled| 88|      34|         6|       3|299.99|
| 31|         22|2023-07-14|     4218.89|cancelled|  5|      31|        14|       5|599.99|
| 31|         22|2023-07-14|     4218.89|cancelled| 86|      31|        16|     

In [None]:
#17. Handle skewed data distribution in the products table by implementing salting on the category_id column.
from pyspark.sql.functions import rand, concat, lit
salted_products_df = products_df.withColumn("salt", (rand() * 10).cast("int"))
salted_products_df.createOrReplaceTempView("salted_products")
order_items_df.createOrReplaceTempView("order_items")

spark.sql("""
    SELECT sp.*, oi.*
    FROM salted_products sp
    JOIN order_items oi ON sp.id = oi.product_id AND sp.salt = (oi.order_id % 10)
""").show()

+---+----------+-----------+----------+--------+----+---+--------+----------+--------+------+
| id|      name|category_id|base_price|  status|salt| id|order_id|product_id|quantity| price|
+---+----------+-----------+----------+--------+----+---+--------+----------+--------+------+
|  9|     Shoes|          2|     79.99|  active|   8| 34|      48|         9|       4| 79.99|
|  8|     Jeans|          2|     79.99|  active|   8| 49|      38|         8|       3| 79.99|
| 12|     Scarf|          2|     19.99|  active|   9| 22|      39|        12|       5| 19.99|
|  2|    Laptop|          1|    599.99|  active|   0| 99|      40|         2|       1|599.99|
|  2|    Laptop|          1|    599.99|  active|   0| 82|      30|         2|       3|599.99|
|  2|    Laptop|          1|    599.99|  active|   0| 44|      20|         2|       3|599.99|
| 25|    Helmet|          4|    299.99|  active|   7| 70|      47|        25|       1|299.99|
| 25|    Helmet|          4|    299.99|  active|   7|  9|   

In [None]:
#18. Use MLlib to predict future sales based on historical order data.
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import to_date, dayofweek, weekofyear, month, year

# Convert date to numerical features
orders_df = orders_df.withColumn("order_date", to_date(col("order_date"))) \
    .withColumn("dayofweek", dayofweek(col("order_date"))) \
    .withColumn("weekofyear", weekofyear(col("order_date"))) \
    .withColumn("month", month(col("order_date"))) \
    .withColumn("year", year(col("order_date")))
assembler = VectorAssembler(inputCols=["customer_id", "dayofweek", "weekofyear", "month", "year"], outputCol="features")
assembled_orders_df = assembler.transform(orders_df)
(training_data, test_data) = assembled_orders_df.randomSplit([0.8, 0.2])

# Train a linear regression model
lr = LinearRegression(featuresCol="features", labelCol="total_amount")
model = lr.fit(training_data)
predictions = model.transform(test_data)

# Evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="total_amount", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) = " + str(rmse))

Root Mean Squared Error (RMSE) = 1603.5202079422497


In [None]:
#19. Optimize joins between orders, order_items, and products tables using broadcast hints.
from pyspark.sql.functions import broadcast

orders_df.createOrReplaceTempView("orders")
order_items_df.createOrReplaceTempView("order_items")
products_df.createOrReplaceTempView("products")

spark.sql("""
    SELECT /*+ BROADCAST(p) */ o.*, oi.*, p.*
    FROM orders o
    JOIN order_items oi ON o.id = oi.order_id
    JOIN products p ON oi.product_id = p.id
""").show()

+---+-----------+----------+------------+---------+---------+----------+-----+----+---+--------+----------+--------+------+---+-------------+-----------+----------+--------+
| id|customer_id|order_date|total_amount|   status|dayofweek|weekofyear|month|year| id|order_id|product_id|quantity| price| id|         name|category_id|base_price|  status|
+---+-----------+----------+------------+---------+---------+----------+-----+----+---+--------+----------+--------+------+---+-------------+-----------+----------+--------+
| 22|         25|2023-03-29|     2596.07|completed|        4|        13|    3|2023| 20|      22|         9|       2| 79.99|  9|        Shoes|          2|     79.99|  active|
| 34|         10|2023-07-19|     4444.31|cancelled|        4|        29|    7|2023| 13|      34|        14|       3|599.99| 14|          Bed|          3|    599.99|inactive|
| 32|         17|2023-02-23|      776.12|completed|        5|         8|    2|2023| 27|      32|         8|       5| 79.99|  8|   

In [None]:
#20. Convert the customers table between Parquet and CSV formats while maintaining schema integrity.
customers_df.write.csv("data/customers.csv", mode="overwrite", header=True)

csv_customers_df = spark.read.csv("data/customers.csv", header=True, inferSchema=True)
csv_customers_df.printSchema()
csv_customers_df.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- status: string (nullable = true)
 |-- join_date: date (nullable = true)

+---+--------------+---------+--------+----------+
| id|          name|  country|  status| join_date|
+---+--------------+---------+--------+----------+
| 13|Alice Brown_13|   France|  active|2023-11-20|
| 14| Mike Davis_14|       UK|  active|2023-08-30|
| 15| John Smith_15|   France|inactive|2023-09-16|
| 16|   Jane Doe_16|   Canada|  active|2023-07-07|
| 17| Bob Wilson_17|      USA|  active|2023-10-15|
| 18|Alice Brown_18|    India|  active|2023-06-27|
| 19| Mike Davis_19|    Japan|  active|2023-06-10|
| 20| John Smith_20|    Japan|  active|2023-04-12|
| 21|   Jane Doe_21|    Japan|  active|2023-01-15|
| 22| Bob Wilson_22|    India|  active|2023-12-24|
| 23|Alice Brown_23|    Japan|  active|2023-10-16|
| 24| Mike Davis_24|       UK|  active|2023-03-12|
| 25| John Smith_25|   France|  active|202

In [None]:
#21. Build a classification model to predict high-value customers (> $5000 total spending) using features from the orders table.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import when, col

# Define a high-value customer as one with total spending > $5000
orders_with_high_value = orders_df.withColumn("high_value", when(col("total_amount") > 5000, 1).otherwise(0))

# Assemble features into a vector
assembler = VectorAssembler(inputCols=["customer_id", "total_amount"], outputCol="features")
assembled_orders_df = assembler.transform(orders_with_high_value)

# Split data into training and testing sets
(training_data, test_data) = assembled_orders_df.randomSplit([0.8, 0.2], seed=42)

# Train a logistic regression model
lr = LogisticRegression(featuresCol="features", labelCol="high_value")
model = lr.fit(training_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="high_value", rawPredictionCol="prediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print("Area Under ROC (AUC) = " + str(auc))


Area Under ROC (AUC) = 0.0


In [None]:
#22. Implement partition pruning on the orders table using order_date for faster date-range queries.
orders_df.write.partitionBy("order_date").parquet("data/partitioned_orders")

partitioned_orders_df = spark.read.parquet("data/partitioned_orders")
partitioned_orders_df.createOrReplaceTempView("partitioned_orders")

spark.sql("""
    SELECT *
    FROM partitioned_orders
    WHERE order_date BETWEEN '2023-06-01' AND '2023-06-30'
""").show()


+---+-----------+------------+---------+---------+----------+-----+----+----------+
| id|customer_id|total_amount|   status|dayofweek|weekofyear|month|year|order_date|
+---+-----------+------------+---------+---------+----------+-----+----+----------+
| 40|         20|     1076.74|cancelled|        5|        26|    6|2023|2023-06-29|
| 14|          1|     2722.47|completed|        5|        22|    6|2023|2023-06-01|
|  8|          4|      962.03|cancelled|        1|        25|    6|2023|2023-06-25|
|  9|          2|      446.72|  pending|        6|        22|    6|2023|2023-06-02|
| 26|         12|     2497.72|  pending|        3|        24|    6|2023|2023-06-13|
+---+-----------+------------+---------+---------+----------+-----+----+----------+



In [None]:
#23. Handle memory optimization when processing large order_item datasets with multiple aggregations.
#Reduce the number of partitions before aggregation
repartitioned_order_items_df = order_items_df.repartition(10)

# Perform aggregations
aggregated_order_items = repartitioned_order_items_df.groupBy("order_id") \
    .agg(_sum("quantity").alias("total_quantity"), _sum("price").alias("total_price"))

aggregated_order_items.show()


+--------+--------------+------------------+
|order_id|total_quantity|       total_price|
+--------+--------------+------------------+
|      39|            18|           1409.94|
|      33|             6|            379.98|
|      28|             8|            949.98|
|       1|             1|             79.99|
|      48|            19|           1779.95|
|       2|             7|            499.98|
|      30|            10|           1399.96|
|      40|             8|           2499.96|
|      45|            10|469.96000000000004|
|      29|             5|            279.98|
|      25|            14|           1879.96|
|       9|             4|            449.98|
|      10|             6|            479.97|
|      37|             6|            629.98|
|       4|             2|           1099.98|
|      15|            12|           1519.96|
|      23|            18|           1579.95|
|      16|            12|            749.97|
|       5|             6|            579.98|
|      36|

In [None]:
#24. Implement robust error handling for missing product IDs in the order_items table.
from pyspark.sql.functions import when, lit

# Add a check for missing product IDs
valid_product_ids = [row.id for row in products_df.select('id').collect()]

order_items_checked_df = order_items_df.withColumn("valid_product", when(col("product_id").isin(valid_product_ids), 1).otherwise(0))

order_items_checked_df.show()

+---+--------+----------+--------+------+-------------+
| id|order_id|product_id|quantity| price|valid_product|
+---+--------+----------+--------+------+-------------+
|  1|      30|        21|       3|299.99|            1|
|  2|      37|        10|       3| 29.99|            1|
|  3|      39|        23|       5|129.99|            1|
|  4|      35|         2|       5|599.99|            1|
|  5|      31|        14|       5|599.99|            1|
|  6|      25|        16|       3|599.99|            1|
|  7|       2|         3|       3|199.99|            1|
|  8|      13|        23|       3|129.99|            1|
|  9|      47|        25|       5|299.99|            1|
| 10|      33|         6|       4|299.99|            1|
| 11|      25|        14|       3|599.99|            1|
| 12|      36|        23|       2|129.99|            1|
| 13|      34|        14|       3|599.99|            1|
| 14|      30|         1|       2|199.99|            1|
| 15|      40|        13|       1|799.99|       

In [None]:
# 25. Validate data integrity across related tables (orders → order_items → products).
# Check for orphaned order items
order_items_with_orders = order_items_df.join(orders_df, order_items_df.order_id == orders_df.id, "left_outer")

orphaned_order_items = order_items_with_orders.filter(orders_df.id.isNull())

print("Number of orphaned order items: ", orphaned_order_items.count())


Number of orphaned order items:  0


In [None]:
#26. Optimize shuffle operations during aggregation of large-scale order data.
spark.conf.set("spark.sql.shuffle.partitions", "10")

In [None]:
#27. Calculate rolling averages of sales amounts for each category over a 3-month window.
from pyspark.sql.window import Window
from pyspark.sql.functions import avg
from pyspark.sql.functions import date_trunc

# Create a window specification
windowSpec = Window.partitionBy("category_id").orderBy("month").rowsBetween(-2, 0)

# Calculate the rolling average
rolling_avg = orders_with_categories.withColumn("month", date_trunc("MM", col("order_date"))) \
    .groupBy("category_id", "month") \
    .agg(_sum("total_amount").alias("total_sales")) \
    .withColumn("rolling_avg", avg("total_sales").over(windowSpec))

rolling_avg.show()

+-----------+-------------------+------------------+------------------+
|category_id|              month|       total_sales|       rolling_avg|
+-----------+-------------------+------------------+------------------+
|          1|2023-02-01 00:00:00|           2182.79|           2182.79|
|          1|2023-03-01 00:00:00|          12405.98|          7294.385|
|          1|2023-04-01 00:00:00|           2655.56| 5748.110000000001|
|          1|2023-05-01 00:00:00| 8717.470000000001| 7926.336666666667|
|          1|2023-06-01 00:00:00|           1970.18| 4447.736666666667|
|          1|2023-07-01 00:00:00|           8072.74| 6253.463333333333|
|          1|2023-08-01 00:00:00|          14477.25| 8173.389999999999|
|          1|2023-09-01 00:00:00|           3619.82| 8723.269999999999|
|          1|2023-10-01 00:00:00|           4068.65| 7388.573333333334|
|          1|2023-11-01 00:00:00|           2833.69|3507.3866666666668|
|          1|2023-12-01 00:00:00|            2634.2| 3178.846666

In [None]:
#28. Process nested JSON structures representing product hierarchies within categories.
data = [{"category": "Electronics",
         "products": [{"name": "Laptop", "price": 1200},
                      {"name": "Keyboard", "price": 75}]},
        {"category": "Clothing",
         "products": [{"name": "Shirt", "price": 50},
                      {"name": "Jeans", "price": 100}]}]
df = spark.createDataFrame(data)

df.printSchema()
df.show(truncate=False)

root
 |-- category: string (nullable = true)
 |-- products: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

+-----------+------------------------------------------------------------------+
|category   |products                                                          |
+-----------+------------------------------------------------------------------+
|Electronics|[{name -> Laptop, price -> 1200}, {name -> Keyboard, price -> 75}]|
|Clothing   |[{name -> Shirt, price -> 50}, {name -> Jeans, price -> 100}]     |
+-----------+------------------------------------------------------------------+



In [None]:
#29. Simulate streaming data ingestion for new orders and calculate real-time sales metrics.
from pyspark.sql.functions import expr
from time import sleep

# Create a streaming DataFrame
streaming_orders_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load() \
    .withColumn("customer_id", expr("cast(value % 25 as int) + 1")) \
    .withColumn("order_date", expr("current_date()")) \
    .withColumn("total_amount", expr("abs(cast(value as int) % 5000) + 50")) \
    .withColumn("status", expr("case when value % 3 = 0 then 'completed' when value % 3 = 1 then 'pending' else 'cancelled' end"))

# Calculate real-time sales metrics
real_time_sales = streaming_orders_df.groupBy("status").agg(_sum("total_amount").alias("total_sales"))

# Write the output to the console
query = real_time_sales.writeStream.outputMode("complete").format("console").start()

# Keep the query running for a while
try:
    query.awaitTermination(10)  # Run for 10 seconds
except KeyboardInterrupt:
    query.stop()

In [None]:
#30. Optimize partition strategy for the orders table using order_date and country.
orders_with_customers = orders_df.join(customers_df, orders_df.customer_id == customers_df.id)
partitioned_orders = orders_with_customers.repartition(4, "order_date", "country")

partitioned_orders.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange hashpartitioning(order_date#224, country#214, 4), REPARTITION_BY_NUM, [plan_id=14223]
   +- SortMergeJoin [customer_id#223L], [id#212L], Inner
      :- Sort [customer_id#223L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(customer_id#223L, 10), ENSURE_REQUIREMENTS, [plan_id=14218]
      :     +- Project [id#222L, customer_id#223L, order_date#224, total_amount#225, status#226, dayofweek(order_date#224) AS dayofweek#2005, weekofyear(order_date#224) AS weekofyear#2012, month(order_date#224) AS month#2020, year(order_date#224) AS year#2029]
      :        +- Filter isnotnull(customer_id#223L)
      :           +- Scan ExistingRDD[id#222L,customer_id#223L,order_date#224,total_amount#225,status#226]
      +- Sort [id#212L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#212L, 10), ENSURE_REQUIREMENTS, [plan_id=14219]
            +- Filter isnotnull(id#212L)
               +- Scan Existing