# Parte 2 - Análise de Dados

Um dos problemas mais clássicos do mercado de e-commerce é o carrinho abandonado. 
<br><br>
Carrinho 
abandonado é aquele em que o cliente seleciona produtos para compra no e-commerce, mas não 
finaliza o processo. 
<br><br> Algumas das estratégias para recuperá-lo são investir em remarketing, flexibilizar 
o frete e melhorar o checkout. 
Analisar os dados desse evento é fundamental para que as empresas tentem entender o porquê que 
os clientes estão desistindo das compras. <br><br>
Esse projeto visa ajudar a Cantu a coletar informações relacionadas a esse evento.

Considerando a seguinte estrutura e relacionamento dos dados

![ER](ER.png)


Responda os seguintes questionamentos para ajudar a área responsável: 
- Quais os produtos que mais tiveram carrinhos abandonados? 
- Quais as duplas de produtos em conjunto que mais tiveram carrinhos abandonados? 
- Quais produtos tiveram um aumento de abandono? 
- Quais os produtos novos e a quantidade de carrinhos no seu primeiro mês de lançamento? 
- Quais estados tiveram mais abandonos? 

---

# Definição de bibliotecas e funções úteis

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType, IntegerType, DoubleType
from datetime import datetime
from pyspark.sql.window import Window
from collections import Counter
from itertools import combinations

In [0]:
catalog_name = 'datacraft_catalog'

# Carregando os dados

In [0]:
tb_addresses = ( spark
      .read
      .format("parquet")
      .load("/Volumes/datacraft_catalog/cantu/e-commerce/tb_addresses.parquet") )

# tb_addresses.display()

In [0]:
tb_cartentries = ( spark
      .read
      .format("parquet")
      .load("/Volumes/datacraft_catalog/cantu/e-commerce/tb_cartentries.parquet") )

# tb_cartentries.display()

In [0]:
tb_carts = ( spark
      .read
      .format("parquet")
      .load("/Volumes/datacraft_catalog/cantu/e-commerce/tb_carts.parquet") )

# tb_carts.display()

In [0]:
tb_cmssitelp = ( spark
      .read
      .format("csv")
      .option("header", "true")
      .option("delimiter", "|")
      .load("/Volumes/datacraft_catalog/cantu/e-commerce/tb_cmssitelp.csv") )

# tb_cmssitelp.display()

In [0]:
tb_paymentinfos = ( spark
      .read
      .format("parquet")
      .load("/Volumes/datacraft_catalog/cantu/e-commerce/tb_paymentinfos.parquet") )

# tb_paymentinfos.display()

In [0]:
tb_paymentmodes = ( spark
      .read
      .format("csv")
      .option("header", "true")
      .option("delimiter", "|")
      .load("/Volumes/datacraft_catalog/cantu/e-commerce/tb_paymentmodes.csv") )

# tb_paymentmodes.display()

In [0]:
tb_regions = ( spark
      .read
      .format("csv")
      .option("header", "true")
      .option("delimiter", "|")
      .load("/Volumes/datacraft_catalog/cantu/e-commerce/tb_regions.csv") )

# tb_regions.display()

In [0]:
tb_users = ( spark
      .read
      .format("csv")
      .option("header", "true")
      .option("delimiter", "|")
      .load("/Volumes/datacraft_catalog/cantu/e-commerce/tb_users.csv") )

# tb_users.display()

In [0]:
tb_addresses.createOrReplaceTempView("tb_addresses")
tb_cartentries.createOrReplaceTempView("tb_cartentries")
tb_carts.createOrReplaceTempView("tb_carts")
tb_cmssitelp.createOrReplaceTempView("tb_cmssitelp")
tb_paymentinfos.createOrReplaceTempView("tb_paymentinfos")
tb_paymentmodes.createOrReplaceTempView("tb_paymentmodes")
tb_regions.createOrReplaceTempView("tb_regions")
tb_users.createOrReplaceTempView("tb_users")

