In [0]:
# Builds (or replaces) workspace.churn.churn_reason_raw from the rows of
# workspace.churn.history_genie that have Churn = 1. For each such row the
# Portuguese prompt below is concatenated with CustomerFeedback_clean and sent
# to ai_extract, asking for two labels: reason_short and reason_long. The
# result is stored in the column reason_struct, whose reason_short /
# reason_long fields are read by later cells.
# NOTE(review): `Partner AS customer_id` publishes the Partner column under
# the name customer_id. Partner is passed as a 0/1 feature in the usage
# example at the end of this notebook, so it is presumably NOT a unique
# customer identifier -- confirm, and use a real key if history_genie has one.
# NOTE(review): presumably CONCAT yields NULL when CustomerFeedback_clean is
# NULL -- verify how ai_extract treats a NULL prompt.
spark.sql("""
CREATE OR REPLACE TABLE workspace.churn.churn_reason_raw AS
SELECT
  Partner AS customer_id,
  Churn,
  CustomerFeedback_clean,
  ai_extract(
    CONCAT(
      'Você é um analista de churn da operadora. ',
      'Leia o feedback e extraia: ',
      '(1) reason_short = motivo principal em 3 a 6 palavras; ',
      '(2) reason_long = explicação em uma frase. ',
      'Feedback: ',
      CustomerFeedback_clean
    ),
    array(
      'reason_short',
      'reason_long'
    )
  ) AS reason_struct
FROM workspace.churn.history_genie
WHERE Churn = 1
""")

In [0]:
%sql
-- Preview: 20 rows of the raw extraction table, with the two fields of
-- reason_struct flattened into their own columns.
SELECT
  raw.customer_id,
  raw.Churn,
  raw.CustomerFeedback_clean,
  raw.reason_struct.reason_short AS reason_short,
  raw.reason_struct.reason_long  AS reason_long
FROM workspace.churn.churn_reason_raw AS raw
LIMIT 20;

In [0]:
%sql
-- Top short reasons: the 50 most frequent values of reason_short.
-- The grouping key is the reason_short field only. Grouping by the whole
-- reason_struct would also key on reason_long (a free-text sentence), so
-- rows sharing the same short reason would still land in separate groups.
SELECT
  reason_struct.reason_short AS reason_short,
  COUNT(*) AS qtd
FROM workspace.churn.churn_reason_raw
GROUP BY reason_struct.reason_short
ORDER BY qtd DESC
LIMIT 50;

Taxonomia de motivos de churn, definida a partir dos campos `reason_short` e `reason_long` extraídos do feedback:

1. **Better deal / pricing**  
   _Exemplos:_ encontrou oferta melhor, melhor preço, promoção concorrente, custo alto etc.

2. **Technical issues / speed**  
   _Exemplos:_ internet lenta, velocidades inconsistentes, problemas técnicos genéricos.

3. **Service reliability / outages**  
   _Exemplos:_ quedas frequentes, instabilidade, serviço “vai e volta”.

4. **Product / plan mismatch**  
   _Exemplos:_ plano sem internet, recurso esperado não incluído, tipo de serviço que não atende ao uso.

5. **Payment / billing issues**  
   _Exemplos:_ método de pagamento inconveniente, cobrança, faturamento.

6. **Personal reasons**  
   _Exemplos:_ mudança, motivos pessoais genéricos.

7. **Other / unclear**  
   _Exemplos:_ o que não encaixar claramente acima.

