In [0]:
import os
import random
import pathlib
import tempfile
import time
import copy
import torch.optim.lr_scheduler as lr_scheduler
from sklearn.metrics import classification_report
import numpy as np
from functools import partial
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torchvision.models import resnet18, ResNet18_Weights
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset, Subset
import matplotlib.pyplot as plt
import pandas as pd

In [0]:
BASE_DIR= pathlib.Path('/Volumes/pmr_dev/lead_ingots/lead_ingot_images')
image_dir= BASE_DIR / 'Ingot'
label_dir= BASE_DIR / 'Labels/Labels.csv'
df = pd.read_csv(label_dir)
display(df)



In [0]:
file_names=df["Image"].tolist()
print(file_names)

In [0]:
# Load all images at once
def load_images(image_names):
    images = []
    for image_name in image_names:
        image_path = image_dir / image_name
        image = Image.open(image_path).convert('RGB')
        images.append(image)
    return images
# Load all images
images = load_images(file_names)

In [0]:
# Define the function to encode labels
def encode_labels(file_list):
    # Extract image names
    image_names = file_list['Image']
    
    # Combine fault categories into a single label
    # Create a label string by concatenating fault category names where the value is True
    labels = file_list.drop(columns=['Image']).apply(
        lambda row: '_'.join(row.index[row == True]), axis=1
    )
    
    # Encode the labels using LabelEncoder
    le = LabelEncoder()
    encoded_labels = le.fit_transform(labels)
    
    # Create a dictionary mapping image names to encoded labels
    label_dict = dict(zip(image_names, encoded_labels))

    label_mapping = dict(zip(le.transform(le.classes_),le.classes_ ))
    
    return label_dict, label_mapping

# Encode the labels
label_dict, label_mapping = encode_labels(df)

for fault_category, label_number in label_mapping.items():
    print(f"{label_number}: {fault_category}")

value_to_find ="Goede blok"
Goed_label = next((k for k, v in label_mapping.items() if v == value_to_find), None)
print(Goed_label)

In [0]:
label_counts= [0]*len(label_mapping)
for name, label in label_dict.items():
    label_counts[label] += 1
for i, count in enumerate(label_counts):
    print(f"{label_mapping[i]}: {count}")




In [0]:
print(images[0])

In [0]:
# Custom Dataset Class
class LeadIngotDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.labels = labels
        self.images = images  
        self.transform = transform

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

In [0]:
transform = transforms.Compose([
    transforms.Resize((2048, 512)), #alexnet, vgg16 input size
    transforms.ToTensor()
])

In [0]:
# Create the dataset
labels = [label_dict[img] for img in file_names]

In [0]:
batch_size= 32

# Filter images and labels to include only label 7
filtered_indices = [i for i, label in enumerate(labels) if label == Goed_label]
filtered_images = [images[i] for i in filtered_indices]
filtered_labels = [labels[i] for i in filtered_indices]

goede_dataset = LeadIngotDataset(filtered_images,filtered_labels, transform)

train_val_indices, test_indices = train_test_split(
    range(len(goede_dataset)), test_size=0.2, stratify=filtered_labels, random_state=42
)

# Then split train+val into train and validation sets
train_indices, val_indices = train_test_split(
    train_val_indices, test_size=0.125, stratify=[filtered_labels[i] for i in train_val_indices], random_state=42
)

# Create Subset datasets
goede_train_dataset = Subset(goede_dataset, train_indices)
goede_val_dataset = Subset(goede_dataset, val_indices)
goede_test_dataset = Subset(goede_dataset, test_indices)

# Create DataLoaders
goede_train_loader = DataLoader(goede_train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True)
goede_val_loader = DataLoader(goede_val_dataset, batch_size=batch_size, shuffle=False, pin_memory=True)
goede_test_loader = DataLoader(goede_test_dataset, batch_size=batch_size, shuffle=False, pin_memory=True)

