# Load CSV file and write it as parquet

In [35]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F
import pandas as pd

In [1]:
# One-time download of the fhv_tripdata_2019-10 archive; uncomment to fetch it.
#!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-06 14:43:04--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 192.30.255.113
Connecting to github.com (github.com)|192.30.255.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240306T144304Z&X-Amz-Expires=300&X-Amz-Signature=761147051d6d782a86deeb64a792b8e1a297f3c9ed986af7a16d16ab71469b0e&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-06 14:43:04--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6

In [2]:
# One-time decompression of the downloaded archive; uncomment to produce fhv_tripdata_2019-10.csv.
#!gzip -d fhv_tripdata_2019-10.csv.gz

In [36]:
# Create (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [5]:
# Read the CSV with only the header option (no explicit schema) so the default
# column types can be inspected; without a schema every column is read as a string.
# Defining `df` here keeps this cell runnable on a fresh kernel; it is re-read
# with an explicit schema further down.
df = spark.read \
    .option("header", "true") \
    .csv('fhv_tripdata_2019-10.csv')
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [6]:
# Load the same CSV file into a pandas DataFrame.
df_pd = pd.read_csv('fhv_tripdata_2019-10.csv')

In [7]:
# Column dtypes as inferred by pandas.
df_pd.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [8]:
# Schema Spark derives when the pandas DataFrame is converted to a Spark DataFrame.
spark.createDataFrame(df_pd).schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', DoubleType(), True), StructField('DOlocationID', DoubleType(), True), StructField('SR_Flag', DoubleType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [9]:
# Explicit schema for the CSV: timestamps for the pickup/drop-off columns,
# integers for the two location IDs, strings for the rest. Every field is nullable.
schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PUlocationID', types.IntegerType()),
        ('DOlocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [25]:
# Load the CSV using the explicit schema; the first line of the file is a header row.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhv_tripdata_2019-10.csv')
)

In [34]:
# Preview the DataFrame.
# NOTE(review): the captured output comes from execution In [34], i.e. after the
# rename cell (In [29]) had run, which is why it shows the lowercased column names.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PUlocationID|DOlocationID|SR_Flag|affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|              B00021|2019-10-01 00:00:4

In [28]:
# Trim unnecessary leading/trailing spaces from the two base-number columns
df = (
    df
    .withColumn('dispatching_base_num', F.trim('dispatching_base_num'))
    .withColumn('Affiliated_base_number', F.trim('Affiliated_base_number'))
)

In [29]:
# Rename columns so that their names are all lowercase
df = (
    df
    .withColumnRenamed('dropOff_datetime', 'dropoff_datetime')
    .withColumnRenamed('Affiliated_base_number', 'affiliated_base_number')
)

In [30]:
# Replace #N/A by NULL
# No subset is passed, so the replacement is not restricted to specific columns.
df = df.replace("#N/A", None)

In [33]:
# Repartition the cleaned data into 6 partitions and write it out as parquet,
# overwriting any output already present at the target path.
(
    df
    .repartition(6)
    .write
    .parquet('/home/huely/data/pq/fhv_homework', mode='overwrite')
)

                                                                                

# Read parquet file

In [39]:
# Read the parquet output back in (same directory the write cell targets).
df_fhv = spark.read.parquet('/home/huely/data/pq/fhv_homework/')

In [42]:
# Print the schema of the DataFrame loaded from parquet.
df_fhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- affiliated_base_number: string (nullable = true)



In [40]:
# Register the DataFrame as the temp view `fhv` so the SQL queries below can reference it.
df_fhv.createOrReplaceTempView('fhv')

In [50]:
# Question 3: How many taxi trips were there on the 15th of October?
# Each pickup timestamp is truncated to the start of its day; only rows whose
# truncated value equals 2019-10-15 00:00:00 are kept and counted.
# GROUP BY 1 groups by the first select column (the truncated date).
df_result = spark.sql("""
    SELECT
        date_trunc('day', pickup_datetime) AS date,
        COUNT(1) AS record_count
    FROM fhv 
    WHERE
        date_trunc('day', pickup_datetime) == "2019-10-15 00:00:00"
    GROUP BY
        1
""")

In [51]:
# Display the trip count for 2019-10-15.
df_result.show()

+-------------------+------------+
|               date|record_count|
+-------------------+------------+
|2019-10-15 00:00:00|       62610|
+-------------------+------------+



In [63]:
# Question 4: Longest trip
# What is the length of the longest trip in the dataset in hours?
# The query takes a single MAX over the whole table (there is no per-day grouping).

# Keep the DataFrame in df_result2 and display it in a separate statement:
# show() returns None, so chaining it onto the assignment would bind None.
df_result2 = spark.sql("""
    SELECT
        MAX(TIMESTAMPDIFF(HOUR, pickup_datetime, dropoff_datetime)) AS longest_trip
    FROM
        fhv
""")
df_result2.show()

+------------+
|longest_trip|
+------------+
|      631152|
+------------+



# JOIN ZONE

In [65]:
# Load the zone lookup table from the local `zones` parquet directory.
df_zones = spark.read.parquet('zones')

In [68]:
# Preview the first 5 rows of the zone lookup table.
df_zones.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [76]:
# Register the zone lookup as the temp view `zones` so it can be joined in SQL.
df_zones.createOrReplaceTempView('zones')

In [85]:
# Count trips per pickup zone and list the three zones with the fewest trips
# (ORDER BY 2 sorts by num_trips ascending; GROUP BY 1 groups by pickup_zone).
# Keep the DataFrame in df_join and display it in a separate statement:
# show() returns None, so chaining it onto the assignment would bind None.
df_join = spark.sql("""
    SELECT
        Zone AS pickup_zone,
        COUNT(1) as num_trips
    FROM fhv
    JOIN zones
    ON fhv.PULocationID = zones.LocationID
    GROUP BY 1
    ORDER BY 2
    LIMIT 3
""")
df_join.show()

+--------------------+---------+
|         pickup_zone|num_trips|
+--------------------+---------+
|         Jamaica Bay|        1|
|Governor's Island...|        2|
| Green-Wood Cemetery|        5|
+--------------------+---------+

