# Packages & Spark Setup

In [1]:
import $ivy.`org.apache.spark::spark-sql:3.3.4`

[32mimport [39m[36m$ivy.$                                  [39m

In [2]:
org.apache.spark.SPARK_VERSION
scala.util.Properties.versionNumberString

[36mres1_0[39m: [32mString[39m = [32m"3.3.4"[39m
[36mres1_1[39m: [32mString[39m = [32m"2.13.8"[39m

In [3]:
import org.apache.spark.sql.NotebookSparkSession
import org.apache.spark.sql.SparkSession

[32mimport [39m[36morg.apache.spark.sql.NotebookSparkSession
[39m
[32mimport [39m[36morg.apache.spark.sql.SparkSession[39m

In [4]:
val spark = SparkSession.builder()
    .appName("helloJupyter")
    .master(s"spark://spark-localhost:7077")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
val sc = spark.sparkContext

sc.setLogLevel("WARN")
import spark.implicits._

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties


25/12/25 13:47:55 INFO SparkContext: Running Spark version 3.3.4
25/12/25 13:47:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/25 13:47:55 INFO ResourceUtils: No custom resources configured for spark.driver.
25/12/25 13:47:55 INFO SparkContext: Submitted application: helloJupyter
25/12/25 13:47:55 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 4096, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/12/25 13:47:55 INFO ResourceProfile: Limiting resource is cpu
25/12/25 13:47:55 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/12/25 13:47:55 INFO SecurityManager: Changing view acls to: ubuntu
25/12/25 13:47:55 INFO SecurityManager: Changing modify acls to: ubuntu
25/12/25 13:47:55 INFO Security

[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@3b1e3f1b
[36msc[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mSparkContext[39m = org.apache.spark.SparkContext@1b61a8ff
[32mimport [39m[36mspark.implicits._[39m

In [5]:
kernel.silent(false)

# Interactive Spark

In [5]:
// val data = Seq(("sue", 32), ("li", 3), ("bob", 75), ("heo", 13))
// val df = data.toDF("first_name", "age")

// df.show()

# Task 1: Trip Flow Efficiency

**Goal:** Rank the top 20 busiest **Pickup  Drop-off** pairs during weekday rush hours.

* **Filter:** Weekdays + Rush Hour windows.
* **Key:** `(PULocationID, DOLocationID)`.
* **Metrics:** Count, Avg Duration, Avg Speed, Avg Fare/Mile.
* **RDD Ops:** `filter`  `map` (to composite key)  `reduceByKey` (summing metrics)  `mapValues` (calculating averages)  `takeOrdered`.

In [6]:
import org.apache.spark.sql.{functions => F, types => T}
import org.apache.log4j.Logger
import org.apache.spark.sql.{SparkSession, DataFrame}

[32mimport [39m[36morg.apache.spark.sql.{functions => F, types => T}
[39m
[32mimport [39m[36morg.apache.log4j.Logger
[39m
[32mimport [39m[36morg.apache.spark.sql.{SparkSession, DataFrame}[39m

In [7]:
object SchemaEnforcer {
    // format: off
    val schema = T.StructType(
      Seq(
        T.StructField("VendorID", T.IntegerType, nullable = true),
        T.StructField("tpep_pickup_datetime", T.TimestampType, nullable = true),
        T.StructField("tpep_dropoff_datetime", T.TimestampType, nullable = true),
        T.StructField("passenger_count", T.DoubleType, nullable = true),
        T.StructField("trip_distance", T.DoubleType, nullable = true),
        T.StructField("RatecodeID", T.LongType, nullable = true),
        T.StructField("store_and_fwd_flag", T.StringType, nullable = true),
        T.StructField("PULocationID", T.IntegerType, nullable = true),
        T.StructField("DOLocationID", T.IntegerType, nullable = true),
        T.StructField("payment_type", T.LongType, nullable = true),
        T.StructField("fare_amount", T.DoubleType, nullable = true),
        T.StructField("extra", T.DoubleType, nullable = true),
        T.StructField("mta_tax", T.DoubleType, nullable = true),
        T.StructField("tip_amount", T.DoubleType, nullable = true),
        T.StructField("tolls_amount", T.DoubleType, nullable = true),
        T.StructField("improvement_surcharge", T.DoubleType, nullable = true),
        T.StructField("total_amount", T.DoubleType, nullable = true),
        T.StructField("congestion_surcharge", T.DoubleType, nullable = true),
        T.StructField("airport_fee", T.DoubleType, nullable = true)
      )
    )
    // format: on
    private def normName(name: String): String = {
      name.toLowerCase().replaceAll("_|-", "")
    }
    private val normNameTypePair =
      this.schema.map(f => (normName(f.name), f.name, f.dataType))

    def enforce(df: DataFrame)(implicit log: Logger): DataFrame = {
      val dfNormNameToRawPair =
        df.schema.map(f => this.normName(f.name) -> (f.name, f.dataType)).toMap

      val newCols = normNameTypePair.map { case (norm, name, dataType) =>
        val (ogName, ogDataType) = dfNormNameToRawPair(norm)
        var newCol = F.col(ogName)
        if (name != ogName) {
          log.info(s"Renaming '$ogName' to '$name'")
          newCol = newCol.alias(name)
        }
        if (dataType != ogDataType) {
          log.warn(s"Casting '$name' from $ogDataType to $dataType")
          newCol = newCol.cast(dataType)
        }
        newCol
      }
      df.select(newCols: _*)
    }
  }


defined [32mobject[39m [36mSchemaEnforcer[39m

In [8]:
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
val fs = FileSystem.get(conf)

val parquetPaths = fs
  .globStatus(
    new Path("/workspaces/*/data/taxi/yellow_tripdata_2023-*.parquet")
  )
  .map(_.getPath().toString())

[32mimport [39m[36morg.apache.hadoop.fs.{FileSystem, Path}
[39m
[32mimport [39m[36morg.apache.hadoop.conf.Configuration

[39m
[36mconf[39m: [32mConfiguration[39m = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-rbf-default.xml, hdfs-site.xml, hdfs-rbf-site.xml
[36mfs[39m: [32mFileSystem[39m = org.apache.hadoop.fs.LocalFileSystem@75504c3
[36mparquetPaths[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"file:/workspaces/learning-scala/data/taxi/yellow_tripdata_2023-01.parquet"[39m,
  [32m"file:/workspaces/learning-scala/data/taxi/yellow_tripdata_2023-02.parquet"[39m,
  [32m"file:/workspaces/learning-scala/data/taxi/yellow_tripdata_2023-03.parquet"[39m,
  [32m"file:/workspaces/learning-scala/data/taxi/yellow_tripdata_2023-04.parquet"[39m,
  [32m"file:/workspaces/learning-scala/data/taxi/yellow_tripdata_2023-05.parquet"[39m
)

In [9]:
implicit val log = Logger.getLogger(this.getClass())

[36mlog[39m: [32mLogger[39m = org.apache.log4j.Logger@24ac8818

In [10]:
import org.apache.spark.storage.StorageLevel

[32mimport [39m[36morg.apache.spark.storage.StorageLevel[39m

In [11]:
val parquets = parquetPaths.map(p => spark.read.parquet(p))
val normParquets = parquets.map(SchemaEnforcer.enforce)
val taxi = normParquets.reduce((a, b) => a.unionByName(b)).cache() // .persist(StorageLevel.MEMORY_ONLY)

25/12/25 13:48:05 WARN cmd8$Helper: Casting 'VendorID' from LongType to IntegerType
25/12/25 13:48:05 WARN cmd8$Helper: Casting 'RatecodeID' from DoubleType to LongType
25/12/25 13:48:05 WARN cmd8$Helper: Casting 'PULocationID' from LongType to IntegerType
25/12/25 13:48:05 WARN cmd8$Helper: Casting 'DOLocationID' from LongType to IntegerType
25/12/25 13:48:05 WARN cmd8$Helper: Casting 'passenger_count' from LongType to DoubleType
25/12/25 13:48:05 WARN cmd8$Helper: Casting 'passenger_count' from LongType to DoubleType
25/12/25 13:48:05 WARN cmd8$Helper: Casting 'passenger_count' from LongType to DoubleType
25/12/25 13:48:05 WARN cmd8$Helper: Casting 'passenger_count' from LongType to DoubleType


[36mparquets[39m: [32mArray[39m[[32mDataFrame[39m] = [33mArray[39m(
  [VendorID: bigint, tpep_pickup_datetime: timestamp ... 17 more fields],
  [VendorID: int, tpep_pickup_datetime: timestamp ... 17 more fields],
  [VendorID: int, tpep_pickup_datetime: timestamp ... 17 more fields],
  [VendorID: int, tpep_pickup_datetime: timestamp ... 17 more fields],
  [VendorID: int, tpep_pickup_datetime: timestamp ... 17 more fields]
)
[36mnormParquets[39m: [32mArray[39m[[32mDataFrame[39m] = [33mArray[39m(
  [VendorID: int, tpep_pickup_datetime: timestamp ... 17 more fields],
  [VendorID: int, tpep_pickup_datetime: timestamp ... 17 more fields],
  [VendorID: int, tpep_pickup_datetime: timestamp ... 17 more fields],
  [VendorID: int, tpep_pickup_datetime: timestamp ... 17 more fields],
  [VendorID: int, tpep_pickup_datetime: timestamp ... 17 more fields]
)
[36mtaxi[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[3

In [16]:
val pickupsPerHour = taxi
  .select(
    $"tpep_pickup_datetime"
  )
  .filter(F.dayofweek($"tpep_pickup_datetime").between(2, 6))
  .withColumn("hour", F.hour($"tpep_pickup_datetime"))
  .groupBy($"hour")
  .agg(F.count($"tpep_pickup_datetime").alias("pickups_per_hour"))

[36mpickupsPerHour[39m: [32mDataFrame[39m = [hour: int, pickups_per_hour: bigint]

In [12]:
// pickupsPerHour.show(50)

In [17]:
val percentile = pickupsPerHour.select(F.percentile_approx(
  $"pickups_per_hour",
  F.lit(0.9),
  F.lit(10000)
)).as[Long].head

[36mpercentile[39m: [32mLong[39m = [32m774615L[39m

In [18]:
val rushHours = pickupsPerHour.filter(
  $"pickups_per_hour" > percentile
)

[36mrushHours[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [hour: int, pickups_per_hour: bigint]

In [19]:
rushHours.show()

+----+----------------+
|hour|pickups_per_hour|
+----+----------------+
|  17|          821064|
|  18|          876583|
+----+----------------+



In [31]:
val rushHoursArr = rushHours.select($"hour").as[Int].collect

[36mrushHoursArr[39m: [32mArray[39m[[32mInt[39m] = [33mArray[39m([32m17[39m, [32m18[39m)

In [34]:
taxi.printSchema

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [55]:
val duration_minutes = (F.unix_timestamp($"tpep_dropoff_datetime") - F.unix_timestamp($"tpep_pickup_datetime")) / 60
val rushHourStatistics = taxi
  .select($"*", F.hour($"tpep_pickup_datetime").alias("hour"))
  .filter($"hour".isInCollection(rushHoursArr))
  .groupBy($"PULocationID", $"DOLocationID")
  .agg(
    F.count($"*").alias("count"),
    F.avg(duration_minutes).alias("avg_duration_minutes"),
    F.avg($"trip_distance" / (duration_minutes / 60)).alias("avg_mph"),
    F.avg($"fare_amount" / $"trip_distance").alias("avg_fare_per_mile")
  ).cache()

[36mduration_minutes[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mColumn[39m = ((unix_timestamp(tpep_dropoff_datetime, yyyy-MM-dd HH:mm:ss) - unix_timestamp(tpep_pickup_datetime, yyyy-MM-dd HH:mm:ss)) / 60)
[36mrushHourStatistics[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [PULocationID: int, DOLocationID: int ... 4 more fields]

In [53]:
val zones = spark.read
  .option("header", "true")
  .csv("/workspaces/*/data/taxi/taxi_zone_lookup.csv")
  .select(
    $"LocationID".cast(T.IntegerType),
    $"Borough",
    $"Zone"
  )

[36mzones[39m: [32mDataFrame[39m = [LocationID: int, Borough: string ... 1 more field]

In [66]:
zones.printSchema

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)



In [83]:
val rushHourStatisticsWithZones = rushHourStatistics
  .join(
    zones.select(
      $"LocationID".alias("PULocationID"),
      $"Borough".alias("PU_Borough"),
      $"Zone".alias("PU_Zone")
    ),
    Seq("PULocationID"),
    "left"
  )
  .join(
    zones.select(
      $"LocationID".alias("DOLocationID"),
      $"Borough".alias("DO_Borough"),
      $"Zone".alias("DO_Zone")
    ),
    Seq("DOLocationID"),
    "left"
  ).cache()
rushHourStatisticsWithZones.printSchema

root
 |-- DOLocationID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- count: long (nullable = false)
 |-- avg_duration_minutes: double (nullable = true)
 |-- avg_mph: double (nullable = true)
 |-- avg_fare_per_mile: double (nullable = true)
 |-- PU_Borough: string (nullable = true)
 |-- PU_Zone: string (nullable = true)
 |-- DO_Borough: string (nullable = true)
 |-- DO_Zone: string (nullable = true)



[36mrushHourStatisticsWithZones[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [DOLocationID: int, PULocationID: int ... 8 more fields]

In [84]:
rushHourStatisticsWithZones.orderBy($"count".desc).limit(20).show(truncate=false)

+------------+------------+-----+--------------------+------------------+------------------+----------+---------------------+----------+-------------------------+
|DOLocationID|PULocationID|count|avg_duration_minutes|avg_mph           |avg_fare_per_mile |PU_Borough|PU_Zone              |DO_Borough|DO_Zone                  |
+------------+------------+-----+--------------------+------------------+------------------+----------+---------------------+----------+-------------------------+
|264         |264         |17558|17.322539583096017  |16.61390159724997 |18.59398844705124 |Unknown   |N/A                  |Unknown   |N/A                      |
|236         |237         |16264|7.6514008444007215  |10.769124136880796|8.085515046210366 |Manhattan |Upper East Side South|Manhattan |Upper East Side North    |
|237         |236         |15717|8.344477317554237   |14.122857957733672|9.045409279918841 |Manhattan |Upper East Side North|Manhattan |Upper East Side South    |
|237         |237     

---

# Task 2: Demand Volatility

**Goal:** Rank pickup zones by how much their trip volume fluctuates hourly.

* **Step 1:** Count trips per `(Zone, Hour)` using `reduceByKey`.
* **Step 2:** Regroup by `Zone` only.
* **Step 3:** Calculate **Variance** or **StdDev** of the hourly counts.
* **RDD Ops:** `map` (to hour)  `reduceByKey`  `aggregateByKey` (to compute stats)  `sortBy`.

---

# Comparison Goal

Implement both using **RDDs** (manual logic) vs. **DataFrames** (SQL/Optimization) to compare:

1. **Code Complexity:** Lines of code.
2. **Performance:** Execution time and shuffle size in Spark UI.

**Would you like the specific mathematical formulas for the RDD variance calculation?**

In [4]:
val a = List(1,2,3)

(console):2:7 expected (`this` | Id)
a.map.val
      ^

: 