# Analise

## ✅ 1. Quais os produtos que mais tiveram carrinhos abandonados?


In [0]:
df_carts = spark.table("tb_carts")
df_cartentries = spark.table("tb_cartentries")

df_produtos_abandonados = (
    df_carts.alias("c")
    .join(df_cartentries.alias("ce"), F.col("c.PK") == F.col("ce.p_order"), "inner")
    .groupBy(F.col("ce.p_product"))
    .agg(F.countDistinct(F.col("c.PK")).alias("qtde_abandonados"))
    .orderBy(F.desc("qtde_abandonados"))
)

windowSpec = Window.partitionBy("p_product").orderBy(F.desc("qtde_abandonados"))
df_produtos_abandonados = df_produtos_abandonados.withColumn("cumulative_sum", F.sum("qtde_abandonados").over(windowSpec))
total_abandonados = df_produtos_abandonados.agg(F.sum("qtde_abandonados")).collect()[0][0]
df_produtos_abandonados = df_produtos_abandonados.withColumn("cumulative_perc", F.col("cumulative_sum") / total_abandonados * 100)

df_produtos_abandonados = df_produtos_abandonados.withColumn(
    "ABC_class",
    F.when(F.col("cumulative_perc") <= 80, "A")
    .when((F.col("cumulative_perc") > 80) & (F.col("cumulative_perc") <= 95), "B")
    .otherwise("C")
)

display(df_produtos_abandonados.orderBy(F.desc("qtde_abandonados")))

Esta análise utiliza a classificação ABC para identificar os produtos mais frequentemente abandonados em carrinhos. 
Primeiro, calculamos a quantidade de abandonos por produto. Em seguida, ordenamos os produtos por frequência de abandono e calculamos a soma acumulada e o percentual acumulado. 
Com base nesses percentuais, classificamos os produtos em três categorias:
- Classe A: representam até 80% dos abandonos (produtos mais críticos).
- Classe B: representam de 80% a 95% dos abandonos.
- Classe C: representam os 5% restantes.
Essa abordagem auxilia na priorização de ações para reduzir o abandono de carrinhos, focando nos produtos mais impactantes.

## ✅ 2. Quais as duplas de produtos em conjunto que mais tiveram carrinhos abandonados?

In [0]:
# Exemplo de estrutura de carrinhos abandonados
carrinhos_abandonados = {
    100: ['Banana', 'Maça', 'Pera'],
    101: ['Banana', 'Maça'],
    103: ['Banana', 'Pera']
}

# Convertendo o dicionário em uma lista de tuplas
carrinhos_abandonados_list = [(k, v) for k, v in carrinhos_abandonados.items()]

df = spark.createDataFrame(carrinhos_abandonados_list, ["carrinho_id", "produtos"])

# Coletar o resultado para o driver
result_list = df.collect()

def contar_combinacoes(carrinhos):
    contador_pares = Counter()
    for row in carrinhos:
        produtos = row['produtos']
        pares = combinations(sorted(set(produtos)), 2)
        contador_pares.update(pares)
    return contador_pares

resultado = contar_combinacoes(result_list)

# Converter o resultado para um DataFrame
df_resultado = spark.createDataFrame(
    [(produto1, produto2, qtd) for (produto1, produto2), qtd in resultado.items()],
    ["produto1", "produto2", "qtd_abandonos"]
)

display(df_resultado.orderBy(F.desc("qtd_abandonos")))

In [0]:
tb_cartentries = spark.table("tb_cartentries")

result = tb_cartentries \
    .groupBy("p_order") \
    .agg(F.collect_list("p_product").alias("produtos"))

# Coletar o resultado para o driver
result_list = result.collect()

def contar_combinacoes(carrinhos):
    contador_pares = Counter()
    for row in carrinhos:
        produtos = row['produtos']
        pares = combinations(sorted(set(produtos)), 2)
        contador_pares.update(pares)
    return contador_pares