In [0]:
# Builds (or replaces) workspace.churn.churn_reason_final from
# workspace.churn.churn_reason_raw. The two fields of reason_struct are
# flattened into reason_short / reason_long, and ai_classify is called with a
# text built from both fields plus an array of seven candidate labels (the
# same seven categories listed in the markdown taxonomy above); its result is
# stored as churn_category.
# NOTE(review): presumably CONCAT returns NULL when reason_short or
# reason_long is NULL, which would leave churn_category NULL for that row --
# later cells filter churn_category IS NOT NULL, so such rows would be
# dropped from the analysis. Confirm this is intended.
spark.sql("""
CREATE OR REPLACE TABLE workspace.churn.churn_reason_final AS
SELECT
  customer_id,
  Churn,
  CustomerFeedback_clean,
  reason_struct.reason_short AS reason_short,
  reason_struct.reason_long  AS reason_long,
  ai_classify(
    CONCAT(
      'Motivo curto: ', reason_struct.reason_short, '. ',
      'Motivo detalhado: ', reason_struct.reason_long, '. ',
      'Classifique o motivo de churn em UMA das categorias abaixo.'
    ),
    array(
      'Better deal / pricing',
      'Technical issues / speed',
      'Service reliability / outages',
      'Product / plan mismatch',
      'Payment / billing issues',
      'Personal reasons',
      'Other / unclear'
    )
  ) AS churn_category
FROM workspace.churn.churn_reason_raw
""")

In [0]:
%sql
-- Number of rows per churn category, largest group first.
SELECT
  final.churn_category,
  COUNT(*) AS qtd
FROM workspace.churn.churn_reason_final AS final
GROUP BY final.churn_category
ORDER BY qtd DESC;

In [0]:
import matplotlib.pyplot as plt

# Aggregate: rows per churn category (NULL category excluded), largest first.
df_counts = (
    spark.table("workspace.churn.churn_reason_final")
    .groupBy("churn_category")
    .count()
    .where("churn_category IS NOT NULL")
    .sort("count", ascending=False)
    .toPandas()
)

categories = df_counts["churn_category"]
counts = df_counts["count"]

# One bar per category, drawn through the explicit Axes interface.
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(categories, counts)
plt.setp(ax.get_xticklabels(), rotation=45, ha="right")
ax.set_ylabel("Número de clientes")
ax.set_title("Contagem de churn por motivo")
fig.tight_layout()
plt.show()

In [0]:
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

# Tables
# churn_reason_final is reduced to one row per join key BEFORE it is aliased.
# The join below matches on (CustomerFeedback_clean, Churn) only, so k rows
# sharing the same feedback text would otherwise be matched k x k times and
# inflate every count and average computed from df_base.
# NOTE(review): dropDuplicates keeps an arbitrary row per key; if identical
# texts were given different categories, which one survives is not fixed.
df_reason = (
    spark.table("workspace.churn.churn_reason_final")
         .dropDuplicates(["CustomerFeedback_clean", "Churn"])
         .alias("r")
)
df_hist = spark.table("workspace.churn.history_genie").alias("h")

# Join on the pair (Churn, CustomerFeedback_clean), then keep only rows that
# received a churn category.
same_feedback = F.col("r.CustomerFeedback_clean") == F.col("h.CustomerFeedback_clean")
same_churn = F.col("r.Churn") == F.col("h.Churn")

df_base = (
    df_reason
    .join(df_hist, same_feedback & same_churn, "inner")
    .filter(F.col("r.churn_category").isNotNull())
)

In [0]:
# Mean of TotalCharges per churn category, highest mean first.
df_avg_spend = (
    df_base
    .groupBy("r.churn_category")
    .agg(F.avg("h.TotalCharges").alias("avg_spend"))
    .sort(F.desc("avg_spend"))
    .toPandas()
)

categories = df_avg_spend["churn_category"]
avg_spend = df_avg_spend["avg_spend"]

fig, ax = plt.subplots(figsize=(10, 6))
bars = ax.bar(categories, avg_spend)

# Leave 15% headroom above the tallest bar for the value labels.
ax.set_ylim(0, max(avg_spend) * 1.15)

# Write the mean value on top of each bar, centred horizontally.
for bar, value in zip(bars, avg_spend):
    label_x = bar.get_x() + bar.get_width() / 2
    label_y = bar.get_height()
    ax.text(label_x, label_y, f"{value:,.2f}", ha="center", va="bottom", fontsize=9)

plt.setp(ax.get_xticklabels(), rotation=45, ha="right")
ax.set_ylabel("Gasto total médio (TotalCharges)")
ax.set_title("Gasto total médio por motivo de churn")
fig.tight_layout()
plt.show()

