In [None]:
# prompt: mount google drive

from google.colab import drive
drive.mount('/content/gdrive')


Mounted at /content/gdrive


##Basic_Gan_Implementation_color_images

In [None]:
import os
import json
import PIL
from PIL import Image, UnidentifiedImageError
import torch
import torchvision
import torch.utils.data
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.utils as vutils
import matplotlib.pyplot as plt

class FashionIQ(torch.utils.data.Dataset):
    """FashionIQ dataset for image-based GAN training, robust against file errors."""

    def __init__(self, root_dir, split='train', transform=None):
        self.root_dir = root_dir
        self.split = split
        self.transform = transform
        self.imgs = []
        self._prepare_dataset()

    def _prepare_dataset(self):
        splits = ['train', 'val', 'test']
        if self.split not in splits:
            raise ValueError(f"Invalid split: {self.split}. Choose from {splits}.")

        for category in os.listdir(self.root_dir):
            category_dir = os.path.join(self.root_dir, category)
            if os.path.isdir(category_dir):
                self._load_data(category, category_dir)

    def _load_data(self, category, category_dir):
        json_file = f"cap.{category}.{self.split}.json"
        json_path = os.path.join(self.root_dir, json_file)
        if not os.path.exists(json_path):
            print(f"Skipping missing file: {json_path}")
            return

        with open(json_path, 'r') as f:
            captions_data = json.load(f)

        for item in captions_data:
            img_path = os.path.join(category_dir, f"{item['candidate']}.jpg")
            if os.path.exists(img_path):
                data = {'image_path': img_path}
                self.imgs.append(data)

    def __len__(self):
        return len(self.imgs)

    def __getitem__(self, idx):
        data = self.imgs[idx]
        try:
            image = PIL.Image.open(data['image_path']).convert('RGB')
            if self.transform:
                image = self.transform(image)
            return image
        except UnidentifiedImageError:
            print(f"Warning: Skipping corrupt image file {data['image_path']}")
            return None  # Return None for corrupt images

class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.main = nn.Sequential(
            nn.ConvTranspose2d(100, 1024, 4, 1, 0, bias=False),
            nn.BatchNorm2d(1024),
            nn.ReLU(True),
            nn.ConvTranspose2d(1024, 512, 4, 2, 1, bias=False),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
            nn.ConvTranspose2d(512, 256, 4, 2, 1, bias=False),
            nn.BatchNorm2d(256),
            nn.ReLU(True),
            nn.ConvTranspose2d(256, 128, 4, 2, 1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(True),
            nn.ConvTranspose2d(128, 3, 4, 2, 1, bias=False),
            nn.Tanh()
        )

    def forward(self, input):
        return self.main(input)

class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.main = nn.Sequential(
            nn.Conv2d(3, 128, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 256, 4, 2, 1, bias=False),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(256, 512, 4, 2, 1, bias=False),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(512, 1024, 4, 2, 1, bias=False),
            nn.BatchNorm2d(1024),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(1024, 1, 4, 1, 0, bias=False),
            nn.Sigmoid()
        )

    def forward(self, input):
        return self.main(input).view(-1)

# Set up device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')

# Initialize models
generator = Generator().to(device)
discriminator = Discriminator().to(device)

# Optimizers
g_optimizer = optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
d_optimizer = optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))

# Loss function
criterion = nn.BCELoss()

