In [4]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F

In [3]:
# Local Spark session using every available core; getOrCreate() returns the
# already-running session when the cell is executed again.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/30 10:42:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Explicit schema for the FHVHV CSV so Spark does not infer column types;
# every field is declared nullable (third StructField argument = True).
schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    )
])

In [6]:
# Load the raw January 2021 FHVHV trips: the first CSV line is a header and
# columns are typed by `schema` instead of being inferred.
df = spark.read.csv(
    './fhvhv_tripdata_2021-01.csv',
    schema=schema,
    header=True,
)

In [9]:
# Spread the rows across 24 partitions (one output file per partition when written).
df = df.repartition(numPartitions=24)

In [10]:
# Persist the trips as Parquet. Overwrite mode keeps the cell re-runnable: the default
# save mode ('errorifexists') fails as soon as the output directory already exists.
df.write.parquet('data/pq/fhvhv/2021/01', mode='overwrite')

                                                                                

In [45]:
# Expose the trips to Spark SQL as `fhvhv_2021_01`. createOrReplaceTempView is the
# supported replacement for registerTempTable (deprecated since Spark 2.0).
df.createOrReplaceTempView('fhvhv_2021_01')



**Q3**: How many trips started on 15 January 2021?

In [12]:
# Q3 (Spark): derive the calendar date of each pickup and count trips on 2021-01-15.
(
    df
    .withColumn('pickup_date', F.to_date(F.col('pickup_datetime')))
    .filter("pickup_date = '2021-01-15'")
    .count()
)

                                                                                

443059

In [19]:
import pandas as pd
from datetime import datetime

In [14]:
# Load the same CSV with pandas to cross-check the Spark answer independently.
FHVHV_CSV_PATH = './fhvhv_tripdata_2021-01.csv'
df_pandas = pd.read_csv(FHVHV_CSV_PATH)

In [32]:
# Pickup calendar date rendered as an ISO 'YYYY-MM-DD' string, comparable to a literal.
df_pandas['pickup_date'] = (
    pd.to_datetime(df_pandas['pickup_datetime'])
    .dt.date
    .astype(str)
)

In [38]:
# Q3 (pandas cross-check): number of rows whose pickup date is 2021-01-15.
# Summing the boolean mask counts rows directly; the previous `.count()[0]` counted
# non-null values of the first column via deprecated positional Series indexing,
# and a stray `break` token made the cell a SyntaxError.
int((df_pandas['pickup_date'] == '2021-01-15').sum())

443059

**Q4**: Longest trip for each pickup day (duration in seconds; top 5 days shown)

In [39]:
# Q4: longest trip per pickup day. Timestamps cast to long are epoch seconds, so
# `duration` is in seconds; the five days with the largest maximum are shown.
(
    df
    .withColumn('duration', F.col('dropoff_datetime').cast('long') - F.col('pickup_datetime').cast('long'))
    .withColumn('pickup_date', F.to_date(F.col('pickup_datetime')))
    .groupBy('pickup_date')
    .agg(F.max('duration'))
    .orderBy(F.desc('max(duration)'))
    .limit(5)
    .show()
)



+-----------+-------------+
|pickup_date|max(duration)|
+-----------+-------------+
| 2021-01-27|        59143|
| 2021-01-05|        45012|
| 2021-01-30|        41193|
| 2021-01-04|        39967|
| 2021-01-06|        38417|
+-----------+-------------+



                                                                                

**Q5**: Most frequent `dispatching_base_num`

In [44]:
# Q5 (DataFrame API): top-5 dispatching bases ranked by number of trips.
(
    df
    .groupBy('dispatching_base_num')
    .count()
    .orderBy(F.desc('count'))
    .limit(5)
    .show()
)



+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3091000|
|              B02764|1009388|
|              B02872| 924960|
|              B02875| 735450|
|              B02765| 591242|
+--------------------+-------+



                                                                                

In [48]:
spark.sql("""
SELECT 
    dispatching_base_num,
    COUNT(1)
FROM
    fhvhv_2021_01
GROUP BY 
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()



+--------------------+--------+
|dispatching_base_num|count(1)|
+--------------------+--------+
|              B02510| 3091000|
|              B02764| 1009388|
|              B02872|  924960|
|              B02875|  735450|
|              B02765|  591242|
+--------------------+--------+



                                                                                

**Q6**: Most common pickup / drop-off zone pair

In [49]:
# Column names of the trips DataFrame, in schema order.
df.schema.names

['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [50]:
# Taxi zone lookup (LocationID, Borough, Zone, service_zone).
# Read the Parquet dataset directory rather than one hard-coded part file: Spark
# part-file names embed a random UUID and change whenever the dataset is rewritten.
# NOTE(review): assumes 'zones/' contains only this single Parquet dataset — confirm.
zones = spark.read.parquet('zones/')

In [52]:
# Expose the zone lookup to Spark SQL as `zones`. createOrReplaceTempView is the
# supported replacement for registerTempTable (deprecated since Spark 2.0).
zones.createOrReplaceTempView('zones')



In [51]:
# Preview the zone lookup (show()'s defaults spelled out: 20 rows, long values truncated).
zones.show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [58]:
spark.sql("""
SELECT * 
FROM 
    fhvhv_2021_01 as fhv LEFT JOIN zones as pul ON fhv.PULocationID = pul.LocationID
LIMIT 5;
""").show()



+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+----------+---------+--------------------+------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|LocationID|  Borough|                Zone|service_zone|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+----------+---------+--------------------+------------+
|           HV0003|              B02882|2021-01-05 15:25:51|2021-01-05 15:38:17|         229|         236|   null|       229|Manhattan|Sutton Place/Turt...| Yellow Zone|
|           HV0003|              B02875|2021-01-01 22:56:14|2021-01-01 23:06:41|          49|          80|   null|        49| Brooklyn|        Clinton Hill|   Boro Zone|
|           HV0003|              B02888|2021-01-01 01:51:36|2021-01-01 02:18:24|         197|          11|   null|       197|   Queens|       Richmond

                                                                                

In [59]:
spark.sql("""
SELECT * 
FROM 
    fhvhv_2021_01 as fhv LEFT JOIN zones as pul ON fhv.PULocationID = pul.LocationID
                         LEFT JOIN zones dol ON fhv.DOLocationID = dol.LocationID
LIMIT 5;
""").show()



+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+----------+---------+--------------------+------------+----------+---------+--------------------+------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|LocationID|  Borough|                Zone|service_zone|LocationID|  Borough|                Zone|service_zone|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+----------+---------+--------------------+------------+----------+---------+--------------------+------------+
|           HV0003|              B02882|2021-01-05 15:25:51|2021-01-05 15:38:17|         229|         236|   null|       229|Manhattan|Sutton Place/Turt...| Yellow Zone|       236|Manhattan|Upper East Side N...| Yellow Zone|
|           HV0003|              B02875|2021-01-01 22:56:14|2021-01-01 23:06:41|          49|       

                                                                                

In [60]:
spark.sql("""
SELECT
    CONCAT(pul.Zone, ' / ', dol.Zone) AS pu_do_pair,
    COUNT(1)
FROM 
    fhvhv_2021_01 fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID
                      LEFT JOIN zones dol ON fhv.DOLocationID = dol.LocationID
GROUP BY 
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()



+--------------------+--------+
|          pu_do_pair|count(1)|
+--------------------+--------+
|East New York / E...|   47637|
|Borough Park / Bo...|   30920|
| Canarsie / Canarsie|   29897|
|Crown Heights Nor...|   28851|
|Central Harlem No...|   17379|
+--------------------+--------+



                                                                                