<a href="https://colab.research.google.com/github/Gajeshgif/Gajesh/blob/main/PCA_Applied_IOTHealthcare.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
import kagglehub
# Fetch the 'faisalmalik/iot-healthcare-security-dataset' dataset through kagglehub
# and keep whatever dataset_download returns (presumably the local download
# directory -- TODO confirm against the kagglehub documentation).
faisalmalik_iot_healthcare_security_dataset_path = kagglehub.dataset_download('faisalmalik/iot-healthcare-security-dataset')

# Confirmation message printed once the download call has returned
print('Data source import complete.')

In [None]:
import seaborn as sns
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn
import os

## First, create a unified CSV with the data from all 3 CSV files, well shuffled.

In [None]:


# Build the CSV paths from the directory returned by kagglehub instead of
# hard-coding the Kaggle-only '/kaggle/input/...' mount, so the notebook also
# runs outside Kaggle (e.g. on Colab).
DATASET_SUBDIR = 'ICUDatasetProcessed'
csv_files = [
    os.path.join(faisalmalik_iot_healthcare_security_dataset_path, DATASET_SUBDIR, file_name)
    for file_name in ('Attack.csv', 'environmentMonitoring.csv', 'patientMonitoring.csv')
]

# Fail early with a readable message if the dataset layout is not what we expect
missing_files = [path for path in csv_files if not os.path.isfile(path)]
if missing_files:
    raise FileNotFoundError(f"Dataset CSV file(s) not found: {missing_files}")

# Load all CSV files into a list of DataFrames
dfs = [pd.read_csv(file) for file in csv_files]

# Concatenate all DataFrames into a single DataFrame
combined_df = pd.concat(dfs, ignore_index=True)

# Shuffle the combined DataFrame; the fixed seed makes the row order (and the
# exported file) reproducible across runs
shuffled_df = combined_df.sample(frac=1, random_state=42).reset_index(drop=True)

# Export the shuffled DataFrame to a new CSV file in the current working directory
shuffled_df.to_csv('shuffled_data.csv', index=False)


## Load the shuffled dataset

In [None]:
# Read the file from the current working directory, which is where the shuffled
# CSV is exported ('shuffled_data.csv'); the absolute '/kaggle/working/...' path
# only exists on Kaggle.
df = pd.read_csv("shuffled_data.csv")

# Targets: binary attack label and the finer-grained class column
y = df["label"]
y_class = df["class"]

# Keep only the feature columns (new frame instead of an in-place drop)
df = df.drop(columns=["class", "label"])

In [None]:
# Preview the first five rows (head() defaults to n=5)
df.head()

Get a count of number of columns

In [None]:
# Column labels together with how many there are -- too many columns
df.columns, df.shape[1]

## Preprocess the data

In [None]:
## Columns picked after consulting a domain expert.

relevant_columns = [
    # TCP layer
    'tcp.srcport', 'tcp.dstport', 'tcp.flags', 'tcp.ack', 'tcp.window_size_value',
    'tcp.connection.fin', 'tcp.connection.syn', 'tcp.connection.rst', 'tcp.payload',
    # IP layer and MQTT identifiers
    'ip.src', 'ip.dst', 'ip.proto', 'ip.ttl', 'mqtt.clientid', 'mqtt.msgtype',
    'mqtt.topic', 'mqtt.kalive', 'mqtt.len', 'mqtt.qos', 'tcp.checksum',
    # Header size and timing
    'tcp.hdr_len', 'frame.time_delta', 'frame.time_relative', 'tcp.time_delta'
]

# Restrict the frame to the relevant columns, then discard 'ip.proto'
df = df[relevant_columns].drop(columns="ip.proto")

#### Let's Get a track on the number of categorical columns

In [None]:
# Object-dtype columns are treated as the categorical features
categorial_df = df.select_dtypes(include="object")
categorical_cols = list(categorial_df.columns)

