<a href="https://colab.research.google.com/github/Kiran45181/Pyspark/blob/main/Advanced%20Spark%20with%20java%20.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##Advanced Spark with Java - Data Engineering Use Case Study

In [2]:
# Update and install Java
!apt-get update -q
!apt-get install -y openjdk-11-jdk-headless

# Download and install Spark 3.5.0
!wget -q https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!mv spark-3.5.0-bin-hadoop3 /usr/local/spark

# Set environment variables
!export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
!export PATH=$JAVA_HOME/bin:/usr/local/spark/bin:$PATH

# Install PySpark
!pip install pyspark==3.5.0


Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,918 kB]
Hit:5 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,773 kB]
Hit:7 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:9 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [9,161 kB]
Get:10 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:11 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [3,207 kB]
Get:12 http://security.ubuntu.com/ubuntu jammy-security/restricted amd64 Packages [5,1

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ["PATH"] += ":/usr/local/spark/bin:/usr/local/spark/sbin"

In [4]:
!pip install pyspark==3.5.0



In [6]:
from google.colab import files
import os, shutil

# Create target folder
os.makedirs("/content/data/csv", exist_ok=True)

# Upload multiple CSV files (select all 8 files)
uploaded = files.upload()

# Save uploaded files to /content/data/csv
for filename in uploaded.keys():
    shutil.move(filename, f"/content/data/csv/{filename}")

print("Files uploaded to /content/data/csv/")


Saving customers.csv to customers.csv
Saving employees.csv to employees.csv
Saving offices.csv to offices.csv
Saving orderdetails.csv to orderdetails.csv
Saving orders.csv to orders.csv
Saving payments.csv to payments.csv
Saving productlines.csv to productlines.csv
Saving products.csv to products.csv
Files uploaded to /content/data/csv/


In [7]:
import pandas as pd
import os

csv_folder = "/content/data/csv"
parquet_folder = "/content/data/parquet"
os.makedirs(parquet_folder, exist_ok=True)

# Convert all CSV files to Parquet
for file in os.listdir(csv_folder):
    if file.endswith(".csv"):
        df = pd.read_csv(os.path.join(csv_folder, file))
        parquet_file = file.replace(".csv", ".parquet")
        df.to_parquet(os.path.join(parquet_folder, parquet_file), engine='pyarrow')
        print(f"Converted {file} -> {parquet_file}")

print("✅ All CSV files converted to Parquet format.")


Converted orderdetails.csv -> orderdetails.parquet
Converted products.csv -> products.parquet
Converted employees.csv -> employees.parquet
Converted orders.csv -> orders.parquet
Converted productlines.csv -> productlines.parquet
Converted customers.csv -> customers.parquet
Converted payments.csv -> payments.parquet
Converted offices.csv -> offices.parquet
✅ All CSV files converted to Parquet format.


In [8]:
!ls /content/data/parquet


customers.parquet  offices.parquet	 orders.parquet    productlines.parquet
employees.parquet  orderdetails.parquet  payments.parquet  products.parquet


In [11]:
!rm -rf spark-3.5.0-bin-hadoop3*
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

!tar xf spark-3.5.0-bin-hadoop3.tgz
!mv spark-3.5.0-bin-hadoop3 /usr/local/spark


In [12]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ["PATH"] += ":/usr/local/spark/bin"


In [13]:
!pip install -q pyspark


In [14]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ClassicModelsAnalysis") \
    .getOrCreate()

print("Spark Version:", spark.version)


Spark Version: 3.5.0


In [15]:
# Load all tables into Spark DataFrames
customers = spark.read.parquet("/content/data/parquet/customers.parquet")
employees = spark.read.parquet("/content/data/parquet/employees.parquet")
offices = spark.read.parquet("/content/data/parquet/offices.parquet")
orders = spark.read.parquet("/content/data/parquet/orders.parquet")
orderdetails = spark.read.parquet("/content/data/parquet/orderdetails.parquet")
payments = spark.read.parquet("/content/data/parquet/payments.parquet")
products = spark.read.parquet("/content/data/parquet/products.parquet")
productlines = spark.read.parquet("/content/data/parquet/productlines.parquet")

