In [8]:
import cv2
import os

# Define input and output base folders
input_base_folder = r"C:\Users\sidda\Downloads\amd1stattempt"  # Change this to your dataset folder
output_base_folder = r"C:\Users\sidda\Downloads\output_dir"

# Define categories (subfolders inside input_base_folder)
categories = ["dryamd", "wetamd", "healthy"]

# Create output directories for each category
for category in categories:
    os.makedirs(os.path.join(output_base_folder, category), exist_ok=True)

# CLAHE parameters (applied to the Green channel)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

# Median filter kernel size
kernel_size = 3  # Small kernel to avoid excessive blurring

# Process images from each category folder
for category in categories:
    input_folder = os.path.join(input_base_folder, category)
    output_folder = os.path.join(output_base_folder, category)

    if not os.path.exists(input_folder):
        print(f"⚠️ Warning: {input_folder} does not exist!")
        continue

    print(f"📂 Processing category: {category}")

    for filename in os.listdir(input_folder):
        file_path = os.path.join(input_folder, filename)

        if filename.lower().endswith((".jpg", ".png", ".jpeg")):
            print(f"   ➜ Processing {filename}...")

            # Read the color fundus image
            img = cv2.imread(file_path)

            if img is None:
                print(f"   ⚠️ Skipping {filename}: Could not read file.")
                continue

            # Convert to LAB color space and apply CLAHE on L-channel
            lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
            l, a, b = cv2.split(lab)
            l_clahe = clahe.apply(l)
            lab_clahe = cv2.merge((l_clahe, a, b))
            img_clahe = cv2.cvtColor(lab_clahe, cv2.COLOR_LAB2BGR)

            # Apply median filtering on CLAHE-enhanced image
            filtered_img = cv2.medianBlur(img_clahe, kernel_size)

            # Save the processed image
            output_path = os.path.join(output_folder, filename)
            cv2.imwrite(output_path, filtered_img)

    print(f"✅ Finished processing {category}!\n")

print("🎉 All processing complete! Check:", output_base_folder)


📂 Processing category: dryamd
   ➜ Processing ABDUL_MAJEED_15-11-2012_P700255_(0009).jpg...
   ➜ Processing Aisha_lasania_20-5-2008_P469051_(0001).jpg...
   ➜ Processing BALGURI_BUCHAIAH_29-8-2009_N200166_(0000).jpg...
   ➜ Processing BALGURI_BUCHAIAH_29-8-2009_N200166_(0004).jpg...
   ➜ Processing BANDA_SATYAMMA_24-9-2012_P446897_(0011).jpg...
   ➜ Processing BANDA_SATYAMMA_24-9-2012_P446897_(0033).jpg...
   ➜ Processing BATHULA_CHITTAMMA_21-10-2009_P475278_(0009).jpg...
   ➜ Processing BIPLABKUMAR_NANDY_21-9-2009_P545332_(0004).jpg...
   ➜ Processing BIPLABKUMAR_NANDY_21-9-2009_P545332_(0011).jpg...
   ➜ Processing BOPPANA_NAGESH WAR RAO_12-12-2009_P555348_(0016).jpg...
   ➜ Processing CHINNA_RAJAMMA_22-11-2011_P648076_(0006).jpg...
   ➜ Processing CHINNA_RAJAMMA_22-11-2011_P648076_(0012).jpg...
   ➜ Processing DIPTI_B_3-2-2011_P589427_(0017).jpg...
   ➜ Processing DURGADAS_MUNSHI_21-9-2009_P545175_(0006).jpg...
   ➜ Processing DURGADAS_MUNSHI_21-9-2009_P545175_(0012).jpg...
   ➜ Pro

In [None]:
import cv2
import os
import numpy as np
import albumentations as A
from albumentations.core.composition import OneOf
from albumentations.augmentations import transforms

# Define input and output folders
input_base_folder = r"C:\Users\sidda\Downloads\output_dir"  # Folder with preprocessed images
output_base_folder = "augmented_images"

# Categories
categories = ["wet_amd", "dry_amd", "healthy"]

# Define augmentation pipeline
augmentations = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.Rotate(limit=20, p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.1, rotate_limit=15, p=0.5),
    A.GaussianBlur(p=0.3),
    A.HueSaturationValue(p=0.3),
])

# Process each category
for category in categories:
    input_folder = os.path.join(input_base_folder, category)
    output_folder = os.path.join(output_base_folder, category)
    os.makedirs(output_folder, exist_ok=True)

    image_files = [f for f in os.listdir(input_folder) if f.endswith(('.jpg', '.png'))]
    num_original_images = len(image_files)
    
    print(f"📂 Augmenting {category}: {num_original_images} images ➝ 150 images")

    # Generate augmented images
    augmented_count = 0
    while augmented_count < 150:  # We need 150 additional images
        for img_name in image_files:
            img_path = os.path.join(input_folder, img_name)
            image = cv2.imread(img_path)

            if image is None:
                continue

            # Apply augmentation
            augmented = augmentations(image=image)["image"]

            # Save the augmented image
            new_name = f"{augmented_count+1}_{img_name}"
            cv2.imwrite(os.path.join(output_folder, new_name), augmented)

            augmented_count += 1
            if augmented_count >= 150:
                break  # Stop when we reach 150 new images

    print(f"✅ Finished augmenting {category}!")