# Report the number of distinct values per categorical column (NaN counts as one)
for name in categorical_cols:
    print(name, ":", df[name].nunique(dropna=False), "labels")

## Not going to apply one-hot encoding (OHE) but frequency encoding, because the categorical columns have too many unique values.

## Train-Test Split Before further processing;
Prevent data leaks

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 30% of the rows for testing; the fixed seed keeps the split repeatable
X_train, X_test, y_train, y_test = train_test_split(
    df,
    y,
    test_size=0.3,
    random_state=42,
)


### Frequency Categorical Encoding; there is data loss though.
For ex:

#### Example Dataset
| Category | Count |
|----------|-------|
| A        | 5     |
| B        | 3     |
| C        | 5     |
| D        | 3     |

#### Frequency Encoding
After encoding, the dataset looks like this:

| Category | Frequency Encoded |
|----------|--------------------|
| A        | 5                 |
| B        | 3                 |
| C        | 5                 |
| D        | 3                 |

#### Issue: Data Loss
- Categories **A** and **C** both have the same encoded value (`5`).
- Similarly, categories **B** and **D** both have the same encoded value (`3`).

This leads to **ambiguity** because the encoded values cannot distinguish between `A` and `C`, or `B` and `D`. As a result, downstream models may misinterpret these categories as identical, potentially affecting the model's performance and accuracy.

In [None]:
## But in this case, after testing it out; didn't face much issue.

# train_test_split hands back slices of the original frame; take explicit copies
# so adding columns below neither raises SettingWithCopyWarning nor touches `df`.
X_train = X_train.copy()
X_test = X_test.copy()

for col in categorical_cols:
    if col not in X_train.columns:
        # Column was already encoded by an earlier run of this cell
        continue

    # Frequency map is learned from the training split only (no test leakage)
    frequency_map = X_train[col].value_counts().to_dict()

    # Categories missing from the map (unseen in train, or NaN) are encoded as 0
    # in BOTH splits, so no NaN reaches the scaler / PCA steps downstream.
    X_train[f"{col}_frequency"] = X_train[col].map(frequency_map).fillna(0)
    X_test[f"{col}_frequency"] = X_test[col].map(frequency_map).fillna(0)

    # Replace the raw categorical column with its encoded counterpart
    X_train = X_train.drop(columns=[col])
    X_test = X_test.drop(columns=[col])

### Perform Correlation Analysis

In [None]:
def correlation(dataset, threshold):
    """Return the columns that are strongly correlated with an earlier column.

    For every pair of columns the absolute correlation is compared against
    ``threshold``; when it is exceeded, the later column of the pair (in
    column order) is reported. Using the absolute value means strong negative
    correlations are flagged as well as strong positive ones.

    Args:
        dataset: DataFrame whose pairwise column correlations are computed
            with ``DataFrame.corr()``.
        threshold: Correlation magnitude above which a column is flagged.

    Returns:
        A set with the names of the flagged columns.
    """
    # Magnitude of the correlation: -0.9 is as redundant as +0.9
    corr_matrix = dataset.corr().abs()
    columns = corr_matrix.columns

    col_corr = set()
    for i in range(len(columns)):
        # Only look at the lower triangle so each pair is examined once
        for j in range(i):
            if corr_matrix.iloc[i, j] > threshold:
                col_corr.add(columns[i])
                break  # column i is already flagged; no need to check more pairs

    return col_corr

In [None]:
# Pairwise correlations between the training features, drawn as an annotated heatmap
corr = X_train.corr()
fig, ax = plt.subplots(figsize=(24, 20))
sns.heatmap(corr, annot=True, cmap=plt.cm.CMRmap_r, ax=ax)
plt.show()

In [None]:
## Candidates for removal: features whose correlation exceeds 0.75

corr_features = correlation(X_train, threshold=0.75)
len(corr_features), corr_features

In [None]:
# Remove 'ip.ttl' from both splits so they keep identical feature columns
X_train = X_train.drop(columns=["ip.ttl"])
X_test = X_test.drop(columns=["ip.ttl"])

