<a href="https://colab.research.google.com/github/XanimGuliyeva/Spam_cClassification/blob/main/Spam_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install and Download Dataset
!pip install kaggle transformers --quiet
!kaggle datasets download -d wanderfj/enron-spam
!unzip -q enron-spam.zip -d enron_email_dataset

In [None]:
# Imports
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
from torch.utils.data import Dataset
import torch
from nltk.corpus import stopwords
import nltk
import re
from collections import defaultdict
import numpy as np
from collections import Counter

In [None]:
# NLTK data packages; the stop-word corpus is read below when STOP_WORDS is built
nltk.download('punkt')
nltk.download('stopwords')

# Dataset layout constants
BASE_DIR = 'enron_email_dataset'
ENRON_FOLDERS = [f'enron{i}' for i in range(1, 5)]  # enron1 .. enron4
HAM_DIR, SPAM_DIR = (os.path.join(BASE_DIR, kind) for kind in ('ham', 'spam'))

# English stop words extended with a few extra tokens to drop during cleaning
STOP_WORDS = set(stopwords.words('english')).union({'email', 'subject', 'hi'})

In [None]:
# Load Emails
def load_all_emails(base_dir, folders):
    """Read every ham and spam email stored under the given corpus folders.

    Args:
        base_dir: Directory that contains the per-corpus folders.
        folders: Iterable of folder names inside ``base_dir``; each one must
            contain a ``ham`` and a ``spam`` sub-directory.

    Returns:
        A ``(messages, labels)`` tuple of equal-length lists. ``messages``
        holds the raw file contents (decoded as latin-1) and ``labels`` the
        matching ``'ham'`` / ``'spam'`` strings. Within each folder all ham
        emails come first, then all spam emails, each group sorted by file name.

    Raises:
        FileNotFoundError: If an expected ``ham`` or ``spam`` directory is missing.
    """
    messages, labels = [], []
    for folder in folders:
        for label in ('ham', 'spam'):
            label_dir = os.path.join(base_dir, folder, label)
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # row order (and therefore any seeded split) reproducible.
            for file_name in sorted(os.listdir(label_dir)):
                path = os.path.join(label_dir, file_name)
                # Skip sub-directories and other non-regular entries
                if not os.path.isfile(path):
                    continue
                with open(path, 'r', encoding='latin-1') as f:
                    messages.append(f.read())
                labels.append(label)

    return messages, labels

# Read the raw corpus from disk
all_messages, all_labels = load_all_emails(BASE_DIR, ENRON_FOLDERS)

# One row per email: its label and its raw text
emails = pd.DataFrame(
    list(zip(all_labels, all_messages)),
    columns=['Label', 'Message'],
)

# Quick look at the first rows and the total number of emails
print(emails.head())
print(f"Dataset contains {len(emails)} emails.")

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from wordcloud import WordCloud

In [None]:
# Visualize Counts of Spam and Ham Emails
plt.figure(figsize=(6, 4))
# Passing `palette` without `hue` is deprecated in seaborn 0.13; assigning the
# x variable to hue (with the legend off) gives the same per-bar colouring.
sns.countplot(data=emails, x='Label', hue='Label', palette="viridis", legend=False)
plt.title("Distribution of Ham and Spam Emails")
plt.xlabel("Email Type (0: Ham, 1: Spam)")
plt.ylabel("Count")
# Tick labels are assigned by bar position (first bar -> 'Ham', second -> 'Spam')
plt.xticks(ticks=[0, 1], labels=['Ham', 'Spam'])
plt.show()

In [None]:
# Data Preprocessing
def preprocess_text(text, stop_words=None):
    """Lower-case a message, strip non-word characters and drop stop words.

    Args:
        text: Raw message string.
        stop_words: Optional collection of lower-case tokens to remove.
            Defaults to the notebook-level ``STOP_WORDS`` set.

    Returns:
        The remaining tokens joined by single spaces (``''`` if none remain).
    """
    if stop_words is None:
        stop_words = STOP_WORDS
    # Every run of non-word characters becomes one space, so split() yields clean tokens
    tokens = re.sub(r'\W+', ' ', text.lower()).split()
    return ' '.join(token for token in tokens if token not in stop_words)

