In [0]:
# Importanto Bibliotecas
from pyspark.sql.types  import *
from pyspark.sql.window import Window
from pyspark.sql.functions import * 

####Criando a pasta no diretório e carregando os arquivos.

In [0]:
# Criando uma pasta para armazenar os dados que estão em CSV
dbutils.fs.mkdirs('FileStore/tables/arquivos_mvp')

In [0]:
# Após a realização do carregamento dos arquivos no DBFS, verificar se os arquivos foram carregados corretamente.
display(dbutils.fs.ls('FileStore/tables/arquivos_mvp'))

In [0]:
# Carregando o arquivo CSV de Compras do DBFS
dbfs_compras = '/FileStore/tables/arquivos_mvp/Compras.csv'

# Lendo o arquivo CSV para um DataFrame
df_compras = spark.read.csv(dbfs_compras, header=True, inferSchema=True)

# Exibindo o DataFrame
display(df_compras)

In [0]:
# Carregando o arquivo CSV de Clientes do DBFS
dbfs_clientes = '/FileStore/tables/arquivos_mvp/Clientes.csv'

# Lendo o arquivo CSV para um DataFrame
df_clientes = spark.read.csv(dbfs_clientes, header=True, inferSchema=True)

# Exibindo o DataFrame
display(df_clientes)

####Verificando e tratando o arquivo Clientes

In [0]:
# Verificando o tipo de arquivo de df_clientes
df_clientes.printSchema()

In [0]:
# Verificando valores nulos na tabela clientes:
for coluna in df_clientes.columns:
    print(coluna, df_clientes.filter(col(coluna).isNull()).count())

In [0]:
# Lógica para nova coluna 'GrupoIdade'
df_clientes = df_clientes.withColumn(
    "GrupoIdade",
    when(col("Idade") <= 17, "1-17")
    .when((col("Idade") >= 18) & (col("Idade") <= 29), "18-29")
    .when((col("Idade") >= 30) & (col("Idade") <= 39), "30-39")
    .when((col("Idade") >= 40) & (col("Idade") <= 49), "40-49")
    .when((col("Idade") >= 50) & (col("Idade") <= 60), "50-60")
    .when((col("Idade") > 60), "+60")
    .otherwise("Não Classificado")
)

# Exibe o DataFrame com a nova coluna
display(df_clientes)

In [0]:
# Tratando para que quando na tabela df_clientes na coluna "Idade" constar um valor nulo preencher com "Não Informado"

df_clientes = df_clientes.withColumn(
    "Idade",
    when(col("Idade").isNull(), "Não Informado").otherwise(col("Idade"))
)

In [0]:
# Código para verificação do tratamento realizado
display(df_clientes.filter(col("Idade") == "Não Informado"))

####Verificando e tratando o arquivo Compras

In [0]:
# Verificando o tipo de arquivo de df_compras:
df_compras.printSchema()

In [0]:
# Verificando valores nulos na tabela compras:
for coluna in df_compras.columns:
    print(coluna, df_compras.filter(col(coluna).isNull()).count())

In [0]:
# Tratando para que quando na tabela df_compras na coluna "Destino" constar um valor nulo preencher com "Não Informado"

df_compras = df_compras.withColumn(
    "Destino",
    when(col("Destino").isNull(), "Não Informado").otherwise(col("Destino"))
)

In [0]:
# Código para verificação do tratamento realizado
display(df_compras.filter(col("Destino") == "Não Informado"))

In [0]:
#Primeiro Passo - Incluindo uma nova coluna para a contagem de compras Aprovadas e Canceladas, esse processo é necessário pois um cliente pode comprar mais de uma passagem em uma mesma compra. Com isso, o localizador se repete; então, é realizada a contagem dos localizadores, ou seja, das compras realizadas pelo cliente, aprovadas ou canceladas. O resultado dessa contagem e colocado em apenas uma linha. 

# 1. Agrupando por ClienteId, Localizador, TipoCompra e Destino para contar as compras por status
df_contagem_localizadores = df_compras.groupBy("ClienteId", "Localizador", "TipoCompra","Destino") \
    .agg(
        sum(when(col("StatusVenda") == "SALE_APPROVED", 1).otherwise(0)).alias("ContagemAprovadosLocalizador"),
        sum(when(col("StatusVenda") == "SALE_CANCELED", 1).otherwise(0)).alias("ContagemCanceladosLocalizador")
    )

# 2. Definindo a janela para ordenar as compras por VendaId
window_spec = Window.partitionBy("ClienteId").orderBy("VendaId")

# 3. Realizando um join com o DataFrame original para incluir as colunas calculdas
df_compras_join = df_compras.join(
    df_contagem_localizadores, ["ClienteId", "Localizador", "TipoCompra","Destino"], "left"
)

# 4. Definindo a janela para identificar a primeira ocorrência de cada Localizador 
window_spec_localizador = Window.partitionBy("ClienteId", "Localizador", "TipoCompra", "Destino").orderBy("VendaId")