In [0]:
# Mean of tenure per churn category, highest mean first.
df_avg_tenure = (
    df_base
    .groupBy("r.churn_category")
    .agg(F.avg("h.tenure").alias("avg_tenure"))
    .sort(F.desc("avg_tenure"))
    .toPandas()
)

categories = df_avg_tenure["churn_category"]
avg_tenure = df_avg_tenure["avg_tenure"]

fig, ax = plt.subplots(figsize=(10, 6))
bars = ax.bar(categories, avg_tenure)

# Leave 15% headroom above the tallest bar for the value labels.
ax.set_ylim(0, max(avg_tenure) * 1.15)

# Write the mean value on top of each bar, centred horizontally.
for bar, value in zip(bars, avg_tenure):
    label_x = bar.get_x() + bar.get_width() / 2
    label_y = bar.get_height()
    ax.text(label_x, label_y, f"{value:,.1f}", ha="center", va="bottom", fontsize=9)

plt.setp(ax.get_xticklabels(), rotation=45, ha="right")
ax.set_ylabel("Tenure médio (meses)")
ax.set_title("Tempo médio de relacionamento por motivo de churn")
fig.tight_layout()
plt.show()

In [0]:
from pyspark.sql import functions as F

# Collapse the two contract dummy columns into a single label. Rows where
# neither dummy is set fall into "Month-to-month".
contract_label = (
    F.when(F.col("h.Contract_One_year"), F.lit("One year"))
     .when(F.col("h.Contract_Two_year"), F.lit("Two year"))
     .otherwise(F.lit("Month-to-month"))
)

# Rows per (churn category, contract type), brought to pandas.
df_contract = (
    df_base
    .withColumn("contract_type", contract_label)
    .groupBy("r.churn_category", "contract_type")
    .count()
    .toPandas()
)

# Reshape to churn category (rows) x contract type (columns); combinations
# with no rows become 0.
pivot = (
    df_contract
    .pivot(index="churn_category", columns="contract_type", values="count")
    .fillna(0)
)

# Sort rows in descending order, comparing the contract columns left to right.
pivot = pivot.sort_values(by=list(pivot.columns), ascending=False)

pivot

In [0]:
# Grouped bar chart: one group per churn category, one bar per contract type
# found in `pivot` (its columns).
categories = pivot.index.tolist()
contract_types = pivot.columns.tolist()

x = range(len(categories))
width = 0.25  # width of each bar

plt.figure(figsize=(10, 6))

# Bar i of every group is shifted right by i bar widths.
for i, ct in enumerate(contract_types):
    plt.bar(
        [xi + i * width for xi in x],
        pivot[ct].values,
        width,
        label=ct
    )

# Centre each tick under its group. The centre of n side-by-side bars sits
# (n - 1) / 2 bar widths from the first bar; a fixed offset of one width is
# only correct when exactly three contract types are present, and `pivot`
# has fewer columns whenever a contract type has no rows.
group_center = (len(contract_types) - 1) * width / 2
plt.xticks([xi + group_center for xi in x], categories, rotation=45, ha="right")
plt.ylabel("Número de clientes churnados")
plt.title("Tipo de contrato por motivo de churn")
plt.legend(title="Tipo de contrato")
plt.tight_layout()
plt.show()

In [0]:
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

# ----- recompute the joined table so this cell is self-consistent -----
# churn_reason_final is reduced to one row per join key BEFORE it is aliased.
# The join below matches on (CustomerFeedback_clean, Churn) only, so k rows
# sharing the same feedback text would otherwise be matched k x k times and
# distort the percentages computed from df_base.
# NOTE(review): dropDuplicates keeps an arbitrary row per key; if identical
# texts were given different categories, which one survives is not fixed.
df_reason = (
    spark.table("workspace.churn.churn_reason_final")
         .dropDuplicates(["CustomerFeedback_clean", "Churn"])
         .alias("r")
)
df_hist = spark.table("workspace.churn.history_genie").alias("h")

same_feedback = F.col("r.CustomerFeedback_clean") == F.col("h.CustomerFeedback_clean")
same_churn = F.col("r.Churn") == F.col("h.Churn")

