# Importy

In [None]:
%load_ext autoreload
%autoreload 2

import pandas as pd
import os
import glob
import numpy as np
import torch
from torch.utils.data import DataLoader, Subset
from torchvision import transforms
from sklearn.model_selection import  RepeatedStratifiedKFold
import kagglehub
from ResNet34 import ResNetTrainer
from sklearn.model_selection import RandomizedSearchCV
from DatasetClasses import CapsuleDataset
from wrappers import CNNVAEResNetEstimator,CNNVAEWrapper, CNNGANWrapper, CNNGANResNetEstimator
import matplotlib.pyplot as plt

In [None]:
# Model hyper-parameters
IMG_SIZE = 128
CHANNELS = 3
LATENT_DIM = 64
HIDDEN_DIM = 512
BATCH_SIZE = 16
EPOCHS = 100

# Output directories, one per generative model under results/.
# exist_ok=True makes the call a no-op for directories that already exist, so
# no separate os.path.exists() check (and its check-then-create race) is needed.
for result_dir in (
    'results/',
    'results/GAN',
    'results/CNNGAN',
    'results/VAE',
    'results/CNNVAE',
):
    os.makedirs(result_dir, exist_ok=True)
# `result_dir` is left bound to the last entry ('results/CNNVAE').

# Compute device: prefer Apple MPS, then CUDA, then fall back to the CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
    # Release cached MPS memory before any model is built.
    torch.mps.empty_cache()
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(f"Training device: {device}")

In [None]:
# Cross-validation splitter reused by every experiment below:
# 5 stratified folds, repeated 2 times, with a fixed seed for reproducibility.
rskf = RepeatedStratifiedKFold(n_splits=5, n_repeats=2, random_state=42)

# Przygotowania

In [None]:

# Image pipeline handed to CapsuleDataset: resize to IMG_SIZE x IMG_SIZE,
# random affine jitter (white fill), brightness/contrast jitter, then tensor.
transform = transforms.Compose(
    [
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.RandomAffine(
            degrees=15,
            translate=(0.05, 0.05),
            scale=(1.1, 1.15),
            fill=255,
        ),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
    ]
)

# Download the dataset through kagglehub; `path` is the value it returns.
path = kagglehub.dataset_download("tladilebohang/capsule-defects")

# Positive class: folder and number of entries in it.
pos_folder = os.path.join(path, "capsule/positive")
pos_len = len(glob.glob(os.path.join(pos_folder, "*")))

# Negative class: folder and number of entries in it.
neg_folder = os.path.join(path, "capsule/negative")
neg_len = len(glob.glob(os.path.join(neg_folder, "*")))

# Report the class sizes (positive first, then negative).
print(pos_len)
print(neg_len)

# Generowanie wykresu krzywych uczenia dla metod do dobrania liczby epok

In [None]:
from sklearn.model_selection import train_test_split
from CNNVAE import CNNVAE
from CNNGAN import CNNGAN

# Number of epochs traced on each learning curve (single source of truth for
# both the training loops and the x-axis of the plots).
CURVE_EPOCHS = 500
epoch_axis = range(1, CURVE_EPOCHS + 1)

# plt.savefig does not create missing directories; make sure the target exists
# before training so a missing folder cannot discard a finished run.
os.makedirs('charts', exist_ok=True)

# ---- CNNVAE learning curve ----
# NOTE(review): load_pretrained=True presumably restores saved weights; if so,
# the curve starts from an already-trained model - confirm this is intended.
model = CNNVAE(device=device, result_dir='results/CNNVAE', load_pretrained=True).to(device)
dataset = CapsuleDataset(pos_dir=pos_folder, neg_dirs=neg_folder, transform=transform)

# Hold out 20% of the sample indices; only the training part is used below.
full_indices = np.arange(len(dataset))
train_indices, test_indices = train_test_split(
    full_indices, test_size=0.2, random_state=42, shuffle=True
)
train_dataset = Subset(dataset, train_indices)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

train_losses = []

for epoch in epoch_axis:
    train_loss = model.train_vae(epoch=epoch, dataloader=train_loader)
    train_losses.append(train_loss)

    print(f'Epoch {epoch}: Train Loss = {train_loss:.4f}')


plt.figure(figsize=(8, 5))
plt.plot(epoch_axis, train_losses, label='Train Loss', marker='o')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss Over Epochs')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig('charts/vae_loss_plot.png')
plt.close()


# ---- CNNGAN learning curves (same training loader as the VAE) ----
gan = CNNGAN(IMG_SIZE, CHANNELS, LATENT_DIM, device, result_dir="results/CNNGAN", load_pretrained=True)

gen_losses = []
disc_losses = []

