In [None]:
# List the contents of the raw taxi data directory on HDFS.
!hdfs dfs -ls /taxi/raw/

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session that every following cell works with.
session_builder = SparkSession.builder.appName("PreProcessing")
spark = session_builder.getOrCreate()

# All Data

## Renaming Columns, Payment Types, Pick Columns, Casting

In [None]:
# Load one month (January 2019) as a sample for developing the transformations.
# The path is a plain string literal: there is nothing to interpolate here.
df_all_raw = spark.read.parquet("/taxi/raw/2019/yellow_tripdata_2019-01.parquet")

In [None]:
# Preview two rows of the raw sample to inspect its columns.
df_all_raw.show(2)

In [None]:
from pyspark.sql.functions import when, expr, col

# (old name, new name) pairs. Both naming variants of the datetime columns are
# listed so that either one ends up under the same target name.
column_renames = [
    ("Trip_Pickup_DateTime", "pickup_datetime"),
    ("Trip_Dropoff_DateTime", "dropoff_datetime"),
    ("tpep_pickup_datetime", "pickup_datetime"),
    ("tpep_dropoff_datetime", "dropoff_datetime"),
    ("Passenger_Count", "passenger_count"),
    ("Trip_Distance", "trip_distance"),
    ("Payment_Type", "payment_type"),
    ("Tip_Amt", "tip_amount"),
    ("Total_Amt", "total_amount"),
]

# Textual payment types and the numeric code each one is replaced with.
# Matching is exact (case-sensitive), so "NO " keeps its trailing space.
payment_type_codes = [
    ("Credit", 1), ("CREDIT", 1), ("CRD", 1), ("Cre", 1), ("CRE", 1),
    ("CASH", 2), ("Cash", 2), ("CSH", 2), ("CAS", 2), ("Cas", 2),
    ("No Charge", 3), ("NO CHARGE", 3), ("NOC", 3), ("NO ", 3),
    ("Dispute", 4), ("DISPUTE", 4), ("DIS", 4), ("Dis", 4),
    ("Unknown", 5), ("UNKNOWN", 5), ("UNK", 5),
    ("Voided Trip", 6), ("VOIDED TRIP", 6),
]

# Apply the renames one after the other, in the listed order.
renamed = df_all_raw
for old_name, new_name in column_renames:
    renamed = renamed.withColumnRenamed(old_name, new_name)

# Build a single CASE WHEN expression out of the table above; values that
# match none of the labels are passed through unchanged.
first_label, first_code = payment_type_codes[0]
payment_type_code = when(col("payment_type") == first_label, first_code)
for label, code in payment_type_codes[1:]:
    payment_type_code = payment_type_code.when(col("payment_type") == label, code)
payment_type_code = payment_type_code.otherwise(expr("payment_type"))

# Keep only the columns of interest and cast them to their target types.
df = renamed.withColumn("payment_type", payment_type_code).selectExpr(
    "cast(pickup_datetime as timestamp)",
    "cast(dropoff_datetime as timestamp)",
    "cast(passenger_count as long)",
    "trip_distance",
    "cast(payment_type as string)",
    "tip_amount",
    "total_amount",
)

In [None]:
# Check the renamed, selected and cast columns on two rows.
df.show(2)

## Adding month/year as columns

You can use `input_file_name()` to get the path of the file each row was read from. The file name contains the year and month (e.g. `yellow_tripdata_2019-01.parquet`), so both can be extracted from it.

In [None]:
from pyspark.sql.functions import input_file_name

# Show the source file path of each row; truncation is switched off so the
# whole path is visible.
with_source_path = df.withColumn("filename", input_file_name())
with_source_path.show(n=2, truncate=False)

In [None]:
from pyspark.sql.functions import input_file_name, split

# Split the source path on "-" and show the resulting array untruncated.
path_pieces = split(input_file_name(), "-")
df.withColumn("filename", path_pieces).show(n=2, truncate=False)

