In [1]:
# Install PyTorch (wheels from the CUDA 11.7 index) and the image/plotting helpers used below.
!pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu117
!pip install opencv-python-headless matplotlib tqdm pandas


# Fetch the YOLOv7 repository and switch into it; the `models.*` / `utils.*`
# imports further down are presumably resolved from this checkout.
!git clone https://github.com/WongKinYiu/yolov7.git
%cd yolov7


# Download the pretrained checkpoint that is later passed to attempt_load via Config.weights.
!wget https://github.com/WongKinYiu/yolov7/releases/download/v0.1/yolov7.pt

import os
import cv2
import torch
import numpy as np
from tqdm import tqdm
from torchvision import transforms
from models.experimental import attempt_load
from utils.general import non_max_suppression, scale_coords
from utils.datasets import letterbox
from utils.torch_utils import select_device

class Config:
    """Settings for the YOLOv7 feature-extraction stage.

    Besides holding the settings, instantiation creates the output tree
    ``<feature_dir>/<split>/<class>`` for every split/class pair.
    """

    def __init__(self):
        self.device = select_device('0')
        self.weights = 'yolov7.pt'
        self.img_size = 512
        self.batch_size = 16
        self.conf_thres = 0.25  # Confidence threshold
        self.iou_thres = 0.45  # IOU threshold for NMS
        self.max_det = 1000  # Maximum number of detections per image
        self.feature_dir = '/kaggle/working/features'
        self.data_root = '/kaggle/input/dyslexia-handwriting/Processed'

        # One output folder per (split, class) combination, created in the
        # order Train/* then Test/*.
        for split in ('Train', 'Test'):
            for label in ('normal', 'reversal', 'corrected'):
                target = os.path.join(self.feature_dir, split, label)
                os.makedirs(target, exist_ok=True)

# Module-level settings object read by the functions defined in this cell.
config = Config()

model = attempt_load(config.weights, map_location=config.device)  # load FP32 model
model.eval()
# Shared buffer: the forward hook registered below overwrites features[<name>]
# every time the hooked module runs.
features = {}
def get_features(name):
    """Return a forward hook that stores the hooked module's detached output in ``features[name]``."""
    def hook(model, input, output):
        features[name] = output.detach()
    return hook
# Hook the second-to-last entry of model.model; its output is what extract_features pools.
model.model[-2].register_forward_hook(get_features('features'))

def preprocess_image(img_path):
    """Load an image file and convert it into a model-ready tensor.

    The image is read with OpenCV, converted from BGR to RGB, scaled to
    ``[0, 1]`` as float32, rearranged from HWC to CHW and given a leading
    batch dimension before being moved to ``config.device``.

    Returns:
        The tensor, or ``None`` when OpenCV cannot read ``img_path``.

    NOTE(review): no resizing or padding happens here; ``config.img_size`` is
    not applied, so the network sees each image at its native resolution.
    """
    bgr = cv2.imread(img_path)
    if bgr is None:
        return None

    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    scaled = rgb.astype(np.float32) / 255.0

    # HWC -> CHW, then add the batch axis.
    chw = torch.from_numpy(scaled).permute(2, 0, 1)
    return chw.unsqueeze(0).to(config.device)

def extract_features(img_path):
    """Compute a pooled feature array for one image.

    Runs the model on the preprocessed image, takes the activation captured
    by the forward hook in ``features['features']``, average-pools it to
    1x1 spatially and drops the two trailing singleton axes.

    Returns:
        A NumPy array on the CPU, or ``None`` if the image could not be read.
    """
    tensor = preprocess_image(img_path)
    if tensor is None:
        return None

    # The forward pass is needed only for its side effect: it triggers the
    # hook that refreshes features['features'].
    with torch.no_grad():
        model(tensor)

    pooled = torch.nn.functional.adaptive_avg_pool2d(features['features'], (1, 1))
    flat = pooled.squeeze(-1).squeeze(-1)
    return flat.cpu().numpy()

