In [0]:
from pyspark.sql.functions import lit

In [0]:
# Monthly landing files to ingest (2023-01 through 2023-05).
months = ["01", "02", "03", "04", "05"]
paths = [f"s3a://case-ifood-matsuura/newyork/landing/yellow_tripdata_2023-{m}.parquet" for m in months]

# The first file defines the reference schema (column names, order, types).
base_df = spark.read.parquet(paths[0])
base_columns = base_df.columns
base_fields = base_df.schema.fields


def _align_to_base(df):
    """Project `df` onto the reference columns, in reference order.

    Columns are matched case-insensitively and renamed to the reference
    spelling. A plain `name not in df.columns` check is case-sensitive, while
    `withColumn` resolves names with Spark's resolver (case-insensitive by
    default), so a column that differs only in case would be overwritten with
    NULLs instead of being kept. Columns that are truly absent are filled
    with NULLs cast to the reference type; columns not present in the
    reference schema are dropped.
    """
    actual_by_lower = {name.lower(): name for name in df.columns}
    aligned = []
    for field in base_fields:
        actual = actual_by_lower.get(field.name.lower())
        if actual is None:
            # Typed NULL so the filler column never carries NullType.
            aligned.append(lit(None).cast(field.dataType).alias(field.name))
        else:
            # Keep the data and its type; only normalise the column name.
            aligned.append(df[actual].alias(field.name))
    return df.select(*aligned)


# Start the bronze DataFrame from the first file, tagging rows with their source path.
bronze_df = base_df.withColumn("file", lit(paths[0]))

for path in paths[1:]:
    # Align each remaining file to the reference schema, tag it, and append it.
    df = _align_to_base(spark.read.parquet(path)).withColumn("file", lit(path))
    bronze_df = bronze_df.unionByName(df, allowMissingColumns=True)

# Persist the result as an external Delta table, replacing any previous contents.
(
    bronze_df.write.format("delta")
    .mode("overwrite")
    .option("path", "s3a://case-ifood-matsuura/newyork/bronze/yellow_taxi")
    .option("mergeSchema", "true")
    .saveAsTable("newyork.bronze.yellow_taxi")
)

In [0]:
# Sanity check: number of rows contributed by each source file.
rows_per_file = bronze_df.groupBy("file").count()
rows_per_file.display()

In [0]:
# Print the schema of the 2023-01 landing file for comparison across months.
schema_check_path = "s3a://case-ifood-matsuura/newyork/landing/yellow_tripdata_2023-01.parquet"
df = spark.read.parquet(schema_check_path)
df.printSchema()

In [0]:
# Print the schema of the 2023-05 landing file for comparison across months.
schema_check_path = "s3a://case-ifood-matsuura/newyork/landing/yellow_tripdata_2023-05.parquet"
df = spark.read.parquet(schema_check_path)
df.printSchema()