# Análise RFM (Recency, Frequency, Monetary) com Apache Spark (PySpark)

**Dataset:** Online Retail (UCI Machine Learning Repository)  
Fonte: https://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx

**Objetivo:** calcular métricas RFM por cliente, criar scores (1 a 5) com `ntile(5)` e segmentar clientes para apoiar decisões de CRM e retenção.


## 1. Introdução

RFM é uma técnica de segmentação de clientes baseada em três dimensões:

- **Recency (R):** quão recente foi a última compra (quanto menor, melhor).
- **Frequency (F):** número de compras/pedidos (quanto maior, melhor).
- **Monetary (M):** valor total gasto (quanto maior, melhor).

O dataset **Online Retail** contém transações de uma empresa de varejo online (invoices, produtos, quantidade, preço, país e `CustomerID`).

Neste notebook:

- A **limpeza e filtragem** será feita **exclusivamente em Spark SQL**.
- O **cálculo do RFM** será feito com **PySpark DataFrame API**.
- O **scoring** será feito com **window functions** usando **`ntile(5)`**.

Observação sobre lazy evaluation: transformações são definidas de forma declarativa; ações como `show()` e `count()` disparam a execução.


## 2. Setup do Ambiente Spark

Notebook pronto para rodar em **Google Colab** com Spark local via PySpark.


In [None]:
!pip -q install pyspark==3.5.1 openpyxl==3.1.2


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = (
    SparkSession.builder
    .appName('RFM - Online Retail')
    .master('local[*]')
    .getOrCreate()
)

spark.sparkContext.setLogLevel('WARN')
print('Spark version:', spark.version)


## 3. Ingestão dos Dados

O dataset original está em **Excel (.xlsx)**. Como a ingestão em Spark será via **CSV**, vamos:

1. Baixar o `.xlsx` da UCI
2. Converter para `.csv` **sem Pandas** (somente `openpyxl` + `csv`)
3. Ler o CSV com `spark.read.csv`

Nesta etapa não aplicamos regras de negócio; apenas preparamos o formato de leitura.


In [None]:
import os
import csv
import requests
from openpyxl import load_workbook

base_dir = '/content'
xlsx_path = os.path.join(base_dir, 'OnlineRetail.xlsx')
csv_path = os.path.join(base_dir, 'OnlineRetail.csv')

url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx'

if not os.path.exists(xlsx_path):
    resp = requests.get(url, timeout=120)
    resp.raise_for_status()
    with open(xlsx_path, 'wb') as f:
        f.write(resp.content)

