## Question 2

In [None]:
!wget --no-check-certificate https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

In [2]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types, functions

In [3]:
# Start (or reuse, via getOrCreate) a local Spark session using all cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [None]:
# Explicit schema for the FHVHV CSV. Without it (and without inferSchema),
# Spark would read every column as a string. All fields are nullable.
schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    ]
])

In [None]:
# Load the February-2021 FHVHV CSV using `schema`; the first line of the file
# is treated as the header row.
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv('fhvhv_tripdata_2021-02.csv')
)

In [None]:
# Shuffle the rows into 24 partitions so the following Parquet write is split
# across 24 partitions rather than mirroring the CSV's input splits.
df = df.repartition(numPartitions=24)

In [None]:
# Persist the repartitioned trips as Parquet.
# mode='overwrite' keeps the cell idempotent: Spark's default save mode
# ('error'/'errorifexists') raises if 'fhvhv/2021/02/' already exists, which
# breaks Restart & Run All after the first successful run.
df.write.parquet('fhvhv/2021/02/', mode='overwrite')

In [4]:
# Reload the trips from the Parquet copy; every later question works off this
# DataFrame rather than the raw CSV.
df = spark.read.format('parquet').load('fhvhv/2021/02/')

## Question 3

In [15]:
# Peek at the first rows (Spark's defaults: 20 rows, long values truncated).
df.show(n=20, truncate=True)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02866|2021-02-02 11:23:35|2021-02-02 11:44:17|          10|          95|   null|
|           HV0003|              B02764|2021-02-01 14:47:14|2021-02-01 14:57:06|         180|         216|   null|
|           HV0003|              B02887|2021-02-02 08:34:10|2021-02-02 08:54:28|          14|          45|   null|
|           HV0005|              B02510|2021-02-03 20:28:12|2021-02-03 20:41:19|          33|         189|   null|
|           HV0003|              B02867|2021-02-05 09:16:17|2021-02-05 09:37:13|          78|         243|   null|
|           HV0003|              B02882|2021-02-01 16:18:34|2021-02-01 16:34:30|

In [20]:
# Question 3: number of trips whose pickup falls on 15 Feb 2021. `to_date`
# drops the time of day so the comparison is by calendar date.
df.filter(functions.to_date(df['pickup_datetime']) == '2021-02-15').count()

367170

## Question 4

In [25]:
# Question 4: trip duration in whole seconds (dropoff minus pickup, each cast to
# epoch seconds), showing the longest trips first.
(
    df
    .withColumn(
        'duration',
        functions.col("dropoff_datetime").cast("long")
        - functions.col('pickup_datetime').cast("long"),
    )
    .orderBy(functions.desc("duration"))
    .show()
)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+--------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|duration|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+--------+
|           HV0005|              B02510|2021-02-11 13:40:44|2021-02-12 10:39:44|         247|          41|   null|   75540|
|           HV0004|              B02800|2021-02-17 15:54:53|2021-02-18 07:48:34|         242|         254|   null|   57221|
|           HV0004|              B02800|2021-02-20 12:08:15|2021-02-21 00:22:14|         188|          55|   null|   44039|
|           HV0003|              B02864|2021-02-03 20:24:25|2021-02-04 07:41:58|          51|         147|   null|   40653|
|           HV0003|              B02887|2021-02-19 23:17:44|2021-02-20 09:44:01|         210|         149|   null|   37577|
|       

## Question 5

In [16]:
# Question 5: trips per dispatching base, printing only the busiest base.
(
    df
    .groupBy('dispatching_base_num')
    .count()
    .orderBy(functions.desc('count'))
    .show(n=1)
)

+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
+--------------------+-------+
only showing top 1 row



In [5]:
# Expose the trips DataFrame to Spark SQL under the name `fhv_feb`.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0. It is also safe to re-run, because it replaces any existing view.
df.createOrReplaceTempView("fhv_feb")

In [6]:
# Question 5 cross-check in Spark SQL against the `fhv_feb` view: trips per
# dispatching base, busiest first (Spark prints the top 20 rows).
spark.sql(
"""
SELECT
    dispatching_base_num,
    count(*)
FROM
    fhv_feb
GROUP BY 
    dispatching_base_num
ORDER BY 
    2 desc
"""
).show(n=20)

+--------------------+--------+
|dispatching_base_num|count(1)|
+--------------------+--------+
|              B02510| 3233664|
|              B02764|  965568|
|              B02872|  882689|
|              B02875|  685390|
|              B02765|  559768|
|              B02869|  429720|
|              B02887|  322331|
|              B02871|  312364|
|              B02864|  311603|
|              B02866|  311089|
|              B02878|  305185|
|              B02682|  303255|
|              B02617|  274510|
|              B02883|  251617|
|              B02884|  244963|
|              B02882|  232173|
|              B02876|  215693|
|              B02879|  210137|
|              B02867|  200530|
|              B02877|  198938|
+--------------------+--------+
only showing top 20 rows



