## Question 1. Install Spark and PySpark

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('homework') \
    .getOrCreate()

22/02/27 20:26:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
spark.version

'3.0.3'

## Question 2. HVFHW February 2021

In [4]:
from pyspark.sql import types

In [6]:
# download the file
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-23 16:23:58--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.43.60
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.43.60|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv’


2022-02-23 16:24:13 (49.0 MB/s) - ‘fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [5]:
# define the schema
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [6]:
# read the file
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-02.csv')

In [7]:
# perform repartition
df = df.repartition(24)

In [14]:
# save parquet file
df.write.parquet('fhvhv/2021/02/')

                                                                                

In [17]:
# check the size of folder fhvhv/2021/02/
#!ls fhvhv/2021/02/ -lh
!du -sh fhvhv/2021/02/

210M	fhvhv/2021/02/


## Question 3. Count records

In [36]:
# get the total number of rows in February file
!wc -l fhvhv_tripdata_2021-02.csv

11613943 fhvhv_tripdata_2021-02.csv


### Using pyspark data frame

In [8]:
from pyspark.sql import functions as F

In [11]:
# create 2 new columns: pickup_date and dropoff_date
df = df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) 

In [47]:
# count number of trips that started on February 15
df \
    .select('pickup_date') \
    .filter(df.pickup_date == '2021-02-15') \
    .count()

                                                                                

367170

### Using SQL

In [15]:
# create a temp table from the data frame
df.registerTempTable('table_df')

In [40]:
# count number of trips that started on February 15
spark.sql("""
SELECT COUNT(*)
FROM table_df
WHERE pickup_date = '2021-02-15'
;
""").show()



+--------+
|count(1)|
+--------+
|  367170|
+--------+



                                                                                

## Question 4. Longest trip for each day

### Using pyspark data frame

