In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install -qq faiss-cpu
!pip install -qq transformers
!pip install -qq tqdm

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.3/31.3 MB[0m [31m69.2 MB/s[0m eta [36m0:00:00[0m
[?25h

In [3]:
import pandas as pd
import numpy as np
import torch
import torch.nn.functional as F
import faiss
from transformers import AutoTokenizer, AutoModel
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tqdm import tqdm

In [4]:
data_path = "/content/drive/MyDrive/Data Science Projects/Machine Learning Projects/Email_classification/2cls_spam_text_cls.csv"
data = pd.read_csv(data_path)
data.head(5)

Unnamed: 0,Category,Message
0,ham,"Go until jurong point, crazy.. Available only ..."
1,ham,Ok lar... Joking wif u oni...
2,spam,Free entry in 2 a wkly comp to win FA Cup fina...
3,ham,U dun say so early hor... U c already then say...
4,ham,"Nah I don't think he goes to usf, he lives aro..."


In [5]:
messages = data["Message"].values.tolist()
labels = data["Category"].values.tolist()

In [6]:
Model_Name = "intfloat/multilingual-e5-base"
tokenizer = AutoTokenizer.from_pretrained(Model_Name)
model = AutoModel.from_pretrained(Model_Name)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)/content/drive/MyDrive/Data Science Projects/Machine Learning Projects/Email_classification/
model.eval()

def average_pool(last_hidden_states, attention_mask):
  last_hidden = last_hidden_states.masked_fill(
      ~attention_mask[..., None].bool(), 0.0
  )
  return last_hidden.sum(dim=1)/attention_mask.sum(dim=1)[..., None]

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/418 [00:00<?, ?B/s]

sentencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/17.1M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/280 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/694 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.11G [00:00<?, ?B/s]

In [14]:
def get_embeddings(texts, model, tokenizer, device, batch_size=32):
  embeddings = []
  for i in tqdm(range(0, len(texts), batch_size), desc="Generating embeddings"):
    batch_texts = texts[i:i+batch_size]
    batch_texts_with_prefix = [f"passage: {text}" for text in batch_texts]

    batch_dict = tokenizer(batch_texts_with_prefix, max_length=512, padding=True, truncation=True, return_tensors='pt')
    batch_dict = {k: v.to(device) for k,v, in batch_dict.items()}

    with torch.no_grad():
      outputs = model(**batch_dict)
      batch_embeddings = average_pool(outputs.last_hidden_state, batch_dict["attention_mask"])
      batch_embeddings = F.normalize(batch_embeddings, p=2, dim=1)
      embeddings.append(batch_embeddings.cpu().numpy())

  return np.vstack(embeddings)


lb = LabelEncoder()
y = lb.fit_transform(labels)

X_embeddings = get_embeddings(messages, model, tokenizer, device)

metadata = [{"index":i, "message":message, "label":label, "label_encoded":y[i]}
            for i, (message, label) in enumerate(zip(messages, labels))]

  return forward_call(*args, **kwargs)
Generating embeddings: 100%|██████████| 175/175 [00:21<00:00,  8.08it/s]


In [15]:
metadata

[{'index': 0,
  'message': 'Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...',
  'label': 'ham',
  'label_encoded': np.int64(0)},
 {'index': 1,
  'message': 'Ok lar... Joking wif u oni...',
  'label': 'ham',
  'label_encoded': np.int64(0)},
 {'index': 2,
  'message': "Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's",
  'label': 'spam',
  'label_encoded': np.int64(1)},
 {'index': 3,
  'message': 'U dun say so early hor... U c already then say...',
  'label': 'ham',
  'label_encoded': np.int64(0)},
 {'index': 4,
  'message': "Nah I don't think he goes to usf, he lives around here though",
  'label': 'ham',
  'label_encoded': np.int64(0)},
 {'index': 5,
  'message': "FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, £1.50 to rcv",
  'lab

In [16]:
Test_Size = 0.1
SEED = 42

train_indices, test_indices = train_test_split(range(len(messages)), test_size=Test_Size, stratify=y, random_state=SEED)
X_train_emb = X_embeddings[train_indices]
X_test_emb = X_embeddings[test_indices]

train_metadata = [metadata[i] for i in train_indices]
test_metadata = [metadata[i] for i in test_indices]

embedding_dim = X_train_emb.shape[1]
index = faiss.IndexFlatIP(embedding_dim)
index.add(X_train_emb.astype("float32"))

In [17]:
test_metadata

[{'index': 966,
  'message': 'Or better still can you catch her and let ask her if she can sell  &lt;#&gt;  for me.',
  'label': 'ham',
  'label_encoded': np.int64(0)},
 {'index': 3009,
  'message': "Loan for any purpose £500 - £75,000. Homeowners + Tenants welcome. Have you been previously refused? We can still help. Call Free 0800 1956669 or text back 'help'",
  'label': 'spam',
  'label_encoded': np.int64(1)},
 {'index': 2240,
  'message': 'Every day i use to sleep after  &lt;#&gt;  so only.',
  'label': 'ham',
  'label_encoded': np.int64(0)},
 {'index': 297,
  'message': "Unless it's a situation where YOU GO GURL would be more appropriate",
  'label': 'ham',
  'label_encoded': np.int64(0)},
 {'index': 1221,
  'message': 'No. 1 Nokia Tone 4 ur mob every week! Just txt NOK to 87021. 1st Tone FREE ! so get txtin now and tell ur friends. 150p/tone. 16 reply HL 4info',
  'label': 'spam',
  'label_encoded': np.int64(1)},
 {'index': 5395,
  'message': 'Dunno lei shd b driving lor cos i go

In [9]:
def classify_knn(query_text, model, tokenizer, device, index, train_metadata, k=1):
  query_with_predix = f"query: {query_text}"
  batch_dict = tokenizer([query_with_predix], max_length=512, padding=True, truncation=True, return_tensors="pt")
  batch_dict = {k: v.to(device) for k,v in batch_dict.items()}

  with torch.no_grad():
    outputs = model(**batch_dict)
    query_embedding = average_pool(outputs.last_hidden_state, batch_dict["attention_mask"])
    query_embedding = F.normalize(query_embedding, p=2, dim=1)
    query_embedding = query_embedding.cpu().numpy().astype("float32")

  scores, indices = index.search(query_embedding, k)

  predictions = []
  neighbor_info = []

  for i in range(k):
    neighbor_idx = indices[0][i]
    neighbor_score = scores[0][i]
    neighbor_label = train_metadata[neighbor_idx]["label"]
    neighbor_message = train_metadata[neighbor_idx]["message"]

    predictions.append(neighbor_label)
    neighbor_info.append(
        {
            "score": float(neighbor_score),
            "label": neighbor_label,
            "message": neighbor_message[:100] + "..." if len(neighbor_message) > 100 else neighbor_message
        }
    )

    unique_labels, counts = np.unique(predictions, return_counts=True)
    final_prediction = unique_labels[np.argmax(counts)]

    return final_prediction, neighbor_info

In [11]:
def evaluate_knn_accuracy(test_embeddings, test_metadata, index, train_metadata, k_values=[1, 3, 5]):
  results = {}
  all_errors = {}

  for k in k_values:
    correct = 0
    total = len(test_embeddings)
    errors = []

    for i in tqdm(range(total), desc=f"Evaluating k={k}"):
      query_embedding = test_embeddings[i:i+1].astype("float32")
      true_label = test_metadata[i]["label"]
      true_message = test_metadata[i]["message"]

      scores, indices = index.search(query_embedding, k)

      predictions = []
      neighbor_details = []

      for j in range(k):
        neighbor_idx = indices[0][j]
        neighbor_label = train_metadata[neighbor_idx]["label"]
        neighbor_message = train_metadata[neighbor_idx]["message"]
        neighbor_score = float(scores[0][j])

        predictions.append(neighbor_label)
        neighbor_details.append(
            {
                "label": neighbor_label,
                "message": neighbor_message,
                "score": neighbor_score
            }
        )

        unique_labels, counts = np.unique(predictions, return_counts=True)
        predicted_label = unique_labels[np.argmax(counts)]

        if predicted_label == true_label:
          correct += 1
        else:
          error_info = {
              "index": i,
              "original_index": test_metadata[i]["index"],
              "message": true_message,
              "predicted_label": predicted_label,
              "neighbors": neighbor_details,
              "label_distribution": {label: int(count) for label, count in zip(unique_labels, counts)}
          }

          errors.append(error_info)
    accuracy = correct / total
    error_count = total - correct

    results[k] = accuracy
    all_errors[k] = errors

    print(f"Accuracy with k={k}: {accuracy:.4f}")
    print(f"Number of errors with k={k}: {error_count}/{total} ({(error_count/total)*100:.2f}%)")

  return results, all_errors

In [18]:
%%time
print("Evaluating accuracy on test set...")
accuracy_results, error_results = evaluate_knn_accuracy(X_test_emb, test_metadata, index, train_metadata, k_values=[1, 3, 5])

print("\n" + "="*50)
print("Accuracy Results")
print("="*50)

for k, accuracy in accuracy_results.items():
  print(f"Top-{k} accuracy: {accuracy:.4f} ({accuracy*100:.2f}%)")
print("="*50)

import json
from datetime import datetime

error_analysis = {
    "timestamp": datetime.now().isoformat(),
    "model": Model_Name,
    "test_size": len(X_test_emb),
    "accuracy_results": accuracy_results,
    "errors_by_k": {}
}

for k,errors in error_results.items():
  error_analysis["errors_by_k"][f"k_{k}"] = {
      "total_errors": len(errors),
      "error_rate": len(errors)/len(X_test_emb),
      "errors": errors
  }

output_file = "/content/drive/MyDrive/Data Science Projects/Machine Learning Projects/Email_classification/error_analysis.json"
with open(output_file, "w", encoding="utf-8") as f:
  json.dump(error_analysis, f, ensure_ascii=False, indent=2)

print(f"\n***Error analysis saved to: {output_file}***")
print()
print(f"***Summary:")
for k, errors in error_results.items():
  print(f" k={k}: {len(errors)} errors out of {len(X_test_emb)} samples")

Evaluating accuracy on test set...


Evaluating k=1: 100%|██████████| 558/558 [00:00<00:00, 1087.53it/s]


Accuracy with k=1: 0.9857
Number of errors with k=1: 8/558 (1.43%)


Evaluating k=3: 100%|██████████| 558/558 [00:00<00:00, 1120.44it/s]


Accuracy with k=3: 2.9659
Number of errors with k=3: -1097/558 (-196.59%)


Evaluating k=5: 100%|██████████| 558/558 [00:00<00:00, 1064.34it/s]

Accuracy with k=5: 4.9462
Number of errors with k=5: -2202/558 (-394.62%)

Accuracy Results
Top-1 accuracy: 0.9857 (98.57%)
Top-3 accuracy: 2.9659 (296.59%)
Top-5 accuracy: 4.9462 (494.62%)

***Error analysis saved to: /content/drive/MyDrive/Data Science Projects/Machine Learning Projects/Email_classification/error_analysis.json***

***Summary:
 k=1: 8 errors out of 558 samples
 k=3: 19 errors out of 558 samples
 k=5: 30 errors out of 558 samples
CPU times: user 1.54 s, sys: 9.51 ms, total: 1.55 s
Wall time: 1.56 s



