In [None]:
# # 🐕 Dog Emotion Classification - CoAtNet Cross-Validation Training

# Notebook này sẽ:
# 1. **Clone repository** từ GitHub và cài đặt dependencies
# 2. **Import CoAtNet module** từ `dog_emotion_classification.coatnet`
# 3. **Download dataset** dog emotion classification  
# 4. **Train CoAtNet** với 50 epochs sử dụng K-Fold Cross Validation
# 5. **Evaluate** với cross-validation scores và confusion matrix
# 6. **Download models** và results về máy

# ---
# **Author**: Dog Emotion Research Team  
# **Date**: 2025  
# **Runtime**: Google Colab (GPU T4/V100 recommended)  
# **Training**: CoAtNet (Convolution and Attention Network) với Cross Validation  
# **Repository**: https://github.com/hoangh-e/dog-emotion-recognition-hybrid.git  
# **Module**: `dog_emotion_classification.coatnet`


In [None]:
# 🔧 STEP 1: Clone Repository và Setup Environment
import os
import sys

# Clone repository từ GitHub
REPO_URL = "https://github.com/hoangh-e/dog-emotion-recognition-hybrid.git"
REPO_NAME = "dog-emotion-recognition-hybrid"

if not os.path.exists(REPO_NAME):
    print(f"📥 Cloning repository from {REPO_URL}")
    !git clone {REPO_URL}
    print("✅ Repository cloned successfully!")
else:
    print(f"✅ Repository already exists: {REPO_NAME}")

# Change to repository directory
os.chdir(REPO_NAME)
print(f"📁 Current directory: {os.getcwd()}")

# Add to Python path để import modules
if os.getcwd() not in sys.path:
    sys.path.insert(0, os.getcwd())
    print("✅ Added repository to Python path")

# Install required packages
print("📦 Installing dependencies...")
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
!pip install opencv-python-headless pillow pandas tqdm gdown albumentations matplotlib seaborn
!pip install scikit-learn timm ultralytics

# 🎯 Import CoAtNet từ custom module
try:
    from dog_emotion_classification.coatnet import (
        load_coatnet_model, 
        predict_emotion_coatnet,
        get_coatnet_transforms,
        create_coatnet_model,
        CoAtNetModel
    )
    print("✅ Successfully imported CoAtNet module from dog_emotion_classification.coatnet")
    print("📋 Available functions:")
    print("   - load_coatnet_model()")
    print("   - predict_emotion_coatnet()")
    print("   - get_coatnet_transforms()")
    print("   - create_coatnet_model()")
    print("   - CoAtNetModel class")
except ImportError as e:
    print(f"❌ Failed to import CoAtNet module: {e}")
    print("Please ensure you're in the repository directory and the module exists.")
    raise

# Import libraries
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
import torchvision.transforms as transforms
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
import gdown
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from tqdm import tqdm
import random
import time
import zipfile
import json
import warnings
warnings.filterwarnings('ignore')

# Seed every random number generator used in this notebook with the same value
for _seed_fn in (torch.manual_seed, torch.cuda.manual_seed, np.random.seed, random.seed):
    _seed_fn(42)

# Pick the GPU when one is visible, otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"\n🔧 Using device: {device}")

# Report which GPU was selected and how much memory it has
if device.type == 'cuda':
    _gpu_total_bytes = torch.cuda.get_device_properties(0).total_memory
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {_gpu_total_bytes / 1024**3:.1f} GB")

