In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

22/02/28 00:55:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-27 13:14:48--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.216.9.211
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.216.9.211|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv’


2022-02-27 13:14:58 (74.9 MB/s) - ‘fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [4]:
!wc -l fhvhv_tripdata_2021-02.csv

11613943 fhvhv_tripdata_2021-02.csv


In [5]:
df = spark.read \
    .option("header", "true") \
    .csv('fhvhv_tripdata_2021-02.csv')

In [6]:
df.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,StringType,true),StructField(DOLocationID,StringType,true),StructField(SR_Flag,StringType,true)))

In [7]:
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null|
|           HV0003|              B02872|2021-02-01 00:08:42|2021-02-01 00:17:57|         126|         250|   null|
|           HV0003|              B02872|2021-02-01 00:26:02|2021-02-01 00:42:51|

In [8]:
!head -n 1001 fhvhv_tripdata_2021-02.csv > head.csv

In [9]:
!wc -l head.csv

1001 head.csv


In [10]:
import pandas as pd

In [11]:
# use pandas to open relatively small file
df_pandas = pd.read_csv('head.csv')

In [12]:
df_pandas.dtypes
# pandas is still not smart enough to figure out that timestamp is not just string

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

In [13]:
# spark.createDataFrame(df_pandas).show()
spark.createDataFrame(df_pandas).schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(SR_Flag,DoubleType,true)))

Integer - 4 bytes
Long - 8 bytes

In [None]:
# but we want to change LongType to integer
# Integer takes 4 bytes, Long takes 8 bytes

In [3]:
from pyspark.sql import types

In [15]:
# StructType comes from scala. Want to turn this to python code
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [16]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-02.csv')

