# Tratando dados para ML

In [0]:
from pyspark.sql.functions import when
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
import pyspark.pandas as ps
import re

import pandas as pd
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

In [0]:

# Show the version of the Spark session this notebook is attached to.
spark_version = spark.version
print(f'Versão Spark: {spark_version}')

In [0]:
%sql

-- Preview up to 10 rows of telecom.silver.teleco_cleaned.
-- There is no ORDER BY, so which rows come back is not guaranteed.
SELECT * 
FROM telecom.silver.teleco_cleaned
LIMIT 10;

In [0]:
# Load the silver table into a Spark DataFrame.
df = spark.read.table('telecom.silver.teleco_cleaned')

In [0]:
# Remove the customerID column, then add an integer 'label' column
# (the column name recommended by Spark ML): 1 when Churn == 'Yes', else 0.
df = df.drop('customerID')
churned = df['Churn'] == 'Yes'
df = df.withColumn('label', when(churned, 1).otherwise(0))

In [0]:
# Print the column names and types after dropping customerID and adding 'label'.
df.printSchema()

## Indexação e Codificação

In [0]:
# Databricks Free Edition apparently restricts some Spark ML feature transformers (StringIndexer, OneHotEncoder, VectorAssembler), so the encoding below is done with pandas and scikit-learn instead.

# binary_transformers = [
#     StringIndexer(inputCol=col, outputCol=col + "_idx") 
#     for col in binary_cols
# ]

In [0]:
# toPandas() collects the whole table into driver memory, so this is only
# appropriate while the dataset stays small.
pdf = df.toPandas() # Spark DataFrame -> pandas DataFrame

In [0]:
# Show the first five rows of the pandas copy.
pdf.head(n=5)

In [0]:
# Summary statistics for the columns pandas treats as numeric.
pdf.describe()

In [0]:
# Number of columns before any encoding is applied.
pdf.shape[1]

In [0]:
# Multi-valued categorical columns that receive an encoded representation.
cat_cols = [
    'gender', 'MultipleLines', 'InternetService', 'OnlineSecurity',
    'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV',
    'StreamingMovies', 'Contract', 'PaymentMethod',
]

# Label encoding: add an integer-coded '<column>_idx' copy of every
# categorical column and keep each fitted encoder, keyed by the source column
# name, for later use.
label_encoders = {}
for feature in cat_cols:
    encoder = LabelEncoder().fit(pdf[feature])
    pdf[f'{feature}_idx'] = encoder.transform(pdf[feature])
    label_encoders[feature] = encoder

In [0]:
# Binary (Yes/No) categorical columns to recode as 1/0.
# NOTE(review): 'Churn' is recoded here while the 'label' column derived from it
# is also kept, so the frame carries the target twice - confirm this is intended.
binary_cols = ['SeniorCitizen', 'Partner', 'Dependents', 'PhoneService', 'PaperlessBilling','Churn']
yes_no_map = {'Yes': 1, 'No': 0}
for col in binary_cols:
    # Series.map() turns every value that is missing from the dict into NaN, so
    # a column that is already numeric would be wiped out silently.
    # NOTE(review): SeniorCitizen is stored as 0/1 in the usual Telco churn
    # schema - confirm how the silver table stores it.
    # Only recode columns that actually contain 'Yes'/'No' strings; columns
    # without them are left untouched.
    if pdf[col].isin(list(yes_no_map)).any():
        pdf[col] = pdf[col].map(yes_no_map)

In [0]:
# Display the frame to inspect the result of the Yes/No recoding.
pdf

In [0]:
# One-hot encoding: replace each column listed in cat_cols with one indicator
# column per category, named '<column>_<category>'. The '<column>_idx' columns
# created by the label-encoding step are not in cat_cols and therefore remain.
pdf = pd.get_dummies(data=pdf, prefix=cat_cols, columns=cat_cols)
# Renumber the rows 0..n-1 and discard the previous index.
pdf = pdf.reset_index(drop=True)

In [0]:
# Show the first five rows after one-hot encoding.
pdf.head(n=5)

In [0]:
# Per-column flag: True when the column contains at least one null value.
# The previous form, pdf.where(pdf.isnull() == True).any(), kept only the null
# cells and masked every other cell to NaN, so it could never report a null.
pdf.isnull().any()

In [0]:
# Per-column flag: True when the column contains at least one missing value
# (isna() is an alias of isnull()).
# The previous form, pdf.where(pdf.isna() == True).any(), kept only the missing
# cells and masked every other cell to NaN, so it could never report one.
pdf.isna().any()

In [0]:
# pdf.columns = [col.replace(" ,;{}()\n\t=", "_") for col in pdf.columns]

In [0]:
# Replace each run of space , ; { } ( ) newline tab = / \ | : in a column name
# with a single underscore, then trim underscores from both ends.
# Presumably needed because Spark/Delta reject some of these characters in
# column names - confirm against the target table format.
unsafe_chars = re.compile(r'[ ,;{}()\n\t=/\\|:]+')
pdf.columns = [unsafe_chars.sub('_', name).strip('_') for name in pdf.columns]

In [0]:
# Print column dtypes and non-null counts after the rename.
pdf.info()

In [0]:
# Show the first three rows with the sanitised column names.
pdf.head(n=3)

In [0]:
# Spark has trouble with uint8 columns, so cast each of them to int16.
# NOTE(review): with pandas >= 2.0, get_dummies() yields bool columns by
# default, in which case this selects nothing - confirm the pandas version.
uint8_columns = pdf.select_dtypes(include=['uint8']).columns
pdf = pdf.astype(dict.fromkeys(uint8_columns, 'int16'))

## Salvando o trabalho

In [0]:
# Convert the encoded pandas frame back into a Spark DataFrame; the Spark schema
# is inferred from the pandas dtypes because no schema is passed.
df_spark = spark.createDataFrame(pdf)

In [0]:
# Write the ML-ready data in Delta format to the /Volumes path, replacing
# whatever is already stored there.
df_spark.write.save(
    "/Volumes/telecom/gold/teleco_ml_ready",
    format="delta",
    mode="overwrite",
)


In [0]:
# Save the same data as the Delta table telecom.gold.teleco_ml_ready,
# overwriting any existing contents.
df_spark.write.saveAsTable(
    "telecom.gold.teleco_ml_ready",
    format="delta",
    mode="overwrite",
)