In [1]:
import pyspark
from pyspark.sql import SparkSession

# Local Spark session that uses every available core on this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

### Question 1. PySpark version

In [2]:
# Question 1: report the installed PySpark package version.
print(f"{pyspark.__version__}")

3.2.1


### Question 2. HVFHV February 2021

In [4]:
from pyspark.sql import types

# Explicit schema for the HVFHV trip CSV, so the datetime columns come back as
# timestamps and the location IDs as integers. Every field is nullable; the
# order matches the CSV header.
schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    )
])

In [6]:
# Load the raw February 2021 HVFHV trips, applying the explicit `schema`.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhvhv_tripdata_2021-02.csv')
)

In [8]:
# Spread the rows over 24 partitions so the Parquet write produces 24
# similarly sized files. Note: this rebinds `df`, so every later cell works
# on the repartitioned frame.
df = df.repartition(numPartitions=24)

In [9]:
# Persist the repartitioned trips as Parquet (one part file per partition).
# mode("overwrite") keeps the cell re-runnable: Spark's default save mode
# ("errorifexists") fails as soon as fhvhv/2021/02/ already exists.
df.write.mode("overwrite").parquet("fhvhv/2021/02/")

In [10]:
# Show the kernel's current working directory. The explicit % prefix works
# even if %automagic is off or a Python variable named `pwd` exists.
%pwd

'/home/jovyan'

In [11]:
# Move into the Parquet output directory to inspect the written part files.
# The explicit % prefix works even if %automagic is off or a Python variable
# named `cd` exists.
# NOTE(review): this changes the kernel's working directory for every later
# cell; relative paths used afterwards may not resolve where expected.
# Confirm, or list the directory by path instead of cd-ing into it.
%cd /home/jovyan/fhvhv/2021/02

/home/jovyan/fhvhv/2021/02


In [12]:
# List the Parquet part files: long format, oldest first, human-readable sizes.
# The explicit % prefix does not depend on automagic being enabled.
%ls -lrth

total 209M
-rw-r--r-- 1 jovyan users 8.7M Mar  1 09:50 part-00004-9dd08e71-9433-4108-8fcc-d8bbbe9ccb7b-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 8.7M Mar  1 09:50 part-00003-9dd08e71-9433-4108-8fcc-d8bbbe9ccb7b-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 8.7M Mar  1 09:50 part-00001-9dd08e71-9433-4108-8fcc-d8bbbe9ccb7b-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 8.7M Mar  1 09:50 part-00000-9dd08e71-9433-4108-8fcc-d8bbbe9ccb7b-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 8.7M Mar  1 09:50 part-00002-9dd08e71-9433-4108-8fcc-d8bbbe9ccb7b-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 8.7M Mar  1 09:50 part-00005-9dd08e71-9433-4108-8fcc-d8bbbe9ccb7b-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 8.7M Mar  1 09:50 part-00007-9dd08e71-9433-4108-8fcc-d8bbbe9ccb7b-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 8.7M Mar  1 09:50 part-00008-9dd08e71-9433-4108-8fcc-d8bbbe9ccb7b-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 8.7M Mar  1 09:50 part-00009-9dd08e71-9433-4108-8fcc-d8bbbe9ccb7b-c

### Question 3. Count records
How many taxi trips were there on February 15?

Consider only trips that started on February 15.

In [15]:
# Confirm the explicit schema was applied (timestamps, integer location IDs, all nullable).
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [19]:
# Peek at a few pickup values to confirm they parsed as datetimes.
df.select("pickup_datetime").take(3)

[Row(pickup_datetime=datetime.datetime(2021, 2, 6, 1, 32, 18)),
 Row(pickup_datetime=datetime.datetime(2021, 2, 4, 21, 24)),
 Row(pickup_datetime=datetime.datetime(2021, 2, 4, 15, 58, 5))]

In [27]:
from pyspark.sql.functions import to_date

# Preview each trip's pickup calendar date. pickup_datetime is already a
# TimestampType column (explicit schema), so to_date only has to truncate it
# to a date. A format pattern only matters when parsing string input, so none
# is passed here.
(
    df.withColumn("pickup_date", to_date("pickup_datetime"))
    .show()
)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+
|           HV0003|              B02765|2021-02-05 15:13:57|2021-02-05 15:30:25|          71|          89|   null| 2021-02-05|
|           HV0003|              B02866|2021-02-04 11:40:47|2021-02-04 11:46:35|          20|         174|   null| 2021-02-04|
|           HV0003|              B02872|2021-02-02 13:21:54|2021-02-02 13:36:47|          78|         247|   null| 2021-02-02|
|           HV0003|              B02872|2021-02-03 22:05:16|2021-02-03 22:14:52|          75|         262|   null| 2021-02-03|
|           HV0003|              B02888|2021-02-02 16:41:27|2021-02-02 17:06:27|         138|          88|   nu

In [28]:
# First five rows of the trip data, for comparison with the date preview above.
df.show(n=5)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02765|2021-02-05 15:13:57|2021-02-05 15:30:25|          71|          89|   null|
|           HV0003|              B02866|2021-02-04 11:40:47|2021-02-04 11:46:35|          20|         174|   null|
|           HV0003|              B02872|2021-02-02 13:21:54|2021-02-02 13:36:47|          78|         247|   null|
|           HV0003|              B02872|2021-02-03 22:05:16|2021-02-03 22:14:52|          75|         262|   null|
|           HV0003|              B02888|2021-02-02 16:41:27|2021-02-02 17:06:27|         138|          88|   null|
+-----------------+--------------------+-------------------+-------------------+

