In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from sklearn.model_selection import train_test_split, StratifiedKFold
from imblearn.over_sampling import SMOTE
from collections import Counter
import torch
import torch.nn as nn
import torch.utils.data as data_utils
import torch.nn.functional as F
import torch.optim as optim
from sklearn.utils.class_weight import compute_class_weight

from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
    classification_report,
    roc_auc_score, 
    roc_curve
)



from sklearn.metrics import classification_report
from torch.utils.data import TensorDataset, DataLoader
from tqdm.notebook import tqdm



import warnings
# Silence every warning category for the remainder of the session.
warnings.simplefilter("ignore")

# Show all columns when a wide DataFrame is displayed instead of eliding them.
pd.options.display.max_columns = None


In [None]:
# Load the parquet export into memory, then report its size and preview
# the first ten rows.
dataset_path = r"E:\Thesis\Defence\Datasets\UNSW_NB15_multiclass_label_is_label.parquet"
df1 = pd.read_parquet(dataset_path)

print(f"Dataset Shape: {df1.shape}")
display(df1.head(10))

In [None]:
# Separate the target from the predictors: "label" becomes the target
# array `y`, every remaining column stays in the feature frame `X`.
y = df1["label"].values
X = df1.drop("label", axis=1)

In [None]:
# Dense tensors of the raw features (float32) and the labels (int64).
X_tensor = torch.tensor(X.values, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.long)

# Discretise each feature column into (at most) three quantile bins.
# `labels=False` yields integer bin codes; `duplicates='drop'` merges bins
# whose edges coincide, so a column may end up with fewer than three codes.
X_binned = X.apply(pd.qcut, q=3, labels=False, duplicates='drop')
X_binned_tensor = torch.tensor(X_binned.values, dtype=torch.long)


In [None]:
from scipy.stats import chi2_contingency
import numpy as np
import pandas as pd

# Rank every binned feature by its chi-square statistic against the label.
results = []
for col in X_binned.columns:
    # Contingency table: bin code (rows) x class label (columns).
    contingency = pd.crosstab(X_binned[col], y)
    chi2, p, dof, expected = chi2_contingency(contingency)
    results.append((col, chi2, p))

# Largest statistic first, i.e. strongest association with the label on top.
results_sorted = sorted(results, key=lambda x: x[1], reverse=True)

print("\nAll features:")
for col, chi2_val, p_val in results_sorted:
    # Fixed: the spec was `5f` (min width 5, default precision); `.5f` gives
    # five decimals, matching the top-20 listing below.
    print(f"{col}: chi2 = {chi2_val:.3f}, p = {p_val:.5f}")

top = results_sorted[:20]

print("\nTop 20 features by Chi2:")
for col, chi2_val, p_val in top:
    print(f"{col}: chi2 = {chi2_val:.3f}, p = {p_val:.5f}")

# Despite the name, this list holds the column *names* of the top features.
top_20idx = [col for col, _, _ in top]
print("\nTop 20 feature indices:", top_20idx)


In [None]:
# Keep only the chi-square-selected columns, taken from the original
# (un-binned) frame.
top_20columns = top_20idx
X_20Ori = df1.loc[:, top_20columns]

print (X_20Ori)

# Tensor versions used by the model: int64 labels and float32 features.
label_values = df1['label'].values
y_tensor = torch.tensor(label_values, dtype=torch.long)
X_20Oritensor = torch.tensor(X_20Ori.values, dtype=torch.float32)

