In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# Reuse the active SparkSession if there is one; otherwise build a new one
# against the 'local[*]' master.
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

23/02/25 12:15:19 WARN Utils: Your hostname, yujaeseong-ui-MacBookAir.local resolves to a loopback address: 127.0.0.1; using 192.168.219.147 instead (on interface en0)
23/02/25 12:15:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/25 12:15:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/25 12:15:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Load the gzipped trip-data CSV. The first row is treated as the header and
# Spark is asked to infer the column types from the data.
df = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('../fhvhv_tripdata_2021-06.csv.gz')
)

                                                                                

In [4]:
# Convert both trip time columns to timestamps, overwriting each column under
# its existing name.
for ts_col in ('pickup_datetime', 'dropoff_datetime'):
    df = df.withColumn(ts_col, F.to_timestamp(ts_col))
    

In [5]:
# Print the schema of the trips DataFrame to confirm the column types.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [4]:
# Redistribute the trips into 12 partitions and write them as Parquet under
# 'hw/', overwriting any output already present at that path.
df.repartition(12).write.mode('overwrite').parquet('hw/')

23/02/25 11:58:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,009,097,101 bytes) of heap memory
Scaling row group sizes to 93.98% for 8 writers
                                                                                

In [6]:
# Preview the first three trips.
df.show(n=3)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
only showing top 3 rows



In [6]:
# Expose the trips DataFrame to Spark SQL under the name 'trips'.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0. It also makes this cell safe to re-run, because an existing
# view of the same name is simply replaced.
df.createOrReplaceTempView('trips')



In [7]:
# Count the trips whose pickup date is 2021-06-15.
trips_on_june_15_sql = """
SELECT 
    count(1)
FROM trips
WHERE DATE(pickup_datetime) = '2021-06-15'
"""
spark.sql(trips_on_june_15_sql).show()

[Stage 2:>                                                          (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|  452470|
+--------+



                                                                                

In [10]:
# Trip duration in hours: the difference between the dropoff and pickup Unix
# timestamps (seconds) divided by 3600, listed longest first.
trip_duration_sql = """
SELECT
    (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) / 3600 AS trip_duration
FROM
    trips
ORDER BY trip_duration DESC
"""
spark.sql(trip_duration_sql).show()

[Stage 7:>                                                          (0 + 1) / 1]

+------------------+
|     trip_duration|
+------------------+
|  66.8788888888889|
|25.549722222222222|
|19.980833333333333|
|18.197222222222223|
|16.466944444444444|
|14.268888888888888|
|13.909722222222221|
|             11.67|
|11.365833333333333|
|10.984444444444444|
|           10.2675|
| 9.966388888888888|
| 9.966388888888888|
| 9.637777777777778|
| 9.624444444444444|
| 9.480277777777777|
| 9.471666666666666|
| 9.402222222222223|
| 9.393611111111111|
| 9.376944444444444|
+------------------+
only showing top 20 rows



                                                                                

In [11]:
# Load the taxi zone lookup table; the first row of the CSV is the header.
# No schema inference is requested for this read.
df_zones = (
    spark.read
    .option('header', 'true')
    .csv('../taxi_zone_lookup.csv')
)

In [12]:
# Show one trip row as a reminder of the columns available for the join.
df.show(n=1)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
only showing top 1 row



In [13]:
# Show one zone row to see the lookup table's columns.
df_zones.show(n=1)

+----------+-------+--------------+------------+
|LocationID|Borough|          Zone|service_zone|
+----------+-------+--------------+------------+
|         1|    EWR|Newark Airport|         EWR|
+----------+-------+--------------+------------+
only showing top 1 row



In [15]:
# Attach pickup-zone details to every trip. The left join keeps trips that
# have no matching zone row, and the lookup key is dropped afterwards because
# it only duplicates PULocationID.
pickup_zone_match = df['PULocationID'] == df_zones['LocationID']
df_join = df.join(df_zones, on=pickup_zone_match, how='left').drop('LocationID')
df_join.show(n=3)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------+------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|Borough|              Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------+------------------+------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|  Bronx|           Norwood|   Boro Zone|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|  Bronx|         Bronxdale|   Boro Zone|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|  Bronx|Van Cortlandt Park|   Boro Zone|
+--------------------+------------------

In [16]:
# Expose the joined trips/zones DataFrame to Spark SQL as 'trips_zones'.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0. Re-running this cell replaces the existing view.
df_join.createOrReplaceTempView('trips_zones')



In [17]:
# Count trips per pickup zone, with the busiest zones first.
trips_per_zone_sql = """
SELECT
    zone,
    count(1) AS trips_count
FROM
    trips_zones
GROUP BY 
    zone
ORDER BY trips_count DESC
"""
spark.sql(trips_per_zone_sql).show()

[Stage 16:>                                                         (0 + 1) / 1]

+--------------------+-----------+
|                zone|trips_count|
+--------------------+-----------+
| Crown Heights North|     231279|
|        East Village|     221244|
|         JFK Airport|     188867|
|      Bushwick South|     187929|
|       East New York|     186780|
|TriBeCa/Civic Center|     164344|
|   LaGuardia Airport|     161596|
|            Union Sq|     158937|
|        West Village|     154698|
|             Astoria|     152493|
|     Lower East Side|     151020|
|        East Chelsea|     147673|
|Central Harlem North|     146402|
|Williamsburg (Nor...|     143683|
|          Park Slope|     143594|
|  Stuyvesant Heights|     141427|
|        Clinton East|     139611|
|West Chelsea/Huds...|     139431|
|             Bedford|     138428|
|         Murray Hill|     137879|
+--------------------+-----------+
only showing top 20 rows



                                                                                