In [45]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
SparkSession

pyspark.sql.session.SparkSession

In [6]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

df = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

df.show()

                                                                                

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [28]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz


--2024-03-01 14:30:09--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 192.30.255.112
Connecting to github.com (github.com)|192.30.255.112|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240301%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240301T143010Z&X-Amz-Expires=300&X-Amz-Signature=7c5066e862e0d6936ec87273ea4a8977f6d2d3ce40a47862d308c13ceaaf6808&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-01 14:30:10--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6

In [29]:
!wc -l fhv_tripdata_2019-10.csv.gz


62958 fhv_tripdata_2019-10.csv.gz


In [41]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType

schema = StructType([
    StructField('dispatching_base_num', StringType(), True), 
    StructField('pickup_datetime', TimestampType(), True), 
    StructField('dropOff_datetime', TimestampType(), True), 
    StructField('PUlocationID', LongType(), True), 
    StructField('DOlocationID', LongType(), True), 
    StructField('SR_Flag', LongType(), True), 
    StructField('Affiliated_base_number', StringType(), True)
])


In [42]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv.gz')

In [43]:
print(df.schema)

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', LongType(), True), StructField('DOlocationID', LongType(), True), StructField('SR_Flag', LongType(), True), StructField('Affiliated_base_number', StringType(), True)])


In [24]:
from pyspark.sql.functions import countDistinct

df.select(countDistinct("SR_Flag")).collect()

                                                                                

[Row(count(DISTINCT SR_Flag)=0)]

In [27]:
df.repartition(20).write.parquet("fhv_tripdata_2023-03")

                                                                                

In [None]:
# Question 2

In [46]:
df.repartition(6).write.parquet("fhv_tripdata_2019-10")

                                                                                

In [47]:
# Question 3: Longest trip in dataset in hours

In [57]:
from pyspark.sql.functions import col

df.withColumn('trip_len', col("dropOff_datetime") - col("pickup_datetime")).agg({"trip_len": "max"}).collect()

                                                                                

[Row(max(trip_len)=datetime.timedelta(days=26298, seconds=1800))]

In [58]:
26298*24

631152

In [None]:
# Question 5. Spark UI port 
# 4040

In [None]:
# Question 6. Least frequent pickup location zone 


In [60]:
zones = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [66]:
df.groupBy('PUlocationID').count().join(zones, on=df.PUlocationID==zones.LocationID).sort("count").show()

[Stage 33:>                                                         (0 + 1) / 1]

+------------+-----+----------+-------------+--------------------+------------+
|PUlocationID|count|LocationID|      Borough|                Zone|service_zone|
+------------+-----+----------+-------------+--------------------+------------+
|           2|    1|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         105|    2|       105|    Manhattan|Governor's Island...| Yellow Zone|
|         111|    5|       111|     Brooklyn| Green-Wood Cemetery|   Boro Zone|
|          30|    8|        30|       Queens|       Broad Channel|   Boro Zone|
|         120|   14|       120|    Manhattan|     Highbridge Park|   Boro Zone|
|          12|   15|        12|    Manhattan|        Battery Park| Yellow Zone|
|         207|   23|       207|       Queens|Saint Michaels Ce...|   Boro Zone|
|          27|   25|        27|       Queens|Breezy Point/Fort...|   Boro Zone|
|         154|   26|       154|     Brooklyn|Marine Park/Floyd...|   Boro Zone|
|           8|   29|         8|       Qu

                                                                                