# Inner join on both keys, keeping only rows that received a churn category.
df_base = (
    df_reason
    .join(df_hist, same_feedback & same_churn, "inner")
    .filter(F.col("r.churn_category").isNotNull())
)

# Collapse the internet-service dummy columns into a single label. Rows where
# neither dummy is set fall into "DSL/Other".
internet_label = (
    F.when(F.col("h.InternetService_Fiber_optic"), F.lit("Fiber optic"))
     .when(F.col("h.InternetService_No"), F.lit("No internet"))
     .otherwise(F.lit("DSL/Other"))
)
df_internet = df_base.withColumn("internet_service", internet_label)

# Rows per (churn category, internet service), brought to pandas.
df_counts = (
    df_internet
    .groupBy("r.churn_category", "internet_service")
    .count()
    .toPandas()
)

# Fixed column order for the internet-service types.
internet_order = ["DSL/Other", "Fiber optic", "No internet"]

# Churn category (rows) x internet service (columns) counts. reindex both
# applies the fixed column order and adds an all-zero column for a service
# type that has no rows at all; plain `pivot_pct[internet_order]` raises
# KeyError in that case.
pivot_pct = (
    df_counts
    .pivot(index="churn_category", columns="internet_service", values="count")
    .reindex(columns=internet_order, fill_value=0)
    .fillna(0)
)

# Convert counts to a percentage within each churn category (row).
pivot_pct = pivot_pct.div(pivot_pct.sum(axis=1), axis=0) * 100

# Order categories by their total number of rows, largest first (optional).
# The previous sort_index() ordered them alphabetically instead, which did
# not match this stated intent nor the payment-type chart further down.
totals = df_counts.groupby("churn_category")["count"].sum().sort_values(ascending=False)
pivot_pct = pivot_pct.loc[totals.index]

# Grouped bar chart: one group per churn category, one bar per internet type.
categories = pivot_pct.index.tolist()
x = np.arange(len(categories))
width = 0.25  # width of each bar

fig, ax = plt.subplots(figsize=(10, 6))

for i, itype in enumerate(internet_order):
    # Shift each type sideways; the middle one of the three sits on the tick.
    bar_positions = x + (i - 1) * width
    ax.bar(bar_positions, pivot_pct[itype].values, width, label=itype)

ax.set_xticks(x)
ax.set_xticklabels(categories, rotation=45, ha="right")
ax.set_ylim(0, 100)
ax.set_ylabel("Participação (%)")
ax.set_title("Serviço de internet por categoria de churn (percentual)")
ax.legend(title="Tipo de internet")

fig.tight_layout()
plt.show()

In [0]:
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql import functions as F

# ============================
# 1) Build the base table: churn reason + features
# ============================
# churn_reason_final is reduced to one row per join key BEFORE it is aliased.
# The join below matches on (CustomerFeedback_clean, Churn) only, so k rows
# sharing the same feedback text would otherwise be matched k x k times and
# distort the percentages computed from df_base.
# NOTE(review): dropDuplicates keeps an arbitrary row per key; if identical
# texts were given different categories, which one survives is not fixed.
df_reason = (
    spark.table("workspace.churn.churn_reason_final")
         .dropDuplicates(["CustomerFeedback_clean", "Churn"])
         .alias("r")
)
df_hist = spark.table("workspace.churn.history_genie").alias("h")

same_feedback = F.col("r.CustomerFeedback_clean") == F.col("h.CustomerFeedback_clean")
same_churn = F.col("r.Churn") == F.col("h.Churn")

# Inner join on both keys, keeping only rows that received a churn category.
df_base = (
    df_reason
    .join(df_hist, same_feedback & same_churn, "inner")
    .filter(F.col("r.churn_category").isNotNull())
)

# ============================
# 2) Derive the payment type from the dummy columns
# ============================
# Rows where none of the three dummies is set fall into
# "Bank transfer (automatic)".
payment_label = (
    F.when(F.col("h.PaymentMethod_Electronic_check"), F.lit("Electronic check"))
     .when(F.col("h.PaymentMethod_Mailed_check"), F.lit("Mailed check"))
     .when(F.col("h.PaymentMethod_Credit_card_automatic"), F.lit("Credit card (automatic)"))
     .otherwise(F.lit("Bank transfer (automatic)"))
)
df_pay = df_base.withColumn("payment_type", payment_label)

