### Mount Google Drive

> Add blockquote



In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Set environment

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("AnomalyDetection").getOrCreate()

### Set the file path from Google Drive

In [None]:
data_path = '/content/drive/MyDrive/combined_output_cleaned_part_2.csv'

### Importing Libraries

In [None]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA as PCAml
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
import matplotlib.pyplot as plt
import seaborn as sns


### Read CSV using PySpark

In [None]:
df = spark.read.csv(data_path, header=True, inferSchema=True)
df.show(5)

+-----+----------+--------+---------+---------+-----+----+-----+---+----+------+------+
|P-PDG|     P-TPT|   T-TPT|P-MON-CKP|T-JUS-CKP|class|year|month|day|hour|minute|second|
+-----+----------+--------+---------+---------+-----+----+-----+---+----+------+------+
|    0|1.009211E7|119.0944|1609800.0| 84.59782|    0|2017|    2|  1|   2|     2|     7|
|    0|  1.0092E7|119.0944|1618206.0| 84.58997|    0|2017|    2|  1|   2|     2|     8|
|    0|1.009189E7|119.0944|1626612.0| 84.58213|    0|2017|    2|  1|   2|     2|     9|
|    0|1.009178E7|119.0944|1635018.0| 84.57429|    0|2017|    2|  1|   2|     2|    10|
|    0|1.009167E7|119.0944|1643424.0| 84.56644|    0|2017|    2|  1|   2|     2|    11|
+-----+----------+--------+---------+---------+-----+----+-----+---+----+------+------+
only showing top 5 rows



In [None]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline

### Combine features into a single vector

In [None]:
feature_cols = [col for col in df.columns if col != 'class']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

### Standardize features

In [None]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withMean=True, withStd=True)

In [None]:
pipeline = Pipeline(stages=[assembler, scaler])
processed_df = pipeline.fit(df).transform(df)

### Split data into train and test

In [None]:
train_df, test_df = processed_df.randomSplit([0.67, 0.33], seed=2018)

### Apply PCA to reduce dimensions

In [None]:
from pyspark.ml.feature import PCA

pca = PCA(k=9, inputCol="scaledFeatures", outputCol="pcaFeatures")
pca_model = pca.fit(train_df)

### Transform the training data

In [None]:
train_pca_df = pca_model.transform(train_df)
train_pca_df.select("pcaFeatures").show(5)

+--------------------+
|         pcaFeatures|
+--------------------+
|[0.85078772242648...|
|[0.85061678745645...|
|[0.85044619834633...|
|[0.85029930335095...|
|[0.85015253700283...|
+--------------------+
only showing top 5 rows



In [None]:
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

### Calculate reconstruction Error


In [None]:
def reconstruct_pca(pca_features, pca_components):
    pca_features = np.array(pca_features)  # Convert to NumPy array
    pca_components_reduced = pca_components[:pca_features.shape[0], :]  # Take only the first k rows
    reconstructed = np.dot(pca_features, pca_components_reduced.T)  # Reconstruct the original features
    return float(np.sum((reconstructed - pca_features) ** 2))  # Return reconstruction error as float

### Convert PCA components to NumPy array

In [None]:
pca_components = np.array(pca_model.pc.toArray())

### Define UDF for reconstruction error

In [None]:
reconstruction_udf = udf(lambda pca_feat: reconstruct_pca(pca_feat, pca_components), FloatType())
train_pca_df = train_pca_df.withColumn("reconstructionError", reconstruction_udf("pcaFeatures"))

### Apply reconstruction error as UDF

In [None]:
reconstruction_udf = udf(lambda pca_feat: reconstruct_pca(pca_feat, pca_components), FloatType())
train_pca_df = train_pca_df.withColumn("reconstructionError", reconstruction_udf("pcaFeatures"))

### Normalize anomaly scores

In [None]:
min_error = train_pca_df.agg({"reconstructionError": "min"}).collect()[0][0]
max_error = train_pca_df.agg({"reconstructionError": "max"}).collect()[0][0]


+----------+----------+--------+---------+---------+-----+----+-----+---+----+------+------+--------------------+--------------------+--------------------+-------------------+------------+
|     P-PDG|     P-TPT|   T-TPT|P-MON-CKP|T-JUS-CKP|class|year|month|day|hour|minute|second|            features|      scaledFeatures|         pcaFeatures|reconstructionError|anomalyScore|
+----------+----------+--------+---------+---------+-----+----+-----+---+----+------+------+--------------------+--------------------+--------------------+-------------------+------------+
|-125436200|1.421613E7|116.9995|6055496.0|  69.7748|    0|2017|    6| 27|   0|    58|    23|[-1.254362E8,1.42...|[-0.8629609262824...|[0.85078772242648...|          19.000492| 0.002418188|
|-124884800|1.421597E7|116.9997|6055371.0| 69.77486|    0|2017|    6| 27|   0|    58|    22|[-1.248848E8,1.42...|[-0.8593846718647...|[0.85061678745645...|          18.947947|0.0024114272|
|-124333400| 1.42158E7|116.9999|6055247.0| 69.77493|   

In [None]:
from pyspark.sql.types import DoubleType

normalize_udf = udf(lambda x: (x - min_error) / (max_error - min_error), DoubleType())
train_pca_df = train_pca_df.withColumn("anomalyScore", normalize_udf("reconstructionError"))
train_pca_df.show(5)

+----------+----------+--------+---------+---------+-----+----+-----+---+----+------+------+--------------------+--------------------+--------------------+-------------------+--------------------+
|     P-PDG|     P-TPT|   T-TPT|P-MON-CKP|T-JUS-CKP|class|year|month|day|hour|minute|second|            features|      scaledFeatures|         pcaFeatures|reconstructionError|        anomalyScore|
+----------+----------+--------+---------+---------+-----+----+-----+---+----+------+------+--------------------+--------------------+--------------------+-------------------+--------------------+
|-125436200|1.421613E7|116.9995|6055496.0|  69.7748|    0|2017|    6| 27|   0|    58|    23|[-1.254362E8,1.42...|[-0.8629609262824...|[0.85078772242648...|          19.000492|0.002418187904463...|
|-124884800|1.421597E7|116.9997|6055371.0| 69.77486|    0|2017|    6| 27|   0|    58|    22|[-1.248848E8,1.42...|[-0.8593846718647...|[0.85061678745645...|          18.947947|0.002411427266089...|
|-124333400| 1.

### Evaluate using AUC

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Prepare the labels and anomaly scores
evaluation_df = train_pca_df.selectExpr("class as label", "anomalyScore as prediction")

In [None]:
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
auc = evaluator.evaluate(evaluation_df)
print(f"AUC: {auc}")

AUC: 0.5711606138328211
