Normalize data

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler


def load_and_normalize_data(file_path):
    """Read a header-less CSV and standardize it with ``StandardScaler``.

    Args:
        file_path: Location of the CSV, passed straight to ``pd.read_csv``.
            Every row is treated as data because ``header=None`` is used.

    Returns:
        The result of ``StandardScaler().fit_transform`` applied to the
        loaded frame.
    """
    raw_frame = pd.read_csv(file_path, header=None)

    # The scaler is fitted on this file only, so the z-score statistics
    # come from the same data that is being transformed.
    return StandardScaler().fit_transform(raw_frame)

# Names of the raw files (inside the Test folder) that should be normalized.
file_names = ['1.txt']

# Normalize every file on its own and collect the resulting frames. They are
# concatenated once after the loop: DataFrame.append was removed in pandas 2.0
# and, while it existed, copied the accumulated frame on every iteration.
normalized_frames = []
for file_name in file_names:
    file_path = f'/content/drive/MyDrive/S-1/Test/{file_name}'  # Replace with the actual path
    normalized_data = load_and_normalize_data(file_path)
    normalized_frames.append(pd.DataFrame(normalized_data))

# pd.concat raises on an empty list, so fall back to an empty frame when
# no file names were given.
common_data = (
    pd.concat(normalized_frames, ignore_index=True)
    if normalized_frames
    else pd.DataFrame()
)

# Change the output file name as required for the normalized data.
common_data.to_csv('/content/drive/MyDrive/S-1/Normal data/S1_2_test_normal.csv', index=False)

Calculating a threshold value

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, RepeatVector, TimeDistributed, Dense


def load_and_preprocess_data(file_path, time_steps=10):
    """Load a header-less CSV and return its contents as a NumPy array.

    Args:
        file_path: Location of the CSV, passed to ``pd.read_csv``.
        time_steps: Accepted for interface compatibility; this
            implementation does not use it.

    Returns:
        ``np.ndarray`` built from the loaded DataFrame (one row per CSV line).
    """
    return np.array(pd.read_csv(file_path, header=None))


def calculate_threshold(X_train, model, percentile=95):
    """Derive an anomaly threshold from the model's reconstruction error.

    Args:
        X_train: Array the model is asked to reconstruct. The squared error
            is averaged over axes 1 and 2, so it needs at least three
            dimensions.
        model: Object exposing ``predict``; its output is subtracted from
            ``X_train`` element-wise.
        percentile: Percentile of the per-sample errors to return.

    Returns:
        The requested percentile of the per-sample mean squared errors.
    """
    residual = X_train - model.predict(X_train)

    # One error value per sample: mean of the squared residual over the
    # remaining two axes.
    per_sample_mse = np.mean(residual ** 2, axis=(1, 2))

    return np.percentile(per_sample_mse, percentile)


def train_lstm_autoencoder(X_train, epochs=10, batch_size=32):
    """Build and fit an LSTM autoencoder that learns to reconstruct ``X_train``.

    Args:
        X_train: Training array; ``shape[1]`` and ``shape[2]`` size the
            input layer, the repeat count and the output width.
        epochs: Number of epochs passed to ``fit``.
        batch_size: Batch size passed to ``fit``.

    Returns:
        The fitted ``Sequential`` model.
    """
    sequence_length, feature_count = X_train.shape[1], X_train.shape[2]

    autoencoder = Sequential([
        # Encoder: returns only its final output (no return_sequences).
        LSTM(units=64, input_shape=(sequence_length, feature_count)),
        # Repeat that output once per time step for the decoder.
        RepeatVector(sequence_length),
        # Decoder: emits one output per time step.
        LSTM(units=64, return_sequences=True),
        TimeDistributed(Dense(feature_count)),
    ])
    autoencoder.compile(optimizer='adam', loss='mse')

    # Input and target are the same array: the model is trained to reconstruct.
    autoencoder.fit(X_train, X_train, epochs=epochs, batch_size=batch_size)
    return autoencoder




# Path of the normalized training file; point this at the correct file.
train_file_path = r'/content/drive/MyDrive/S-1/Normal data/S1_3_train_normal.csv'

# Load the table, then append a trailing axis of size 1 so that the array
# becomes (rows, columns, 1) as expected by the LSTM layers.
X_train = load_and_preprocess_data(train_file_path)
X_train = X_train.reshape(X_train.shape + (1,))

# Fit the autoencoder and derive the threshold from its reconstruction
# error on the same training data.
lstm_autoencoder_model = train_lstm_autoencoder(X_train)
threshold = calculate_threshold(X_train, lstm_autoencoder_model)

print(f"Threshold: {threshold}")

Predicting anomaly labels with the LSTM autoencoder

In [None]:
import numpy as np
import pandas as pd
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dense, RepeatVector, TimeDistributed
from tensorflow.keras.models import Sequential

