In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Three-part identifier of the table that is loaded into `df_raw`.
table_path = "workspace.breast_cancer.breast_cancer"

In [0]:
# Load the source table as a Spark DataFrame; `spark` is the session object
# supplied by the notebook runtime (it is not created in this notebook).
df_raw = spark.read.table(table_path)

In [0]:
# Render the raw table in the notebook, then print its column names and types.
display(df_raw)
df_raw.printSchema()

In [0]:
%python
# Binary target: 1 when diagnosis is "M", 0 for every other value
# (nulls included, because they fall through to `otherwise`).
label_expr = F.when(F.col("diagnosis") == "M", 1).otherwise(0)

# The "id" column and the original "diagnosis" text are not kept.
df_clean = (
    df_raw
    .drop("id")
    .withColumn("label", label_expr)
    .drop("diagnosis")
)

In [0]:
# Render the cleaned table (with the derived "label" column), then print its schema.
display(df_clean)
df_clean.printSchema()

In [0]:

# Per-column null counter: `when` without `otherwise` yields NULL for non-null
# cells, and `count` skips NULLs, so only the null cells are tallied.
# NOTE(review): this checks NULL only; NaN values in floating-point columns
# would not be counted here.
expressoes_agregacao = []
for column_name in df_clean.columns:
    null_marker = F.when(F.col(column_name).isNull(), column_name)
    expressoes_agregacao.append(F.count(null_marker).alias(column_name))

# One output row holding the null count of every column.
df_clean.select(expressoes_agregacao).show()

Não há valores nulos no dataframe; podemos dar continuidade ao estudo.

In [0]:
# Compare the value ranges of a few features (and the label) side by side.
cols_to_compare = ["radius_mean", "area_mean", "concavity_mean", "label"]
summary_stats = ("min", "max", "mean", "stddev")

df_clean.select(*cols_to_compare).summary(*summary_stats).show()

In [0]:
# Class balance: number of rows for each value of "label".
df_grouped = df_clean.groupBy("label").agg(F.count("*").alias("count"))
display(df_grouped)

Databricks visualization. Run in Databricks to view.

"Conforme a teoria de SVM, identifiquei disparidade de escalas (ex: area_mean vs smoothness_mean). Isso torna o passo de Standard Scaling obrigatório para evitar o viés do modelo."

In [0]:
# Every column except the target becomes a model input for the pipeline.
target_col = 'label'
feature_cols = [name for name in df_clean.columns if name != target_col]
print(feature_cols)

In [0]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Stage 1: pack the individual feature columns into one vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")

# Stage 2: standardise the assembled vector (centre on the mean, scale by std).
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withMean=True,
    withStd=True,
)

preparation_stages = [assembler, scaler]
pipeline_preparacao = Pipeline(stages=preparation_stages)

# NOTE(review): this fit uses every row of df_clean, so `df_final` is scaled
# with statistics computed from the full dataset (no hold-out rows excluded).
modelo_preparacao = pipeline_preparacao.fit(df_clean)
df_final = modelo_preparacao.transform(df_clean)

# Preview the assembled vector, its scaled counterpart and the target.
preview_cols = ["features_raw", "features", "label"]
display(df_final.select(*preview_cols).limit(5))

In [0]:
# 70% train / 30% test split.
#
# The split is taken from the unscaled `df_clean`, and the preparation pipeline
# (assembler + StandardScaler) is then fitted on the training rows only.
# Splitting an already-scaled DataFrame would mean the scaler's mean/std were
# computed with the test rows included, leaking test-set statistics into
# training and making the reported metrics optimistic.
train_raw, test_raw = df_clean.randomSplit([0.7, 0.3], seed=42)

# Fit on train only; apply the same fitted transformation to both splits.
modelo_preparacao_treino = pipeline_preparacao.fit(train_raw)
train_data = modelo_preparacao_treino.transform(train_raw)
test_data = modelo_preparacao_treino.transform(test_raw)

print(f"Registros para Treinamento: {train_data.count()}")
print(f"Registros para Teste (Prova Real): {test_data.count()}")

In [0]:
# --- SVM MODEL TRAINING ---
from pyspark.ml.classification import LinearSVC

