## Initialize Spark Scala Env.

In [1]:
# Mount Google Drive to store cached files
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [2]:
# Set variables
strBasePath="/content/drive/MyDrive/IBM-DE-Spark-Scala"
scala_deb_path = strBasePath+"/scala-2.12.18.deb"
spark_tgz_path = strBasePath+"/spark-3.4.1-bin-hadoop3.tgz"

!mkdir -p /content/tmp
import os
# Download Scala .deb if not cached
if not os.path.exists(scala_deb_path):
    !wget -O "{scala_deb_path}" https://github.com/scala/scala/releases/download/v2.12.18/scala-2.12.18.deb

# Download Spark tgz if not cached
if not os.path.exists(spark_tgz_path):
    !wget -O "{spark_tgz_path}" https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Copy cached files to working dir
!cp "{scala_deb_path}" /content/tmp/scala-2.12.18.deb
!cp "{spark_tgz_path}" /content/tmp/spark-3.4.1-bin-hadoop3.tgz

# Install Java if not already present
!java -version || apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Install Scala
!dpkg -i /content/tmp/scala-2.12.18.deb

# Extract Spark
!tar xf /content/tmp/spark-3.4.1-bin-hadoop3.tgz -C /content

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"
os.environ["PATH"] += f":{os.environ['SPARK_HOME']}/bin"

# Confirm installation
!java -version
!scala -version
!scalac -version
!echo "Spark path: $SPARK_HOME"
!ls $SPARK_HOME

openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)
Selecting previously unselected package scala.
(Reading database ... 126284 files and directories currently installed.)
Preparing to unpack /content/tmp/scala-2.12.18.deb ...
Unpacking scala (2.12.18-400) ...
Setting up scala (2.12.18-400) ...
Creating system group: scala
Creating system user: scala in scala with scala daemon-user and shell /bin/false
Processing triggers for man-db (2.10.2-1) ...
openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)
Scala code runner version 2.12.18 -- Copyright 2002-2023, LAMP/EPFL and Lightbend, Inc.
Scala compiler version 2.12.18 -- Copyright 2002-2023, LAMP/EPFL and Lightbend, Inc.
Spark path: /content/

## Test Hello World in JAVA

In [4]:
%%writefile CreateDataFrame.java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Encoders;
import java.util.Arrays;
import java.util.List;

public class CreateDataFrame {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("CreateDataFrameExample")
                .master("local[*]")
                .getOrCreate();

        // Sample data
        List<String> data = Arrays.asList("Java", "Python", "Scala");

        // Create DataFrame from a list
        Dataset<String> df = spark.createDataset(data, Encoders.STRING());

        // Show the DataFrame content
        df.show();

        spark.stop();
    }
}

Overwriting CreateDataFrame.java


In [5]:
import os
spark_home = os.environ.get("SPARK_HOME")
!javac -cp "$spark_home/jars/*" CreateDataFrame.java

In [6]:
import os
spark_home = os.environ.get("SPARK_HOME")
!java -cp "$spark_home/jars/*:." CreateDataFrame

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 05:01:12 INFO SparkContext: Running Spark version 3.4.1
25/08/06 05:01:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 05:01:13 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 05:01:13 INFO SparkContext: Submitted application: CreateDataFrameExample
25/08/06 05:01:13 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 05:01:13 INFO ResourceProfile: Limiting resource is cpu
25/08/06 05:01:13 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 05:01:13 INFO SecurityManager: Changing view acls to: root
25/08/06 05:01

In [7]:
# set varible

