In [1]:
# Must run before pyspark is imported in the next cell.
# NOTE(review): findspark.init() presumably locates the local Spark install
# (e.g. via SPARK_HOME) and puts pyspark on sys.path - confirm for this setup.
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F
from datetime import datetime

# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('hvfhw')
    .getOrCreate()
)

# Question 1: 

In [3]:
# Show the version of the running Spark session (answer to Question 1).
spark.version

'3.2.3'

# Answer 1:
The Spark version is `3.2.3`.

# Question 2:

In [4]:
# One-off preprocessing with pandas, kept commented out for reference.
# It (1) writes 100-row samples of both source files and (2) converts the
# NYC.gov parquet file to a CSV restricted to the columns used below, with
# `shared_request_flag` renamed to `SR_Flag`.
#
#import pandas as pd
#
# 100-row sample of the GitHub CSV.
#df_github_head = pd.read_csv('fhv_tripdata_2021-06_githubVersion.csv')
#df_github_head = df_github_head[:100]
#df_github_head.to_csv('head_github.csv',index=False)
#
# 100-row sample of the NYC.gov parquet file, reduced to the shared columns.
#df_nycgov_head = pd.read_parquet('fhvhv_tripdata_2021-06_nycGovVersion.parquet')
#df_nycgov_head = df_nycgov_head[:100]
#df_nycgov_head = df_nycgov_head[['hvfhs_license_num','dispatching_base_num','pickup_datetime','dropoff_datetime','PULocationID','DOLocationID','shared_request_flag']]
#df_nycgov_head.columns = ['hvfhs_license_num','dispatching_base_num','pickup_datetime','dropoff_datetime','PULocationID','DOLocationID','SR_Flag']
#df_nycgov_head.to_csv('head_nycgov.csv', index=False)
#
# Full NYC.gov parquet file converted to the CSV that Spark reads below.
#df_nycgov = pd.read_parquet('fhvhv_tripdata_2021-06_nycGovVersion.parquet')
#df_nycgov = df_nycgov[['hvfhs_license_num','dispatching_base_num','pickup_datetime','dropoff_datetime','PULocationID','DOLocationID','shared_request_flag']]
#df_nycgov.columns = ['hvfhs_license_num','dispatching_base_num','pickup_datetime','dropoff_datetime','PULocationID','DOLocationID','SR_Flag']
#df_nycgov.to_csv('fhvhv_tripdata_2021-06_nycGovVersion.csv', index=False)

In [4]:
# Columns shared by both exports, in file order, as (name, Spark type) pairs.
_common_trip_fields = [
    ('dispatching_base_num', types.StringType()),
    ('pickup_datetime', types.TimestampType()),
    ('dropoff_datetime', types.TimestampType()),
    ('PULocationID', types.IntegerType()),
    ('DOLocationID', types.IntegerType()),
    ('SR_Flag', types.StringType()),
]


def _nullable_schema(fields):
    """Build a StructType in which every (name, type) pair is a nullable field."""
    return types.StructType(
        [types.StructField(name, dtype, True) for name, dtype in fields]
    )


# GitHub export: the shared columns followed by the affiliated base number.
schema_github = _nullable_schema(
    _common_trip_fields + [('Affiliated_base_number', types.StringType())]
)

# NYC.gov export: the license number followed by the shared columns.
schema_nycgov = _nullable_schema(
    [('hvfhs_license_num', types.StringType())] + _common_trip_fields
)

In [5]:
# Load both CSV exports with their explicit schemas; the first row of each
# file is treated as a header.
df_spark_github = (
    spark.read
    .option("header", "true")
    .schema(schema_github)
    .csv('fhv_tripdata_2021-06_githubVersion.csv')
)

df_spark_nycgov = (
    spark.read
    .option("header", "true")
    .schema(schema_nycgov)
    .csv('fhvhv_tripdata_2021-06_nycGovVersion.csv')
)