# Dataset and DataLoader
transform = transforms.Compose([
    transforms.Resize(64),
    transforms.CenterCrop(64),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
dataset_path = '/content/gdrive/MyDrive/dataset'  # Adjust this path to your dataset location
train_dataset = FashionIQ(root_dir=dataset_path, split='train', transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4)

# Training Loop
num_epochs = 50
print("Starting training loop...")
for epoch in range(num_epochs):
    for i, real_data in enumerate(train_loader):
        if real_data is None:
            continue  # Skip batches with None due to corrupt images
        real_data = real_data.to(device)
        batch_size = real_data.size(0)

        # Train Discriminator
        discriminator.zero_grad()
        label = torch.full((batch_size,), 1, dtype=torch.float, device=device)
        output = discriminator(real_data)
        errD_real = criterion(output, label)
        errD_real.backward()

        noise = torch.randn(batch_size, 100, 1, 1, device=device)
        fake_data = generator(noise)
        label.fill_(0)
        output = discriminator(fake_data.detach())
        errD_fake = criterion(output, label)
        errD_fake.backward()
        errD = errD_real + errD_fake
        d_optimizer.step()

        # Train Generator
        generator.zero_grad()
        label.fill_(1)
        output = discriminator(fake_data)
        errG = criterion(output, label)
        errG.backward()
        g_optimizer.step()


        print(f'Epoch [{epoch + 1}/{num_epochs}] Batch {i + 1}/{len(train_loader)} Loss_D: {errD.item()} Loss_G: {errG.item()}')

print("Training complete.")


Using device: cuda
Starting training loop...


  self.pid = os.fork()


Epoch [1/50] Batch 1/218 Loss_D: 1.5410484075546265 Loss_G: 7.5607709884643555
Epoch [1/50] Batch 2/218 Loss_D: 2.9235188961029053 Loss_G: 10.675596237182617
Epoch [1/50] Batch 3/218 Loss_D: 0.561673641204834 Loss_G: 7.8566179275512695
Epoch [1/50] Batch 4/218 Loss_D: 1.5778430700302124 Loss_G: 15.371644973754883
Epoch [1/50] Batch 5/218 Loss_D: 0.641708254814148 Loss_G: 11.660337448120117
Epoch [1/50] Batch 6/218 Loss_D: 0.3663507103919983 Loss_G: 12.618114471435547
Epoch [1/50] Batch 7/218 Loss_D: 0.14332598447799683 Loss_G: 9.946295738220215
Epoch [1/50] Batch 8/218 Loss_D: 1.1090538501739502 Loss_G: 20.663406372070312
Epoch [1/50] Batch 9/218 Loss_D: 0.554741382598877 Loss_G: 20.672088623046875
Epoch [1/50] Batch 10/218 Loss_D: 0.2787984013557434 Loss_G: 17.911479949951172
Epoch [1/50] Batch 11/218 Loss_D: 0.03569328039884567 Loss_G: 10.094118118286133
Epoch [1/50] Batch 12/218 Loss_D: 1.9408588409423828 Loss_G: 21.81173324584961
Epoch [1/50] Batch 13/218 Loss_D: 0.3132490217685699

AttributeError: Caught AttributeError in DataLoader worker process 0.
Original Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/data/_utils/worker.py", line 308, in _worker_loop
    data = fetcher.fetch(index)
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/data/_utils/fetch.py", line 54, in fetch
    return self.collate_fn(data)
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/data/_utils/collate.py", line 277, in default_collate
    return collate(batch, collate_fn_map=default_collate_fn_map)
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/data/_utils/collate.py", line 121, in collate
    return collate_fn_map[elem_type](batch, collate_fn_map=collate_fn_map)
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/data/_utils/collate.py", line 171, in collate_tensor_fn
    numel = sum(x.numel() for x in batch)
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/data/_utils/collate.py", line 171, in <genexpr>
    numel = sum(x.numel() for x in batch)
AttributeError: 'NoneType' object has no attribute 'numel'


In [None]:
import os
import json
import PIL
from PIL import Image, UnidentifiedImageError
import torch
import torchvision
import torch.utils.data
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.utils as vutils
import matplotlib.pyplot as plt

class FashionIQ(torch.utils.data.Dataset):
    """FashionIQ dataset for image-based GAN training, robust against file errors."""

    def __init__(self, root_dir, split='train', transform=None):
        self.root_dir = root_dir
        self.split = split
        self.transform = transform
        self.imgs = []
        self._prepare_dataset()

    def _prepare_dataset(self):
        splits = ['train', 'val', 'test']
        if self.split not in splits:
            raise ValueError(f"Invalid split: {self.split}. Choose from {splits}.")

        for category in os.listdir(self.root_dir):
            category_dir = os.path.join(self.root_dir, category)
            if os.path.isdir(category_dir):
                self._load_data(category, category_dir)

    def _load_data(self, category, category_dir):
        json_file = f"cap.{category}.{self.split}.json"
        json_path = os.path.join(self.root_dir, json_file)
        if not os.path.exists(json_path):
            print(f"Skipping missing file: {json_path}")
            return

        with open(json_path, 'r') as f:
            captions_data = json.load(f)

        for item in captions_data:
            img_path = os.path.join(category_dir, f"{item['candidate']}.jpg")
            if os.path.exists(img_path):
                data = {'image_path': img_path}
                self.imgs.append(data)

    def __len__(self):
        return len(self.imgs)
    def __getitem__(self, idx):
        data = self.imgs[idx]
        try:
            image = PIL.Image.open(data['image_path']).convert('RGB')
            if self.transform:
                image = self.transform(image)
            return image
        except (UnidentifiedImageError, OSError):
            print(f"Warning: Skipping corrupt image file {data['image_path']}")
            return None  # Return None for corrupt images

class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.main = nn.Sequential(
            nn.ConvTranspose2d(100, 1024, 4, 1, 0, bias=False),
            nn.BatchNorm2d(1024),
            nn.ReLU(True),
            nn.ConvTranspose2d(1024, 512, 4, 2, 1, bias=False),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
            nn.ConvTranspose2d(512, 256, 4, 2, 1, bias=False),
            nn.BatchNorm2d(256),
            nn.ReLU(True),
            nn.ConvTranspose2d(256, 128, 4, 2, 1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(True),
            nn.ConvTranspose2d(128, 3, 4, 2, 1, bias=False),
            nn.Tanh()
        )

    def forward(self, input):
        return self.main(input)

class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.main = nn.Sequential(
            nn.Conv2d(3, 128, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 256, 4, 2, 1, bias=False),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(256, 512, 4, 2, 1, bias=False),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(512, 1024, 4, 2, 1, bias=False),
            nn.BatchNorm2d(1024),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(1024, 1, 4, 1, 0, bias=False),
            nn.Sigmoid()
        )

    def forward(self, input):
        return self.main(input).view(-1)

# Set up device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')

# Initialize models
generator = Generator().to(device)
discriminator = Discriminator().to(device)

# Optimizers
g_optimizer = optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
d_optimizer = optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))

# Loss function
criterion = nn.BCELoss()

# Dataset and DataLoader
transform = transforms.Compose([
    transforms.Resize(64),
    transforms.CenterCrop(64),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
dataset_path = '/content/gdrive/MyDrive/dataset'  # Adjust this path to your dataset location
train_dataset = FashionIQ(root_dir=dataset_path, split='train', transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4)

# Training Loop
num_epochs = 50
import torchvision.utils as vutils

# Training Loop
num_epochs = 500
print("Starting training loop...")
for epoch in range(num_epochs):
    for i, real_data in enumerate(train_loader):
        if real_data is None:
            continue  # Skip batches with None due to corrupt images
        real_data = real_data.to(device)
        batch_size = real_data.size(0)

        # Train Discriminator
        discriminator.zero_grad()
        label = torch.full((batch_size,), 1, dtype=torch.float, device=device)
        output = discriminator(real_data)
        errD_real = criterion(output, label)
        errD_real.backward()

        noise = torch.randn(batch_size, 100, 1, 1, device=device)
        fake_data = generator(noise)
        label.fill_(0)
        output = discriminator(fake_data.detach())
        errD_fake = criterion(output, label)
        errD_fake.backward()
        errD = errD_real + errD_fake
        d_optimizer.step()

        # Train Generator
        generator.zero_grad()
        label.fill_(1)
        output = discriminator(fake_data)
        errG = criterion(output, label)
        errG.backward()
        g_optimizer.step()

    print(f'Epoch [{epoch + 1}/{num_epochs}] Loss_D: {errD.item()} Loss_G: {errG.item()}')

    # Generate a sample image after each epoch
    with torch.no_grad():
            fixed_noise = torch.randn(64, 100, 1, 1, device=device)
            fake = generator(fixed_noise).detach().cpu()
    vutils.save_image(fake, f"epoch_{epoch + 1}.png", normalize=True)

print("Training complete.")


Using device: cuda
Starting training loop...
Epoch [1/500] Loss_D: 1.9787514209747314 Loss_G: 0.6498876214027405
Epoch [2/500] Loss_D: 0.5019299983978271 Loss_G: 2.8207039833068848
Epoch [3/500] Loss_D: 1.1446751356124878 Loss_G: 0.8222211599349976
Epoch [4/500] Loss_D: 0.7996985912322998 Loss_G: 2.5613584518432617
Epoch [5/500] Loss_D: 0.7452030181884766 Loss_G: 2.865910053253174
Epoch [6/500] Loss_D: 0.9504235982894897 Loss_G: 1.9435086250305176
Epoch [7/500] Loss_D: 1.0805575847625732 Loss_G: 2.7965683937072754
Epoch [8/500] Loss_D: 0.9529976844787598 Loss_G: 2.157181739807129
Epoch [9/500] Loss_D: 2.755718946456909 Loss_G: 0.6446815729141235
Epoch [10/500] Loss_D: 3.2558648586273193 Loss_G: 0.6228301525115967
Epoch [11/500] Loss_D: 0.5228728652000427 Loss_G: 2.9994914531707764
Epoch [12/500] Loss_D: 1.3940130472183228 Loss_G: 2.869013786315918
Epoch [13/500] Loss_D: 0.8948913812637329 Loss_G: 2.015883207321167
Epoch [14/500] Loss_D: 0.2810438275337219 Loss_G: 3.8624179363250732
Epo

KeyboardInterrupt: 

In [None]:
with torch.no_grad():
        fixed_noise = torch.randn(64, 100, 1, 1, device=device)
        fake = generator(fixed_noise).detach().cpu()
vutils.save_image(fake, f"epoch_{epoch + 1}.png", normalize=True)

IndentationError: unindent does not match any outer indentation level (<tokenize>, line 4)

In [None]:
with torch.no_grad():
        fake = generator(fixed_input).detach().cpu()
vutils.save_image(fake, f"epoch_{epoch + 1}.png", normalize=True)