print("✅ Environment setup complete!")

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention over a token sequence.

    Args:
        dim: Size of the last (channel) dimension of the input; must be
            divisible by ``num_heads``.
        num_heads: Number of attention heads.
        qkv_bias: Whether the fused query/key/value projection has a bias.
        attn_drop: Dropout probability applied to the attention weights.
        proj_drop: Dropout probability applied after the output projection.

    Raises:
        ValueError: If ``num_heads`` is not positive or does not divide ``dim``.
    """

    def __init__(self, dim, num_heads=8, qkv_bias=False, attn_drop=0., proj_drop=0.):
        super().__init__()
        # Fail at construction time; otherwise the mismatch only surfaces as an
        # opaque reshape error inside forward().
        if num_heads <= 0 or dim % num_heads != 0:
            raise ValueError(
                f"dim ({dim}) must be divisible by a positive num_heads ({num_heads})"
            )
        self.num_heads = num_heads
        head_dim = dim // num_heads
        self.scale = head_dim ** -0.5

        # A single linear layer produces queries, keys and values at once
        self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
        self.attn_drop = nn.Dropout(attn_drop)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop)

    def forward(self, x):
        """Apply self-attention to ``x`` of shape (B, N, C); returns the same shape."""
        B, N, C = x.shape
        # (B, N, 3C) -> (3, B, heads, N, head_dim)
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]

        # Attention weights between every pair of tokens: (B, heads, N, N)
        attn = (q @ k.transpose(-2, -1)) * self.scale
        attn = attn.softmax(dim=-1)
        attn = self.attn_drop(attn)

        # Weighted sum of values, then merge the heads back into C channels
        x = (attn @ v).transpose(1, 2).reshape(B, N, C)
        x = self.proj(x)
        x = self.proj_drop(x)
        return x

class TransformerBlock(nn.Module):
    """Pre-norm transformer block: attention and MLP, each wrapped in a residual.

    Args:
        dim: Channel size of the token sequence.
        num_heads: Number of attention heads.
        mlp_ratio: Hidden width of the MLP as a multiple of ``dim``.
        qkv_bias: Forwarded to the attention layer.
        drop: Dropout probability for the attention projection and the MLP.
        attn_drop: Dropout probability for the attention weights.
        norm_layer: Constructor of the normalisation layer, called with ``dim``.
    """

    def __init__(self, dim, num_heads=8, mlp_ratio=4., qkv_bias=False, 
                 drop=0., attn_drop=0., norm_layer=nn.LayerNorm):
        super().__init__()
        hidden_features = int(dim * mlp_ratio)

        # Attention sub-layer
        self.norm1 = norm_layer(dim)
        self.attn = MultiHeadAttention(
            dim,
            num_heads=num_heads,
            qkv_bias=qkv_bias,
            attn_drop=attn_drop,
            proj_drop=drop,
        )

        # Feed-forward sub-layer
        self.norm2 = norm_layer(dim)
        feed_forward = [
            nn.Linear(dim, hidden_features),
            nn.GELU(),
            nn.Dropout(drop),
            nn.Linear(hidden_features, dim),
            nn.Dropout(drop),
        ]
        self.mlp = nn.Sequential(*feed_forward)

    def forward(self, x):
        """Return ``x`` after the attention and MLP residual updates."""
        attended = self.attn(self.norm1(x))
        x = x + attended
        return x + self.mlp(self.norm2(x))

class MBConv(nn.Module):
    """Inverted-residual convolution block used by the 'conv' stages.

    Layout: 1x1 expansion -> 3x3 depthwise (carries the stride) -> 1x1
    projection, each followed by batch normalisation. The input is added back
    when the block keeps both resolution and channel count.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels of the output feature map.
        stride: Stride of the depthwise convolution.
        expand_ratio: Width multiplier of the hidden (expanded) representation.
    """

    def __init__(self, in_channels, out_channels, stride=1, expand_ratio=4):
        super().__init__()
        hidden_channels = in_channels * expand_ratio
        self.use_residual = stride == 1 and in_channels == out_channels
        self.block = nn.Sequential(
            nn.Conv2d(in_channels, hidden_channels, 1, bias=False),
            nn.BatchNorm2d(hidden_channels),
            nn.GELU(),
            nn.Conv2d(hidden_channels, hidden_channels, 3, stride, 1,
                      groups=hidden_channels, bias=False),
            nn.BatchNorm2d(hidden_channels),
            nn.GELU(),
            nn.Conv2d(hidden_channels, out_channels, 1, bias=False),
            nn.BatchNorm2d(out_channels),
        )

    def forward(self, x):
        """Map (B, in_channels, H, W) to (B, out_channels, H/stride, W/stride)."""
        out = self.block(x)
        return x + out if self.use_residual else out


class CoAtNetStage(nn.Module):
    """A stack of ``depth`` blocks of one kind operating on (B, C, H, W) maps.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the stage.
        depth: Number of MBConv / transformer blocks.
        stage_type: ``'conv'`` builds MBConv blocks; any other value builds
            transformer blocks.
        num_heads: Attention heads per transformer block.
        mlp_ratio: MLP expansion ratio per transformer block.
        stride: Spatial stride applied once, at the start of the stage.
    """

    def __init__(self, in_channels, out_channels, depth, stage_type='conv', 
                 num_heads=8, mlp_ratio=4., stride=1):
        super().__init__()
        self.stage_type = stage_type
        
        layers = []
        for i in range(depth):
            if stage_type == 'conv':
                # Only the first block changes resolution / channel count
                layer_stride = stride if i == 0 else 1
                layers.append(MBConv(in_channels if i == 0 else out_channels, 
                                   out_channels, stride=layer_stride))
            else:  # transformer
                # A 1x1 conv adapts channels and/or resolution before the first
                # transformer block. It is also needed when only the stride
                # differs from 1; otherwise the requested stride was dropped.
                if i == 0 and (in_channels != out_channels or stride != 1):
                    layers.append(nn.Conv2d(in_channels, out_channels, 1, stride=stride))
                layers.append(TransformerBlock(out_channels, num_heads, mlp_ratio))
        
        self.layers = nn.ModuleList(layers)
        
    def forward(self, x):
        """Run all blocks; the input and output are (B, C, H, W) feature maps."""
        for layer in self.layers:
            if self.stage_type == 'conv' or isinstance(layer, nn.Conv2d):
                x = layer(x)
            else:
                # Transformer blocks expect a token sequence: flatten the
                # spatial grid, apply the block, then restore the grid.
                B, C, H, W = x.shape
                x = x.flatten(2).transpose(1, 2)  # B, H*W, C
                x = layer(x)
                x = x.transpose(1, 2).reshape(B, C, H, W)
        return x

class CoAtNetModel(nn.Module):
    """Hybrid convolution + attention image classifier.

    Structure: a strided conv stem, two convolutional stages, three
    transformer stages, global average pooling and a linear head.

    NOTE(review): this definition replaces the ``CoAtNetModel`` imported from
    ``dog_emotion_classification.coatnet`` earlier in the same cell.

    Args:
        num_classes: Number of output logits.
        depths: Number of blocks in each of the five stages.
        dims: Channel widths. ``dims[0]`` is the stem width, ``dims[1]`` and
            ``dims[2]`` the conv-stage widths, ``dims[2]``/``dims[3]``/``dims[3]``
            the transformer-stage widths. ``dims[4]`` is not used for any layer
            width; it is kept so existing call sites remain valid.

    Raises:
        ValueError: If ``depths`` or ``dims`` has fewer than five entries.
    """

    def __init__(self, num_classes=4, depths=[2, 2, 3, 5, 2], dims=[64, 96, 192, 384, 768]):
        super().__init__()
        if len(depths) < 5 or len(dims) < 5:
            raise ValueError("depths and dims must each provide at least 5 values")
        
        # Stem
        self.stem = nn.Sequential(
            nn.Conv2d(3, dims[0], 3, 2, 1, bias=False),
            nn.BatchNorm2d(dims[0]),
            nn.ReLU(inplace=True),
        )
        
        # Stages
        self.stages = nn.ModuleList()

        # Track the width actually produced by the previous stage. Previously
        # the transformer stages were given dims[i] as their input width, which
        # did not match the tensor they receive (e.g. 192 channels fed into a
        # block built for 384), so the forward pass failed.
        channels = dims[0]
        
        # Conv stages (S0, S1)
        for i in range(2):
            out_channels = dims[i+1]
            stage = CoAtNetStage(
                channels, out_channels, depths[i], 'conv',
                stride=2 if i > 0 else 1
            )
            self.stages.append(stage)
            channels = out_channels
        
        # Transformer stages (S2, S3, S4)
        for i in range(2, 5):
            out_channels = dims[i] if i < 4 else dims[i-1]
            stage = CoAtNetStage(
                channels, out_channels, depths[i], 'transformer',
                stride=2 if i == 2 else 1
            )
            self.stages.append(stage)
            channels = out_channels
        
        # Head (its input width equals the last stage's output, i.e. dims[-2])
        self.global_pool = nn.AdaptiveAvgPool2d(1)
        self.head = nn.Linear(channels, num_classes)
        
    def forward(self, x):
        """Map an image batch (B, 3, H, W) to class logits (B, num_classes)."""
        x = self.stem(x)
        
        for stage in self.stages:
            x = stage(x)
        
        x = self.global_pool(x)
        x = x.flatten(1)
        x = self.head(x)
        
        return x

print("✅ CoAtNet model architecture defined!")

print("✅ CoAtNet model architecture defined!")


In [None]:
# Dataset and Training Setup
from pathlib import Path

class DogEmotionDataset(Dataset):
    """Image-folder dataset with one sub-directory per emotion class.

    Expected layout: ``data_dir/<class_name>/<image files>`` for the classes
    ``angry``, ``happy``, ``relaxed`` and ``sad``.

    Args:
        data_dir: Root directory containing the class sub-directories.
        transform: Optional callable applied to each PIL image after it has
            been converted to RGB and resized to 224x224.
        extensions: File suffixes (including the dot) to collect. The default
            keeps the original behaviour of loading only ``*.jpg`` files.
    """

    def __init__(self, data_dir, transform=None, extensions=('.jpg',)):
        self.data_dir = Path(data_dir)
        self.transform = transform
        # Accept a single suffix passed as a plain string
        self.extensions = (extensions,) if isinstance(extensions, str) else tuple(extensions)
        self.classes = ['angry', 'happy', 'relaxed', 'sad']
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}
        
        self.samples = []
        self._load_samples()
        
        print(f"📁 Dataset loaded: {len(self.samples)} images")
        print(f"📊 Classes: {self.classes}")
        
    def _load_samples(self):
        """Fill ``self.samples`` with ``(path, label)`` pairs in a stable order."""
        for class_name in self.classes:
            class_dir = self.data_dir / class_name
            if not class_dir.exists():
                # Previously skipped silently, which hid a wrong data_dir
                print(f"⚠️ Class directory not found: {class_dir}")
                continue

            candidates = []
            for ext in self.extensions:
                candidates.extend(class_dir.glob(f'*{ext}'))

            # Directory listing order is not guaranteed. Sorting makes sample
            # indices reproducible, which matters because the training loop
            # applies one set of fold indices to two separate dataset instances.
            for img_path in sorted(set(candidates)):
                if img_path.is_file():
                    self.samples.append((str(img_path), self.class_to_idx[class_name]))
    
    def __len__(self):
        """Number of collected images."""
        return len(self.samples)
    
    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        If the image cannot be loaded or transformed, a warning is printed and
        a black 224x224 placeholder is returned with the original label, so a
        single bad file does not abort the epoch.
        """
        img_path, label = self.samples[idx]
        
        try:
            # The context manager closes the file handle explicitly
            with Image.open(img_path) as raw_image:
                image = raw_image.convert('RGB')
            image = image.resize((224, 224), Image.LANCZOS)
            
            if self.transform:
                image = self.transform(image)
            
            return image, label
            
        except Exception as e:
            print(f"⚠️ Error loading image {img_path}: {e}")
            dummy_image = Image.new('RGB', (224, 224), color='black')
            if self.transform:
                dummy_image = self.transform(dummy_image)
            return dummy_image, label

