In [None]:
# Colab-only setup: mount the drive at /content/drive so later cells can
# read and write files below that path.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, TimestampType


class SilverLayerBuilder:
    """
    Build the Silver Layer from raw Parquet directories (yellow/green).

    Every raw file is read on its own, reduced to a common set of typed
    columns, unioned per taxi type and written below
    ``<base_dir>/silver_layer/<type>``.
    """

    def __init__(self, spark: SparkSession, base_dir: str):
        """
        Store the Spark session and derive the raw and output paths.

        Args:
            spark: Session used to read and write Parquet.
            base_dir: Directory holding the ``yellow`` and ``green`` raw
                folders. Trailing slashes are ignored.
        """
        self.spark = spark

        trimmed = base_dir.rstrip("/")
        if not trimmed and base_dir:
            # base_dir consisted only of slashes (the filesystem root).
            # Keeping "" here would make every derived path relative to the
            # current working directory instead of the root.
            trimmed = "/"
        self.base_dir = trimmed

        self.raw_roots = {
            "yellow": os.path.join(self.base_dir, "yellow"),
            "green":  os.path.join(self.base_dir, "green"),
        }
        self.silver_base = os.path.join(self.base_dir, "silver_layer")

    def list_parquet_files(self, root_path: str) -> list[str]:
        """
        Return every ``.parquet`` file below ``root_path`` (recursively).

        Args:
            root_path: Directory to walk.

        Returns:
            Sorted list of file paths; empty when nothing matches.
        """
        found = [
            os.path.join(dirpath, filename)
            for dirpath, _, filenames in os.walk(root_path)
            for filename in filenames
            if filename.endswith(".parquet")
        ]
        return sorted(found)

    def read_and_cast(self, path: str, is_green: bool=False) -> DataFrame:
        """
        Read one Parquet file and project it onto the Silver schema.

        Args:
            path: Parquet file to read.
            is_green: When true, the ``lpep_*`` datetime columns are renamed
                to their ``tpep_*`` counterparts before casting.

        Returns:
            DataFrame containing only the five target columns, each cast to
            its target type.
        """
        df = self.spark.read.parquet(path)

        if is_green:
            df = (
                df
                .withColumnRenamed("lpep_pickup_datetime",  "tpep_pickup_datetime")
                .withColumnRenamed("lpep_dropoff_datetime", "tpep_dropoff_datetime")
            )

        # Target column -> type, in output order. Casting per file gives all
        # pieces the same schema before they are unioned in build().
        target_types = {
            "VendorID":              IntegerType(),
            "passenger_count":       IntegerType(),
            "total_amount":          DoubleType(),
            "tpep_pickup_datetime":  TimestampType(),
            "tpep_dropoff_datetime": TimestampType(),
        }

        # One projection that casts and keeps only the target columns.
        return df.select(
            *[
                col(name).cast(dtype).alias(name)
                for name, dtype in target_types.items()
            ]
        )

    def build(self):
        """
        Read, normalize and write the Silver Layer for each taxi type.

        For each entry in ``raw_roots`` the Parquet files are listed, read
        through ``read_and_cast``, unioned by column name and written with
        overwrite mode to ``<silver_base>/<type>``. Types without any Parquet
        file are reported and skipped.
        """
        for tipo, raw_root in self.raw_roots.items():
            is_green = (tipo == "green")
            parquet_files = self.list_parquet_files(raw_root)

            if not parquet_files:
                print(f"Nenhum Parquet encontrado em: {raw_root}")
                continue

            # parquet_files is non-empty here, so there is always a first
            # piece to seed the union with.
            first_path, *other_paths = parquet_files
            df_acc = self.read_and_cast(first_path, is_green=is_green)
            for p in other_paths:
                df_acc = df_acc.unionByName(self.read_and_cast(p, is_green=is_green))

            out_path = os.path.join(self.silver_base, tipo)
            df_acc.write.mode("overwrite").parquet(out_path)
            print(f"Silver layer ({tipo}) salva em: {out_path}")


if __name__ == "__main__":
    # Script entry point: build the Silver Layer for the hard-coded base
    # directory using a local Spark session.
    spark = (
        SparkSession.builder
        .appName("SilverLayerBuilder")
        .getOrCreate()
    )

    try:
        base_dir = "/content/drive/MyDrive/ifood/teste_2"
        builder = SilverLayerBuilder(spark, base_dir)
        builder.build()
    finally:
        # Stop the session even when the build raises, so it is not left
        # running after a failure.
        spark.stop()
