In [9]:
# First cell - additional imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from typing import Optional, Dict, List, Tuple, Set
import requests
from bs4 import BeautifulSoup, Tag
import re
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, average_precision_score, roc_auc_score
import tqdm
import random
import os
from collections import Counter, defaultdict

In [10]:
class HTMLAdClassifier(nn.Module):
    """Neural network that classifies each HTML token (or element start tag) as ad / non‑ad.

    The model is intentionally conservative: a sigmoid output and a tunable probability
    threshold allow you to bias toward *no* prediction rather than a false positive.
    """

    def __init__(
        self,
        vocab_size: int,
        tag_vocab_size: int,
        attr_vocab_size: int,
        embed_dim: int = 256,
        num_layers: int = 4,
        num_heads: int = 8,
        dropout: float = 0.2,
        max_seq_len: int = 1024,
    ) -> None:
        super().__init__()

        # ──────────────────── Embedding blocks ────────────────────
        self.token_embed = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.tag_embed = nn.Embedding(tag_vocab_size, embed_dim, padding_idx=0)
        self.attr_embed = nn.Embedding(attr_vocab_size, embed_dim, padding_idx=0)
        self.pos_embed = nn.Embedding(max_seq_len, embed_dim)
        self.embed_dropout = nn.Dropout(dropout)

        # ──────────────────── Transformer encoder ────────────────────
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=num_heads,
            dim_feedforward=embed_dim * 4,
            dropout=dropout,
            activation="gelu",
            batch_first=True,
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers)

        # ──────────────────── Classification head ────────────────────
        self.classifier = nn.Sequential(
            nn.Linear(embed_dim, embed_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(embed_dim, 1),  # logit
        )

    def forward(
        self,
        token_ids: torch.LongTensor,      # (B, L)
        tag_ids: torch.LongTensor,        # (B, L)
        attr_ids: torch.LongTensor,       # (B, L)
        pos_ids: torch.LongTensor,        # (B, L)
        attention_mask: Optional[torch.BoolTensor] = None,  # (B, L)
    ) -> torch.Tensor:
        """Return per‑token logits (before sigmoid)."""
        x = (
            self.token_embed(token_ids)
            + self.tag_embed(tag_ids)
            + self.attr_embed(attr_ids)
            + self.pos_embed(pos_ids)
        )
        x = self.embed_dropout(x)

        x = self.encoder(x, src_key_padding_mask=attention_mask)
        logits = self.classifier(x).squeeze(-1)  # (B, L)
        return logits

    # -------------------------------------------------------------
    # Convenience helpers
    # -------------------------------------------------------------
    @staticmethod
    def probability(logits: torch.Tensor) -> torch.Tensor:
        """Convert logits to probabilities with a numerically stable sigmoid."""
        return torch.sigmoid(logits)

    @staticmethod
    def prediction(logits: torch.Tensor, threshold: float = 0.9) -> torch.Tensor:
        """Return boolean mask of predictions above threshold.
        A high default threshold keeps false‑positives low.
        """
        return torch.sigmoid(logits) > threshold


In [11]:
# HTML Tokenization and Processing

class HTMLTokenizer:
    """Tokenizes HTML into components needed by the model."""
    
    def __init__(self, max_seq_len=1024):
        self.max_seq_len = max_seq_len
        self.token_vocab = {"<PAD>": 0, "<UNK>": 1}
        self.tag_vocab = {"<PAD>": 0, "<UNK>": 1}
        self.attr_vocab = {"<PAD>": 0, "<UNK>": 1}
        self.token_counter = Counter()
        self.tag_counter = Counter()
        self.attr_counter = Counter()
        
    def fit(self, html_documents: List[str], min_freq: int = 2):
        """Build vocabularies from a list of HTML documents."""
        for html in html_documents:
            tokens, tags, attrs = self._tokenize_html(html)
            self.token_counter.update(tokens)
            self.tag_counter.update(tags)
            self.attr_counter.update(attrs)
            
        # Build vocabularies, keeping only tokens that appear at least min_freq times
        for token, count in self.token_counter.items():
            if count >= min_freq and token not in self.token_vocab:
                self.token_vocab[token] = len(self.token_vocab)
                
        for tag, count in self.tag_counter.items():
            if count >= min_freq and tag not in self.tag_vocab:
                self.tag_vocab[tag] = len(self.tag_vocab)
                
        for attr, count in self.attr_counter.items():
            if count >= min_freq and attr not in self.attr_vocab:
                self.attr_vocab[attr] = len(self.attr_vocab)
                
        print(f"Vocab sizes: tokens={len(self.token_vocab)}, tags={len(self.tag_vocab)}, attrs={len(self.attr_vocab)}")
        
    def _tokenize_html(self, html: str) -> Tuple[List[str], List[str], List[str]]:
        """Extract tokens, tags, and attributes from HTML content."""
        soup = BeautifulSoup(html, 'html.parser')
        tokens, tags, attrs = [], [], []
        
        def process_node(node):
            if isinstance(node, Tag):
                # Process tag
                tag_name = node.name.lower()
                tags.append(tag_name)
                tokens.append(f"<{tag_name}>")
                attrs.append("tag_start")
                
                # Process attributes
                for attr, value in node.attrs.items():
                    attr_text = attr.lower()
                    tags.append("<UNK>")  # Tags aren't relevant for attributes
                    attrs.append(attr_text)
                    
                    if isinstance(value, list):
                        value = " ".join(value)
                    elif not isinstance(value, str):
                        value = str(value)
                        
                    tokens.append(value)
                
                # Process children
                for child in node.children:
                    process_node(child)
                    
                # Close tag
                tags.append(tag_name)
                tokens.append(f"</{tag_name}>")
                attrs.append("tag_end")
            elif node.string and node.string.strip():
                # Process text content
                for word in re.findall(r'\w+|[^\w\s]', node.string):
                    tokens.append(word)
                    tags.append("<UNK>")
                    attrs.append("<UNK>")
                    
        process_node(soup)
        return tokens, tags, attrs
    
    def encode(self, html: str) -> Dict[str, torch.Tensor]:
        """Convert HTML to tensors suitable for the model."""
        tokens, tags, attrs = self._tokenize_html(html)
        
        # Truncate if too long
        if len(tokens) > self.max_seq_len:
            tokens = tokens[:self.max_seq_len]
            tags = tags[:self.max_seq_len]
            attrs = attrs[:self.max_seq_len]
            
        # Convert to IDs
        token_ids = [self.token_vocab.get(t, self.token_vocab["<UNK>"]) for t in tokens]
        tag_ids = [self.tag_vocab.get(t, self.tag_vocab["<UNK>"]) for t in tags]
        attr_ids = [self.attr_vocab.get(a, self.attr_vocab["<UNK>"]) for a in attrs]
        
        # Create position IDs and attention mask
        pos_ids = list(range(len(token_ids)))
        attention_mask = [False] * len(token_ids)  # False = attend to this position
        
        # Pad if necessary
        padding_length = self.max_seq_len - len(token_ids)
        if padding_length > 0:
            token_ids += [self.token_vocab["<PAD>"]] * padding_length
            tag_ids += [self.tag_vocab["<PAD>"]] * padding_length
            attr_ids += [self.attr_vocab["<PAD>"]] * padding_length
            pos_ids += [0] * padding_length  # Padding positions get 0
            attention_mask += [True] * padding_length  # True = mask this position
            
        # Convert to tensors
        return {
            "token_ids": torch.tensor(token_ids, dtype=torch.long),
            "tag_ids": torch.tensor(tag_ids, dtype=torch.long),
            "attr_ids": torch.tensor(attr_ids, dtype=torch.long),
            "pos_ids": torch.tensor(pos_ids, dtype=torch.long),
            "attention_mask": torch.tensor(attention_mask, dtype=torch.bool)
        }
    
    def save(self, path: str):
        """Save tokenizer vocabularies."""
        os.makedirs(os.path.dirname(path), exist_ok=True)
        torch.save({
            "token_vocab": self.token_vocab,
            "tag_vocab": self.tag_vocab,
            "attr_vocab": self.attr_vocab,
            "max_seq_len": self.max_seq_len
        }, path)
        
    @classmethod
    def load(cls, path: str):
        """Load tokenizer from saved vocabularies."""
        data = torch.load(path)
        tokenizer = cls(max_seq_len=data["max_seq_len"])
        tokenizer.token_vocab = data["token_vocab"]
        tokenizer.tag_vocab = data["tag_vocab"]
        tokenizer.attr_vocab = data["attr_vocab"]
        return tokenizer

In [12]:
# Dataset and DataLoader classes

class HTMLAdDataset(Dataset):
    """Dataset for HTML ad classification."""
    
    def __init__(self, html_documents: List[str], labels: List[List[int]], tokenizer: HTMLTokenizer):
        """
        Args:
            html_documents: List of HTML strings
            labels: List of label lists (one label per token, 0=not ad, 1=ad)
            tokenizer: HTMLTokenizer instance
        """
        self.tokenizer = tokenizer
        self.examples = []
        
        for html, label_list in zip(html_documents, labels):
            encoding = tokenizer.encode(html)
            
            # Ensure labels match the tokenized sequence length
            seq_len = min(len(label_list), tokenizer.max_seq_len)
            token_labels = label_list[:seq_len]
            
            # Pad labels if necessary
            if len(token_labels) < tokenizer.max_seq_len:
                token_labels = token_labels + [0] * (tokenizer.max_seq_len - len(token_labels))
            
            self.examples.append({
                **encoding,
                "labels": torch.tensor(token_labels, dtype=torch.float)
            })
    
    def __len__(self):
        return len(self.examples)
    
    def __getitem__(self, idx):
        return self.examples[idx]

def create_dataloaders(train_html, train_labels, val_html, val_labels, tokenizer, 
                       batch_size=8, oversample_ads=True):
    """Create training and validation DataLoaders with optional oversampling."""
    
    # Create datasets
    train_dataset = HTMLAdDataset(train_html, train_labels, tokenizer)
    val_dataset = HTMLAdDataset(val_html, val_labels, tokenizer)
    
    # Identify examples with ads for oversampling
    if oversample_ads:
        ad_indices = []
        for i, example in enumerate(train_dataset.examples):
            if example["labels"].sum() > 0:  # Contains at least one ad
                ad_indices.append(i)
        
        # Create sampler that oversamples examples with ads
        if ad_indices:
            non_ad_indices = [i for i in range(len(train_dataset)) if i not in ad_indices]
            weights = [5.0 if i in ad_indices else 1.0 for i in range(len(train_dataset))]
            sampler = torch.utils.data.WeightedRandomSampler(
                weights=weights,
                num_samples=len(train_dataset),
                replacement=True
            )
            train_loader = DataLoader(
                train_dataset, 
                batch_size=batch_size,
                sampler=sampler
            )
        else:
            train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    else:
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    
    return train_loader, val_loader

In [13]:
# Training and evaluation

def train_model(model, train_loader, val_loader, num_epochs=10, lr=1e-4, device="cuda"):
    """Train the HTML ad classifier model."""
    model.to(device)
    
    # Use weighted binary cross-entropy to handle class imbalance
    # Weight positive examples (ads) much higher than negative examples
    pos_weight = torch.tensor([10.0]).to(device)  # Adjust based on your dataset
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
    
    optimizer = optim.AdamW(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='max', factor=0.5, patience=2, verbose=True
    )
    
    best_val_auc = 0.0
    history = {"train_loss": [], "val_loss": [], "val_auc": []}
    
    for epoch in range(num_epochs):
        # Training
        model.train()
        train_loss = 0.0
        progress_bar = tqdm.tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")
        
        for batch in progress_bar:
            # Move batch to device
            batch = {k: v.to(device) for k, v in batch.items()}
            
            # Forward pass
            logits = model(
                token_ids=batch["token_ids"],
                tag_ids=batch["tag_ids"],
                attr_ids=batch["attr_ids"],
                pos_ids=batch["pos_ids"],
                attention_mask=batch["attention_mask"]
            )
            
            # Create a mask for non-padding tokens
            non_pad_mask = ~batch["attention_mask"]
            
            # Apply mask to logits and labels
            masked_logits = logits[non_pad_mask]
            masked_labels = batch["labels"][non_pad_mask]
            
            # Calculate loss
            loss = criterion(masked_logits, masked_labels)
            
            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            
            # Update progress bar
            train_loss += loss.item()
            progress_bar.set_postfix({"loss": loss.item()})
            
        train_loss /= len(train_loader)
        history["train_loss"].append(train_loss)
        
        # Validation
        val_loss, val_auc, val_ap = evaluate_model(model, val_loader, criterion, device)
        history["val_loss"].append(val_loss)
        history["val_auc"].append(val_auc)
        
        print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.4f}, "
              f"Val Loss: {val_loss:.4f}, Val AUC: {val_auc:.4f}, Val AP: {val_ap:.4f}")
        
        # Update learning rate
        scheduler.step(val_auc)
        
        # Save best model
        if val_auc > best_val_auc:
            best_val_auc = val_auc
            torch.save({
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "epoch": epoch,
                "val_auc": val_auc
            }, "best_ad_classifier.pt")
            
    return model, history