In [7]:
# Repartition each dataset into 12 partitions and write it out as parquet,
# replacing any output left over from a previous run.
(
    df_spark_github
    .repartition(12)
    .write
    .mode('overwrite')
    .parquet('./data_fhvhv/github/pq/')
)

(
    df_spark_nycgov
    .repartition(12)
    .write
    .mode('overwrite')
    .parquet('./data_fhvhv/nycgov/pq/')
)

# Answer 2:
The average size of each Parquet file is approximately `24MB`, as can be seen in the image below.

# Question 3:

In [6]:
# Add date-only versions of the pickup and dropoff timestamps.
df_github_q3 = (
    df_spark_github
    .withColumn('pickup_date', F.to_date(F.col('pickup_datetime')))
    .withColumn('dropoff_date', F.to_date(F.col('dropoff_datetime')))
)

# Number of trips whose pickup falls on 15 June 2021.
df_github_q3.filter(F.col('pickup_date') == datetime(2021, 6, 15)).count()

452470

In [8]:
# Add date-only versions of the pickup and dropoff timestamps.
df_nycgov_q3 = (
    df_spark_nycgov
    .withColumn('pickup_date', F.to_date(F.col('pickup_datetime')))
    .withColumn('dropoff_date', F.to_date(F.col('dropoff_datetime')))
)

# Number of trips whose pickup falls on 15 June 2021.
df_nycgov_q3.filter(F.col('pickup_date') == datetime(2021, 6, 15)).count()

452470

# Answer 3:
The total number of trips that started on June 15 is `452,470`.

# Question 4:

In [9]:
# Trip duration as the difference of the two timestamps, longest trips first.
df_github_q4 = df_spark_github \
                .withColumn('trip_length', F.col('dropoff_datetime') - F.col('pickup_datetime')) \
                .orderBy('trip_length', ascending=False)

# Keep the "<days> <HH>:<MM>" portion of the duration's text form.
# NOTE(review): assumes the duration renders as "... <days> <HH>:<MM>:<SS> ...";
# confirm for the Spark version in use.
# The pattern is a raw string so Python does not warn about invalid escapes,
# and uses \d+ so day counts of 10 or more are not truncated to their last digit.
df_github_q4 = df_github_q4 \
                .withColumn('trip_length_extr', F.regexp_extract('trip_length', r'\d+\s\d{2}:\d{2}', 0)) \
                .drop('trip_length')

In [10]:
# Trip duration as the difference of the two timestamps, longest trips first.
df_nycgov_q4 = df_spark_nycgov \
                .withColumn('trip_length', F.col('dropoff_datetime') - F.col('pickup_datetime')) \
                .orderBy('trip_length', ascending=False)

# Keep the "<days> <HH>:<MM>" portion of the duration's text form.
# NOTE(review): assumes the duration renders as "... <days> <HH>:<MM>:<SS> ...";
# confirm for the Spark version in use.
# The pattern is a raw string so Python does not warn about invalid escapes,
# and uses \d+ so day counts of 10 or more are not truncated to their last digit.
df_nycgov_q4 = df_nycgov_q4 \
                .withColumn('trip_length_extr', F.regexp_extract('trip_length', r'\d+\s\d{2}:\d{2}', 0)) \
                .drop('trip_length')

In [11]:
def transform_dayInterval_to_hours(interval_str):
    """Convert a ``'<days> <HH>:<MM>'`` interval string into decimal hours.

    Args:
        interval_str: Text such as ``'2 18:52'``: a day count, one space,
            then hours and minutes separated by a colon. Anything after the
            minutes (e.g. ``':44'`` seconds) is ignored.

    Returns:
        The duration in hours, rounded to two decimals and rendered as a
        string (e.g. ``'66.87'``), or ``None`` when ``interval_str`` is
        ``None`` or blank.

    Raises:
        ValueError, IndexError: If the string is non-blank but does not
            follow the expected layout.
    """
    # A missing or empty value (for example, no regex match upstream) has no
    # duration; returning None avoids int('') aborting the whole computation.
    if interval_str is None or not interval_str.strip():
        return None

    # Split once and reuse the pieces: "<days>" and "<HH>:<MM>[...]".
    parts = interval_str.split(' ')
    day = int(parts[0])
    clock = parts[1].split(':')

    hour = int(clock[0])
    minute = int(clock[1])

    trip_length_in_hours = str(round(day*24 + hour + minute/60, 2))
    return trip_length_in_hours

