In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [2]:
df_fhvhv_2021_02 = spark.read.parquet('fhvhv/2021/02/')

In [3]:
df_fhvhv_2021_02.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [5]:
df_fhvhv_2021_02.registerTempTable('fhvhv_hw_data')

In [36]:
df_fhvhv_2021_02.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02875|2021-02-03 07:54:48|2021-02-03 08:15:19|         146|         162|   null|
|           HV0003|              B02880|2021-02-03 07:21:31|2021-02-03 07:29:04|         203|         203|   null|
|           HV0005|              B02510|2021-02-01 10:25:26|2021-02-01 10:39:45|         203|         218|   null|
|           HV0003|              B02617|2021-02-04 07:15:42|2021-02-04 07:28:01|          32|         220|   null|
|           HV0003|              B02883|2021-02-02 13:13:52|2021-02-02 13:23:13|          24|          75|   null|
|           HV0003|              B02764|2021-02-02 08:43:57|2021-02-02 08:53:46|

In [20]:
df_fhvhv_2021_02.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [13]:
spark.sql("""
SELECT
    CAST(pickup_datetime AS DATE),
    count(1)
FROM
    fhvhv_hw_data
WHERE 
    CAST(pickup_datetime AS DATE) = '2021-02-15'
GROUP BY
    CAST(pickup_datetime AS DATE)
""").show()

+---------------+--------+
|pickup_datetime|count(1)|
+---------------+--------+
|     2021-02-15|  367170|
+---------------+--------+



In [28]:
spark.sql("""
SELECT
    unix_timestamp(dropoff_datetime, 'yyyy-MM-dd HH:mm:ss') - unix_timestamp(pickup_datetime, 'yyyy-MM-dd HH:mm:ss') AS duration_single_trip,
    CAST(pickup_datetime AS DATE)
FROM
    fhvhv_hw_data
ORDER BY
    duration_single_trip DESC
""").show()

+--------------------+---------------+
|duration_single_trip|pickup_datetime|
+--------------------+---------------+
|               75540|     2021-02-11|
|               57221|     2021-02-17|
|               44039|     2021-02-20|
|               40653|     2021-02-03|
|               37577|     2021-02-19|
|               35010|     2021-02-25|
|               34806|     2021-02-20|
|               34612|     2021-02-18|
|               34555|     2021-02-18|
|               34169|     2021-02-10|
|               32476|     2021-02-10|
|               32439|     2021-02-25|
|               32223|     2021-02-21|
|               32087|     2021-02-09|
|               31447|     2021-02-06|
|               30913|     2021-02-02|
|               30856|     2021-02-10|
|               30732|     2021-02-09|
|               30660|     2021-02-21|
|               30511|     2021-02-05|
+--------------------+---------------+
only showing top 20 rows



In [32]:
df_dispatch = \
spark.sql("""
SELECT
    dispatching_base_num,
    count(1) AS Frequency
FROM
    fhvhv_hw_data
GROUP BY
    dispatching_base_num
ORDER BY
    Frequency DESC
LIMIT
    1
""").show()

+--------------------+---------+
|dispatching_base_num|Frequency|
+--------------------+---------+
|              B02510|  3233664|
+--------------------+---------+



In [34]:
zones = spark.read.parquet('zones/')

In [35]:
zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [44]:
zones.registerTempTable('zones')

In [45]:
df_join = \
spark.sql("""
SELECT 
CONCAT(zpu.Zone, '/', zdo.Zone) AS bothzones,
count(1) AS Frequency
FROM 
fhvhv_hw_data fhvhv 
FULL OUTER JOIN zones zpu 
ON 
fhvhv.PULocationID = zpu.LocationID
FULL OUTER JOIN zones zdo
ON
fhvhv.DOLocationID = zdo.LocationID
GROUP BY bothzones
ORDER BY Frequency DESC
""").show()

+--------------------+---------+
|           bothzones|Frequency|
+--------------------+---------+
|East New York/Eas...|    45041|
|Borough Park/Boro...|    37329|
|   Canarsie/Canarsie|    28026|
|Crown Heights Nor...|    25976|
| Bay Ridge/Bay Ridge|    17934|
|Jackson Heights/J...|    14688|
|     Astoria/Astoria|    14688|
|Central Harlem No...|    14481|
|Bushwick South/Bu...|    14424|
|Flatbush/Ditmas P...|    13976|
|South Ozone Park/...|    13716|
|Brownsville/Brown...|    12829|
|      JFK Airport/NA|    12542|
|Prospect-Lefferts...|    11814|
|Forest Hills/Fore...|    11548|
|Bushwick North/Bu...|    11491|
|Bushwick South/Bu...|    11487|
|Crown Heights Nor...|    11462|
|Crown Heights Nor...|    11342|
|Prospect-Lefferts...|    11308|
+--------------------+---------+
only showing top 20 rows

