In [6]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types
import os
import glob

In [2]:
# Local Spark session on all available cores; getOrCreate() returns the
# already-running session if this kernel has one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/06 18:27:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


24/03/06 18:28:02 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [10]:
# Record the Spark version the outputs in this notebook were produced with.
spark.version

'3.5.1'

In [3]:
# Load the raw October 2019 FHV file with pandas, only to inspect it and derive a schema.
# The path is relative to the notebook's working directory (the same data/raw/fhv/...
# layout the Spark conversion cell reads) instead of an absolute, user-specific path.
df_pandas = pd.read_csv('data/raw/fhv/2019/10/fhv_tripdata_2019_10.csv.gz')

In [4]:
# Preview the first rows of the raw CSV as loaded by pandas.
df_pandas.head(n=5)

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264.0,264.0,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264.0,264.0,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264.0,264.0,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264.0,264.0,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264.0,264.0,,B00014


In [5]:
# Let Spark propose a starting schema from the pandas frame. A 1,000-row sample is
# enough for this. Converting the whole month of trips makes Spark (without Arrow)
# infer types over every row in Python, which is slow and only needed to read a schema.
spark.createDataFrame(df_pandas.head(1000)).schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', DoubleType(), True), StructField('DOlocationID', DoubleType(), True), StructField('SR_Flag', DoubleType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [7]:
# Explicit schema for the FHV trip CSVs, so the reader does not need to infer types.
# Timestamps and location IDs get proper types here, unlike the string/double
# columns pandas-based inference produced above.
# NOTE: the CSV header spells the location columns "PUlocationID"/"DOlocationID".
# With a user-supplied schema, Spark's CSV reader (enforceSchema defaults to true)
# applies the schema by position and ignores header names, so the resulting
# columns are named PULocationID/DOLocationID.
_fhv_columns = [
    ("dispatching_base_num", types.StringType()),
    ("pickup_datetime", types.TimestampType()),
    ("dropOff_datetime", types.TimestampType()),
    ("PULocationID", types.IntegerType()),
    ("DOLocationID", types.IntegerType()),
    ("SR_Flag", types.StringType()),
    ("Affiliated_base_number", types.StringType()),
]

fhv_schema = types.StructType(
    [types.StructField(column_name, column_type, True) for column_name, column_type in _fhv_columns]
)

In [9]:
# Month to convert from raw CSV to Parquet.
year, month = 2019, 10

print(f'processing data for {year}/{month}')

# Month is zero-padded to match the directory layout (e.g. .../2019/10/).
input_path = f'data/raw/fhv/{year}/{month:02d}/'
output_path = f'data/pq/fhv/{year}/{month:02d}/'

# Read every CSV in the month directory using the explicit schema above.
df_fhv = (
    spark.read
    .option("header", "true")
    .schema(fhv_schema)
    .csv(input_path)
)

# Repartition to 6 partitions so the write produces 6 part-files; replace any
# previous output for this month.
(
    df_fhv
    .repartition(6)
    .write
    .mode("overwrite")
    .parquet(output_path)
)

processing data for 2019/10


                                                                                

In [10]:
# List the Parquet part-files written by the previous cell (sorted for a stable listing).
files = sorted(glob.glob(os.path.join(output_path, "*.parquet")))
print(f"Found {len(files)} files: {files}")

# Fail with a clear message instead of a ZeroDivisionError when nothing matched,
# e.g. the write failed or the notebook runs from a different working directory
# (output_path is relative).
if not files:
    raise FileNotFoundError(f"No Parquet files found under {output_path!r}")

# Total size of the part-files in MiB (1024 * 1024 bytes).
total_size_mb = sum(os.path.getsize(f) for f in files) / (1024 * 1024)

# Mean size per part-file.
avg_size_mb = total_size_mb / len(files)

print(f"The average size of the Parquet files is {avg_size_mb} MB")

