In [1]:
import pyspark #Main library
from pyspark.sql import SparkSession #Need to start spart
from pyspark.sql import types #Used to set data types in columns
from pyspark.sql import functions as F #Predifined functions
from pyspark.sql.types import StructType, StructField, StringType, IntegerType #Data types

In [2]:
import pandas as pd #import pandas

In [3]:
#Create session
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [4]:
#Create a df importing without pyspark with the data type
df_head = spark.read.csv('fhv_tripdata_2019-10.csv', header=True, inferSchema=True)

In [5]:
#Check if the data types are okay
df_head.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [6]:
#Can be set de data types
schema = types.StructType([
	types.StructField('Affiliated_base_number',types.StringType(), True),
	types.StructField('dispatching_base_num',types.StringType(), True),
	types.StructField('pickup_datetime',types.TimestampType(), True),
	types.StructField('dropOff_datetime',types.TimestampType(), True),
	types.StructField('PULocationID', types.IntegerType(), True),
	types.StructField('DOLocationID', types.IntegerType(), True),
	types.StructField('SR_Flag', types.StringType(), True)
])

In [7]:
#Create a df setting the schema withwith the data type
df_fhv = spark.read.option("header", "true").schema(schema).csv('fhv_tripdata_2019-10.csv')

In [8]:
#Fix column names
df_fhv = df_fhv.withColumnRenamed('dropOff_datetime', 'dropoff_datetime')

In [9]:
#Check if columns names are okay now
df_fhv.columns

['Affiliated_base_number',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [10]:
#Show dataframe before partition
df_fhv.show(5)

+----------------------+--------------------+-------------------+----------------+------------+------------+-------+
|Affiliated_base_number|dispatching_base_num|    pickup_datetime|dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+----------------------+--------------------+-------------------+----------------+------------+------------+-------+
|                B00009| 2019-10-01 00:23:00|2019-10-01 00:35:00|            null|         264|        null| B00009|
|                B00013| 2019-10-01 00:11:29|2019-10-01 00:13:22|            null|         264|        null| B00013|
|                B00014| 2019-10-01 00:11:43|2019-10-01 00:37:20|            null|         264|        null| B00014|
|                B00014| 2019-10-01 00:56:29|2019-10-01 00:57:47|            null|         264|        null| B00014|
|                B00014| 2019-10-01 00:23:09|2019-10-01 00:28:27|            null|         264|        null| B00014|
+----------------------+--------------------+-------------------

In [11]:
#Create dataframe partitionedin 6
df_fhv_partitioned = df_fhv.repartition(6)

In [12]:
df_fhv_partitioned.write.parquet("fhv/19/10")

In [13]:
df_fhv.printSchema()

root
 |-- Affiliated_base_number: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [14]:
#Read fhv dataset form partioned folder
df_fhv = spark.read.parquet('fhv/19/10/*')

In [15]:
#Register fhv dataframe as tables
df_fhv.createOrReplaceTempView('fhv')

In [16]:
#Use SQL query to count records
spark.sql("""
SELECT
    count(1)
FROM
    fhv
WHERE
    pickup_datetime >= '2019-10-15 00:00:00' AND pickup_datetime <= '2019-10-15 23:59:59'
""").show()

+--------+
|count(1)|
+--------+
|   62295|
+--------+



In [17]:
#Use SQL query to return the longest trip in hours  
spark.sql("""
SELECT 
    MAX(DATEDIFF(dropoff_datetime, pickup_datetime) * 24) AS max_duration_trips_hours
FROM
    fhv
WHERE
    pickup_datetime IS NOT NULL
    AND dropoff_datetime IS NOT NULL;
""").show()


+------------------------+
|max_duration_trips_hours|
+------------------------+
|                    null|
+------------------------+



In [18]:
#Use SQL query to count records
spark.sql("""
SELECT
    count(1)
FROM
    fhv
WHERE
    dropoff_datetime IS NOT NULL;
""").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [19]:
#Load zones
df_zones = spark.read.parquet('zones/')

In [20]:
#Check zones
df_zones.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [21]:
#Check zones schema
df_zones.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [23]:
#Join df with both trips with zones, in df_zones the zone id is named zone and in df_zones is LocationID
df_join = df_fhv.join(df_zones, df_fhv.PULocationID == df_zones.LocationID)

In [24]:
#See results
df_join.show(5)

+----------------------+--------------------+-------------------+----------------+------------+------------+-------+----------+---------+--------------------+------------+
|Affiliated_base_number|dispatching_base_num|    pickup_datetime|dropoff_datetime|PULocationID|DOLocationID|SR_Flag|LocationID|  Borough|                Zone|service_zone|
+----------------------+--------------------+-------------------+----------------+------------+------------+-------+----------+---------+--------------------+------------+
|                B00647| 2019-10-02 18:00:55|2019-10-02 18:08:43|            null|         182|        null| B00647|       182|    Bronx|         Parkchester|   Boro Zone|
|                B03160| 2019-10-02 08:26:00|2019-10-02 08:58:00|            null|         107|        null| B02870|       107|Manhattan|            Gramercy| Yellow Zone|
|                B02719| 2019-10-01 11:32:22|2019-10-01 11:53:01|            null|         235|        null| B02719|       235|    Bronx|Uni

In [25]:
#Check schema
df_join.printSchema()

root
 |-- Affiliated_base_number: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [26]:
#Register fhv dataframe as tables
df_join.createOrReplaceTempView('join')

In [30]:
#Use SQL query to count records by zone
spark.sql("""
SELECT
    Zone,
    COUNT(*) AS pickup_count
FROM
    join
WHERE
    pickup_datetime IS NOT NULL
GROUP BY
    Zone
ORDER BY
    pickup_count
LIMIT 10;
""").head(10)

[Row(Zone="Governor's Island/Ellis Island/Liberty Island", pickup_count=4),
 Row(Zone='Rikers Island', pickup_count=4),
 Row(Zone='Jamaica Bay', pickup_count=14),
 Row(Zone='Battery Park', pickup_count=35),
 Row(Zone='Broad Channel', pickup_count=36),
 Row(Zone='Breezy Point/Fort Tilden/Riis Beach', pickup_count=54),
 Row(Zone='Astoria Park', pickup_count=73),
 Row(Zone='Freshkills Park', pickup_count=132),
 Row(Zone='Saint Michaels Cemetery/Woodside', pickup_count=162),
 Row(Zone='Green-Wood Cemetery', pickup_count=261)]

In [31]:
#Save the results as parquet file dropping locationID and zone
df_join.drop('PULocationID', 'LocationID').write.parquet('tmp/fhv-zones')