<a href="https://colab.research.google.com/github/farhaan-hussain/PySpark/blob/main/SparkAssignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Initialize Spark Scala Env.

In [1]:
# Mount Google Drive to store cached files
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [2]:
# Set variables
strBasePath="/content/drive/MyDrive/IBM-DE-Spark-Scala"
scala_deb_path = strBasePath+"/scala-2.12.18.deb"
spark_tgz_path = strBasePath+"/spark-3.4.1-bin-hadoop3.tgz"

!mkdir -p /content/tmp
import os
# Download Scala .deb if not cached
if not os.path.exists(scala_deb_path):
    !wget -O "{scala_deb_path}" https://github.com/scala/scala/releases/download/v2.12.18/scala-2.12.18.deb

# Download Spark tgz if not cached
if not os.path.exists(spark_tgz_path):
    !wget -O "{spark_tgz_path}" https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Copy cached files to working dir
!cp "{scala_deb_path}" /content/tmp/scala-2.12.18.deb
!cp "{spark_tgz_path}" /content/tmp/spark-3.4.1-bin-hadoop3.tgz

# Install Java if not already present
!java -version || apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Install Scala
!dpkg -i /content/tmp/scala-2.12.18.deb

# Extract Spark
!tar xf /content/tmp/spark-3.4.1-bin-hadoop3.tgz -C /content

# Set environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"
os.environ["PATH"] += f":{os.environ['SPARK_HOME']}/bin"

# Confirm installation
!java -version
!scala -version
!scalac -version
!echo "Spark path: $SPARK_HOME"
!ls $SPARK_HOME

openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)
Selecting previously unselected package scala.
(Reading database ... 126284 files and directories currently installed.)
Preparing to unpack /content/tmp/scala-2.12.18.deb ...
Unpacking scala (2.12.18-400) ...
Setting up scala (2.12.18-400) ...
Creating system group: scala
Creating system user: scala in scala with scala daemon-user and shell /bin/false
Processing triggers for man-db (2.10.2-1) ...
openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)
Scala code runner version 2.12.18 -- Copyright 2002-2023, LAMP/EPFL and Lightbend, Inc.
Scala compiler version 2.12.18 -- Copyright 2002-2023, LAMP/EPFL and Lightbend, Inc.
Spark path: /content/

## Test the Spark Shell (Scala REPL)

In [3]:
!ls

drive  sample_data  spark-3.4.1-bin-hadoop3  tmp


In [4]:
!spark-shell

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/06 04:56:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://1efc78f4a3be:4040
Spark context available as 'sc' (master = local[*], app id = local-1754456202483).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
      /_/
         
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 11.0.28)
Type in expressions to have them evaluated.
Type :help for more information.
scala>

In [7]:
!ls

Assignment.java  offices.csv	   productlines.csv	    tmp
customers.csv	 orderdetails.csv  products.csv
drive		 orders.csv	   sample_data
employees.csv	 payments.csv	   spark-3.4.1-bin-hadoop3


In [9]:
%%writefile Assignment.java
import org.apache.spark.sql.*;
import java.util.*;

public class Assignment
{
  public static void main(String[] args)
  {
    // master("local[*]") is needed when the class is launched with plain `java` instead of spark-submit
    SparkSession spark = SparkSession.builder().appName("Assignment").master("local[*]").getOrCreate();
    Dataset<Row> customersDs = spark.read().option("header", "true").csv("customers.csv");
    Dataset<Row> ordersDs = spark.read().option("header", "true").csv("orders.csv");
    Dataset<Row> orderDetailsDs = spark.read().option("header", "true").csv("orderdetails.csv");
    Dataset<Row> productsDs = spark.read().option("header", "true").csv("products.csv");
    Dataset<Row> employeesDs = spark.read().option("header", "true").csv("employees.csv");
    Dataset<Row> officesDs = spark.read().option("header", "true").csv("offices.csv");
    Dataset<Row> productlineDs = spark.read().option("header", "true").csv("productlines.csv");
    Dataset<Row> paymentsDs = spark.read().option("header", "true").csv("payments.csv");

    spark.stop();
  }
}

Overwriting Assignment.java


In [21]:
!ls

Assignment.java  drive	    sample_data		     tmp
csv_files	 paraquets  spark-3.4.1-bin-hadoop3


In [11]:
!mkdir -p paraquets

In [36]:
%%writefile Assignment.java
import org.apache.spark.sql.*;
import java.util.*;

public class Assignment
{
  public static void main(String[] args)
  {
    SparkSession spark = SparkSession.builder().appName("Assignment").master("local[*]").getOrCreate();

    // One reader for all files: the header row supplies column names, and inferSchema types
    // numeric columns (e.g. quantityOrdered, priceEach, amount) as numbers instead of strings.
    DataFrameReader reader = spark.read().option("header", "true").option("inferSchema", "true");
    Dataset<Row> customersDs = reader.csv("/content/csv_files/customers.csv");
    Dataset<Row> ordersDs = reader.csv("/content/csv_files/orders.csv");
    Dataset<Row> orderDetailsDs = reader.csv("/content/csv_files/orderdetails.csv");
    Dataset<Row> productsDs = reader.csv("/content/csv_files/products.csv");
    Dataset<Row> employeesDs = reader.csv("/content/csv_files/employees.csv");
    Dataset<Row> officesDs = reader.csv("/content/csv_files/offices.csv");
    Dataset<Row> productlineDs = reader.csv("/content/csv_files/productlines.csv");
    Dataset<Row> paymentsDs = reader.csv("/content/csv_files/payments.csv");

    // mode("overwrite") lets the cell be re-run without failing on existing output directories
    customersDs.write().mode("overwrite").parquet("/content/paraquets/customers.parquet");
    ordersDs.write().mode("overwrite").parquet("/content/paraquets/orders.parquet");
    orderDetailsDs.write().mode("overwrite").parquet("/content/paraquets/orderdetails.parquet");
    productsDs.write().mode("overwrite").parquet("/content/paraquets/products.parquet");
    employeesDs.write().mode("overwrite").parquet("/content/paraquets/employees.parquet");
    officesDs.write().mode("overwrite").parquet("/content/paraquets/offices.parquet");
    productlineDs.write().mode("overwrite").parquet("/content/paraquets/productlines.parquet");
    paymentsDs.write().mode("overwrite").parquet("/content/paraquets/payments.parquet");

    spark.stop();
  }
}

Overwriting Assignment.java
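Because the eight read/write pairs above differ only in the table name, they can be driven from a single list. The sketch below is a minimal illustration, assuming the `/content/csv_files` and `/content/paraquets` layout used in this notebook; `TablePaths` and `pathsFor` are hypothetical names, and the sketch only builds the path pairs so it runs with the JDK alone. Inside the Spark job, each pair would feed one `spark.read().option("header", "true").csv(in).write().mode("overwrite").parquet(out)` call in a loop.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: derives the CSV input and Parquet output path for each table,
// following the /content/csv_files and /content/paraquets layout used in this notebook.
class TablePaths {
    static final String CSV_DIR = "/content/csv_files";
    static final String PARQUET_DIR = "/content/paraquets";

    // Table names taken from the CSV files listed earlier in the notebook.
    static final List<String> TABLES = List.of(
        "customers", "orders", "orderdetails", "products",
        "employees", "offices", "productlines", "payments");

    // Returns table name -> {csvPath, parquetPath}, in the order the names were given.
    static Map<String, String[]> pathsFor(List<String> tables) {
        Map<String, String[]> paths = new LinkedHashMap<>();
        for (String table : tables) {
            paths.put(table, new String[] {
                CSV_DIR + "/" + table + ".csv",
                PARQUET_DIR + "/" + table + ".parquet"
            });
        }
        return paths;
    }

    public static void main(String[] args) {
        // Prints one "table: input -> output" line per table.
        pathsFor(TABLES).forEach((table, p) ->
            System.out.println(table + ": " + p[0] + " -> " + p[1]));
    }
}
```

Keeping the table list in one place means adding or renaming a table touches a single line rather than a read line and a write line.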


In [37]:
!javac -cp "$(find /content/spark-3.4.1-bin-hadoop3/jars/ -name '*.jar' | tr '\n' ':')" Assignment.java

In [38]:
!java -cp ".:$(find /content/spark-3.4.1-bin-hadoop3/jars/ -name '*.jar' | tr '\n' ':')" \
  --add-exports java.base/sun.nio.ch=ALL-UNNAMED Assignment

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 05:21:40 INFO SparkContext: Running Spark version 3.4.1
25/08/06 05:21:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 05:21:41 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 05:21:41 INFO SparkContext: Submitted application: Assignment
25/08/06 05:21:41 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 05:21:41 INFO ResourceProfile: Limiting resource is cpu
25/08/06 05:21:41 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 05:21:41 INFO SecurityManager: Changing view acls to: root
25/08/06 05:21:41 INFO Sec

In [29]:
!ls

Assignment.class  csv_files  paraquets	  spark-3.4.1-bin-hadoop3
Assignment.java   drive      sample_data  tmp


In [30]:
!pwd

/content


In [47]:
ls

Assignment.class  csv_files/  paraquets/    spark-3.4.1-bin-hadoop3/
Assignment.java   drive/      sample_data/  tmp/


In [76]:
%%writefile task2.java
import org.apache.spark.sql.*;
import static org.apache.spark.sql.functions.*;
import java.util.*;

public class task2
{
  public static void main(String[] args)
  {
    SparkSession spark = SparkSession.builder().appName("Assignment").master("local[*]").getOrCreate();
    Dataset<Row> ordersDs = spark.read().parquet("/content/paraquets/orders.parquet");
    Dataset<Row> productsDs = spark.read().parquet("/content/paraquets/products.parquet");
    Dataset<Row> orderDetailsDs = spark.read().parquet("/content/paraquets/orderdetails.parquet");

    // Top 10 products by total quantity ordered
    Dataset<Row> topProductsDs = orderDetailsDs
                .groupBy("productCode")
                .agg(sum("quantityOrdered").alias("quantityOrdered"))
                .join(productsDs, "productCode")
                .orderBy(desc("quantityOrdered"))
                .limit(10);

    // Revenue per product: sum of quantityOrdered * priceEach over all order lines
    Dataset<Row> productRevenueDs = orderDetailsDs
                .withColumn("revenue", col("quantityOrdered").multiply(col("priceEach")))
                .join(productsDs, "productCode")
                .groupBy("productCode", "productName")
                .agg(sum("revenue").alias("productRevenue"))
                .orderBy(desc("productRevenue"));

    // Average order value per customer: total each order first, then average the order totals.
    // (Averaging quantityOrdered * priceEach directly would give the average line-item value.)
    Dataset<Row> avgOrderValueDs = ordersDs
                .join(orderDetailsDs, "orderNumber")
                .groupBy("customerNumber", "orderNumber")
                .agg(sum(col("quantityOrdered").multiply(col("priceEach"))).alias("orderValue"))
                .groupBy("customerNumber")
                .agg(avg("orderValue").alias("avgOrderValue"));

    topProductsDs.write().mode("overwrite").parquet("/content/paraquets/topProductsDs.parquet");
    productRevenueDs.write().mode("overwrite").parquet("/content/paraquets/productRevenueDs.parquet");
    avgOrderValueDs.write().mode("overwrite").parquet("/content/paraquets/avgOrderValueDs.parquet");
    spark.stop();
  }
}

Overwriting task2.java
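The product aggregations in `task2` can be checked by hand on a tiny input. The plain-Java sketch below mirrors them over a few hypothetical `orderdetails` rows (the `LineItem` class and the sample values are illustrative, not taken from the dataset): `topByQuantity` corresponds to `groupBy("productCode").agg(sum("quantityOrdered"))` ordered descending with a limit, and `revenueByProduct` to summing `quantityOrdered * priceEach` per product. It uses only the JDK, so it runs without Spark.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java sketch of the task2 product aggregations over hypothetical orderdetails rows.
class ProductStats {
    // One orderdetails row (illustrative subset of the CSV columns).
    static final class LineItem {
        private final String productCode;
        private final int quantityOrdered;
        private final double priceEach;

        LineItem(String productCode, int quantityOrdered, double priceEach) {
            this.productCode = productCode;
            this.quantityOrdered = quantityOrdered;
            this.priceEach = priceEach;
        }

        String productCode() { return productCode; }
        int quantityOrdered() { return quantityOrdered; }
        double revenue() { return quantityOrdered * priceEach; }
    }

    // Like groupBy("productCode").agg(sum("quantityOrdered")).orderBy(desc(...)).limit(n).
    static List<String> topByQuantity(List<LineItem> items, int n) {
        Map<String, Integer> totals = items.stream()
            .collect(Collectors.groupingBy(LineItem::productCode,
                     Collectors.summingInt(LineItem::quantityOrdered)));
        return totals.entrySet().stream()
            .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
            .limit(n)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    // Like withColumn("revenue", quantityOrdered * priceEach) followed by a per-product sum.
    static Map<String, Double> revenueByProduct(List<LineItem> items) {
        return items.stream()
            .collect(Collectors.groupingBy(LineItem::productCode, TreeMap::new,
                     Collectors.summingDouble(LineItem::revenue)));
    }

    // Hypothetical sample rows.
    static List<LineItem> sampleItems() {
        return List.of(
            new LineItem("P1", 10, 2.0),
            new LineItem("P2", 3, 50.0),
            new LineItem("P1", 5, 2.0),
            new LineItem("P3", 1, 100.0));
    }

    public static void main(String[] args) {
        System.out.println(topByQuantity(sampleItems(), 2)); // [P1, P2]
        System.out.println(revenueByProduct(sampleItems())); // {P1=30.0, P2=150.0, P3=100.0}
    }
}
```

With these rows the most-ordered product (P1) is not the highest-revenue one (P2), so the quantity and revenue rankings can disagree.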


In [77]:
!javac -cp "$(find /content/spark-3.4.1-bin-hadoop3/jars/ -name '*.jar' | tr '\n' ':')" task2.java

In [78]:
!java -cp ".:$(find /content/spark-3.4.1-bin-hadoop3/jars/ -name '*.jar' | tr '\n' ':')" \
  --add-exports java.base/sun.nio.ch=ALL-UNNAMED task2

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 06:31:37 INFO SparkContext: Running Spark version 3.4.1
25/08/06 06:31:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 06:31:37 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 06:31:37 INFO SparkContext: Submitted application: Assignment
25/08/06 06:31:37 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 06:31:37 INFO ResourceProfile: Limiting resource is cpu
25/08/06 06:31:37 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 06:31:37 INFO SecurityManager: Changing view acls to: root
25/08/06 06:31:37 INFO Sec
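Average order value per customer is usually defined as the mean of that customer's order totals. Averaging `quantityOrdered * priceEach` directly over the joined `orders`/`orderdetails` rows gives the mean line-item value instead, which differs whenever an order has more than one line. The plain-Java sketch below computes both on hypothetical rows (`Line` and the sample values are illustrative, not from the dataset).

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java sketch contrasting the average line-item value with the average order value.
class AvgOrderValue {
    // One joined orders + orderdetails row (hypothetical subset of columns).
    static final class Line {
        private final int customerNumber;
        private final int orderNumber;
        private final int quantityOrdered;
        private final double priceEach;

        Line(int customerNumber, int orderNumber, int quantityOrdered, double priceEach) {
            this.customerNumber = customerNumber;
            this.orderNumber = orderNumber;
            this.quantityOrdered = quantityOrdered;
            this.priceEach = priceEach;
        }

        int customerNumber() { return customerNumber; }
        int orderNumber() { return orderNumber; }
        double lineValue() { return quantityOrdered * priceEach; }
    }

    // One stage: average of quantityOrdered * priceEach over every joined row per customer.
    static Map<Integer, Double> avgLineValue(List<Line> lines) {
        return lines.stream().collect(Collectors.groupingBy(Line::customerNumber, TreeMap::new,
                Collectors.averagingDouble(Line::lineValue)));
    }

    // Two stages: sum the lines of each order, then average those order totals per customer.
    static Map<Integer, Double> avgOrderValue(List<Line> lines) {
        Map<Integer, Map<Integer, Double>> orderTotals = lines.stream().collect(
            Collectors.groupingBy(Line::customerNumber,
                Collectors.groupingBy(Line::orderNumber, Collectors.summingDouble(Line::lineValue))));
        Map<Integer, Double> result = new TreeMap<>();
        orderTotals.forEach((customer, totals) -> result.put(customer,
            totals.values().stream().mapToDouble(Double::doubleValue).average().orElse(0.0)));
        return result;
    }

    // Hypothetical sample rows.
    static List<Line> sampleLines() {
        return List.of(
            new Line(101, 1, 10, 2.0),   // 20.0
            new Line(101, 1, 3, 50.0),   // 150.0, so order 1 totals 170.0
            new Line(101, 2, 5, 2.0),    // 10.0, so order 2 totals 10.0
            new Line(102, 3, 1, 100.0)); // 100.0, so order 3 totals 100.0
    }

    public static void main(String[] args) {
        System.out.println(avgLineValue(sampleLines()));  // {101=60.0, 102=100.0}
        System.out.println(avgOrderValue(sampleLines())); // {101=90.0, 102=100.0}
    }
}
```

For customer 101 the line-item average is 60.0 while the order-value average is 90.0. In Spark, the two-stage version is a `groupBy("customerNumber", "orderNumber")` sum followed by a `groupBy("customerNumber")` average.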

In [91]:
%%writefile task3.java
import org.apache.spark.sql.*;
import static org.apache.spark.sql.functions.*;
import java.util.*;

public class task3
{
  public static void main(String[] args)
  {
    SparkSession spark = SparkSession.builder().appName("Assignment").master("local[*]").getOrCreate();

    Dataset<Row> ordersDs = spark.read().parquet("/content/paraquets/orders.parquet");
    Dataset<Row> orderDetailsDs = spark.read().parquet("/content/paraquets/orderdetails.parquet");
    Dataset<Row> customersDs = spark.read().parquet("/content/paraquets/customers.parquet");
    Dataset<Row> employeesDs = spark.read().parquet("/content/paraquets/employees.parquet");
    Dataset<Row> officesDs = spark.read().parquet("/content/paraquets/offices.parquet");
    Dataset<Row> paymentsDs = spark.read().parquet("/content/paraquets/payments.parquet");
    // Total payments received per customer country; the alias gives the output column a clean name
    Dataset<Row> totalRevenueCountryDs = customersDs
                .join(paymentsDs, "customerNumber")
                .groupBy("country")
                .agg(sum("amount").alias("totalRevenue"));
    totalRevenueCountryDs.show();
    totalRevenueCountryDs.write().mode("overwrite").parquet("/content/paraquets/totalRevenueCountryDs.parquet");
    spark.stop();
  }
}

Overwriting task3.java
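`task3` joins `customers` to `payments` on `customerNumber` and sums `amount` per `country`. The plain-Java sketch below performs the same inner join plus aggregation, assuming a hypothetical customer-to-country map and `Payment` rows (neither taken from the dataset), to make the join semantics explicit: payments whose customer is missing are dropped, and customers with no payments never appear in the result.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of an inner join on customerNumber followed by a sum per country.
class RevenueByCountry {
    // One payments row (hypothetical subset of columns).
    static final class Payment {
        final int customerNumber;
        final double amount;

        Payment(int customerNumber, double amount) {
            this.customerNumber = customerNumber;
            this.amount = amount;
        }
    }

    // Hash join: look up each payment's customer, skip payments with no match (inner join),
    // and accumulate the amount under that customer's country.
    static Map<String, Double> totalByCountry(Map<Integer, String> countryByCustomer,
                                              List<Payment> payments) {
        Map<String, Double> totals = new TreeMap<>();
        for (Payment p : payments) {
            String country = countryByCustomer.get(p.customerNumber);
            if (country == null) {
                continue; // no matching customer row, so the inner join drops this payment
            }
            totals.merge(country, p.amount, Double::sum);
        }
        return totals;
    }

    // Hypothetical sample data: customerNumber -> country.
    static Map<Integer, String> sampleCustomers() {
        return Map.of(1, "France", 2, "USA", 3, "France");
    }

    static List<Payment> samplePayments() {
        return List.of(
            new Payment(1, 100.0),
            new Payment(3, 50.5),
            new Payment(2, 200.0),
            new Payment(9, 75.0)); // customer 9 has no customer row
    }

    public static void main(String[] args) {
        System.out.println(totalByCountry(sampleCustomers(), samplePayments())); // {France=150.5, USA=200.0}
    }
}
```

Spark may pick a broadcast hash join or a sort-merge join for the real tables depending on their size; the join strategy affects performance, not the result.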


In [92]:
!javac -cp "$(find /content/spark-3.4.1-bin-hadoop3/jars/ -name '*.jar' | tr '\n' ':')" task3.java

In [93]:
!java -cp ".:$(find /content/spark-3.4.1-bin-hadoop3/jars/ -name '*.jar' | tr '\n' ':')" \
  --add-exports java.base/sun.nio.ch=ALL-UNNAMED task3

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 07:04:22 INFO SparkContext: Running Spark version 3.4.1
25/08/06 07:04:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 07:04:22 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 07:04:22 INFO SparkContext: Submitted application: Assignment
25/08/06 07:04:22 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 07:04:22 INFO ResourceProfile: Limiting resource is cpu
25/08/06 07:04:22 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 07:04:22 INFO SecurityManager: Changing view acls to: root
25/08/06 07:04:22 INFO Sec