def evaluate_model(model, dataloader, criterion, device):
    """Evaluate the model on validation or test data."""
    model.eval()
    val_loss = 0.0
    all_probs = []
    all_labels = []
    
    with torch.no_grad():
        for batch in dataloader:
            batch = {k: v.to(device) for k, v in batch.items()}
            
            # Forward pass
            logits = model(
                token_ids=batch["token_ids"],
                tag_ids=batch["tag_ids"],
                attr_ids=batch["attr_ids"],
                pos_ids=batch["pos_ids"],
                attention_mask=batch["attention_mask"]
            )
            
            # Create a mask for non-padding tokens
            non_pad_mask = ~batch["attention_mask"]
            
            # Apply mask to logits and labels
            masked_logits = logits[non_pad_mask]
            masked_labels = batch["labels"][non_pad_mask]
            
            # Calculate loss
            loss = criterion(masked_logits, masked_labels)
            val_loss += loss.item()
            
            # Calculate probabilities
            probs = torch.sigmoid(masked_logits)
            
            # Collect for metrics
            all_probs.extend(probs.cpu().numpy())
            all_labels.extend(masked_labels.cpu().numpy())
    
    val_loss /= len(dataloader)
    
    # Calculate metrics
    all_probs = np.array(all_probs)
    all_labels = np.array(all_labels)
    
    # ROC-AUC score
    try:
        auc = roc_auc_score(all_labels, all_probs)
    except:
        auc = 0.5  # If only one class is present
    
    # Average precision score (PR-AUC)
    try:
        ap = average_precision_score(all_labels, all_probs)
    except:
        ap = 0.0
        
    return val_loss, auc, ap

