# AIS Negative Selection Algorithm (NSA) for Spam Detection
*Generated:* 2025-10-18T13:11:53.911542Z

This notebook sets up a runnable pipeline to detect spam using **Artificial Immune Systems** (AIS), specifically the **Negative Selection Algorithm (NSA)**:
- Learn *self* from ham only
- Generate detectors that do **not** match self (under r-contiguous or Hamming rules)
- Flag a message as spam if it matches **any** detector (more generally, at least `min_hits` detectors; the threshold is tuned on the validation split)
- Evaluate with Precision/Recall/F1 (spam), ROC-AUC, PR-AUC

## 1. Setup & Configuration

In [None]:
# !pip install numpy pandas scikit-learn matplotlib tqdm

import os, re, hashlib, random
from textwrap import dedent
from typing import List
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score, average_precision_score, roc_curve, precision_recall_curve, precision_recall_fscore_support
import matplotlib.pyplot as plt

# Reproducibility: seed both the stdlib and the NumPy global RNGs.
# RNG_SEED is also passed as random_state to the train/validation/test splits.
RNG_SEED = 42
random.seed(RNG_SEED)
np.random.seed(RNG_SEED)

# Data config
DATA_PATH = ""  # e.g., "SMSSpamCollection" or "your_data.csv"; empty -> built-in demo dataset
DATA_FORMAT = "auto"  # 'auto' | 'smsspam' | 'csv'

# Encoding & NSA config
NGRAM_N = 3                  # character n-gram length
BIT_LENGTH = 256             # length of binary encoding
RADIUS_R = 8                 # r for r-contiguous match (run of r equal positions)
HAMMING_T = 12               # Hamming distance threshold (match when distance <= t)
MATCH_RULE = "r_contiguous"  # 'r_contiguous' | 'hamming'

# Detector generation
NUM_DETECTORS = 2000         # target number of accepted detectors
MAX_ATTEMPTS = 100000        # cap on random candidates tried before giving up
MAX_PER_MESSAGE_HITS = 1     # >= hits -> spam (passed as min_hits to predict_spam)

# Splits
TEST_SIZE = 0.2              # fraction of all rows held out for the final test
VAL_SIZE = 0.1               # fraction of the remaining training rows used for validation

# Preprocessing
LOWERCASE = True
STRIP_PUNCT = True
KEEP_DIGITS = True           # NOTE(review): intended to control whether digits survive cleaning - confirm it is honoured
NORMALIZE_WHITESPACE = True

## 2. Load Data

In [None]:
def load_data_auto(path: str, data_format: str = "") -> pd.DataFrame:
    """Load a labelled ham/spam dataset from ``path``.

    Two layouts are tried in order:

    1. SMS Spam Collection style: tab-separated, no header row, label first.
       It is accepted only when the label column holds exactly 'ham' and
       'spam'; otherwise the loader falls through to the CSV layout.
    2. Comma-separated file with a header that contains 'label' and 'text'
       columns. Labels are lower-cased before validation.

    Args:
        path: Location of the dataset file.
        data_format: 'auto' | 'smsspam' | 'csv'. When empty (the default) the
            module-level DATA_FORMAT setting is used.

    Returns:
        DataFrame with at least the columns 'label' and 'text', where every
        label is 'ham' or 'spam'.

    Raises:
        FileNotFoundError: ``path`` is empty or does not exist.
        ValueError: The file matches neither layout, lacks the required
            columns, or contains labels other than 'ham'/'spam'.
    """
    if not path or not os.path.exists(path):
        raise FileNotFoundError("Set DATA_PATH to your dataset file.")
    fmt = data_format or DATA_FORMAT

    if path.endswith("SMSSpamCollection") or fmt in ("smsspam", "auto"):
        try:
            df = pd.read_csv(path, sep="\t", header=None, names=["label", "text"])
        except (ValueError, OSError):
            # Format sniffing is best-effort: pandas parser/empty-data errors
            # and decoding errors all derive from ValueError. Anything else is
            # a programming error and should surface.
            df = None
        if df is not None and set(df["label"].unique()) == {"ham", "spam"}:
            return df

    if path.endswith(".csv") or fmt in ("csv", "auto"):
        df = pd.read_csv(path)
        # Explicit raises instead of assert: asserts vanish under `python -O`.
        if not {"label", "text"}.issubset(df.columns):
            raise ValueError("CSV must have 'label' and 'text' columns.")
        labels = df["label"].astype(str).str.lower()
        if not labels.isin({"ham", "spam"}).all():
            raise ValueError("Labels must be 'ham' or 'spam'.")
        df["label"] = labels
        return df

    raise ValueError("Unrecognized data format.")

