# 🐄 Cow Lameness Detection - Comprehensive Multi-Modal Pipeline v18

**Architecture:**
- YOLO v8: Multi-cow detection (select largest)
- SAM: Segment Anything (cow isolation)
- Pose: DeepLabCut/MMPose (configurable)
- VideoMAE: Self-supervised visual features
- Optical flow: Farneback dense flow, used as a lightweight stand-in for RAFT (temporal motion)
- Temporal Transformer: Multi-modal fusion

**Dataset:** 1167 videos (Sağlıklı / healthy: 642, Topal / lame: 525)

---

## 1. Setup & Installation

In [None]:
# Install all required packages (notebook shell magics; run once per runtime)
!pip install -q ultralytics  # YOLO v8
# NOTE(review): the next command names segment-anything twice (PyPI name and
# Git URL) -- one of the two is presumably enough; confirm which is intended.
!pip install -q segment-anything git+https://github.com/facebookresearch/segment-anything.git  # SAM
!pip install -q transformers  # VideoMAE
!pip install -q opencv-python-headless  # OpenCV: video decoding and Farneback optical flow
!pip install -q pandas numpy scikit-learn scipy matplotlib seaborn tqdm
!pip install -q torch torchvision
!pip install -q timm  # For VideoMAE
!pip install -q pyyaml

print("‚úÖ All packages installed")

In [None]:
# Imports
import os
import glob
import json
import yaml
from pathlib import Path
from datetime import datetime

import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

from scipy import stats
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, roc_auc_score, roc_curve

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from ultralytics import YOLO
from segment_anything import sam_model_registry, SamPredictor
from transformers import VideoMAEFeatureExtractor, VideoMAEModel

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"‚úÖ Device: {device}")

## 2. Mount Drive & Load Config

In [None]:
# Mount Google Drive so the dataset and output folders are reachable.
from google.colab import drive
drive.mount('/content/drive')

# Dataset root and derived input/output locations on the mounted drive.
BASE = "/content/drive/MyDrive/Inek Topallik Tespiti Parcalanmis Inek Videolari"
VIDEO_DIR = f"{BASE}/cow_single_videos"
# NOTE(review): this path is fixed to the deeplabcut folder; it is not derived
# from config['pose_framework'] (which is only defined further down).
POSE_CSV_DIR = f"{BASE}/outputs/deeplabcut"  # or mmpose based on config
OUTPUT_DIR = f"{BASE}/outputs/colab_results"

# Create the output tree up front; exist_ok makes re-running the cell harmless.
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(f"{OUTPUT_DIR}/figures", exist_ok=True)
os.makedirs(f"{OUTPUT_DIR}/models", exist_ok=True)

# Inline configuration (defined here, nothing is read from disk).
# The 'features' keys are later stored in metrics.json as 'features_used'.
config = {
    'pose_framework': 'deeplabcut',
    'features': {
        'yolo_detection': True,
        'sam_segmentation': True,
        'videomae': True,
        'optical_flow': True,
        'back_curvature': True
    }
}

print(f"‚úÖ Config loaded: {config['pose_framework']}")
print(f"   Features: {list(config['features'].keys())}")

## 3. Initialize Models

In [None]:
# YOLO v8 for cow detection
yolo_model = YOLO('yolov8n.pt')
print("‚úÖ YOLO v8 loaded")

# SAM for segmentation
sam_checkpoint = "sam_vit_h_4b8939.pth"
!wget -q https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth
sam = sam_model_registry["vit_h"](checkpoint=sam_checkpoint)
sam.to(device)
sam_predictor = SamPredictor(sam)
print("‚úÖ SAM loaded")

# VideoMAE for visual features
videomae_extractor = VideoMAEFeatureExtractor.from_pretrained("MCG-NJU/videomae-base")
videomae_model = VideoMAEModel.from_pretrained("MCG-NJU/videomae-base").to(device)
print("‚úÖ VideoMAE loaded")

## 4. Video Processing Pipeline

In [None]:
def detect_largest_cow(frame, yolo_model):
    """Return the bounding box of the largest cow detected in ``frame``.

    The detector is called with ``classes=[19]`` (the COCO "cow" class), so
    only cow detections are considered.

    Args:
        frame: Image passed straight to ``yolo_model``.
        yolo_model: Callable detector whose first result exposes ``.boxes``
            with an ``xyxy`` tensor of corner coordinates.

    Returns:
        Integer array ``[x1, y1, x2, y2]`` of the box with the greatest area,
        or ``None`` when there are no detections.
    """
    detections = yolo_model(frame, classes=[19])[0].boxes
    if len(detections) == 0:
        return None

    corners = detections.xyxy.cpu().numpy()
    # Area per box = width * height, computed for all boxes at once.
    widths = corners[:, 2] - corners[:, 0]
    heights = corners[:, 3] - corners[:, 1]
    biggest = np.argmax(widths * heights)

    return corners[biggest].astype(int)

