In [32]:
import pyspark

In [33]:
from pyspark.sql import SparkSession

In [34]:
# Start (or reuse) a local Spark session that runs on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [35]:
# Record which Spark version produced the outputs in this notebook.
print('Spark Version :', spark.version)

Spark Version : 3.5.1


In [36]:
# First look at the raw CSV: only the header option is set, so no types are
# inferred and every column comes back as a string (see the schema below).
df = spark.read.csv('fhv_tripdata_2019-10.csv', header=True)

In [37]:
# Schema of the untyped read: every column is StringType, and the header uses
# mixed-case names such as 'dropOff_datetime' and 'PUlocationID' (see output).
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [38]:
import pandas as pd

In [39]:
# Pull a small sample (the first 1001 rows) into pandas for a quick look at
# what types the columns appear to hold.
df_pandas = pd.read_csv(
    'fhv_tripdata_2019-10.csv',
    nrows=1001,
)

In [40]:
# pandas' dtypes for the sample. The location IDs and SR_Flag come back as
# float64 (see output). NOTE(review): that usually means the integer column
# contains missing values in the sample — confirm before relying on it.
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [41]:
from pyspark.sql import types
from pyspark.sql.functions import *

In [42]:
# Explicit schema for the FHV trip CSV: timestamps for the pickup/dropoff
# columns and integers for the location IDs. Column names are spelled in a
# normalised form (e.g. 'dropoff_datetime', 'PULocationID'); these are the
# names the typed DataFrame ends up with (see the printSchema output below).
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [43]:
# Re-read the CSV, this time applying the explicit schema defined above.
df = spark.read.csv('fhv_tripdata_2019-10.csv', header=True, schema=schema)

In [44]:
# Redistribute the rows into 6 partitions; the parquet write below then emits
# one part file per (non-empty) partition.
df = df.repartition(numPartitions=6)

In [45]:
# Persist the typed data as parquet under ./parquet, replacing any earlier run.
df.write.parquet('parquet', mode='overwrite')

In [46]:
# Load the data back from parquet; the column types are stored with the files.
df = spark.read.format('parquet').load('parquet')

In [47]:
# Confirm the explicit types (timestamps, integer IDs) survived the parquet round trip.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [48]:
from pyspark.sql import functions as F

In [49]:
# Preview the first rows; these are Spark's default show() arguments, spelled out.
df.show(n=20, truncate=True)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02735|2019-10-03 22:35:28|2019-10-03 22:41:01|         264|         259|   NULL|                B02682|
|              B01145|2019-10-01 10:55:00|2019-10-01 10:58:18|         264|         174|   NULL|                B02864|
|              B02569|2019-10-04 07:01:37|2019-10-04 07:37:50|         193|         100|   NULL|                B02759|
|              B00727|2019-10-02 14:10:31|2019-10-02 14:22:31|         264|         264|   NULL|                B00727|
|              B00628|2019-10-03 01:28:47|2019-10-03 02:02:31|         261|         191|   NULL|                B00628|
|              B01051|2019-10-01 11:52:2

In [50]:
# Expose the trips DataFrame to Spark SQL under the name `trips_data`.
# `registerTempTable` has been deprecated since Spark 2.0 (it emits a
# FutureWarning); `createOrReplaceTempView` is the supported equivalent and is
# also what this notebook uses for the `zones` view.
df.createOrReplaceTempView('trips_data')



In [51]:
# Number of trips whose pickup falls on 2019-10-15.
# Uses a half-open range [2019-10-15, 2019-10-16) rather than
# BETWEEN '... 00:00:00' AND '... 23:59:59': Spark timestamps carry sub-second
# precision, and the closed form would silently drop pickups after 23:59:59.000.
spark.sql("""
select count(*) as total_trips
from trips_data
where pickup_datetime >= '2019-10-15 00:00:00'
  and pickup_datetime <  '2019-10-16 00:00:00'
""").show()

+-----------+
|total_trips|
+-----------+
|      62610|
+-----------+



