In [1]:
import os, sys, time, socket
from pathlib import Path


import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, types as T
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
# =============================
# Create the Spark session
# =============================
# --- Base environment: driver and worker use the same interpreter as this kernel ---
this_python = sys.executable
os.environ.update({
    "PYSPARK_PYTHON": this_python,
    "PYSPARK_DRIVER_PYTHON": this_python,
    "SPARK_LOCAL_IP": "127.0.0.1",
})

# Scratch directory: used as spark.local.dir and as TEMP/TMP for the executor.
spark_tmp = r"C:\spark-tmp"
os.makedirs(spark_tmp, exist_ok=True)

# --- Critical Windows variables that sometimes do NOT reach the executor from a notebook ---
win_keys = [
    "SystemRoot", "ComSpec", "PATHEXT", "WINDIR", "USERPROFILE",
    "HOMEDRIVE", "HOMEPATH", "TEMP", "TMP", "NUMBER_OF_PROCESSORS", "PROCESSOR_ARCHITECTURE",
]
# Copy only the variables that are actually set in this kernel's environment.
executor_env = dict((name, os.environ[name]) for name in win_keys if name in os.environ)

# For debugging: also forward the kernel's current PATH and PYTHONPATH, pin the
# interpreter, and point TEMP/TMP at the scratch directory (this replaces any
# TEMP/TMP value copied from the kernel environment above).
executor_env.update(
    PATH=os.environ.get("PATH", ""),
    PYTHONPATH=os.pathsep.join(sys.path),
    PYSPARK_PYTHON=this_python,
    PYSPARK_DRIVER_PYTHON=this_python,
    TEMP=spark_tmp,
    TMP=spark_tmp,
)

ipv4_only = "-Djava.net.preferIPv4Stack=true"
spark_conf = {
    "spark.driver.bindAddress": "127.0.0.1",
    "spark.driver.host": "127.0.0.1",
    "spark.local.dir": spark_tmp,
    "spark.sql.execution.arrow.pyspark.enabled": "false",
    "spark.python.worker.reuse": "false",
    "spark.port.maxRetries": "64",
    "spark.pyspark.python": this_python,
    "spark.pyspark.driver.python": this_python,
    "spark.driver.extraJavaOptions": ipv4_only,
    "spark.executor.extraJavaOptions": ipv4_only,
}

# Start with a single local worker thread; this can be raised to local[*] later.
builder = SparkSession.builder.appName("NotebookSafe-EnvFix").master("local[1]")
for conf_key, conf_value in spark_conf.items():
    builder = builder.config(conf_key, conf_value)

# Inject every collected environment variable into the EXECUTOR.
for env_name, env_value in executor_env.items():
    builder = builder.config(f"spark.executorEnv.{env_name}", env_value)

spark = builder.getOrCreate()

# Smoke test: report versions and run a tiny job through the JVM.
print("Spark:", spark.version)
print("Driver Python:", sys.executable)
print("Cuenta JVM:", spark.range(10).count())

def pyver_in_worker(it):
    """Yield one line naming the Python executable of the process running this code.

    Written as a partition function (it receives the partition iterator ``it``),
    but the iterator's contents are ignored: exactly one string is produced.
    """
    # Imported inside the function so the lookup happens in whichever process
    # executes the generator body.
    from sys import executable as worker_python

    prefix = "Worker Python: "
    yield prefix + worker_python

# Run a one-element, one-partition job so pyver_in_worker executes inside a
# Python worker, then print the single line it reports.
worker_probe_rdd = spark.sparkContext.parallelize([0], 1)
print(worker_probe_rdd.mapPartitions(pyver_in_worker).first())

Spark: 3.5.4
Driver Python: d:\magali\ciencia de datos UCI\Cuarto\big data aplicada\IC\Y4AN1_Big_Data_Aplicac\venv_bda\Scripts\python.exe
Cuenta JVM: 10
Worker Python: d:\magali\ciencia de datos UCI\Cuarto\big data aplicada\IC\Y4AN1_Big_Data_Aplicac\venv_bda\Scripts\python.exe


In [3]:
import subprocess
import sys

# Sanity check: launch the kernel's own interpreter as a child process.
# check=True raises CalledProcessError on a non-zero exit status; the return
# code (0 on success) is the value shown as the cell output.
subprocess.run([sys.executable, "-c", "print('subprocess OK')"], check=True).returncode

0

In [4]:
# ==================
# Synthetic data
# ==================