emails['Cleaned_Message'] = emails['Message'].apply(preprocess_text)

# Encode labels as integers (ham -> 0, spam -> 1).
# The dtype guard keeps this cell idempotent: mapping an already-numeric column
# again would find no matching keys and turn every label into NaN.
if not pd.api.types.is_numeric_dtype(emails['Label']):
    emails['Label'] = emails['Label'].map({'ham': 0, 'spam': 1})
assert emails['Label'].isin([0, 1]).all(), "unexpected value in 'Label' column"

# Print preprocessed data
print("Sample of preprocessed emails:")
print(emails[['Message', 'Cleaned_Message', 'Label']].head())

# Split Data (80% train / 20% test, fixed seed)
X_train, X_test, y_train, y_test = train_test_split(
    emails['Cleaned_Message'], emails['Label'], test_size=0.2, random_state=42
)

# Print dataset sizes
print(f"\nTraining data size: {len(X_train)}")
print(f"Testing data size: {len(X_test)}")

# Print a sample of training data (at most 5 rows, fewer if the split is smaller)
print("\nSample training data:")
for i in range(min(5, len(X_train))):
    print(f"Email: {X_train.iloc[i]}")
    print(f"Label: {y_train.iloc[i]}")
    print("---")

In [None]:
# Visualize Most Frequent Words
# Concatenate every cleaned email into one whitespace-separated string
all_words = ' '.join(emails['Cleaned_Message'].tolist())

# Count how often each token occurs across the whole corpus
word_freq = Counter()
word_freq.update(all_words.split())

# The 20 highest-count (word, count) pairs, used by the plot below
most_common_words = word_freq.most_common(20)

In [None]:
# Plot Most Frequent Words
# Split the (word, count) pairs into two parallel tuples
words, frequencies = zip(*most_common_words)

fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(words, frequencies, color='skyblue')
# Slant the word labels so they do not overlap
plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
ax.set_xlabel('Words')
ax.set_ylabel('Frequency')
ax.set_title('Top 20 Most Frequent Words in All Emails')
plt.show()

In [None]:
# Generate Word Cloud
# Configure the cloud first, then build it from the concatenated corpus text
wordcloud = WordCloud(width=800, height=400, background_color='white')
wordcloud.generate(all_words)

fig, ax = plt.subplots(figsize=(10, 5))
ax.imshow(wordcloud, interpolation='bilinear')
ax.axis('off')  # the cloud is an image; axes carry no information
ax.set_title('Word Cloud of Frequent Words in Emails')
plt.show()

In [None]:
# Visualize Most Frequent Words for Spam Emails
# Keep the joined text under its own name so `spam_words` only ever holds the
# tuple of top words (the same split as `all_words` / `words` for the full corpus).
spam_text = ' '.join(emails[emails['Label'] == 1]['Cleaned_Message'])
spam_word_freq = Counter(spam_text.split())
spam_most_common = spam_word_freq.most_common(20)

# Plot Most Frequent Words in Spam Emails
spam_words, spam_freqs = zip(*spam_most_common)
plt.figure(figsize=(12, 6))
plt.bar(spam_words, spam_freqs, color='red')
plt.xticks(rotation=45, ha='right')
plt.xlabel('Words')
plt.ylabel('Frequency')
plt.title('Top 20 Most Frequent Words in Spam Emails')
plt.show()

In [None]:
# Visualize Most Frequent Words for Ham Emails
# Keep the joined text under its own name so `ham_words` only ever holds the
# tuple of top words (the same split as `all_words` / `words` for the full corpus).
ham_text = ' '.join(emails[emails['Label'] == 0]['Cleaned_Message'])
ham_word_freq = Counter(ham_text.split())
ham_most_common = ham_word_freq.most_common(20)

# Plot Most Frequent Words in Ham Emails
ham_words, ham_freqs = zip(*ham_most_common)
plt.figure(figsize=(12, 6))
plt.bar(ham_words, ham_freqs, color='green')
plt.xticks(rotation=45, ha='right')
plt.xlabel('Words')
plt.ylabel('Frequency')
plt.title('Top 20 Most Frequent Words in Ham Emails')
plt.show()

