In [1]:
import os
new_working_directory = "C:\\Users\\ianja\\REPOS\\Pepperpot\\"
os.chdir(new_working_directory)

In [None]:
# SEGMENT A DIR BY PARAM VARIATION
import os
import shutil
import re

class ParameterValues:
    def __init__(self, epsnx, alfax, betax, epsny, alfay, betay, epsnz, alfaz, betaz):
        self.epsnx = epsnx
        self.alfax = alfax
        self.betax = betax
        self.epsny = epsny
        self.alfay = alfay
        self.betay = betay
        self.epsnz = epsnz
        self.alfaz = alfaz
        self.betaz = betaz

# Specify the local directory containing the images
local_directory = 'local-images-before-after-propagation-bw-shorter'

# Check the current working directory
print(f"Current working directory: {os.getcwd()}")

# Check if the local_directory exists in the current working directory
if not os.path.exists(local_directory):
    print(f"The directory '{local_directory}' does not exist in the current working directory.")
else:
    print(f"The directory '{local_directory}' exists.")

# Create the target directories for organizing the images
target_directory_2param = os.path.join(local_directory, '2-param')
target_directory_3param = os.path.join(local_directory, '3-param')
target_directory_6param = os.path.join(local_directory, '6-param')

# Create the target directories if they don't exist
os.makedirs(target_directory_2param, exist_ok=True)
os.makedirs(target_directory_3param, exist_ok=True)
os.makedirs(target_directory_6param, exist_ok=True)

# Regular expression pattern for extracting parameters from filename
pattern = r"epsnx([\d.-]+)_alfax([\d.-]+)_betax([\d.-]+)_epsny([\d.-]+)_alfay([\d.-]+)_betay([\d.-]+)_epsnz([\d.-]+)_alfaz([\d.-]+)_betaz([\d.-]+)_*[\d]*\.png"

# Iterate over the images in the local directory
for filename in os.listdir(local_directory):
    if filename.endswith('.png'):
        source_path = os.path.join(local_directory, filename)

        # Extract parameters from filename using regex
        match = re.match(pattern, filename)
        if match:
            params = ParameterValues(*match.groups())
            varied_params = sum(param != '0.0' for param in vars(params).values())

            # Move the file to the corresponding target directory based on the number of varied parameters
            if filename.startswith("epsnx0.1_"):
                target_path = os.path.join(target_directory_2param, filename)
            elif params.alfay == "-0.55" and params.betay == "170.0":
                target_path = os.path.join(target_directory_3param, filename)
            else:
                target_path = os.path.join(target_directory_6param, filename)

            shutil.move(source_path, target_path)

print("Image organization completed.")


In [None]:
# TEST/TRAIN SPLIT
import os
import random
import shutil
import time

random.seed(42)

original_folders = ["local-images-50cm-propagation-bw-200bin-cropped/2-param/", "local-images-50cm-propagation-bw-200bin-cropped/3-param/", "local-images-10cm-propagation-bw-200bin-cropped"]
train_folder = "./train"
test_folder = "./test"

os.makedirs(train_folder, exist_ok=True)
os.makedirs(test_folder, exist_ok=True)

train_ratio = 0.8

file_list = []

for folder in original_folders:
    file_list.extend([os.path.join(folder, file) for file in os.listdir(folder)])
    print(len(file_list))

random.shuffle(file_list)

train_size = int(len(file_list) * train_ratio)

train_files = file_list[:train_size]
test_files = file_list[train_size:]

for src_path in train_files:
    file = os.path.basename(src_path)
    dst_path = os.path.join(train_folder, file)

    # Check if a file with the same name already exists in the target directory
    if os.path.exists(dst_path):
        # Split filename into name and extension
        base, ext = os.path.splitext(file)
        # Append a unique identifier to the filename
        new_filename = f"{base}_{random.randint(1000,9999)}{ext}"
        dst_path = os.path.join(train_folder, new_filename)

    shutil.copy(src_path, dst_path)

for src_path in test_files:
    file = os.path.basename(src_path)
    dst_path = os.path.join(test_folder, file)

    # Check if a file with the same name already exists in the target directory
    if os.path.exists(dst_path):
        # Split filename into name and extension
        base, ext = os.path.splitext(file)
        # Append a unique identifier to the filename
        new_filename = f"{base}_{random.randint(1000,9999)}{ext}"
        dst_path = os.path.join(test_folder, new_filename)

    shutil.copy(src_path, dst_path)

print("Dataset split completed!")


In [22]:
# force clear cache and memory
import torch
torch.cuda.empty_cache()
import gc
gc.collect()

0

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import os
import re
import cv2
import numpy as np
from torchvision import transforms
import random

# ensure determinism
seed = 42
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
# 3_PARAM SIMPLE MODEL
NAME = "3param"

class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(16 * 147 * 147, 256)  # Adjust this if needed
        self.fc2 = nn.Linear(256, 3)  # Now predicting three variables

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

