In [18]:
import pyspark
from pyspark.sql import SparkSession

In [19]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [20]:
import pandas as pd

In [21]:
from pyspark.sql import types

In [22]:
green_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("lpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("lpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("ehail_fee", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("trip_type", types.IntegerType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

In [23]:
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/green/{year}/{month:02d}/'
    output_path = f'data/pq/green/{year}/{month:02d}/'

    df_green = spark.read \
        .option("header", "true") \
        .schema(green_schema) \
        .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2020/1


                                                                                

processing data for 2020/2


                                                                                

processing data for 2020/3
processing data for 2020/4
processing data for 2020/5
processing data for 2020/6
processing data for 2020/7
processing data for 2020/8
processing data for 2020/9
processing data for 2020/10
processing data for 2020/11
processing data for 2020/12


In [24]:
year = 2021 

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/green/{year}/{month:02d}/'
    output_path = f'data/pq/green/{year}/{month:02d}/'

    df_green = spark.read \
        .option("header", "true") \
        .schema(green_schema) \
        .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2021/1
processing data for 2021/2
processing data for 2021/3
processing data for 2021/4
processing data for 2021/5
processing data for 2021/6
processing data for 2021/7
processing data for 2021/8
processing data for 2021/9
processing data for 2021/10
processing data for 2021/11


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/beaubranton/de-zoomcamp/data-engineering-zoomcamp-main/05-batch/code/data/raw/green/2021/11.

In [29]:
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/yellow/{year}/{month:02d}/'
    output_path = f'data/pq/yellow/{year}/{month:02d}/'

    df_yellow = spark.read \
        .option("header", "true") \
        .schema(yellow_schema) \
        .csv(input_path)

    df_yellow \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2020/1


                                                                                

processing data for 2020/2


                                                                                

processing data for 2020/3


                                                                                

processing data for 2020/4
processing data for 2020/5
processing data for 2020/6


                                                                                

processing data for 2020/7


                                                                                

processing data for 2020/8


                                                                                

processing data for 2020/9


                                                                                

processing data for 2020/10


                                                                                

processing data for 2020/11


                                                                                

processing data for 2020/12


                                                                                

In [28]:
year = 2021

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/yellow/{year}/{month:02d}/'
    output_path = f'data/pq/yellow/{year}/{month:02d}/'

    df_yellow = spark.read \
        .option("header", "true") \
        .schema(yellow_schema) \
        .csv(input_path)

    df_yellow \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2021/1


                                                                                

processing data for 2021/2


                                                                                

processing data for 2021/3


                                                                                

processing data for 2021/4


                                                                                

processing data for 2021/5


                                                                                

processing data for 2021/6


                                                                                

processing data for 2021/7


                                                                                

processing data for 2021/8
processing data for 2021/9


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/beaubranton/de-zoomcamp/data-engineering-zoomcamp-main/05-batch/code/data/raw/yellow/2021/09.

In [17]:
df = pd.read_csv('/Users/beaubranton/de-zoomcamp/data-engineering-zoomcamp-main/05-batch/code/data/raw/yellow/2020/01/yellow_tripdata_2020_01.csv.gz',compression='gzip')
print(df)

  df = pd.read_csv('/Users/beaubranton/de-zoomcamp/data-engineering-zoomcamp-main/05-batch/code/data/raw/yellow/2020/01/yellow_tripdata_2020_01.csv.gz',compression='gzip')


         VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0             1.0  2020-01-01 00:28:15   2020-01-01 00:33:03              1.0   
1             1.0  2020-01-01 00:35:39   2020-01-01 00:43:04              1.0   
2             1.0  2020-01-01 00:47:41   2020-01-01 00:53:52              1.0   
3             1.0  2020-01-01 00:55:23   2020-01-01 01:00:14              1.0   
4             2.0  2020-01-01 00:01:58   2020-01-01 00:04:16              1.0   
...           ...                  ...                   ...              ...   
6405003       NaN  2020-01-31 22:51:00   2020-01-31 23:22:00              NaN   
6405004       NaN  2020-01-31 22:10:00   2020-01-31 23:26:00              NaN   
6405005       NaN  2020-01-31 22:50:07   2020-01-31 23:17:57              NaN   
6405006       NaN  2020-01-31 22:25:53   2020-01-31 22:48:32              NaN   
6405007       NaN  2020-01-31 22:44:00   2020-01-31 23:06:00              NaN   

         trip_distance  Rat