In [14]:
# Data collection and preprocessing

def fetch_html(url):
    """Fetch HTML content from a URL."""
    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        return response.text
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None

def identify_ad_elements(html, ad_patterns):
    """Identify ad elements in HTML based on patterns."""
    soup = BeautifulSoup(html, 'html.parser')
    tokens, tags, attrs = [], [], []
    labels = []
    
    def is_ad_element(tag):
        """Check if an element is an ad based on patterns."""
        for pattern in ad_patterns:
            # Check tag name
            if pattern.get('tag') and pattern['tag'].lower() == tag.name.lower():
                # Check attributes
                for attr, value in pattern.get('attrs', {}).items():
                    if attr in tag.attrs:
                        tag_value = tag.attrs[attr]
                        if isinstance(tag_value, list):
                            tag_value = ' '.join(tag_value)
                        elif not isinstance(tag_value, str):
                            tag_value = str(tag_value)
                            
                        if isinstance(value, str) and value in tag_value:
                            return True
                        elif hasattr(value, 'search') and value.search(tag_value):  # Regex
                            return True
        return False
    
    def process_node(node, is_ad=False):
        if isinstance(node, Tag):
            # Check if this node is an ad
            node_is_ad = is_ad or is_ad_element(node)
            
            # Process tag
            tag_name = node.name.lower()
            tags.append(tag_name)
            tokens.append(f"<{tag_name}>")
            attrs.append("tag_start")
            labels.append(1 if node_is_ad else 0)
            
            # Process attributes
            for attr, value in node.attrs.items():
                attr_text = attr.lower()
                tags.append("<UNK>")
                attrs.append(attr_text)
                
                if isinstance(value, list):
                    value = " ".join(value)
                elif not isinstance(value, str):
                    value = str(value)
                    
                tokens.append(value)
                labels.append(1 if node_is_ad else 0)
            
            # Process children
            for child in node.children:
                process_node(child, node_is_ad)
                
            # Close tag
            tags.append(tag_name)
            tokens.append(f"</{tag_name}>")
            attrs.append("tag_end")
            labels.append(1 if node_is_ad else 0)
        elif node.string and node.string.strip():
            # Process text content
            for word in re.findall(r'\w+|[^\w\s]', node.string):
                tokens.append(word)
                tags.append("<UNK>")
                attrs.append("<UNK>")
                labels.append(1 if is_ad else 0)
                
    process_node(soup)
    return tokens, tags, attrs, labels