In [None]:
# Derive `year` and `month` (both strings) from the source file name, e.g.
# "yellow_tripdata_2019-01.parquet" -> year "2019", month "01".
#
# The pattern is anchored on the "_<yyyy>-<mm>.parquet" part of the name rather
# than on the position of "-"-separated pieces of the whole URI, so the result
# does not depend on whether the host name or a parent directory contains "-".
# "[.]" is used instead of an escaped dot to avoid backslash escaping inside
# the SQL string literal.
df = (
    df
    .withColumn("filename", input_file_name())
    .selectExpr(
        "*",
        "regexp_extract(filename, '_([0-9]{4})-([0-9]{2})[.]parquet', 1) as year",
        "regexp_extract(filename, '_([0-9]{4})-([0-9]{2})[.]parquet', 2) as month",
    )
    .drop("filename")
)

In [None]:
# Confirm the new year and month columns on two rows.
df.show(2)

## Convert trip distance from miles to km

In [None]:
from pyspark.sql.functions import expr

# 1.60934 is the miles-to-kilometres factor.
# NOTE: this overwrites `trip_distance` in place, so running the cell a second
# time without re-running the cells above applies the factor twice.
df = df.withColumn("trip_distance", expr("trip_distance * 1.60934"))

## Adding a field `trip_amount` (total amount minus tip)

In [None]:
# trip_amount is the total amount with the tip taken out.
amount_without_tip = expr("total_amount-tip_amount")
df = df.withColumn("trip_amount", amount_without_tip)

## Union Data

In [None]:
from pyspark.sql.functions import when, expr, col, input_file_name, split
   
# (old name, new name) pairs. Both naming variants of the datetime columns are
# listed so that either one ends up under the same target name.
_COLUMN_RENAMES = [
    ("Trip_Pickup_DateTime", "pickup_datetime"),
    ("Trip_Dropoff_DateTime", "dropoff_datetime"),
    ("tpep_pickup_datetime", "pickup_datetime"),
    ("tpep_dropoff_datetime", "dropoff_datetime"),
    ("Passenger_Count", "passenger_count"),
    ("Trip_Distance", "trip_distance"),
    ("Payment_Type", "payment_type"),
    ("Tip_Amt", "tip_amount"),
    ("Total_Amt", "total_amount"),
]

# Textual payment types and the numeric code each one is replaced with.
# Matching is exact (case-sensitive), so "NO " keeps its trailing space.
_PAYMENT_TYPE_CODES = [
    ("Credit", 1), ("CREDIT", 1), ("CRD", 1), ("Cre", 1), ("CRE", 1),
    ("CASH", 2), ("Cash", 2), ("CSH", 2), ("CAS", 2), ("Cas", 2),
    ("No Charge", 3), ("NO CHARGE", 3), ("NOC", 3), ("NO ", 3),
    ("Dispute", 4), ("DISPUTE", 4), ("DIS", 4), ("Dis", 4),
    ("Unknown", 5), ("UNKNOWN", 5), ("UNK", 5),
    ("Voided Trip", 6), ("VOIDED TRIP", 6),
]

# Matches the "_<yyyy>-<mm>.parquet" part of a source path; group 1 is the
# year, group 2 the month. "[.]" avoids backslash escaping in the SQL literal.
_FILE_NAME_PATTERN = "_([0-9]{4})-([0-9]{2})[.]parquet"


def _payment_type_expr():
    """Build the CASE WHEN column that maps textual payment types to codes."""
    first_label, first_code = _PAYMENT_TYPE_CODES[0]
    mapped = when(col("payment_type") == first_label, first_code)
    for label, code in _PAYMENT_TYPE_CODES[1:]:
        mapped = mapped.when(col("payment_type") == label, code)
    # Values that match none of the labels are passed through unchanged.
    return mapped.otherwise(expr("payment_type"))


