In [0]:
import numpy
import pandas



In [0]:
# Azure access info for the NYC TLC yellow-taxi parquet data.
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
# Empty SAS token, as in the Azure Open Datasets sample for this container.
# The previous value "r" looks like a mangled r"" and was being registered
# with Spark as a (bogus) one-character token.
blob_sas_token = r""

# Spark reads from Blob: build the wasbs:// URL and register the SAS token
# under the per-container configuration key before reading.
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set(
  'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
  blob_sas_token)
print(wasbs_path)

# Load the parquet data, expose it to Spark SQL as the temp view 'taxi',
# and show a single row as a sanity check.
yellowtaxis = spark.read.parquet(wasbs_path)
yellowtaxis.createOrReplaceTempView('taxi')
display(spark.sql('SELECT * FROM taxi LIMIT 1'))

wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow


vendorID,tpepPickupDateTime,tpepDropoffDateTime,passengerCount,tripDistance,puLocationId,doLocationId,startLon,startLat,endLon,endLat,rateCodeId,storeAndFwdFlag,paymentType,fareAmount,extra,mtaTax,improvementSurcharge,tipAmount,tollsAmount,totalAmount,puYear,puMonth
CMT,2012-02-29T23:53:14.000+0000,2012-03-01T00:00:43.000+0000,1,2.1,,,-73.980494,40.730601,-73.983532,40.752311,1,N,CSH,7.3,0.5,0.5,,0.0,0.0,8.3,2012,3


In [0]:
# Read the taxi zone lookup Delta table into a DataFrame and print a preview.
zone_lookup_reader = spark.read.format('delta')
df = zone_lookup_reader.table('default.taxi_zone_lookup_3_csv')
df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [0]:
# Fetch the 'taxi' table/view as a DataFrame and print its column types.
taxi_view_name = "taxi"
df_taxi = spark.table(taxi_view_name)
df_taxi.printSchema()

root
 |-- vendorID: string (nullable = true)
 |-- tpepPickupDateTime: timestamp (nullable = true)
 |-- tpepDropoffDateTime: timestamp (nullable = true)
 |-- passengerCount: integer (nullable = true)
 |-- tripDistance: double (nullable = true)
 |-- puLocationId: string (nullable = true)
 |-- doLocationId: string (nullable = true)
 |-- startLon: double (nullable = true)
 |-- startLat: double (nullable = true)
 |-- endLon: double (nullable = true)
 |-- endLat: double (nullable = true)
 |-- rateCodeId: integer (nullable = true)
 |-- storeAndFwdFlag: string (nullable = true)
 |-- paymentType: string (nullable = true)
 |-- fareAmount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mtaTax: double (nullable = true)
 |-- improvementSurcharge: string (nullable = true)
 |-- tipAmount: double (nullable = true)
 |-- tollsAmount: double (nullable = true)
 |-- totalAmount: double (nullable = true)
 |-- puYear: integer (nullable = true)
 |-- puMonth: integer (nullable = true)



In [0]:
from pyspark.sql import functions as F

# Tunable thresholds for this analysis.
MIN_TRIP_MILES = 2       # only trips of this distance or longer are considered
TOP_N_LOCATIONS = 20     # size of the "popular drop-off" list and of the final report

# Read the zone lookup ONCE and use this same DataFrame object both as the join
# operand and for every column reference (LocationID, Borough). Previously each
# join read a fresh copy of the table while the conditions pointed at columns of
# a different DataFrame (`df`) created in an earlier cell, which made this cell
# depend on hidden state and on cross-DataFrame column resolution.
zone_lookup = spark.read.format('delta').table('default.taxi_zone_lookup_3_csv')

# 2 miles or more
filtered_taxi_df = df_taxi.filter(df_taxi.tripDistance >= MIN_TRIP_MILES)

# Top drop-off locations in Manhattan, ranked by total passengers dropped off.
manhattan_drops = (
    filtered_taxi_df
    .join(zone_lookup, filtered_taxi_df.doLocationId == zone_lookup.LocationID)
    .filter(zone_lookup.Borough == 'Manhattan')
    .groupBy("doLocationId", "Zone")
    .agg(F.sum("passengerCount").alias("total_passenger_count"))
    .orderBy(F.desc("total_passenger_count"))
    .limit(TOP_N_LOCATIONS)
)

# Pull the (at most TOP_N_LOCATIONS) ids to the driver as a plain Python list.
popular_dropoffs = [
    row["doLocationId"]
    for row in manhattan_drops.select("doLocationId").collect()
]

# Per pickup location:
#   - avg total amount per trip
#   - count of trips starting there
#   - total passengers picked up there
#   - count of trips that start there and end at one of the popular drop-off locs
pickup_analysis = (
    filtered_taxi_df
    .join(zone_lookup, filtered_taxi_df.puLocationId == zone_lookup.LocationID)
    .withColumn(
        "is_popular_dropoff",
        F.when(filtered_taxi_df.doLocationId.isin(popular_dropoffs), 1).otherwise(0)
    )
    .groupBy("puLocationId", "Borough", "Zone")
    .agg(
        F.avg("totalAmount").alias("avg_total_amount"),
        F.count(F.lit(1)).alias("trip_count"),
        F.sum("passengerCount").alias("total_passenger_count"),
        F.sum("is_popular_dropoff").alias("popular_dropoff_count")
    )
)

# Share of this location's trips that end at a popular drop-off.
# Both numerator and denominator are trip counts; the earlier version divided
# by the summed passenger count, which mixed units and skewed the ratio.
pickup_analysis = pickup_analysis.withColumn(
    "dropoff_proportion",
    F.col("popular_dropoff_count") / F.col("trip_count")
)

# Average fare weighted by that share, used as the ranking score.
pickup_analysis = pickup_analysis.withColumn(
    "weighted_profit",
    F.col("dropoff_proportion") * F.col("avg_total_amount")
)

pickup_analysis = pickup_analysis.orderBy(F.desc("weighted_profit"))

# Final list of top pickup locations by weighted score.
pickup_analysis.select(
    "Borough",
    "Zone",
    "avg_total_amount",
    "trip_count",
    "total_passenger_count",
    "popular_dropoff_count",
    "dropoff_proportion",
    "weighted_profit",
).show(TOP_N_LOCATIONS)