def collect_training_data(urls, ad_patterns):
    """Collect training data from a list of URLs."""
    html_documents = []
    all_labels = []
    
    for url in tqdm.tqdm(urls, desc="Collecting data"):
        html = fetch_html(url)
        if html:
            tokens, tags, attrs, labels = identify_ad_elements(html, ad_patterns)
            html_documents.append(html)
            all_labels.append(labels)
    
    return html_documents, all_labels

In [15]:
# Sample ad patterns and URLs for training

# Common ad patterns (add more based on your observations)
ad_patterns = [
    {'tag': 'div', 'attrs': {'class': re.compile(r'ad|advert|banner|promo', re.I)}},
    {'tag': 'div', 'attrs': {'id': re.compile(r'ad|advert|banner|promo', re.I)}},
    {'tag': 'iframe', 'attrs': {'src': re.compile(r'ad|advert|banner|doubleclick|googleads', re.I)}},
    {'tag': 'a', 'attrs': {'href': re.compile(r'sponsored|ad\.|advert|campaign\?', re.I)}},
    {'tag': 'div', 'attrs': {'data-ad': re.compile(r'.')}},  # any data-ad attribute
    {'tag': 'script', 'attrs': {'src': re.compile(r'ad|pagead|adsbygoogle', re.I)}},
    {'tag': 'ins', 'attrs': {'class': 'adsbygoogle'}},
    {'tag': 'div', 'attrs': {'class': 'sponsored'}},
    # Add patterns for specific sites you're targeting
]

