In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

In [2]:
# Build (or reuse) a local Spark session running on all available cores.
# NOTE(review): if a session already exists in this kernel, getOrCreate() may
# return it without reapplying these settings (e.g. driver memory) — restart the
# kernel after changing them.
spark_settings = {
    "spark.driver.memory": "12g",
    "spark.local.dir": "spark",  # scratch directory, relative to the working directory
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.memory.fraction": "0.8",
}
session_builder = SparkSession.builder.master("local[*]")
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()
spark

In [3]:
# Load the cleaned training dataset (the path suggests missing values were handled
# upstream — confirm) and spread it over 400 partitions for the jobs below.
data_cleaned = (
    spark.read
    .parquet("dataset_no_missing_values")
    .repartition(400)
)

# Quick sanity check of what was loaded: row count, then the column list.
print("Number of rows:")
print(data_cleaned.count())

print("\nColumn Names:")
print(data_cleaned.columns)

Number of rows:
2000000

Column Names:
['Label', 'I1', 'I2', 'I4', 'I5', 'I6', 'I7', 'I10', 'I11', 'I13', 'C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'C10', 'C11', 'C12', 'C13', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21', 'C23', 'C24', 'C25', 'C26']


## PCA


In [4]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

### Testowanie PCA (do wykonania): dobór liczby komponentów `k` na podstawie skumulowanej wyjaśnionej wariancji

In [5]:
# Every column except the target becomes a model input.
# NOTE(review): this includes the C* columns, so VectorAssembler requires them to
# already be numeric (presumably encoded upstream) — confirm.
numeric_cols = [name for name in data_cleaned.columns if name != "Label"]
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")

# Standardize each feature (centre and scale to unit std) — PCA is scale-sensitive.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True,
)


In [6]:
# Fit only the assembler + scaler stages (despite the name, there is no PCA stage
# here); `scaled_data` feeds the component-count analysis in the next cell.
# NOTE(review): this is fitted on all of `data_cleaned`, i.e. before the
# train/validation split, so the choice of k also sees validation rows.
pipeline_pca = Pipeline(stages=[assembler, scaler])
scaler_model = pipeline_pca.fit(data_cleaned)
scaled_data = scaler_model.transform(data_cleaned)

In [7]:
# Fit PCA with as many components as there are input features so the full
# explained-variance spectrum is available for choosing k.
num_features = len(numeric_cols)
pca_model_selection = PCA(
    k=num_features, inputCol="scaledFeatures", outputCol="pcaFeatures"
).fit(scaled_data)

# Per-component explained variance and its running total (list of Python floats).
explained_variance = pca_model_selection.explainedVariance
cumulative_variance = np.cumsum(explained_variance.toArray()).tolist()
total_variance = cumulative_variance[-1] if cumulative_variance else 0.0

In [8]:
# For each explained-variance threshold, report the smallest number of principal
# components whose cumulative explained variance reaches it. `cumulative_variance`
# is the running total computed in the previous cell.
variance_thresholds = [0.50, 0.70, 0.80, 0.90, 0.95]
# Threshold whose component count becomes `k` in the final pipeline. Chosen
# explicitly instead of relying on the loop variable left over from the last
# iteration, which would be stale or undefined if that threshold were not reached.
TARGET_VARIANCE = 0.95


def n_components_for(threshold, cumulative):
    """Return the smallest k with cumulative[k - 1] >= threshold, or None if no k qualifies."""
    return next((k for k, cum_var in enumerate(cumulative, start=1) if cum_var >= threshold), None)


for threshold in variance_thresholds:
    k_needed = n_components_for(threshold, cumulative_variance)
    # `:.0%` avoids int(threshold * 100) truncation for thresholds like 0.29.
    if k_needed is None:
        print(f"Threshold: {threshold:.0%} — No sufficient components found.")
    else:
        print(f"Threshold: {threshold:.0%} — Components: {k_needed}, Cumulative Variance: {cumulative_variance[k_needed - 1]:.4f}")

best_k = n_components_for(TARGET_VARIANCE, cumulative_variance)
if best_k is None:
    raise ValueError(f"No number of PCA components reaches {TARGET_VARIANCE:.0%} explained variance")

Threshold: 50% — Components: 12, Cumulative Variance: 0.5008
Threshold: 70% — Components: 19, Cumulative Variance: 0.7041
Threshold: 80% — Components: 23, Cumulative Variance: 0.8101
Threshold: 90% — Components: 27, Cumulative Variance: 0.9092
Threshold: 95% — Components: 30, Cumulative Variance: 0.9653


In [9]:
# 80/20 train/validation split with a fixed seed.
train_data, val_data = data_cleaned.randomSplit(weights=[0.8, 0.2], seed=42)

In [10]:
# Final model: assemble -> standardize -> project onto the first `best_k` principal
# components -> logistic regression on the projected features. `best_k` comes from
# the threshold analysis above (30 components / ~96.5% variance in the recorded run).
# The scaler and PCA are re-fitted on the training split when the pipeline is fitted.
assert 1 <= best_k <= len(numeric_cols), f"unexpected number of PCA components: {best_k}"
pca = PCA(k=best_k, inputCol="scaledFeatures", outputCol="pcaFeatures")
lr = LogisticRegression(featuresCol="pcaFeatures", labelCol="Label")
pipeline = Pipeline(stages=[assembler, scaler, pca, lr])

