In [1]:
import numpy as np
import pandas as pd
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchvision.transforms as transforms
from torchvision.utils import save_image
import matplotlib.pyplot as plt

In [2]:
# Define device: use the GPU when PyTorch can see one, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Seed the random number generators so that weight initialisation, batch
# shuffling and latent sampling are repeatable after "Restart & Run All".
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

In [3]:
# Load the module metadata. The JSON is transposed after reading (presumably it is
# keyed by sample id, giving one row per module) and then sorted by index.
DataSolarModules = pd.read_json('InfraredSolarModules/module_metadata.json').T.sort_index()

# Distinct anomaly classes, in order of first appearance; the integer code of a
# class is simply its position inside `Classes`.
Classes = DataSolarModules['anomaly_class'].unique()
class_to_number = dict(zip(Classes, range(len(Classes))))

# Store the integer code next to the class name
DataSolarModules['class_code'] = DataSolarModules['anomaly_class'].map(class_to_number)

# Define functions to read images and labels
def read_images_dataframe(dataframe, root="InfraredSolarModules/", image_shape=(40, 24)):
    """Load every image referenced by ``dataframe`` as a normalised float32 array.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Must contain an ``image_filepath`` column with paths relative to ``root``.
    root : str, optional
        Prefix prepended verbatim to each path (include the trailing separator).
    image_shape : tuple of int, optional
        ``(height, width)`` every image is reshaped to.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n_images, *image_shape)`` and dtype float32 with pixel
        values scaled from 0..255 to 0..1. An empty dataframe yields an array of
        shape ``(0, *image_shape)``.

    Raises
    ------
    FileNotFoundError
        If OpenCV cannot read one of the files.
    """
    images = []
    for image_path in dataframe['image_filepath']:
        full_path = root + image_path
        img = cv2.imread(full_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            # cv2.imread reports a missing/unreadable file by returning None
            # rather than raising, so fail here with the offending path.
            raise FileNotFoundError(f"Could not read image: {full_path}")
        images.append(img.reshape(image_shape).astype("float32") / 255)
    # The final reshape is a no-op for non-empty input and gives the empty case
    # the same rank as the regular one.
    return np.array(images, dtype="float32").reshape((-1, *image_shape))

def read_labels_dataframe(dataframe):
    """Return the ``class_code`` column of ``dataframe`` as an int64 array."""
    class_codes = dataframe['class_code'].values
    labels = class_codes.astype("int64")
    return labels

# Load every sample listed in the metadata table into memory
images = read_images_dataframe(DataSolarModules)
labels = read_labels_dataframe(DataSolarModules)

# Build the tensors straight on the selected device
images_tensor = torch.tensor(images, device=device)
labels_tensor = torch.tensor(labels, device=device)

# Pair each image with its label and serve them in shuffled mini-batches
batch_size = 64
dataset = TensorDataset(images_tensor, labels_tensor)
train_loader = DataLoader(dataset, shuffle=True, batch_size=batch_size)

In [4]:
# Define VAE architecture
class VAE(nn.Module):
    """Fully connected variational autoencoder for flattened images.

    The encoder maps ``input_dim`` values down to a 64-unit hidden vector, from
    which two linear heads produce the mean and log-variance of the latent
    distribution. The decoder mirrors the encoder and ends in a sigmoid so every
    output lies in [0, 1].

    Parameters
    ----------
    input_dim : int, optional
        Number of values per flattened sample (defaults to a 40x24 image).
    latent_dim : int, optional
        Size of the latent code (defaults to 64).
    """

    def __init__(self, input_dim=40 * 24, latent_dim=64):
        super().__init__()
        self.input_dim = input_dim
        self.latent_dim = latent_dim

        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU()
        )

        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Linear(512, input_dim),
            nn.Sigmoid()  # To get pixel values between 0 and 1
        )

        # Heads turning the encoder's 64-unit output into the latent parameters
        self.mu_layer = nn.Linear(64, latent_dim)
        self.logvar_layer = nn.Linear(64, latent_dim)

    def reparameterize(self, mu, logvar):
        """Draw ``z = mu + eps * std`` with ``eps ~ N(0, I)``.

        Sampling is written this way so gradients can flow through ``mu`` and
        ``logvar``; ``std`` is recovered from the log-variance.
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        """Encode ``x``, sample a latent code and decode it.

        Parameters
        ----------
        x : torch.Tensor
            Batch whose elements flatten to ``input_dim`` values each.

        Returns
        -------
        tuple of torch.Tensor
            ``(decoded, mu, logvar)`` where ``decoded`` has shape
            ``(batch, input_dim)`` and the other two ``(batch, latent_dim)``.
        """
        # reshape (unlike view) also accepts non-contiguous inputs such as
        # transposed or sliced tensors
        x = x.reshape(-1, self.input_dim)
        h = self.encoder(x)
        mu = self.mu_layer(h)
        logvar = self.logvar_layer(h)
        z = self.reparameterize(mu, logvar)
        decoded = self.decoder(z)
        return decoded, mu, logvar

# Loss function for VAE
def vae_loss(recon_x, x, mu, logvar):
    """Sum of reconstruction and KL terms for a batch.

    Parameters
    ----------
    recon_x : torch.Tensor
        Decoder output with values in [0, 1].
    x : torch.Tensor
        Target batch with values in [0, 1]; it is reshaped to match ``recon_x``,
        so any layout with the same number of elements is accepted.
    mu, logvar : torch.Tensor
        Mean and log-variance of the approximate posterior.

    Returns
    -------
    torch.Tensor
        Scalar: binary cross-entropy summed over the batch plus the KL
        divergence to a standard normal, also summed over the batch.
    """
    # Follow the decoder's output shape instead of a fixed image size; reshape
    # also copes with non-contiguous targets.
    target = x.reshape(recon_x.shape)
    BCE = nn.functional.binary_cross_entropy(recon_x, target, reduction='sum')
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + KLD

# Training function for VAE
def train_vae(vae, train_loader, optimizer, device, epochs=100):
    """Train ``vae`` in place and print the mean per-sample loss of every epoch.

    Parameters
    ----------
    vae : torch.nn.Module
        Model whose forward pass returns ``(reconstruction, mu, logvar)``.
    train_loader : iterable
        Yields ``(data, label)`` batches; the labels are ignored.
    optimizer : torch.optim.Optimizer
        Optimizer already bound to ``vae``'s parameters.
    device : torch.device
        Device each batch is moved to before the forward pass.
    epochs : int, optional
        Number of full passes over ``train_loader``.
    """
    vae.train()
    for epoch in range(epochs):
        total_loss = 0.0
        samples_seen = 0
        for data, _ in train_loader:
            data = data.to(device)
            optimizer.zero_grad()
            recon_batch, mu, logvar = vae(data)
            loss = vae_loss(recon_batch, data, mu, logvar)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            samples_seen += data.size(0)
        # Average over the samples actually processed: this stays correct when the
        # loader drops the last batch or subsamples, and does not need a sized
        # dataset. max(..., 1) avoids dividing by zero on an empty loader.
        average_loss = total_loss / max(samples_seen, 1)
        print('Epoch {}, Average Loss: {:.4f}'.format(epoch + 1, average_loss))

In [5]:
# Build the model on the selected device, attach an Adam optimizer and run training
vae = VAE()
vae.to(device)
optimizer = optim.Adam(params=vae.parameters(), lr=1e-3)
train_vae(vae=vae, train_loader=train_loader, optimizer=optimizer, device=device)

Epoch 1, Average Loss: 620.0367
Epoch 2, Average Loss: 605.5377
Epoch 3, Average Loss: 602.5768
Epoch 4, Average Loss: 600.8785
Epoch 5, Average Loss: 600.4205
Epoch 6, Average Loss: 599.5613
Epoch 7, Average Loss: 599.3143
Epoch 8, Average Loss: 599.2555
Epoch 9, Average Loss: 599.2081
Epoch 10, Average Loss: 599.1544
Epoch 11, Average Loss: 599.1912
Epoch 12, Average Loss: 599.1432
Epoch 13, Average Loss: 599.1318
Epoch 14, Average Loss: 599.0873
Epoch 15, Average Loss: 599.1445
Epoch 16, Average Loss: 599.0844
Epoch 17, Average Loss: 599.0375
Epoch 18, Average Loss: 599.0902
Epoch 19, Average Loss: 599.0277
Epoch 20, Average Loss: 599.0960
Epoch 21, Average Loss: 599.0665
Epoch 22, Average Loss: 599.0282
Epoch 23, Average Loss: 599.0640
Epoch 24, Average Loss: 599.0196
Epoch 25, Average Loss: 599.0629
Epoch 26, Average Loss: 599.0078
Epoch 27, Average Loss: 599.0119
Epoch 28, Average Loss: 599.0772
Epoch 29, Average Loss: 599.0029
Epoch 30, Average Loss: 599.0051
Epoch 31, Average L

In [6]:
import os
import json

# Make sure the output folder for the saved images exists (no-op when it already does)
os.makedirs(name="InfraredSolarModules/imagesVAE", exist_ok=True)

# Function to save images and generate metadata
def save_images_and_metadata(vae, data, labels, device, metadata_path, generated_per_class, class_names=None):
    """Save VAE reconstructions as images and write a JSON index describing them.

    Every sample in ``data`` is passed through ``vae``; for each class, at most
    ``generated_per_class[class_code]`` reconstructions are written to
    ``InfraredSolarModules/imagesVAE`` as ``<sample index>.jpg``. The metadata
    file maps the sample index (as a string) to the image path and class name.

    The reconstruction is the first output of ``vae(data)``. If the model samples
    its latent code in the forward pass (as the VAE in this notebook does), the
    saved images vary slightly between calls.

    Parameters
    ----------
    vae : torch.nn.Module
        Trained model whose forward pass returns ``(reconstruction, mu, logvar)``
        with 40*24 values per reconstructed sample.
    data : torch.Tensor
        Input images, one per row of ``labels``.
    labels : sequence or torch.Tensor of int
        Class code of each sample; used as an index into ``generated_per_class``
        and ``class_names``.
    device : torch.device
        Device the forward pass runs on.
    metadata_path : str
        Destination of the JSON metadata file.
    generated_per_class : sequence of int
        Maximum number of images to save for each class code.
    class_names : sequence of str, optional
        Class name for each class code. Defaults to the module-level ``Classes``.
    """
    if class_names is None:
        class_names = Classes

    output_dir = "InfraredSolarModules/imagesVAE"
    os.makedirs(output_dir, exist_ok=True)

    vae.eval()
    with torch.no_grad():
        data = data.to(device)
        # Keep the reconstruction: it is the image the VAE actually produced.
        recon, _, _ = vae(data)

    # Bring the labels to the host once instead of converting one (possibly
    # GPU-resident) element per loop iteration.
    label_list = labels.tolist() if hasattr(labels, "tolist") else list(labels)

    metadata = {}
    generated_count = [0] * len(generated_per_class)
    for i in range(len(recon)):
        label = int(label_list[i])
        if generated_count[label] < generated_per_class[label]:
            # Save the reconstructed image
            image_name = f"{i}.jpg"
            save_image(recon[i].reshape(1, 1, 40, 24), os.path.join(output_dir, image_name))

            # Metadata entry; the path is relative to the dataset root folder
            metadata[str(i)] = {
                "image_filepath": f"imagesVAE/{image_name}",
                "anomaly_class": class_names[label]
            }
            generated_count[label] += 1

    # Write metadata to JSON file
    with open(metadata_path, "w") as f:
        json.dump(metadata, f)

# Define metadata path
metadata_path = "InfraredSolarModules/VAE_metadata.json"

# Maximum number of images to save per class: one entry per class code, so the
# list length follows the number of classes found in the metadata.
generated_per_class = [1000] * len(Classes)

# Save images and metadata
save_images_and_metadata(vae, images_tensor, labels_tensor, device, metadata_path, generated_per_class)