# Example URLs to crawl (replace with actual URLs)
urls = [
    "https://example.com",
    "https://news.example.com",
    # Add more URLs
]

# For demonstration, let's create some synthetic data
def create_synthetic_data(n_samples=100):
    """Create synthetic HTML samples with ads for testing."""
    html_samples = []
    label_samples = []
    
    ad_classes = ["ad-banner", "sponsored-content", "adsbygoogle", "promo-box"]
    ad_ids = ["ad-container", "sponsored", "promotion", "adsense"]
    
    for i in range(n_samples):
        # Create random HTML structure
        html = "<html><body>\n"
        
        # Add header
        html += "<header><h1>Sample Page</h1></header>\n"
        
        # Add main content
        html += "<main>\n"
        html += "<article>\n"
        html += "<h2>Article Title</h2>\n"
        html += "<p>This is a sample paragraph with text content.</p>\n"
        
        # Add some sections
        for j in range(random.randint(3, 6)):
            html += f"<section>\n"
            html += f"<h3>Section {j+1}</h3>\n"
            html += "<p>More content for this section with some text.</p>\n"
            
            # Maybe add an ad
            if random.random() < 0.2:  # 20% chance of ad
                ad_type = random.choice(["class", "id"])
                if ad_type == "class":
                    ad_value = random.choice(ad_classes)
                    html += f'<div class="{ad_value}">This is an advertisement!</div>\n'
                else:
                    ad_value = random.choice(ad_ids)
                    html += f'<div id="{ad_value}">This is an advertisement!</div>\n'
                    
            html += "</section>\n"
            
        html += "</article>\n"
        html += "</main>\n"
        
        # Add footer
        html += "<footer><p>Copyright example.com</p></footer>\n"
        html += "</body></html>"
        
        tokens, tags, attrs, labels = identify_ad_elements(html, ad_patterns)
        html_samples.append(html)
        label_samples.append(labels)
    
    return html_samples, label_samples