# ============================
# 3) Count table and % per churn reason
# ============================
# Rows per (churn category, payment type), brought to pandas.
df_counts = (
    df_pay
    .groupBy("r.churn_category", "payment_type")
    .count()
    .toPandas()
)

# Fixed column order for the payment types.
payment_order = [
    "Electronic check",
    "Mailed check",
    "Credit card (automatic)",
    "Bank transfer (automatic)",
]

# Churn category (rows) x payment type (columns) counts. reindex both applies
# the fixed column order and adds an all-zero column for a payment type that
# has no rows at all; plain `pivot_pct[payment_order]` raises KeyError in
# that case.
pivot_pct = (
    df_counts
    .pivot(index="churn_category", columns="payment_type", values="count")
    .reindex(columns=payment_order, fill_value=0)
    .fillna(0)
)

# Convert counts to a percentage within each churn reason (row).
pivot_pct = pivot_pct.div(pivot_pct.sum(axis=1), axis=0) * 100

# Order reasons by their total number of rows, largest first (optional).
totals = df_counts.groupby("churn_category")["count"].sum().sort_values(ascending=False)
pivot_pct = pivot_pct.loc[totals.index]

print("Percentual de tipo de pagamento por motivo de churn:")
print(pivot_pct.round(1))

# ============================
# 4) Grouped bar chart in %
# ============================
categories = pivot_pct.index.tolist()
x = np.arange(len(categories))
width = 0.2  # width of each bar

fig, ax = plt.subplots(figsize=(10, 6))

# Offsets are measured from the middle of the group so that the bars of one
# churn reason are centred on its tick.
middle = (len(payment_order) - 1) / 2
for i, ptype in enumerate(payment_order):
    bar_positions = x + (i - middle) * width
    ax.bar(bar_positions, pivot_pct[ptype].values, width, label=ptype)

ax.set_xticks(x)
ax.set_xticklabels(categories, rotation=45, ha="right")
ax.set_ylim(0, 100)
ax.set_ylabel("Participação (%)")
ax.set_title("Tipo de pagamento por motivo de churn")
ax.legend(title="PaymentMethod")

fig.tight_layout()
plt.show()

In [0]:
# ============================================
# Construção de modelos:
#  - Modelo 1: probabilidade de churn
#  - Modelo 2: motivo de churn (entre churnados)
# Fonte: tabelas Spark history_genie + churn_reason_final
# ============================================

import numpy as np
import pandas as pd
from pyspark.sql import functions as F

from sklearn.base import clone
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# -------------------------------------------------------
# 0. Load the Spark tables into pandas
# -------------------------------------------------------

# Columns shared by both tables and used to attach the reason to each row.
merge_keys = ["CustomerFeedback_clean", "Churn"]

# Feature base (numeric + dummy columns) used so far in this notebook.
df_raw = spark.table("workspace.churn.history_genie").toPandas()

# LLM-generated churn reasons, reduced to ONE row per merge key. Without this,
# k rows that share the same feedback text (pandas also matches missing keys
# with each other) are each matched k times by the merge below, duplicating
# customers in df_full; the duplicates then land on both sides of the
# train/test split.
# NOTE(review): keep="first" depends on the row order returned by Spark; if
# identical texts were given different categories the kept one is arbitrary.
df_reasons = (
    spark.table("workspace.churn.churn_reason_final")
         .select("CustomerFeedback_clean", "Churn", "churn_category")
         .toPandas()
         .drop_duplicates(subset=merge_keys, keep="first")
)

# Left merge on the shared columns; validate makes pandas raise MergeError if
# the right-hand keys are ever not unique.
df_full = df_raw.merge(
    df_reasons,
    on=merge_keys,
    how="left",
    validate="many_to_one",
)

# A many-to-one left merge must not add rows.
assert len(df_full) == len(df_raw), "merge duplicated rows of history_genie"