In [17]:
df.head(10)
# we now see that datetime have the correct type

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 2, 1, 0, 10, 40), dropoff_datetime=datetime.datetime(2021, 2, 1, 0, 21, 9), PULocationID=35, DOLocationID=39, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 2, 1, 0, 27, 23), dropoff_datetime=datetime.datetime(2021, 2, 1, 0, 44, 1), PULocationID=39, DOLocationID=35, SR_Flag=None),
 Row(hvfhs_license_num='HV0005', dispatching_base_num='B02510', pickup_datetime=datetime.datetime(2021, 2, 1, 0, 28, 38), dropoff_datetime=datetime.datetime(2021, 2, 1, 0, 38, 27), PULocationID=39, DOLocationID=91, SR_Flag=None),
 Row(hvfhs_license_num='HV0005', dispatching_base_num='B02510', pickup_datetime=datetime.datetime(2021, 2, 1, 0, 43, 37), dropoff_datetime=datetime.datetime(2021, 2, 1, 1, 23, 20), PULocationID=91, DOLocationID=228, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02872', pickup_datetime=d

In [18]:
# take 1 large file and split into 24 partitions
# so that each partition can be worked on by individual Spark clusters
# amenable to parallelization
# repartition is a lazy command. Doesn't trigger repartitioning yet
# only when we do something e.g. save the dataframe, does the command get applied
df = df.repartition(24)

In [19]:
df.write.parquet('fhvhv/2021/02/')

                                                                                

In [4]:
df = spark.read.parquet('fhvhv/2021/02/')

                                                                                

In [6]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



Question 1: PySpark Version

In [11]:
pyspark.__version__

'3.0.3'

Question 2: Size of fhvhv folder Feb 2021

In [9]:
!du -sh fhvhv/2021/02

210M	fhvhv/2021/02


In [4]:
# functions already available in Spark
from pyspark.sql import functions as F

In [5]:
df.withColumn('pickup_date', F.to_date(df.pickup_datetime))

DataFrame[hvfhs_license_num: string, dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: string, pickup_date: date]

In [9]:
df.show()


[Stage 1:>                                                          (0 + 1) / 1]

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02887|2021-02-06 01:18:35|2021-02-06 01:40:34|         163|         235|   null|
|           HV0005|              B02510|2021-02-05 07:13:06|2021-02-05 07:31:56|         225|         181|   null|
|           HV0003|              B02869|2021-02-04 16:56:52|2021-02-04 17:21:36|         260|          95|   null|
|           HV0003|              B02871|2021-02-03 18:34:17|2021-02-03 18:57:12|         235|          60|   null|
|           HV0003|              B02869|2021-02-04 07:25:09|2021-02-04 07:30:34|          55|          55|   null|
|           HV0003|              B02836|2021-02-04 23:15:27|2021-02-04 23:34:29|


                                                                                

Question 3: Record count on Feb 15, 2021

In [13]:
df.registerTempTable('fhvhv_data')

In [14]:
spark.sql("""
SELECT
    COUNT(*) as ct_feb15
FROM
    fhvhv_data
WHERE DATE(pickup_datetime) between '2021-02-15' and '2021-02-15'
""").show()


[Stage 1:>                                                          (0 + 4) / 4]

+--------+
|ct_feb15|
+--------+
|  367170|
+--------+




                                                                                

Question 4: Day with the longest trip

In [24]:
spark.sql("""
SELECT 
    pickup_datetime, ((bigint(to_timestamp(dropoff_datetime)))-(bigint(to_timestamp(pickup_datetime))))/(60) as duration
FROM
    fhvhv_data
ORDER BY 2 DESC
LIMIT 1
""").show()


[Stage 9:>                                                          (0 + 4) / 4]

+-------------------+--------+
|    pickup_datetime|duration|
+-------------------+--------+
|2021-02-11 13:40:44|  1259.0|
+-------------------+--------+




                                                                                

Question 5: stages (for most frequent `dispatching_base_num`)

In [26]:
spark.sql("""
SELECT 
    dispatching_base_num, 
    COUNT(*) as ct_dispatching_base_num
FROM 
    fhvhv_data
GROUP BY 1
""").show()

                                                                                

+--------------------+-----------------------+
|dispatching_base_num|ct_dispatching_base_num|
+--------------------+-----------------------+
|              B02876|                 215693|
|              B03136|                   1741|
|              B02877|                 198938|
|              B02869|                 429720|
|              B02883|                 251617|
|              B02835|                 189031|
|              B02884|                 244963|
|              B02880|                 115716|
|              B02878|                 305185|
|              B02836|                 128978|
|              B02872|                 882689|
|              B02512|                  41043|
|              B02867|                 200530|
|              B02866|                 311089|
|              B02871|                 312364|
|              B02889|                 138762|
|              B02844|                   3502|
|              B02510|                3233664|
|            

Question 6: Most common locations pair

In [5]:
df_zones = spark.read.parquet('zones/')

In [11]:
df.registerTempTable('df')
df_zones.registerTempTable('df_zones')

In [18]:
spark.sql("""
SELECT *
FROM df_zones
WHERE Zone LIKE "%East New York%"
""").show()

+----------+--------+--------------------+------------+
|LocationID| Borough|                Zone|service_zone|
+----------+--------+--------------------+------------+
|        76|Brooklyn|       East New York|   Boro Zone|
|        77|Brooklyn|East New York/Pen...|   Boro Zone|
+----------+--------+--------------------+------------+



In [26]:
spark.sql("""
SELECT concat_zones, COUNT(*) as ct_concat_zones
FROM
(SELECT dispatching_base_num, pickup_datetime, dropoff_datetime, PULocationID, PUzone, DOLocationID, Zone as DOzone,
    CONCAT(PUzone, ' / ', Zone) as concat_zones
FROM
(SELECT dispatching_base_num, pickup_datetime, dropoff_datetime, PULocationID, Zone as PUzone, DOLocationID
FROM
    df t
LEFT JOIN df_zones zpu ON t.PULocationID = zpu.LocationID) t2
LEFT JOIN df_zones zdo ON t2.DOLocationID = zdo.LocationID) t3
GROUP BY 1
ORDER BY 2 DESC
LIMIT 2
""").show()



+--------------------+---------------+
|        concat_zones|ct_concat_zones|
+--------------------+---------------+
|East New York / E...|          45041|
|Borough Park / Bo...|          37329|
+--------------------+---------------+





                                                                                

EXTRA - try join df's first

In [6]:
df_join1 = df.join(df_zones, df.PULocationID == df_zones.LocationID)

In [7]:
df_join2 = df_join1.join(df_zones, df_join1.DOLocationID == df_zones.LocationID)

In [8]:
df_join2.registerTempTable('df_join2')