In [57]:
import os
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torch import autograd
from torch.autograd import Variable
from torchsummary import summary
from torchvision.utils import make_grid
import matplotlib.pyplot as plt

In [58]:
device = "cuda" if torch.cuda.is_available() else "cpu"

print("torch version: ", torch.__version__)
print("device: ", device)

torch version:  2.2.2+cpu
device:  cpu


In [59]:
def data_read(path):
    readed_data = []
    for image_name in os.listdir(path):
        image_path = os.path.join(path, image_name)
        image = Image.open(image_path)
        image = image.convert("RGB")
        readed_data.append(image)
    return readed_data

class Apple_Dataset(Dataset):
    def __init__(self, path, transform= None) -> None:
        super().__init__()
        self.transform = transform
        self.images = data_read(path)
        print("sizes: ", len(self.images))
    
    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        image = self.images[index]
        if self.transform:
            image = self.transform(image)
        return image
    

transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor()
])

In [60]:
red_apples_path = r"C:\Python_Projects\esaditional_GAN\conditional-GAN\datasett\red_apples"
green_apples_path = r"C:\Python_Projects\esaditional_GAN\conditional-GAN\datasett\green_apples"

batch_size = 16
epochs = 10
learning_rate = 1e-4

g_input_size = 4
g_encode_channel_sizes = [64, 32, 64, 128]
g_encode_kernel_sizes = [3, 3, 3, 3]

dropout = 0.1

g_decode_channel_sizes = [64, 32, 16, 3]
g_decode_kernel_sizes = [3, 3, 3, 3]
g_upconv_pad_s = [1, 1, 1, 1]

d_input_size = 6
d_channel_sizes = [16, 32, 64, 128]
d_kernel_sizes = [3, 3, 3, 3]
d_ann_sizes = [32768, 16, 8, 4]

In [61]:
red_dataset = Apple_Dataset(red_apples_path, transform=transform)
red_data_loader = torch.utils.data.DataLoader(red_dataset, batch_size, shuffle= True)

sizes:  64


In [62]:
green_dataset = Apple_Dataset(green_apples_path, transform = transform)
green_data_loader = torch.utils.data.DataLoader(green_dataset, batch_size, shuffle= True)

sizes:  64


In [63]:
red_dataset[0].size()

torch.Size([3, 256, 256])

In [64]:
green_dataset[6].size()

torch.Size([3, 256, 256])

### Generator and Discriminator blocks

