In [None]:
# --- Core Imports for Data Handling ---
import pandas as pd
import numpy as np
import warnings
# --- Scikit-learn Imports for ML ---

# 1. For splitting your data into training and test sets
from sklearn.model_selection import train_test_split

# 2. For converting text data into a matrix of numerical features (TF-IDF)
from sklearn.feature_extraction.text import TfidfVectorizer

# 3. The Naive Bayes classifier model
from sklearn.naive_bayes import MultinomialNB

# 4. For evaluating your model's performance
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

print("All necessary libraries are imported.")

In [None]:
# Location of the SMS Spam Collection file
filepath = 'SMSSpamCollection'

try:
    # The file is tab-separated and has no header row, so the two
    # column names ('label', 'message') are supplied explicitly.
    df = pd.read_csv(
        filepath,
        sep='\t',
        header=None,
        names=['label', 'message'],
    )
except FileNotFoundError:
    print(
        f"Error: The file '{filepath}' was not found.",
        "Please make sure the file is in the same directory as your notebook, or update the 'filepath' variable.",
        sep="\n",
    )
else:
    # Preview the first rows as a sanity check on the parse
    print("--- Dataset Head ---")
    print(df.head())
    print("\n")  # blank spacer between the two reports

    # Class balance: how many 'ham' vs. 'spam' rows
    print("--- Message Counts (Ham vs. Spam) ---")
    print(df['label'].value_counts())

In [None]:
# Location of the emails dataset
filepath_emails = 'emails.csv'

try:
    # Plain CSV read; the first row is treated as the header
    df_emails = pd.read_csv(filepath_emails)
except FileNotFoundError:
    print(
        f"Error: The file '{filepath_emails}' was not found.",
        "Please make sure the file is in the same directory as your notebook, or update the 'filepath_emails' variable.",
        sep="\n",
    )
else:
    # Preview the first rows as a sanity check on the parse
    print("--- Dataset Head (emails.csv) ---")
    print(df_emails.head())
    print("\n")  # blank spacer

    # Class balance, read from the 'spam' column (1 = spam, 0 = ham)
    print("--- Message Counts (0 = Ham, 1 = Spam) ---")
    print(df_emails['spam'].value_counts())

In [None]:
# Location of the spambase dataset.
# The file is usually distributed as 'spambase.data'; change the extension
# here if your copy was saved as '.csv'.
filepath_spambase = 'spambase.data'

try:
    # No title row in this file, so pandas numbers the columns itself
    df_spambase = pd.read_csv(filepath_spambase, header=None)
except FileNotFoundError:
    print(
        f"Error: The file '{filepath_spambase}' was not found.",
        "Please make sure the file is in the same directory as your notebook, or update the 'filepath_spambase' variable.",
        sep="\n",
    )
else:
    # Preview the first rows; columns are expected to be indexed 0..57
    print("--- Dataset Head (spambase.data) ---")
    print(df_spambase.head())
    print("\n")

    # The label is read from the last column (index 57): 1 = spam, 0 = not spam
    print("--- Message Counts (0 = Not Spam, 1 = Spam) ---")
    print(df_spambase[57].value_counts())

In [None]:
# Location of the BETTER30 dataset
filepath_better30 = 'BETTER30.csv'

try:
    # Plain CSV read; the first row is treated as the header
    df_better30 = pd.read_csv(filepath_better30)

    # Preview the first rows as a sanity check on the parse
    print("--- Dataset Head (BETTER30.csv) ---")
    print(df_better30.head())
    print("\n")  # blank spacer

    # Per-category row counts. The lookup stays inside the try so that a
    # missing 'LABEL' column is reported by the KeyError handler below.
    print("--- Message Counts (by LABEL) ---")
    print(df_better30['LABEL'].value_counts())

except FileNotFoundError:
    print(
        f"Error: The file '{filepath_better30}' was not found.",
        "Please make sure the file is in the same directory as your notebook, or update the 'filepath_better30' variable.",
        sep="\n",
    )
except KeyError:
    print(
        "\n--- ERROR ---",
        "A 'KeyError' means the column name 'LABEL' wasn't found.",
        "Please check your .csv file for the exact column name and update the code.",
        "It might have different capitalization (e.g., 'Label' or 'label').",
        sep="\n",
    )

In [None]:
# Reload the raw dataset so this cell always starts from an unmodified frame
df_better30 = pd.read_csv('BETTER30.csv')

