In [117]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName('test') \
    .getOrCreate()

22/03/02 19:53:46 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/03/02 19:53:46 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [118]:
df_fhvhv = spark.read \
    .option('header', 'true') \
    .csv('fhvhv_tripdata_2021-02.csv')

In [119]:
df_fhvhv.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,StringType,true),StructField(DOLocationID,StringType,true),StructField(SR_Flag,StringType,true)))

In [120]:
df_fhvhv.show(5)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null|
|           HV0003|              B02872|2021-02-01 00:08:42|2021-02-01 00:17:57|         126|         250|   null|
+-----------------+--------------------+-------------------+-------------------+

In [121]:
from pyspark.sql import types

In [122]:
fhvhv_schema = types.StructType([
    types.StructField("hvfhs_license_num", types.StringType(), True),
    types.StructField("dispatching_base_num", types.StringType(), True),
    types.StructField("pickup_datetime", types.TimestampType(), True),
    types.StructField("dropoff_datetime", types.TimestampType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("SR_Flag", types.StringType(), True)
])

In [123]:
df_fhvhv = spark.read \
    .option('header', 'true') \
    .schema(fhvhv_schema) \
    .csv('fhvhv_tripdata_2021-02.csv')

In [124]:
df_fhvhv.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [125]:
#df_fhvhv \
#    .repartition(24) \
#    .write.parquet('data/pq/fhvhv_2021-02', mode='overwrite')

                                                                                

In [126]:
!du -h --max-depth=1 .
# 210M	./data
# 910M	.

210M	./data
4.0K	./spark-warehouse
910M	.


In [127]:
df_fhvhv = spark.read.parquet('data/pq/fhvhv_2021-02')

In [128]:
df_hfvhv.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [129]:
df_fhvhv.registerTempTable('trips_fhvhv')

In [130]:
spark.sql("""
SELECT
    count(*)
FROM 
    trips_fhvhv
WHERE
    date_trunc('day', pickup_datetime) = '2021-02-15'
""").show()

[Stage 5:>                                                          (0 + 4) / 4]

+--------+
|count(1)|
+--------+
|  367170|
+--------+



                                                                                

In [131]:
#from pyspark.sql import functions as F
import pyspark.sql.functions

from pyspark.sql.functions import col

In [132]:
df_fhvhv_triplength = df_fhvhv \
    .withColumn("pickup_seconds", col("pickup_datetime").cast(types.LongType())) \
    .withColumn("dropoff_seconds", col("dropoff_datetime").cast(types.LongType())) \
    .withColumn("trip_length_seconds", col('dropoff_seconds') - col('pickup_seconds'))

In [133]:
df_fhvhv_triplength.registerTempTable("trips_fhvhv_length")

In [134]:
spark.sql("""
SELECT
    pickup_datetime,
    dropoff_datetime, 
    trip_length_seconds,
    round(trip_length_seconds/3600,2) AS trip_length_hours
FROM
    trips_fhvhv_length
SORT BY
    trip_length_seconds DESC
LIMIT 5
""").show()



+-------------------+-------------------+-------------------+-----------------+
|    pickup_datetime|   dropoff_datetime|trip_length_seconds|trip_length_hours|
+-------------------+-------------------+-------------------+-----------------+
|2021-02-11 13:40:44|2021-02-12 10:39:44|              75540|            20.98|
|2021-02-17 15:54:53|2021-02-18 07:48:34|              57221|            15.89|
|2021-02-25 09:18:18|2021-02-25 18:18:57|              32439|             9.01|
|2021-02-12 06:16:42|2021-02-12 14:39:10|              30148|             8.37|
|2021-02-10 15:00:54|2021-02-10 22:49:57|              28143|             7.82|
+-------------------+-------------------+-------------------+-----------------+



                                                                                

In [147]:
spark.sql("""
SELECT
    dispatching_base_num,
    count(dispatching_base_num) AS base_count
FROM
    trips_fhvhv_length
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 5
""").show()



+--------------------+----------+
|dispatching_base_num|base_count|
+--------------------+----------+
|              B02510|   3233664|
|              B02764|    965568|
|              B02872|    882689|
|              B02875|    685390|
|              B02765|    559768|
+--------------------+----------+



                                                                                

In [152]:
df_zones = spark.read.parquet('../code/zones')

In [154]:
df_zones.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [153]:
df_zones.registerTempTable("zones_table")

In [162]:
spark.sql("""
SELECT
    PULocationID,
    zones1.Zone || ' (' || zones1.Borough || ')',
    DOLocationID,
    zones2.Zone || ' (' || zones2.Borough || ')',
    count(*),
    avg(trip_length_seconds),
    avg(round(trip_length_seconds/3600,2)) AS trip_length_hours
FROM
    trips_fhvhv_length
LEFT OUTER JOIN zones_table as zones1
on PULocationID = zones1.LocationID
LEFT OUTER JOIN zones_table as zones2
on DOLocationID = zones2.LocationID
GROUP BY
    1, 2, 3, 4
ORDER BY
    5 DESC
LIMIT 20
""").show()



+------------+--------------------------------------------+------------+--------------------------------------------+--------+------------------------+-------------------+
|PULocationID|concat(concat(concat(Zone,  (), Borough), ))|DOLocationID|concat(concat(concat(Zone,  (), Borough), ))|count(1)|avg(trip_length_seconds)|  trip_length_hours|
+------------+--------------------------------------------+------------+--------------------------------------------+--------+------------------------+-------------------+
|          76|                        East New York (Br...|          76|                        East New York (Br...|   45041|       522.2509269332386|0.14520281521280348|
|          26|                        Borough Park (Bro...|          26|                        Borough Park (Bro...|   37329|       636.9488333467277| 0.1770593908221506|
|          39|                         Canarsie (Brooklyn)|          39|                         Canarsie (Brooklyn)|   28026|       447.558

                                                                                