resultado = contar_combinacoes(result_list)

# Converter o resultado para um DataFrame
df_resultado = spark.createDataFrame(
    [(produto1, produto2, qtd) for (produto1, produto2), qtd in resultado.items()],
    ["produto1", "produto2", "qtd_abandonos"]
)

display(df_resultado.orderBy(F.desc("qtd_abandonos")).limit(1))

## ✅ 3. Quais produtos tiveram um aumento de abandono?

In [0]:
dados = [
    {"produto": 10, "data": "2024-01-15"},
    {"produto": 10, "data": "2024-01-20"},
    {"produto": 10, "data": "2024-02-10"},
    {"produto": 20, "data": "2024-01-10"},
    {"produto": 20, "data": "2024-02-15"},
    {"produto": 20, "data": "2024-02-25"},
    {"produto": 30, "data": "2024-02-01"},
    {"produto": 30, "data": "2024-03-01"},
    {"produto": 30, "data": "2024-03-10"},
    {"produto": 30, "data": "2024-03-15"},
]

df = spark.createDataFrame(dados)
df = df.withColumn("data", F.to_date("data"))
df = df.withColumn("mes", F.date_format("data", "yyyy-MM"))

df_abandonos = (
    df.groupBy("produto", "mes")
    .count()
    .withColumnRenamed("count", "qtd_abandonos")
    .withColumn("mes_ord", F.to_date(F.concat(F.col("mes"), F.lit("-01"))))
)

window_spec = Window.partitionBy("produto").orderBy("mes_ord")
df_abandonos = df_abandonos.withColumn(
    "abandonos_anteriores", F.lag("qtd_abandonos").over(window_spec)
)
df_abandonos = df_abandonos.withColumn(
    "crescimento", F.col("qtd_abandonos") - F.col("abandonos_anteriores")
)

produtos_com_aumento = df_abandonos.filter(F.col("crescimento") > 0)

display(produtos_com_aumento)

> No exemplo acima - o produto 10 não aparece no resultado final do algoritmo porque ele teve uma redução no número de abandonos em fevereiro comparado a janeiro.

In [0]:
tb_cartentries = ( 
                  spark.table("tb_cartentries")
                  .select(
                      F.col("p_product").alias('produto'), 
                      F.col("createdTS").try_cast('date').alias('data')
                      )
                  .filter('p_product is not null')
                  )

df = tb_cartentries.withColumn("mes", F.date_format("data", "yyyy-MM"))

# Contar abandonos por produto e mês
df_abandonos = df.groupBy("produto", "mes").count().withColumnRenamed("count", "qtd_abandonos")

# Ordenar e calcular crescimento
window_spec = Window.partitionBy("produto").orderBy("mes")
df_abandonos = df_abandonos.withColumn("mes_ord", F.to_date(F.concat(F.col("mes"), F.lit("-01"))))
df_abandonos = df_abandonos.withColumn("abandonos_anteriores", F.lag("qtd_abandonos").over(window_spec))
df_abandonos = df_abandonos.withColumn("crescimento", F.col("qtd_abandonos") - F.col("abandonos_anteriores"))

# Filtrar produtos com aumento
df_crescimento = df_abandonos.filter(F.col("crescimento") > 0).orderBy(F.desc("crescimento"))

# Exibe o resultado
display(df_crescimento.orderBy("produto", F.desc("mes")))


O código acima realiza a análise de crescimento de abandonos de produtos em dois cenários diferentes.

No primeiro cenário, os dados são fornecidos diretamente em uma lista de dicionários. O DataFrame é criado e processado para calcular o número de abandonos por produto e mês. Em seguida, é calculado o crescimento de abandonos em relação ao mês anterior para cada produto. Os produtos que apresentaram crescimento são filtrados e exibidos.