# Data transforms
# Per-channel normalisation shared by both pipelines (these values match the
# commonly used ImageNet statistics).
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Training pipeline: enlarge, take a random 224 crop, then apply light
# geometric and colour augmentation before tensor conversion.
# Note: DogEmotionDataset.__getitem__ already resizes each image to 224x224
# before the transform runs, so the 256 resize here is an upscale.
_train_steps = [
    transforms.Resize((256, 256)),
    transforms.RandomCrop(224),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
]
train_transform = transforms.Compose(_train_steps)

# Validation pipeline: deterministic resize + normalisation only
_val_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
]
val_transform = transforms.Compose(_val_steps)

# Training configuration
EPOCHS = 50
BATCH_SIZE = 16
LEARNING_RATE = 1e-4
K_FOLDS = 5
EMOTION_CLASSES = ['angry', 'happy', 'relaxed', 'sad']
# Derived from the class list so the two cannot drift apart (still 4)
NUM_CLASSES = len(EMOTION_CLASSES)

print(f"📊 Training Configuration:")
print(f"   - Epochs per fold: {EPOCHS}")
print(f"   - Batch size: {BATCH_SIZE}")
print(f"   - Learning rate: {LEARNING_RATE}")
print(f"   - Device: {device}")

# No cell in this notebook assigns dataset_dir, so on a fresh kernel this cell
# used to stop with a bare NameError. Fail with an actionable message instead.
try:
    dataset_dir
