In [7]:
import pyspark
from pyspark.sql import SparkSession

In [9]:
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("homework05")\
    .getOrCreate()

## Question 1: What is the spark version?

In [4]:
spark.version

'3.5.1'

## ---

In [1]:
# download data

! wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-06 17:10:28--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240306T171028Z&X-Amz-Expires=300&X-Amz-Signature=584168f9457ac2bcac0a0bc4cdc5ab5b5ecd53569779670f3687d1b3e1512d63&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-06 17:10:28--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6

In [3]:
# unzip file
!gunzip fhv_tripdata_2019-10.csv.gz

In [4]:
!wc -l fhv_tripdata_2019-10.csv

1897494 fhv_tripdata_2019-10.csv


In [10]:
df = spark.read \
    .option("header", "true") \
    .csv('fhv_tripdata_2019-10.csv')

In [11]:
df.head(n=5)

[Row(dispatching_base_num='B00009', pickup_datetime='2019-10-01 00:23:00', dropOff_datetime='2019-10-01 00:35:00', PUlocationID='264', DOlocationID='264', SR_Flag=None, Affiliated_base_number='B00009'),
 Row(dispatching_base_num='B00013', pickup_datetime='2019-10-01 00:11:29', dropOff_datetime='2019-10-01 00:13:22', PUlocationID='264', DOlocationID='264', SR_Flag=None, Affiliated_base_number='B00013'),
 Row(dispatching_base_num='B00014', pickup_datetime='2019-10-01 00:11:43', dropOff_datetime='2019-10-01 00:37:20', PUlocationID='264', DOlocationID='264', SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime='2019-10-01 00:56:29', dropOff_datetime='2019-10-01 00:57:47', PUlocationID='264', DOlocationID='264', SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime='2019-10-01 00:23:09', dropOff_datetime='2019-10-01 00:28:27', PUlocationID='264', DOlocationID='264', SR_Flag=None, Affiliated_base_num

In [12]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [13]:
# read data with define schema
from pyspark.sql import types

schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropoff_datetime', types.TimestampType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

df = spark.read\
    .option("header","true")\
    .schema(schema)\
    .csv('fhv_tripdata_2019-10.csv')

In [14]:
df.head(5)

[Row(dispatching_base_num='B00009', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 23), dropoff_datetime=datetime.datetime(2019, 10, 1, 0, 35), PULocationID=264, DOLocationID=264, SR_Flag=None, Affiliated_base_number='B00009'),
 Row(dispatching_base_num='B00013', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 29), dropoff_datetime=datetime.datetime(2019, 10, 1, 0, 13, 22), PULocationID=264, DOLocationID=264, SR_Flag=None, Affiliated_base_number='B00013'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 43), dropoff_datetime=datetime.datetime(2019, 10, 1, 0, 37, 20), PULocationID=264, DOLocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 56, 29), dropoff_datetime=datetime.datetime(2019, 10, 1, 0, 57, 47), PULocationID=264, DOLocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=

In [16]:
# repartition the df into 6 partitions as requested
df = df.repartition(numPartitions=6)

df.write.parquet('data/homework5/')


                                                                                

In [18]:
# check files sizes
! ls -l data/homework5/

total 37640
-rw-r--r-- 1 andre andre       0 Mar  6 17:24 _SUCCESS
-rw-r--r-- 1 andre andre 6424988 Mar  6 17:24 part-00000-9fcac57f-9abd-4582-bce9-76f302afde09-c000.snappy.parquet
-rw-r--r-- 1 andre andre 6417333 Mar  6 17:24 part-00001-9fcac57f-9abd-4582-bce9-76f302afde09-c000.snappy.parquet
-rw-r--r-- 1 andre andre 6418178 Mar  6 17:24 part-00002-9fcac57f-9abd-4582-bce9-76f302afde09-c000.snappy.parquet
-rw-r--r-- 1 andre andre 6417268 Mar  6 17:24 part-00003-9fcac57f-9abd-4582-bce9-76f302afde09-c000.snappy.parquet
-rw-r--r-- 1 andre andre 6421083 Mar  6 17:24 part-00004-9fcac57f-9abd-4582-bce9-76f302afde09-c000.snappy.parquet
-rw-r--r-- 1 andre andre 6438857 Mar  6 17:24 part-00005-9fcac57f-9abd-4582-bce9-76f302afde09-c000.snappy.parquet


## Question 2 : The average size of the parquet
6 MB

## ---

## Question3: Count records on October 15th

In [19]:
from pyspark.sql import functions as F

# register the table
df.createOrReplaceTempView("fhv_data")

In [20]:
# Adding the date column to the spark dataframe
df = df.withColumn('pickup_date',F.to_date(df.pickup_datetime))

df.select('pickup_date').filter(df.pickup_date == '2019-10-15').count()

                                                                                

62610

## ---

In [22]:
from pyspark.sql.functions import col

# Calculate the duration of each trip in seconds
df = df.withColumn("trip_duration_seconds", df.dropoff_datetime-df.pickup_datetime)

# Convert the duration from seconds to hours
df = df.withColumn("trip_duration_hours", df.trip_duration_seconds / 3600)

df_result=spark.sql(
    '''

        SELECT MAX(trip_duration_hours) AS longest_trip_hours
        FROM (
            SELECT TIMESTAMPDIFF(SECOND, pickup_datetime, dropoff_datetime) / 3600.0 AS trip_duration_hours
            FROM fhv_data
        ) AS durations;
        
    '''
)

## Question 4: The Longest trip

In [23]:
df_result.show()



+------------------+
|longest_trip_hours|
+------------------+
|     631152.500000|
+------------------+



                                                                                

## ---

In [24]:
# download zone lookup csv 
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-06 17:56:23--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240306T175623Z&X-Amz-Expires=300&X-Amz-Signature=cd4e4b3533a01371c5c8d40c54046c5ab083127e804ff7dee7aedeafc039c2f4&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-06 17:56:23--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62

In [25]:
# read into dataframe
zone_lookup = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')

zone_lookup.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [28]:
# temp tables
df.registerTempTable('trips_data')
zone_lookup.registerTempTable('zone_lookup')

## Question 6 : Least frequent pickup location zone

In [29]:
# Using the zone lookup data and the FHV October 2019 data, 
# what is the name of the LEAST frequent pickup location Zone?

# column from trips_data : PUlocationID
# column from zone_lookup : LocationID
spark.sql("""
SELECT 
    count(1),
    zone_lookup.Zone 
FROM 
    trips_data
LEFT JOIN 
    zone_lookup
ON
    trips_data.PUlocationID = zone_lookup.LocationID 
GROUP BY 
    zone_lookup.Zone
ORDER BY 
    count(1) ASC
""").show()



+--------+--------------------+
|count(1)|                Zone|
+--------+--------------------+
|       1|         Jamaica Bay|
|       2|Governor's Island...|
|       5| Green-Wood Cemetery|
|       8|       Broad Channel|
|      14|     Highbridge Park|
|      15|        Battery Park|
|      23|Saint Michaels Ce...|
|      25|Breezy Point/Fort...|
|      26|Marine Park/Floyd...|
|      29|        Astoria Park|
|      39|    Inwood Hill Park|
|      47|       Willets Point|
|      53|Forest Park/Highl...|
|      57|  Brooklyn Navy Yard|
|      62|        Crotona Park|
|      77|        Country Club|
|      89|     Freshkills Park|
|      98|       Prospect Park|
|     105|     Columbia Street|
|     110|  South Williamsburg|
+--------+--------------------+
only showing top 20 rows



                                                                                

## ======================================