In [0]:
!pip install kagglehub --quiet

In [0]:
# BRONZE - Baixar e ler o Online Retail II (UCI) de forma resiliente
import kagglehub, os, pandas as pd

# Baixa (ou reusa cache) e captura o caminho atual da sessão
path = kagglehub.dataset_download("mashlyn/online-retail-ii-uci")
print("Dataset em:", path)
print("Arquivos:", os.listdir(path))

# Descobre o CSV automaticamente (evita hardcode e erros ao reiniciar kernel)
csv_candidates = [f for f in os.listdir(path) if f.lower().endswith(".csv")]
if not csv_candidates:
    raise FileNotFoundError("Nenhum CSV encontrado no diretório do dataset. Verifique os arquivos listados acima.")

csv_path = os.path.join(path, csv_candidates[0])
print("Usando CSV:", csv_path)

# Leitura com pandas (encoding e data já tratados)
df_pandas = pd.read_csv(
    csv_path,
    encoding="ISO-8859-1",        # troque para "utf-8" se necessário
    parse_dates=["InvoiceDate"],
    dayfirst=True
)

# Converte para Spark e cria a BRONZE
df = spark.createDataFrame(df_pandas)
df.createOrReplaceTempView("bronze_temp")

display(df.limit(5))



In [0]:
from pyspark.sql.functions import col, to_timestamp, round as sround

bronze = spark.table("bronze_temp")

# 1) Limpeza e tipagem
silver_all = (
    bronze
    .withColumnRenamed("Invoice", "InvoiceNo")
    .withColumnRenamed("Customer ID", "CustomerID")
    .withColumnRenamed("Price", "UnitPrice")
    .withColumn("InvoiceDate", to_timestamp(col("InvoiceDate")))
    .withColumn("Quantity", col("Quantity").cast("int"))
    .withColumn("UnitPrice", col("UnitPrice").cast("double"))
    .filter(col("CustomerID").isNotNull())
    .filter((col("Quantity") > 0) & (col("UnitPrice") > 0))
    .filter(~col("InvoiceNo").startswith("C"))   # remove notas de crédito/cancelamentos
    .withColumn("Amount", sround(col("Quantity") * col("UnitPrice"), 2))
    .dropDuplicates(["InvoiceNo", "CustomerID", "InvoiceDate", "Amount"])
)

# 2) Projeção (somente colunas úteis para a Gold + storytelling)
silver = silver_all.select(
    "CustomerID", "InvoiceNo", "InvoiceDate", "Quantity", "UnitPrice", "Amount", "Country"
)

# 3) View Silver (mantém o mesmo nome esperado pela Gold)
silver.createOrReplaceTempView("transacoes_silver")

display(silver.limit(5))





GOLD - Cálculo das métricas R, F e M (RFM)
-- Data de referência = última data do dataset (reprodutível)

In [0]:
%sql
create or replace temp view _ref as
select MAX(InvoiceDate) as ref_date
from transacoes_silver;

Recencia

In [0]:
%sql
create or replace temp view recencia as
with last_purchase as (
    select CustomerID, MAX(InvoiceDate) AS last_date
    from transacoes_silver
    group by CustomerID
)
select l.CustomerID,
       DATEDIFF(r.ref_date, l.last_date) as R
from last_purchase l cross join _ref r;

In [0]:
%sql
select * from recencia

Frequência (F) - soma de itens (poderia ser nº de pedidos com COUNT(DISTINCT InvoiceNo))

In [0]:
%sql
create or replace temp view frequencia as
    select CustomerID, SUM(quantity) as F from transacoes_silver group by CustomerID order by CustomerID


Monetário (M)

In [0]:
%sql
create or replace temp view monetaridade as
    select CustomerID, SUM(amount) as M from transacoes_silver group by CustomerID order by CustomerID

Quintis (scores)

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW R_score AS
SELECT CustomerID, R, 6 - NTILE(5) OVER (ORDER BY R ASC) AS R_score
FROM recencia;

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW frequencia_score AS
SELECT CustomerID, F, NTILE(5) OVER (ORDER BY F ASC) AS F_score
FROM frequencia;

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW monetaridade_score AS
SELECT CustomerID, M, NTILE(5) OVER (ORDER BY M ASC) AS M_score
FROM monetaridade;

-- RFM + média(F,M)

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW rfm_silver AS
SELECT r.CustomerID, r.R_score, f.F_score, m.M_score
FROM R_score r
JOIN frequencia_score f USING (CustomerID)
JOIN monetaridade_score m USING (CustomerID);

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW rfm_silver_media AS
SELECT CustomerID,
       R_score,
       (F_score + M_score)/2.0 AS media_FM
FROM rfm_silver;

In [0]:
%sql
SELECT * FROM rfm_silver_media ORDER BY CustomerID LIMIT 10;

In [0]:
import numpy as np
#Converter para pandas para segmentação
df_rfm = spark.table("rfm_silver_media").toPandas()

# Regras baseadas em análise e contexto do Online Retail II
regras = [
    (df_rfm['R_score'] >= 4) & (df_rfm['media_FM'] >= 4),
    (df_rfm['R_score'] >= 3) & (df_rfm['media_FM'].between(3, 4)),
    (df_rfm['R_score'] >= 3) & (df_rfm['media_FM'].between(2, 3)),
    (df_rfm['R_score'] == 5) & (df_rfm['media_FM'] <= 2),
    (df_rfm['R_score'] <= 2) & (df_rfm['media_FM'] >= 3),
    (df_rfm['R_score'].between(2, 3)) & (df_rfm['media_FM'] <= 2),
    (df_rfm['R_score'] <= 2) & (df_rfm['media_FM'] <= 2)
]

segmentos = [
    'Champions',
    'Loyal Customers',
    'Potential Loyalist',
    'New Customers',
    'At Risk',
    'About to Sleep',
    'Lost'
]

df_rfm['GRUPO'] = np.select(regras, segmentos, default='Outros')

In [0]:
spark.createDataFrame(df_rfm).createOrReplaceTempView('rfm_gold_segmentacao')

display(spark.table("rfm_gold_segmentacao").limit(10))

In [0]:
# Instalar o pacote squarify (apenas uma vez por sessão)
!pip install squarify --quiet

# Reinicializa a sessão Python para ativar o pacote (Databricks exige isso após pip install)
dbutils.library.restartPython()


In [0]:
import matplotlib.pyplot as plt
import squarify

df = df_rfm.copy()

# Agrupar por grupo e contar clientes
grouped = df.groupby('GRUPO', as_index=False).agg(qt=('CustomerID','count'))
total = grouped['qt'].sum()

# Labels com % de clientes
labels = [f"{row['GRUPO']}\n{(row['qt']/total)*100:.1f}%" for _, row in grouped.iterrows()]

# Plot Treemap
plt.figure(figsize=(10,6))
squarify.plot(
    sizes=grouped['qt'],
    label=labels,
    alpha=0.85
)
plt.title("Treemap de Clientes por Segmento (% de clientes)")
plt.axis('off')
plt.show()


In [0]:
plt.figure(figsize=(8,6))
for seg, sub in df.groupby('GRUPO'):
    plt.scatter(sub['media_FM'], sub['R_score'], label=seg, alpha=0.7)

plt.xlabel('Média (F_score e M_score)')
plt.ylabel('R_score')
plt.title('Distribuição dos Clientes por Segmento (RFM)')
plt.legend(bbox_to_anchor=(1.02, 1), loc='upper left')
plt.show()