def demo_data() -> pd.DataFrame:
    """Return a tiny built-in corpus: five ham rows followed by five spam rows.

    The frame has the columns 'label' and 'text' and is only meant to let the
    notebook run end-to-end when no dataset file is configured.
    """
    samples = {
        "ham": (
            "Are we still on for lunch today at 12?",
            "Don't forget the meeting tomorrow morning.",
            "Please review the attached report and let me know your thoughts.",
            "Happy birthday! Hope you have a great day.",
            "I'll be there in 10 minutes.",
        ),
        "spam": (
            "WINNER!! Claim your free prize now, click here http://spam.biz",
            "Congratulations! You've been selected. Reply with BANK details to receive $1000.",
            "Urgent! Your account is compromised. Verify now at fake-site.com",
            "You won a lottery!!! Send your SSN ASAP to claim.",
            "Limited-time offer: Buy now and get 90% off!",
        ),
    }
    rows = []
    for label, messages in samples.items():
        rows.extend({"label": label, "text": message} for message in messages)
    return pd.DataFrame(rows)

# Load the configured dataset, or fall back to the built-in demo corpus.
# Any loading failure is reported and also falls back to the demo data so the
# rest of the notebook can still run.
try:
    if DATA_PATH:
        df = load_data_auto(DATA_PATH)
    else:
        print("No DATA_PATH set. Using a tiny demo dataset.")
        df = demo_data()
except Exception as e:
    # Deliberately broad: this is the notebook's top-level fallback boundary.
    print("Data load failed, using demo dataset:", e)
    df = demo_data()

# Notebook preview of the first rows.
df.head()

## 3. Preprocessing

In [None]:
# NOTE(review): PUNCT_RE is not referenced anywhere else in the visible source;
# preprocess() filters characters itself. Also note that both character classes
# match the characters to *keep* (word characters/letters and whitespace), not
# punctuation - confirm the intent before using this pattern.
PUNCT_RE = re.compile(r"[\w\s]" if KEEP_DIGITS else r"[A-Za-z\s]")

def preprocess(
    text: str,
    *,
    lowercase: "bool | None" = None,
    strip_punct: "bool | None" = None,
    keep_digits: "bool | None" = None,
    normalize_whitespace: "bool | None" = None,
) -> str:
    """Normalise one raw message into the text that gets n-gram encoded.

    Steps, each controlled by a flag: lower-case, replace punctuation with
    spaces, replace digits with spaces, collapse runs of whitespace.

    Args:
        text: Raw message. ``None`` and float NaN (a missing value in a pandas
            column) are treated as an empty message.
        lowercase: Override for LOWERCASE.
        strip_punct: Override for STRIP_PUNCT.
        keep_digits: Override for KEEP_DIGITS.
        normalize_whitespace: Override for NORMALIZE_WHITESPACE.
            Every override left at ``None`` uses the module-level setting.

    Returns:
        The cleaned message.
    """
    if lowercase is None:
        lowercase = LOWERCASE
    if strip_punct is None:
        strip_punct = STRIP_PUNCT
    if keep_digits is None:
        keep_digits = KEEP_DIGITS
    if normalize_whitespace is None:
        normalize_whitespace = NORMALIZE_WHITESPACE

    # NaN is the only float that differs from itself; without this check a
    # missing message would be encoded as the literal text "nan".
    if text is None or (isinstance(text, float) and text != text):
        cleaned = ""
    else:
        cleaned = str(text)

    if lowercase:
        cleaned = cleaned.lower()
    if strip_punct:
        # keep letters/digits/underscore/space by filtering char-wise
        cleaned = "".join(
            ch if (ch.isalnum() or ch.isspace() or ch == "_") else " "
            for ch in cleaned
        )
    if not keep_digits:
        # Replace (rather than delete) digits so neighbouring words stay apart.
        cleaned = "".join(" " if ch.isdigit() else ch for ch in cleaned)
    if normalize_whitespace:
        cleaned = " ".join(cleaned.split())
    return cleaned

# Store the cleaned version of every message in a new column; the raw 'text'
# column is left as it is.
df["text_clean"] = df["text"].map(preprocess)
# Notebook preview of the first rows.
df.head()

## 4. Encoding to Fixed-Length Binary (hashed character n-grams)

