In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
# Local Spark session using all available cores; reused by every cell below.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

## Question 1

In [3]:
# Question 1: version string of the running Spark session.
spark.version

'3.1.2'

## Question 2

In [35]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

--2023-02-28 20:27:37--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
Resolving github.com (github.com)... 20.207.73.82
Connecting to github.com (github.com)|20.207.73.82|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e-a6da-4923-ad6f-35ff02446a51?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230228%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230228T145737Z&X-Amz-Expires=300&X-Amz-Signature=6a13fa2c570c606b26e9542172bb624c93e8e7bea764ad7433eadeee1b0e3b1e&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-06.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-02-28 20:27:37--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e

In [36]:
!gzip -d fhvhv_tripdata_2021-06.csv.gz

fhvhv_tripdata_2021-06.csv already exists -- do you wish to overwrite (y or n)? ^C


In [4]:
# Size on disk of the decompressed CSV.
!ls -lh fhvhv_tripdata_2021-06.csv

-rw-r--r--  1 rahulmandviya  staff   878M Dec 20 05:43 fhvhv_tripdata_2021-06.csv


In [5]:
# Line count of the raw CSV. The file has a header row (it is read with
# header=true below), so the number of trip records is one less than this.
!wc -l fhvhv_tripdata_2021-06.csv

 14961893 fhvhv_tripdata_2021-06.csv


In [6]:
# Explicit schema for the FHVHV CSV: timestamps and location IDs are parsed as
# typed columns instead of strings. Every field is nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    ]
])

In [7]:
# Read the raw CSV with the explicit schema, spread it over 12 partitions and
# write it out as parquet (Question 2 inspects the resulting file sizes).
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv('fhvhv_tripdata_2021-06.csv')
    .repartition(12)
)

df.write.mode('overwrite').parquet('data/pq/fhvhv/2021/06/')

In [8]:
ls -lh data/pq/fhvhv/2021/06/

total 566784
-rw-r--r--  1 rahulmandviya  staff     0B Feb 28 22:03 _SUCCESS
-rw-r--r--  1 rahulmandviya  staff    22M Feb 28 22:03 part-00000-ae9c3056-7e9d-43aa-8a74-0088deee328f-c000.snappy.parquet
-rw-r--r--  1 rahulmandviya  staff    22M Feb 28 22:03 part-00001-ae9c3056-7e9d-43aa-8a74-0088deee328f-c000.snappy.parquet
-rw-r--r--  1 rahulmandviya  staff    22M Feb 28 22:03 part-00002-ae9c3056-7e9d-43aa-8a74-0088deee328f-c000.snappy.parquet
-rw-r--r--  1 rahulmandviya  staff    22M Feb 28 22:03 part-00003-ae9c3056-7e9d-43aa-8a74-0088deee328f-c000.snappy.parquet
-rw-r--r--  1 rahulmandviya  staff    22M Feb 28 22:03 part-00004-ae9c3056-7e9d-43aa-8a74-0088deee328f-c000.snappy.parquet
-rw-r--r--  1 rahulmandviya  staff    22M Feb 28 22:03 part-00005-ae9c3056-7e9d-43aa-8a74-0088deee328f-c000.snappy.parquet
-rw-r--r--  1 rahulmandviya  staff    22M Feb 28 22:03 part-00006-ae9c3056-7e9d-43aa-8a74-0088deee328f-c000.snappy.parquet
-rw-r--r--  1 rahulmandviya  staff    22M Feb 28 22:0

## Question 3

In [9]:
# Continue from the parquet copy written above rather than the raw CSV.
df = spark.read.format('parquet').load('data/pq/fhvhv/2021/06/')

In [10]:
from pyspark.sql.functions import *

In [11]:
# Question 3: number of trips whose pickup fell on 2021-06-15.
# to_date() drops the time of day, so every pickup on that date compares equal.
june_15_pickups = df.filter(to_date(df.pickup_datetime) == '2021-06-15')
june_15_pickups.count()

452470

## Question 4

In [12]:
# Expose the DataFrame to Spark SQL under the name fhvhv_2021_06.
df.createOrReplaceTempView('fhvhv_2021_06')

In [13]:
# Column names available to the SQL queries (as read back from the parquet files).
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [14]:
# Question 4: longest trips, in hours.
# CAST(timestamp AS LONG) gives epoch seconds, so the difference / 3600 is hours.
# Ordering by the alias (rather than the positional ordinal 3) survives edits
# to the SELECT list.
longest_trips_sql = """
SELECT
    pickup_datetime,
    dropoff_datetime,
    ROUND((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 3600, 2) AS durationInHours
FROM
    fhvhv_2021_06
ORDER BY
    durationInHours DESC
"""
spark.sql(longest_trips_sql).show()

+-------------------+-------------------+---------------+
|    pickup_datetime|   dropoff_datetime|durationInHours|
+-------------------+-------------------+---------------+
|2021-06-25 13:55:41|2021-06-28 08:48:25|          66.88|
|2021-06-22 12:09:45|2021-06-23 13:42:44|          25.55|
|2021-06-27 10:32:29|2021-06-28 06:31:20|          19.98|
|2021-06-26 22:37:11|2021-06-27 16:49:01|           18.2|
|2021-06-23 20:40:43|2021-06-24 13:08:44|          16.47|
|2021-06-23 22:03:31|2021-06-24 12:19:39|          14.27|
|2021-06-24 23:11:00|2021-06-25 13:05:35|          13.91|
|2021-06-04 20:56:02|2021-06-05 08:36:14|          11.67|
|2021-06-27 07:45:19|2021-06-27 19:07:16|          11.37|
|2021-06-20 17:05:12|2021-06-21 04:04:16|          10.98|
|2021-06-01 12:25:29|2021-06-01 22:41:32|          10.27|
|2021-06-01 12:01:46|2021-06-01 21:59:45|           9.97|
|2021-06-28 13:13:59|2021-06-28 23:11:58|           9.97|
|2021-06-27 03:52:14|2021-06-27 13:30:30|           9.64|
|2021-06-18 08

## Question 6

In [21]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2023-02-28 20:25:14--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.84.3, 52.216.222.56, 52.216.41.136, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.84.3|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi+_zone_lookup.csv’


2023-02-28 20:25:15 (55.4 MB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [15]:
# Zone lookup table: LocationID -> Borough / Zone / service_zone.
# Without a schema every CSV column is read as a string. Declare LocationID as an
# integer so it has the same type as the IntegerType PULocationID / DOLocationID
# columns it is joined against (same explicit-schema style as the trip data).
zones_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True),
])

df_zones = spark.read \
    .option("header", "true") \
    .schema(zones_schema) \
    .csv('taxi+_zone_lookup.csv')

In [16]:
# Preview the first rows of the zone lookup table.
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [17]:
# Persist the zone lookup as parquet in ./zones, replacing any previous copy.
df_zones.write.mode('overwrite').parquet('zones')

In [18]:
# Attach pickup-zone attributes to each trip by matching PULocationID to LocationID.
df_joined = df.join(
    df_zones,
    on=df.PULocationID == df_zones.LocationID,
    how='inner',
)