class CustomDataset(Dataset):
    def __init__(self, image_dir):
        self.image_dir = image_dir
        self.image_files = sorted(os.listdir(image_dir))
        self.pattern = r"epsnx([\d.-]+)_alfax([\d.-]+)_betax([\d.-]+)_epsny([\d.-]+)_alfay([\d.-]+)_betay([\d.-]+)_epsnz([\d.-]+)_alfaz([\d.-]+)_betaz([\d.-]+)\.png"
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))  # Normalizing single channel
        ])

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, index):
        image_name = self.image_files[index]
        image_path = os.path.join(self.image_dir, image_name)
        image = cv2.imread(image_path)
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        image = cv2.resize(image, (294, 294))
        image = self.transform(image)
        matches = re.search(self.pattern, image_name)
        if matches is None:
            print(f"No match found for file: {image_name}")
            return None
        variables = [float(matches.group(i)) for i in range(1, 10) if matches.group(i)]
        epsnx = variables[0]  # epsnx value
        alfa_x = variables[1]  # alfax value
        beta_x = variables[2]  # betax value
        return image, torch.tensor([epsnx, alfa_x, beta_x])  # Returns epsnx, alfa x and betax as the labels

In [None]:
from collections import deque
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

batch_size = 4
learning_rate = 0.001
num_epochs = 100

model = CNN().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

root_dir = "path_to_root_dir"
dataset = CustomDataset(root_dir)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

saved_models = deque(maxlen=5)
saved_losses = deque(maxlen=5)

total_steps = len(dataloader)
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(dataloader):
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}")
    
    if len(saved_models) < 5 or loss.item() < max(saved_losses):
        if len(saved_models) == 5:
            worst_model = saved_models[saved_losses.index(max(saved_losses))]
            os.remove(worst_model)
            saved_models.remove(worst_model)
            saved_losses.remove(max(saved_losses))

        filename = f"cnn_{NAME}_bs{batch_size}_lr{learning_rate}_e{epoch+1}.pth"
        torch.save(model.state_dict(), filename)
        saved_models.append(filename)
        saved_losses.append(loss.item())

print("Training complete")

In [5]:
def evaluate_model(model, test_dataset, num_variables):
    model.eval()
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

    total_mae = torch.zeros(num_variables, device=device)
    total_mape = torch.zeros(num_variables, device=device)
    total_smape = torch.zeros(num_variables, device=device)
    total_mse = torch.zeros(num_variables, device=device)
    total_count = 0

    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)

            absolute_error = torch.abs(outputs - labels)
            total_mae += absolute_error.sum(dim=0)

            non_zero_mask = torch.abs(labels) > 1e-8
            percentage_error = (absolute_error / torch.abs(labels)) * 100
            total_mape += (percentage_error * non_zero_mask).sum(dim=0)

            smape = 200.0 * torch.abs(outputs - labels) / (torch.abs(outputs) + torch.abs(labels) + torch.finfo(torch.float32).eps)
            total_smape += smape.sum(dim=0)
            
            mse = (outputs - labels) ** 2
            total_mse += mse.sum(dim=0)

            total_count += labels.size(0)

    mae = total_mae / total_count
    mape = total_mape / total_count
    smape = total_smape / total_count
    rmse = torch.sqrt(total_mse / total_count)

    return mae.cpu().numpy(), mape.cpu().numpy(), smape.cpu().numpy(), rmse.cpu().numpy()


In [16]:
# RUN MODEL EVALUATION
import glob
import torch

# Load models and evaluate their performance
model_dir = './'  # Specify ysour directory where models are saved
test_dataset = CustomDataset('test')  # Specify your test dataset

model_performance = []

# Load each model and evaluate it
for model_file in glob.glob(model_dir + '/*.pth'):
    # Load model
    model = CNN()
    model.load_state_dict(torch.load(model_file))
    model.to(device)

    # Evaluate model
    mae, mape, smape, rmse = evaluate_model(model, test_dataset, 2)
    aggregate_score = np.mean([mae, mape, smape, rmse])
    model_performance.append((model_file, mae, mape, smape, rmse, aggregate_score))

# Sort models based on aggregate score
model_performance.sort(key=lambda x: x[-1])

# Print the performance of each model
for model_info in model_performance:
    model_file, mae, mape, smape, rmse, aggregate_score = model_info
    print(f"Model: {model_file}, Aggregate Score: {aggregate_score:.4f}")
    print(f"Mean Absolute Error: {mae}")
    print(f"Mean Absolute Percentage Error: {mape}")
    print(f"Symmetric Mean Absolute Percentage Error: {smape}")
    print(f"Root Mean Square Error: {rmse}")
    print()


Model: .\cnn_2param-bw-sgwd_bs4_lr0.001_e200_6752.pth, Aggregate Score: 17.2686
Mean Absolute Error: [ 0.4459879 17.1889   ]
Mean Absolute Percentage Error: [51.482895   7.5551586]
Symmetric Mean Absolute Percentage Error: [28.398268   7.3972483]
Root Mean Square Error: [ 0.6025582 25.077765 ]

Model: .\cnn_2param-bw-extraprop_bs4_lr0.001_e200_3376.pth, Aggregate Score: 80.7094
Mean Absolute Error: [  1.4378942 131.74649  ]
Mean Absolute Percentage Error: [94.070366 80.48501 ]
Symmetric Mean Absolute Percentage Error: [91.236885 35.74836 ]
Root Mean Square Error: [  2.205542 208.74432 ]