if not os.path.exists(csv_path):
    wb = load_workbook(xlsx_path, read_only=True, data_only=True)
    ws = wb.active
    with open(csv_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        for row in ws.iter_rows(values_only=True):
            writer.writerow(list(row))
    wb.close()

print('CSV pronto em:', csv_path)


In [None]:
df_raw = (
    spark.read
    .option('header', True)
    .csv(csv_path)
)

df_raw.printSchema()
df_raw.show(5, truncate=False)


## 4. Limpeza e Regras de Negócio (Spark SQL)

Nesta seção, a limpeza e filtragem são feitas **exclusivamente em Spark SQL**.

Regras obrigatórias:

- Remover registros com `CustomerID` nulo
- Remover cancelamentos (`InvoiceNo` iniciando com `C`)
- Remover quantidades `<= 0`
- Criar `Revenue = Quantity * UnitPrice`

Saída: view temporária **`vendas_clean`**.


In [None]:
df_raw.createOrReplaceTempView('vendas_raw')

spark.sql("""
Create Or Replace Temp View vendas_clean AS
Select
  Cast(InvoiceNo AS String)                            AS InvoiceNo,
  Cast(StockCode AS String)                            AS StockCode,
  Cast(Description AS String)                          AS Description,
  Cast(Quantity AS Int)                                AS Quantity,
  Cast(UnitPrice AS Double)                            AS UnitPrice,
  Cast(Cast(CustomerID AS Double) AS Int)              AS CustomerID,
  Cast(Country AS String)                              AS Country,
  Cast(Quantity AS Double) * Cast(UnitPrice AS Double) AS Revenue,
  Coalesce(
    to_timestamp(InvoiceDate),
    to_timestamp(InvoiceDate, 'M/d/yyyy H:mm')
  )                                                     AS InvoiceDate
From 
  vendas_raw
Where 1 = 1
  And CustomerID Is Not Null
  And InvoiceNo Not Like 'C%'
  And Cast(Quantity AS Int) > 0
  And Cast(UnitPrice AS Double) > 0
""")

df_clean = spark.table('vendas_clean')
df_clean.printSchema()
df_clean.show(5, truncate=False)


**Cache para evitar recomputação durante múltiplas agregações.**


In [None]:
df_clean.cache()
print('Linhas em vendas_clean:', df_clean.count())


## 5. Construção do RFM (PySpark)

A partir de `vendas_clean`, calculamos por `CustomerID`:

- **Recency:** dias desde a última compra
- **Frequency:** número distinto de `InvoiceNo`
- **Monetary:** soma da receita (`Revenue`)

Data de referência: `max(InvoiceDate) + 1 dia`.


In [None]:
from datetime import timedelta

max_invoice_ts = df_clean.select(F.max('InvoiceDate').alias('max_invoice_ts')).first()['max_invoice_ts']
reference_date = max_invoice_ts + timedelta(days=1)

rfm = (
    df_clean
    .groupBy('CustomerID')
    .agg(
        F.datediff(F.lit(reference_date), F.max('InvoiceDate')).alias('Recency'),
        F.countDistinct('InvoiceNo').alias('Frequency'),
        F.sum('Revenue').alias('Monetary')
    )
)

rfm.show(10, truncate=False)


## 6. Scoring RFM (Window + `ntile(5)`)

Scores de 1 a 5:

- `R_Score`: quanto mais recente, maior o score
- `F_Score`: quanto mais frequente, maior o score
- `M_Score`: quanto maior o gasto, maior o score

O `RFM_Score` será a concatenação `R_ScoreF_ScoreM_Score` (ex.: `555`).


In [None]:
w_r = Window.orderBy(F.col('Recency').asc())
w_f = Window.orderBy(F.col('Frequency').desc())
w_m = Window.orderBy(F.col('Monetary').desc())

rfm_scored = (
    rfm
    .withColumn('R_Score', F.lit(6) - F.ntile(5).over(w_r))
    .withColumn('F_Score', F.lit(6) - F.ntile(5).over(w_f))
    .withColumn('M_Score', F.lit(6) - F.ntile(5).over(w_m))
    .withColumn('RFM_Score', F.concat_ws('', F.col('R_Score'), F.col('F_Score'), F.col('M_Score')))
)

rfm_scored.orderBy(F.desc('RFM_Score')).show(10, truncate=False)


**Plano lógico e físico otimizado pelo Catalyst.**


In [None]:
rfm_scored.explain(True)


## 7. Segmentação de Clientes

Segmentos (heurísticos) baseados nos scores:

- **Champions:** muito recentes, frequentes e com alto gasto
- **Loyal Customers:** alta frequência e boa recência
- **Big Spenders:** alto gasto (mesmo que frequência seja moderada)
- **At Risk:** pouca recência (tempo sem comprar) porém já compraram com alguma frequência

As regras abaixo são um ponto de partida e devem ser calibradas conforme a estratégia do negócio.


In [None]:
segmented = (
    rfm_scored
    .withColumn(
        'Segment',
        F.when((F.col('R_Score') >= 4) & (F.col('F_Score') >= 4) & (F.col('M_Score') >= 4), 'Champions')
         .when((F.col('R_Score') >= 3) & (F.col('F_Score') >= 4), 'Loyal Customers')
         .when((F.col('M_Score') >= 4) & (F.col('R_Score') >= 3), 'Big Spenders')
         .when((F.col('R_Score') <= 2) & (F.col('F_Score') >= 3), 'At Risk')
         .otherwise('Others')
    )
)

segmented.groupBy('Segment').count().orderBy(F.desc('count')).show(truncate=False)


## 8. Análise de Resultados

Vamos calcular:

- Quantidade de clientes por segmento
- Receita total e ticket médio por segmento

Essas visões ajudam a priorizar ações (retenção, reativação e upsell).


In [None]:
clientes_por_segmento = (
    segmented
    .groupBy('Segment')
    .agg(F.count('*').alias('Clientes'))
    .orderBy(F.desc('Clientes'))
)

receita_por_segmento = (
    segmented
    .groupBy('Segment')
    .agg(
        F.sum('Monetary').alias('Receita_Total'),
        F.avg('Monetary').alias('Ticket_Medio')
    )
    .orderBy(F.desc('Receita_Total'))
)

clientes_por_segmento.show(truncate=False)
receita_por_segmento.show(truncate=False)


### Insights de negócio (direcionamento)

- **Champions:** campanhas de retenção premium, early access, programas VIP.
- **Loyal Customers:** recomendação personalizada e bundles para elevar ticket médio.
- **Big Spenders:** ofertas premium e upsell; foco em experiência e atendimento.
- **At Risk:** campanhas de reativação (cupom, frete, comunicação multicanal) e pesquisa de motivos de churn.


## 9. Conclusão

Principais aprendizados:

- RFM resume comportamento de compra em métricas simples e interpretáveis.
- O scoring via quantis (`ntile`) permite comparar clientes de forma relativa.
- A segmentação direciona ações com maior potencial de impacto.

Próximos passos sugeridos:

- Clusterização (ex.: K-Means) em features RFM padronizadas
- CLV (Customer Lifetime Value)
- Modelos preditivos (propensão à recompra/churn)
