In [1]:
import os
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
# Locate the project's data folder relative to where the kernel is running:
# the project root is taken to be the parent of the current working directory.
rootpath = os.path.dirname(os.path.abspath(os.curdir))
datapath = os.path.join(rootpath, "data")
print("datapath: " + datapath)

datapath: /home/onur/repos/nytaxi-spark/data


In [3]:
# Start a local Spark session (master 'local[2]') named 'taxi_schema',
# or reuse the one that is already active in this kernel.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("taxi_schema")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/13 08:41:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Green: read the data and apply the correct schema

In [6]:
# First look at the January 2021 green-taxi CSVs: no schema is supplied yet,
# only the header row is used for the column names.
df_green = (
    spark.read
    .option("header", True)
    .csv(os.path.join(datapath, "raw", "green", "2021", "01"))
)

In [7]:
# Preview the rows that were read without an explicit schema.
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2021-01-01 00:15:56|  2021-01-01 00:19:52|                 N|         1|          43|         151|              1|         1.01|        5.5|  0.5|    0.

In [8]:
# Load a 1000-row sample with pandas; the following cells inspect the dtypes it
# infers as a starting point for the Spark schema.
# The path is built from `datapath` rather than a machine-specific absolute
# path, so the notebook also runs from other checkouts of the repository.
df_green_pd = pd.read_csv(
    os.path.join(datapath, 'raw', 'green', '2020', '01', 'green_tripdata_2020_01.csv.gz'),
    nrows=1000,
)

In [11]:
# Column dtypes that pandas inferred from the sampled rows.
df_green_pd.dtypes

VendorID                   int64
lpep_pickup_datetime      object
lpep_dropoff_datetime     object
store_and_fwd_flag        object
RatecodeID                 int64
PULocationID               int64
DOLocationID               int64
passenger_count            int64
trip_distance            float64
fare_amount              float64
extra                    float64
mta_tax                  float64
tip_amount               float64
tolls_amount             float64
ehail_fee                float64
improvement_surcharge    float64
total_amount             float64
payment_type               int64
trip_type                  int64
congestion_surcharge     float64
dtype: object

In [12]:
# Convert the pandas sample to a Spark DataFrame to see which Spark schema is
# derived from the pandas dtypes; the output is the template for green_schema.
spark.createDataFrame(df_green_pd).schema

