<a href="https://colab.research.google.com/github/abhishekkm8088/Pyspark-Abhi/blob/main/Usecase_Adv_Spark_Java.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Environment** Setup Instructions

In [None]:
# Attach Google Drive so downloaded installers can be cached across sessions
from google.colab import drive

DRIVE_MOUNT_POINT = "/content/drive"
drive.mount(DRIVE_MOUNT_POINT)


Mounted at /content/drive


In [None]:
# Set variables
strBasePath="/content/drive/MyDrive/IBM-DE-Spark-Scala"
scala_deb_path = strBasePath+"/scala-2.12.18.deb"
spark_tgz_path = strBasePath+"/spark-3.4.1-bin-hadoop3.tgz"

!mkdir -p /content/tmp
import os
# Download Scala .deb if not cached
if not os.path.exists(scala_deb_path):
    !wget -O "{scala_deb_path}" https://github.com/scala/scala/releases/download/v2.12.18/scala-2.12.18.deb

# Download Spark tgz if not cached
if not os.path.exists(spark_tgz_path):
    !wget -O "{spark_tgz_path}" https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Copy cached files to working dir
!cp "{scala_deb_path}" /content/tmp/scala-2.12.18.deb
!cp "{spark_tgz_path}" /content/tmp/spark-3.4.1-bin-hadoop3.tgz

# Install Java if not already present
!java -version || apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Install Scala
!dpkg -i /content/tmp/scala-2.12.18.deb

# Extract Spark
!tar xf /content/tmp/spark-3.4.1-bin-hadoop3.tgz -C /content

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"
os.environ["PATH"] += f":{os.environ['SPARK_HOME']}/bin"

# Confirm installation
!java -version
!scala -version
!scalac -version
!echo "Spark path: $SPARK_HOME"
!ls $SPARK_HOME

openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)
Selecting previously unselected package scala.
(Reading database ... 126284 files and directories currently installed.)
Preparing to unpack /content/tmp/scala-2.12.18.deb ...
Unpacking scala (2.12.18-400) ...
Setting up scala (2.12.18-400) ...
Creating system group: scala
Creating system user: scala in scala with scala daemon-user and shell /bin/false
Processing triggers for man-db (2.10.2-1) ...
openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)
Scala code runner version 2.12.18 -- Copyright 2002-2023, LAMP/EPFL and Lightbend, Inc.
Scala compiler version 2.12.18 -- Copyright 2002-2023, LAMP/EPFL and Lightbend, Inc.
Spark path: /content/

# Task 1: Data Ingestion & Setup

In [None]:
%%writefile Schema.java
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;

/**
 * Task 1 ingestion job: reads eight CSV files with explicit schemas and
 * rewrites each one as a Parquet dataset under {@code data/parquet/}.
 *
 * NOTE(review): three of the schemas below (offices, employees, orders) declare
 * only a handful of columns. With the reader's default settings a user-supplied
 * schema is presumably matched to CSV columns by position, not by header name,
 * so these schemas are only correct if the CSV files contain exactly these
 * columns in exactly this order - confirm against the actual files.
 */