# Register as Temp Views for SQL
customers.createOrReplaceTempView("customers")
employees.createOrReplaceTempView("employees")
offices.createOrReplaceTempView("offices")
orders.createOrReplaceTempView("orders")
orderdetails.createOrReplaceTempView("orderdetails")
payments.createOrReplaceTempView("payments")
products.createOrReplaceTempView("products")
productlines.createOrReplaceTempView("productlines")


In [50]:
from pyspark.sql import functions as F

# Group products by productLine and calculate aggregations
products_summary = products.groupBy("productLine").agg(
    F.count("*").alias("totalProducts"),
    F.avg("MSRP").alias("avgMSRP"),
    F.max("MSRP").alias("maxMSRP")
)

# Save to Parquet
output_path = "/content/output/processed/products_summary.parquet"
products_summary.write.mode("overwrite").parquet(output_path)

print(f"✅ Products Summary saved to: {output_path}")
products_summary.show(truncate=False)


✅ Products Summary saved to: /content/output/processed/products_summary.parquet
+----------------+-------------+------------------+-------+
|productLine     |totalProducts|avgMSRP           |maxMSRP|
+----------------+-------------+------------------+-------+
|Motorcycles     |13           |97.17846153846153 |193.66 |
|Vintage Cars    |24           |87.09583333333332 |170.0  |
|Ships           |9            |86.56333333333333 |122.89 |
|Trucks and Buses|11           |103.18363636363637|136.67 |
|Classic Cars    |38           |118.02105263157895|214.3  |
|Trains          |3            |73.85333333333334 |100.84 |
|Planes          |12           |89.51583333333336 |157.69 |
+----------------+-------------+------------------+-------+



##Task 2

In [16]:
# Top 10 products by total quantity sold
top_products_qty = spark.sql("""
SELECT p.productName,
       SUM(od.quantityOrdered) AS total_quantity
FROM orderdetails od
JOIN products p ON od.productCode = p.productCode
GROUP BY p.productName
ORDER BY total_quantity DESC
LIMIT 10
""")

top_products_qty.show(10, truncate=False)

# Save result to Parquet
output_path = "/content/output/processed/top_10_products_quantity.parquet"
top_products_qty.write.mode("overwrite").parquet(output_path)
print(f"Saved Top 10 Products (Quantity) to {output_path}")


+---------------------------------------+--------------+
|productName                            |total_quantity|
+---------------------------------------+--------------+
|1992 Ferrari 360 Spider red            |1808          |
|1937 Lincoln Berline                   |1111          |
|American Airlines: MD-11S              |1085          |
|1941 Chevrolet Special Deluxe Cabriolet|1076          |
|1930 Buick Marquette Phaeton           |1074          |
|1940s Ford truck                       |1061          |
|1969 Harley Davidson Ultimate Chopper  |1057          |
|1957 Chevy Pickup                      |1056          |
|1964 Mercedes Tour Bus                 |1053          |
|1956 Porsche 356A Coupe                |1052          |
+---------------------------------------+--------------+

Saved Top 10 Products (Quantity) to /content/output/processed/top_10_products_quantity.parquet


In [17]:
# Product-wise revenue (priceEach * quantityOrdered)
product_revenue = spark.sql("""
SELECT p.productName,
       SUM(od.quantityOrdered * od.priceEach) AS total_revenue
FROM orderdetails od
JOIN products p ON od.productCode = p.productCode
GROUP BY p.productName
ORDER BY total_revenue DESC
""")

product_revenue.show(10, truncate=False)

# Save to Parquet
output_path = "/content/output/processed/product_revenue.parquet"
product_revenue.write.mode("overwrite").parquet(output_path)
print(f"Saved Product Revenue to {output_path}")


