Variáveis globais e funções para pré-processamento do dataset:
JPGs exportados em tamanho original (4000x4000 ou 3264x3264), sRGB, via Lightroom, com o EXIF recuperado do RAW original, exceto a orientação. O comando abaixo é usado para substituir o EXIF na exportação (requer plugin do LR):
```
C:\PROGRA~1\ImageGlass\exiftool -tagsFromFile "{FullMasterFile}" -all:all -x Orientation "{FullExportedFile}"
```

In [None]:
import subprocess
import json
import torch
from torchvision import models
import torch.nn as nn
import os
from torch.utils.data import Dataset
import datetime

# Normalization ranges used by the normalize_*/denormalize_* helpers below.
# Focal length and f-stop are handled as 35mm-equivalent values (see extract_exif_data).
# NOTE: "lenght" is misspelled, but the names are referenced throughout the notebook.
subject_distance_min = 0.05  # presumably metres (the sanity-check plot labels it "m")
subject_distance_max = 50.0
focal_lenght_min = 16.0
focal_lenght_max = 640.0
fstop_min = 1.2
fstop_max = 48.0

preprocessing_worker_count = 15 # worker threads used for image tensor generation
plotting_worker_count = 4       # worker threads used for reading tensor metadata. Observed not to scale beyond 4 on an E5-2696v3, presumably because of Python's Global Interpreter Lock (GIL).

resnet_version = 50  # User can set this to 18, 34, 50, 101, or 152

# Dictionary to map resnet_version to the corresponding ResNet constructor and pretrained weights
resnet_models = {
    18: (models.resnet18, models.ResNet18_Weights.IMAGENET1K_V1),
    34: (models.resnet34, models.ResNet34_Weights.IMAGENET1K_V1),
    50: (models.resnet50, models.ResNet50_Weights.IMAGENET1K_V2),
    101: (models.resnet101, models.ResNet101_Weights.IMAGENET1K_V1),
    152: (models.resnet152, models.ResNet152_Weights.IMAGENET1K_V1)
}

# Select the ResNet model based on the resnet_version (a KeyError is raised for any other value)
selected_resnet_model, selected_resnet_weights = resnet_models[resnet_version]
# Instantiated once at module level; HighResNetRegressor uses this object as its backbone.
selected_resnet = selected_resnet_model(weights=selected_resnet_weights)

# Hyperparameters
max_epochs = 250        # Upper bound on epochs. Training stops earlier if the model does not improve for `patience` epochs.
learning_rate = 0.001   # Initial learning rate for the Adam optimizer. Larger values may converge faster but can oscillate or diverge; very small values make training slow or stall it.
patience = 10           # Number of epochs without improvement before early stopping (also used to trigger layer unfreezing)
unfreeze_epoch = 20     # Epoch from which frozen layers may start being unfrozen for fine-tuning. NOTE: the training cell assigns this name again before its loop.

# Image size and batch size selection:
final_dimensions = 896      # Input dimension per side, multiple of 224 as recommended for ResNet. 672 is the largest multiple of 224 that fits in GPU memory with batch size 128
dataloader_batch_size = 16  # Up to 56 at 672px for ResNet50 on a card with 24GB free. 88 at 896px for ResNet34 on a card with 24GB free. 24 for ResNet50 at 896px on a 3090 that is also in use.

augmentation_params = [         # Parameters for generating augmented data. Format: (crop factor, rotation angle in degrees)
    (1, 0), (1.0625, 0), (1.125, 0), (1.175, 0),
    (1.25, -6), (1.25, 0), (1.25, 6),
    (1.3333, 0),
    (1.5, -18), (1.5, -9), (1.5, 0), (1.5, 9), (1.5, 18),
    (2, 0),
    (2.5, -36), (2.5, -18), (2.5, -9), (2.5, 9), (2.5, 18), (2.5, 36),
    (3, 0)
]


#
# Disponibilidade e escolha de GPU:
# ---------------------------------
#

gpu_index = 0  # Palit RTX 3090: 0 (drives the display, less free VRAM, possibly faster); Gigabyte RTX 3090 Blower: 1 (more free VRAM, worse heat dissipation, noisier)

cuda_available = torch.cuda.is_available()
print(f"CUDA disponível: {cuda_available}")
num_gpus = torch.cuda.device_count()
print(f"Quantidade de GPUs disponíveis: {num_gpus}")

if cuda_available:
    # Only touch the CUDA runtime when it exists: set_device()/current_device()
    # raise on a machine without a usable GPU, which used to abort this whole
    # cell even though the preprocessing cells do not need CUDA.
    torch.cuda.set_device(gpu_index)
    current_device = torch.cuda.current_device()
    print(f"GPU atual em uso: ID {current_device}")

    device = torch.device("cuda")
    device_name = torch.cuda.get_device_name(device)
    print(f"\tNome dispositivo CUDA:\t{device_name}")

    cuda_version = torch.version.cuda
    compute_capability = torch.cuda.get_device_capability(device)
    tensor_core_supported = compute_capability[0] >= 7
    print(f"\tVersão CUDA: {cuda_version} \tCompute Capability: {compute_capability}\t Tensor Core Support: {tensor_core_supported}\n")
    if tensor_core_supported:
        # NOTE(review): TF32 itself is only available from compute capability
        # 8.x; on 7.x these flags are presumably a no-op - confirm if relevant.
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.backends.cudnn.allow_tf32 = True
        print("\tTF32 ativado para matmul e cudnn.\n")
else:
    # Keep the names defined so later cells fail with a clear CUDA error
    # instead of a NameError.
    current_device = None
    device = torch.device("cpu")
    print("CUDA indisponível: usando CPU (as células de treinamento e teste exigem GPU).")

#
# Caminhos de diretório: 
# ---------------------- 
#

# Where the preprocessed (image, EXIF) tensor pairs are written and read back.
tensor_directory = "D:\\TEMTC-CN\\tensors"
# One checkpoint folder per ResNet depth.
checkpoint_dir = f"D:\\TEMTC-CN\\checkpoints\\resnet{resnet_version}"
# Pickle holding the train/val/test index split written by the training cell.
split_indices_path = f"D:\\TEMTC-CN\\split_indices.pkl"
# Source JPGs.
image_directory = "D:\\TEMTC-CN\\4000px 1k"
# image_directory = "D:\\TEMTC-CN\\4000px test" 

# Full paths of every file ending in ".jpg" (case-sensitive match) directly inside image_directory.
image_files = [os.path.join(image_directory, f) for f in os.listdir(image_directory) if f.endswith(".jpg")]


#
# Funções requeridas para o treinamento e teste do modelo
# --------------------------------------------------------
#

# Normalization and denormalization functions
def normalize_fstop(value, min_value, max_value):
    """Map an f-stop onto a squared, inverted scale.

    ``min_value`` maps to 1.0 and ``max_value`` maps to 0.0; the squaring
    compresses the scale towards ``max_value``.
    """
    span = max_value - min_value
    fraction = (value - min_value) / span
    return (1 - fraction) ** 2

def normalize_focal_length(value, min_value, max_value):
    """Linearly rescale ``value`` so ``min_value`` -> 0 and ``max_value`` -> 1 (no clamping)."""
    offset = value - min_value
    span = max_value - min_value
    return offset / span

def normalize_subject_distance(value, min_value, max_value):
    """Linearly rescale a subject distance to [0, 1], saturating outside the range.

    Values below ``min_value`` return 0.0 and values above ``max_value``
    return 1.0.
    """
    if value < min_value:
        return 0.0
    if value > max_value:
        return 1.0
    span = max_value - min_value
    return (value - min_value) / span

def denormalize_fstop(value, min_value, max_value):
    """Invert ``normalize_fstop``: map a normalized value back to an f-stop.

    0.0 maps to ``max_value`` and 1.0 maps to ``min_value``.

    A negative Python scalar (for example an unbounded network output) is
    treated as 0.0. Without this, ``value ** 0.5`` silently produces a
    complex number. Non-scalar inputs such as tensors or arrays are passed
    through unchanged.
    """
    if isinstance(value, (int, float)) and value < 0:
        value = 0.0
    return max_value - (value ** 0.5 * (max_value - min_value))

def denormalize_focal_length(value, min_value, max_value):
    """Invert ``normalize_focal_length``: 0 -> ``min_value``, 1 -> ``max_value``."""
    span = max_value - min_value
    scaled = value * span
    return scaled + min_value

def denormalize_subject_distance(value, min_value, max_value):
    """Invert ``normalize_subject_distance``.

    A normalized value of exactly 1.0 is reported as infinity; every other
    value is rescaled linearly.
    """
    if value == 1.0:
        return float('inf')
    span = max_value - min_value
    return value * span + min_value

def extract_exif_data(img_path):
    """Read focal length, f-number and focus distance from an image via exiftool.

    Focal length and f-number are multiplied by the file's
    ``ScaleFactorTo35mmEquivalent`` (1.5 when the tag is absent); the subject
    distance is returned unscaled. Missing tags default to 0.

    Args:
        img_path: Path of the image file handed to exiftool.

    Returns:
        A 1-D tensor ``[focal_length_scaled, f_stop_scaled, subject_distance]``.

    Raises:
        ValueError: if exiftool produced no parseable JSON record for the file.
    """
    def _as_float(raw):
        # exiftool reports either plain numbers or strings such as "50.0 mm";
        # for strings only the leading token is numeric.
        return float(raw.split()[0]) if isinstance(raw, str) else float(raw)

    result = subprocess.run(['C:\\PROGRA~1\\ImageGlass\\exiftool', '-json', img_path], stdout=subprocess.PIPE)
    try:
        exif_data = json.loads(result.stdout)[0]
    except (ValueError, IndexError) as exc:
        # Without this the caller only sees a bare JSON/Index error that does
        # not say which file or which tool failed.
        raise ValueError(
            f"exiftool returned no usable metadata for {img_path!r} "
            f"(exit code {result.returncode})"
        ) from exc

    focal_length = _as_float(exif_data.get('FocalLength', 0))
    f_stop = _as_float(exif_data.get('FNumber', 0))
    subject_distance = _as_float(exif_data.get('FocusDistance2', 0))
    scale_factor = _as_float(exif_data.get('ScaleFactorTo35mmEquivalent', 1.5))  # Default value if not available

    # Scale focal length and f/stop using the scale factor; do not scale subject distance
    focal_length_scaled = focal_length * scale_factor
    f_stop_scaled = f_stop * scale_factor

    return torch.tensor([focal_length_scaled, f_stop_scaled, subject_distance])

# Function to calculate MAPE and MSLE:
def mean_absolute_percentage_error(y_true, y_pred, eps=1e-6):
    """Return the mean absolute percentage error (in percent) as a 0-d tensor.

    Args:
        y_true: Tensor of target values.
        y_pred: Tensor of predictions, broadcastable against ``y_true``.
        eps: Lower bound applied to ``|y_true|`` in the denominator. Targets
            are normalized and can be exactly 0 (e.g. the minimum focal
            length), which previously produced inf/nan and poisoned the
            epoch average. Results are unchanged when ``|y_true| >= eps``.
    """
    denominator = torch.clamp(torch.abs(y_true), min=eps)
    return torch.mean(torch.abs(y_true - y_pred) / denominator) * 100

def mean_squared_logarithmic_error(y_true, y_pred):
    """Return the mean of the squared differences between ``log1p`` of targets and predictions."""
    log_gap = torch.log1p(y_true) - torch.log1p(y_pred)
    return log_gap.pow(2).mean()

# Function to calculate percentage error
def calculate_percentage_error(real, inferred):
    """Return ``|real - inferred| / |real|`` expressed in percent.

    NumPy is imported locally because this cell does not import it; the
    function previously only worked after a later cell had run
    ``import numpy as np``.
    """
    import numpy as np

    return np.abs((real - inferred) / real) * 100

# Custom dataset class for loading tensors. Qwen32B que disse pra fazer assim, parece funcionar:
class TensorDataset(Dataset):
    """Dataset over the ``.pt`` files of a directory.

    Each file is expected to hold an ``(image_tensor, exif_tensor_norm)``
    pair, as written by the preprocessing cell.

    The file list is sorted so that a given index always refers to the same
    file. The train/val/test split is persisted as indices into this list,
    and ``os.listdir`` gives no ordering guarantee, so an unsorted list
    could map saved test indices onto training files in a later session.
    NOTE: split files written against the previous unsorted listing should
    be regenerated.

    NOTE(review): the name shadows ``torch.utils.data.TensorDataset``; it is
    kept because other cells instantiate it by this name.
    """

    def __init__(self, tensor_directory):
        self.tensor_files = sorted(
            os.path.join(tensor_directory, f)
            for f in os.listdir(tensor_directory)
            if f.endswith(".pt")
        )

    def __len__(self):
        return len(self.tensor_files)

    def __getitem__(self, idx):
        tensor_path = self.tensor_files[idx]
        try:
            image_tensor, exif_tensor_norm = torch.load(tensor_path, weights_only=True)
        except Exception as e:
            # Report which file is broken, then re-raise with the original traceback.
            print(f"Error loading tensor: {tensor_path}, Error: {e}")
            raise
        return image_tensor, exif_tensor_norm