No segundo cenário, os dados são lidos de uma tabela chamada 'tb_cartentries'. O DataFrame é processado de maneira similar ao primeiro cenário: os abandonos são contados por produto e mês, o crescimento é calculado, e os produtos com aumento de abandonos são filtrados e exibidos.

Ambos os cenários utilizam funções do PySpark para manipulação e análise dos dados, incluindo a criação de colunas, agrupamento, ordenação e uso de janelas para cálculos de crescimento.


## ✅ 4. Quais os produtos novos e a quantidade de carrinhos no seu primeiro mês de lançamento? 

In [0]:
dados = [
    {"produto": 100, "data": "2024-04-05"},
    {"produto": 100, "data": "2024-04-10"},
    {"produto": 100, "data": "2024-05-01"},
    {"produto": 200, "data": "2024-05-15"},
    {"produto": 200, "data": "2024-05-20"},
    {"produto": 300, "data": "2024-06-01"},
    {"produto": 300, "data": "2024-06-02"},
    {"produto": 300, "data": "2024-06-10"},
    {"produto": 400, "data": "2024-06-15"},
]

df = spark.createDataFrame(dados)
df = df.withColumn("data", F.to_date("data"))
df = df.withColumn("mes", F.date_format("data", "yyyy-MM"))

window_spec = Window.partitionBy("produto").orderBy("data")
df = df.withColumn("primeiro_mes", F.first("mes").over(window_spec))
df_primeiro_mes = df.filter(F.col("mes") == F.col("primeiro_mes"))
df_novos_produtos = (
    df_primeiro_mes.groupBy("produto", "primeiro_mes")
    .count()
    .withColumnRenamed("count", "qtd_carrinhos")
)

display(df_novos_produtos.orderBy("produto"))

In [0]:
# Carregar dados da tabela tb_cartentries
tb_cartentries = (
    spark.table("tb_cartentries")
    .select(
        F.col("p_product").alias('produto'), 
        F.col("createdTS").cast('date').alias('data')
    )
    .filter('p_product is not null')
)

# Adicionar coluna de mês
df = tb_cartentries.withColumn("mes", F.date_format("data", "yyyy-MM"))

# Encontrar o primeiro mês de cada produto
window_spec = Window.partitionBy("produto").orderBy("data")
df_primeiro_mes = df.withColumn("primeiro_mes", F.first("mes").over(window_spec))

# Filtrar para manter apenas o primeiro mês de cada produto
df_primeiro_mes = df_primeiro_mes.filter(F.col("mes") == F.col("primeiro_mes"))

# Contar a quantidade de carrinhos no primeiro mês de cada produto
df_novos_produtos = df_primeiro_mes.groupBy("produto", "primeiro_mes").count().withColumnRenamed("count", "qtd_carrinhos")

# Exibir o resultado
display(df_novos_produtos.orderBy(F.desc("qtd_carrinhos")))

## ✅ 5. Quais estados tiveram mais abandonos? 

- Não foi possível calcular a quantidade de abandonos por estado, pois apenas 6 registros estão preenchidos na coluna `p_paymentaddress` da tabela `tb_carts`. Essa coluna, que faz referência (FK) à tabela `tb_addresses`, limita a análise devido à baixa quantidade de dados disponíveis.

In [0]:
tb_carts = spark.table("tb_carts")
describe_tb_carts = tb_carts.select("p_paymentaddress").summary()
display(describe_tb_carts)

## ✅ 6. Gere um relatório dos produtos, mês a mês informando a quantidade de carrinhos abandonados, quantidade de itens abandonados e o valor não faturado? 

In [0]:
df_carts = spark.table("tb_carts")
df_cartentries = spark.table("tb_cartentries")

df_join  = (
df_carts.alias("c")
.join(df_cartentries.alias("ce"), F.col("c.PK") == F.col("ce.p_order"), "inner")
.select(
    F.col("c.PK"), 
    F.col("ce.p_product"), 
    F.col("ce.p_quantity"), 
    F.col("ce.p_totalprice"), 
    F.col("c.createdTS")
    )
)