for epoch in epoch_axis:

    g_loss, d_loss = gan.train_gan(epoch, dataloader=train_loader)
    gen_losses.append(g_loss)
    disc_losses.append(d_loss)

    print(f'Epoch {epoch}: Generator Loss = {g_loss:.4f}, Discriminator Loss = {d_loss:.4f} \n')


plt.figure(figsize=(8, 5))
plt.plot(epoch_axis, gen_losses, label='Generator Loss', marker='o')
plt.plot(epoch_axis, disc_losses, label='Discriminator Loss', marker='s')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('GAN Training Losses Over Epochs')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig('charts/gan_loss_plot.png')
plt.close()

# Eksperyment na datasecie bez oversamplingu

In [None]:
# Baseline experiment: ResNet classifier trained on the original
# (non-oversampled) data, evaluated with the shared repeated stratified CV.
IMG_SIZE = 128
BATCH_SIZE = 16
CHANNELS = 3
EPOCHS = 25

# DataFrame.to_csv does not create missing directories; create the output
# folder before the long CV run so the results cannot be lost at the end.
os.makedirs('crossvalidation_results', exist_ok=True)

dataset = CapsuleDataset(pos_dir=pos_folder, neg_dirs=neg_folder, transform=transform)
print(len(dataset))

# Class label of every sample (second element of each dataset item); used
# only to stratify the folds.
labels = np.array([dataset[i][1] for i in range(len(dataset))])

results = []

for fold_idx, (train_idx, test_idx) in enumerate(rskf.split(X=np.zeros(len(labels)), y=labels), start=1):
    print(f"===== Fold {fold_idx} =====")

    # Subset + DataLoader
    train_ds = Subset(dataset, train_idx)
    test_ds = Subset(dataset, test_idx)
    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
    # Evaluation does not need shuffling; shuffle=False matches the other
    # experiments in this notebook.
    # NOTE(review): test_ds shares the dataset's transform, which contains
    # random augmentation (RandomAffine, ColorJitter) - confirm that
    # evaluating on augmented images is intended.
    test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)

    # A fresh classifier for every fold.
    trainer = ResNetTrainer()
    trainer.train(
        train_loader,
        num_epochs=EPOCHS,
    )

    f2, bal_acc, recall, specificity = trainer.validate(test_loader)
    print(f"Fold {fold_idx} Test Accuracy: {bal_acc:.4f}")

    results.append({
        'fold': fold_idx,
        'f2_score': f2,
        'balanced_accuracy': bal_acc,
        'recall': recall,
        'specificity': specificity
    })

# Save the per-fold results
results_df = pd.DataFrame(results)
results_df.to_csv('crossvalidation_results/without_oversampling_cross_validation_results.csv', index=False)


# CNNVAE: Eksperyment

In [None]:

# CNNVAE oversampling experiment: per fold, fit the VAE oversampler on the
# training indices, tune the generation parameters with a randomized search,
# then score the refitted best estimator on the held-out fold.
IMG_SIZE = 128
BATCH_SIZE = 16
CHANNELS = 3
CLASSIFIER_EPOCHS = 25
OVERSAMPLER_EPOCHS = 200

# DataFrame.to_csv does not create missing directories; create the output
# folder before the long CV run so the results cannot be lost at the end.
os.makedirs('crossvalidation_results', exist_ok=True)

dataset = CapsuleDataset(pos_dir=pos_folder, neg_dirs=neg_folder, transform=transform)
print(len(dataset))

# Class label of every sample (second element of each dataset item); used
# only to stratify the folds.
labels = np.array([dataset[i][1] for i in range(len(dataset))])

results = []

for fold_idx, (train_idx, test_idx) in enumerate(rskf.split(X=np.zeros(len(labels)), y=labels), start=1):
    print(f"===== Fold {fold_idx} =====")

    # Fit the oversampler on this fold's training indices only.
    CnnVae = CNNVAEWrapper(dataset, device, BATCH_SIZE, OVERSAMPLER_EPOCHS)
    CnnVae.fit(train_idx)

    estimator = CNNVAEResNetEstimator(
        dataset=dataset,
        vae_model=CnnVae.vae_model,
        device=device,
        batch_size=BATCH_SIZE,
        classifier_epochs=CLASSIFIER_EPOCHS
    )

    # 2 x 2 x 3 = 12 candidate combinations; 8 of them are sampled below.
    param_grid = {
        'mu_multiplier': [0.8, 1.2],
        'logvar_multiplier': [0.5, 1.5],
        'multiplier_generated_samples': [1/2, 1, 2]
    }

    random_search = RandomizedSearchCV(
        estimator=estimator,
        param_distributions=param_grid,
        n_iter=8,
        cv=None,
        verbose=2,
        n_jobs=1,
        random_state=42
    )

    # The estimator receives index arrays in place of a feature matrix.
    random_search.fit(train_idx)

    best_estimator = random_search.best_estimator_
    best_params = random_search.best_params_

    test_ds = Subset(dataset, test_idx)
    test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)
    f2, bal_acc, recall, specificity = best_estimator.trainer.validate(test_loader)
    print(f"Fold {fold_idx} Test Accuracy: {bal_acc:.4f}")

    results.append({
        'fold': fold_idx,
        'f2_score': f2,
        'balanced_accuracy': bal_acc,
        'recall': recall,
        'specificity': specificity,
        **best_params
    })

