In [13]:
import numpy as np
import pandas as pd
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchvision.transforms as transforms
from torchvision.utils import save_image
import matplotlib.pyplot as plt

In [14]:
# Define device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [15]:
# Read the dataset
DataSolarModules = pd.read_json('InfraredSolarModules/module_metadata.json').transpose().sort_index()

# Define classes and map them to numbers
Classes = DataSolarModules['anomaly_class'].unique()
class_to_number = {v: k for k, v in enumerate(Classes)}

# Map class to number
DataSolarModules['class_code'] = DataSolarModules['anomaly_class'].map(class_to_number)

# Define functions to read images and labels
def read_images_dataframe(dataframe):
    images = []
    for image_path in dataframe['image_filepath']:
        img = cv2.imread("InfraredSolarModules/"+image_path, cv2.IMREAD_GRAYSCALE)
        img = img.reshape(40, 24).astype("float32") / 255
        images.append(img)
    images = np.array(images) 
    return images

def read_labels_dataframe(dataframe):
    labels = dataframe['class_code'].values.astype("int64")
    return labels

# Read images and labels
images = read_images_dataframe(DataSolarModules)
labels = read_labels_dataframe(DataSolarModules)

# Convert to PyTorch tensors
images_tensor = torch.tensor(images).to(device)
labels_tensor = torch.tensor(labels).to(device)

# Define dataset and dataloader
dataset = TensorDataset(images_tensor, labels_tensor)
batch_size = 64
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [16]:
# Define GAN architectures
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(100, 256),
            nn.LeakyReLU(0.2),
            nn.Linear(256, 512),
            nn.LeakyReLU(0.2),
            nn.Linear(512, 1024),
            nn.LeakyReLU(0.2),
            nn.Linear(1024, 40 * 24),
            nn.Tanh()  # To get pixel values between -1 and 1
        )

    def forward(self, z):
        img = self.model(z)
        img = img.view(img.size(0), 1, 40, 24)
        return img

class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(40 * 24, 1024),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(1024, 512),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, img):
        img_flat = img.view(img.size(0), -1)
        validity = self.model(img_flat)
        return validity

# Training function for GAN
def train_gan(generator, discriminator, gan_optimizer_G, gan_optimizer_D, device, train_loader, epochs=10, n_critic=5, clip_value=0.01):
    adversarial_loss = nn.BCELoss()

    for epoch in range(epochs):
        for i, (imgs, _) in enumerate(train_loader):

            # Adversarial ground truths
            valid = torch.ones(imgs.size(0), 1).to(device)
            fake = torch.zeros(imgs.size(0), 1).to(device)

            # Configure input
            real_imgs = imgs.type(torch.FloatTensor).to(device)

            # ----------------- #
            #  Train Generator  #
            # ----------------- #

            gan_optimizer_G.zero_grad()

            # Sample noise as generator input
            z = torch.randn(imgs.shape[0], 100).to(device)

            # Generate a batch of images
            gen_imgs = generator(z)

            # Loss measures generator's ability to fool the discriminator
            g_loss = adversarial_loss(discriminator(gen_imgs), valid)

            g_loss.backward()
            gan_optimizer_G.step()

            # --------------------- #
            #  Train Discriminator  #
            # --------------------- #

            for _ in range(n_critic):
                gan_optimizer_D.zero_grad()

                # Measure discriminator's ability to classify real from generated samples
                real_loss = adversarial_loss(discriminator(real_imgs), valid)
                fake_loss = adversarial_loss(discriminator(gen_imgs.detach()), fake)
                d_loss = (real_loss + fake_loss) / 2

                d_loss.backward()
                gan_optimizer_D.step()

                # Clip weights of discriminator
                for p in discriminator.parameters():
                    p.data.clamp_(-clip_value, clip_value)

            print(
                "[Epoch %d/%d] [Batch %d/%d] [D loss: %f] [G loss: %f]"
                % (epoch, epochs, i, len(train_loader), d_loss.item(), g_loss.item())
            )

In [17]:
# Define VAE architecture
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(40 * 24, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU()
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Linear(512, 40 * 24),
            nn.Sigmoid()  # To get pixel values between 0 and 1
        )

        self.mu_layer = nn.Linear(64, 64)
        self.logvar_layer = nn.Linear(64, 64)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        x = x.view(-1, 40 * 24)
        h = self.encoder(x)
        mu = self.mu_layer(h)
        logvar = self.logvar_layer(h)
        z = self.reparameterize(mu, logvar)
        decoded = self.decoder(z)
        return decoded, mu, logvar

# Loss function for VAE
def vae_loss(recon_x, x, mu, logvar):
    BCE = nn.functional.binary_cross_entropy(recon_x, x.view(-1, 40 * 24), reduction='sum')
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + KLD

