In [1]:
import os
import requests
import tarfile
import zipfile
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report
import joblib
from sklearn.linear_model import LogisticRegression

In [2]:
# URLs for datasets
SPAMASSASSIN_SPAM_URL = 'https://spamassassin.apache.org/old/publiccorpus/20030228_spam.tar.bz2'
SPAMASSASSIN_HAM_URL = 'https://spamassassin.apache.org/old/publiccorpus/20030228_easy_ham.tar.bz2'
SMS_SPAM_URL = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip'

# Directories for extracted data
DATA_DIR = 'combined_spam_data'
SPAMASSASSIN_DIR = os.path.join(DATA_DIR, 'spamassassin')
SMS_DIR = os.path.join(DATA_DIR, 'sms_spam')

In [3]:
# Function to download and extract tar.bz2 datasets
def download_and_extract_tar(url, extract_to):
    response = requests.get(url, stream=True)
    tar_path = os.path.join(extract_to, os.path.basename(url))
    os.makedirs(extract_to, exist_ok=True)  # Ensure directory exists
    with open(tar_path, 'wb') as file:
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                file.write(chunk)
    with tarfile.open(tar_path, 'r:bz2') as tar:
        tar.extractall(path=extract_to)
    os.remove(tar_path)

In [4]:
# Function to download and extract zip datasets
def download_and_extract_zip(url, extract_to):
    response = requests.get(url, stream=True)
    zip_path = os.path.join(extract_to, os.path.basename(url))
    os.makedirs(extract_to, exist_ok=True)  # Ensure directory exists
    with open(zip_path, 'wb') as file:
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                file.write(chunk)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
    os.remove(zip_path)

In [5]:
# Create data directory
os.makedirs(DATA_DIR, exist_ok=True)

# Download and extract SpamAssassin datasets
download_and_extract_tar(SPAMASSASSIN_SPAM_URL, SPAMASSASSIN_DIR)
download_and_extract_tar(SPAMASSASSIN_HAM_URL, SPAMASSASSIN_DIR)

# Download and extract SMS Spam dataset
download_and_extract_zip(SMS_SPAM_URL, SMS_DIR)

In [6]:
# Function to load emails from a directory
def load_emails_from_directory(directory):
    emails = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            with open(os.path.join(root, file), 'r', encoding='latin-1') as f:
                emails.append(f.read())
    return emails

In [7]:
# Load SpamAssassin dataset
spam_emails = load_emails_from_directory(os.path.join(SPAMASSASSIN_DIR, 'spam'))
ham_emails = load_emails_from_directory(os.path.join(SPAMASSASSIN_DIR, 'easy_ham'))

# Load SMS Spam dataset
sms_data_path = os.path.join(SMS_DIR, 'SMSSpamCollection')
sms_data = pd.read_csv(sms_data_path, sep='\t', names=['label', 'message'])
sms_data['label'] = sms_data['label'].map({'ham': 0, 'spam': 1})

In [8]:
# Create DataFrame for SpamAssassin data
spamassassin_data = pd.DataFrame({
    'message': spam_emails + ham_emails,
    'label': [1]*len(spam_emails) + [0]*len(ham_emails)
})

# Combine both datasets
combined_data = pd.concat([spamassassin_data, sms_data[['message', 'label']]], ignore_index=True)

# Shuffle the data
combined_data = combined_data.sample(frac=1).reset_index(drop=True)

In [9]:
combined_data

Unnamed: 0,message,label
0,Jokin only lar... :-) depends on which phone m...,0
1,No we put party 7 days a week and study lightl...,0
2,Call me when u're done...,0
3,What i mean is do they come chase you out when...,0
4,No break time one... How... I come out n get m...,0
...,...,...
8569,Don no da:)whats you plan?,0
8570,This is my number by vivek..,0
8571,Prepare to be pounded every night...,0
8572,Hi da:)how is the todays class?,0


In [10]:
# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(combined_data['message'], combined_data['label'], test_size=0.3, random_state=42)

# Initialize the TF-IDF Vectorizer
vectorizer = TfidfVectorizer(stop_words='english')

# Fit and transform the training data, and transform the test data
X_train_tfidf = vectorizer.fit_transform(X_train)
X_test_tfidf = vectorizer.transform(X_test)

# Initialize the Logistic Regression classifier
model = LogisticRegression(max_iter=1000)

# Train the model
model.fit(X_train_tfidf, y_train)

# Make predictions
y_pred = model.predict(X_test_tfidf)

# Evaluate the model
print(classification_report(y_test, y_pred))

              precision    recall  f1-score   support

           0       0.96      1.00      0.98      2189
           1       0.99      0.74      0.85       384

    accuracy                           0.96      2573
   macro avg       0.97      0.87      0.91      2573
weighted avg       0.96      0.96      0.96      2573



In [11]:
# Save the model and the vectorizer
joblib.dump(model, 'spam_classifier_model.joblib')
joblib.dump(vectorizer, 'tfidf_vectorizer.joblib')

['tfidf_vectorizer.joblib']

In [12]:
# Function to predict if a new message is spam or not
def predict_spam(message_content):
    # Load the model and vectorizer
    model = joblib.load('spam_classifier_model.joblib')
    vectorizer = joblib.load('tfidf_vectorizer.joblib')
    message_tfidf = vectorizer.transform([message_content])
    prediction = model.predict(message_tfidf)
    return 'Spam' if prediction[0] == 1 else 'Not Spam'


In [16]:

# Example usage
new_message = "Congratulations! You've won a $1,000 Walmart gift card. Click here to claim your prize."
print(predict_spam(new_message))


Spam


In [15]:
# Example usage
new_message = "Don no da:)whats you plan?"
print(predict_spam(new_message))

Not Spam
