In [2]:
import os

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader
from torchsummary import summary

# CustomResidualUnit: pointwise -> depthwise -> pointwise Conv1d block with an optional identity shortcut
class CustomResidualUnit(nn.Module):
    """1D convolutional unit with an optional identity shortcut.

    Three bias-free convolutions are applied in sequence, each followed by
    batch normalization: a pointwise (kernel 1) convolution, a depthwise
    (kernel 5, one group per channel) convolution, and a second pointwise
    convolution that maps ``input_filters`` to ``output_filters`` channels.
    GELU follows the first two normalizations and the final result.

    Every convolution uses stride 1 with length-preserving padding, so the
    sequence length of the input is kept.

    Args:
        input_filters: Number of channels of the incoming tensor.
        output_filters: Number of channels produced by the unit.
    """

    def __init__(self, input_filters, output_filters):
        super().__init__()
        # Submodule names and creation order define the state_dict keys and
        # the order in which random initial weights are drawn.
        self.pointwise1 = nn.Conv1d(
            input_filters, input_filters,
            kernel_size=1, stride=1, padding=0, bias=False,
        )
        self.norm1 = nn.BatchNorm1d(input_filters)
        self.depthwise_conv = nn.Conv1d(
            input_filters, input_filters,
            kernel_size=5, stride=1, padding=2,
            groups=input_filters, bias=False,
        )
        self.norm2 = nn.BatchNorm1d(input_filters)
        self.pointwise2 = nn.Conv1d(
            input_filters, output_filters,
            kernel_size=1, stride=1, padding=0, bias=False,
        )
        self.norm3 = nn.BatchNorm1d(output_filters)
        self.act = nn.GELU()
        # The identity shortcut is only shape-compatible when the channel
        # count does not change; otherwise the unit is a plain conv stack.
        self.use_shortcut = input_filters == output_filters

    def forward(self, x):
        """Run the unit on ``x``, expected as (batch, input_filters, length).

        Returns:
            Tensor with ``output_filters`` channels and the same length as ``x``.
        """
        expanded = self.act(self.norm1(self.pointwise1(x)))
        filtered = self.act(self.norm2(self.depthwise_conv(expanded)))
        projected = self.norm3(self.pointwise2(filtered))
        if not self.use_shortcut:
            return self.act(projected)
        # Residual connection: add the untouched input before the last GELU.
        return self.act(projected + x)

