<a href="https://colab.research.google.com/github/fikrinotes/LSTM-IDS/blob/main/Final_Project_IDS_with_Autoencoder_and_RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# IMPORTANT: SOME KAGGLE DATA SOURCES ARE PRIVATE
# RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES.
import kagglehub
kagglehub.login()


In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

dataset_path = kagglehub.dataset_download('chethuhn/network-intrusion-dataset')
model_path = kagglehub.dataset_download('fikrimulyanasetiawan/rnn-model')
encoder_path = kagglehub.dataset_download('fikrimulyanasetiawan/encoder')

print('Data source import complete. \n')
print("Information about your data sources:")
print(f"Dataset path: {dataset_path}")
print(f"Model path: {model_path}")
print(f"Encoder path: {encoder_path}")


# Intrusion Detection System

## Import Library

In [None]:
# Import library yang diperlukan
import pandas as pd
import numpy as np
import os
from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.preprocessing import StandardScaler, LabelEncoder, MinMaxScaler
from sklearn.impute import SimpleImputer
import tensorflow as tf
from tensorflow.keras import layers, Model
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns


## Preprocessing Data

### Load Data

In [None]:
# Fungsi untuk membaca dan preprocessing setiap file
def read_and_clean_file(file_path):
    print(f"Membaca file: {file_path}")
    df = pd.read_csv(file_path, low_memory=False, sep=",")

    # Bersihkan nama kolom dari whitespace
    df.columns = df.columns.str.strip()

    # Hapus kolom yang tidak diperlukan
    redundant_column = ['Flow ID', 'Source IP', 'Source Port', 'Destination IP',
                 'Destination Port', 'Protocol', 'Timestamp']
    df = df.drop(redundant_column, axis=1, errors='ignore')

    # drop baris yang tidak punya label
    df.dropna(subset = ['Label'], inplace=True)

    # Handling missing values dan infinite values
    df = df.replace([np.inf, -np.inf], np.nan)

    return df


# Baca semua file CSV dari folder
data1 = dataset_path + "/Monday-WorkingHours.pcap_ISCX.csv"
data2 = dataset_path + "/Tuesday-WorkingHours.pcap_ISCX.csv"
data3 = dataset_path + "/Wednesday-workingHours.pcap_ISCX.csv"
data4 = dataset_path + "/Thursday-WorkingHours-Morning-WebAttacks.pcap_ISCX.csv"
data5 = dataset_path + "/Thursday-WorkingHours-Afternoon-Infilteration.pcap_ISCX.csv"
data6 = dataset_path + "/Friday-WorkingHours-Morning.pcap_ISCX.csv"
data7 = dataset_path + "/Friday-WorkingHours-Afternoon-PortScan.pcap_ISCX.csv"
data8 = dataset_path + "/Friday-WorkingHours-Afternoon-DDos.pcap_ISCX.csv"


# Buat list semua dataset yang tersedia
all_files = [data2, data3, data4, data5, data6, data7, data8]

# Membaca file dan mengkonversi semua data file dari list "all_files" menjadi dataframe
dataframes = []
for file in all_files:
    df = read_and_clean_file(file)
    dataframes.append(df)
    del df

In [None]:
# Menggabungkan semua dataframe
print("Menggabungkan semua file...")
df = pd.concat(dataframes, ignore_index=True)
try:
    print("Semua file dataset berhasil digabungkan!")
except:
    print("Error! file dataset tidak berhasil digabungkan")

### Pembersihan Data Duplikat

In [None]:
# ganti nama kolom dengan cara hapus whitespaces
col_names = {col: col.strip() for col in df.columns}
df.rename(columns = col_names, inplace = True)

# informasi data duplikat
dups = df[df.duplicated()]
print(f'Banyak data duplikat : {len(dups)}')
print(f'Banyak data sebelum duplikat : {df.shape[0]}')

print("menghapus data duplikat...")

# Hapus data duplikat
df.drop_duplicates(inplace = True)
print("data duplikat selesai dihapus!")
df.shape
print(f"banyak data setelah data duplikat dihapus : {df.shape[0]}")

### Persiapan Label Kelas untuk Klasifikasi Biner

