In [43]:
%pip install pyspark
%pip install pandas

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [44]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pandas as pd

In [45]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [46]:
spark.version

'3.5.1'

In [47]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOlocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [48]:
#df_pd=pd.read_csv("data/raw/fhv/2019/10/fhvhv_tripdata_2021-02.csv.gz")
df = spark.read \
    .option("header", "true") \
    .option("compression", "gzip") \
    .schema(schema) \
    .csv('data/raw/fhv/2019/10/fhv_tripdata_2019_10.csv.gz')
df.show()
df = df.repartition(6)
df.write.parquet('data/pq/fhvhv/2019/10/', mode='overwrite')

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [49]:
df = spark.read.parquet('data/pq/fhvhv/2019/10/')

**Q3**: How many taxi trips were there on February 15?

In [50]:
from pyspark.sql import functions as F

In [51]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter("pickup_date = '2019-10-15'") \
    .count()

62610

In [52]:
df.registerTempTable('fhvhv_2019_10')

In [53]:
spark.sql("""
SELECT
    COUNT(1)
FROM 
    fhvhv_2019_10
WHERE
    to_date(pickup_datetime) = '2019-10-15';
""").show()

+--------+
|count(1)|
+--------+
|   62610|
+--------+



**Q4**: Longest trip for each day

In [54]:
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [55]:
df \
    .withColumn('duration', (df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long'))/ 3600) \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .groupBy('pickup_date') \
        .max('duration') \
    .orderBy('max(duration)', ascending=False) \
    .limit(5) \
    .show()

+-----------+-----------------+
|pickup_date|    max(duration)|
+-----------+-----------------+
| 2019-10-28|         631152.5|
| 2019-10-11|         631152.5|
| 2019-10-31|87672.44083333333|
| 2019-10-01|70128.02805555555|
| 2019-10-17|           8794.0|
+-----------+-----------------+



In [56]:
spark.sql("""
SELECT
    to_date(pickup_datetime) AS pickup_date,
    MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 3600) AS duration
FROM 
    fhvhv_2019_10
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()

+-----------+-----------------+
|pickup_date|         duration|
+-----------+-----------------+
| 2019-10-28|         631152.5|
| 2019-10-11|         631152.5|
| 2019-10-31|87672.44083333333|
| 2019-10-01|70128.02805555555|
| 2019-10-17|           8794.0|
+-----------+-----------------+



**Q5**: Most frequent `dispatching_base_num`

How many stages this spark job has?



In [57]:
spark.sql("""
SELECT
    dispatching_base_num,
    COUNT(1)
FROM 
    fhvhv_2019_10
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()

+--------------------+--------+
|dispatching_base_num|count(1)|
+--------------------+--------+
|              B01145|   47548|
|              B00256|   43234|
|              B00856|   36778|
|              B03016|   34985|
|              B01239|   33586|
+--------------------+--------+



In [58]:
df \
    .groupBy('dispatching_base_num') \
        .count() \
    .orderBy('count', ascending=False) \
    .limit(5) \
    .show()

+--------------------+-----+
|dispatching_base_num|count|
+--------------------+-----+
|              B01145|47548|
|              B00256|43234|
|              B00856|36778|
|              B03016|34985|
|              B01239|33586|
+--------------------+-----+



**Q6**: Most common locations pair

In [59]:
df_zones = spark.read \
    .option("header", "true") \
    .csv('data/raw/zones/taxi_zone_lookup.csv')
df_zones.show()
df_zones.write.parquet('data/pq/zones/', mode='overwrite')

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [60]:
df_zones = spark.read.parquet('data/pq/zones')

In [61]:
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [62]:
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [63]:
df_zones.registerTempTable('zones')



In [66]:
spark.sql("""
SELECT
    CONCAT(pul.Zone, ' / ', dol.Zone) AS pu_do_pair,
    COUNT(1)
FROM 
    fhvhv_2019_10 fhv INNER JOIN zones pul ON fhv.PULocationID = pul.LocationID
                      INNER JOIN zones dol ON fhv.DOLocationID = dol.LocationID
GROUP BY 
    1
ORDER BY
    2 asc
LIMIT 5;
""").show()

+--------------------+--------+
|          pu_do_pair|count(1)|
+--------------------+--------+
|Sunnyside / Winds...|       1|
|Murray Hill / Oce...|       1|
|Homecrest / Murra...|       1|
|Howard Beach / La...|       1|
|Midtown South / F...|       1|
+--------------------+--------+



In [67]:
spark.sql("""
SELECT
    pul.Zone AS pu_do_pair,
    COUNT(1)
FROM 
    fhvhv_2019_10 fhv INNER JOIN zones pul ON fhv.PULocationID = pul.LocationID
                      INNER JOIN zones dol ON fhv.DOLocationID = dol.LocationID
GROUP BY 
    1
ORDER BY
    2 asc
LIMIT 5;
""").show()

+--------------------+--------+
|          pu_do_pair|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
+--------------------+--------+

