# 0. Import Packages

In [1]:
import org.apache.spark.SparkFiles
import org.apache.spark.sql.functions._

# 1. Data Loading

In [2]:
val zone_lookup = spark
                        .read
                        .format("csv")
                        .option("header","true")
                        .csv("gs://82976-testbucket/yellow taxi trip data/taxi_zone_lookup.csv")

Waiting for a Spark session to start...

zone_lookup = [LocationID: string, Borough: string ... 2 more fields]


[LocationID: string, Borough: string ... 2 more fields]

In [3]:
zone_lookup.printSchema

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [3]:
//get yellow trip data 
val all_taxi = "gs://82976-testbucket/yellow taxi trip data/yellow_tripdata_2024-*.parquet"
val yellow_tripdata = spark.read.parquet(all_taxi)

all_taxi = gs://82976-testbucket/yellow taxi trip data/yellow_tripdata_2024-*.parquet
yellow_tripdata = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 17 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 17 more fields]

In [5]:
yellow_tripdata.printSchema

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [6]:
// show first five rows 
yellow_tripdata.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [1]:
// count number of rows
val totalRows = yellow_tripdata.count()
println(totalRows)


41169720


totalRows = 41169720


41169720

# 2. Data Cleaning
- 2a. Missing Data Handling 
- 2b. Miscellaneous Data Cleaning


## 2a. Missing Data

In [7]:
// count null values per column

val nullCounts =
  yellow_tripdata.select(
    yellow_tripdata.columns.map(c => sum(col(c).isNull.cast("int")).alias(c)): _*
  )

nullCounts.show(false)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|0       |0                   |0                    |4091232        |0            |4091232   |4091232           |0           |0           |0           |0          |0    |0      |0        

nullCounts = [VendorID: bigint, tpep_pickup_datetime: bigint ... 17 more fields]


[VendorID: bigint, tpep_pickup_datetime: bigint ... 17 more fields]

10% of the values are missing in the following columns
- passenger_count
- RatecodeID
- store_and_fwd_flag
- congestion_surcharge
- airport_fee

However, none of the null value need to be removed. Instead, we can replace them.

**passenger_count**
- Replace null values to 1. We don't have other signal on passenger counts.

**RatecodeID**
- From the data dictionary, we know RatecodeID 99 is Null or unknown. If the value is null, we replace with 99

**store_and_fwd_flag**
- This flag indicates Yes for store and forward trip and No for not a store and forward trip. We can keep the null as is, as null would indicate no information

**congestion_surcharge**
- The congestion fee is a per-trip fee when vehicles start, end, or pass through Manhattan south of 96th street.
- The fee for taxi is 2.5, FHVs (Uber or Lyft) 2.75, and pool rides (shared rides) 0.75
- I will first identify (PULocationID, DOLocationID) pairs where the congestion surcharge is observed as 2.5 (and summarize how frequent each pair is).
- Then, I will look at rows where congestion_surcharge is null. If a NULL row has an location pair that appears in the “2.5” set, I will fill the NULL with 2.5.
- While I can't refer if the ride was taxi, FHVs, or pool rides, most values are taxis so I will replace by 2.5
- Otherwise, replace with 0 

**airport_fee**
- The airport fee is 1.75 for pick up only at LaGuardia and John F. Kennedy Airports
- If the fee is missing but PULocationID is JFK (132) or LaGuardia (138), we can replace the missing values to 1.75
- Otherwise, we can replace null values with 0

In [5]:
val yellow_tripdata_missing =
  yellow_tripdata
    // replace passenger count
    .withColumn(
      "passenger_count",
      when(col("passenger_count").isNull, lit(1L)).otherwise(col("passenger_count"))
    )
    // replace ratecodeID
    .withColumn(
      "RatecodeID",
      when(col("RatecodeID").isNull, lit(99L)).otherwise(col("RatecodeID"))
    )