# Label: 1 = malignant, 0 = benign
#
# Two noisy class-conditional distributions are generated:
# - Benign: lower means, tighter spreads
# - Malignant: higher means on several features, plus approximate correlation

def make_synthetic_breast_cancer(n=1200, seed=42, malignant_frac=0.40):
    """Build a shuffled, synthetic breast-cancer-like table as a pandas DataFrame.

    Every feature is drawn independently from a per-class normal distribution
    and clipped at zero. ``mean_perimeter`` and ``mean_area`` are then raised to
    at least a noisy function of ``mean_radius`` so they loosely follow it.

    Parameters
    ----------
    n : int, default 1200
        Total number of rows to generate.
    seed : int, default 42
        Seed passed to ``np.random.default_rng``; all randomness in this
        function comes from that single generator.
    malignant_frac : float, default 0.40
        Fraction of rows labelled malignant. The malignant count is
        ``int(n * malignant_frac)``; the remaining rows are benign.

    Returns
    -------
    pandas.DataFrame
        13 feature columns followed by an integer ``label`` column
        (1 = malignant, 0 = benign), with ``n`` rows in shuffled order.

    Raises
    ------
    ValueError
        If ``n`` is negative or ``malignant_frac`` is outside ``[0, 1]``.
    """
    if n < 0:
        raise ValueError(f"n must be non-negative, got {n}")
    if not 0.0 <= malignant_frac <= 1.0:
        raise ValueError(f"malignant_frac must be in [0, 1], got {malignant_frac}")

    rng = np.random.default_rng(seed)

    features = [
        "mean_radius", "mean_texture", "mean_perimeter", "mean_area",
        "smoothness", "compactness", "concavity", "symmetry", "fractal_dim",
        "mean_density", "cell_size_var", "nuclei_clump", "mitoses"
    ]

    # Class sizes (the default yields ~40% malignant rows).
    n_mal = int(n * malignant_frac)
    n_ben = n - n_mal

    # Benign: per-feature mean and standard deviation, in `features` order.
    mu_b = [12.5, 16.0, 80.0, 500.0, 0.090, 0.070, 0.040, 0.18, 0.055, 0.85, 1.2, 2.0, 1.0]
    sd_b = [ 1.5,  2.5, 12.0,  90.0, 0.012, 0.020, 0.015, 0.03,  0.006, 0.08, 0.3, 0.7, 0.6]

    # Malignant: per-feature mean and standard deviation, in `features` order.
    mu_m = [17.0, 22.5, 110.0, 950.0, 0.105, 0.120, 0.090, 0.21, 0.062, 1.10, 2.0, 3.5, 1.8]
    sd_m = [ 2.0,  3.0,  16.0, 150.0, 0.014, 0.030, 0.020, 0.04,  0.007, 0.10, 0.4, 0.9, 0.7]

    def gen_class(n_rows, mu, sd):
        """Draw an (n_rows, len(mu)) feature matrix for one class."""
        # Draw column by column on purpose: the order of calls on `rng` fixes
        # the generated values, so a given seed keeps producing the same data.
        X = np.column_stack([rng.normal(loc=m, scale=s, size=n_rows) for m, s in zip(mu, sd)])
        X = np.maximum(X, 0)  # no negative values
        # Soft dependencies: column 0 is the radius, 2 the perimeter, 3 the area.
        X[:, 2] = np.maximum(X[:, 2], X[:, 0] * 5 + rng.normal(0, 5, size=n_rows))          # perimeter ~ 5*radius
        X[:, 3] = np.maximum(X[:, 3], (X[:, 0] ** 2) * 3 + rng.normal(0, 80, size=n_rows))  # area ~ 3*radius^2
        return X

    # Benign rows are generated before malignant rows (this order is part of the random stream).
    X_b = gen_class(n_ben, mu_b, sd_b)
    X_m = gen_class(n_mal, mu_m, sd_m)

    y_b = np.zeros((n_ben, 1), dtype=int)
    y_m = np.ones((n_mal, 1), dtype=int)

    X = np.vstack([X_b, X_m])
    y = np.vstack([y_b, y_m]).ravel()

    # Shuffle features and labels with the same permutation.
    idx = rng.permutation(len(y))
    X = X[idx]
    y = y[idx]

    pdf = pd.DataFrame(X, columns=features)
    pdf["label"] = y
    return pdf

# Generate the pandas dataset used by the following cells and report its row count.
pdf = make_synthetic_breast_cancer(n=1500, seed=123)
print("Filas generadas:", pdf.shape[0])