def process_dataset(data_type='Train', class_name='normal'):
    """Extract and save one feature vector per image of a split/class folder.

    Images are read from ``<config.data_root>/<data_type>/<class_name>`` and
    each feature is written to
    ``<config.feature_dir>/<data_type>/<class_name>/<image stem>.npy``.

    Every feature is saved right away under the name of the image it was
    computed from. The previous implementation collected the successful
    features of a batch and then paired them with the batch's paths by
    position, so a single unreadable image shifted all later features of
    that batch onto the wrong filenames.

    Args:
        data_type: Split sub-folder name (e.g. ``'Train'`` or ``'Test'``).
        class_name: Class sub-folder name inside the split.
    """
    print(f"Processing {data_type}/{class_name}...")

    img_dir = os.path.join(config.data_root, data_type, class_name)
    # Sorted for a reproducible processing order; the extension check is
    # case-insensitive so files such as "IMG.PNG" are not silently ignored.
    img_paths = [
        os.path.join(img_dir, f)
        for f in sorted(os.listdir(img_dir))
        if f.lower().endswith(('.png', '.jpg', '.jpeg'))
    ]

    save_dir = os.path.join(config.feature_dir, data_type, class_name)
    # Do not rely on Config having pre-created this particular folder.
    os.makedirs(save_dir, exist_ok=True)

    skipped = 0
    # The progress bar still advances once per chunk of config.batch_size images.
    for i in tqdm(range(0, len(img_paths), config.batch_size)):
        for img_path in img_paths[i:i + config.batch_size]:
            image_features = extract_features(img_path)
            if image_features is None:
                skipped += 1
                continue

            filename = os.path.splitext(os.path.basename(img_path))[0]
            # extract_features returns one row per image; store that row as a
            # flat vector, exactly like a row of the former np.vstack result.
            np.save(os.path.join(save_dir, f"{filename}.npy"), image_features[0])

    if skipped:
        print(f"Skipped {skipped} unreadable image(s) in {data_type}/{class_name}")

# Run feature extraction for every split/class combination.
for data_type in ['Train', 'Test']:
    for class_name in ['normal', 'reversal', 'corrected']:
        process_dataset(data_type, class_name)

print("Feature extraction completed!")

