##Carga dados Yellow Taxi Trip - Camada Ouro

In [0]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, sum as spark_sum, avg, count, countDistinct, round, 
    date_format, expr
)

# 🔹 Cria a sessão Spark
spark = SparkSession.builder.getOrCreate()

# 🔹 Criar schema se não existirem na camada prata
spark.sql("CREATE CATALOG IF NOT EXISTS tlc_trip")
spark.sql("CREATE SCHEMA IF NOT EXISTS tlc_trip.ouro")

# Carregar dados da camada prata já tratados 
df_silver = spark.table("tlc_trip.prata.yellow_taxi_trip_tratada")
                                 
# Tabela Gold: Métricas agregadas por dia ---
df_gold_daily = (
    df_silver
    .withColumn("dia", date_format(col("tpep_pickup_datetime"), "yyyy-MM-dd"))
    .groupBy("dia")
    .agg(
        count("VendorID").alias("total_corridas"),
        round(spark_sum("trip_distance"), 2).alias("distancia_total_km"),
        round(avg("trip_distance"), 2).alias("distancia_media_km"),
        round(spark_sum("total_amount"), 2).alias("faturamento_total"),
        round(avg("total_amount"), 2).alias("ticket_medio"),
        round(avg("tip_amount"), 2).alias("gorjeta_media"),
        round(avg("passenger_count"), 2).alias("passageiros_media")
    )
)

df_gold_daily.write.mode("overwrite").saveAsTable("tlc_trip.ouro.taxi_metrics_daily")

# Tabela Gold: Métricas por local de partida (PULocationID) ---
df_gold_pu_location = (
    df_silver
    .groupBy("PULocationID")
    .agg(
        count("VendorID").alias("total_corridas"),
        round(spark_sum("total_amount"), 2).alias("faturamento_total"),
        round(avg("total_amount"), 2).alias("ticket_medio"),
        round(avg("tip_amount"), 2).alias("gorjeta_media"),
        round(avg("trip_distance"), 2).alias("distancia_media_km"),
        round(avg("passenger_count"), 2).alias("passageiros_media")
    )
)

df_gold_pu_location.write.mode("overwrite").saveAsTable("tlc_trip.ouro.taxi_metrics_by_pulocation")

# Insight 1: Top 5 dias com maior faturamento
#            Identifica quais dias geraram maior receita para avaliar sazonalidade, feriados ou eventos especiais.

print("Top 5 dias com maior faturamento:")
df_gold_daily.orderBy(col("faturamento_total").desc()).show(5)

# Insight 2: Locais de partida com maior número de corridas
#            Mapeia os pontos de partida mais movimentados, útil para posicionar veículos e melhorar logística.

print("Top 5 locais de partida (PULocationID) com mais corridas:")
df_gold_pu_location.orderBy(col("total_corridas").desc()).show(5)

# Insight 3: Relação entre distância média e gorjeta média por local de partida
#            Explora se corridas mais longas tendem a ter gorjetas maiores, o que pode ajudar a planejar incentivos para motoristas.

print("Locais de partida com maior distância média e gorjeta média:")
df_gold_pu_location.orderBy(col("distancia_media_km").desc(), col("gorjeta_media").desc()).show(5)

