In [None]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import LSTM, GlobalAveragePooling1D, Dense, Dropout, BatchNormalization, TimeDistributed
from keras import backend as K
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler


In [None]:

# def precision(y_true, y_pred):
#     """Precision metric."""
#     true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
#     predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
#     precision = true_positives / (predicted_positives + K.epsilon())
#     return precision

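# # Register the custom metric function with Keras.
# # NOTE: if re-enabled, the assignment below would overwrite the built-in
# # tf.keras.metrics.Precision class; passing `precision` through
# # `metrics=[precision]` in model.compile() avoids that.
# tf.keras.metrics.Precision = precision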

In [None]:
def load_and_preprocess_data(file_list, sequence_length=500, num_classes=4):
    """Load per-file note sequences and build fixed-length model inputs/targets.

    For every CSV in ``file_list`` the columns at positions 1-4 are read; they
    are accessed by the names ``note``, ``velocity``, ``time`` and ``anomaly``.
    The three feature columns are min-max scaled independently for each file,
    then features and labels are truncated or zero-padded to exactly
    ``sequence_length`` rows.

    Padding is applied to features and labels separately. (Padding the whole
    frame with feature-only rows dropped the ``anomaly`` column and made every
    file shorter than ``sequence_length`` fail with a ``KeyError``.)

    Args:
        file_list: Iterable of CSV paths.
        sequence_length: Number of timesteps every sequence is cut/padded to.
        num_classes: Number of label classes used for the one-hot encoding.

    Returns:
        Tuple ``(data, targets)`` of arrays shaped
        ``(n_files, sequence_length, n_features)`` and
        ``(n_files, sequence_length, num_classes)``.
    """
    feature_columns = ["note", "velocity", "time"]
    data = []
    targets = []

    for file in file_list:
        df = pd.read_csv(file, usecols=[1, 2, 3, 4])
        # A fresh scaler per file: each sequence is normalised on its own range.
        df[feature_columns] = MinMaxScaler().fit_transform(df[feature_columns])

        # Everything except the label is a feature; file column order is kept.
        features = df.drop(columns="anomaly").to_numpy()
        labels = df["anomaly"].to_numpy()
        n_real = min(len(df), sequence_length)

        padded_features = np.zeros((sequence_length, features.shape[1]))
        padded_features[:n_real] = features[:n_real]

        # Padded timesteps get label 0.
        # NOTE(review): assumes class 0 means "no anomaly" - confirm.
        padded_labels = np.zeros(sequence_length, dtype=labels.dtype)
        padded_labels[:n_real] = labels[:n_real]

        data.append(padded_features)
        targets.append(to_categorical(padded_labels, num_classes=num_classes))

    return np.array(data), np.array(targets)


In [None]:


directory = "./anomalous"  # Replace with the path to your directory

# os.listdir() returns entries in arbitrary order, so the list is sorted:
# otherwise the fixed random_state below would not give the same
# train/test split on every machine or run.
anomalous_file_list = sorted(
    os.path.join(directory, filename)
    for filename in os.listdir(directory)
    if filename.endswith(".csv")
)

sequence_length = 500  # Number of timesteps each file is cut/padded to
data, targets = load_and_preprocess_data(anomalous_file_list, sequence_length)

# Hold out 20% of the files; the split is per file, not per timestep.
X_train, X_test, y_train, y_test = train_test_split(
    data, targets, test_size=0.2, random_state=42
)

print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)

In [None]:
# Stacked-LSTM sequence tagger: every LSTM keeps return_sequences=True so the
# network emits one 4-way softmax per timestep, matching the per-timestep
# one-hot targets produced by load_and_preprocess_data.
model = Sequential([
    LSTM(256, activation='tanh', input_shape=(sequence_length, 3), return_sequences=True),
    Dropout(0.2),
    LSTM(128, activation='tanh', return_sequences=True),
    Dropout(0.2),
    LSTM(64, activation='tanh', return_sequences=True),
    BatchNormalization(),
    # Dense layers act on the last axis, i.e. independently at every timestep.
    Dense(64, activation='relu'),
    Dropout(0.2),
    Dense(32, activation='relu'),
    Dropout(0.2),
    TimeDistributed(Dense(4, activation='softmax'))
])

# 'accuracy' is tracked so that fit()/evaluate() report it next to the loss.
# run_eagerly is left at its default (False): eager execution disables graph
# compilation and is intended for debugging only. Pass run_eagerly=True
# temporarily if you need to step through the forward pass.
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:

# Train with early stopping on val_loss and keep the best model on disk.
early_stopping_callback = EarlyStopping(monitor="val_loss", patience=10, min_delta=0, mode="min")


checkpoint_path = "col-model.h5"
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    checkpoint_path, monitor="val_loss", verbose=1, save_best_only=True, mode="min"
)

# NOTE(review): the test split doubles as the validation set for early
# stopping and checkpoint selection, so the numbers printed below are
# optimistic. Consider carving a separate validation split out of X_train.
history = model.fit(
    X_train,
    y_train,
    epochs=100,
    batch_size=32,
    validation_data=(X_test, y_test),
    callbacks=[checkpoint_callback, early_stopping_callback],
)

# Restore the best checkpoint and report every compiled metric by name.
# evaluate() returns only the loss when no metrics were compiled, so the
# result must not be labelled "accuracy" unconditionally.
model.load_weights(checkpoint_path)
eval_results = model.evaluate(X_test, y_test, return_dict=True)
accuracy = eval_results.get("accuracy")  # None if the model has no accuracy metric
for metric_name, metric_value in eval_results.items():
    print(f"{metric_name}: {metric_value:.4f}")

In [None]:
def predict_anomalies(model, file, sequence_length=500):
    """Count the anomalies a model predicts for one CSV file.

    The columns at positions 1-3 of ``file`` are read (accessed by the names
    ``note``, ``velocity`` and ``time``), min-max scaled on this file alone,
    and truncated or zero-padded to ``sequence_length`` rows before being fed
    to ``model.predict`` as a single-sequence batch.

    How the prediction is turned into a count depends on the output shape:

    * ``(1, steps, n_classes)`` with ``n_classes > 1`` (per-timestep class
      probabilities): a timestep counts as an anomaly when its most probable
      class is not class 0. Rows that are only padding are ignored when
      ``steps == sequence_length``.
      NOTE(review): assumes class 0 means "no anomaly" - confirm.
    * any other shape: negative values are clipped to 0, the output is
      rounded to the nearest integer and summed.

    Args:
        model: Object exposing ``predict(batch)``.
        file: Path of the CSV file to score.
        sequence_length: Number of timesteps the model expects.

    Returns:
        The number of predicted anomalies as an ``int``.
    """
    feature_columns = ["note", "velocity", "time"]
    df = pd.read_csv(file, usecols=[1, 2, 3], index_col=False)
    df[feature_columns] = MinMaxScaler().fit_transform(df[feature_columns])

    features = df.to_numpy()
    n_real = min(len(features), sequence_length)

    # Zero-pad (or truncate) to the fixed length the model was built for.
    input_data = np.zeros((1, sequence_length, features.shape[1]))
    input_data[0, :n_real] = features[:n_real]

    predictions = np.asarray(model.predict(input_data))

    if predictions.ndim == 3 and predictions.shape[-1] > 1:
        # Summing rounded softmax outputs would count every confidently
        # classified timestep, including class 0; use the arg-max class.
        predicted_classes = predictions[0].argmax(axis=-1)
        if len(predicted_classes) == sequence_length:
            # Do not count predictions made on padding rows.
            predicted_classes = predicted_classes[:n_real]
        return int(np.count_nonzero(predicted_classes))

    # Other output shapes: clip negatives, round, and sum.
    clipped = np.clip(predictions, 0, None)
    return int(np.sum(np.round(clipped)))


In [None]:
# Load a previously saved model from disk and count the anomalies it predicts
# for a single example file.
# NOTE(review): this path differs from the "col-model.h5" checkpoint written by
# the training cell - confirm which model is meant to be scored here.
model = load_model("./model/model.h5")
new_file = "anomalous/scn16_1_modified078.csv"
num_anomalies = predict_anomalies(model=model, file=new_file)
print(f"Number of anomalies: {num_anomalies}")