yellow_tripdata_missing = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 17 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 17 more fields]

In [6]:
// if congestion surcharge is null and pair is known to have 2.5, fill with 2.5
val pairsWith25 =yellow_tripdata_missing
    .filter(col("congestion_surcharge") === lit(2.5))
    .select("PULocationID", "DOLocationID")
    .distinct()

val yellow_tripdata_missing2 = yellow_tripdata_missing
    .join(
      broadcast(pairsWith25.withColumn("has_25", lit(true))),
      Seq("PULocationID", "DOLocationID"),
      "left"
    )
    .withColumn(
      "congestion_surcharge",
      when(col("congestion_surcharge").isNull && col("has_25") === true, lit(2.5))
        .when(col("congestion_surcharge").isNull, lit(0.0))
        .otherwise(col("congestion_surcharge"))
    )
    .drop("has_25")

pairsWith25 = [PULocationID: int, DOLocationID: int]
yellow_tripdata_missing2 = [PULocationID: int, DOLocationID: int ... 17 more fields]


[PULocationID: int, DOLocationID: int ... 17 more fields]

In [7]:
val yellow_tripdata_cleaned = yellow_tripdata_missing2
    //132 is JFK, 138 is Laguardia
    .withColumn("Airport_fee", when(col("Airport_fee").isNull && col("PULocationID").isin(132, 138), lit(1.75))
    .when(col("Airport_fee").isNull, lit(0.0))
    .otherwise(col("Airport_fee")))

yellow_tripdata_cleaned = [PULocationID: int, DOLocationID: int ... 17 more fields]


[PULocationID: int, DOLocationID: int ... 17 more fields]

In [38]:
// see the null counts: every column except for store_and_fwd_flag should be 0 
val nullCounts_missing =
  yellow_tripdata_cleaned.select(
    yellow_tripdata_cleaned.columns.map(c => sum(col(c).isNull.cast("int")).alias(c)): _*
  )

nullCounts_missing.show(false)

+------------+------------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|PULocationID|DOLocationID|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+------------+------------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|0           |0           |0       |0                   |0                    |0              |0            |0         |4091232           |0           |0          |0    |0      |0        

nullCounts_missing = [PULocationID: bigint, DOLocationID: bigint ... 17 more fields]


[PULocationID: bigint, DOLocationID: bigint ... 17 more fields]

## 2b. Miscellaneous Values


**Pickup Time & Dropoff Time**
- If the pickup timestamp is later than the dropoff timestamp, treat that as a data entry issue and swap the two values so pickup occurs before dropoff.

**Fare Amount**
- Flag “unusual” fares when the fare is negative or below the typical NYC taxi base fare threshold
- If fare_amount is between 0 and 2.9:
    - If trip_distance = 0, keep the fare (it may reflect a canceled/failed trip or a zero-distance record).
    - If trip_distance > 0, replace the fare with 3.0 (minimum fare assumption)
- If fare_amount < 0, treat it as a refund/adjustment and set it to 0.

**Extra, MTA Tax, Tip Amount, Tolls Amount, Improvement Surcharge, Congestion Surcharge, Airport Fee**
- Miscellaneous value is when the values are negative. Similar to fare amounts, we can assume that the value was refunded, so we can replace negative fares with 0.

**Airport Fee**
- Airport fee only exists for JFK and LGA.
- If Airport_fee > 0 but PULocationID is not 132 or 138, set Airport_fee to 0.
 
**Total Amount**
- If total_amount <= 0  we will replace with the following equation:
    - total_amount = fare_amount + extra + mta_tax + tip_amount + tolls_amount + congestion_surcharge + airport_fee
    


In [8]:
val yellow_tripdata_time = 
yellow_tripdata_cleaned
    .withColumn("_pickup_tmp",  col("tpep_pickup_datetime"))
    .withColumn("_dropoff_tmp", col("tpep_dropoff_datetime"))