!SPARK_HOME=/content/spark-3.3.2-bin-hadoop3
!JARS=$(echo $SPARK_HOME/jars/*.jar | tr ' ' ':')


In [8]:
!echo $SPARK_HOME
!echo $JARS

/content/spark-3.4.1-bin-hadoop3



In [9]:
!java -cp "$(echo $SPARK_HOME/jars/*.jar | tr ' ' ':')" CreateDataFrame.java

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 05:01:44 INFO SparkContext: Running Spark version 3.4.1
25/08/06 05:01:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 05:01:45 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 05:01:45 INFO SparkContext: Submitted application: CreateDataFrameExample
25/08/06 05:01:45 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 05:01:45 INFO ResourceProfile: Limiting resource is cpu
25/08/06 05:01:45 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 05:01:46 INFO SecurityManager: Changing view acls to: root
25/08/06 05:01

In [None]:
!java -cp ".:$(echo $SPARK_HOME/jars/*.jar | tr ' ' ':')" CreateDataFrame

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 04:31:56 INFO SparkContext: Running Spark version 3.4.1
25/08/06 04:31:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 04:31:57 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 04:31:57 INFO SparkContext: Submitted application: CreateDataFrameExample
25/08/06 04:31:57 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 04:31:57 INFO ResourceProfile: Limiting resource is cpu
25/08/06 04:31:57 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 04:31:57 INFO SecurityManager: Changing view acls to: root
25/08/06 04:31

# Task
Perform data ingestion, setup, and product/order analysis using Apache Spark with Java based on the provided dataset tables and instructions. The analysis should include finding the top 10 products by quantity sold, calculating product-wise revenue, and calculating the average order value. The data should be read from CSV files, converted to Parquet, and stored in "/data/parquet/".

## Environment setup

### Subtask:
Install Java and set up Apache Spark as described. Link the project with a GitHub repository.


## Data ingestion & setup

### Subtask:
Read the CSV files for all the dataset tables into Spark DataFrames, define schemas, convert them to Parquet format, and save them to the specified location.


**Reasoning**:
Create a SparkSession object to start the Spark application.



In [10]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CSV to Parquet Conversion") \
    .master("local[*]") \
    .getOrCreate()

**Reasoning**:
Define schemas for each table and read the CSV files into DataFrames using the defined schemas. Then convert and save the DataFrames to Parquet format.



In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType
import os

# Initialize Spark session
spark = SparkSession.builder \
    .appName("ClassicModels Ingestion") \
    .getOrCreate()

# Define manual schemas for each table
schemas = {
    "customers": StructType([
        StructField("customerNumber", IntegerType()),
        StructField("customerName", StringType()),
        StructField("contactLastName", StringType()),
        StructField("contactFirstName", StringType()),
        StructField("phone", StringType()),
        StructField("addressLine1", StringType()),
        StructField("addressLine2", StringType()),
        StructField("city", StringType()),
        StructField("state", StringType()),
        StructField("postalCode", StringType()),
        StructField("country", StringType()),
        StructField("salesRepEmployeeNumber", IntegerType()),
        StructField("creditLimit", DoubleType())
    ]),
    "employees": StructType([
        StructField("employeeNumber", IntegerType()),
        StructField("lastName", StringType()),
        StructField("firstName", StringType()),
        StructField("extension", StringType()),
        StructField("email", StringType()),
        StructField("officeCode", StringType()),
        StructField("reportsTo", IntegerType()),
        StructField("jobTitle", StringType())
    ]),
    "offices": StructType([
        StructField("officeCode", StringType()),
        StructField("city", StringType()),
        StructField("phone", StringType()),
        StructField("addressLine1", StringType()),
        StructField("addressLine2", StringType()),
        StructField("state", StringType()),
        StructField("country", StringType()),
        StructField("postalCode", StringType()),
        StructField("territory", StringType())
    ]),
    "orderdetails": StructType([
        StructField("orderNumber", IntegerType()),
        StructField("productCode", StringType()),
        StructField("quantityOrdered", IntegerType()),
        StructField("priceEach", DoubleType()),
        StructField("orderLineNumber", IntegerType())
    ]),
    "orders": StructType([
        StructField("orderNumber", IntegerType()),
        StructField("orderDate", DateType()),
        StructField("requiredDate", DateType()),
        StructField("shippedDate", DateType()),
        StructField("status", StringType()),
        StructField("comments", StringType()),
        StructField("customerNumber", IntegerType())
    ]),
    "payments": StructType([
        StructField("customerNumber", IntegerType()),
        StructField("checkNumber", StringType()),
        StructField("paymentDate", DateType()),
        StructField("amount", DoubleType())
    ]),
    "productlines": StructType([
        StructField("productLine", StringType()),
        StructField("textDescription", StringType()),
        StructField("htmlDescription", StringType()),
        StructField("image", StringType())
    ]),
    "products": StructType([
        StructField("productCode", StringType()),
        StructField("productName", StringType()),
        StructField("productLine", StringType()),
        StructField("productScale", StringType()),
        StructField("productVendor", StringType()),
        StructField("productDescription", StringType()),
        StructField("quantityInStock", IntegerType()),
        StructField("buyPrice", DoubleType()),
        StructField("MSRP", DoubleType())
    ])
}

# Input: CSV folder in Drive
base_input_path = "/content/drive/MyDrive/classicmodels"

# Output: Parquet folder in Drive (persistent)
base_output_path = "/content/drive/MyDrive/data/parquet/"

# Debug: Check input directory
print(f"Checking input path: {base_input_path}")
if os.path.exists(base_input_path):
    print(f"✅ Input path exists.\nContents: {os.listdir(base_input_path)}\n")
else:
    print("❌ ERROR: Input path does NOT exist. Check your Drive folder name.")
    raise FileNotFoundError(base_input_path)

# Read each CSV, convert to Parquet
for table_name, schema in schemas.items():
    csv_path = os.path.join(base_input_path, f"{table_name}.csv")
    parquet_path = os.path.join(base_output_path, table_name)

    print(f"🔄 Processing: {table_name}")

    # Check if CSV file exists
    if not os.path.exists(csv_path):
        print(f"⚠️  Skipped: File not found → {csv_path}")
        continue

    # Read with schema
    df = spark.read.csv(csv_path, header=True, schema=schema)

    # Save to Parquet
    df.write.mode("overwrite").parquet(parquet_path)
    print(f"✅ Saved {table_name} to {parquet_path}\n")

print("🎉 All available CSVs processed and saved as Parquet.")


Checking input path: /content/drive/MyDrive/classicmodels
✅ Input path exists.
Contents: ['products.csv', 'offices.csv', 'productlines.csv', 'employees.csv', 'payments.csv', 'customers.csv', 'orders.csv', 'orderdetails.csv', 'customers', 'employees', 'offices', 'orderdetails', 'orders', 'payments', 'productlines', 'products']

🔄 Processing: customers
✅ Saved customers to /content/drive/MyDrive/data/parquet/customers

🔄 Processing: employees
✅ Saved employees to /content/drive/MyDrive/data/parquet/employees

🔄 Processing: offices
✅ Saved offices to /content/drive/MyDrive/data/parquet/offices

🔄 Processing: orderdetails
✅ Saved orderdetails to /content/drive/MyDrive/data/parquet/orderdetails

🔄 Processing: orders
✅ Saved orders to /content/drive/MyDrive/data/parquet/orders

🔄 Processing: payments
✅ Saved payments to /content/drive/MyDrive/data/parquet/payments

🔄 Processing: productlines
✅ Saved productlines to /content/drive/MyDrive/data/parquet/productlines

🔄 Processing: products
✅ Sa

In [29]:
# Read CSVs, convert to Parquet, and save
import os

base_input_path = "/content/drive/MyDrive/classicmodels"
base_output_path = "/content/drive/MyDrive/data/parquet/"

# Verify the input directory exists and list its contents for debugging
print(f"Checking input path: {base_input_path}")
if os.path.exists(base_input_path):
    print(f"✅ Input path exists. Contents: {os.listdir(base_input_path)}\n")
else:
    print(f"❌ Input path does NOT exist.")
    raise FileNotFoundError(base_input_path)

# Iterate through all tables and convert to Parquet
for table_name, schema in schemas.items():
    csv_path = os.path.join(base_input_path, f"{table_name}.csv")
    parquet_path = os.path.join(base_output_path, table_name)

    print(f"🔄 Processing {table_name}...")

    if not os.path.exists(csv_path):
        print(f"⚠️  Skipped: Input file not found: {csv_path}")
        continue

    df = spark.read.csv(csv_path, header=True, schema=schema)

    # Create output folder (Drive) if needed
    os.makedirs(parquet_path, exist_ok=True)

    df.write.parquet(parquet_path, mode="overwrite")
    print(f"✅ Saved {table_name} to {parquet_path}\n")

print("🎉 All CSVs processed and saved as Parquet in Google Drive.")


Checking input path: /content/drive/MyDrive/classicmodels
✅ Input path exists. Contents: ['products.csv', 'offices.csv', 'productlines.csv', 'employees.csv', 'payments.csv', 'customers.csv', 'orders.csv', 'orderdetails.csv', 'customers', 'employees', 'offices', 'orderdetails', 'orders', 'payments', 'productlines', 'products']

🔄 Processing customers...
✅ Saved customers to /content/drive/MyDrive/data/parquet/customers

🔄 Processing employees...
✅ Saved employees to /content/drive/MyDrive/data/parquet/employees

🔄 Processing offices...
✅ Saved offices to /content/drive/MyDrive/data/parquet/offices

🔄 Processing orderdetails...
✅ Saved orderdetails to /content/drive/MyDrive/data/parquet/orderdetails

🔄 Processing orders...
✅ Saved orders to /content/drive/MyDrive/data/parquet/orders

🔄 Processing payments...
✅ Saved payments to /content/drive/MyDrive/data/parquet/payments

🔄 Processing productlines...
✅ Saved productlines to /content/drive/MyDrive/data/parquet/productlines

🔄 Processing 

In [30]:
# Read CSVs, convert to Parquet, and save to Google Drive

import os

# ✅ FIXED: Base paths
base_input_path = "/content/drive/MyDrive/classicmodels"
base_output_path = "/content/drive/MyDrive/data/parquet/"

# Verify the input directory exists and list its contents
print(f"Checking input path: {base_input_path}")
if os.path.exists(base_input_path):
    print(f"✅ Input path exists. Contents: {os.listdir(base_input_path)}\n")
else:
    print(f"❌ Input path does NOT exist.")
    raise FileNotFoundError(base_input_path)

# Iterate through schemas and convert CSV → Parquet
for table_name, schema in schemas.items():
    csv_path = os.path.join(base_input_path, f"{table_name}.csv")
    parquet_path = os.path.join(base_output_path, table_name)

    print(f"🔄 Processing {table_name}...")

    if not os.path.exists(csv_path):
        print(f"⚠️  Skipped: Input file not found: {csv_path}")
        continue

    df = spark.read.csv(csv_path, header=True, schema=schema)

    os.makedirs(parquet_path, exist_ok=True)

    df.write.mode("overwrite").parquet(parquet_path)
    print(f"✅ Saved {table_name} to {parquet_path}\n")

print("🎉 All CSV files converted to Parquet and saved in Google Drive.")


Checking input path: /content/drive/MyDrive/classicmodels
✅ Input path exists. Contents: ['products.csv', 'offices.csv', 'productlines.csv', 'employees.csv', 'payments.csv', 'customers.csv', 'orders.csv', 'orderdetails.csv', 'customers', 'employees', 'offices', 'orderdetails', 'orders', 'payments', 'productlines', 'products']

🔄 Processing customers...
✅ Saved customers to /content/drive/MyDrive/data/parquet/customers

🔄 Processing employees...
✅ Saved employees to /content/drive/MyDrive/data/parquet/employees

🔄 Processing offices...
✅ Saved offices to /content/drive/MyDrive/data/parquet/offices

🔄 Processing orderdetails...
✅ Saved orderdetails to /content/drive/MyDrive/data/parquet/orderdetails

🔄 Processing orders...
✅ Saved orders to /content/drive/MyDrive/data/parquet/orders

🔄 Processing payments...
✅ Saved payments to /content/drive/MyDrive/data/parquet/payments

🔄 Processing productlines...
✅ Saved productlines to /content/drive/MyDrive/data/parquet/productlines

🔄 Processing 

**Reasoning**:
Load the 'orderdetails' and 'products' Parquet files into Spark DataFrames.



In [31]:
orderdetails_df = spark.read.parquet("/content/drive/MyDrive/data/parquet/orderdetails")
products_df = spark.read.parquet("/content/drive/MyDrive/data/parquet/products")

orderdetails_df.show(5)
products_df.show(5)


+-----------+-----------+---------------+---------+---------------+
|orderNumber|productCode|quantityOrdered|priceEach|orderLineNumber|
+-----------+-----------+---------------+---------+---------------+
|       null|productCode|           null|     null|           null|
|      10100|   S18_1749|             30|    136.0|              3|
|      10100|   S18_2248|             50|    55.09|              2|
|      10100|   S18_4409|             22|    75.46|              4|
|      10100|   S24_3969|             49|    35.29|              1|
+-----------+-----------+---------------+---------+---------------+
only showing top 5 rows

+-----------+--------------------+-----------+------------+---------------+--------------------+---------------+--------+----+
|productCode|         productName|productLine|productScale|  productVendor|  productDescription|quantityInStock|buyPrice|MSRP|
+-----------+--------------------+-----------+------------+---------------+--------------------+-------------

In [32]:
orderdetails_df = spark.read.parquet("/content/drive/MyDrive/data/parquet/orderdetails")
products_df = spark.read.parquet("/content/drive/MyDrive/data/parquet/products")
orderdetails_df.show(5)
products_df.show(5)


+-----------+-----------+---------------+---------+---------------+
|orderNumber|productCode|quantityOrdered|priceEach|orderLineNumber|
+-----------+-----------+---------------+---------+---------------+
|       null|productCode|           null|     null|           null|
|      10100|   S18_1749|             30|    136.0|              3|
|      10100|   S18_2248|             50|    55.09|              2|
|      10100|   S18_4409|             22|    75.46|              4|
|      10100|   S24_3969|             49|    35.29|              1|
+-----------+-----------+---------------+---------+---------------+
only showing top 5 rows

+-----------+--------------------+-----------+------------+---------------+--------------------+---------------+--------+----+
|productCode|         productName|productLine|productScale|  productVendor|  productDescription|quantityInStock|buyPrice|MSRP|
+-----------+--------------------+-----------+------------+---------------+--------------------+-------------

In [33]:
!ls /content/drive/MyDrive/data/parquet/


customers  offices	 orders    productlines
employees  orderdetails  payments  products


In [34]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

# ✅ CORRECT paths (Google Drive)
base_input_path = "/content/drive/MyDrive/classicmodels"
base_output_path = "/content/drive/MyDrive/data/parquet"

# ✅ Start Spark session
spark = SparkSession.builder \
    .appName("CSV to Parquet Conversion") \
    .master("local[*]") \
    .getOrCreate()

# ✅ Define schema for each table
schemas = {
    "customers": StructType([
        StructField("customerNumber", IntegerType(), True),
        StructField("customerName", StringType(), True),
        StructField("contactLastName", StringType(), True),
        StructField("contactFirstName", StringType(), True),
        StructField("phone", StringType(), True),
        StructField("addressLine1", StringType(), True),
        StructField("addressLine2", StringType(), True),
        StructField("city", StringType(), True),
        StructField("state", StringType(), True),
        StructField("postalCode", StringType(), True),
        StructField("country", StringType(), True),
        StructField("salesRepEmployeeNumber", IntegerType(), True),
        StructField("creditLimit", DoubleType(), True)
    ]),
    "employees": StructType([
        StructField("employeeNumber", IntegerType(), True),
        StructField("lastName", StringType(), True),
        StructField("firstName", StringType(), True),
        StructField("extension", StringType(), True),
        StructField("email", StringType(), True),
        StructField("officeCode", StringType(), True),
        StructField("reportsTo", IntegerType(), True),
        StructField("jobTitle", StringType(), True)
    ]),
    "offices": StructType([
        StructField("officeCode", StringType(), True),
        StructField("city", StringType(), True),
        StructField("phone", StringType(), True),
        StructField("addressLine1", StringType(), True),
        StructField("addressLine2", StringType(), True),
        StructField("state", StringType(), True),
        StructField("country", StringType(), True),
        StructField("postalCode", StringType(), True),
        StructField("territory", StringType(), True)
    ]),
    "orderdetails": StructType([
        StructField("orderNumber", IntegerType(), True),
        StructField("productCode", StringType(), True),
        StructField("quantityOrdered", IntegerType(), True),
        StructField("priceEach", DoubleType(), True),
        StructField("orderLineNumber", IntegerType(), True)
    ]),
    "orders": StructType([
        StructField("orderNumber", IntegerType(), True),
        StructField("orderDate", DateType(), True),
        StructField("requiredDate", DateType(), True),
        StructField("shippedDate", DateType(), True),
        StructField("status", StringType(), True),
        StructField("comments", StringType(), True),
        StructField("customerNumber", IntegerType(), True)
    ]),
    "payments": StructType([
        StructField("customerNumber", IntegerType(), True),
        StructField("checkNumber", StringType(), True),
        StructField("paymentDate", DateType(), True),
        StructField("amount", DoubleType(), True)
    ]),
    "productlines": StructType([
        StructField("productLine", StringType(), True),
        StructField("textDescription", StringType(), True),
        StructField("htmlDescription", StringType(), True),
        StructField("image", StringType(), True)
    ]),
    "products": StructType([
        StructField("productCode", StringType(), True),
        StructField("productName", StringType(), True),
        StructField("productLine", StringType(), True),
        StructField("productScale", StringType(), True),
        StructField("productVendor", StringType(), True),
        StructField("productDescription", StringType(), True),
        StructField("quantityInStock", IntegerType(), True),
        StructField("buyPrice", DoubleType(), True),
        StructField("MSRP", DoubleType(), True)
    ])
}

# ✅ Check input directory
print(f"Checking input path: {base_input_path}")
if os.path.exists(base_input_path):
    print(f"Input path exists. Contents: {os.listdir(base_input_path)}")
else:
    print("Input path does NOT exist.")

# ✅ Loop over each table
for table_name, schema in schemas.items():
    csv_path = os.path.join(base_input_path, f"{table_name}.csv")
    parquet_path = os.path.join(base_output_path, table_name)

    print(f"\nProcessing {table_name} from: {csv_path}")

    if not os.path.exists(csv_path):
        print(f"❌ File not found: {csv_path}")
        continue

    try:
        df = spark.read.csv(csv_path, header=True, schema=schema)

        # Make sure parent directory exists
        os.makedirs(parquet_path, exist_ok=True)

        df.write.parquet(parquet_path, mode="overwrite")
        print(f"✅ Saved to: {parquet_path}")
    except Exception as e:
        print(f"❌ Error processing {table_name}: {e}")


Checking input path: /content/drive/MyDrive/classicmodels
Input path exists. Contents: ['products.csv', 'offices.csv', 'productlines.csv', 'employees.csv', 'payments.csv', 'customers.csv', 'orders.csv', 'orderdetails.csv', 'customers', 'employees', 'offices', 'orderdetails', 'orders', 'payments', 'productlines', 'products']

Processing customers from: /content/drive/MyDrive/classicmodels/customers.csv
✅ Saved to: /content/drive/MyDrive/data/parquet/customers

Processing employees from: /content/drive/MyDrive/classicmodels/employees.csv
✅ Saved to: /content/drive/MyDrive/data/parquet/employees

Processing offices from: /content/drive/MyDrive/classicmodels/offices.csv
✅ Saved to: /content/drive/MyDrive/data/parquet/offices

Processing orderdetails from: /content/drive/MyDrive/classicmodels/orderdetails.csv
✅ Saved to: /content/drive/MyDrive/data/parquet/orderdetails

Processing orders from: /content/drive/MyDrive/classicmodels/orders.csv
✅ Saved to: /content/drive/MyDrive/data/parquet/or

In [40]:
top_10_products_df = joined_df.groupBy("productName") \
                             .agg(sum("quantityOrdered").alias("totalQuantitySold")) \
                             .orderBy("totalQuantitySold", ascending=False) \
                             .limit(10)


In [42]:
from pyspark.sql.functions import col, round, sum as _sum
import os

# Define input and output paths
input_path = "/data/parquet"
output_path = "/output/processed"

# Ensure output directory exists
os.makedirs(output_path, exist_ok=True)

# Read the required Parquet files
orders_df = spark.read.parquet(f"{input_path}/orders")
orderdetails_df = spark.read.parquet(f"{input_path}/orderdetails")
products_df = spark.read.parquet(f"{input_path}/products")

# Join orderdetails with orders on orderNumber
order_with_details = orderdetails_df.join(orders_df, "orderNumber")

# Join the above with products on productCode
full_joined_df = order_with_details.join(products_df, "productCode")

# Calculate revenue = quantityOrdered * priceEach
revenue_df = full_joined_df.withColumn("revenue", col("quantityOrdered") * col("priceEach"))

# Group by productName and sum the revenue
product_revenue_df = revenue_df.groupBy("productName") \
    .agg(round(_sum("revenue"), 2).alias("totalRevenue")) \
    .orderBy(col("totalRevenue").desc())

# Show top 10 for visual check
product_revenue_df.show(10, truncate=False)

# Save output as Parquet
product_revenue_df.write.mode("overwrite").parquet(f"{output_path}/product_revenue.parquet")
print("✅ Saved product-wise revenue to /output/processed/product_revenue.parquet")


+-------------------------------------+------------+
|productName                          |totalRevenue|
+-------------------------------------+------------+
|1969 Harley Davidson Ultimate Chopper|16527.99    |
+-------------------------------------+------------+

✅ Saved product-wise revenue to /output/processed/product_revenue.parquet


In [47]:
from pyspark.sql.functions import col, round, sum as _sum, count

# Define input/output paths
input_path = "/content/drive/MyDrive/classicmodels"
output_path = "/output/processed"

# Read required datasets
customers_df = spark.read.parquet(f"{input_path}/customers")
orders_df = spark.read.parquet(f"{input_path}/orders")
orderdetails_df = spark.read.parquet(f"{input_path}/orderdetails")

# Join orderdetails with orders on orderNumber
order_data = orderdetails_df.join(orders_df, "orderNumber")

# Calculate revenue for each line item
order_data = order_data.withColumn("lineRevenue", col("quantityOrdered") * col("priceEach"))

# Calculate total revenue per order
order_total_df = order_data.groupBy("orderNumber", "customerNumber") \
    .agg(_sum("lineRevenue").alias("orderTotal"))

# Join with customers to get names
order_customer_df = order_total_df.join(customers_df, "customerNumber")

# Group by customer and calculate average order value
customer_avg_order_df = order_customer_df.groupBy("customerNumber", "customerName") \
    .agg(round(_sum("orderTotal") / count("orderNumber"), 2).alias("avgOrderValue")) \
    .orderBy(col("avgOrderValue").desc())

# Show top 10 only
customer_avg_order_df.show(10, truncate=False)

# Save full result to Parquet
customer_avg_order_df.write.mode("overwrite").parquet(f"{output_path}/customer_avg_order.parquet")
print("✅ Saved full customer average order values to /output/processed/customer_avg_order.parquet")


+--------------+----------------------------+-------------+
|customerNumber|customerName                |avgOrderValue|
+--------------+----------------------------+-------------+
|151           |Muscle Machine Inc          |58841.35     |
|145           |Danish Wholesale Imports    |53959.21     |
|278           |Rovelli Gifts               |52151.81     |
|385           |Cruz & Sons Co.             |51001.22     |
|350           |Marseille Mini Autos        |50824.66     |
|205           |Toys4GrownUps.com           |50342.74     |
|321           |Corporate Gift Ideas Co.    |42779.56     |
|148           |Dragon Souveniers, Ltd.     |41365.15     |
|320           |Mini Creations Ltd.         |41016.75     |
|124           |Mini Gifts Distributors Ltd.|40899.57     |
+--------------+----------------------------+-------------+
only showing top 10 rows

✅ Saved full customer average order values to /output/processed/customer_avg_order.parquet


In [48]:
customer_avg_order_df = order_customer_df.groupBy("customerNumber", "customerName") \
    .agg(round(_sum("orderTotal") / count("orderNumber"), 2).alias("avgOrderValue")) \
    .orderBy(col("avgOrderValue").desc())


In [50]:
from pyspark.sql.functions import col, sum as _sum

# Paths
base_path = "/content/drive/MyDrive/classicmodels"
output_path = "/output/processed"

# Step 1: Read required tables
offices_df = spark.read.parquet(f"{base_path}/offices")
employees_df = spark.read.parquet(f"{base_path}/employees")
customers_df = spark.read.parquet(f"{base_path}/customers")
orders_df = spark.read.parquet(f"{base_path}/orders")
orderdetails_df = spark.read.parquet(f"{base_path}/orderdetails")

# Step 2: Join offices -> employees
emp_office_df = employees_df.join(offices_df, "officeCode")

# Step 3: Join employees -> customers (via salesRepEmployeeNumber)
emp_cust_df = customers_df.join(emp_office_df, customers_df["salesRepEmployeeNumber"] == emp_office_df["employeeNumber"])

# Step 4: Join customers -> orders
cust_orders_df = emp_cust_df.join(orders_df, "customerNumber")

# Step 5: Join orders -> orderdetails
full_sales_df = cust_orders_df.join(orderdetails_df, "orderNumber")

# Step 6: Compute revenue
full_sales_df = full_sales_df.withColumn("revenue", col("quantityOrdered") * col("priceEach"))

# Step 7: Group by region (country + city)
regional_sales_df = full_sales_df.groupBy(
    offices_df["country"], offices_df["city"]
).agg(
    _sum("revenue").alias("totalSales")
).orderBy(
    col("totalSales").desc()
)

# Step 8: Show top regions
regional_sales_df.show(10, truncate=False)

# Step 9: Save full result to Parquet
regional_sales_df.write.mode("overwrite").parquet(f"{output_path}/regional_sales_by_location.parquet")
print("✅ Regional sales by location saved to /output/processed/regional_sales_by_location.parquet")


+---------+-------------+------------------+
|country  |city         |totalSales        |
+---------+-------------+------------------+
|USA      |San Francisco|380918.45999999996|
|France   |Paris        |329517.0699999999 |
|USA      |NYC          |179833.89         |
|UK       |London       |162301.72         |
|Japan    |Tokyo        |133731.51999999996|
|Australia|Sydney       |122221.39         |
|USA      |Boston       |89957.85          |
+---------+-------------+------------------+

✅ Regional sales by location saved to /output/processed/regional_sales_by_location.parquet


In [51]:
from pyspark.sql.functions import sum as _sum

# Step 1: Read customers and payments
customers_df = spark.read.parquet(f"{input_path}/customers")
payments_df = spark.read.parquet(f"{input_path}/payments")

# Step 2: Join customers and payments on customerNumber
customer_payments_df = payments_df.join(customers_df, "customerNumber")

# Step 3: Group by country and sum the payment amounts
revenue_by_country_df = customer_payments_df.groupBy("country") \
    .agg(_sum("amount").alias("totalRevenue")) \
    .orderBy(col("totalRevenue").desc())

# Step 4: Show top countries
revenue_by_country_df.show(10, truncate=False)

# Step 5: Save result to Parquet
revenue_by_country_df.write.mode("overwrite").parquet(f"{output_path}/revenue_by_country.parquet")
print("✅ Revenue by country saved to /output/processed/revenue_by_country.parquet")


+-----------+------------------+
|country    |totalRevenue      |
+-----------+------------------+
|USA        |2998523.3299999996|
|France     |792760.0900000003 |
|New Zealand|392486.59         |
|Australia  |372351.6          |
|Italy      |325254.55000000005|
|Finland    |295149.35         |
|Singapore  |261671.59999999998|
|Denmark    |197356.3          |
|Germany    |196470.99         |
|Japan      |167909.95         |
+-----------+------------------+
only showing top 10 rows

✅ Revenue by country saved to /output/processed/revenue_by_country.parquet


In [53]:
from pyspark.sql.functions import sum as _sum

# Step 1: Read all required tables
offices_df = spark.read.parquet(f"{input_path}/offices")
employees_df = spark.read.parquet(f"{input_path}/employees")
customers_df = spark.read.parquet(f"{input_path}/customers")
payments_df = spark.read.parquet(f"{input_path}/payments")

# Step 2: Join employees with offices
emp_office_df = employees_df.join(offices_df, "officeCode")

# Step 3: Join customers with employees (salesRepEmployeeNumber)
cust_emp_office_df = customers_df.join(
    emp_office_df,
    customers_df["salesRepEmployeeNumber"] == emp_office_df["employeeNumber"]
)

# Step 4: Join payments with full hierarchy
full_office_sales_df = payments_df.join(cust_emp_office_df, "customerNumber")

# Step 5: Select only the office columns (rename to avoid ambiguity)
office_sales_clean_df = full_office_sales_df.select(
    "officeCode",
    emp_office_df["city"].alias("officeCity"),
    emp_office_df["country"].alias("officeCountry"),
    "amount"
)

# Step 6: Group by office and calculate total sales
office_sales_df = office_sales_clean_df.groupBy("officeCode", "officeCity", "officeCountry") \
    .agg(_sum("amount").alias("totalSales")) \
    .orderBy(col("totalSales").desc())

# Step 7: Show results
office_sales_df.show(truncate=False)

# Step 8: Save to Parquet
office_sales_df.write.mode("overwrite").parquet(f"{output_path}/office_sales_summary.parquet")
print("✅ Office sales summary saved to /output/processed/office_sales_summary.parquet")


+----------+-------------+-------------+------------------+
|officeCode|officeCity   |officeCountry|totalSales        |
+----------+-------------+-------------+------------------+
|4         |Paris        |France       |1607019.9899999995|
|1         |San Francisco|USA          |1295933.3900000004|
|7         |London       |UK           |932821.9999999999 |
|6         |Sydney       |Australia    |870258.7599999999 |
|3         |NYC          |USA          |866707.61         |
|2         |Boston       |USA          |835882.3300000001 |
|5         |Tokyo        |Japan        |457110.07000000007|
+----------+-------------+-------------+------------------+

✅ Office sales summary saved to /output/processed/office_sales_summary.parquet


In [54]:
from pyspark.sql.functions import col, sum as _sum

# Re-read data for demonstration
orders_df = spark.read.parquet("/content/drive/MyDrive/classicmodels/orders")
orderdetails_df = spark.read.parquet("/content/drive/MyDrive/classicmodels/orderdetails")

# Join and calculate revenue
order_data = orderdetails_df.join(orders_df, "orderNumber") \
    .withColumn("lineRevenue", col("quantityOrdered") * col("priceEach"))

# 🔁 Cache the DataFrame since it will be reused
order_data.cache()

# First use: total revenue per order
order_total_df = order_data.groupBy("orderNumber") \
    .agg(_sum("lineRevenue").alias("orderRevenue"))

order_total_df.show()

# Second use: total revenue per customer
order_customer_df = order_data.groupBy("customerNumber") \
    .agg(_sum("lineRevenue").alias("customerRevenue"))

order_customer_df.show()


+-----------+------------------+
|orderNumber|      orderRevenue|
+-----------+------------------+
|      10121|          16700.47|
|      10128|          13884.99|
|      10143| 41016.74999999999|
|      10162|30876.439999999995|
|      10116|           1627.56|
|      10135|55601.840000000004|
|      10124|32641.980000000003|
|      10140|38675.130000000005|
|      10131|          17032.29|
|      10161|          36164.46|
|      10136|           14232.7|
|      10155|          37602.48|
|      10138|32077.440000000002|
|      10104|           40206.2|
|      10117|          44380.15|
|      10101|          10549.01|
|      10114| 33383.14000000001|
|      10147|          32680.31|
|      10115|21665.980000000003|
|      10108|51001.219999999994|
+-----------+------------------+
only showing top 20 rows

+--------------+------------------+
|customerNumber|   customerRevenue|
+--------------+------------------+
|           148|           82730.3|
|           496|32077.440000000002|
| 

In [58]:
from pyspark.sql.functions import broadcast, col, sum as _sum

# Load the datasets
orderdetails_df = spark.read.parquet("/content/drive/MyDrive/classicmodels/orderdetails")
products_df = spark.read.parquet("/content/drive/MyDrive/classicmodels/products")

# ✅ Use broadcast join
joined_df = orderdetails_df.join(broadcast(products_df), on="productCode")

# Aggregate total quantity sold per product
product_sales_df = joined_df.groupBy("productName") \
    .agg(_sum("quantityOrdered").alias("totalQuantitySold")) \
    .orderBy(col("totalQuantitySold").desc())

# Show top 10
product_sales_df.show(10, truncate=False)


+-------------------------------------+-----------------+
|productName                          |totalQuantitySold|
+-------------------------------------+-----------------+
|1969 Harley Davidson Ultimate Chopper|1057             |
|productName                          |null             |
+-------------------------------------+-----------------+



In [62]:
# Step 1: Import required modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, coalesce

# Step 2: Initialize SparkSession
spark = SparkSession.builder \
    .appName("ClassicModels - Task 4.2 Caching Optimization") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

# ✅ Step 3: Corrected paths to match your working directory
input_base_path = "/content/drive/MyDrive/classicmodels"
output_base_path = "/content/drive/MyDrive/output/optimized"

# Step 4: Read the Parquet files
orders_df = spark.read.parquet(f"{input_base_path}/orders")
orderdetails_df = spark.read.parquet(f"{input_base_path}/orderdetails")
products_df = spark.read.parquet(f"{input_base_path}/products")

# Step 5: Join all 3 datasets
joined_df = orders_df.join(orderdetails_df, on="orderNumber", how="inner") \
                     .join(products_df, on="productCode", how="inner")

# Step 6: Cache the intermediate joined dataset
joined_df.cache()

# Step 7: Calculate revenue using coalesce to handle nulls
revenue_df = joined_df.withColumn(
    "revenue",
    coalesce(col("priceEach"), lit(0)) * coalesce(col("quantityOrdered"), lit(0))
)

# Group by productName to get total revenue
grouped_df = revenue_df.groupBy("productName").sum("revenue") \
                       .withColumnRenamed("sum(revenue)", "totalRevenue")

# Step 8: Display top 10 products by revenue
grouped_df.orderBy(col("totalRevenue").desc()).show(10, truncate=False)

# Step 9: Save result to Parquet
grouped_df.write.mode("overwrite").parquet(f"{output_base_path}/top_product_revenue.parquet")
print(f"✅ Saved to: {output_base_path}/top_product_revenue.parquet")


+-------------------------------------+------------------+
|productName                          |totalRevenue      |
+-------------------------------------+------------------+
|1969 Harley Davidson Ultimate Chopper|16527.989999999998|
+-------------------------------------+------------------+

✅ Saved to: /content/drive/MyDrive/output/optimized/top_product_revenue.parquet


In [68]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Step 1: Create Spark session
spark = SparkSession.builder \
    .appName("Partitioning Optimization - ClassicModels") \
    .getOrCreate()

# Step 2: Define the correct base path (your mounted Google Drive location)
base_path = "/content/drive/MyDrive/classicmodels/"

# Step 3: Read Parquet datasets
orders_df = spark.read.parquet(base_path + "orders/")
orderdetails_df = spark.read.parquet(base_path + "orderdetails/")
products_df = spark.read.parquet(base_path + "products/")

# Step 4: Join orders with orderdetails and products
order_with_details = orders_df.join(orderdetails_df, "orderNumber") \
                              .join(products_df, "productCode")

# Step 5: Check current number of partitions
print(f"Original number of partitions: {order_with_details.rdd.getNumPartitions()}")

# Step 6: Repartition the data by product line (optimizing for grouped processing)
repartitioned_df = order_with_details.repartition("productLine")

# Step 7: Save the repartitioned data to Parquet (to a new path)
repartitioned_df.write.mode("overwrite") \
    .parquet(base_path + "output/repartitioned_by_productLine/")

print("✅ Repartitioned data saved to: /content/drive/MyDrive/classicmodels/output/repartitioned_by_productLine/")


Original number of partitions: 1
✅ Repartitioned data saved to: /content/drive/MyDrive/classicmodels/output/repartitioned_by_productLine/


In [67]:
!ls /content/drive/MyDrive/classicmodels/


customers      offices		 orders        productlines
customers.csv  offices.csv	 orders.csv    productlines.csv
employees      orderdetails	 payments      products
employees.csv  orderdetails.csv  payments.csv  products.csv


In [71]:
from pyspark.sql.functions import col, sum as _sum

# Input paths
input_path = "/content/drive/MyDrive/classicmodels"
output_path = "/content/drive/MyDrive/classicmodels/output/processed"

# Load required datasets
orders_df = spark.read.csv(f"{input_path}/orders.csv", header=True, inferSchema=True)
orderdetails_df = spark.read.csv(f"{input_path}/orderdetails.csv", header=True, inferSchema=True)
products_df = spark.read.csv(f"{input_path}/products.csv", header=True, inferSchema=True)

# Join orderdetails with products
product_orders = orderdetails_df.join(products_df, "productCode") \
                                .join(orders_df, "orderNumber")

# Calculate revenue per product
product_revenue_df = product_orders.withColumn("lineRevenue", col("quantityOrdered") * col("priceEach")) \
    .groupBy("productCode", "productName") \
    .agg(_sum("lineRevenue").alias("totalRevenue")) \
    .orderBy(col("totalRevenue").desc())

# Save to Parquet (correct path)
product_revenue_df.write.mode("overwrite").parquet(f"{output_path}/product_revenue.parquet")


In [70]:
for file in output_files:
    path = os.path.join(output_path, file)
    if os.path.exists(path):
        df = spark.read.parquet(path)
        df.coalesce(1).write.mode("overwrite").parquet(path)
        print(f"✔ Coalesced: {file}")


In [72]:
customers_df = spark.read.csv(f"{input_path}/customers.csv", header=True, inferSchema=True)

# Join orders + orderdetails
order_data = orderdetails_df.join(orders_df, "orderNumber")

# Calculate line item revenue
order_data = order_data.withColumn("lineRevenue", col("quantityOrdered") * col("priceEach"))

# Total revenue per order
order_total_df = order_data.groupBy("orderNumber", "customerNumber") \
    .agg(_sum("lineRevenue").alias("orderTotal"))

# Join with customers
order_customer_df = order_total_df.join(customers_df, "customerNumber")

# Group by customer and calculate avg order value
from pyspark.sql.functions import round, count
customer_avg_order_df = order_customer_df.groupBy("customerNumber", "customerName") \
    .agg(round(_sum("orderTotal") / count("orderNumber"), 2).alias("avgOrderValue")) \
    .orderBy(col("avgOrderValue").desc())

# Save output
customer_avg_order_df.write.mode("overwrite").parquet(f"{output_path}/customer_avg_order.parquet")



In [73]:
top_10_products_df = product_revenue_df.limit(10)

# Save output
top_10_products_df.write.mode("overwrite").parquet(f"{output_path}/top_10_products.parquet")


In [76]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [77]:
# Base path for ClassicModels CSV files
base_path = "/content/drive/MyDrive/classicmodels/"

# Output path to store processed Parquet files
output_path = base_path + "output/processed/"

# Create output folder if it does not exist
import os
os.makedirs(output_path, exist_ok=True)


In [78]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ClassicModels Analysis") \
    .getOrCreate()


In [79]:
customers_df = spark.read.csv(base_path + "customers.csv", header=True, inferSchema=True)
employees_df = spark.read.csv(base_path + "employees.csv", header=True, inferSchema=True)
offices_df = spark.read.csv(base_path + "offices.csv", header=True, inferSchema=True)
orders_df = spark.read.csv(base_path + "orders.csv", header=True, inferSchema=True)
orderdetails_df = spark.read.csv(base_path + "orderdetails.csv", header=True, inferSchema=True)
products_df = spark.read.csv(base_path + "products.csv", header=True, inferSchema=True)
productlines_df = spark.read.csv(base_path + "productlines.csv", header=True, inferSchema=True)
payments_df = spark.read.csv(base_path + "payments.csv", header=True, inferSchema=True)


In [80]:
from pyspark.sql.functions import col, sum as _sum

# Join orderdetails + products + orders
product_orders = orderdetails_df.join(products_df, "productCode") \
                                .join(orders_df, "orderNumber")

# Calculate revenue per product
product_revenue_df = product_orders.withColumn("lineRevenue", col("quantityOrdered") * col("priceEach")) \
    .groupBy("productCode", "productName") \
    .agg(_sum("lineRevenue").alias("totalRevenue")) \
    .orderBy(col("totalRevenue").desc())

# Save as Parquet
product_revenue_df.write.mode("overwrite").parquet(output_path + "product_revenue.parquet")


In [81]:
from pyspark.sql.functions import round, count

# Calculate line revenue for each order detail
order_data = orderdetails_df.join(orders_df, "orderNumber")
order_data = order_data.withColumn("lineRevenue", col("quantityOrdered") * col("priceEach"))

# Total revenue per order
order_total_df = order_data.groupBy("orderNumber", "customerNumber") \
    .agg(_sum("lineRevenue").alias("orderTotal"))

# Join with customers and calculate average order value
customer_avg_order_df = order_total_df.join(customers_df, "customerNumber") \
    .groupBy("customerNumber", "customerName") \
    .agg(round(_sum("orderTotal") / count("orderNumber"), 2).alias("avgOrderValue")) \
    .orderBy(col("avgOrderValue").desc())

# Save as Parquet
customer_avg_order_df.write.mode("overwrite").parquet(output_path + "customer_avg_order.parquet")


In [82]:
top_10_products_df = product_revenue_df.limit(10)
top_10_products_df.write.mode("overwrite").parquet(output_path + "top_10_products.parquet")


In [84]:
print("Orders columns:", orders_df.columns)
print("Employees columns:", employees_df.columns)
print("Offices columns:", offices_df.columns)
print("OrderDetails columns:", orderdetails_df.columns)


Orders columns: ['orderNumber', 'orderDate', 'requiredDate', 'shippedDate', 'status', 'comments', 'customerNumber']
Employees columns: ['employeeNumber', 'lastName', 'firstName', 'extension', 'email', 'officeCode', 'reportsTo', 'jobTitle']
Offices columns: ['officeCode', 'city', 'phone', 'addressLine1', 'addressLine2', 'state', 'country', 'postalCode', 'territory']
OrderDetails columns: ['orderNumber', 'productCode', 'quantityOrdered', 'priceEach', 'orderLineNumber']


In [87]:
# Step 1: Mount Google Drive (if not already mounted)
from google.colab import drive
drive.mount('/content/drive')

# Step 2: Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg, desc

# Step 3: Create Spark session
spark = SparkSession.builder \
    .appName("ClassicModels Analysis") \
    .getOrCreate()

# Step 4: Define base path
base_path = "/content/drive/MyDrive/classicmodels/"

# Step 5: Read CSV files into DataFrames
orders_df = spark.read.option("header", True).csv(base_path + "orders.csv")
orderdetails_df = spark.read.option("header", True).csv(base_path + "orderdetails.csv")
products_df = spark.read.option("header", True).csv(base_path + "products.csv")
customers_df = spark.read.option("header", True).csv(base_path + "customers.csv")
# employees_df and offices_df are read but we won't use them (no salesRepEmployeeNumber)
employees_df = spark.read.option("header", True).csv(base_path + "employees.csv")
offices_df = spark.read.option("header", True).csv(base_path + "offices.csv")

# Step 6: Cast numeric columns
orderdetails_df = orderdetails_df.withColumn("quantityOrdered", col("quantityOrdered").cast("int")) \
                                 .withColumn("priceEach", col("priceEach").cast("double"))

# Step 7: Product-wise revenue
product_revenue_df = orderdetails_df.join(products_df, "productCode") \
    .withColumn("totalRevenue", col("quantityOrdered") * col("priceEach")) \
    .groupBy("productCode", "productName") \
    .agg(_sum("totalRevenue").alias("totalRevenue")) \
    .orderBy(desc("totalRevenue"))

# Save to Parquet
product_revenue_df.write.mode("overwrite").parquet(base_path + "product_revenue.parquet")

# Step 8: Average order value per customer
customer_order_df = orderdetails_df.join(orders_df, "orderNumber") \
    .withColumn("orderValue", col("quantityOrdered") * col("priceEach")) \
    .groupBy("customerNumber") \
    .agg(avg("orderValue").alias("avgOrderValue"))

# Join with customer name
customer_avg_order_df = customer_order_df.join(customers_df, "customerNumber") \
    .select("customerNumber", "customerName", "avgOrderValue") \
    .orderBy(desc("avgOrderValue"))

# Save to Parquet
customer_avg_order_df.write.mode("overwrite").parquet(base_path + "customer_avg_order.parquet")

# Step 9: Top 10 products by revenue
top_10_products_df = product_revenue_df.limit(10)
top_10_products_df.write.mode("overwrite").parquet(base_path + "top_10_products.parquet")

# Step 10: Regional sales (by customer country)
regional_sales_df = orderdetails_df.join(orders_df, "orderNumber") \
    .join(customers_df, "customerNumber") \
    .withColumn("totalSale", col("quantityOrdered") * col("priceEach")) \
    .groupBy("country") \
    .agg(_sum("totalSale").alias("totalRevenue")) \
    .orderBy(desc("totalRevenue"))

regional_sales_df.write.mode("overwrite").parquet(base_path + "regional_sales.parquet")

print("✅ All outputs saved to Parquet!")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ All outputs saved to Parquet!


In [88]:
# Step 1: Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Step 2: Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg, desc

# Step 3: Create Spark session
spark = SparkSession.builder \
    .appName("ClassicModels Analysis") \
    .getOrCreate()

# Step 4: Define base path
base_path = "/content/drive/MyDrive/classicmodels/"

# Step 5: Read CSV files
orders_df = spark.read.option("header", True).csv(base_path + "orders.csv")
orderdetails_df = spark.read.option("header", True).csv(base_path + "orderdetails.csv")
products_df = spark.read.option("header", True).csv(base_path + "products.csv")
customers_df = spark.read.option("header", True).csv(base_path + "customers.csv")

# Step 6: Cast numeric columns
orderdetails_df = orderdetails_df.withColumn("quantityOrdered", col("quantityOrdered").cast("int")) \
                                 .withColumn("priceEach", col("priceEach").cast("double"))

# Step 7: Product-wise revenue
product_revenue_df = orderdetails_df.join(products_df, "productCode") \
    .withColumn("totalRevenue", col("quantityOrdered") * col("priceEach")) \
    .groupBy("productCode", "productName") \
    .agg(_sum("totalRevenue").alias("totalRevenue")) \
    .orderBy(desc("totalRevenue"))

product_revenue_df.write.mode("overwrite").parquet(base_path + "product_revenue.parquet")
print("Top 5 Product Revenue:")
product_revenue_df.show(5, truncate=False)

# Step 8: Average order value per customer
customer_order_df = orderdetails_df.join(orders_df, "orderNumber") \
    .withColumn("orderValue", col("quantityOrdered") * col("priceEach")) \
    .groupBy("customerNumber") \
    .agg(avg("orderValue").alias("avgOrderValue"))

customer_avg_order_df = customer_order_df.join(customers_df, "customerNumber") \
    .select("customerNumber", "customerName", "avgOrderValue") \
    .orderBy(desc("avgOrderValue"))

customer_avg_order_df.write.mode("overwrite").parquet(base_path + "customer_avg_order.parquet")
print("Top 5 Customers by Avg Order Value:")
customer_avg_order_df.show(5, truncate=False)

# Step 9: Top 10 products by revenue
top_10_products_df = product_revenue_df.limit(10)
top_10_products_df.write.mode("overwrite").parquet(base_path + "top_10_products.parquet")
print("Top 10 Products by Revenue:")
top_10_products_df.show(10, truncate=False)

# Step 10: Regional sales by country
regional_sales_df = orderdetails_df.join(orders_df, "orderNumber") \
    .join(customers_df, "customerNumber") \
    .withColumn("totalSale", col("quantityOrdered") * col("priceEach")) \
    .groupBy("country") \
    .agg(_sum("totalSale").alias("totalRevenue")) \
    .orderBy(desc("totalRevenue"))

regional_sales_df.write.mode("overwrite").parquet(base_path + "regional_sales.parquet")
print("Regional Sales by Country (Top 5):")
regional_sales_df.show(5, truncate=False)

print("✅ All outputs saved and verified!")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Top 5 Product Revenue:
+-----------+-------------------------------------+-----------------+
|productCode|productName                          |totalRevenue     |
+-----------+-------------------------------------+-----------------+
|S10_1678   |1969 Harley Davidson Ultimate Chopper|90157.77000000002|
|productCode|productName                          |null             |
+-----------+-------------------------------------+-----------------+

Top 5 Customers by Avg Order Value:
+--------------+----------------------------+------------------+
|customerNumber|customerName                |avgOrderValue     |
+--------------+----------------------------+------------------+
|242           |Alpha Cognac                |4744.233333333334 |
|486           |Motor Mint Distributors Inc.|4305.5233333333335|
|424           |Classic Legends Inc.        |3973.4863636363643|
|

In [94]:
# -------------------------------
# Step 1: Import PySpark
# -------------------------------
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg as _avg, round as _round, count as _count, countDistinct as _countDistinct

# -------------------------------
# Step 2: Initialize Spark Session
# -------------------------------
spark = SparkSession.builder \
    .appName("ClassicModelsSalesAnalysis") \
    .getOrCreate()

# -------------------------------
# Step 3: Define paths
# -------------------------------
base_path = "/content/drive/MyDrive/classicmodels/"
output_path = "/content/drive/MyDrive/classicmodels/output/processed/"

# -------------------------------
# Step 4: Read CSVs into DataFrames
# -------------------------------
customers_df = spark.read.option("header", True).csv(base_path + "customers.csv")
employees_df = spark.read.option("header", True).csv(base_path + "employees.csv")
offices_df = spark.read.option("header", True).csv(base_path + "offices.csv")
orders_df = spark.read.option("header", True).csv(base_path + "orders.csv")
orderdetails_df = spark.read.option("header", True).csv(base_path + "orderdetails.csv")
products_df = spark.read.option("header", True).csv(base_path + "products.csv")
payments_df = spark.read.option("header", True).csv(base_path + "payments.csv")

# -------------------------------
# Step 5: Cast numeric columns
# -------------------------------
orderdetails_df = orderdetails_df.withColumn("quantityOrdered", col("quantityOrdered").cast("int")) \
                                 .withColumn("priceEach", col("priceEach").cast("double"))

products_df = products_df.withColumn("MSRP", col("MSRP").cast("double"))

payments_df = payments_df.withColumn("amount", col("amount").cast("double"))

# -------------------------------
# Step 6: Task 1 - Products Aggregation
# -------------------------------
products_summary_df = products_df.groupBy("productLine") \
    .agg(
        _count("productCode").alias("totalProducts"),
        _round(_avg("MSRP"),2).alias("avgMSRP"),
        _round(_max("MSRP"),2).alias("maxMSRP")
    )
products_summary_df.write.mode("overwrite").parquet(output_path + "products_summary.parquet")

# -------------------------------
# Step 7: Task 2 - Product & Order Analysis
# -------------------------------
# 7a: Top 10 Products by Quantity Sold
top_10_products_df = orderdetails_df.join(products_df, "productCode") \
    .groupBy("productCode", "productName") \
    .agg(_sum("quantityOrdered").alias("totalQuantity")) \
    .orderBy(col("totalQuantity").desc()) \
    .limit(10)
top_10_products_df.write.mode("overwrite").parquet(output_path + "top_10_products.parquet")

# 7b: Product-wise Revenue
product_revenue_df = orderdetails_df.join(products_df, "productCode") \
    .withColumn("revenue", col("quantityOrdered") * col("priceEach")) \
    .groupBy("productCode", "productName") \
    .agg(_round(_sum("revenue"),2).alias("totalRevenue"))
product_revenue_df.write.mode("overwrite").parquet(output_path + "product_revenue.parquet")

# 7c: Average Order Value per Customer
customer_order_df = orders_df.join(orderdetails_df, "orderNumber") \
    .join(customers_df, "customerNumber") \
    .withColumn("orderRevenue", col("quantityOrdered") * col("priceEach"))

customer_avg_order_df = customer_order_df.groupBy("customerNumber", "customerName") \
    .agg(_round(_sum("orderRevenue")/_countDistinct("orderNumber"),2).alias("avgOrderValue"))
customer_avg_order_df.write.mode("overwrite").parquet(output_path + "customer_avg_order.parquet")

# -------------------------------
# Step 8: Task 3 - Regional & Office Sales
# -------------------------------
# Join orders -> orderdetails -> customers -> employees -> offices
# Join orders -> orderdetails -> customers -> employees -> offices
sales_df = orders_df.join(orderdetails_df, "orderNumber") \
    .join(customers_df, "customerNumber") \
    .join(employees_df, customers_df["salesRepEmployeeNumber"] == employees_df["employeeNumber"]) \
    .join(offices_df, employees_df["officeCode"] == offices_df["officeCode"]) \
    .withColumn("revenue", col("quantityOrdered") * col("priceEach")) \
    .select(
        orders_df["orderNumber"],
        customers_df["customerNumber"],
        customers_df["customerName"],
        employees_df["employeeNumber"],
        employees_df["lastName"].alias("employeeLastName"),
        offices_df["officeCode"],
        offices_df["city"].alias("officeCity"),
        offices_df["country"].alias("officeCountry"),
        col("revenue")
    )

# 8a: Revenue by Region/Country
regional_sales_df = sales_df.groupBy("officeCountry") \
    .agg(_round(_sum("revenue"),2).alias("totalRevenue"))
regional_sales_df.write.mode("overwrite").parquet(output_path + "regional_sales.parquet")

# 8b: Revenue by Office
office_sales_df = sales_df.groupBy("officeCode", "officeCity") \
    .agg(_round(_sum("revenue"),2).alias("totalRevenue"))
office_sales_df.write.mode("overwrite").parquet(output_path + "office_sales.parquet")

# -------------------------------
print("All tasks completed successfully! Outputs are in:", output_path)


All tasks completed successfully! Outputs are in: /content/drive/MyDrive/classicmodels/output/processed/


In [95]:
# Display Products Summary
products_summary_df.show(truncate=False)

# Display Customer Average Order
customer_avg_order_df.show(truncate=False)

# Display Top 10 Products by Revenue
top_10_products_df.show(truncate=False)

# Display Regional Sales
regional_sales_df.show(truncate=False)

# Display Office Sales
office_sales_df.show(truncate=False)


+-----------+-------------+-------+-------+
|productLine|totalProducts|avgMSRP|maxMSRP|
+-----------+-------------+-------+-------+
|productLine|1            |null   |null   |
|Motorcycles|1            |95.7   |95.7   |
+-----------+-------------+-------+-------+

+--------------+----------------------------+-------------+
|customerNumber|customerName                |avgOrderValue|
+--------------+----------------------------+-------------+
|customerNumber|customerName                |null         |
|114           |Australian Collectors, Co.  |26714.56     |
|321           |Corporate Gift Ideas Co.    |42779.56     |
|205           |Toys4GrownUps.com           |50342.74     |
|103           |Atelier graphique           |14571.44     |
|278           |Rovelli Gifts               |52151.81     |
|385           |Cruz & Sons Co.             |51001.22     |
|382           |Salzburg Collectables       |35826.33     |
|198           |Auto-Moto Classics Inc.     |6036.96      |
|311           

In [97]:
# Step 1: Import Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg as _avg, count as _count, round as _round

# Step 2: Initialize Spark
spark = SparkSession.builder \
    .appName("ClassicModels Analysis") \
    .getOrCreate()

# Step 3: File paths
base_path = "/content/drive/MyDrive/classicmodels/"
output_path = "/content/drive/MyDrive/classicmodels/output/processed/"

# Step 4: Read CSVs
customers_df = spark.read.option("header",True).csv(base_path + "customers.csv")
employees_df = spark.read.option("header",True).csv(base_path + "employees.csv")
offices_df = spark.read.option("header",True).csv(base_path + "offices.csv")
orders_df = spark.read.option("header",True).csv(base_path + "orders.csv")
orderdetails_df = spark.read.option("header",True).csv(base_path + "orderdetails.csv")
products_df = spark.read.option("header",True).csv(base_path + "products.csv")
productlines_df = spark.read.option("header",True).csv(base_path + "productlines.csv")
payments_df = spark.read.option("header",True).csv(base_path + "payments.csv")

# Step 5: Cast columns to proper types
orderdetails_df = orderdetails_df.withColumn("quantityOrdered", col("quantityOrdered").cast("int")) \
                                 .withColumn("priceEach", col("priceEach").cast("double"))
products_df = products_df.withColumn("MSRP", col("MSRP").cast("double"))
payments_df = payments_df.withColumn("amount", col("amount").cast("double"))

# Step 6: Convert CSVs to Parquet
customers_df.write.mode("overwrite").parquet(base_path + "customers.parquet")
employees_df.write.mode("overwrite").parquet(base_path + "employees.parquet")
offices_df.write.mode("overwrite").parquet(base_path + "offices.parquet")
orders_df.write.mode("overwrite").parquet(base_path + "orders.parquet")
orderdetails_df.write.mode("overwrite").parquet(base_path + "orderdetails.parquet")
products_df.write.mode("overwrite").parquet(base_path + "products.parquet")
productlines_df.write.mode("overwrite").parquet(base_path + "productlines.parquet")
payments_df.write.mode("overwrite").parquet(base_path + "payments.parquet")

# -------------------------------
# Task 2: Product & Order Analysis
# -------------------------------

# 2a: Top 10 products by quantity sold
top_10_products_df = orderdetails_df.groupBy("productCode") \
    .agg(_sum("quantityOrdered").alias("totalQuantity")) \
    .join(products_df.select("productCode","productName"), "productCode") \
    .orderBy(col("totalQuantity").desc()) \
    .limit(10)

top_10_products_df.write.mode("overwrite").parquet(output_path + "top_10_products.parquet")

# 2b: Product-wise revenue
product_revenue_df = orderdetails_df.join(products_df, "productCode") \
    .withColumn("revenue", col("quantityOrdered") * col("priceEach")) \
    .groupBy("productCode","productName") \
    .agg(_round(_sum("revenue"),2).alias("totalRevenue"))

product_revenue_df.write.mode("overwrite").parquet(output_path + "product_revenue.parquet")

# 2c: Average order value per customer
customer_order_df = orders_df.join(orderdetails_df, "orderNumber") \
                             .join(customers_df.select("customerNumber","customerName"), "customerNumber") \
                             .withColumn("orderValue", col("quantityOrdered") * col("priceEach"))

customer_avg_order_df = customer_order_df.groupBy("customerNumber","customerName") \
    .agg(_round(_sum("orderValue") / _count("orderNumber"),2).alias("avgOrderValue"))

customer_avg_order_df.write.mode("overwrite").parquet(output_path + "customer_avg_order.parquet")

# -------------------------------
# Task 3: Regional Sales Insights
# -------------------------------

# Join orders -> orderdetails -> customers -> offices via employees if salesRep exists
# Since your orders CSV has no salesRepEmployeeNumber, we join only orders -> customers -> offices
sales_df = orders_df.join(orderdetails_df, "orderNumber") \
                    .join(customers_df.select("customerNumber","customerName","country","salesRepEmployeeNumber"), "customerNumber") \
                    .join(offices_df.select("officeCode","city","country").withColumnRenamed("country","officeCountry"), orders_df.customerNumber.isNotNull(), "left") \
                    .withColumn("revenue", col("quantityOrdered") * col("priceEach"))

# Revenue by country
regional_sales_df = sales_df.groupBy("country").agg(_round(_sum("revenue"),2).alias("totalRevenue"))
regional_sales_df.write.mode("overwrite").parquet(output_path + "regional_sales.parquet")

# Revenue by office
office_sales_df = sales_df.groupBy("officeCode","city").agg(_round(_sum("revenue"),2).alias("totalRevenue"))
office_sales_df.write.mode("overwrite").parquet(output_path + "office_sales.parquet")

# -------------------------------
# Task 4: Product Demand & Employee Metrics
# -------------------------------

# Products demand summary
product_demand_df = orderdetails_df.join(products_df, "productCode") \
    .groupBy("productCode","productName") \
    .agg(_count("orderNumber").alias("orderCount"),
         _sum("quantityOrdered").alias("totalOrderedQty"),
         _round(_avg("priceEach"),2).alias("averagePriceEach"))

product_demand_df.write.mode("overwrite").parquet(output_path + "product_demand_summary.parquet")

# Employee performance summary (only employees linked in customers)
employee_sales_df = orders_df.join(customers_df, orders_df.customerNumber == customers_df.customerNumber, "inner") \
                             .join(orderdetails_df, "orderNumber") \
                             .groupBy("salesRepEmployeeNumber") \
                             .agg(_count("orderNumber").alias("totalOrders"),
                                  _round(_sum(col("quantityOrdered")*col("priceEach")),2).alias("totalSales"))

employee_sales_df.write.mode("overwrite").parquet(output_path + "employee_sales_summary.parquet")

# -------------------------------
# Task 5: Products Aggregation
# -------------------------------

products_summary_df = products_df.groupBy("productLine") \
    .agg(_count("productCode").alias("totalProducts"),
         _round(_avg("MSRP"),2).alias("avgMSRP"),
         _round(_max("MSRP"),2).alias("maxMSRP"))

products_summary_df.write.mode("overwrite").parquet(output_path + "products_summary.parquet")

print("✅ All tasks completed. Parquet outputs are in:", output_path)


✅ All tasks completed. Parquet outputs are in: /content/drive/MyDrive/classicmodels/output/processed/


In [98]:
# Step 1: Output folder
output_path = "/content/drive/MyDrive/classicmodels/output/processed/"

# Step 2: Read and display all outputs

print("---- Top 10 Products ----")
top_10_products = spark.read.parquet(output_path + "top_10_products.parquet")
top_10_products.show(truncate=False)

print("---- Product Revenue ----")
product_revenue = spark.read.parquet(output_path + "product_revenue.parquet")
product_revenue.show(truncate=False)

print("---- Customer Average Order ----")
customer_avg_order = spark.read.parquet(output_path + "customer_avg_order.parquet")
customer_avg_order.show(truncate=False)

print("---- Regional Sales ----")
regional_sales = spark.read.parquet(output_path + "regional_sales.parquet")
regional_sales.show(truncate=False)

print("---- Office Sales ----")
office_sales = spark.read.parquet(output_path + "office_sales.parquet")
office_sales.show(truncate=False)

print("---- Product Demand Summary ----")
product_demand_summary = spark.read.parquet(output_path + "product_demand_summary.parquet")
product_demand_summary.show(truncate=False)

print("---- Employee Sales Summary ----")
employee_sales_summary = spark.read.parquet(output_path + "employee_sales_summary.parquet")
employee_sales_summary.show(truncate=False)

print("---- Products Summary ----")
products_summary = spark.read.parquet(output_path + "products_summary.parquet")
products_summary.show(truncate=False)


---- Top 10 Products ----
+-----------+-------------+-------------------------------------+
|productCode|totalQuantity|productName                          |
+-----------+-------------+-------------------------------------+
|S10_1678   |1057         |1969 Harley Davidson Ultimate Chopper|
|productCode|null         |productName                          |
+-----------+-------------+-------------------------------------+

---- Product Revenue ----
+-----------+-------------------------------------+------------+
|productCode|productName                          |totalRevenue|
+-----------+-------------------------------------+------------+
|productCode|productName                          |null        |
|S10_1678   |1969 Harley Davidson Ultimate Chopper|90157.77    |
+-----------+-------------------------------------+------------+

---- Customer Average Order ----
+--------------+----------------------------+-------------+
|customerNumber|customerName                |avgOrderValue|
+------