In [12]:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t

In [5]:
# spark master
spark_master = "local[*]"

spark = SparkSession.builder\
            .master(spark_master) \
            .appName("HW 2024")\
            .getOrCreate()

In [6]:
spark

In [None]:
#File location https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
file_path = os.getcwd()+"/data/Raw/2019/fhv_tripdata_2019-10.csv.gz"

In [11]:
df_csv = spark.read.format('csv')\
            .option("header", "true")\
            .option("delimiter",',')\
            .load(file_path)\
            .limit(10)

In [15]:
df_csv.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropOff_datetime',
 'PUlocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [14]:
df_csv.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [None]:
#To get the schema
df_csv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropOff_datetime: string (nullable = true)
 |-- PUlocationID: string (nullable = true)
 |-- DOlocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [18]:
#Convert schema to types
schema = t.StructType([
t.StructField("dispatching_base_num",t.StringType()),
t.StructField("pickup_datetime",t.TimestampType()),
t.StructField("dropOff_datetime",t.TimestampType()),
t.StructField("PUlocationID",t.IntegerType()),
t.StructField("DOlocationID",t.IntegerType()),
t.StructField("SR_Flag",t.StringType()),
t.StructField("Affiliated_base_number",t.StringType())
])

In [22]:
# Read CSV with updated schema
df_csv = spark.read.format('csv')\
            .option("header", "true")\
            .option("delimiter",',')\
            .schema(schema)\
            .load(file_path)

In [None]:
#Now we have the new schema
df_csv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [25]:
folder_path = os.getcwd()+"/data/Processed/FHV/2019/10/"

In [None]:
#Repartition the Dataframe to 6 partitions and save it to parquet.
df_csv.repartition(6)\
    .write.parquet(folder_path)

In [None]:
#Get file size
os.path.getsize(folder_path+[f for f in os.listdir(folder_path) if f.endswith(".parquet")][0])/ (1024 * 1024)

6.366528511047363

In [34]:
#read parquet df

df = spark.read.parquet(folder_path)

In [35]:
df.limit(10).show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00254|2019-10-24 21:26:26|2019-10-24 22:20:05|         237|         265|   NULL|                B00254|
|              B02795|2019-10-23 14:09:00|2019-10-23 14:57:13|         238|         183|   NULL|                B02795|
|              B00248|2019-10-07 23:51:13|2019-10-08 00:11:55|         264|          16|   NULL|                B00248|
|              B01087|2019-10-11 01:45:09|2019-10-11 02:17:12|          50|          35|   NULL|                B01087|
|              B03016|2019-10-29 21:14:33|2019-10-29 21:25:46|         264|          32|   NULL|                B02864|
|              B00887|2019-10-21 14:12:3

In [37]:
#How many taxi trips were there that started on the 15th of October?
df.filter(f.to_date(f.col('pickup_datetime')) == '2019-10-15')\
    .select(f.count('*').alias('count_rows')).show()

+----------+
|count_rows|
+----------+
|     62610|
+----------+



In [43]:
# What is the length of the longest trip in the dataset in hours?

df.withColumn('Trip_length', f.col('dropOff_datetime').cast('long')-f.col('pickup_datetime').cast('long'))\
.select((f.max('Trip_length')/360).alias('longest_trip')).show()

+------------+
|longest_trip|
+------------+
|   6311525.0|
+------------+



In [49]:
# what is the name of the LEAST frequent pickup location Zone?
df_zones = spark.read.format('csv')\
        .option('header','true')\
        .load(os.getcwd()+"/data/taxi_zone_lookup.csv")\
        .select(f.col('LocationID').alias('PUlocationID'),'Zone')

df.join(df_zones,'PUlocationID','inner')\
    .groupBy('Zone')\
    .agg(f.count('*').alias('cnt'))\
    .orderBy(f.col('cnt'))\
    .select('Zone').limit(1).show()

+-----------+
|       Zone|
+-----------+
|Jamaica Bay|
+-----------+



In [None]:
## if spark is local
# spark.stop()