+------------------------------------+------------------+
|productName                         |total_revenue     |
+------------------------------------+------------------+
|1992 Ferrari 360 Spider red         |276839.98         |
|2001 Ferrari Enzo                   |190755.86         |
|1952 Alpine Renault 1300            |190017.95999999996|
|2003 Harley-Davidson Eagle Drag Bike|170685.99999999997|
|1968 Ford Mustang                   |161531.47999999992|
|1969 Ford Falcon                    |152543.02         |
|1980s Black Hawk Helicopter         |144959.90999999997|
|1998 Chrysler Plymouth Prowler      |142530.62999999998|
|1917 Grand Touring Sedan            |140535.60000000003|
|2002 Suzuki XREO                    |135767.03000000003|
+------------------------------------+------------------+
only showing top 10 rows

Saved Product Revenue to /content/output/processed/product_revenue.parquet


In [18]:
# Average order value per customer
avg_order_value = spark.sql("""
SELECT c.customerNumber,
       c.customerName,
       AVG(od.quantityOrdered * od.priceEach) AS avg_order_value
FROM orders o
JOIN customers c ON o.customerNumber = c.customerNumber
JOIN orderdetails od ON o.orderNumber = od.orderNumber
GROUP BY c.customerNumber, c.customerName
ORDER BY avg_order_value DESC
""")

avg_order_value.show(10, truncate=False)

# Save to Parquet
output_path = "/content/output/processed/avg_order_value_per_customer.parquet"
avg_order_value.write.mode("overwrite").parquet(output_path)
print(f"Saved Average Order Value per Customer to {output_path}")


+--------------+---------------------------+------------------+
|customerNumber|customerName               |avg_order_value   |
+--------------+---------------------------+------------------+
|455           |Super Scale Inc.           |4139.920588235294 |
|209           |Mini Caravy                |3992.595789473684 |
|328           |Tekni Collectables Inc.    |3895.5499999999993|
|175           |Gift Depot Inc.            |3816.9851999999996|
|172           |La Corne D'abondance, Co.  |3763.1965217391307|
|151           |Muscle Machine Inc         |3706.540625       |
|204           |Online Mini Collectables   |3705.1506666666664|
|333           |Australian Gift Network, Co|3679.3439999999996|
|201           |UK Collectables, Ltd.      |3676.2317241379315|
|381           |Royale Belge               |3652.1475         |
+--------------+---------------------------+------------------+
only showing top 10 rows

Saved Average Order Value per Customer to /content/output/processed/avg_order_

In [49]:
from pyspark.sql import functions as F

# Join orders -> customers -> payments
order_payment = orders \
    .join(customers, "customerNumber", "inner") \
    .join(payments, "customerNumber", "inner")

# Select and rename required columns
order_payment_report = order_payment.select(
    orders.orderNumber,
    orders.orderDate,
    customers.customerName,
    payments.amount.alias("paymentAmount"),
    orders.status
)

# Save to Parquet
output_path = "/content/output/processed/order_payment_report.parquet"
order_payment_report.write.mode("overwrite").parquet(output_path)

print(f"✅ Order Payment Report saved to: {output_path}")
order_payment_report.show(10, truncate=False)


✅ Order Payment Report saved to: /content/output/processed/order_payment_report.parquet
+-----------+----------+----------------------------+-------------+-------+
|orderNumber|orderDate |customerName                |paymentAmount|status |
+-----------+----------+----------------------------+-------------+-------+
|10100      |2003-01-06|Online Diecast Creations Co.|55425.77     |Shipped|
|10100      |2003-01-06|Online Diecast Creations Co.|10223.83     |Shipped|
|10100      |2003-01-06|Online Diecast Creations Co.|50799.69     |Shipped|
|10101      |2003-01-09|Blauer See Auto, Co.        |7466.32      |Shipped|
|10101      |2003-01-09|Blauer See Auto, Co.        |33820.62     |Shipped|
|10101      |2003-01-09|Blauer See Auto, Co.        |24101.81     |Shipped|
|10101      |2003-01-09|Blauer See Auto, Co.        |10549.01     |Shipped|
|10102      |2003-01-10|Vitachrome Inc.             |44400.5      |Shipped|
|10102      |2003-01-10|Vitachrome Inc.             |5494.78      |Shipped|


##Task 3 : Regional Sales Insights

