In [1]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .master('spark://de-zoomcamp.europe-west6-a.c.de-zcamp-1234.internal:7077') \
    .appName('Homework') \
    .getOrCreate()

22/06/24 08:52:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/24 08:52:49 WARN SparkContext: Please ensure that the number of slots available on your executors is limited by the number of cores to task cpus and not another custom resource. If cores is not the limiting resource then dynamic allocation will not work properly!


In [10]:
df_hvfhw = spark.read.parquet('fhvhv_tripdata_2021-02.parquet')

In [11]:
df_hvfhw.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nul

#### Question 3. Count records 

In [13]:
from pyspark.sql import functions as F

df_hvfhw \
    .withColumn('pickup_datetime', F.to_date(df_hvfhw.pickup_datetime)) \
    .filter("pickup_datetime == '2021-02-15'") \
    .count()

                                                                                

367170

#### Question 4. Longest trip for each day 

In [31]:
df_hvfhw \
   .withColumn('duration', F.unix_timestamp(df_hvfhw.dropoff_datetime) - F.unix_timestamp(df_hvfhw.pickup_datetime)) \
   .withColumn('pickup_date', F.to_date(df_hvfhw.pickup_datetime)) \
   .groupBy('pickup_date') \
      .max('duration') \
   .orderBy('max(duration)', ascending=False) \
   .show(1)



+-----------+-------------+
|pickup_date|max(duration)|
+-----------+-------------+
| 2021-02-11|        75540|
+-----------+-------------+
only showing top 1 row



                                                                                

In [19]:
df_hvfhw.registerTempTable('hvfhw')

In [32]:
spark.sql("""
SELECT 
    date_trunc('day', pickup_datetime) AS day, 
    (CAST(dropoff_datetime AS LONG)- CAST (pickup_datetime AS LONG)) AS duration
FROM 
    hvfhw
ORDER BY
    2 DESC;
""").show(1)



+-------------------+--------+
|                day|duration|
+-------------------+--------+
|2021-02-11 00:00:00|   75540|
+-------------------+--------+
only showing top 1 row



                                                                                

#### Question 5. Most frequent dispatching_base_num

In [40]:
df_hvfhw \
    .groupBy('dispatching_base_num') \
        .agg({'dispatching_base_num':'count'}) \
    .orderBy('count(dispatching_base_num)', ascending=False) \
    .show(1)



+--------------------+---------------------------+
|dispatching_base_num|count(dispatching_base_num)|
+--------------------+---------------------------+
|              B02510|                    3233664|
+--------------------+---------------------------+
only showing top 1 row





####  Question 6. Most common locations pair

In [42]:
zones = spark.read.parquet('zones/')

In [44]:
zones.show(3)

+----------+-------+--------------------+------------+
|LocationID|Borough|                Zone|service_zone|
+----------+-------+--------------------+------------+
|         1|    EWR|      Newark Airport|         EWR|
|         2| Queens|         Jamaica Bay|   Boro Zone|
|         3|  Bronx|Allerton/Pelham G...|   Boro Zone|
+----------+-------+--------------------+------------+
only showing top 3 rows



In [46]:
df_hvfhw_zones = df_hvfhw \
    .join(zones, df_hvfhw.PULocationID==zones.LocationID) \
    .withColumnRenamed('Zone', 'PUZone') \
    .drop('service_zone', 'Borough', 'LocationID') \
    .join(zones, df_hvfhw.DOLocationID==zones.LocationID) \
    .withColumnRenamed('Zone', 'DOZone') \
    .drop('service_zone', 'Borough', 'LocationID')

In [58]:
df_hvfhw_zones \
    .withColumn('Location_pair', F.concat(df_hvfhw_zones.PUZone, F.lit(' / '), df_hvfhw_zones.DOZone)) \
    .groupBy('Location_pair') \
        .count()\
    .orderBy('count', ascending=False) \
    .show(1)



+--------------------+-----+
|       Location_pair|count|
+--------------------+-----+
|East New York / E...|45041|
+--------------------+-----+
only showing top 1 row