In [16]:
# Main execution

# Create or load data
train_html, train_labels = create_synthetic_data(500)  # For demo
val_html, val_labels = create_synthetic_data(100)  # For demo

# Build tokenizer and vocabularies
tokenizer = HTMLTokenizer(max_seq_len=1024)
tokenizer.fit(train_html + val_html)

# Create model
model = HTMLAdClassifier(
    vocab_size=len(tokenizer.token_vocab),
    tag_vocab_size=len(tokenizer.tag_vocab),
    attr_vocab_size=len(tokenizer.attr_vocab),
    embed_dim=256,
    num_layers=4,
    num_heads=8,
    dropout=0.2,
    max_seq_len=1024
)

# Create dataloaders
train_loader, val_loader = create_dataloaders(
    train_html, train_labels, val_html, val_labels, tokenizer,
    batch_size=8, oversample_ads=True
)

# Train model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model, history = train_model(
    model, train_loader, val_loader,
    num_epochs=10, lr=1e-4, device=device
)

# Save model and tokenizer
torch.save(model.state_dict(), "html_ad_classifier.pt")
tokenizer.save("html_tokenizer.pt")

Vocab sizes: tokens=67, tags=15, attrs=6


Epoch 1/10: 100%|█████████████████████████████████████████████████████████| 63/63 [00:04<00:00, 14.05it/s, loss=0.0677]
  output = torch._nested_tensor_from_mask(


Epoch 1/10 - Train Loss: 0.3850, Val Loss: 0.0355, Val AUC: 0.9972, Val AP: 0.9100


Epoch 2/10: 100%|█████████████████████████████████████████████████████████| 63/63 [00:03<00:00, 16.51it/s, loss=0.0193]


Epoch 2/10 - Train Loss: 0.0547, Val Loss: 0.0343, Val AUC: 0.9970, Val AP: 0.8971


Epoch 3/10: 100%|████████████████████████████████████████████████████████| 63/63 [00:03<00:00, 16.90it/s, loss=0.00752]


Epoch 3/10 - Train Loss: 0.0537, Val Loss: 0.0339, Val AUC: 0.9970, Val AP: 0.8992


Epoch 4/10: 100%|████████████████████████████████████████████████████████| 63/63 [00:03<00:00, 16.88it/s, loss=0.00584]


Epoch 4/10 - Train Loss: 0.0451, Val Loss: 0.0349, Val AUC: 0.9971, Val AP: 0.9038


Epoch 5/10: 100%|████████████████████████████████████████████████████████| 63/63 [00:03<00:00, 16.88it/s, loss=0.00981]


Epoch 5/10 - Train Loss: 0.0471, Val Loss: 0.0331, Val AUC: 0.9970, Val AP: 0.8945


Epoch 6/10: 100%|█████████████████████████████████████████████████████████| 63/63 [00:03<00:00, 16.81it/s, loss=0.0124]


Epoch 6/10 - Train Loss: 0.0502, Val Loss: 0.0330, Val AUC: 0.9970, Val AP: 0.8952


Epoch 7/10: 100%|████████████████████████████████████████████████████████| 63/63 [00:03<00:00, 15.91it/s, loss=0.00976]


Epoch 7/10 - Train Loss: 0.0355, Val Loss: 0.0333, Val AUC: 0.9969, Val AP: 0.8939


Epoch 8/10: 100%|████████████████████████████████████████████████████████| 63/63 [00:03<00:00, 16.63it/s, loss=0.00796]


Epoch 8/10 - Train Loss: 0.0435, Val Loss: 0.0327, Val AUC: 0.9969, Val AP: 0.8942


Epoch 9/10: 100%|████████████████████████████████████████████████████████| 63/63 [00:03<00:00, 16.78it/s, loss=0.00838]


Epoch 9/10 - Train Loss: 0.0385, Val Loss: 0.0327, Val AUC: 0.9970, Val AP: 0.8960


Epoch 10/10: 100%|████████████████████████████████████████████████████████| 63/63 [00:03<00:00, 16.88it/s, loss=0.0111]


Epoch 10/10 - Train Loss: 0.0378, Val Loss: 0.0327, Val AUC: 0.9970, Val AP: 0.8977


FileNotFoundError: [WinError 3] The system cannot find the path specified: ''

In [8]:
# Inference and visualization

def detect_ads_in_html(html, model, tokenizer, threshold=0.9, device="cpu"):
    """Detect ads in HTML and return annotated HTML."""
    model.to(device)
    model.eval()
    
    # Tokenize HTML
    soup = BeautifulSoup(html, 'html.parser')
    tokens, tags, attrs = tokenizer._tokenize_html(html)
    
    # Encode for model
    encoding = tokenizer.encode(html)
    encoding = {k: v.unsqueeze(0).to(device) for k, v in encoding.items()}  # Add batch dimension
    
    # Get predictions
    with torch.no_grad():
        logits = model(**encoding)[0]  # Remove batch dimension
        probs = torch.sigmoid(logits).cpu().numpy()
    
    # Process up to the actual content length
    content_len = min(len(tokens), len(logits))
    predictions = probs[:content_len] > threshold
    
    # Debug info
    ad_elements = []
    ad_token_ids = []
    for i, (token, is_ad) in enumerate(zip(tokens[:content_len], predictions)):
        if is_ad:
            ad_token_ids.append(i)
            if token.startswith("<") and token.endswith(">"):
                tag_name = token[1:-1]
                if not tag_name.startswith("/"):  # Start tag, not end tag
                    ad_elements.append(tag_name)
    
    # Now mark the ads in the parsed HTML
    def mark_ad_elements(node, node_path=None, parent_is_ad=False):
        if node_path is None:
            node_path = []
        
        if isinstance(node, Tag):
            # Check if this node corresponds to one of our ad elements
            is_ad = parent_is_ad
            
            # Mark this node as an ad if it's in our list
            if node.name in ad_elements:
                is_ad = True
                if not node.get("data-marked-as-ad"):
                    node["data-marked-as-ad"] = "true"
                    node["style"] = "border: 3px solid red; background-color: rgba(255, 0, 0, 0.1);"
            
            # Process all children
            for child in node.children:
                mark_ad_elements(child, node_path + [node], is_ad)
                
    mark_ad_elements(soup)
    
    # Count the ads found
    ad_count = len([1 for tag in soup.find_all() if tag.get("data-marked-as-ad") == "true"])
    
    return {
        "annotated_html": str(soup),
        "ad_count": ad_count,
        "ad_elements": ad_elements,
        "ad_token_ids": ad_token_ids
    }

def visualize_training_history(history):
    """Visualize training and validation metrics."""
    plt.figure(figsize=(12, 4))
    
    plt.subplot(1, 2, 1)
    plt.plot(history["train_loss"], label="Train Loss")
    plt.plot(history["val_loss"], label="Val Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.title("Loss Curve")
    
    plt.subplot(1, 2, 2)
    plt.plot(history["val_auc"], label="Validation AUC")
    plt.xlabel("Epoch")
    plt.ylabel("AUC Score")
    plt.legend()
    plt.title("Validation AUC")
    
    plt.tight_layout()
    plt.show()

# Example usage
def test_on_real_website(url, model, tokenizer, threshold=0.9):
    """Test the model on a real website."""
    print(f"Testing on {url}")
    html = fetch_html(url)
    if not html:
        print("Failed to fetch HTML")
        return
        
    result = detect_ads_in_html(html, model, tokenizer, threshold=threshold)
    print(f"Found {result['ad_count']} potential ads")
    print(f"Ad element types: {result['ad_elements']}")
    
    # Save annotated HTML for inspection
    with open("annotated_page.html", "w", encoding="utf-8") as f:
        f.write(result["annotated_html"])
    print("Saved annotated HTML to annotated_page.html")