In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pandas as pd

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/03 00:28:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/03 00:28:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/03/03 00:28:52 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/03/03 00:28:52 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/03/03 00:28:52 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
23/03/03 00:28:52 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.


In [5]:
df = spark.read.parquet('../data/fhvhv/fhvhv_tripdata_2021-06.parquet')

                                                                                

In [6]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nul

**Q2**: Avg size of reparitioned files

In [8]:
df = df_fhvhv.repartition(12)

In [9]:
df.write.parquet('../data/fhvhv/2021/06/')

                                                                                

**Q3**: How many taxi trips on June 15

In [12]:
df = spark.read.parquet('../data/fhvhv/2021/06/*')

In [13]:
from pyspark.sql import functions as F

In [14]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter("pickup_date = '2021-06-15'") \
    .count()

                                                                                

452470

In [18]:
df.registerTempTable('fhvhv_2021_06')



In [19]:
spark.sql("""
SELECT
    COUNT(1)
FROM 
    fhvhv_2021_06
WHERE
    to_date(pickup_datetime) = '2021-06-15';
""").show()



+--------+
|count(1)|
+--------+
|  452470|
+--------+




                                                                                

**Q4**: Longest trip for each day

In [17]:
df.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'originating_base_num',
 'request_datetime',
 'on_scene_datetime',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'trip_miles',
 'trip_time',
 'base_passenger_fare',
 'tolls',
 'bcf',
 'sales_tax',
 'congestion_surcharge',
 'airport_fee',
 'tips',
 'driver_pay',
 'shared_request_flag',
 'shared_match_flag',
 'access_a_ride_flag',
 'wav_request_flag',
 'wav_match_flag']

In [15]:
df \
    .withColumn('duration', df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .groupBy('pickup_date') \
        .max('duration') \
    .orderBy('max(duration)', ascending=False) \
    .limit(5) \
    .show()



+-----------+-------------+
|pickup_date|max(duration)|
+-----------+-------------+
| 2021-06-25|       240764|
| 2021-06-22|        91979|
| 2021-06-27|        71931|
| 2021-06-26|        65510|
| 2021-06-23|        59281|
+-----------+-------------+




                                                                                

In [20]:
spark.sql("""
SELECT
    to_date(pickup_datetime) AS pickup_date,
    MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 60) AS duration
FROM 
    fhvhv_2021_06
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()



+-----------+------------------+
|pickup_date|          duration|
+-----------+------------------+
| 2021-06-25| 4012.733333333333|
| 2021-06-22|1532.9833333333333|
| 2021-06-27|           1198.85|
| 2021-06-26|1091.8333333333333|
| 2021-06-23| 988.0166666666667|
+-----------+------------------+




                                                                                

**Q6**: Most common locations pair

In [22]:
df_zones = spark.read.parquet('../data/taxi_zone_lookup.parquet')

In [23]:
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [24]:
df.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'originating_base_num',
 'request_datetime',
 'on_scene_datetime',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'trip_miles',
 'trip_time',
 'base_passenger_fare',
 'tolls',
 'bcf',
 'sales_tax',
 'congestion_surcharge',
 'airport_fee',
 'tips',
 'driver_pay',
 'shared_request_flag',
 'shared_match_flag',
 'access_a_ride_flag',
 'wav_request_flag',
 'wav_match_flag']

In [25]:
df_zones.registerTempTable('zones')

In [27]:
spark.sql("""
SELECT
    pul.Zone,
    COUNT(1)
FROM 
    fhvhv_2021_06 fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID

GROUP BY 
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()



+-------------------+--------+
|               Zone|count(1)|
+-------------------+--------+
|Crown Heights North|  231279|
|       East Village|  221244|
|        JFK Airport|  188867|
|     Bushwick South|  187929|
|      East New York|  186780|
+-------------------+--------+




                                                                                