goede_loader = DataLoader(goede_dataset, batch_size=batch_size, shuffle=False, pin_memory=True)
# Print statistics
train_labels = [filtered_labels[i] for i in train_indices]
val_labels = [filtered_labels[i] for i in val_indices]
test_labels = [filtered_labels[i] for i in test_indices]

print(f"Train set size: {len(goede_train_dataset)} frequenties: {torch.bincount(torch.tensor(train_labels))}")
print(f"Validation set size: {len(goede_val_dataset)} frequenties: {torch.bincount(torch.tensor(val_labels))}")
print(f"Test set size: {len(goede_test_dataset)} frequenties: {torch.bincount(torch.tensor(test_labels))}")

In [0]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            # Level 1
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # Level 2
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # Level 3
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.flatten = nn.Flatten()
        self.fc_mu = nn.Linear(16384, 50)  # Mean of the latent space
        self.fc_logvar = nn.Linear(16384, 50)  # Log-variance of the latent space

    def forward(self, x):
        x = self.encoder(x)
        x = self.flatten(x)
        mu = self.fc_mu(x)
        logvar = self.fc_logvar(x)
        return mu, logvar

class Decoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(50, 16384)  # Map latent space back to feature space
        self.unflatten = nn.Unflatten(dim=1, unflattened_size=(256, 256, 64))
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 3, kernel_size=3, padding=1),
            nn.Sigmoid(),  # Scale output to [0, 1]
        )

    def forward(self, x):
        x = self.fc1(x)
        x = self.unflatten(x)
        x = self.decoder(x)
        return x

class VAE(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)  # Sample epsilon ~ N(0, 1)
        return mu + eps * std

    def forward(self, x):
        mu, logvar = self.encoder(x)
        z = self.reparameterize(mu, logvar)
        x_recon = self.decoder(z)
        return x_recon, mu, logvar

# Loss Function for VAE
def vae_loss(recon_x, x, mu, logvar):
    # Reconstruction Loss
    recon_loss = F.mse_loss(recon_x, x, reduction='sum')
    # KL Divergence
    kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon_loss + kl_loss


In [0]:
model=VAE()

device = torch.device("cuda:0")
#device = torch.device("cpu")

model = model.to(device)

In [0]:
# Define the loss function and optimizer
loss_fn = nn.MSELoss()  # Mean Squared Error loss for autoencoder
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

def train_autoencoder(model, num_epochs, train_dl, valid_dl, patience):
    train_loss_hist = [0] * num_epochs
    valid_loss_hist = [0] * num_epochs
    patience_counter = 0
    best_model_wts = model.state_dict()

    for epoch in range(num_epochs):
        start_time = time.time()
        print(f"\nEpoch {epoch + 1}/{num_epochs}")

        # Training Phase
        model.train()
        train_loss = 0
        for x_batch, _ in train_dl:
            x_batch = x_batch.to(device)

            # Forward pass
            outputs = model(x_batch)
            loss = loss_fn(outputs, x_batch)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * x_batch.size(0)

        # Calculate average training loss
        train_loss /= len(train_dl.dataset)
        train_loss_hist[epoch] = train_loss

        # Validation Phase
        model.eval()
        valid_loss = 0
        with torch.no_grad():
            for x_batch, _ in valid_dl:
                x_batch = x_batch.to(device)

                # Forward pass
                outputs = model(x_batch)
                loss = loss_fn(outputs, x_batch)

                valid_loss += loss.item() * x_batch.size(0)

        # Calculate average validation loss
        valid_loss /= len(valid_dl.dataset)
        valid_loss_hist[epoch] = valid_loss

        end_time = time.time()
        epoch_duration = end_time - start_time

        # Early Stopping
        if epoch > 0 and valid_loss_hist[epoch] > min(valid_loss_hist[:epoch]):
            patience_counter += 1
        else:
            patience_counter = 0
            best_model_wts = model.state_dict()

        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch + 1}")
            model.load_state_dict(best_model_wts)
            break

        # Epoch Summary
        print(f"Epoch {epoch + 1}/{num_epochs} Summary:")
        print(f"  Training - Loss: {train_loss_hist[epoch]:.4f}")
        print(f"  Validation - Loss: {valid_loss_hist[epoch]:.4f}")
        print(f"  Duration: {epoch_duration:.2f}s")

    return train_loss_hist, valid_loss_hist