# TabularFeatureExtractor: 1D-CNN classifier over a tabular feature vector (defaults to 25 input features)
class TabularFeatureExtractor(nn.Module):
    """Classifier that treats a projected tabular feature vector as a 1D signal.

    Pipeline: Linear projection to ``hidden_dim`` values -> add a channel axis
    -> Conv1d/BatchNorm/GELU stem -> two ``CustomResidualUnit`` blocks
    (32->32 and 32->64 channels) -> Conv1d back to one channel ->
    BatchNorm/GELU -> MaxPool1d(3, 3) -> flatten -> Linear/GELU/Dropout ->
    Linear producing ``num_outputs`` logits.

    Args:
        num_features: Number of input features per sample.
        num_outputs: Number of output logits (classes).
        dropout_prob: Dropout probability applied before the output layer.
        hidden_dim: Width of the first projection, i.e. the length of the 1D
            signal seen by the convolutions. Defaults to 256, the value that
            was previously hard-coded.

    Raises:
        ValueError: If ``hidden_dim`` is smaller than the pooling kernel, in
            which case pooling would produce no output.
    """

    # Kernel size and stride of the max-pooling layer; used both to build the
    # layer and to derive the width of the first dense head layer.
    _POOL_KERNEL = 3
    _POOL_STRIDE = 3

    def __init__(self, num_features=25, num_outputs=2, dropout_prob=0.3, hidden_dim=256):
        super(TabularFeatureExtractor, self).__init__()
        if hidden_dim < self._POOL_KERNEL:
            raise ValueError(
                f"hidden_dim must be at least {self._POOL_KERNEL}, got {hidden_dim}"
            )
        self.dense1 = nn.Linear(num_features, hidden_dim)
        self.conv_initial = nn.Conv1d(1, 32, kernel_size=5, stride=1, padding=2)
        # NOTE(review): track_running_stats=False makes this layer normalize
        # with the statistics of the current batch in eval mode as well, so
        # predictions can depend on batch composition. Left as-is because
        # changing it would alter the state_dict keys of saved checkpoints;
        # confirm whether this is intended.
        self.norm_initial = nn.BatchNorm1d(32, track_running_stats=False)
        self.act_initial = nn.GELU()
        self.res_unit1 = CustomResidualUnit(32, 32)
        self.res_unit2 = CustomResidualUnit(32, 64)
        self.conv_final = nn.Conv1d(64, 1, kernel_size=3, stride=1, padding=1)
        self.norm_final = nn.BatchNorm1d(1)
        self.act_final = nn.GELU()
        self.pooling = nn.MaxPool1d(kernel_size=self._POOL_KERNEL, stride=self._POOL_STRIDE)
        self.dropout_layer = nn.Dropout(dropout_prob)
        # All convolutions keep the signal length, so only the pooling layer
        # shortens it. With no padding/dilation the pooled length is
        # floor((L - kernel) / stride) + 1, which is 85 for the default L=256.
        pooled_len = (hidden_dim - self._POOL_KERNEL) // self._POOL_STRIDE + 1
        self.dense2 = nn.Linear(pooled_len, 48)
        self.dense3 = nn.Linear(48, num_outputs)

    def forward(self, x):
        """Compute class logits for ``x`` of shape (batch, num_features).

        Returns:
            Tensor of shape (batch, num_outputs) with unnormalized logits.
        """
        # (batch, num_features) -> (batch, 1, hidden_dim): one-channel signal.
        x = self.dense1(x).unsqueeze(1)
        x = self.conv_initial(x)
        x = self.norm_initial(x)
        x = self.act_initial(x)
        x = self.res_unit1(x)
        x = self.res_unit2(x)
        # Collapse back to a single channel before pooling.
        x = self.conv_final(x)
        x = self.norm_final(x)
        x = self.act_final(x)
        x = self.pooling(x)
        # (batch, 1, pooled_len) -> (batch, pooled_len)
        x = torch.flatten(x, 1)
        x = torch.nn.functional.gelu(self.dense2(x))
        x = self.dropout_layer(x)
        x = self.dense3(x)
        return x

# Custom Dataset Class
class TabularDataset(Dataset):
    """In-memory dataset pairing feature rows with integer class labels.

    Args:
        features: Array-like accepted by ``torch.tensor``; the first axis
            indexes samples. Stored as a float32 tensor.
        labels: Array-like with one class index per sample. Stored as an
            int64 (``torch.long``) tensor.

    Raises:
        ValueError: If ``features`` and ``labels`` do not contain the same
            number of samples.
    """

    def __init__(self, features, labels):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)
        # Fail fast: __len__ only looks at the features, so a mismatch would
        # otherwise surface as an IndexError mid-epoch or silently drop labels.
        if len(self.features) != len(self.labels):
            raise ValueError(
                f"features and labels must have the same length, "
                f"got {len(self.features)} and {len(self.labels)}"
            )

    def __len__(self):
        """Return the number of samples."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at ``idx``."""
        return self.features[idx], self.labels[idx]

# Load and preprocess data
# Reproducibility: seed every RNG that influences this run (weight
# initialisation, DataLoader shuffling, dropout, the train/test split and PCA)
# so that Restart & Run All yields the same numbers.
SEED = 42
N_COMPONENTS = 25  # PCA output width; must equal the model's num_features
BATCH_SIZE = 32
np.random.seed(SEED)
torch.manual_seed(SEED)

# The CSV location can be overridden with the PHISHING_DATA_CSV environment
# variable so the notebook runs on other machines; the default is unchanged.
file_path = os.environ.get(
    "PHISHING_DATA_CSV",
    "/Users/gnaneshwarkandula/Downloads/phishing/50000_samples_data.csv",
)
df = pd.read_csv(file_path)

# Extract features, labels, and URLs
feature_cols = [f'f{i}' for i in range(1, 56)]  # f1 to f55
missing_cols = [col for col in feature_cols + ['label', 'URL'] if col not in df.columns]
if missing_cols:
    raise KeyError(f"Input CSV is missing expected columns: {missing_cols}")
features = df[feature_cols].values
labels = df['label'].values
urls = df['URL'].values

