In [28]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd 

In [29]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [7]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-02-26 21:41:24--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240226%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240226T214124Z&X-Amz-Expires=300&X-Amz-Signature=28d724538cfeb4d5ecabe33606dbca7118e67c8c96a9f1fcbac0c82b955e25e4&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-02-26 21:41:24--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [81]:
df = spark.read \
    .option("header", "true") \
    .csv('fhv_tripdata_2019-10.csv.gz')

In [31]:
df_pandas= pd.read_csv('fhv_tripdata_2019-10.csv.gz')

In [33]:
pd.DataFrame.iteritems = pd.DataFrame.items

In [41]:
from pyspark.sql import types

In [46]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [93]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv.gz')

In [83]:
df = df.repartition(6)
df = df.coalesce(6)
df.write.parquet('data/fhv/2019/10/', mode='overwrite')

                                                                                

In [87]:
df = spark.read.schema(schema).parquet('data/fhv/2019/10/')

In [68]:
from pyspark.sql import functions as F

In [98]:
pickup_datetime_column = 'pickup_datetime'
fifteen = df.filter(F.col(pickup_datetime_column).substr(1, 10) == '2019-10-15')

In [99]:
fifteen.count()

                                                                                

62610

In [102]:
df.registerTempTable('trips')



In [103]:
df_longest = spark.sql("""
    SELECT 
        *,
        UNIX_TIMESTAMP(dropoff_datetime) - UNIX_TIMESTAMP(pickup_datetime) AS trip_duration_seconds
    FROM 
        trips
    ORDER BY 
        trip_duration_seconds DESC
    LIMIT 1
""")

In [107]:
df_longest.registerTempTable('longest')

In [112]:
longest_time = spark.sql("""
    SELECT 
        *,
        UNIX_TIMESTAMP(dropoff_datetime) - UNIX_TIMESTAMP(pickup_datetime)  / 3600 AS trip_duration
    FROM 
        longest
""")

In [113]:
longest_time.show()

[Stage 46:>                                                         (0 + 1) / 1]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+---------------------+-------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|trip_duration_seconds|trip_duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+---------------------+-------------+
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|         264|         264|   null|                B02832|           2272149000|3.842529462E9|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+---------------------+-------------+



                                                                                

In [114]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-02-26 22:53:42--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240226%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240226T225342Z&X-Amz-Expires=300&X-Amz-Signature=8b01209d6b965f7292170bc97240b07e108c98d49d64c0d76ebdf5718463b909&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-02-26 22:53:42--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [117]:
zones = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')

In [120]:
df_join = df.join(zones, df.PULocationID == zones.LocationID)

In [130]:
df_join.registerTempTable('joined')

In [132]:
df_join.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True), StructField('LocationID', StringType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [136]:
mostPickups = spark.sql("""SELECT Zone, COUNT(*) AS PickupCount FROM joined 
GROUP BY Zone
ORDER BY PickupCount ASC""")

In [137]:
mostPickups.show()

[Stage 64:>                                                         (0 + 1) / 1]

+--------------------+-----------+
|                Zone|PickupCount|
+--------------------+-----------+
|         Jamaica Bay|          1|
|Governor's Island...|          2|
| Green-Wood Cemetery|          5|
|       Broad Channel|          8|
|     Highbridge Park|         14|
|        Battery Park|         15|
|Saint Michaels Ce...|         23|
|Breezy Point/Fort...|         25|
|Marine Park/Floyd...|         26|
|        Astoria Park|         29|
|    Inwood Hill Park|         39|
|       Willets Point|         47|
|Forest Park/Highl...|         53|
|  Brooklyn Navy Yard|         57|
|        Crotona Park|         62|
|        Country Club|         77|
|     Freshkills Park|         89|
|       Prospect Park|         98|
|     Columbia Street|        105|
|  South Williamsburg|        110|
+--------------------+-----------+
only showing top 20 rows



                                                                                