# Binary target: churn yes/no (already 0/1 in history_genie).
df_full["Churn_flag"] = df_full["Churn"].astype(int)

# -------------------------------------------------------
# 1. Define the features
#    (numeric and dummy variables only, no text)
# -------------------------------------------------------

# Columns that must never be fed to the models.
cols_drop = [
    "Churn",               # original target
    "Churn_flag",          # binary target (kept out of X)
    "churn_category",      # target of model 2
    "CustomerFeedback",    # original text
    "CustomerFeedback_clean",
]

# Every remaining column of df_full is a feature, in its original order.
is_feature = ~df_full.columns.isin(cols_drop)
feature_cols = df_full.columns[is_feature].tolist()

X_all = df_full[feature_cols]
y_churn = df_full["Churn_flag"]

# Numeric features (adjust to include/exclude columns).
numeric_cols = ["tenure", "MonthlyCharges", "TotalCharges", "MonthlyIncome"]

# Everything else is treated as binary/bool (0/1 or True/False).
numeric_lookup = set(numeric_cols)
categorical_cols = [name for name in feature_cols if name not in numeric_lookup]

# -------------------------------------------------------
# 2. Pre-processing pipeline
# -------------------------------------------------------

# Numeric columns: fill missing values with the median, then standardise.
numeric_steps = [
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler()),
]
numeric_transformer = Pipeline(steps=numeric_steps)

# Remaining columns: fill missing values with the most frequent value, then
# one-hot encode; values unseen during fit are ignored at predict time.
categorical_steps = [
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("onehot", OneHotEncoder(handle_unknown="ignore")),
]
categorical_transformer = Pipeline(steps=categorical_steps)

# Route each group of columns to its transformer.
column_routes = [
    ("num", numeric_transformer, numeric_cols),
    ("cat", categorical_transformer, categorical_cols),
]
preprocess = ColumnTransformer(transformers=column_routes)

# -------------------------------------------------------
# 3. Model 1 – churn probability
# -------------------------------------------------------

# Hold out 20% of the rows, stratified on the churn target.
X_train, X_test, y_train, y_test = train_test_split(
    X_all,
    y_churn,
    test_size=0.2,
    random_state=42,
    stratify=y_churn,
)

# Logistic regression with class weights balanced against class frequency.
clf_churn = LogisticRegression(max_iter=1000, class_weight="balanced")

churn_steps = [
    ("preprocess", preprocess),
    ("model", clf_churn),
]
pipe_churn = Pipeline(steps=churn_steps)
pipe_churn.fit(X_train, y_train)

# Hard predictions for the report; second probability column for the AUC.
y_pred = pipe_churn.predict(X_test)
y_proba = pipe_churn.predict_proba(X_test)[:, 1]

print("=== Modelo de CHURN ===")
print(classification_report(y_test, y_pred, digits=3))
print("AUC:", roc_auc_score(y_test, y_proba))

# -------------------------------------------------------
# 4. Model 2 – churn reason (only for customers who already churned)
# -------------------------------------------------------

# Training population: churned rows that received a category.
df_churn = df_full[(df_full["Churn_flag"] == 1) & df_full["churn_category"].notna()].copy()

# A stratified split needs at least two rows per class, otherwise
# train_test_split raises ValueError. Categories with a single row are
# reported and left out of this model instead of aborting the cell.
category_sizes = df_churn["churn_category"].value_counts()
rare_categories = category_sizes[category_sizes < 2].index.tolist()
if rare_categories:
    print("Categorias com menos de 2 exemplos removidas do modelo de motivo:", rare_categories)
    df_churn = df_churn[~df_churn["churn_category"].isin(rare_categories)].copy()

X_reason = df_churn[feature_cols]
y_reason = df_churn["churn_category"]

Xr_train, Xr_test, yr_train, yr_test = train_test_split(
    X_reason,
    y_reason,
    test_size=0.2,
    random_state=42,
    stratify=y_reason,
)

# `multi_class` is deprecated in recent scikit-learn releases; with the
# default lbfgs solver a target with more than two classes is already fitted
# as a multinomial model, so the argument is simply omitted.
clf_reason = LogisticRegression(
    max_iter=1000,
    class_weight="balanced",
)