In [19]:
# Sales per region: join offices -> employees -> customers -> orders -> orderdetails
sales_per_region = spark.sql("""
SELECT o.country,
       o.city,
       SUM(od.quantityOrdered * od.priceEach) AS total_sales
FROM offices o
JOIN employees e ON o.officeCode = e.officeCode
JOIN customers c ON e.employeeNumber = c.salesRepEmployeeNumber
JOIN orders ord ON c.customerNumber = ord.customerNumber
JOIN orderdetails od ON ord.orderNumber = od.orderNumber
GROUP BY o.country, o.city
ORDER BY total_sales DESC
""")

sales_per_region.show(10, truncate=False)

# Save to Parquet
output_path = "/content/output/processed/sales_per_region.parquet"
sales_per_region.write.mode("overwrite").parquet(output_path)
print(f"Saved Sales per Region to {output_path}")


+---------+-------------+------------------+
|country  |city         |total_sales       |
+---------+-------------+------------------+
|France   |Paris        |3083761.5800000066|
|UK       |London       |1436950.699999998 |
|USA      |San Francisco|1429063.569999999 |
|USA      |NYC          |1157589.7200000004|
|Australia|Sydney       |1147176.35        |
|USA      |Boston       |892538.6199999999 |
|Japan    |Tokyo        |457110.0700000002 |
+---------+-------------+------------------+

Saved Sales per Region to /content/output/processed/sales_per_region.parquet


In [20]:
# Total revenue by country (customers + payments)
revenue_by_country = spark.sql("""
SELECT c.country,
       SUM(p.amount) AS total_revenue
FROM customers c
JOIN payments p ON c.customerNumber = p.customerNumber
GROUP BY c.country
ORDER BY total_revenue DESC
""")

revenue_by_country.show(10, truncate=False)

# Save to Parquet
output_path = "/content/output/processed/revenue_by_country.parquet"
revenue_by_country.write.mode("overwrite").parquet(output_path)
print(f"Saved Revenue by Country to {output_path}")


+-----------+------------------+
|country    |total_revenue     |
+-----------+------------------+
|USA        |3040029.5199999996|
|Spain      |994438.5300000003 |
|France     |965750.5800000001 |
|Australia  |509385.81999999995|
|New Zealand|392486.59         |
|UK         |391503.89999999997|
|Italy      |325254.55000000005|
|Finland    |295149.35         |
|Singapore  |261671.59999999998|
|Canada     |205911.86         |
+-----------+------------------+
only showing top 10 rows

Saved Revenue by Country to /content/output/processed/revenue_by_country.parquet


In [21]:
# Top performing offices by total sales
top_offices = spark.sql("""
SELECT o.officeCode,
       o.city,
       o.country,
       SUM(od.quantityOrdered * od.priceEach) AS total_sales
FROM offices o
JOIN employees e ON o.officeCode = e.officeCode
JOIN customers c ON e.employeeNumber = c.salesRepEmployeeNumber
JOIN orders ord ON c.customerNumber = ord.customerNumber
JOIN orderdetails od ON ord.orderNumber = od.orderNumber
GROUP BY o.officeCode, o.city, o.country
ORDER BY total_sales DESC
""")

top_offices.show(10, truncate=False)

# Save to Parquet
output_path = "/content/output/processed/top_offices.parquet"
top_offices.write.mode("overwrite").parquet(output_path)
print(f"Saved Top Performing Offices to {output_path}")


+----------+-------------+---------+------------------+
|officeCode|city         |country  |total_sales       |
+----------+-------------+---------+------------------+
|4         |Paris        |France   |3083761.5800000066|
|7         |London       |UK       |1436950.699999998 |
|1         |San Francisco|USA      |1429063.569999999 |
|3         |NYC          |USA      |1157589.7200000004|
|6         |Sydney       |Australia|1147176.35        |
|2         |Boston       |USA      |892538.6199999999 |
|5         |Tokyo        |Japan    |457110.0700000002 |
+----------+-------------+---------+------------------+

Saved Top Performing Offices to /content/output/processed/top_offices.parquet