public class Schema {
    /**
     * Entry point. Builds a local Spark session, converts every table and
     * always stops the session, even when a read or write fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Data Ingestion Assignment")
                .master("local[*]")
                .getOrCreate();

        try {
            // Define schemas manually
            StructType productLinesSchema = new StructType()
                    .add("productLine", DataTypes.StringType)
                    .add("textDescription", DataTypes.StringType);

            StructType productsSchema = new StructType()
                    .add("productCode", DataTypes.StringType)
                    .add("productName", DataTypes.StringType)
                    .add("productLine", DataTypes.StringType);

            StructType officesSchema = new StructType()
                    .add("officeCode", DataTypes.StringType)
                    .add("city", DataTypes.StringType)
                    .add("country", DataTypes.StringType);

            StructType employeesSchema = new StructType()
                    .add("employeeNumber", DataTypes.IntegerType)
                    .add("lastName", DataTypes.StringType)
                    .add("officeCode", DataTypes.StringType);

            StructType customersSchema = new StructType()
                    .add("customerNumber", DataTypes.IntegerType, true)
                    .add("customerName", DataTypes.StringType, true)
                    .add("contactLastName", DataTypes.StringType, true)
                    .add("contactFirstName", DataTypes.StringType, true)
                    .add("phone", DataTypes.StringType, true)
                    .add("addressLine1", DataTypes.StringType, true)
                    .add("addressLine2", DataTypes.StringType, true)
                    .add("city", DataTypes.StringType, true)
                    .add("state", DataTypes.StringType, true)
                    .add("postalCode", DataTypes.StringType, true)
                    .add("country", DataTypes.StringType, true)
                    .add("salesRepEmployeeNumber", DataTypes.IntegerType, true)
                    .add("creditLimit", DataTypes.DoubleType, true);

            StructType paymentsSchema = new StructType()
                    .add("customerNumber", DataTypes.IntegerType)
                    .add("checkNumber", DataTypes.StringType)
                    .add("paymentDate", DataTypes.DateType)
                    .add("amount", DataTypes.DoubleType);

            // orderDate is kept as a string here, unlike paymentDate above.
            StructType ordersSchema = new StructType()
                    .add("orderNumber", DataTypes.IntegerType)
                    .add("orderDate", DataTypes.StringType)
                    .add("customerNumber", DataTypes.IntegerType);

            StructType orderDetailsSchema = new StructType()
                    .add("orderNumber", DataTypes.IntegerType)
                    .add("productCode", DataTypes.StringType)
                    .add("quantityOrdered", DataTypes.IntegerType)
                    .add("priceEach", DataTypes.DoubleType)
                    .add("orderLineNumber", DataTypes.IntegerType);

            // Base paths: CSVs are read from the working directory,
            // Parquet output goes under data/parquet/.
            String inputPath = "";
            String outputPath = "data/parquet/";

            // Read and write all tables
            readAndSave(spark, inputPath + "productlines.csv", outputPath + "productlines", productLinesSchema);
            readAndSave(spark, inputPath + "products.csv", outputPath + "products", productsSchema);
            readAndSave(spark, inputPath + "offices.csv", outputPath + "offices", officesSchema);
            readAndSave(spark, inputPath + "employees.csv", outputPath + "employees", employeesSchema);
            readAndSave(spark, inputPath + "customers.csv", outputPath + "customers", customersSchema);
            readAndSave(spark, inputPath + "payments.csv", outputPath + "payments", paymentsSchema);
            readAndSave(spark, inputPath + "orders.csv", outputPath + "orders", ordersSchema);
            readAndSave(spark, inputPath + "orderdetails.csv", outputPath + "orderdetails", orderDetailsSchema);
        } finally {
            // Release the session even if one of the conversions above threw.
            spark.stop();
        }
    }

    /**
     * Reads one CSV file (first line treated as a header) using the given
     * schema and writes it as Parquet, replacing any existing output.
     *
     * @param spark             active Spark session
     * @param inputCsvPath      path of the CSV file to read
     * @param outputParquetPath directory the Parquet dataset is written to
     * @param schema            column names and types applied to the CSV
     */
    private static void readAndSave(SparkSession spark, String inputCsvPath, String outputParquetPath, StructType schema) {
        Dataset<Row> df = spark.read()
                .option("header", "true")
                .schema(schema)
                .csv(inputCsvPath);

        df.write()
                .mode(SaveMode.Overwrite)
                .parquet(outputParquetPath);
    }
}

Writing Schema.java


In [None]:
# Compile Schema.java with every jar under $SPARK_HOME/jars on the classpath.
!javac -cp "$SPARK_HOME/jars/*" Schema.java


In [None]:
# Run the Schema class; the classpath is the current directory (".") plus the Spark jars.
!java -cp ".:$SPARK_HOME/jars/*" Schema

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 08:38:43 INFO SparkContext: Running Spark version 3.4.1
25/08/06 08:38:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 08:38:43 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 08:38:43 INFO SparkContext: Submitted application: Data Ingestion Assignment
25/08/06 08:38:43 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 08:38:43 INFO ResourceProfile: Limiting resource is cpu
25/08/06 08:38:43 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 08:38:43 INFO SecurityManager: Changing view acls to: root
25/08/06 08

# Task 2: Product & Order Analysis

In [None]:
%%writefile SparkAnalysis.java
import org.apache.spark.sql.*;

/**
 * Task 2 analysis job: reads the orderdetails, orders and products Parquet
 * datasets, prints three reports and saves each under {@code data/results/}.
 */
