In [1]:
%%capture
!pip install geopandas

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, DecimalType, IntegerType, StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, month, year, isnan, desc, unix_timestamp, to_timestamp, dayofmonth, hour, minute, second, mean
from functools import reduce
import seaborn as sns
import pandas as pd
import numpy as np
import geopandas as gpd
import matplotlib.pyplot as plt

Matplotlib created a temporary cache directory at /scratch/hradhakrishnan/job_30614111/matplotlib-lcpc70p5 because the default path (/home/jovyan/.cache/matplotlib) is not a writable directory; it is highly recommended to set the MPLCONFIGDIR environment variable to a writable directory, in particular to speed up the import of Matplotlib and to better support multiprocessing.


In [3]:
# Setting up a theme with Seaborn
sns.set_theme(style="whitegrid")

# Adjusting font scale for better readability
sns.set_context("notebook", font_scale=1.2)

In [4]:
spark = SparkSession.builder \
    .config("spark.driver.memory", "32g") \
	.config("spark.executor.memory", "8g") \
    .config('spark.executor.instances', 8) \
	.appName("Yellow Taxi EDA") \
	.getOrCreate()

In [5]:
base_path = 'yellow_taxi_data/yellow_tripdata_2023-{:02d}.parquet'

paths=[]
for mo in range(1, 8):  # This loops from 1 to 12
    path = base_path.format(mo)  # Formats the month with leading zero if necessary
    paths.append(path)

In [6]:
def load_and_cast(filepath):
    df = spark.read.parquet(filepath)
    df = df.withColumn("VendorID", col("VendorID").cast(IntegerType()))
    return df

# Load, cast, and accumulate all DataFrames
dataframes = [load_and_cast(path) for path in paths]
df = reduce(lambda df1, df2: df1.unionByName(df2), dataframes)

# Show the DataFrame
df.show(1)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0|    0.5|       0.

In [7]:
from shapely.geometry import Point

In [8]:
zone = gpd.read_file("yellow_taxi_data/taxi_zones/taxi_zones.shp")

zone.set_crs("EPSG:2263", inplace=True)
zone['center'] = zone.representative_point()
center_gdf = gpd.GeoDataFrame(zone, geometry=zone['center'])
center_gdf = center_gdf.to_crs("EPSG:4326")

zone['long'] = center_gdf.geometry.x
zone['lat'] = center_gdf.geometry.y

zone = zone.drop(columns = ['OBJECTID','geometry','center'])

In [9]:
schemazone = StructType([
    StructField("Shape_Leng", DoubleType(), True),
    StructField("Shape_Area", DoubleType(), True),
    StructField("Zone", StringType(), True),
    StructField("LocationID", StringType(), True),
    StructField("Borough", StringType(), True),
    StructField("Long", DoubleType(), True),
    StructField("Lat", DoubleType(), True)

])

zonedf = spark.createDataFrame(zone, schemazone)

In [10]:
merged_zone = df.join(zonedf, df.PULocationID == zonedf.LocationID, how = 'left')

In [11]:
merged_zone = merged_zone.drop(*('VendorID','passenger_count','RatecodeID','store_and_fwd_flag','payment_type', 'fare_amount',
                                'extra','mta_tax','tip_amount','tolls_amount','improvement_surcharge','total_amount','congestion_surcharge',
                                'airport_fee', 'LocationID'))

In [12]:
merged_zone = merged_zone.withColumnRenamed("Shape_Leng", "PU_Shape_Leng") \
                         .withColumnRenamed("Shape_Area", "PU_Shape_Area") \
                         .withColumnRenamed("Zone", "PU_Zone") \
                         .withColumnRenamed("Borough", "PU_Borough") \
                         .withColumnRenamed("long", "PU_Long") \
                         .withColumnRenamed("lat", "PU_Lat")

In [13]:
merged_zone = merged_zone.join(zonedf, merged_zone.DOLocationID == zonedf.LocationID, how = 'left')

