In [0]:
import pandas as pd
import numpy as np

In [0]:
# List the files available in the source volume.
display(dbutils.fs.ls("/Volumes/workspace/default/dados_online/"))

# Read the semicolon-delimited CSV with Spark: column names come from the
# header row and column types are inferred by Spark.
df_spark = spark.read.csv(
    "/Volumes/workspace/default/dados_online/Online Retail.csv",
    header=True,
    inferSchema=True,
    sep=";",
)

# Collect the whole Spark DataFrame into a pandas DataFrame.
df = df_spark.toPandas()

# Print (rows, columns), then show the first rows as the cell output.
print(df.shape)
df.head()



In [0]:
# Dimensions of the loaded DataFrame as (rows, columns).
print(df.shape)

In [0]:
# Data type of every column in the loaded DataFrame.
print(df.dtypes)

In [0]:
# Preview 5 randomly chosen rows. The fixed seed keeps this preview identical
# every time the notebook is re-run from a fresh kernel.
df.sample(5, random_state=42)

In [0]:
# Cleaning: keep only sales lines that can be attributed to a customer.
# Every filter is a callable evaluated on the frame produced by the previous
# step, so each boolean mask is built from exactly the rows being filtered
# instead of relying on index alignment with the unfiltered `df`.
df_clean = (
    df.dropna(subset=["CustomerID"])  # drop rows without a customer
      .loc[lambda d: ~d["InvoiceNo"].astype(str).str.startswith("C")]  # drop cancellations
      .loc[lambda d: d["Quantity"] > 0]  # positive quantity only
      .loc[lambda d: d["UnitPrice"] > 0]  # positive unit price only
      .copy()
)

In [0]:
# Line total: quantity multiplied by unit price.
df_clean["TotalPrice"] = df_clean["Quantity"].mul(df_clean["UnitPrice"])

In [0]:
# Reference date: the latest invoice timestamp left in the cleaned dataset.
ref_date = df_clean["InvoiceDate"].pipe(pd.to_datetime).max()
ref_date

In [0]:
# Make sure the invoice date column is a datetime.
df_clean["InvoiceDate"] = pd.to_datetime(df_clean["InvoiceDate"])

# One row per customer:
#   Recency   - whole days between the reference date and the customer's latest invoice
#   Frequency - number of distinct invoices
#   Monetary  - sum of TotalPrice
rfm = (
    df_clean.groupby("CustomerID", as_index=False)
    .agg(
        Recency=("InvoiceDate", "max"),
        Frequency=("InvoiceNo", "nunique"),
        Monetary=("TotalPrice", "sum"),
    )
    # Turn each customer's latest invoice date into a day count in one
    # vectorised step; the column keeps its position in the frame.
    .assign(Recency=lambda agg: (ref_date - agg["Recency"]).dt.days)
)

# Ten customers with the highest total spend.
rfm.sort_values("Monetary", ascending=False).head(10)


In [0]:
def qcut_safe(series, q=5, labels=None):
    """Bin ``series`` into ``q`` quantile buckets, tolerating heavy ties.

    ``pd.qcut`` raises ``ValueError`` when tied values make two quantile
    edges coincide. In that case the values are ranked first (ties broken by
    order of appearance) and the ranks are binned instead, so the result is
    still quantile-based and still uses all ``q`` buckets.

    Parameters
    ----------
    series : pandas.Series
        Numeric values to bin.
    q : int, default 5
        Number of quantile buckets.
    labels : sequence or None, default None
        Labels for the buckets, passed through to ``pd.qcut``. When ``None``
        and the fallback is used, the returned intervals describe ranks, not
        the original values.

    Returns
    -------
    pandas.Series
        Categorical bucket assignment aligned with ``series``.

    Raises
    ------
    ValueError
        If the rank-based fallback cannot be binned either. Any error other
        than ``ValueError`` from the first attempt is raised unchanged.
    """
    try:
        return pd.qcut(series, q, labels=labels)
    except ValueError:
        # Only the duplicate-edge failure is handled here; other errors such
        # as a non-numeric input are not swallowed.
        return pd.qcut(series.rank(method="first"), q, labels=labels)

# Recency is inverted: the smaller the recency, the better the score.
rfm["R_Score"] = qcut_safe(rfm["Recency"], q=5, labels=[5, 4, 3, 2, 1]).astype(int)

# Frequency: rank first (ties broken by order of appearance) to reduce ties,
# then bin the ranks.
rfm["F_Score"] = qcut_safe(
    rfm["Frequency"].rank(method="first"), q=5, labels=[1, 2, 3, 4, 5]
).astype(int)

# Monetary is binned directly.
rfm["M_Score"] = qcut_safe(rfm["Monetary"], q=5, labels=[1, 2, 3, 4, 5]).astype(int)

