# PyTorch Classifier (Currently Used)

Complete PyTorch neural network pipeline: model class definitions (ResidualBlock, PatentNoveltyNet, PyTorchPatentClassifier), loading the 13 precomputed features, dropping the BM25 and CPC features, training on the remaining 10 features, evaluation, and saving.


In [None]:
import sys
from pathlib import Path
sys.path.insert(0, str(Path().absolute().parent))

import numpy as np
import json
import time
import pickle
from datetime import datetime
from typing import Tuple, Dict, Optional, List
import logging

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix
)

# Configure the root logger at INFO level so the progress messages logged by
# the classifier below (epoch summaries, early stopping, save/load) are shown.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by PyTorchPatentClassifier.
logger = logging.getLogger(__name__)


## Model Class Definition


In [None]:
class ResidualBlock(nn.Module):
    """Fully connected residual unit.

    The main branch is Linear -> BatchNorm1d -> ReLU -> Dropout. Its output is
    added to a skip branch, which is an identity when the input and output
    widths match and a Linear projection followed by BatchNorm1d otherwise.
    """

    def __init__(self, in_features: int, out_features: int, dropout: float = 0.3, bn_momentum: float = 0.1):
        super().__init__()

        # Main branch. Attribute names double as state_dict keys, so they are
        # kept stable for compatibility with saved checkpoints.
        self.fc = nn.Linear(in_features, out_features)
        self.bn = nn.BatchNorm1d(out_features, momentum=bn_momentum)
        self.dropout = nn.Dropout(dropout)
        self.activation = nn.ReLU()

        # Skip branch: project only when the feature width changes.
        needs_projection = in_features != out_features
        self.skip = nn.Linear(in_features, out_features) if needs_projection else nn.Identity()
        self.skip_bn = nn.BatchNorm1d(out_features, momentum=bn_momentum) if needs_projection else None

    def forward(self, x):
        """Return ``dropout(relu(bn(fc(x))))`` plus the (optionally projected) input."""
        shortcut = self.skip(x)
        if self.skip_bn is not None:
            shortcut = self.skip_bn(shortcut)

        transformed = self.dropout(self.activation(self.bn(self.fc(x))))
        return transformed + shortcut