In [65]:
class Generator(nn.Module):
    def __init__(self, 
                input_channels: int,
                encode_channel_sizes: list,
                encode_kernel_sizes: list,

                decode_channel_sizes: list, 
                decode_kernel_sizes: list,
                d_upconv_pad_s: list,
                 
                dropout: float, 
                ) -> None:
        
        self.input_channels: int = input_channels
        self.encode_channel_sizes: list = encode_channel_sizes
        self.encode_kernel_sizes: list = encode_kernel_sizes

        self.decode_channel_sizes: list = decode_channel_sizes
        self.decode_kernel_sizes: list = decode_kernel_sizes
        self.decode_upconv_padding_sizes: list = d_upconv_pad_s

        self.dropout: float = dropout

        super().__init__()
        
        self.model = nn.Sequential(
            nn.Conv2d(self.input_channels, self.encode_channel_sizes[0], self.encode_kernel_sizes[0], padding= 1), # 6-256-256 -> 16-256-256
            #nn.Conv2d(self.encode_channel_sizes[0], self.encode_channel_sizes[0], self.encode_channel_sizes[0], padding= "same"), # 16-256-256 -> 16-256-256
            nn.ReLU(),
            nn.MaxPool2d(kernel_size= 2, stride= 2), # 16-256-256 -> 16-128-128
            nn.Dropout(self.dropout),

            nn.Conv2d(self.encode_channel_sizes[0], self.encode_channel_sizes[1], self.encode_kernel_sizes[1], padding= 1), #16-128-128 -> 32-128-128
            #nn.Conv2d(self.encode_channel_sizes[1], self.encode_channel_sizes[1], self.encode_kernel_sizes[1], padding= "same"), #32-128-128 -> 32-128-128
            nn.ReLU(),
            nn.MaxPool2d(kernel_size= 2, stride= 2), #32-128-128 -> 32-64-64
            nn.Dropout(self.dropout),    

            nn.Conv2d(self.encode_channel_sizes[1], self.encode_channel_sizes[2], self.encode_kernel_sizes[2], padding= 1), # 32-64-64 -> 64-64-64
            #nn.Conv2d(self.encode_channel_sizes[2], self.encode_channel_sizes[2], self.encode_kernel_sizes[2], padding= "same"), # 64-64-64 -> 64-64-64
            nn.ReLU(),
            nn.MaxPool2d(kernel_size= 2, stride= 2), # 64-64-64 -> 64-32-32
            nn.Dropout(self.dropout),

            nn.Conv2d(self.encode_channel_sizes[2], self.encode_channel_sizes[3], self.encode_kernel_sizes[3], padding= 1), # 64-32-32 -> 128-32-32
            #nn.Conv2d(self.encode_channel_sizes[2], self.encode_channel_sizes[2], self.encode_kernel_sizes[2], padding= "same"), # 64-64-64 -> 64-64-64
            nn.ReLU(),
            nn.MaxPool2d(kernel_size= 2, stride= 2), # 128-32-32 -> 128-16-16
            nn.Dropout(self.dropout),

            #here may come some ANN layers 
            #idk why :D

            nn.ConvTranspose2d(self.encode_channel_sizes[3], self.decode_channel_sizes[0], self.decode_kernel_sizes[0], stride= 2, padding= d_upconv_pad_s[0] ,output_padding= d_upconv_pad_s[0]), # 128-16-16 -> 64-32-32
            nn.Conv2d(self.decode_channel_sizes[0], self.decode_channel_sizes[0], self.decode_kernel_sizes[0], padding= 1), # 64-32-32 -> 64-32-32
            nn.ReLU(),
            nn.Dropout(self.dropout),
            
            nn.ConvTranspose2d(self.encode_channel_sizes[0], self.decode_channel_sizes[1], self.decode_kernel_sizes[1], stride= 2, padding= d_upconv_pad_s[1], output_padding= d_upconv_pad_s[1]), # 64-32-32 -> 32-64-64
            nn.Conv2d(self.decode_channel_sizes[1], self.decode_channel_sizes[1], self.decode_kernel_sizes[1], padding= 1), # 32-64-64 -> 32-64-64
            nn.ReLU(),
            nn.Dropout(self.dropout),

            nn.ConvTranspose2d(self.decode_channel_sizes[1], self.decode_channel_sizes[2], self.decode_kernel_sizes[2], stride= 2, padding= d_upconv_pad_s[2], output_padding= d_upconv_pad_s[2]), # 32-64-64 -> 16-128-128 
            nn.Conv2d(self.decode_channel_sizes[2], self.decode_channel_sizes[2], self.decode_kernel_sizes[2], padding= 1), # 16-128-128 -> 16-128-128
            nn.ReLU(),
            nn.Dropout(self.dropout),

            nn.ConvTranspose2d(self.decode_channel_sizes[2], self.decode_channel_sizes[3], self.decode_kernel_sizes[3], stride= 2, padding= d_upconv_pad_s[3], output_padding = d_upconv_pad_s[3]), # 16-128-128 -> 3-256-256
            nn.Sigmoid(),
            )
        
    def forward(self, noise: torch.Tensor, lable_image: torch.Tensor):
        
        x = torch.cat([noise, lable_image], 1)

        out = self.model(x)

        return out