# Training parameters
num_epochs = 1000
train_loss_hist, valid_loss_hist = train_autoencoder(model, num_epochs, goede_train_loader, goede_val_loader, patience=5)

In [0]:
torch.save(model.state_dict(), 'VAE.pth')

In [0]:
# Load the trained model
model = VAE().to(device)
model.load_state_dict(torch.load('VAE.pth'))
model.eval()

In [0]:
import matplotlib.pyplot as plt
import numpy as np
x_arr = np.arange(len(train_loss_hist)) + 1

fig = plt.figure(figsize=(12, 4))
ax = fig.add_subplot(1, 2, 1)
ax.plot(x_arr, train_loss_hist, '-o', label='Train loss')
ax.plot(x_arr, valid_loss_hist, '--<', label='Validation loss')
ax.set_xlabel('Epoch', size=15)
ax.set_ylabel('Loss', size=15)


plt.show()

In [0]:
# Filter images and labels to include only label 8
faulty_indices = [i for i, label in enumerate(labels) if label != Goed_label]
faulty_images = [images[i] for i in faulty_indices]
faulty_labels = [labels[i] for i in faulty_indices]
faulty_subset = LeadIngotDataset(faulty_images, faulty_labels, transform=transform)

# Create a DataLoader for the no_faulty subset
faulty_loader = DataLoader(faulty_subset, batch_size=6, shuffle=False)

In [0]:
len(faulty_indices)


In [0]:
import matplotlib.pyplot as plt
resize_transform = transforms.Resize((500, 1880))
# Function to display images
def denormalize(tensor):
    mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
    return tensor * std + mean
def imshow(img, ax):
    #img = denormalize(img)
    img=resize_transform(img)
    npimg = img.numpy()
    ax.imshow(np.transpose(npimg, (1, 2, 0)))

# Get a batch of images from the no_defect_val_loader
dataiter = iter(goede_test_loader)
print(len(dataiter))
validation_images, _ = next(dataiter)

# Move the images to the device
validation_images = validation_images.to(device)

# Get the reconstructed images
model.eval()
with torch.no_grad():
    reconstructed = model(validation_images)

# Move the images back to CPU for plotting
validation_images = validation_images.cpu()
reconstructed = reconstructed.cpu()

# Plot the original and reconstructed images
fig, axes = plt.subplots(nrows=2, ncols=3, figsize=(24, 8))

for i in range(3):
    # Original images
    ax = axes[0, i]
    imshow(validation_images[i], ax)
    ax.axis('off')
    if i == 0:
        ax.set_title('Original')

    # Reconstructed images
    ax = axes[1, i]
    imshow(reconstructed[i], ax)
    ax.axis('off')
    if i == 0:
        ax.set_title('Reconstructed')

plt.show()

In [0]:
from torch.utils.data import DataLoader, Subset
import torch



# Get a batch of images from the no_defect_val_loader
dataiter2 = iter(faulty_loader)
faulty_images_val, _ = next(dataiter2)

# Move the images to the device
faulty_images_val = faulty_images_val.to(device)

# Get the reconstructed images
model.eval()
with torch.no_grad():
    reconstructed = model(faulty_images_val)

# Move the images back to CPU for plotting
faulty_images_val = faulty_images_val.cpu()
reconstructed = reconstructed.cpu()

# Plot the original and reconstructed images
fig, axes = plt.subplots(nrows=2, ncols=3, figsize=(24, 8))

for i in range(3):
    # Original images
    ax = axes[0, i]
    imshow(faulty_images_val[i], ax)
    ax.axis('off')
    if i == 0:
        ax.set_title('Original')

    # Reconstructed images
    ax = axes[1, i]
    imshow(reconstructed[i], ax)
    ax.axis('off')
    if i == 0:
        ax.set_title('Reconstructed')