In [48]:
from pyspark.sql import functions as F

# Join offices -> employees -> customers -> payments
customer_sales_office = offices \
    .join(employees, offices.officeCode == employees.officeCode, "inner") \
    .join(customers, employees.employeeNumber == customers.salesRepEmployeeNumber, "inner") \
    .join(payments, customers.customerNumber == payments.customerNumber, "inner")

# Select only needed columns (avoid duplicate customerNumber)
customer_sales_office = customer_sales_office.select(
    offices.officeCode,
    offices.city,
    offices.country,
    customers.customerNumber.alias("custNumber"),
    payments.amount
)

# Group by office info
customer_sales_by_office = customer_sales_office.groupBy(
    "officeCode",
    "city",
    "country"
).agg(
    F.countDistinct("custNumber").alias("customerCount"),
    F.sum("amount").alias("totalSales")
)

# Save to Parquet
output_path = "/content/output/processed/customer_sales_by_office.parquet"
customer_sales_by_office.write.mode("overwrite").parquet(output_path)

print(f"✅ Customer Sales by Office saved to: {output_path}")
customer_sales_by_office.show(10, truncate=False)


✅ Customer Sales by Office saved to: /content/output/processed/customer_sales_by_office.parquet
+----------+-------------+---------+-------------+------------------+
|officeCode|city         |country  |customerCount|totalSales        |
+----------+-------------+---------+-------------+------------------+
|6         |Sydney       |Australia|10           |1007292.9800000001|
|2         |Boston       |USA      |12           |835882.33         |
|3         |NYC          |USA      |14           |1072619.47        |
|7         |London       |UK       |17           |1324325.9         |
|4         |Paris        |France   |28           |2819168.9000000004|
|1         |San Francisco|USA      |12           |1337439.5799999998|
|5         |Tokyo        |Japan    |5            |457110.07         |
+----------+-------------+---------+-------------+------------------+



##Task 4 : Performance Optimization

In [22]:
# Cache frequently used DataFrames to avoid recomputation
customers.cache()
employees.cache()
offices.cache()
orders.cache()
orderdetails.cache()
products.cache()
payments.cache()

print("✅ Cached frequently used tables.")


✅ Cached frequently used tables.


In [23]:
from pyspark.sql.functions import broadcast

# Example: Join broadcasted offices to employees for faster execution
broadcast_offices = broadcast(offices)

emp_office_join = employees.join(broadcast_offices, "officeCode") \
                           .select("employeeNumber", "lastName", "city", "country")

emp_office_join.show(5)


+--------------+---------+-------------+---------+
|employeeNumber| lastName|         city|  country|
+--------------+---------+-------------+---------+
|          1002|   Murphy|San Francisco|      USA|
|          1056|Patterson|San Francisco|      USA|
|          1076| Firrelli|San Francisco|      USA|
|          1088|Patterson|       Sydney|Australia|
|          1102|   Bondur|        Paris|   France|
+--------------+---------+-------------+---------+
only showing top 5 rows



In [24]:
# Lazy evaluation demo
lazy_df = orders.filter("status = 'Shipped'").select("orderNumber")

print("No computation yet (lazy)...")

# Trigger computation with an action
print("Count of Shipped Orders:", lazy_df.count())


No computation yet (lazy)...
Count of Shipped Orders: 303


In [25]:
# Convert orderdetails to RDD for custom aggregation using aggregateByKey
order_rdd = orderdetails.rdd.map(lambda row: (row.orderNumber, row.quantityOrdered * row.priceEach))

# Aggregate revenue per order
agg_rdd = order_rdd.aggregateByKey(0,
                                   lambda acc, x: acc + x,   # seqOp
                                   lambda acc1, acc2: acc1 + acc2)  # combOp

# Convert back to DataFrame
order_revenue_df = agg_rdd.toDF(["orderNumber", "totalRevenue"])
order_revenue_df.show(5)


+-----------+------------------+
|orderNumber|      totalRevenue|
+-----------+------------------+
|      10100|10223.829999999998|
|      10101|          10549.01|
|      10102|           5494.78|
|      10103|50218.950000000004|
|      10104|           40206.2|
+-----------+------------------+
only showing top 5 rows



