In [14]:
import pyspark
from pyspark.sql import SparkSession

In [15]:
spark = SparkSession.builder \
    .appName("example") \
    .master("local[*]") \
    .getOrCreate()

In [9]:
# read gz parquet file of all month in directory: data\raw\yellow\2024
df = spark.read.option("compression", "gzip").parquet("data/raw/yellow/2024/01")
df.printSchema()
  

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [10]:
import pandas as pd
# read parquet file chunk at 1000 rows
pd_dataframe = pd.read_parquet("data/raw/yellow/2024/01/yellow_tripdata_2024-01.parquet", engine='pyarrow')
pd_dataframe = pd_dataframe.head(1000)  # limit to 1000 rows
pd_dataframe.dtypes

VendorID                          int32
tpep_pickup_datetime     datetime64[us]
tpep_dropoff_datetime    datetime64[us]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                      int32
DOLocationID                      int32
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
Airport_fee                     float64
dtype: object

In [11]:
# create Spark DataFrame and show schema
spark.createDataFrame(pd_dataframe).schema

StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', TimestampType(), True), StructField('tpep_dropoff_datetime', TimestampType(), True), StructField('passenger_count', DoubleType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', DoubleType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

In [13]:
from pyspark.sql import types
yellow_schema =  types.StructType([
types.StructField('VendorID', types.IntegerType(), True), 
types.StructField('tpep_pickup_datetime', types.TimestampNTZType(), True), 
types.StructField('tpep_dropoff_datetime', types.TimestampNTZType(), True), 
types.StructField('passenger_count', types.LongType(), True), 
types.StructField('trip_distance', types.DoubleType(), True), 
types.StructField('RatecodeID', types.LongType(), True), 
types.StructField('store_and_fwd_flag', types.StringType(), True), 
types.StructField('PULocationID', types.IntegerType(), True), 
types.StructField('DOLocationID', types.IntegerType(), True), 
types.StructField('payment_type', types.LongType(), True), 
types.StructField('fare_amount', types.DoubleType(), True), 
types.StructField('extra', types.DoubleType(), True), 
types.StructField('mta_tax', types.DoubleType(), True), 
types.StructField('tip_amount', types.DoubleType(), True), 
types.StructField('tolls_amount', types.DoubleType(), True), 
types.StructField('improvement_surcharge', types.DoubleType(), True), 
types.StructField('total_amount', types.DoubleType(), True), 
types.StructField('congestion_surcharge', types.DoubleType(), True), 
types.StructField('Airport_fee', types.DoubleType(), True)])

for month in range(1, 13):
    year = 2024
    month_str = f"{month:02d}"

    print (f"Processing data for {year}-{month_str}...")
    input_path = f"data/raw/yellow/{year}/{month_str}/"
    output_path = f"data/processed/yellow/{year}/{month_str}/"
    
    yellow_df = spark.read \
    .option("header", "true") \
    .schema(yellow_schema) \
    .parquet(input_path)

    yellow_df.repartition(4).write.mode("overwrite").parquet(output_path)

Processing data for 2024-01...
Processing data for 2024-02...
Processing data for 2024-03...
Processing data for 2024-04...
Processing data for 2024-05...
Processing data for 2024-06...
Processing data for 2024-07...
Processing data for 2024-08...
Processing data for 2024-09...
Processing data for 2024-10...
Processing data for 2024-11...
Processing data for 2024-12...


In [16]:
df = spark.read.option("compression", "gzip").parquet("data/raw/green/2024/01")
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- trip_type: long (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [None]:
yellow_schema =   types.StructType([
types.StructField('VendorID', types.IntegerType(), True), 
types.StructField('lpep_pickup_datetime', types.TimestampNTZType(), True), 
types.StructField('lpep_dropoff_datetime', types.TimestampNTZType(), True), 
types.StructField('store_and_fwd_flag', types.StringType(), True), 
types.StructField('RatecodeID', types.LongType(), True), 
types.StructField('PULocationID', types.IntegerType(), True), 
types.StructField('DOLocationID', types.IntegerType(), True), 
types.StructField('passenger_count', types.LongType(), True), 
types.StructField('trip_distance', types.DoubleType(), True), 
types.StructField('fare_amount', types.DoubleType(), True), 
types.StructField('extra', types.DoubleType(), True), 
types.StructField('mta_tax', types.DoubleType(), True), 
types.StructField('tip_amount', types.DoubleType(), True), 
types.StructField('tolls_amount', types.DoubleType(), True), 
types.StructField('ehail_fee', types.DoubleType(), True), 
types.StructField('improvement_surcharge', types.DoubleType(), True), 
types.StructField('total_amount', types.DoubleType(), True), 
types.StructField('payment_type', types.LongType(), True), 
types.StructField('trip_type', types.LongType(), True),
types.StructField('congestion_surcharge', types.DoubleType(), True)])

for month in range(1, 13):
    year = 2024
    month_str = f"{month:02d}"

    print (f"Processing data for {year}-{month_str}...")
    input_path = f"data/raw/green/{year}/{month_str}/"
    output_path = f"data/processed/green/{year}/{month_str}/"
    
    yellow_df = spark.read \
    .option("header", "true") \
    .schema(yellow_schema) \
    .parquet(input_path).mode("overwrite")

    yellow_df.repartition(4).write.mode("overwrite").parquet(output_path)

AttributeError: module 'pyspark.sql.types' has no attribute 'Timestamp_ntzType'