In [1]:
import pyspark
import pandas as pd

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [8]:
# Local Spark session using every available core; getOrCreate() reuses an
# already-running session if the cell is re-executed.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('FHVHV')
    .getOrCreate()
)

In [5]:
# Version string of the running Spark session.
# NOTE(review): the saved output below is the repr of the `pyspark.version`
# module, not a version string, so it looks stale — re-run to refresh
# (the install path in it points at Spark 3.3.2).
spark.version

<module 'pyspark.version' from '/usr/local/Cellar/apache-spark/3.3.2/libexec/python/pyspark/version.py'>

# ======= Preparing dataset ===========

In [14]:
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

In [6]:
# Check that the downloaded trip CSV is present in the working directory.
!ls

fhvhv_tripdata_2021-06.csv.gz pyspark.ipynb


In [None]:
# Read only the first 10 rows with pandas to inspect the columns and the
# dtypes pandas infers — a cheap guide when writing the Spark schema.
df_pd = pd.read_csv('fhvhv_tripdata_2021-06.csv.gz', nrows=10)

In [32]:
# df_pd

In [31]:
# df_pd.dtypes

In [30]:
# spark.createDataFrame(df_pd)

In [7]:
# First pass: read the gzipped CSV with only the header option set, so Spark
# leaves every column typed as string.
df = spark.read.csv('fhvhv_tripdata_2021-06.csv.gz', header="true")

In [13]:
# Inspect the schema Spark assigned without a user-provided schema.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [11]:
# Show the StructType repr; its field list is a starting point for writing an
# explicit schema by hand.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [36]:
# Explicit schema for the FHVHV trip CSV: timestamps for pickup/dropoff,
# integers for the location IDs, strings for the remaining columns.
# Every field is nullable.
schema = (
    types.StructType()
    .add("dispatching_base_num", types.StringType(), True)
    .add("pickup_datetime", types.TimestampType(), True)
    .add("dropoff_datetime", types.TimestampType(), True)
    .add("PULocationID", types.IntegerType(), True)
    .add("DOLocationID", types.IntegerType(), True)
    .add("SR_Flag", types.StringType(), True)
    .add("Affiliated_base_number", types.StringType(), True)
)

In [37]:
# Second pass: re-read the same file with the explicit schema so the
# timestamp and location-ID columns are parsed into typed values.
df = spark.read.csv(
    'fhvhv_tripdata_2021-06.csv.gz',
    schema=schema,
    header="true",
)

In [38]:
# Confirm the explicit schema took effect.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [41]:
# Shuffle into 12 partitions so the following Parquet write produces up to
# 12 part files that can be read back in parallel.
df = df.repartition(numPartitions=12)

In [42]:
# Persist the typed, repartitioned trips as Parquet. The default save mode
# ("errorifexists") makes this cell fail on any re-run once the directory
# exists; overwrite keeps Restart & Run All working.
df.write.mode('overwrite').parquet('fhvhv/2021/06/')

# ========= Working with dataset ===========

In [9]:
# Reload the trips from the Parquet copy; the schema is read back from the
# Parquet files, so no explicit schema is passed here.
df = spark.read.format('parquet').load('fhvhv/2021/06/')

In [21]:
# Add a day-level bucket of the pickup time. date_trunc keeps a timestamp
# (midnight of the pickup day, e.g. 2021-06-11 00:00:00), not a DateType.
df_for_hw = df.withColumn(
    'pickup_date',
    F.date_trunc('DAY', F.col('pickup_datetime')),
)

