In [None]:
# Install the packages used by the API / PII-masking cells. Versions are left
# unpinned here; a later cell pins transformers==4.33.2 and requirements.txt
# pins the deployment set.
!pip install fastapi uvicorn pydantic spacy pandas openai python-dotenv scikit-learn
# NOTE(review): the code visible in this notebook only loads "en_core_web_sm";
# confirm the large model is still needed before keeping this download.
!python -m spacy download en_core_web_lg

Collecting fastapi
  Downloading fastapi-0.115.12-py3-none-any.whl.metadata (27 kB)
Collecting uvicorn
  Downloading uvicorn-0.34.1-py3-none-any.whl.metadata (6.5 kB)
Collecting python-dotenv
  Downloading python_dotenv-1.1.0-py3-none-any.whl.metadata (24 kB)
Collecting starlette<0.47.0,>=0.40.0 (from fastapi)
  Downloading starlette-0.46.2-py3-none-any.whl.metadata (6.2 kB)
Downloading fastapi-0.115.12-py3-none-any.whl (95 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m95.2/95.2 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading uvicorn-0.34.1-py3-none-any.whl (62 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.4/62.4 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading python_dotenv-1.1.0-py3-none-any.whl (20 kB)
Downloading starlette-0.46.2-py3-none-any.whl (72 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m72.0/72.0 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: uv

In [None]:
# Install the training and serving dependencies (overlaps with the install
# cell above; versions are unpinned here).
!pip install transformers datasets scikit-learn pandas numpy spacy fastapi uvicorn pydantic python-multipart
# Small English spaCy pipeline; this is the model PiiMasker loads by name.
!python -m spacy download en_core_web_sm

# Create the directory that EmailClassifier uses as its default model_dir
!mkdir -p model_data

Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting python-multipart
  Downloading python_multipart-0.0.20-py3-none-any.whl.metadata (1.8 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.12.0,>=2023.1.0 (from fsspec[http]<=2024.12.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.5.0-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.2/491.2 kB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading python_multipart-0.0.20-py3-none-any.whl (24 kB)
Downloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   

In [23]:
%%writefile utils.py
import re
import spacy
from typing import List, Dict, Tuple, Any
from datetime import datetime

class PiiMasker:
    """Detect PII with regex patterns and spaCy NER and replace it with tags.

    Every accepted span is replaced by ``[entity_type]`` in the masked text and
    reported as a dict with ``start_index``, ``end_index``, ``entity_type`` and
    ``entity_value``. Accepted spans never overlap.

    Overlap policy:
      1. Format-specific patterns (``_SPECIFIC_TYPES``) compete with each
         other: the longest match wins, ties go to the type listed first.
         Previously the first pattern in dict order won, which let
         ``phone_number`` swallow aadhar/SSN/card numbers and let ``cvv_no``
         steal the year out of every date of birth.
      2. The remaining generic patterns (``cvv_no``, ``account_id``) only
         claim text that is still free, in ``patterns`` order.
      3. spaCy ``PERSON`` entities fill whatever is left.
    """

    # Tie-break order for equally long matches, most specific format first.
    # ``dob`` precedes ``expiry_no`` because MM/YYYY also matches the tail of
    # DD/MM/YYYY; ``phone_number`` is last because it accepts almost any digit
    # run of six or more digits.
    _SPECIFIC_TYPES = (
        "email",
        "dob",
        "expiry_no",
        "credit_debit_no",
        "aadhar_num",
        "ssn",
        "phone_number",
    )

    def __init__(self):
        """Load the spaCy pipeline and compile the regex patterns."""
        self.nlp = self._load_spacy_model()

        # More precise regex patterns
        self.patterns = {
            "email": r'\b[\w\.\+\-]+@[\w\-]+\.[\w\.\-]+\b',
            "phone_number": r'(?:\+?\d{1,3}[-.\s]?)?\(?\d{2,4}\)?[-.\s]?\d{2,4}[-.\s]?\d{2,5}\b',
            "aadhar_num": r'\b\d{4}[\s\-]?\d{4}[\s\-]?\d{4}\b',
            "credit_debit_no": r'\b(?:\d[ \-]*?){13,19}\b',
            "cvv_no": r'(?<!\d)\b\d{3,4}\b(?!\d)',
            "expiry_no": r'\b(?:0[1-9]|1[0-2])[/\-](?:20)?\d{2}\b',
            "dob": r'\b(?:0[1-9]|[12][0-9]|3[01])[/\-](?:0[1-9]|1[0-2])[/\-](?:19|20)\d{2}\b',
            "account_id": r'\b(?:[A-Za-z]+[ \-_]?)?\d{4,}\b',
            "ssn": r'\b\d{3}[ \-]?\d{2}[ \-]?\d{4}\b'
        }

        self.compiled_patterns = {k: re.compile(v, re.IGNORECASE) for k, v in self.patterns.items()}

    def _load_spacy_model(self, _allow_download: bool = True):
        """Load ``en_core_web_sm`` and add a proper-noun entity ruler.

        If the model is missing it is downloaded once and the load is retried.
        A second failure is re-raised instead of recursing (and downloading)
        forever, which is what the unbounded retry used to do.

        Raises:
            OSError: the model cannot be loaded even after the download.
            subprocess.CalledProcessError: the download command failed.
        """
        try:
            nlp = spacy.load("en_core_web_sm")
            # Label runs of proper nouns as PERSON before the statistical NER runs
            ruler = nlp.add_pipe("entity_ruler", before="ner")
            patterns = [
                {"label": "PERSON", "pattern": [
                    {"POS": "PROPN", "OP": "+"},
                    {"POS": "PROPN", "OP": "*"}
                ]}
            ]
            ruler.add_patterns(patterns)
            return nlp
        except OSError:
            if not _allow_download:
                raise
            import subprocess
            import sys
            subprocess.run([sys.executable, "-m", "spacy", "download", "en_core_web_sm"], check=True)
            return self._load_spacy_model(_allow_download=False)

    def _is_valid_name(self, text):
        """Return True only for 2-3 word names that are not entirely lowercase."""
        words = text.split()
        # Single tokens (and empty strings) are rejected outright
        if len(words) <= 1:
            return False
        # Skip names that are too long (likely not actual names)
        if len(words) > 3:
            return False
        # Skip names that are all lowercase
        if text.islower():
            return False
        return True

    def _claim(self, entities: List[Dict[str, Any]], start: int, end: int,
               entity_type: str, value: str) -> bool:
        """Record the span unless it overlaps an already accepted entity."""
        if any(self._is_overlap(start, end, e) for e in entities):
            return False
        entities.append({
            "start_index": start,
            "end_index": end,
            "entity_type": entity_type,
            "entity_value": value
        })
        return True

    def extract_masked_entities(self, text: str) -> Tuple[str, List[Dict[str, Any]]]:
        """Find PII in ``text`` and return ``(masked_text, entities)``.

        ``entities`` is sorted by ``start_index``. Offsets refer to the text
        after ``\\r\\n`` has been normalized to ``\\n``, which is also the text
        the masked output is built from.
        """
        text = text.replace('\r\n', '\n')
        entities: List[Dict[str, Any]] = []

        # Tier 1: format-specific patterns, longest match first
        candidates = []
        for rank, entity_type in enumerate(self._SPECIFIC_TYPES):
            pattern = self.compiled_patterns.get(entity_type)
            if pattern is None:
                continue
            for match in pattern.finditer(text):
                # Negative length so that an ascending sort puts long matches first
                candidates.append((match.start() - match.end(), rank, match.start(), entity_type, match))
        candidates.sort(key=lambda c: c[:3])
        for _, _, _, entity_type, match in candidates:
            self._claim(entities, match.start(), match.end(), entity_type, match.group())

        # Tier 2: generic numeric patterns only take text that is still free
        for entity_type, pattern in self.compiled_patterns.items():
            if entity_type in self._SPECIFIC_TYPES:
                continue
            for match in pattern.finditer(text):
                self._claim(entities, match.start(), match.end(), entity_type, match.group())

        # Tier 3: spaCy PERSON entities that pass the name sanity check
        doc = self.nlp(text)
        for ent in doc.ents:
            if ent.label_ == "PERSON" and self._is_valid_name(ent.text):
                self._claim(entities, ent.start_char, ent.end_char, "full_name", ent.text)

        # Entities do not overlap, so one left-to-right pass builds the result
        entities.sort(key=lambda x: x["start_index"])
        pieces = []
        cursor = 0
        for entity in entities:
            pieces.append(text[cursor:entity["start_index"]])
            pieces.append(f"[{entity['entity_type']}]")
            cursor = entity["end_index"]
        pieces.append(text[cursor:])

        return "".join(pieces), entities

    def _is_overlap(self, start: int, end: int, entity: Dict) -> bool:
        """Return True if the half-open span [start, end) intersects ``entity``."""
        return not (end <= entity["start_index"] or start >= entity["end_index"])

# Shared masker, created on first use. Building a PiiMasker loads the spaCy
# pipeline and compiles every regex, which is far too slow to repeat per call.
_DEFAULT_MASKER = None


def mask_email(email_body: str) -> Tuple[str, List[Dict[str, Any]]]:
    """Mask PII in ``email_body`` using a lazily created, shared PiiMasker.

    Returns:
        ``(masked_text, entities)`` exactly as produced by
        ``PiiMasker.extract_masked_entities``.
    """
    global _DEFAULT_MASKER
    if _DEFAULT_MASKER is None:
        _DEFAULT_MASKER = PiiMasker()
    return _DEFAULT_MASKER.extract_masked_entities(email_body)

Overwriting utils.py


In [4]:
%%writefile models.py
import os
import pickle
import logging
import torch
import pandas as pd
from typing import List, Tuple
from torch.utils.data import Dataset, DataLoader
from transformers import (
    BertTokenizer, BertForSequenceClassification,
    AdamW, get_linear_schedule_with_warmup
)
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

# Set up logging: configure the root logger at import time and create this
# module's logger. basicConfig is a no-op when the root logger already has
# handlers, so whichever module is imported first decides the format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class EmailDataset(Dataset):
    """Torch dataset that tokenizes one email per lookup and pairs it with its label id."""

    def __init__(self, texts, labels, tokenizer, max_length=256):
        # Only references are stored; tokenization is deferred to __getitem__.
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        # One sample per stored text.
        return len(self.texts)

    def __getitem__(self, idx):
        """Return a dict with 'input_ids', 'attention_mask' and 'label' for sample ``idx``."""
        sample_text, sample_label = self.texts[idx], self.labels[idx]

        encoded = self.tokenizer(
            sample_text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )

        # squeeze(0) drops the leading axis of each encoded field
        item = {field: encoded[field].squeeze(0) for field in ('input_ids', 'attention_mask')}
        item['label'] = torch.tensor(sample_label, dtype=torch.long)
        return item


class EmailClassifier:
    """BERT-based email classifier.

    Fine-tunes ``bert-base-uncased`` on a CSV of labelled emails, stores the
    model, tokenizer and label mapping in ``model_dir`` and predicts the
    category of a single email.
    """

    def __init__(self, device=None, model_dir='./model_data', max_length=256):
        """Create the classifier and make sure ``model_dir`` exists.

        Args:
            device: torch device to run on; defaults to CUDA when available.
            model_dir: directory for the saved model and label mapping.
            max_length: token length every email is padded/truncated to, used
                for both training and prediction so the two cannot drift apart.
        """
        self.device = device or torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model_dir = model_dir
        self.max_length = max_length
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.label_mapping = {}
        self.inverse_label_mapping = {}
        self.model = None

        os.makedirs(model_dir, exist_ok=True)

    def prepare_data(self, data_csv: str) -> Tuple[List[str], List[int]]:
        """Read the training CSV and return ``(texts, label_ids)``.

        Rows with a missing 'email' or 'type' are dropped. Label ids follow
        the order in which categories first appear; the mapping is written to
        ``label_mapping.pkl`` in ``model_dir``.

        Raises:
            ValueError: the CSV lacks an 'email' or 'type' column.
        """
        logger.info(f"Loading data from {data_csv}")
        df = pd.read_csv(data_csv)

        if 'email' not in df.columns or 'type' not in df.columns:
            raise ValueError("CSV must have 'email' and 'type' columns")

        df = df.dropna(subset=['email', 'type'])

        unique_labels = df['type'].unique()
        self.label_mapping = {label: idx for idx, label in enumerate(unique_labels)}
        self.inverse_label_mapping = {idx: label for label, idx in self.label_mapping.items()}

        with open(os.path.join(self.model_dir, 'label_mapping.pkl'), 'wb') as f:
            pickle.dump({
                'label_mapping': self.label_mapping,
                'inverse_label_mapping': self.inverse_label_mapping
            }, f)

        texts = df['email'].tolist()
        labels = [self.label_mapping[label] for label in df['type']]

        return texts, labels

    def train(self, data_csv: str, epochs=4, batch_size=16, learning_rate=2e-5):
        """Fine-tune BERT on ``data_csv`` and return the best validation accuracy.

        The data is split 80/20 with a fixed seed. After each epoch the model
        is evaluated on the validation split and saved whenever the accuracy
        improves on the best value seen so far.
        """
        texts, labels = self.prepare_data(data_csv)

        train_texts, val_texts, train_labels, val_labels = train_test_split(
            texts, labels, test_size=0.2, random_state=42
        )

        train_dataset = EmailDataset(train_texts, train_labels, self.tokenizer, max_length=self.max_length)
        val_dataset = EmailDataset(val_texts, val_labels, self.tokenizer, max_length=self.max_length)

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size)

        num_labels = len(self.label_mapping)
        self.model = BertForSequenceClassification.from_pretrained(
            'bert-base-uncased', num_labels=num_labels
        )
        self.model.to(self.device)

        optimizer = AdamW(self.model.parameters(), lr=learning_rate)
        total_steps = len(train_loader) * epochs
        scheduler = get_linear_schedule_with_warmup(
            optimizer, num_warmup_steps=0, num_training_steps=total_steps
        )

        best_val_accuracy = 0
        # Fixed label ids and names so the report works even when the
        # validation split happens to miss a category.
        label_ids = list(range(num_labels))
        label_names = [self.inverse_label_mapping[i] for i in label_ids]

        for epoch in range(epochs):
            logger.info(f"Epoch {epoch + 1}/{epochs}")
            self.model.train()
            total_loss = 0

            for batch in train_loader:
                optimizer.zero_grad()
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)
                # Named batch_labels so the full `labels` list is not shadowed
                batch_labels = batch['label'].to(self.device)

                outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=batch_labels)
                loss = outputs.loss
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                optimizer.step()
                scheduler.step()

                total_loss += loss.item()

            avg_loss = total_loss / len(train_loader)
            logger.info(f"Average training loss: {avg_loss:.4f}")

            # Validation
            self.model.eval()
            val_preds, val_true = [], []

            with torch.no_grad():
                for batch in val_loader:
                    input_ids = batch['input_ids'].to(self.device)
                    attention_mask = batch['attention_mask'].to(self.device)
                    batch_labels = batch['label'].to(self.device)

                    outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
                    preds = torch.argmax(outputs.logits, dim=1).cpu().numpy()
                    val_preds.extend(preds)
                    val_true.extend(batch_labels.cpu().numpy())

            val_accuracy = accuracy_score(val_true, val_preds)
            logger.info(f"Validation accuracy: {val_accuracy:.4f}")

            if val_accuracy > best_val_accuracy:
                best_val_accuracy = val_accuracy
                logger.info(f"Saving best model with accuracy: {val_accuracy:.4f}")
                self.save_model()

                # Passing `labels` keeps target_names aligned with the label
                # ids; without it classification_report raises ValueError when
                # a class is absent from both val_true and val_preds.
                report = classification_report(
                    val_true, val_preds,
                    labels=label_ids,
                    target_names=label_names,
                    digits=4,
                    zero_division=0
                )
                logger.info(f"Classification Report:\n{report}")

        return best_val_accuracy

    def save_model(self):
        """Write the model and tokenizer to ``model_dir`` (warns if no model exists)."""
        if self.model is not None:
            self.model.save_pretrained(self.model_dir)
            self.tokenizer.save_pretrained(self.model_dir)
            logger.info(f"Model saved to {self.model_dir}")
        else:
            logger.warning("No model to save.")

    def load_model(self):
        """Load label mapping, model and tokenizer from ``model_dir``.

        Returns:
            True on success; False if anything failed (the error is logged,
            not raised, so callers can decide how to degrade).
        """
        try:
            # SECURITY: pickle.load executes arbitrary code from the file;
            # only point model_dir at directories you trust.
            with open(os.path.join(self.model_dir, 'label_mapping.pkl'), 'rb') as f:
                mappings = pickle.load(f)
                self.label_mapping = mappings['label_mapping']
                self.inverse_label_mapping = mappings['inverse_label_mapping']

            self.model = BertForSequenceClassification.from_pretrained(self.model_dir)
            self.tokenizer = BertTokenizer.from_pretrained(self.model_dir)
            self.model.to(self.device)
            self.model.eval()

            logger.info(f"Model loaded from {self.model_dir}")
            return True
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            return False

    def predict(self, text: str) -> str:
        """Return the predicted category label for ``text``.

        Loads the saved model on first use if none is in memory.

        Raises:
            ValueError: no model is in memory and loading from disk failed.
        """
        if self.model is None:
            if not self.load_model():
                raise ValueError("Model not loaded. Please train or load a model first.")

        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )

        input_ids = encoding['input_ids'].to(self.device)
        attention_mask = encoding['attention_mask'].to(self.device)

        with torch.no_grad():
            outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
            pred = torch.argmax(outputs.logits, dim=1).item()

        return self.inverse_label_mapping[pred]

Writing models.py


In [28]:
%%writefile app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any
import uvicorn
import logging

from models import EmailClassifier
from utils import mask_email

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

app = FastAPI(title="Email Classification API",
              description="API for classifying and masking PII in support emails")

# Initialize classifier
# This runs at import time: EmailClassifier() fetches the tokenizer and
# load_model() tries to read the saved model from the default model_dir.
classifier = EmailClassifier()
# load_model() logs and returns False instead of raising, so the app still
# starts without a trained model. This flag is a snapshot taken at startup.
model_loaded = classifier.load_model()
if not model_loaded:
    logger.warning("Model not loaded. Please train or load a model before making predictions.")

class EmailRequest(BaseModel):
    """Request model for email classification"""
    # Raw email text; PII is masked before the text is classified.
    email_body: str

class EmailResponse(BaseModel):
    """Response model for email classification"""
    # The email text exactly as received in the request.
    input_email_body: str
    # One dict per detected PII span; the keys are whatever the endpoint puts in.
    list_of_masked_entities: List[Dict[str, Any]]
    # Email text with each detected span replaced by an [entity_type] tag.
    masked_email: str
    # Category label predicted for the masked text.
    category_of_the_email: str

@app.post("/classify", response_model=EmailResponse)
async def classify_email(request: EmailRequest):
    """
    Classify email and mask PII.

    Parameters:
    - email_body: The email text to classify

    Returns:
    - input_email_body: Original email text
    - list_of_masked_entities: List of detected PII entities
    - masked_email: Email with PII masked
    - category_of_the_email: Predicted email category

    Errors:
    - 400 if the email body is empty or whitespace only
    - 500 if masking or classification fails
    """
    # Validate before entering the generic handler below; previously this 400
    # was raised inside the try block, caught by `except Exception` and
    # returned to the client as a 500.
    if not request.email_body or len(request.email_body.strip()) == 0:
        raise HTTPException(status_code=400, detail="Email body cannot be empty")

    try:
        # Mask PII entities
        masked_email, entities = mask_email(request.email_body)

        # Classify the masked email
        category = classifier.predict(masked_email)
    except HTTPException:
        # Never rewrite a deliberate HTTP error into a 500
        raise
    except Exception as e:
        logger.error(f"Error processing request: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Error processing request: {str(e)}") from e

    # Format the response
    return {
        "input_email_body": request.email_body,
        "list_of_masked_entities": entities,
        "masked_email": masked_email,
        "category_of_the_email": category
    }

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    # Inspect the classifier itself rather than the import-time `model_loaded`
    # snapshot: predict() lazy-loads the model, after which the old flag kept
    # reporting "warning" even though requests were being served.
    if classifier.model is not None:
        return {"status": "ok", "model_loaded": True}
    return {"status": "warning", "model_loaded": False,
            "message": "Model not loaded. Please train or load a model."}

if __name__ == "__main__":
    # Development entry point: listens on all interfaces, port 8000, with
    # uvicorn's auto-reload enabled.
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)

Overwriting app.py


In [30]:
%%writefile train.py
import argparse
import logging
import torch
from models import EmailClassifier

# Set up logging: timestamped INFO-level output plus a logger for this script
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def train_model(data_path, epochs=4, batch_size=16, learning_rate=2e-5):
    """
    Fine-tune the email classifier on a labelled CSV and log the outcome.

    Args:
        data_path: CSV file with the emails and their categories
        epochs: how many passes over the training split
        batch_size: samples per optimisation step
        learning_rate: optimizer learning rate
    """
    logger.info(f"Training model with data from {data_path}")

    # Prefer the GPU when torch can see one
    device_name = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device_name)
    logger.info(f"📦 Using device: {device}")

    # Build the classifier on that device and run the training loop
    hyperparams = dict(epochs=epochs, batch_size=batch_size, learning_rate=learning_rate)
    trainer = EmailClassifier(device=device)
    best_accuracy = trainer.train(data_csv=data_path, **hyperparams)

    logger.info(f"✅ Training completed. Best validation accuracy: {best_accuracy:.4f}")

if __name__ == "__main__":
    # Command-line entry point: parse the hyperparameters and start training
    cli = argparse.ArgumentParser(description="Train email classification model")
    cli.add_argument("--data", type=str, required=True, help="Path to the CSV data file")
    # Optional hyperparameters: (flag, type, default, help text)
    for flag, caster, fallback, help_text in (
        ("--epochs", int, 4, "Number of training epochs"),
        ("--batch_size", int, 16, "Batch size for training"),
        ("--lr", float, 2e-5, "Learning rate"),
    ):
        cli.add_argument(flag, type=caster, default=fallback, help=help_text)

    opts = cli.parse_args()

    train_model(opts.data, opts.epochs, opts.batch_size, opts.lr)

Writing train.py


In [31]:
%%writefile test.py
import argparse
import logging
import json
from models import EmailClassifier
from utils import mask_email

# Set up logging: timestamped INFO-level output plus a logger for this script
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def test_classification(email_text):
    """
    Mask and classify one email with the saved model and print the result as JSON.

    Any failure is logged rather than raised.

    Args:
        email_text: Email text to classify
    """
    try:
        # Bail out early when no trained model can be loaded
        model = EmailClassifier()
        if not model.load_model():
            logger.error("Failed to load model. Please train or load a model first.")
            return

        # Step 1: mask PII
        logger.info("Masking PII...")
        masked_text, found_entities = mask_email(email_text)

        # Step 2: classify the masked text
        logger.info("Classifying email...")
        predicted = model.predict(masked_text)

        # Step 3: summarise in the log
        logger.info("Results:")
        logger.info(f"Category: {predicted}")
        logger.info(f"Masked email: {masked_text}")
        logger.info(f"Found {len(found_entities)} PII entities")

        # Step 4: emit the same payload shape the API returns
        payload = dict(
            input_email_body=email_text,
            list_of_masked_entities=found_entities,
            masked_email=masked_text,
            category_of_the_email=predicted,
        )
        print(json.dumps(payload, indent=2))

    except Exception as e:
        logger.error(f"Error testing classification: {e}")

if __name__ == "__main__":
    # Command-line entry point: the email text is passed via --email
    cli = argparse.ArgumentParser(description="Test email classification and PII masking")
    cli.add_argument("--email", type=str, required=True, help="Email text to classify")

    opts = cli.parse_args()

    test_classification(opts.email)

Writing test.py


In [5]:
%%writefile app_hf.py
from fastapi import FastAPI, HTTPException, Request, Form
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from typing import List, Dict, Any
import logging
from pathlib import Path
import torch
import json
import sys

from models import EmailClassifier
from utils import PiiMasker

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

app = FastAPI(
    title="Email Classification API",
    description="API for classifying and masking PII in support emails"
)

# Setup templates for the web interface
# Templates are looked up in a "templates" directory next to this file.
templates = Jinja2Templates(directory=str(Path(__file__).parent / "templates"))

# Initialize classifier with proper device handling
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
classifier = EmailClassifier(device=device, model_dir="./model_data")
# Snapshot taken once at import time; load_model() returns False on failure
# instead of raising, so the app still starts without a trained model.
model_loaded = classifier.load_model()
if not model_loaded:
    logger.warning("Model not loaded. This is expected if you need to train first.")

# Initialize PII masker
# Built once and shared by all requests; a failure here aborts the import.
try:
    pii_masker = PiiMasker()
except Exception as e:
    logger.error(f"Failed to initialize PII masker: {e}")
    raise RuntimeError("Failed to initialize PII masker") from e

class EmailRequest(BaseModel):
    """Request model for email classification"""
    # Raw email text; PII is masked before the text is classified.
    email_body: str

class EmailResponse(BaseModel):
    """Response model for email classification"""
    # The email text exactly as received in the request.
    input_email_body: str
    # One dict per detected PII span; the keys are whatever the endpoint puts in.
    list_of_masked_entities: List[Dict[str, Any]]
    # Email text with each detected span replaced by an [entity_type] tag.
    masked_email: str
    # Category label predicted for the masked text.
    category_of_the_email: str

@app.post("/classify", response_model=EmailResponse)
async def classify_email(request: EmailRequest):
    """Classify email and mask PII

    Errors:
    - 500 "Model not loaded or trained" if no model is available
    - 400 if the email body is empty or whitespace only
    - 500 if masking or classification fails
    """
    try:
        # Check the classifier itself, not the import-time `model_loaded`
        # snapshot: with the stale flag a successful lazy load was repeated
        # from disk on every single request.
        if classifier.model is None and not classifier.load_model():
            raise HTTPException(status_code=500, detail="Model not loaded or trained")

        if not request.email_body or len(request.email_body.strip()) == 0:
            raise HTTPException(status_code=400, detail="Email body cannot be empty")

        masked_email, entities = pii_masker.extract_masked_entities(request.email_body)
        # Log only the count: the entity values are the PII this service
        # exists to hide and must not end up in log files.
        logger.info(f"Detected {len(entities)} PII entities")
        category = classifier.predict(masked_email)

        # Safely format the response
        safe_entities = [
            {
                "position": [
                    entity.get("start_index", 0),
                    entity.get("end_index", 0)
                ],
                "classification": entity.get("entity_type", "unknown"),
                "entity": entity.get("entity_value", "")
            } for entity in entities
        ]

        return {
            "input_email_body": request.email_body,
            "list_of_masked_entities": safe_entities,
            "masked_email": masked_email,
            "category_of_the_email": category
        }

    except HTTPException:
        # Deliberate HTTP errors (the 400/500 above) keep their status and
        # detail instead of being rewrapped as a generic 500.
        raise
    except Exception as e:
        logger.error(f"Error processing request: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Error processing request: {str(e)}") from e

@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Serve the web interface"""
    # Initial page load: no results yet, only the active device is passed along
    page_context = {"request": request, "show_results": False, "device": str(device)}
    return templates.TemplateResponse("index.html", page_context)

@app.post("/", response_class=HTMLResponse)
async def process_email(request: Request, email_body: str = Form(...)):
    """Process email from web form

    Always re-renders index.html: with the results on success, or with an
    ``error`` message when the model is missing, the body is empty or
    processing fails.
    """

    def render(**context):
        # Every render gets the request and the device, so the error pages
        # receive the same base context as the initial page.
        return templates.TemplateResponse("index.html", {
            "request": request,
            "device": str(device),
            **context
        })

    try:
        # Check the classifier itself, not the import-time `model_loaded`
        # snapshot, so a successful lazy load is not repeated on every request.
        if classifier.model is None and not classifier.load_model():
            return render(error="Model not loaded or trained", show_results=False)

        if not email_body or len(email_body.strip()) == 0:
            return render(error="Email body cannot be empty", show_results=False)

        masked_email, entities = pii_masker.extract_masked_entities(email_body)
        # Log only the count; the entity values are PII and must stay out of logs
        logger.info(f"Detected {len(entities)} PII entities")
        category = classifier.predict(masked_email)

        # Safely format the JSON response
        formatted_json = {
            "input_email_body": email_body,
            "list_of_masked_entities": [
                {
                    "position": [
                        entity.get("start_index", 0),
                        entity.get("end_index", 0)
                    ],
                    "classification": entity.get("entity_type", "unknown"),
                    "entity": entity.get("entity_value", "")
                } for entity in entities
            ],
            "masked_email": masked_email,
            "category_of_the_email": category
        }

        # Pretty print the JSON
        pretty_json = json.dumps(formatted_json, indent=2, ensure_ascii=False)

        return render(
            input_email_body=email_body,
            masked_email=masked_email,
            category=category,
            entities=entities,
            show_results=True,
            formatted_json=pretty_json
        )

    except Exception as e:
        logger.error(f"Error processing request: {e}", exc_info=True)
        return render(error=f"Error processing request: {str(e)}", show_results=False)

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    # Report the live state of the classifier; the import-time `model_loaded`
    # flag stays False even after a later lazy load has succeeded.
    loaded = classifier.model is not None
    return {
        "status": "ok" if loaded else "warning",
        "model_loaded": loaded,
        "device": str(device),
        "message": "" if loaded else "Model not loaded. Please train or load a model."
    }

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    # Last-resort handler: log with traceback, answer with a JSON 500
    logger.error(f"Global exception: {exc}", exc_info=True)
    error_body = {"detail": f"An unexpected error occurred: {str(exc)}"}
    return JSONResponse(status_code=500, content=error_body)

Writing app_hf.py


In [6]:
%%writefile requirements.txt
# Core ML/DL dependencies
torch==2.0.1
transformers==4.33.2
datasets==2.14.5

# Numeric/scientific computing (pinned for Python 3.9)
numpy==1.25.2
scikit-learn==1.2.2  # Downgraded for Python 3.9 compatibility
pandas==1.5.3  # Last pandas 1.x release (supports Python 3.9)

# NLP specific
spacy>=3.0.0
en-core-web-sm @ https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.0/en_core_web_sm-3.7.0-py3-none-any.whl

# FastAPI stack
fastapi==0.103.1
uvicorn==0.23.2
python-multipart==0.0.6
jinja2==3.1.2

# Pydantic
pydantic==2.4.2

# Additional utilities
tqdm==4.66.1
requests==2.31.0

Writing requirements.txt


In [7]:
%%writefile README.md

# Email Classification System

This project implements an email classification system for a company's support team. The system categorizes incoming support emails into predefined categories while ensuring that personal information (PII) is masked before processing.

## Features

- Email classification using BERT
- PII masking using Named Entity Recognition (NER) and regex patterns
- FastAPI-based API for email classification
- Supports various PII entity types (full name, email, phone number, etc.)

## Setup Instructions

### Prerequisites

- Python 3.9+ (the pinned versions in `requirements.txt` and the Dockerfile target Python 3.9)
- PyTorch
- Transformers library
- Spacy
- FastAPI

### Installation

1. Clone this repository:
```
git clone https://github.com/yourusername/email-classification.git
cd email-classification
```

2. Install the required packages:
```
pip install -r requirements.txt
```

3. Download the SpaCy English model:
```
python -m spacy download en_core_web_sm
```

### Training the Model

1. Prepare your training data in a CSV file with 'email' and 'type' columns.
2. Train the model from Python (or run `python train.py --data path/to/your/data.csv`):
```python
from models import EmailClassifier

classifier = EmailClassifier()
classifier.train('path/to/your/data.csv', epochs=4, batch_size=16)
```

### Running the API

```
python app.py
```

The API will be available at `http://localhost:8000`.

## API Documentation

### Classify Email

**Endpoint:** `POST /classify`

**Request Body:**
```json
{
    "email_body": "Your email text here"
}
```

**Response:**
```json
{
    "input_email_body": "Original email text",
    "list_of_masked_entities": [
        {
            "position": [start_index, end_index],
            "classification": "entity_type",
            "entity": "original_entity_value"
        }
    ],
    "masked_email": "Masked email text",
    "category_of_the_email": "Predicted category"
}
```

### Health Check

**Endpoint:** `GET /health`

**Response:**
```json
{
    "status": "ok",
    "model_loaded": true
}
```

## Deployment on Hugging Face Spaces

1. Create a new Space on Hugging Face with Docker template
2. Upload the project files to the Space
3. Set up the Space to run the FastAPI application

## File Structure

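- `app.py`: FastAPI application (JSON API)
- `app_hf.py`: FastAPI application with a web form, used by the Dockerfile
- `models.py`: Email classification model
- `utils.py`: PII masking utilities
- `train.py`: Command-line training script
- `test.py`: Command-line script that masks and classifies a single email
- `requirements.txt`: Required packages
- `Dockerfile`: Container image definition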

Writing README.md


In [8]:
%%writefile Dockerfile
# Python 3.9 base image; requirements.txt is pinned for this interpreter version.
FROM python:3.9-slim

WORKDIR /app

# Install dependencies before copying the source so this layer stays cached
# when only the code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# NOTE(review): requirements.txt already installs an en-core-web-sm wheel;
# confirm whether this extra download is still needed.
RUN python -m spacy download en_core_web_sm

COPY . .

# Serve the app_hf FastAPI application on all interfaces, port 7860.
CMD ["uvicorn", "app_hf:app", "--host", "0.0.0.0", "--port", "7860"]

Writing Dockerfile


In [9]:
import os
# Hide every GPU from CUDA-aware libraries (CPU only) and turn TensorFlow's
# native logging down to its quietest level, in a single update.
os.environ.update({
    "CUDA_VISIBLE_DEVICES": "-1",
    "TF_CPP_MIN_LOG_LEVEL": "3",
})

In [11]:
!pip install -U transformers==4.33.2

Collecting transformers==4.33.2
  Downloading transformers-4.33.2-py3-none-any.whl.metadata (119 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/119.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m119.9/119.9 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers!=0.11.3,<0.14,>=0.11.1 (from transformers==4.33.2)
  Downloading tokenizers-0.13.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Downloading transformers-4.33.2-py3-none-any.whl (7.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.6/7.6 MB[0m [31m55.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading tokenizers-0.13.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m141.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tokenizers, transformers
  Attempting uninstall

In [4]:
# Run the training script on the combined email dataset
# (4 epochs, batch size 16, learning rate 2e-5).
# Alternative configuration (fewer epochs, smaller batches), left disabled:
#!python train.py --data /content/combined_emails_with_natural_pii.csv --epochs 2 --batch_size 4
!python train.py --data /content/combined_emails_with_natural_pii.csv --epochs 4 --batch_size 16 --lr 2e-5

  torch.utils._pytree._register_pytree_node(
2025-04-19 07:44:25.944568: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1745048665.965630    9077 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1745048665.972046    9077 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
  torch.utils._pytree._register_pytree_node(
2025-04-19 07:44:28,855 - INFO - Training model with data from /content/combined_emails_with_natural_pii.csv
2025-04-19 07:44:28,884 - INFO - 📦 Using device: cuda
2025-04-19 07:44:29,030 - INFO - Loading data from /content/combined_emails_with_natural_pii.csv
Some weights of BertForSequenceClassification were not initialized from the model check

In [11]:
# Test the model with a single email containing PII.
# The sample is a French support request that already carries placeholder
# tokens (<acc_num>, <tel_num>, <name>) plus an English sentence with a
# person's name and another with an email address.
test_email = """Subject: Changement de configuration AWS urgent

Cher équipe de support client,

J'espère que ce message vous trouve en bonne santé. Je vous écris pour demander un changement de configuration urgent concernant notre infrastructure AWS gérée par votre service de gestion AWS. Nous avons identifié un besoin significatif d'améliorer l'efficacité des ressources, ce qui nécessite une attention immédiate.

Notre numéro de compte auprès de votre société est <acc_num>, et les services spécifiques concernés relèvent du service de gestion AWS My name is David Kim.. Nous faisons face à des défis avec l'allocation des ressources qui impactent nos opérations quotidiennes, causant des retards et des inefficacités qui pourraient avoir des effets néfastes sur la continuité de notre activité. Il est crucial que nous abordions ces inefficacités rapidement pour maintenir les niveaux de productivité et éviter d'autres perturbations.

Nous avons évalué notre configuration actuelle et croyons que l'optimisation de la configuration pourrait conduire à une performance améliorée, une meilleure gestion des ressources et une réduction des coûts. Nous vous demandons gentiment votre assistance experte pour initier les changements nécessaires dès que possible. Merci de nous faire savoir quelles étapes nous devrions entreprendre de notre côté pour faciliter ce processus.

De plus, si vous avez besoin de plus de détails concernant notre infrastructure ou nos objectifs d'optimisation, n'hésitez pas à me contacter directement. Mon numéro de contact est <tel_num>.

Merci pour votre attention rapide à cette affaire You can reach me at maria.gonzalez@shop.es.. Nous apprécions grandement votre soutien et espérons résoudre ce problème rapidement.

Cordialement,

<name>"""

# IPython substitutes $test_email with the Python variable before the shell
# runs; the surrounding double quotes keep the multi-line text one argument.
# NOTE(review): this would break if the email contained double quotes,
# backticks, or `$` - pass such text through a file or subprocess instead.
!python test.py --email "$test_email"

  torch.utils._pytree._register_pytree_node(
2025-04-19 08:24:46.723909: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1745051086.744920   19586 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1745051086.751550   19586 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
  torch.utils._pytree._register_pytree_node(
2025-04-19 08:24:52,317 - INFO - Model loaded from ./model_data
2025-04-19 08:24:52,317 - INFO - Masking PII...
2025-04-19 08:24:52,986 - INFO - Classifying email...
2025-04-19 08:24:53,198 - INFO - Results:
2025-04-19 08:24:53,198 - INFO - Category: Change
2025-04-19 08:24:53,198 - INFO - Masked email: Subject: Changement de configuration AWS u

In [None]:
import time
import requests
import json
import socket

# `subprocess` and `sys` are imported here because this cell's import block
# does not bring them into scope; without them the launch fails with NameError
# on a fresh kernel.
import subprocess
import sys

# Start the FastAPI server as a child process so this cell can go on to query
# it. sys.executable runs uvicorn under the same interpreter (and therefore the
# same installed packages) as this kernel.
server_process = subprocess.Popen(
    [sys.executable, '-m', 'uvicorn', 'app:app', '--host', '0.0.0.0', '--port', '8000']
)
print("Server started, waiting for it to initialize...")

# Wait until server is ready
def wait_for_server(host, port, timeout=90):
    """Block until a TCP connection to ``host:port`` succeeds.

    Tries to connect, sleeping one second after each failed attempt, and
    prints "Server is up!" once a connection is accepted.

    Args:
        host: Hostname or IP address to connect to.
        port: TCP port to connect to.
        timeout: Maximum number of seconds to keep trying.

    Raises:
        RuntimeError: If no connection succeeds before ``timeout`` elapses.
            The last connection error, if any, is attached as the cause.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    last_error = None
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=2):
                print("Server is up!")
                return
        except OSError as exc:
            # Not accepting connections yet; remember why and retry shortly.
            last_error = exc
            time.sleep(1)
    raise RuntimeError("Server didn't start within timeout.") from last_error

# Sample email containing several kinds of PII for the API to mask.
test_email = """Subject: Multiple Issues with Account and Billing

Dear Support Team,

I'm experiencing several problems with my account and recent charges. I've been a customer for 5 years and need urgent assistance.

My account details:
Name: Robert Johnson
Email: robert.johnson@emailprovider.com
Phone: (555) 123-4567
DOB: 03/12/1978
Account ID: A-45678
Aadhar: 8765 4321 9876 5432

I noticed several unauthorized charges on my credit card (4444-3333-2222-1111, expiry 09/26, CVV 555) last month.

Additionally, I'm unable to access certain premium features I've paid for. When I try to log in from my secondary email (robert.j.work@company.org), it says my subscription has expired.

I've tried contacting billing department at your toll-free number 1-800-555-9876 but couldn't get through.

Please help me resolve these issues as soon as possible. You can reach me at my alternate number +1-555-987-6543.

Best regards,
Robert Johnson"""

# try/finally guarantees the background server is stopped even when the
# readiness check or the request raises, so no stray process keeps the port.
try:
    wait_for_server("localhost", 8000)

    # Test the API. The explicit timeout keeps the cell from hanging forever
    # if the server accepts the connection but never answers.
    response = requests.post(
        'http://localhost:8000/classify',
        json={'email_body': test_email},
        timeout=120,
    )

    if response.status_code == 200:
        print("API Request Successful!")
        print(json.dumps(response.json(), indent=2))
    else:
        print(f"Error: {response.status_code}")
        print(response.text)
finally:
    # Clean up
    server_process.terminate()
    print("Server stopped.")

In [27]:
# Create a zip file of all necessary files for Hugging Face deployment:
# the app entry point and its templates, the model and PII-masking modules,
# the dependency list, the Dockerfile, the saved model directory, and the README.
!zip -r email_classifier.zip app_hf.py templates/ models.py utils.py requirements.txt Dockerfile model_data/ README.md

  adding: app_hf.py (deflated 73%)
  adding: templates/ (stored 0%)
  adding: templates/index.html (deflated 68%)
  adding: models.py (deflated 72%)
  adding: utils.py (deflated 65%)
  adding: requirements.txt (deflated 34%)
  adding: Dockerfile (deflated 24%)
  adding: model_data/ (stored 0%)
  adding: model_data/tokenizer_config.json (deflated 45%)
  adding: model_data/pytorch_model.bin (deflated 7%)
  adding: model_data/special_tokens_map.json (deflated 42%)
  adding: model_data/config.json (deflated 52%)
  adding: model_data/vocab.txt (deflated 53%)
  adding: model_data/label_mapping.pkl (deflated 16%)
  adding: README.md (deflated 51%)