class PatentNoveltyNet(nn.Module):
    """Feed-forward network that maps a feature vector to a novelty probability.

    Layout: input BatchNorm1d -> one stage per entry in ``hidden_dims`` ->
    output BatchNorm1d -> Linear(prev_dim, 1) -> Sigmoid. Each stage is either
    a ``ResidualBlock`` (``use_residual=True``) or a plain
    Linear/BatchNorm1d/ReLU/Dropout stack.

    Args:
        input_dim: Number of input features.
        hidden_dims: Width of each hidden stage, in order. ``None`` selects
            the default ``[128, 64, 32]``. An empty list yields a network with
            no hidden stages.
        dropout: Dropout probability used in every hidden stage.
        use_residual: Whether hidden stages are ``ResidualBlock`` instances.
        bn_momentum: Momentum passed to every BatchNorm1d layer.
    """

    def __init__(
        self,
        input_dim: int,
        hidden_dims: Optional[List[int]] = None,
        dropout: float = 0.3,
        use_residual: bool = True,
        bn_momentum: float = 0.1
    ):
        super().__init__()

        # A None sentinel avoids sharing one mutable default list between
        # instances; copying also decouples the model from the caller's list.
        hidden_dims = [128, 64, 32] if hidden_dims is None else list(hidden_dims)

        self.input_bn = nn.BatchNorm1d(input_dim, momentum=bn_momentum)

        layers = []
        prev_dim = input_dim

        for hidden_dim in hidden_dims:
            if use_residual:
                layers.append(ResidualBlock(prev_dim, hidden_dim, dropout, bn_momentum))
            else:
                layers.append(nn.Linear(prev_dim, hidden_dim))
                layers.append(nn.BatchNorm1d(hidden_dim, momentum=bn_momentum))
                layers.append(nn.ReLU())
                layers.append(nn.Dropout(dropout))
            prev_dim = hidden_dim

        self.hidden_layers = nn.Sequential(*layers)
        self.output_bn = nn.BatchNorm1d(prev_dim, momentum=bn_momentum)
        self.output_layer = nn.Linear(prev_dim, 1)
        self.sigmoid = nn.Sigmoid()

        self._init_weights()

    def _init_weights(self):
        """Apply Xavier-uniform weights and zero biases to every Linear layer."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)

    def forward(self, x):
        """Return sigmoid probabilities with one output column per input row."""
        x = self.input_bn(x)
        x = self.hidden_layers(x)
        x = self.output_bn(x)
        x = self.output_layer(x)
        x = self.sigmoid(x)
        return x


class PyTorchPatentClassifier:
    """Train, evaluate, and persist a ``PatentNoveltyNet`` binary classifier.

    Bundles feature standardization (``StandardScaler``), mini-batch training
    with AdamW, optional mixup augmentation, gradient clipping,
    ``ReduceLROnPlateau`` scheduling, and early stopping on validation loss
    behind ``fit`` / ``predict`` / ``predict_proba`` / ``evaluate`` methods.

    Args:
        hidden_dims: Hidden stage widths for the network. ``None`` selects the
            default ``[128, 64, 32]``.
        dropout: Dropout probability passed to the network.
        learning_rate: AdamW learning rate.
        weight_decay: AdamW weight decay.
        batch_size: Mini-batch size for training and validation loaders.
        max_epochs: Upper bound on the number of training epochs.
        patience: Number of consecutive epochs without validation-loss
            improvement after which training stops early.
        use_residual: Whether the network uses residual blocks.
        bn_momentum: Momentum for all BatchNorm1d layers.
        device: Device string such as ``"cpu"`` or ``"cuda"``. ``None`` picks
            CUDA, then MPS, then CPU, depending on availability.
    """

    def __init__(
        self,
        hidden_dims: Optional[List[int]] = None,
        dropout: float = 0.3,
        learning_rate: float = 0.001,
        weight_decay: float = 1e-4,
        batch_size: int = 256,
        max_epochs: int = 100,
        patience: int = 15,
        use_residual: bool = True,
        bn_momentum: float = 0.1,
        device: Optional[str] = None
    ):
        # None sentinel instead of a mutable list default shared by all instances.
        self.hidden_dims = [128, 64, 32] if hidden_dims is None else hidden_dims
        self.dropout = dropout
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.batch_size = batch_size
        self.max_epochs = max_epochs
        self.patience = patience
        self.use_residual = use_residual
        self.bn_momentum = bn_momentum

        self.device = self._resolve_device(device)

        self.model = None
        self.scaler = StandardScaler()
        self.training_history = {"train_loss": [], "val_loss": [], "val_acc": []}
        self.feature_names = None

        logger.info(f"PyTorch Classifier initialized (device: {self.device})")

    @staticmethod
    def _resolve_device(device: Optional[str]) -> torch.device:
        """Return the requested device, or the best available one when ``device`` is None."""
        if device is not None:
            return torch.device(device)
        if torch.cuda.is_available():
            return torch.device("cuda")
        # Older PyTorch builds have no ``torch.backends.mps``; look it up
        # defensively instead of raising AttributeError.
        mps_backend = getattr(torch.backends, "mps", None)
        if mps_backend is not None and mps_backend.is_available():
            return torch.device("mps")
        return torch.device("cpu")

    @staticmethod
    def _snapshot_state(module: nn.Module) -> Dict[str, torch.Tensor]:
        """Return an independent copy of ``module``'s state dict.

        ``state_dict()`` returns tensors that share storage with the live
        parameters and buffers, so a shallow ``dict.copy()`` keeps changing as
        training continues. Cloning each tensor freezes the values.
        """
        return {
            name: tensor.detach().clone()
            for name, tensor in module.state_dict().items()
        }

    def _create_dataloaders(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None
    ) -> Tuple[DataLoader, Optional[DataLoader]]:
        """Wrap the arrays in DataLoaders.

        The training loader shuffles and drops the last incomplete batch. The
        validation loader (built only when both ``X_val`` and ``y_val`` are
        given) keeps order and keeps every sample. Labels are reshaped to a
        single column to match the network output.
        """
        X_train_t = torch.FloatTensor(X_train)
        y_train_t = torch.FloatTensor(y_train).unsqueeze(1)

        train_dataset = TensorDataset(X_train_t, y_train_t)
        train_loader = DataLoader(
            train_dataset,
            batch_size=self.batch_size,
            shuffle=True,
            drop_last=True
        )

        val_loader = None
        if X_val is not None and y_val is not None:
            X_val_t = torch.FloatTensor(X_val)
            y_val_t = torch.FloatTensor(y_val).unsqueeze(1)
            val_dataset = TensorDataset(X_val_t, y_val_t)
            val_loader = DataLoader(val_dataset, batch_size=self.batch_size, shuffle=False)

        return train_loader, val_loader

    def _train_epoch(
        self,
        train_loader: DataLoader,
        criterion: nn.Module,
        optimizer: optim.Optimizer,
        use_mixup: bool,
        mixup_alpha: float
    ) -> float:
        """Run one optimization pass over ``train_loader``; return the mean batch loss.

        Returns 0.0 when the loader yields no batches.
        """
        self.model.train()
        running_loss = 0.0
        num_batches = 0

        for batch_X, batch_y in train_loader:
            batch_X = batch_X.to(self.device)
            batch_y = batch_y.to(self.device)

            # Mixup is applied to a random ~half of the batches: inputs and
            # labels are blended with a shuffled copy of the same batch.
            if use_mixup and np.random.random() > 0.5:
                lam = np.random.beta(mixup_alpha, mixup_alpha)
                index = torch.randperm(batch_X.size(0)).to(self.device)
                batch_X = lam * batch_X + (1 - lam) * batch_X[index]
                batch_y = lam * batch_y + (1 - lam) * batch_y[index]

            optimizer.zero_grad()
            outputs = self.model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)

            optimizer.step()
            running_loss += loss.item()
            num_batches += 1

        return running_loss / num_batches if num_batches > 0 else 0.0

    def _validate(self, val_loader: DataLoader, criterion: nn.Module) -> Tuple[float, float]:
        """Return ``(mean per-batch loss, accuracy at threshold 0.5)`` on ``val_loader``."""
        self.model.eval()
        total_loss = 0.0
        pred_batches = []
        label_batches = []

        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                batch_X = batch_X.to(self.device)
                batch_y = batch_y.to(self.device)

                outputs = self.model(batch_X)
                total_loss += criterion(outputs, batch_y).item()

                pred_batches.append(outputs.cpu().numpy())
                label_batches.append(batch_y.cpu().numpy())

        # Mean of the per-batch mean losses (one term per batch).
        avg_loss = total_loss / len(val_loader)
        preds = np.concatenate(pred_batches).ravel()
        labels = np.concatenate(label_batches).ravel()
        accuracy = accuracy_score(labels > 0.5, preds > 0.5)
        return avg_loss, accuracy

    def fit(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None,
        feature_names: Optional[List[str]] = None,
        use_mixup: bool = True,
        mixup_alpha: float = 0.2
    ) -> Dict:
        """Fit the scaler and train a freshly built network.

        Args:
            X_train: Training feature matrix; its second dimension sets the
                network input width.
            y_train: Training labels.
            X_val: Optional validation features. Validation, LR scheduling,
                and early stopping only run when both ``X_val`` and ``y_val``
                are provided.
            y_val: Optional validation labels.
            feature_names: Optional feature names, stored on the instance.
            use_mixup: Whether to apply mixup to a random subset of batches.
            mixup_alpha: Parameter of the Beta distribution used for the
                mixup coefficient.

        Returns:
            The training history dict with ``train_loss``, ``val_loss``, and
            ``val_acc`` lists for this call.
        """
        self.feature_names = feature_names
        # Every fit() builds a new model, so the history starts fresh as well
        # instead of mixing curves from unrelated runs.
        self.training_history = {"train_loss": [], "val_loss": [], "val_acc": []}

        X_train_scaled = self.scaler.fit_transform(X_train)
        X_val_scaled = self.scaler.transform(X_val) if X_val is not None else None

        input_dim = X_train.shape[1]
        self.model = PatentNoveltyNet(
            input_dim=input_dim,
            hidden_dims=self.hidden_dims,
            dropout=self.dropout,
            use_residual=self.use_residual,
            bn_momentum=self.bn_momentum
        ).to(self.device)

        criterion = nn.BCELoss()
        optimizer = optim.AdamW(
            self.model.parameters(),
            lr=self.learning_rate,
            weight_decay=self.weight_decay
        )

        scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=5
        )

        train_loader, val_loader = self._create_dataloaders(
            X_train_scaled, y_train, X_val_scaled, y_val
        )

        if len(train_loader) == 0:
            # drop_last=True discards the only (incomplete) batch in this case,
            # so no optimizer step would ever run.
            logger.warning(
                "Training set has fewer samples than batch_size=%d; "
                "no training batches will be produced",
                self.batch_size,
            )

        logger.info("Training PyTorch model...")
        logger.info(f"  Architecture: {self.hidden_dims}")
        logger.info(f"  Dropout: {self.dropout}")
        logger.info(f"  Residual connections: {self.use_residual}")
        logger.info(f"  Mixup augmentation: {use_mixup}")

        best_val_loss = float('inf')
        patience_counter = 0
        best_state = None

        for epoch in range(self.max_epochs):
            avg_train_loss = self._train_epoch(
                train_loader, criterion, optimizer, use_mixup, mixup_alpha
            )
            self.training_history["train_loss"].append(avg_train_loss)

            if val_loader is None:
                continue

            avg_val_loss, val_acc = self._validate(val_loader, criterion)
            self.training_history["val_loss"].append(avg_val_loss)
            self.training_history["val_acc"].append(val_acc)

            scheduler.step(avg_val_loss)

            if avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                patience_counter = 0
                # Clone the tensors: a shallow copy would alias the live
                # parameters and silently follow later updates.
                best_state = self._snapshot_state(self.model)
            else:
                patience_counter += 1

            if (epoch + 1) % 5 == 0:
                logger.info(
                    f"Epoch {epoch+1}/{self.max_epochs} - "
                    f"Train Loss: {avg_train_loss:.4f}, "
                    f"Val Loss: {avg_val_loss:.4f}, "
                    f"Val Acc: {val_acc:.4f}"
                )

            if patience_counter >= self.patience:
                logger.info(f"Early stopping at epoch {epoch+1}")
                break

        # Restore the weights from the epoch with the lowest validation loss.
        if best_state is not None:
            self.model.load_state_dict(best_state)

        logger.info("Training complete!")

        return self.training_history

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return an ``(n_samples, 2)`` array of ``[P(class 0), P(class 1)]``.

        ``X`` is standardized with the fitted scaler and evaluated in a single
        forward pass with the model in eval mode.
        """
        self.model.eval()
        X_scaled = self.scaler.transform(X)
        X_tensor = torch.FloatTensor(X_scaled).to(self.device)

        with torch.no_grad():
            probs = self.model(X_tensor).cpu().numpy()

        return np.hstack([1 - probs, probs])

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return 0/1 predictions using a 0.5 threshold on the class-1 probability."""
        probs = self.predict_proba(X)[:, 1]
        return (probs > 0.5).astype(int)

    def evaluate(self, X: np.ndarray, y: np.ndarray) -> Dict:
        """Compute classification metrics for ``X`` against labels ``y``.

        Returns:
            Dict with ``accuracy``, ``precision``, ``recall``, ``f1``,
            ``roc_auc``, and ``confusion_matrix`` (as nested lists).
        """
        # One forward pass serves both the probabilities and the thresholded
        # labels (same 0.5 threshold as predict()).
        y_proba = self.predict_proba(X)[:, 1]
        y_pred = (y_proba > 0.5).astype(int)

        return {
            "accuracy": accuracy_score(y, y_pred),
            "precision": precision_score(y, y_pred),
            "recall": recall_score(y, y_pred),
            "f1": f1_score(y, y_pred),
            "roc_auc": roc_auc_score(y, y_proba),
            "confusion_matrix": confusion_matrix(y, y_pred).tolist()
        }

    def save(self, path: str):
        """Write the model checkpoint, scaler, and training history into directory ``path``.

        Creates the directory if needed and writes ``pytorch_model.pt``,
        ``scaler_pytorch.pkl``, and ``training_history_pytorch.json``.
        """
        path = Path(path)
        path.mkdir(parents=True, exist_ok=True)

        torch.save({
            "model_state_dict": self.model.state_dict(),
            "hidden_dims": self.hidden_dims,
            "dropout": self.dropout,
            "use_residual": self.use_residual,
            "bn_momentum": self.bn_momentum,
            "input_dim": self.model.input_bn.num_features
        }, path / "pytorch_model.pt")

        with open(path / "scaler_pytorch.pkl", "wb") as f:
            pickle.dump(self.scaler, f)

        with open(path / "training_history_pytorch.json", "w") as f:
            json.dump(self.training_history, f, indent=2)

        logger.info(f"Model saved to {path}")

    def load(self, path: str):
        """Rebuild the network and scaler from files written by ``save``.

        Restores the architecture settings, model weights, and scaler. The
        training history and feature names are not restored.

        NOTE(security): both the checkpoint and the scaler are unpickled, and
        unpickling can execute arbitrary code. Only load directories that
        come from a trusted source.
        """
        path = Path(path)

        checkpoint = torch.load(path / "pytorch_model.pt", map_location=self.device)

        self.hidden_dims = checkpoint["hidden_dims"]
        self.dropout = checkpoint["dropout"]
        self.use_residual = checkpoint["use_residual"]
        # Checkpoints without this key fall back to 0.1.
        self.bn_momentum = checkpoint.get("bn_momentum", 0.1)

        self.model = PatentNoveltyNet(
            input_dim=checkpoint["input_dim"],
            hidden_dims=self.hidden_dims,
            dropout=self.dropout,
            use_residual=self.use_residual,
            bn_momentum=self.bn_momentum
        ).to(self.device)

        self.model.load_state_dict(checkpoint["model_state_dict"])

        with open(path / "scaler_pytorch.pkl", "rb") as f:
            self.scaler = pickle.load(f)

        logger.info(f"Model loaded from {path}")


## Load Features


In [None]:
# Load the precomputed v2 feature matrices (X) and labels (y) for each split.
features_dir = Path("data/features")

X_train = np.load(features_dir / "train_features_v2.X.npy")
y_train = np.load(features_dir / "train_features_v2.y.npy")
X_val = np.load(features_dir / "val_features_v2.X.npy")
y_val = np.load(features_dir / "val_features_v2.y.npy")
X_test = np.load(features_dir / "test_features_v2.X.npy")
y_test = np.load(features_dir / "test_features_v2.y.npy")

# Column names of the feature matrices as loaded, in column order.
old_feature_names = [
    'bm25_doc_score',
    'bm25_best_claim_score',
    'cosine_doc_similarity',
    'cosine_max_claim_similarity',
    'embedding_diff_mean',
    'embedding_diff_std',
    'cpc_jaccard',
    'year_diff',
    'title_jaccard',
    'abstract_length_ratio',
    'claim_count_ratio',
    'shared_rare_terms_ratio',
    'claim_similarity'
]

print(f"   Original features: {len(old_feature_names)}")
print(f"   Original feature names: {old_feature_names}")

# Check every split (not only train) against the name list, and raise
# explicitly so the check is not stripped when Python runs with -O.
expected_feature_count = len(old_feature_names)
for split_name, split_X in (("train", X_train), ("val", X_val), ("test", X_test)):
    if split_X.ndim != 2 or split_X.shape[1] != expected_feature_count:
        raise ValueError(
            f"Expected {expected_feature_count} features in {split_name} split, "
            f"got array of shape {split_X.shape}"
        )


## Remove BM25 and CPC Features


In [None]:
# Column indices of the BM25 and CPC features in the original feature layout.
indices_to_remove = [0, 1, 6]
removed_index_set = set(indices_to_remove)
indices_to_keep = [
    i for i in range(len(old_feature_names)) if i not in removed_index_set
]

# Guard against re-running this cell on arrays that were already reduced,
# which would otherwise fail with an opaque IndexError while slicing.
for split_name, split_X in (("train", X_train), ("val", X_val), ("test", X_test)):
    if split_X.shape[1] != len(old_feature_names):
        raise RuntimeError(
            f"{split_name} split has {split_X.shape[1]} features, expected "
            f"{len(old_feature_names)}; reload the features before removing columns"
        )

print(f"\n2. Removing features at indices {indices_to_remove}")
print(f"   Removing: {[old_feature_names[i] for i in indices_to_remove]}")

X_train = X_train[:, indices_to_keep]
X_val = X_val[:, indices_to_keep]
X_test = X_test[:, indices_to_keep]

feature_names = [old_feature_names[i] for i in indices_to_keep]

print(f"   Kept features: {feature_names}")
print(f"   New feature count: {len(feature_names)}")

# Persist the reduced feature list next to the feature files.
with open(features_dir / "feature_names_v2.json", "w") as f:
    json.dump(feature_names, f, indent=2)
print(f"   Updated feature_names_v2.json with {len(feature_names)} features")

print(f"\n   Training set: {len(X_train)} samples, {X_train.shape[1]} features")
print(f"   Validation set: {len(X_val)} samples")
print(f"   Test set: {len(X_test)} samples")

# Sanity check derived from the inputs instead of a hard-coded count.
expected_kept_count = len(old_feature_names) - len(removed_index_set)
if X_train.shape[1] != expected_kept_count:
    raise ValueError(
        f"Expected {expected_kept_count} features, got {X_train.shape[1]}"
    )


## Initialize Model


In [None]:
# Hyperparameters for this run; options not listed here keep the class defaults.
model_config = {
    "hidden_dims": [128, 64, 32],
    "dropout": 0.3,
    "learning_rate": 0.001,
    "max_epochs": 100,
    "patience": 15,
    "batch_size": 256,
}
model = PyTorchPatentClassifier(**model_config)

## Train Model


In [None]:
print("\n4. Training model...")
print("   This will take 10-20 minutes on GPU, 30-60 minutes on CPU")
# perf_counter() is monotonic, so the measured duration cannot be distorted
# by system clock adjustments the way time.time() differences can.
start_time = time.perf_counter()

# Train with validation-based early stopping and mixup augmentation enabled.
model.fit(
    X_train, y_train,
    X_val, y_val,
    feature_names=feature_names,
    use_mixup=True
)

# Elapsed training time in seconds.
training_time = time.perf_counter() - start_time
print(f"\n   Training completed in {training_time/60:.1f} minutes")

## Evaluate Model


In [None]:
print("\n5. Evaluating model...")
# Metrics for each split; kept in separate variables because the save step
# stores them individually.
train_metrics = model.evaluate(X_train, y_train)
val_metrics = model.evaluate(X_val, y_val)
test_metrics = model.evaluate(X_test, y_test)

banner = "=" * 70
print("\n" + banner)
print("RESULTS")
print(banner)

# Same three headline metrics for every split, printed in a fixed order.
for split_title, split_metrics in (
    ("Training", train_metrics),
    ("Validation", val_metrics),
    ("Test", test_metrics),
):
    print(f"\n{split_title} Set:")
    print(f"  Accuracy:  {split_metrics['accuracy']:.4f}")
    print(f"  ROC-AUC:   {split_metrics['roc_auc']:.4f}")
    print(f"  F1:        {split_metrics['f1']:.4f}")


## Save Model


In [None]:
print("\n6. Saving model...")
model_path = Path("models/pytorch_nn")
model_path.mkdir(parents=True, exist_ok=True)
model.save(model_path)

# Run summary stored next to the model. The feature count is derived from
# the actual feature list so it cannot drift from the data used in training.
results = {
    "training_time_minutes": training_time / 60,
    "features": feature_names,
    "num_features": len(feature_names),
    "removed_features": [old_feature_names[i] for i in indices_to_remove],
    "train_metrics": train_metrics,
    "val_metrics": val_metrics,
    "test_metrics": test_metrics,
    "timestamp": datetime.now().isoformat()
}

results_path = model_path / "training_results.json"
with open(results_path, "w") as f:
    json.dump(results, f, indent=2)

print(f"   Model saved to: {model_path}")
print(f"   Results saved to: {results_path}")

print("\n" + "="*70)
print("COMPLETE")
print("="*70)
print(f"\nModel retrained with {len(feature_names)} features (BM25 and CPC removed)")
print("Feature names updated in: data/features/feature_names_v2.json")
