In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [2]:
pyspark.__version__

'3.0.3'

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('hvfhw_2021_02') \
    .getOrCreate()

22/03/07 11:43:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Create parquet

In [6]:
schema = types.StructType([
    types.StructField('hvfhs_license_num',types.StringType(),True),
    types.StructField('dispatching_base_num',types.StringType(),True),
    types.StructField('pickup_datetime',types.TimestampType(),True),
    types.StructField('dropoff_datetime',types.TimestampType(),True),
    types.StructField('PULocationID',types.LongType(),True),
    types.StructField('DOLocationID',types.LongType(),True),
    types.StructField('SR_Flag',types.DoubleType(),True)])

In [7]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-02.csv')

In [8]:
df = df.repartition(24)
df.write.parquet('fhvhv/2021/02/')

                                                                                

In [10]:
df.count()

                                                                                

11613942

### Load parquet

In [3]:
df_feb2021 = spark.read.parquet('fhvhv/2021/02/')

                                                                                

### Question 3: Records on 15 Feb 2021

In [6]:
df_feb2021.filter(F.col("pickup_datetime").between('2021-02-15 00:00:00','2021-02-15 23:59:59')).count()

                                                                                

367170

### Question 4: Day with the longest trip 

In [23]:
df_duration = df_feb2021.withColumn('trip_duration', F.col('dropoff_datetime').cast("long")-F.col('pickup_datetime').cast("long"))

In [26]:
df_duration.registerTempTable('fhvhv_duration')

In [29]:
spark.sql("""
SELECT
    trip_duration,
    pickup_datetime
FROM
    fhvhv_duration
SORT BY trip_duration DESC
""").show(3)


[Stage 9:>                                                          (0 + 1) / 1]

+-------------+-------------------+
|trip_duration|    pickup_datetime|
+-------------+-------------------+
|        75540|2021-02-11 13:40:44|
|        44039|2021-02-20 12:08:15|
|        32476|2021-02-10 01:56:17|
+-------------+-------------------+
only showing top 3 rows




                                                                                

### Question 5: Stages for most frequent dispatching_base_num
Now find the most frequently occurring dispatching_base_num in this dataset

In [31]:
df_feb2021.groupBy('dispatching_base_num').count().sort("count", ascending=False).show()



+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows




                                                                                

### Question 6: Most common locations pair

In [4]:
df_zones = spark.read.parquet('zones/')

df_feb2021.registerTempTable('fhv_data')
df_zones.registerTempTable('dim_zones')

In [5]:
df_feb2021.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [6]:
df_locations = spark.sql("""
select
    fhv_data.PULocationID as pickup_id,
    pickup_zone.zone as pickup_zone,
    fhv_data.DOLocationID as dropoff_id,
    dropoff_zone.zone as dropoff_zone,
    CONCAT(pickup_zone.zone, '|', dropoff_zone.zone) as location_pair
from fhv_data
left join dim_zones as pickup_zone
on fhv_data.PULocationID = pickup_zone.locationid
left join dim_zones as dropoff_zone
on fhv_data.DOLocationID = dropoff_zone.locationid
""")

In [7]:
df_locations.show(10)

+---------+--------------------+----------+--------------------+--------------------+
|pickup_id|         pickup_zone|dropoff_id|        dropoff_zone|       location_pair|
+---------+--------------------+----------+--------------------+--------------------+
|      126|         Hunts Point|       147|            Longwood|Hunts Point|Longwood|
|       85|             Erasmus|        72|East Flatbush/Rem...|Erasmus|East Flat...|
|      229|Sutton Place/Turt...|       261|  World Trade Center|Sutton Place/Turt...|
|      260|            Woodside|       160|      Middle Village|Woodside|Middle V...|
|      140|     Lenox Hill East|        72|East Flatbush/Rem...|Lenox Hill East|E...|
|       51|          Co-Op City|       213|Soundview/Castle ...|Co-Op City|Soundv...|
|      143| Lincoln Square West|       137|            Kips Bay|Lincoln Square We...|
|      260|            Woodside|       260|            Woodside|   Woodside|Woodside|
|      167|  Morrisania/Melrose|       169|          M

In [8]:
df_locations.registerTempTable('fhvhv_locations')

In [16]:
## Why wrong?
spark.sql("""
SELECT
    FIRST(location_pair),
    count(1) as total
FROM
    fhvhv_locations
GROUP BY location_pair
SORT BY total DESC
""").show(3, False)

[Stage 16:>                                                         (0 + 4) / 4]

+-----------------------------------+-----+
|first(location_pair)               |total|
+-----------------------------------+-----+
|East Harlem North|East Harlem South|8910 |
|Canarsie|Prospect-Lefferts Gardens |4361 |
|South Ozone Park|Baisley Park      |3109 |
+-----------------------------------+-----+
only showing top 3 rows



                                                                                

In [15]:
df_locations.groupBy('pickup_zone', 'dropoff_zone').agg(F.count("*")).sort('count(1)', ascending=False).show(30, False)



+-------------------------+-------------------------+--------+
|pickup_zone              |dropoff_zone             |count(1)|
+-------------------------+-------------------------+--------+
|East New York            |East New York            |45041   |
|Borough Park             |Borough Park             |37329   |
|Canarsie                 |Canarsie                 |28026   |
|Crown Heights North      |Crown Heights North      |25976   |
|Bay Ridge                |Bay Ridge                |17934   |
|Astoria                  |Astoria                  |14688   |
|Jackson Heights          |Jackson Heights          |14688   |
|Central Harlem North     |Central Harlem North     |14481   |
|Bushwick South           |Bushwick South           |14424   |
|Flatbush/Ditmas Park     |Flatbush/Ditmas Park     |13976   |
|South Ozone Park         |South Ozone Park         |13716   |
|Brownsville              |Brownsville              |12829   |
|JFK Airport              |NA                       |12

                                                                                