In [52]:
# Trip duration in seconds and in hours, longest first.
# Note: the top rows are clearly bad records (dropoff years such as 2091),
# so the raw maximum should not be taken at face value.
(
    df
    .withColumn('dif_in_sec', F.unix_timestamp("dropoff_datetime") - F.unix_timestamp('pickup_datetime'))
    .withColumn('dif_in_hours', F.col('dif_in_sec') / 3600)
    .orderBy(F.desc('dif_in_sec'))
    .show(truncate=False)
)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+------------------+
|dispatching_base_num|pickup_datetime    |dropoff_datetime   |PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|dif_in_sec|dif_in_hours      |
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+------------------+
|B02832              |2019-10-28 09:00:00|2091-10-28 09:30:00|264         |264         |NULL   |B02832                |2272149000|631152.5          |
|B02832              |2019-10-11 18:00:00|2091-10-11 18:30:00|264         |264         |NULL   |B02832                |2272149000|631152.5          |
|B02416              |2019-10-31 23:46:33|2029-11-01 00:13:00|NULL        |NULL        |NULL   |B02416                |315620787 |87672.44083333333 |
|B00746              |2019-10-01 21:43:42|2027-10-01 21:45:23|159         |264         |NULL   |B007

In [53]:
# Taxi zone lookup table (columns as shown in the preview below).
# Without a schema the CSV reader returns every column as a string, so
# LocationID would be compared against the IntegerType trips_data.PULocationID
# through an implicit cast in the join. An explicit schema keeps the join keys
# the same type.
zones_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True),
])

df_zones = spark.read \
    .option("header", "true") \
    .schema(zones_schema) \
    .csv('taxi+_zone_lookup.csv')

In [54]:
# Preview the first 20 zones with full-length names.
df_zones.show(n=20, truncate=False)

+----------+-------------+-----------------------+------------+
|LocationID|Borough      |Zone                   |service_zone|
+----------+-------------+-----------------------+------------+
|1         |EWR          |Newark Airport         |EWR         |
|2         |Queens       |Jamaica Bay            |Boro Zone   |
|3         |Bronx        |Allerton/Pelham Gardens|Boro Zone   |
|4         |Manhattan    |Alphabet City          |Yellow Zone |
|5         |Staten Island|Arden Heights          |Boro Zone   |
|6         |Staten Island|Arrochar/Fort Wadsworth|Boro Zone   |
|7         |Queens       |Astoria                |Boro Zone   |
|8         |Queens       |Astoria Park           |Boro Zone   |
|9         |Queens       |Auburndale             |Boro Zone   |
|10        |Queens       |Baisley Park           |Boro Zone   |
|11        |Brooklyn     |Bath Beach             |Boro Zone   |
|12        |Manhattan    |Battery Park           |Yellow Zone |
|13        |Manhattan    |Battery Park C

In [55]:
# Make the zone lookup queryable from Spark SQL as `zones`.
df_zones.createOrReplaceTempView(name="zones")

In [56]:
# Pickup counts per zone, least busy zone first.
# The inner join drops trips whose PULocationID has no match in `zones`
# (including NULL IDs). LocationID is used as a secondary sort key so that
# zones with equal counts are always listed in the same order; sorting on
# total_trips alone leaves ties in arbitrary order between runs.
spark.sql("""
select
    zones.LocationID,
    zones.Zone,
    count(1) as total_trips
from trips_data
inner join zones
    on trips_data.PULocationID = zones.LocationID
group by
    zones.LocationID,
    zones.Zone
order by
    total_trips,
    zones.LocationID
""").show()

+----------+--------------------+-----------+
|LocationID|                Zone|total_trips|
+----------+--------------------+-----------+
|         2|         Jamaica Bay|          1|
|       105|Governor's Island...|          2|
|       111| Green-Wood Cemetery|          5|
|        30|       Broad Channel|          8|
|       120|     Highbridge Park|         14|
|        12|        Battery Park|         15|
|       207|Saint Michaels Ce...|         23|
|        27|Breezy Point/Fort...|         25|
|       154|Marine Park/Floyd...|         26|
|         8|        Astoria Park|         29|
|       128|    Inwood Hill Park|         39|
|       253|       Willets Point|         47|
|        96|Forest Park/Highl...|         53|
|        34|  Brooklyn Navy Yard|         57|
|        59|        Crotona Park|         62|
|        58|        Country Club|         77|
|        99|     Freshkills Park|         89|
|       190|       Prospect Park|         98|
|        54|     Columbia Street| 