In [1]:
import os
import sys

# Must run before the project-local import below so that `shared` is resolvable.
sys.path.append(os.path.abspath(os.path.join('../')))

import dask.dataframe as dd
import numpy as np
import tensorflow as tf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
from tensorflow.keras.layers import LSTM, Dense, Input, RepeatVector
from tensorflow.keras.models import Model

from shared.schemas import ml_schema

# Spark settings applied to the session used for reading the test data.
spark_options = {
    "spark.sql.parquet.enableVectorizedReader": "true",
    # Schema merging is unnecessary: the schema is passed explicitly on read.
    "spark.sql.parquet.mergeSchema": "false",
    "spark.executor.memory": "6g",  # executor memory
    "spark.driver.memory": "2g",    # driver memory
    # "spark.local.dir": "/mnt/d/spark-temp",  # optional: alternative temp directory
}

session_builder = SparkSession.builder.appName("DataAggregations")
for option_key, option_value in spark_options.items():
    session_builder = session_builder.config(option_key, option_value)
spark = session_builder.getOrCreate()

# Training data is read with Dask; the test data is read with Spark using the
# explicit project schema.
train_df = dd.read_parquet("../../data/historical/training/eth")
test_reader = spark.read.schema(ml_schema)
test_df = test_reader.parquet("../../data/benchmark/testing/eth")

ERROR! Session/line number was not unique in database. History logging moved to new session 47


ModuleNotFoundError: No module named 'shared'

In [8]:
def create_sequences(data, seq_length):
    """Slice ``data`` into overlapping windows of ``seq_length`` consecutive rows.

    Window ``i`` contains rows ``i`` .. ``i + seq_length - 1``; consecutive
    windows are shifted by one row.

    Args:
        data: Array-like whose first axis is the row (time) axis. Any trailing
            axes are carried through unchanged.
        seq_length: Number of rows per window. Must be positive.

    Returns:
        A new array of shape ``(len(data) - seq_length + 1, seq_length, *data.shape[1:])``.
        If ``data`` has fewer than ``seq_length`` rows, an empty array of shape
        ``(0, seq_length, *data.shape[1:])`` is returned so the window and
        feature dimensions are still present.

    Raises:
        ValueError: If ``seq_length`` is not positive.
    """
    if seq_length <= 0:
        raise ValueError(f"seq_length must be positive, got {seq_length}")

    data = np.asarray(data)
    n_windows = data.shape[0] - seq_length + 1
    if n_windows <= 0:
        # Too few rows for even one window: keep the trailing dimensions so
        # callers can still rely on the result's rank.
        return np.empty((0, seq_length) + data.shape[1:], dtype=data.dtype)

    # sliding_window_view appends the window axis last; move it next to the
    # window index so the layout is (window, step, features...).
    windows = np.lib.stride_tricks.sliding_window_view(data, seq_length, axis=0)
    # .copy() detaches the result from `data` (the view is read-only and aliased).
    return np.moveaxis(windows, -1, 1).copy()

def build_autoencoder(input_dim, seq_length):
    """Build and compile an LSTM sequence autoencoder.

    The encoder reduces each ``(seq_length, input_dim)`` window to a single
    32-unit vector; the decoder expands that vector back into a sequence with
    the same shape as the input.

    Args:
        input_dim: Number of features per time step.
        seq_length: Number of time steps per input window.

    Returns:
        A Keras ``Model`` mapping ``(batch, seq_length, input_dim)`` to an
        output of the same shape, compiled with the Adam optimizer and MSE loss.
    """
    inputs = Input(shape=(seq_length, input_dim))

    # Encoder: the second LSTM returns only its final state, shape (batch, 32).
    encoded = LSTM(64, activation="relu", return_sequences=True)(inputs)
    encoded = LSTM(32, activation="relu", return_sequences=False)(encoded)

    # The decoder LSTMs require 3-D input, so repeat the encoded vector once
    # per time step. Without this the model cannot be built (ndim mismatch).
    repeated = RepeatVector(seq_length)(encoded)

    # Decoder.
    decoded = LSTM(64, activation="relu", return_sequences=True)(repeated)
    # NOTE(review): sigmoid bounds every output to (0, 1), so reconstruction
    # can only be accurate if the input features are scaled to that range — confirm.
    decoded = LSTM(input_dim, activation="sigmoid", return_sequences=True)(decoded)

    autoencoder = Model(inputs, decoded)
    autoencoder.compile(optimizer="adam", loss="mse")
    return autoencoder

# Both start unset; the training loop assigns them when it sees that input_dim is None.
input_dim = autoencoder = None

In [6]:
seq_length = 100
batch_size = 64
epochs = 10
partition_size = "10MB"


def _window_mse(sequences, reconstructions):
    """Return the mean squared reconstruction error of each window (one value per window)."""
    return np.mean(np.square(sequences - reconstructions), axis=(1, 2))


train_partitions = train_df.repartition(partition_size=partition_size).to_delayed()

# Pass 1: fit the autoencoder one partition at a time so that only a single
# partition has to be materialised in memory at once.
for partition in train_partitions:
    pandas_df = partition.compute()
    data = pandas_df.to_numpy()

    # A partition with fewer rows than one window yields no sequences.
    if len(data) < seq_length:
        continue

    if input_dim is None:
        # The model is built on the first usable partition, once the feature
        # count is known.
        input_dim = data.shape[1]
        autoencoder = build_autoencoder(input_dim, seq_length)

    sequences = create_sequences(data, seq_length)
    autoencoder.fit(sequences, sequences, epochs=epochs, batch_size=batch_size, verbose=1)

if autoencoder is None:
    raise RuntimeError(
        "No training partition contained at least seq_length rows; no model was trained."
    )

# Pass 2: score every training window with the *final* model and derive one
# global threshold. Computing the threshold inside the fit loop kept only the
# last partition's value, measured against a model that was still changing.
train_mse_parts = []
for partition in train_partitions:
    data = partition.compute().to_numpy()
    if len(data) < seq_length:
        continue
    sequences = create_sequences(data, seq_length)
    train_reconstructions = autoencoder.predict(sequences)
    train_mse_parts.append(_window_mse(sequences, train_reconstructions))

train_mse = np.concatenate(train_mse_parts)
threshold = np.percentile(train_mse, 95)

# Materialise the test set once so features and labels come from the same
# rows in the same order (two separate toPandas() calls run two Spark jobs).
test_pdf = test_df.toPandas()
# NOTE(review): every column, including the label column, is fed to the model
# here, as before. Confirm the training data has the same columns in the same
# order, and whether the label should be excluded from the features.
test_data = test_pdf.to_numpy()
test_sequences = create_sequences(test_data, seq_length)

test_reconstructions = autoencoder.predict(test_sequences)
test_mse = _window_mse(test_sequences, test_reconstructions)
anomalies = test_mse > threshold

# Each window is labelled with the label of its last row.
# "labey" looked like a typo; this assumes the column in ml_schema is named
# "label" — confirm against the schema.
test_labels = test_pdf["label"].to_numpy()[seq_length - 1:]
precision, recall, f1, _ = precision_recall_fscore_support(test_labels, anomalies, average="binary")

accuracy = accuracy_score(test_labels, anomalies)

print(f"Precision: {precision:.2f}, Recall: {recall:.2f}, F1-Score: {f1:.2f}, Accuracy: {accuracy:.2f}")
    