#
# Definição do modelo, escolha da ResNet e dropout:
# -------------------------------------------------
#

class HighResNetRegressor(nn.Module):
    """ResNet backbone with a 2-output regression head: [focal_length, f_stop].

    Two defects of the previous version are fixed without changing any
    parameter name, so existing checkpoints still load:

    * Each instance builds its own backbone. Previously every instance
      wrapped the single module-level ``selected_resnet`` object, so two
      models shared weights and ``del model`` could not release them.
    * Dropout is applied to the pooled features before the final linear
      layer. Previously it was applied to the two regression outputs, which
      in training mode zeroes predictions at random and rescales the rest.
      In eval mode dropout is the identity, so inference is unchanged.
    """

    def __init__(self):
        super().__init__()
        self.resnet = selected_resnet_model(weights=selected_resnet_weights)

        # NOTE(review): this layer has the same shape as the stock ResNet stem
        # convolution, so replacing it discards the pretrained stem weights -
        # confirm this is intended.
        self.resnet.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.dropout = nn.Dropout(p=0.2)
        # p=0.35 was tried for ResNet101/152 to reduce overfitting; it did not help.

        # Output: [focal_length, f_stop]. Subject distance is dropped so that
        # lenses which do not record that metadata can be used.
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, 2)

    def forward(self, x):
        # Same stage order as torchvision's ResNet forward pass, spelled out
        # so dropout can be inserted between pooling and the linear head.
        backbone = self.resnet
        x = backbone.conv1(x)
        x = backbone.bn1(x)
        x = backbone.relu(x)
        x = backbone.maxpool(x)
        x = backbone.layer1(x)
        x = backbone.layer2(x)
        x = backbone.layer3(x)
        x = backbone.layer4(x)
        x = backbone.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.dropout(x)
        return backbone.fc(x)


print("Variáveis inicializadas.")
# Report when this cell ran, in Brazilian day/month/year format.
print(f"Executado em {datetime.datetime.now():%d/%m/%Y %H:%M:%S}")

Salva tensors pré-processados, com dados aumentados, para treinamento posterior:

In [None]:
import torch
import os
from PIL import Image
from torchvision import transforms
from concurrent.futures import ThreadPoolExecutor
import random

# Preprocess the image with augmentation
def preprocess_image_with_augmentation(img_path, crop_factor, rotation_angle):
    """Rotate, centre-crop and resize one image, returning it as a tensor.

    The image is rotated by a random angle drawn between ``rotation_angle / 2``
    and ``rotation_angle`` (canvas expanded to fit), then a centred square of
    side ``original_width / crop_factor`` is cropped out and resized to
    ``final_dimensions`` x ``final_dimensions``. Only the width is read, so
    the source is presumably square.
    """
    source = Image.open(img_path)
    target_side = int(source.size[0] / crop_factor)

    angle = random.uniform(rotation_angle/2, rotation_angle)
    rotated = source.rotate(angle, expand=True)

    # Centre the square crop inside the (possibly enlarged) rotated canvas.
    rotated_side = rotated.size[0]
    near_edge = (rotated_side - target_side) / 2
    far_edge = (rotated_side + target_side) / 2
    crop_box = (near_edge, near_edge, far_edge, far_edge)

    resized = rotated.crop(crop_box).resize((final_dimensions, final_dimensions))
    return transforms.ToTensor()(resized)

# Save tensors function with augmentation
import threading

# Serializes updates of the shared progress counter: this function runs on
# several worker threads and "+=" on a global is not atomic.
_progress_lock = threading.Lock()