In [None]:
def char_ngrams(s: str, n: int) -> List[str]:
    """Return every overlapping length-``n`` substring of ``s``, left to right.

    A non-empty string shorter than ``n`` is returned whole as a single gram;
    an empty string yields no grams.
    """
    length = len(s)
    if length >= n:
        return [s[start:start + n] for start in range(length - n + 1)]
    return [s] if s else []

def hash_to_bits(token: str, bit_length: int) -> np.ndarray:
    """Map ``token`` to a deterministic 0/1 vector of ``bit_length`` entries.

    The bits come from SHA-256 of the UTF-8 encoded token:

    * ``bit_length == 256``: the digest bits as they are.
    * ``bit_length < 256``: the digest is XOR-folded, i.e. output position
      ``j`` is the parity of digest bits ``j, j + bit_length, ...``.
    * ``bit_length > 256``: the first 256 bits are the plain digest and the
      remainder comes from further digests of ``token + counter``, so the
      whole vector carries hash bits instead of a constant all-zero tail.

    Args:
        token: Text to hash (an n-gram).
        bit_length: Number of output bits; must be positive.

    Returns:
        1-D ``uint8`` array of zeros and ones with ``bit_length`` entries.

    Raises:
        ValueError: ``bit_length`` is not positive.
    """
    if bit_length <= 0:
        raise ValueError("bit_length must be a positive integer.")

    data = token.encode("utf-8")
    digest = hashlib.sha256(data).digest()

    if bit_length > 256:
        # Counter-mode extension: block 0 is the plain digest, later blocks
        # hash the token followed by a 4-byte block counter.
        n_blocks = -(-bit_length // 256)  # ceiling division
        blocks = [digest]
        for counter in range(1, n_blocks):
            blocks.append(hashlib.sha256(data + counter.to_bytes(4, "big")).digest())
        stream = np.unpackbits(np.frombuffer(b"".join(blocks), dtype=np.uint8))
        return stream[:bit_length].copy()

    bits = np.unpackbits(np.frombuffer(digest, dtype=np.uint8))
    if bit_length == 256:
        return bits

    # XOR-fold: zero-pad to a whole number of rows of width bit_length, then
    # XOR the rows together (parity of each column).
    pad = (-bits.size) % bit_length
    padded = np.concatenate([bits, np.zeros(pad, dtype=np.uint8)])
    return np.bitwise_xor.reduce(padded.reshape(-1, bit_length), axis=0)

def encode_message(s: str, n: int, bit_length: int) -> np.ndarray:
    """Encode a message as one ``bit_length``-wide 0/1 vector.

    The vector is the XOR of the hashed bit patterns of all character n-grams
    of ``s``. A message without n-grams encodes to all zeros.
    """
    signature = np.zeros(bit_length, dtype=np.uint8)
    for gram in char_ngrams(s, n):
        # In-place XOR keeps the accumulator's uint8 dtype.
        np.bitwise_xor(signature, hash_to_bits(gram, bit_length), out=signature)
    return signature

# Encode every cleaned message and stack the vectors into one 2-D array
# (one row per message, BIT_LENGTH columns).
X_bits = np.vstack([encode_message(s, NGRAM_N, BIT_LENGTH) for s in tqdm(df["text_clean"], desc="Encoding")])
# Binary target: 1 for 'spam', 0 for everything else.
y = (df["label"].values == "spam").astype(np.uint8)
# Notebook display of the two shapes.
X_bits.shape, y.shape

## 5. Train / Validation / Test Split

In [None]:
# Hold out the test set first; stratify so both classes keep their ratio.
X_train, X_test, y_train, y_test, df_train, df_test = train_test_split(
    X_bits, y, df, test_size=TEST_SIZE, random_state=RNG_SEED, stratify=y
)

# A stratified split needs at least one validation sample per class. On tiny
# corpora VAL_SIZE alone is too small (demo data: 8 training rows * 0.1 rounds
# up to a single sample for two classes) and train_test_split raises
# ValueError, so the fraction is raised just enough to fit every class.
_min_val_fraction = len(np.unique(y_train)) / len(y_train)
val_size = max(VAL_SIZE, _min_val_fraction)
if val_size > VAL_SIZE:
    print(
        f"VAL_SIZE={VAL_SIZE} is too small for a stratified split of "
        f"{len(y_train)} training rows; using {val_size:.3f} instead."
    )

X_tr, X_val, y_tr, y_val = train_test_split(
    X_train, y_train, test_size=val_size, random_state=RNG_SEED, stratify=y_train
)

# "Self" for negative selection is learned from ham (label 0) only.
X_self = X_tr[y_tr == 0]
len(X_self), X_self.shape

## 6. Matching Rules

In [None]:
def r_contiguous_match(a: np.ndarray, b: np.ndarray, r: int) -> bool:
    """Return True when ``a`` and ``b`` agree on at least ``r`` consecutive positions.

    The arrays are compared position by position; the scan stops as soon as a
    run of ``r`` equal positions has been seen.
    """
    streak = 0
    for same in np.equal(a, b):
        if not same:
            # A mismatch breaks the current run.
            streak = 0
            continue
        streak += 1
        if streak >= r:
            return True
    return False

def hamming_match(a: np.ndarray, b: np.ndarray, t: int) -> bool:
    """Return True when ``a`` and ``b`` differ in at most ``t`` positions."""
    differing = int(np.count_nonzero(np.not_equal(a, b)))
    return differing <= t

def matches_any(x: np.ndarray, detectors: np.ndarray, rule: str) -> bool:
    """Return True when ``x`` matches at least one detector under ``rule``.

    ``rule`` is 'r_contiguous' (threshold RADIUS_R) or 'hamming' (threshold
    HAMMING_T). An empty detector array never matches, whatever the rule.

    Raises:
        ValueError: ``rule`` is unknown and there is at least one detector.
    """
    if detectors.size == 0:
        return False
    if rule == "r_contiguous":
        return any(r_contiguous_match(x, det, RADIUS_R) for det in detectors)
    if rule == "hamming":
        return any(hamming_match(x, det, HAMMING_T) for det in detectors)
    raise ValueError("Unknown MATCH_RULE")

## 7. Detector Generation (Negative Selection)

In [None]:
def random_bitstring(bit_length: int) -> np.ndarray:
    """Draw a random ``uint8`` 0/1 vector of ``bit_length`` entries.

    Uses NumPy's global RNG: one uniform draw per bit, set when the draw
    exceeds 0.5.
    """
    draws = np.random.rand(bit_length)
    return np.greater(draws, 0.5).astype(np.uint8)

def detector_matches_self(candidate: np.ndarray, X_self: np.ndarray, rule: str) -> bool:
    """Return True when ``candidate`` matches any self sample under ``rule``.

    ``rule`` is 'r_contiguous' (threshold RADIUS_R) or 'hamming' (threshold
    HAMMING_T). With no self samples nothing can match.

    Raises:
        ValueError: ``rule`` is unknown.
    """
    if rule == "r_contiguous":
        return any(r_contiguous_match(candidate, own, RADIUS_R) for own in X_self)
    if rule == "hamming":
        return any(hamming_match(candidate, own, HAMMING_T) for own in X_self)
    raise ValueError("Unknown MATCH_RULE")

def generate_detectors(num_detectors: int, max_attempts: int, X_self: np.ndarray, rule: str) -> np.ndarray:
    """Generate detectors by negative selection.

    Random bit strings are drawn one at a time; a candidate is kept only when
    it matches none of the self samples under ``rule``. Generation stops once
    ``num_detectors`` have been kept or ``max_attempts`` candidates were tried,
    so fewer detectors than requested may be returned.

    Args:
        num_detectors: Target number of detectors.
        max_attempts: Upper bound on the number of candidates drawn.
        X_self: Encoded self (ham) samples, one per row.
        rule: 'r_contiguous' or 'hamming'.

    Returns:
        ``uint8`` array with one detector per row. It is always 2-D, so an
        empty result has shape ``(0, bit_length)``.

    Raises:
        ValueError: ``rule`` is unknown (raised by the self check).
    """
    # Detectors must be as wide as the samples they are compared with; fall
    # back to the configured BIT_LENGTH when X_self has no column dimension.
    bit_length = X_self.shape[1] if X_self.ndim == 2 else BIT_LENGTH

    detectors = []
    attempts = 0
    # The context manager closes the progress bar even if matching raises.
    with tqdm(total=num_detectors, desc="Generating detectors") as pbar:
        while len(detectors) < num_detectors and attempts < max_attempts:
            attempts += 1
            cand = random_bitstring(bit_length)
            if not detector_matches_self(cand, X_self, rule):
                detectors.append(cand)
                pbar.update(1)
    print(f"Generated {len(detectors)} detectors with {attempts} attempts.")

    if not detectors:
        # np.array([]) would be 1-D; keep the row/column layout callers expect.
        return np.empty((0, bit_length), dtype=np.uint8)
    return np.array(detectors, dtype=np.uint8)

# Build the detector set from the ham-only self samples using the configured rule.
detectors = generate_detectors(NUM_DETECTORS, MAX_ATTEMPTS, X_self, MATCH_RULE)
# Notebook display of how many detectors were produced and their width.
detectors.shape

## 8. Inference

In [None]:
def predict_spam(X: np.ndarray, detectors: np.ndarray, rule: str, min_hits: int = 1) -> np.ndarray:
    """Label each row of ``X`` as spam (1) or ham (0).

    A row is spam once it has matched ``min_hits`` detectors under ``rule``;
    scanning that row stops at that point. With an empty detector array every
    row stays 0.

    Args:
        X: Encoded messages, one per row.
        detectors: Detector bit strings, one per row.
        rule: 'r_contiguous' (threshold RADIUS_R) or 'hamming' (HAMMING_T).
        min_hits: Number of detector matches needed to flag a row.

    Returns:
        ``uint8`` array with one 0/1 label per row of ``X``.

    Raises:
        ValueError: ``rule`` is unknown (only checked when there are detectors
            and at least one row).
    """
    labels = np.zeros(X.shape[0], dtype=np.uint8)
    have_detectors = detectors.size > 0
    for row, sample in enumerate(tqdm(X, desc="Classifying")):
        if not have_detectors:
            continue
        # Lazy stream of match results, so the early break below also stops
        # the matching work.
        if rule == "r_contiguous":
            outcomes = (r_contiguous_match(sample, det, RADIUS_R) for det in detectors)
        elif rule == "hamming":
            outcomes = (hamming_match(sample, det, HAMMING_T) for det in detectors)
        else:
            raise ValueError("Unknown MATCH_RULE")
        matched = 0
        for outcome in outcomes:
            if not outcome:
                continue
            matched += 1
            if matched >= min_hits:
                labels[row] = 1
                break
    return labels

# First look at validation quality with the configured hit threshold.
y_val_pred = predict_spam(X_val, detectors, MATCH_RULE, min_hits=MAX_PER_MESSAGE_HITS)
print(classification_report(y_val, y_val_pred, target_names=["ham","spam"], digits=4))

## 9. Quick Parameter Sweep on Validation (optional)

In [None]:
def evaluate_preds(y_true, y_pred):
    """Return precision, recall and F1 for the positive (spam) class as a dict.

    Undefined ratios are reported as 0 (``zero_division=0``).
    """
    precision, recall, f1_score, _support = precision_recall_fscore_support(
        y_true, y_pred, average="binary", zero_division=0
    )
    return dict(precision=precision, recall=recall, f1=f1_score)

# Sweep the hit threshold on the validation split and collect one metrics row
# per candidate value.
min_hits_grid = [1, 2, 3]
results = []
for mh in min_hits_grid:
    preds = predict_spam(X_val, detectors, MATCH_RULE, min_hits=mh)
    metrics = evaluate_preds(y_val, preds)
    metrics["min_hits"] = mh
    results.append(metrics)

# Notebook display: candidates ordered from best to worst F1.
pd.DataFrame(results).sort_values("f1", ascending=False)

## 10. Final Test Evaluation

In [None]:
def best_min_hits(X_val, y_val, detectors) -> int:
    """Pick the hit threshold from {1, 2, 3} with the highest validation F1.

    Predictions use the module-level MATCH_RULE. On ties the smallest
    threshold wins, because a later value must be strictly better to replace
    the current best.
    """
    winner, top_f1 = 1, -1
    for candidate in (1, 2, 3):
        val_pred = predict_spam(X_val, detectors, MATCH_RULE, min_hits=candidate)
        scores = precision_recall_fscore_support(
            y_val, val_pred, average="binary", zero_division=0
        )
        f1 = scores[2]
        if f1 > top_f1:
            winner, top_f1 = candidate, f1
    return winner

# Choose the hit threshold on validation data only, then score the held-out test set once.
best_mh = best_min_hits(X_val, y_val, detectors)
print("Best min_hits from validation:", best_mh)

y_test_pred = predict_spam(X_test, detectors, MATCH_RULE, min_hits=best_mh)
print(classification_report(y_test, y_test_pred, target_names=["ham","spam"], digits=4))

def match_counts(X: np.ndarray, detectors: np.ndarray, rule: str) -> np.ndarray:
    """Count, for each row of ``X``, how many detectors it matches under ``rule``.

    Unlike predict_spam there is no early exit: every detector is checked, so
    the count can serve as a continuous spam score.

    Args:
        X: Encoded messages, one per row.
        detectors: Detector bit strings, one per row.
        rule: 'r_contiguous' (threshold RADIUS_R) or 'hamming' (HAMMING_T).

    Returns:
        ``float32`` array with one count per row (all zeros when there are no
        detectors).

    Raises:
        ValueError: ``rule`` is unknown (only checked when there are detectors
            and at least one row).
    """
    totals = np.zeros(X.shape[0], dtype=np.float32)
    have_detectors = detectors.size > 0
    for row, sample in enumerate(tqdm(X, desc="Scoring")):
        if not have_detectors:
            continue  # count stays 0
        if rule == "r_contiguous":
            totals[row] = sum(
                1 for det in detectors if r_contiguous_match(sample, det, RADIUS_R)
            )
        elif rule == "hamming":
            totals[row] = sum(
                1 for det in detectors if hamming_match(sample, det, HAMMING_T)
            )
        else:
            raise ValueError("Unknown MATCH_RULE")
    return totals

scores = match_counts(X_test, detectors, MATCH_RULE)
# Min-max scale the hit counts to [0, 1]. When every message has the same
# count (all zero, or all equal and positive) the range is 0; dividing by it
# would fill the scores with NaN and break the ROC/PR code below, so a
# constant score of 0 is used instead.
score_span = float(scores.max() - scores.min()) if scores.size else 0.0
if score_span > 0:
    scores_norm = (scores - scores.min()) / score_span
else:
    scores_norm = np.zeros_like(scores)

# Threshold-free metrics on the normalised hit counts. Each metric is computed
# best-effort: if scikit-learn raises, NaN is reported instead.
try:
    roc = roc_auc_score(y_test, scores_norm)
except Exception:
    roc = float("nan")
try:
    pr_auc = average_precision_score(y_test, scores_norm)
except Exception:
    pr_auc = float("nan")
print(f"ROC-AUC: {roc:.4f} | PR-AUC: {pr_auc:.4f}")

# ROC curve with the diagonal drawn as a dashed reference line.
# Note: unlike the AUC calls above, roc_curve is not guarded by try/except.
fpr, tpr, _ = roc_curve(y_test, scores_norm)
plt.figure()
plt.plot(fpr, tpr, label="ROC")
plt.plot([0,1], [0,1], linestyle="--")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve (NSA Spam Detection)")
plt.legend()
plt.show()

# Precision-recall curve for the spam class (recall on x, precision on y).
prec, rec, _ = precision_recall_curve(y_test, scores_norm)
plt.figure()
plt.plot(rec, prec, label="PR")
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall Curve (NSA Spam Detection)")
plt.legend()
plt.show()

## 11. Utilities & Next Steps

In [None]:
def regenerate_detectors(new_num_detectors=None, new_rule=None):
    """Rebuild the module-level ``detectors`` array and return it.

    Args:
        new_num_detectors: Target detector count; NUM_DETECTORS when None.
        new_rule: Matching rule for the self check; MATCH_RULE when falsy.

    Returns:
        The newly generated detector array (also stored in the global
        ``detectors``).
    """
    global detectors
    if new_num_detectors is None:
        count = NUM_DETECTORS
    else:
        count = int(new_num_detectors)
    chosen_rule = new_rule if new_rule else MATCH_RULE
    detectors = generate_detectors(count, MAX_ATTEMPTS, X_self, chosen_rule)
    return detectors

def quick_eval_on_val(min_hits=1):
    """Print a classification report for the validation split.

    Uses the current global ``detectors`` and the module-level MATCH_RULE.
    """
    val_pred = predict_spam(X_val, detectors, MATCH_RULE, min_hits=min_hits)
    report = classification_report(
        y_val, val_pred, target_names=["ham", "spam"], digits=4
    )
    print(report)

# Print closing tuning tips; dedent removes any common leading whitespace
# from the multi-line literal.
print(dedent("""
Notes:
- Increase NUM_DETECTORS for better coverage (more recall), but runtime increases.
- Tune RADIUS_R (r-contiguous) or HAMMING_T (Hamming) to balance precision/recall.
- Replace demo data by setting DATA_PATH to your real dataset.
- Consider vectorization or sub-sampling self to speed up detector generation.
"""))