def segment_cow(frame, bbox, sam_predictor):
    """Segment the cow inside ``bbox`` with SAM and return its mask.

    Args:
        frame: Image in OpenCV channel order (BGR), as produced by
            ``cv2.VideoCapture.read`` elsewhere in this notebook.
        bbox: Box prompt ``[x1, y1, x2, y2]``; any array-like is accepted.
        sam_predictor: A ``SamPredictor``-like object.

    Returns:
        The single mask predicted for the box prompt (``masks[0]``).
    """
    # The predictor assumes RGB unless told otherwise; frames here come from
    # OpenCV and are BGR, so declare the format instead of feeding swapped
    # channels to the image encoder.
    sam_predictor.set_image(frame, image_format="BGR")

    # Use bbox as prompt; multimask_output=False asks for exactly one mask.
    masks, _, _ = sam_predictor.predict(
        box=np.asarray(bbox),
        multimask_output=False
    )

    return masks[0]

def extract_optical_flow(frames):
    """Compute per-pair optical-flow statistics with Farneback dense flow.

    Farneback is used as a lightweight alternative to RAFT.

    Args:
        frames: Sequence of BGR frames (they are converted with
            ``cv2.COLOR_BGR2GRAY``).

    Returns:
        A list with one dict per consecutive frame pair, holding the mean flow
        ``'magnitude'`` and the arithmetic mean flow ``'angle'`` over all
        pixels. Fewer than two frames yields an empty list.
    """
    if len(frames) < 2:
        return []

    flows = []

    # Each frame is converted to grayscale exactly once; the previous gray
    # image is carried into the next iteration instead of being recomputed.
    prev_gray = cv2.cvtColor(frames[0], cv2.COLOR_BGR2GRAY)
    for frame in frames[1:]:
        next_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Positional args after the (unused) output flow:
        # pyr_scale=0.5, levels=3, winsize=15, iterations=3,
        # poly_n=5, poly_sigma=1.2, flags=0
        flow = cv2.calcOpticalFlowFarneback(
            prev_gray, next_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
        )

        # Flow magnitude and angle from the x/y displacement fields
        mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        flows.append({'magnitude': mag.mean(), 'angle': ang.mean()})

        prev_gray = next_gray

    return flows

def extract_videomae_features(frames, videomae_model, videomae_extractor):
    """Extract one clip-level feature vector with VideoMAE.

    Sixteen frames are sampled evenly across the clip (indices repeat when the
    clip is shorter than 16 frames), preprocessed, passed through the model,
    and the token embeddings of ``last_hidden_state`` are averaged.

    Args:
        frames: Non-empty sequence of frames in OpenCV channel order (BGR),
            each indexable as ``frame[..., ::-1]``.
        videomae_model: Model returning an object with ``last_hidden_state``.
        videomae_extractor: Preprocessor called as
            ``extractor(list_of_frames, return_tensors="pt")``.

    Returns:
        NumPy array with the mean-pooled features of the clip.

    Raises:
        ValueError: If ``frames`` is empty.
    """
    if len(frames) == 0:
        raise ValueError("frames must contain at least one frame")

    # Sample 16 frames (VideoMAE input)
    indices = np.linspace(0, len(frames) - 1, 16, dtype=int)
    # Frames in this notebook come from OpenCV (BGR); reverse the channel axis
    # so the preprocessor receives RGB. ascontiguousarray drops the negative
    # stride created by the reversed view.
    sampled_frames = [np.ascontiguousarray(frames[i][..., ::-1]) for i in indices]

    # Preprocess, then move the tensors to wherever the model's weights live
    # so inputs and model can never end up on different devices.
    inputs = videomae_extractor(sampled_frames, return_tensors="pt")
    model_device = next(videomae_model.parameters()).device
    inputs = {k: v.to(model_device) for k, v in inputs.items()}

    # Extract features (no gradients needed for inference)
    with torch.no_grad():
        outputs = videomae_model(**inputs)
        features = outputs.last_hidden_state.mean(dim=1).squeeze().cpu().numpy()

    return features

print("‚úÖ Processing functions defined")