// when pickup time is greater than dropoff time, replace with dropoff time
    .withColumn(
      "tpep_pickup_datetime",
      when(col("_pickup_tmp") > col("_dropoff_tmp"), col("_dropoff_tmp"))
        .otherwise(col("_pickup_tmp")))
// replace dropoff time with pickup time for above values
    .withColumn(
      "tpep_dropoff_datetime",
      when(col("_pickup_tmp") > col("_dropoff_tmp"), col("_pickup_tmp"))
        .otherwise(col("_dropoff_tmp")))
    .drop("_pickup_tmp", "_dropoff_tmp")

yellow_tripdata_time = [PULocationID: int, DOLocationID: int ... 17 more fields]


[PULocationID: int, DOLocationID: int ... 17 more fields]

In [10]:
val yellow_tripdata_fare =
yellow_tripdata_time
    .withColumn(
      "fare_amount",
        // replace negative values to 0 
      when(col("fare_amount") < 0, lit(0.0))
        // for 0 to 2.9 and when trip distance is greater than 0, replace with 3
        .when(col("fare_amount").between(0.0, 2.9) && col("trip_distance") > 0, lit(3.0))
        .otherwise(col("fare_amount")))

yellow_tripdata_fare = [PULocationID: int, DOLocationID: int ... 17 more fields]


[PULocationID: int, DOLocationID: int ... 17 more fields]

