In [15]:
import pyspark
from pyspark.sql import SparkSession

In [16]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

QUESTION 2

In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

In [None]:
!gzip -d fhvhv_tripdata_2021-06.csv.gz

In [None]:
!head -n 1001 fhvhv_tripdata_2021-06.csv > head_2021-06.csv

In [None]:
import pandas as pd
df_pandas = pd.read_csv('head_2021-06.csv')
df_pandas.dtypes

In [None]:
from pyspark.sql import types

In [None]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [None]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-06.csv')

In [None]:
df.schema

In [None]:
df = df.repartition(12)

In [None]:
df.write.parquet('fhvhv/2021/06/', mode="overwrite")

In [None]:
%ll -h fhvhv/2021/06

QUESTION 3

In [17]:
df = spark.read.parquet('fhvhv/2021/06/')

In [23]:
df.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02617|2021-06-04 16:50:34|2021-06-04 17:01:18|         118|         109|      N|                B02617|
|              B02875|2021-06-02 22:28:45|2021-06-02 22:37:28|         163|          79|      N|                B02875|
|              B02871|2021-06-03 11:47:48|2021-06-03 11:52:23|         231|          13|      N|                B02871|
|              B02888|2021-06-03 08:45:25|2021-06-03 09:00:12|           9|          92|      N|                B02888|
|              B02510|2021-06-05 09:50:43|2021-06-05 10:06:53|          14|         133|      N|                  null|
+--------------------+------------------

In [19]:
df.registerTempTable('fhvhv_202106')

In [22]:
spark.sql("""
SELECT COUNT(1)
FROM fhvhv_202106
WHERE pickup_datetime >= '2021-06-15 00:00:00' AND pickup_datetime <= '2021-06-15 23:59:59'
""").show()

[Stage 20:>                                                         (0 + 4) / 4]

+--------+
|count(1)|
+--------+
|  452470|
+--------+



                                                                                

QUESTION 4

In [33]:
import pyspark.sql.functions as F
from pyspark.sql import types

In [75]:
def cal_duration(pickup_datetime, dropoff_datetime):
    return (dropoff_datetime - pickup_datetime).seconds / 3600

cal_dur = F.udf(cal_duration, types.FloatType())

In [76]:
df_dur = df.withColumn("duration", cal_dur(df["pickup_datetime"], df["dropoff_datetime"]))
df_dur.show(5)
df_dur.registerTempTable("fhvhv_202106_dur")

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|  duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+
|              B02617|2021-06-04 16:50:34|2021-06-04 17:01:18|         118|         109|      N|                B02617|0.17888889|
|              B02875|2021-06-02 22:28:45|2021-06-02 22:37:28|         163|          79|      N|                B02875|0.14527778|
|              B02871|2021-06-03 11:47:48|2021-06-03 11:52:23|         231|          13|      N|                B02871|0.07638889|
|              B02888|2021-06-03 08:45:25|2021-06-03 09:00:12|           9|          92|      N|                B02888|0.24638888|
|              B02510|2021-06-05 09:50:43|2021-06-05 10:06:53|          14|        

In [79]:
spark.sql("""
SELECT duration
FROM fhvhv_202106_dur
ORDER BY DURATION DESC
LIMIT 10
""").show()



+---------+
| duration|
+---------+
|19.980833|
| 18.87889|
|18.197222|
|16.466944|
|14.268888|
|13.909722|
|    11.67|
|11.365833|
|10.984445|
|  10.2675|
+---------+



                                                                                

QUESTION 6

In [83]:
df_zones = spark.read.parquet('zones')
df_zones.registerTempTable('zones')

In [84]:
df_zones.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [109]:
spark.sql("""
SELECT COUNT(1) AS count, MIN(zones.Zone) as zone
FROM fhvhv_202106 JOIN zones ON fhvhv_202106.PULocationID = zones.LocationID
GROUP BY fhvhv_202106.PULocationID
ORDER BY count DESC
LIMIT 1
""").show()

[Stage 83:>                                                         (0 + 4) / 4]

+------+-------------------+
| count|               zone|
+------+-------------------+
|231279|Crown Heights North|
+------+-------------------+



