In [None]:
# ==================================
# ResNet Implementation for Tabular CAN Data (Imbalance Handling + Test Evaluation)
# ==================================
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import classification_report, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# ==================================
# Feature Engineering
# ==================================
def create_features(df):
    """Derive numeric model features from raw CAN frame columns.

    Reads the ``CAN_ID_hex`` and ``Data_hex`` columns (and ``DLC`` when
    present), adds the derived columns to ``df`` in place, and returns the
    same DataFrame.

    Columns added:
        CAN_ID_numeric: ``CAN_ID_hex`` parsed as base-16; 0 when missing.
        Data_clean: ``Data_hex`` with spaces removed; '00000000' when missing.
        Byte_0 .. Byte_7: value of each two-hex-digit pair of ``Data_clean``;
            0 when the payload is too short to contain that pair.
        Data_mean, Data_std, Data_sum: row-wise statistics over the byte
            columns.
        DLC_cat: label-encoded ``DLC`` (0 when the column is absent).
        CAN_Priority: 0 / 1 / 2 for IDs in (-1, 255], (255, 1023],
            (1023, inf).

    Raises:
        ValueError: if a non-missing ID or payload contains non-hex text.
    """
    df['CAN_ID_numeric'] = df['CAN_ID_hex'].apply(
        lambda x: int(str(x), 16) if pd.notna(x) else 0
    )

    # Fill missing payloads BEFORE casting to str: astype(str) turns NaN/None
    # into the literal text 'nan'/'None', which fillna can no longer see and
    # which is not valid hex.
    df['Data_clean'] = (
        df['Data_hex']
        .fillna('00000000')
        .astype(str)
        .str.replace(' ', '', regex=False)
    )

    byte_cols = [f'Byte_{i}' for i in range(8)]
    for i, col in enumerate(byte_cols):
        start, stop = 2 * i, 2 * i + 2
        # Bind the slice bounds as defaults so the lambda does not depend on
        # the loop variables after this iteration.
        df[col] = df['Data_clean'].apply(
            lambda s, a=start, b=stop: int(s[a:b], 16) if len(s) >= b else 0
        )

    df['Data_mean'] = df[byte_cols].mean(axis=1)
    df['Data_std'] = df[byte_cols].std(axis=1).fillna(0)
    df['Data_sum'] = df[byte_cols].sum(axis=1)

    if 'DLC' in df.columns:
        # NOTE(review): the encoder is refit on every call, so the integer
        # codes are only comparable between two frames when both contain the
        # same set of DLC values -- confirm this holds for train vs. test.
        df['DLC_cat'] = LabelEncoder().fit_transform(df['DLC'].astype(str))
    else:
        df['DLC_cat'] = 0

    df['CAN_Priority'] = pd.cut(
        df['CAN_ID_numeric'],
        bins=[-1, 255, 1023, np.inf],
        labels=[0, 1, 2],
    ).astype(int)
    return df

# ==================================
# Dataset Class
# ==================================
class CANDataset(Dataset):
    """Torch dataset over selected DataFrame columns with optional labels.

    ``X`` holds ``df[features]`` as a float32 tensor. ``y`` holds ``labels``
    as an int64 tensor, or ``None`` when no labels were supplied.
    """

    def __init__(self, df, features, labels=None):
        feature_matrix = df[features].values
        self.X = torch.tensor(feature_matrix, dtype=torch.float32)

        if labels is None:
            self.y = None
        else:
            self.y = torch.tensor(labels.values, dtype=torch.long)

    def __len__(self):
        # Number of rows in the feature tensor.
        return len(self.X)

    def __getitem__(self, idx):
        # Unlabelled datasets yield the features alone; labelled ones yield
        # a (features, label) pair.
        sample = self.X[idx]
        if self.y is None:
            return sample
        return sample, self.y[idx]

