# Camada Silver

## Importações

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import (
  IntegerType,
  DoubleType,
  TimestampType
)

In [0]:
# Bronze-layer source tables, one per monthly yellow-taxi extract.
# Built from the shared table prefix plus the year_month suffix of each extract.
bronze_tables = [
    f"taxi_trip_bronze.yellow_tripdata_{period}"
    for period in ("2015_01", "2016_01", "2016_02", "2016_03")
]

## Consolidação das tabelas

In [0]:
# Stack every bronze table into a single DataFrame, matching columns by name.
# The first table seeds the accumulator; each later one is unioned onto it.
df_bronze = None

for table in bronze_tables:
    df = spark.table(table)
    df_bronze = df if df_bronze is None else df_bronze.unionByName(df)

In [0]:
# Preview the consolidated bronze DataFrame in the notebook output.
df_bronze.display()

## Padronização de colunas

In [0]:
# Standardize the source column names to snake_case for the silver layer.
# Pairs are (bronze name, silver name) and are applied in this order.
column_renames = [
    ("VendorID", "vendor_id"),
    ("tpep_pickup_datetime", "pickup_datetime"),
    ("tpep_dropoff_datetime", "dropoff_datetime"),
    ("RateCodeID", "rate_code_id"),
]

renamed = df_bronze
for old_name, new_name in column_renames:
    renamed = renamed.withColumnRenamed(old_name, new_name)

# Assign once at the end so df_silver is only bound after every rename succeeded.
df_silver = renamed

In [0]:
# Preview the DataFrame after the column renames.
df_silver.display()

In [0]:
# Type casting: give each listed column an explicit Spark type.
# Pairs are (column name, target type); every column is cast onto itself,
# and the pairs are applied in the order they are listed.
column_types = [
    ("vendor_id", IntegerType()),
    ("pickup_datetime", TimestampType()),
    ("dropoff_datetime", TimestampType()),
    ("passenger_count", IntegerType()),
    ("trip_distance", DoubleType()),
    ("pickup_longitude", DoubleType()),
    ("pickup_latitude", DoubleType()),
    ("rate_code_id", IntegerType()),
    ("dropoff_longitude", DoubleType()),
    ("dropoff_latitude", DoubleType()),
    ("payment_type", IntegerType()),
    ("fare_amount", DoubleType()),
    ("extra", DoubleType()),
    ("mta_tax", DoubleType()),
    ("tip_amount", DoubleType()),
    ("tolls_amount", DoubleType()),
    ("improvement_surcharge", DoubleType()),
    ("total_amount", DoubleType()),
]

typed = df_silver
for column_name, spark_type in column_types:
    typed = typed.withColumn(column_name, F.col(column_name).cast(spark_type))

# Rebind df_silver only after the full set of casts has been applied.
df_silver = typed
df_silver.display()

In [0]:
# Drop the "rate_cod_id" column from the silver DataFrame, then preview the result.
# NOTE(review): no column named "rate_cod_id" is created in the cells above (the
# renamed/cast column is "rate_code_id"), so this drop appears to match nothing.
# Confirm whether this is a leftover to delete or a typo for "rate_code_id".
df_silver = df_silver.drop("rate_cod_id")
df_silver.display()

In [0]:
# Exclude impossible records: a trip must end after it starts, cover a positive
# distance and have a positive total amount. A passenger count of zero is kept;
# only negative counts are rejected.
ends_after_start = F.col("dropoff_datetime") > F.col("pickup_datetime")
has_distance = F.col("trip_distance") > 0
has_positive_total = F.col("total_amount") > 0
non_negative_passengers = F.col("passenger_count") >= 0

df_silver = df_silver.filter(
    ends_after_start & has_distance & has_positive_total & non_negative_passengers
)

## Escrita na tabela silver

In [0]:
# Persist the cleaned DataFrame as the silver table, using the "overwrite" save mode.
silver_writer = df_silver.write.mode("overwrite")
silver_writer.saveAsTable("taxi_trip_silver.silver_taxi_trips")

## Validação

In [0]:
%sql
-- Validation: row count and earliest/latest pickup timestamp of the silver table.
SELECT
  COUNT(*) AS total_rows,
  MIN(pickup_datetime) AS min_pickup_datetime,
  MAX(pickup_datetime) AS max_pickup_datetime
FROM taxi_trip_silver.silver_taxi_trips
  

In [0]:
%sql
-- Validation: browse the rows stored in the silver table (all columns, no filter).
SELECT * FROM taxi_trip_silver.silver_taxi_trips