In [26]:
output_path = "/content/output/processed/optimized_order_revenue.parquet"
order_revenue_df.write.mode("overwrite").parquet(output_path)
print(f"Optimized Order Revenue saved to {output_path}")


Optimized Order Revenue saved to /content/output/processed/optimized_order_revenue.parquet


In [44]:
from pyspark.sql import functions as F

# Join employees -> customers -> orders -> payments
employee_sales = employees \
    .join(customers, employees.employeeNumber == customers.salesRepEmployeeNumber, "inner") \
    .join(orders, customers.customerNumber == orders.customerNumber, "inner") \
    .join(payments, customers.customerNumber == payments.customerNumber, "inner")

# Aggregate: Total orders & total sales by employee
employee_sales_summary = employee_sales.groupBy(
    employees.employeeNumber,
    employees.firstName,
    employees.lastName
).agg(
    F.countDistinct("orderNumber").alias("totalOrders"),  # FIX: remove table prefix
    F.sum("amount").alias("totalSales")                   # FIX: remove table prefix
)

# Combine first and last name into employeeName
employee_sales_summary = employee_sales_summary.withColumn(
    "employeeName",
    F.concat_ws(" ", F.col("firstName"), F.col("lastName"))
).select(
    "employeeNumber", "employeeName", "totalOrders", "totalSales"
)

# Save result
output_path = "/content/output/processed/employee_sales_summary.parquet"
employee_sales_summary.write.mode("overwrite").parquet(output_path)

print(f"✅ Employee Performance Summary saved to: {output_path}")
employee_sales_summary.show(10, truncate=False)


✅ Employee Performance Summary saved to: /content/output/processed/employee_sales_summary.parquet
+--------------+---------------+-----------+------------------+
|employeeNumber|employeeName   |totalOrders|totalSales        |
+--------------+---------------+-----------+------------------+
|1611          |Andy Fixter    |19         |2118017.4299999997|
|1621          |Mami Nishi     |16         |1681538.9900000002|
|1286          |Foon Yue Tseng |17         |1542942.88        |
|1166          |Leslie Thompson|14         |869050.3400000001 |
|1188          |Julie Firrelli |14         |948732.5700000001 |
|1504          |Barry Jones    |25         |1866060.52        |
|1612          |Peter Marsh    |19         |2022883.4000000001|
|1323          |George Vanauf  |22         |1645493.9899999998|
|1702          |Martin Gerard  |12         |955915.4999999999 |
|1216          |Steve Patterson|18         |1355621.27        |
+--------------+---------------+-----------+------------------+
only s

##Task 5 : Code Structure & Submission

In [29]:
%%bash
mkdir -p /content/src/java
mkdir -p /content/build/java
mkdir -p /content/output/processed


In [30]:
%%writefile /content/src/java/OrderRevenueAnalysis.java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class OrderRevenueAnalysis {
    public static void main(String[] args) {
        // Initialize Spark Session
        SparkSession spark = SparkSession.builder()
                .appName("OrderRevenueAnalysis")
                .config("spark.master", "local[*]")   // Required for Colab
                .getOrCreate();

        // Load Parquet data
        Dataset<Row> orders = spark.read().parquet("/content/data/parquet/orders.parquet");
        Dataset<Row> orderdetails = spark.read().parquet("/content/data/parquet/orderdetails.parquet");
        Dataset<Row> products = spark.read().parquet("/content/data/parquet/products.parquet");

        // Join & calculate revenue
        Dataset<Row> revenue = orderdetails.join(orders, "orderNumber")
                .join(products, "productCode")
                .groupBy("productName")
                .sum("quantityOrdered")
                .withColumnRenamed("sum(quantityOrdered)", "totalQuantity");

        // Save output
        revenue.write().mode("overwrite").parquet("/content/output/processed/java_order_revenue.parquet");

        System.out.println("Parquet written to /content/output/processed/java_order_revenue.parquet");

        spark.stop();
    }
}