public class SparkAnalysis {
    /**
     * Entry point. Runs the three reports and always stops the Spark session,
     * even when a read, aggregation or write fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("Product and Order Analysis")
            .master("local[*]")
            .getOrCreate();

        try {
            Dataset<Row> orderDetails = spark.read().parquet("data/parquet/orderdetails");
            Dataset<Row> orders = spark.read().parquet("data/parquet/orders");
            Dataset<Row> products = spark.read().parquet("data/parquet/products");

            // Top 10 products by quantity sold.
            // sum(...) without an alias yields a column literally named
            // "sum(quantityOrdered)", which is what orderBy refers to below.
            Dataset<Row> topProducts = orderDetails.groupBy("productCode")
                .sum("quantityOrdered")
                .orderBy(functions.desc("sum(quantityOrdered)"))
                .limit(10);

            topProducts.show();

            // Join for product-wise revenue: one row per order line, enriched
            // with product and order columns plus the line's revenue.
            Dataset<Row> revenueData = orderDetails
                .join(products, "productCode")
                .join(orders, "orderNumber")
                .withColumn("revenue", functions.expr("quantityOrdered * priceEach"));

            Dataset<Row> productRevenue = revenueData.groupBy("productCode", "productName")
                .agg(functions.sum("revenue").alias("totalRevenue"))
                .orderBy(functions.desc("totalRevenue"));

            productRevenue.show();

            // Average order value per customer = total spent / distinct orders
            Dataset<Row> customerAOV = revenueData.groupBy("customerNumber")
                .agg(functions.sum("revenue").alias("totalSpent"),
                     functions.countDistinct("orderNumber").alias("orderCount"))
                .withColumn("averageOrderValue", functions.expr("totalSpent / orderCount"));

            customerAOV.show();

            // Save results
            topProducts.write().mode("overwrite").parquet("data/results/top_products");
            productRevenue.write().mode("overwrite").parquet("data/results/product_revenue");
            customerAOV.write().mode("overwrite").parquet("data/results/customer_aov");
        } finally {
            // Release the session even if one of the steps above threw.
            spark.stop();
        }
    }
}


Writing SparkAnalysis.java


In [None]:
# Compile SparkAnalysis.java with every jar under $SPARK_HOME/jars on the classpath.
!javac -cp "$SPARK_HOME/jars/*" SparkAnalysis.java


In [None]:
# Run the SparkAnalysis class; the classpath is the current directory (".") plus the Spark jars.
!java -cp ".:$SPARK_HOME/jars/*" SparkAnalysis

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 08:40:01 INFO SparkContext: Running Spark version 3.4.1
25/08/06 08:40:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 08:40:01 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 08:40:01 INFO SparkContext: Submitted application: Product and Order Analysis
25/08/06 08:40:01 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 08:40:01 INFO ResourceProfile: Limiting resource is cpu
25/08/06 08:40:01 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 08:40:01 INFO SecurityManager: Changing view acls to: root
25/08/06 0

# Task 3: Regional Sales Insights

In [None]:
%%writefile Task3.java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Task 3 job: registers the offices, employees, customers and payments
 * Parquet datasets as temp views, runs three SQL reports over them, prints
 * each report and saves it under {@code data/output/task3/}.
 */
public class Task3 {
    /**
     * Entry point. Always stops the Spark session, even when a query or
     * write fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create Spark session
        SparkSession spark = SparkSession.builder()
                .appName("Task 3 - Regional Sales Insights")
                .master("local[*]")
                .getOrCreate();

        try {
            // Read each Parquet dataset and register it as a temp view of the same name
            for (String table : new String[] {"offices", "employees", "customers", "payments"}) {
                spark.read().parquet("data/parquet/" + table).createOrReplaceTempView(table);
            }

            // -------------------------
            // Task 3.1: Sales per region
            // (distinct employees and customers per office country/city)
            // -------------------------
            Dataset<Row> salesPerRegion = spark.sql(
                "SELECT o.country, o.city, " +
                "       COUNT(DISTINCT e.employeeNumber) AS totalEmployees, " +
                "       COUNT(DISTINCT c.customerNumber) AS totalCustomers " +
                "FROM offices o " +
                "JOIN employees e ON o.officeCode = e.officeCode " +
                "JOIN customers c ON e.employeeNumber = c.salesRepEmployeeNumber " +
                "GROUP BY o.country, o.city " +
                "ORDER BY o.country, o.city"
            );
            showAndSave(salesPerRegion, "data/output/task3/sales_per_region");

            // -------------------------------
            // Task 3.2: Total revenue by country
            // -------------------------------
            Dataset<Row> revenueByCountry = spark.sql(
                "SELECT c.country, " +
                "       SUM(p.amount) AS totalRevenue " +
                "FROM customers c " +
                "JOIN payments p ON c.customerNumber = p.customerNumber " +
                "GROUP BY c.country " +
                "ORDER BY totalRevenue DESC"
            );
            showAndSave(revenueByCountry, "data/output/task3/revenue_by_country");

            // -------------------------------------
            // Task 3.3: Top-performing offices by revenue
            // -------------------------------------
            Dataset<Row> topOffices = spark.sql(
                "SELECT o.officeCode, o.city, o.country, " +
                "       SUM(p.amount) AS officeRevenue " +
                "FROM offices o " +
                "JOIN employees e ON o.officeCode = e.officeCode " +
                "JOIN customers c ON e.employeeNumber = c.salesRepEmployeeNumber " +
                "JOIN payments p ON c.customerNumber = p.customerNumber " +
                "GROUP BY o.officeCode, o.city, o.country " +
                "ORDER BY officeRevenue DESC"
            );
            showAndSave(topOffices, "data/output/task3/top_offices");
        } finally {
            // Stop Spark session even if one of the reports above threw.
            spark.stop();
        }
    }

    /**
     * Prints a report to stdout and writes it as Parquet, replacing any
     * existing output at that path.
     *
     * @param report     result to display and persist
     * @param outputPath directory the Parquet dataset is written to
     */
    private static void showAndSave(Dataset<Row> report, String outputPath) {
        report.show();
        report.write().mode("overwrite").parquet(outputPath);
    }
}


Writing Task3.java


In [None]:

# Compile Task3.java with every jar under $SPARK_HOME/jars on the classpath.
!javac -cp "$SPARK_HOME/jars/*" Task3.java


In [None]:
# Run the Task3 class; the classpath is the current directory (".") plus the Spark jars.
!java -cp ".:$SPARK_HOME/jars/*" Task3

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 08:41:07 INFO SparkContext: Running Spark version 3.4.1
25/08/06 08:41:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 08:41:08 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 08:41:08 INFO SparkContext: Submitted application: Task 3 - Regional Sales Insights
25/08/06 08:41:08 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 08:41:08 INFO ResourceProfile: Limiting resource is cpu
25/08/06 08:41:08 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 08:41:08 INFO SecurityManager: Changing view acls to: root
25/0

# Task 4: Performance Optimization

In [None]:
%%writefile Task4.java
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

import java.util.Iterator;

/**
 * Task 4 job: computes revenue per customer country from the payments and
 * customers Parquet datasets in two RDD-based ways (per-partition
 * pre-aggregation and aggregateByKey), shows a lazy-evaluation example, and
 * saves both aggregates under {@code data/output/task4/}.
 */
public class Task4 {
    /**
     * Entry point. Always stops the Spark session, even when a step fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Task 4: Performance Optimization")
                .master("local[*]")
                .getOrCreate();

        try {
            Dataset<Row> customers = spark.read().parquet("data/parquet/customers");
            Dataset<Row> payments = spark.read().parquet("data/parquet/payments");

            // Cache payments since we'll use it multiple times
            payments.persist(StorageLevel.MEMORY_AND_DISK());

            // Broadcast the smaller side of the join with a join hint. A Dataset
            // is only a driver-side query handle, so pushing one through
            // SparkContext.broadcast does not ship its rows to executors.
            // NOTE(review): assumes customers is small enough to broadcast - confirm.
            Dataset<Row> countryAmounts = payments
                    .join(functions.broadcast(customers), "customerNumber")
                    .select("country", "amount");

            // 1. Aggregate revenue per country using per-partition pre-aggregation.
            // Each partition first sums its own rows locally...
            JavaPairRDD<String, Double> partialSums = countryAmounts
                    .javaRDD()
                    .mapPartitionsToPair(iterator -> {
                        java.util.Map<String, Double> totals = new java.util.HashMap<>();
                        while (iterator.hasNext()) {
                            Row row = iterator.next();
                            totals.merge(row.getString(0), row.getDouble(1), Double::sum);
                        }
                        java.util.List<Tuple2<String, Double>> pairs = new java.util.ArrayList<>(totals.size());
                        for (java.util.Map.Entry<String, Double> entry : totals.entrySet()) {
                            pairs.add(new Tuple2<>(entry.getKey(), entry.getValue()));
                        }
                        return pairs.iterator();
                    });

            // ...then the partial sums are merged across partitions. Without this
            // step a country whose rows span several partitions would appear
            // several times, each with only part of its revenue.
            JavaRDD<Row> revenueByCountryRDD = partialSums
                    .reduceByKey(Double::sum)
                    .map(tuple -> RowFactory.create(tuple._1, tuple._2));

            // Define schema shared by both aggregate outputs
            StructType schema = new StructType()
                    .add("country", DataTypes.StringType)
                    .add("totalRevenue", DataTypes.DoubleType);

            Dataset<Row> revenueByCountry = spark.createDataFrame(revenueByCountryRDD, schema);
            revenueByCountry.show();

            // 2. Aggregate using aggregateByKey
            JavaPairRDD<String, Double> countryRevenuePair = countryAmounts
                    .javaRDD()
                    .mapToPair(row -> new Tuple2<>(row.getString(0), row.getDouble(1)));

            JavaPairRDD<String, Double> aggregatedRevenue = countryRevenuePair.aggregateByKey(
                    0.0,
                    Double::sum,   // fold a value into the per-partition accumulator
                    Double::sum    // merge accumulators from different partitions
            );

            Dataset<Row> aggregatedDF = spark.createDataFrame(
                    aggregatedRevenue.map(tuple -> RowFactory.create(tuple._1, tuple._2)),
                    schema
            );
            aggregatedDF.show();

            // 3. Lazy evaluation example
            Dataset<Row> lazyEval = payments.filter("amount > 1000");
            System.out.println("Lazy evaluation example defined. Not triggered yet.");
            lazyEval.show(); // Action triggers execution

            // Save output
            revenueByCountry.write().mode(SaveMode.Overwrite).parquet("data/output/task4/revenueByCountry");
            aggregatedDF.write().mode(SaveMode.Overwrite).parquet("data/output/task4/aggregatedRevenue");

            // Unpersist after use
            payments.unpersist();
        } finally {
            // Release the session even if one of the steps above threw.
            spark.stop();
        }
    }
}

Writing Task4.java


In [None]:
# Compile Task4.java with every jar under $SPARK_HOME/jars on the classpath.
!javac -cp "$SPARK_HOME/jars/*" Task4.java

In [None]:
# Run the Task4 class; the classpath is the current directory (".") plus the Spark jars.
!java -cp ".:$SPARK_HOME/jars/*" Task4

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 08:42:00 INFO SparkContext: Running Spark version 3.4.1
25/08/06 08:42:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 08:42:01 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 08:42:01 INFO SparkContext: Submitted application: Task 4: Performance Optimization
25/08/06 08:42:01 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 08:42:01 INFO ResourceProfile: Limiting resource is cpu
25/08/06 08:42:01 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 08:42:01 INFO SecurityManager: Changing view acls to: root
25/0

# Task 5: Code Structure & Submission

In [None]:
!zip -r data_folder.zip /content/data/

# Step 2: Download the zipped folder
from google.colab import files
files.download('data_folder.zip')

  adding: content/data/ (stored 0%)
  adding: content/data/.ipynb_checkpoints/ (stored 0%)
  adding: content/data/output/ (stored 0%)
  adding: content/data/output/task1/ (stored 0%)
  adding: content/data/output/task1/orderdetails/ (stored 0%)
  adding: content/data/output/task1/orderdetails/._SUCCESS.crc (stored 0%)
  adding: content/data/output/task1/orderdetails/_SUCCESS (stored 0%)
  adding: content/data/output/task1/orderdetails/part-00000-e416c70d-cd9e-43af-986e-3ba02e998a1e-c000.snappy.parquet (deflated 14%)
  adding: content/data/output/task1/orderdetails/.part-00000-e416c70d-cd9e-43af-986e-3ba02e998a1e-c000.snappy.parquet.crc (stored 0%)
  adding: content/data/output/task1/customers/ (stored 0%)
  adding: content/data/output/task1/customers/._SUCCESS.crc (stored 0%)
  adding: content/data/output/task1/customers/part-00000-23671a68-7d7e-43a5-987e-fb6eb20921c3-c000.snappy.parquet (deflated 29%)
  adding: content/data/output/task1/customers/.part-00000-23671a68-7d7e-43a5-987e-fb

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>