In [34]:
# Add the pickup calendar date as its own column for the per-day questions.
# pickup_datetime is already a timestamp, so to_date needs no format pattern
# (the pattern only applies when parsing strings).
df_with_date = df.withColumn("pickup_date", to_date("pickup_datetime"))

In [35]:
# Sanity-check the new pickup_date column.
df_with_date.show(n=4)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+
|           HV0003|              B02765|2021-02-05 15:13:57|2021-02-05 15:30:25|          71|          89|   null| 2021-02-05|
|           HV0003|              B02866|2021-02-04 11:40:47|2021-02-04 11:46:35|          20|         174|   null| 2021-02-04|
|           HV0003|              B02872|2021-02-02 13:21:54|2021-02-02 13:36:47|          78|         247|   null| 2021-02-02|
|           HV0003|              B02872|2021-02-03 22:05:16|2021-02-03 22:14:52|          75|         262|   null| 2021-02-03|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-----

In [41]:
# Question 3: number of trips whose pickup date is 2021-02-15.
df_with_date.where(df_with_date["pickup_date"] == "2021-02-15").count()

367170

### Question 4. Longest trip for each day
Now calculate the duration for each trip.

Trip starting on which day was the longest?

In [47]:
from pyspark.sql.functions import to_timestamp

# Trip duration in seconds: the drop-off time minus the pick-up time, each
# cast to a long (epoch seconds).
df_with_trip_duration = df_with_date.withColumn(
    "trip_duration",
    to_timestamp("dropoff_datetime").cast("long")
    - to_timestamp("pickup_datetime").cast("long"),
)

In [48]:
# Sanity-check the new trip_duration column (values are in seconds).
df_with_trip_duration.show(n=4)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+-------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|trip_duration|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+-------------+
|           HV0003|              B02765|2021-02-05 15:13:57|2021-02-05 15:30:25|          71|          89|   null| 2021-02-05|          988|
|           HV0003|              B02866|2021-02-04 11:40:47|2021-02-04 11:46:35|          20|         174|   null| 2021-02-04|          348|
|           HV0003|              B02872|2021-02-02 13:21:54|2021-02-02 13:36:47|          78|         247|   null| 2021-02-02|          893|
|           HV0003|              B02872|2021-02-03 22:05:16|2021-02-03 22:14:52|          75|         262|   null| 2021-02-03|          576|
+------------

In [63]:
# Expose the frame to Spark SQL as `duration_table` for the queries below.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0. Re-running the cell simply replaces the view.
df_with_trip_duration.createOrReplaceTempView('duration_table')



In [66]:
# Question 4: trips sorted by duration, longest first. The top row's
# pickup_date is the day the longest trip started.
longest_trips = spark.sql("""
SELECT 
    *
FROM
    duration_table
ORDER BY
    trip_duration desc
""")
longest_trips.show(n=5)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+-------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|trip_duration|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+-------------+
|           HV0005|              B02510|2021-02-11 13:40:44|2021-02-12 10:39:44|         247|          41|   null| 2021-02-11|        75540|
|           HV0004|              B02800|2021-02-17 15:54:53|2021-02-18 07:48:34|         242|         254|   null| 2021-02-17|        57221|
|           HV0004|              B02800|2021-02-20 12:08:15|2021-02-21 00:22:14|         188|          55|   null| 2021-02-20|        44039|
|           HV0003|              B02864|2021-02-03 20:24:25|2021-02-04 07:41:58|          51|         147|   null| 2021-02-03|        40653|
|           H

### Question 5. Most frequent dispatching_base_num

In [68]:
# Question 5: trip counts per dispatching base, most frequent first.
base_counts = spark.sql("""
SELECT 
    dispatching_base_num, count(*) as total_cnts
FROM
    duration_table
GROUP BY
    dispatching_base_num
ORDER BY
    total_cnts desc
""")
base_counts.show(n=5)

+--------------------+----------+
|dispatching_base_num|total_cnts|
+--------------------+----------+
|              B02510|   3233664|
|              B02764|    965568|
|              B02872|    882689|
|              B02875|    685390|
|              B02765|    559768|
+--------------------+----------+
only showing top 5 rows



### Question 6. Most common location pair

In [69]:
# Question 6: trip counts per (pickup, drop-off) location pair, most common first.
pair_counts = spark.sql("""
SELECT 
    PULocationID, DOLocationID, count(*) as total_cnts
FROM
    duration_table
GROUP BY
    PULocationID, DOLocationID
ORDER BY
    total_cnts desc
""")
pair_counts.show(n=5)

+------------+------------+----------+
|PULocationID|DOLocationID|total_cnts|
+------------+------------+----------+
|          76|          76|     45041|
|          26|          26|     37329|
|          39|          39|     28026|
|          61|          61|     25976|
|          14|          14|     17934|
+------------+------------+----------+
only showing top 5 rows



In [70]:
# Taxi zone lookup table (LocationID -> Borough / Zone / service_zone).
# No schema is supplied, so the columns are read with Spark's CSV defaults.
df_zones = (
    spark.read
    .option("header", "true")
    .csv('taxi+_zone_lookup.csv')
)

df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [75]:
# Resolve LocationID 76, the most common pickup/drop-off pair in Question 6, to its zone name.
df_zones.where(df_zones["LocationID"] == 76).show()

+----------+--------+-------------+------------+
|LocationID| Borough|         Zone|service_zone|
+----------+--------+-------------+------------+
|        76|Brooklyn|East New York|   Boro Zone|
+----------+--------+-------------+------------+