In [None]:
# Save Visualizations as Files
# Writes the all-emails word cloud (built in the "Generate Word Cloud" cell)
# to a PNG file at a path relative to the working directory.
wordcloud.to_file("wordcloud_all_emails.png")

In [None]:
# Sanity check on the label column. By this point the preprocessing cell has
# already mapped 'ham'/'spam' to 0/1, so the values shown are the encoded ones.
print("Unique values in 'Label' column after mapping:")
print(emails['Label'].unique())


In [None]:
from collections import Counter
from wordcloud import WordCloud

# Visualize Counts of Spam and Ham Emails
plt.figure(figsize=(6, 4))
# Passing `palette` without `hue` is deprecated in seaborn 0.13; assigning the
# x variable to hue (with the legend off) gives the same per-bar colouring.
sns.countplot(data=emails, x='Label', hue='Label', palette="viridis", legend=False)
plt.title("Distribution of Ham and Spam Emails")
plt.xlabel("Email Type (0: Ham, 1: Spam)")
plt.ylabel("Count")
# Replace the numeric tick labels (0 / 1) with readable class names
plt.xticks(ticks=[0, 1], labels=['Ham', 'Spam'])
plt.show()

In [None]:
# TF-IDF features: the vocabulary (capped at 5000 terms) is learned from the
# training split only and then reused unchanged for the test split
vectorizer = TfidfVectorizer(max_features=5000)
X_train_tfidf, X_test_tfidf = vectorizer.fit_transform(X_train), vectorizer.transform(X_test)

# Multinomial Naive Bayes trained on the TF-IDF matrix; fit() returns the estimator itself
nb_model = MultinomialNB().fit(X_train_tfidf, y_train)
nb_model

In [None]:
# Get the mapping of tokens to IDs
token_to_id = vectorizer.vocabulary_

# Print the vocabulary size and a short preview instead of the full mapping:
# with max_features=5000 the whole dict would flood the cell output.
print(f"Vocabulary size: {len(token_to_id)}")
preview = sorted(token_to_id.items(), key=lambda item: item[1])[:20]
print({token: int(index) for token, index in preview})


In [None]:
# Print the tokenized vocabulary
# get_feature_names_out() returns the learned vocabulary terms as an array;
# presumably ordered by feature index (the TF-IDF column order) — confirm in the scikit-learn docs.
print(vectorizer.get_feature_names_out())

In [None]:
# Evaluate Model
# Predict on the held-out TF-IDF features and compare with the true labels
y_pred_nb = nb_model.predict(X_test_tfidf)
accuracy = accuracy_score(y_true=y_test, y_pred=y_pred_nb)
print(f"Naive Bayes Accuracy: {accuracy:.4f}")
# Per-class precision / recall / F1
print("\nClassification Report:\n", classification_report(y_test, y_pred_nb))

In [None]:
# Plot Confusion Matrix
# Rows are the actual classes, columns the predicted ones
conf_matrix = confusion_matrix(y_test, y_pred_nb)

class_names = ['Not Spam', 'Spam']
fig, ax = plt.subplots(figsize=(6, 5))
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_title('Confusion Matrix - Naive Bayes')
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
plt.show()

In [None]:
# Save Naive Bayes Model
import joblib

# Persist the classifier together with the fitted vectorizer: predictions on new
# text need the same vocabulary and IDF weights that the model was trained with
joblib.dump(value=nb_model, filename='naive_bayes_model.pkl')
joblib.dump(value=vectorizer, filename='tfidf_vectorizer.pkl')

In [None]:
# Check for GPU Availability
# Use CUDA when torch can see a GPU, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

`DistilBertTokenizer.from_pretrained(...)` - Loads a pre-trained tokenizer that converts input text into the token IDs and attention masks DistilBERT expects.
`DistilBertForSequenceClassification.from_pretrained(...)` - Loads the pre-trained DistilBERT weights and puts a sequence-classification head on top; the head is not yet trained and is fine-tuned on the ham/spam labels in the cells below.


In [None]:
# Tokenizer and classifier come from the same checkpoint so vocabulary and weights match
tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")