StructType([StructField('VendorID', LongType(), True), StructField('lpep_pickup_datetime', StringType(), True), StructField('lpep_dropoff_datetime', StringType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('RatecodeID', LongType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('ehail_fee', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('payment_type', LongType(), True), StructField('trip_type', LongType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [None]:
from pyspark.sql import types

In [17]:
# Explicit schema for the green-taxi CSVs, written as (column name, Spark type)
# pairs in file column order. Every column is nullable. Compared with the
# schema derived from pandas, IDs/counts are integers instead of longs and the
# pickup/dropoff columns are timestamps instead of strings.
green_schema = types.StructType([
    types.StructField(column_name, spark_type(), True)
    for column_name, spark_type in [
        ('VendorID', types.IntegerType),
        ('lpep_pickup_datetime', types.TimestampType),
        ('lpep_dropoff_datetime', types.TimestampType),
        ('store_and_fwd_flag', types.StringType),
        ('RatecodeID', types.IntegerType),
        ('PULocationID', types.IntegerType),
        ('DOLocationID', types.IntegerType),
        ('passenger_count', types.IntegerType),
        ('trip_distance', types.DoubleType),
        ('fare_amount', types.DoubleType),
        ('extra', types.DoubleType),
        ('mta_tax', types.DoubleType),
        ('tip_amount', types.DoubleType),
        ('tolls_amount', types.DoubleType),
        ('ehail_fee', types.DoubleType),
        ('improvement_surcharge', types.DoubleType),
        ('total_amount', types.DoubleType),
        ('payment_type', types.IntegerType),
        ('trip_type', types.IntegerType),
        ('congestion_surcharge', types.DoubleType),
    ]
])

In [18]:
# Re-read the January 2021 green-taxi CSVs, this time with the explicit schema
# so the columns get their intended types.
df_green = (
    spark.read
    .schema(green_schema)
    .option("header", True)
    .csv(os.path.join(datapath, "raw", "green", "2021", "01"))
)

In [19]:
# Check that the DataFrame carries the column types declared in green_schema.
df_green.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



### Yellow: read the data and apply the correct schema

In [22]:
# Load a 1000-row sample with pandas; the next cell uses it to see which schema
# Spark derives from the pandas dtypes.
# The path is built from `datapath` rather than a machine-specific absolute
# path, so the notebook also runs from other checkouts of the repository.
df_yellow_pd = pd.read_csv(
    os.path.join(datapath, 'raw', 'yellow', '2020', '01', 'yellow_tripdata_2020_01.csv.gz'),
    nrows=1000,
)

In [26]:
# Convert the pandas sample to a Spark DataFrame to see which Spark schema is
# derived from the pandas dtypes; the output is the template for yellow_schema.
spark.createDataFrame(df_yellow_pd).schema

StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', StringType(), True), StructField('tpep_dropoff_datetime', StringType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [28]:
# Explicit schema for the yellow-taxi CSVs, written as (column name, Spark type)
# pairs in file column order. Every column is nullable. Compared with the
# schema derived from pandas, IDs/counts are integers instead of longs and the
# pickup/dropoff columns are timestamps instead of strings.
yellow_schema = types.StructType([
    types.StructField(column_name, spark_type(), True)
    for column_name, spark_type in [
        ('VendorID', types.IntegerType),
        ('tpep_pickup_datetime', types.TimestampType),
        ('tpep_dropoff_datetime', types.TimestampType),
        ('passenger_count', types.IntegerType),
        ('trip_distance', types.DoubleType),
        ('RatecodeID', types.IntegerType),
        ('store_and_fwd_flag', types.StringType),
        ('PULocationID', types.IntegerType),
        ('DOLocationID', types.IntegerType),
        ('payment_type', types.IntegerType),
        ('fare_amount', types.DoubleType),
        ('extra', types.DoubleType),
        ('mta_tax', types.DoubleType),
        ('tip_amount', types.DoubleType),
        ('tolls_amount', types.DoubleType),
        ('improvement_surcharge', types.DoubleType),
        ('total_amount', types.DoubleType),
        ('congestion_surcharge', types.DoubleType),
    ]
])

In [29]:
# Read the January 2021 yellow-taxi CSVs with the explicit schema so the
# columns get their intended types.
df_yellow = (
    spark.read
    .schema(yellow_schema)
    .option("header", True)
    .csv(os.path.join(datapath, "raw", "yellow", "2021", "01"))
)

In [30]:
# Check that the DataFrame carries the column types declared in yellow_schema.
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



### Loop over years and months

In [40]:
year = 2020

# Convert the raw green-taxi CSVs to parquet, one month at a time.
# Input:  <datapath>/raw/green/<year>/<MM>
# Output: <datapath>/pq/green/<year>/<MM>
for month in range(1, 13):
    input_path = os.path.join(datapath, 'raw', 'green', f"{year}", f"{month:02d}")
    output_path = os.path.join(datapath, 'pq', 'green', f"{year}", f"{month:02d}")
    print(f'Processing: {input_path}')
    df_green = spark.read.csv(
        input_path,
        header=True,
        schema=green_schema
    )
    # Each month is written as 4 parquet partitions.
    # mode('overwrite') keeps the cell re-runnable: with the default save mode
    # the write raises as soon as output_path already exists, e.g. when the
    # cell is executed again or resumed after a partial run.
    df_green.repartition(4).write.mode('overwrite').parquet(output_path)

Processing: /home/onur/repos/nytaxi-spark/data/raw/green/2020/01


                                                                                

Processing: /home/onur/repos/nytaxi-spark/data/raw/green/2020/02


                                                                                

Processing: /home/onur/repos/nytaxi-spark/data/raw/green/2020/03


                                                                                

Processing: /home/onur/repos/nytaxi-spark/data/raw/green/2020/04
Processing: /home/onur/repos/nytaxi-spark/data/raw/green/2020/05
Processing: /home/onur/repos/nytaxi-spark/data/raw/green/2020/06
Processing: /home/onur/repos/nytaxi-spark/data/raw/green/2020/07
Processing: /home/onur/repos/nytaxi-spark/data/raw/green/2020/08
Processing: /home/onur/repos/nytaxi-spark/data/raw/green/2020/09
Processing: /home/onur/repos/nytaxi-spark/data/raw/green/2020/10


                                                                                

Processing: /home/onur/repos/nytaxi-spark/data/raw/green/2020/11
Processing: /home/onur/repos/nytaxi-spark/data/raw/green/2020/12


In [None]:
# Convert the raw yellow-taxi CSVs to parquet, one month at a time.
# Input:  <datapath>/raw/yellow/<year>/<MM>
# Output: <datapath>/pq/yellow/<year>/<MM>
for year in [2020, 2021]:
    for month in range(1, 13):
        input_path = os.path.join(datapath, 'raw', 'yellow', f"{year}", f"{month:02d}")
        output_path = os.path.join(datapath, 'pq', 'yellow', f"{year}", f"{month:02d}")
        print(f'Processing: {input_path}')
        df_yellow = spark.read.csv(
            input_path,
            header=True,
            schema=yellow_schema
        )
        # Each month is written as 4 parquet partitions.
        # mode('overwrite') keeps the cell re-runnable: with the default save
        # mode the write raises as soon as output_path already exists, e.g.
        # when the cell is executed again or resumed after a partial run.
        df_yellow.repartition(4).write.mode('overwrite').parquet(output_path)

Processing: /home/onur/repos/nytaxi-spark/data/raw/yellow/2020/01


                                                                                

Processing: /home/onur/repos/nytaxi-spark/data/raw/yellow/2020/02


                                                                                

Processing: /home/onur/repos/nytaxi-spark/data/raw/yellow/2020/03


                                                                                

Processing: /home/onur/repos/nytaxi-spark/data/raw/yellow/2020/04


                                                                                

Processing: /home/onur/repos/nytaxi-spark/data/raw/yellow/2020/05


                                                                                

Processing: /home/onur/repos/nytaxi-spark/data/raw/yellow/2020/06


                                                                                

Processing: /home/onur/repos/nytaxi-spark/data/raw/yellow/2020/07


                                                                                

Processing: /home/onur/repos/nytaxi-spark/data/raw/yellow/2020/08


                                                                                

Processing: /home/onur/repos/nytaxi-spark/data/raw/yellow/2020/09


                                                                                

Processing: /home/onur/repos/nytaxi-spark/data/raw/yellow/2020/10


[Stage 89:>                                                         (0 + 1) / 1]