# Wrap the helper as a Spark UDF; the resulting column is declared as a string.
transform_dayInterval_to_hours_udf = F.udf(transform_dayInterval_to_hours, returnType=types.StringType())

In [12]:
# Replace the extracted "<days> <HH>:<MM>" text with the trip length in hours.
df_github_q4 = (
    df_github_q4
    .withColumn(
        'trip_length_hours',
        transform_dayInterval_to_hours_udf(F.col('trip_length_extr')),
    )
    .drop('trip_length_extr')
)

df_github_q4.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|trip_length_hours|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------------+
|              B02872|2021-06-25 13:55:41|2021-06-28 08:48:25|          98|         265|      N|                B02872|            66.87|
|              B02765|2021-06-22 12:09:45|2021-06-23 13:42:44|         188|         198|      N|                B02765|            25.53|
|              B02879|2021-06-27 10:32:29|2021-06-28 06:31:20|          78|         169|      N|                B02879|            19.97|
|              B02800|2021-06-26 22:37:11|2021-06-27 16:49:01|         263|          36|      N|                  null|            18.18|
|              B02682|2021-06-23 2

In [13]:
# Replace the extracted "<days> <HH>:<MM>" text with the trip length in hours.
df_nycgov_q4 = (
    df_nycgov_q4
    .withColumn(
        'trip_length_hours',
        transform_dayInterval_to_hours_udf(F.col('trip_length_extr')),
    )
    .drop('trip_length_extr')
)

df_nycgov_q4.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|trip_length_hours|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------------+
|           HV0003|              B02872|2021-06-25 13:55:41|2021-06-28 08:48:25|          98|         265|      N|            66.87|
|           HV0003|              B02765|2021-06-22 12:09:45|2021-06-23 13:42:44|         188|         198|      N|            25.53|
|           HV0003|              B02879|2021-06-27 10:32:29|2021-06-28 06:31:20|          78|         169|      N|            19.97|
|           HV0004|              B02800|2021-06-26 22:37:11|2021-06-27 16:49:01|         263|          36|      N|            18.18|
|           HV0003|              B02682|2021-06-23 20:40:43|2021-06-2

# Answer 4:
The longest trip took `66.87 hours`.

# Question 5:

In [14]:
# Show the URL (including the port) of this session's Spark web UI.
spark.sparkContext.uiWebUrl

'http://host.docker.internal:4040'

# Answer 5:
Spark's UI runs on local port `4040`.

# Question 6:

In [15]:
# Schema of the taxi zone lookup table; every column is nullable.
schema_zones = types.StructType(
    [
        types.StructField(name, dtype, True)
        for name, dtype in [
            ('LocationID', types.IntegerType()),
            ('Borough', types.StringType()),
            ('Zone', types.StringType()),
            ('service_zone', types.StringType()),
        ]
    ]
)

# Load the lookup table and rename `Zone` to `Zone_Name`.
df_zones = (
    spark.read
    .option('header', 'true')
    .schema(schema_zones)
    .csv('taxi_zone_lookup.csv')
    .withColumnRenamed('Zone', 'Zone_Name')
)
df_zones.createOrReplaceTempView("df_zones")


df_github_q6 = df_spark_github

# Attach the zone attributes of the pickup location as Start_* columns.
df_github_q6 = (
    df_github_q6
    .join(df_zones, df_github_q6.PULocationID == df_zones.LocationID)
    .drop('LocationID')
    .withColumnRenamed('Borough', 'Start_Borough')
    .withColumnRenamed('Zone_Name', 'Start_Zone')
    .withColumnRenamed('service_zone', 'Start_Service_Zone')
)