def save_tensor_with_augmentation(img_path, tensor_directory, crop_factor, rotation_angle, reporter=False):
    """Create and save one augmented ``(image, normalized EXIF)`` tensor pair.

    The crop factor multiplies the 35mm-equivalent focal length and f-stop,
    since cropping narrows the field of view. Variants whose scaled focal
    length exceeds ``focal_lenght_max``, or whose scaled f-stop exceeds a
    third of ``fstop_max``, are skipped with a message.

    Args:
        img_path: Source image path.
        tensor_directory: Directory receiving the ``.pt`` file.
        crop_factor: Crop factor of this augmentation.
        rotation_angle: Nominal rotation of this augmentation (also part of the file name).
        reporter: When true, count this task in the module-level
            ``completed_tasks`` and print progress roughly every 2%.
    """
    global completed_tasks

    exif_tensor_raw = extract_exif_data(img_path).float()

    # Apply scaling based on crop factor
    focal_length_scaled = exif_tensor_raw[0] * crop_factor
    f_stop_scaled = exif_tensor_raw[1] * crop_factor
    subject_distance_scaled = exif_tensor_raw[2]    # No scaling for subject distance

    # Skip out-of-range variants; augmented f-stops are kept well below the maximum.
    if focal_length_scaled > focal_lenght_max or f_stop_scaled > (fstop_max / 3):
        print(f"Pulando tensor para \"{img_path}_{crop_factor}_{rotation_angle}\" por estar fora da faixa: {focal_length_scaled:.0f}mm, f/{f_stop_scaled:.1f}")
        return

    # Normalize the scaled values
    focal_length_norm = normalize_focal_length(focal_length_scaled, focal_lenght_min, focal_lenght_max)
    f_stop_norm = normalize_fstop(f_stop_scaled, fstop_min, fstop_max)
    subject_distance_norm = normalize_subject_distance(subject_distance_scaled, subject_distance_min, subject_distance_max)

    exif_tensor_norm = torch.tensor([focal_length_norm, f_stop_norm, subject_distance_norm]).float()
    image_tensor = preprocess_image_with_augmentation(img_path, crop_factor, rotation_angle).half()  # Stored as FP16

    # File name pattern is shared with the sanity-check cell, which rebuilds it to find the tensor.
    tensor_path = os.path.join(tensor_directory, f"{os.path.basename(img_path).replace('.jpg', '')}_{crop_factor}_{rotation_angle}.pt")
    torch.save((image_tensor, exif_tensor_norm), tensor_path)

    if reporter:
        with _progress_lock:
            completed_tasks += 1
            done = completed_tasks
        # max(1, ...) avoids a modulo-by-zero when there are fewer than 50 tasks.
        report_every = max(1, total_tasks // 50)  # Print every 2%
        if done % report_every == 0:
            print(f"Processados {done}/{total_tasks} tensors ({done / total_tasks * 100:.0f}% completo)")

from concurrent.futures import as_completed

os.makedirs(tensor_directory, exist_ok=True)

# Calculate total number of tasks
total_tasks = len(image_files) * len(augmentation_params)
completed_tasks = 0
failed_tasks = 0
progress_step = max(1, total_tasks // 50)  # Print every 2%, at least every task

# Process each image with all augmentation parameters.
# Every future is awaited so that an exception inside a worker (missing
# exiftool, unreadable image, ...) is reported instead of being dropped,
# which previously let the success message print even if nothing was saved.
# Progress is counted here on the main thread, so workers run with the
# default reporter=False.
with ThreadPoolExecutor(max_workers=preprocessing_worker_count) as executor:
    pending = {
        executor.submit(save_tensor_with_augmentation, img_path, tensor_directory, crop_factor, rotation_angle): (img_path, crop_factor, rotation_angle)
        for img_path in image_files
        for crop_factor, rotation_angle in augmentation_params
    }
    for future in as_completed(pending):
        task_img_path, task_crop_factor, task_rotation_angle = pending[future]
        try:
            future.result()
        except Exception as exc:  # boundary: report and continue with the remaining tasks
            failed_tasks += 1
            print(f"Falha ao processar \"{task_img_path}\" (crop {task_crop_factor}, rotação {task_rotation_angle}): {exc}")
        completed_tasks += 1
        if completed_tasks % progress_step == 0:
            print(f"Processados {completed_tasks}/{total_tasks} tensors ({completed_tasks / total_tasks * 100:.0f}% completo)")

if failed_tasks:
    print(f"Concluído com {failed_tasks} falha(s) em {total_tasks} tarefas.")
else:
    print("Tensors salvos com sucesso.")

Teste de sanidade: verifica se os tensors foram salvos corretamente e contêm as informações esperadas.

Algumas imagens podem não ter sido convertidas caso os metadados sejam indesejáveis. O código abaixo considera isso e não tenta carregar tensors que não existem.

In [None]:
import random
import matplotlib.pyplot as plt
from torchvision import transforms
import os
import torch

def display_image_with_data(img_path, tensor_directory):
    """Plot every saved augmentation of one image with its denormalized EXIF data.

    Tensors are looked up by the file-name pattern used when saving them.
    Variants that were never written (skipped as out of range) are ignored,
    and nothing is drawn when no variant exists.

    Args:
        img_path: Path of the original image; only its base name is used.
        tensor_directory: Directory holding the ``.pt`` files.
    """
    base_name = os.path.basename(img_path).replace(".jpg", "")
    images_to_plot = []

    for crop_factor, rotation_angle in augmentation_params:
        tensor_path = os.path.join(tensor_directory, f"{base_name}_{crop_factor}_{rotation_angle}.pt")

        try:
            image_tensor, exif_tensor_norm = torch.load(tensor_path, map_location=torch.device('cpu'), weights_only=True)
        except FileNotFoundError:
            # This augmentation was skipped during preprocessing.
            continue
        images_to_plot.append((image_tensor, exif_tensor_norm, crop_factor, rotation_angle))

    num_images = len(images_to_plot)
    if num_images == 0:
        return

    num_cols = 6  # Number of columns in the subplot grid
    num_rows = (num_images + num_cols - 1) // num_cols  # Ceiling division

    # squeeze=False keeps `axes` two-dimensional even for a single row;
    # otherwise axes[row, col] fails when six or fewer variants exist.
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(24, 12 * num_rows // num_cols), squeeze=False)
    fig.suptitle(f"{base_name}", fontsize=16)

    for i, (image_tensor, exif_tensor_norm, crop_factor, rotation_angle) in enumerate(images_to_plot):
        # Convert the image tensor to a PIL image
        image = transforms.ToPILImage()(image_tensor)

        # Denormalize the EXIF data
        focal_length_35mm = denormalize_focal_length(exif_tensor_norm[0].item(), focal_lenght_min, focal_lenght_max)
        f_stop_35mm = denormalize_fstop(exif_tensor_norm[1].item(), fstop_min, fstop_max)
        subject_distance_35mm = denormalize_subject_distance(exif_tensor_norm[2].item(), subject_distance_min, subject_distance_max)

        # Assumes a 1.5x crop sensor for the APS-C figures.
        focal_length_aps_c = focal_length_35mm / 1.5
        f_stop_aps_c = f_stop_35mm / 1.5

        # Display the image
        row, col = divmod(i, num_cols)
        ax = axes[row, col]
        ax.imshow(image)
        ax.axis('off')

        # Add labels
        ax.set_title(f"CF: {crop_factor:.2f}, Rot: {rotation_angle}°\n35mm: {focal_length_35mm:.0f} mm, f/{f_stop_35mm:.1f}\nAPS-C: {focal_length_aps_c:.0f} mm, f/{f_stop_aps_c:.1f}\nDist: {subject_distance_35mm:.2f} m", fontsize=8)

    # Remove unused subplots
    for i in range(num_images, num_rows * num_cols):
        row, col = divmod(i, num_cols)
        fig.delaxes(axes[row, col])

    # Adjust subplot spacing
    plt.subplots_adjust(wspace=0.0, hspace=0.0)

    plt.tight_layout(rect=[0, 0.00, 1, 2.25])
    plt.show()

# Randomly select some images for the sanity check. The sample size is capped
# at the number of available images, because random.sample raises ValueError
# when asked for more items than the population holds.
num_images_to_check = 6
selected_images = random.sample(image_files, min(num_images_to_check, len(image_files)))

# Display the selected images and their associated data
for img_path in selected_images:
    display_image_with_data(img_path, tensor_directory)

Verifica a distribuição resultante dos valores de f/stop e FL das imagens do dataset aumentado

In [None]:
import os
import torch
import matplotlib.pyplot as plt
import numpy as np
from concurrent.futures import ThreadPoolExecutor

# Function to round f-stop to the nearest full f-stop
def round_to_full_fstop(f_stop):
    """Return the full-stop value closest to ``f_stop`` (the smaller one on a tie)."""
    full_fstops = [1, 1.4, 2, 2.8, 4, 5.6, 8, 11, 16, 22, 32, 45, 64]
    nearest = full_fstops[0]
    for candidate in full_fstops[1:]:
        # Strict comparison keeps the earliest candidate when distances tie.
        if abs(candidate - f_stop) < abs(nearest - f_stop):
            nearest = candidate
    return nearest

# Function to process a chunk of tensors
def process_tensors_chunk(chunk, focal_lengths, f_stops, aperture_sizes, is_reporter=False):
    """Read the EXIF part of each tensor file in ``chunk`` and collect the denormalized values.

    Args:
        chunk: File names relative to ``tensor_directory``.
        focal_lengths: List extended with each denormalized focal length.
        f_stops: List extended with each denormalized f-stop.
        aperture_sizes: List extended with ``focal_length / f_stop``.
        is_reporter: When true, print progress roughly every 20% of the
            chunk, extrapolated to all workers.

    A file that cannot be loaded or converted is reported and skipped. All
    three values are computed before any list is touched, so a failure
    never leaves the lists with different lengths.
    """
    # max(1, ...) avoids a modulo-by-zero for chunks shorter than five files,
    # which used to be reported as a tensor load failure.
    report_every = max(1, len(chunk) // 5)  # Print every 20%

    for i, filename in enumerate(chunk):
        tensor_path = os.path.join(tensor_directory, filename)
        try:
            _, exif_tensor_norm = torch.load(tensor_path, map_location=torch.device('cpu'), weights_only=True)

            # Denormalize the values
            focal_length = denormalize_focal_length(exif_tensor_norm[0].item(), focal_lenght_min, focal_lenght_max)
            f_stop = denormalize_fstop(exif_tensor_norm[1].item(), fstop_min, fstop_max)
            aperture_size = focal_length / f_stop  # Physical aperture size
        except Exception as e:
            print(f"Falha ao carregar tensor: {tensor_path}, Erro: {e}")
            continue

        focal_lengths.append(focal_length)
        f_stops.append(f_stop)
        aperture_sizes.append(aperture_size)

        # Print status update if this is the reporter thread
        if is_reporter and i % report_every == 0:
            print(f"Processados {i * plotting_worker_count}/{len(chunk) * plotting_worker_count} tensors ({i / len(chunk) * 100:.0f}% completo)")

# List the tensor files once and split them into one chunk per worker.
# Chunks are built from the filtered list: slicing the raw directory listing
# could hand non-.pt files to the workers.
tensor_filenames = [f for f in os.listdir(tensor_directory) if f.endswith(".pt")]
total_tensors = len(tensor_filenames)
# Ceiling division, at least 1: a step of 0 would make range() raise when
# there are fewer tensors than workers.
chunk_size = max(1, -(-total_tensors // plotting_worker_count))  # Number of tensors per chunk
chunks = [tensor_filenames[i:i + chunk_size] for i in range(0, total_tensors, chunk_size)]

# Initialize lists to store results
f_stops = []
focal_lengths = []
aperture_sizes = []

# Process each chunk in parallel
with ThreadPoolExecutor(max_workers=plotting_worker_count) as executor:
    futures = []
    for i, chunk in enumerate(chunks):
        # Only the first thread is the reporter
        is_reporter = i == 0
        futures.append(executor.submit(process_tensors_chunk, chunk, focal_lengths, f_stops, aperture_sizes, is_reporter))
    # Wait for every worker and surface any exception it raised.
    for future in futures:
        future.result()

# Plot histograms: one figure with three panels (f-stop, focal length, pupil size)
plt.figure(figsize=(18, 6))

# Panel 1: histogram for f-stop with log scale (20 log-spaced edges, i.e. 19 bins)
plt.subplot(1, 3, 1)
plt.hist(f_stops, bins=np.logspace(np.log10(min(f_stops)), np.log10(max(f_stops)), 20), edgecolor='black', alpha=0.7)
plt.xscale('log')
plt.title('Image distribution by f/stop')
plt.xlabel('f/stop')
plt.ylabel('Number of images')

# Customize x-axis ticks for f-stop: label the conventional full stops
full_fstops = [1, 1.4, 2, 2.8, 4, 5.6, 8, 11, 16, 22, 32, 45, 64]
plt.xticks(full_fstops, [f"f/{f}" for f in full_fstops])

# Recompute the same histogram as arrays so a trend line can be derived from it
f_stops_sorted = np.sort(f_stops)
hist, bin_edges = np.histogram(f_stops_sorted, bins=np.logspace(np.log10(min(f_stops)), np.log10(max(f_stops)), 20))
bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2

# Moving average over the bin counts
window_size = 7  # Change this value to adjust smoothness
moving_avg = np.convolve(hist, np.ones(window_size)/window_size, mode='same')

# Polynomial fit (in log-x space) for a smoother curve
x = np.log(bin_centers)
y = moving_avg
coefficients = np.polyfit(x, y, deg=7)  # Fit a degree-7 polynomial
polynomial = np.poly1d(coefficients)
x_new = np.linspace(x.min(), x.max(), 100)  # Create more points for a smoother curve
y_new = polynomial(x_new)

# Plot the smoothed trend line, mapping x back from log space
plt.plot(np.exp(x_new), y_new, color='red', label='Moving average (smoothed)')
plt.legend()

# Panel 2: histogram for focal length with log scale (20 log-spaced edges, i.e. 19 bins)
plt.subplot(1, 3, 2)
plt.hist(focal_lengths, bins=np.logspace(np.log10(min(focal_lengths)), np.log10(max(focal_lengths)), 20), edgecolor='black', alpha=0.7)
plt.xscale('log')
plt.title('Image distribution by focal length')
plt.xlabel('Focal length (mm)')
plt.ylabel('Number of images')

# Customize x-axis ticks for focal length
plt.xticks([10, 20, 50, 100, 200, 500, 1000, 2000, 5000], [f"{x} mm" for x in [10, 20, 50, 100, 200, 500, 1000, 2000, 5000]])

# Recompute the same histogram as arrays so a trend line can be derived from it
focal_lengths_sorted = np.sort(focal_lengths)
hist_fl, bin_edges_fl = np.histogram(focal_lengths_sorted, bins=np.logspace(np.log10(min(focal_lengths)), np.log10(max(focal_lengths)), 20))
bin_centers_fl = (bin_edges_fl[:-1] + bin_edges_fl[1:]) / 2

# Moving average over the bin counts
window_size_fl = 7  # Change this value to adjust smoothness
moving_avg_fl = np.convolve(hist_fl, np.ones(window_size_fl)/window_size_fl, mode='same')

# Polynomial fit (in log-x space) for a smoother curve
x_fl = np.log(bin_centers_fl)
y_fl = moving_avg_fl
coefficients_fl = np.polyfit(x_fl, y_fl, deg=7)  # Fit a degree-7 polynomial
polynomial_fl = np.poly1d(coefficients_fl)
x_new_fl = np.linspace(x_fl.min(), x_fl.max(), 100)  # Create more points for a smoother curve
y_new_fl = polynomial_fl(x_new_fl)

# Plot the smoothed trend line, mapping x back from log space
plt.plot(np.exp(x_new_fl), y_new_fl, color='red', label='Moving average (smoothed)')
plt.legend()

# Panel 3: histogram for aperture (pupil) size with 1mm-wide bins on a linear axis
plt.subplot(1, 3, 3)
plt.hist(aperture_sizes, bins=np.arange(min(aperture_sizes), max(aperture_sizes) + 1, 1), edgecolor='black', alpha=0.7)
plt.title('Image distribution by pupil size')
plt.xlabel('Pupil size (mm)')
plt.ylabel('Number of images')

# Customize x-axis ticks for aperture size: 12 evenly spaced ticks over the data range
plt.xticks(np.linspace(min(aperture_sizes), max(aperture_sizes), 12))

# Recompute the same histogram as arrays so a trend line can be derived from it
aperture_sizes_sorted = np.sort(aperture_sizes)
hist_ap, bin_edges_ap = np.histogram(aperture_sizes_sorted, bins=np.arange(min(aperture_sizes), max(aperture_sizes) + 1, 1))
bin_centers_ap = (bin_edges_ap[:-1] + bin_edges_ap[1:]) / 2

# Moving average over the bin counts
window_size_ap = 7  # Change this value to adjust smoothness
moving_avg_ap = np.convolve(hist_ap, np.ones(window_size_ap)/window_size_ap, mode='same')

# Polynomial fit (linear x, unlike the two log-scaled panels) for a smoother curve
x_ap = bin_centers_ap
y_ap = moving_avg_ap
coefficients_ap = np.polyfit(x_ap, y_ap, deg=7)  # Fit a degree-7 polynomial
polynomial_ap = np.poly1d(coefficients_ap)
x_new_ap = np.linspace(x_ap.min(), x_ap.max(), 100)  # Create more points for a smoother curve
y_new_ap = polynomial_ap(x_new_ap)

# Plot the smoothed trend line
plt.plot(x_new_ap, y_new_ap, color='red', label='Moving average (smoothed)')
plt.legend()

plt.tight_layout()
plt.show()

Cria uma ResNet pré-treinada, adapta entrada e saída e inicia treinamento:

In [None]:
import gc
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torch.amp import GradScaler, autocast
import os
import pickle
from torchvision.transforms import functional as F
import random
import datetime

# Load dataset
dataset = TensorDataset(tensor_directory)

# Split dataset into training (70%), validation (15%) and test (remainder) sets
train_size = int(0.7 * len(dataset))
val_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

# Save the split indices so the test cell can rebuild exactly the same test subset
split_indices = {
    'train': train_dataset.indices,
    'val': val_dataset.indices,
    'test': test_dataset.indices
}
with open(split_indices_path, 'wb') as f:
    pickle.dump(split_indices, f)

# Create data loaders (num_workers must be 0)
train_loader = DataLoader(train_dataset, batch_size=dataloader_batch_size, shuffle=True, num_workers=0)
# Validation is not shuffled. The epoch metrics are averages of per-batch
# means, so reshuffling changed which samples landed in the short final
# batch and made the validation numbers vary for identical weights.
val_loader = DataLoader(val_dataset, batch_size=dataloader_batch_size, shuffle=False, num_workers=0)

# Initialize the model, loss function, optimizer and scheduler
model = HighResNetRegressor().cuda()
criterion = nn.L1Loss()  # Use MAE as the loss function
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)  # Add L2 regularization
scaler = GradScaler()
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.1)

def save_checkpoint(epoch, model, optimizer, scaler, loss, val_loss, checkpoint_dir):
    """Write a training checkpoint to ``checkpoint_dir``.

    The file is named ``checkpoint_epoch_<epoch + 1>.pth`` and stores the
    zero-based ``epoch``, the model, optimizer and scaler state dicts, and
    the given training and validation losses. The directory is created if
    needed.
    """
    os.makedirs(checkpoint_dir, exist_ok=True)
    state = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scaler_state_dict': scaler.state_dict(),
        'loss': loss,
        'val_loss': val_loss
    }
    target = os.path.join(checkpoint_dir, f'checkpoint_epoch_{epoch+1}.pth')
    torch.save(state, target)

def save_training_metadata(metadata, metadata_file):
    """Pickle ``metadata`` to the path ``metadata_file``, overwriting any existing file."""
    with open(metadata_file, 'wb') as handle:
        pickle.dump(metadata, handle)

# Early-stopping bookkeeping: best validation MAPE seen so far and the number
# of consecutive epochs without improvement.
best_val_mape = float('inf')
counter = 0

# Freeze all four residual stages of the backbone; only the remaining
# parameters are trained until stages get unfrozen.
for frozen_stage in (model.resnet.layer1, model.resnet.layer2, model.resnet.layer3, model.resnet.layer4):
    for param in frozen_stage.parameters():
        param.requires_grad = False

# Print trainable parameters before starting the training
print(f"Iniciando treinamento de ResNet{resnet_version}. Parâmetros treináveis:")
for name, param in model.named_parameters():
    if not param.requires_grad:
        continue
    print(f"{name}: {param.shape}")

# Per-epoch history, one list per metric, filled by the training loop
training_metadata = {
    key: []
    for key in (
        'epochs', 'train_losses', 'val_losses', 'train_mape', 'val_mape',
        'train_msle', 'val_msle', 'train_mse', 'val_mse',
        'start_times', 'end_times', 'durations',
    )
}

# NOTE: this overrides the `unfreeze_epoch` hyperparameter set in the
# configuration cell; the training loop mutates the name, so it is reset here.
unfreeze_epoch = 20
unfreeze_layers = [model.resnet.layer4, model.resnet.layer3]  # Layers to unfreeze sequentially
unfreeze_counter = 0

# Training loop: one training pass, one validation pass, then LR scheduling,
# checkpointing, adaptive unfreezing and early stopping.
for epoch in range(max_epochs):
    start_time = datetime.datetime.now()
    model.train()
    train_loss = 0.0
    train_mape = 0.0
    train_msle = 0.0
    train_mse = 0.0
    for i, (images, labels) in enumerate(train_loader):
        images = images.cuda().half()  # Images in FP16
        labels = labels.cuda().float()  # Labels stay in FP32

        optimizer.zero_grad()

        # Per-sample augmentations applied to the batch already on the GPU
        for j in range(images.size(0)):
            img = images[j]
            img = F.hflip(img) if random.random() > 0.5 else img # Random horizontal flip
            img = F.adjust_brightness(img, random.uniform(0.5, 2.0))
            img = F.adjust_contrast(img, random.uniform(0.75, 1.25))
            images[j] = img

        with autocast(device_type='cuda'):
            outputs = model(images)
            loss = criterion(outputs, labels[:, :2])  # Only focal length and f-stop are predicted

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        train_loss += loss.item()

        # Calculate MAPE, MSLE, and MSE on GPU
        mape = mean_absolute_percentage_error(labels[:, :2], outputs)
        msle = mean_squared_logarithmic_error(labels[:, :2], outputs)
        mse = nn.functional.mse_loss(outputs, labels[:, :2])
        train_mape += mape.item()
        train_msle += msle.item()
        train_mse += mse.item()

        if (i + 1) % 20 == 0:  # Print every 20 steps
            print(f'Step [{i + 1}/{len(train_loader)}], Loss: {loss.item():.5f}, MAPE: {mape.item():.2f}%, MSLE: {msle.item():.5f}, MSE: {mse.item():.5f}')

    # Epoch metrics are averages of the per-batch values
    train_loss /= len(train_loader)
    train_mape /= len(train_loader)
    train_msle /= len(train_loader)
    train_mse /= len(train_loader)
    training_metadata['epochs'].append(epoch + 1)
    training_metadata['train_losses'].append(train_loss)
    training_metadata['train_mape'].append(train_mape)
    training_metadata['train_msle'].append(train_msle)
    training_metadata['train_mse'].append(train_mse)

    # Validation step
    model.eval()
    val_loss = 0.0
    val_mape = 0.0
    val_msle = 0.0
    val_mse = 0.0
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.cuda().half()
            labels = labels.cuda().float()
            with torch.amp.autocast(device_type='cuda'):
                outputs = model(images)
                loss = criterion(outputs, labels[:, :2])  # Only focal length and f-stop are predicted

                # Calculate MAPE, MSLE, and MSE on GPU
                mape = mean_absolute_percentage_error(labels[:, :2], outputs)
                msle = mean_squared_logarithmic_error(labels[:, :2], outputs)
                mse = nn.functional.mse_loss(outputs, labels[:, :2])
                val_loss += loss.item()
                val_mape += mape.item()
                val_msle += msle.item()
                val_mse += mse.item()

    val_loss /= len(val_loader)
    val_mape /= len(val_loader)
    val_msle /= len(val_loader)
    val_mse /= len(val_loader)
    training_metadata['val_losses'].append(val_loss)
    training_metadata['val_mape'].append(val_mape)
    training_metadata['val_msle'].append(val_msle)
    training_metadata['val_mse'].append(val_mse)

    end_time = datetime.datetime.now()
    duration = end_time - start_time
    training_metadata['start_times'].append(start_time)
    training_metadata['end_times'].append(end_time)
    training_metadata['durations'].append(duration)

    print(f'{start_time.strftime("%Y-%m-%d %H:%M:%S")} - Época {epoch + 1}/{max_epochs} completa. Loss: Treino = {train_loss:.5f} / Validação = {val_loss:.5f} \tMAPE: Treino = {train_mape:.2f}% / Validação = {val_mape:.2f}% \tMSLE: Treino = {train_msle:.5f} / Validação = {val_msle:.5f} \tMSE: Treino = {train_mse:.5f} / Validação = {val_mse:.5f}')

    # Update learning rate scheduler
    scheduler.step(val_loss)

    # Track improvement exactly once per epoch. Previously two separate
    # blocks both updated best_val_mape and the patience counter: during the
    # unfreezing phase a stale epoch was counted twice, and an improving
    # epoch was immediately counted as stale by the second block.
    if val_mape < best_val_mape:
        best_val_mape = val_mape
        counter = 0
    else:
        counter += 1

    # Save checkpoint and metadata before any early exit, so the final epoch
    # is persisted as well.
    save_checkpoint(epoch, model, optimizer, scaler, train_loss, val_loss, checkpoint_dir)
    metadata_file = os.path.join(checkpoint_dir, f'training_metadata.pkl')
    save_training_metadata(training_metadata, metadata_file)

    if counter >= patience:
        if unfreeze_counter < len(unfreeze_layers):
            # Adaptive unfreezing: once past unfreeze_epoch, a plateau
            # unfreezes the next stage instead of stopping.
            if epoch >= unfreeze_epoch:
                for param in unfreeze_layers[unfreeze_counter].parameters():
                    param.requires_grad = True
                unfreeze_counter += 1
                unfreeze_epoch = epoch + 10  # Wait for 10 more epochs before unfreezing the next layer
                print(f"Descongelando camada {5 - unfreeze_counter} na época {epoch + 1}")
                counter = 0  # Reset counter after unfreezing
        else:
            # Early stopping only applies after every listed stage is unfrozen.
            print("Parada adiantada.")
            break

print("Treinamento finalizado. Melhor Validation MAPE: {:.2f}%".format(best_val_mape))
# Drop the references that keep the model, optimizer state and scaler alive.
# The scheduler is deleted too: it holds a reference to the optimizer, which
# in turn references every model parameter, so leaving it behind kept that
# memory allocated.
# NOTE: any other live reference (a module-level backbone object, leftover
# batch tensors from the loop) also keeps its memory allocated.
del model
del optimizer
del scaler
del scheduler

# Collect garbage first so the caching allocator can actually return the
# freed blocks to the driver.
gc.collect()
torch.cuda.empty_cache()

# Reset the peak-memory statistics (this only affects the counters, it does
# not free memory).
torch.cuda.reset_peak_memory_stats()

print("Modelo e otimizador descarregados da VRAM.")

Cria metadados de resultados de teste para cada época treinada acima:

In [None]:
import gc
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Subset
import pickle
import datetime
import os

# Restore the persisted dataset split (a pickled mapping read from split_indices_path).
with open(split_indices_path, 'rb') as split_file:
    split_indices = pickle.load(split_file)

# Build a loader restricted to the indices stored under the 'test' key.
dataset = TensorDataset(tensor_directory)
test_indices = split_indices['test']
test_dataset = Subset(dataset, test_indices)
test_loader = DataLoader(
    test_dataset,
    batch_size=128,
    shuffle=False,
    num_workers=0,
)

# Single model instance, placed on the GPU, that each checkpoint is loaded into.
model = HighResNetRegressor().cuda()

# L1 criterion reported as the test "loss".
criterion = nn.L1Loss()

# One list per tracked quantity; entries are appended as checkpoints are evaluated.
testing_metadata = {
    key: []
    for key in (
        'epochs', 'test_losses', 'test_mape', 'test_msle', 'test_mse',
        'start_times', 'end_times', 'durations',
    )
}

# Function to evaluate the model on the test set
def evaluate_model(epoch):
    """Score the global `model` on the global `test_loader` and record the result.

    Each metric is computed per batch and the per-batch values are averaged over
    the number of batches. The averages are appended to the global
    `testing_metadata` under 'test_losses', 'test_mape', 'test_msle' and
    'test_mse', together with `epoch + 1` under 'epochs'.

    Args:
        epoch: zero-based epoch index of the checkpoint currently loaded.

    Returns:
        Tuple (test_loss, test_mape, test_msle, test_mse) of batch-averaged floats.
    """
    model.eval()
    totals = {'loss': 0.0, 'mape': 0.0, 'msle': 0.0, 'mse': 0.0}

    with torch.no_grad():
        for batch_images, batch_labels in test_loader:
            batch_images = batch_images.cuda().half()
            batch_labels = batch_labels.cuda().float()
            with torch.amp.autocast(device_type='cuda'):
                predictions = model(batch_images)
                # Only the first two label columns (focal length, f-stop) are scored.
                targets = batch_labels[:, :2]
                batch_metrics = {
                    'loss': criterion(predictions, targets),
                    'mape': mean_absolute_percentage_error(targets, predictions),
                    'msle': mean_squared_logarithmic_error(targets, predictions),
                    'mse': nn.functional.mse_loss(predictions, targets),
                }
                for metric_name, metric_value in batch_metrics.items():
                    totals[metric_name] += metric_value.item()

    # Average the per-batch values over the number of batches.
    batch_count = len(test_loader)
    test_loss, test_mape, test_msle, test_mse = (
        totals[metric_name] / batch_count
        for metric_name in ('loss', 'mape', 'msle', 'mse')
    )

    recorded = (
        ('epochs', epoch + 1),
        ('test_losses', test_loss),
        ('test_mape', test_mape),
        ('test_msle', test_msle),
        ('test_mse', test_mse),
    )
    for key, value in recorded:
        testing_metadata[key].append(value)

    return test_loss, test_mape, test_msle, test_mse

# Function to save testing metadata
def save_testing_metadata(metadata, metadata_file):
    """Pickle `metadata` to the path `metadata_file`, replacing any existing file.

    Args:
        metadata: object to serialize (the testing-metadata dict in this notebook).
        metadata_file: destination path, opened in binary write mode.
    """
    with open(metadata_file, 'wb') as sink:
        pickle.dump(metadata, sink)

# Evaluate the model on the test set after each epoch during training
for epoch in range(max_epochs):
    start_time = datetime.datetime.now()

    checkpoint_path = os.path.join(checkpoint_dir, f'checkpoint_epoch_{epoch+1}.pth')
    if not os.path.exists(checkpoint_path):
        # No checkpoint file for this epoch: report it and move on to the next one.
        print(f"Checkpoint not found for epoch {epoch+1}. Skipping evaluation.")
        continue

    # Restore this epoch's weights into the shared model instance.
    checkpoint = torch.load(checkpoint_path, weights_only=True)
    model.load_state_dict(checkpoint['model_state_dict'])

    test_loss, test_mape, test_msle, test_mse = evaluate_model(epoch)

    # Timing covers checkpoint loading plus evaluation.
    end_time = datetime.datetime.now()
    duration = end_time - start_time
    for key, value in (('start_times', start_time), ('end_times', end_time), ('durations', duration)):
        testing_metadata[key].append(value)

    print(f'{start_time.strftime("%Y-%m-%d %H:%M:%S")} - Época {epoch + 1}/{max_epochs} completa. Test Loss: {test_loss:.5f} \tTest MAPE: {test_mape:.2f}% \tTest MSLE: {test_msle:.5f} \tTest MSE: {test_mse:.5f}')

    # The metadata file is rewritten after every evaluated epoch.
    metadata_file = os.path.join(checkpoint_dir, 'testing_metadata.pkl')
    save_testing_metadata(testing_metadata, metadata_file)

# Drop the notebook's reference to the model so its CUDA tensors can be freed.
del model

# Collect first: only once Python has really destroyed the objects (including any that were
# kept alive by reference cycles) can the caching allocator give their blocks back.
gc.collect()
torch.cuda.empty_cache()
# This only resets the peak-memory statistics; it does not free any memory.
torch.cuda.reset_peak_memory_stats()

print("Avaliação no conjunto de teste concluída.")

Gera gráficos de loss, MAPE, MSLE e MSE por época:

Em seguida, testa o modelo com o melhor checkpoint disponível segundo cada métrica:

In [None]:
import matplotlib.pyplot as plt
import pickle
import os

# Read back the per-epoch training/validation history.
metadata_file = os.path.join(checkpoint_dir, 'training_metadata.pkl')
with open(metadata_file, 'rb') as training_file:
    training_metadata = pickle.load(training_file)

# Read back the per-epoch test history.
testing_metadata_file = os.path.join(checkpoint_dir, 'testing_metadata.pkl')
with open(testing_metadata_file, 'rb') as testing_file:
    testing_metadata = pickle.load(testing_file)

# Unpack every series into its own name.
epochs = training_metadata['epochs']
(train_losses, val_losses,
 train_mape, val_mape,
 train_msle, val_msle,
 train_mse, val_mse) = (
    training_metadata[key]
    for key in ('train_losses', 'val_losses', 'train_mape', 'val_mape',
                'train_msle', 'val_msle', 'train_mse', 'val_mse')
)
test_losses, test_mape, test_msle, test_mse = (
    testing_metadata[key]
    for key in ('test_losses', 'test_mape', 'test_msle', 'test_mse')
)


# Number of initial epochs to skip
skip_epochs = 0  # Adjust this number as needed

# The test history can be shorter than the training history (the last trained epoch
# may have no test entry), so every series is cut to the common length.
# The end index is computed once, BEFORE any list is re-bound: slicing with len(epochs)
# after `epochs` itself had been sliced dropped `skip_epochs` extra entries from the tail
# of every other series whenever skip_epochs > 0, leaving them shorter than `epochs`.
series_end = min(len(epochs), len(test_losses))

epochs = epochs[skip_epochs:series_end]
train_losses = train_losses[skip_epochs:series_end]
val_losses = val_losses[skip_epochs:series_end]
test_losses = test_losses[skip_epochs:series_end]
train_mape = train_mape[skip_epochs:series_end]
val_mape = val_mape[skip_epochs:series_end]
test_mape = test_mape[skip_epochs:series_end]
train_msle = train_msle[skip_epochs:series_end]
val_msle = val_msle[skip_epochs:series_end]
test_msle = test_msle[skip_epochs:series_end]
train_mse = train_mse[skip_epochs:series_end]
val_mse = val_mse[skip_epochs:series_end]
test_mse = test_mse[skip_epochs:series_end]

# Position of the lowest test value for each metric (first occurrence on ties).
best_epoch_idx_loss = test_losses.index(min(test_losses))
best_epoch_idx_mape = test_mape.index(min(test_mape))
best_epoch_idx_msle = test_msle.index(min(test_msle))
best_epoch_idx_mse = test_mse.index(min(test_mse))

# Every series, in exactly the order the names are unpacked below:
# epoch number, then train/val/test for loss, MAPE, MSLE and MSE.
metric_series = (
    epochs,
    train_losses, val_losses, test_losses,
    train_mape, val_mape, test_mape,
    train_msle, val_msle, test_msle,
    train_mse, val_mse, test_mse,
)

# Values recorded at the epoch with the lowest test loss.
(best_epoch_loss,
 best_train_loss_loss, best_val_loss_loss, best_test_loss_loss,
 best_train_mape_loss, best_val_mape_loss, best_test_mape_loss,
 best_train_msle_loss, best_val_msle_loss, best_test_msle_loss,
 best_train_mse_loss, best_val_mse_loss, best_test_mse_loss) = (
    series[best_epoch_idx_loss] for series in metric_series
)

# Values recorded at the epoch with the lowest test MAPE.
(best_epoch_mape,
 best_train_loss_mape, best_val_loss_mape, best_test_loss_mape,
 best_train_mape_mape, best_val_mape_mape, best_test_mape_mape,
 best_train_msle_mape, best_val_msle_mape, best_test_msle_mape,
 best_train_mse_mape, best_val_mse_mape, best_test_mse_mape) = (
    series[best_epoch_idx_mape] for series in metric_series
)

# Values recorded at the epoch with the lowest test MSLE.
(best_epoch_msle,
 best_train_loss_msle, best_val_loss_msle, best_test_loss_msle,
 best_train_mape_msle, best_val_mape_msle, best_test_mape_msle,
 best_train_msle_msle, best_val_msle_msle, best_test_msle_msle,
 best_train_mse_msle, best_val_mse_msle, best_test_mse_msle) = (
    series[best_epoch_idx_msle] for series in metric_series
)

# Values recorded at the epoch with the lowest test MSE.
(best_epoch_mse,
 best_train_loss_mse, best_val_loss_mse, best_test_loss_mse,
 best_train_mape_mse, best_val_mape_mse, best_test_mape_mse,
 best_train_msle_mse, best_val_msle_mse, best_test_msle_mse,
 best_train_mse_mse, best_val_mse_mse, best_test_mse_mse) = (
    series[best_epoch_idx_mse] for series in metric_series
)

# Print, for each selection metric, the epoch with the lowest test value of that metric
# followed by all four metrics (train / validation / test) recorded at that epoch.

# Epoch selected by lowest test MAPE.
print(f"Melhor época por MAPE (Teste): {best_epoch_mape}")
print(f"Loss:\t Treino = {best_train_loss_mape:.5f} \t Validação = {best_val_loss_mape:.5f} \t Teste = {best_test_loss_mape:.5f}")
print(f"MAPE:\t Treino = {best_train_mape_mape:.4f}% \t Validação = {best_val_mape_mape:.4f}% \t Teste = {best_test_mape_mape:.4f}%")
print(f"MSLE:\t Treino = {best_train_msle_mape:.5f} \t Validação = {best_val_msle_mape:.5f} \t Teste = {best_test_msle_mape:.5f}")
print(f"MSE:\t Treino = {best_train_mse_mape:.5f} \t Validação = {best_val_mse_mape:.5f} \t Teste = {best_test_mse_mape:.5f}")

# Epoch selected by lowest test loss.
print(f"\nMelhor época por Loss (Teste): {best_epoch_loss}")
print(f"Loss:\t Treino = {best_train_loss_loss:.5f} \t Validação = {best_val_loss_loss:.5f} \t Teste = {best_test_loss_loss:.5f}")
print(f"MAPE:\t Treino = {best_train_mape_loss:.4f}% \t Validação = {best_val_mape_loss:.4f}% \t Teste = {best_test_mape_loss:.4f}%")
print(f"MSLE:\t Treino = {best_train_msle_loss:.5f} \t Validação = {best_val_msle_loss:.5f} \t Teste = {best_test_msle_loss:.5f}")
print(f"MSE:\t Treino = {best_train_mse_loss:.5f} \t Validação = {best_val_mse_loss:.5f} \t Teste = {best_test_mse_loss:.5f}")

# Epoch selected by lowest test MSLE.
print(f"\nMelhor época por MSLE (Teste): {best_epoch_msle}")
print(f"Loss:\t Treino = {best_train_loss_msle:.5f} \t Validação = {best_val_loss_msle:.5f} \t Teste = {best_test_loss_msle:.5f}")
print(f"MAPE:\t Treino = {best_train_mape_msle:.4f}% \t Validação = {best_val_mape_msle:.4f}% \t Teste = {best_test_mape_msle:.4f}%")
print(f"MSLE:\t Treino = {best_train_msle_msle:.5f} \t Validação = {best_val_msle_msle:.5f} \t Teste = {best_test_msle_msle:.5f}")
print(f"MSE:\t Treino = {best_train_mse_msle:.5f} \t Validação = {best_val_mse_msle:.5f} \t Teste = {best_test_mse_msle:.5f}")

# Epoch selected by lowest test MSE.
print(f"\nMelhor época por MSE (Teste): {best_epoch_mse}")
print(f"Loss:\t Treino = {best_train_loss_mse:.5f} \t Validação = {best_val_loss_mse:.5f} \t Teste = {best_test_loss_mse:.5f}")
print(f"MAPE:\t Treino = {best_train_mape_mse:.4f}% \t Validação = {best_val_mape_mse:.4f}% \t Teste = {best_test_mape_mse:.4f}%")
print(f"MSLE:\t Treino = {best_train_msle_mse:.5f} \t Validação = {best_val_msle_mse:.5f} \t Teste = {best_test_msle_mse:.5f}")
print(f"MSE:\t Treino = {best_train_mse_mse:.5f} \t Validação = {best_val_mse_mse:.5f} \t Teste = {best_test_mse_mse:.5f}")

# One 2x2 figure: loss, MAPE, MSLE and MSE curves for train/validation/test,
# each with a large red marker on the epoch that has the lowest test value.
plt.figure(figsize=(18, 12))

# (legend name, y-axis label, title, title font size,
#  train series, validation series, test series, best epoch, test value at best epoch)
panel_specs = [
    ('Loss', 'MAE Loss', 'MAE Loss per Epoch', 16,
     train_losses, val_losses, test_losses, best_epoch_loss, best_test_loss_loss),
    ('MAPE', 'MAPE (%)', 'MAPE per Epoch', 18,
     train_mape, val_mape, test_mape, best_epoch_mape, best_test_mape_mape),
    ('MSLE', 'MSLE', 'MSLE per Epoch', 18,
     train_msle, val_msle, test_msle, best_epoch_msle, best_test_msle_msle),
    ('MSE', 'MSE', 'MSE per Epoch', 18,
     train_mse, val_mse, test_mse, best_epoch_mse, best_test_mse_mse),
]

for panel_slot, panel in enumerate(panel_specs, start=1):
    (metric_name, y_label, panel_title, title_size,
     train_series, val_series, test_series, marked_epoch, marked_value) = panel

    plt.subplot(2, 2, panel_slot)
    plt.plot(epochs, train_series, label=f'Training {metric_name}', marker='o')
    plt.plot(epochs, val_series, label=f'Validation {metric_name}', marker='x')
    plt.plot(epochs, test_series, label=f'Test {metric_name}', marker='^')
    plt.plot(marked_epoch, marked_value, 'ro', label=f'Best Epoch by {metric_name}', markersize=12)
    plt.xlabel('Epochs', fontsize=16)
    plt.ylabel(y_label, fontsize=16)
    plt.title(panel_title, fontsize=title_size)
    plt.legend(fontsize=12)
    plt.grid(True)

plt.tight_layout()
plt.show()

In [None]:
import gc
import matplotlib.pyplot as plt
import pickle
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Subset
from torchvision import transforms
import numpy as np

# Read back the per-epoch training/validation history.
metadata_file = os.path.join(checkpoint_dir, 'training_metadata.pkl')
with open(metadata_file, 'rb') as training_file:
    training_metadata = pickle.load(training_file)

# Read back the per-epoch test history.
testing_metadata_file = os.path.join(checkpoint_dir, 'testing_metadata.pkl')
with open(testing_metadata_file, 'rb') as testing_file:
    testing_metadata = pickle.load(testing_file)

# Function to evaluate the model and return individual errors
def evaluate_model(model, test_loader, criterion):
    """Run `model` over `test_loader` and collect per-sample results.

    Prints the batch-averaged loss and the mean focal-length and f-stop
    percentage errors (as computed by calculate_percentage_error).

    Args:
        model: network to evaluate; it is switched to eval mode.
        test_loader: iterable yielding (images, labels) batches.
        criterion: loss applied to the outputs and the first two label columns.

    Returns:
        Tuple (all_images, all_real_metadata, all_inferred_metadata,
        focal_length_errors, fstop_errors): a list holding every evaluated
        image (copied back to the CPU) followed by four NumPy arrays.
    """
    model.eval()
    running_loss = 0.0
    kept_images = []
    real_rows, inferred_rows = [], []
    fl_errors, fs_errors = [], []

    with torch.no_grad():
        for images, labels in test_loader:
            images = images.cuda().half()
            labels = labels.cuda().float()
            with torch.amp.autocast(device_type='cuda'):
                outputs = model(images)
                # Only the first two label columns (focal length, f-stop) enter the loss.
                running_loss += criterion(outputs, labels[:, :2]).item()

                real_rows.extend(labels.cpu().numpy())
                inferred_rows.extend(outputs.cpu().numpy())
                # Column 0 is compared as focal length, column 1 as f-stop.
                fl_errors.extend(
                    calculate_percentage_error(labels[:, 0].cpu().numpy(), outputs[:, 0].cpu().numpy())
                )
                fs_errors.extend(
                    calculate_percentage_error(labels[:, 1].cpu().numpy(), outputs[:, 1].cpu().numpy())
                )
                kept_images.extend(images.cpu())

    test_loss = running_loss / len(test_loader)
    print(f'Test Loss: {test_loss:.5f}')

    all_real_metadata = np.array(real_rows)
    all_inferred_metadata = np.array(inferred_rows)
    focal_length_errors = np.array(fl_errors)
    fstop_errors = np.array(fs_errors)

    print(f'Erro percentual médio - FL: {np.mean(focal_length_errors):.2f}%')
    print(f'Erro percentual médio - f/stop: {np.mean(fstop_errors):.2f}%')

    return kept_images, all_real_metadata, all_inferred_metadata, focal_length_errors, fstop_errors

# Function to visualize the results in a grid format
def visualize_results(images, real_metadata, inferred_metadata, num_samples=32, title=""):
    """Show up to `num_samples` images in a 6-column grid with real vs. inferred labels.

    Args:
        images: sequence of CHW image tensors.
        real_metadata: per-image rows whose first two entries are the normalized
            focal length and f-stop.
        inferred_metadata: per-image rows with the model's normalized predictions.
        num_samples: maximum number of images to draw.
        title: figure-level title.
    """
    num_cols = 6  # Number of columns in the subplot grid
    num_images = min(num_samples, len(images))
    num_rows = -(-num_images // num_cols)  # ceiling division

    fig, axes = plt.subplots(num_rows, num_cols, figsize=(24, 12 * num_rows // num_cols))
    fig.suptitle(title, fontsize=16, y=1.05)  # y > 1 lifts the title above the grid

    # plt.subplots returns a 1-D array for a single row and a 2-D array otherwise;
    # ravel() yields the cells in row-major order in both cases.
    cells = axes.ravel()
    to_pil = transforms.ToPILImage()

    for i, ax in enumerate(cells[:num_images]):
        # CHW tensor -> HWC float32 array -> PIL image
        pixels = images[i].numpy().transpose(1, 2, 0).astype('float32')
        pil_image = to_pil(pixels)

        real_focal_length = denormalize_focal_length(real_metadata[i][0], focal_lenght_min, focal_lenght_max)
        real_fstop = denormalize_fstop(real_metadata[i][1], fstop_min, fstop_max)
        inferred_focal_length = denormalize_focal_length(inferred_metadata[i][0], focal_lenght_min, focal_lenght_max)
        inferred_fstop = denormalize_fstop(inferred_metadata[i][1], fstop_min, fstop_max)

        # Clamp the predictions to the configured ranges before displaying them.
        inferred_focal_length = max(focal_lenght_min, min(focal_lenght_max, inferred_focal_length))
        inferred_fstop = max(fstop_min, min(fstop_max, inferred_fstop))

        ax.imshow(pil_image)
        ax.axis('off')
        ax.set_title(f"Real: FL={real_focal_length:.0f}mm, f/{real_fstop:.1f}\nInferred: FL={inferred_focal_length:.0f}mm, f/{inferred_fstop:.1f}", fontsize=13)

    # Remove the grid cells that received no image.
    for unused_ax in cells[num_images:]:
        fig.delaxes(unused_ax)

    # Adjust subplot spacing
    plt.subplots_adjust(wspace=0.05, hspace=0.05)
    plt.tight_layout(rect=[0, 0.00, 1, 2.25])
    plt.show()

# Function to load and evaluate the model
def load_and_evaluate_model(checkpoint_path, test_loader):
    """Load a checkpoint into a fresh HighResNetRegressor, evaluate it, then free the model.

    Args:
        checkpoint_path: path of a checkpoint containing a 'model_state_dict' entry.
        test_loader: loader passed through to evaluate_model.

    Returns:
        Tuple (all_images, all_real_metadata, all_inferred_metadata,
        focal_length_errors, fstop_errors) as returned by evaluate_model.
    """
    # The checkpoint is deserialized on the CPU; load_state_dict copies the
    # weights into the model that already lives on the GPU.
    checkpoint = torch.load(checkpoint_path, map_location=torch.device('cpu'), weights_only=True)
    model = HighResNetRegressor().cuda()
    model.load_state_dict(checkpoint['model_state_dict'])
    criterion = nn.L1Loss()

    all_images, all_real_metadata, all_inferred_metadata, focal_length_errors, fstop_errors = evaluate_model(model, test_loader, criterion)

    # Release the model and the checkpoint dict. Garbage collection must run BEFORE the
    # cache is emptied: only blocks whose tensors are already destroyed can be returned.
    del model
    del checkpoint
    gc.collect()
    torch.cuda.empty_cache()
    # Only resets the peak-memory statistics; it does not free memory.
    torch.cuda.reset_peak_memory_stats()

    return all_images, all_real_metadata, all_inferred_metadata, focal_length_errors, fstop_errors

# Extract the data
epochs = training_metadata['epochs']
train_losses = training_metadata['train_losses']
val_losses = training_metadata['val_losses']
test_losses = testing_metadata['test_losses']
train_mape = training_metadata['train_mape']
val_mape = training_metadata['val_mape']
test_mape = testing_metadata['test_mape']
train_msle = training_metadata['train_msle']
val_msle = training_metadata['val_msle']
test_msle = testing_metadata['test_msle']

# Number of initial epochs to skip
skip_epochs = 0  # Adjust this number as needed

# Slice the data to skip the initial epochs.
# The end index is captured once, BEFORE `epochs` is re-bound: slicing the other series
# with len(epochs) after `epochs` had already been shortened dropped `skip_epochs` extra
# entries from their tails whenever skip_epochs > 0 (and could hide the best epoch).
total_epochs = len(epochs)
epochs = epochs[skip_epochs:total_epochs]
train_losses = train_losses[skip_epochs:total_epochs]
val_losses = val_losses[skip_epochs:total_epochs]
test_losses = test_losses[skip_epochs:total_epochs]
train_mape = train_mape[skip_epochs:total_epochs]
val_mape = val_mape[skip_epochs:total_epochs]
test_mape = test_mape[skip_epochs:total_epochs]
train_msle = train_msle[skip_epochs:total_epochs]
val_msle = val_msle[skip_epochs:total_epochs]
test_msle = test_msle[skip_epochs:total_epochs]

# Identify the best epoch based on the lowest test MAPE, Loss, and MSLE.
# All series start at the same offset, so a position in a test series is also
# the position of the matching epoch number in `epochs`.
best_epoch_idx_mape = test_mape.index(min(test_mape))
best_epoch_mape = epochs[best_epoch_idx_mape]

best_epoch_idx_loss = test_losses.index(min(test_losses))
best_epoch_loss = epochs[best_epoch_idx_loss]

best_epoch_idx_msle = test_msle.index(min(test_msle))
best_epoch_msle = epochs[best_epoch_idx_msle]

# Restore the persisted dataset split (a pickled mapping read from split_indices_path).
with open(split_indices_path, 'rb') as split_file:
    split_indices = pickle.load(split_file)

# Build a loader restricted to the indices stored under the 'test' key.
dataset = TensorDataset(tensor_directory)
test_indices = split_indices['test']
test_dataset = Subset(dataset, test_indices)
test_loader = DataLoader(
    test_dataset,
    batch_size=128,
    shuffle=False,
    num_workers=0,
)

# Score the checkpoint of the epoch with the lowest test MAPE.
checkpoint_path_mape = os.path.join(checkpoint_dir, f'checkpoint_epoch_{best_epoch_mape}.pth')
print(f"\nResNet{resnet_version}: \nAvaliando melhor época por MAPE (Teste): {best_epoch_mape}")
(
    all_images_mape,
    all_real_metadata_mape,
    all_inferred_metadata_mape,
    focal_length_errors_mape,
    fstop_errors_mape,
) = load_and_evaluate_model(checkpoint_path_mape, test_loader)

# Rank the samples by the element-wise sum of their focal-length and f-stop errors
# (ascending): the first five are shown as "best", the last five as "worst".
combined_errors_mape = focal_length_errors_mape + fstop_errors_mape
sorted_indices_mape = np.argsort(combined_errors_mape)
best_indices_mape, worst_indices_mape = sorted_indices_mape[:5], sorted_indices_mape[-5:]

print("\nMelhores resultados por MAPE (Teste):")
visualize_results(
    [all_images_mape[i] for i in best_indices_mape],
    all_real_metadata_mape[best_indices_mape],
    all_inferred_metadata_mape[best_indices_mape],
    num_samples=5,
)
print("\nPiores resultados por MAPE (Teste):")
visualize_results(
    [all_images_mape[i] for i in worst_indices_mape],
    all_real_metadata_mape[worst_indices_mape],
    all_inferred_metadata_mape[worst_indices_mape],
    num_samples=5,
)
# The assorted overview (first 32 test samples) is drawn only for this checkpoint.
print("\nResultados sortidos por MAPE (Teste):")
visualize_results(
    all_images_mape,
    all_real_metadata_mape,
    all_inferred_metadata_mape,
    num_samples=32,
)

# Score the checkpoint of the epoch with the lowest test loss.
checkpoint_path_loss = os.path.join(checkpoint_dir, f'checkpoint_epoch_{best_epoch_loss}.pth')
print(f"\nAvaliando melhor época por Loss (Teste): {best_epoch_loss}")
(
    all_images_loss,
    all_real_metadata_loss,
    all_inferred_metadata_loss,
    focal_length_errors_loss,
    fstop_errors_loss,
) = load_and_evaluate_model(checkpoint_path_loss, test_loader)

# Rank the samples by the element-wise sum of their focal-length and f-stop errors
# (ascending): the first five are shown as "best", the last five as "worst".
combined_errors_loss = focal_length_errors_loss + fstop_errors_loss
sorted_indices_loss = np.argsort(combined_errors_loss)
best_indices_loss, worst_indices_loss = sorted_indices_loss[:5], sorted_indices_loss[-5:]

print("\nMelhores resultados por Loss (Teste):")
visualize_results(
    [all_images_loss[i] for i in best_indices_loss],
    all_real_metadata_loss[best_indices_loss],
    all_inferred_metadata_loss[best_indices_loss],
    num_samples=5,
)
print("\nPiores resultados por Loss (Teste):")
visualize_results(
    [all_images_loss[i] for i in worst_indices_loss],
    all_real_metadata_loss[worst_indices_loss],
    all_inferred_metadata_loss[worst_indices_loss],
    num_samples=5,
)

# Score the checkpoint of the epoch with the lowest test MSLE.
checkpoint_path_msle = os.path.join(checkpoint_dir, f'checkpoint_epoch_{best_epoch_msle}.pth')
print(f"\nAvaliando melhor época por MSLE (Teste): {best_epoch_msle}")
(
    all_images_msle,
    all_real_metadata_msle,
    all_inferred_metadata_msle,
    focal_length_errors_msle,
    fstop_errors_msle,
) = load_and_evaluate_model(checkpoint_path_msle, test_loader)

# Rank the samples by the element-wise sum of their focal-length and f-stop errors
# (ascending): the first five are shown as "best", the last five as "worst".
combined_errors_msle = focal_length_errors_msle + fstop_errors_msle
sorted_indices_msle = np.argsort(combined_errors_msle)
best_indices_msle, worst_indices_msle = sorted_indices_msle[:5], sorted_indices_msle[-5:]

print("\nMelhores resultados por MSLE (Teste):")
visualize_results(
    [all_images_msle[i] for i in best_indices_msle],
    all_real_metadata_msle[best_indices_msle],
    all_inferred_metadata_msle[best_indices_msle],
    num_samples=5,
)
print("\nPiores resultados por MSLE (Teste):")
visualize_results(
    [all_images_msle[i] for i in worst_indices_msle],
    all_real_metadata_msle[worst_indices_msle],
    all_inferred_metadata_msle[worst_indices_msle],
    num_samples=5,
)

# Score the checkpoint with the highest epoch number found in checkpoint_dir.
# The epoch number is the text between the last '_' and the first following '.'
# of every file whose name starts with 'checkpoint_epoch_'.
checkpoint_epochs = (
    int(filename.split('_')[-1].split('.')[0])
    for filename in os.listdir(checkpoint_dir)
    if filename.startswith('checkpoint_epoch_')
)
last_epoch = max(checkpoint_epochs)
checkpoint_path_last = os.path.join(checkpoint_dir, f'checkpoint_epoch_{last_epoch}.pth')
print(f"\nAvaliando última época encontrada: {last_epoch}")
(
    all_images_last,
    all_real_metadata_last,
    all_inferred_metadata_last,
    focal_length_errors_last,
    fstop_errors_last,
) = load_and_evaluate_model(checkpoint_path_last, test_loader)

# Rank the samples by the element-wise sum of their focal-length and f-stop errors
# (ascending): the first five are shown as "best", the last five as "worst".
combined_errors_last = focal_length_errors_last + fstop_errors_last
sorted_indices_last = np.argsort(combined_errors_last)
best_indices_last, worst_indices_last = sorted_indices_last[:5], sorted_indices_last[-5:]

print("\nMelhores resultados por last (Teste):")
visualize_results(
    [all_images_last[i] for i in best_indices_last],
    all_real_metadata_last[best_indices_last],
    all_inferred_metadata_last[best_indices_last],
    num_samples=5,
)
print("\nPiores resultados por last (Teste):")
visualize_results(
    [all_images_last[i] for i in worst_indices_last],
    all_real_metadata_last[worst_indices_last],
    all_inferred_metadata_last[worst_indices_last],
    num_samples=5,
)



Error trends, residuals, etc.:

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Scatter the per-sample percentage errors of the best-MAPE checkpoint against the
# true (denormalized) focal length and f-stop, both on logarithmic x axes.
focal_length_ticks = [16, 20, 28, 35, 50, 70, 85, 105, 135, 200, 250, 300, 400, 500, 600]
fstop_ticks = [2, 2.8, 4, 5.6, 8, 11, 16]

plt.figure(figsize=(18, 6))

# Left panel: focal-length error vs. true focal length.
plt.subplot(1, 2, 1)
real_focal_lengths_mape = [
    denormalize_focal_length(val, focal_lenght_min, focal_lenght_max)
    for val in all_real_metadata_mape[:, 0]
]
plt.scatter(real_focal_lengths_mape, focal_length_errors_mape, alpha=0.5)
plt.xscale('log')
plt.xlabel('True Focal Length (35mm-equiv)')
plt.ylabel('Percentage Error (%)')
plt.title('Focal Length Error Trend')
plt.xticks(focal_length_ticks, [f"{x}" for x in focal_length_ticks])

# Right panel: f-stop error vs. true f-stop.
plt.subplot(1, 2, 2)
real_fstops_mape = [
    denormalize_fstop(val, fstop_min, fstop_max)
    for val in all_real_metadata_mape[:, 1]
]
plt.scatter(real_fstops_mape, fstop_errors_mape, alpha=0.5)
plt.xscale('log')
plt.xlabel('True f-stop')
plt.ylabel('Percentage Error (%)')
plt.title('f-stop Error Trend')
plt.xticks(fstop_ticks, [f"f/{f}" for f in fstop_ticks])

plt.tight_layout()
plt.show()

# Residuals (true minus predicted) in denormalized units for the best-MAPE checkpoint.
real_focal_lengths_mape = [
    denormalize_focal_length(val, focal_lenght_min, focal_lenght_max)
    for val in all_real_metadata_mape[:, 0]
]
predicted_focal_lengths_mape = [
    denormalize_focal_length(val, focal_lenght_min, focal_lenght_max)
    for val in all_inferred_metadata_mape[:, 0]
]
residuals_fl = np.array(real_focal_lengths_mape) - np.array(predicted_focal_lengths_mape)

real_fstops_mape = [
    denormalize_fstop(val, fstop_min, fstop_max)
    for val in all_real_metadata_mape[:, 1]
]
predicted_fstops_mape = [
    denormalize_fstop(val, fstop_min, fstop_max)
    for val in all_inferred_metadata_mape[:, 1]
]
residuals_fstop = np.array(real_fstops_mape) - np.array(predicted_fstops_mape)

# Scatter the residuals against the true values, on logarithmic x axes.
residual_fl_ticks = [16, 20, 28, 35, 50, 70, 85, 105, 135, 200, 250, 300, 400, 500, 600]
residual_fstop_ticks = [2, 2.8, 4, 5.6, 8, 11, 16]

plt.figure(figsize=(18, 6))

plt.subplot(1, 2, 1)
plt.scatter(real_focal_lengths_mape, residuals_fl, alpha=0.5)
plt.xlabel('True Focal Length (35mm-equiv)')
plt.ylabel('Residuals')
plt.title('Residuals vs True Focal Length')
plt.xscale('log')
plt.xticks(residual_fl_ticks, [f"{x}" for x in residual_fl_ticks])

plt.subplot(1, 2, 2)
plt.scatter(real_fstops_mape, residuals_fstop, alpha=0.5)
plt.xlabel('True f-stop')
plt.ylabel('Residuals')
plt.title('Residuals vs True f-stop')
plt.xscale('log')
plt.xticks(residual_fstop_ticks, [f"f/{f}" for f in residual_fstop_ticks])

plt.tight_layout()
plt.show()

# Bias Analysis: Mean error for different ranges
def mean_error_in_range(real_values, predicted_values, min_val, max_val):
    """Return the mean signed percentage error over samples whose true value is in range.

    A sample is selected when `min_val <= real < max_val` (upper bound exclusive).
    The error of a sample is `(predicted - real) / real * 100`, so a positive
    result means over-prediction on average.

    Args:
        real_values: sequence of true values.
        predicted_values: sequence of predictions, aligned with `real_values`.
        min_val: inclusive lower bound on the true value.
        max_val: exclusive upper bound on the true value.

    Returns:
        The mean percentage error, or NaN when no true value falls in the range.
    """
    actual = np.array(real_values)
    predicted = np.array(predicted_values)

    in_range = (actual >= min_val) & (actual < max_val)
    if not in_range.any():
        # Nothing to average: signal the empty bin with NaN.
        return np.nan

    selected_actual = actual[in_range]
    signed_pct = (predicted[in_range] - selected_actual) / selected_actual * 100
    return np.mean(signed_pct)

# Bins used for the bias analysis: [lower, upper) bounds on the true value.
fl_ranges = [
    (16, 24), (24, 35), (35, 50), (50, 70), (70, 85), (85, 105), (105, 135),
    (135, 200), (200, 250), (250, 300), (300, 400), (400, 500), (500, 600),
]
fstop_ranges = [(2, 2.8), (2.8, 4), (4, 5.6), (5.6, 8), (8, 11), (11, 16)]

# Mean signed percentage error per focal-length bin (NaN for empty bins).
mean_errors_fl = [
    mean_error_in_range(real_focal_lengths_mape, predicted_focal_lengths_mape, min_val, max_val)
    for min_val, max_val in fl_ranges
]

plt.figure(figsize=(12, 6))
plt.bar(
    range(len(fl_ranges)),
    mean_errors_fl,
    tick_label=[f"{min_val}-{max_val}" for min_val, max_val in fl_ranges],
)
plt.xlabel('Focal Length Ranges (35mm-equiv)')
plt.ylabel('Mean Percentage Error (%)')
plt.title('Mean Percentage Error for Different Focal Length Ranges')
plt.axhline(y=0, color='black', linewidth=0.5)  # zero-error reference line
plt.show()

# Mean signed percentage error per f-stop bin (NaN for empty bins).
mean_errors_fstop = [
    mean_error_in_range(real_fstops_mape, predicted_fstops_mape, min_val, max_val)
    for min_val, max_val in fstop_ranges
]

plt.figure(figsize=(12, 6))
plt.bar(
    range(len(fstop_ranges)),
    mean_errors_fstop,
    tick_label=[f"f/{min_val}-f/{max_val}" for min_val, max_val in fstop_ranges],
)
plt.xlabel('f-stop Ranges')
plt.ylabel('Mean Percentage Error (%)')
plt.title('Mean Percentage Error for Different f-stop Ranges')
plt.axhline(y=0, color='black', linewidth=0.5)  # zero-error reference line
plt.show()

Compara os modelos de diferentes tamanhos treinados previamente:

In [None]:
import matplotlib.pyplot as plt
import pickle
import os
import numpy as np

# ResNet depths whose checkpoint folders are looked up for the comparison.
resnet_versions = [18, 34, 50, 101, 152]

# One matplotlib marker style per entry of resnet_versions (same length and order).
markers = ['o', 'x', '^', 'D', 'v']

# Filled below: maps a ResNet version to {'training': ..., 'testing': ...} metadata.
model_metadata = {}

# Read the pickled training/testing metadata stored next to a checkpoint
def load_metadata(checkpoint_dir):
    """Load the training and testing metadata pickles of one checkpoint.

    Args:
        checkpoint_dir: directory expected to contain
            ``training_metadata.pkl`` and ``testing_metadata.pkl``.

    Returns:
        A ``(training_metadata, testing_metadata)`` tuple; an entry is
        ``None`` when the corresponding file does not exist.
    """
    loaded = []
    for filename in ('training_metadata.pkl', 'testing_metadata.pkl'):
        metadata_path = os.path.join(checkpoint_dir, filename)
        if not os.path.exists(metadata_path):
            # Missing file: keep the slot, but mark it as absent
            loaded.append(None)
            continue
        with open(metadata_path, 'rb') as handle:
            loaded.append(pickle.load(handle))

    training_metadata, testing_metadata = loaded
    return training_metadata, testing_metadata

# Gather the saved metrics of every ResNet variant whose two metadata files loaded
for version in resnet_versions:
    checkpoint_dir = f"D:\\\\TEMTC-CN\\\\checkpoints\\\\resnet{version}"
    training_metadata, testing_metadata = load_metadata(checkpoint_dir)
    if not (training_metadata and testing_metadata):
        continue  # missing or empty metadata: leave this variant out
    model_metadata[version] = {'training': training_metadata, 'testing': testing_metadata}

if model_metadata:
    # One dict per metric series, each keyed by ResNet version
    epochs, train_losses, val_losses, test_losses = {}, {}, {}, {}
    train_mape, val_mape, test_mape = {}, {}, {}
    train_msle, val_msle, test_msle = {}, {}, {}
    train_mse, val_mse, test_mse = {}, {}, {}

    for version, metadata in model_metadata.items():
        training_run = metadata['training']
        testing_run = metadata['testing']

        test_losses[version] = testing_run['test_losses']
        test_mape[version] = testing_run['test_mape']
        test_msle[version] = testing_run['test_msle']
        test_mse[version] = testing_run['test_mse']

        # Workaround for the bug where the last epoch is not saved in the
        # testing metadata: cut every training/validation series down to the
        # number of recorded test losses.
        recorded = len(test_losses[version])
        epochs[version] = training_run['epochs'][:recorded]
        train_losses[version] = training_run['train_losses'][:recorded]
        val_losses[version] = training_run['val_losses'][:recorded]
        train_mape[version] = training_run['train_mape'][:recorded]
        val_mape[version] = training_run['val_mape'][:recorded]
        train_msle[version] = training_run['train_msle'][:recorded]
        val_msle[version] = training_run['val_msle'][:recorded]
        train_mse[version] = training_run['train_mse'][:recorded]
        val_mse[version] = training_run['val_mse'][:recorded]

    # Figure descriptions: (figure size, (rows, cols), panels). Each panel is
    # (main series, validation series or None, y-axis label, title); a panel
    # with a validation series draws training (solid) and validation (dashed).
    figure_layouts = [
        ((24, 24), (3, 2), [
            (train_losses, val_losses, 'Loss', 'Loss per epoch - training and validation'),
            (test_losses, None, 'Test Loss', 'Test Loss per epoch'),
            (train_mape, val_mape, 'MAPE (%)', 'MAPE per epoch - training and validation'),
            (test_mape, None, 'Test MAPE (%)', 'Test MAPE per epoch'),
            (train_msle, val_msle, 'MSLE', 'MSLE per epoch - training and validation'),
            (test_msle, None, 'Test MSLE', 'Test MSLE per epoch'),
        ]),
        ((24, 12), (1, 2), [
            (train_mse, val_mse, 'MSE', 'MSE per epoch - training and validation'),
            (test_mse, None, 'Test MSE', 'Test MSE per epoch'),
        ]),
    ]

    for fig_size, (n_rows, n_cols), panels in figure_layouts:
        plt.figure(figsize=fig_size)

        for panel_no, (main_series, val_series, y_label, panel_title) in enumerate(panels, start=1):
            plt.subplot(n_rows, n_cols, panel_no)
            for idx, version in enumerate(resnet_versions):
                if version not in model_metadata:
                    continue
                if val_series is None:
                    # Test panel: a single curve per model
                    plt.plot(epochs[version], main_series[version], marker=markers[idx], label=f'ResNet{version}')
                else:
                    plt.plot(epochs[version], main_series[version], marker=markers[idx], label=f'ResNet{version} Treino', linestyle='-')
                    plt.plot(epochs[version], val_series[version], marker=markers[idx], label=f'ResNet{version} Validação', linestyle='--')
            plt.xlabel('Epochs', fontsize=20)
            plt.ylabel(y_label, fontsize=20)
            plt.title(panel_title, fontsize=24)
            plt.legend(fontsize=18)
            plt.grid(True)

        plt.tight_layout()
        plt.show()
else:
    print("No trained models found.")

Teste de (in)sanidade: verifica o desempenho inicial do modelo em dois cenários — apenas com os pesos ImageNet (sem fine-tuning) e com pesos aleatórios (sem treinamento algum):

In [None]:
import gc
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Subset
from torchvision import transforms
import pickle
import os

# Read back the pickled split indices
with open(split_indices_path, 'rb') as split_file:
    split_indices = pickle.load(split_file)

# Build a loader over the samples listed under the 'test' key.
# TensorDataset is called with a directory here, so it is presumably the
# project's own dataset class rather than torch's -- verify against its definition.
dataset = TensorDataset(tensor_directory)
test_indices = split_indices['test']
test_dataset = Subset(dataset, test_indices)
test_loader = DataLoader(
    test_dataset,
    batch_size=128,
    shuffle=False,
    num_workers=0,
)

# Function to evaluate the model and return individual errors
def evaluate_model(model, test_loader, criterion):
    """Run ``model`` over ``test_loader`` and collect per-sample errors.

    The model is put in eval mode and every batch is moved to CUDA (images as
    float16, labels as float32), so a CUDA device is required. Only the first
    two label columns (focal length and f-stop) are compared with the model
    output. The average loss and the mean percentage errors are printed.

    Args:
        model: network invoked as ``model(images)``.
        test_loader: sized iterable yielding ``(images, labels)`` batches.
        criterion: loss callable invoked as ``criterion(outputs, targets)``.

    Returns:
        Tuple ``(all_images, all_real_metadata, all_inferred_metadata,
        focal_length_errors, fstop_errors)``. ``all_images`` is a list of
        per-sample CPU tensors; the other four are numpy arrays. With an
        empty loader the printed averages are NaN.
    """
    model.eval()
    test_loss = 0.0
    all_real_metadata = []
    all_inferred_metadata = []
    focal_length_errors = []
    fstop_errors = []
    all_images = []
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.cuda().half()
            labels = labels.cuda().float()
            with torch.amp.autocast(device_type='cuda'):
                outputs = model(images)
                loss = criterion(outputs, labels[:, :2])  # Only consider the first two columns for focal length and f-stop
            test_loss += loss.item()

            # Copy each tensor to the host once and reuse the numpy arrays,
            # instead of repeating the device-to-host transfer per use.
            labels_np = labels.cpu().numpy()
            outputs_np = outputs.cpu().numpy()
            all_real_metadata.extend(labels_np)
            all_inferred_metadata.extend(outputs_np)
            focal_length_errors.extend(calculate_percentage_error(labels_np[:, 0], outputs_np[:, 0]))
            fstop_errors.extend(calculate_percentage_error(labels_np[:, 1], outputs_np[:, 1]))
            all_images.extend(images.cpu())

    # Average over batches; an empty loader yields NaN instead of dividing by zero
    num_batches = len(test_loader)
    test_loss = test_loss / num_batches if num_batches else float('nan')
    print(f'Test Loss: {test_loss:.5f}')

    all_real_metadata = np.array(all_real_metadata)
    all_inferred_metadata = np.array(all_inferred_metadata)
    focal_length_errors = np.array(focal_length_errors)
    fstop_errors = np.array(fstop_errors)

    # Guard the means so that an empty result does not trigger numpy's
    # "mean of empty slice" warning
    avg_focal_length_error = np.mean(focal_length_errors) if focal_length_errors.size else float('nan')
    avg_fstop_error = np.mean(fstop_errors) if fstop_errors.size else float('nan')

    print(f'Erro percentual médio - FL: {avg_focal_length_error:.2f}%')
    print(f'Erro percentual médio - f/stop: {avg_fstop_error:.2f}%')

    return all_images, all_real_metadata, all_inferred_metadata, focal_length_errors, fstop_errors

# Evaluate the pre-finetune model (ImageNet weights but not fine-tuned)
print("\nResNet{}: Avaliando modelo apenas com pesos ImageNet (pré-finetune)".format(resnet_version))
model_prefinetune = HighResNetRegressor().cuda()
criterion = nn.L1Loss()
(
    all_images_prefinetune,
    all_real_metadata_prefinetune,
    all_inferred_metadata_prefinetune,
    focal_length_errors_prefinetune,
    fstop_errors_prefinetune,
) = evaluate_model(model_prefinetune, test_loader, criterion)

# Unload the pre-finetune model from VRAM. Garbage is collected before the
# CUDA cache is emptied, so that blocks released by the collector are already
# unoccupied when empty_cache() runs.
del model_prefinetune
gc.collect()
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()

# Evaluate the fully untrained model (random weights)
# NOTE(review): this rebinds the module-level selected_resnet to a randomly
# initialised network and never restores it; presumably HighResNetRegressor
# reads that global -- confirm no later cell expects ImageNet weights there.
print("\nResNet{}: Avaliando modelo completamente não treinado (pesos aleatórios)".format(resnet_version))
selected_resnet_model, _ = resnet_models[resnet_version]
selected_resnet = selected_resnet_model(weights=None)  # No weights, random initialization
model_untrained = HighResNetRegressor().cuda()
(
    all_images_untrained,
    all_real_metadata_untrained,
    all_inferred_metadata_untrained,
    focal_length_errors_untrained,
    fstop_errors_untrained,
) = evaluate_model(model_untrained, test_loader, criterion)

# Unload the untrained model from VRAM (same collect-then-empty order as above)
del model_untrained
gc.collect()
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()

def _show_extremes(images, real_metadata, inferred_metadata, best_idx, worst_idx, best_heading, worst_heading):
    """Print a heading and visualize the selected samples, best first, then worst."""
    for heading, picked in ((best_heading, best_idx), (worst_heading, worst_idx)):
        print(heading)
        visualize_results(
            [images[i] for i in picked],
            real_metadata[picked],
            inferred_metadata[picked],
            num_samples=5,
        )


# Pre-finetune model: rank the samples by the sum of their focal-length and
# f-stop errors, then show the five lowest and the five highest
combined_errors_prefinetune = focal_length_errors_prefinetune + fstop_errors_prefinetune
sorted_indices_prefinetune = np.argsort(combined_errors_prefinetune)
best_indices_prefinetune = sorted_indices_prefinetune[:5]
worst_indices_prefinetune = sorted_indices_prefinetune[-5:]
_show_extremes(
    all_images_prefinetune,
    all_real_metadata_prefinetune,
    all_inferred_metadata_prefinetune,
    best_indices_prefinetune,
    worst_indices_prefinetune,
    "\nMelhores resultados pré-finetune:",
    "\nPiores resultados pré-finetune:",
)


# Fully untrained model: same ranking and display
combined_errors_untrained = focal_length_errors_untrained + fstop_errors_untrained
sorted_indices_untrained = np.argsort(combined_errors_untrained)
best_indices_untrained = sorted_indices_untrained[:5]
worst_indices_untrained = sorted_indices_untrained[-5:]
_show_extremes(
    all_images_untrained,
    all_real_metadata_untrained,
    all_inferred_metadata_untrained,
    best_indices_untrained,
    worst_indices_untrained,
    "\nMelhores resultados não treinados:",
    "\nPiores resultados não treinados:",
)


def _plot_error_trends(real_focal_lengths, focal_length_errors, real_fstops, fstop_errors, tag):
    """Draw the two error-vs-true-value scatter plots of one model side by side.

    ``tag`` is the model description placed in parentheses in both titles.
    """
    focal_length_ticks = [16, 20, 28, 35, 50, 70, 85, 105, 135, 200, 250, 300, 400, 500, 600]
    fstop_ticks = [2, 2.8, 4, 5.6, 8, 11, 16, 22, 32]

    plt.figure(figsize=(18, 6))

    # Left panel: focal length error vs true focal length
    plt.subplot(1, 2, 1)
    plt.scatter(real_focal_lengths, focal_length_errors, alpha=0.5)
    plt.xscale('log')
    plt.xlabel('True Focal Length (35mm-equiv)')
    plt.ylabel('Percentage Error (%)')
    plt.title(f'Focal Length Error Trend ({tag})')
    plt.xticks(focal_length_ticks, [f"{tick}" for tick in focal_length_ticks])

    # Right panel: f-stop error vs true f-stop
    plt.subplot(1, 2, 2)
    plt.scatter(real_fstops, fstop_errors, alpha=0.5)
    plt.xscale('log')
    plt.xlabel('True f-stop')
    plt.ylabel('Percentage Error (%)')
    plt.title(f'f-stop Error Trend ({tag})')
    plt.xticks(fstop_ticks, [f"f/{tick}" for tick in fstop_ticks])

    plt.tight_layout()
    plt.show()


# Pre-finetune model: denormalize the true labels (columns 0 and 1), then plot
real_focal_lengths_prefinetune = [
    denormalize_focal_length(value, focal_lenght_min, focal_lenght_max)
    for value in all_real_metadata_prefinetune[:, 0]
]
real_fstops_prefinetune = [
    denormalize_fstop(value, fstop_min, fstop_max)
    for value in all_real_metadata_prefinetune[:, 1]
]
_plot_error_trends(
    real_focal_lengths_prefinetune,
    focal_length_errors_prefinetune,
    real_fstops_prefinetune,
    fstop_errors_prefinetune,
    'pré-finetune',
)

# Fully untrained model: same processing
real_focal_lengths_untrained = [
    denormalize_focal_length(value, focal_lenght_min, focal_lenght_max)
    for value in all_real_metadata_untrained[:, 0]
]
real_fstops_untrained = [
    denormalize_fstop(value, fstop_min, fstop_max)
    for value in all_real_metadata_untrained[:, 1]
]
_plot_error_trends(
    real_focal_lengths_untrained,
    focal_length_errors_untrained,
    real_fstops_untrained,
    fstop_errors_untrained,
    'não treinado',
)

def _plot_residuals(real_focal_lengths, focal_length_residuals, real_fstops, fstop_residuals, tag):
    """Draw the two residual-vs-true-value scatter plots of one model side by side.

    ``tag`` is the model description placed in parentheses in both titles.
    """
    focal_length_ticks = [16, 20, 28, 35, 50, 70, 85, 105, 135, 200, 250, 300, 400, 500, 600]
    fstop_ticks = [2, 2.8, 4, 5.6, 8, 11, 16, 22, 32]

    plt.figure(figsize=(18, 6))

    # Left panel: focal length residuals
    plt.subplot(1, 2, 1)
    plt.scatter(real_focal_lengths, focal_length_residuals, alpha=0.5)
    plt.xlabel('True Focal Length (35mm-equiv)')
    plt.ylabel('Residuals')
    plt.title(f'Residuals vs True Focal Length ({tag})')
    plt.xscale('log')
    plt.xticks(focal_length_ticks, [f"{tick}" for tick in focal_length_ticks])

    # Right panel: f-stop residuals
    plt.subplot(1, 2, 2)
    plt.scatter(real_fstops, fstop_residuals, alpha=0.5)
    plt.xlabel('True f-stop')
    plt.ylabel('Residuals')
    plt.title(f'Residuals vs True f-stop ({tag})')
    plt.xscale('log')
    plt.xticks(fstop_ticks, [f"f/{tick}" for tick in fstop_ticks])

    plt.tight_layout()
    plt.show()


# Pre-finetune model: denormalize the predictions; residual = real - predicted
predicted_focal_lengths_prefinetune = [
    denormalize_focal_length(value, focal_lenght_min, focal_lenght_max)
    for value in all_inferred_metadata_prefinetune[:, 0]
]
residuals_fl_prefinetune = np.asarray(real_focal_lengths_prefinetune) - np.asarray(predicted_focal_lengths_prefinetune)

predicted_fstops_prefinetune = [
    denormalize_fstop(value, fstop_min, fstop_max)
    for value in all_inferred_metadata_prefinetune[:, 1]
]
residuals_fstop_prefinetune = np.asarray(real_fstops_prefinetune) - np.asarray(predicted_fstops_prefinetune)

_plot_residuals(
    real_focal_lengths_prefinetune,
    residuals_fl_prefinetune,
    real_fstops_prefinetune,
    residuals_fstop_prefinetune,
    'pré-finetune',
)

# Fully untrained model: same processing
predicted_focal_lengths_untrained = [
    denormalize_focal_length(value, focal_lenght_min, focal_lenght_max)
    for value in all_inferred_metadata_untrained[:, 0]
]
residuals_fl_untrained = np.asarray(real_focal_lengths_untrained) - np.asarray(predicted_focal_lengths_untrained)

predicted_fstops_untrained = [
    denormalize_fstop(value, fstop_min, fstop_max)
    for value in all_inferred_metadata_untrained[:, 1]
]
residuals_fstop_untrained = np.asarray(real_fstops_untrained) - np.asarray(predicted_fstops_untrained)

_plot_residuals(
    real_focal_lengths_untrained,
    residuals_fl_untrained,
    real_fstops_untrained,
    residuals_fstop_untrained,
    'não treinado',
)

# Bias Analysis: Mean error for different ranges
def mean_error_in_range(real_values, predicted_values, min_val, max_val):
    """Return the mean signed error of the samples whose real value is in range.

    Args:
        real_values: sequence of ground-truth values.
        predicted_values: sequence of predictions, aligned with ``real_values``.
        min_val: inclusive lower bound applied to ``real_values``.
        max_val: exclusive upper bound applied to ``real_values``.

    Returns:
        Mean of ``predicted - real`` over the selected samples, or NaN when no
        real value falls in ``[min_val, max_val)``.
    """
    real_values = np.asarray(real_values)
    predicted_values = np.asarray(predicted_values)
    mask = (real_values >= min_val) & (real_values < max_val)
    if not mask.any():
        # Empty bucket: return NaN explicitly rather than letting np.mean
        # warn about the mean of an empty slice
        return np.nan
    return np.mean(predicted_values[mask] - real_values[mask])

# Buckets for the bias analysis; each (min_val, max_val) pair is passed to
# mean_error_in_range. These rebind the module-level fl_ranges/fstop_ranges.
fl_ranges = [(16, 20), (20, 35), (35, 50), (50, 70), (70, 85), (85, 105), (105, 135), (135, 200), (200, 250), (250, 300), (300, 400), (400, 500), (500, 600)]
fstop_ranges = [(2, 2.8), (2.8, 4), (4, 5.6), (5.6, 8), (8, 11), (11, 16), (16, 22), (22, 32)]

# Bias Analysis: Mean error for different ranges for pre-finetune model
mean_errors_fl_prefinetune = [mean_error_in_range(real_focal_lengths_prefinetune, predicted_focal_lengths_prefinetune, min_val, max_val) for min_val, max_val in fl_ranges]
mean_errors_fstop_prefinetune = [mean_error_in_range(real_fstops_prefinetune, predicted_fstops_prefinetune, min_val, max_val) for min_val, max_val in fstop_ranges]

# Plot mean errors for focal lengths for pre-finetune model
plt.figure(figsize=(12, 6))
plt.bar(range(len(fl_ranges)), mean_errors_fl_prefinetune, tick_label=[f"{min_val}-{max_val}" for min_val, max_val in fl_ranges])
plt.xlabel('Focal Length Ranges (35mm-equiv)')
plt.ylabel('Mean Error')
plt.title('Mean Error for Different Focal Length Ranges (pré-finetune)')
plt.axhline(y=0, color='black', linewidth=0.5)  # Add a horizontal line at y=0 for reference
plt.show()

# Plot mean errors for f-stops for pre-finetune model
plt.figure(figsize=(12, 6))
plt.bar(range(len(fstop_ranges)), mean_errors_fstop_prefinetune, tick_label=[f"f/{min_val}-f/{max_val}" for min_val, max_val in fstop_ranges])
plt.xlabel('f-stop Ranges')
plt.ylabel('Mean Error')
plt.title('Mean Error for Different f-stop Ranges (pré-finetune)')
plt.axhline(y=0, color='black', linewidth=0.5)  # Same y=0 reference line as the focal length chart
plt.show()

# Bias Analysis: Mean error for different ranges for fully untrained model
mean_errors_fl_untrained = [mean_error_in_range(real_focal_lengths_untrained, predicted_focal_lengths_untrained, min_val, max_val) for min_val, max_val in fl_ranges]
mean_errors_fstop_untrained = [mean_error_in_range(real_fstops_untrained, predicted_fstops_untrained, min_val, max_val) for min_val, max_val in fstop_ranges]

# Plot mean errors for focal lengths for fully untrained model
plt.figure(figsize=(12, 6))
plt.bar(range(len(fl_ranges)), mean_errors_fl_untrained, tick_label=[f"{min_val}-{max_val}" for min_val, max_val in fl_ranges])
plt.xlabel('Focal Length Ranges (35mm-equiv)')
plt.ylabel('Mean Error')
plt.title('Mean Error for Different Focal Length Ranges (não treinado)')
plt.axhline(y=0, color='black', linewidth=0.5)  # Add a horizontal line at y=0 for reference
plt.show()

# Plot mean errors for f-stops for fully untrained model
plt.figure(figsize=(12, 6))
plt.bar(range(len(fstop_ranges)), mean_errors_fstop_untrained, tick_label=[f"f/{min_val}-f/{max_val}" for min_val, max_val in fstop_ranges])
plt.xlabel('f-stop Ranges')
plt.ylabel('Mean Error')
plt.title('Mean Error for Different f-stop Ranges (não treinado)')
plt.axhline(y=0, color='black', linewidth=0.5)  # Same y=0 reference line as the focal length chart
plt.show()

print("Modelos pré-finetune e não treinados descarregados da VRAM.")