except NameError:
    raise NameError(
        "dataset_dir is not defined. Set dataset_dir to the folder that contains "
        "one sub-folder per class (angry/, happy/, relaxed/, sad/) before running this cell."
    ) from None

# Load dataset and prepare for cross-validation
dataset = DogEmotionDataset(dataset_dir, transform=train_transform)
labels = [sample[1] for sample in dataset.samples]

# Stop here with a clear message rather than deep inside the fold splitter
if len(dataset) < K_FOLDS:
    raise ValueError(
        f"Found only {len(dataset)} images under {dataset_dir!r}; "
        f"at least {K_FOLDS} are required for {K_FOLDS}-fold cross-validation."
    )

# K-fold cross-validation (stratified: each fold keeps the class proportions)
kfold = StratifiedKFold(n_splits=K_FOLDS, shuffle=True, random_state=42)

print("✅ Dataset and training setup complete!")


In [None]:
# Cross-Validation Training
def train_epoch(model, dataloader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Network to optimise; it is switched to training mode.
        dataloader: Iterable yielding ``(data, target)`` batches.
        criterion: Loss function called as ``criterion(output, target)``.
        optimizer: Optimiser stepped once per batch.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(mean_batch_loss, accuracy_percent)``.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0
    
    for data, target in tqdm(dataloader, desc="Training"):
        data, target = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        # detach(): the prediction is only used for the accuracy count
        _, predicted = torch.max(output.detach(), 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        num_batches += 1

    # Previously an empty loader ended in a ZeroDivisionError
    if num_batches == 0 or total == 0:
        raise ValueError("train_epoch received an empty dataloader")
    
    return running_loss / num_batches, 100. * correct / total

def validate_epoch(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without updating any weights.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        dataloader: Iterable yielding ``(data, target)`` batches.
        criterion: Loss function called as ``criterion(output, target)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(mean_batch_loss, accuracy_percent, all_preds, all_targets)``
        where the last two are lists of predicted and true class indices in
        the order the samples were seen.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0
    all_preds = []
    all_targets = []
    
    with torch.no_grad():
        for data, target in tqdm(dataloader, desc="Validation"):
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            
            running_loss += loss.item()
            _, predicted = torch.max(output, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
            num_batches += 1
            
            all_preds.extend(predicted.cpu().numpy())
            all_targets.extend(target.cpu().numpy())

    # Previously an empty loader ended in a ZeroDivisionError
    if num_batches == 0 or total == 0:
        raise ValueError("validate_epoch received an empty dataloader")
    
    return running_loss / num_batches, 100. * correct / total, all_preds, all_targets

# Start cross-validation training
print(f"🎯 Starting {K_FOLDS}-Fold Cross-Validation Training...")

fold_results = []
all_val_accs = []

# Second view of the same files with the deterministic validation transform.
# Built once here instead of rescanning the directory in every fold.
val_dataset = DogEmotionDataset(dataset_dir, transform=val_transform)

# Fold indices are computed on `dataset` but also applied to `val_dataset`.
# If the two listings disagreed, validation indices would point at training
# images, so check that the sample order is identical.
if val_dataset.samples != dataset.samples:
    raise RuntimeError("Training and validation datasets list their samples in a different order")

for fold, (train_idx, val_idx) in enumerate(kfold.split(range(len(dataset)), labels)):
    print(f"\n{'='*60}")
    print(f"🔄 FOLD {fold + 1}/{K_FOLDS}")
    print(f"{'='*60}")
    
    # Create data samplers restricted to this fold's indices
    train_sampler = SubsetRandomSampler(train_idx)
    val_sampler = SubsetRandomSampler(val_idx)
    
    # Create data loaders
    train_loader = DataLoader(dataset, batch_size=BATCH_SIZE, sampler=train_sampler, num_workers=2)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, sampler=val_sampler, num_workers=2)
    
    print(f"📊 Fold {fold + 1} data:")
    print(f"   - Training samples: {len(train_idx)}")
    print(f"   - Validation samples: {len(val_idx)}")
    
    # Create a fresh model for every fold
    model = CoAtNetModel(num_classes=NUM_CLASSES).to(device)
    
    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=15, gamma=0.1)
    
    # Training history
    train_losses = []
    train_accs = []
    val_losses = []
    val_accs = []
    
    best_val_acc = 0.0
    best_model_state = None
    
    # Training loop
    for epoch in range(EPOCHS):
        print(f"\nEpoch {epoch + 1}/{EPOCHS}")
        
        # Train
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        
        # Validate
        val_loss, val_acc, val_preds, val_targets = validate_epoch(model, val_loader, criterion, device)
        
        # Update scheduler
        scheduler.step()
        
        # Save best model.
        # state_dict() returns references to the live parameters and
        # dict.copy() is shallow, so the earlier snapshot was overwritten by
        # every later optimiser step. Clone each tensor (onto the CPU, so five
        # folds of snapshots do not occupy GPU memory). The `is None` test
        # guarantees a snapshot exists even if accuracy never exceeds 0%.
        if best_model_state is None or val_acc > best_val_acc:
            best_val_acc = val_acc
            best_model_state = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }
        
        # Record history
        train_losses.append(train_loss)
        train_accs.append(train_acc)
        val_losses.append(val_loss)
        val_accs.append(val_acc)
        
        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
    
    # Load best model for final evaluation (skipped only when EPOCHS == 0)
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
    final_val_loss, final_val_acc, final_preds, final_targets = validate_epoch(model, val_loader, criterion, device)
    
    print(f"\n✅ Fold {fold + 1} completed!")
    print(f"Best Validation Accuracy: {best_val_acc:.2f}%")
    
    # Store results
    fold_results.append({
        'fold': fold + 1,
        'train_losses': train_losses,
        'train_accs': train_accs,
        'val_losses': val_losses,
        'val_accs': val_accs,
        'best_val_acc': best_val_acc,
        'final_preds': final_preds,
        'final_targets': final_targets,
        'model_state': best_model_state
    })
    all_val_accs.append(best_val_acc)
    
    # Memory cleanup
    torch.cuda.empty_cache()

# Calculate overall statistics.
# NOTE(review): the best epoch of each fold is selected on that fold's own
# validation split, so these numbers are an optimistic estimate.
mean_acc = np.mean(all_val_accs)
std_acc = np.std(all_val_accs)

print(f"\n🎉 CROSS-VALIDATION COMPLETED!")
print(f"{'='*60}")
print(f"📊 Final Results:")
print(f"   - Mean Accuracy: {mean_acc:.2f}% ± {std_acc:.2f}%")
print(f"   - Individual Folds: {[f'{acc:.2f}%' for acc in all_val_accs]}")
print(f"   - Best Fold: {np.argmax(all_val_accs) + 1} ({max(all_val_accs):.2f}%)")


In [None]:
# Visualization and Results
# Builds one 2x3 figure summarising the cross-validation run and saves it as a
# PNG. Reads fold_results, all_val_accs, mean_acc and std_acc from the training
# cell. Every panel uses pyplot's implicit "current axes": each plt.subplot()
# call selects the panel that the following plt.* calls draw into, so the
# order of statements matters.
plt.style.use('default')
sns.set_palette("husl")

fig = plt.figure(figsize=(20, 15))

# 1. Training curves: one training-loss line per fold
ax1 = plt.subplot(2, 3, 1)
for result in fold_results:
    epochs = range(1, len(result['train_losses']) + 1)
    plt.plot(epochs, result['train_losses'], label=f"Fold {result['fold']}", alpha=0.7)
plt.title('Training Loss Across Folds', fontsize=14, fontweight='bold')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True, alpha=0.3)