# Split the data into training and testing sets (stratified on the label so
# both splits keep the class balance). URLs are split alongside so predictions
# can be traced back to their source rows.
X_train, X_test, y_train, y_test, urls_train, urls_test = train_test_split(
    features, labels, urls, test_size=0.3, random_state=SEED, stratify=labels
)

# Standardize features before PCA. The scaler is fitted on the training split
# only and then applied to the test split, so no test statistics leak in.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Apply PCA to reduce to N_COMPONENTS features. random_state pins the result
# when scikit-learn selects its randomized SVD solver.
pca = PCA(n_components=N_COMPONENTS, random_state=SEED)
X_train_pca = pca.fit_transform(X_train_scaled)
X_test_pca = pca.transform(X_test_scaled)

print(f"Explained variance ratio of {N_COMPONENTS} components: {sum(pca.explained_variance_ratio_):.4f}")

# Create train and test datasets
train_dataset = TabularDataset(X_train_pca, y_train)
test_dataset = TabularDataset(X_test_pca, y_test)

# Create dataloaders. The test loader must not shuffle: predictions are later
# matched to urls_test by position.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Initialize model, loss, and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TabularFeatureExtractor(num_features=N_COMPONENTS, num_outputs=2).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training Loop
num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    # Per-epoch accumulators: summed batch losses and classification counts.
    running_loss, correct, total = 0.0, 0, 0

    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        # One optimisation step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # Track the batch loss and how many samples were classified correctly.
        running_loss += loss.item()
        _, predicted = outputs.max(dim=1)
        total += targets.shape[0]
        correct += int((predicted == targets).sum())

    # Loss is averaged over batches; accuracy is a percentage over all samples.
    epoch_loss = running_loss / len(train_loader)
    epoch_acc = 100 * correct / total
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Training Accuracy: {epoch_acc:.2f}%")

# Evaluation on test set
model.eval()
test_correct, test_total = 0, 0
test_predictions, test_true_labels = [], []

# No gradients are needed for inference.
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)
        outputs = model(inputs)
        _, predicted = outputs.max(dim=1)

        test_total += targets.shape[0]
        test_correct += int((predicted == targets).sum())

        # Collected in loader order so they can be paired with urls_test below.
        test_predictions.extend(predicted.cpu().numpy())
        test_true_labels.extend(targets.cpu().numpy())

# Calculate test accuracy
test_accuracy = 100 * test_correct / test_total
print(f"\nTest Accuracy: {test_accuracy:.2f}%")

# Save results to a CSV: one row per test sample with its URL, true label and
# predicted label.
results_df = pd.DataFrame(
    {
        'URL': urls_test,
        'True_Label': test_true_labels,
        'Predicted_Label': test_predictions,
    }
)
results_df.to_csv('model_predictions.csv', index=False)
print("Predictions saved to model_predictions.csv")

# Save the model (weights only, via its state_dict)
model_path = "tabular_feature_extractor1.pth"
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")

# Model summary; input_size is the per-sample feature vector the model
# receives, before forward() adds the channel axis.
summary(model, input_size=(25,), batch_size=1)

Explained variance ratio of 25 components: 0.8543
Epoch 1/2, Loss: 0.0422, Training Accuracy: 98.93%
Epoch 2/2, Loss: 0.0213, Training Accuracy: 99.55%

Test Accuracy: 99.51%
Predictions saved to model_predictions.csv
Model saved to tabular_feature_extractor1.pth
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Linear-1                   [1, 256]           6,656
            Conv1d-2               [1, 32, 256]             192
       BatchNorm1d-3               [1, 32, 256]              64
              GELU-4               [1, 32, 256]               0
            Conv1d-5               [1, 32, 256]           1,024
       BatchNorm1d-6               [1, 32, 256]              64
              GELU-7               [1, 32, 256]               0
            Conv1d-8               [1, 32, 256]             160
       BatchNorm1d-9               [1, 32, 256]              64
             GELU-10           

In [14]:
import joblib

# Persist the fitted scaler and PCA so the same preprocessing can be re-applied
# to new data before it is fed to the saved model.
joblib.dump(value=scaler, filename="scaler.pkl")
joblib.dump(value=pca, filename="pca.pkl")
# To restore: scaler = joblib.load("scaler.pkl"); pca = joblib.load("pca.pkl")
# (joblib files are pickles, so only load files from a trusted source.)

['pca.pkl']