# ==================================
# ResNet for Tabular Data
# ==================================
class BasicBlock(nn.Module):
    """Fully connected residual block.

    Computes ``relu(bn2(fc2(relu(bn1(fc1(x))))) + x)``; the output has the
    same feature width as the input (``in_features``).
    """

    def __init__(self, in_features, hidden_features):
        super().__init__()
        # Expand to the hidden width ...
        self.fc1 = nn.Linear(in_features=in_features, out_features=hidden_features)
        self.bn1 = nn.BatchNorm1d(num_features=hidden_features)
        self.relu = nn.ReLU()
        # ... then project back so the skip connection can be added.
        self.fc2 = nn.Linear(in_features=hidden_features, out_features=in_features)
        self.bn2 = nn.BatchNorm1d(num_features=in_features)

    def forward(self, x):
        hidden = self.fc1(x)
        hidden = self.bn1(hidden)
        hidden = self.relu(hidden)

        branch = self.bn2(self.fc2(hidden))
        # Skip connection: add the untouched input before the final ReLU.
        return self.relu(branch + x)

class ResNetTabular(nn.Module):
    """Residual MLP classifier for flat feature vectors.

    A linear + batch-norm + ReLU stem maps ``input_dim`` features to
    ``hidden_dim``, followed by ``num_blocks`` ``BasicBlock`` modules and a
    linear head producing ``num_classes`` logits.
    """

    def __init__(self, input_dim, num_classes=2, num_blocks=3, hidden_dim=128):
        super().__init__()
        # Stem
        self.input_layer = nn.Linear(input_dim, hidden_dim)
        self.bn = nn.BatchNorm1d(hidden_dim)
        self.relu = nn.ReLU()

        # Residual trunk: every block keeps the hidden width.
        residual_stack = []
        for _ in range(num_blocks):
            residual_stack.append(BasicBlock(hidden_dim, hidden_dim))
        self.blocks = nn.Sequential(*residual_stack)

        # Classification head
        self.fc_out = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        stem = self.input_layer(x)
        stem = self.bn(stem)
        stem = self.relu(stem)
        trunk = self.blocks(stem)
        logits = self.fc_out(trunk)
        return logits

# ==================================
# Training Function with Class Weights
# ==================================
def _collect_predictions(model, loader, device):
    """Run ``model`` over ``loader`` in eval mode and return (labels, preds) lists."""
    model.eval()
    labels, preds = [], []
    with torch.no_grad():
        for X_batch, y_batch in loader:
            logits = model(X_batch.to(device))
            preds.extend(logits.argmax(1).cpu().numpy())
            labels.extend(y_batch.cpu().numpy())
    return labels, preds


def train_resnet(train_loader, val_loader, input_dim, epochs=30, lr=1e-3, device="cuda"):
    """Train a ``ResNetTabular`` with inverse-frequency class weights.

    Args:
        train_loader: yields ``(features, labels)`` batches for training.
        val_loader: yields ``(features, labels)`` batches for validation.
        input_dim: number of input features per sample.
        epochs: number of passes over ``train_loader``.
        lr: Adam learning rate.
        device: torch device string the model and batches are moved to.

    Returns:
        The model carrying the weights of the epoch with the highest
        validation accuracy. Those weights are also written to
        ``best_resnet_model.pth`` in the current working directory.

    Raises:
        ValueError: if ``train_loader`` yields no batches.
    """
    import copy  # local import: only needed to snapshot the best weights

    model = ResNetTabular(input_dim=input_dim).to(device)
    num_classes = model.fc_out.out_features

    # Compute class weights from the training label distribution.
    label_batches = [y_batch for _, y_batch in train_loader]
    if not label_batches:
        raise ValueError("train_loader yielded no batches; cannot compute class weights")
    train_labels = torch.cat(label_batches).cpu().numpy()
    # minlength keeps the weight vector aligned with the model's output size
    # even when the highest class id never occurs in the training data.
    class_counts = np.bincount(train_labels, minlength=num_classes)
    inv_freq = np.zeros(len(class_counts), dtype=np.float64)
    present = class_counts > 0
    # Classes with no samples get weight 0 instead of a division by zero.
    inv_freq[present] = 1.0 / class_counts[present]
    class_weights = torch.tensor(inv_freq / inv_freq.sum(), dtype=torch.float32).to(device)

    criterion = nn.CrossEntropyLoss(weight=class_weights)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    best_acc = 0
    best_state = None
    best_labels, best_preds = [], []
    for epoch in range(epochs):
        model.train()
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

        # Validation
        val_labels, val_preds = _collect_predictions(model, val_loader, device)
        acc = accuracy_score(val_labels, val_preds)
        # The first epoch always counts as "best" so a checkpoint exists even
        # if accuracy never rises above zero.
        if best_state is None or acc > best_acc:
            best_acc = acc
            best_state = copy.deepcopy(model.state_dict())
            best_labels, best_preds = val_labels, val_preds
            torch.save(model.state_dict(), "best_resnet_model.pth")
        print(f"Epoch {epoch+1}/{epochs} - Val Accuracy: {acc:.4f}")

    if best_state is not None:
        # Return (and report on) the best epoch rather than whatever the last
        # epoch happened to produce.
        model.load_state_dict(best_state)
        print("\n📊 Final Validation Report:")
        print(classification_report(best_labels, best_preds, digits=4))
    return model