# 2. Validation curves: one validation-accuracy line per fold
ax2 = plt.subplot(2, 3, 2)
for result in fold_results:
    epochs = range(1, len(result['val_accs']) + 1)
    plt.plot(epochs, result['val_accs'], label=f"Fold {result['fold']}", alpha=0.7)
plt.title('Validation Accuracy Across Folds', fontsize=14, fontweight='bold')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.legend()
plt.grid(True, alpha=0.3)

# 3. Fold comparison: best validation accuracy of each fold as a bar
# (the bar labels are generated from K_FOLDS, so all_val_accs must hold
# exactly K_FOLDS values)
ax3 = plt.subplot(2, 3, 3)
fold_names = [f'Fold {i+1}' for i in range(K_FOLDS)]
bars = plt.bar(fold_names, all_val_accs, color=sns.color_palette("husl", K_FOLDS))
plt.title('Best Validation Accuracy by Fold', fontsize=14, fontweight='bold')
plt.ylabel('Accuracy (%)')
plt.ylim(0, 100)

# Write the accuracy value just above each bar
for bar, acc in zip(bars, all_val_accs):
    plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1,
             f'{acc:.1f}%', ha='center', va='bottom', fontweight='bold')

# Dashed reference line at the mean accuracy over all folds
plt.axhline(y=mean_acc, color='red', linestyle='--', alpha=0.7, label=f'Mean: {mean_acc:.1f}%')
plt.legend()

