importing the required libraries and data pre processing 

In [1]:
import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader


In [2]:


def preprocess_data():
    # Load and preprocess the data
    raw_mail_data = pd.read_csv("mail_data.csv")
    mail_data = raw_mail_data.where((pd.notnull(raw_mail_data)), '')

    # Label spam mails as 0 and ham mails as 1
    mail_data.loc[mail_data['Category']=='spam','Category'] = 0
    mail_data.loc[mail_data['Category']=='ham','Category'] = 1

    # Convert Category column to numeric type
    mail_data['Category'] = pd.to_numeric(mail_data['Category'])

    # Separate the data as text and labels
    X = mail_data['Message']
    Y = mail_data['Category']

    # Split the data into training and testing sets
    X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=3)

    # Convert text data to feature vectors
    feature_extraction = TfidfVectorizer(min_df=1, stop_words='english', lowercase=True)
    X_train_features = feature_extraction.fit_transform(X_train)
    X_test_features = feature_extraction.transform(X_test)

    # Convert to PyTorch tensors
    X_train_tensor = torch.FloatTensor(X_train_features.toarray())
    Y_train_tensor = torch.FloatTensor(Y_train.values).view(-1, 1)
    X_test_tensor = torch.FloatTensor(X_test_features.toarray())
    Y_test_tensor = torch.FloatTensor(Y_test.values).view(-1, 1)

    print(f"Training data shape: {X_train_tensor.shape}")
    print(f"Testing data shape: {X_test_tensor.shape}")

    return X_train_tensor, Y_train_tensor, X_test_tensor, Y_test_tensor, feature_extraction

if __name__ == "__main__":
    X_train, Y_train, X_test, Y_test, feature_extractor = preprocess_data() 

Training data shape: torch.Size([4457, 7431])
Testing data shape: torch.Size([1115, 7431])


# Creating the model 

In [3]:


class MailDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels
    
    def __len__(self):
        return len(self.features)
    
    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

class SpamClassifier(nn.Module):
    def __init__(self, input_size):
        super(SpamClassifier, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, 512),  # First hidden layer
            nn.ReLU(),                   # Activation function
            nn.Dropout(0.3),            # Dropout for regularization
            nn.Linear(512, 256),        # Second hidden layer
            nn.ReLU(),                   # Activation function
            nn.Dropout(0.2),            # Dropout for regularization
            nn.Linear(256, 128),        # Third hidden layer
            nn.ReLU(),                   # Activation function
            nn.Dropout(0.1),            # Dropout for regularization
            nn.Linear(128, 1),          # Output layer
            nn.Sigmoid()                # Sigmoid activation for binary classification
        )
    
    def forward(self, x):
        return self.network(x)

def create_dataloaders(X_train, Y_train, X_test, Y_test, batch_size=32):
    # Create datasets
    train_dataset = MailDataset(X_train, Y_train)
    test_dataset = MailDataset(X_test, Y_test)
    
    # Create dataloaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    
    return train_loader, test_loader

def initialize_model(input_size):
    model = SpamClassifier(input_size)
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    
    return model, criterion, optimizer 

# Training the model

In [4]:



def train_model(num_epochs=10, batch_size=32):
    # Preprocess data
    X_train, Y_train, X_test, Y_test, feature_extractor = preprocess_data()
    
    # Create dataloaders
    train_loader, test_loader = create_dataloaders(X_train, Y_train, X_test, Y_test, batch_size)
    
    # Initialize model
    input_size = X_train.shape[1]
    model, criterion, optimizer = initialize_model(input_size)
    
    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    
    print(f"Training on device: {device}")
    
    # Training loop
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        
        for batch_idx, (features, labels) in enumerate(train_loader):
            features, labels = features.to(device), labels.to(device)
            
            # Forward pass
            outputs = model(features)
            loss = criterion(outputs, labels)
            
            # Backward pass and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
            # Print progress every 100 batches
            if (batch_idx + 1) % 100 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}/{len(train_loader)}], '
                      f'Loss: {loss.item():.4f}, Accuracy: {100 * correct / total:.2f}%')
        
        epoch_loss = running_loss / len(train_loader)
        epoch_acc = 100 * correct / total
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%')
        print('-' * 50)
    
    # Save the model
    torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'feature_extractor': feature_extractor
    }, 'spam_classifier.pth')
    
    return model, feature_extractor