df_join  = df_join.withColumn("mes", F.date_format("createdTS", "yyyy-MM"))


df_relatorio = (
    df_join
    .groupBy("p_product", "mes")
    .agg(
        F.countDistinct("c.PK").alias("qtd_carrinhos_abandonados"),
        F.sum("p_quantity").alias("qtd_itens_abandonados"),
        F.sum("p_totalprice").alias("valor_nao_faturado")
    )
    .orderBy("mes", "p_product")
)

df_relatorio.filter("p_product == 8797277388801").orderBy(F.desc("mes")).display() # Usando o produto 8797277388801 como exemplo

In [0]:
df_carts = spark.table("tb_carts")
df_cartentries = spark.table("tb_cartentries")

df_join  = (
df_carts.alias("c")
.join(df_cartentries.alias("ce"), F.col("c.PK") == F.col("ce.p_order"), "inner")
.select(
    F.col("c.PK"), 
    F.col("ce.p_product"), 
    F.col("ce.p_quantity"), 
    F.col("ce.p_totalprice"), 
    F.col("c.createdTS")
    )
)

df_join  = df_join.withColumn("data", F.date_format("createdTS", "yyyy-MM-dd"))


df_relatorio = (
    df_join
    .groupBy("p_product", "data")
    .agg(
        F.countDistinct("c.PK").alias("qtd_carrinhos_abandonados"),
        F.sum("p_quantity").alias("qtd_itens_abandonados"),
        F.sum("p_totalprice").alias("valor_nao_faturado")
    )
    .orderBy("data", "p_product")
)

# Exibe o resultado
df_relatorio.filter("p_product == 8797277388801").orderBy(F.desc("data")).display() # Usando o produto 8797277388801 como exemplo

## ✅ 7. Exporte um arquivo .txt com os 50 carrinhos com os maiores carts.p_totalprice

> **Observação:** O layout apresentado na prova estava confuso em relação à resposta da pergunta e foi considerado um layout que, na minha opinião, faz mais sentido.

In [0]:
df_carts = spark.table("tb_carts")
df_cartentries = spark.table("tb_cartentries")
df_users = spark.table("tb_users")
df_paymentmodes = spark.table("tb_paymentmodes")
df_paymentinfos = spark.table("tb_paymentinfos")
df_cmssitelp = spark.table("tb_cmssitelp")
df_addresses = spark.table("tb_addresses")

df_joined = (
    df_carts.alias("carts")
    .join(df_cartentries.alias("cartentries"), F.col("carts.PK") == F.col("cartentries.p_order"), "left")
    .join(df_users.alias("users"), F.col("carts.p_user") == F.col("users.PK"), "left")
    .join(df_paymentmodes.alias("paymentmodes"), F.col("carts.p_paymentmode") == F.col("paymentmodes.PK"), "left")
    .join(df_paymentinfos.alias("paymentinfos"), F.col("carts.p_paymentinfo") == F.col("paymentinfos.PK"), "left")
    .join(df_cmssitelp.alias("cmssitelp"), F.col("carts.p_site") == F.col("cmssitelp.ITEMPK"), "left")
    .join(df_addresses.alias("addresses"), F.col("carts.p_paymentaddress") == F.col("addresses.PK"), "left")
    .groupBy(
        F.col("carts.PK"),
        F.col("carts.createdTS"),
        F.col("carts.p_totalprice"),
        F.col("users.p_uid"),
        F.col("paymentmodes.p_code"),
        F.col("paymentinfos.p_installments"),
        F.col("cmssitelp.p_name"),
        F.col("addresses.p_postalcode")
    )
    .agg(
        F.sum("cartentries.p_quantity").alias("p_quantity"),
        F.count("cartentries.PK").alias("p_total_entries")
    )
)

df_result = df_joined.orderBy(F.desc("p_totalprice")).limit(50)

display(df_result)