## Instantiate PySpark

In [13]:
import pyspark
from pyspark.sql import SparkSession

In [14]:
pyspark.__version__

'3.5.1'

In [15]:
pyspark.__file__

'/Users/ernestsalim/Devs/Zoomcamp/Week_5/spark-3.5.1-bin-hadoop3/python/pyspark/__init__.py'

In [18]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("test") \
    .getOrCreate()

In [30]:
spark.version

'3.5.1'

In [27]:
!wc -l fhv_tripdata_2019-10.csv

 1897494 fhv_tripdata_2019-10.csv


In [28]:
df = spark.read \
    .option("header", "true") \
    .csv("./fhv_tripdata_2019-10.csv")

In [31]:
df.head(5)

[Row(dispatching_base_num='B00009', pickup_datetime='2019-10-01 00:23:00', dropOff_datetime='2019-10-01 00:35:00', PUlocationID='264', DOlocationID='264', SR_Flag=None, Affiliated_base_number='B00009'),
 Row(dispatching_base_num='B00013', pickup_datetime='2019-10-01 00:11:29', dropOff_datetime='2019-10-01 00:13:22', PUlocationID='264', DOlocationID='264', SR_Flag=None, Affiliated_base_number='B00013'),
 Row(dispatching_base_num='B00014', pickup_datetime='2019-10-01 00:11:43', dropOff_datetime='2019-10-01 00:37:20', PUlocationID='264', DOlocationID='264', SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime='2019-10-01 00:56:29', dropOff_datetime='2019-10-01 00:57:47', PUlocationID='264', DOlocationID='264', SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime='2019-10-01 00:23:09', dropOff_datetime='2019-10-01 00:28:27', PUlocationID='264', DOlocationID='264', SR_Flag=None, Affiliated_base_num

In [32]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

## Defining Schema

In [35]:
!head -n 1001 fhv_tripdata_2019-10.csv > head.csv

In [37]:
!wc -l head.csv

    1001 head.csv


In [38]:
import pandas as pd

In [40]:
df_pd = pd.read_csv('head.csv')

In [41]:
df_pd.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [43]:
spark.createDataFrame(df_pd).schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', DoubleType(), True), StructField('DOlocationID', DoubleType(), True), StructField('SR_Flag', DoubleType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [44]:
from pyspark.sql import types

In [45]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PUlocationID', types.IntegerType(), True),
    types.StructField('DOlocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [52]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv("./fhv_tripdata_2019-10.csv")

In [53]:
df.head(5)

[Row(dispatching_base_num='B00009', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 23), dropoff_datetime=datetime.datetime(2019, 10, 1, 0, 35), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00009'),
 Row(dispatching_base_num='B00013', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 29), dropoff_datetime=datetime.datetime(2019, 10, 1, 0, 13, 22), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00013'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 43), dropoff_datetime=datetime.datetime(2019, 10, 1, 0, 37, 20), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 56, 29), dropoff_datetime=datetime.datetime(2019, 10, 1, 0, 57, 47), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=

## Partitioning and Parquetize

In [54]:
df = df.repartition(6) # Lazy Command

In [55]:
df.write.parquet('fhv/2019/10')

                                                                                

## Querying

In [56]:
df = spark.read.parquet('fhv/2019/10')

In [57]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [69]:
from pyspark.sql import functions as F

In [76]:
test_df = df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PUlocationID', 'DOlocationID') 


In [77]:
test_df.show()

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PUlocationID|DOlocationID|
+-----------+------------+------------+------------+
| 2019-10-01|  2019-10-01|        NULL|        NULL|
| 2019-10-04|  2019-10-04|         205|         121|
| 2019-10-02|  2019-10-02|         264|         127|
| 2019-10-04|  2019-10-04|         264|         119|
| 2019-10-01|  2019-10-01|         264|         264|
| 2019-10-04|  2019-10-04|         264|         186|
| 2019-10-02|  2019-10-02|         264|          35|
| 2019-10-03|  2019-10-03|         264|          69|
| 2019-10-02|  2019-10-02|         264|         264|
| 2019-10-02|  2019-10-02|         264|         174|
| 2019-10-04|  2019-10-04|         264|         264|
| 2019-10-03|  2019-10-03|         264|          39|
| 2019-10-04|  2019-10-04|         264|         264|
| 2019-10-02|  2019-10-02|         264|          63|
| 2019-10-02|  2019-10-02|         264|         247|
| 2019-10-02|  2019-10-02|         102|       

In [83]:
dates = ('2019-10-15', '2019-10-15')

test_df.filter(test_df.pickup_date.between(*dates)).count()

                                                                                

62610

In [98]:
test_df = df \
    .withColumn("time_difference", df.dropoff_datetime.cast("long") - df.pickup_datetime.cast("long")) \
    .withColumn("time_difference_hours", F.expr("time_difference / (1000 * 60 * 60)")) \
    .select('time_difference', 'time_difference_hours', 'pickup_datetime', 'dropoff_datetime', 'PUlocationID', 'DOlocationID') 

In [99]:
test_df.show()

+---------------+---------------------+-------------------+-------------------+------------+------------+
|time_difference|time_difference_hours|    pickup_datetime|   dropoff_datetime|PUlocationID|DOlocationID|
+---------------+---------------------+-------------------+-------------------+------------+------------+
|          10234| 0.002842777777777778|2019-10-01 09:29:00|2019-10-01 12:19:34|        NULL|        NULL|
|            282| 7.833333333333333E-5|2019-10-04 10:45:00|2019-10-04 10:49:42|         205|         121|
|           1120| 3.111111111111111E-4|2019-10-02 01:58:19|2019-10-02 02:16:59|         264|         127|
|            930| 2.583333333333333...|2019-10-04 00:13:52|2019-10-04 00:29:22|         264|         119|
|           1847| 5.130555555555556E-4|2019-10-01 20:41:56|2019-10-01 21:12:43|         264|         264|
|           1384| 3.844444444444444...|2019-10-04 05:51:53|2019-10-04 06:14:57|         264|         186|
|            390| 1.083333333333333...|2019-10

In [100]:
test_df.select(F.max("time_difference_hours")) \
    .show()

+--------------------------+
|max(time_difference_hours)|
+--------------------------+
|                  631.1525|
+--------------------------+



In [101]:
zones = spark.read \
    .option("header", "true") \
    .csv("./taxi_zone_lookup.csv")

In [102]:
zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [104]:
joined = df.join(zones, F.col('PUlocationID') == F.col('LocationID'), how='inner')

In [110]:
zones_pickup_count = joined.groupby('Zone').agg(F.count("*"))

In [116]:
min_pickup = zones_pickup_count.agg(F.min("count(1)")).collect()[0][0]

res = zones_pickup_count.filter(F.col("count(1)") == min_pickup).first()

                                                                                

In [118]:
res

Row(Zone='Jamaica Bay', count(1)=1)