In [1]:
# PySpark end-to-end: EDA ‚Üí preprocessing ‚Üí CV/tuning (MLlib) ‚Üí pseudo-AutoML ‚Üí ewaluacja ‚Üí zapis modelu
# Autor: Ty (i trochƒô ja üòâ)
# Wymagania: pyspark>=3.3 (dzia≈Ça na 3.5.x). Brak zewnƒôtrznych paczek.
# Uwaga: Spark MLlib nie korzysta z GPU bez ekosystemu RAPIDS; tu wszystko leci na CPU (OK dla projektu).

import os
import numpy as np
import pandas as pd
import sys
from pyspark.sql import SparkSession, functions as F, types as T, Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from sklearn.datasets import load_breast_cancer

In [2]:
# # =========================================================
# # 1) Start Spark
# # =========================================================
# spark = (
#     SparkSession.builder
#     .appName("PySpark_MLlib_AutoML_BreastCancer")
#     .config("spark.sql.shuffle.partitions", "200")
#     .config("spark.sql.adaptive.enabled", "true")
#     .getOrCreate()
# )
# spark.sparkContext.setLogLevel("WARN")
# print("Spark:", spark.version)


In [3]:
# Ten sam python wszƒôdzie
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Tymczasowy katalog bez polskich znak√≥w
tmpdir = r"C:\spark-tmp"
os.makedirs(tmpdir, exist_ok=True)

spark = (SparkSession.builder
    .appName("debug")
    .master("local[*]")
    .config("spark.executorEnv.PYSPARK_PYTHON", sys.executable)
    .config("spark.pyspark.driver.python", sys.executable)
    .config("spark.pyspark.python", sys.executable)
    .config("spark.local.dir", tmpdir)         # unikamy %USERPROFILE%\AppData\Local\Temp\Pawe≈Ç...
    .getOrCreate())

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 49422)
Traceback (most recent call last):
  File "C:\Program Files\Python311\Lib\socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "C:\Program Files\Python311\Lib\socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "C:\Program Files\Python311\Lib\socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "C:\Program Files\Python311\Lib\socketserver.py", line 755, in __init__
    self.handle()
  File "c:\REPO\studia\venv\Lib\site-packages\pyspark\accumulators.py", line 295, in handle
    poll(accum_updates)
  File "c:\REPO\studia\venv\Lib\site-packages\pyspark\accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "c:\REPO\studia\venv\Lib\site-packages\pyspark\

In [4]:
# =========================
# 2) Dane z pakietu (sklearn)
# =========================
sk = load_breast_cancer()
X = pd.DataFrame(sk.data, columns=[c.replace(" ", "_") for c in sk.feature_names])
y = pd.Series(sk.target, name="label").astype(float)

In [5]:
# Dodajmy 2 kategorie przez binning (≈ºeby zademonstrowaƒá Indexer/OHE)
# medianowe tercyle -> kategorie: low/med/high
def tercyle(s):
    q = s.quantile([1/3, 2/3]).values
    return pd.cut(s, bins=[-np.inf, q[0], q[1], np.inf], labels=["low","med","high"])

X["mean_radius_cat"]  = tercyle(X["mean_radius"])
X["mean_texture_cat"] = tercyle(X["mean_texture"])

pdf = pd.concat([X, y], axis=1)
raw = spark.createDataFrame(pdf)


In [6]:
# =========================
# 3) EDA (Spark)
# =========================
print("\n=== Schemat ===")
raw.printSchema()



