In [None]:
from google.colab import drive
import os

# Attach Google Drive to the Colab filesystem
drive.mount('/content/drive')

# Project layout on Drive: one root folder with a dataset and a results sub-folder
project_dir = "/content/drive/MyDrive/LeafImageGeneration"
dataset_dir, results_dir = (
    os.path.join(project_dir, leaf) for leaf in ("dataset", "results")
)

# Create both folders if they are missing (no error when they already exist)
for folder in (dataset_dir, results_dir):
    os.makedirs(folder, exist_ok=True)

print(f"✅ Directories set up: {dataset_dir}, {results_dir}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Directories set up: /content/drive/MyDrive/LeafImageGeneration/dataset, /content/drive/MyDrive/LeafImageGeneration/results


In [None]:
import os
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import glob
from diffusers import UNet2DModel, DDPMScheduler
import torch.nn.functional as F
from tqdm import tqdm
import numpy as np
import logging

# ✅ Configure logging
# basicConfig() does nothing when the root logger already has handlers, which a
# notebook runtime may have installed before this cell runs. force=True
# (Python 3.8+) replaces them so the INFO messages emitted below are shown.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    force=True,
)

# ✅ Configuration
DATASET_PATH = "/content/drive/MyDrive/LeafImageGeneration/dataset"
SAVE_IMAGE_DIR = "/content/drive/MyDrive/LeafImageGeneration/results"
IMAGE_SIZE = 128        # images are resized to IMAGE_SIZE x IMAGE_SIZE
BATCH_SIZE = 8
NUM_EPOCHS = 5
LEARNING_RATE = 1e-4
NUM_TIMESTEPS = 500     # number of diffusion steps given to the noise scheduler
NUM_IMAGES = 50  # 🔥 Generate 50 images
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ✅ Ensure dataset exists
if not os.path.exists(DATASET_PATH):
    raise FileNotFoundError(f"❌ Dataset folder not found: {DATASET_PATH}")

# ✅ Ensure save directory exists
os.makedirs(SAVE_IMAGE_DIR, exist_ok=True)

# ✅ Apply Transformation
# Images are loaded as 3-channel RGB, so mean/std are spelled out per channel.
# With mean = std = 0.5 the [0, 1] output of ToTensor() is mapped to [-1, 1].
transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# ✅ Custom Dataset Loader (No Subfolders Required)
class CustomImageDataset(Dataset):
    """Unlabelled image dataset read from a single flat directory.

    Only regular files directly inside ``root_dir`` whose extension is a known
    image format are used. Anything else matched by ``*.*`` (text notes,
    archives, ``Thumbs.db``, directories with a dot in their name, ...) is
    skipped so that it cannot make ``Image.open`` fail in the middle of an
    epoch. Paths are sorted so the index -> file mapping is stable between runs.

    Args:
        root_dir: Directory that directly contains the image files.
        transform: Optional callable applied to each RGB ``PIL.Image``.

    Raises:
        FileNotFoundError: If ``root_dir`` contains no usable image file.
    """

    # Lower-case file extensions accepted as images.
    IMAGE_EXTENSIONS = (
        ".png", ".jpg", ".jpeg", ".jfif", ".bmp", ".gif", ".tif", ".tiff", ".webp",
    )

    def __init__(self, root_dir, transform=None):
        candidates = glob.glob(os.path.join(root_dir, "*.*"))
        self.image_paths = sorted(
            path
            for path in candidates
            if os.path.isfile(path)
            and os.path.splitext(path)[1].lower() in self.IMAGE_EXTENSIONS
        )
        if not self.image_paths:
            raise FileNotFoundError(f"❌ No images found in {root_dir}")
        self.transform = transform

    def __len__(self):
        """Return the number of image files found."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and return it, transformed if a transform is set."""
        # convert() produces an independent image, so the file can be closed
        # as soon as the with-block ends.
        with Image.open(self.image_paths[idx]) as handle:
            image = handle.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image  # No class labels needed

# ✅ Load dataset
def load_dataset(dataset_path, transform, batch_size=8):
    """Build a shuffling ``DataLoader`` over the images in ``dataset_path``.

    Args:
        dataset_path: Folder handed to ``CustomImageDataset``.
        transform: Transform applied to every loaded image.
        batch_size: Number of images per batch.

    Returns:
        A ``DataLoader`` using 2 worker processes and pinned memory.
    """
    images = CustomImageDataset(dataset_path, transform=transform)
    loader_options = {
        "batch_size": batch_size,
        "shuffle": True,
        "num_workers": 2,
        "pin_memory": True,
    }
    loader = DataLoader(images, **loader_options)
    logging.info(f"✅ Loaded {len(images)} images from dataset!")
    return loader

# ✅ Define Model
def create_model():
    """Construct the ``UNet2DModel`` used as the noise predictor.

    The network takes and produces 3-channel images of side ``IMAGE_SIZE`` and
    has four plain down/up stages with one layer per block.
    """
    stage_widths = (64, 128, 256, 512)  # Reduce channels
    stage_count = len(stage_widths)
    return UNet2DModel(
        sample_size=IMAGE_SIZE,
        in_channels=3,
        out_channels=3,
        layers_per_block=1,  # Reduce layers
        block_out_channels=stage_widths,
        down_block_types=("DownBlock2D",) * stage_count,
        up_block_types=("UpBlock2D",) * stage_count,
    )