# 5. Criando as colunas com lógica para contar quantas passagens o cliente comprou em uma única compra
df_compras_contagem = df_compras_join.withColumn(
    "PrimeiraOcorrenciaLocalizador",
    row_number().over(window_spec_localizador)
).withColumn(
    "NumeroComprasAprovadas",
    when(col("PrimeiraOcorrenciaLocalizador") == 1, col("ContagemAprovadosLocalizador")).otherwise(0)
).withColumn(
    "NumeroVendasCanceladas",
    when(col("PrimeiraOcorrenciaLocalizador") == 1, -col("ContagemCanceladosLocalizador")).otherwise(0)
).drop("ContagemAprovadosLocalizador", "ContagemCanceladosLocalizador", "PrimeiraOcorrenciaLocalizador")

# Para visualizar o DataFrame com as novas colunas:
display(df_compras_contagem)

In [0]:
#Criando tabela virtual temporaria do SQL 
df_compras_contagem.createOrReplaceTempView("compras_contagem")

In [0]:
%sql
SELECT 
  ClienteId,
  sum(NumeroComprasAprovadas) ComprasAprovadas,
  sum(NumeroVendasCanceladas) ComprasCanceladas
FROM 
  compras_contagem
GROUP BY
  ClienteId

In [0]:
#Segundo Passo - Inclusão de nova coluna para identificar a primeira compra dos clientes. Essa coluna será importante para montar a regra de classificação da compra do cliente.Importante pontuar que, como um cliente pode comprar mais de uma passagem em uma mesma compra, então, se um cliente fez a compra de mais de uma passagem na mesma compra, então essas compras serão classificadas como primeira compra (True).

# Define uma janela particionada por ClienteId e ordenada por DataCompra
window_spec_cliente = Window.partitionBy("ClienteId").orderBy("DataCompra")

# Adiciona uma coluna para numerar as compras de cada cliente por ordem de data
df_compras_numerada = df_compras_contagem.withColumn(
    "NumeroCompraCliente",
    row_number().over(window_spec_cliente)
)

# Filtra a primeira compra de cada cliente
df_primeira_compra = df_compras_numerada.filter(col("NumeroCompraCliente") == 1)

# Define uma janela particionada por ClienteId
window_spec_cliente_agrupado = Window.partitionBy("ClienteId")

# Definir os localizadores da primeira compra em uma lista
df_primeira_compra_agrupada = df_primeira_compra.withColumn(
    "LocalizadoresPrimeiraCompra",
    collect_list("Localizador").over(window_spec_cliente_agrupado)
)

# Seleciona apenas as colunas ClienteId e LocalizadoresPrimeiraCompra
df_primeira_compra_localizadores = df_primeira_compra_agrupada.select("ClienteId", "LocalizadoresPrimeiraCompra")

# Junta o DataFrame original com os localizadores da primeira compra
df_compras_com_primeira = df_compras_numerada.join(df_primeira_compra_localizadores, "ClienteId", "left")

# Cria a coluna 'PrimeiraCompra' usando array_contains
df_compras_primeira_compra = df_compras_com_primeira.withColumn(
    "PrimeiraCompra",
    array_contains(col("LocalizadoresPrimeiraCompra"), col("Localizador"))
).drop("LocalizadoresPrimeiraCompra","NumeroCompraCliente") 

#Para visualizar o DataFrame com a nova coluna:
display(df_compras_primeira_compra)

In [0]:
#Criando tabela virtual temporaria do SQL 
df_compras_primeira_compra.createOrReplaceTempView("compras_primeira_compra")

In [0]:
%sql
SELECT
  ClienteId,
  DataCompra,
  PrimeiraCompra
FROM
  compras_primeira_compra
ORDER BY
  ClienteId,
  DataCompra

In [0]:
#Teceiro Passo - Inclusão da coluna com o total de compras realizadas pelo cliente. Essa coluna contará de forma distinta os localizadores para saber quantas compras foram realizadas pelos clientes. Esse passo é importante para identificar se o cliente está realizando uma recompra ou não.

# Contando os localizadores com StatusVenda 'SALE_APPROVED' e agrupar por ClienteId e Localizador
df_aprovados = df_compras_primeira_compra.filter(col("StatusVenda") == "SALE_APPROVED") \
    .groupBy("ClienteId", "Localizador") \
    .agg(count("*").alias("CountAprovados"))

# Contando os localizadores com StatusVenda 'SALE_CANCELED' e agrupar por ClienteId e Localizador
df_cancelados = df_compras_primeira_compra.filter(col("StatusVenda") == "SALE_CANCELED") \
    .groupBy("ClienteId", "Localizador") \
    .agg(count("*").alias("CountCancelados"))

# Realizando um join entre os dataframes de aprovados e cancelados
df_combinado = df_aprovados.join(df_cancelados, ["ClienteId", "Localizador"], "full") \
    .fillna(0)

# Realizando a contagem dos localizadores
df_contagem_localizador = df_combinado.withColumn(
    "ContagemLocalizador",
    when(col("CountAprovados") > col("CountCancelados"), 1)
    .otherwise(0)
)

# Realizando a soma dos localizadores distintos e agrupando por ClienteId 
df_contagem = df_contagem_localizador.groupBy("ClienteId") \
    .agg(count("*").alias("TotalLocalizadores"),  # Contagem total de localizadores (para referência)
         sum("ContagemLocalizador").alias("TotalCompras"))

