In [31]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pyspark.sql.functions as F

In [32]:
# Get or create the notebook's SparkSession, using the "local[2]" master
# and the application name 'test'.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName('test')
    .getOrCreate()
)

In [74]:
# Display the version string reported by the active Spark session.
spark.version

'3.2.0'

In [33]:
# Explicit schema for the trip CSV, handed to the reader through .schema(schema).
# Each entry pairs a column name with its Spark type; every field is nullable.
schema = types.StructType([
    types.StructField(column_name, column_type(), True)
    for column_name, column_type in (
        ('hvfhs_license_num', types.StringType),
        ('dispatching_base_num', types.StringType),
        ('pickup_datetime', types.TimestampType),
        ('dropoff_datetime', types.TimestampType),
        ('PULocationID', types.IntegerType),
        ('DOLocationID', types.IntegerType),
        ('SR_Flag', types.StringType),
    )
])

In [4]:
# Load the trip CSV with the explicit schema, treating its first line as a header.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhvhv_tripdata_2021-02.csv')
)

In [5]:
# Display the schema of the loaded trips to confirm the explicit column types were applied.
df.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,TimestampType,true),StructField(dropoff_datetime,TimestampType,true),StructField(PULocationID,IntegerType,true),StructField(DOLocationID,IntegerType,true),StructField(SR_Flag,StringType,true)))

In [6]:
# Print the leading rows of the loaded trips as a text table.
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null|
|           HV0003|              B02872|2021-02-01 00:08:42|2021-02-01 00:17:57|         126|         250|   null|
|           HV0003|              B02872|2021-02-01 00:26:02|2021-02-01 00:42:51|

In [7]:
# Add calendar-date columns derived from the pickup and dropoff timestamps.
df = (
    df
    .withColumn('pickup_date', F.to_date(df.pickup_datetime))
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime))
)

In [8]:
# Preview the dispatching base next to the two derived date columns.
(
    df
    .select('dispatching_base_num', 'pickup_date', 'dropoff_date')
    .show()
)

+--------------------+-----------+------------+
|dispatching_base_num|pickup_date|dropoff_date|
+--------------------+-----------+------------+
|              B02764| 2021-02-01|  2021-02-01|
|              B02764| 2021-02-01|  2021-02-01|
|              B02510| 2021-02-01|  2021-02-01|
|              B02510| 2021-02-01|  2021-02-01|
|              B02872| 2021-02-01|  2021-02-01|
|              B02872| 2021-02-01|  2021-02-01|
|              B02872| 2021-02-01|  2021-02-01|
|              B02764| 2021-02-01|  2021-02-01|
|              B02764| 2021-02-01|  2021-02-01|
|              B02510| 2021-02-01|  2021-02-01|
|              B02510| 2021-02-01|  2021-02-01|
|              B02510| 2021-02-01|  2021-02-01|
|              B02510| 2021-02-01|  2021-02-01|
|              B02764| 2021-02-01|  2021-02-01|
|              B02764| 2021-02-01|  2021-02-01|
|              B02764| 2021-02-01|  2021-02-01|
|              B02888| 2021-02-01|  2021-02-01|
|              B02888| 2021-02-01|  2021

In [23]:
# List the column names of df, now including pickup_date and dropoff_date.
df.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'pickup_date',
 'dropoff_date']

In [53]:
# Redistribute the trips into 24 partitions and write them as parquet,
# overwriting whatever already exists at the target path.
(
    df.repartition(24)
    .write
    .mode('overwrite')
    .parquet("processed_repartition/2021-02/")
)

In [54]:
# Read the parquet output back; the glob matches every entry under processed_repartition/.
df_pq = spark.read.parquet('processed_repartition/*')

In [55]:
# Display the schema of the parquet-backed DataFrame (includes the derived date columns).
df_pq.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,TimestampType,true),StructField(dropoff_datetime,TimestampType,true),StructField(PULocationID,IntegerType,true),StructField(DOLocationID,IntegerType,true),StructField(SR_Flag,StringType,true),StructField(pickup_date,DateType,true),StructField(dropoff_date,DateType,true)))