Looking in indexes: https://pypi.org/simple, https://download.pytorch.org/whl/cu117
Cloning into 'yolov7'...
remote: Enumerating objects: 1197, done.[K
remote: Total 1197 (delta 0), reused 0 (delta 0), pack-reused 1197 (from 1)[K
Receiving objects: 100% (1197/1197), 74.23 MiB | 36.85 MiB/s, done.
Resolving deltas: 100% (520/520), done.
/kaggle/working/yolov7
--2025-05-04 07:09:22--  https://github.com/WongKinYiu/yolov7/releases/download/v0.1/yolov7.pt
Resolving github.com (github.com)... 140.82.112.4
Connecting to github.com (github.com)|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/511187726/b0243edf-9fb0-4337-95e1-42555f1b37cf?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250504%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250504T070922Z&X-Amz-Expires=300&X-Amz-Signature=e0a56ce9a8157bdf6b994a49943aeeb7458f7b0869cab8eac46875138d14

  ckpt = torch.load(w, map_location=map_location)  # load


Fusing layers... 
RepConv.fuse_repvgg_block
RepConv.fuse_repvgg_block
RepConv.fuse_repvgg_block
Processing Train/normal...


  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]
100%|██████████| 2459/2459 [20:44<00:00,  1.98it/s]


Processing Train/reversal...


100%|██████████| 2924/2924 [25:03<00:00,  1.95it/s]


Processing Train/corrected...


100%|██████████| 4096/4096 [36:35<00:00,  1.87it/s]  


Processing Test/normal...


100%|██████████| 1223/1223 [10:53<00:00,  1.87it/s]


Processing Test/reversal...


100%|██████████| 1118/1118 [09:34<00:00,  1.94it/s]


Processing Test/corrected...


100%|██████████| 1206/1206 [10:29<00:00,  1.92it/s]

Feature extraction completed!





In [2]:
# First ensure all required imports are present
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import classification_report, confusion_matrix, precision_recall_fscore_support
from sklearn.utils.class_weight import compute_class_weight
import matplotlib.pyplot as plt
import seaborn as sns

# Classifier-stage configuration (rebinds the name `Config` used by the extraction cell)
class Config:
    """Hyper-parameters, paths and label mapping for training the feature classifier."""

    def __init__(self):
        if torch.cuda.is_available():
            self.device = torch.device('cuda')
        else:
            self.device = torch.device('cpu')

        self.feature_dir = '/kaggle/working/features'
        self.num_classes = 3  # normal, reversal, corrected
        self.batch_size = 32
        self.learning_rate = 0.001  # Reduced from original
        self.epochs = 200

        # The position of a name in class_names is its integer label.
        self.class_names = ['normal', 'reversal', 'corrected']
        self.class_to_idx = dict(zip(self.class_names, range(len(self.class_names))))
        
# Module-level settings object read by the dataset, model and training code below.
config = Config()
# Seed the torch and NumPy global RNGs (both are used during training).
torch.manual_seed(42)
np.random.seed(42)

# Dataset over the per-image .npy feature files written by the extraction step
class DyslexiaDataset(Dataset):
    """Feature-vector dataset backed by ``<feature_dir>/<data_type>/<class>/*.npy``.

    Args:
        data_type: Split sub-folder to read (e.g. ``'Train'`` or ``'Test'``).
        feature_dir: Root folder of the saved features. Defaults to
            ``config.feature_dir``.
        class_to_idx: Mapping of class folder name to integer label. Defaults
            to ``config.class_to_idx``.

    Attributes:
        data: Paths of the ``.npy`` files, grouped by class and sorted by
            file name within each class.
        labels: Integer label for each entry of ``data``.

    Raises:
        FileNotFoundError: If a class folder is missing or the split contains
            no ``.npy`` file at all. The message names the path and points at
            the feature-extraction step.
    """

    def __init__(self, data_type='Train', feature_dir=None, class_to_idx=None):
        # Resolve defaults lazily so explicit arguments work without the
        # module-level config.
        if feature_dir is None:
            feature_dir = config.feature_dir
        if class_to_idx is None:
            class_to_idx = config.class_to_idx

        self.data = []
        self.labels = []

        for class_name, class_idx in class_to_idx.items():
            class_dir = os.path.join(feature_dir, data_type, class_name)
            if not os.path.isdir(class_dir):
                raise FileNotFoundError(
                    f"Feature directory not found: '{class_dir}'. "
                    "Run the feature-extraction cell in this session first so "
                    "the .npy feature files exist."
                )
            # os.listdir order is arbitrary; sorting keeps the sample order
            # (and therefore seeded shuffling) reproducible.
            for feature_file in sorted(os.listdir(class_dir)):
                if feature_file.endswith('.npy'):
                    self.data.append(os.path.join(class_dir, feature_file))
                    self.labels.append(class_idx)

        if not self.data:
            raise FileNotFoundError(
                f"No .npy feature files found under "
                f"'{os.path.join(feature_dir, data_type)}'."
            )

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Load the ``idx``-th feature file and return ``(float tensor, int label)``."""
        feature = np.load(self.data[idx])
        return torch.from_numpy(feature).float(), self.labels[idx]

# Enhanced Model Architecture
class EnhancedMobileNetV2(nn.Module):
    """1-D convolutional classifier over a flat feature vector.

    The feature extractor has four Conv1d(k=3, pad=1) -> BatchNorm1d -> ReLU6
    stages with 1 -> 32 -> 64 -> 128 -> 256 channels. The first three stages
    end in ``MaxPool1d(2)``; the last one ends in global average pooling.
    The head is Dropout(0.6) -> Linear(256, 128) -> ReLU6 -> Linear(128, num_classes).
    """

    def __init__(self, num_classes=3):
        super().__init__()

        channel_plan = [1, 32, 64, 128, 256]
        final_stage = len(channel_plan) - 2

        stack = []
        for stage, (c_in, c_out) in enumerate(zip(channel_plan[:-1], channel_plan[1:])):
            stack.append(nn.Conv1d(c_in, c_out, kernel_size=3, padding=1))
            stack.append(nn.BatchNorm1d(c_out))
            stack.append(nn.ReLU6())
            if stage == final_stage:
                stack.append(nn.AdaptiveAvgPool1d(1))
            else:
                stack.append(nn.MaxPool1d(2))

        # A single flat Sequential keeps the layer indices (features.0 ... features.15).
        self.features = nn.Sequential(*stack)

        head = [
            nn.Dropout(0.6),
            nn.Linear(256, 128),
            nn.ReLU6(),
            nn.Linear(128, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Map a ``(batch, length)`` input to ``(batch, num_classes)`` logits."""
        # Insert the channel axis expected by Conv1d.
        encoded = self.features(x.unsqueeze(1))
        # Collapse everything after the batch axis before the linear head.
        return self.classifier(encoded.flatten(start_dim=1))

# Calculate class weights
def get_class_weights(dataset):
    """Return 'balanced' per-class weights for ``dataset.labels``.

    The weights come from scikit-learn's ``compute_class_weight`` over the
    classes that actually occur in the labels, and are returned as a float
    tensor on ``config.device``.
    """
    targets = dataset.labels
    present_classes = np.unique(targets)
    balanced = compute_class_weight('balanced', classes=present_classes, y=targets)
    weight_tensor = torch.tensor(balanced, dtype=torch.float)
    return weight_tensor.to(config.device)

# Enhanced training function
def train_model_enhanced():
    """Train the classifier and return it with the best checkpoint loaded.

    Uses the module-level ``train_dataset`` / ``train_loader`` for training
    and ``validate_model`` for the per-epoch score that drives both the
    learning-rate scheduler and checkpoint selection.

    Training recipe: class-weighted cross-entropy, AdamW (weight decay 1e-4),
    ReduceLROnPlateau on the validation accuracy (factor 0.5, patience 5),
    gradient-norm clipping at 1.0, and Gaussian noise (std 0.01) added to the
    inputs of roughly half of the batches.

    Returns:
        The model, holding the weights of the epoch with the highest
        validation accuracy seen in this call.
    """
    checkpoint_path = 'best_model.pth'

    # Initialize model
    model = EnhancedMobileNetV2(config.num_classes).to(config.device)

    # Handle class imbalance
    class_weights = get_class_weights(train_dataset)
    criterion = nn.CrossEntropyLoss(weight=class_weights)

    # Optimizer with weight decay
    optimizer = optim.AdamW(model.parameters(), lr=config.learning_rate, weight_decay=1e-4)

    # 'max' mode because the monitored quantity is an accuracy. The scheduler's
    # `verbose` flag is deprecated, so learning-rate changes are logged manually.
    scheduler = ReduceLROnPlateau(optimizer, 'max', patience=5, factor=0.5)

    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint; otherwise a leftover file from an earlier run could be loaded.
    best_accuracy = float('-inf')

    print("Starting enhanced training...")
    for epoch in range(config.epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for features, labels in train_loader:
            features, labels = features.to(config.device), labels.to(config.device)

            # Simple data augmentation: small additive noise on about half of
            # the batches (out-of-place, so the loader's tensor is untouched).
            if np.random.rand() > 0.5:
                features = features + torch.randn_like(features) * 0.01

            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()

            # Gradient clipping
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)

            optimizer.step()

            # Track metrics
            running_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        epoch_acc = 100 * correct / total
        print(f'Epoch [{epoch+1}/{config.epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {epoch_acc:.2f}%')

        # Validation
        val_acc = validate_model(model)
        lr_before = optimizer.param_groups[0]['lr']
        scheduler.step(val_acc)
        lr_after = optimizer.param_groups[0]['lr']
        if lr_after != lr_before:
            print(f'Learning rate reduced from {lr_before:.2e} to {lr_after:.2e}')

        # Save best model
        if val_acc > best_accuracy:
            best_accuracy = val_acc
            torch.save(model.state_dict(), checkpoint_path)
            print(f'New best model saved with val accuracy: {best_accuracy:.2f}%')

    # Restore the best weights, but only if this call actually wrote a
    # checkpoint (config.epochs == 0 leaves nothing to load).
    if best_accuracy > float('-inf'):
        model.load_state_dict(torch.load(checkpoint_path, map_location=config.device))
    return model

def validate_model(model):
    """Return the accuracy of ``model`` over ``test_loader`` as a percentage.

    The model is switched to eval mode and no gradients are tracked.
    """
    model.eval()
    hits, seen = 0, 0

    with torch.no_grad():
        for batch, targets in test_loader:
            batch = batch.to(config.device)
            targets = targets.to(config.device)

            logits = model(batch)
            # Index of the largest logit per row is the predicted class.
            predicted = torch.max(logits, dim=1).indices

            seen += targets.size(0)
            hits += (predicted == targets).sum().item()

    return 100 * hits / seen

# Enhanced evaluation
def evaluate_model_enhanced(model):
    """Evaluate ``model`` on ``test_loader`` and report the results.

    Prints the accuracy, the three weighted scores returned by
    ``calculate_enhanced_metrics`` and a scikit-learn classification report,
    then shows a confusion-matrix heat map.

    The report and the confusion matrix are computed over the fixed label set
    ``range(config.num_classes)``. Without it, a class missing from both the
    labels and the predictions makes ``classification_report`` reject
    ``target_names`` and shrinks the confusion matrix below the tick labels.
    """
    model.eval()
    all_preds = []
    all_labels = []
    all_probs = []

    # Only the forward passes need gradient tracking disabled.
    with torch.no_grad():
        for features, labels in test_loader:
            features, labels = features.to(config.device), labels.to(config.device)
            outputs = model(features)
            predicted = outputs.argmax(dim=1)
            probs = torch.softmax(outputs, dim=1)

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    preds_arr = np.array(all_preds)
    labels_arr = np.array(all_labels)
    class_ids = list(range(config.num_classes))

    correct = int((preds_arr == labels_arr).sum())
    total = len(labels_arr)
    accuracy = 100 * correct / total

    mAP, mIoU, f1 = calculate_enhanced_metrics(preds_arr,
                                               labels_arr,
                                               np.array(all_probs),
                                               config.num_classes)

    print(f'\nEnhanced Test Metrics:')
    print(f'Accuracy: {accuracy:.2f}%')
    print(f'Weighted mAP: {mAP:.4f}')
    print(f'Weighted mIoU: {mIoU:.4f}')
    print(f'Weighted F1 Score: {f1:.4f}')

    print('\nClassification Report:')
    print(classification_report(labels_arr, preds_arr,
                                labels=class_ids,
                                target_names=config.class_names,
                                digits=4,
                                zero_division=0))

    # Confusion matrix (rows: actual class, columns: predicted class)
    cm = confusion_matrix(labels_arr, preds_arr, labels=class_ids)
    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=config.class_names,
                yticklabels=config.class_names,
                ax=ax)
    ax.set_title('Confusion Matrix')
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Actual')
    plt.show()