In [None]:
# konversi semua label selain BENIGN jadi ATTACK
df["Label"] = df["Label"].where(df["Label"] == "BENIGN", "ATTACK")
print("Informasi Kelas : ")
df["Label"].unique()

## Informasi Umum Dataset

In [None]:
# Menampilkan informasi dataset
print("\nInformasi Dataset:")
print(f"\nJumlah total data: {len(df)}")
print(f"Jumlah fitur : {len(df.columns)}")
print("\nDistribusi Label sebelum preprocessing:")

# tabel distribusi label
def create_distribution_table(df):
    label_dist = pd.DataFrame(df['Label'].value_counts())
    label_dist['percentage'] = df['Label'].value_counts()/len(df)
    return label_dist

create_distribution_table(df)

In [None]:
df

In [None]:
# Menampilkan informasi dataset
print("\nInformasi Dataset:")
print(f"\nJumlah total data: {len(df)}")
print(f"Jumlah fitur : {len(df.columns)}")
print("\nDistribusi Label setelah preprocessing:")

create_distribution_table(df)

## Pemisahan Data Fitur (X) dan Ouput (y)

In [None]:
numerical_columns = df.select_dtypes(include=[np.number]).columns
X = df[numerical_columns]
y = df["Label"]
print(f"jumlah fitur : {len(X.columns)}")
print(f"jumlah label : {len(y.unique())}")

## Training-Test Split

In [None]:
tss = TimeSeriesSplit(n_splits=7)
print(tss)

In [None]:
#X = np.array([[1, 2], [3, 4], [1, 2], [3, 4], [1, 2], [3, 4]])
train_index, test_index = [], []
for i, (train_interval, test_interval) in enumerate(tss.split(X)):
    print(f"fold {i}:")
    print(f"  Train: index : from {train_interval.min()} up to {train_interval.max()}")
    print(f"  Test:  index=from {test_interval.min()} up to {test_interval.max()}")
    print(f"  Jumlah kelas pada training set : {y.iloc[train_interval].nunique()}")
    print(f"  Jumlah kelas pada testing set : {y.iloc[test_interval].nunique()}")
    train_index, test_index = train_interval, test_interval


In [None]:
# Split dataset dengan stratifikasi
X_train, X_test = X.iloc[train_index], X.iloc[test_index]
y_train, y_test  = y.iloc[train_index], y.iloc[test_index]

In [None]:
y_test.value_counts()

In [None]:
y_train.value_counts()

## Transformasi Data

In [None]:
# imputer
imputer = SimpleImputer(missing_values=np.nan, strategy='mean', copy=False)
print("fitting imputer...")
imputer.fit(X_train)
print("selesai!")

# scaler
scaler = StandardScaler(copy=False)
print("\nfitting scaler...")
scaler.fit(X_train)
print("selesai!")

# label encoder (le)
le = LabelEncoder()
print("\nfitting label encoder...")
le.fit(y_train.astype(str))
print("selesai!")

In [None]:
for i, label in enumerate(le.classes_):
    print(f"i : {i} , label : {label}")

In [None]:
# Menampilkan informasi kelas
print("\nKelas yang terdeteksi:")
for i, label in enumerate(le.classes_):
    count = (df["Label"] == i).sum()
    print(f"{label}: {count} samples (encoded as {i})")


In [None]:
df["Label"].value_counts()

In [None]:
def transform_data(X, y, scaler, imputer, le):
    # Handling missing values untuk dataset training
    print("\nMenangani missing values...")
    X = imputer.transform(X)
    print("selesai!")

    # Normalisasi Data
    print("\nMelakukan normalisasi data...")
    X = scaler.transform(X)
    print("selesai!")

    # Pelabelan Kelas
    num_classes = len(le.classes_)
    print("\nMelakukan one-hot encoding...")
    y = le.transform(y)
    print("selesai!")

    return X, y

In [None]:
## Transformasi Data Training
X_train, y_train = transform_data(X_train, y_train, scaler, imputer, le)

# Transformmasi data testing
X_test, y_test = transform_data(X_test, y_test, scaler, imputer, le)

## Feature Selection