In [14]:
merged_zone = merged_zone.withColumnRenamed("Shape_Leng", "DO_Shape_Leng") \
                         .withColumnRenamed("Shape_Area", "DO_Shape_Area") \
                         .withColumnRenamed("Zone", "DO_Zone") \
                         .withColumnRenamed("Borough", "DO_Borough") \
                         .withColumnRenamed("long", "DO_Long") \
                         .withColumnRenamed("lat", "DO_Lat")

In [15]:
from pyspark.sql.functions import col, unix_timestamp, expr

merged_zone = merged_zone.withColumn('tpep_pickup_datetime', col('tpep_pickup_datetime').cast('timestamp'))
merged_zone = merged_zone.withColumn('tpep_dropoff_datetime', col('tpep_dropoff_datetime').cast('timestamp'))
merged_zone = merged_zone.withColumn('time_diff_seconds', unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime'))

merged_zone = merged_zone.withColumn('duration_mins', (col('time_diff_seconds') / 60).cast('int'))
merged_zone = merged_zone.drop('time_diff_seconds')

In [16]:
merged_zone = merged_zone.drop('LocationID')

In [17]:
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, dayofweek

merged_zone = merged_zone.withColumn('month', month('tpep_pickup_datetime')) \
                         .withColumn('day', dayofmonth('tpep_pickup_datetime')) \
                         .withColumn('hour', hour('tpep_pickup_datetime')) \
                         .withColumn('minute', minute('tpep_pickup_datetime'))\
                         .withColumn('dayOfWeek', dayofweek('tpep_pickup_datetime'))

In [18]:
merged_zone = merged_zone.withColumn('is_weekend', (merged_zone.dayOfWeek.isin([1, 7])).cast('boolean'))


In [20]:
from pyspark.sql.functions import col, sum

na_counts = merged_zone.select([sum(col(c).isNull().cast("int")).alias(c) for c in merged_zone.columns])

In [21]:
na_counts.show()

+--------------------+---------------------+-------------+------------+------------+-------------+-------------+-------+----------+-------+------+-------------+-------------+-------+----------+-------+------+-------------+-----+---+----+------+---------+----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|PULocationID|DOLocationID|PU_Shape_Leng|PU_Shape_Area|PU_Zone|PU_Borough|PU_Long|PU_Lat|DO_Shape_Leng|DO_Shape_Area|DO_Zone|DO_Borough|DO_Long|DO_Lat|duration_mins|month|day|hour|minute|dayOfWeek|is_weekend|
+--------------------+---------------------+-------------+------------+------------+-------------+-------------+-------+----------+-------+------+-------------+-------------+-------+----------+-------+------+-------------+-----+---+----+------+---------+----------+
|                   0|                    0|            0|           0|           0|       255034|       255034| 255034|    255034| 255034|255034|       327630|       327630| 327630|    327630| 327630|3

In [22]:
merged_zone.count()

22408473

In [23]:
merged_zone.filter(col("PU_Shape_Leng").isNull()).show(5)

+--------------------+---------------------+-------------+------------+------------+-------------+-------------+-------+----------+-------+------+---------------+----------------+--------------------+----------+------------------+------------------+-------------+-----+---+----+------+---------+----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|PULocationID|DOLocationID|PU_Shape_Leng|PU_Shape_Area|PU_Zone|PU_Borough|PU_Long|PU_Lat|  DO_Shape_Leng|   DO_Shape_Area|             DO_Zone|DO_Borough|           DO_Long|            DO_Lat|duration_mins|month|day|hour|minute|dayOfWeek|is_weekend|
+--------------------+---------------------+-------------+------------+------------+-------------+-------------+-------+----------+-------+------+---------------+----------------+--------------------+----------+------------------+------------------+-------------+-----+---+----+------+---------+----------+
| 2023-01-01 00:10:50|  2023-01-01 00:20:19|         1.41|         264|        

In [24]:
merged_zone = merged_zone.na.fill({'PU_Zone': 'OutsideNYC', 'DO_Zone':'OutsideNYC', 'PU_Borough': 'OutsideNYC', \
                     'DO_Borough':'OutsideNYC'})

In [25]:
# numeric_cols = [field.name for field in merged_zone.schema.fields if isinstance(field.dataType, (DoubleType, IntegerType))]

numeric_cols = ['PU_Long', 'PU_Lat',  'PU_Shape_Leng', 'PU_Shape_Area', 'DO_Long', 'DO_Lat',  'DO_Shape_Leng', 'DO_Shape_Area']
for col_name in numeric_cols:
    mean_value = merged_zone.select(mean(col(col_name))).collect()[0][0]
    merged_zone = merged_zone.fillna({col_name: mean_value})

In [26]:
gpd.read_file("yellow_taxi_data/taxi_zones/taxi_zones.shp")

Unnamed: 0,OBJECTID,Shape_Leng,Shape_Area,zone,LocationID,borough,geometry
0,1,0.116357,0.000782,Newark Airport,1,EWR,"POLYGON ((933100.918 192536.086, 933091.011 19..."
1,2,0.433470,0.004866,Jamaica Bay,2,Queens,"MULTIPOLYGON (((1033269.244 172126.008, 103343..."
2,3,0.084341,0.000314,Allerton/Pelham Gardens,3,Bronx,"POLYGON ((1026308.770 256767.698, 1026495.593 ..."
3,4,0.043567,0.000112,Alphabet City,4,Manhattan,"POLYGON ((992073.467 203714.076, 992068.667 20..."
4,5,0.092146,0.000498,Arden Heights,5,Staten Island,"POLYGON ((935843.310 144283.336, 936046.565 14..."
...,...,...,...,...,...,...,...
258,259,0.126750,0.000395,Woodlawn/Wakefield,259,Bronx,"POLYGON ((1025414.782 270986.139, 1025138.624 ..."
259,260,0.133514,0.000422,Woodside,260,Queens,"POLYGON ((1011466.966 216463.005, 1011545.889 ..."
260,261,0.027120,0.000034,World Trade Center,261,Manhattan,"POLYGON ((980555.204 196138.486, 980570.792 19..."
261,262,0.049064,0.000122,Yorkville East,262,Manhattan,"MULTIPOLYGON (((999804.795 224498.527, 999824...."


In [27]:
na_counts = merged_zone.select([sum(col(c).isNull().cast("int")).alias(c) for c in merged_zone.columns])

In [28]:
na_counts.show()

+--------------------+---------------------+-------------+------------+------------+-------------+-------------+-------+----------+-------+------+-------------+-------------+-------+----------+-------+------+-------------+-----+---+----+------+---------+----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|PULocationID|DOLocationID|PU_Shape_Leng|PU_Shape_Area|PU_Zone|PU_Borough|PU_Long|PU_Lat|DO_Shape_Leng|DO_Shape_Area|DO_Zone|DO_Borough|DO_Long|DO_Lat|duration_mins|month|day|hour|minute|dayOfWeek|is_weekend|
+--------------------+---------------------+-------------+------------+------------+-------------+-------------+-------+----------+-------+------+-------------+-------------+-------+----------+-------+------+-------------+-----+---+----+------+---------+----------+
|                   0|                    0|            0|           0|           0|            0|            0|      0|         0|      0|     0|            0|            0|      0|         0|      0| 

In [31]:
train_df, test_df = merged_zone.randomSplit(weights=[0.8,0.2], seed=11)

In [32]:
label_name = "duration_mins"

# get a list with feature column names
feature_names = [x.name for x in train_df.schema if x.name != label_name]

In [34]:
from xgboost.spark import SparkXGBRegressor

regressor = SparkXGBRegressor(
  features_col=feature_names,
  label_col=label_name,
  num_workers=6,
  device="cuda",
)


In [35]:
model = regressor.fit(train_df)




ValueError: Values in feature columns must be integral types or float/double types.