## 5. Load Data & Extract Multi-Modal Features

In [None]:
# Get all videos as (path, label) pairs: label 0 = 'Saglikli' folder, 1 = 'Topal' folder
import random  # local to this cell: only used for the seeded demo subset below

video_files = []
for label_folder in ['Saglikli', 'Topal']:
    folder_path = f"{VIDEO_DIR}/{label_folder}"
    # glob order depends on the filesystem; sort so runs are reproducible
    videos = sorted(glob.glob(f"{folder_path}/*.mp4"))
    video_files.extend([(v, 0 if label_folder=='Saglikli' else 1) for v in videos])

print(f"üìä Found {len(video_files)} videos")
print(f"   Healthy: {sum(1 for _, l in video_files if l==0)}")
print(f"   Lame: {sum(1 for _, l in video_files if l==1)}")

# Process subset for demo (full processing takes hours)
DEMO_MODE = True
DEMO_SIZE = 50  # total number of videos processed in demo mode
if DEMO_MODE:
    # The list is ordered folder by folder, so taking the first N entries would
    # contain a single class and break the stratified split and ROC-AUC later.
    # Draw a seeded, class-balanced sample instead.
    demo_rng = random.Random(42)
    per_class = DEMO_SIZE // 2
    demo_subset = []
    for cls in (0, 1):
        cls_videos = [item for item in video_files if item[1] == cls]
        demo_subset.extend(demo_rng.sample(cls_videos, min(per_class, len(cls_videos))))
    video_files = demo_subset
    print(f"\n‚ö†Ô∏è  DEMO MODE: Processing {len(video_files)} videos")

In [None]:
# Extract multi-modal features: one fixed-length vector per video
POSE_DIM = 50       # length of the pose part of the feature vector
VIDEOMAE_DIM = 100  # number of leading VideoMAE dimensions kept

dataset = []

for video_path, label in tqdm(video_files, desc="Processing videos"):
    try:
        # Load video (all frames are held in memory)
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(frame)
        finally:
            # Release the capture handle even if decoding raises
            cap.release()

        # Too short to sample flow pairs from; skip silently
        if len(frames) < 10:
            continue

        # Detection and segmentation are run on the middle frame only
        mid_frame = frames[len(frames)//2]

        # 1. YOLO: Detect largest cow
        bbox = detect_largest_cow(mid_frame, yolo_model)

        # 2. SAM: Segment cow; the feature is the fraction of the frame covered
        #    by the mask, normalised by the mask's own pixel count so it is
        #    correct for any video resolution.
        if bbox is not None:
            mask = segment_cow(mid_frame, bbox, sam_predictor)
            mask_fraction = float(mask.sum()) / mask.size
        else:
            mask_fraction = 0.0

        # 3. Optical Flow
        flows = extract_optical_flow(frames[::5])  # Sample every 5 frames
        flow_mags = [f['magnitude'] for f in flows]
        flow_mag_mean = np.mean(flow_mags)
        flow_mag_std = np.std(flow_mags)

        # 4. VideoMAE features
        videomae_feat = extract_videomae_features(frames, videomae_model, videomae_extractor)

        # 5. Load pose CSV (zeros when the file is missing)
        video_name = Path(video_path).stem
        pose_csv = f"{POSE_CSV_DIR}/{video_name}_DLC_SuperAnimal.csv"

        pose_features = np.zeros(POSE_DIM)
        if os.path.exists(pose_csv):
            pose_df = pd.read_csv(pose_csv, header=[1,2])
            # Simplified: per-column mean over all rows.
            # NOTE(review): no index_col is given, so the first CSV column is
            # part of .values -- presumably the frame index; confirm.
            pose_means = pose_df.values.mean(axis=0)[:POSE_DIM]
            # Zero-pad when the CSV has fewer than POSE_DIM columns so every
            # video yields a vector of the same length.
            pose_features[:len(pose_means)] = pose_means

        # Combine all features
        combined_features = np.concatenate([
            pose_features,                    # Pose (50)
            videomae_feat[:VIDEOMAE_DIM],     # VideoMAE (100)
            [flow_mag_mean, flow_mag_std],    # Optical flow (2)
            [mask_fraction]                   # Segmentation (1)
        ])

        dataset.append({
            'video': video_name,
            'label': label,
            'features': combined_features
        })

    except Exception as e:
        # Best effort: report the failing video and keep going
        print(f"  ‚ö†Ô∏è  Failed: {Path(video_path).name} - {e}")

print(f"\n‚úÖ Processed {len(dataset)} videos")
if dataset:
    print(f"   Feature dimension: {dataset[0]['features'].shape[0]}")

## 6. Prepare Data for Training

In [None]:
# Stack per-video feature vectors and labels into arrays.
X = np.array([d['features'] for d in dataset])
y = np.array([d['label'] for d in dataset])

# Split: 70% train, then the remaining 30% is halved into validation and test
# (15% / 15%). Both splits are stratified by label and seeded.
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.30, stratify=y, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.50, stratify=y_temp, random_state=42)