Filas generadas: 1500


In [5]:
# Show the generated pandas DataFrame as this cell's output.
pdf

Unnamed: 0,mean_radius,mean_texture,mean_perimeter,mean_area,smoothness,compactness,concavity,symmetry,fractal_dim,mean_density,cell_size_var,nuclei_clump,mitoses,label
0,12.016416,19.087914,50.186307,459.903624,0.098730,0.075753,0.048606,0.226450,0.053804,0.838194,1.232199,3.121839,1.548933,0
1,19.109970,21.233005,96.137164,1141.179806,0.093981,0.156071,0.107445,0.222217,0.065182,1.300904,2.458677,2.973516,0.729266,1
2,12.726332,19.088364,85.379364,530.028582,0.085500,0.061747,0.054609,0.157271,0.047185,0.852006,1.007864,2.225548,0.245449,0
3,10.526049,17.183792,92.971968,535.146137,0.090336,0.074980,0.057574,0.226720,0.052940,0.925060,1.631703,1.623197,0.749866,0
4,15.147575,22.799536,97.584910,780.532284,0.121393,0.106319,0.064491,0.237935,0.069361,1.049501,1.412589,3.481713,1.901984,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1495,16.109235,26.004893,125.946846,1121.300327,0.120899,0.135541,0.118529,0.178104,0.069398,1.135080,1.468590,1.857710,2.304726,1
1496,10.736741,19.211483,86.571062,533.342556,0.078971,0.074400,0.063280,0.248974,0.048668,0.850664,0.908038,2.122866,0.304082,0
1497,12.881699,14.673539,83.195971,547.067012,0.100714,0.072085,0.053395,0.172453,0.056241,0.823973,1.731577,1.854618,0.236313,0
1498,14.128698,25.919282,99.000860,780.386601,0.118191,0.103462,0.109456,0.155103,0.058645,0.944172,1.679150,3.079903,1.098214,1


In [6]:
# Convert the pandas frame into a Spark DataFrame, then confirm the row count
# and the inferred column types.
sdf = spark.createDataFrame(pdf)

spark_row_count = sdf.count()
print("Conteo en Spark:", spark_row_count)
sdf.printSchema()
                  

Conteo en Spark: 1500
root
 |-- mean_radius: double (nullable = true)
 |-- mean_texture: double (nullable = true)
 |-- mean_perimeter: double (nullable = true)
 |-- mean_area: double (nullable = true)
 |-- smoothness: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- concavity: double (nullable = true)
 |-- symmetry: double (nullable = true)
 |-- fractal_dim: double (nullable = true)
 |-- mean_density: double (nullable = true)
 |-- cell_size_var: double (nullable = true)
 |-- nuclei_clump: double (nullable = true)
 |-- mitoses: double (nullable = true)
 |-- label: long (nullable = true)



In [7]:
# Random 80/20 split with a fixed seed; print the size of each part.
split_weights = [0.8, 0.2]
train_sdf, test_sdf = sdf.randomSplit(split_weights, seed=2025)
print("Train:", train_sdf.count(), "Test:", test_sdf.count())

Train: 1189 Test: 311


In [8]:
# =============================
# 5) ML pipeline
# =============================
# Every column except the target is used as a feature.
feature_cols = [name for name in sdf.columns if name != "label"]

# Stage 1: pack the feature columns into a single vector column.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_raw",
)

# Stage 2: standardize the assembled vector (centering and scaling both enabled).
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withMean=True,
    withStd=True,
)

# Stage 3: logistic regression on the standardized features.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
    probabilityCol="probability",
)

pipe = Pipeline(stages=[assembler, scaler, lr])


In [12]:
# =============================
# 6) Cross-validation (k=5)
# =============================
from pyspark.ml.classification import LogisticRegressionModel

# Grid: 4 regParam values x 3 elasticNetParam values.
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(lr.regParam, [0.0, 0.01, 0.05, 0.1])
grid_builder = grid_builder.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
param_grid = grid_builder.build()

# Candidates are scored by area under the ROC curve.
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)

cv = CrossValidator(
    estimator=pipe,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=1,  # must be >= 1; raise it once everything runs stably
    seed=2025,
)

# Training
cv_model = cv.fit(train_sdf)

# Best model: the fitted pipeline; pick out its logistic-regression stage.
best_model = cv_model.bestModel
lr_stages = [stage for stage in best_model.stages if isinstance(stage, LogisticRegressionModel)]
bm_lr = lr_stages[0]