In [26]:
# Peek at a few rows to check the derived pickup_date column.
df_for_hw.show(n=5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|        pickup_date|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|              B02877|2021-06-11 12:28:02|2021-06-11 12:36:01|         230|          43|      N|                B02877|2021-06-11 00:00:00|
|              B02765|2021-06-13 08:46:22|2021-06-13 09:03:00|          37|          61|      N|                B02765|2021-06-13 00:00:00|
|              B02510|2021-06-28 18:39:27|2021-06-28 18:50:53|         231|          79|      N|                  null|2021-06-28 00:00:00|
|              B02869|2021-06-05 18:15:05|2021-06-05 18:28:32|         244|         127|      N|                B02869|2021-06-05 00:00:00|
|              B0287

In [24]:
# Expose the trips (including pickup_date) to Spark SQL as the `fhvhv` view.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0 and emits a FutureWarning; re-running simply replaces the view.
df_for_hw.createOrReplaceTempView("fhvhv")



In [25]:
# Count trips whose day-truncated pickup timestamp is 2021-06-15.
trips_on_june_15_sql = """
SELECT count(pickup_datetime)
FROM fhvhv
WHERE pickup_date = '2021-06-15 00:00:00'

"""
spark.sql(trips_on_june_15_sql).show()

+----------------------+
|count(pickup_datetime)|
+----------------------+
|                452470|
+----------------------+



In [52]:
# Longest trip duration in hours. pickup/dropoff are already TimestampType,
# so bigint() converts them straight to epoch seconds; wrapping them in
# to_timestamp() first was a no-op.
spark.sql("""
SELECT
    max((bigint(dropoff_datetime) - bigint(pickup_datetime)) / 3600) AS diff
FROM fhvhv
""").show()

+----------------+
|            diff|
+----------------+
|66.8788888888889|
+----------------+



In [53]:
# Longest trip duration in hours, using bigint() on the timestamp columns to
# get epoch seconds and dividing the difference by 3600.
max_trip_hours_sql = """
SELECT 
    max(((bigint(dropoff_datetime))-(bigint(pickup_datetime)))/(3600)) as diff 
FROM fhvhv
"""
spark.sql(max_trip_hours_sql).show()

+----------------+
|            diff|
+----------------+
|66.8788888888889|
+----------------+



In [59]:
# Longest trip duration in hours, taking the max of the timestamp difference
# first and converting it to seconds with bigint() before dividing by 3600.
max_interval_hours_sql = """
SELECT 
    bigint(max(dropoff_datetime - pickup_datetime))/3600 as diff 
FROM fhvhv
"""
spark.sql(max_interval_hours_sql).show()

+----------------+
|            diff|
+----------------+
|66.8788888888889|
+----------------+



In [64]:
# Zone lookup table. A header-only CSV read types every column as string, but
# LocationID is joined against the integer PULocationID from the trip schema,
# so cast it to int to make the join key types match explicitly.
taxi_zones = (
    spark.read
    .option("header", "true")
    .csv('../taxi_zones.csv')
    .withColumn('LocationID', F.col('LocationID').cast(types.IntegerType()))
)

In [67]:
# Peek at the first few zone rows.
taxi_zones.show(n=3)

+----------+-------+--------------------+------------+
|LocationID|Borough|                Zone|service_zone|
+----------+-------+--------------------+------------+
|         1|    EWR|      Newark Airport|         EWR|
|         2| Queens|         Jamaica Bay|   Boro Zone|
|         3|  Bronx|Allerton/Pelham G...|   Boro Zone|
+----------+-------+--------------------+------------+
only showing top 3 rows



In [65]:
# Expose the zone lookup to Spark SQL as the `taxi_zones` view.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0 and emits a FutureWarning.
taxi_zones.createOrReplaceTempView("taxi_zones")

In [80]:
# Five pickup zones with the most trips.
# - CAST makes the join key types explicit (the zones CSV may be read as
#   strings); it is a no-op if LocationID is already an int.
# - Ordering by the named alias instead of the ordinal `2` survives edits to
#   the SELECT list, and `Zone` breaks ties so LIMIT 5 is deterministic.
spark.sql("""
SELECT
    Zone,
    count(1) AS count
FROM fhvhv
JOIN taxi_zones ON fhvhv.PULocationID = CAST(taxi_zones.LocationID AS INT)
GROUP BY Zone
ORDER BY count DESC, Zone
LIMIT 5
""").show()

+-------------------+------+
|               Zone| count|
+-------------------+------+
|Crown Heights North|231279|
|       East Village|221244|
|        JFK Airport|188867|
|     Bushwick South|187929|
|      East New York|186780|
+-------------------+------+