# Two output labels: ham (0) and spam (1)
model = DistilBertForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=2)

# Move the weights to the selected device
model.to(device)

In [None]:
class EmailDataset(Dataset):
    """Torch dataset that tokenizes all texts once and serves per-example tensors.

    Args:
        texts: Iterable of strings to tokenize.
        labels: Iterable of integer class ids, aligned positionally with ``texts``.
        tokenizer: Callable invoked as
            ``tokenizer(list_of_texts, truncation=True, padding=..., max_length=...)``
            that returns a mapping with ``'input_ids'`` and ``'attention_mask'``
            sequences (one entry per text).
        max_len: Maximum number of tokens per example; longer inputs are truncated.
        padding: Forwarded to the tokenizer. The default ``True`` keeps the
            original behaviour. NOTE(review): passing ``False`` should leave
            padding to a batch-level collator such as ``DataCollatorWithPadding``
            — confirm against the tokenizer docs before switching.

    Raises:
        ValueError: If ``texts`` and ``labels`` have different lengths.
    """

    def __init__(self, texts, labels, tokenizer, max_len=512, padding=True):
        texts = list(texts)
        # Store labels as a plain list so __getitem__ indexes by position. A pandas
        # Series with a shuffled index would otherwise be looked up by index label.
        self.labels = list(labels)
        if len(texts) != len(self.labels):
            raise ValueError(
                f"texts and labels must have the same length "
                f"(got {len(texts)} and {len(self.labels)})"
            )
        self.encodings = tokenizer(
            texts,
            truncation=True,
            padding=padding,
            max_length=max_len
        )

    def __len__(self):
        """Return the number of examples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``idx``-th example as a dict of ``torch.long`` tensors."""
        return {
            'input_ids': torch.tensor(self.encodings['input_ids'][idx], dtype=torch.long),
            'attention_mask': torch.tensor(self.encodings['attention_mask'][idx], dtype=torch.long),
            'labels': torch.tensor(self.labels[idx], dtype=torch.long)
        }


# Prepare Data for BERT
# Plain Python lists are passed so the dataset indexes texts and labels by position
train_dataset = EmailDataset(texts=X_train.tolist(), labels=y_train.tolist(), tokenizer=tokenizer)
test_dataset = EmailDataset(texts=X_test.tolist(), labels=y_test.tolist(), tokenizer=tokenizer)

In [None]:
from transformers import DataCollatorWithPadding
# Batch collator built around the tokenizer; it is passed to the Trainer below
# as `data_collator`. Presumably it pads each batch to a common length — see the
# transformers documentation for the exact padding behaviour.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
from sklearn.metrics import precision_recall_fscore_support
from transformers import EarlyStoppingCallback
import numpy as np

In [None]:
def compute_metrics(eval_pred):
    """Compute accuracy, precision, recall and F1 for a binary classifier.

    Args:
        eval_pred: A ``(predictions, labels)`` pair. ``predictions`` holds the
            per-class scores with the class dimension last; ``labels`` holds
            the true class ids.

    Returns:
        Dict with keys ``"accuracy"``, ``"precision"``, ``"recall"`` and ``"f1"``.
        Precision, recall and F1 use ``average="binary"``.
    """
    predictions, labels = eval_pred
    # NOTE(review): guard for predictions arriving as a tuple of arrays; the
    # class scores are assumed to be the first element — confirm for other models.
    if isinstance(predictions, tuple):
        predictions = predictions[0]
    # Work directly on arrays; no tensor round-trip is needed for an argmax
    predicted_classes = np.argmax(np.asarray(predictions), axis=-1)
    labels = np.asarray(labels)
    # zero_division=0 reports 0 without a warning when a metric is undefined
    # (e.g. no positive predictions); the default reports the same 0 but warns.
    precision, recall, f1, _ = precision_recall_fscore_support(
        labels, predicted_classes, average="binary", zero_division=0
    )
    acc = accuracy_score(labels, predicted_classes)
    return {"accuracy": acc, "precision": precision, "recall": recall, "f1": f1}