# Save the per-fold results
results_df = pd.DataFrame(results)
results_df.to_csv('crossvalidation_results/CNNVAE_cross_validation_results.csv', index=False)
    

# CNNGAN: Eksperyment

In [None]:

# CNNGAN oversampling experiment: per fold, fit the GAN oversampler on the
# training indices, tune the generation parameters with a randomized search,
# then score the refitted best estimator on the held-out fold.
IMG_SIZE = 128
BATCH_SIZE = 16
CHANNELS = 3
CLASSIFIER_EPOCHS = 15
OVERSAMPLER_EPOCHS = 250

# DataFrame.to_csv does not create missing directories; create the output
# folder before the long CV run so the results cannot be lost at the end.
os.makedirs('crossvalidation_results', exist_ok=True)

dataset = CapsuleDataset(pos_dir=pos_folder, neg_dirs=neg_folder, transform=transform)
print(len(dataset))

# Class label of every sample (second element of each dataset item); used
# only to stratify the folds.
labels = np.array([dataset[i][1] for i in range(len(dataset))])

results = []

for fold_idx, (train_idx, test_idx) in enumerate(rskf.split(X=np.zeros(len(labels)), y=labels), start=1):
    print(f"===== Fold {fold_idx} =====")

    # Fit the oversampler on this fold's training indices only.
    CnnGan = CNNGANWrapper(dataset, device, BATCH_SIZE, OVERSAMPLER_EPOCHS)
    CnnGan.fit(train_idx)

    estimator = CNNGANResNetEstimator(
        dataset=dataset,
        gan_model=CnnGan.gan_model,
        device=device,
        batch_size=BATCH_SIZE,
        classifier_epochs=CLASSIFIER_EPOCHS
    )

    # 3 x 3 = 9 candidate combinations; 6 of them are sampled below.
    param_grid = {
        'scale_factor': [0.5, 1, 1.5],  # Scaling factor for the GAN noise
        'multiplier_generated_samples': [1/2, 1, 2]
    }

    random_search = RandomizedSearchCV(
        estimator=estimator,
        param_distributions=param_grid,
        n_iter=6,
        cv=None,
        verbose=2,
        n_jobs=1,
        random_state=42
    )

    # The estimator receives index arrays in place of a feature matrix.
    random_search.fit(train_idx)

    best_estimator = random_search.best_estimator_
    best_params = random_search.best_params_

    test_ds = Subset(dataset, test_idx)
    test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)
    f2, bal_acc, recall, specificity = best_estimator.trainer.validate(test_loader)
    print(f"Fold {fold_idx} Test Accuracy: {bal_acc:.4f}")

    results.append({
        'fold': fold_idx,
        'f2_score': f2,
        'balanced_accuracy': bal_acc,
        'recall': recall,
        'specificity': specificity,
        **best_params
    })

# Save the per-fold results
results_df = pd.DataFrame(results)
results_df.to_csv('crossvalidation_results/CNNGAN_cross_validation_results.csv', index=False)

# CNNGAN: Generowanie jedynie syntetycznego zbioru

In [None]:
# CNNGAN experiment with multiplier_generated_samples='synthetic': per fold,
# fit the GAN on the training indices, tune the noise scale with a randomized
# search, then score the refitted best estimator on the held-out fold.
IMG_SIZE = 128
BATCH_SIZE = 16
CHANNELS = 3
CLASSIFIER_EPOCHS = 20
OVERSAMPLER_EPOCHS = 200

# DataFrame.to_csv does not create missing directories; create the output
# folder before the long CV run so the results cannot be lost at the end.
os.makedirs('crossvalidation_results', exist_ok=True)

dataset = CapsuleDataset(pos_dir=pos_folder, neg_dirs=neg_folder, transform=transform)
print(len(dataset))

# Class label of every sample (second element of each dataset item); used
# only to stratify the folds.
labels = np.array([dataset[i][1] for i in range(len(dataset))])

results = []

