In [1]:
import pyspark
from pyspark.sql import SparkSession

# Local Spark session using every available core; getOrCreate() reuses an
# already-running session if this cell is executed again.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/26 04:38:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Record the PySpark package version the outputs in this notebook were produced with.
pyspark.__version__

'3.3.2'

In [11]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType

# Explicit schema for the June-2021 FHV trip CSV, so the datetime and location-ID
# columns are typed on read. Every field is nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('dispatching_base_num', StringType()),
        ('pickup_datetime', TimestampType()),
        ('dropoff_datetime', TimestampType()),
        ('PULocationID', IntegerType()),
        ('DOLocationID', IntegerType()),
        ('SR_Flag', StringType()),
        ('Affiliated_base_number', StringType()),
    ]
])

In [12]:
import os

# Project root for week 5. Set WEEK5_DIR to run this notebook on another
# machine; otherwise it falls back to the original author's checkout path.
curr_dir = os.environ.get(
    'WEEK5_DIR',
    '/home/jdtganding/Documents/data-engineering-zoomcamp/week_5_batch_processing',
)

# Read the raw June-2021 FHV CSV with the explicit schema defined above. The
# header row supplies column names, and the schema supplies the types.
df_csv = (
    spark.read
    .option('header', 'true')
    .schema(schema)
    .csv(f"{curr_dir}/data/fhv_tripdata_2021-06.csv")
)

In [13]:
# Confirm the explicit schema was applied to the CSV read (timestamp/integer columns, not strings).
df_csv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [14]:
# Peek at a single parsed row to sanity-check the column values.
df_csv.show(n=1)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
only showing top 1 row



In [15]:
# Write the trips as 12 parquet part files. mode('overwrite') makes the cell
# re-runnable: the default save mode ('errorifexists') fails if data/fhv exists.
df_csv.repartition(12).write.mode('overwrite').parquet(f"{curr_dir}/data/fhv")

                                                                                

In [16]:
# Expose the raw trips to Spark SQL under the name used by the queries below.
df_csv.createOrReplaceTempView(name='fhv_06_2021')

In [18]:
# Number of trips picked up on 2021-06-15. The half-open range
# [2021-06-15 00:00:00, 2021-06-16 00:00:00) counts each pickup exactly once.
spark.sql("""
SELECT 
    count(*) as taxi_trips_count
FROM 
    fhv_06_2021
WHERE 
    pickup_datetime >= '2021-06-15 00:00:00'
    AND pickup_datetime < '2021-06-16 00:00:00'
""").show()



+----------------+
|taxi_trips_count|
+----------------+
|          452470|
+----------------+



                                                                                

In [30]:
# Five longest trips, in hours (rounded to 2 decimals).
# Timestamps are cast to DOUBLE, i.e. seconds since the epoch with the fractional
# part kept. A bare NUMERIC cast means DECIMAL(10,0), which implicitly rounds
# each timestamp to whole seconds before the subtraction.
spark.sql("""
SELECT 
   round((CAST(dropoff_datetime AS DOUBLE) -
   CAST(pickup_datetime AS DOUBLE))/3600, 2)
   AS trip_duration_hour
FROM 
    fhv_06_2021
ORDER BY
    trip_duration_hour DESC
LIMIT 5
""").show()



+------------------+
|trip_duration_hour|
+------------------+
|             66.88|
|             25.55|
|             19.98|
|             18.20|
|             16.47|
+------------------+



                                                                                

In [31]:
# Load the taxi-zone lookup (parquet) and register it for Spark SQL as 'zones'.
df_zones = spark.read.format('parquet').load(f"{curr_dir}/data/zones")
df_zones.createOrReplaceTempView(name='zones')

In [32]:
# Preview the first few zone lookup rows.
df_zones.show(n=5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



                                                                                

In [33]:
# Attach pickup-zone metadata to each trip. The inner join keeps only trips whose
# PULocationID matches a LocationID in the zone lookup.
df_join = df_csv.join(
    other=df_zones,
    on=df_csv['PULocationID'] == df_zones['LocationID'],
    how='inner',
)
df_join.createOrReplaceTempView(name='fhv_zones')

In [36]:
# Five most frequent pickup zones by trip count. fhv_zones is the trips joined
# to the zone lookup on PULocationID, so Zone here is the pickup zone.
spark.sql("""
SELECT 
    Zone, 
    count(*) as number_trips
FROM 
    fhv_zones
GROUP BY 
    Zone
ORDER BY 
    number_trips DESC
LIMIT 5
""").show()



+-------------------+------------+
|               Zone|number_trips|
+-------------------+------------+
|Crown Heights North|      231279|
|       East Village|      221244|
|        JFK Airport|      188867|
|     Bushwick South|      187929|
|      East New York|      186780|
+-------------------+------------+



                                                                                