# ==================================
# Main Pipeline
# ==================================
def _derive_binary_labels(df):
    """Return a 0/1 attack-label Series for ``df``, or ``None`` if none can be derived.

    Tried in order:
      1. ``Class`` column containing 'R'      -> 1 where Class == 'R'
      2. ``Class`` column containing 'Attack' -> 1 where Class == 'Attack'
      3. ``SubClass`` column                  -> 0 where missing or 'Normal', else 1
    """
    if 'Class' in df.columns:
        classes = df['Class']
        for attack_tag in ('R', 'Attack'):
            is_attack = classes == attack_tag
            if is_attack.any():
                return is_attack.astype(int)
    if 'SubClass' in df.columns:
        sub = df['SubClass']
        is_normal = sub.isna() | (sub == 'Normal')
        return (~is_normal).astype(int)
    return None


if __name__ == "__main__":
    # Training files
    train_files = [
        "/Users/bodapati/Downloads/Car_Hacking_Challenge_Dataset_rev20Mar2021/0_Preliminary/0_Training/Pre_train_D_0.csv",
        "/Users/bodapati/Downloads/Car_Hacking_Challenge_Dataset_rev20Mar2021/0_Preliminary/0_Training/Pre_train_D_1.csv",
        "/Users/bodapati/Downloads/Car_Hacking_Challenge_Dataset_rev20Mar2021/0_Preliminary/0_Training/Pre_train_D_2.csv",
        "/Users/bodapati/Downloads/Car_Hacking_Challenge_Dataset_rev20Mar2021/0_Preliminary/0_Training/Pre_train_S_0.csv",
        "/Users/bodapati/Downloads/Car_Hacking_Challenge_Dataset_rev20Mar2021/0_Preliminary/0_Training/Pre_train_S_1.csv",
        "/Users/bodapati/Downloads/Car_Hacking_Challenge_Dataset_rev20Mar2021/0_Preliminary/0_Training/Pre_train_S_2.csv",
    ]
    # Load and concatenate training data
    train_df = pd.concat([pd.read_csv(f) for f in train_files], ignore_index=True)
    train_df.rename(columns={'Arbitration_ID': 'CAN_ID_hex', 'Data': 'Data_hex'}, inplace=True)
    train_df = create_features(train_df)

    # Binary labels. Training on fabricated labels would silently produce a
    # meaningless model, so fail loudly when none can be derived.
    train_labels = _derive_binary_labels(train_df)
    if train_labels is None:
        raise ValueError(
            "Cannot derive training labels: expected a 'Class' column containing "
            "'R' or 'Attack', or a 'SubClass' column."
        )
    train_df['Binary_Label'] = train_labels

    features = (
        ['CAN_ID_numeric', 'Data_mean', 'Data_std', 'Data_sum']
        + [f'Byte_{i}' for i in range(8)]
        + ['DLC_cat', 'CAN_Priority']
    )

    # Train/validation split
    X_train, X_val, y_train, y_val = train_test_split(
        train_df[features], train_df['Binary_Label'],
        test_size=0.2, stratify=train_df['Binary_Label'], random_state=42
    )

    # CANDataset selects the feature columns itself and takes the labels
    # separately, so the label column does not need to be joined back in.
    train_ds = CANDataset(X_train, features, y_train)
    val_ds = CANDataset(X_val, features, y_val)

    train_loader = DataLoader(train_ds, batch_size=128, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=128)

    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = train_resnet(train_loader, val_loader, input_dim=len(features), device=device)

    # ==================================
    # Test Files
    # ==================================
    test_files = [
        "/Users/bodapati/Downloads/Car_Hacking_Challenge_Dataset_rev20Mar2021/0_Preliminary/1_Submission/Pre_submit_D.csv",
        "/Users/bodapati/Downloads/Car_Hacking_Challenge_Dataset_rev20Mar2021/0_Preliminary/1_Submission/Pre_submit_S.csv",
    ]

    test_df = pd.concat([pd.read_csv(f) for f in test_files], ignore_index=True)
    test_df.rename(columns={'Arbitration_ID': 'CAN_ID_hex', 'Data': 'Data_hex'}, inplace=True)
    test_df = create_features(test_df)

    # Binary labels if they exist; otherwise the test set is prediction-only.
    test_labels = _derive_binary_labels(test_df)
    has_test_labels = test_labels is not None
    if has_test_labels:
        test_df['Binary_Label'] = test_labels

    test_ds = CANDataset(test_df, features, test_labels)
    test_loader = DataLoader(test_ds, batch_size=128)

    # ==================================
    # Evaluate on Test Data
    # ==================================
    model.eval()
    all_preds, all_labels_test = [], []
    with torch.no_grad():
        for batch in test_loader:
            if has_test_labels:
                X_test, y_test = batch
                all_labels_test.extend(y_test.cpu().numpy())
            else:
                # Unlabelled dataset: each batch is the feature tensor alone.
                X_test = batch
            preds = model(X_test.to(device)).argmax(1).cpu().numpy()
            all_preds.extend(preds)

    if has_test_labels:
        print("\n📊 Test Set Report:")
        print(classification_report(all_labels_test, all_preds, digits=4))
    else:
        # Scoring against placeholder labels would be misleading; show the
        # predicted class distribution instead.
        print("\nNo ground-truth labels found in test files; predicted class counts:")
        print(pd.Series(all_preds).value_counts().sort_index())