for fold_idx, (train_idx, test_idx) in enumerate(rskf.split(X=np.zeros(len(labels)), y=labels), start=1):
    print(f"===== Fold {fold_idx} =====")

    # Fit the generator on this fold's training indices only.
    CnnGan = CNNGANWrapper(dataset, device, BATCH_SIZE, OVERSAMPLER_EPOCHS)
    CnnGan.fit(train_idx)

    estimator = CNNGANResNetEstimator(
        dataset=dataset,
        gan_model=CnnGan.gan_model,
        device=device,
        batch_size=BATCH_SIZE,
        classifier_epochs=CLASSIFIER_EPOCHS,
        multiplier_generated_samples='synthetic'
    )

    # 3 candidate values; 2 of them are sampled below.
    param_grid = {
        'scale_factor': [0.5, 1, 1.5]  # Scaling factor for the GAN noise
    }

    random_search = RandomizedSearchCV(
        estimator=estimator,
        param_distributions=param_grid,
        n_iter=2,
        cv=None,
        verbose=2,
        n_jobs=1,
        random_state=42
    )

    # The estimator receives index arrays in place of a feature matrix.
    random_search.fit(train_idx)

    best_estimator = random_search.best_estimator_
    best_params = random_search.best_params_

    test_ds = Subset(dataset, test_idx)
    test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)
    f2, bal_acc, recall, specificity = best_estimator.trainer.validate(test_loader)
    print(f"Fold {fold_idx} Test Accuracy: {bal_acc:.4f}")

    results.append({
        'fold': fold_idx,
        'f2_score': f2,
        'balanced_accuracy': bal_acc,
        'recall': recall,
        'specificity': specificity,
        **best_params
    })

# Save the per-fold results
results_df = pd.DataFrame(results)
results_df.to_csv('crossvalidation_results/CNNGAN_synthetic_cross_validation_results.csv', index=False)

# CNNVAE: Generowanie jedynie syntetycznego zbioru

In [None]:
# CNNVAE experiment with multiplier_generated_samples='synthetic': per fold,
# fit the VAE on the training indices, tune the latent multipliers with a
# randomized search, then score the refitted best estimator on the held-out fold.
IMG_SIZE = 128
BATCH_SIZE = 16
CHANNELS = 3
CLASSIFIER_EPOCHS = 20
OVERSAMPLER_EPOCHS = 200

# DataFrame.to_csv does not create missing directories; create the output
# folder before the long CV run so the results cannot be lost at the end.
os.makedirs('crossvalidation_results', exist_ok=True)

dataset = CapsuleDataset(pos_dir=pos_folder, neg_dirs=neg_folder, transform=transform)
print(len(dataset))

# Class label of every sample (second element of each dataset item); used
# only to stratify the folds.
labels = np.array([dataset[i][1] for i in range(len(dataset))])

results = []

for fold_idx, (train_idx, test_idx) in enumerate(rskf.split(X=np.zeros(len(labels)), y=labels), start=1):
    print(f"===== Fold {fold_idx} =====")

    # Fit the generator on this fold's training indices only.
    CnnVae = CNNVAEWrapper(dataset, device, BATCH_SIZE, OVERSAMPLER_EPOCHS)
    CnnVae.fit(train_idx)

    estimator = CNNVAEResNetEstimator(
        dataset=dataset,
        vae_model=CnnVae.vae_model,
        device=device,
        batch_size=BATCH_SIZE,
        classifier_epochs=CLASSIFIER_EPOCHS,
        multiplier_generated_samples='synthetic'
    )

    # 2 x 2 = 4 candidate combinations; 3 of them are sampled below.
    param_grid = {
        'mu_multiplier': [0.8, 1.2],
        'logvar_multiplier': [0.5, 1.5]
    }

    random_search = RandomizedSearchCV(
        estimator=estimator,
        param_distributions=param_grid,
        n_iter=3,
        cv=None,
        verbose=2,
        n_jobs=1,
        random_state=42
    )

    # The estimator receives index arrays in place of a feature matrix.
    random_search.fit(train_idx)

    best_estimator = random_search.best_estimator_
    best_params = random_search.best_params_

    test_ds = Subset(dataset, test_idx)
    test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)
    f2, bal_acc, recall, specificity = best_estimator.trainer.validate(test_loader)
    print(f"Fold {fold_idx} Test Accuracy: {bal_acc:.4f}")

    results.append({
        'fold': fold_idx,
        'f2_score': f2,
        'balanced_accuracy': bal_acc,
        'recall': recall,
        'specificity': specificity,
        **best_params
    })

# Save the per-fold results
results_df = pd.DataFrame(results)
results_df.to_csv('crossvalidation_results/CNNVAE_synthetic_cross_validation_results.csv', index=False)

# Wykres krzywych uczenia