<a href="https://colab.research.google.com/github/Alby-Benny-IBM/PySpark/blob/main/06_ScalaUseCase.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz

In [2]:
!wget -q https://github.com/scala/scala/releases/download/v2.12.18/scala-2.12.18.deb

In [3]:
!dpkg -i scala-2.12.18.deb

Selecting previously unselected package scala.
(Reading database ... 126284 files and directories currently installed.)
Preparing to unpack scala-2.12.18.deb ...
Unpacking scala (2.12.18-400) ...
Setting up scala (2.12.18-400) ...
Creating system group: scala
Creating system user: scala in scala with scala daemon-user and shell /bin/false
Processing triggers for man-db (2.10.2-1) ...


In [4]:
import os

In [5]:
# Point this process (and anything it launches) at the Java and Spark installs.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.4.1-bin-hadoop3",
    }
)

In [6]:
!pip install -q findspark

In [7]:
import findspark

# Must run before the pyspark import below.
# NOTE(review): presumably this resolves Spark via the SPARK_HOME set earlier — confirm.
findspark.init()

from pyspark.sql import SparkSession

# Session with master "local" and application name "Colab".
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .getOrCreate()
)
# Bare expression so the notebook renders the session summary as the cell output.
spark

In [8]:
!ls -R csv/

csv/:
customers.csv  offices.csv	 orders.csv    productlines.csv
employees.csv  orderdetails.csv  payments.csv  products.csv


