In [1]:
import pandas as pd

import pyspark
from pyspark.sql import SparkSession, types

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Aim: We want to convert the csv files into parquet files because:

- parquet files will have the schema built-in (no need to infer schema or specify schema)
- file size will be smaller

We start by defining a schema for the green taxi trips, starting from the schema inferred by pandas.

In [5]:
pd_df = pd.read_csv('./data/raw/green/2020/01/green_tripdata_2020_01.csv.gz', nrows=500)
spark.createDataFrame(pd_df).schema

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


StructType([StructField('VendorID', LongType(), True), StructField('lpep_pickup_datetime', StringType(), True), StructField('lpep_dropoff_datetime', StringType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('RatecodeID', LongType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('ehail_fee', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('payment_type', LongType(), True), StructField('trip_type', LongType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [13]:
green_schema = types.StructType([
    types.StructField('VendorID', types.IntegerType(), True),
    types.StructField('lpep_pickup_datetime', types.TimestampType(), True),
    types.StructField('lpep_dropoff_datetime', types.TimestampType(), True),
    types.StructField('store_and_fwd_flag', types.StringType(), True),
    types.StructField('RatecodeID', types.IntegerType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('passenger_count', types.IntegerType(), True), 
    types.StructField('trip_distance', types.DoubleType(), True), 
    types.StructField('fare_amount', types.DoubleType(), True), 
    types.StructField('extra', types.DoubleType(), True), 
    types.StructField('mta_tax', types.DoubleType(), True), 
    types.StructField('tip_amount', types.DoubleType(), True), 
    types.StructField('tolls_amount', types.DoubleType(), True), 
    types.StructField('ehail_fee', types.DoubleType(), True), 
    types.StructField('improvement_surcharge', types.DoubleType(), True), 
    types.StructField('total_amount', types.DoubleType(), True), 
    types.StructField('payment_type', types.IntegerType(), True), 
    types.StructField('trip_type', types.IntegerType(), True), 
    types.StructField('congestion_surcharge', types.DoubleType(), True)
])

We can now use the schema to read in the csv and write it to a repartitioned parquet file.

In [16]:
year = 2020
for month in range(1, 13):
    print(f'Converting data for {year}-{month:02d}')
    input_path = f'data/raw/green/{year}/{month:02d}/'
    output_path = f'data/pq/green/{year}/{month:02d}/'
    
    df_green = spark.read \
        .option("header", "true") \
        .schema(green_schema) \
        .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path)

Converting data for 2020-01
Converting data for 2020-02
Converting data for 2020-03
Converting data for 2020-04
Converting data for 2020-05
Converting data for 2020-06
Converting data for 2020-07
Converting data for 2020-08
Converting data for 2020-09
Converting data for 2020-10
Converting data for 2020-11
Converting data for 2020-12


We can now do the same for the yellow dataset.

In [10]:
pd_df = pd.read_csv('./data/raw/yellow/2020/01/yellow_tripdata_2020_01.csv.gz', nrows=500)
spark.createDataFrame(pd_df).schema

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', StringType(), True), StructField('tpep_dropoff_datetime', StringType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [17]:
yellow_schema = types.StructType([
    types.StructField('VendorID', types.IntegerType(), True),
    types.StructField('tpep_pickup_datetime', types.TimestampType(), True),
    types.StructField('tpep_dropoff_datetime', types.TimestampType(), True),
    types.StructField('passenger_count', types.IntegerType(), True), 
    types.StructField('trip_distance', types.DoubleType(), True), 
    types.StructField('RatecodeID', types.IntegerType(), True),
    types.StructField('store_and_fwd_flag', types.StringType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('payment_type', types.IntegerType(), True), 
    types.StructField('fare_amount', types.DoubleType(), True), 
    types.StructField('extra', types.DoubleType(), True), 
    types.StructField('mta_tax', types.DoubleType(), True), 
    types.StructField('tip_amount', types.DoubleType(), True), 
    types.StructField('tolls_amount', types.DoubleType(), True), 
    types.StructField('improvement_surcharge', types.DoubleType(), True), 
    types.StructField('total_amount', types.DoubleType(), True), 
    types.StructField('congestion_surcharge', types.DoubleType(), True)
])

In [18]:
year = 2020
for month in range(1, 13):
    print(f'Converting data for {year}-{month:02d}')
    input_path = f'data/raw/yellow/{year}/{month:02d}/'
    output_path = f'data/pq/yellow/{year}/{month:02d}/'
    
    df_yellow = spark.read \
        .option("header", "true") \
        .schema(yellow_schema) \
        .csv(input_path)

    df_yellow \
        .repartition(4) \
        .write.parquet(output_path)

Converting data for 2020-01
Converting data for 2020-02
Converting data for 2020-03
Converting data for 2020-04
Converting data for 2020-05
Converting data for 2020-06
Converting data for 2020-07
Converting data for 2020-08
Converting data for 2020-09
Converting data for 2020-10
Converting data for 2020-11
Converting data for 2020-12


In [19]:
# spark.stop()