## Initialize the Spark and Scala environment

In [None]:
# Attach Google Drive so the cached installers and the data files persist across Colab sessions
import google.colab.drive as drive

drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
# Set variables
strBasePath="/content/drive/MyDrive/IBM-DE-Spark-Scala"
scala_deb_path = strBasePath+"/scala-2.12.18.deb"
spark_tgz_path = strBasePath+"/spark-3.4.1-bin-hadoop3.tgz"

!mkdir -p /content/tmp
import os
# Download Scala .deb if not cached
if not os.path.exists(scala_deb_path):
    !wget -O "{scala_deb_path}" https://github.com/scala/scala/releases/download/v2.12.18/scala-2.12.18.deb

# Download Spark tgz if not cached
if not os.path.exists(spark_tgz_path):
    !wget -O "{spark_tgz_path}" https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Copy cached files to working dir
!cp "{scala_deb_path}" /content/tmp/scala-2.12.18.deb
!cp "{spark_tgz_path}" /content/tmp/spark-3.4.1-bin-hadoop3.tgz

# Install Java if not already present
!java -version || apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Install Scala
!dpkg -i /content/tmp/scala-2.12.18.deb

# Extract Spark
!tar xf /content/tmp/spark-3.4.1-bin-hadoop3.tgz -C /content

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"
os.environ["PATH"] += f":{os.environ['SPARK_HOME']}/bin"

# Confirm installation
!java -version
!scala -version
!scalac -version
!echo "Spark path: $SPARK_HOME"
!ls $SPARK_HOME

openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)
Selecting previously unselected package scala.
(Reading database ... 126284 files and directories currently installed.)
Preparing to unpack /content/tmp/scala-2.12.18.deb ...
Unpacking scala (2.12.18-400) ...
Setting up scala (2.12.18-400) ...
Creating system group: scala
Creating system user: scala in scala with scala daemon-user and shell /bin/false
Processing triggers for man-db (2.10.2-1) ...
openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)
Scala code runner version 2.12.18 -- Copyright 2002-2023, LAMP/EPFL and Lightbend, Inc.
Scala compiler version 2.12.18 -- Copyright 2002-2023, LAMP/EPFL and Lightbend, Inc.
Spark path: /content/

In [None]:
# Install findspark (helps Python locate Spark)
!pip install -q findspark


In [None]:
# Point findspark at the extracted Spark distribution, then start (or reuse) a session
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.4.1-bin-hadoop3",
})

import findspark
findspark.init()

from pyspark.sql import SparkSession

# getOrCreate() returns the already-running session if this cell is re-executed
spark = (
    SparkSession.builder
    .appName("ClassicModelsAnalytics")
    .getOrCreate()
)

print("✅ SparkSession created successfully")
print("Spark version:", spark.version)


✅ SparkSession created successfully
Spark version: 3.4.1