# 1. Dictionary that collapses the fine-grained labels into a binary target:
#    1 = Spam, 0 = Ham (Not Spam)
#    NOTE: every unique value of the 'LABEL' column must appear here;
#    anything missing is reported below and then dropped.
#    NOTE(review): 'Classic sca' and 'Fulfills ca' look truncated (the
#    original comments read them as "Classic scam" / "Fulfills call").
#    Series.map() needs exact matches, so confirm these keys against the CSV.
label_map = {
    # --- Map "Spam-like" labels to 1 ---
    'suspicious': 1,
    'threat': 1,
    'evasive': 1,
    'Refusing': 1,  # From your image
    'Insisting': 1, # From your image
    'Classic sca': 1, # From your image (Classic scam)

    # --- Map "Ham-like" (normal) labels to 0 ---
    'Standard': 0,
    'Encourage': 0,
    'Reinforce': 0,
    'Demonstrate': 0,
    'Fulfills ca': 0, # From your image (Fulfills call)
    'Positive': 0,
    'Potential': 0,
    'Requesting': 0,
    'Adhering': 0,
    'neutral': 0 # From previous context
}

# 2. Apply the map to 'LABEL' to create the new 'is_spam' column.
#    Labels that are not keys of label_map become NaN.
df_better30['is_spam'] = df_better30['LABEL'].map(label_map)

# 3. Report labels that were not mapped instead of discarding them silently,
#    so a typo or truncated key in label_map is visible before rows are lost.
unmapped_mask = df_better30['is_spam'].isna()
if unmapped_mask.any():
    print("--- WARNING: Unmapped LABEL values (these rows will be dropped) ---")
    print(df_better30.loc[unmapped_mask, 'LABEL'].value_counts(dropna=False))
    print("\n")

# 4. Drop the unmapped rows, then store the target as an integer column
#    (it is float after .map() whenever NaN values were produced).
df_better30_clean = (
    df_better30
    .dropna(subset=['is_spam'])
    .astype({'is_spam': int})
)


# --- Check your work ---
print("--- New Binary Counts (0=Ham, 1=Spam) ---")
print(df_better30_clean['is_spam'].value_counts())

print("\n--- New DataFrame Head ---")
# Shows the old 'LABEL' column next to the new 'is_spam' column
print(df_better30_clean.head())

In [None]:
# Persist the cleaned frame (without the index column) so later cells can
# reload it from disk.
cleaned_csv_path = 'better30_cleaned.csv'
df_better30_clean.to_csv(cleaned_csv_path, index=False)

print("Cleaned data has been saved to 'better30_cleaned.csv'")

In [None]:
# Location of the fraud-call dataset
filepath_fraud = 'fraud_call.file'

# Parser settings: tab-separated, no header row, explicit column names.
# on_bad_lines='skip' is the "quick fix": lines with too many tabs are
# skipped instead of aborting the read.
fraud_read_options = {
    'sep': '\t',
    'header': None,
    'names': ['label', 'message'],
    'on_bad_lines': 'skip',
}

try:
    df_fraud = pd.read_csv(filepath_fraud, **fraud_read_options)

    # Preview the first rows as a sanity check on the parse
    print("--- Dataset Head (fraud_call.file) ---")
    print(df_fraud.head())
    print("\n")

    # Class balance of the 'label' column ('fraud' vs. 'normal')
    print("--- Message Counts ---")
    print(df_fraud['label'].value_counts())

except FileNotFoundError:
    print(
        f"Error: The file '{filepath_fraud}' was not found.",
        "Please make sure the file is in the same folder as your Jupyter Notebook.",
        sep="\n",
    )
except Exception as e:
    print(f"An error occurred: {e}")

DATASETS USED: 'SMSSpamCollection', 'emails.csv', 'spambase.data', 'better30_cleaned.csv', 'fraud_call.file', 'enron_spam_data.csv'

Before oversampling, Naive Bayes struggled to identify spam correctly and with confidence. After oversampling, it became too paranoid and predicted spam far too often.

In [None]:
# Read only the first 5 rows: enough to discover the column names without
# loading the whole file.
df_enron_preview = pd.read_csv('enron_spam_data.csv', nrows=5)

print("--- Enron.csv First 5 Rows ---")
print(df_enron_preview.head())
print("\n")

# The column index shows which names to pass to usecols later
print("--- Enron.csv Column Names ---")
print(df_enron_preview.columns)

Below is the model code

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import LinearSVC  # Import the SVM
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import warnings

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