In [56]:
# Expose the parquet-backed trips to Spark SQL under the name 'taxi_data'.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0 and emits a deprecation warning.
# An existing view with the same name is replaced.
df_pq.createOrReplaceTempView('taxi_data')



In [57]:
# How many taxi trips were there on February 15?
# Counts the taxi_data rows whose pickup_date equals 2021-02-15 and prints the result.
spark.sql('''
    select count(1) from taxi_data where pickup_date = '2021-02-15'
''').show()

+--------+
|count(1)|
+--------+
|  367170|
+--------+



In [68]:
# Day with the longest trip.
# Computes each trip's duration in seconds (dropoff minus pickup, both converted
# with unix_timestamp), sorts by that duration in descending order, and keeps only
# the top row together with its pickup_date.
spark.sql('''
    select pickup_date, (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) as trip_duration_sec
    from taxi_data
    order by trip_duration_sec desc limit 1
''').show()

+-----------+-----------------+
|pickup_date|trip_duration_sec|
+-----------+-----------------+
| 2021-02-11|            75540|
+-----------+-----------------+



In [73]:
# Find the most frequently occurring dispatching_base_num in this dataset.
# Counts rows per dispatching_base_num, sorts by that count in descending order,
# and keeps only the top row.
spark.sql('''
    select dispatching_base_num ,count(1) as count
    from taxi_data
    group by dispatching_base_num
    order by count desc limit 1
''').show()

+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
+--------------------+-------+



In [76]:
# Load the zone lookup CSV, treating its first line as a header.
# No schema is supplied to the reader here.
df_zone = (
    spark.read
    .option('header', 'true')
    .csv('taxi+_zone_lookup.csv')
)

In [77]:
# Display the schema of the zone lookup table.
df_zone.schema

StructType(List(StructField(LocationID,StringType,true),StructField(Borough,StringType,true),StructField(Zone,StringType,true),StructField(service_zone,StringType,true)))

In [78]:
# Expose the zone lookup to Spark SQL under the name 'zone'.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0 and emits a deprecation warning.
# An existing view with the same name is replaced.
df_zone.createOrReplaceTempView('zone')



In [102]:
# Most common pickup/dropoff zone pair.
# Joins taxi_data to the zone lookup twice (once for the pickup location, once for
# the dropoff location), counts trips per (pickup zone, dropoff zone) and keeps the
# pair with the highest count, formatted as 'pickup/dropoff'.
# Both joins are inner joins, so trips whose location ID has no row in zone are
# excluded; coalesce only substitutes 'Unknown' when a matched row has a null Zone.
# NOTE(review): PULocationID/DOLocationID are integer columns in taxi_data, while
# zone.LocationID was loaded as a string column, so the join conditions compare
# values of different types and rely on Spark coercing them.
# show(100, False) prints up to 100 rows without truncating long cell values.
spark.sql('''
    select concat(coalesce(pickup_zone.Zone, 'Unknown'), '/', coalesce(drop_zone.Zone, 'Unknown'))
    as zone_pair, count(1) as count from taxi_data a
    join zone as pickup_zone on a.PULocationID = pickup_zone.LocationID
    join zone as drop_zone on a.DOLocationID = drop_zone.LocationID
    group by pickup_zone.Zone, drop_zone.Zone
    order by count desc limit 1
''').show(100, False)

+---------------------------+-----+
|zone_pair                  |count|
+---------------------------+-----+
|East New York/East New York|45041|
+---------------------------+-----+



In [100]:
# List the pickup/dropoff location ID pairs of trips whose pickup and dropoff
# location IDs differ.
spark.sql('''
    select PULocationID, DOLocationID from taxi_data where PULocationID != DOLocationID
''').show()

+------------+------------+
|PULocationID|DOLocationID|
+------------+------------+
|          42|          74|
|          42|          75|
|         174|          20|
|          82|         234|
|         127|         243|
|         148|         158|
|          36|          80|
|         133|          89|
|          81|          87|
|         244|         243|
|          26|         133|
|         248|         250|
|         142|          75|
|         158|          50|
|          16|          92|
|          61|         234|
|          74|         159|
|         117|          42|
|         159|         185|
|          64|          92|
+------------+------------+
only showing top 20 rows

