In [0]:
from pyspark.sql.functions import date_format, when, col, hour, concat_ws, month, dayofmonth, count, avg, row_number, udf, concat, sum, max, round, dayofweek
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.window import Window

from funcoes.gerais import *

In [0]:
tabela_bronze = "estudo.default.tvendas_bronze"
tabela_silver = "estudo.default.tvendas_silver"

In [0]:
df_origem = spark.table(tabela_bronze)

In [0]:
# Tratamentos necessários nos dados de entrada:
# time_purchase - voltar a ser apenas HH:mm:ss
# place_origin_return e place_destination_return - tratamento para tirar o valor '0' quando não houver informações de retorno
# fk_return_ota_bus_company - tratamento para tirar o valor '1' quando não houver informações de retorno
# gmv_success - transformação em tipo de dado mais adequado para valor monetário

df_tratado = df_origem.withColumn(
    "time_purchase",
    date_format("time_purchase", "HH:mm:ss")
    ).withColumn("place_origin_return",
                 when(col("place_origin_return") != '0', col("place_origin_return"))
                 .otherwise(None)
    ).withColumn("place_destination_return",
                 when(col("place_destination_return") != '0', col("place_destination_return"))
                 .otherwise(None)
    ).withColumn("fk_return_ota_bus_company",
                 when(col("fk_return_ota_bus_company") != '1', col("fk_return_ota_bus_company"))
                 .otherwise(None)
    ).withColumn("gmv_success",
                 col("gmv_success").cast("decimal(19,2)")
    ).withColumn("datetime_purchase",
                 concat_ws(" ", col("date_purchase"), col("time_purchase")).cast("timestamp")
                 )

In [0]:
# Filtros necessários para higienização da base:
# gmv_success - trazendo apenas registros com valores de veda positivos. Esses dados podem ser sujeira na base, como também podem ser estorno e/ou uso de cupons na compra. Se tratando do valor monetário na venda de passagens, não temos a justificativa para manter valorez zerados ou negativos. 

df_filtrado = df_tratado.filter(col("gmv_success") > 0)

In [0]:
# Adicionando informações úteis, calculados com base nos dados originais:

df = df_filtrado.withColumn("date_purchase", col("date_purchase").cast("date")) \
    .withColumn("ano_mes_compra", date_format(col("date_purchase"), "yyyy-MM")) \
    .withColumn("hora_do_dia_compra", hour(col("time_purchase")))

In [0]:
dias_semana_num = {
    1: 'Domingo', 2: 'Segunda', 3: 'Terça', 4: 'Quarta',
    5: 'Quinta', 6: 'Sexta', 7: 'Sábado'
}

In [0]:
dias_semana_map_udf = udf(lambda d: dias_semana_num.get(d, 'Desconhecido'), StringType())
df = df.withColumn("day_of_week_num", dayofweek(col("date_purchase")))
df = df.withColumn("dia_semana_compra", dias_semana_map_udf(col("day_of_week_num")))

In [0]:
df = df.withColumn("rota_ida", concat_ws(" -> ", col("place_origin_departure"), col("place_destination_departure")))

In [0]:
df = df.withColumn("rota_volta",
                   when(col("place_origin_return").isNotNull(),
                        concat_ws(" -> ", col("place_origin_return"), col("place_destination_return")))
                   .otherwise(None))

In [0]:
df = df.withColumn("mes_compra", month(col("date_purchase"))) \
       .withColumn("dia_do_mes_compra", dayofmonth(col("date_purchase")))

In [0]:
df = df.withColumn("periodo_do_dia_compra",
                   when((col("hora_do_dia_compra") >= 0) & (col("hora_do_dia_compra") <= 5), "Madrugada")
                   .when((col("hora_do_dia_compra") >= 6) & (col("hora_do_dia_compra") <= 11), "Manhã")
                   .when((col("hora_do_dia_compra") >= 12) & (col("hora_do_dia_compra") <= 17), "Tarde")
                   .otherwise("Noite"))

In [0]:
meses_map = {
    1: 'Janeiro', 2: 'Fevereiro', 3: 'Março', 4: 'Abril',
    5: 'Maio', 6: 'Junho', 7: 'Julho', 8: 'Agosto',
    9: 'Setembro', 10: 'Outubro', 11: 'Novembro', 12: 'Dezembro'
}

In [0]:
map_mes_udf = udf(lambda m: meses_map.get(m, 'Não definido'), StringType())

In [0]:
grouped_metrics_basic = df.groupBy("fk_contact").agg(
    count(col("nk_ota_localizer_id")).alias("total_compras"),
    round(sum(col("gmv_success")), 2).alias("valor_total_compra"),
    round(avg(col("gmv_success")), 2).alias("valor_medio_compra"),
    sum(col("total_tickets_quantity_success")).alias("total_tickets_compras"),
    sum(when(col("rota_volta").isNotNull(), 1).otherwise(0)).alias("total_compras_com_volta"),
    max(col("datetime_purchase")).alias("data_ultima_compra")
)

In [0]:
def calculate_mode_spark(dataframe, group_cols, target_col, output_col_name):
    window_spec_count = Window.partitionBy(*group_cols, target_col)

    df_counted = dataframe.withColumn(
        "count_target",
        count(target_col).over(window_spec_count)
    )

    window_spec_rank = Window.partitionBy(*group_cols).orderBy(col("count_target").desc(), col(target_col))

    df_mode = df_counted.withColumn(
        "rank_mode",
        row_number().over(window_spec_rank)
    ).filter(col("rank_mode") == 1) \
     .select(*group_cols, col(target_col).alias(output_col_name))

    return df_mode

In [0]:
rota_ida_frequente = calculate_mode_spark(
    df, ["fk_contact"], "rota_ida", "local_partida_ida_local_destino_ida_frequente"
)

In [0]:
rota_volta_frequente = calculate_mode_spark(
    df.filter(col("rota_volta").isNotNull()),
    ["fk_contact"], "rota_volta", "local_partida_volta_local_destino_volta_frequente"
)

In [0]:
dia_preferido = calculate_mode_spark(
    df, ["fk_contact"], "dia_semana_compra", "dia_da_semana_preferido_de_compra"
)

In [0]:
periodo_dia_preferido = calculate_mode_spark(
    df, ["fk_contact"], "periodo_do_dia_compra", "periodo_do_dia_preferido_para_compra"
)

In [0]:
epoca_ano_preferida = calculate_mode_spark(
    df, ["fk_contact"], "mes_compra", "epoca_do_ano_preferido_da_compra"
)

In [0]:
epoca_mes_preferida = calculate_mode_spark(
    df, ["fk_contact"], "dia_do_mes_compra", "epoca_do_mes_preferido_da_compra"
)

In [0]:
final_metrics = grouped_metrics_basic \
    .join(rota_ida_frequente, on=["fk_contact"], how="left") \
    .join(rota_volta_frequente, on=["fk_contact"], how="left") \
    .join(dia_preferido, on=["fk_contact"], how="left") \
    .join(periodo_dia_preferido, on=["fk_contact"], how="left") \
    .join(epoca_ano_preferida, on=["fk_contact"], how="left") \
    .join(epoca_mes_preferida, on=["fk_contact"], how="left")

In [0]:
df_final_metrics = final_metrics.withColumn(
    "epoca_do_ano_preferido_da_compra",
    map_mes_udf(col("epoca_do_ano_preferido_da_compra"))
)

In [0]:
# Escrita na tabela que será usada nos estudos e desenvolvimentos
escrita_tabela(df_final_metrics, tabela_silver)