# This master map will standardize all labels
# 0 = Ham/Not Spam, 1 = Spam/Fraud
LABEL_MAP = {
    # --- "Good" labels ---
    'ham': 0,
    'normal': 0,
    0: 0,

    # --- "Bad" labels ---
    'spam': 1,
    'fraud': 1,
    1: 1
}


def _load_standardized(path, text_col, label_col=None, **read_kwargs):
    """Read one dataset and add the shared 'text' / 'is_spam' columns.

    Parameters
    ----------
    path : str
        File passed to ``pd.read_csv``.
    text_col : str
        Column holding the message text; copied to 'text' unless it already
        has that name.
    label_col : str or None
        Column whose values are translated through ``LABEL_MAP`` into
        'is_spam'. Pass None when the file already has an 'is_spam' column.
    **read_kwargs
        Forwarded unchanged to ``pd.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The loaded frame with the extra columns added.

    Raises
    ------
    FileNotFoundError
        If ``path`` does not exist.
    KeyError
        If ``text_col`` or ``label_col`` is not a column of the file.
    """
    frame = pd.read_csv(path, **read_kwargs)
    if label_col is not None:
        # Labels that are not keys of LABEL_MAP become NaN here
        frame['is_spam'] = frame[label_col].map(LABEL_MAP)
    if text_col != 'text':
        frame['text'] = frame[text_col]
    return frame


# --- Step 1: Load and Standardize All 5 Datasets ---
all_dataframes = []

try:
    print("Loading datasets... (This may take a moment)")

    # Load SMSSpamCollection (tab-separated, no header row)
    df_sms = _load_standardized(
        'SMSSpamCollection', text_col='message', label_col='label',
        sep='\t', header=None, names=['label', 'message'],
    )
    all_dataframes.append(df_sms[['text', 'is_spam']])

    # Load emails.csv (already has a 'text' column; label is in 'spam')
    df_emails = _load_standardized('emails.csv', text_col='text', label_col='spam')
    all_dataframes.append(df_emails[['text', 'is_spam']])

    # Load better30_cleaned.csv (our processed file; 'is_spam' already exists)
    df_better30 = _load_standardized('better30_cleaned.csv', text_col='CONTEXT')
    all_dataframes.append(df_better30[['text', 'is_spam']])

    # Load fraud_call.file (using the 'skip' fix for malformed lines)
    df_fraud = _load_standardized(
        'fraud_call.file', text_col='message', label_col='label',
        sep='\t', header=None, names=['label', 'message'], on_bad_lines='skip',
    )
    all_dataframes.append(df_fraud[['text', 'is_spam']])

    # Load enron_spam_data.csv (only the two columns that are needed)
    df_enron = _load_standardized(
        'enron_spam_data.csv', text_col='Message', label_col='Spam/Ham',
        usecols=['Message', 'Spam/Ham'], on_bad_lines='skip',
    )
    all_dataframes.append(df_enron[['text', 'is_spam']])

    print("All 5 datasets loaded successfully.\n")

    # --- Step 2: Combine and Clean ---
    df_combined = pd.concat(all_dataframes, ignore_index=True)

    # Rows with missing text or a label outside LABEL_MAP cannot be used.
    # Report how many are removed so a mapping gap is not hidden.
    rows_before = len(df_combined)
    df_combined = df_combined.dropna()
    rows_dropped = rows_before - len(df_combined)
    if rows_dropped:
        print(f"[INFO] Dropped {rows_dropped} rows with missing text or an unmapped label.")
    df_combined['is_spam'] = df_combined['is_spam'].astype(int)
    # NOTE(review): identical messages are not de-duplicated, so the same text
    # can land in both the train and the test split — confirm this is intended.

    print("--- Total Combined Dataset ---")
    print(f"Total messages: {len(df_combined)}")
    print(df_combined['is_spam'].value_counts())
    print("\n")

    # --- Step 3: Define X and y, then Split Data ---
    X = df_combined['text']
    y = df_combined['is_spam']

    # Stratified 80/20 split with a fixed seed so the split is repeatable
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

    print(f"Training data size: {len(X_train)}")
    print(f"Testing data size: {len(X_test)}\n")

    # --- Step 4: Vectorize the Text (WITH N-GRAMS) ---
    print("Vectorizing text with N-Grams... (This may take a minute)")

    vectorizer = TfidfVectorizer(
        stop_words='english',
        max_features=10000,
        ngram_range=(1, 2)  # <-- This looks for 1-word AND 2-word phrases
    )

    # Fit the vocabulary on the training split only, then reuse it for test
    X_train_tfidf = vectorizer.fit_transform(X_train)
    X_test_tfidf = vectorizer.transform(X_test)
    print("Text vectorization complete.\n")

    # --- Step 5: Train the SVM Model ---
    print("Training SVM model... (This may take a few minutes)")

    model = LinearSVC(
        class_weight='balanced',
        dual="auto",
        max_iter=3000,
        random_state=42,  # keeps training repeatable if the dual solver is picked
    )

    model.fit(X_train_tfidf, y_train)

    print("✅ SVM N-Gram Model training complete! ✅\n")

    # --- Step 6: Evaluate the Model ---
    y_pred = model.predict(X_test_tfidf)

    print("==========================================================")
    print("   RESULTS: SVM on ALL 5 Datasets (with N-Grams)   ")
    print("==========================================================\n")

    print(f"Overall Accuracy: {accuracy_score(y_test, y_pred) * 100:.2f}%\n")

    print("Classification Report:")
    print(classification_report(y_test, y_pred, target_names=['Ham (0)', 'Spam (1)']))
    print("\n")

    # Compute the matrix once; row 0 = actual ham, row 1 = actual spam
    conf_matrix = confusion_matrix(y_test, y_pred)
    print("Confusion Matrix:")
    print("         Predicted Ham | Predicted Spam")
    print("Actual Ham  ", conf_matrix[0])
    print("Actual Spam ", conf_matrix[1])