def read(year, month):
    """Read one monthly trip file and bring it into the common schema.

    Args:
        year: Year as used in the path, e.g. "2019".
        month: Zero-padded month as used in the path, e.g. "01".

    Returns:
        A DataFrame with the columns pickup_datetime, dropoff_datetime,
        passenger_count, trip_distance (converted to km), payment_type,
        tip_amount, total_amount, year, month (both strings) and trip_amount.

    Uses the notebook-level `spark` session.
    """
    raw = spark.read.parquet(f"/taxi/raw/{year}/yellow_tripdata_{year}-{month}.parquet")

    # Apply the renames one after the other, in the listed order.
    renamed = raw
    for old_name, new_name in _COLUMN_RENAMES:
        renamed = renamed.withColumnRenamed(old_name, new_name)

    # Keep only the columns of interest and cast them to their target types.
    trips = renamed.withColumn("payment_type", _payment_type_expr()).selectExpr(
        "cast(pickup_datetime as timestamp)",
        "cast(dropoff_datetime as timestamp)",
        "cast(passenger_count as long)",
        "trip_distance",
        "cast(payment_type as string)",
        "tip_amount",
        "total_amount",
    )

    # Take year/month from the file name itself. Anchoring on the name pattern
    # (instead of the position of "-"-separated pieces of the whole URI) keeps
    # the result independent of dashes in the host name or parent directories.
    trips = (
        trips
        .withColumn("filename", input_file_name())
        .selectExpr(
            "*",
            f"regexp_extract(filename, '{_FILE_NAME_PATTERN}', 1) as year",
            f"regexp_extract(filename, '{_FILE_NAME_PATTERN}', 2) as month",
        )
        .drop("filename")
    )

    # Miles -> kilometres.
    trips = trips.withColumn("trip_distance", expr("trip_distance * 1.60934"))
    # Total amount with the tip taken out.
    return trips.withColumn("trip_amount", expr("total_amount-tip_amount"))

In [None]:
# Read every monthly file from 2009 through 2022, plus January 2023 (the only
# 2023 month that is read), and collect the per-month DataFrames.
all_months = [f"{month_number:02d}" for month_number in range(1, 13)]

my_dfs = []
for year in [str(year_number) for year_number in range(2009, 2024)]:
    months_to_read = ["01"] if year == "2023" else all_months
    for month in months_to_read:
        # print() instead of `!echo`: same output, but no shell is spawned per
        # file and the cell stays plain Python.
        print(f"processing {year}/{month}")
        my_dfs.append(read(year, month))

In [None]:
from functools import reduce
from pyspark.sql import DataFrame

# Fold the list of monthly DataFrames into a single DataFrame, pairwise.
union_pair = DataFrame.unionAll
df = reduce(union_pair, my_dfs)

In [None]:
# Sanity-check the combined DataFrame on two rows.
df.show(2)

In [None]:
# Inspect the column types of the combined DataFrame before the final projection.
df.printSchema()

In [None]:
# Final column order: year and month first (cast to int), followed by the
# remaining columns unchanged.
final_columns = [
    "cast(year as int)",
    "cast(month as int)",
    "pickup_datetime",
    "dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "payment_type",
    "tip_amount",
    "trip_amount",
    "total_amount",
]
df = df.selectExpr(*final_columns)

In [None]:
# Verify the final column order and that year and month are now integers.
df.printSchema()

## Write results 

In [None]:
# Remove any previous output directory before writing the combined dataset.
!hdfs dfs -rm -r /taxi/raw_all.parquet/

In [None]:
# Write the combined dataset, repartitioned into 55 partitions.
# mode("overwrite") keeps the cell re-runnable when the target path already
# exists; the path is a plain string literal because nothing is interpolated.
df.repartition(55).write.mode("overwrite").parquet("/taxi/raw_all.parquet")

## Stopping Spark 

In [None]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()