# Realizando um join para incluir a nova coluna
df_compras_total = df_compras_primeira_compra.join(df_contagem.select("ClienteId", "TotalCompras"), "ClienteId", "left")

# Para visualizar o DataFrame com a nova coluna:
display(df_compras_total)

In [0]:
#Criando tabela virtual temporaria do SQL 
df_compras_total.createOrReplaceTempView("compras_total")

In [0]:
%sql
SELECT DISTINCT
  ClienteId,
  TotalCompras
FROM
  compras_total

In [0]:
#Quarto passo - Somente realizando o agrupamento e plotando apenas as colunas necessárias

# Realiza o tratamento para agrupar os totais de compras aprovadas e canceladas
df_compras_agg = df_compras_total.groupBy(
    "ClienteId",
    "Localizador",
    "DataCompra",
    "Destino",
    "MesAnoCompra",
    "TipoCompra",
    "PrimeiraCompra",
    "TotalCompras",
).agg(
    sum("NumeroComprasAprovadas").alias("NumeroComprasAprovadas"),
    sum("NumeroVendasCanceladas").alias("NumeroVendasCanceladas"),
)

# Exibindo o schema
display(df_compras_agg)

In [0]:
# Quinto Passo - Criando uma nova coluna para classificar as compras de acordo com as regras aplicadas abaixo.

# Cria a nova coluna 'CompraAux' somando as colunas existentes
df_compras_aux = df_compras_agg.withColumn("CompraAux", col("NumeroComprasAprovadas") + col("NumeroVendasCanceladas"))

# Cria a nova coluna 'StatusCompra' com base na condição, utilizando o DataFrame 'df_compras_aux'
df_compras_aux = df_compras_aux.withColumn(
    "StatusCompra",
    when(col("CompraAux") > 0, 1).otherwise(0)
).drop("CompraAux")

df_compras_final = df_compras_aux.withColumn(
    "ClassificacaoCompra",
    when((col("TotalCompras") == 0) & (col("StatusCompra") == 0), "Sem Compra")
    .when((col("TotalCompras") == 1) & (col("StatusCompra") == 1) & (col("PrimeiraCompra") == True), "Primeira Compra(Única)")
    .when((col("TotalCompras") == 1) & (col("StatusCompra") == 0) & (col("PrimeiraCompra") == True), "Primeira Compra(Cancelada)")
    .when((col("TotalCompras") == 1) & (col("StatusCompra") == 0) & (col("PrimeiraCompra") == False), "Compra Cancelada(Recompra)")
    .when((col("TotalCompras") == 1) & (col("StatusCompra") == 1) & (col("PrimeiraCompra") == False), "Compra Única(Recompra)")
    .when((col("TotalCompras") > 1) & (col("StatusCompra") == 0) & (col("PrimeiraCompra") == False), "Compra Cancelada")
    .when((col("TotalCompras") > 1) & (col("StatusCompra") == 0) & (col("PrimeiraCompra") == True), "Primeira Compra(Cancelada)")
    .when((col("TotalCompras") > 1) & (col("StatusCompra") == 1) & (col("PrimeiraCompra") == True), "Primeira Compra")
    .when((col("TotalCompras") > 1) & (col("StatusCompra") == 1) & (col("PrimeiraCompra") == False), "Recompra")
).drop("StatusCompra","PrimeiraCompra","TotalCompras")

# Para visualizar o DataFrame com a nova coluna:
display(df_compras_final)

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS mvp

In [0]:
%sql
USE mvp;
CREATE TABLE IF NOT EXISTS compras(
ClienteId integer,
Localizador string,
DataCompra date,
Destino string,
MesAnoCompra string,
TipoCompra string,
NumeroComprasAprovadas long,
NumeroVendasCanceladas long,
ClassificacaoCompra string
)

In [0]:
database_name = "mvp"
table_name = "compras"
df_compras_final.write.format("delta").mode("overwrite").saveAsTable(f"{database_name}.{table_name}")

In [0]:
%sql
USE mvp;
CREATE TABLE IF NOT EXISTS clientes(
ClienteId integer,
DataCadastro date,
MesAnoCadastro string,
Idade string,
GrupoIdade string
)

In [0]:
database_name = "mvp"
table_name = "clientes"
df_clientes.write.format("delta").mode("overwrite").saveAsTable(f"{database_name}.{table_name}")

In [0]:
%sql
SELECT * FROM mvp.clientes

In [0]:
%sql
SELECT
  cp.ClienteId,
  cp.Localizador,
  cp.Destino,
  cp.DataCompra,
  cp.MesAnoCompra,
  cp.TipoCompra,
  cl.GrupoIdade,
  cp.NumeroComprasAprovadas ComprasAporvadas,
  cp.NumeroVendasCanceladas ComprasCanceladas,
  cp.ClassificacaoCompra
FROM 
  mvp.compras cp
  LEFT JOIN mvp.clientes cl ON (cp.ClienteId = cl.ClienteId)
WHERE 
  year(cp.DataCompra) >='2024'