# 4. Confusion Matrix for the fold with the highest best-validation accuracy.
# best_fold_idx is 0-based; titles and file names below display it as +1.
# It is also reused by the model-saving code that follows this section.
best_fold_idx = np.argmax(all_val_accs)
best_result = fold_results[best_fold_idx]

ax4 = plt.subplot(2, 3, 4)
cm = confusion_matrix(best_result['final_targets'], best_result['final_preds'])
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
            xticklabels=EMOTION_CLASSES, yticklabels=EMOTION_CLASSES)
plt.title(f'Confusion Matrix - Best Fold ({best_fold_idx+1})', fontsize=14, fontweight='bold')
plt.xlabel('Predicted')
plt.ylabel('Actual')

# 5. Training vs Validation accuracy per epoch for the best fold
ax5 = plt.subplot(2, 3, 5)
epochs = range(1, len(best_result['train_accs']) + 1)
plt.plot(epochs, best_result['train_accs'], label='Training', linewidth=2)
plt.plot(epochs, best_result['val_accs'], label='Validation', linewidth=2)
plt.title(f'Training vs Validation - Best Fold ({best_fold_idx+1})', fontsize=14, fontweight='bold')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.legend()
plt.grid(True, alpha=0.3)

# 6. Statistics summary: a text-only panel (axes hidden) with the headline
# numbers and the training configuration
ax6 = plt.subplot(2, 3, 6)
ax6.axis('off')
stats_text = f"""
📊 CoAtNet Cross-Validation Results

🎯 Model Performance:
   • Mean Accuracy: {mean_acc:.2f}% ± {std_acc:.2f}%
   • Best Fold: {best_fold_idx+1} ({max(all_val_accs):.2f}%)
   • Worst Fold: {np.argmin(all_val_accs)+1} ({min(all_val_accs):.2f}%)

⚙️ Training Configuration:
   • Architecture: CoAtNet (Conv + Attention)
   • Epochs per fold: {EPOCHS}
   • Batch size: {BATCH_SIZE}
   • Learning rate: {LEARNING_RATE}
   • Device: {device}

📈 Data Information:
   • Total samples: {len(dataset)}
   • Classes: {len(EMOTION_CLASSES)}
   • Folds: {K_FOLDS} (stratified)
"""

