In [17]:
import pyspark
import pandas as pd
import pyspark.sql.types as types
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp

## Question 1: Install Spark and PySpark

In [2]:
# Start (or reuse, via getOrCreate) a local Spark session that uses every
# available core on this machine.
spark = (
    SparkSession.builder
    .appName('test')
    .master("local[*]")
    .getOrCreate()
)

In [3]:
# Question 1: report which Spark version backs the session created above.
spark_version = spark.version
print(f"Apache Spark Version: {spark_version}")


Apache Spark Version: 3.5.0


## Question 2: FHV October 2019

In [3]:
!wget  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-03 19:36:45--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240303T183646Z&X-Amz-Expires=300&X-Amz-Signature=6862d51d7031e34049476220af3be6b2aff4acf3b83b8cf44eb750f094f1a7e7&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-03 19:36:45--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

 12900K .......... .......... .......... .......... .......... 68% 35,9M 0s
 12950K .......... .......... .......... .......... .......... 68% 37,8M 0s
 13000K .......... .......... .......... .......... .......... 68% 35,1M 0s
 13050K .......... .......... .......... .......... .......... 69% 30,2M 0s
 13100K .......... .......... .......... .......... .......... 69% 33,1M 0s
 13150K .......... .......... .......... .......... .......... 69% 38,4M 0s
 13200K .......... .......... .......... .......... .......... 70% 37,3M 0s
 13250K .......... .......... .......... .......... .......... 70% 33,5M 0s
 13300K .......... .......... .......... .......... .......... 70% 31,6M 0s
 13350K .......... .......... .......... .......... .......... 70% 34,5M 0s
 13400K .......... .......... .......... .......... .......... 71% 31,5M 0s
 13450K .......... .......... .......... .......... .......... 71% 31,2M 0s
 13500K .......... .......... .......... .......... .......... 71% 31,0M 0s
 13550K ....

In [4]:
# First pass: read the gzipped CSV using its header row; with no schema given
# and no inferSchema option, Spark types every column as a string.
df = spark.read.csv('fhv_tripdata_2019-10.csv.gz', header=True)

In [5]:
# Inspect the schema Spark assigned on the first read (every column is a
# StringType); it is the template for the explicit schema defined next.
df.schema

StructType(List(StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropOff_datetime,StringType,true),StructField(PUlocationID,StringType,true),StructField(DOlocationID,StringType,true),StructField(SR_Flag,StringType,true),StructField(Affiliated_base_number,StringType,true)))

In [6]:
from pyspark.sql import types

# Explicit schema mirroring the one Spark produced on the first read: every
# FHV column is declared as a nullable string, in file order.
schema = types.StructType(
    [
        types.StructField(column_name, types.StringType(), True)
        for column_name in (
            "dispatching_base_num",
            "pickup_datetime",
            "dropOff_datetime",
            "PUlocationID",
            "DOlocationID",
            "SR_Flag",
            "Affiliated_base_number",
        )
    ]
)


In [7]:
# Re-read the same CSV, this time applying the explicit `schema` instead of
# letting Spark assign column types itself.
df = spark.read.csv('fhv_tripdata_2019-10.csv.gz', header=True, schema=schema)

In [8]:
# Redistribute the rows into 6 partitions before writing, so the parquet output
# is split across (typically) one part-file per partition.
df = df.repartition(numPartitions=6)

In [10]:
# Persist the repartitioned data as parquet. mode("overwrite") keeps the cell
# re-runnable: the default save mode ("errorifexists") raises an error as soon
# as 'fhv/2019/10/' already exists from a previous run.
df.write.mode("overwrite").parquet('fhv/2019/10/')

In [12]:
# Peek at the first 20 rows (long cell values are truncated in the display).
df.show(n=20, truncate=True)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B01437|2019-10-18 16:58:08|2019-10-18 17:11:20|         264|         130|   NULL|                B01437|
|              B03048|2019-10-11 09:20:25|2019-10-11 09:37:15|         264|         264|   NULL|                B03048|
|              B01328|2019-10-08 09:02:00|2019-10-08 09:16:00|          78|         242|   NULL|                B02534|
|              B00310|2019-10-17 14:52:53|2019-10-17 15:07:03|         264|          60|   NULL|                B00310|
|              B01239|2019-10-04 12:15:42|2019-10-04 12:42:54|         264|         247|   NULL|                B03127|
|              B01300|2019-10-30 12:06:2