In [11]:
val feeCols = Seq("extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "congestion_surcharge", "airport_fee")
val yellow_tripdata_fees =
// overwrite existing columns
feeCols.foldLeft(yellow_tripdata_fare) { (d, c) =>
    d.withColumn(c, when(col(c) < 0, lit(0.0)).otherwise(col(c)))}


feeCols = List(extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, congestion_surcharge, airport_fee)
yellow_tripdata_fees = [PULocationID: int, DOLocationID: int ... 17 more fields]


[PULocationID: int, DOLocationID: int ... 17 more fields]

In [12]:
val yellow_tripdata_airport =
yellow_tripdata_fees.withColumn(
    "airport_fee",
      //airport fee only exist for 132 and 138 , otherwise replace with 0 
    when(col("airport_fee") > 0 && !col("PULocationID").isin(132, 138), lit(0.0))
      .otherwise(col("airport_fee"))
  )

yellow_tripdata_airport = [PULocationID: int, DOLocationID: int ... 17 more fields]


[PULocationID: int, DOLocationID: int ... 17 more fields]

In [14]:
// recompute total amount for when total amount is less than 0 
val recomputedTotal =
    coalesce(col("fare_amount"), lit(0.0)) +
    coalesce(col("extra"), lit(0.0)) +
    coalesce(col("mta_tax"), lit(0.0)) +
    coalesce(col("tip_amount"), lit(0.0)) +
    coalesce(col("tolls_amount"), lit(0.0)) +
    coalesce(col("congestion_surcharge"), lit(0.0)) +
    coalesce(col("improvement_surcharge"), lit(0.0)) +
    coalesce(col("airport_fee"), lit(0.0))

// update data
val yellow_tripdata_cleaned =
  yellow_tripdata_airport.withColumn(
    "total_amount",
    when(col("total_amount") <= 0 || col("total_amount").isNull, recomputedTotal)
      .otherwise(col("total_amount")))

recomputedTotal = (((((((coalesce(fare_amount, 0.0) + coalesce(extra, 0.0)) + coalesce(mta_tax, 0.0)) + coalesce(tip_amount, 0.0)) + coalesce(tolls_amount, 0.0)) + coalesce(congestion_surcharge, 0.0)) + coalesce(improvement_surcharge, 0.0)) + coalesce(airport_fee, 0.0))
yellow_tripdata_cleaned = [PULocationID: int, DOLocationID: int ... 17 more fields]


[PULocationID: int, DOLocationID: int ... 17 more fields]

# 3. Create New Features

**Operational Efficiency and Unit Economics**
- Duration is the baseline operational efficiency metric. It captures time-to-serve a trip, supports congestion/inefficiency monitoring, and is required to compute speed and revenue productivity per unit time.
    - duration_sec = dropoff_ts − pickup_ts (in seconds)
    - duration_min = duration_sec / 60
    - duration_hour = duration_sec / 3600
- Speed is a direct proxy for roadway congestion and operational efficiency. Low speed indicates time wasted per mile, which can depress throughput and profitability. By analyzing speed by zone and hour, we can identify where and when service slows down
    - speed_mph = trip_distance / duration_hour (only when duration_hour > 0)
- Revenue per mile is a unit-economics metric that normalizes for trip length. It helps identify high-value trip types and lanes and supports comparisons across zones and times. This is a practical proxy for route-level profitability when cost data is unavailable
    - revenue_per_mile = net_revenue / trip_distance (only when trip_distance > 0)
        - where net_revenue = total_amount - tolls_amount
- Revenue per minute measures productivity per unit time and is especially important when traffic conditions vary. It is a strong lever for evaluating peak-period performance and for comparing short, slow trips vs longer, faster trips 
    - revenue_per_minute = net_revenue / duration_min (only when duration_min > 0
- Tip rate is a proxy for rider satisfaction, rider mix, and payment behavior, and can be analyzed by time/zone to detect patterns (ex. higher tipping in certain boroughs or time windows). It can also serve as a quality-of-service signal in the absence of explicit rating data.
    - tip_rate = tip_amount / total_amount (only when total_amount > 0)
- Revenue without tolls amount improves comparability across routes/zone because it does not represent value that can be retained
    - net_revenue_ex_tolls = total_amount - tolls_amount
    
**Time & seasonality for Demand and Operations Planning**

- Mobility demand and traffic conditions are highly time-dependent. Hourly segmentation enables peak/off-peak analysis.
    - hour_of_day   
- Weekday vs weekend patterns drive demand, trip mix, and efficiency (traffic). 
    - day_of_week
- A simple weekend indicator improves interpretability and supports clean slicing for reporting and dashboards,
    - is_weekend
- Peak commute windows capture predictable demand surges and congestion periods. This feature enables direct comparison of unit economics and efficiency during peak vs non-peak, a common operational lever in mobility marketplaces.
    - is_peak_commute = (NOT weekend) AND (hour_of_day between 7–10 OR 16–19)



**Policy**

- Tolls are a major driver of customer price and vary by route.
    - has_tolls = (tolls_amount > 0)
- Congestion fees change trip totals and reflect travel through regulated zones
    - has_congestion_fee = (congestion_surcharge > 0)

In [17]:
val yellow_tripdata_transformed = 
yellow_tripdata_cleaned
// first rename columns
    .withColumnRenamed("tpep_pickup_datetime", "pickup_time")
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_time")

// Below is operational efficiency
// create duration in sec
    .withColumn("duration_sec", expr("timestampdiff(SECOND, pickup_time, dropoff_time)"))
// create duration in minute
    .withColumn("duration_min", col("duration_sec") / 60.0)
// create duration in hour
    .withColumn("duration_hr", col("duration_sec") / 3600.0)
// create speed
    .withColumn("speed_mph",
          when(col("duration_hr") > 0, col("trip_distance") / col("duration_hr")).otherwise(lit(null)))
// create rev - tolls
    .withColumn("net_revenue_ex_tolls",
          col("total_amount") - coalesce(col("tolls_amount"), lit(0.0)))
// create revenue per mile
    .withColumn("revenue_per_mile",
          when(col("trip_distance") > 0, col("net_revenue_ex_tolls") / col("trip_distance")).otherwise(lit(null)))
// create revenue per minute 
    .withColumn("revenue_per_min",
          when(col("duration_min") > 0, col("net_revenue_ex_tolls") / col("duration_min")).otherwise(lit(null)))
// create tip rate 
    .withColumn("tip_rate",
          when(col("total_amount") > 0, col("tip_amount") / col("total_amount")).otherwise(lit(null)))

// Below is time and seasonality 
// create hour of day
    .withColumn("hour_of_day", hour(col("pickup_time")))
// create day of week
    .withColumn("day_of_week", dayofweek(col("pickup_time")))
// create weekend 
    .withColumn("is_weekend", col("day_of_week").isin(1, 7))
// create peak commute
    .withColumn( "is_peak_commute", (!col("is_weekend")) && (col("hour_of_day").between(7, 10) || col("hour_of_day").between(16, 19)))

// Below is policy

    .withColumn("has_tolls",      coalesce(col("tolls_amount"), lit(0.0)) > 0)
    .withColumn("has_congestion_fee", coalesce(col("congestion_surcharge"), lit(0.0)) > 0)
    



lastException = null
yellow_tripdata_transformed = [PULocationID: int, DOLocationID: int ... 31 more fields]


[PULocationID: int, DOLocationID: int ... 31 more fields]

In [18]:
// see schema 
yellow_tripdata_transformed.printSchema

root
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- pickup_time: timestamp_ntz (nullable = true)
 |-- dropoff_time: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- duration_sec: long (nullable = true)
 |-- duration_min: double (nullable = true)
 |-- duration_hr: double (nullable = true)
 |-- speed_mph:

# 4. Join Zone data and Taxi data
Join with NYC Zone Lookup Table to get the Borough, Zone Name, and Service Zone for the pickup and dropoff

In [19]:
// create data for pick up locations 
val pu_zone = zone_lookup
  .withColumnRenamed("LocationID","PULocationID")
  .withColumnRenamed("borough","pu_borough")
  .withColumnRenamed("zone","pu_zone")
  .withColumnRenamed("service_zone","pu_service_zone")

pu_zone = [PULocationID: string, pu_borough: string ... 2 more fields]


[PULocationID: string, pu_borough: string ... 2 more fields]

In [20]:
// create data for drop off locations
val do_zone = zone_lookup
  .withColumnRenamed("LocationID","DOLocationID")
  .withColumnRenamed("borough","do_borough")
  .withColumnRenamed("zone","do_zone")
  .withColumnRenamed("service_zone","do_service_zone")

do_zone = [DOLocationID: string, do_borough: string ... 2 more fields]


[DOLocationID: string, do_borough: string ... 2 more fields]

In [21]:
// join pick up location, drop off location with yellow taxi trip data
val merged = yellow_tripdata_transformed
  .join(pu_zone, Seq("PULocationID"), "left")
  .join(do_zone, Seq("DOLocationID"), "left")

merged = [DOLocationID: int, PULocationID: int ... 37 more fields]


[DOLocationID: int, PULocationID: int ... 37 more fields]

In [22]:
// see schema
merged.printSchema

root
 |-- DOLocationID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- pickup_time: timestamp_ntz (nullable = true)
 |-- dropoff_time: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- duration_sec: long (nullable = true)
 |-- duration_min: double (nullable = true)
 |-- duration_hr: double (nullable = true)
 |-- speed_mph:

# 5. Write Parquet file and Time

In [23]:
val outPath = "gs://82976-testbucket/yellow taxi trip data/transformed data/"


outPath = gs://82976-testbucket/yellow taxi trip data/transformed data/


gs://82976-testbucket/yellow taxi trip data/transformed data/

In [24]:
def timer[R](block: => R): R = { //R is the placeholder return type for the block
  val t0 = System.nanoTime() //Returns the system time
  val result = block //executes the block and stores the result
  val t1 = System.nanoTime()
  println(s"Elapsed time: ${(t1 - t0)/1e9} seconds") //the time difference 
  result //return the result
}

timer: [R](block: => R)R


Regular

In [25]:
// time 
timer{merged.write.mode("overwrite").parquet(outPath)}

Elapsed time: 145.617094928 seconds


In [26]:
// write 
merged.write.mode("overwrite").parquet(outPath)

**Partition by Zone**

In [27]:
val outPath2 = "gs://82976-testbucket/yellow taxi trip data/partition by zone data/"


outPath2 = gs://82976-testbucket/yellow taxi trip data/partition by zone data/


gs://82976-testbucket/yellow taxi trip data/partition by zone data/

In [30]:
timer{merged.repartition(col("pu_zone")).write.mode("overwrite").parquet(outPath2)}


Elapsed time: 208.826530618 seconds


In [31]:
// write
merged.repartition(col("pu_zone")).write.mode("overwrite").parquet(outPath2)

**Partition by Hour**

In [32]:
val outPath3 = "gs://82976-testbucket/yellow taxi trip data/partition by hour data/"


outPath3 = gs://82976-testbucket/yellow taxi trip data/partition by hour data/


gs://82976-testbucket/yellow taxi trip data/partition by hour data/

In [34]:
timer{merged.repartition(col("hour_of_day")).write.mode("overwrite").parquet(outPath3)}


Elapsed time: 202.299658655 seconds


In [35]:
//write
merged.repartition(col("hour_of_day")).write.mode("overwrite").parquet(outPath3)

# 6. Load Data into BigQuery

In [36]:
val tableId = "quiet-grail-480918-g1:tlc_trip_record_data.nyc_yellow_taxi_table"
// use hour of day as partition field
merged.write
  .format("bigquery")
  .option("writeMethod", "indirect")
  .option("temporaryGcsBucket", "82976-testbucket")
  .option("partitionField", "hour_of_day")
  .option("partitionRangeStart", "0")
  .option("partitionRangeEnd", "24")
  .option("partitionRangeInterval", "1")
  .mode("overwrite")
  .save(tableId)

tableId = quiet-grail-480918-g1:tlc_trip_record_data.nyc_yellow_taxi_table


quiet-grail-480918-g1:tlc_trip_record_data.nyc_yellow_taxi_table

# 7. Run Queries
- Average fare per day of week
- Average fare per time of day
- Average trip duration per day of week
- Average trip duration by hour of day
- Fare distribution by hour of day


In [1]:
// register view
merged.createOrReplaceTempView("trips")


In [9]:
// average fare per day of week
timer{
spark.sql("""
SELECT day_of_week, AVG(fare_amount) AS average_fare
FROM trips
GROUP BY day_of_week
ORDER BY day_of_week""").show(7, false)}

+-----------+------------------+
|day_of_week|average_fare      |
+-----------+------------------+
|1          |20.46679388525139 |
|2          |20.363182856568443|
|3          |19.35915444189709 |
|4          |19.46376574878205 |
|5          |19.79322009701156 |
|6          |19.40711505689188 |
|7          |18.667698373736773|
+-----------+------------------+

Elapsed time: 26.321215291 seconds


In [10]:
// average fare per time of day
timer{spark.sql("""
SELECT hour_of_day, AVG(fare_amount) AS average_fare
FROM trips
GROUP BY hour_of_day
ORDER BY hour_of_day
""").show(24, false)}


+-----------+------------------+
|hour_of_day|average_fare      |
+-----------+------------------+
|0          |19.983829939483073|
|1          |18.05690350791211 |
|2          |16.793209636088232|
|3          |17.796442620677862|
|4          |23.113505251889258|
|5          |26.848694027451774|
|6          |22.366583913425195|
|7          |19.497490071709674|
|8          |18.741535015220897|
|9          |18.585516151944155|
|10         |18.85466144785401 |
|11         |19.10830985328393 |
|12         |19.533067789553996|
|13         |20.24018327114264 |
|14         |21.00677676825041 |
|15         |21.029246467561304|
|16         |21.235369345276826|
|17         |19.709929810941723|
|18         |18.489187071912536|
|19         |18.499628625795808|
|20         |18.748447229146812|
|21         |18.807585051191634|
|22         |19.59988357356946 |
|23         |20.527836791065084|
+-----------+------------------+

Elapsed time: 21.672758165 seconds


In [11]:
// average trip duration per day of week
timer{spark.sql("""
SELECT
day_of_week,
AVG(duration_min) AS average_duration_min
FROM trips
GROUP BY day_of_week
ORDER BY day_of_week
""").show(7, false)}


+-----------+--------------------+
|day_of_week|average_duration_min|
+-----------+--------------------+
|1          |16.48958807077207   |
|2          |17.429222971277206  |
|3          |17.665408201621492  |
|4          |17.944848572917817  |
|5          |18.4223574047951    |
|6          |17.62257823765832   |
|7          |16.552475583990397  |
+-----------+--------------------+

Elapsed time: 14.81751881 seconds


In [12]:
// average trip duration by hour of day
timer{spark.sql("""
SELECT
hour_of_day,
AVG(duration_min) AS average_duration_min
FROM trips
GROUP BY hour_of_day
ORDER BY hour_of_day
""").show(24, false)}


+-----------+--------------------+
|hour_of_day|average_duration_min|
+-----------+--------------------+
|0          |15.286006777075263  |
|1          |14.103325363934493  |
|2          |13.066722730410032  |
|3          |13.014680331854512  |
|4          |14.763036416656753  |
|5          |16.862843948213325  |
|6          |17.10601012615924   |
|7          |16.427855818497154  |
|8          |16.654590047626634  |
|9          |16.77803491796762   |
|10         |17.325524433653378  |
|11         |17.87339169656797   |
|12         |18.36906216624194   |
|13         |19.014423598205298  |
|14         |20.093711671699708  |
|15         |20.60402938473823   |
|16         |20.565112926912178  |
|17         |19.09303566907057   |
|18         |17.135786800914126  |
|19         |16.195541565891105  |
|20         |15.739032293720175  |
|21         |15.565121629768248  |
|22         |15.818517841222613  |
|23         |16.023710510149247  |
+-----------+--------------------+

Elapsed time: 14.96

In [14]:
// fare distribution by hour of day
timer{spark.sql("""
SELECT
hour_of_day,
percentile_approx(fare_amount, 0.10) AS p10_fare,
percentile_approx(fare_amount, 0.50) AS p50_fare,
percentile_approx(fare_amount, 0.90) AS p90_fare,
percentile_approx(fare_amount, 0.99) AS p99_fare
FROM trips
GROUP BY hour_of_day
ORDER BY hour_of_day
""").show(24, false)}


+-----------+--------+--------+--------+--------+
|hour_of_day|p10_fare|p50_fare|p90_fare|p99_fare|
+-----------+--------+--------+--------+--------+
|0          |6.5     |14.2    |40.1    |82.0    |
|1          |6.5     |13.1    |33.8    |75.8    |
|2          |6.5     |12.8    |31.0    |70.0    |
|3          |5.8     |12.8    |34.58   |80.0    |
|4          |6.5     |15.6    |51.3    |92.3    |
|5          |6.5     |16.05   |70.0    |94.85   |
|6          |6.5     |13.5    |61.8    |88.1    |
|7          |6.5     |12.8    |43.6    |83.0    |
|8          |6.5     |13.5    |38.7    |74.8    |
|9          |6.5     |13.5    |38.7    |72.7    |
|10         |6.5     |13.5    |40.1    |73.7    |
|11         |6.5     |13.5    |39.5    |74.4    |
|12         |6.5     |14.2    |40.8    |77.9    |
|13         |6.5     |14.2    |43.6    |83.0    |
|14         |6.5     |14.2    |47.1    |86.0    |
|15         |6.5     |14.2    |47.1    |87.0    |
|16         |6.5     |14.2    |47.8    |86.69   |
