In [0]:
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler

spark = SparkSession.builder.getOrCreate()

# -----------------------------
# 1) Synthetic data: a Gaussian "normal" population plus a shifted outlier cluster
# -----------------------------

n_total = 300_000
frac_outliers = 0.02
n_out = int(n_total * frac_outliers)  # number of injected outlier rows
n_norm = n_total - n_out              # everything else is a normal row

# Normal rows: x1, x2, x3 each drawn from a standard normal (mean 0, std 1);
# row_id runs over [0, n_norm).
df_norm = spark.range(n_norm).select(
    F.col("id").alias("row_id"),
    F.randn(1).alias("x1"),
    F.randn(2).alias("x2"),
    F.randn(3).alias("x3"),
    F.lit(0).alias("is_outlier"),
)

# Outlier rows: x1, x2, x3 each drawn from a normal with mean 8 and std 1.5;
# row_id continues right after the normal block so ids stay unique.
df_out = spark.range(n_out).select(
    (F.col("id") + n_norm).alias("row_id"),
    (8.0 + 1.5 * F.randn(11)).alias("x1"),
    (8.0 + 1.5 * F.randn(12)).alias("x2"),
    (8.0 + 1.5 * F.randn(13)).alias("x3"),
    F.lit(1).alias("is_outlier"),
)

# Stack both populations and shuffle the rows using a seeded random sort key.
df = df_norm.unionByName(df_out).orderBy(F.rand(999))

In [0]:
# -----------------------------
# 2) Feature contract: assemble x1..x3 into one vector, then standardize it (Spark)
# -----------------------------

# Both stages are applied to the full dataset (normal rows and outliers alike).
assembler = VectorAssembler(
    inputCols=["x1", "x2", "x3"],
    outputCol="features_raw",
)
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withMean=True,  # center each feature on its mean
    withStd=True,   # scale each feature to unit standard deviation
)

df_vec = assembler.transform(df)
scaler_model = scaler.fit(df_vec)

# Keep only the columns the downstream cells consume.
df_scaled = (
    scaler_model
    .transform(df_vec)
    .select("row_id", "features", "is_outlier")
)


In [0]:
# -----------------------------
# 3) Autoencoder: train on NORMAL rows (10% sample) in TensorFlow
# -----------------------------
import numpy as np
import tensorflow as tf

# The is_outlier label is used here to keep only normal rows for training
# (novelty-detection setup): the model never sees the injected outliers.
train_pd = (df_scaled
    .filter(F.col("is_outlier") == 0)
    .sample(withReplacement=False, fraction=0.10, seed=42)
    .select("features")
    .toPandas()
)
if train_pd.empty:
    raise ValueError("Training sample is empty; check the filter and sample fraction.")

# Each cell of `features` holds one Spark ML vector; stack them into a
# (n_rows, n_features) float32 matrix for Keras.
X_train = np.vstack(train_pd["features"].values).astype(np.float32)
input_dim = X_train.shape[1]

# Seed Python, NumPy and TensorFlow RNGs so weight init and batch shuffling
# are reproducible across Restart & Run All.
tf.keras.utils.set_random_seed(42)

# Dense autoencoder: input_dim -> 8 -> 2 (bottleneck) -> 8 -> input_dim.
# The output layer has no activation (linear), matching standardized targets.
inputs = tf.keras.Input(shape=(input_dim,))
h = tf.keras.layers.Dense(8, activation="relu")(inputs)
z = tf.keras.layers.Dense(2, activation="relu")(h)  # latent
h2 = tf.keras.layers.Dense(8, activation="relu")(z)
outputs = tf.keras.layers.Dense(input_dim)(h2)

autoencoder = tf.keras.Model(inputs, outputs)
autoencoder.compile(optimizer="adam", loss="mse")
# Reconstruction objective: the target is the input itself.
autoencoder.fit(X_train, X_train, epochs=8, batch_size=256, verbose=0)
MODEL_PATH = "/tmp/ae_model.keras"
autoencoder.save(MODEL_PATH)

In [0]:
# -----------------------------
# 4) Scoring in Spark: reconstruction error as anomaly_score (mapInPandas)
# -----------------------------
import pandas as pd
from pyspark.ml.functions import vector_to_array
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, IntegerType

schema = StructType([
    StructField("row_id", LongType(), False),
    StructField("anomaly_score", DoubleType(), False),
    StructField("is_outlier", IntegerType(), False),
])