In [None]:
class CNN_BiLSTM_Model(nn.Module):
    """Conv1d -> BatchNorm -> ReLU -> MaxPool -> BiLSTM -> two-layer classifier.

    Args:
        input_channels: Size of the last input dimension (channels fed to the
            convolution after the permute in ``forward``).
        seq_len: Length of the sequence dimension. Accepted for interface
            compatibility; no layer is sized from it, so any length that
            survives the kernel-3 convolution and the stride-2 pooling works.
        num_classes: Number of output classes.

    ``forward`` takes a float tensor shaped ``[batch, seq_len, input_channels]``
    and returns **raw logits** shaped ``[batch, num_classes]``. No softmax is
    applied inside the model: ``nn.CrossEntropyLoss`` applies log-softmax
    itself, so pre-normalised inputs would be squashed twice. Apply
    ``torch.softmax(logits, dim=1)`` at inference time to get probabilities.
    """

    def __init__(self, input_channels, seq_len, num_classes):
        super(CNN_BiLSTM_Model, self).__init__()

        # Feature extractor over the sequence axis.
        self.conv1 = nn.Conv1d(
            in_channels=input_channels,
            out_channels=32,
            kernel_size=3
        )
        self.batch_norm = nn.BatchNorm1d(32)
        self.pool = nn.MaxPool1d(kernel_size=2)
        self.relu = nn.ReLU()

        # Bidirectional recurrent layer; its output feature size is 2 * 32.
        self.bilstm = nn.LSTM(
            input_size=32,
            hidden_size=32,
            batch_first=True,
            bidirectional=True
        )

        self.dropout = nn.Dropout(0.2)

        # Classifier head.
        self.fc1 = nn.Linear(32 * 2, 25)
        self.fc2 = nn.Linear(25, num_classes)

        self._initialize_weights()

    def forward(self, x):
        """Return un-normalised class scores (logits) of shape [B, num_classes]."""
        x = x.permute(0, 2, 1)  # [B, L, F] -> [B, F, L] as expected by Conv1d
        x = self.conv1(x)
        x = self.batch_norm(x)
        x = self.relu(x)
        x = self.pool(x)

        # Back to [B, L', 32] because the LSTM is batch_first.
        x = x.permute(0, 2, 1)

        x, _ = self.bilstm(x)
        # Use the features at the last time step as the sequence summary.
        # NOTE(review): for the backward direction this slice is the state
        # after a single step; concatenating the final hidden states of both
        # directions may be a better summary. Confirm before changing.
        x = x[:, -1, :]

        x = self.dropout(x)
        x = self.fc1(x)
        x = self.relu(x)
        # Logits are returned as-is; the loss function normalises them.
        return self.fc2(x)

    def _initialize_weights(self):
        """Xavier-init conv/linear/LSTM input weights, orthogonal-init LSTM
        recurrent weights, and zero all of their biases. BatchNorm keeps its
        default initialisation."""
        for m in self.modules():
            if isinstance(m, (nn.Conv1d, nn.Linear)):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.LSTM):
                for name, param in m.named_parameters():
                    if 'weight_ih' in name:
                        nn.init.xavier_uniform_(param.data)
                    elif 'weight_hh' in name:
                        nn.init.orthogonal_(param.data)
                    elif 'bias' in name:
                        nn.init.constant_(param.data, 0)




In [None]:
# Seed torch so weight initialisation and DataLoader shuffling are repeatable,
# matching the fixed `random_state=42` used for the splits below.
torch.manual_seed(42)

# 80/20 train/test split, then 10% of the training part held out for
# validation; both splits are stratified on the label.
X_train, X_test, y_train, y_test = train_test_split(
    X_20Oritensor, y_tensor, test_size=0.2, random_state=42, stratify=y_tensor
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.1, random_state=42, stratify=y_train
)

# Add a trailing axis of size 1 so each sample is [seq_len, 1]: every
# selected feature becomes one step of a single-channel sequence.
X_train = X_train.unsqueeze(2)
X_val = X_val.unsqueeze(2)
X_test = X_test.unsqueeze(2)

batch_size = 64
train_data = TensorDataset(X_train, y_train)
val_data = TensorDataset(X_val, y_val)
test_data = TensorDataset(X_test, y_test)

# Only the training loader is shuffled; validation/test order is kept fixed.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

print(X_train.shape)

# scikit-learn wants a NumPy label array for the class-weight computation.
y_train_np = y_train.cpu().numpy() if isinstance(y_train, torch.Tensor) else y_train

# 'balanced' weights are inversely proportional to class frequency in the
# training split.
class_weights = compute_class_weight('balanced', classes=np.unique(y_train_np), y=y_train_np)
class_weights = torch.tensor(class_weights, dtype=torch.float32)

input_channels = X_train.shape[2]
seq_len = X_train.shape[1]
num_classes = len(np.unique(y_train_np))

model = CNN_BiLSTM_Model(input_channels, seq_len, num_classes)

# CrossEntropyLoss applies log-softmax internally, so it expects raw logits
# from the model and integer class indices in [0, num_classes) as targets.
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
num_epochs = 50
train_losses = []
val_losses = []
best_val_loss = float('inf')
best_state = None  # snapshot of the weights with the lowest validation loss
patience_counter = 0
early_stopping_patience = 5