In [39]:
%%writefile ClassicModelsApp.scala
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object ClassicModelsApp {

  /**
   * Runs the ClassicModels analytics job on a local Spark session.
   *
   * The session is always stopped before this method returns, including when a
   * load, query or write throws.
   *
   * @param args optional overrides: `args(0)` is the directory holding the input CSV
   *             files (default `/content/csv`) and `args(1)` is the directory the
   *             result CSVs are written under (default `/content/output_csv`)
   */
  def main(args: Array[String]): Unit = {
    val basePath   = args.headOption.getOrElse("/content/csv")
    val outputPath = args.drop(1).headOption.getOrElse("/content/output_csv")

    val spark = SparkSession.builder()
      .appName("ClassicModels Analytics")
      .master("local[*]")
      .getOrCreate()

    // Release the session on every exit path; previously a failing step skipped stop().
    try runPipeline(spark, basePath, outputPath)
    finally spark.stop()
  }

  /**
   * Loads the eight ClassicModels CSV files, registers temp views, prints the
   * analytics reports and writes the report DataFrames out as CSV.
   *
   * @param spark      active session used for reading, SQL and implicits
   * @param basePath   directory containing the input `*.csv` files
   * @param outputPath directory under which one sub-directory per report is written
   */
  private def runPipeline(spark: SparkSession, basePath: String, outputPath: String): Unit = {
    import spark.implicits._

    // Load CSVs into DataFrames
    val customersDF     = loadCSV(spark, s"$basePath/customers.csv")
    val employeesDF     = loadCSV(spark, s"$basePath/employees.csv")
    val officesDF       = loadCSV(spark, s"$basePath/offices.csv")
    val orderDetailsDF  = loadCSV(spark, s"$basePath/orderdetails.csv")
    val ordersDF        = loadCSV(spark, s"$basePath/orders.csv")
    val paymentsDF      = loadCSV(spark, s"$basePath/payments.csv")
    val productLinesDF  = loadCSV(spark, s"$basePath/productlines.csv")
    val productsDF      = loadCSV(spark, s"$basePath/products.csv")

    // Create Temporary Views
    customersDF.createOrReplaceTempView("customers")
    ordersDF.createOrReplaceTempView("orders")
    orderDetailsDF.createOrReplaceTempView("orderdetails")
    productsDF.createOrReplaceTempView("products")
    paymentsDF.createOrReplaceTempView("payments")

    println("✅ Temporary views created.")

    // Logical Data Model Tables (selected columns)
    val productsTable = productsDF.select("productCode", "productName", "productLine", "quantityInStock", "buyPrice", "MSRP")
    val customersTable = customersDF.select("customerNumber", "customerName", "contactLastName", "phone", "city", "creditLimit")
    val ordersTable = ordersDF.select("orderNumber", "customerNumber", "orderDate", "status", "comments", "shippedDate")

    productsTable.createOrReplaceTempView("products_table")
    customersTable.createOrReplaceTempView("customers_table")
    ordersTable.createOrReplaceTempView("orders_table")

    println("✅ Logical data model created.")

    // Sample data insertion example (can be extended)
    val sampleProducts = Seq(
      ("S10_1949", "1952 Alpine Renault", "Classic Cars", 7300, 53.9, 95.7)
    ).toDF("productCode", "productName", "productLine", "quantityInStock", "buyPrice", "MSRP")

    sampleProducts.createOrReplaceTempView("sample_products")
    println("\nSample products inserted:")
    sampleProducts.show()

    // Transformations & Aggregations

    // 1. Total order value by customer: sum of priceEach * quantityOrdered over all order lines
    val totalOrderValueByCustomer = ordersDF
      .join(orderDetailsDF, "orderNumber")
      .groupBy("customerNumber")
      .agg(round(sum(col("priceEach") * col("quantityOrdered")), 2).alias("total_order_value"))
      .orderBy(desc("total_order_value"))

    println("\n📊 Total Order Value by Customer:")
    totalOrderValueByCustomer.show(5)

    // 2. Products with lowest stock per product line.
    // Joining the per-line minimum back onto products keeps every product tied at the minimum.
    val lowestStock = productsDF
      .groupBy("productLine")
      .agg(min("quantityInStock").alias("minStock"))
      .join(productsDF, Seq("productLine"))
      .where(col("quantityInStock") === col("minStock"))
      .select("productCode", "productLine", "productName", "quantityInStock")

    println("\n📉 Products with Lowest Stock:")
    lowestStock.show()

    // 3. Top 5 customers by total payments
    val topPayments = paymentsDF
      .groupBy("customerNumber")
      .agg(round(sum("amount"), 2).alias("totalPayment"))
      .join(customersDF, "customerNumber")
      .select("customerNumber", "customerName", "totalPayment")
      .orderBy(desc("totalPayment"))
      .limit(5)

    println("\n💰 Top 5 Customers by Total Payment Amount:")
    topPayments.show()

    // 4. Monthly Order Trends using Window functions.
    // The window has no partitionBy, so all months are ranked together in one group.
    val monthlyTrends = ordersDF
      .withColumn("month", date_format(col("orderDate"), "yyyy-MM"))
      .groupBy("month")
      .agg(count("*").alias("totalOrders"))
      .withColumn("rank", dense_rank().over(Window.orderBy(desc("totalOrders"))))

    println("\n📆 Monthly Order Trends:")
    monthlyTrends.show()

    // 5. SQL query on recent shipped orders
    val recentShippedOrders = spark.sql(
      """
        |SELECT o.orderNumber, c.customerName, o.status, o.orderDate
        |FROM orders o
        |JOIN customers c ON o.customerNumber = c.customerNumber
        |WHERE o.status = 'Shipped'
        |ORDER BY o.orderDate DESC
        |LIMIT 5
      """.stripMargin)

    println("\n📄 Recent Shipped Orders:")
    recentShippedOrders.show()

    // Validate relationships with joins (optional, can call separately)
    validateJoins(customersDF, employeesDF, officesDF, orderDetailsDF, ordersDF, paymentsDF, productLinesDF, productsDF)

    // Save outputs as CSV files, one sub-directory per report, in the original order
    Seq(
      totalOrderValueByCustomer -> "total_order_value",
      lowestStock               -> "lowest_stock",
      topPayments               -> "top_customers_by_payment",
      monthlyTrends             -> "monthly_order_trends",
      recentShippedOrders       -> "recent_shipped_orders"
    ).foreach { case (report, dirName) => saveAsCsv(report, s"$outputPath/$dirName") }

    println(s"\n✅ All important DataFrames saved to: $outputPath")
  }

  /**
   * Writes `df` to `path` as headered CSV, overwriting any existing output there.
   * The data is coalesced to one partition first so the report is not split across
   * several part files.
   */
  private def saveAsCsv(df: DataFrame, path: String): Unit =
    df.coalesce(1)
      .write
      .option("header", "true")
      .mode("overwrite")
      .csv(path)

  /**
   * Reads a CSV file into a DataFrame with the `header` and `inferSchema`
   * reader options both set to `"true"`.
   *
   * @param spark session used to create the reader
   * @param path  location of the CSV file to read
   * @return the loaded DataFrame
   */
  def loadCSV(spark: SparkSession, path: String): DataFrame = {
    val readerOptions = Map(
      "header"      -> "true",
      "inferSchema" -> "true"
    )
    spark.read.options(readerOptions).csv(path)
  }

  /**
   * Prints a five-row preview of each key relationship between the loaded tables.
   *
   * Each preview joins two DataFrames on their shared key column and shows a few
   * columns from the result. Nothing is returned; output goes to stdout.
   *
   * @param customers    customers table
   * @param employees    employees table
   * @param offices      offices table
   * @param orderDetails order line items table
   * @param orders       orders table
   * @param payments     payments table
   * @param productLines product lines table
   * @param products     products table
   */
  def validateJoins(
      customers: DataFrame,
      employees: DataFrame,
      offices: DataFrame,
      orderDetails: DataFrame,
      orders: DataFrame,
      payments: DataFrame,
      productLines: DataFrame,
      products: DataFrame
  ): Unit = {
    println("\n🔗 Validating Relationships Using Joins:")

    // orders -> customers, keyed on customerNumber
    println("\n➡️ orders JOIN customers")
    orders
      .join(customers, "customerNumber")
      .select("orderNumber", "customerName", "status")
      .show(5)

    // orderdetails -> orders, keyed on orderNumber
    println("\n➡️ orderdetails JOIN orders")
    orderDetails
      .join(orders, "orderNumber")
      .select("orderNumber", "productCode", "quantityOrdered")
      .show(5)

    // employees -> offices, keyed on officeCode
    println("\n➡️ employees JOIN offices")
    employees
      .join(offices, "officeCode")
      .select("employeeNumber", "firstName", "officeCode", "city")
      .show(5)

    // payments -> customers, keyed on customerNumber
    println("\n➡️ payments JOIN customers")
    payments
      .join(customers, "customerNumber")
      .select("customerName", "checkNumber", "amount")
      .show(5)

    // products -> productlines, keyed on productLine
    println("\n➡️ products JOIN productlines")
    products
      .join(productLines, "productLine")
      .select("productCode", "productLine", "productName")
      .show(5)
  }
}


Overwriting ClassicModelsApp.scala


In [42]:
!scalac -classpath "$SPARK_HOME/jars/*" ClassicModelsApp.scala

In [45]:
!!scala -J-Xmx1g -classpath ".:$SPARK_HOME/jars/*" ClassicModelsApp

["Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties",
 '25/08/04 10:47:43 INFO SparkContext: Running Spark version 3.4.1',
 '25/08/04 10:47:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable',
 '25/08/04 10:47:43 INFO ResourceUtils: No custom resources configured for spark.driver.',
 '25/08/04 10:47:43 INFO SparkContext: Submitted application: ClassicModels Analytics',
 '25/08/04 10:47:43 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)',
 '25/08/04 10:47:43 INFO ResourceProfile: Limiting resource is cpu',
 '25/08/04 10:47:43 INFO ResourceProfileManager: Added ResourceProfile id: 0',
 '25/08/04 10:47:43 INFO SecurityManager: Changin