In [0]:
from pyspark.sql.functions import col, unix_timestamp, hour, month, year
from pyspark.sql.types import DoubleType, IntegerType, TimestampType

# Delta locations used by this notebook: bronze is the input that gets read,
# silver is the output location written after cleaning.
bronze_path = "dbfs:/Volumes/nycproject/default/bronze/nyc_taxi"
silver_path = "dbfs:/Volumes/nycproject/default/silver/nyc_taxi"

# Build a DataFrame over the bronze Delta table. This only defines the read;
# Spark evaluates lazily, so data is scanned when a later action runs.
df = spark.read.load(bronze_path, format="delta")

# Columns grouped by the Spark type they are cast to. Columns that are not
# listed here are carried through untouched.
integer_columns = ["VendorID", "passenger_count"]
double_columns = [
    "trip_distance",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
]
timestamp_columns = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

# Apply each cast in place: withColumn on an existing name replaces that
# column and keeps its position, so the column order of `df` is preserved.
df_clean = df
for target_type, column_names in (
    (IntegerType(), integer_columns),
    (DoubleType(), double_columns),
    (TimestampType(), timestamp_columns),
):
    for column_name in column_names:
        df_clean = df_clean.withColumn(column_name, col(column_name).cast(target_type))

# Rows missing either trip timestamp or the total amount are dropped.
required_columns = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "total_amount"]

pickup_ts = col("tpep_pickup_datetime")
dropoff_ts = col("tpep_dropoff_datetime")

# Trip length in minutes: difference of the two epoch-second values over 60.
trip_duration_min = (unix_timestamp(dropoff_ts) - unix_timestamp(pickup_ts)) / 60

df_silver = (
    df_clean
    .dropna(subset=required_columns)
    # No subset given, so only rows identical across every column are collapsed.
    .dropDuplicates()
    .withColumn("trip_duration_min", trip_duration_min)
    # Calendar parts of the pickup time, added as separate columns.
    .withColumn("pickup_hour", hour(pickup_ts))
    .withColumn("pickup_month", month(pickup_ts))
    .withColumn("pickup_year", year(pickup_ts))
)

# Persist the cleaned trips as a Delta table at the silver location; "overwrite"
# replaces whatever data a previous run left there.
df_silver.write.save(silver_path, format="delta", mode="overwrite")

In [0]:
# Preview the table that was actually written to the silver location.
# `df_silver` is a lazy, uncached plan, so displaying it directly would re-run
# the bronze scan, the casts and the full-row dropDuplicates shuffle a second
# time. Reading the persisted Delta table back avoids that recomputation and
# shows exactly what downstream readers of `silver_path` will see.
display(spark.read.format("delta").load(silver_path))
