### Načítanie knižníc a vytvorenie Spark session

Importujú sa potrebné knižnice: `pyspark` pre prácu s veľkými dátami, `numpy` pre numerické operácie a `matplotlib` na vizualizáciu. Následne sa vytvára Spark session s názvom `zadanieTSVD`.


In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import udf, col
from pyspark.ml.linalg import Vectors
import numpy as np
import matplotlib.pyplot as plt
import os
import shutil

In [2]:
spark = SparkSession.builder.appName("zadanieTSVD").getOrCreate()

### Načítanie a zlúčenie datasetov

Načítavajú sa dva samostatné datasety: trénovací (`train.csv`) a testovací (`test.csv`). Po ich načítaní sa spoja do jedného veľkého datasetu a zároveň vypíše počet riadkov pred a po zlúčení.


In [6]:
## Načítanie a spojenie datasetov ##

train_df = spark.read.csv("train.csv", header=True, inferSchema=True)
test_df  = spark.read.csv("test.csv", header=True, inferSchema=True)

#odtstranenie Accident_Severity ako cieloveho atributu
train_df = train_df.drop("Accident_Severity")
test_df = test_df.drop("Accident_Severity")

train_df = train_df.drop("Casualty_Severity")
test_df = test_df.drop("Casualty_Severity")

train_count = train_df.count()
test_count = test_df.count()
print(f"Train rows: {train_count}, Test rows: {test_count}")

data_df = train_df.unionByName(test_df)
merged_count = data_df.count()
print(f"Merged rows: {merged_count}")

Train rows: 256657, Test rows: 170872
Merged rows: 427529


### Príprava atribútov pre klasifikáciu

Zo zlúčeného datasetu sa odstraňuje nepotrebný stĺpec `id`, a zvyšné stĺpce sa transformujú do jedného vektora pomocou `VectorAssembler`, ktorý vytvorí nový stĺpec `features` – vstup pre model KMeans.


In [7]:
## Príprava atribútov ##

feature_cols = data_df.columns.copy()
if 'id' in feature_cols:
    feature_cols.remove('id')
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_feat = assembler.transform(data_df)

### Trénovanie KMeans modelu

Používa sa KMeans algoritmus na rozdelenie dát do `k=3` klastrov. Výsledný model sa uloží.


In [8]:
## Trénovanie K-Means ##

k = 3
kmeans = KMeans(k=k, featuresCol="features", predictionCol="prediction", seed=42)
model = kmeans.fit(df_feat)
# Uloženie modelu
model.save("kmeans__model")


### Načítanie uloženého modelu

Model, ktorý bol uložený sa načíta späť pomocou `KMeansModel.load`.


In [12]:
## Načítanie modelu ##

from pyspark.ml.clustering import KMeansModel
loaded_model = KMeansModel.load("kmeans__model")

### Detekcia anomálií

Získajú sa centroidy (stredy) jednotlivých klastrov. Pomocou UDF funkcie sa vypočíta vzdialenosť každého bodu od svojho klastrového centroidu. Potom sa na základe 95. percentilu týchto vzdialeností nastaví prah, nad ktorým sa záznam považuje za anomáliu. Anomálie sa označia stĺpcom `anomaly`.


In [9]:
## Detekcia anomálií ##

## Centroidy algoritmu##
centers = model.clusterCenters()
## UDF na výpočet vzdialenosti ##
@udf(returnType="double")
def calc_dist_udf(features, cluster):
    center = centers[cluster]
    diff = np.array(features) - center
    return float(np.sqrt(np.dot(diff, diff)))

## Pridanie stĺpca "distance" ##
df_dist = model.transform(df_feat) \
    .withColumn("distance", calc_dist_udf(col("features"), col("prediction")))
## 95. percentil ako prah anomálie ##
threshold = df_dist.approxQuantile("distance", [0.95], 0.0)[0]
## Pridanie príznaku anomálie ##
df_anom = df_dist.withColumn("anomaly", col("distance") > threshold)

### Uloženie výsledkov do CSV

Vyfiltrované výsledky bez stĺpca `features` sa spoja do jedného CSV súboru a uložia sa do `data_with__anomalies.csv` a vypíše sa použitý prah pre detekciu anomálií.


In [13]:
## Uloženie výsledkov do CSV ##

output_cols = [c for c in df_anom.columns if c != 'features']
# Spojíme partičky do jedného súboru
single_df = df_anom.select(*output_cols).coalesce(1)
single_df.write.csv("data___with___anomalies.csv", header=True)
print(f"Výsledky uložené do 'data_with__anomalies.csv'. Anomálny prah = {threshold:.4f}.")

Výsledky uložené do 'data_with__anomalies.csv'. Anomálny prah = 162.3869.


### Zobrazenie výsledkov klastrovania

Na záver sa vypočíta a zobrazí počet záznamov v jednotlivých klastroch.


In [14]:
## Zobrazenie výsledkov ##

cluster_sizes = df_anom.groupBy("prediction").count().toPandas()
print("Počet záznamov v jednotlivých klastroch:")
print(cluster_sizes)

Počet záznamov v jednotlivých klastroch:
   prediction   count
0           1  180295
1           2  192009
2           0   55225
