In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/14 12:42:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Creating Green Schema

In [6]:
import pandas as pd

df_green_pd = pd.read_csv('data/raw/green/2021/01/green_tripdata_2021_01.csv.gz', nrows=1000)

In [7]:
df_green_pd.dtypes

VendorID                   int64
lpep_pickup_datetime      object
lpep_dropoff_datetime     object
store_and_fwd_flag        object
RatecodeID                 int64
PULocationID               int64
DOLocationID               int64
passenger_count            int64
trip_distance            float64
fare_amount              float64
extra                    float64
mta_tax                  float64
tip_amount               float64
tolls_amount             float64
ehail_fee                float64
improvement_surcharge    float64
total_amount             float64
payment_type               int64
trip_type                  int64
congestion_surcharge     float64
dtype: object

In [8]:
spark.createDataFrame(df_green_pd).schema # change this

StructType([StructField('VendorID', LongType(), True), StructField('lpep_pickup_datetime', StringType(), True), StructField('lpep_dropoff_datetime', StringType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('RatecodeID', LongType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('ehail_fee', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('payment_type', LongType(), True), StructField('trip_type', LongType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [17]:
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, TimestampType, IntegerType

green_schema = StructType([
    StructField("VendorID", LongType(), True),
    StructField("lpep_pickup_datetime", TimestampType(), True),
    StructField("lpep_dropoff_datetime", TimestampType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("RatecodeID", IntegerType(), True),
    StructField("PULocationID", IntegerType(), True),
    StructField("DOLocationID", IntegerType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("ehail_fee", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("payment_type", IntegerType(), True),
    StructField("trip_type", IntegerType(), True),
    StructField("congestion_surcharge", DoubleType(), True)
])


In [26]:
year = 2020

for month in range(1, 13):
    print(f'Processing data for {year}/{month:02d}')
    input_path = f"data/raw/green/{year}/{month:02d}/"
    output_path = f"data/pq/green/{year}/{month:02d}/"
    df_green = spark.read \
        .schema(green_schema) \
        .csv(input_path, header=True)
    
    df_green \
        .repartition(4) \
        .write.parquet(output_path)

Processing data for 2020/01


                                                                                

Processing data for 2020/02


                                                                                

Processing data for 2020/03


                                                                                

Processing data for 2020/04
Processing data for 2020/05
Processing data for 2020/06
Processing data for 2020/07
Processing data for 2020/08
Processing data for 2020/09
Processing data for 2020/10
Processing data for 2020/11
Processing data for 2020/12


## Creating Yellow Schema

In [12]:
df_yellow_pd = pd.read_csv('data/raw/yellow/2021/01/yellow_tripdata_2021_01.csv.gz', nrows=1000)

In [15]:
spark.createDataFrame(df_yellow_pd).schema # change this

StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', StringType(), True), StructField('tpep_dropoff_datetime', StringType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [16]:
# from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType

yellow_schema = StructType([
    StructField("VendorID", LongType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("RatecodeID", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("PULocationID", IntegerType(), True),
    StructField("DOLocationID", IntegerType(), True),
    StructField("payment_type", IntegerType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("congestion_surcharge", DoubleType(), True)
])

In [22]:
df_yellow = spark.read \
    .schema(yellow_schema) \
    .csv("data/raw/yellow/2021/01/", header=True)

In [28]:
year = 2021

for month in range(1, 13):
    print(f'Processing data for {year}/{month:02d}')
    input_path = f"data/raw/yellow/{year}/{month:02d}/"
    output_path = f"data/pq/yellow/{year}/{month:02d}/"
    df_yellow = spark.read \
        .schema(yellow_schema) \
        .csv(input_path, header=True)
    
    df_yellow \
        .repartition(4) \
        .write.parquet(output_path)

Processing data for 2021/01


                                                                                

Processing data for 2021/02


                                                                                

Processing data for 2021/03


                                                                                

Processing data for 2021/04


                                                                                

Processing data for 2021/05


                                                                                

Processing data for 2021/06


                                                                                

Processing data for 2021/07


[Stage 115:>                                                        (0 + 4) / 4]

Processing data for 2021/08


                                                                                

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/charles/de-zoomcamp-2025/notes/5/code/data/raw/yellow/2021/08.