# Training function for VAE
def train_vae(vae, train_loader, optimizer, device, epochs=10):
    vae.train()
    for epoch in range(epochs):
        total_loss = 0
        for batch_idx, (data, _) in enumerate(train_loader):
            data = data.to(device)
            optimizer.zero_grad()
            recon_batch, mu, logvar = vae(data)
            loss = vae_loss(recon_batch, data, mu, logvar)
            loss.backward()
            total_loss += loss.item()
            optimizer.step()
        print('Epoch {}, Average Loss: {:.4f}'.format(epoch+1, total_loss / len(train_loader.dataset)))

In [18]:
# Initialize GAN models, optimizers, and train
generator = Generator().to(device)
discriminator = Discriminator().to(device)

optimizer_G = optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))

train_gan(generator, discriminator, optimizer_G, optimizer_D, device, train_loader)

[Epoch 0/10] [Batch 0/313] [D loss: 0.689112] [G loss: 0.715350]
[Epoch 0/10] [Batch 1/313] [D loss: 0.666286] [G loss: 0.697730]
[Epoch 0/10] [Batch 2/313] [D loss: 0.532099] [G loss: 0.697460]
[Epoch 0/10] [Batch 3/313] [D loss: 0.398909] [G loss: 0.692653]
[Epoch 0/10] [Batch 4/313] [D loss: 0.372705] [G loss: 0.661742]
[Epoch 0/10] [Batch 5/313] [D loss: 0.363167] [G loss: 0.631651]
[Epoch 0/10] [Batch 6/313] [D loss: 0.355829] [G loss: 0.640226]
[Epoch 0/10] [Batch 7/313] [D loss: 0.326851] [G loss: 0.650024]
[Epoch 0/10] [Batch 8/313] [D loss: 0.308337] [G loss: 0.660954]
[Epoch 0/10] [Batch 9/313] [D loss: 0.360302] [G loss: 0.571409]
[Epoch 0/10] [Batch 10/313] [D loss: 0.517279] [G loss: 0.316550]
[Epoch 0/10] [Batch 11/313] [D loss: 0.504160] [G loss: 0.365892]
[Epoch 0/10] [Batch 12/313] [D loss: 0.297387] [G loss: 0.494384]
[Epoch 0/10] [Batch 13/313] [D loss: 0.160748] [G loss: 0.870459]
[Epoch 0/10] [Batch 14/313] [D loss: 0.130359] [G loss: 0.941914]
[Epoch 0/10] [Batch 

In [19]:
# Initialize VAE model, optimizer, and train
vae = VAE().to(device)
optimizer = optim.Adam(vae.parameters(), lr=1e-3)
train_vae(vae, train_loader, optimizer, device)

Epoch 1, Average Loss: 619.8375
Epoch 2, Average Loss: 605.3095
Epoch 3, Average Loss: 601.4353
Epoch 4, Average Loss: 599.8123
Epoch 5, Average Loss: 599.4522
Epoch 6, Average Loss: 599.2977
Epoch 7, Average Loss: 599.3355
Epoch 8, Average Loss: 599.1600
Epoch 9, Average Loss: 599.2575
Epoch 10, Average Loss: 599.2882


In [20]:
import os
from torchvision.transforms.functional import resize
import cv2
import matplotlib.pyplot as plt

# Function to generate and save images
def generate_images_per_class(model, model_type, device, n_samples=10, image_size=(40, 24)):
    model.eval()
    with torch.no_grad():
        for class_code, class_name in enumerate(Classes):
            # Create a folder for each class
            class_folder = os.path.join(save_dir, model_type, class_name)
            os.makedirs(class_folder, exist_ok=True)
            
            if model_type == "GAN":
                z = torch.randn(n_samples, 100).to(device)
                gen_imgs = model(z)
            elif model_type == "VAE":
                z = torch.randn(n_samples, 64).to(device)
                gen_imgs = model.decoder(z)
                # Reshape to include channel dimension
                gen_imgs = gen_imgs.view(-1, 1, image_size[0], image_size[1])
            else:
                raise ValueError("Invalid model type. Use 'GAN' or 'VAE'.")
            
            # Save individual images
            for i in range(n_samples):
                image = gen_imgs[i].cpu().numpy()
                
                # Save the image
                image = image.squeeze() * 255  # Scale to 0-255
                image = image.astype('uint8')  # Convert to uint8
                image_path = os.path.join(class_folder, f"{model_type}_generated_{i}.png")
                cv2.imwrite(image_path, image)

# Define a directory to save the generated images
save_dir = "generated_images"
os.makedirs(save_dir, exist_ok=True)

# Generate and save images for GAN
generate_images_per_class(generator, "GAN", device)

# Generate and save images for VAE
generate_images_per_class(vae, "VAE", device)
