In [0]:
%pip install shapely

In [0]:
%pip install geopandas

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.functions import date_format
from pyspark.sql.functions import to_date
from shapely.geometry import Point, LineString, Polygon,point
from pyspark.sql.functions import udf
import shapely
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pandas as pd
import geopandas 
import datetime
import time
import shapely.speedups
shapely.speedups.enable()
from pyspark.sql.functions import *
from pyspark.sql import Window

In [0]:
geo_schema = StructType([
  StructField("properties", StructType([
    StructField("@id", StringType(), True),
    StructField("borough", StringType(), True),
    StructField("boroughCode", IntegerType(), True)
  ]), True),
  StructField("geometry", StructType([
    StructField("coordinates", ArrayType(ArrayType(ArrayType(FloatType()))), True),
    StructField("type", StringType(), True)
  ]), True),
])

nyc_schema = StructType([
  StructField("medallion", StringType(), True),
  StructField("hack_license", StringType(), True),
  StructField("vendor_id", StringType(), True),
  StructField("rate_code", IntegerType(), True),
  StructField("store_and_fwd_flag", StringType(), True),
  StructField("pickup_datetime", StringType(), True),
  StructField("dropoff_datetime", StringType(), True),
  StructField("passenger_count", IntegerType(), True),
  StructField("pickup_longitude", FloatType(), True),
  StructField("pickup_latitude", FloatType(), True),
  StructField("dropoff_longitude", FloatType(), True),
  StructField("dropoff_latitude", FloatType(), True),  
])

In [0]:
df_nyc = (spark.read
        .schema(nyc_schema)
       .format("csv")
       .option("header",True)
       .load("dbfs:/FileStore/shared_uploads/kingbillyintartu@gmail.com/Sample_NYC_Data.csv"))
display(df_nyc)

medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,01-01-13 15:11,01-01-13 15:18,4,-73.978165,40.757977,-73.98984,40.75117
0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,06-01-13 0:18,06-01-13 0:22,1,-74.00668,40.73178,-73.9945,40.75066
0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,05-01-13 18:49,05-01-13 18:54,1,-74.00471,40.73777,-74.009834,40.726
DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,07-01-13 23:54,07-01-13 23:58,2,-73.9746,40.759945,-73.98473,40.759388
DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,07-01-13 23:25,07-01-13 23:34,1,-73.97625,40.748528,-74.00259,40.747868
20D9ECB2CA0767CF7A01564DF2844A3E,598CCE5B9C1918568DEE71F43CF26CD2,CMT,1,N,07-01-13 15:27,07-01-13 15:38,1,-73.96674,40.76425,-73.98332,40.743763
496644932DF3932605C22C7926FF0FE0,513189AD756FF14FE670D10B92FAF04C,CMT,1,N,08-01-13 11:01,08-01-13 11:08,1,-73.995804,40.743977,-74.007416,40.744343
0B57B9633A2FECD3D3B1944AFC7471CF,CCD4367B417ED6634D986F573A552A62,CMT,1,N,07-01-13 12:39,07-01-13 13:10,3,-73.98994,40.756775,-73.86525,40.77063
2C0E91FF20A856C891483ED63589F982,1DA2F6543A62B8ED934771661A9D2FA0,CMT,1,N,07-01-13 18:15,07-01-13 18:20,1,-73.98007,40.743137,-73.98271,40.735336
2D4B95E2FA7B2E85118EC5CA4570FA58,CD2F522EEE1FF5F5A8D8B679E23576B3,CMT,1,N,07-01-13 15:33,07-01-13 15:49,2,-73.977936,40.786983,-73.95292,40.80637


In [0]:
# Calculate the difference between dropoff_time and pickup_time
def time_difference(pick_up, drop_off):
  pickupTime = time.mktime(datetime.datetime.strptime(pick_up, '%d-%m-%y %H:%M').timetuple())
  dropoffTime = time.mktime(datetime.datetime.strptime(drop_off, '%d-%m-%y %H:%M').timetuple())
  return dropoffTime-pickupTime

timeDiff = udf(time_difference, FloatType())

df_nyc = (df_nyc.
          withColumn('duration', timeDiff(col('pickup_datetime'), col('dropoff_datetime')))
          .filter((col('duration') > 0) & (col('duration') < 4*3600)) # if difference is negative or greater than 4 hours, drop it
       )

display(df_nyc)

medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,duration
89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,01-01-13 15:11,01-01-13 15:18,4,-73.978165,40.757977,-73.98984,40.75117,420.0
0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,06-01-13 0:18,06-01-13 0:22,1,-74.00668,40.73178,-73.9945,40.75066,240.0
0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,05-01-13 18:49,05-01-13 18:54,1,-74.00471,40.73777,-74.009834,40.726,300.0
DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,07-01-13 23:54,07-01-13 23:58,2,-73.9746,40.759945,-73.98473,40.759388,240.0
DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,07-01-13 23:25,07-01-13 23:34,1,-73.97625,40.748528,-74.00259,40.747868,540.0
20D9ECB2CA0767CF7A01564DF2844A3E,598CCE5B9C1918568DEE71F43CF26CD2,CMT,1,N,07-01-13 15:27,07-01-13 15:38,1,-73.96674,40.76425,-73.98332,40.743763,660.0
496644932DF3932605C22C7926FF0FE0,513189AD756FF14FE670D10B92FAF04C,CMT,1,N,08-01-13 11:01,08-01-13 11:08,1,-73.995804,40.743977,-74.007416,40.744343,420.0
0B57B9633A2FECD3D3B1944AFC7471CF,CCD4367B417ED6634D986F573A552A62,CMT,1,N,07-01-13 12:39,07-01-13 13:10,3,-73.98994,40.756775,-73.86525,40.77063,1860.0
2C0E91FF20A856C891483ED63589F982,1DA2F6543A62B8ED934771661A9D2FA0,CMT,1,N,07-01-13 18:15,07-01-13 18:20,1,-73.98007,40.743137,-73.98271,40.735336,300.0
2D4B95E2FA7B2E85118EC5CA4570FA58,CD2F522EEE1FF5F5A8D8B679E23576B3,CMT,1,N,07-01-13 15:33,07-01-13 15:49,2,-73.977936,40.786983,-73.95292,40.80637,960.0


In [0]:
df_geospatial = (spark.read
      .schema(geo_schema)
      .json("dbfs:/FileStore/shared_uploads/kingbillyintartu@gmail.com/nyc_boroughs.geojson")
      .filter(col('geometry').isNotNull())
      .withColumn("coordinates", explode(col("geometry.coordinates")))
      .withColumn("borough", col("properties.borough"))
      .withColumn("boroughcode", col("properties.boroughCode"))
      .select(col("coordinates"), col("boroughcode"), col("borough"))
      .sort(col("boroughCode").asc())
)

In [0]:
display(df_geospatial.take(2))

coordinates,boroughcode,borough
"List(List(-74.01092529296875, 40.68449020385742), List(-74.01193237304688, 40.68388748168945), List(-74.01217651367188, 40.6840934753418), List(-74.00835418701172, 40.6864013671875), List(-74.00816345214844, 40.68617630004883), List(-74.01092529296875, 40.68449020385742))",1,Manhattan
"List(List(-74.0050048828125, 40.68760681152344), List(-74.00563049316406, 40.68678283691406), List(-74.0078353881836, 40.68738555908203), List(-74.0074234008789, 40.68820571899414), List(-74.0050048828125, 40.68760681152344))",1,Manhattan


In [0]:
# Prepare geospatial datageopandas.GeoDataFrame(
df_geospatial_pandas = df_geospatial.select(col('coordinates'), col('borough')).toPandas()
polygons=[]
for index, row in df_geospatial_pandas.iterrows():
  poly_points = Polygon([(pts[0], pts[1]) for pts in row[0]])
  polygons.append(poly_points)
df_geospatial_pandas['Polygons'] = polygons
df_geospatial_pandas.drop('coordinates', inplace=True, axis=1)
df_geospatial_data = geopandas.GeoDataFrame(df_geospatial_pandas)

In [0]:
df_geospatial_data