# MODEL_PATH is a driver-local file. Executors on other hosts of a multi-node
# cluster cannot read it, so ship the serialized model inside the task closure.
with open(MODEL_PATH, "rb") as model_file:
    MODEL_BYTES = model_file.read()


def score_with_autoencoder(pdf_iter):
    """Score pandas batches with the trained autoencoder.

    Args:
        pdf_iter: iterator of pandas DataFrames with columns ``row_id``,
            ``features`` (one numeric array per row) and ``is_outlier``.

    Yields:
        pandas DataFrames matching ``schema``: ``row_id``, ``anomaly_score``
        (per-row mean squared reconstruction error) and ``is_outlier``.
    """
    import os
    import tempfile

    import numpy as np
    import tensorflow as tf

    # Keras loads models from a path, so materialize the shipped bytes in a
    # local temp file once per call, load, then remove the file.
    fd, local_path = tempfile.mkstemp(suffix=".keras")
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(MODEL_BYTES)
        model = tf.keras.models.load_model(local_path)
    finally:
        os.remove(local_path)

    for pdf in pdf_iter:
        if pdf.empty:
            # np.vstack raises on an empty sequence; an empty batch adds no rows.
            continue
        X = np.vstack(pdf["features"].values).astype(np.float32)
        recon = model.predict(X, verbose=0)
        err = np.mean((X - recon) ** 2, axis=1)  # MSE per row
        yield pd.DataFrame({
            "row_id": pdf["row_id"].values.astype(np.int64),
            "anomaly_score": err.astype(np.float64),
            "is_outlier": pdf["is_outlier"].values.astype(np.int32),
        })


# Convert the ML vector column to a plain array<double> before it crosses into
# pandas: VectorUDT is not reliably Arrow-convertible across Spark versions.
scored = (
    df_scaled
    .select("row_id", vector_to_array("features").alias("features"), "is_outlier")
    .mapInPandas(score_with_autoencoder, schema=schema)
    # Later cells run several actions on `scored`; caching avoids re-running
    # TF inference over every row for each of them.
    .cache()
)

In [0]:
# -----------------------------
# 5) Threshold and anomaly flag (percentile)
# -----------------------------
ANOMALY_QUANTILE = 0.995
# approxQuantile only guarantees the returned value's rank lies within
# [floor((p - err) * N), ceil((p + err) * N)]. With err = 0.01 that window spans
# the 98.5%..100% quantiles -- wider than the tail being targeted. 1e-4 keeps the
# rank within about +/-30 rows of the 99.5% point for N = 300k.
QUANTILE_REL_ERROR = 1e-4

# NOTE(review): flagging the top 0.5% while ~2% of rows (frac_outliers) are
# injected outliers caps achievable recall near 25%; consider deriving
# ANOMALY_QUANTILE from 1 - frac_outliers if higher recall is wanted.
quantiles = scored.approxQuantile("anomaly_score", [ANOMALY_QUANTILE], QUANTILE_REL_ERROR)
if not quantiles:
    # approxQuantile returns an empty list when the column has no usable values.
    raise ValueError("approxQuantile returned no value; is `scored` empty?")
threshold = quantiles[0]
scored = scored.withColumn("is_anomaly", (F.col("anomaly_score") >= F.lit(threshold)).cast("int"))
print(f"Threshold (approx {ANOMALY_QUANTILE:.1%} quantile):", threshold)

In [0]:
# -----------------------------
# 6) Evaluation against the is_outlier ground-truth label
# -----------------------------
cm = (scored.groupBy("is_outlier", "is_anomaly").count().orderBy("is_outlier", "is_anomaly"))

# Collect the (at most 4-row) confusion table once and derive TP/FP/FN from it,
# instead of launching three more full-scan count() jobs over `scored`.
cm_pd = cm.toPandas()
print(cm_pd.to_string(index=False))

counts = {
    (int(outlier), int(anomaly)): int(n)
    for outlier, anomaly, n in zip(cm_pd["is_outlier"], cm_pd["is_anomaly"], cm_pd["count"])
}
tp = counts.get((1, 1), 0)
fp = counts.get((0, 1), 0)
fn = counts.get((1, 0), 0)

precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0
print(f"TP={tp} FP={fp} FN={fn}")
print(f"Precision={precision:.4f} Recall={recall:.4f}")

# Highest reconstruction errors first.
scored.orderBy(F.desc("anomaly_score")).show(20, truncate=False)