In [0]:
#importacao das libs

from pyspark.sql.functions import month, year, avg, hour, round
from pyspark.sql.functions import col

### Leitura do arquivo delta ###

In [0]:
df = spark.read.format("delta").load("s3a://bucket-taxi-project/consumption/taxi/")

In [0]:
df_geral = df.write.format("delta").mode("overwrite").saveAsTable("gold.taxi_2023")

### Pergunta: Qual a média de valor total (total\_amount) recebido em um mês considerando todos os yellow táxis da frota? ###

In [0]:
# Adiciona colunas de ano e mês
df_mes = df.withColumn("ano", year("pickup_ts")) \
           .withColumn("mes", month("pickup_ts"))

In [0]:
# Agrupa por ano e mês e calcula a média
df_media_mensal = df_mes.groupBy("ano", "mes") \
                        .agg(round(avg("total_amount"),2).alias("media_total_amount"))
df_media_mensal.createOrReplaceTempView("df_media_mensal")

In [0]:
# Realiza a ordenação e salva no schema Gold
df_media_mensal = spark.sql("""
                              select
                              *
                              from df_media_mensal
                              order by ano, mes
                              """)
df_media_mensal.write.format("delta").option("mergeSchema","True").mode("overwrite").saveAsTable("gold.media_mensal")

In [0]:
%sql
select * from gold.media_mensal

### Pergunta: Qual a média de passageiros (passenger\_count) por cada hora do dia que pegaram táxi no mês de maio considerando todos os táxis da frota? ###

In [0]:
# Filtra apenas o mês solicitado (maio)
df_maio = df.filter(
    (year("pickup_ts") == 2023) &
    (month("pickup_ts") == 5))

In [0]:
# Adiciona a hora, conforme a coluna de pickup_ts e salva no schema Gold
df_maio_hora = df_maio.withColumn("hora_dia", hour("pickup_ts"))
df_maio_hora.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("gold.taxi_hora")

In [0]:
# Realiza a média de passageiros por hora
df_media_passageiros_por_hora = spark.sql("""
                                          select hora_dia,round(avg(passenger_count),2) as media_passageiros
                                          from gold.taxi_hora
                                          group by hora_dia
                                          order by hora_dia
                                          """)
df_media_passageiros_por_hora.write.format("delta").option("mergeSchema","True").mode("overwrite").saveAsTable("gold.media_passageiros_por_hora")


In [0]:
%sql
select * from gold.media_passageiros_por_hora

In [0]:
# Realiza a média de passageiros por hora agrupando por VendorID
df_media_passageiros_por_hora_por_id = spark.sql("""
                                          select hora_dia,round(avg(passenger_count),2) as media_passageiros, id 
                                          from gold.taxi_hora
                                          group by hora_dia, id
                                          order by hora_dia, id
                                          """)
df_media_passageiros_por_hora_por_id.write.format("delta").option("mergeSchema","True").mode("overwrite").saveAsTable("gold.media_passageiros_por_hora_por_id")

In [0]:
%sql
select * from gold.media_passageiros_por_hora_por_id

### Análises ###

In [0]:
 # Duração média das corridas por hora em maio 2023
query_duracao_media_corridas = spark.sql("""
                                          SELECT 
                                          hora_dia,
                                          ROUND(AVG(UNIX_TIMESTAMP(dropoff_ts) - UNIX_TIMESTAMP(pickup_ts)) / 60, 2) AS duracao_media_min
                                          FROM gold.taxi_hora
                                          GROUP BY hora_dia
                                          ORDER BY hora_dia
                                          """)
query_duracao_media_corridas.show()

In [0]:
 #  Média de passageiros por dia da semana dos meses de janeiro a maio de 2023
query_media_passageiros = spark.sql("""
                                          SELECT 
                                          date_format(pickup_ts, 'E') AS dia_semana,
                                          ROUND(AVG(passenger_count), 2) AS media_passageiros
                                          FROM gold.taxi_2023
                                          GROUP BY dia_semana
                                          ORDER BY dia_semana
                                          """)
query_media_passageiros.show()

In [0]:
 # Número de passageiros por dia da semana
query_numero_passageiros = spark.sql("""
                                          SELECT 
                                          date_format(pickup_ts, 'E') AS dia_semana,
                                          count(*) total_de_corridas
                                          FROM gold.taxi_2023
                                          where month(pickup_ts) = 3
                                          GROUP BY dia_semana
                                          ORDER BY dia_semana
                                          """)
query_numero_passageiros.show()