# Hyper-parameters of the linear SVM, gathered in one place.
svm_params = {
    "featuresCol": "features",
    "labelCol": "label",
    "maxIter": 100,
    "regParam": 0.1,
}
svm = LinearSVC(**svm_params)

svm_model = svm.fit(train_data)

# Learned hyperplane: intercept and the size of the coefficient vector.
print(f"Intercepto (b): {svm_model.intercept}")
print(f"N√∫mero de pesos (w) aprendidos: {len(svm_model.coefficients)}")

# Score the held-out rows and show the label next to the model outputs.
predictions = svm_model.transform(test_data)

print("--- Resultados nos Dados de Teste ---")
output_cols = ["label", "rawPrediction", "prediction"]
display(predictions.select(*output_cols))

In [0]:
# --- MODEL EVALUATION ---
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# Both evaluators compare "label" with "prediction"; only the metric differs.
evaluator_acc = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)

accuracy = evaluator_acc.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)

print(f"üéØ Acur√°cia Global SVC: {accuracy:.2%}")
print(f"‚öñÔ∏è  F1-Score (Equil√≠brio) SVC: {f1_score:.2%}")

# Confusion matrix: row count for every (label, prediction) pair.
print("\n--- Matriz de Confus√£o ---")
confusion_matrix = (
    predictions
    .groupBy("label", "prediction")
    .count()
    .orderBy("label", "prediction")
)

display(confusion_matrix)

# Recall and precision for the class labelled 1: count the three cells of the
# confusion matrix that those two ratios need.
actual_pos = F.col("label") == 1
actual_neg = F.col("label") == 0
pred_pos = F.col("prediction") == 1
pred_neg = F.col("prediction") == 0

TP = predictions.where(actual_pos & pred_pos).count()
FN = predictions.where(actual_pos & pred_neg).count()
FP = predictions.where(actual_neg & pred_pos).count()

# A zero denominator (no rows in that slice) falls back to 0.
if (TP + FN) > 0:
    recall_maligno = TP / (TP + FN)
else:
    recall_maligno = 0

if (TP + FP) > 0:
    precision_maligno = TP / (TP + FP)
else:
    precision_maligno = 0

print("\n" + "="*40)
print("üìä DETALHAMENTO DA CLASSE 'MALIGNO' (1.0)")
print("="*40)
print(f"üö® Recall (Sensibilidade) SVC: {recall_maligno:.2%}")
print(f"üéØ Precision (Precis√£o) SVC :   {precision_maligno:.2%} ")
print("-" * 40)

In [0]:
# --- NON-LINEAR VARIANT: DEGREE-2 POLYNOMIAL EXPANSION + LINEAR SVM ---
# The Spark ML classes are imported in this cell so it always binds them, even
# if another cell has rebound a name such as `StandardScaler`.
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import PolynomialExpansion, StandardScaler

# The pipeline below re-creates "features_raw" through `assembler`, so any
# existing copy of that column is removed first. DataFrame.drop ignores a
# column that is not present, so no existence check is needed. New names are
# used so `train_data` / `test_data` are left untouched and the cell can be
# re-run without changing its inputs.
train_poly = train_data.drop("features_raw")
test_poly = test_data.drop("features_raw")

# Expand the assembled features into all degree-2 polynomial terms.
poly_expansion = PolynomialExpansion(
    inputCol="features_raw",
    outputCol="poly_features",
    degree=2
)

# Standardise the expanded features.
scaler_poly = StandardScaler(
    inputCol="poly_features",
    outputCol="features_scaled",
    withStd=True,
    withMean=True
)

# Dedicated estimator that reads "features_scaled". Its hyper-parameters are
# copied from the linear `svm` instead of calling `svm.setFeaturesCol(...)`,
# which would silently change the shared estimator for any later use.
svm_poly = LinearSVC(
    featuresCol="features_scaled",
    labelCol=svm.getLabelCol(),
    maxIter=svm.getMaxIter(),
    regParam=svm.getRegParam(),
)

# Non-linear pipeline: assemble -> expand -> scale -> SVM.
pipeline_nlinear = Pipeline(stages=[
    assembler,
    poly_expansion,
    scaler_poly,
    svm_poly
])