Writing /content/src/java/OrderRevenueAnalysis.java


In [31]:
%%bash
# Set Spark classpath
CLASS_PATH=$(echo /usr/local/spark/jars/*.jar | tr ' ' ':')

# Compile Java program
javac -cp "$CLASS_PATH" -d /content/build/java /content/src/java/OrderRevenueAnalysis.java

# Run Java program
java -cp "/content/build/java:$CLASS_PATH" OrderRevenueAnalysis


Parquet written to /content/output/processed/java_order_revenue.parquet


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 05:18:49 INFO SparkContext: Running Spark version 3.5.0
25/08/06 05:18:49 INFO SparkContext: OS info Linux, 6.1.123+, amd64
25/08/06 05:18:49 INFO SparkContext: Java version 11.0.28
25/08/06 05:18:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 05:18:49 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 05:18:49 INFO SparkContext: Submitted application: OrderRevenueAnalysis
25/08/06 05:18:49 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 05:18:49 INFO ResourceProfile: Limiting resource is cpu
25/08/06 05:18:49 INFO ResourcePr

In [32]:
!ls /content/output/processed/java_order_revenue.parquet


part-00000-8651b3cf-94aa-4aa7-ac73-91117b43d825-c000.snappy.parquet  _SUCCESS


In [33]:
#View Parquet Output with PySpark
df = spark.read.parquet("/content/output/processed/java_order_revenue.parquet")
df.show(10, truncate=False)


+-----------------------------------+-------------+
|productName                        |totalQuantity|
+-----------------------------------+-------------+
|1996 Moto Guzzi 1100i              |999          |
|1936 Chrysler Airflow              |983          |
|18th Century Vintage Horse Carriage|907          |
|The Titanic                        |952          |
|1958 Setra Bus                     |972          |
|Diamond T620 Semi-Skirted Tanker   |979          |
|2001 Ferrari Enzo                  |1019         |
|The Queen Mary                     |896          |
|1930 Buick Marquette Phaeton       |1074         |
|The Mayflower                      |898          |
+-----------------------------------+-------------+
only showing top 10 rows



In [34]:
!mkdir -p /content/src/java
!mkdir -p /content/build/java
!mkdir -p /content/output/processed


In [35]:
%%writefile /content/src/java/CustomerSalesAnalysis.java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

public class CustomerSalesAnalysis {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("CustomerSalesAnalysis")
                .getOrCreate();

        // Load data
        Dataset<Row> orders = spark.read().parquet("/content/data/parquet/orders.parquet");
        Dataset<Row> payments = spark.read().parquet("/content/data/parquet/payments.parquet");
        Dataset<Row> customers = spark.read().parquet("/content/data/parquet/customers.parquet");

        // Join orders -> payments -> customers
        Dataset<Row> customerSales = orders.join(payments, "customerNumber")
                                           .join(customers, "customerNumber")
                                           .groupBy(col("customerName"))
                                           .agg(avg("amount").alias("avgOrderValue"),
                                                sum("amount").alias("totalSpent"))
                                           .orderBy(col("totalSpent").desc());

        // Save output
        customerSales.write().mode("overwrite").parquet("/content/output/processed/java_customer_sales.parquet");

        spark.stop();
    }
}


Writing /content/src/java/CustomerSalesAnalysis.java


In [36]:
%%writefile /content/src/java/EmployeePerformanceAnalysis.java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

public class EmployeePerformanceAnalysis {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("EmployeePerformanceAnalysis")
                .getOrCreate();

        // Load data
        Dataset<Row> employees = spark.read().parquet("/content/data/parquet/employees.parquet");
        Dataset<Row> customers = spark.read().parquet("/content/data/parquet/customers.parquet");
        Dataset<Row> payments = spark.read().parquet("/content/data/parquet/payments.parquet");

        // Join employees -> customers -> payments
        Dataset<Row> empSales = employees.join(customers, "employeeNumber")
                                         .join(payments, "customerNumber")
                                         .groupBy(col("lastName"), col("firstName"))
                                         .agg(sum("amount").alias("totalSales"),
                                              countDistinct("customerNumber").alias("uniqueCustomers"))
                                         .orderBy(col("totalSales").desc());

        // Save output
        empSales.write().mode("overwrite").parquet("/content/output/processed/java_employee_sales.parquet");

        spark.stop();
    }
}


Writing /content/src/java/EmployeePerformanceAnalysis.java


In [37]:
%%bash
CLASS_PATH=$(echo /usr/local/spark/jars/*.jar | tr ' ' ':')
javac -cp "$CLASS_PATH" -d /content/build/java /content/src/java/*.java


In [38]:
%%bash
CLASS_PATH=$(echo /usr/local/spark/jars/*.jar | tr ' ' ':')
java -cp "/content/build/java:$CLASS_PATH" OrderRevenueAnalysis
java -cp "/content/build/java:$CLASS_PATH" CustomerSalesAnalysis
java -cp "/content/build/java:$CLASS_PATH" EmployeePerformanceAnalysis


Parquet written to /content/output/processed/java_order_revenue.parquet


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 05:23:24 INFO SparkContext: Running Spark version 3.5.0
25/08/06 05:23:24 INFO SparkContext: OS info Linux, 6.1.123+, amd64
25/08/06 05:23:24 INFO SparkContext: Java version 11.0.28
25/08/06 05:23:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 05:23:24 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 05:23:24 INFO SparkContext: Submitted application: OrderRevenueAnalysis
25/08/06 05:23:24 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 05:23:24 INFO ResourceProfile: Limiting resource is cpu
25/08/06 05:23:24 INFO ResourcePr

CalledProcessError: Command 'b'CLASS_PATH=$(echo /usr/local/spark/jars/*.jar | tr \' \' \':\')\njava -cp "/content/build/java:$CLASS_PATH" OrderRevenueAnalysis\njava -cp "/content/build/java:$CLASS_PATH" CustomerSalesAnalysis\njava -cp "/content/build/java:$CLASS_PATH" EmployeePerformanceAnalysis\n'' returned non-zero exit status 1.

In [41]:
from pyspark.sql import functions as F

# Product Demand Summary Analysis
# Output: product_demand_summary.parquet

# 1. Join orderdetails and products
product_demand = orderdetails.join(products, "productCode")

# 2. Group by productCode and productName
product_demand_summary = product_demand.groupBy("productCode", "productName") \
    .agg(
        F.countDistinct("orderNumber").alias("orderCount"),
        F.sum("quantityOrdered").alias("totalOrderedQty"),
        F.avg("priceEach").alias("averagePriceEach")
    )

# 3. Save to Parquet
output_path = "/content/output/processed/product_demand_summary.parquet"
product_demand_summary.write.mode("overwrite").parquet(output_path)

print("✅ Product Demand Summary saved to:", output_path)

# 4. Show top 10 products by quantity
display(product_demand_summary.orderBy(F.desc("totalOrderedQty")).limit(10))


✅ Product Demand Summary saved to: /content/output/processed/product_demand_summary.parquet


DataFrame[productCode: string, productName: string, orderCount: bigint, totalOrderedQty: bigint, averagePriceEach: double]

In [42]:
pd.read_parquet("/content/output/processed/product_demand_summary.parquet").head(10)


Unnamed: 0,productCode,productName,orderCount,totalOrderedQty,averagePriceEach
0,S12_2823,2002 Suzuki XREO,28,1028,132.168929
1,S18_1889,1948 Porsche 356-A Roadster,27,972,68.53
2,S24_1578,1997 BMW R 1100 S,28,1033,101.953571
3,S24_3371,1971 Alpine Renault 1600s,27,969,54.132593
4,S24_2972,1982 Lamborghini Diablo,27,912,33.942593
5,S18_3140,1903 Ford Model A,27,883,125.714074
6,S700_2824,1982 Camaro Z28,28,997,89.408929
7,S12_4473,1957 Chevy Pickup,28,1056,103.902143
8,S18_2870,1999 Indy 500 Monte Carlo SS,25,855,118.3776
9,S32_1268,1980’s GM Manhattan Express,28,911,85.853214