# Position is given in axes coordinates (transAxes), anchored at the top-left
ax6.text(0.1, 0.9, stats_text, transform=ax6.transAxes, fontsize=12,
         verticalalignment='top', fontfamily='monospace',
         bbox=dict(boxstyle="round,pad=0.5", facecolor="lightblue", alpha=0.8))

# Save before plt.show() so the written file contains the rendered figure
plt.tight_layout()
plt.savefig('coatnet_cross_validation_results.png', dpi=300, bbox_inches='tight')
plt.show()

print("✅ Visualizations created and saved!")

# Save best model
# Loading the stored weights into a fresh CPU model both validates that they
# match the architecture and yields a device-independent state_dict.
best_model = CoAtNetModel(num_classes=NUM_CLASSES)
best_model.load_state_dict(fold_results[best_fold_idx]['model_state'])

# np.argmax / np.mean / np.std return NumPy scalar types. Pickling those into
# the checkpoint makes torch.load(..., weights_only=True) reject the file, so
# convert everything to built-in int / float first (the JSON export below
# already does the same).
best_fold_number = int(best_fold_idx) + 1
best_accuracy = float(max(all_val_accs))
model_filename = f'coatnet_best_fold_{best_fold_number}_acc_{best_accuracy:.2f}.pth'

torch.save({
    'model_state_dict': best_model.state_dict(),
    'model_config': {
        'num_classes': NUM_CLASSES,
        'architecture': 'CoAtNet',
        # These mirror the default arguments of CoAtNetModel used above
        'depths': [2, 2, 3, 5, 2],
        'dims': [64, 96, 192, 384, 768]
    },
    'training_info': {
        'best_fold': best_fold_number,
        'best_accuracy': best_accuracy,
        'mean_accuracy': float(mean_acc),
        'std_accuracy': float(std_acc),
        'epochs': EPOCHS,
        'batch_size': BATCH_SIZE,
        'learning_rate': LEARNING_RATE
    },
    'class_names': EMOTION_CLASSES
}, model_filename)

