<a href="https://colab.research.google.com/github/Musaveer39/PySpark/blob/main/SparkJavaAssignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Environment Setup

In [1]:
# Mount Google Drive to store cached files
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [2]:
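# Cache the installers on Google Drive so they survive Colab runtime resets
import os

base_path = "/content/drive/MyDrive/IBM-DE-Spark-Scala"
scala_deb_path = base_path + "/scala-2.12.18.deb"
spark_tgz_path = base_path + "/spark-3.4.1-bin-hadoop3.tgz"

# Make sure the cache folder and the local working folder exist
os.makedirs(base_path, exist_ok=True)
!mkdir -p /content/tmp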
# Download Scala .deb if not cached
if not os.path.exists(scala_deb_path):
    !wget -O "{scala_deb_path}" https://github.com/scala/scala/releases/download/v2.12.18/scala-2.12.18.deb

# Download Spark tgz if not cached
if not os.path.exists(spark_tgz_path):
    !wget -O "{spark_tgz_path}" https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Copy cached files to working dir
!cp "{scala_deb_path}" /content/tmp/scala-2.12.18.deb
!cp "{spark_tgz_path}" /content/tmp/spark-3.4.1-bin-hadoop3.tgz

# Install Java if not already present
!java -version || apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Install Scala
!dpkg -i /content/tmp/scala-2.12.18.deb

# Extract Spark
!tar xf /content/tmp/spark-3.4.1-bin-hadoop3.tgz -C /content

# Set environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"
os.environ["PATH"] += f":{os.environ['SPARK_HOME']}/bin"

# Confirm installation
!java -version
!scala -version
!scalac -version
!echo "Spark path: $SPARK_HOME"
!ls $SPARK_HOME

openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)
Selecting previously unselected package scala.
(Reading database ... 126284 files and directories currently installed.)
Preparing to unpack /content/tmp/scala-2.12.18.deb ...
Unpacking scala (2.12.18-400) ...
Setting up scala (2.12.18-400) ...
Creating system group: scala
Creating system user: scala in scala with scala daemon-user and shell /bin/false
Processing triggers for man-db (2.10.2-1) ...
openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)
Scala code runner version 2.12.18 -- Copyright 2002-2023, LAMP/EPFL and Lightbend, Inc.
Scala compiler version 2.12.18 -- Copyright 2002-2023, LAMP/EPFL and Lightbend, Inc.
Spark path: /content/
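
The setup cell relies on `JAVA_HOME`, `SPARK_HOME`, and `PATH` agreeing with what was actually installed. A minimal sketch of a sanity check is shown here; the helper name `check_spark_env` is hypothetical and not part of the notebook, and it only inspects directories, it does not start Java or Spark.

```python
import os
from pathlib import Path


def check_spark_env(env):
    """Return a list of human-readable problems found in the given environment mapping."""
    problems = []
    # Both variables must be set and point at existing directories
    for var in ("JAVA_HOME", "SPARK_HOME"):
        value = env.get(var, "")
        if not value:
            problems.append(f"{var} is not set")
        elif not Path(value).is_dir():
            problems.append(f"{var} points to a missing directory: {value}")

    spark_home = env.get("SPARK_HOME", "")
    if spark_home:
        # The javac/java cells build their classpath from $SPARK_HOME/jars
        if not (Path(spark_home) / "jars").is_dir():
            problems.append("SPARK_HOME has no jars/ directory")
        # The setup cell appends $SPARK_HOME/bin to PATH
        path_entries = env.get("PATH", "").split(os.pathsep)
        if str(Path(spark_home) / "bin") not in path_entries:
            problems.append("SPARK_HOME/bin is not on PATH")
    return problems
```

In a notebook cell, `check_spark_env(os.environ)` returning an empty list would suggest the variables are consistent with the files on disk.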

In [3]:
!java -version

openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)


In [5]:
!javac -version

javac 11.0.28


## Task 1: Data Ingestion & Setup

In [14]:
%%writefile SparkApp.java
import org.apache.spark.sql.*;

public class SparkApp {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Java Spark App")
                .master("local[*]")
                .getOrCreate();

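        // Read input.csv from the working directory; the first row supplies column names.
        // No schema is given, so every column is loaded as a string.
        Dataset<Row> df = spark.read().option("header", true).csv("input.csv");
        df.show(); // prints the first 20 rows

        // Overwrite any earlier output so the cell can be re-run
        df.write().mode("overwrite").parquet("output_parquet");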

        spark.stop();
    }
}


Writing SparkApp.java
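
`SparkApp` expects a file named `input.csv` with a header row in the working directory, and the notebook does not show where that file comes from. As a sketch only, the cell below writes a tiny stand-in file so the program has something to read; the helper name `write_sample_csv`, the column names `id` and `name`, and the two rows are all made-up placeholders, not the assignment's data.

```python
import csv


def write_sample_csv(path="input.csv"):
    """Write a small header-plus-rows CSV; the columns and values are placeholders."""
    rows = [
        {"id": 1, "name": "alpha"},
        {"id": 2, "name": "beta"},
    ]
    # newline="" lets the csv module control line endings itself
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=["id", "name"])
        writer.writeheader()
        writer.writerows(rows)
    return path
```

Calling `write_sample_csv()` before running `SparkApp` would give `df.show()` two rows to print.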


In [17]:
!javac -cp "$SPARK_HOME/jars/*" SparkApp.java


In [19]:
!java -cp ".:$SPARK_HOME/jars/*" SparkApp

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 05:00:30 INFO SparkContext: Running Spark version 3.4.1
25/08/06 05:00:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 05:00:31 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 05:00:31 INFO SparkContext: Submitted application: Java Spark App
25/08/06 05:00:31 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 05:00:31 INFO ResourceProfile: Limiting resource is cpu
25/08/06 05:00:31 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 05:00:31 INFO SecurityManager: Changing view acls to: root
25/08/06 05:00:31 INFO
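
Most of the output above is INFO-level logging, which makes warnings and errors easy to miss. The sketch below filters captured log lines by severity. It assumes the `yy/MM/dd HH:mm:ss LEVEL ...` layout seen in this output; the helper name `filter_log` is hypothetical, and how the lines are captured (for example from a saved log file) is left open.

```python
import re

# Matches the start of lines such as "25/08/06 05:00:30 WARN NativeCodeLoader: ..."
LOG_LINE = re.compile(r"^\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} (?P<level>[A-Z]+)\b")
SEVERITY = {"TRACE": 0, "DEBUG": 1, "INFO": 2, "WARN": 3, "ERROR": 4, "FATAL": 5}


def filter_log(lines, min_level="WARN"):
    """Keep log lines at or above min_level, plus the continuation lines that follow them."""
    threshold = SEVERITY[min_level]
    kept = []
    keep = False  # lines before the first timestamped entry are dropped
    for raw in lines:
        line = raw.rstrip("\n")
        match = LOG_LINE.match(line)
        if match:
            keep = SEVERITY.get(match["level"], -1) >= threshold
        # Lines without a timestamp (stack traces) inherit the previous decision
        if keep:
            kept.append(line)
    return kept
```

Applied to the lines above with the default threshold, only the `NativeCodeLoader` warning would remain.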

In [26]:
%%writefile Schema.java
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;

public class Schema {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Data Ingestion Assignment")
                .master("local[*]")
                .getOrCreate();

        // Define schemas manually
        StructType productLinesSchema = new StructType()
                .add("productLine", DataTypes.StringType)
                .add("textDescription", DataTypes.StringType);

        StructType productsSchema = new StructType()
                .add("productCode", DataTypes.StringType)
                .add("productName", DataTypes.StringType)
                .add("productLine", DataTypes.StringType);

        StructType officesSchema = new StructType()
                .add("officeCode", DataTypes.StringType)
                .add("city", DataTypes.StringType)
                .add("country", DataTypes.StringType);

        StructType employeesSchema = new StructType()
                .add("employeeNumber", DataTypes.IntegerType)
                .add("lastName", DataTypes.StringType)
                .add("officeCode", DataTypes.StringType);

        StructType customersSchema = new StructType()
                .add("customerNumber", DataTypes.IntegerType)
                .add("customerName", DataTypes.StringType);

        StructType paymentsSchema = new StructType()
                .add("customerNumber", DataTypes.IntegerType)
                .add("checkNumber", DataTypes.StringType)
                .add("paymentDate", DataTypes.StringType); // kept as text; DataTypes.DateType is an option if the CSV dates use yyyy-MM-dd (Spark's default dateFormat)

        StructType ordersSchema = new StructType()
                .add("orderNumber", DataTypes.IntegerType)
                .add("orderDate", DataTypes.StringType)
                .add("customerNumber", DataTypes.IntegerType);

        StructType orderDetailsSchema = new StructType()
                .add("orderNumber", DataTypes.IntegerType)
                .add("productCode", DataTypes.StringType)
                .add("quantityOrdered", DataTypes.IntegerType);

        // Base paths: an empty inputPath reads the CSVs from the working directory
        String inputPath = "";
        String outputPath = "data/parquet/";

        // Read and write all tables
        readAndSave(spark, inputPath + "productlines.csv", outputPath + "productlines", productLinesSchema);
        readAndSave(spark, inputPath + "products.csv", outputPath + "products", productsSchema);
        readAndSave(spark, inputPath + "offices.csv", outputPath + "offices", officesSchema);
        readAndSave(spark, inputPath + "employees.csv", outputPath + "employees", employeesSchema);
        readAndSave(spark, inputPath + "customers.csv", outputPath + "customers", customersSchema);
        readAndSave(spark, inputPath + "payments.csv", outputPath + "payments", paymentsSchema);
        readAndSave(spark, inputPath + "orders.csv", outputPath + "orders", ordersSchema);
        readAndSave(spark, inputPath + "orderdetails.csv", outputPath + "orderdetails", orderDetailsSchema);

        spark.stop();
    }

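    // Reads one CSV with an explicit schema and rewrites it as Parquet.
    // With Spark's default enforceSchema=true, the schema is applied by column
    // position and the header row is skipped rather than matched by name.
    private static void readAndSave(SparkSession spark, String inputCsvPath, String outputParquetPath, StructType schema) {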
        Dataset<Row> df = spark.read()
                .option("header", "true")
                .schema(schema)
                .csv(inputCsvPath);

        df.write()
                .mode(SaveMode.Overwrite)
                .parquet(outputParquetPath);
    }
}



Overwriting Schema.java
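
`Schema.java` reads eight CSV files one after another, so a missing file would stop the job partway through, after earlier tables have already been written. A small pre-flight check along these lines may help; the helper name `missing_inputs` is hypothetical, and the file names are copied from the `readAndSave` calls above.

```python
from pathlib import Path

# File names taken from the readAndSave calls in Schema.java
EXPECTED_CSVS = [
    "productlines.csv", "products.csv", "offices.csv", "employees.csv",
    "customers.csv", "payments.csv", "orders.csv", "orderdetails.csv",
]


def missing_inputs(input_dir="."):
    """Return the expected CSV names that are absent from input_dir."""
    base = Path(input_dir)
    return [name for name in EXPECTED_CSVS if not (base / name).is_file()]
```

An empty list from `missing_inputs()` means all eight inputs are present in the working directory, which matches the empty `inputPath` used in the Java code.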


In [27]:
!javac -cp "$SPARK_HOME/jars/*" Schema.java

In [28]:
!java -cp ".:$SPARK_HOME/jars/*" Schema

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 05:09:48 INFO SparkContext: Running Spark version 3.4.1
25/08/06 05:09:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 05:09:49 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 05:09:49 INFO SparkContext: Submitted application: Data Ingestion Assignment
25/08/06 05:09:49 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 05:09:49 INFO ResourceProfile: Limiting resource is cpu
25/08/06 05:09:49 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 05:09:49 INFO SecurityManager: Changing view acls to: root
25/08/06 05
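
Because the log is long, it can be quicker to confirm the result on disk. The sketch below assumes Spark's usual output layout, where each successfully written table folder contains a `_SUCCESS` marker and one or more `part-*.parquet` files; the helper name `parquet_status` is hypothetical, and the table names and `data/parquet` folder come from `Schema.java`.

```python
from pathlib import Path

# Output folder names taken from the readAndSave calls in Schema.java
TABLES = ["productlines", "products", "offices", "employees",
          "customers", "payments", "orders", "orderdetails"]


def parquet_status(output_dir="data/parquet"):
    """Map each table to True when its folder has a _SUCCESS marker and at least one part file."""
    status = {}
    for table in TABLES:
        folder = Path(output_dir) / table
        has_marker = (folder / "_SUCCESS").is_file()
        # Only look for part files once the marker confirms the folder exists
        has_parts = has_marker and any(folder.glob("part-*.parquet"))
        status[table] = has_parts
    return status
```

Any table mapped to `False` in `parquet_status()` is a candidate for a closer look at the corresponding section of the log.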