In [None]:
# prompt: lakukan feature selection menggunakan anova f-test

from sklearn.feature_selection import SelectKBest, f_classif

# Lakukan feature selection menggunakan ANOVA F-test
# Pilih k fitur terbaik, misalnya k=15
k = 20
selector = SelectKBest(score_func=f_classif, k=k)
X_train_selected = selector.fit_transform(X_train, y_train)

# Dapatkan nama fitur yang terpilih
selected_features_indices = selector.get_support(indices=True)
selected_feature_names = X.columns[selected_features_indices]

print(f"\nFitur yang terpilih menggunakan ANOVA F-test (k={k}):")
selected_feature_names

In [None]:
# Select feature untuk data testing juga
X_test_selected = selector.transform(X_test)

In [None]:
# prompt: tampilkan score untuk tiap fitur yang terpilih beserta dengan nama fiturnya

# Dapatkan skor untuk setiap fitur
scores = selector.scores_[selected_features_indices]

# Tampilkan nama fitur dan skornya
print("\nSkor untuk fitur yang terpilih:")
for feature, score in zip(selected_feature_names, scores):
    print(f"{feature}: {score}")

## Konstruksi Model Autoencoder (*Batal*)

In [None]:
# # Membuat Autoencoder
# input_dim = X_train.shape[1]
# encoding_dim = 40  # Meningkatkan dimensi encoding karena data lebih kompleks. saran : 64

# # Encoder
# input_layer = layers.Input(shape=(input_dim,))

# encoded = layers.Dense(256, activation='relu')(input_layer)
# encoded = layers.BatchNormalization()(encoded)
# encoded = layers.Dropout(0.2)(encoded)

# encoded = layers.Dense(128, activation='relu')(encoded)
# encoded = layers.BatchNormalization()(encoded)
# encoded = layers.Dropout(0.2)(encoded)

# encoded = layers.Dense(64, activation='relu')(encoded)
# encoded = layers.BatchNormalization()(encoded)
# encoded = layers.Dropout(0.2)(encoded)

# # Bottleneck
# encoded = layers.Dense(encoding_dim, activation='linear')(encoded)

# # Decoder
# decoded = layers.Dense(64, activation='leaky_relu')(encoded) # coba leaky_relu
# decoded = layers.BatchNormalization()(decoded)
# decoded = layers.Dropout(0.2)(decoded)

# decoded = layers.Dense(128, activation='leaky_relu')(decoded)
# decoded = layers.BatchNormalization()(decoded)
# decoded = layers.Dropout(0.2)(decoded)

# decoded = layers.Dense(256, activation='leaky_relu')(decoded)
# decoded = layers.BatchNormalization()(decoded)
# decoded = layers.Dropout(0.2)(decoded)

# decoded = layers.Dense(input_dim, activation='linear')(decoded)

# # Model Autoencoder
# autoencoder = Model(input_layer, decoded)
# encoder = Model(input_layer, encoded)

# # Compile dengan learning rate yang sesuai
# optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
# autoencoder.compile(optimizer=optimizer, loss='mse')

In [None]:

# # Training Autoencoder dengan early stopping
# early_stopping = tf.keras.callbacks.EarlyStopping(
#     monitor='val_loss',
#     patience=5,
#     restore_best_weights=True
# )

# print("\nTraining Autoencoder...")
# # history_autoencoder = autoencoder.fit(X_train, X_train,
# #                                     epochs=50,
# #                                     batch_size=512,  # Meningkatkan batch size
# #                                     shuffle=True,
# #                                     validation_split=0.2,
# #                                     callbacks=[early_stopping])
# history_autoencoder = autoencoder.fit(X_train, X_train,
#                                     epochs=200,
#                                     batch_size=256,
#                                     validation_data=(X_test, X_test))


In [None]:
# # Mendapatkan encoded features
# X_train_encoded = encoder.predict(X_train)

In [None]:
# plt.plot(history_autoencoder.history['loss'], label='Training Loss')
# plt.plot(history_autoencoder.history['val_loss'], label='Validation Loss')
# plt.title('Autoencoder Training History')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.legend()
# plt.savefig("ae_plot_history")