# ✅ Training Loop
def train_model(model, dataloader, optimizer, scheduler, num_epochs, device):
    """Train ``model`` to predict the noise that ``scheduler`` adds to images.

    For every batch a random timestep is drawn per image, the images are noised
    with ``scheduler.add_noise`` and the model is optimised with an MSE loss
    between its output and the noise that was added. The average loss of each
    epoch is logged.

    Args:
        model: Network called as ``model(noisy_images, timesteps)["sample"]``.
        dataloader: Iterable (with ``len``) yielding batches of image tensors.
        optimizer: Optimizer over the parameters of ``model``.
        scheduler: Noise scheduler exposing ``add_noise`` and
            ``config.num_train_timesteps``.
        num_epochs: Number of passes over ``dataloader``.
        device: Device the model and the batches are moved to.
    """
    # Mixed precision is a CUDA feature here; on any other device the scaler
    # and autocast context are created disabled and act as pass-throughs.
    use_amp = torch.device(device).type == "cuda"
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp)
    model.to(device, memory_format=torch.channels_last)

    # Sample timesteps from the range the scheduler was actually built with,
    # rather than from a module-level constant that could drift out of sync.
    num_train_timesteps = scheduler.config.num_train_timesteps
    num_batches = len(dataloader)

    for epoch in range(num_epochs):
        epoch_loss = 0.0
        model.train()
        for images in tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}"):
            images = images.to(device, memory_format=torch.channels_last, non_blocking=True)
            noise = torch.randn_like(images, device=device)

            # One independent timestep per image in the batch.
            timesteps = torch.randint(0, num_train_timesteps, (images.shape[0],), device=device).long()
            noisy_images = scheduler.add_noise(images, noise, timesteps)

            with torch.cuda.amp.autocast(enabled=use_amp):  # Mixed precision
                predicted_noise = model(noisy_images, timesteps)["sample"]
                loss = F.mse_loss(predicted_noise, noise)

            optimizer.zero_grad()
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            epoch_loss += loss.item()

        if use_amp:
            torch.cuda.empty_cache()  # Free GPU memory
        # max(..., 1) keeps an empty dataloader from raising ZeroDivisionError.
        logging.info(f"📉 Epoch [{epoch+1}/{num_epochs}] - Avg Loss: {epoch_loss / max(num_batches, 1):.4f}")

# ✅ Generate Multiple Images After Training
def generate_images(model, scheduler, device, num_images, save_dir=None):
    """Sample images from the trained model and save them as PNG files.

    Each image starts from Gaussian noise of shape ``(1, 3, IMAGE_SIZE,
    IMAGE_SIZE)`` and is denoised by walking through ``scheduler.timesteps``
    with ``scheduler.step``. The result is mapped from [-1, 1] to 8-bit RGB and
    written to ``<save_dir>/diffusion_output_<i>.png``.

    Args:
        model: Network called as ``model(sample, timestep)["sample"]``.
        scheduler: Noise scheduler exposing ``timesteps`` and ``step``.
        device: Device on which sampling runs.
        num_images: Number of images to generate.
        save_dir: Output folder; defaults to ``SAVE_IMAGE_DIR`` when ``None``.
    """
    if save_dir is None:
        save_dir = SAVE_IMAGE_DIR
    os.makedirs(save_dir, exist_ok=True)

    # Make sure the weights live on the device the noise is created on.
    model.to(device)
    model.eval()

    # Follow the scheduler's own schedule instead of a module-level constant,
    # so a scheduler configured with a different step count is still traversed
    # correctly.
    timesteps = [int(t) for t in scheduler.timesteps]

    for i in range(num_images):
        with torch.no_grad():  # 🔥 Reduce memory usage
            z = torch.randn((1, 3, IMAGE_SIZE, IMAGE_SIZE), device=device)
            for t in timesteps:
                model_output = model(z, torch.tensor([t], device=device))["sample"]
                z = scheduler.step(model_output, t, z)["prev_sample"]

        # Clamp to [-1, 1] before scaling: values outside that range would
        # otherwise wrap around when cast to uint8. Round instead of truncating
        # so pixel values are not biased downwards.
        generated_image = (z.squeeze(0).permute(1, 2, 0).clamp(-1.0, 1.0) + 1) / 2
        generated_image = (generated_image * 255).round().cpu().numpy().astype(np.uint8)

        # ✅ Save Image
        save_path = os.path.join(save_dir, f"diffusion_output_{i}.png")
        Image.fromarray(generated_image).save(save_path)
        logging.info(f"✅ Image {i+1}/{num_images} saved: {save_path}")

# ✅ Main Function
def main():
    """Run the whole pipeline: load data, build and train the model, then sample images."""
    # Data
    loader = load_dataset(DATASET_PATH, transform, BATCH_SIZE)

    # Network, noise schedule and optimiser
    unet = create_model()
    noise_scheduler = DDPMScheduler(num_train_timesteps=NUM_TIMESTEPS)
    adamw = torch.optim.AdamW(unet.parameters(), lr=LEARNING_RATE)

    # Fit the network
    train_model(unet, loader, adamw, noise_scheduler, NUM_EPOCHS, DEVICE)

    # Sample NUM_IMAGES images and save them
    generate_images(unet, noise_scheduler, DEVICE, NUM_IMAGES)

# Entry point: run the pipeline when this code is executed directly
# (as a script or a notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()