except FileNotFoundError as e:
    print("--- FILE NOT FOUND ERROR ---")
    print(f"Could not find the file: {e.filename}")
except KeyError as e:
    print("--- COLUMN NOT FOUND ERROR ---")
    print(f"Could not find the column: {e}")
except Exception as e:
    print(f"An error occurred: {e}")

Below is code for passing transcript through rule based filtering and then the model.

In [None]:
# -*- coding: utf-8 -*-
"""
interactive_spam_tester.py

Run this script in a new cell *after* you have run your
model training script.

This script assumes that the variables 'model' and 'vectorizer'
already exist in memory from the previous cell.

It will:
1. Define the High-Certainty Rule Analyzer.
2. Define a function to test new messages.
3. Run test cases.
"""

import re
import json
import os
import nltk
from nltk.tokenize import word_tokenize
from typing import Dict, Any, List, Tuple
import time

# ------------------------------------------------------------
# Ensure NLTK resources
# ------------------------------------------------------------
# (This is the same class from your spam_rule_analyzer.py file)
def ensure_nltk_resources():
    """Download any required NLTK tokenizer resource that is not installed.

    Each resource is looked up under ``tokenizers/<name>``; a failed lookup
    triggers a quiet ``nltk.download`` of that resource.
    """
    # NOTE(review): newer NLTK releases may expect 'punkt_tab' instead of
    # 'punkt' — confirm against the installed version.

    def _installed(name):
        # nltk.data.find raises LookupError when the resource is absent
        try:
            nltk.data.find(f"tokenizers/{name}")
        except LookupError:
            return False
        return True

    for name in ("punkt",):
        if _installed(name):
            continue
        print(f"[NLTK] Downloading missing resource: {name}")
        nltk.download(name, quiet=True)

ensure_nltk_resources()