In [66]:
class Discriminator(nn.Module):
    def __init__(self,
                 input_chanel: list,
                 channel_sizes: list,
                 kernel_sizes: list,
                 
                 ann_layer_sizes: list,
                 dropout: float
                 ) -> None:
        
        self.input_channel: int = input_chanel
        self.channel_sizes: list = channel_sizes
        self.kernel_sizes: list  = kernel_sizes

        self.ann_layer_sizes: list = ann_layer_sizes
        self.dropout: float = dropout 

        super().__init__()
        
        self.model = nn.Sequential(
            nn.Conv2d(self.input_channel, self.channel_sizes[0], self.kernel_sizes[0], padding= "same"), # 6-256-256 -> 16-256-256
            #nn.Conv2d(self.channel_sizes[0], self.channel_sizes[0], self.kernel_sizes[0], padding= "same"), # 16-256-256 -> 16-256-256
            nn.ReLU(),
            nn.Dropout(self.dropout),
            nn.MaxPool2d(kernel_size= 2, stride= 2), # 16-256-256 -> 16-128-128

            nn.Conv2d(self.channel_sizes[0], self.channel_sizes[1], self.kernel_sizes[1], padding= "same"), # 16-128-128 -> 32-128-128
            #nn.Conv2d(self.channel_sizes[1], self.channel_sizes[1], self.kernel_sizes[1], padding= "same"), # 32-128-128 -> 32-128-128
            nn.ReLU(),
            nn.Dropout(self.dropout),
            nn.MaxPool2d(kernel_size= 2, stride= 2), # 32-128-128 - 32-64-64

            nn.Conv2d(self.channel_sizes[1], self.channel_sizes[2], self.kernel_sizes[2], padding= "same"), # 32-64-64 -> 64-64-64
            #nn.Conv2d(self.channel_sizes[2], self.channel_sizes[2], self.kernel_sizes[2], padding= "same"), # 64-64-64 -> 64-64-64
            nn.ReLU(),
            nn.Dropout(self.dropout),
            nn.MaxPool2d(kernel_size= 2, stride= 2), # 64-64-64 -> 64-32-32
            
            nn.Conv2d(self.channel_sizes[2], self.channel_sizes[3], self.kernel_sizes[3], padding= "same"), # 64-32-32 -> 128-32-32
            #nn.Conv2d(self.channel_sizes[3], self.channel_sizes[3], self.kernel_sizes[3], padding= "same"), # 128-32-32 -> 128-32-32
            nn.ReLU(),
            nn.Dropout(self.dropout),
            nn.MaxPool2d(kernel_size= 2, stride= 2), # 128-32-32 -> 128-16-16

            nn.Flatten(),

            nn.Linear(self.ann_layer_sizes[0], self.ann_layer_sizes[1]),
            nn.ReLU(),
            nn.Dropout(self.dropout),
            nn.Linear(self.ann_layer_sizes[1], self.ann_layer_sizes[2]),    
            nn.ReLU(),
            nn.Dropout(self.dropout),
            nn.Linear(self.ann_layer_sizes[2], self.ann_layer_sizes[3]),
            nn.ReLU(),
            nn.Dropout(self.dropout),
            nn.Linear(self.ann_layer_sizes[3], 1),
            nn.Sigmoid()

        )
    
    def forward(self, real_image, generated_image):
        x = torch.cat([real_image, generated_image], 1)
        out = self.model(x)
        
        return out.squeeze()

    #its gonna be sequnce of conv2d layers an dense layersc  
    #conv2d will demolishing the image then dense layers with 1 output layer proces the image
    #probably :D

### Train Function

In [67]:
generator = Generator(g_input_size, g_encode_channel_sizes, g_encode_kernel_sizes,
                      g_decode_channel_sizes, g_decode_kernel_sizes, g_upconv_pad_s,
                      dropout).to(device)

discriminator = Discriminator(d_input_size, d_channel_sizes, d_kernel_sizes,
                              d_ann_sizes, dropout).to(device)

In [68]:
# Loss Function
criterion = nn.BCELoss()

In [69]:
#Optimizer
generator_optimizer = torch.optim.Adam(generator.parameters(), lr = learning_rate)
discriminator_optimizer = torch.optim.Adam(discriminator.parameters(),lr = learning_rate)

In [70]:
def generator_train_step(batch_size: int, instance_shape: list, discriminator, generator, generator_optimizer, criterion, label_image):

    generator_optimizer.zero_grad()

    noise = torch.randn(batch_size, 1, instance_shape[0], instance_shape[1]).to(device)
    
    altered_image = generator(noise, label_image)
    
    validity = discriminator(altered_image, label_image)

    generator_loss = criterion(validity, torch.ones(batch_size).to(device))
    
    generator_loss.backward()

    generator_optimizer.step()

    return generator_loss.data

In [71]:
def discriminator_train_step(batch_size, discriminator, generator, discriminator_optimizer, 
                             criterion, real_image, other_real_image, instance_shape):
    
    discriminator_optimizer.zero_grad()

    real_validity = discriminator(real_image, other_real_image)
    
    real_loss = criterion(real_validity, torch.ones(batch_size).to(device))

    noise = torch.randn(batch_size, 1, instance_shape[0], instance_shape[1])

    altered_images = generator(noise, other_real_image)

    altered_image_validity = discriminator(altered_images, other_real_image)

    altered_image_loss = criterion(altered_image_validity, torch.ones(batch_size).to(device))

    discriminator_loss = altered_image_loss + real_loss

    discriminator_loss.backward()

    discriminator_optimizer.step()

    return discriminator_loss.data