# Training and comparative evaluation.
print("‚è≥ Treinando SVM com Expans√£o Polinomial (Grau 2)...")
model_nlinear = pipeline_nlinear.fit(train_poly)
predictions_nlinear = model_nlinear.transform(test_poly)

# Same evaluators as the linear model, applied to the new predictions.
acc_nlinear = evaluator_acc.evaluate(predictions_nlinear)
f1_nlinear = evaluator_f1.evaluate(predictions_nlinear)

# Recall for the class labelled 1 (0 when that class has no rows).
TP_nlinear = predictions_nlinear.filter((F.col("label") == 1) & (F.col("prediction") == 1)).count()
FN_nlinear = predictions_nlinear.filter((F.col("label") == 1) & (F.col("prediction") == 0)).count()
recall_nlinear = TP_nlinear / (TP_nlinear + FN_nlinear) if (TP_nlinear + FN_nlinear) > 0 else 0

print("\n" + "="*40)
print(f"üìä RESULTADO FINAL (COMPARATIVO)")
print("="*40)
print(f"1. SVM Simples (Linear):     {accuracy:.2%}")
print(f"2. SVM Polinomial (Grau 2):  {acc_nlinear:.2%}")
print(f"‚öñÔ∏è  F1-Score (Equil√≠brio) Polinomial: {f1_nlinear:.2%}")
print(f"üö® Recall (Sensibilidade - Maligno) Polinomial: {recall_nlinear:.2%}")
print("\n--- Matriz de Confus√£o ---")
confusion_matrix_nlinear = predictions_nlinear.groupBy("label", "prediction").count() \
                              .orderBy("label", "prediction")

display(confusion_matrix_nlinear)


In [0]:
# --- RBF-KERNEL SVM WITH SCIKIT-LEARN ---
# The scikit-learn names are imported under a namespace / alias on purpose:
# plain `StandardScaler`, `f1_score` and `confusion_matrix` are already used in
# this notebook for the Spark scaler class, the linear model's F1 value and its
# confusion-matrix DataFrame, and must not be overwritten by this cell.
import pandas as pd
from sklearn import metrics as sk_metrics
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler as SklearnStandardScaler
from sklearn.svm import SVC

print("üîÑ Convertendo dados do Spark para Pandas (Para usar Scikit-Learn)...")
df_pandas = df_clean.toPandas()

# 1. Data preparation, scikit-learn style: feature matrix and target vector.
X = df_pandas.drop("label", axis=1)
y = df_pandas["label"]

# NOTE(review): this split is drawn independently of the Spark `randomSplit`,
# so the RBF model is scored on a different set of test rows than the Spark
# models it is compared with below.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Scaler statistics come from the training rows only; the test rows are
# transformed with those same statistics.
scaler_sklearn = SklearnStandardScaler()
X_train_scaled = scaler_sklearn.fit_transform(X_train)
X_test_scaled = scaler_sklearn.transform(X_test)

# Train the RBF-kernel SVM.
svm_rbf = SVC(kernel="rbf", C=1.0, gamma="scale", random_state=42)
svm_rbf.fit(X_train_scaled, y_train)

# Evaluation on the held-out rows.
y_pred_rbf = svm_rbf.predict(X_test_scaled)
acc_rbf = sk_metrics.accuracy_score(y_test, y_pred_rbf)
f1_rbf = sk_metrics.f1_score(y_test, y_pred_rbf)
recall_rbf = sk_metrics.recall_score(y_test, y_pred_rbf)

# The Spark accuracies are read from the values computed earlier in the
# notebook rather than typed in by hand, so the scoreboard cannot go stale.
print("\n" + "="*40)
print(f"üèÜ PLACAR FINAL DOS MODELOS")
print("="*40)
print(f"1. Spark Linear:        {accuracy:.2%}")
print(f"2. Spark Polinomial:    {acc_nlinear:.2%}")
print(f"3. Scikit Kernel RBF:   {acc_rbf:.2%}")
print(f"‚öñÔ∏è  F1-Score Kernel RBF:           {f1_rbf:.2%}")
print(f"üö® Recall (Maligno) RBF:    {recall_rbf:.2%}")
print("-" * 40)