### Apply Scaling

In [None]:
from sklearn.preprocessing import  StandardScaler

In [None]:
# Fit the standardiser on the training split only; fit() returns the scaler itself
scaler = StandardScaler().fit(X_train)
scaler

In [None]:
# Standardise both splits with the statistics learned from the training set.
# The test set must only be transformed: calling fit_transform on it would
# re-fit the scaler on test data (leakage) and scale the two splits differently.
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

### Apply PCA

In [None]:
from sklearn.decomposition import  PCA
## Only 2 features, easy to plot and works surprisingly good
# fit() returns the fitted estimator, so it can be chained onto the constructor
pca = PCA(n_components=2).fit(X_train)
pca


In [None]:
# Project both splits onto the principal axes learned from the training set.
# Re-fitting PCA on the test set (fit_transform) would produce different axes,
# so train and test points would no longer live in the same 2-D space.
X_train = pca.transform(X_train)
X_test = pca.transform(X_test)

## Visualization Post PCA

In [None]:
## Train and test splits plotted in the 2-D PCA space

import matplotlib.colors as mcolors

# Custom gradient: the lowest label value is drawn purple, the highest orange
cmap = mcolors.LinearSegmentedColormap.from_list('purple_orange', ['purple', 'orange'])

fig, axes = plt.subplots(2, 1, figsize=(8, 12))

# One panel per split: (points, labels, panel title)
panels = (
    (X_train, y_train, "PCA of TRAIN Dataset Colored by Class"),
    (X_test, y_test, "PCA of TEST Dataset Colored by Class"),
)

for ax, (points, labels, panel_title) in zip(axes, panels):
    scatter = ax.scatter(points[:, 0], points[:, 1], c=labels, cmap=cmap)
    ax.set_xlabel("First Principal Component")
    ax.set_ylabel("Second Principal Component")
    fig.colorbar(scatter, ax=ax, label='Class')  # colour bar for clarity
    ax.set_title(panel_title)

plt.show()


#### We can infer that the dataset is fairly well separated: the normal cases (purple) form tight clusters, while the attack cases (orange) lie far away from them.

### Converting y-train and y-test to numpy arrays

In [None]:
# Swap the label objects for their plain NumPy array form
y_train, y_test = y_train.to_numpy(), y_test.to_numpy()

### Configure device agnostic code for torch

In [None]:
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
device

### Create Tensors and send them to device

In [None]:
# Convert the NumPy arrays to float32 tensors and move them to the target device.
# Tensor.to() returns a new tensor rather than moving the tensor in place, so its
# result has to be assigned back -- otherwise the data silently stays on the CPU
# while the model may live on the GPU.
X_train = torch.from_numpy(X_train).type(torch.float).to(device)
X_test = torch.from_numpy(X_test).type(torch.float).to(device)
y_train = torch.from_numpy(y_train).type(torch.float).to(device)
y_test = torch.from_numpy(y_test).type(torch.float).to(device)

# Sanity check: features and labels must have matching lengths per split
len(X_train), len(y_train), len(X_test), len(y_test)

## Model Architecture

The architecture is inspired by the TensorFlow Playground model used to fit its spiral classification dataset.

In [None]:
import torch
import torch.nn as nn