Found 6 files: ['data/pq/fhv/2019/10/part-00000-ce5a64df-a483-448d-8cc8-02522e6872d1-c000.snappy.parquet', 'data/pq/fhv/2019/10/part-00001-ce5a64df-a483-448d-8cc8-02522e6872d1-c000.snappy.parquet', 'data/pq/fhv/2019/10/part-00003-ce5a64df-a483-448d-8cc8-02522e6872d1-c000.snappy.parquet', 'data/pq/fhv/2019/10/part-00004-ce5a64df-a483-448d-8cc8-02522e6872d1-c000.snappy.parquet', 'data/pq/fhv/2019/10/part-00005-ce5a64df-a483-448d-8cc8-02522e6872d1-c000.snappy.parquet', 'data/pq/fhv/2019/10/part-00002-ce5a64df-a483-448d-8cc8-02522e6872d1-c000.snappy.parquet']
The average size of the Parquet files is 6.354584693908691 MB


In [27]:
# Spot-check that the explicit schema parsed timestamps and integer IDs.
df_fhv.show(n=5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
+--------------------+------------------

In [12]:
# Expose the FHV trips to Spark SQL as `trips_data`. createOrReplaceTempView is the
# supported replacement for the deprecated registerTempTable, and it can be re-run
# safely, replacing any existing view with the same name.
df_fhv.createOrReplaceTempView('trips_data')



In [14]:
result = spark.sql("""
SELECT
    COUNT(*) AS trip_count
FROM
    trips_data
WHERE
    DATE(pickup_datetime) = '2019-10-15'
""")

result.show()

[Stage 7:>                                                          (0 + 1) / 1]

+----------+
|trip_count|
+----------+
|     62610|
+----------+



                                                                                

In [15]:
longest_trip = spark.sql("""
SELECT
    MAX((unix_timestamp(dropOff_datetime) - unix_timestamp(pickup_datetime)) / 3600.0) AS longest_trip_hours
FROM
    trips_data
""")

longest_trip.show()


[Stage 10:>                                                         (0 + 1) / 1]

+------------------+
|longest_trip_hours|
+------------------+
|     631152.500000|
+------------------+



                                                                                

In [16]:
# Taxi zone lookup table, read from the Parquet directory 'zones/' (relative path).
df_zones = spark.read.format('parquet').load('zones/')

In [17]:
# Preview the zone lookup rows.
df_zones.show(n=5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [18]:
# Inspect the zone schema: every column, including LocationID, is a string here,
# whereas PULocationID/DOLocationID in df_fhv are integers. Keep this in mind
# when joining on location IDs.
df_zones.schema

StructType([StructField('LocationID', StringType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [19]:
# Attach pickup-zone details to each trip (inner join, so trips whose PULocationID
# has no matching zone, including null IDs, are dropped).
# df_zones.LocationID is a string while PULocationID is an integer. Cast explicitly
# rather than relying on Spark's implicit string/int comparison coercion.
df_result = df_fhv.join(
    df_zones,
    df_fhv.PULocationID == df_zones.LocationID.cast(types.IntegerType()),
)

In [21]:
# Spot-check the joined rows. Keep the preview small (like the other previews);
# 100 wide rows only bloat the notebook output.
df_result.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+---------+--------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|LocationID|  Borough|                Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+---------+--------------------+------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|       264|  Unknown|                  NV|         N/A|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|       264|  Unknown|                  NV|         N/A|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|      

In [22]:
# Expose the trip/zone join to Spark SQL as `joined_fhv_zones`, using the supported
# createOrReplaceTempView instead of the deprecated registerTempTable.
df_result.createOrReplaceTempView('joined_fhv_zones')



In [26]:
least_frequent_pickup_zone_query = """
SELECT Zone, COUNT(*) as pickup_count
FROM joined_fhv_zones
GROUP BY Zone
ORDER BY pickup_count ASC
LIMIT 1
"""

least_frequent_pickup_zone = spark.sql(least_frequent_pickup_zone_query)
least_frequent_pickup_zone.show()


[Stage 32:>                                                         (0 + 1) / 1]

+-----------+------------+
|       Zone|pickup_count|
+-----------+------------+
|Jamaica Bay|           1|
+-----------+------------+



                                                                                