In [72]:
for epoch in range(epochs):

    print("Starting epoch {}...".format(epoch + 1))

    for red_apples, green_apples in zip(red_data_loader, green_data_loader):
        
        red_apples = red_apples.to(device) 
        green_apples = green_apples.to(device)

        generator.train()

        discriminator_loss = discriminator_train_step(batch_size, discriminator,
                                          generator, discriminator_optimizer,
                                          criterion, red_apples, green_apples, (256, 256))
        
        generator_loss = generator_train_step(batch_size, (256, 256), discriminator, 
                                      generator, generator_optimizer, criterion, green_apples)

        
    generator.eval()

    print("generator loss: {}, discriminator loss: {}".format(generator_loss, discriminator_loss))
    print(" ")


Starting epoch 1...
generator loss: 0.6321700811386108, discriminator loss: 1.2596346139907837
 
Starting epoch 2...
generator loss: 0.6124083995819092, discriminator loss: 1.2324804067611694
 
Starting epoch 3...
generator loss: 0.5943565368652344, discriminator loss: 1.2144768238067627
 
Starting epoch 4...
generator loss: 0.5727418661117554, discriminator loss: 1.1530122756958008
 
Starting epoch 5...
generator loss: 0.5280516147613525, discriminator loss: 1.0682580471038818
 
Starting epoch 6...
generator loss: 0.5336783528327942, discriminator loss: 0.9870407581329346
 
Starting epoch 7...
generator loss: 0.3706742227077484, discriminator loss: 0.7750039100646973
 
Starting epoch 8...
generator loss: 0.27428731322288513, discriminator loss: 0.7374327182769775
 
Starting epoch 9...
generator loss: 0.13542881608009338, discriminator loss: 0.4944491982460022
 
Starting epoch 10...
generator loss: 0.3925681710243225, discriminator loss: 0.3606579899787903
 


#### Predictions

In [73]:
green_test_path = r"C:\Python_Projects\esaditional_GAN\conditional-GAN\test_data\green\real_green_apple_ss_65.jpeg"
red_test_path = r"C:\Python_Projects\esaditional_GAN\conditional-GAN\test_data\red"

In [74]:
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.ToTensor(),
])

In [77]:
img_green = Image.open(green_test_path)

input_tensor = preprocess(img_green)
input_batch = input_tensor.unsqueeze(0)
input_batch = input_batch.to(device)

noise = torch.randn(1, 1, 256, 256)

In [78]:
sample_img_tensor = generator(noise, input_batch).unsqueeze(1).data.cpu()
sample_img = sample_img_tensor[0][0]

In [79]:
sample_img

tensor([[[0.5154, 0.5111, 0.5120,  ..., 0.5122, 0.5148, 0.5198],
         [0.5223, 0.5268, 0.5344,  ..., 0.5171, 0.5236, 0.5190],
         [0.5116, 0.5139, 0.5150,  ..., 0.5071, 0.5153, 0.5170],
         ...,
         [0.5203, 0.5294, 0.5256,  ..., 0.5196, 0.5211, 0.5188],
         [0.5127, 0.5182, 0.5132,  ..., 0.5166, 0.5144, 0.5200],
         [0.5211, 0.5219, 0.5231,  ..., 0.5192, 0.5212, 0.5218]],

        [[0.4457, 0.4597, 0.4543,  ..., 0.4527, 0.4505, 0.4475],
         [0.4510, 0.4794, 0.4550,  ..., 0.4691, 0.4529, 0.4522],
         [0.4521, 0.4652, 0.4547,  ..., 0.4482, 0.4521, 0.4466],
         ...,
         [0.4525, 0.4673, 0.4565,  ..., 0.4648, 0.4533, 0.4535],
         [0.4519, 0.4583, 0.4539,  ..., 0.4536, 0.4531, 0.4482],
         [0.4545, 0.4584, 0.4559,  ..., 0.4556, 0.4560, 0.4499]],

        [[0.5352, 0.5303, 0.5286,  ..., 0.5312, 0.5355, 0.5321],
         [0.5497, 0.5464, 0.5522,  ..., 0.5404, 0.5433, 0.5397],
         [0.5304, 0.5336, 0.5323,  ..., 0.5352, 0.5349, 0.

In [80]:
transform = transforms.ToPILImage()

In [81]:
img = transform(sample_img)

In [82]:
img.show()