# Standardize: statistics are fitted on the training split only and then
# applied unchanged to validation and test.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)

print(f"üìä Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")

## 7. Temporal Transformer Model

In [None]:
class TemporalTransformer(nn.Module):
    """Transformer-encoder classifier for lameness (two output classes).

    Each sample is a single fused feature vector. It is projected to
    ``hidden_dim``, treated as a sequence of length one, run through a
    Transformer encoder, and classified by a small MLP head.
    """

    def __init__(self, input_dim=153, hidden_dim=256, num_heads=8, num_layers=4):
        super().__init__()

        # Sub-modules are created in the order projection -> encoder -> head.
        self.input_proj = nn.Linear(input_dim, hidden_dim)

        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=hidden_dim,
                nhead=num_heads,
                dim_feedforward=4 * hidden_dim,
                dropout=0.3,
                batch_first=True,
            ),
            num_layers=num_layers,
        )

        head_layers = [
            nn.Linear(hidden_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 2),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Map ``x`` of shape (batch, features) to logits of shape (batch, 2)."""
        # Insert a length-1 sequence axis: (batch, hidden) -> (batch, 1, hidden)
        tokens = torch.unsqueeze(self.input_proj(x), 1)
        encoded = self.transformer(tokens)
        # Drop the sequence axis again before the classification head
        pooled = torch.squeeze(encoded, 1)
        return self.classifier(pooled)

# Build the classifier with an input width matching the scaled feature matrix
# and report its total parameter count.
model = TemporalTransformer(input_dim=X_train_scaled.shape[1]).to(device)
print(f"‚úÖ Model created: {sum(p.numel() for p in model.parameters())} parameters")

## 8. Training with 5-Fold CV

In [None]:
def train_model(model, X_train, y_train, X_val, y_val, epochs=20):
    """Train ``model`` full-batch and return its best validation snapshot.

    Every epoch performs one optimizer step on the whole training set
    (cross-entropy loss, Adam with lr=0.001) and then measures accuracy on the
    validation set.

    Args:
        model: ``nn.Module`` producing class logits; trained in place.
        X_train, y_train: Training features and integer class labels.
        X_val, y_val: Validation features and integer class labels.
        epochs: Number of full-batch epochs.

    Returns:
        ``(best_state, best_val_acc)`` where ``best_state`` is an independent
        copy of the state dict at the epoch with the highest validation
        accuracy (``None`` only when ``epochs`` is 0).
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Put the data on the same device as the model's weights.
    model_device = next(model.parameters()).device
    X_train_t = torch.as_tensor(X_train, dtype=torch.float32, device=model_device)
    y_train_t = torch.as_tensor(y_train, dtype=torch.long, device=model_device)
    X_val_t = torch.as_tensor(X_val, dtype=torch.float32, device=model_device)
    y_val_t = torch.as_tensor(y_val, dtype=torch.long, device=model_device)

    best_val_acc = 0
    best_model = None

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        outputs = model(X_train_t)
        loss = criterion(outputs, y_train_t)
        loss.backward()
        optimizer.step()

        model.eval()
        with torch.no_grad():
            val_outputs = model(X_val_t)
            val_preds = val_outputs.argmax(dim=1)
            val_acc = (val_preds == y_val_t).float().mean().item()

        # The first epoch is always recorded so a snapshot exists even when
        # validation accuracy never rises above zero.
        if best_model is None or val_acc > best_val_acc:
            best_val_acc = val_acc
            # state_dict() hands out references to the live parameters; clone
            # each tensor so later optimizer steps cannot overwrite the snapshot.
            best_model = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }

        if (epoch + 1) % 5 == 0:
            print(f"  Epoch {epoch+1}/{epochs} - Loss: {loss.item():.4f}, Val Acc: {val_acc:.4f}")

    return best_model, best_val_acc

# 5-Fold CV on the training split (stratified, seeded)
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
fold_results = []

