In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import DataLoader, Dataset
from transformers import ViTModel
from ray import tune
from ray.tune.schedulers import ASHAScheduler
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
import numpy as np
import matplotlib.pyplot as plt
import cv2
from PIL import Image
import torch.nn.functional as F

In [None]:
import os
import gc
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import models, transforms
from transformers import ViTModel
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from PIL import Image

# Custom Model: Hybrid EfficientNet + ViT
class HybridEfficientNetViT(nn.Module):
    """Two-branch classifier that fuses EfficientNet-B0 and ViT features.

    Both backbones receive the same input batch. The pooled EfficientNet
    feature vector and the first token of the ViT's last hidden state are
    concatenated and passed through a small fully connected head
    (Linear -> ReLU -> BatchNorm1d -> Dropout -> Linear).

    Args:
        num_classes: Number of output logits.
        dropout_rate: Dropout probability used in the classification head.
    """

    def __init__(self, num_classes=5, dropout_rate=0.3):
        super().__init__()

        # --- convolutional branch (ImageNet-pretrained EfficientNet-B0) ---
        self.effnet = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.IMAGENET1K_V1)
        # Only the convolutional trunk is used in forward(); the full network
        # stays registered as `effnet`, so its keys remain in the state dict.
        self.effnet_features = self.effnet.features
        self.effnet_avgpool = nn.AdaptiveAvgPool2d(1)

        # --- transformer branch (pretrained ViT without its pooling layer) ---
        self.vit = ViTModel.from_pretrained("google/vit-base-patch16-224", add_pooling_layer=False)

        # --- fusion head ---
        # 1280 + 768 presumably equals EfficientNet-B0 feature width plus the
        # ViT-base hidden size -- TODO confirm if a backbone is swapped.
        fused_width = 1280 + 768
        self.fc1 = nn.Linear(fused_width, 512)
        self.bn1 = nn.BatchNorm1d(512)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return class logits for the image batch `x`.

        The input is first moved to the device of the model's parameters.
        """
        x = x.to(next(self.parameters()).device)

        # CNN branch: feature maps -> global average pool -> flat vector.
        cnn_vector = self.effnet_avgpool(self.effnet_features(x)).flatten(1)

        # ViT branch: keep only the token at index 0 of the last hidden state.
        vit_vector = self.vit(pixel_values=x).last_hidden_state[:, 0, :]

        fused = torch.cat((cnn_vector, vit_vector), dim=1)
        hidden = F.relu(self.fc1(fused))
        hidden = self.bn1(hidden)
        hidden = self.dropout1(hidden)
        return self.fc2(hidden)

# Training Function
def train_model(model, train_loader, val_loader, epochs=5, lr=1e-4, device="cuda"):
    """Train `model` with AdamW and cross-entropy, checkpointing on validation accuracy.

    After every epoch the model is scored with `evaluate_model` on
    `val_loader`; whenever the accuracy improves, the state dict is written to
    "best_hybrid_model.pth". When training finishes, the final state dict is
    written to "hybrid_model.pth" and the whole module is pickled to
    "hybrid_model_full.pth".

    Args:
        model: The nn.Module to optimise (moved to `device` in place).
        train_loader: Iterable of (images, labels) training batches.
        val_loader: Iterable of (images, labels) batches used for validation.
        epochs: Number of passes over `train_loader`.
        lr: Learning rate for AdamW.
        device: Target device, as a string or torch.device.

    Returns:
        None. Results are reported via print and the saved checkpoint files.
    """
    model.to(device)

    # Mixed precision through torch.cuda.amp only makes sense on CUDA. On any
    # other device the scaler and autocast are explicitly disabled instead of
    # relying on PyTorch to warn and disable them implicitly.
    use_amp = torch.device(device).type == "cuda"

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr)
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp)
    best_acc = 0.0
    best_model_path = "best_hybrid_model.pth"

    # Guard the per-epoch average against an empty loader (avoids ZeroDivisionError).
    num_batches = max(len(train_loader), 1)

    for epoch in range(epochs):
        model.train()  # evaluate_model() switches to eval mode, so reset every epoch
        running_loss = 0.0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()

            with torch.cuda.amp.autocast(enabled=use_amp):
                outputs = model(images)
                loss = criterion(outputs, labels)

            # With the scaler disabled these calls reduce to plain
            # loss.backward() / optimizer.step().
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            running_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss/num_batches:.4f}")
        acc = evaluate_model(model, val_loader, device)

        if acc > best_acc:
            best_acc = acc
            torch.save(model.state_dict(), best_model_path)
            print("Best model saved!")

    torch.save(model.state_dict(), "hybrid_model.pth")
    # Pickles the entire module object; loading it later requires the class
    # definition to be importable.
    torch.save(model, "hybrid_model_full.pth")
    print("Final model saved successfully!")

# Evaluation Function
def evaluate_model(model, data_loader, device):
    """Run `model` over `data_loader` and report classification metrics.

    The model is put into eval mode and gradients are disabled. Predictions
    are the arg-max over dimension 1 of the model output. Accuracy plus
    weighted F1, precision and recall are printed; only accuracy is returned.

    Args:
        model: Module to evaluate (left in eval mode afterwards).
        data_loader: Iterable yielding (images, labels) batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The accuracy computed by `accuracy_score`.
    """
    model.eval()
    predicted, expected = [], []

    with torch.no_grad():
        for batch_images, batch_labels in data_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_images)
            predicted.extend(logits.argmax(dim=1).cpu().numpy())
            expected.extend(batch_labels.cpu().numpy())

    # Same averaging mode for every multi-class metric.
    averaging = {"average": "weighted"}
    acc = accuracy_score(expected, predicted)
    f1 = f1_score(expected, predicted, **averaging)
    precision = precision_score(expected, predicted, **averaging)
    recall = recall_score(expected, predicted, **averaging)

    print(f"Accuracy: {acc:.4f}, F1 Score: {f1:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")
    return acc

# Data Augmentation
# Preprocessing + augmentation pipeline: fixed 224x224 resize, random
# flip / rotation / contrast jitter, tensor conversion, then normalisation
# with a single-value mean and std of 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(degrees=20),
        transforms.ColorJitter(contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]
)

# Dataset Path
# Root folder that holds the "train", "val" and "test" sub-folders.
# Defaults to the original Google Drive location; set the DR_DATASET_PATH
# environment variable to run the notebook against a different copy of the
# data without editing this cell.
DATASET_PATH = os.environ.get(
    "DR_DATASET_PATH",
    "/content/drive/MyDrive/hackathone/Diabetic_Balanced_Data",
)

# Custom PyTorch Dataset
class DiabeticRetinopathyDataset(Dataset):
    """Image dataset laid out as one sub-folder per integer class label.

    `base_path` is expected to contain folders named "0", "1", ... up to
    `num_classes - 1`. Every image file found directly inside folder `k` is
    indexed with label `k`. Missing class folders are reported with a warning
    and skipped.

    Args:
        base_path: Directory that contains the per-class folders.
        transform: Optional callable applied to each PIL image in __getitem__.
        num_classes: Number of class folders to look for (default 5, i.e. 0-4).
        extensions: Optional iterable of file suffixes to accept
            (case-insensitive). Defaults to IMAGE_EXTENSIONS.

    Attributes:
        image_paths: Image file paths, ordered by class then by file name.
        labels: Integer class label for each entry of `image_paths`.
    """

    # Suffixes treated as images; anything else (notes, .DS_Store, ...) is skipped
    # so it cannot crash Image.open() in the middle of an epoch.
    IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".gif", ".webp")

    def __init__(self, base_path, transform=None, num_classes=5, extensions=None):
        self.base_path = base_path
        self.transform = transform
        self.image_paths = []
        self.labels = []

        allowed = tuple(ext.lower() for ext in (extensions or self.IMAGE_EXTENSIONS))

        for class_label in range(num_classes):
            folder_path = os.path.join(base_path, str(class_label))
            if not os.path.isdir(folder_path):
                print(f"Warning: Folder {folder_path} not found!")
                continue

            # os.listdir() order is arbitrary; sort so that sample indices are
            # reproducible across runs and machines.
            for img_name in sorted(os.listdir(folder_path)):
                img_path = os.path.join(folder_path, img_name)
                # Skip sub-directories (e.g. ".ipynb_checkpoints") and non-image files.
                if not os.path.isfile(img_path):
                    continue
                if not img_name.lower().endswith(allowed):
                    continue
                self.image_paths.append(img_path)
                self.labels.append(class_label)

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image `idx` as RGB, apply the transform if any, and return (image, label)."""
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        # The context manager closes the underlying file as soon as the pixel
        # data has been converted, instead of waiting for garbage collection.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        return image, label

