# Pet Adoption Speed Prediction using Transformer Models

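This notebook fine-tunes a pretrained transformer encoder (DistilBERT by default) with PyTorch to predict adoption speed from each listing's text: the free-text description plus image labels taken from the metadata JSON files.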

## Target: AdoptionSpeed (0-4)
- 0: Same day adoption
- 1: Adoption within 7 days
- 2: Adoption between 8-30 days
- 3: Adoption between 31-90 days
- 4: No adoption after 100 days

In [1]:

from google.colab import drive
drive.mount('/content/drive')

# Create a directory for the unzipped data if it doesn't exist
!mkdir -p data

# Unzip the specified file into the 'data' directory
!unzip -q "/content/drive/MyDrive/Work/APP Deep Learn/data/petfinder-adoption-prediction.zip" -d data

print("File unzipped successfully to the 'data' directory.")

: 

In [2]:
# Import libraries
import json
import os
import pandas as pd
import numpy as np
from pathlib import Path
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModel
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, cohen_kappa_score, classification_report, confusion_matrix
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')  # NOTE(review): hides every warning for the whole session, deprecation notices included

# Set random seeds for reproducibility
# Seeds the global PyTorch and NumPy generators; Python's `random` module is not seeded here.
torch.manual_seed(42)
np.random.seed(42)

## 1. Load and Explore Data

In [3]:
# Read the training table. The relative path matches the Colab layout
# ('data/' next to the notebook); from a local venv (VS Code) use
# '../data/train/train.csv' instead.
df = pd.read_csv('data/train/train.csv')

# Structural overview before any modelling
print("Dataset shape: {}".format(df.shape))
print("\nColumns: {}".format(list(df.columns)))
print("\nFirst few rows:")
df.head()

In [4]:
# Tabulate the target once and reuse it for both the printout and the chart
speed_counts = df['AdoptionSpeed'].value_counts().sort_index()
print("AdoptionSpeed distribution:")
print(speed_counts)

# Bar chart of the per-class counts
plt.figure(figsize=(10, 5))
speed_counts.plot(kind='bar')
plt.title('Distribution of Adoption Speed')
plt.xlabel('Adoption Speed')
plt.ylabel('Count')
plt.xticks(rotation=0)
plt.show()

In [5]:
# Count the listings that carry a free-text description and show the first one
described = df['Description'].dropna()
print("Rows with descriptions: {}".format(len(described)))
print("\nSample description:")
print(described.iloc[0])

In [6]:
# Look at the annotation JSON stored for image 1 of the first pet in the table
sample_pet_id = df['PetID'].iloc[0]
metadata_file = 'data/train_metadata/{}-1.json'.format(sample_pet_id)

# Nothing is printed when that file does not exist
if os.path.exists(metadata_file):
    with open(metadata_file, 'r') as f:
        metadata = json.load(f)
    pretty_json = json.dumps(metadata, indent=2)
    print(f"Metadata for {sample_pet_id}:")
    # Only the first 500 characters are shown to keep the output short
    print(pretty_json[:500], "...")

## 2. Create Dataset Class

In [7]:
class PetAdoptionDataset(Dataset):
    """Text dataset for adoption-speed classification.

    Each item is built from one row of ``dataframe``: the row's ``Description``
    (when present) is joined with image labels read from the pet's metadata
    JSON files, tokenized, and returned together with the ``AdoptionSpeed``
    label.

    Args:
        dataframe: Table with ``PetID`` and ``AdoptionSpeed`` columns;
            ``Description`` is used when available. The index is reset.
        metadata_dir: Directory containing ``<PetID>-<n>.json`` files.
        tokenizer: Callable invoked with the text and Hugging Face-style
            keyword arguments; its result must provide ``input_ids`` and
            ``attention_mask`` tensors.
        max_length: Length every sample is padded/truncated to.
        max_images: Number of image metadata files (``<PetID>-1`` up to
            ``<PetID>-max_images``) inspected per pet.
    """

    def __init__(self, dataframe, metadata_dir, tokenizer, max_length=128, max_images=10):
        self.data = dataframe.reset_index(drop=True)
        self.metadata_dir = metadata_dir
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.max_images = max_images
        # Label text is memoized per PetID so the JSON files are read once
        # instead of on every access (i.e. every epoch).
        self._metadata_text_cache = {}

    def __len__(self):
        """Return the number of rows (samples)."""
        return len(self.data)

    def __getitem__(self, idx):
        """Build the tokenized sample for row ``idx``.

        Returns:
            dict with 1-D int64 tensors ``input_ids`` and ``attention_mask``
            and a scalar int64 tensor ``labels``.
        """
        row = self.data.iloc[idx]

        # Description first, image labels second; with truncation enabled the
        # tail of the combined text is what gets cut off.
        text_parts = []
        description = row.get('Description')
        if pd.notna(description):
            text_parts.append(str(description))

        label_text = self.extract_text_from_metadata(row['PetID'])
        if label_text != 'no description':
            text_parts.append(label_text)

        full_text = " ".join(text_parts) if text_parts else "No description available"

        encoding = self.tokenizer(
            full_text,
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        return {
            # The tokenizer returns a leading batch axis; flatten drops it.
            'input_ids': encoding['input_ids'].flatten().long(),
            'attention_mask': encoding['attention_mask'].flatten().long(),
            'labels': torch.tensor(row['AdoptionSpeed'], dtype=torch.long)
        }

    def extract_text_from_metadata(self, pet_id):
        """Collect confident image labels for ``pet_id`` as one string.

        Files ``<pet_id>-1.json`` ... ``<pet_id>-<max_images>.json`` are read in
        order, stopping at the first number that does not exist. Unreadable or
        malformed files are skipped. Labels are de-duplicated in first-seen
        order.

        Returns:
            Space-joined labels, or the sentinel ``'no description'`` when no
            label was found.
        """
        cache = self._metadata_text_cache
        if pet_id in cache:
            return cache[pet_id]

        labels = []
        # range() excludes its end, hence the +1 to include image `max_images`
        for image_number in range(1, self.max_images + 1):
            metadata_file = os.path.join(self.metadata_dir, f"{pet_id}-{image_number}.json")
            if not os.path.exists(metadata_file):
                break
            labels.extend(self._read_confident_labels(metadata_file))

        # dict keys keep insertion order, giving an order-preserving de-dup
        unique_labels = list(dict.fromkeys(labels))
        text = ' '.join(unique_labels) if unique_labels else 'no description'
        cache[pet_id] = text
        return text

    @staticmethod
    def _read_confident_labels(metadata_file, top_k=5, min_score=0.8):
        """Return descriptions of the first ``top_k`` label annotations whose score exceeds ``min_score``."""
        try:
            with open(metadata_file, 'r', encoding='utf-8') as f:
                metadata = json.load(f)
        except (OSError, ValueError):
            # Unreadable file or invalid JSON/encoding: best effort, skip it
            return []

        annotations = metadata.get('labelAnnotations') if isinstance(metadata, dict) else None
        if not isinstance(annotations, list):
            return []

        confident = []
        for label in annotations[:top_k]:
            if not isinstance(label, dict):
                continue
            description = label.get('description')
            score = label.get('score', 0)
            if (
                isinstance(description, str)
                and isinstance(score, (int, float))
                and score > min_score
            ):
                confident.append(description)
        return confident

## 3. Define Transformer Model

In [8]:
class TransformerPetClassifier(nn.Module):
    """Pretrained transformer encoder followed by an MLP classification head.

    Args:
        model_name: Checkpoint name passed to ``AutoModel.from_pretrained``.
        num_classes: Number of output logits.
        dropout: Dropout probability used on the pooled vector and inside the head.
    """

    def __init__(self, model_name='microsoft/deberta-v3-base', num_classes=5, dropout=0.3):
        super().__init__()

        self.transformer = AutoModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(dropout)

        # Head: hidden_size -> 256 -> 128 -> num_classes, each hidden stage being
        # Linear + BatchNorm + ReLU + Dropout. Layers are appended in the same
        # order as a hand-written Sequential, so parameter names are unchanged.
        in_features = self.transformer.config.hidden_size
        head_layers = []
        for out_features in (256, 128):
            head_layers.append(nn.Linear(in_features, out_features))
            head_layers.append(nn.BatchNorm1d(out_features))
            head_layers.append(nn.ReLU())
            head_layers.append(nn.Dropout(dropout))
            in_features = out_features
        head_layers.append(nn.Linear(in_features, num_classes))
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, input_ids, attention_mask):
        """Return class logits for a batch of token ids and attention masks."""
        encoder_out = self.transformer(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

        # The hidden state at sequence position 0 (the [CLS] slot) summarises the text
        first_token = encoder_out.last_hidden_state[:, 0, :]

        return self.classifier(self.dropout(first_token))

## 4. Training and Evaluation Functions

In [9]:
def train_epoch(model, dataloader, optimizer, criterion, device):
    """Run one optimisation pass over ``dataloader``.

    Batches must be dicts with ``input_ids``, ``attention_mask`` and ``labels``.
    Prints the distribution of predicted classes at the end of the pass.

    Returns:
        (mean per-batch loss, accuracy, quadratic-weighted Cohen's kappa)
    """
    model.train()
    running_loss = 0
    epoch_preds, epoch_targets = [], []

    batches = tqdm(dataloader, desc='Training')

    for batch in batches:
        # All three tensors are moved to the device as int64
        input_ids, attention_mask, labels = (
            batch[field].to(device, dtype=torch.long)
            for field in ('input_ids', 'attention_mask', 'labels')
        )

        optimizer.zero_grad()

        logits = model(input_ids, attention_mask)
        loss = criterion(logits, labels)
        loss.backward()

        # Cap the global gradient norm at 1.0 before the parameter update
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        step_loss = loss.item()
        running_loss += step_loss

        epoch_preds.extend(torch.argmax(logits, dim=1).cpu().numpy())
        epoch_targets.extend(labels.cpu().numpy())

        batches.set_postfix({'loss': step_loss})

    avg_loss = running_loss / len(dataloader)
    accuracy = accuracy_score(epoch_targets, epoch_preds)
    kappa = cohen_kappa_score(epoch_targets, epoch_preds, weights='quadratic')

    # How often each class was predicted during this pass
    classes, counts = np.unique(epoch_preds, return_counts=True)
    print(f"\nTraining predictions distribution: {dict(zip(classes, counts))}")

    return avg_loss, accuracy, kappa


def evaluate(model, dataloader, criterion, device):
    """Score ``model`` on ``dataloader`` without updating its weights.

    Prints the distribution of predicted classes.

    Returns:
        (mean per-batch loss, accuracy, quadratic-weighted Cohen's kappa,
        list of predictions, list of true labels)
    """
    model.eval()
    running_loss = 0
    all_preds, all_targets = [], []

    # Gradients are not needed for scoring
    with torch.no_grad():
        for batch in tqdm(dataloader, desc='Evaluating'):
            # Same int64 conversion as in training
            input_ids, attention_mask, labels = (
                batch[field].to(device, dtype=torch.long)
                for field in ('input_ids', 'attention_mask', 'labels')
            )

            logits = model(input_ids, attention_mask)
            running_loss += criterion(logits, labels).item()

            all_preds.extend(torch.argmax(logits, dim=1).cpu().numpy())
            all_targets.extend(labels.cpu().numpy())

    avg_loss = running_loss / len(dataloader)
    accuracy = accuracy_score(all_targets, all_preds)
    kappa = cohen_kappa_score(all_targets, all_preds, weights='quadratic')

    # How often each class was predicted
    classes, counts = np.unique(all_preds, return_counts=True)
    print(f"\nValidation predictions distribution: {dict(zip(classes, counts))}")

    return avg_loss, accuracy, kappa, all_preds, all_targets

## 5. Configuration and Setup

In [10]:
# Configuration
# Revert to DistilBERT for stability
# (the classifier's own default checkpoint is 'microsoft/deberta-v3-base'; MODEL_NAME overrides it)
MODEL_NAME = 'distilbert-base-uncased'
MAX_LENGTH = 128  # token length every sample is padded/truncated to
BATCH_SIZE = 32  # Increase batch size for DistilBERT
EPOCHS = 50  # upper bound; the training loop may stop earlier via early stopping
LEARNING_RATE = 2e-5  # initial AdamW learning rate
NUM_CLASSES = 5  # AdoptionSpeed takes the values 0-4

# Set device
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

## 6. Prepare Data

In [11]:
# Hold out 20% of the rows for validation. Stratifying on the target keeps the
# AdoptionSpeed proportions the same in both parts; the fixed random_state
# makes the split repeatable.
train_df, val_df = train_test_split(
    df, test_size=0.2, stratify=df['AdoptionSpeed'], random_state=42
)

print("Train size: {}".format(len(train_df)))
print("Validation size: {}".format(len(val_df)))

In [12]:
# Balanced class weights for the loss: classes with fewer training rows get larger weights
from sklearn.utils.class_weight import compute_class_weight

train_targets = train_df['AdoptionSpeed']
balanced_weights = compute_class_weight(
    'balanced',
    classes=np.unique(train_targets),
    y=train_targets
)
# float32 tensor on the training device, ready to be passed to the loss
class_weights = torch.tensor(balanced_weights, dtype=torch.float32, device=device)

print("Class distribution in training set:")
print(train_targets.value_counts().sort_index())
print(f"\nClass weights: {class_weights}")
print("Higher weights = minority classes get more penalty when misclassified")

In [13]:
# Initialize tokenizer
# Loaded from MODEL_NAME, the same checkpoint name the model cell passes to the classifier.
print(f"Loading tokenizer: {MODEL_NAME}")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

In [14]:
# Wrap both splits in datasets; they share one metadata directory because the
# validation rows were split off the training table.
print("Creating datasets...")
METADATA_DIR = 'data/train_metadata'
train_dataset = PetAdoptionDataset(train_df, METADATA_DIR, tokenizer, MAX_LENGTH)
val_dataset = PetAdoptionDataset(val_df, METADATA_DIR, tokenizer, MAX_LENGTH)

# Only the training loader shuffles; validation keeps row order
loader_options = dict(batch_size=BATCH_SIZE, num_workers=0)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)

print("Train batches: {}".format(len(train_loader)))
print("Validation batches: {}".format(len(val_loader)))

In [15]:
# Sanity check: pull one training sample and inspect its tensors
sample = train_dataset[0]
print("Input IDs shape: {}".format(sample['input_ids'].shape))
print("Attention mask shape: {}".format(sample['attention_mask'].shape))
print("Label: {}".format(sample['labels'].item()))

# Turn the token ids back into text to confirm what the model will read
decoded_text = tokenizer.decode(sample['input_ids'], skip_special_tokens=True)
preview = decoded_text[:200]
print(f"\nDecoded text (first 200 chars): {preview}...")

## 7. Initialize Model

In [16]:
# Initialize model: pretrained encoder named by MODEL_NAME plus the classification head
print(f"Initializing model: {MODEL_NAME}")
model = TransformerPetClassifier(
    model_name=MODEL_NAME,
    num_classes=NUM_CLASSES,
    dropout=0.3
).to(device)

# Keep every parameter in float32
model = model.float()

# Use weighted loss function to handle class imbalance.
# The fallback branch only runs when the class-weights cell was skipped, i.e.
# the name is missing from the notebook namespace.
if 'class_weights' in locals():
    print("Using weighted CrossEntropyLoss")
    criterion = nn.CrossEntropyLoss(weight=class_weights)
else:
    print("Using standard CrossEntropyLoss (check if class weights cell was run)")
    criterion = nn.CrossEntropyLoss()

optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=0.01
)

# Learning rate scheduler - reduces LR when validation kappa plateaus
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='max',          # Maximize kappa score
    factor=0.5,          # Reduce LR by 50%
    patience=3,          # Wait 3 epochs before reducing
    min_lr=1e-7          # Minimum learning rate
)

# Early stopping parameters
EARLY_STOPPING_PATIENCE = 7
early_stopping_counter = 0

# Status lines (the check-mark characters were mis-encoded and are restored here)
print("✓ Scheduler: ReduceLROnPlateau (patience=3, factor=0.5)")
print(f"✓ Early Stopping: patience={EARLY_STOPPING_PATIENCE}")
print("✓ Model converted to float32")

# Count parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"\nTotal parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")

## 8. Train Model

In [None]:
# Training loop with early stopping and LR scheduler
history = {
    'train_loss': [],
    'train_acc': [],
    'train_kappa': [],
    'val_loss': [],
    'val_acc': [],
    'val_kappa': [],
    'learning_rates': []
}

# Start below every possible kappa (kappa can be zero or negative) so the first
# epoch always writes a checkpoint; otherwise the final-evaluation cell could
# find no file, or a stale one from an earlier run.
best_kappa = float('-inf')
early_stopping_counter = 0

for epoch in range(EPOCHS):
    print(f"\n{'='*50}")
    print(f"Epoch {epoch + 1}/{EPOCHS}")
    print(f"{'='*50}")

    # Learning rate in effect for this epoch (read before the scheduler may change it)
    current_lr = optimizer.param_groups[0]['lr']
    print(f"Learning Rate: {current_lr:.2e}")

    # Train
    train_loss, train_acc, train_kappa = train_epoch(
        model, train_loader, optimizer, criterion, device
    )

    # Validate
    val_loss, val_acc, val_kappa, val_preds, val_labels = evaluate(
        model, val_loader, criterion, device
    )

    # Store history
    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['train_kappa'].append(train_kappa)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)
    history['val_kappa'].append(val_kappa)
    history['learning_rates'].append(current_lr)

    print(f"\nTrain - Loss: {train_loss:.4f} | Acc: {train_acc:.4f} | Kappa: {train_kappa:.4f}")
    print(f"Val   - Loss: {val_loss:.4f} | Acc: {val_acc:.4f} | Kappa: {val_kappa:.4f}")

    # Learning rate scheduler step (monitors validation kappa)
    scheduler.step(val_kappa)

    # Check for improvement and handle early stopping
    if val_kappa > best_kappa:
        best_kappa = val_kappa
        early_stopping_counter = 0
        torch.save(model.state_dict(), 'best_transformer_model.pth')
        print(f"✓ Saved best model (Kappa: {best_kappa:.4f})")
    else:
        early_stopping_counter += 1
        print(f"⚠ No improvement for {early_stopping_counter}/{EARLY_STOPPING_PATIENCE} epoch(s)")

        if early_stopping_counter >= EARLY_STOPPING_PATIENCE:
            print(f"\n{'='*50}")
            print("🛑 Early stopping triggered!")
            print(f"{'='*50}")
            print(f"Training stopped at epoch {epoch + 1}/{EPOCHS}")
            print(f"Best validation kappa: {best_kappa:.4f}")
            break

print(f"\n{'='*50}")
print("Training completed!")
print(f"Best validation kappa: {best_kappa:.4f}")
print(f"{'='*50}")

## 9. Visualize Training History

In [None]:
# Plot training history on a 2x2 grid: three train-vs-validation panels plus the LR trace
fig, axes = plt.subplots(2, 2, figsize=(16, 12))

# (axis, train series, validation series, title, y-label) for the paired curves
curve_panels = [
    (axes[0, 0], history['train_loss'], history['val_loss'], 'Loss', 'Loss'),
    (axes[0, 1], history['train_acc'], history['val_acc'], 'Accuracy', 'Accuracy'),
    (axes[1, 0], history['train_kappa'], history['val_kappa'], 'Cohen\'s Kappa (Quadratic)', 'Kappa'),
]
for ax, train_series, val_series, title, y_label in curve_panels:
    ax.plot(train_series, label='Train', marker='o')
    ax.plot(val_series, label='Validation', marker='s')
    ax.set_title(title, fontsize=14, fontweight='bold')
    ax.set_xlabel('Epoch')
    ax.set_ylabel(y_label)
    ax.legend()
    ax.grid(True, alpha=0.3)

# Learning rate per epoch, on a log axis
lr_ax = axes[1, 1]
lr_ax.plot(history['learning_rates'], marker='o', color='purple', linewidth=2)
lr_ax.set_title('Learning Rate Schedule', fontsize=14, fontweight='bold')
lr_ax.set_xlabel('Epoch')
lr_ax.set_ylabel('Learning Rate')
lr_ax.set_yscale('log')
lr_ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Text summary of the run
epochs_run = len(history['train_loss'])
print("\nTraining Summary:")
print(f"Total epochs run: {epochs_run}")
print(f"Best validation kappa: {max(history['val_kappa']):.4f}")
print(f"Final learning rate: {history['learning_rates'][-1]:.2e}")

## 10. Final Evaluation

In [None]:
# Load best model
# `map_location=device` remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU runtime also loads on a CPU-only one.
best_state = torch.load('best_transformer_model.pth', map_location=device)
model.load_state_dict(best_state)

# Re-score the validation set with the restored weights
val_loss, val_acc, val_kappa, val_preds, val_labels = evaluate(
    model, val_loader, criterion, device
)

print("Best Validation Results:")
print(f"Loss: {val_loss:.4f}")
print(f"Accuracy: {val_acc:.4f}")
print(f"Cohen's Kappa (Quadratic): {val_kappa:.4f}")

In [None]:
# Classification report
# Passing `labels` pins the report to all NUM_CLASSES classes. Without it the
# class list is inferred from the data, and a class missing from both labels and
# predictions would leave fewer classes than `target_names` and raise an error.
class_ids = list(range(NUM_CLASSES))
print("\nClassification Report:")
print(classification_report(
    val_labels,
    val_preds,
    labels=class_ids,
    target_names=[f'Speed {i}' for i in class_ids]
))

In [None]:
# Confusion matrix
# `labels` fixes the matrix at NUM_CLASSES x NUM_CLASSES so rows and columns
# always line up with the tick labels, even if a class never occurs.
class_ids = list(range(NUM_CLASSES))
speed_names = [f'Speed {i}' for i in class_ids]
cm = confusion_matrix(val_labels, val_preds, labels=class_ids)

plt.figure(figsize=(10, 8))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=speed_names,
    yticklabels=speed_names
)
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

## 11. Extract Text Embeddings (Optional)

In [None]:
# Function to extract embeddings from the transformer
def get_embeddings(model, dataloader, device):
    """Collect first-token ([CLS] position) encoder states for every sample.

    Only ``model.transformer`` is run; the classification head is bypassed.

    Returns:
        (2-D array with one embedding row per sample, 1-D array of labels),
        both in dataloader order.
    """
    model.eval()
    embedding_chunks, label_chunks = [], []

    # Gradients are not needed for feature extraction
    with torch.no_grad():
        for batch in tqdm(dataloader, desc='Extracting embeddings'):
            token_ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['labels']  # labels are only collected, so they stay where they are

            encoder_out = model.transformer(
                input_ids=token_ids,
                attention_mask=mask
            )

            # Sequence position 0 holds the [CLS] representation
            first_token = encoder_out.last_hidden_state[:, 0, :]

            embedding_chunks.append(first_token.cpu().numpy())
            label_chunks.append(targets.numpy())

    stacked_embeddings = np.vstack(embedding_chunks)
    stacked_labels = np.concatenate(label_chunks)
    return stacked_embeddings, stacked_labels

# Extract embeddings
# Runs the encoder of `model` over the validation loader; rows follow the
# loader's order (val_loader is built with shuffle=False).
print("Extracting embeddings from validation set...")
val_embeddings, val_labels_array = get_embeddings(model, val_loader, device)
print(f"Embeddings shape: {val_embeddings.shape}")

In [None]:
# Visualize embeddings with t-SNE
from sklearn.manifold import TSNE

# Only the first TSNE_SAMPLE validation rows are projected, to keep t-SNE fast
TSNE_SAMPLE = 1000
subset_embeddings = val_embeddings[:TSNE_SAMPLE]
subset_labels = val_labels_array[:TSNE_SAMPLE]

# Project to two dimensions; the fixed random_state makes the layout repeatable
print("Reducing dimensions with t-SNE...")
tsne = TSNE(n_components=2, random_state=42)
embeddings_2d = tsne.fit_transform(subset_embeddings)

# Scatter plot coloured by the true adoption speed
x_coords, y_coords = embeddings_2d[:, 0], embeddings_2d[:, 1]
plt.figure(figsize=(12, 10))
scatter = plt.scatter(
    x_coords,
    y_coords,
    c=subset_labels,
    cmap='viridis',
    alpha=0.6
)
plt.colorbar(scatter, label='Adoption Speed')
plt.title('t-SNE Visualization of Text Embeddings')
plt.xlabel('t-SNE 1')
plt.ylabel('t-SNE 2')
plt.show()

## 12. Save Model and Tokenizer

In [None]:
# Save model
# The checkpoint bundles the current weights of `model` with the settings needed
# to rebuild it. `best_kappa` is converted to a built-in float: the metric comes
# back from scikit-learn as a NumPy scalar, and a pickled NumPy scalar is
# rejected by `torch.load` in its safe `weights_only=True` mode.
checkpoint = {
    'model_state_dict': model.state_dict(),
    'model_name': MODEL_NAME,
    'num_classes': NUM_CLASSES,
    'max_length': MAX_LENGTH,
    'best_kappa': float(best_kappa)
}
torch.save(checkpoint, 'transformer_model_complete.pth')

# Save tokenizer files into the 'tokenizer' directory
tokenizer.save_pretrained('tokenizer')

print("Model and tokenizer saved!")