for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    running_loss = 0.0

    for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", leave=False):
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Mean of the per-batch losses.
    avg_train_loss = running_loss / len(train_loader)
    train_losses.append(avg_train_loss)

    # ---- validation pass (no gradients, eval-mode dropout/batch-norm) ----
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

    avg_val_loss = val_loss / len(val_loader)
    val_losses.append(avg_val_loss)

    # Log before the early-stopping check so the final epoch is reported too.
    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}")

    # ---- early stopping ----
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        patience_counter = 0
        # Clone the tensors: state_dict() returns references that later
        # optimizer steps would otherwise overwrite in place.
        best_state = {name: t.detach().clone() for name, t in model.state_dict().items()}
    else:
        patience_counter += 1
        if patience_counter >= early_stopping_patience:
            print(f"Early stopping at epoch {epoch+1}")
            break

# Roll back to the best checkpoint, so later cells do not use weights from
# epochs that ran after validation loss stopped improving.
if best_state is not None:
    model.load_state_dict(best_state)


plt.plot(range(len(train_losses)), train_losses, label="Training Loss")
plt.plot(range(len(val_losses)), val_losses, label="Validation Loss")
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training vs Validation Loss')
plt.legend()
plt.show()


In [None]:
# ---- collect predictions on the held-out test set ----
model.eval()
test_loss = 0.0
correct = 0
total = 0
y_pred = []
y_true = []
y_score = []

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        test_loss += loss.item()
        predicted = outputs.argmax(dim=1)

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        y_pred.extend(predicted.cpu().numpy())
        y_true.extend(labels.cpu().numpy())

        # NOTE(review): this softmax assumes `model` returns raw logits; if the
        # model already applies softmax in forward(), scores are squashed twice.
        probabilities = torch.softmax(outputs, dim=1)  # [B, num_classes]
        y_score.extend(probabilities.cpu().numpy())

y_true_arr = np.asarray(y_true)
y_score_arr = np.asarray(y_score)
# Score column k belongs to class index k (targets are used as class indices
# by the loss), so the column count defines the label set.
class_labels = np.arange(y_score_arr.shape[1])

# ---- scalar metrics ----
avg_test_loss = test_loss / len(test_loader)
accuracy = 100 * correct / total

f1 = f1_score(y_true, y_pred, average='weighted')
recall = recall_score(y_true, y_pred, average='weighted')
precision = precision_score(y_true, y_pred, average='weighted')
# Pin the label order so matrix rows/columns always match the tick labels,
# even if a class is missing from the test predictions.
conf_matrix = confusion_matrix(y_true, y_pred, labels=class_labels)

print(f"Test Loss: {avg_test_loss:.4f}")
print(f"Test Accuracy: {accuracy:.2f}%")
print(f"F1 Score (Weighted): {f1:.4f}")
print(f"Recall (Weighted): {recall:.4f}")
print(f"Precision (Weighted): {precision:.4f}")

sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=class_labels, yticklabels=class_labels)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# ---- ROC / AUC ----
# roc_auc_score takes the full probability matrix for multiclass one-vs-rest,
# but only the positive-class column for a binary problem.
if len(class_labels) == 2:
    roc_auc = roc_auc_score(y_true_arr, y_score_arr[:, 1])
else:
    roc_auc = roc_auc_score(
        y_true_arr, y_score_arr, average='weighted', multi_class='ovr', labels=class_labels
    )
print(f"ROC AUC Score (One-vs-Rest): {roc_auc:.4f}")

# Fixed: roc_curve is binary-only and rejects a 2-D multiclass score matrix.
# Draw one one-vs-rest curve per class instead.
plt.figure(figsize=(8,6))
for cls in class_labels:
    is_cls = (y_true_arr == cls).astype(int)
    if is_cls.min() == is_cls.max():
        # Class absent from (or the only class in) the test labels: its ROC
        # curve is undefined, so skip it.
        continue
    fpr, tpr, _thresholds = roc_curve(is_cls, y_score_arr[:, cls])
    cls_auc = roc_auc_score(is_cls, y_score_arr[:, cls])
    plt.plot(fpr, tpr, label='Class %d (AUC = %0.2f)' % (cls, cls_auc))
plt.plot([0, 1], [0, 1], color='r', linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc='best')
plt.show()