In [None]:
# Mount Google Drive at /content/drive (only available inside Google Colab).
# NOTE(review): no other cell in this notebook references /content/drive —
# results, logs and saved models all use relative paths — so this mount looks
# unused; confirm before removing.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
training_args = TrainingArguments(
    output_dir="./results",
    eval_strategy="epoch",
    save_strategy="epoch",               # checkpoint on the same schedule as evaluation
    learning_rate=1e-5,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=4,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=50,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    # The selection metric is a loss, so lower is better. With greater_is_better=True
    # the checkpoint with the HIGHEST eval_loss would be restored at the end of training.
    greater_is_better=False,
    # Full precision. Setting fp16=True enables mixed-precision training, which
    # lowers memory use and speeds up computation on GPUs that support it.
    fp16=False,
    dataloader_num_workers=4,
)

# Trainer Initialization
# NOTE(review): eval_dataset is the test split, so the per-epoch evaluation and
# best-checkpoint selection see the same data that the final report uses —
# consider holding out a separate validation split.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    data_collator=data_collator,   # Batch collation handled by DataCollatorWithPadding
    compute_metrics=compute_metrics
)

# Train the model
trainer.train()


In [None]:
# Run one evaluation pass on the Trainer's eval_dataset and list the metric names
# it returned (the loss plus whatever compute_metrics reports).
eval_results = trainer.evaluate()
print(f"Available metrics: {eval_results.keys()}")


In [None]:
# Predictions and Evaluation
# The Trainer returns raw per-class scores; the predicted class is the index of the largest one
predictions = trainer.predict(test_dataset)
y_pred_bert = np.argmax(predictions.predictions, axis=-1)

# Metrics
print(f"BERT Accuracy: {accuracy_score(y_test, y_pred_bert):.4f}")
print("\nBERT Classification Report:\n", classification_report(y_test, y_pred_bert))

In [None]:
# Group training loss by epoch
training_logs = trainer.state.log_history
train_loss_per_epoch = defaultdict(list)

for log in training_logs:
    if 'loss' in log and 'epoch' in log:
        # log['epoch'] is fractional progress (e.g. 0.35 is inside the first epoch).
        # Rounding UP assigns it to the 1-based epoch it belongs to; flooring with
        # int() would file the first epoch under key 0 and shift every average by one.
        epoch_number = max(1, int(np.ceil(log['epoch'])))
        train_loss_per_epoch[epoch_number].append(log['loss'])

# Compute average training loss per epoch, only for epochs that actually have
# logged steps (avoids averaging an empty list, which would yield NaN)
avg_train_loss = [np.mean(train_loss_per_epoch[epoch]) for epoch in sorted(train_loss_per_epoch)]

# Validation loss per epoch (any extra evaluation logged after training is
# trimmed by the length matching below)
eval_loss = [log['eval_loss'] for log in training_logs if 'eval_loss' in log]

# Ensure lengths match
epochs = list(range(1, min(len(avg_train_loss), len(eval_loss)) + 1))
avg_train_loss = avg_train_loss[:len(epochs)]
eval_loss = eval_loss[:len(epochs)]

# Plot Training and Validation Loss
plt.figure(figsize=(8, 6))
plt.plot(epochs, avg_train_loss, label='Training Loss (Epoch Average)', marker='o')
plt.plot(epochs, eval_loss, label='Validation Loss', marker='o')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Ground-truth labels as returned alongside the predictions
y_true = predictions.label_ids

# Confusion matrix with a fixed class order: row/column 0 = ham, 1 = spam
cm = confusion_matrix(y_true, y_pred_bert, labels=[0, 1])

# Render the matrix with readable class names and integer cell values
disp = ConfusionMatrixDisplay(cm, display_labels=["Ham", "Spam"])
disp.plot(values_format="d", cmap="Blues")
plt.title("Confusion Matrix for BERT Model")
plt.show()

In [None]:
# Save BERT Model
# (the model here is DistilBERT; the directory name is kept as-is)
# Model and tokenizer are written to the same directory.
# Presumably both can then be reloaded from that path with from_pretrained — confirm.
model.save_pretrained('./bert_saved_model')
tokenizer.save_pretrained('./bert_saved_model')