In [10]:
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F

In [3]:
# Question 1: What is your Spark version?
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("test") \
    .getOrCreate()

print(spark.version)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/27 15:23:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
3.3.0


In [25]:
# Question 2: Repartition HVFHV Data on June 2021 into 12 partitions 
# and save it to parquet format. What is the average size of parquet files?
!head -n 101 data/raw/fhvhv_tripdata_2021-06.csv > data/raw/head-homework.csv

df = spark.read \
    .option("header", "true") \
    .csv("data/raw/head-homework.csv")

df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [41]:
schema = types.StructType([
    types.StructField("dispatching_base_num", types.StringType(), True), 
    types.StructField("pickup_datetime", types.TimestampType(), True), 
    types.StructField("dropoff_datetime", types.TimestampType(), True), 
    types.StructField("PULocationID", types.IntegerType(), True), 
    types.StructField("DOLocationID", types.IntegerType(), True), 
    types.StructField("SR_Flag", types.StringType(), True), 
    types.StructField("Affiliated_base_number", types.StringType(), True)
])

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv("data/raw/fhvhv_tripdata_2021-06.csv")

In [42]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [43]:
df.repartition(12).write.parquet("data/fhvhv/2021/06", mode="overwrite")

                                                                                

In [44]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [67]:
# Question 3: How many taxi trips were there on June 15?
df = spark.read.parquet("data/fhvhv/2021/06/")

df.withColumn("pickup_date", F.to_date("pickup_datetime")).filter(F.col("pickup_date") == "2021-06-15").count()

                                                                                

452470

In [78]:
# Question 4: Calculate the duration of each trip, How long was the longest trip in hours?
second_df = df.withColumn("duration_seconds", F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime"))
second_df.withColumn("duration_hours", F.col("duration_seconds") / 3600).orderBy(F.col("duration_hours"), ascending=False).show()

[Stage 58:>                                                         (0 + 4) / 4]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|duration_seconds|    duration_hours|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+------------------+
|              B02872|2021-06-25 13:55:41|2021-06-28 08:48:25|          98|         265|      N|                B02872|          240764|  66.8788888888889|
|              B02765|2021-06-22 12:09:45|2021-06-23 13:42:44|         188|         198|      N|                B02765|           91979|25.549722222222222|
|              B02879|2021-06-27 10:32:29|2021-06-28 06:31:20|          78|         169|      N|                B02879|           71931|19.980833333333333|
|              B02800|2021-06-26 22:37:11|2021-06-27 16:49:01|  

                                                                                

In [None]:
# Question 5: What port does Spark's UI run on by default?
# PORT: 4040

In [90]:
# Question 6: What is the most frequent pickup location zone?
zone_df = spark.read \
    .option("header", "true") \
    .parquet("data/zones/")

In [93]:
df.join(zone_df, df.PULocationID == zone_df.LocationID, how="left").groupBy("Zone").count().orderBy("count", ascending=False).show()

[Stage 78:>                                                         (0 + 4) / 4]

+--------------------+------+
|                Zone| count|
+--------------------+------+
| Crown Heights North|231279|
|        East Village|221244|
|         JFK Airport|188867|
|      Bushwick South|187929|
|       East New York|186780|
|TriBeCa/Civic Center|164344|
|   LaGuardia Airport|161596|
|            Union Sq|158937|
|        West Village|154698|
|             Astoria|152493|
|     Lower East Side|151020|
|        East Chelsea|147673|
|Central Harlem North|146402|
|Williamsburg (Nor...|143683|
|          Park Slope|143594|
|  Stuyvesant Heights|141427|
|        Clinton East|139611|
|West Chelsea/Huds...|139431|
|             Bedford|138428|
|         Murray Hill|137879|
+--------------------+------+
only showing top 20 rows



                                                                                