In [11]:
# Train the model: fit every stage (assembler, scaler, PCA, logistic regression)
# on the training split only.
model = pipeline.fit(dataset=train_data)

In [12]:
# The fitted logistic-regression model is the last pipeline stage (index 3).
lr_model = model.stages[-1]
print(f"Model coefficients: {lr_model.coefficients}")
print(f"Model intercept: {lr_model.intercept}")


Model coefficients: [-0.3055521929575967,0.26427957201809993,-0.12761040303106366,-0.18980362682351065,0.030457231918428085,0.015635035268513914,0.07378748940339769,-0.08393791533699295,-0.04971575947813767,-0.09065013044994139,-0.02031274201551959,-0.0055794188834360344,-0.012305927088685466,0.0036850075320577613,0.0010349753198113953,0.003482738063612042,0.011393980210469,-0.01723397182546998,0.062133004156348655,-0.03024204768073731,-0.05929827721385613,0.023526153211246233,-0.05121736865089754,0.07625639683391355,0.0040133041269579915,0.02395977575907546,0.010037247933088456,-0.02990902560385721,0.09478863455038632,0.004978886034166058]
Model intercept: -1.2224296491708242


In [13]:
# Stage index 2 is the fitted PCA model (stages: assembler, scaler, PCA, LR).
pca_model = model.stages[2]
retained_variance = pca_model.explainedVariance.toArray().sum()
print(f"Explained Variance by PCA: {retained_variance}")

Explained Variance by PCA: 0.9652834469691515


In [14]:
# Score the held-out validation split with the fitted pipeline.
val_predictions = model.transform(dataset=val_data)

In [15]:
# MulticlassClassificationEvaluator's "f1" metric is the F1 weighted by label
# frequency over both classes, not the F1 of the positive class, so the printed
# label says "weighted" to avoid misreading it as the usual binary F1.
evaluator = MulticlassClassificationEvaluator(labelCol="Label", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(val_predictions)
print(f"Validation F1-score (weighted): {f1_score:.4f}")

Validation F1-score: 0.6945


In [16]:
# Fraction of validation rows whose predicted class equals the label.
# NOTE(review): if the label is imbalanced, compare this against the
# majority-class rate before reading it as a good result.
evaluator = MulticlassClassificationEvaluator(
    labelCol="Label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(val_predictions)
print(f"Validation Accuracy: {accuracy:.4f}")

Validation Accuracy: 0.7563


## Predykcje na zbiorze testowym

In [19]:
# Load the cleaned test dataset.
test_data_path = "data_no_missing_values_test"
data_test_clean = spark.read.parquet(test_data_path)

In [20]:
# Restrict the test data to exactly the columns the fitted VectorAssembler
# (pipeline stage 0) consumes.
input_cols = model.stages[0].getInputCols()
data_test_clean = data_test_clean.select(*input_cols)

In [21]:
# Apply the full fitted pipeline (assembler -> scaler -> PCA -> LR) to the test rows.
test_predictions = model.transform(dataset=data_test_clean)

In [22]:
# Preview the first 100 assembled feature vectors with their predicted class.
test_predictions.select(["features", "prediction"]).show(n=100)

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[1.0,26.0,4.0,244...|       0.0|
|[1.0,0.0,4.0,512....|       0.0|
|[5.0,23.0,3.0,169...|       0.0|
|[4.0,0.0,15.0,1.0...|       0.0|
|[0.0,36.0,6.0,244...|       0.0|
|[0.0,68.0,1.0,166...|       0.0|
|[1.0,0.0,4.0,2447...|       0.0|
|[5.0,33.0,2.0,2.0...|       0.0|
|[1.0,30.0,4.0,133...|       0.0|
|[1.0,6.0,10.0,12....|       1.0|
|[1.0,5.0,12.0,96....|       0.0|
|[0.0,0.0,4.0,1255...|       0.0|
|[0.0,1.0,4.0,1406...|       0.0|
|[1.0,69.0,4.0,278...|       0.0|
|[0.0,0.0,10.0,291...|       0.0|
|[0.0,0.0,8.0,1626...|       0.0|
|[1.0,57.0,4.0,719...|       0.0|
|[1.0,1.0,4.0,1003...|       0.0|
|[1.0,66.0,4.0,278...|       0.0|
|[1.0,0.0,21.0,170...|       0.0|
|[0.0,87.5,8.0,568...|       0.0|
|[1.0,0.0,4.0,1649...|       0.0|
|[4.0,2.0,0.0,57.0...|       1.0|
|[1.0,20.0,9.0,433...|       0.0|
|[1.0,85.0,4.0,278...|       0.0|
|[0.0,48.0,1.0,717...|       0.0|
|[7.5,87.5,6.0

In [23]:
# Percentage of test rows predicted as the positive class (prediction == 1).
# Averaging a 1.0/0.0 indicator equals count(prediction == 1) / count(*) * 100
# (rows whose comparison is null count as 0.0, as before). On an empty test set
# the average is null (None here) instead of a 0/0 division, which errors under
# ANSI mode, and formatting None with :.2f would raise a TypeError.
result = test_predictions.agg(
    (F.avg(F.when(F.col("prediction") == 1, 1.0).otherwise(0.0)) * 100).alias("percentage_1")
).first()["percentage_1"]

if result is None:
    print("Procent predykcji == 1: brak wierszy w zbiorze testowym")
else:
    print(f"Procent predykcji == 1: {result:.2f}%")

Procent predykcji == 1: 6.73%
