## Importação das bibliotecas

In [0]:
%python
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import warnings

## Carregando os dados

In [0]:
%python

warnings.filterwarnings("ignore")


# Carregando os dados

table_name = "data_lake_turing_.prata.dados_usuarios_forms" 

df = spark.read.table(table_name).toPandas()
print("Shape:", df.shape)
print("Colunas:", df.columns.tolist())
print("\nAmostra:")
print(df.head())


## Definindo Target e selecionando as features

In [0]:
%python
# Vamos prever Renda_mensal 

#Definição da target
target = "Renda_mensal"
if target not in df.columns:
    raise ValueError(f"Coluna alvo '{target}' não encontrada.")

# Seleção de features 
features = [
    "Zona", "Vulnerabilidade", "Empregado", "Tipo_emprego",
    "Possui_computador", "Velocidade_internet", "Dispositivo_principal",
    "Nivel_digital", "Autonomia_digital", "Conhecimento_tecnico",
    "Pessoas_na_residencia"
]


features = [f for f in features if f in df.columns]
print("\nFeatures usadas:", features)

## Limpeza e Tratamento Inicial

In [0]:
%python

data = df[features + [target]].copy()

# Tratar valores ausentes: estratégia simples (remoção)
# (pode-se optar por imputação mais tarde)
initial_len = len(data)
data = data.dropna(subset=[target])  # garante alvo presente
data = data.fillna({
    # preencher campos categóricos com 'Desconhecido' quando aplicável
    c: ("Desconhecido" if data[c].dtype == object else data[c].median())
    for c in features
})
print(f"\nRegistros antes: {initial_len}, depois do tratamento: {len(data)}")

## Transformações

In [0]:
%python

# Transformações:
# Converte respostas "Sim"/"Não" para binário quando presentes
def binarize_yesno(x):
    if isinstance(x, str):
        x_low = x.strip().lower()
        if x_low in ["sim", "s", "yes", "y", "1", "true"]:
            return 1
        if x_low in ["não", "nao", "n", "no", "0", "false"]:
            return 0
    return x

for col in features:
    if data[col].dtype == object:
        # aplicar binarização para colunas com 'sim'/'não'
        sample = data[col].dropna().astype(str).str.lower().unique()
        if set(sample).intersection({"sim", "não", "nao", "s", "n", "yes", "no"}):
            data[col] = data[col].apply(binarize_yesno)

# Exibir tipos após ajustes
print("\nTipos das features após limpeza:")
print(data[features].dtypes)

## One-hot encoding

In [0]:
%python

# Identificar colunas categóricas e numéricas
cat_cols = [c for c in features if data[c].dtype == object]
num_cols = [c for c in features if c not in cat_cols]

print("\nColunas categóricas:", cat_cols)
print("Colunas numéricas:", num_cols)

# Construir ColumnTransformer: OneHotEncoder para categóricas, passthrough para numéricas
preprocessor = ColumnTransformer(
    transformers=[
        (
            "cat",
            OneHotEncoder(
                handle_unknown="ignore",
                sparse_output=False
            ),
            cat_cols
        ),
    ],
    remainder="passthrough"
)

# Pipeline com LinearRegression
pipeline_lr = Pipeline([
    ("preproc", preprocessor),
    ("model", LinearRegression())
])

# Pipeline com Ridge (regularizado) para comparação
pipeline_ridge = Pipeline([
    ("preproc", preprocessor),
    ("model", Ridge(alpha=1.0))
])

## Treino/Teste

In [0]:
%python

X = data[features]
y = data[target].astype(float)  # garante float

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

print(f"\nTreino: {X_train.shape}, Teste: {X_test.shape}")

## Treino do modelo

In [0]:
%python
pipeline_lr.fit(X_train, y_train)

pipeline_ridge.fit(X_train, y_train)

## Previsões e Métricas

In [0]:
%python
import numpy as np

def evaluate(
    model,
    X_tr,
    y_tr,
    X_te,
    y_te,
    name="Model"
):
    preds_tr = model.predict(X_tr)
    preds_te = model.predict(X_te)

    results = {
        "MAE_train": mean_absolute_error(y_tr, preds_tr),
        "MSE_train": mean_squared_error(y_tr, preds_tr),
        "RMSE_train": np.sqrt(mean_squared_error(y_tr, preds_tr)),
        "R2_train": r2_score(y_tr, preds_tr),
        "MAE_test": mean_absolute_error(y_te, preds_te),
        "MSE_test": mean_squared_error(y_te, preds_te),
        "RMSE_test": np.sqrt(mean_squared_error(y_te, preds_te)),
        "R2_test": r2_score(y_te, preds_te),
    }
    print(f"\n=== Avaliação: {name} ===")
    for k, v in results.items():
        print(f"{k}: {v:.4f}")
    return results

res_lr = evaluate(
    pipeline_lr,
    X_train,
    y_train,
    X_test,
    y_test,
    name="LinearRegression"
)
res_ridge = evaluate(
    pipeline_ridge,
    X_train,
    y_train,
    X_test,
    y_test,
    name="Ridge"
)

## Coeficientes

In [0]:
%python

encoder = pipeline_lr.named_steps["preproc"].named_transformers_["cat"]
if encoder is not None:
    cat_feature_names = list(encoder.get_feature_names_out(cat_cols))
else:
    cat_feature_names = []
all_feature_names = cat_feature_names + num_cols

coefs = pipeline_lr.named_steps["model"].coef_
coef_df = pd.DataFrame({"feature": all_feature_names, "coef": coefs})
coef_df = coef_df.sort_values(
    by="coef",
    key=lambda x: x.abs(),
    ascending=False
)
print("\nCoeficientes (top 20):")
print(coef_df.head(20))

## Escolha do Modelo

A Regressão Linear foi escolhida porque o projeto busca quantificar desigualdade digital e prever indicadores de vulnerabilidade com base em variáveis socioeconômicas e de acesso à tecnologia.
Como o problema envolve estimar valores contínuos — como renda mensal, nível de autonomia digital, ou probabilidade estimada de vulnerabilidade tecnológica a regressão linear é o modelo que atende as necessidades do projeto.