def calculate_enhanced_metrics(all_preds, all_labels, all_probs, num_classes):
    """Compute support-weighted precision, IoU and F1 from hard predictions.

    Per-class true/false positives and false negatives are counted once, and
    each per-class score is averaged with weights equal to the class's share
    of ``all_labels``.

    Args:
        all_preds: 1-D integer array of predicted class ids.
        all_labels: 1-D integer array of true class ids, same length.
        all_probs: Accepted for interface compatibility; not used.
        num_classes: Number of classes; ids must lie in ``[0, num_classes)``.

    Returns:
        ``(weighted_mAP, weighted_mIoU, weighted_f1)`` as Python floats.
        All three are ``0.0`` for empty input.

    Raises:
        ValueError: If a label or prediction is outside ``[0, num_classes)``.

    NOTE(review): the first value is the support-weighted *precision* of the
    hard predictions, not a ranking-based average precision; the name is kept
    because callers print it as "mAP".
    """
    preds = np.asarray(all_preds).ravel()
    labels = np.asarray(all_labels).ravel()
    if labels.size == 0:
        return 0.0, 0.0, 0.0

    preds = preds.astype(np.int64)
    labels = labels.astype(np.int64)
    if min(preds.min(), labels.min()) < 0 or max(preds.max(), labels.max()) >= num_classes:
        raise ValueError("labels and predictions must lie in [0, num_classes)")

    # minlength keeps every count vector at num_classes entries even when the
    # highest class ids never occur (plain bincount would return fewer).
    support = np.bincount(labels, minlength=num_classes)
    predicted_count = np.bincount(preds, minlength=num_classes)
    tp = np.bincount(labels[labels == preds], minlength=num_classes)
    fp = predicted_count - tp
    fn = support - tp

    # Weight by class support
    weights = support / labels.size
    eps = 1e-10  # guards the divisions for classes that never occur

    # Weighted "mAP" (per-class precision)
    precision = tp / (tp + fp + eps)
    weighted_mAP = float(np.sum(precision * weights))

    # Weighted mIoU: |pred & label| / |pred | label| = TP / (TP + FP + FN)
    ious = tp / (tp + fp + fn + eps)
    weighted_mIoU = float(np.sum(ious * weights))

    # Weighted F1: 2TP / (2TP + FP + FN), defined as 0 when the class has
    # neither predictions nor labels.
    f1_denominator = 2 * tp + fp + fn
    f1_per_class = np.divide(2 * tp, f1_denominator,
                             out=np.zeros(f1_denominator.shape, dtype=np.float64),
                             where=f1_denominator > 0)
    weighted_f1 = float(np.sum(f1_per_class * weights))

    return weighted_mAP, weighted_mIoU, weighted_f1

# Initialize datasets
# Each constructor lists <config.feature_dir>/<split>/<class>, so the saved
# .npy features must already exist on disk when this runs.
train_dataset = DyslexiaDataset('Train')
test_dataset = DyslexiaDataset('Test')

# Create data loaders
# These module-level names are read directly by train_model_enhanced,
# validate_model and evaluate_model_enhanced.
train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=config.batch_size, shuffle=False)

# Run the enhanced pipeline
# NOTE(review): validate_model scores on test_loader, so the same split drives
# checkpoint selection during training and the final evaluation below.
print("Starting training pipeline...")
enhanced_model = train_model_enhanced()
print("\nEvaluating model...")
evaluate_model_enhanced(enhanced_model)

FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/working/features/Train/normal'