In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .appName("Taxi Trips To Parquet")
         .getOrCreate()
        )

22/02/27 06:35:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [7]:
from pyspark.sql import types

schema=types.StructType([
    types.StructField("hvfhs_license_num", types.StringType(), True),
    types.StructField("dispatching_base_num", types.StringType(), True),
    types.StructField("pickup_datetime", types.TimestampType(), True),
    types.StructField("dropoff_datetime", types.TimestampType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("SR_Flag", types.IntegerType(), True)
    ])


fhv_df = (spark.read
          .option("header", "true")
          .schema(schema)
          .csv("data/raw/fhvhv/2021/02/")
         )
fhv_df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null|
|           HV0003|              B02872|2021-02-01 00:08:42|2021-02-01 00:17:57|         126|         250|   null|
|           HV0003|              B02872|2021-02-01 00:26:02|2021-02-01 00:42:51|

In [8]:
fhv_df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)



In [9]:
fhv_df.repartition(24).write.parquet('data/pq/fhvhv/2021/02', mode='overwrite')

                                                                                

In [11]:
fhv_df=spark.read.parquet("data/pq/fhvhv/2021/02")

In [22]:
from pyspark.sql import functions as F


fhv_df.filter(F.date_trunc('day', 'pickup_datetime') == '2021-02-15 00:00:00' ).count()

                                                                                

367170

In [32]:
fhv_duration_df = (fhv_df
                  .withColumn("duration_hours", (F.col('dropoff_datetime').cast('long') - F.col('pickup_datetime').cast('long'))/3600 )
                   .withColumn("pickup_date", F.col('pickup_datetime').cast('date') )
                   .groupby('pickup_date')
                   .agg(F.max('duration_hours').alias('duration_hours'))
                   .orderBy('duration_hours', ascending=False)
                  )

fhv_duration_df.show(1)



+-----------+------------------+
|pickup_date|    duration_hours|
+-----------+------------------+
| 2021-02-11|20.983333333333334|
+-----------+------------------+
only showing top 1 row





In [35]:
fhv_freq_df = (fhv_df
               .groupby('dispatching_base_num')
               .count()
               .orderBy('count', ascending=False)
              )

fhv_freq_df.show(1)

[Stage 60:>                                                         (0 + 4) / 4]

+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
+--------------------+-------+
only showing top 1 row





In [37]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv -O taxi_zone_lookup.csv

--2022-02-27 08:48:06--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 54.231.140.248
Connecting to s3.amazonaws.com (s3.amazonaws.com)|54.231.140.248|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi_zone_lookup.csv’


2022-02-27 08:48:07 (58.3 MB/s) - ‘taxi_zone_lookup.csv’ saved [12322/12322]



In [40]:
zones_df = spark.read.option("header","true").option("inferSchema", "true").csv('taxi_zone_lookup.csv')
zones_df.write.parquet("data/pq/zones/", mode='overwrite')

In [41]:
zones_df.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [67]:
join_df = (fhv_df
           .join(
               zones_df.withColumnRenamed('Zone', 'pickup_zone')
               , fhv_df.PULocationID == zones_df.LocationID, how='left'
                )
           .join(
               zones_df.withColumnRenamed('Zone', 'dropoff_zone')
               , fhv_df.DOLocationID == zones_df.LocationID, how='left'
                )
           .select(['pickup_zone', 'dropoff_zone'])
           .withColumn('pickup_zone', F.coalesce('pickup_zone', F.lit('Unknown')))
           .withColumn('dropoff_zone', F.coalesce('dropoff_zone', F.lit('Unknown')))
           .withColumn('pair', F.concat(F.col('pickup_zone'), F.lit('/'), F.col('dropoff_zone')))
          .groupBy('pair')
          .count()
          .orderBy('count', ascending=False)
          )

# pd_pair_df = (join_df
#               .withColumn('pair', F.concat(F.col('pickup_zone'), F.lit('/'), F.col('dropoff_zone')))
#               .groupBy('pair')
#               .count()
#               .orderBy('count', ascending=False)
#              )

join_df.show(1, False)



+---------------------------+------+
|pair                       |count |
+---------------------------+------+
|Crown Heights North/Unknown|177801|
+---------------------------+------+
only showing top 1 row



                                                                                

In [54]:
fhv_df.registerTempTable('fhv')
zones_df.registerTempTable('zones')

In [86]:
spark.sql(
"""
SELECT
    CONCAT(
        COALESCE(p.Zone, 'Unknown'),
        '/',
        COALESCE(d.Zone, 'Unknown')
        ) AS pair,
        count(*) as count
FROM
    fhv f
LEFT JOIN
    zones as p
    ON
        f.PULocationID = p.LocationID
LEFT JOIN 
    zones as d
    ON
        f.DOLocationID = d.LocationID
group by pair
order by count desc
"""
).show(1, False)



+---------------------------+-----+
|pair                       |count|
+---------------------------+-----+
|East New York/East New York|45041|
+---------------------------+-----+
only showing top 1 row