Epoch 1/30 - Val Accuracy: 0.9819
Epoch 2/30 - Val Accuracy: 0.9825
Epoch 3/30 - Val Accuracy: 0.9827
Epoch 4/30 - Val Accuracy: 0.9806
Epoch 5/30 - Val Accuracy: 0.9808
Epoch 6/30 - Val Accuracy: 0.9789
Epoch 7/30 - Val Accuracy: 0.9792
Epoch 8/30 - Val Accuracy: 0.9839
Epoch 9/30 - Val Accuracy: 0.9770
Epoch 10/30 - Val Accuracy: 0.9816
Epoch 11/30 - Val Accuracy: 0.9836
Epoch 12/30 - Val Accuracy: 0.9812
Epoch 13/30 - Val Accuracy: 0.9782
Epoch 14/30 - Val Accuracy: 0.9804
Epoch 15/30 - Val Accuracy: 0.9788
Epoch 16/30 - Val Accuracy: 0.9808
Epoch 17/30 - Val Accuracy: 0.9805
Epoch 18/30 - Val Accuracy: 0.9816
Epoch 19/30 - Val Accuracy: 0.9816
Epoch 20/30 - Val Accuracy: 0.9800
Epoch 21/30 - Val Accuracy: 0.9834
Epoch 22/30 - Val Accuracy: 0.9816
Epoch 23/30 - Val Accuracy: 0.9835
Epoch 24/30 - Val Accuracy: 0.9788
Epoch 25/30 - Val Accuracy: 0.9799
Epoch 26/30 - Val Accuracy: 0.9819
Epoch 27/30 - Val Accuracy: 0.9805
Epoch 28/30 - Val Accuracy: 0.9804
Epoch 29/30 - Val Accuracy: 0