print("🎉 Traditional augmentation complete!")


In [1]:
!pip install torch torchvision torchaudio



Collecting torchvision
  Using cached torchvision-0.21.0-cp310-cp310-win_amd64.whl.metadata (6.3 kB)
Collecting torchaudio
  Using cached torchaudio-2.6.0-cp310-cp310-win_amd64.whl.metadata (6.7 kB)
Using cached torchvision-0.21.0-cp310-cp310-win_amd64.whl (1.6 MB)
Using cached torchaudio-2.6.0-cp310-cp310-win_amd64.whl (2.4 MB)
Installing collected packages: torchvision, torchaudio
Successfully installed torchaudio-2.6.0 torchvision-0.21.0



[notice] A new release of pip is available: 24.3.1 -> 25.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:

pip install matplotlib pillow





[notice] A new release of pip is available: 24.3.1 -> 25.0
[notice] To update, run: python.exe -m pip install --upgrade pip





In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.utils import save_image
import os

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Data Transformations
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))  # Normalizing between -1 and 1
])

# Dataset Paths
train_path = r'C:\Users\sidda\Downloads\amdmypart\augmented_images\train'
val_path = r'C:\Users\sidda\Downloads\amdmypart\augmented_images\val'

# Load Datasets
train_dataset = datasets.ImageFolder(train_path, transform=transform)
val_dataset = datasets.ImageFolder(val_path, transform=transform)

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=True)

print("✅ Datasets loaded successfully!")


✅ Datasets loaded successfully!


In [3]:
import torch.nn as nn

class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=1, padding=3),
            nn.ReLU(inplace=True),

            # Downsampling
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),

            # Bottleneck
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),

            # ✅ Upsampling with PixelShuffle
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.PixelShuffle(2),  # Replaces ConvTranspose2d
            nn.ReLU(inplace=True),

            nn.Conv2d(64, 3, kernel_size=7, stride=1, padding=3),
            nn.Tanh()
        )

    def forward(self, x):
        return self.model(x)


In [4]:
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
            
            nn.Conv2d(128, 1, kernel_size=4, stride=1, padding=1),
            nn.Sigmoid()  # Output between 0 and 1
        )

    def forward(self, x):
        return self.model(x)


In [5]:
# Model Initialization
G_AB = Generator().to(device)
G_BA = Generator().to(device)
D_A = Discriminator().to(device)
D_B = Discriminator().to(device)

# Loss Functions
adversarial_loss = nn.MSELoss()
cycle_consistency_loss = nn.L1Loss()

# Optimizers
lr = 0.0002
beta1, beta2 = 0.5, 0.999
g_optimizer = optim.Adam(list(G_AB.parameters()) + list(G_BA.parameters()), lr=lr, betas=(beta1, beta2))
d_optimizer = optim.Adam(list(D_A.parameters()) + list(D_B.parameters()), lr=lr, betas=(beta1, beta2))

# Training Parameters
num_epochs = 50
lambda_cycle = 20

# Training Loop
for epoch in range(num_epochs):
    for i, (data_A, data_B) in enumerate(zip(train_loader, val_loader)):
        real_A = data_A[0].to(device)
        real_B = data_B[0].to(device)

        # Adversarial Ground Truths (Dynamic Sizing)
        valid = torch.ones_like(D_A(real_A), device=device)
        fake = torch.zeros_like(D_A(real_A), device=device)

        # ------------------
        #  Train Generators
        # ------------------
        g_optimizer.zero_grad()

        # Identity Loss
        loss_id_A = cycle_consistency_loss(G_BA(real_A), real_A)
        loss_id_B = cycle_consistency_loss(G_AB(real_B), real_B)

        # Adversarial Loss
        fake_B = G_AB(real_A)
        loss_gan_AB = adversarial_loss(D_B(fake_B), valid[:fake_B.size(0)])

        fake_A = G_BA(real_B)
        loss_gan_BA = adversarial_loss(D_A(fake_A), valid[:fake_A.size(0)])

        # Cycle Consistency Loss
        recovered_A = G_BA(fake_B)
        loss_cycle_A = cycle_consistency_loss(recovered_A, real_A)

        recovered_B = G_AB(fake_A)
        loss_cycle_B = cycle_consistency_loss(recovered_B, real_B)

        # Total Generator Loss
        g_loss = (
            loss_gan_AB + loss_gan_BA +
            lambda_cycle * (loss_cycle_A + loss_cycle_B) +
            0.5 * (loss_id_A + loss_id_B)
        )
        g_loss.backward()
        g_optimizer.step()

        # ---------------------
        #  Train Discriminators
        # ---------------------
        d_optimizer.zero_grad()

        # Real Loss
        loss_real_A = adversarial_loss(D_A(real_A), valid[:real_A.size(0)])
        loss_real_B = adversarial_loss(D_B(real_B), valid[:real_B.size(0)])

        # Fake Loss
        loss_fake_A = adversarial_loss(D_A(fake_A.detach()), fake[:fake_A.size(0)])
        loss_fake_B = adversarial_loss(D_B(fake_B.detach()), fake[:fake_B.size(0)])

        # Total Discriminator Loss
        d_loss = (loss_real_A + loss_fake_A + loss_real_B + loss_fake_B) * 0.5
        d_loss.backward()
        d_optimizer.step()

    print(f"Epoch [{epoch+1}/{num_epochs}] - Generator Loss: {g_loss.item():.4f} - Discriminator Loss: {d_loss.item():.4f}")