if __name__ == "__main__":
    model, feature_extractor = train_model() 

Training data shape: torch.Size([4457, 7431])
Testing data shape: torch.Size([1115, 7431])
Training on device: cpu
Epoch [1/10], Batch [100/140], Loss: 0.0954, Accuracy: 87.34%
Epoch [1/10], Loss: 0.2262, Accuracy: 90.26%
--------------------------------------------------
Epoch [2/10], Batch [100/140], Loss: 0.0013, Accuracy: 99.00%
Epoch [2/10], Loss: 0.0311, Accuracy: 99.10%
--------------------------------------------------
Epoch [3/10], Batch [100/140], Loss: 0.0007, Accuracy: 99.66%
Epoch [3/10], Loss: 0.0071, Accuracy: 99.71%
--------------------------------------------------
Epoch [4/10], Batch [100/140], Loss: 0.0001, Accuracy: 99.97%
Epoch [4/10], Loss: 0.0027, Accuracy: 99.96%
--------------------------------------------------
Epoch [5/10], Batch [100/140], Loss: 0.0003, Accuracy: 99.97%
Epoch [5/10], Loss: 0.0015, Accuracy: 99.96%
--------------------------------------------------
Epoch [6/10], Batch [100/140], Loss: 0.0000, Accuracy: 100.00%
Epoch [6/10], Loss: 0.0009, Accu

# Predictions 

In [5]:

def evaluate_model(model, test_loader, device):
    model.eval()
    correct = 0
    total = 0
    
    with torch.no_grad():
        for features, labels in test_loader:
            features, labels = features.to(device), labels.to(device)
            outputs = model(features)
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    accuracy = 100 * correct / total
    print(f'Test Accuracy: {accuracy:.2f}%')
    return accuracy

def predict_mail(text, model, feature_extractor, device):
    # Convert text to feature vector
    input_features = feature_extractor.transform([text])
    input_tensor = torch.FloatTensor(input_features.toarray()).to(device)
    
    # Make prediction
    model.eval()
    with torch.no_grad():
        output = model(input_tensor)
        prediction = (output > 0.5).float()
    
    return 'Ham mail' if prediction.item() == 1 else 'Spam mail'

def load_model(model_path, input_size):
    checkpoint = torch.load(model_path)
    model = SpamClassifier(input_size)
    model.load_state_dict(checkpoint['model_state_dict'])
    feature_extractor = checkpoint['feature_extractor']
    return model, feature_extractor

if __name__ == "__main__":
    # Load or train the model
    try:
        # Try to load the saved model
        X_train, _, _, _, _ = preprocess_data()
        model, feature_extractor = load_model('spam_classifier.pth', X_train.shape[1])
    except:
        # If no saved model exists, train a new one
        print("No saved model found. Training a new model...")
        model, feature_extractor = train_model()
    
    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    
    
    # You can also evaluate the model on the test set
    _, _, X_test, Y_test, _ = preprocess_data()
    test_loader = torch.utils.data.DataLoader(
        torch.utils.data.TensorDataset(X_test, Y_test),
        batch_size=32,
        shuffle=False
    )
    evaluate_model(model, test_loader, device) 

Training data shape: torch.Size([4457, 7431])
Testing data shape: torch.Size([1115, 7431])


  checkpoint = torch.load(model_path)


Training data shape: torch.Size([4457, 7431])
Testing data shape: torch.Size([1115, 7431])
Test Accuracy: 98.39%


In [6]:
test_mail = 'REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode'
result = predict_mail(test_mail, model, feature_extractor, device)
print(f"Prediction for test mail: {result}")

Prediction for test mail: Spam mail