plt.show()
import torch.nn.functional as F

# Calculate reconstruction errors
reconstruction_errors = F.mse_loss(reconstructed, faulty_images_val, reduction='none')
reconstruction_errors = reconstruction_errors.mean(dim=[1, 2, 3])
print(reconstruction_errors)


In [0]:
import torch.nn.functional as F
import matplotlib.pyplot as plt
# Function to calculate reconstruction errors
def calculate_reconstruction_errors(loader):
    all_errors = []
    for images, _ in loader:
        images = images.to(device)
        with torch.no_grad():
            reconstructed = model(images)
        images = images.cpu()
        reconstructed = reconstructed.cpu()
        errors = F.mse_loss(reconstructed, images, reduction='none')
        errors = errors.mean(dim=[1, 2, 3])
        all_errors.extend(errors.numpy())
    return all_errors

# Calculate reconstruction errors for faulty and no_faulty images
faulty_errors = calculate_reconstruction_errors(faulty_loader)
no_faulty_errors = calculate_reconstruction_errors(goede_loader)

# Plot the reconstruction errors
plt.figure(figsize=(10, 5))
plt.hist(faulty_errors, bins=30, alpha=0.5, label='Faulty')
plt.hist(no_faulty_errors, bins=30, alpha=0.5, label='No Faulty')
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.legend()
plt.title('Reconstruction Errors for Faulty and No Faulty Images')
plt.show()

In [0]:
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

# Zet je model in eval-mode
model.eval()

# Lijst voor de latente representaties en de bijbehorende labels
latent_representations = []


# Functie om latente ruimte te extraheren uit beide dataloaders
def extract_latent_space(dataloader, category_label):
    with torch.no_grad():
        for data in dataloader:
            inputs, _ = data  # Neem de inputs, labels zijn niet nodig voor latente ruimte
            inputs = inputs.to(device)  # Zet de data op het juiste apparaat

            # Haal de latente representatie op
            encoded = model.encoder(inputs)
            encoded = encoded.view(encoded.size(0), -1)  # Flatten de latente ruimte

            # Voeg de latente representatie toe aan de lijst, samen met de label
            latent_representations.append(encoded.cpu().numpy())


# Veronderstel dat de dataloaders dataloaders1 en dataloaders2 de verschillende categorieën bevatten
extract_latent_space(goede_loader, category_label=0)  # Categorie 1
extract_latent_space(faulty_loader, category_label=1)  # Categorie 2

# Zet de latente representaties om naar een numpy array
latent_representations = np.vstack(latent_representations)

# Stap 3: Dimensionality Reduction (bijv. PCA of t-SNE)

# Probeer eerst PCA om de data naar 2D te reduceren
pca = PCA(n_components=2)
reduced_data = pca.fit_transform(latent_representations)

# Je kunt ook t-SNE proberen voor een betere visualisatie
# t_sne = TSNE(n_components=2, random_state=42)
# reduced_data = t_sne.fit_transform(latent_representations)
goede_reduced_data=reduced_data[0:len(goede_loader.dataset)-1]
slechte_reduced_data=reduced_data[len(goede_loader.dataset)-1:]

# Stap 4: Visualiseer de latente ruimte
plt.figure(figsize=(8, 6))

# Plot de verschillende categorieën met verschillende kleuren
plt.scatter(goede_reduced_data[:, 0], goede_reduced_data[:, 1], color='r', label='Categorie 1', alpha=0.5)
plt.scatter(slechte_reduced_data[:, 0], slechte_reduced_data[:, 1], color='b', label='Categorie 2', alpha=0.5)

plt.title('Latente Ruimte (2D weergave)')
plt.xlabel('Hoofdbestanddeel 1')
plt.ylabel('Hoofdbestanddeel 2')
plt.legend()
plt.show()