print("🎯 Training Complete!")


   


Epoch [1/50] - Generator Loss: 7.6599 - Discriminator Loss: 0.4643
Epoch [2/50] - Generator Loss: 5.3198 - Discriminator Loss: 0.4280
Epoch [3/50] - Generator Loss: 4.6248 - Discriminator Loss: 0.4011
Epoch [4/50] - Generator Loss: 5.8197 - Discriminator Loss: 0.4868
Epoch [5/50] - Generator Loss: 5.8968 - Discriminator Loss: 0.4235
Epoch [6/50] - Generator Loss: 4.8991 - Discriminator Loss: 0.3339
Epoch [7/50] - Generator Loss: 5.6078 - Discriminator Loss: 0.3221
Epoch [8/50] - Generator Loss: 5.7166 - Discriminator Loss: 0.3723
Epoch [9/50] - Generator Loss: 5.1347 - Discriminator Loss: 0.3495
Epoch [10/50] - Generator Loss: 4.4628 - Discriminator Loss: 0.3989
Epoch [11/50] - Generator Loss: 4.2441 - Discriminator Loss: 0.3884
Epoch [12/50] - Generator Loss: 4.2819 - Discriminator Loss: 0.4335
Epoch [13/50] - Generator Loss: 4.5533 - Discriminator Loss: 0.3956
Epoch [14/50] - Generator Loss: 4.8878 - Discriminator Loss: 0.3790
Epoch [15/50] - Generator Loss: 4.1725 - Discriminator Lo

In [6]:
import os
import torch
from torchvision.utils import save_image

# ✅ Output Directories
output_dirs = {
    'wet_AMD': r'C:\Users\sidda\Downloads\amdmypart\Results_of_gan\wetamd',
    'dry_AMD': r'C:\Users\sidda\Downloads\amdmypart\Results_of_gan\dryamd',
    'healthy': r'C:\Users\sidda\Downloads\amdmypart\Results_of_gan\healthy'
}

# Automatically create directories if they don't exist
for dir_path in output_dirs.values():
    os.makedirs(dir_path, exist_ok=True)

# ✅ Function to Generate Images (Same Size as Input)
def generate_images(generator, data_loader, output_dir, num_images=50):
    os.makedirs(output_dir, exist_ok=True)
    generator.eval()  # Set generator to evaluation mode
    count = 0

    with torch.no_grad():
        for images, _ in data_loader:
            images = images.to(device)
            fake_images = generator(images)

            # Match the original size
            if fake_images.size() != images.size():
                fake_images = torch.nn.functional.interpolate(fake_images, size=(images.size(2), images.size(3)), mode='bilinear', align_corners=False)

            for i in range(fake_images.size(0)):
                if count >= num_images:
                    return  # Stop after generating the required number of images
                save_image(fake_images[i], os.path.join(output_dir, f"generated_{count + 1}.png"), normalize=True)
                count += 1

# ✅ Generate 50 Images for Each Category
generate_images(G_AB, train_loader, output_dirs['wet_AMD'], num_images=50)
generate_images(G_BA, val_loader, output_dirs['dry_AMD'], num_images=50)
generate_images(G_AB, val_loader, output_dirs['healthy'], num_images=50)

print("✅ 50 Images Generated for Each Category with Original Size!")


✅ 50 Images Generated for Each Category with Original Size!


In [7]:
from torchvision import transforms

# Add Resize Transformation
resize_transform = transforms.Resize((512, 512))  # Change (256, 256) to your desired size

def generate_images(generator, data_loader, output_dir, num_images=50):
    generator.eval()
    count = 0

    with torch.no_grad():
        for images, _ in data_loader:
            images = images.to(device)
            fake_images = generator(images)

            for i in range(fake_images.size(0)):
                if count >= num_images:
                    return
                # Apply the resizing transformation
                resized_image = resize_transform(fake_images[i].cpu())
                save_image(resized_image, os.path.join(output_dir, f"generated_{count+1}.png"), normalize=True)
                count += 1

# Generate resized images
generate_images(G_AB, train_loader, output_dirs['wet_AMD'], num_images=50)
generate_images(G_AB, val_loader, output_dirs['dry_AMD'], num_images=50)
generate_images(G_AB, train_loader, output_dirs['healthy'], num_images=50)

print("✅ Resized Images Generated!")


✅ Resized Images Generated!
