In [1]:
from pyspark.sql import SparkSession

In [18]:
print(spark.version)

3.1.3


In [2]:
# Spark session
spark = SparkSession.builder \
  .master('yarn') \
  .appName('module 5_1') \
  .getOrCreate()

In [3]:
input_path = "gs://data-zoom-bucket/yellow_tripdata_2024-10.parquet"

In [4]:
 df = spark.read.parquet(input_path)

                                                                                

In [5]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [6]:
df.createOrReplaceTempView("taxi_data")

In [9]:
result = spark.sql("""
    SELECT COUNT(*) AS record_count
    FROM taxi_data
    WHERE DATE(tpep_pickup_datetime) = '2024-10-15' 
""")

result.show()



+------------+
|record_count|
+------------+
|      128893|
+------------+



                                                                                

In [11]:
from pyspark.sql import functions as F

In [13]:
df \
    .withColumn('pickup_date', F.to_date(df.tpep_pickup_datetime)) \
    .filter("pickup_date = '2024-10-15'") \
    .count()

                                                                                

128893

In [17]:
spark.sql("""
SELECT
    to_date(tpep_pickup_datetime) AS pickup_date,
    MAX((CAST(tpep_dropoff_datetime AS LONG) - CAST(tpep_pickup_datetime AS LONG)) / (60*60)) AS duration
FROM 
    taxi_data
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 10;
""").show()



+-----------+------------------+
|pickup_date|          duration|
+-----------+------------------+
| 2024-10-16|162.61777777777777|
| 2024-10-03|           143.325|
| 2024-10-22|137.76055555555556|
| 2024-10-18|114.83472222222223|
| 2024-10-21| 89.89833333333333|
| 2024-10-20| 89.44611111111111|
| 2024-10-12| 67.57333333333334|
| 2024-10-17| 66.06666666666666|
| 2024-10-24| 38.47416666666667|
| 2024-10-23| 33.95111111111111|
+-----------+------------------+



                                                                                

In [19]:
input_path2 = "gs://data-zoom-bucket/taxi_zone_lookup.csv"

In [21]:
df_loopup = spark.read.csv(input_path2, header=True, inferSchema=True)


                                                                                

In [24]:
df_loopup.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [26]:
df_loopup.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [27]:
df_loopup.createOrReplaceTempView("zone_table")

In [30]:
spark.sql("""
SELECT
    a.PULocationID,
    b.Zone,
    count(1) rec_count
FROM 
    taxi_data a
    left join zone_table b on a.PULocationID = b.LocationID
GROUP BY
    1,2
ORDER BY
    3
LIMIT 10;
""").show()



+------------+--------------------+---------+
|PULocationID|                Zone|rec_count|
+------------+--------------------+---------+
|         105|Governor's Island...|        1|
|           5|       Arden Heights|        2|
|         199|       Rikers Island|        2|
|           2|         Jamaica Bay|        3|
|         111| Green-Wood Cemetery|        3|
|         204|   Rossville/Woodrow|        4|
|         245|       West Brighton|        4|
|          84|Eltingville/Annad...|        4|
|          44|Charleston/Totten...|        4|
|         187|       Port Richmond|        4|
+------------+--------------------+---------+



                                                                                

In [31]:
# Repartition data (adjust the number as needed)
repartitioned_df = df.repartition(4)

In [33]:
output_path = "gs://zoom-batch-storage/output/module5"
repartitioned_df.write \
        .mode("overwrite") \
        .parquet(output_path)

                                                                                

In [34]:
spark.stop()