In [12]:
# calculate the duration for each trip
df = df \
    .withColumn("trip_duration", (F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime")))

In [37]:
df.show(6)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+------------+-------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|dropoff_date|trip_duration|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+------------+-------------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null| 2021-02-01|  2021-02-01|          629|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null| 2021-02-01|  2021-02-01|          998|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null| 2021-02-01|  2021-02-01|          589|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23

In [13]:
# trip starting on which day was the longest?
df \
    .groupBy('pickup_date') \
    .max('trip_duration') \
    .orderBy(F.col('max(trip_duration)').desc()) \
    .show(31)



+-----------+------------------+
|pickup_date|max(trip_duration)|
+-----------+------------------+
| 2021-02-11|             75540|
| 2021-02-17|             57221|
| 2021-02-20|             44039|
| 2021-02-03|             40653|
| 2021-02-19|             37577|
| 2021-02-25|             35010|
| 2021-02-18|             34612|
| 2021-02-10|             34169|
| 2021-02-21|             32223|
| 2021-02-09|             32087|
| 2021-02-06|             31447|
| 2021-02-02|             30913|
| 2021-02-05|             30511|
| 2021-02-12|             30148|
| 2021-02-08|             30106|
| 2021-02-14|             29777|
| 2021-02-22|             28278|
| 2021-02-27|             27170|
| 2021-02-15|             25874|
| 2021-02-04|             25592|
| 2021-02-16|             25441|
| 2021-02-23|             24439|
| 2021-02-26|             24422|
| 2021-02-24|             23669|
| 2021-02-13|             21442|
| 2021-02-01|             20638|
| 2021-02-28|             19850|
| 2021-02-



### Using SQL

In [50]:
# calculate the duration for each trip
df_q42 = spark.sql("""
SELECT 
    pickup_date
    , MAX((unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))) AS trip_length
FROM table_df
GROUP BY 1
ORDER BY trip_length DESC
;
""")

In [51]:
df_q42.printSchema()

root
 |-- pickup_date: date (nullable = true)
 |-- trip_length: long (nullable = true)



In [53]:
df_q42.show(31)



+-----------+-----------+
|pickup_date|trip_length|
+-----------+-----------+
| 2021-02-11|      75540|
| 2021-02-17|      57221|
| 2021-02-20|      44039|
| 2021-02-03|      40653|
| 2021-02-19|      37577|
| 2021-02-25|      35010|
| 2021-02-18|      34612|
| 2021-02-10|      34169|
| 2021-02-21|      32223|
| 2021-02-09|      32087|
| 2021-02-06|      31447|
| 2021-02-02|      30913|
| 2021-02-05|      30511|
| 2021-02-12|      30148|
| 2021-02-08|      30106|
| 2021-02-14|      29777|
| 2021-02-22|      28278|
| 2021-02-27|      27170|
| 2021-02-15|      25874|
| 2021-02-04|      25592|
| 2021-02-16|      25441|
| 2021-02-23|      24439|
| 2021-02-26|      24422|
| 2021-02-24|      23669|
| 2021-02-13|      21442|
| 2021-02-01|      20638|
| 2021-02-28|      19850|
| 2021-02-07|      17672|
+-----------+-----------+



                                                                                

## Question 5. Most frequent dispatching_base_num

### Using pyspark data frame

In [23]:
df \
    .groupBy('dispatching_base_num') \
    .count() \
    .orderBy(F.col('count').desc()) \
    .show()



+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows



                                                                                

### Using SQL

In [16]:
# calculate the most frequent dispatching_base_num  
df_q5 = spark.sql("""
SELECT 
    dispatching_base_num
    , COUNT(dispatching_base_num)
FROM table_df
GROUP BY 1
ORDER BY 2 DESC
;
""")

In [17]:
df_q5.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- count(dispatching_base_num): long (nullable = false)



In [19]:
df_q5.show(31)



+--------------------+---------------------------+
|dispatching_base_num|count(dispatching_base_num)|
+--------------------+---------------------------+
|              B02510|                    3233664|
|              B02764|                     965568|
|              B02872|                     882689|
|              B02875|                     685390|
|              B02765|                     559768|
|              B02869|                     429720|
|              B02887|                     322331|
|              B02871|                     312364|
|              B02864|                     311603|
|              B02866|                     311089|
|              B02878|                     305185|
|              B02682|                     303255|
|              B02617|                     274510|
|              B02883|                     251617|
|              B02884|                     244963|
|              B02882|                     232173|
|              B02876|         



## Question 6. Most common locations pair  

In [24]:
# read zones 
df_zones = spark.read.parquet('zones/')

In [25]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [26]:
# create a temp table from df_zones
df_zones.registerTempTable('table_zones')

In [32]:
# calculate the most frequent dispatching_base_num
df_q6 = spark.sql("""
SELECT 
    table_df.PULocationID
    , table_df.DOLocationID
    , CONCAT(COALESCE(zpu.Zone, 'Unknown'), ' / ', COALESCE(zdo.Zone, 'Unknown')) AS most_common_locations 
    , COUNT(*)
FROM table_df
LEFT JOIN table_zones AS zpu ON table_df.PULocationID = zpu.LocationID
LEFT JOIN table_zones AS zdo ON table_df.DOLocationID = zdo.LocationID
GROUP BY 1, 2, 3
ORDER BY 4 DESC
;
""")

In [34]:
df_q6.show()



+------------+------------+---------------------+--------+
|PULocationID|DOLocationID|most_common_locations|count(1)|
+------------+------------+---------------------+--------+
|          76|          76| East New York / E...|   45041|
|          26|          26| Borough Park / Bo...|   37329|
|          39|          39|  Canarsie / Canarsie|   28026|
|          61|          61| Crown Heights Nor...|   25976|
|          14|          14| Bay Ridge / Bay R...|   17934|
|           7|           7|    Astoria / Astoria|   14688|
|         129|         129| Jackson Heights /...|   14688|
|          42|          42| Central Harlem No...|   14481|
|          37|          37| Bushwick South / ...|   14424|
|          89|          89| Flatbush/Ditmas P...|   13976|
|         216|         216| South Ozone Park ...|   13716|
|          35|          35| Brownsville / Bro...|   12829|
|         132|         265|     JFK Airport / NA|   12542|
|         188|          61| Prospect-Lefferts...|   1181

                                                                                