In [87]:
import pyspark
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, LongType, DoubleType, TimestampType, StructField, StructType

In [44]:
# Start (or reuse) a Spark session running locally under the app name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [58]:
# Explicit schema for the green taxi trip records.
# Each (column name, Spark type) pair becomes a nullable StructField, in order.
green_taxi_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('VendorID', LongType()),
        ('lpep_pickup_datetime', TimestampType()),
        ('lpep_dropoff_datetime', TimestampType()),
        ('store_and_fwd_flag', StringType()),
        ('RatecodeID', DoubleType()),
        ('PULocationID', LongType()),
        ('DOLocationID', LongType()),
        ('passenger_count', DoubleType()),
        ('trip_distance', DoubleType()),
        ('fare_amount', DoubleType()),
        ('extra', DoubleType()),
        ('mta_tax', DoubleType()),
        ('tip_amount', DoubleType()),
        ('tolls_amount', DoubleType()),
        ('ehail_fee', IntegerType()),
        ('improvement_surcharge', DoubleType()),
        ('total_amount', DoubleType()),
        ('payment_type', DoubleType()),
        ('trip_type', DoubleType()),
        ('congestion_surcharge', DoubleType()),
    )
])

In [60]:
# Re-write the 2020 green taxi data month by month: load each month's raw
# parquet files with the explicit schema, then save them repartitioned into
# 4 partitions.
year = 2020
for month in range(1, 13):
    print(f'processing data for {year}/{month:02d}')

    # Read file. No 'header' option here: that is a CSV reader setting and the
    # parquet source does not use it.
    df_green_taxi = (
        spark.read
        .format('parquet')
        .schema(green_taxi_schema)
        .load(f'data/raw/green/{year}/{month:02d}/')
    )

    # Write file, replacing any output already present at the target path.
    (
        df_green_taxi.repartition(4)
        .write.mode('overwrite')
        .format('parquet')
        .save(f'data/pq/green/{year}/{month:02d}')
    )

processing data for 2020/01


                                                                                

processing data for 2020/02


                                                                                

processing data for 2020/03
processing data for 2020/04
processing data for 2020/05
processing data for 2020/06
processing data for 2020/07
processing data for 2020/08
processing data for 2020/09
processing data for 2020/10
processing data for 2020/11
processing data for 2020/12


In [84]:
# Explicit schema for the yellow taxi trip records.
# Each (column name, Spark type) pair becomes a nullable StructField, in order.
yellow_taxi_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('VendorID', LongType()),
        ('tpep_pickup_datetime', TimestampType()),
        ('tpep_dropoff_datetime', TimestampType()),
        ('passenger_count', DoubleType()),
        ('trip_distance', DoubleType()),
        ('RatecodeID', DoubleType()),
        ('store_and_fwd_flag', StringType()),
        ('PULocationID', LongType()),
        ('DOLocationID', LongType()),
        ('payment_type', LongType()),
        ('fare_amount', DoubleType()),
        ('extra', DoubleType()),
        ('mta_tax', DoubleType()),
        ('tip_amount', DoubleType()),
        ('tolls_amount', DoubleType()),
        ('improvement_surcharge', DoubleType()),
        ('total_amount', DoubleType()),
        ('congestion_surcharge', DoubleType()),
        ('airport_fee', DoubleType()),
    )
])

In [92]:
# Re-write the 2020 yellow taxi data month by month: load each month's raw
# parquet files (schema taken from the files themselves), cast `airport_fee`
# to double, then save them repartitioned into 4 partitions.
year = 2020
for month in range(1, 13):
    print(f'processing data for {year}/{month:02d}')

    # Read file. No 'header' option here: that is a CSV reader setting and the
    # parquet source does not use it.
    # NOTE(review): the cast presumably evens out `airport_fee` being stored
    # with different types in different months -- confirm against the raw files.
    df_yellow_taxi = (
        spark.read
        .format('parquet')
        .load(f'data/raw/yellow/{year}/{month:02d}/')
        .withColumn('airport_fee', col('airport_fee').cast(DoubleType()))
    )

    # Write file, replacing any output already present at the target path.
    (
        df_yellow_taxi.repartition(4)
        .write.mode('overwrite')
        .format('parquet')
        .save(f'data/pq/yellow/{year}/{month:02d}')
    )

processing data for 2020/01


                                                                                

processing data for 2020/02


                                                                                

processing data for 2020/03


                                                                                

processing data for 2020/04


                                                                                

processing data for 2020/05


                                                                                

processing data for 2020/06


                                                                                

processing data for 2020/07


                                                                                

processing data for 2020/08


                                                                                

processing data for 2020/09


                                                                                

processing data for 2020/10


                                                                                

processing data for 2020/11


                                                                                

processing data for 2020/12


                                                                                

In [91]:
# Schema check: load one month of raw yellow taxi data with the explicit schema
# (plus the same `airport_fee` cast used by the conversion loop) and display
# the resulting DataFrame schema.
# The month is pinned here instead of relying on whatever `year` / `month`
# values a previously executed loop left behind in the kernel.
sample_year, sample_month = 2020, 12
df_yellow_taxi = (
    spark.read
    .format('parquet')
    .schema(yellow_taxi_schema)
    .load(f'data/raw/yellow/{sample_year}/{sample_month:02d}/')
    .withColumn('airport_fee', col('airport_fee').cast(DoubleType()))
)
df_yellow_taxi.schema

StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', TimestampType(), True), StructField('tpep_dropoff_datetime', TimestampType(), True), StructField('passenger_count', DoubleType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', DoubleType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True)])