class PCABinaryClassifier(nn.Module):
    """Small binary classifier operating on the two PCA components.

    The forward pass expands the two input columns into six features
    (the components, their squares and their sines) and feeds them through
    a Linear(6, 8) -> ReLU -> Linear(8, 1) stack. The output is a raw logit.
    """

    def __init__(self):
        super().__init__()

        # Six inputs: 2 raw components + 4 derived (squared and sine) features
        self.layer_1 = nn.Linear(in_features=6, out_features=8)
        # Single output unit: one logit per sample for binary classification
        self.layer_2 = nn.Linear(in_features=8, out_features=1)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return one logit per row of ``x`` (only columns 0 and 1 are read)."""
        first, second = x[:, 0], x[:, 1]  # PCA1, PCA2

        # Feature order matters: it must match the columns layer_1 was trained on
        engineered = [
            first,
            second,
            first.pow(2),
            second.pow(2),
            first.sin(),
            second.sin(),
        ]
        features = torch.stack(engineered, dim=1)

        hidden = self.relu(self.layer_1(features))
        return self.layer_2(hidden)


In [None]:
# Seed PyTorch before building the model so the random weight initialisation
# (and therefore the whole training run) is reproducible.
torch.manual_seed(42)
model = PCABinaryClassifier()
model.to(device)

## Configure Loss and Optimizer

In [None]:
# Binary cross-entropy that takes raw logits as input
loss_fn = nn.BCEWithLogitsLoss()

# Plain stochastic gradient descent over all model parameters
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

### Create an accuracy Helper function

In [None]:
def accuracy_fn(y_true, y_pred):
    """Return the fraction (0.0 - 1.0) of predictions equal to the true labels."""
    matches = (y_true == y_pred).sum().item()
    return matches / len(y_pred)

## Training and Testing Loop

In [None]:
# Per-epoch loss histories, filled in by the training loop
train_loss_arr, test_loss_arr = [], []

In [None]:
torch.manual_seed(42)
torch.cuda.manual_seed(42)

epochs=2000

for epoch in range(epochs):
  model.train()

  ## Forward Pass: raw logits -> probabilities -> hard 0/1 predictions
  y_logits=model(X_train).squeeze(dim=1)
  y_pred=torch.round(torch.sigmoid(y_logits))

  ## Calc loss (BCEWithLogitsLoss expects the raw logits)
  loss=loss_fn(y_logits,y_train)
  # Keep only a detached CPU copy in the history: storing the live loss tensor
  # would keep its autograd graph referenced for every epoch and would break the
  # later NumPy conversion when training on a GPU.
  train_loss_arr.append(loss.detach().cpu())
  acc=accuracy_fn(y_pred=y_pred,y_true=y_train)

  ## Zero Grad
  optimizer.zero_grad()

  ## Backpropagation
  loss.backward()

  ## Step
  optimizer.step()

  ## Testing
  model.eval()

  with torch.inference_mode():
    test_logits=model(X_test).squeeze(dim=1)
    test_loss=loss_fn(test_logits,y_test)
    test_loss_arr.append(test_loss.detach().cpu())
    test_pred=torch.round(torch.sigmoid(test_logits))
    test_acc=accuracy_fn(y_pred=test_pred,y_true=y_test)


  # Report progress every 10 epochs
  if epoch%10==0:
    print(f"Epoch: {epoch}| Training Loss: {loss:.4f} | Testing Loss: {test_loss:.4f}")
    print(f"Epoch: {epoch}| Training Acc: {acc*100} % | Testing Acc: {test_acc*100} %")


In [None]:
# Convert the recorded losses to NumPy arrays.
# float() works for scalar tensors on any device (CPU or CUDA, with or without
# grad) as well as for plain NumPy/Python floats, so this cell is GPU-safe and
# can be re-run after the histories have already been converted.
train_loss_arr = np.array([float(loss_value) for loss_value in train_loss_arr])
test_loss_arr = np.array([float(loss_value) for loss_value in test_loss_arr])

In [None]:
# Epoch numbers start at 1 for the x-axis
epochs_range = range(1, len(train_loss_arr) + 1)

fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(epochs_range, train_loss_arr, label='Train Loss', color='blue')
ax.plot(epochs_range, test_loss_arr, label='Test Loss', color='orange')

# Titles, axis labels, legend and grid
ax.set_title('Train and Test Loss over Epochs')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
ax.grid(True)

# Display the plot
plt.show()

## Get Some Helper Functions (A Decision Boundary plotter)

#### "https://raw.githubusercontent.com/mrdbourke/pytorch-deep-learning/refs/heads/main/helper_functions.py"

In [None]:
# This code block wasn't working on kaggle, it should otherwise.

# import requests
# import os

# if os.path.exists("helper_functions.py"):
#   print("Skipping download, helper_functions.py exists!")
# else:
#   print("Downloading helper_functions.py")
#   request = requests.get("https://raw.githubusercontent.com/mrdbourke/pytorch-deep-learning/refs/heads/main/helper_functions.py")
#   with open("helper_functions.py", "wb") as f:
#     f.write(request.content)

In [None]:
## Credits to Daniel Bourke for this amazing function, check out his
# "https://youtu.be/Z_ikDlimN6A?si=pdyrLr9IM_za8AZH" --> Learn PyTorch for deep learning in a day. Literally.

def plot_decision_boundary(model: torch.nn.Module, X: torch.Tensor, y: torch.Tensor):
    """Plots decision boundaries of model predicting on X in comparison to y.

    The model is evaluated on a 101 x 101 grid spanning the range of the first
    two columns of ``X`` (padded by 0.1) and the predicted class regions are
    drawn on the current matplotlib axes together with the points of ``X``
    coloured by ``y``.

    The model is temporarily moved to the CPU and put in eval mode for the
    prediction; its original device and train/eval mode are restored afterwards
    so the caller's model is left unchanged.

    Args:
        model: Module mapping an (N, 2) float tensor to logits.
        X: Feature tensor; columns 0 and 1 are used as plot coordinates.
        y: Label tensor; more than two unique values selects the multi-class path.

    Source - https://madewithml.com/courses/foundations/neural-networks/ (with modifications)
    """
    # Remember the caller's model state so it can be restored after predicting
    first_param = next(model.parameters(), None)
    original_device = first_param.device if first_param is not None else None
    was_training = model.training

    # Put the data on the CPU (works better with NumPy + Matplotlib)
    X, y = X.to("cpu"), y.to("cpu")

    # Setup prediction boundaries and grid (plain floats for NumPy)
    x_min, x_max = X[:, 0].min().item() - 0.1, X[:, 0].max().item() + 0.1
    y_min, y_max = X[:, 1].min().item() - 0.1, X[:, 1].max().item() + 0.1
    xx, yy = np.meshgrid(np.linspace(x_min, x_max, 101), np.linspace(y_min, y_max, 101))

    # Make features: one row per grid point
    X_to_pred_on = torch.from_numpy(np.column_stack((xx.ravel(), yy.ravel()))).float()

    # Make predictions on the CPU, then restore the model's device and mode
    try:
        model.to("cpu")
        model.eval()
        with torch.inference_mode():
            y_logits = model(X_to_pred_on)
    finally:
        model.train(was_training)
        if original_device is not None:
            model.to(original_device)

    # Test for multi-class or binary and adjust logits to prediction labels
    if len(torch.unique(y)) > 2:
        y_pred = torch.softmax(y_logits, dim=1).argmax(dim=1)  # multi-class
    else:
        y_pred = torch.round(torch.sigmoid(y_logits))  # binary

    # Reshape preds and plot
    y_pred = y_pred.reshape(xx.shape).detach().numpy()
    plt.contourf(xx, yy, y_pred, cmap=plt.cm.RdYlBu, alpha=0.7)
    plt.scatter(X[:, 0], X[:, 1], c=y, s=40, cmap=plt.cm.RdYlBu)
    plt.xlim(xx.min(), xx.max())
    plt.ylim(yy.min(), yy.max())


## Plot the decision boundaries

In [None]:
plt.figure(figsize=(12, 6))

# Side-by-side panels: (title, features, labels) for each split
splits = (
    ("Train", X_train, y_train),
    ("Test", X_test, y_test),
)
for position, (panel_title, features, targets) in enumerate(splits, start=1):
    plt.subplot(1, 2, position)
    plt.title(panel_title)
    plot_decision_boundary(model, features, targets)

plt.show()


## Classification Report and Confusion Matrix

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
# Container for the predicted test labels
predicted_labels_arr = list()

In [None]:
# Score the test set on whichever device the model currently lives on: the
# model and the data can end up on different devices, which would make the
# forward pass fail.
model.eval()
eval_device = next(model.parameters()).device
with torch.inference_mode():
    outputs = model(X_test.to(eval_device))
    y_pred = torch.sigmoid(outputs)  # attack probability per sample
    predicted_labels = (y_pred > 0.5).cpu().numpy().astype(int)

# Rebind instead of appending so re-running this cell always leaves exactly
# one prediction array in the container.
predicted_labels_arr = [predicted_labels]

# scikit-learn needs CPU/NumPy inputs; the labels stay float so the report keys
# remain '0.0' / '1.0'.
y_test_np = y_test.detach().cpu().numpy()

# Generate classification report
report_dict = classification_report(y_test_np, predicted_labels, output_dict=True)

# Create confusion matrix
cm = confusion_matrix(y_test_np, predicted_labels)

In [None]:
# Confusion matrix rendered as an annotated heatmap
class_names = ['Non-Attack', 'Attack']

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_title('Confusion Matrix')
ax.set_ylabel('True Label')
ax.set_xlabel('Predicted Label')

# Print detailed report
print("\n=== Classification Report ===")
print(f"\nAccuracy: {report_dict['accuracy']:.4f}")
print("\nDetailed Metrics:")

# Per-class rows of the report, in the same order as class_names
label_keys = ('0.0', '1.0')
# Table column title -> key used inside each per-class report entry
column_fields = {
    'Precision': 'precision',
    'Recall': 'recall',
    'F1-Score': 'f1-score',
    'Support': 'support',
}
metrics_df = pd.DataFrame(
    {
        column: [report_dict[key][field] for key in label_keys]
        for column, field in column_fields.items()
    },
    index=class_names,
)

print(metrics_df.round(4))

In [None]:
## Flatten the first stored prediction array into a plain Python list
predicted_labels_arr = np.ravel(predicted_labels_arr[0]).tolist()

In [None]:
## Convert true values to a list
# .cpu() is required before .numpy() when the tensor lives on a GPU; on the CPU
# it is a no-op.
true_labels = y_test.detach().cpu().numpy().tolist()

## ROC AUC

In [None]:
from sklearn.metrics import roc_curve, roc_auc_score
# Score the test set with predicted probabilities. A ROC curve sweeps the decision
# threshold, so it needs continuous scores: feeding it hard 0/1 labels collapses
# the curve to a single operating point and gives a misleading AUC.
model.eval()
with torch.inference_mode():
    roc_device = next(model.parameters()).device
    test_scores = torch.sigmoid(model(X_test.to(roc_device))).squeeze(dim=1).cpu().numpy()

# Compute ROC curve
fpr, tpr, thresholds = roc_curve(true_labels, test_scores)

# Compute AUC score
auc_score = roc_auc_score(true_labels, test_scores)

# Plot the ROC curve
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f"AUC = {auc_score:.4f}", color="blue")
plt.plot([0, 1], [0, 1], linestyle="--", color="gray")  # Diagonal line for random guessing
plt.title("ROC Curve")
plt.xlabel("False Positive Rate (FPR)")
plt.ylabel("True Positive Rate (TPR)")
plt.legend(loc="lower right")
plt.grid()
# # Add threshold annotations
# threshold_points = [0.2, 0.5]
# for thresh in threshold_points:
#     idx = (np.abs(thresholds - thresh)).argmin()
#     plt.annotate(f'threshold={thresh:.1f}',
#                     xy=(fpr[idx], tpr[idx]),
#                     xytext=(10, -10),
#                     textcoords='offset points',
#                     ha='left',
#                     va='top',
#                     fontsize=8)
plt.show()