for fold, (train_idx, val_idx) in enumerate(skf.split(X_train, y_train)):
    print(f"\nFOLD {fold+1}/5")

    # Fit the scaler on this fold's training part only. Reusing features that
    # were standardized on the whole training split would leak the held-out
    # fold's statistics into training.
    fold_scaler = StandardScaler()
    X_fold_train = fold_scaler.fit_transform(X_train[train_idx])
    X_fold_val = fold_scaler.transform(X_train[val_idx])

    # Fresh model per fold so no weights carry over between folds
    model = TemporalTransformer(input_dim=X_fold_train.shape[1]).to(device)
    best_model, val_acc = train_model(model, X_fold_train, y_train[train_idx],
                                      X_fold_val, y_train[val_idx])
    fold_results.append(val_acc)
    print(f"  ‚úÖ Fold {fold+1} Accuracy: {val_acc:.4f}")

print(f"\n5-Fold CV: {np.mean(fold_results):.4f} ¬± {np.std(fold_results):.4f}")

## 9. Final Test Evaluation

In [None]:
# Train final model on the full training split; the validation split is used
# by train_model to pick which epoch's weights are returned.
final_model = TemporalTransformer(input_dim=X_train_scaled.shape[1]).to(device)
best_model_state, _ = train_model(final_model, X_train_scaled, y_train, X_val_scaled, y_val, epochs=30)
final_model.load_state_dict(best_model_state)

# Test: predicted class per sample plus the softmax probability of class 1
final_model.eval()
with torch.no_grad():
    X_test_t = torch.FloatTensor(X_test_scaled).to(device)
    test_outputs = final_model(X_test_t)
    test_preds = test_outputs.argmax(dim=1).cpu().numpy()
    test_probs = torch.softmax(test_outputs, dim=1).cpu().numpy()[:, 1]

# Metrics (binary averaging: scores refer to label 1)
# NOTE(review): roc_auc_score presumably needs both classes present in y_test;
# confirm this holds for small demo subsets.
accuracy = accuracy_score(y_test, test_preds)
precision, recall, f1, _ = precision_recall_fscore_support(y_test, test_preds, average='binary')
cm = confusion_matrix(y_test, test_preds)
auc = roc_auc_score(y_test, test_probs)

print(f"\n{'='*60}")
print("FINAL TEST RESULTS")
print(f"{'='*60}")
print(f"Accuracy:  {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1-Score:  {f1:.4f}")
print(f"ROC-AUC:   {auc:.4f}")
print(f"\nConfusion Matrix:\n{cm}")
print(f"{'='*60}")

# Save: everything is converted to plain floats/lists so it is JSON-serializable
metrics = {
    'test_accuracy': float(accuracy),
    'precision': float(precision),
    'recall': float(recall),
    'f1': float(f1),
    'roc_auc': float(auc),
    'confusion_matrix': cm.tolist(),
    'cv_mean_accuracy': float(np.mean(fold_results)),
    'cv_std_accuracy': float(np.std(fold_results)),
    'features_used': list(config['features'].keys()),
    'timestamp': datetime.now().isoformat()
}

with open(f"{OUTPUT_DIR}/metrics.json", 'w') as f:
    json.dump(metrics, f, indent=2)

# The checkpoint bundles the weights with the fitted scaler object and config.
# NOTE(review): because a scaler object is stored alongside the tensors,
# loading this file presumably requires full unpickling -- confirm.
torch.save({
    'model_state_dict': final_model.state_dict(),
    'scaler': scaler,
    'config': config
}, f"{OUTPUT_DIR}/models/best_model_multimodal.pth")

print("\n‚úÖ Results saved")

## 10. Visualizations

In [None]:
# Confusion Matrix heatmap of the test predictions (rows = true, columns =
# predicted), saved to the figures folder and shown inline.
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Healthy', 'Lame'],
            yticklabels=['Healthy', 'Lame'])
plt.ylabel('True')
plt.xlabel('Predicted')
plt.title(f'Confusion Matrix (Acc: {accuracy:.2%})')
plt.savefig(f"{OUTPUT_DIR}/figures/confusion_matrix.png", dpi=300, bbox_inches='tight')
plt.show()

# ROC Curve from the class-1 probabilities; the dashed diagonal is the
# chance-level reference line.
fpr, tpr, _ = roc_curve(y_test, test_probs)
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, linewidth=2, label=f'AUC = {auc:.3f}')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.title('ROC Curve')
plt.legend()
plt.grid(alpha=0.3)
plt.savefig(f"{OUTPUT_DIR}/figures/roc_curve.png", dpi=300, bbox_inches='tight')
plt.show()

print("‚úÖ Complete!")