In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/01 21:25:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df_green = spark.read \
.option("header", "true") \
.csv('data/raw/green/2021/01/')

In [4]:
import pandas as pd 

In [5]:
df_green_pd = pd.read_csv('data/raw/green/2021/01/green_tripdata_2021_01.csv.gz', nrows=1000)

In [6]:
df_green_pd.dtypes

VendorID                   int64
lpep_pickup_datetime      object
lpep_dropoff_datetime     object
store_and_fwd_flag        object
RatecodeID                 int64
PULocationID               int64
DOLocationID               int64
passenger_count            int64
trip_distance            float64
fare_amount              float64
extra                    float64
mta_tax                  float64
tip_amount               float64
tolls_amount             float64
ehail_fee                float64
improvement_surcharge    float64
total_amount             float64
payment_type               int64
trip_type                  int64
congestion_surcharge     float64
dtype: object

In [9]:
from pyspark.sql import types

In [13]:
green_schema = types.StructType([
            types.StructField('VendorID', types.IntegerType(), True),
            types.StructField('lpep_pickup_datetime', types.TimestampType(), True), 
            types.StructField('lpep_dropoff_datetime', types.TimestampType(), True), 
            types.StructField('store_and_fwd_flag', types.StringType(), True), 
            types.StructField('RatecodeID', types.IntegerType(), True), 
            types.StructField('PULocationID', types.IntegerType(), True), 
            types.StructField('DOLocationID', types.IntegerType(), True), 
            types.StructField('passenger_count', types.IntegerType(), True), 
            types.StructField('trip_distance', types.DoubleType(), True), 
            types.StructField('fare_amount', types.DoubleType(), True), 
            types.StructField('extra', types.DoubleType(), True), 
            types.StructField('mta_tax', types.DoubleType(), True), 
            types.StructField('tip_amount', types.DoubleType(), True), 
            types.StructField('tolls_amount', types.DoubleType(), True), 
            types.StructField('ehail_fee', types.DoubleType(), True), 
            types.StructField('improvement_surcharge', types.DoubleType(), True), 
            types.StructField('total_amount', types.DoubleType(), True), 
            types.StructField('payment_type', types.LongType(), True), 
            types.StructField('trip_type', types.LongType(), True), 
            types.StructField('congestion_surcharge', types.DoubleType(), True)])

In [34]:
year = 2020

for month in range(1,13):
    input_path = f'data/raw/green/{year}/{month:02d}/'
    output_path = f'data/pq/green/{year}/{month:02d}/'
    
    df_green = spark.read \
    .option("header", "true") \
    .schema(green_schema) \
    .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path, mode='overwrite')

    print(f"processed data for {year} and {month:02d}")

                                                                                

processed data for 2020 and 01


                                                                                

processed data for 2020 and 02


                                                                                

processed data for 2020 and 03
processed data for 2020 and 04
processed data for 2020 and 05
processed data for 2020 and 06
processed data for 2020 and 07
processed data for 2020 and 08
processed data for 2020 and 09
processed data for 2020 and 10
processed data for 2020 and 11
processed data for 2020 and 12


In [7]:
spark.createDataFrame(df_green_pd).schema

StructType([StructField('VendorID', LongType(), True), StructField('lpep_pickup_datetime', StringType(), True), StructField('lpep_dropoff_datetime', StringType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('RatecodeID', LongType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('ehail_fee', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('payment_type', LongType(), True), StructField('trip_type', LongType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [16]:
df_green.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- trip_type: long (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [17]:
df_yellow = spark.read \
.option("header", "true") \
.csv('data/raw/yellow/2021/01/')

In [20]:
df_yellow_pd = pd.read_csv('data/raw/yellow/2021/01/yellow_tripdata_2021_01.csv.gz', nrows=1000)

In [21]:
spark.createDataFrame(df_yellow_pd).schema

StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', StringType(), True), StructField('tpep_dropoff_datetime', StringType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [22]:
yellow_schema = types.StructType([
        types.StructField('VendorID', types.IntegerType(), True), 
        types.StructField('tpep_pickup_datetime', types.TimestampType(), True), 
        types.StructField('tpep_dropoff_datetime', types.TimestampType(), True), 
        types.StructField('passenger_count', types.LongType(), True), 
        types.StructField('trip_distance', types.DoubleType(), True), 
        types.StructField('RatecodeID', types.IntegerType(), True), 
        types.StructField('store_and_fwd_flag', types.StringType(), True), 
        types.StructField('PULocationID', types.IntegerType(), True), 
        types.StructField('DOLocationID', types.IntegerType(), True), 
        types.StructField('payment_type', types.LongType(), True), 
        types.StructField('fare_amount', types.DoubleType(), True), 
        types.StructField('extra', types.DoubleType(), True), 
        types.StructField('mta_tax', types.DoubleType(), True), 
        types.StructField('tip_amount', types.DoubleType(), True), 
        types.StructField('tolls_amount', types.DoubleType(), True), 
        types.StructField('improvement_surcharge', types.DoubleType(), True), 
        types.StructField('total_amount', types.DoubleType(), True), 
        types.StructField('congestion_surcharge', types.DoubleType(), True)])

In [24]:
df_yellow = spark.read \
.option("header", "true") \
.schema(yellow_schema) \
.csv('data/raw/yellow/2021/01/')

In [25]:
df_yellow.dtypes

[('VendorID', 'int'),
 ('tpep_pickup_datetime', 'timestamp'),
 ('tpep_dropoff_datetime', 'timestamp'),
 ('passenger_count', 'bigint'),
 ('trip_distance', 'double'),
 ('RatecodeID', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('PULocationID', 'int'),
 ('DOLocationID', 'int'),
 ('payment_type', 'bigint'),
 ('fare_amount', 'double'),
 ('extra', 'double'),
 ('mta_tax', 'double'),
 ('tip_amount', 'double'),
 ('tolls_amount', 'double'),
 ('improvement_surcharge', 'double'),
 ('total_amount', 'double'),
 ('congestion_surcharge', 'double')]