# Combined score as a three-character string (R, F, M in that order),
# plus the numeric sum of the three scores.
rfm["RFM_Score"] = rfm["R_Score"].astype(str).str.cat(
    [rfm["F_Score"].astype(str), rfm["M_Score"].astype(str)]
)
rfm["RFM_Sum"] = rfm.loc[:, ["R_Score", "F_Score", "M_Score"]].sum(axis=1)

rfm.head()

In [0]:
def segment(row):
    """Map one customer's R/F/M scores to a segment name.

    ``row`` is any mapping-like object (for example a DataFrame row) exposing
    the keys ``"R_Score"``, ``"F_Score"`` and ``"M_Score"``. The rules are
    checked in order and the first one that matches wins; rows matching no
    rule are labelled ``"Atenção"``.
    """
    r = row["R_Score"]
    f = row["F_Score"]
    m = row["M_Score"]

    # Ordered (label, condition) pairs; order matters because several
    # conditions overlap (e.g. every "Campeões" row also satisfies "Leais").
    rules = (
        ("Campeões", r >= 4 and f >= 4 and m >= 4),
        ("Leais", r >= 4 and f >= 4),
        ("Promissores", r >= 4 and f <= 2 and m >= 3),
        ("Em risco", r <= 2 and f >= 3 and m >= 3),
        ("Hibernando (alto gasto)", r <= 2 and f <= 2 and m >= 3),
        ("Novos/baixo valor", r >= 3 and f <= 2 and m <= 2),
    )
    for label, matched in rules:
        if matched:
            return label
    return "Atenção"

# Label every customer with a segment, evaluated row by row.
rfm["Segment"] = rfm.apply(segment, axis=1)

# Per-segment summary: customer count and mean monetary / frequency /
# recency, largest segments first, rounded to two decimals.
seg_resumo = (
    rfm.groupby("Segment")
    .agg(
        customers=pd.NamedAgg(column="CustomerID", aggfunc="count"),
        avg_monetary=pd.NamedAgg(column="Monetary", aggfunc="mean"),
        avg_frequency=pd.NamedAgg(column="Frequency", aggfunc="mean"),
        avg_recency=pd.NamedAgg(column="Recency", aggfunc="mean"),
    )
    .sort_values("customers", ascending=False)
    .round(2)
)

seg_resumo


In [0]:
# Number of customers in each segment, as a two-column frame
# ("Segment", "Customers").
dist_segmento = (
    rfm["Segment"]
    .value_counts()
    .rename_axis("Segment")
    .reset_index(name="Customers")
)
dist_segmento


In [0]:
# 5.2 Mean Monetary per segment (the original note calls this the
# "average ticket"), highest first, rounded to two decimals.
ticket_segmento = (
    rfm.groupby("Segment")["Monetary"]
    .mean()
    .sort_values(ascending=False)
    .round(2)
)
ticket_segmento

In [0]:
# The 20 customers with the highest Monetary value.
top_clientes = rfm.sort_values(by="Monetary", ascending=False).iloc[:20]
top_clientes.loc[
    :,
    ["CustomerID", "Recency", "Frequency", "Monetary", "R_Score", "F_Score", "M_Score", "Segment"],
]

In [0]:
# For each customer, take the row of their most recent invoice to read the
# Country. `idxmax` returns the index label of each group's latest
# InvoiceDate whatever the row order, so no prior sort of the frame is needed.
idx = df_clean.groupby("CustomerID")["InvoiceDate"].idxmax()
last_purchase = (
    df_clean.loc[idx, ["CustomerID", "Country", "InvoiceDate"]]
    .rename(columns={"Country": "LastCountry", "InvoiceDate": "LastPurchaseAt"})
)

# Attach the last-purchase country and timestamp to the RFM table,
# keeping every RFM row.
rfm_country = rfm.merge(last_purchase, on="CustomerID", how="left")
rfm_country.head()


In [0]:
# Quick type normalisation: store CustomerID as an integer.
rfm_country["CustomerID"] = rfm_country["CustomerID"].astype(int)

# pandas -> Spark
rfm_spark = spark.createDataFrame(rfm_country)

# Make sure the target catalog and schema exist before writing.
spark.sql("CREATE CATALOG IF NOT EXISTS analytics")
spark.sql("CREATE SCHEMA IF NOT EXISTS analytics.marketing")

# Save as a managed table, replacing any existing contents.
# NOTE(review): the original comment calls this a Delta table, but no format
# is set explicitly here - presumably it relies on the workspace default; confirm.
rfm_spark.write.mode("overwrite").saveAsTable("analytics.marketing.customers_rfm")