print("CV completada. Mejor LR -> regParam:", bm_lr.getRegParam(),
      "| elasticNetParam:", bm_lr.getElasticNetParam())

# avgMetrics holds the fold-averaged metric per grid entry; report the largest.
cv_avg_auc = float(max(cv_model.avgMetrics))
print("AUC promedio (CV):", round(cv_avg_auc, 4))

CV completada. Mejor LR -> regParam: 0.0 | elasticNetParam: 0.0
AUC promedio (CV): 1.0


In [13]:
# =============================
# 7) Evaluation on TEST
# =============================
# Score the held-out split with the best pipeline; cached because the
# predictions are read several times.
pred_test = best_model.transform(test_sdf).cache()
auc_test = float(evaluator.evaluate(pred_test))
print("AUC (test):", round(auc_test, 4))

# MulticlassMetrics is fed an RDD of (prediction, label) pairs, both as floats.
pred_label_df = pred_test.select("prediction", "label")
pred_rdd = pred_label_df.rdd.map(lambda row: (float(row[0]), float(row[1])))
metrics = MulticlassMetrics(pred_rdd)

# Overall accuracy, plus precision / recall / F1 for label 1.0.
accuracy = float(metrics.accuracy)
precision_1 = float(metrics.precision(1.0))
recall_1 = float(metrics.recall(1.0))
f1_1 = float(metrics.fMeasure(1.0))

# Confusion matrix cells, read as [[TN, FP], [FN, TP]].
cm = metrics.confusionMatrix().toArray()
tn, fp = int(cm[0, 0]), int(cm[0, 1])
fn, tp = int(cm[1, 0]), int(cm[1, 1])

for metric_title, metric_value in (
    ("Accuracy:", accuracy),
    ("Precision(1):", precision_1),
    ("Recall(1):", recall_1),
    ("F1(1):", f1_1),
):
    print(metric_title, round(metric_value, 4))
print("Confusion [[TN,FP],[FN,TP]]:\n", cm)

AUC (test): 1.0




Accuracy: 1.0
Precision(1): 1.0
Recall(1): 1.0
F1(1): 1.0
Confusion [[TN,FP],[FN,TP]]:
 [[183.   0.]
 [  0. 128.]]


In [15]:
# =============================
# 8) Export to Excel (same folder)
# =============================
# Use the script's folder when __file__ is defined; otherwise the current working directory.
if "__file__" in globals():
    out_dir = Path(__file__).parent
else:
    out_dir = Path.cwd()

# A timestamp in the file name keeps successive runs from overwriting each other.
ts = time.strftime("%Y%m%d_%H%M%S")
out_path = out_dir.joinpath(f"bcancer_sintetico_resultados_{ts}.xlsx")

# Bring the full dataset and the test predictions back to pandas.
pdf_all = sdf.toPandas()
export_cols = [*feature_cols, "label", "prediction"]
pdf_pred = pred_test.select(*export_cols).toPandas()

# One-row summary tables for the CV metric and the test metrics.
df_cv = pd.DataFrame({"metric": ["AUC_CV_avg"], "value": [cv_avg_auc]})
df_test_metrics = pd.DataFrame({
    "AUC_test": [auc_test],
    "Accuracy": [accuracy],
    "Precision_pos": [precision_1],
    "Recall_pos": [recall_1],
    "F1_pos": [f1_1],
})
# Confusion matrix: rows are true classes, columns are predicted classes.
df_conf = pd.DataFrame(
    {"Pred_0": [tn, fn], "Pred_1": [fp, tp]},
    index=["Real_0", "Real_1"],
)

# (sheet name, frame, write the index?) - only the confusion matrix keeps its index.
sheets = [
    ("data_sintetica", pdf_all, False),
    ("pred_test", pdf_pred, False),
    ("cv_metrics", df_cv, False),
    ("best_model_metrics", df_test_metrics, False),
    ("confusion_matrix", df_conf, True),
]
with pd.ExcelWriter(out_path, engine="openpyxl") as w:
    for sheet_name, frame, keep_index in sheets:
        frame.to_excel(w, index=keep_index, sheet_name=sheet_name)

print(f"Excel guardado en: {out_path.resolve()}")

print("Proceso completo OK.")
# spark.stop()  # uncomment to shut Spark down explicitly

Excel guardado en: D:\magali\ciencia de datos UCI\Cuarto\big data aplicada\IC\Y4AN1_Big_Data_Aplicac\Semana5\bcancer_sintetico_resultados_20250822_073724.xlsx
Proceso completo OK.
