## Imports and Data Fetching

In [1]:
from pyspark.sql import SparkSession, types
from pyspark.sql.functions import col, year, month, mean, median, udf

In [2]:
spark = SparkSession.builder \
          .master("local[*]") \
          .appName("TaxiDataProcessing") \
          .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.3.1,com.microsoft.azure:azure-storage:8.6.6") \
          .getOrCreate()

23/10/30 21:13:05 WARN Utils: Your hostname, Alvees-MBP.local resolves to a loopback address: 127.0.0.1; using 192.168.1.155 instead (on interface en0)
23/10/30 21:13:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/alvee/.ivy2/cache
The jars for the packages stored in: /Users/alvee/.ivy2/jars
org.apache.hadoop#hadoop-azure added as a dependency
com.microsoft.azure#azure-storage added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7d5c9ebb-09b8-4b1b-ad58-6990075f84d5;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-azure;3.3.1 in central


:: loading settings :: url = jar:file:/Users/alvee/anaconda3/envs/nyc_taxi_ride_alvee/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.httpcomponents#httpclient;4.5.13 in local-m2-cache
	found org.apache.httpcomponents#httpcore;4.4.13 in local-m2-cache
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.11 in local-m2-cache
	found org.apache.hadoop.thirdparty#hadoop-shaded-guava;1.1.1 in central
	found org.eclipse.jetty#jetty-util-ajax;9.4.40.v20210413 in central
	found org.eclipse.jetty#jetty-util;9.4.40.v20210413 in central
	found org.codehaus.jackson#jackson-mapper-asl;1.9.13 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found com.microsoft.azure#azure-storage;8.6.6 in central
	found com.fasterxml.jackson.core#jackson-core;2.9.4 in central
	found org.slf4j#slf4j-api;1.7.12 in central
	found org.apache.commons#commons-lang3;3.4 in central
	found com.microsoft.azure#azure-keyvault-core;1.2.4 in central
	found com.google.guava#guava;24.1.1-jre in central
	found com.g

In [3]:
# Azure storage access info
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
blob_sas_token = "r"

# Allow Spark to read from Blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set(
  'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
  blob_sas_token)
  
print('Remote blob path: ' + wasbs_path)

df = spark.read.parquet(wasbs_path)

Remote blob path: wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow


23/10/30 21:13:07 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-azure-file-system.properties,hadoop-metrics2.properties
                                                                                

In [4]:
df.printSchema()

root
 |-- vendorID: string (nullable = true)
 |-- tpepPickupDateTime: timestamp (nullable = true)
 |-- tpepDropoffDateTime: timestamp (nullable = true)
 |-- passengerCount: integer (nullable = true)
 |-- tripDistance: double (nullable = true)
 |-- puLocationId: string (nullable = true)
 |-- doLocationId: string (nullable = true)
 |-- startLon: double (nullable = true)
 |-- startLat: double (nullable = true)
 |-- endLon: double (nullable = true)
 |-- endLat: double (nullable = true)
 |-- rateCodeId: integer (nullable = true)
 |-- storeAndFwdFlag: string (nullable = true)
 |-- paymentType: string (nullable = true)
 |-- fareAmount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mtaTax: double (nullable = true)
 |-- improvementSurcharge: string (nullable = true)
 |-- tipAmount: double (nullable = true)
 |-- tollsAmount: double (nullable = true)
 |-- totalAmount: double (nullable = true)
 |-- puYear: integer (nullable = true)
 |-- puMonth: integer (nullable = true)



## Clean Data

In [5]:
@udf(types.StringType())
def get_payment_type(payment_type: str):
    payment_type_map = {
        '1': 'Credit card',
        '2': 'Cash',
        '3': 'No charge',
        '4': 'Dispute',
        '5': 'Unknown',
        '6': 'Voided trip',
    }
    
    return payment_type_map.get(payment_type.strip(), None)

df = df.withColumn('paymentType', get_payment_type(df.paymentType))

In [6]:
df = df.filter((col("passengerCount") > 0) 
                & (col("paymentType").isNotNull())
                & (col("puYear") >= 2009) 
                & (col("puYear") <= 2018))

## Aggregate Data

In [7]:
df_agg = df.groupBy(
        col("paymentType"), 
        col("puYear").alias("year"), 
        col("puMonth").alias("month")
      ).agg(
        mean("fareAmount").alias("meanFareAmount"),
        median("fareAmount").alias("medianFareAmount"),
        mean("totalAmount").alias("meanTotalAmount"),
        median("totalAmount").alias("medianTotalAmount"),
        mean("passengerCount").alias("meanPassengerCount"),
        median("passengerCount").alias("medianPassengerCount")
      )

In [8]:
# Aggregate data using Spark SQL query

# df.createOrReplaceTempView("taxi_data")

# sql_query = """
# SELECT
#     paymentType,
#     puYear AS year,
#     puMonth AS month,
#     AVG(fareAmount) AS meanFareAmount,
#     PERCENTILE(fareAmount, 0.5) AS medianFareAmount,
#     AVG(totalAmount) AS meanTotalAmount,
#     PERCENTILE(totalAmount, 0.5) AS medianTotalAmount,
#     AVG(passengerCount) AS meanPassengerCount,
#     PERCENTILE(passengerCount, 0.5) AS medianPassengerCount
# FROM taxi_data
# WHERE passengerCount > 0 AND paymentType IS NOT NULL
# GROUP BY paymentType, year, month
# """

# df_agg = spark.sql(sql_query)

In [9]:
output_path = "./data/result_explore/"
df_agg.write.partitionBy("paymentType", "year", "month").parquet(output_path, mode="overwrite")

23/10/30 21:21:21 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/10/30 21:21:21 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/10/30 21:21:21 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/10/30 21:21:21 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/10/30 21:21:21 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

## Read Local Data

In [10]:
df_local = spark.read.parquet("./data/result_explore")
df_local.printSchema()

root
 |-- meanFareAmount: double (nullable = true)
 |-- medianFareAmount: double (nullable = true)
 |-- meanTotalAmount: double (nullable = true)
 |-- medianTotalAmount: double (nullable = true)
 |-- meanPassengerCount: double (nullable = true)
 |-- medianPassengerCount: double (nullable = true)
 |-- paymentType: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [11]:
df_local.limit(10).orderBy('paymentType', 'year', 'month').show()

+------------------+----------------+------------------+-----------------+------------------+--------------------+-----------+----+-----+
|    meanFareAmount|medianFareAmount|   meanTotalAmount|medianTotalAmount|meanPassengerCount|medianPassengerCount|paymentType|year|month|
+------------------+----------------+------------------+-----------------+------------------+--------------------+-----------+----+-----+
|10.949047720642925|             8.0|12.216734030414708|              9.3|1.7140974414180248|                 1.0|       Cash|2015|    1|
|11.261226190383946|             8.0|12.564796420455288|              9.3| 1.648360535101513|                 1.0|       Cash|2018|    1|
|13.282471413045489|            10.0| 17.42523316947812|            12.96| 1.656186660678024|                 1.0|Credit card|2015|    3|
|12.998312242744888|             9.5|17.112745963811772|             12.6| 1.656975895647892|                 1.0|Credit card|2016|    1|
|11.815727206707598|             8

In [13]:
spark.stop()