In [None]:
# Re-attach Drive (a no-op that only prints a notice when it is already mounted)
import google.colab.drive as drive
drive.mount(mountpoint='/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd

# Path to your data folder
data_path = "/content/drive/MyDrive/classicmodels/data/"

# Table name → column headers
tables_with_headers = {
    "products": [
        "productCode", "productName", "productLine", "productScale", "productVendor",
        "productDescription", "quantityInStock", "buyPrice", "MSRP"
    ],
    "productlines": [
        "productLine", "textDescription", "htmlDescription", "image"
    ],
    "offices": [
        "officeCode", "city", "phone", "addressLine1", "addressLine2", "state",
        "country", "postalCode", "territory"
    ],
    "employees": [
        "employeeNumber", "lastName", "firstName", "extension", "email", "officeCode",
        "reportsTo", "jobTitle"
    ],
    "customers": [
        "customerNumber", "customerName", "contactLastName", "contactFirstName",
        "phone", "addressLine1", "addressLine2", "city", "state", "postalCode",
        "country", "salesRepEmployeeNumber", "creditLimit"
    ],
    "payments": [
        "customerNumber", "checkNumber", "paymentDate", "amount"
    ],
    "orders": [
        "orderNumber", "orderDate", "requiredDate", "shippedDate", "status", "comments", "customerNumber"
    ],
    "orderdetails": [
        "orderNumber", "productCode", "quantityOrdered", "priceEach", "orderLineNumber"
    ]
}

# Apply headers and save again
for table, columns in tables_with_headers.items():
    csv_file = data_path + f"{table}.csv"
    try:
        df = pd.read_csv(csv_file, header=None)
        df.columns = columns
        df.to_csv(csv_file, index=False)
        print(f"✅ Headers added to {table}.csv")
    except Exception as e:
        print(f"❌ Error processing {table}: {e}")


✅ Headers added to products.csv
✅ Headers added to productlines.csv
✅ Headers added to offices.csv
✅ Headers added to employees.csv
✅ Headers added to customers.csv
✅ Headers added to payments.csv
✅ Headers added to orders.csv
✅ Headers added to orderdetails.csv


In [None]:
# Folder the Spark readers load from (trailing separator kept so file names can be appended)
base_path = os.path.join("/content/drive/MyDrive/classicmodels", "data", "")


In [None]:
def read_table(name):
    """Load ``<base_path><name>.csv`` (a pandas-written CSV with a header row) into Spark.

    All columns are read as strings; later cells cast the columns they need.
    """
    return (
        spark.read
        .option("header", True)
        # pandas.to_csv escapes embedded quotes by doubling them (RFC 4180).
        # Spark's default escape character is a backslash, so make it match.
        .option("escape", '"')
        # Quoted fields (e.g. long product descriptions) may span several lines
        .option("multiLine", True)
        .csv(base_path + f"{name}.csv")
    )


productlines = read_table("productlines")
products = read_table("products")
offices = read_table("offices")
employees = read_table("employees")
customers = read_table("customers")
payments = read_table("payments")
orders = read_table("orders")
orderdetails = read_table("orderdetails")


In [None]:
# Inspect the schema of the main tables first, then a 5-row preview of each
preview_frames = [products, customers, orders, orderdetails]

for frame in preview_frames:
    frame.printSchema()

for frame in preview_frames:
    frame.show(5)


root
 |-- productCode: string (nullable = true)
 |-- productName: string (nullable = true)
 |-- productLine: string (nullable = true)
 |-- productScale: string (nullable = true)
 |-- productVendor: string (nullable = true)
 |-- productDescription: string (nullable = true)
 |-- quantityInStock: string (nullable = true)
 |-- buyPrice: string (nullable = true)
 |-- MSRP: string (nullable = true)

root
 |-- customerNumber: string (nullable = true)
 |-- customerName: string (nullable = true)
 |-- contactLastName: string (nullable = true)
 |-- contactFirstName: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- addressLine1: string (nullable = true)
 |-- addressLine2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postalCode: string (nullable = true)
 |-- country: string (nullable = true)
 |-- salesRepEmployeeNumber: string (nullable = true)
 |-- creditLimit: string (nullable = true)

root
 |-- orderNumber: string (nulla

In [None]:
# Customer → order → order line, joined on the shared key columns
customer_orders = customers.join(orders, "customerNumber")
joined = customer_orders.join(orderdetails, "orderNumber")
joined.select(
    "customerName", "orderNumber", "productCode", "quantityOrdered"
).show(5)


+-----------------+-----------+-----------+---------------+
|     customerName|orderNumber|productCode|quantityOrdered|
+-----------------+-----------+-----------+---------------+
|     customerName|orderNumber|productCode|quantityOrdered|
|Atelier graphique|      10123|   S24_1628|             50|
|Atelier graphique|      10123|   S18_3685|             34|
|Atelier graphique|      10123|   S18_2870|             46|
|Atelier graphique|      10123|   S18_1589|             26|
+-----------------+-----------+-----------+---------------+
only showing top 5 rows



In [None]:
# Expose every loaded table to Spark SQL under its own name
raw_views = {
    "productlines": productlines,
    "products": products,
    "offices": offices,
    "employees": employees,
    "customers": customers,
    "payments": payments,
    "orders": orders,
    "orderdetails": orderdetails,
}
for view_name, frame in raw_views.items():
    frame.createOrReplaceTempView(view_name)

print("✅ All tables registered as temp views")


✅ All tables registered as temp views


In [None]:
# Narrow projections of the three tables used in the exercises below
product_cols = ["productCode", "productName", "productLine", "quantityInStock", "buyPrice", "MSRP"]
customer_cols = ["customerNumber", "customerName", "contactLastName", "phone", "city", "creditLimit"]
order_cols = ["orderNumber", "customerNumber", "orderDate", "status", "comments", "shippedDate"]

products_table = products.select(*product_cols)
customers_table = customers.select(*customer_cols)
orders_table = orders.select(*order_cols)


In [None]:
# Make the projections queryable from Spark SQL
for view_name, frame in [
    ("products_table", products_table),
    ("customers_table", customers_table),
    ("orders_table", orders_table),
]:
    frame.createOrReplaceTempView(view_name)


In [None]:
preview_query = "SELECT * FROM products_table LIMIT 5"
spark.sql(preview_query).show()


+-----------+--------------------+-----------+---------------+--------+-----+
|productCode|         productName|productLine|quantityInStock|buyPrice| MSRP|
+-----------+--------------------+-----------+---------------+--------+-----+
|productCode|         productName|productLine|quantityInStock|buyPrice| MSRP|
|   S10_1678|1969 Harley David...|Motorcycles|           7933|   48.81|95.70|
+-----------+--------------------+-----------+---------------+--------+-----+



In [None]:
from pyspark.sql import Row

# One synthetic product used to demonstrate appending rows to a DataFrame.
# Values are strings to match products_table, whose columns are all read as strings.
sample_product = spark.createDataFrame([
    Row(productCode="S999", productName="Test Car", productLine="Classic Cars",
        quantityInStock="50", buyPrice="29.99", MSRP="49.99")
])

# Add to products_table.
# - This cell reassigns products_table from itself, so drop any S999 row first;
#   otherwise every re-run appends the sample product again.
# - unionByName matches columns by name; plain union() is purely positional.
products_table = (
    products_table
    .where(~products_table["productCode"].eqNullSafe("S999"))
    .unionByName(sample_product)
)
products_table.createOrReplaceTempView("products_table")


In [None]:
# MSRP is read as a string. Comparing it directly with the integer literal 100
# makes Spark coerce the string to an integer, truncating the fraction, so
# e.g. "100.50" would not count as > 100. Cast to double explicitly instead.
(
    products_table
    .filter(products_table["MSRP"].cast("double") > 100)
    .select("productName", "MSRP")
    .show(5)
)


+-----------+----+
|productName|MSRP|
+-----------+----+
+-----------+----+



In [None]:
# Number of products per product line
line_counts = products_table.groupBy("productLine").count()
line_counts.show()


+------------+-----+
| productLine|count|
+------------+-----+
| Motorcycles|    1|
| productLine|    1|
|Classic Cars|    1|
+------------+-----+



In [None]:
# Which customer placed which order, with its date and status
customer_order_rows = customers_table.join(orders_table, "customerNumber")
customer_order_rows.select("customerName", "orderNumber", "orderDate", "status").show(5)


+--------------------+-----------+----------+-------+
|        customerName|orderNumber| orderDate| status|
+--------------------+-----------+----------+-------+
|        customerName|orderNumber| orderDate| status|
|Online Diecast Cr...|      10100|2003-01-06|Shipped|
|Blauer See Auto, Co.|      10101|2003-01-09|Shipped|
|     Vitachrome Inc.|      10102|2003-01-10|Shipped|
|  Baane Mini Imports|      10103|2003-01-29|Shipped|
+--------------------+-----------+----------+-------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import col, sum as spark_sum

# Total value of all order lines per customer, largest first.
# quantityOrdered and priceEach are read as strings. Cast them explicitly so
# the money arithmetic is exact decimal math rather than an implicit
# string→double coercion (which produced values like 122698.70000000001).
order_value = (
    orderdetails
    .join(orders, "orderNumber")
    .join(customers, "customerNumber")
    .withColumn(
        "order_value",
        col("quantityOrdered").cast("int") * col("priceEach").cast("decimal(10,2)"),
    )
    .groupBy("customerNumber", "customerName")
    .agg(spark_sum("order_value").alias("total_order_value"))
    .orderBy(col("total_order_value").desc())
)

order_value.show(5)


+--------------+--------------------+------------------+
|customerNumber|        customerName| total_order_value|
+--------------+--------------------+------------------+
|           124|Mini Gifts Distri...|122698.70000000001|
|           321|Corporate Gift Id...| 85559.11999999998|
|           148|Dragon Souveniers...|           82730.3|
|           151|  Muscle Machine Inc|          58841.35|
|           145|Danish Wholesale ...|          53959.21|
+--------------+--------------------+------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Cast quantityInStock to int (it is read as a string; unparseable values become NULL)
products_casted = products.withColumn("quantityInStock", col("quantityInStock").cast("int"))

# Lowest stock first within each product line.
# Spark sorts NULLs first in ascending order, so a product with missing or
# unparseable stock would otherwise be reported as the lowest; push NULLs last.
# productCode breaks ties so row_number() picks the same row on every run.
window_spec = Window.partitionBy("productLine").orderBy(
    col("quantityInStock").asc_nulls_last(), col("productCode")
)

lowest_stock = (
    products_casted
    .withColumn("rank", row_number().over(window_spec))
    .filter(col("rank") == 1)
    .select("productLine", "productName", "quantityInStock")
)

lowest_stock.show()


+-----------+--------------------+---------------+
|productLine|         productName|quantityInStock|
+-----------+--------------------+---------------+
|Motorcycles|1969 Harley David...|           7933|
|productLine|         productName|           null|
+-----------+--------------------+---------------+



In [None]:
from pyspark.sql.functions import col, sum as spark_sum

# Payment amounts as exact decimals (money), not doubles
payments_casted = payments.withColumn("amount", col("amount").cast("decimal(10,2)"))

# Five customers with the highest total payments.
# Group by customerNumber as well, so two customers that share a name are not
# merged; only customerName/total_paid are kept to preserve the output columns.
top_customers = (
    payments_casted
    .join(customers, "customerNumber")
    .groupBy("customerNumber", "customerName")
    .agg(spark_sum("amount").alias("total_paid"))
    .select("customerName", "total_paid")
    .orderBy(col("total_paid").desc())
    .limit(5)
)

top_customers.show()


+--------------------+------------------+
|        customerName|        total_paid|
+--------------------+------------------+
|Mini Gifts Distri...| 584188.2400000001|
|Australian Collec...|180585.06999999998|
|  Muscle Machine Inc|         177913.95|
|Dragon Souveniers...|         156251.03|
|Down Under Souven...|154622.08000000002|
+--------------------+------------------+



In [None]:
from pyspark.sql.functions import month, year, count

# Number of orders per calendar month, in chronological order.
# orderDate is a string column; year()/month() rely on Spark's implicit string→date cast.
order_periods = orders.select(
    "orderNumber",
    month("orderDate").alias("orderMonth"),
    year("orderDate").alias("orderYear"),
)

monthly_orders = (
    order_periods
    .groupBy("orderYear", "orderMonth")
    .agg(count("orderNumber").alias("total_orders"))
    .orderBy("orderYear", "orderMonth")
)

monthly_orders.show()


+---------+----------+------------+
|orderYear|orderMonth|total_orders|
+---------+----------+------------+
|     null|      null|           1|
|     2003|         1|           5|
|     2003|         2|           3|
|     2003|         3|           5|
|     2003|         4|           7|
|     2003|         5|           6|
|     2003|         6|           7|
|     2003|         7|           7|
|     2003|         8|           5|
|     2003|         9|           8|
|     2003|        10|          10|
+---------+----------+------------+



In [None]:
# Publish the three analysis results as SQL views
for view_name, frame in (
    ("customer_order_value", order_value),
    ("top_customers_by_payment", top_customers),
    ("monthly_order_trends", monthly_orders),
):
    frame.createOrReplaceTempView(view_name)


In [None]:
# View top customers by payment
# Top customers by payment
top_payers_query = "SELECT * FROM top_customers_by_payment"
spark.sql(top_payers_query).show()

# Customers whose total order value exceeds 50,000
big_orders_query = "SELECT * FROM customer_order_value WHERE total_order_value > 50000"
spark.sql(big_orders_query).show()


+--------------------+------------------+
|        customerName|        total_paid|
+--------------------+------------------+
|Mini Gifts Distri...| 584188.2400000001|
|Australian Collec...|180585.06999999998|
|  Muscle Machine Inc|         177913.95|
|Dragon Souveniers...|         156251.03|
|Down Under Souven...|154622.08000000002|
+--------------------+------------------+

+--------------+--------------------+------------------+
|customerNumber|        customerName| total_order_value|
+--------------+--------------------+------------------+
|           124|Mini Gifts Distri...|122698.70000000001|
|           321|Corporate Gift Id...| 85559.11999999998|
|           148|Dragon Souveniers...|           82730.3|
|           151|  Muscle Machine Inc|          58841.35|
|           145|Danish Wholesale ...|          53959.21|
|           114|Australian Collec...|          53429.11|
|           278|       Rovelli Gifts|52151.810000000005|
|           121|  Baane Mini Imports|          5171

In [None]:
import os

# Drive folder that receives the exported result CSVs (trailing separator kept for concatenation)
output_path = os.path.join("/content/drive/MyDrive/classicmodels", "output", "")
os.makedirs(output_path, exist_ok=True)


In [None]:
# Export each result as one headered CSV part file (coalesce(1)), replacing earlier runs
exports = {
    "customer_order_value": order_value,
    "top_customers_by_payment": top_customers,
    "monthly_order_trends": monthly_orders,
    "lowest_stock_products": lowest_stock,
}
for folder, frame in exports.items():
    frame.coalesce(1).write.mode("overwrite").option("header", True).csv(output_path + folder)
