In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

24/03/03 19:05:03 WARN Utils: Your hostname, ubuntu-focal resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/03/03 19:05:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/03 19:05:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/03 19:05:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
spark.version

'3.3.2'

In [4]:
import pandas as pd
from pyspark.sql import types

In [5]:
df = pd.read_csv('fhv_tripdata_2019-10.csv')

In [6]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1897493 entries, 0 to 1897492
Data columns (total 7 columns):
 #   Column                  Dtype  
---  ------                  -----  
 0   dispatching_base_num    object 
 1   pickup_datetime         object 
 2   dropOff_datetime        object 
 3   PUlocationID            float64
 4   DOlocationID            float64
 5   SR_Flag                 float64
 6   Affiliated_base_number  object 
dtypes: float64(3), object(4)
memory usage: 101.3+ MB


In [7]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropOff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [8]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv')

In [9]:
df = df.repartition(6)

In [10]:
df.write.parquet('fhv/2019/10/', mode='overwrite')

[Stage 0:>                                                          (0 + 4) / 4]

24/03/03 19:06:16 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 6
CSV file: file:///home/vagrant/notebooks/fhv_tripdata_2019-10.csv


                                                                                

In [11]:
df = spark.read.parquet('fhv/2019/10/')

In [12]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [13]:
from pyspark.sql import functions as F

In [14]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropOff_date', F.to_date(df.dropOff_datetime)) \
    .select('pickup_date', 'dropOff_date', 'PULocationID', 'DOLocationID') \
    .filter(F.to_date(df.pickup_datetime) == '2019-10-15') \
    .count() 

                                                                                

62610

In [15]:
df.registerTempTable('fhv_trips_data')



In [16]:
spark.sql("""
SELECT
    pickup_datetime,
    dropOff_datetime,
    TIMESTAMPDIFF(HOUR, pickup_datetime, dropOff_datetime) as trip_length
FROM
    fhv_trips_data
order by trip_length desc

""").show()

                                                                                

+-------------------+-------------------+-----------+
|    pickup_datetime|   dropOff_datetime|trip_length|
+-------------------+-------------------+-----------+
|2019-10-11 18:00:00|2091-10-11 18:30:00|     631152|
|2019-10-28 09:00:00|2091-10-28 09:30:00|     631152|
|2019-10-31 23:46:33|2029-11-01 00:13:00|      87672|
|2019-10-01 21:43:42|2027-10-01 21:45:23|      70128|
|2019-10-17 14:00:00|2020-10-18 00:00:00|       8794|
|2019-10-26 21:26:00|2020-10-26 21:36:00|       8784|
|2019-10-30 12:30:04|2019-12-30 13:02:08|       1464|
|2019-10-25 07:04:57|2019-12-08 07:54:33|       1056|
|2019-10-25 07:04:57|2019-12-08 07:21:11|       1056|
|2019-10-01 13:47:17|2019-11-03 15:20:28|        793|
|2019-10-01 13:41:00|2019-11-03 14:58:51|        793|
|2019-10-01 07:21:12|2019-11-03 08:44:21|        793|
|2019-10-01 12:31:09|2019-11-03 12:54:27|        792|
|2019-10-01 07:21:19|2019-11-03 07:28:09|        792|
|2019-10-01 12:08:29|2019-11-03 12:24:41|        792|
|2019-10-01 14:41:15|2019-11

In [17]:
df_zones = spark.read.parquet('zones/')

In [18]:
df_zones.registerTempTable('zones')

In [19]:
spark.sql("""
select * from zones
""").show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [20]:
df_join = df.join(df_zones, df.PULocationID == df_zones.LocationID)

In [21]:
df_join.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------+---------+--------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PULocationID|DOLocationID|SR_Flag|LocationID|  Borough|                Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------+---------+--------------------+------------+
|              B02947|2019-10-05 06:30:00|2019-10-05 06:34:11|         167|          69|   null|       167|    Bronx|  Morrisania/Melrose|   Boro Zone|
|              B01437|2019-10-06 16:05:09|2019-10-06 16:10:16|         264|         134|   null|       264|  Unknown|                  NV|         N/A|
|     B01467         |2019-10-02 07:15:02|2019-10-02 07:49:28|         193|         157|   null|       193|   Queens|Queensbridge/Rave...|   Boro Zone|
|              B00459|2019-10-06 17:58:34|2019-10-06 18:05:28|         264|          76|

In [22]:
df_join.registerTempTable('df_join')

In [24]:
spark.sql("""
select
    Zone,
    count(1) as pickup_frequency
from
    df_join
group by PULocationID, Zone
order by pickup_frequency asc 
""").show()



+--------------------+----------------+
|                Zone|pickup_frequency|
+--------------------+----------------+
|         Jamaica Bay|               1|
|Governor's Island...|               2|
| Green-Wood Cemetery|               5|
|       Broad Channel|               8|
|     Highbridge Park|              14|
|        Battery Park|              15|
|Saint Michaels Ce...|              23|
|Breezy Point/Fort...|              25|
|Marine Park/Floyd...|              26|
|        Astoria Park|              29|
|    Inwood Hill Park|              39|
|       Willets Point|              47|
|Forest Park/Highl...|              53|
|  Brooklyn Navy Yard|              57|
|        Crotona Park|              62|
|        Country Club|              77|
|     Freshkills Park|              89|
|       Prospect Park|              98|
|     Columbia Street|             105|
|  South Williamsburg|             110|
+--------------------+----------------+
only showing top 20 rows



                                                                                

In [34]:
spark.sql("""
select
    PULocationID as pickup_zone_id,
    Zone,
    count(1) as pickup_frequency
from
    fhv_trips_data
left join zones on PULocationID = LocationID
group by pickup_zone_id, Zone
order by pickup_frequency asc 
""").show()



+--------------+--------------------+----------------+
|pickup_zone_id|                Zone|pickup_frequency|
+--------------+--------------------+----------------+
|             2|         Jamaica Bay|               1|
|           105|Governor's Island...|               2|
|           111| Green-Wood Cemetery|               5|
|            30|       Broad Channel|               8|
|           120|     Highbridge Park|              14|
|            12|        Battery Park|              15|
|           207|Saint Michaels Ce...|              23|
|            27|Breezy Point/Fort...|              25|
|           154|Marine Park/Floyd...|              26|
|             8|        Astoria Park|              29|
|           128|    Inwood Hill Park|              39|
|           253|       Willets Point|              47|
|            96|Forest Park/Highl...|              53|
|            34|  Brooklyn Navy Yard|              57|
|            59|        Crotona Park|              62|
|         

                                                                                