In [0]:
# Substitua pelos nomes reais do seu catálogo, schema e tabela
CATALOGO = "workspace"
SCHEMA = "churn_zero"
TABELA = "inference_silver"
TABELA_GOLD = "inference_gold"

# Carregar a tabela do Unity Catalog
df = spark.table(f"{CATALOGO}.{SCHEMA}.{TABELA}")

print(f"DataFrame carregado com {df.count()} linhas.")
df.printSchema()

In [0]:
colunas_quali = ['gender', 'SeniorCitizen', 'Partner', 'Dependents', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod', 'sentiment_label','feedback_topic','sentiment_score']
colunas_quant = ['tenure', 'MonthlyCharges', 'TotalCharges', 'MonthlyIncome']
distinct_values = {}
for coluna in colunas_quali:
    distinct_values[coluna] = [row[0] for row in df.select(coluna).distinct().collect()]
display(distinct_values)

In [0]:
from pyspark.sql.functions import col, when, lower

colunas_binarias = [
    'Partner', 'Dependents', 'OnlineSecurity', 'OnlineBackup', 
    'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies',
    'PaperlessBilling', 'MultipleLines', "PhoneService"
]
df_binario = df
for c in colunas_binarias:
    df_binario = df_binario.withColumn(
        c,
        when(lower(col(c)) == 'yes', '1')
        .when(lower(col(c)) == 'no', '0')
    )
df_binario = df_binario.withColumn('gender', when(col('gender') == 'Male', 1).when(col('gender') == 'Female', 0))

In [0]:

df_binnig = df_binario
df_binnig = df_binnig.withColumn(
    "tenure",
    when(col("tenure") <= 5, "1_muito_novo_muitoAltoRisco") 
    .when((col("tenure") > 5) & (col("tenure") <= 17), "2_novo_riscoAlto") 
    .when((col("tenure") > 17) & (col("tenure") <= 43), "3_estável_riscoMedio")  
    .otherwise("4_Leal_BaixoRisco") 
)
display(df_binnig)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile

COLUNA_ALVO = "MonthlyIncome"
NUM_FAIXAS = 4
NOVA_COLUNA = "monthlyIncome_segment"

windowSpec = Window.orderBy(col(COLUNA_ALVO))
df_binnig = df_binnig.withColumn(
    NOVA_COLUNA,
    ntile(NUM_FAIXAS).over(windowSpec)
)

display(
    df_binnig.groupBy(NOVA_COLUNA).count().orderBy(NOVA_COLUNA)
)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile

COLUNA_ALVO = "MonthlyCharges"
NUM_FAIXAS = 4
NOVA_COLUNA = "monthlyCharges_segment"

windowSpec = Window.orderBy(col(COLUNA_ALVO))
df_binnig = df_binnig.withColumn(
    NOVA_COLUNA,
    ntile(NUM_FAIXAS).over(windowSpec)
)

display(
    df_binnig.groupBy(NOVA_COLUNA).count().orderBy(NOVA_COLUNA)
)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile

COLUNA_ALVO = "TotalCharges"
NUM_FAIXAS = 4
NOVA_COLUNA = "totalCharges_segment"

windowSpec = Window.orderBy(col(COLUNA_ALVO))
df_binnig = df_binnig.withColumn(
    NOVA_COLUNA,
    ntile(NUM_FAIXAS).over(windowSpec)
)

display(
    df_binnig.groupBy(NOVA_COLUNA).count().orderBy(NOVA_COLUNA)
)

In [0]:
import pandas as pd

# 1. Converter o DataFrame Spark para Pandas
# ATENÇÃO: Se o DataFrame for muito grande, esta linha falhará.
pandas_df = df_binnig.toPandas()

# 2. Aplicar o get_dummies do Pandas
colunas_ohe = [
    "InternetService", 
    "Contract", 
    "PaymentMethod",
    "feedback_topic",
    "tenure",
    "monthlyIncome_segment",
    "monthlyCharges_segment",
    "totalCharges_segment"
]

pandas_df_ohe = pd.get_dummies(pandas_df, columns=colunas_ohe)

# 3. (Opcional) Converter de volta para Spark (se precisar continuar no ambiente Spark)
df_OHE = spark.createDataFrame(pandas_df_ohe)

print("One-Hot Encoding via Pandas concluído.")
df_OHE.printSchema()

In [0]:
from pyspark.sql.functions import col

int_columns = ["gender","SeniorCitizen", "Partner", "Dependents", "PhoneService", "MultipleLines", 
               "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", 
               "StreamingTV", "StreamingMovies", "PaperlessBilling", "sentiment_score"]

# 1. Inicialize a variável final com o DataFrame original
df_processed = df_OHE 

# 2. Itere e REATRIBUA o resultado a df_processed em cada passo
for c in int_columns:
  if c in df_processed.columns: # Verifique a existência na variável que está sendo transformada
    # Aplica o cast e SALVA DE VOLTA no df_processed para o próximo passo
    df_processed = df_processed.withColumn(c, col(c).cast("int")) 

# 3. Use o resultado final
df_final = df_processed # Opcional: renomear para df_final

print("Schema após o cast corrigido:")
df_final.printSchema()

In [0]:
colunas_para_dropar = [ 
    "TotalCharges",
    "MonthlyIncome",
    "CustomerFeedback",
    "sentiment_label"
]


df_final = df_final.drop(*colunas_para_dropar)

print("Colunas removidas com sucesso!")
print("\nNovo Schema do DataFrame (apenas as primeiras colunas):")
display(df_final)

In [0]:
from pyspark.sql.functions import col
import re

# Lista de caracteres inválidos que você deve remover/substituir
# ( ) , ; { } \n \t = e espaços
CARACTERES_INVALIDOS = r'[ ,;{}\n\t=()]'
CARACTERE_SUBSTITUTO = '_'

# 1. Obter todos os nomes de colunas atuais
colunas_atuais = df_final.columns

# 2. Iterar sobre as colunas e renomear
for nome_antigo in colunas_atuais:
    # 2.1. Substituir caracteres inválidos por underscore
    nome_novo = re.sub(CARACTERES_INVALIDOS, CARACTERE_SUBSTITUTO, nome_antigo)
    
    # 2.2. Remover underscores duplicados ou no início/fim
    nome_novo = nome_novo.strip(CARACTERE_SUBSTITUTO) # Remove _ no início/fim
    nome_novo = re.sub(r'_{2,}', CARACTERE_SUBSTITUTO, nome_novo) # Remove múltiplos __

    # 3. Renomear a coluna no DataFrame (se o nome foi alterado)
    if nome_antigo != nome_novo:
        # Renomeia a coluna no DataFrame
        df_final = df_final.withColumnRenamed(nome_antigo, nome_novo)
        
        # Opcional: imprimir a alteração para monitoramento
        print(f"Renomeado: '{nome_antigo}' -> '{nome_novo}'")

print("\nNomes das colunas limpos e prontos para o Unity Catalog.")
df_final.printSchema()

In [0]:
df_final.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{TABELA_GOLD}")

print(f"\nTabela '{TABELA_GOLD}' salva no Unity Catalog e pronta para o treinamento do modelo!")