# Creating DataLoaders
BATCH_SIZE = 32

# Validation and test images must not be randomly flipped / rotated / jittered:
# random augmentation makes the reported metrics change from run to run.
# This pipeline keeps only the deterministic steps of `transform`.
eval_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5])
])

train_dataset = DiabeticRetinopathyDataset(os.path.join(DATASET_PATH, "train"), transform=transform)
val_dataset = DiabeticRetinopathyDataset(os.path.join(DATASET_PATH, "val"), transform=eval_transform)
test_dataset = DiabeticRetinopathyDataset(os.path.join(DATASET_PATH, "test"), transform=eval_transform)

# The model's BatchNorm1d layer raises in training mode when a batch holds a
# single sample. Drop the trailing batch only in the one case where it would
# contain exactly one image.
drop_singleton_batch = (
    len(train_dataset) > BATCH_SIZE and len(train_dataset) % BATCH_SIZE == 1
)

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
    drop_last=drop_singleton_batch,
)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Train on the GPU when one is present, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = HybridEfficientNetViT(num_classes=5, dropout_rate=0.3)
train_model(model, train_loader, val_loader, epochs=10, lr=1e-4, device=device)


In [None]:
def predict_image(image_path, model_path="hybrid_model.pth", device="cuda"):
    """Predict the class index of a single image file.

    A fresh HybridEfficientNetViT is built, its weights are loaded from
    `model_path`, and the image is classified in eval mode without gradients.

    Args:
        image_path: Path of the image to classify.
        model_path: Path of a state dict saved with torch.save(model.state_dict(), ...).
        device: Preferred device. If a CUDA device is requested but CUDA is not
            available, inference silently falls back to the CPU.

    Returns:
        int: Index of the highest-scoring class.
    """
    # Keep the "cuda" default usable on CPU-only machines.
    if torch.device(device).type == "cuda" and not torch.cuda.is_available():
        device = "cpu"

    model = HybridEfficientNetViT(num_classes=5)
    # NOTE(security): torch.load unpickles the file; only load checkpoints
    # from a trusted source.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()

    # Inference must be deterministic, so use only the resize / tensor /
    # normalise steps here rather than a pipeline with random augmentation.
    preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5])
    ])

    # Close the file handle as soon as the pixels have been read.
    with Image.open(image_path) as raw_image:
        image = preprocess(raw_image.convert("RGB")).unsqueeze(0).to(device)

    with torch.no_grad():
        output = model(image)
        predicted_class = torch.argmax(output, dim=1).item()

    return predicted_class

In [None]:

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Evaluate on test dataset.
# evaluate_model() returns only the accuracy (a single float); unpacking four
# values from it raises a TypeError. It already prints accuracy, F1 score,
# precision and recall for the loader it is given, so those appear in its
# own output line.
# NOTE: `model` holds the weights from the last training epoch; load
# "best_hybrid_model.pth" first if the best-validation checkpoint is wanted.
acc = evaluate_model(model, test_loader, device)

# Print results
print(f"Test Accuracy: {acc:.4f}")