# Use an unfitted COPY of the preprocessor. Pipeline.fit fits its steps in
# place, so sharing the same `preprocess` object would overwrite the scaler
# and encoder already fitted inside pipe_churn with statistics from churned
# customers only, silently changing pipe_churn's predictions afterwards.
pipe_reason = Pipeline(steps=[
    ("preprocess", clone(preprocess)),
    ("model", clf_reason),
])

pipe_reason.fit(Xr_train, yr_train)

yr_pred = pipe_reason.predict(Xr_test)

print("\n=== Modelo de MOTIVO de churn (condicional) ===")
print(classification_report(yr_test, yr_pred, digits=3))

# -------------------------------------------------------
# 5. Função para prever churn + top motivos
# -------------------------------------------------------

def prever_churn_e_motivos(cliente_dict):
    """Score one customer: churn probability plus the three likeliest reasons.

    Args:
        cliente_dict: mapping of feature name to value. It must contain every
            name in ``feature_cols``; keys that are not in ``feature_cols``
            are ignored. Minimal example::

                {
                  "SeniorCitizen": 0,
                  "Partner": 1,
                  "Dependents": 0,
                  "tenure": 12,
                  "PhoneService": 1,
                  ...
                }

    Returns:
        A tuple ``(p_churn, top_motivos)``. ``p_churn`` is the value in the
        second probability column returned by ``pipe_churn`` for this
        customer. ``top_motivos`` is a list of up to three
        ``(category, probability)`` pairs from ``pipe_reason``, ordered from
        most to least probable, with the probability as a Python float.
    """
    # Single-row frame with the columns in the order the pipelines were fit on.
    cliente_df = pd.DataFrame([cliente_dict])[feature_cols]

    # Churn probability.
    churn_probas = pipe_churn.predict_proba(cliente_df)
    p_churn = churn_probas[0, 1]

    # Distribution over reasons, conditional on churn.
    motivo_probas = pipe_reason.predict_proba(cliente_df)[0]
    motivos = pipe_reason.named_steps["model"].classes_

    # Indices of the three largest probabilities, largest first.
    melhores = np.argsort(motivo_probas)[::-1][:3]
    top_motivos = []
    for idx in melhores:
        top_motivos.append((motivos[idx], float(motivo_probas[idx])))

    return p_churn, top_motivos

# -------------------------------------------------------
# 6. Usage example (adjust the values to match your columns)
# -------------------------------------------------------

# One hypothetical customer, keyed by feature name. The function called below
# selects the columns listed in feature_cols from this dict.
# NOTE(review): the key set presumably mirrors the columns of history_genie
# minus the dropped ones -- verify against feature_cols.
exemplo = {
    # binary 0/1 flags
    "SeniorCitizen": 0,
    "Partner": 1,
    "Dependents": 0,
    "PhoneService": 1,
    "MultipleLines": 0,
    "OnlineSecurity": 0,
    "OnlineBackup": 0,
    "DeviceProtection": 0,
    "TechSupport": 0,
    "StreamingTV": 1,
    "StreamingMovies": 1,
    "PaperlessBilling": 1,
    "gender_Male": 1,
    "InternetService_Fiber_optic": 1,
    "InternetService_No": 0,
    "Contract_One_year": 0,
    "Contract_Two_year": 0,
    "PaymentMethod_Credit_card_automatic": 0,
    "PaymentMethod_Electronic_check": 1,
    "PaymentMethod_Mailed_check": 0,
    # numeric
    "tenure": 12,
    "MonthlyCharges": 95.0,
    "TotalCharges": 1100.0,
    "MonthlyIncome": 4000,
}

# Score the example customer with both models.
p_churn, top_motivos = prever_churn_e_motivos(exemplo)

# Report the churn probability (3 decimals) and each returned reason with
# its probability.
print("\nProbabilidade de churn:", round(p_churn, 3))
print("Top motivos prováveis (condicionais a churn):")
for motivo, p in top_motivos:
    print(f"  - {motivo}: {p:.3f}")