=== Schemat ===
root
 |-- mean_radius: double (nullable = true)
 |-- mean_texture: double (nullable = true)
 |-- mean_perimeter: double (nullable = true)
 |-- mean_area: double (nullable = true)
 |-- mean_smoothness: double (nullable = true)
 |-- mean_compactness: double (nullable = true)
 |-- mean_concavity: double (nullable = true)
 |-- mean_concave_points: double (nullable = true)
 |-- mean_symmetry: double (nullable = true)
 |-- mean_fractal_dimension: double (nullable = true)
 |-- radius_error: double (nullable = true)
 |-- texture_error: double (nullable = true)
 |-- perimeter_error: double (nullable = true)
 |-- area_error: double (nullable = true)
 |-- smoothness_error: double (nullable = true)
 |-- compactness_error: double (nullable = true)
 |-- concavity_error: double (nullable = true)
 |-- concave_points_error: double (nullable = true)
 |-- symmetry_error: double (nullable = true)
 |-- fractal_dimension_error: double (nullable = true)
 |-- worst_radius: double (nullable = 

In [7]:
print("\n=== Pr√≥bka ===")
raw.show(5, truncate=True)


=== Pr√≥bka ===
+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+---------------+----------------+-----+
|mean_radius|mean_texture|mean_perimeter|mean_area|mean_smoothness|mean_compactness|mean_concavity|mean_concave_points|mean_symmetry|mean_fractal_dimension|radius_error|texture_error|perimeter_error|area_error|smoothness_error|compactness_error|concavity_error|concave_points_error|symmetry_error|fractal_dimension_error|worst_radius|worst_texture|worst_perimeter|worst_area|worst_smoothness|worst_compactness|worst_concavity|worst_concave_points|worst_symm

In [8]:
print("\n=== Rozk≈Çad label ===")
raw.groupBy("label").count().orderBy("label").show()


=== Rozk≈Çad label ===
+-----+-----+
|label|count|
+-----+-----+
|  0.0|  212|
|  1.0|  357|
+-----+-----+



In [9]:
numeric_cols = [c for (c,t) in raw.dtypes if t in ("double","float","int","bigint") and c!="label"]
categorical_cols = ["mean_radius_cat","mean_texture_cat"]

In [10]:
print("\n=== Summary (numeryczne) ===")
raw.select(numeric_cols).summary().show()


=== Summary (numeryczne) ===
+-------+------------------+------------------+-----------------+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+----------------------+------------------+------------------+------------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------------+------------------+------------------+------------------+-----------------+-------------------+-------------------+-------------------+--------------------+-------------------+-----------------------+
|summary|       mean_radius|      mean_texture|   mean_perimeter|        mean_area|     mean_smoothness|    mean_compactness|     mean_concavity|mean_concave_points|      mean_symmetry|mean_fractal_dimension|      radius_error|     texture_error|   perimeter_error|      area_error|    smoothness_error|   compactness_error|     concavity_error|concave_point

In [11]:
print("\n=== Korelacje (kilka) ===")
pairs = [("mean_radius","mean_perimeter"),
         ("mean_area","worst_area"),
         ("mean_texture","mean_smoothness")]
for a,b in pairs:
    print(f"corr({a},{b}) = {raw.stat.corr(a,b):.4f}")


=== Korelacje (kilka) ===
corr(mean_radius,mean_perimeter) = 0.9979
corr(mean_area,worst_area) = 0.9592
corr(mean_texture,mean_smoothness) = -0.0234


In [12]:
# Window example: udzia≈Ç klasy 1 po kategorii
win = Window.partitionBy("mean_radius_cat")
eda_win = (raw
    .withColumn("cnt", F.lit(1))
    .withColumn("sum1", F.sum(F.col("label")).over(win))
    .withColumn("den", F.sum(F.col("cnt")).over(win))
    .withColumn("share_label1", F.col("sum1")/F.col("den"))
    .select("mean_radius_cat","share_label1").distinct().orderBy(F.desc("share_label1")))
print("\n=== Udzia≈Ç klasy 1 wg mean_radius_cat ===")
eda_win.show()


=== Udzia≈Ç klasy 1 wg mean_radius_cat ===
+---------------+-------------------+
|mean_radius_cat|       share_label1|
+---------------+-------------------+
|            low| 0.9685863874345549|
|            med| 0.7925531914893617|
|           high|0.12105263157894737|
+---------------+-------------------+



In [13]:
# =========================
# 4) Preprocessing pipeline
# =========================
imputer = Imputer(strategy="median", inputCols=numeric_cols, outputCols=[f"{c}_imputed" for c in numeric_cols])
indexers = [StringIndexer(handleInvalid="keep", inputCol=c, outputCol=f"{c}_idx") for c in categorical_cols]
ohe = OneHotEncoder(inputCols=[f"{c}_idx" for c in categorical_cols],
                    outputCols=[f"{c}_oh"  for c in categorical_cols])
assembler = VectorAssembler(
    inputCols=[f"{c}_imputed" for c in numeric_cols] + [f"{c}_oh" for c in categorical_cols],
    outputCol="features_raw"
)
scaler = StandardScaler(withMean=True, withStd=True, inputCol="features_raw", outputCol="features")
preprocess_stages = [imputer] + indexers + [ohe, assembler, scaler]

In [14]:
# =========================
# 5) Train / Test + wagi klas
# =========================
train, test = raw.randomSplit([0.8, 0.2], seed=42)
train = train.cache(); test = test.cache()
print("\nTrain:", train.count(), " Test:", test.count())

class_counts = train.groupBy("label").count().collect()
n_train = sum(r["count"] for r in class_counts)
counts = {r["label"]: r["count"] for r in class_counts}
w0 = n_train / (2.0 * counts.get(0.0, 1))
w1 = n_train / (2.0 * counts.get(1.0, 1))
train = train.withColumn("weight", F.when(F.col("label")==1.0, F.lit(w1)).otherwise(F.lit(w0)).cast("double"))


Train: 443  Test: 126


In [15]:
# =========================
# 6) Modele + siatki
# =========================
lr  = LogisticRegression(featuresCol="features", labelCol="label", weightCol="weight", maxIter=100)
rf  = RandomForestClassifier(featuresCol="features", labelCol="label", weightCol="weight")
gbt = GBTClassifier(featuresCol="features", labelCol="label", maxIter=80, maxDepth=5, stepSize=0.1)

In [16]:
lr_grid = (ParamGridBuilder()
           .addGrid(lr.regParam, [0.0, 0.01, 0.1])
           .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
           .build())

rf_grid = (ParamGridBuilder()
           .addGrid(rf.numTrees, [200, 400])
           .addGrid(rf.maxDepth, [6, 12])
           .addGrid(rf.featureSubsetStrategy, ["sqrt","log2"])
           .build())

gbt_grid = (ParamGridBuilder()
           .addGrid(gbt.maxDepth, [4, 6])
           .addGrid(gbt.maxIter, [60, 100])
           .addGrid(gbt.stepSize, [0.05, 0.1])
           .build())

In [17]:
evaluator_auc = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

In [18]:
def make_cv(estimator, grid, k=5):
    return CrossValidator(
        estimator=Pipeline(stages=preprocess_stages + [estimator]),
        estimatorParamMaps=grid,
        evaluator=evaluator_auc,
        numFolds=k,
        parallelism=2,
        seed=42
    )

In [19]:
cv_lr  = make_cv(lr,  lr_grid)
cv_rf  = make_cv(rf,  rf_grid)
cv_gbt = make_cv(gbt, gbt_grid)

In [20]:
# =========================
# 7) Pseudo-AutoML: wyb√≥r najlepszego po AUC (CV)
# =========================
def fit_and_score(cv, name):
    print(f"\n=== Trenujƒô {name} (CV) ===")
    model = cv.fit(train)
    val_auc = max(model.avgMetrics)
    print(f"{name}: najlepszy AUC (CV) = {val_auc:.4f}")
    return name, model, val_auc

In [21]:
candidates = []
for name, cv in [("LogisticRegression", cv_lr), ("RandomForest", cv_rf), ("GBT", cv_gbt)]:
    candidates.append(fit_and_score(cv, name))


=== Trenujƒô LogisticRegression (CV) ===
LogisticRegression: najlepszy AUC (CV) = 0.9950

=== Trenujƒô RandomForest (CV) ===
RandomForest: najlepszy AUC (CV) = 0.9928

=== Trenujƒô GBT (CV) ===
GBT: najlepszy AUC (CV) = 0.9836


In [23]:
best_name, best_cv_model, best_auc = sorted(candidates, key=lambda x: x[2], reverse=True)[0]
print(f"\n>>> Najlepszy (CV) wg AUC: {best_name} ({best_auc:.4f})")

best_pipeline_model = best_cv_model.bestModel  # to jest PipelineModel: preprocessing + estimator


>>> Najlepszy (CV) wg AUC: LogisticRegression (0.9950)


In [24]:
# =========================
# 8) Ewaluacja na te≈õcie
# =========================
pred_test = best_pipeline_model.transform(test).cache()

evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_f1  = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
evaluator_pr  = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator_rc  = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

test_auc = evaluator_auc.evaluate(pred_test)
test_acc = evaluator_acc.evaluate(pred_test)
test_f1  = evaluator_f1.evaluate(pred_test)
test_pr  = evaluator_pr.evaluate(pred_test)
test_rc  = evaluator_rc.evaluate(pred_test)

In [25]:
print("\n=== METRYKI TEST ===")
print(f"AUC:       {test_auc:.4f}")
print(f"Accuracy:  {test_acc:.4f}")
print(f"F1:        {test_f1:.4f}")
print(f"Precision: {test_pr:.4f}")
print(f"Recall:    {test_rc:.4f}")



=== METRYKI TEST ===
AUC:       0.9974
Accuracy:  0.9841
F1:        0.9841
Precision: 0.9845
Recall:    0.9841


In [26]:
print("\n=== Macierz pomy≈Çek (test) ===")
(pred_test.groupBy("label","prediction").count().orderBy("label","prediction")).show()

scored_rdd = (pred_test
              .select("probability","label")
              .rdd
              .map(lambda r: (float(r["probability"][1]), float(r["label"]))))
bicm = BinaryClassificationMetrics(scored_rdd)
print(f"AUPR (RDD metrics): {bicm.areaUnderPR:.4f}")


=== Macierz pomy≈Çek (test) ===
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|   49|
|  0.0|       1.0|    2|
|  1.0|       1.0|   75|
+-----+----------+-----+





AUPR (RDD metrics): 0.9982


In [27]:
# =========================
# 9) Wa≈ºno≈õƒá cech / wsp√≥≈Çczynniki
# =========================
def extract_feature_names(pipeline_model, numeric_cols, categorical_cols):
    # Spark nie zwraca pe≈Çnych nazw dla OHE ‚Äî oszacuj liczbƒô wyj≈õƒá z indexer√≥w
    indexers = [s for s in pipeline_model.stages if isinstance(s, StringIndexer)]
    sizes = [len(s.labels) for s in indexers]
    expanded_cat = []
    for c, k in zip(categorical_cols, sizes):
        expanded_cat += [f"{c}__oh_{i}" for i in range(max(k-1, 1))]
    return [f"{c}_imputed" for c in numeric_cols] + expanded_cat

feat_names = extract_feature_names(best_pipeline_model, numeric_cols, categorical_cols)
last = best_pipeline_model.stages[-1]

In [28]:
if hasattr(last, "featureImportances"):
    imps = list(last.featureImportances.toArray())
    top = sorted(zip(feat_names, imps), key=lambda x: x[1], reverse=True)[:20]
    print("\n=== TOP cechy (drzewiaste) ===")
    for n, v in top:
        print(f"{n:30s} {v:.5f}")
elif isinstance(last, LogisticRegression):
    coefs = list(last.coefficients.toArray())
    top = sorted(zip(feat_names, coefs), key=lambda x: abs(x[1]), reverse=True)[:20]
    print("\n=== TOP |wsp√≥≈Çczynnik√≥w| (LR) ===")
    for n, v in top:
        print(f"{n:30s} {v:+.5f}")

In [29]:
# =========================
# 10) TrainValidationSplit (LR ‚Äì przyk≈Çad)
# =========================
tvs = TrainValidationSplit(
    estimator=Pipeline(stages=preprocess_stages + [lr]),
    estimatorParamMaps=lr_grid,
    evaluator=evaluator_auc,
    trainRatio=0.8,
    parallelism=2,
    seed=42
)
print("\n=== LR przez TrainValidationSplit ===")
tvs_model = tvs.fit(train)
tvs_auc = evaluator_auc.evaluate(tvs_model.transform(test))
print(f"AUC (TVS, LR): {tvs_auc:.4f}")



=== LR przez TrainValidationSplit ===
AUC (TVS, LR): 0.9974