In [None]:
# from tensorflow.keras.utils import plot_model
# plot_model(autoencoder, to_file='model.png')

In [None]:
# autoencoder.summary()

## Konstruksi Model LSTM

In [None]:
num_features = X_train_selected.shape[1]
num_classes = len(le.classes_)

In [None]:
def create_sequences(data, targets, timesteps):
    X, y = [], []
    for i in range(len(data) - timesteps):
        X.append(data[i:i+timesteps])  # Ambil blok sekuensial
        y.append(targets[i+timesteps]) # Target berikutnya
    return np.array(X), np.array(y)

# Pilih jumlah timestep (contoh: 10 langkah waktu)
timesteps = 10

# Buat sequence untuk training dan testing
X_train_seq, y_train_seq = create_sequences(X_train_selected, y_train, timesteps)
X_test_seq, y_test_seq = create_sequences(X_test_selected, y_test, timesteps)

In [None]:
print("Shape Training Data:")
print("X_train_seq:", X_train_seq.shape)  # (samples, timesteps, 15)
print("y_train_seq:", y_train_seq.shape)  # (samples,)

print("\nShape Testing Data:")
print("X_test_seq:", X_test_seq.shape)    # (samples, timesteps, 15)
print("y_test_seq:", y_test_seq.shape)    # (samples,)

In [None]:
import numpy as np
# set seed
seed = 42
np.random.seed(seed)
tf.random.set_seed(seed)

In [None]:
# Membuat model LSTM untuk multi-kelas dengan data dari Autoencoder
rnn_model = tf.keras.Sequential([
    layers.Input(shape=(timesteps, num_features)),

    layers.LSTM(128, return_sequences=True),
    layers.BatchNormalization(),
    layers.Dropout(0.35),

    layers.LSTM(64),
    layers.BatchNormalization(),
    layers.Dropout(0.35),

    layers.Dense(32, activation='relu'),
    layers.Dense(1, activation='sigmoid')
])

# Compile RNN
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
rnn_model.compile(optimizer=optimizer,
                 loss='binary_crossentropy',
                 metrics=['accuracy', 'precision', 'recall'])

# Training RNN dengan early stopping
early_stopping_rnn = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    patience=5,
    restore_best_weights=True
)

print("\nTraining RNN...")
# history_rnn = rnn_model.fit(X_train, y_train,
#                            epochs=100,
#                            batch_size=512,
#                            validation_data=(X_test, y_test),
#                            callbacks=[early_stopping_rnn])
history_rnn = rnn_model.fit(X_train_seq, y_train_seq,
                           epochs=100,
                           batch_size=128,
                           validation_data=(X_test_seq, y_test_seq),
                           callbacks=[early_stopping_rnn]
                           )


In [None]:
plt.plot(history_rnn.history['accuracy'], label='Training Accuracy')
plt.plot(history_rnn.history['val_accuracy'], label='Validation Accuracy')
plt.title('LSTM Training History')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.savefig("lstm_training_history")

In [None]:
rnn_model.summary()

## Evaluasi Model pada Data Test

In [None]:
# Evaluasi model
y_pred_prob = rnn_model.predict(X_test_seq)
y_pred_classes = (y_pred_prob > 0.5).astype(int)
y_test_classes = y_test_seq

# Tampilkan hasil evaluasi
print("\nClassification Report:")
print(classification_report(y_test_classes, y_pred_classes, target_names=le.classes_))

In [None]:
plt.plot(history_rnn.history['accuracy'], label='Training Accuracy')
plt.plot(history_rnn.history['val_accuracy'], label='Validation Accuracy')
plt.title('LSTM Training History')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.savefig("lstm_training_history")

In [None]:
# Simpan model
print("\nMenyimpan model...")
# autoencoder.save('autoencoder_model.h5')
# encoder.save('encoder_model.h5')
rnn_model.save('rnn_model.keras')

In [None]:
# Simpan label encoder
import joblib
joblib.dump(le, 'label_encoder.joblib')
joblib.dump(scaler, 'scaler.joblib')
joblib.dump(imputer, 'imputer.joblib')
joblib.dump(selector, 'selector.joblib')

---