## Question 6

In [7]:
# Zone lookup table; expects `taxi+_zone_lookup.csv` in the working directory.
# No schema or inferSchema is given, so every column (LocationID included) is
# read as a string.
df_zones = (
    spark.read
    .option("header", "true")
    .csv('taxi+_zone_lookup.csv')
)

In [19]:
# Peek at the first 20 rows of the zone lookup.
df_zones.show(n=20)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [24]:
# Attach the pickup zone name. The left join keeps trips whose PULocationID has
# no lookup match; their pickup_location is null.
df_join1 = (
    df.join(df_zones, df['PULocationID'] == df_zones['LocationID'], how='left')
    .select(df["*"], df_zones["Zone"])
    .withColumnRenamed("Zone", "pickup_location")
)

In [27]:
# Attach the dropoff zone name the same way. The left join keeps unmatched
# DOLocationIDs, whose dropoff_location is null.
df_join_all = (
    df_join1.join(df_zones, df_join1['DOLocationID'] == df_zones['LocationID'], how='left')
    .select(df_join1["*"], df_zones["Zone"])
    .withColumnRenamed("Zone", "dropoff_location")
)

In [38]:
# Question 6 (DataFrame API): trips per (pickup zone, dropoff zone) pair,
# showing the 50 most frequent pairs.
(
    df_join_all
    .groupBy('pickup_location', 'dropoff_location')
    .count()
    .orderBy(functions.desc('count'))
    .show(n=50)
)

+--------------------+--------------------+-----+
|     pickup_location|    dropoff_location|count|
+--------------------+--------------------+-----+
|       East New York|       East New York|45041|
|        Borough Park|        Borough Park|37329|
|            Canarsie|            Canarsie|28026|
| Crown Heights North| Crown Heights North|25976|
|           Bay Ridge|           Bay Ridge|17934|
|             Astoria|             Astoria|14688|
|     Jackson Heights|     Jackson Heights|14688|
|Central Harlem North|Central Harlem North|14481|
|      Bushwick South|      Bushwick South|14424|
|Flatbush/Ditmas Park|Flatbush/Ditmas Park|13976|
|    South Ozone Park|    South Ozone Park|13716|
|         Brownsville|         Brownsville|12829|
|         JFK Airport|                  NA|12542|
|Prospect-Lefferts...| Crown Heights North|11814|
|        Forest Hills|        Forest Hills|11548|
|      Bushwick North|      Bushwick South|11491|
|      Bushwick South|      Bushwick North|11487|


In [8]:
# Expose the zone lookup to Spark SQL under the name `zones`.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0. It is also safe to re-run, because it replaces any existing view.
df_zones.createOrReplaceTempView("zones")

In [12]:
# Question 6 (SQL version): the most frequent "pickup / dropoff" zone pair.
# Zones with no lookup match (null), or whose Zone is 'NA', are reported as
# 'Unknown'. Spark prints the top 20 rows.
spark.sql(
    """
    with step1 as(
    SELECT
        f.*,
        case when z1.Zone == 'NA' then 'Unknown' else coalesce(z1.Zone, 'Unknown') end  as pickuplocation,
        case when z2.Zone == 'NA' then 'Unknown' else coalesce(z2.Zone, 'Unknown') end  as dropofflocation
    FROM
        fhv_feb f
        left outer join 
            zones as z1
            on f.PULocationID = z1.LocationID
        left outer join 
            zones as z2
            on f.DOLocationID = z2.LocationID
    )
    select 
        pickuplocation|| ' / ' ||dropofflocation,
        count(*)
    from 
        step1
    group by 
        1
    order by 
        2 desc
"""
).show(n=20)

+----------------------------------------------------+--------+
|concat(concat(pickuplocation,  / ), dropofflocation)|count(1)|
+----------------------------------------------------+--------+
|                                East New York / E...|   45041|
|                                Borough Park / Bo...|   37329|
|                                 Canarsie / Canarsie|   28026|
|                                Crown Heights Nor...|   25976|
|                                Bay Ridge / Bay R...|   17934|
|                                Jackson Heights /...|   14688|
|                                   Astoria / Astoria|   14688|
|                                Central Harlem No...|   14481|
|                                Bushwick South / ...|   14424|
|                                Flatbush/Ditmas P...|   13976|
|                                South Ozone Park ...|   13716|
|                                Brownsville / Bro...|   12829|
|                                JFK Air