def save_predicted_labels(labels, output_path):
    """Write the labels to a CSV file with a single ``Anomaly`` column.

    Args:
        labels: Values that fill the ``Anomaly`` column.
        output_path: Destination handed to ``DataFrame.to_csv``; the index
            is not written.
    """
    pd.DataFrame({'Anomaly': labels}).to_csv(output_path, index=False)



def load_and_preprocess_data(file_path, time_steps=10):
    """Load a header-less CSV and cut it into overlapping sliding windows.

    Window ``i`` covers rows ``i .. i + time_steps - 1``; consecutive windows
    are shifted by one row.

    Args:
        file_path: Location of the CSV, passed to ``pd.read_csv``.
        time_steps: Number of consecutive rows per window (must be >= 1).

    Returns:
        Tuple ``(X, y)`` where ``X`` has shape
        ``(n_windows, time_steps, n_columns)`` and ``y`` has shape
        ``(n_windows, n_columns)``; ``y[i]`` is the last row of window ``i``.
        When the file has fewer rows than ``time_steps`` both arrays are
        empty but keep these ranks, so ``X.shape[1]`` / ``X.shape[2]`` remain
        valid for callers.

    Raises:
        ValueError: If ``time_steps`` is smaller than 1.
    """
    if time_steps < 1:
        # A non-positive window would index row -1 for the targets.
        raise ValueError(f"time_steps must be at least 1, got {time_steps}")

    # Convert to an array once instead of calling ``.values`` twice per
    # loop iteration.
    values = pd.read_csv(file_path, header=None).values
    n_rows, n_columns = values.shape
    n_windows = n_rows - time_steps + 1

    if n_windows <= 0:
        # Too few rows for a single window: return empty arrays that still
        # carry the window and column dimensions.
        return (
            np.empty((0, time_steps, n_columns), dtype=values.dtype),
            np.empty((0, n_columns), dtype=values.dtype),
        )

    X = np.array([values[start:start + time_steps] for start in range(n_windows)])
    # The last row of window i is row i + time_steps - 1, i.e. every row
    # from index time_steps - 1 onwards.
    y = values[time_steps - 1:].copy()

    return X, y


def detect_anomalies(model, X_test, threshold=0.5):
    """Flag the samples whose reconstruction error exceeds ``threshold``.

    Args:
        model: Object exposing ``predict``; its output is compared with the
            input element-wise.
        X_test: Array indexed on its first three dimensions.
        threshold: Error level above which (strictly) a sample is flagged.

    Returns:
        Integer array with one entry per sample: 1 where the mean squared
        reconstruction error is greater than ``threshold``, otherwise 0.
    """
    # For a 3-D array, reshaping to its own dimensions leaves the data as is.
    windows = X_test.reshape(X_test.shape[0], X_test.shape[1], X_test.shape[2])

    squared_error = (windows - model.predict(windows)) ** 2

    # One error value per sample: average over the two trailing axes.
    per_window_mse = np.mean(squared_error, axis=(1, 2))

    exceeds_threshold = per_window_mse > threshold
    return exceeds_threshold.astype(int)


def train_lstm_autoencoder(X_train, epochs=10, batch_size=32):
    """Create an LSTM autoencoder and train it to reproduce ``X_train``.

    Args:
        X_train: Training array; ``shape[1]`` and ``shape[2]`` determine the
            input shape, the number of repeats and the output width.
        epochs: Number of epochs passed to ``fit``.
        batch_size: Batch size passed to ``fit``.

    Returns:
        The trained ``Sequential`` model.
    """
    n_steps = X_train.shape[1]
    n_features = X_train.shape[2]

    layer_stack = [
        LSTM(units=64, input_shape=(n_steps, n_features)),
        RepeatVector(n_steps),
        LSTM(units=64, return_sequences=True),
        TimeDistributed(Dense(n_features)),
    ]
    net = Sequential(layer_stack)

    net.compile(optimizer='adam', loss='mse')
    # The array serves as both input and target (reconstruction objective).
    net.fit(X_train, X_train, epochs=epochs, batch_size=batch_size)

    return net

# Normalized input files; point these at the correct train and test files.
train_file_path = r'/content/drive/MyDrive/S-1/Normal data/S1_3_train_normal.csv'
test_file_path = r'/content/drive/MyDrive/S-1/Normal data/S1_3_test_normal.csv'

# Destination CSV for the predicted labels.
output_csv_path = r'/content/drive/MyDrive/S-1/Pre_lab/3_predicted_labels_lstm.csv'

# Cut both files into windows using the loader's default time_steps. The
# y_* arrays are returned by the loader but are not used further down.
X_train, y_train = load_and_preprocess_data(train_file_path)
X_test, y_test = load_and_preprocess_data(test_file_path)

# Train on the training windows, then label every test window.
lstm_autoencoder_model = train_lstm_autoencoder(X_train)
anomalies = detect_anomalies(lstm_autoencoder_model, X_test, threshold=0.443)  # Adjust threshold

# Save predicted labels to a CSV file
save_predicted_labels(anomalies, output_csv_path)