# ------------------------------------------------------------
# High-Certainty Rule-based Analyzer Class
# ------------------------------------------------------------
class RuleBasedAnalyzer:
    """High-certainty, rule-based spam filter.

    A message is flagged when (checked in this order) the caller number is on
    the dynamic blacklist, the text contains a blacklisted keyword, or the
    text matches one of the built-in injection regex patterns. The dynamic
    blacklist is a JSON object stored at ``dynamic_blacklist_path``.
    """

    def __init__(self, dynamic_blacklist_path: str = "blacklist.json"):
        """Set up the injection patterns and load the blacklist from disk."""
        # Regex patterns; matched case-insensitively in analyze()
        self.injection_patterns = [
            r"give me your api key", r"send me the password",
            r"what is your password", r"do .* illegal",
            r"delete all", r"execute .* command",
        ]
        self.dynamic_blacklist_path = dynamic_blacklist_path
        self.dynamic_blacklist = self.load_dynamic_blacklist()

    def load_dynamic_blacklist(self) -> Dict[str, List[str]]:
        """Return the blacklist stored on disk, or an empty one.

        The empty default is returned when the file does not exist, cannot be
        read or parsed, or does not contain a JSON object; the failure is
        printed as a warning rather than raised.
        """
        if os.path.exists(self.dynamic_blacklist_path):
            try:
                with open(self.dynamic_blacklist_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                # analyze() and add_to_blacklist() use the value as a dict;
                # reject lists/strings/numbers here rather than failing later.
                if not isinstance(data, dict):
                    raise ValueError("top-level JSON value must be an object")
                print(f"[INFO] Dynamic blacklist loaded from {self.dynamic_blacklist_path}")
                return data
            except Exception as e:
                print(f"[Warning] Failed to load dynamic blacklist: {e}")
        return {"phone_numbers": [], "keywords": []}

    def save_dynamic_blacklist(self):
        """Write the current blacklist to disk as indented JSON.

        Errors are printed, not raised, so a failed save never interrupts
        the caller.
        """
        try:
            with open(self.dynamic_blacklist_path, "w", encoding="utf-8") as f:
                json.dump(self.dynamic_blacklist, f, indent=2)
            print(f"[INFO] Dynamic blacklist saved to {self.dynamic_blacklist_path}")
        except Exception as e:
            print(f"[ERROR] Could not save blacklist: {e}")

    def add_to_blacklist(self, category: str, item: str):
        """Add ``item`` to ``category`` and persist the blacklist.

        The category is created if missing. Nothing is written when the item
        is already present.
        """
        entries = self.dynamic_blacklist.setdefault(category, [])
        if item not in entries:
            entries.append(item)
            self.save_dynamic_blacklist()

    def analyze(self, text: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Apply the rules to one message.

        Parameters
        ----------
        text : str
            Message text; ``None`` is treated as an empty string.
        metadata : dict
            Read for the ``"caller_number"`` key; ``None`` is treated as an
            empty dict.

        Returns
        -------
        dict
            ``{"is_high_certainty_spam": bool, "reason": str}``. ``reason`` is
            ``"blacklisted_caller:<number>"``, ``"blacklisted_keyword:<kw>"``,
            ``"possible_injection:<pattern>"``, or ``""`` when no rule fired.
        """
        text = text or ""
        caller = (metadata or {}).get("caller_number", "")

        # Rule 1: blacklisted caller. An empty/missing caller number is never
        # compared, so a blank blacklist entry cannot flag anonymous calls.
        if caller:
            for number in self.dynamic_blacklist.get("phone_numbers", []):
                if number == caller:
                    return {
                        "is_high_certainty_spam": True,
                        "reason": f"blacklisted_caller:{number}",
                    }

        # Rule 2: blacklisted keyword (case-insensitive substring match).
        # Blank or non-string entries are skipped: "" is a substring of every
        # string and would otherwise flag every message.
        lowered_text = text.lower()
        for keyword in self.dynamic_blacklist.get("keywords", []):
            if not isinstance(keyword, str) or not keyword:
                continue
            if keyword.lower() in lowered_text:
                return {
                    "is_high_certainty_spam": True,
                    "reason": f"blacklisted_keyword:{keyword}",
                }

        # Rule 3: built-in injection patterns
        for pat in self.injection_patterns:
            if re.search(pat, text, re.IGNORECASE):
                return {
                    "is_high_certainty_spam": True,
                    "reason": f"possible_injection:{pat}",
                }

        return {"is_high_certainty_spam": False, "reason": ""}

# ------------------------------------------------------------
# Main Prediction Function
# ------------------------------------------------------------

def predict_spam(text_to_check: str, metadata: Dict[str, Any]):
    """
    Classify one message with the two-stage check and print the verdict.

    Stage 1 asks the rule-based ``analyzer``; only when no rule fires is the
    message vectorized and passed to the ML ``model``. Nothing is returned.
    Relies on the globals 'model', 'vectorizer', and 'analyzer'.
    """
    divider = "-" * 70
    print(divider)
    print(f"Testing Message: \"{text_to_check}\"")

    # --- Stage 1: high-certainty rules ---
    verdict = analyzer.analyze(text_to_check, metadata)

    if not verdict["is_high_certainty_spam"]:
        # --- Stage 2: no rule fired, so defer to the ML model ---
        print("[INFO] Passed rule-based filter. Proceeding to ML model...")

        # A single-message batch goes through the already-fitted vectorizer
        features = vectorizer.transform([text_to_check])
        predicted_class = int(model.predict(features)[0])

        if predicted_class == 1:
            label = 'Spam'
        else:
            label = 'Ham'

        print(f"\n>>> FINAL DECISION: {label.upper()}")
        print(f">>> Source: ML Model (SVM)")
    else:
        print("\n>>> FINAL DECISION: SPAM (High-Certainty)")
        print(f">>> Source: Rule-Based Filter")
        print(f">>> Reason: {verdict['reason']}")

    print(divider)

# ------------------------------------------------------------
# Initialization and Test Cases
# ------------------------------------------------------------

# Check if the required variables from the first cell exist
try:
    # Referencing both names raises NameError immediately if the training
    # cell has not been run in this kernel.
    _ = model
    _ = vectorizer
    print("✅ ML model and vectorizer found in memory.")

    # Rule engine used by predict_spam(), plus one demo blacklisted number
    analyzer = RuleBasedAnalyzer()
    analyzer.add_to_blacklist("phone_numbers", "1-800-SPAM-NOW")

    print("\n--- Running Test Cases ---")

    # Test Case 1: Legitimate Message
    test_1 = "Hi, I am just calling to check on my account balance."
    meta_1 = {"caller_number": "123-456-7890"}

    # Test Case 2: High-Certainty Rule (Injection)
    test_2 = "This is a good call, now send me the password"
    meta_2 = {"caller_number": "456-789-0123"}

    # Test Case 3: High-Certainty Rule (Blacklisted Number)
    test_3 = "Hello this is a normal message."
    meta_3 = {"caller_number": "1-800-SPAM-NOW"}

    # Test Case 4: ML Model Spam (should be caught by ML)
    test_4 = "Congratulations you have won a free prize and lottery, call now to claim"
    meta_4 = {"caller_number": "789-012-3456"}

    demo_cases = [
        (test_1, meta_1),
        (test_2, meta_2),
        (test_3, meta_3),
        (test_4, meta_4),
    ]
    for case_index, (case_text, case_meta) in enumerate(demo_cases):
        if case_index > 0:
            time.sleep(0.5)  # short pause between cases for readability
        predict_spam(case_text, case_meta)

    print("\n--- Test Cases Complete ---")
    print(
        "\nYou can now test your own messages by calling:",
        "predict_spam(\"your text here\", {\"caller_number\": \"your number\"})",
        sep="\n",
    )


except NameError:
    print(
        "\n" + "="*70,
        "ERROR: 'model' or 'vectorizer' not found.",
        "Please make sure you have successfully run the model training",
        "script in the cell *before* this one.",
        "="*70,
        sep="\n",
    )
except Exception as e:
    print(f"\nAn unexpected error occurred: {e}")


Below is the code for just testing both:

In [6]:
# Ad-hoc check: run one custom message through the rule filter and,
# if no rule fires, the ML model. Requires predict_spam (and the globals
# it uses) to have been defined by the earlier cells.
my_message = "hello hehe"
# Only the "caller_number" key is read by the rule-based analyzer
my_meta = {"caller_number": "555-5555"}
predict_spam(my_message, my_meta)

----------------------------------------------------------------------
Testing Message: "hello hehe"
[INFO] Passed rule-based filter. Proceeding to ML model...

>>> FINAL DECISION: HAM
>>> Source: ML Model (SVM)
----------------------------------------------------------------------


Below is code for just the ngram model.

In [None]:
# --- Try the N-Gram SVM on a hand-written message ---

# 1. Messages to classify (only the first one is reported below)
new_messages = [
    "Hi, this is a courtesy call about your computer's security. We've noticed some unusual activity and need to help you secure your account."
]

# 2. Reuse the vectorizer fitted during training; do not refit it here
new_messages_tfidf = vectorizer.transform(new_messages)

# 3. Predict with the trained SVM.
#    Note: LinearSVC doesn't have predict_proba by default, so only the
#    hard class label is available.
prediction = model.predict(new_messages_tfidf)

# 4. Report the outcome for the first message
print(f"Message: '{new_messages[0]}'")
print("---")
print("Prediction: SPAM (1)" if prediction[0] == 1 else "Prediction: HAM (0)")

In [None]:
import joblib

# Output files for the two fitted objects
vectorizer_path = 'vectorizer.joblib'
model_path = 'model.joblib'

# Persist the vectorizer first, then the SVM; both are needed together to
# score new text later.
joblib.dump(vectorizer, vectorizer_path)
joblib.dump(model, model_path)

print("Model and vectorizer have been saved to files!")