In [11]:
# Display the DataFrame repr, which lists each column name with its type.
df

DataFrame[dispatching_base_num: string, pickup_datetime: string, dropOff_datetime: string, PUlocationID: string, DOlocationID: string, SR_Flag: string, Affiliated_base_number: string]

## Question 3: Count records

In [13]:
# Turn the raw pickup string into a Spark timestamp, keeping the column name,
# so it can be compared as a calendar date below.
df = df.withColumn(
    "pickup_datetime",
    col("pickup_datetime").cast("timestamp"),
)

In [15]:
# Question 3: keep only trips whose pickup falls on the calendar date
# 2019-10-15, then count them.
trips_october_15 = df.where(col("pickup_datetime").cast("date") == "2019-10-15")

trip_count_october_15 = trips_october_15.count()

print("Number of trips on the 15th of october:", trip_count_october_15)

Number of trips on the 15th of october: 62610


## Question 4: Longest trip in the dataset (in hours)

In [18]:
# Ensure both trip endpoints are timestamps. A direct cast converts a string
# column (and is a no-op on a column that is already a timestamp), avoiding the
# timestamp -> epoch seconds -> timestamp round trip through unix_timestamp.
df = df.withColumn("pickup_datetime", col("pickup_datetime").cast("timestamp"))
df = df.withColumn("dropOff_datetime", col("dropOff_datetime").cast("timestamp"))

# Trip duration in whole seconds: casting a timestamp to long gives epoch seconds.
df = df.withColumn(
    "trip_duration_seconds",
    col("dropOff_datetime").cast("long") - col("pickup_datetime").cast("long"),
)

# Question 4: the single longest trip across the whole dataset (a global max,
# not one value per day).
max_duration_seconds = df.agg({"trip_duration_seconds": "max"}).first()[0]

# max() returns null when no row has both timestamps; guard against None / 3600.
longest_trip_hours = (
    max_duration_seconds / 3600 if max_duration_seconds is not None else None
)

# NOTE(review): the recorded answer (~631,152 h, i.e. ~72 years) implies at least
# one record with an implausible dropoff timestamp; filter such outliers if a
# realistic maximum is wanted rather than the raw dataset maximum.
print("The length of the longest trip in hours :", longest_trip_hours)

The length of the longest trip in hours : 631152.5


## Question 5: User interface

###### Spark's user interface runs on port 4040.

## Question 6: Least frequent pickup location zone

In [19]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-03 23:35:53--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240303T223554Z&X-Amz-Expires=300&X-Amz-Signature=d67e24fe66f43e5b5a8cbcbbd057c4fe29b36503798b8e752f75eb6a4de3b972&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-03 23:35:53--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [20]:
# Load the taxi zone lookup table (header row, every column read as a string)
# and expose it to Spark SQL as the temp view "zone_data".
zone_df = spark.read.option("header", "true").csv("taxi_zone_lookup.csv")
zone_df.createOrReplaceTempView("zone_data")

# Inspect the column types, then the first rows of the lookup.
zone_df.printSchema()
zone_df.show()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath

In [22]:
# Attach a zone name to every trip by matching its pickup location ID against
# the lookup table (both ID columns are strings).
joined_df = df.join(zone_df, df.PUlocationID == zone_df.LocationID, "left")

# After the left join, trips whose PUlocationID is null or missing from the
# lookup carry a null Zone; drop them so None is never reported as a zone name.
# Zones with zero pickups never appear here, so "least frequent" means least
# frequent among zones that have at least one pickup.
zone_counts = (
    joined_df
    .where(col("Zone").isNotNull())
    .groupBy("Zone")
    .count()
)

# Sort by count, then by name, so ties resolve deterministically; guard the
# (degenerate) case where no trip matched any zone.
least_frequent_row = zone_counts.orderBy("count", "Zone").select("Zone").first()
least_frequent_zone = least_frequent_row[0] if least_frequent_row is not None else None

print("The name of the LEAST frequent pickup location Zone :", least_frequent_zone)

The name of the LEAST frequent pickup location Zone : Jamaica Bay