# Attach the zone attributes of the dropoff location as End_* columns.
df_github_q6 = (
    df_github_q6
    .join(df_zones, df_github_q6.DOLocationID == df_zones.LocationID)
    .drop('LocationID')
    .withColumnRenamed('Borough', 'End_Borough')
    .withColumnRenamed('Zone_Name', 'End_Zone')
    .withColumnRenamed('service_zone', 'End_Service_Zone')
)

# Count trips per pickup zone, busiest zone first.
df_github_q6 = (
    df_github_q6
    .groupBy('Start_Zone')
    .count()
    .orderBy(F.desc('count'))
    .withColumnRenamed('count', 'Number of Pickups per Zone')
)

df_github_q6.show()

+--------------------+--------------------------+
|          Start_Zone|Number of Pickups per Zone|
+--------------------+--------------------------+
| Crown Heights North|                    231279|
|        East Village|                    221244|
|         JFK Airport|                    188867|
|      Bushwick South|                    187929|
|       East New York|                    186780|
|TriBeCa/Civic Center|                    164344|
|   LaGuardia Airport|                    161596|
|            Union Sq|                    158937|
|        West Village|                    154698|
|             Astoria|                    152493|
|     Lower East Side|                    151020|
|        East Chelsea|                    147673|
|Central Harlem North|                    146402|
|Williamsburg (Nor...|                    143683|
|          Park Slope|                    143594|
|  Stuyvesant Heights|                    141427|
|        Clinton East|                    139611|


In [16]:

df_nycgov_q6 = df_spark_nycgov

# Attach the zone attributes of the pickup location as Start_* columns.
df_nycgov_q6 = (
    df_nycgov_q6
    .join(df_zones, df_nycgov_q6.PULocationID == df_zones.LocationID)
    .drop('LocationID')
    .withColumnRenamed('Borough', 'Start_Borough')
    .withColumnRenamed('Zone_Name', 'Start_Zone')
    .withColumnRenamed('service_zone', 'Start_Service_Zone')
)

# Attach the zone attributes of the dropoff location as End_* columns.
df_nycgov_q6 = (
    df_nycgov_q6
    .join(df_zones, df_nycgov_q6.DOLocationID == df_zones.LocationID)
    .drop('LocationID')
    .withColumnRenamed('Borough', 'End_Borough')
    .withColumnRenamed('Zone_Name', 'End_Zone')
    .withColumnRenamed('service_zone', 'End_Service_Zone')
)

# Count trips per pickup zone, busiest zone first.
df_nycgov_q6 = (
    df_nycgov_q6
    .groupBy('Start_Zone')
    .count()
    .orderBy(F.desc('count'))
    .withColumnRenamed('count', 'Number of Pickups per Zone')
)

df_nycgov_q6.show()

+--------------------+--------------------------+
|          Start_Zone|Number of Pickups per Zone|
+--------------------+--------------------------+
| Crown Heights North|                    231279|
|        East Village|                    221244|
|         JFK Airport|                    188867|
|      Bushwick South|                    187929|
|       East New York|                    186780|
|TriBeCa/Civic Center|                    164344|
|   LaGuardia Airport|                    161596|
|            Union Sq|                    158937|
|        West Village|                    154698|
|             Astoria|                    152493|
|     Lower East Side|                    151020|
|        East Chelsea|                    147673|
|Central Harlem North|                    146402|
|Williamsburg (Nor...|                    143683|
|          Park Slope|                    143594|
|  Stuyvesant Heights|                    141427|
|        Clinton East|                    139611|


# Answer 6:
The zone with the most pickups is `Crown Heights North`, with `231279` trips starting in this zone.

In [17]:
# Shut down the Spark session now that all questions have been answered.
spark.stop()