Importar bibliotecas

In [0]:
import pyspark.sql.functions as F

Paths

In [0]:
# Source (bronze) and target (silver) tables, fully qualified as catalog.schema.table.
bronze_table, silver_table = (
    "taxi_catalog.bronze.yellow_taxi",
    "taxi_catalog.silver.yellow_taxi",
)

# Load the bronze table as the DataFrame that the transformation cell starts from.
df = spark.table(bronze_table)

Transformações

In [0]:
# Normalize bronze column names (trim, lower-case, spaces -> underscores) so the
# projection below can reference one predictable spelling for every column.
normalized_names = [name.strip().lower().replace(" ", "_") for name in df.columns]

# Two raw names that normalize to the same value would yield duplicate, ambiguous
# columns; fail here with a clear message instead of later inside the select.
if len(set(normalized_names)) != len(normalized_names):
    collisions = sorted({n for n in normalized_names if normalized_names.count(n) > 1})
    raise ValueError(f"Column names collide after normalization: {collisions}")

# Rename all columns in a single projection rather than one withColumnRenamed per column.
df = df.toDF(*normalized_names)

# Project bronze -> silver: rename to snake_case and cast to explicit types.
# Source names are referenced in their normalized (lower-case) form, matching the
# rename step above, so this does not depend on spark.sql.caseSensitive being false.
df = df.select(
    F.col("vendorid").cast("int").alias("vendor_id"),
    F.to_timestamp("tpep_pickup_datetime").alias("pickup_datetime"),
    F.to_timestamp("tpep_dropoff_datetime").alias("dropoff_datetime"),
    F.col("passenger_count").cast("int").alias("passenger_count"),
    F.col("trip_distance").cast("double").alias("trip_distance"),
    F.col("ratecodeid").cast("int").alias("ratecode_id"),
    F.col("store_and_fwd_flag").alias("store_and_fwd_flag"),
    F.col("pulocationid").cast("int").alias("pulocation_id"),
    F.col("dolocationid").cast("int").alias("dolocation_id"),
    F.col("payment_type").cast("int").alias("payment_type"),
    F.col("fare_amount").cast("double").alias("fare_amount"),
    F.col("extra").cast("double").alias("extra"),
    F.col("mta_tax").cast("double").alias("mta_tax"),
    F.col("tip_amount").cast("double").alias("tip_amount"),
    F.col("tolls_amount").cast("double").alias("tolls_amount"),
    F.col("improvement_surcharge").cast("double").alias("improvement_surcharge"),
    F.col("congestion_surcharge").cast("double").alias("congestion_surcharge"),
    F.col("airport_fee").cast("double").alias("airport_fee"),
    F.col("total_amount").cast("double").alias("total_amount"),
    # Lineage columns carried over from bronze unchanged.
    F.col("source_file").alias("source_file"),
    F.col("ingestion_ts").alias("ingestion_ts"),
)

# Calendar keys derived from the pickup timestamp; the silver write partitions on them.
# NOTE(review): a null pickup_datetime yields null partition keys — confirm whether such
# rows should be filtered before writing.
partition_extractors = {
    "partition_year": F.year,
    "partition_month": F.month,
    "partition_day": F.dayofmonth,
}
for partition_col, extract in partition_extractors.items():
    df = df.withColumn(partition_col, extract("pickup_datetime"))

# Final silver column order: trip attributes, lineage metadata, then partition keys.
# TODO: add `is_airport_trip` / `is_weekend` here once they are actually derived above.
cols_final = [
    "vendor_id", "pickup_datetime", "dropoff_datetime",
    "passenger_count", "trip_distance", "ratecode_id",
    "store_and_fwd_flag", "pulocation_id", "dolocation_id",
    "payment_type", "fare_amount", "extra", "mta_tax",
    "tip_amount", "tolls_amount", "improvement_surcharge",
    "congestion_surcharge", "airport_fee", "total_amount",
    "source_file", "ingestion_ts",
    "partition_year", "partition_month", "partition_day"
]

# Fail loudly rather than silently narrowing the schema: the table is overwritten with
# overwriteSchema, so a dropped column would vanish from silver without any warning.
available_cols = set(df.columns)
missing_cols = [c for c in cols_final if c not in available_cols]
if missing_cols:
    raise ValueError(f"Columns missing before silver write: {missing_cols}")

df = df.select(cols_final)

# Ensure the target schema exists, deriving it from `silver_table` (catalog.schema.table)
# so the schema name and the table name cannot drift apart.
silver_schema = silver_table.rsplit(".", 1)[0]
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {silver_schema}")

# Full refresh of the silver Delta table. `overwriteSchema` lets this overwrite replace
# the existing table schema outright, so `mergeSchema` is not needed alongside it.
# NOTE(review): year/month/day partitioning can create many small partitions — confirm
# the data volume per day justifies it.
(
    df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("partition_year", "partition_month", "partition_day")
    .saveAsTable(silver_table)
)

print(f"Tabela Silver criada: {silver_table}")