Unnamed: 0,borough,Polygons
0,Manhattan,POLYGON ((-74.01092529296875 40.68449020385742...
1,Manhattan,"POLYGON ((-74.0050048828125 40.68760681152344,..."
2,Manhattan,POLYGON ((-74.00382232666016 40.68893051147461...
3,Manhattan,POLYGON ((-74.00297546386719 40.69042587280273...
4,Manhattan,POLYGON ((-74.04387664794922 40.69018936157227...
...,...,...
102,Queens,POLYGON ((-73.89144897460938 40.77622222900391...
103,Staten Island,POLYGON ((-74.05050659179688 40.56642150878906...
104,Staten Island,POLYGON ((-74.05313873291016 40.57770156860352...
105,Staten Island,POLYGON ((-74.15945434570312 40.64144897460938...


In [0]:
def findBorough(longitude, latitude):
  mgdf = df_geospatial_data.apply(lambda x: x['borough'] if x['Polygons'].intersects(Point(longitude, latitude)) else None, axis=1)
  idx = mgdf.first_valid_index()
  first_valid_value = mgdf.loc[idx] if idx is not None else None
  return first_valid_value

findBorough = udf(findBorough, StringType())

In [0]:
sc.broadcast(df_geospatial_data)

df_nyc = (df_nyc
          .withColumn("Pickup_location", findBorough(col('pickup_longitude'), col('pickup_latitude')))
          .withColumn('Dropoff_location', findBorough(col('dropoff_longitude'), col('dropoff_latitude')))
       )

In [0]:
display(df_nyc)

medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,duration,Pickup_location,Dropoff_location
89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,01-01-13 15:11,01-01-13 15:18,4,-73.978165,40.757977,-73.98984,40.75117,420.0,Manhattan,Manhattan
0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,06-01-13 0:18,06-01-13 0:22,1,-74.00668,40.73178,-73.9945,40.75066,240.0,Manhattan,Manhattan
0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,05-01-13 18:49,05-01-13 18:54,1,-74.00471,40.73777,-74.009834,40.726,300.0,Manhattan,Manhattan
DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,07-01-13 23:54,07-01-13 23:58,2,-73.9746,40.759945,-73.98473,40.759388,240.0,Manhattan,Manhattan
DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,07-01-13 23:25,07-01-13 23:34,1,-73.97625,40.748528,-74.00259,40.747868,540.0,Manhattan,Manhattan
20D9ECB2CA0767CF7A01564DF2844A3E,598CCE5B9C1918568DEE71F43CF26CD2,CMT,1,N,07-01-13 15:27,07-01-13 15:38,1,-73.96674,40.76425,-73.98332,40.743763,660.0,Manhattan,Manhattan
496644932DF3932605C22C7926FF0FE0,513189AD756FF14FE670D10B92FAF04C,CMT,1,N,08-01-13 11:01,08-01-13 11:08,1,-73.995804,40.743977,-74.007416,40.744343,420.0,Manhattan,Manhattan
0B57B9633A2FECD3D3B1944AFC7471CF,CCD4367B417ED6634D986F573A552A62,CMT,1,N,07-01-13 12:39,07-01-13 13:10,3,-73.98994,40.756775,-73.86525,40.77063,1860.0,Manhattan,Queens
2C0E91FF20A856C891483ED63589F982,1DA2F6543A62B8ED934771661A9D2FA0,CMT,1,N,07-01-13 18:15,07-01-13 18:20,1,-73.98007,40.743137,-73.98271,40.735336,300.0,Manhattan,Manhattan
2D4B95E2FA7B2E85118EC5CA4570FA58,CD2F522EEE1FF5F5A8D8B679E23576B3,CMT,1,N,07-01-13 15:33,07-01-13 15:49,2,-73.977936,40.786983,-73.95292,40.80637,960.0,Manhattan,Manhattan


In [0]:
df_last_nyc = (df_nyc      
               .select(
                 col('medallion'),
                 col('hack_license'),
                 col('pickup_datetime'),
                 col('dropoff_datetime'),
                 col('passenger_count'),
                 col('duration'),
                 col('Pickup_location'),
                 col('Dropoff_location'))
              )

# Queries

### Query 1

Utilization per taxi/driver: The sum of idle time aggregated by destination borough

In [0]:
window_spec = Window.partitionBy("medallion").orderBy(col("pickup_datetime").asc())

query_1 = (df_last_nyc
       .withColumn("dropoff_lagging", lag('dropoff_datetime').over(window_spec))
       .filter(col("dropoff_lagging").isNotNull())
       )

query_1 = (query_1.withColumn("idle", timeDiff(col('dropoff_lagging'), col('pickup_datetime'))) # timeDiff function returned seconds
         .filter((col('idle') < 4*3600) & (col('idle') > 0))
         .filter(col('Dropoff_location').isNotNull())
        ) # if idle less than 4 hours keep it

In [0]:
# End of Query 1
display(query_1.groupBy(col('Dropoff_location')).agg(sum(col('idle')))) 

Dropoff_location,sum(idle)
Queens,8310600.0
Brooklyn,4736700.0
Staten Island,2940.0
Manhattan,82800300.0
Bronx,617400.0


The total idle time per taxi/driver aggregated by destination borough:
<br>
Queens: 8,310,600 seconds
<br>
Brooklyn: 4,736,700 seconds
<br>
Staten Island: 2940 seconds
<br>
Manhattan: 82,800,300 seconds
<br>
Bronx: 617,400 seconds

### Query 2

The average time it takes for a taxi to find its next fare per destination borough.

In [0]:
query_2 = (query_1.groupBy(col("Dropoff_location")).agg(avg(col("idle")))) # returns average time as seconds
display(query_2)

Dropoff_location,avg(idle)
Queens,1934.9476135040743
Brooklyn,1822.5086571758368
Staten Island,980.0
Manhattan,1066.382041573294
Bronx,2166.315789473684


The average time it takes for a taxi to find its next fare per destination borough:
<br>
Queens: 1934.95 seconds
<br>
Brooklyn: 1822.51 seconds
<br>
Staten Island: 980 seconds
<br>
Manhattan: 1066.38 seconds
<br>
Bronx: 2166.32 seconds

### Query 3

The number of trips that started and ended in the same borough

In [0]:
query_3 = (df_last_nyc.filter(col("Pickup_location") == col("Dropoff_location")).count())
query_3

85,944 trips started and ended in the same borough.

### Query 4

The number of trips that started in one borough and ended in another

In [0]:
query_4 = (df_last_nyc.filter(col("Pickup_location") != col("Dropoff_location")).count())
query_4

11,431 trips started in one borough and ended in another.