print(f"✅ Model saved as: {model_filename}")

# Save results
# Write the headline metrics, the training configuration and the dataset
# description to a JSON file. NumPy values are converted to built-in types
# because the json module cannot serialise them.
results_filename = 'coatnet_training_results.json'

cv_summary = {
    'mean_accuracy': float(mean_acc),
    'std_accuracy': float(std_acc),
    'fold_accuracies': list(map(float, all_val_accs)),
    'best_fold': int(best_fold_idx + 1),
    'best_accuracy': float(max(all_val_accs)),
}
config_summary = {
    'epochs': EPOCHS,
    'batch_size': BATCH_SIZE,
    'learning_rate': LEARNING_RATE,
    'device': str(device),
}
dataset_summary = {
    'total_samples': len(dataset),
    'num_classes': NUM_CLASSES,
    'class_names': EMOTION_CLASSES,
}
training_results = {
    'cross_validation_results': cv_summary,
    'training_config': config_summary,
    'dataset_info': dataset_summary,
}

with open(results_filename, 'w') as f:
    json.dump(training_results, f, indent=2)

print(f"✅ Training results saved as: {results_filename}")

# Download files
# In Google Colab, offer the three artifacts as browser downloads. Outside
# Colab the import fails and the files simply stay on disk.
try:
    from google.colab import files
    print("\n📥 Downloading files...")
    for artifact_path in (model_filename, results_filename, 'coatnet_cross_validation_results.png'):
        files.download(artifact_path)
    print("✅ Files downloaded successfully!")
except ImportError:
    print("📁 Files saved locally")

# Final console summary followed by a short usage snippet for the checkpoint
banner_rule = '=' * 80
fold_accuracy_labels = [f'{acc:.2f}%' for acc in all_val_accs]

summary_lines = [
    "\n🎉 COATNET TRAINING COMPLETED!",
    banner_rule,
    "📊 FINAL RESULTS:",
    f"   🎯 Mean Accuracy: {mean_acc:.2f}% ± {std_acc:.2f}%",
    f"   🏆 Best Fold: {best_fold_idx+1} with {max(all_val_accs):.2f}% accuracy",
    f"   📈 All Fold Accuracies: {fold_accuracy_labels}",
    f"   💾 Model: {model_filename}",
    f"   📊 Results: {results_filename}",
    banner_rule,
]
for line in summary_lines:
    print(line)

usage_lines = [
    "\n📋 HOW TO USE:",
    "model = CoAtNetModel(num_classes=4)",
    f"checkpoint = torch.load('{model_filename}')",
    "model.load_state_dict(checkpoint['model_state_dict'])",
    f"Classes: {EMOTION_CLASSES}",
]
for line in usage_lines:
    print(line)

print("\n🎯 CoAtNet training completed successfully!")
