In [98]:
import torch

# load data and split into train and test sets
from load_data import sim_arr
from sklearn.model_selection import train_test_split, KFold

# reshape from batch, height, width, channel, to batch, channel, height, width
sim_arr_transformed = sim_arr.reshape(
    sim_arr.shape[0], sim_arr.shape[3], sim_arr.shape[1], sim_arr.shape[2])
train_set, test_set = train_test_split(
    sim_arr_transformed, test_size=0.2, random_state=42)

# convert to tensor
train_set = torch.tensor(train_set, dtype=torch.float32)
test_set = torch.tensor(test_set, dtype=torch.float32)

In [99]:
import torch
import torch.nn as nn
from sklearn.model_selection import KFold
from sklearn.cluster import DBSCAN


class Encoder(nn.Module):
    """Encoder class that inherits from PyTorch's nn.Module class."""

    def __init__(self,
                 num_layers,
                 poolsize,
                 channels,
                 kernel_sizes,
                 dilations,
                 paddings,
                 activations):
        super().__init__()
        self.layers = nn.ModuleList()
        for i in range(num_layers):
            self.layers.append(nn.Conv2d(
                in_channels=channels[i],
                out_channels=channels[i + 1],
                kernel_size=kernel_sizes[i],
                padding=paddings[i],
                dilation=dilations[i]))
            self.layers.append(activations[i]())
            self.layers.append(nn.MaxPool2d(poolsize[i], ceil_mode=True))

    def forward(self, x):
        """Forward pass through the encoder."""
        for layer in self.layers:
            print(x.shape)
            print(layer)
            x = layer(x)
            print(x.shape)
        return x


class Decoder(nn.Module):
    """Decoder class that mirrors the structure of the Encoder using convolution transpose."""

    def __init__(self,
                 num_layers,
                 poolsizes,
                 channels,
                 kernel_sizes,
                 dilations,
                 paddings,
                 activations):
        super().__init__()

        poolsizes.reverse()
        channels.reverse()
        kernel_sizes.reverse()
        dilations.reverse()
        paddings.reverse()
        activations.reverse()

        self.layers = nn.ModuleList()
        for i in range(num_layers-1):
            self.layers.append(nn.ConvTranspose2d(
                in_channels=channels[i],
                out_channels=channels[i + 1],
                kernel_size=kernel_sizes[i],
                padding=paddings[i],
                dilation=dilations[i],
                stride=poolsizes[i]))
            self.layers.append(activations[i]())
        self.layers.append(nn.ConvTranspose2d(in_channels=channels[-2],
                                              out_channels=channels[-1],
                                              kernel_size=kernel_sizes[-1],
                                              padding=paddings[-1],
                                              dilation=dilations[-1],
                                              stride=poolsizes[-1]))

    def forward(self, x):
        """Forward pass through the decoder."""
        for layer in self.layers:
            x = layer(x)
        return x
    

class Autoencoder(nn.Module):
    """Autoencoder class that inherits from PyTorch's nn.Module class."""

    def __init__(self,
                 num_layers,
                 poolsize,
                 channels,
                 kernel_sizes,
                 dilations,
                 paddings,
                 activations,
                 epochs,
                 batch_size,
                 learning_rate,
                 data):
        super().__init__()
        # Encoder
        self.encoder = Encoder(num_layers,
                               poolsize,
                               channels,
                               kernel_sizes,
                               dilations,
                               paddings,
                               activations)
        # Decoder
        self.decoder = Decoder(num_layers,
                               poolsize,
                               channels,
                               kernel_sizes,
                               dilations,
                               paddings,
                               activations)

        self.epochs = epochs
        self.batch_size = batch_size
        self.learning_rate = learning_rate

        # data and dataloader
        self.data = data

        # optimizer
        self.optimizer = torch.optim.Adam(
            self.parameters(), lr=self.learning_rate)
        self.criterion = nn.MSELoss()

    def forward(self, x):
        """Forward pass through the autoencoder."""
        x = self.encoder(x)
        x = self.decoder(x)
        return x

    def _get_reconstruction_loss(self, batch):
        """Compute the reconstruction loss."""
        x = batch
        x_hat = self.forward(x)
        loss = self.criterion(x_hat, x)
        return loss

    def train_model(self, dataloader, device=torch.device('cuda:0')):
        """Train the autoencoder."""
        self.to(device)
        self.train()
        for epoch in range(self.epochs):
            running_loss = 0.0
            for batch in dataloader:
                batch = batch.to(device)
                self.optimizer.zero_grad()
                loss = self._get_reconstruction_loss(batch)
                loss.backward()
                self.optimizer.step()
                running_loss += loss.item() * batch.size(0)
            #epoch_loss = running_loss / len(dataloader)
            #print(f"Epoch {epoch+1}/{self.epochs} Loss: {epoch_loss:.4f}")

    def evaluate_model(self, dataloader, device=torch.device('cuda:0')):
        """Evaluate the autoencoder."""
        self.to(device)
        self.eval()
        total_loss = 0.0
        with torch.no_grad():
            for batch in dataloader:
                batch = batch.to(device)
                loss = self._get_reconstruction_loss(batch)
                total_loss += loss.item() * batch.size(0)
        avg_loss = total_loss / len(dataloader)
        #print(f"Validation Loss: {avg_loss:.4f}")
        return avg_loss

    def cross_val(self, n_splits=5):
        """Perform cross-validation on the autoencoder."""
        kf = KFold(n_splits=n_splits, shuffle=True)
        val_losses = []
        for fold, (train_index, val_index) in enumerate(kf.split(self.data)):
            print(f"Fold {fold+1}/{n_splits}")
            train_sampler = torch.utils.data.SubsetRandomSampler(train_index)
            val_sampler = torch.utils.data.SubsetRandomSampler(val_index)

            train_loader = torch.utils.data.DataLoader(
                self.data, sampler=train_sampler)
            val_loader = torch.utils.data.DataLoader(
                self.data, sampler=val_sampler)

            self.train_model(train_loader)
            val_loss = self.evaluate_model(val_loader)
            val_losses.append(val_loss)
        return val_losses


In [100]:
def search_space(trial, input_dim, output_dim):
    """ define the hyperparameter search space."""
    num_layers = trial.suggest_int('num_layers', 2, 5)
    if num_layers == 2:
        poolsize = trial.suggest_categorical('poolsize_2', [[5, 16], [16, 5],
                                                            [8, 10], [10, 8],
                                                            [4, 20], [20, 4],
                                                            [2, 40], [40, 2]])
    elif num_layers == 3:
        poolsize = trial.suggest_categorical(
            'poolsize_3', [[2, 2, 20], [2, 20, 2], [20, 2, 2],
                           [4, 4, 5], [4, 5, 4], [5, 4, 4],
                           [2, 5, 8], [2, 8, 5], [5, 2, 8],
                           [5, 8, 2], [8, 2, 5], [8, 5, 2],
                           [2, 4, 10], [2, 10, 4], [4, 2, 10],
                           [4, 10, 2], [10, 2, 4], [10, 4, 2]])
    elif num_layers == 4:
        poolsize = trial.suggest_categorical(
            'poolsize_4', [[2, 2, 2, 10], [2, 2, 10, 2], [2, 10, 2, 2], [10, 2, 2, 2],
                           [2, 2, 4, 5], [2, 2, 5, 4], [2, 4, 2, 5],
                           [2, 4, 5, 2], [2, 5, 2, 4], [2, 5, 4, 2],
                           [4, 2, 2, 5], [4, 2, 5, 2], [4, 5, 2, 2],
                           [5, 2, 2, 4], [5, 2, 4, 2], [5, 4, 2, 2]])
    elif num_layers == 5:
        poolsize = trial.suggest_categorical(
            'poolsize_5', [[2, 2, 2, 2, 5], [2, 2, 2, 5, 2], [2, 2, 5, 2, 2],
                           [2, 5, 2, 2, 2], [5, 2, 2, 2, 2]])

    channels = [input_dim,]
    for i in range(num_layers - 1):
        channels.append(trial.suggest_int(f'channels_{i}', 1, 12))
    channels.append(output_dim)

    kernel_sizes = [trial.suggest_int(
        f'kernel_size_{i}', 2, 24) for i in range(num_layers)]

    dilations = [trial.suggest_categorical(
        f'dilation_{i}', [1, 2, 4]) for i in range(num_layers)]

    paddings = [x * (y - 1)//2 for x, y in zip(dilations, kernel_sizes)]

    activations = [trial.suggest_categorical(
        f'activation_{i}', ['nn.Softplus',
                            'nn.SELU',
                            'nn.SiLU',
                            'nn.Tanh']) for i in range(num_layers)]

    return [num_layers, poolsize, channels, kernel_sizes, dilations, paddings, activations]


In [101]:
import optuna
import numpy as np

def objective(trial):
    # Define the hyperparameter search space
    num_layers, poolsize, channels, kernel_sizes, dilations, paddings, activations = search_space(
        trial, input_dim=3, output_dim=3)
    activations = [eval(activation) for activation in activations]
    
    # Define the autoencoder
    autoencoder = Autoencoder(num_layers=num_layers,
                              poolsize=poolsize,
                              channels=channels,
                              kernel_sizes=kernel_sizes,
                              dilations=dilations,
                              paddings=paddings,
                              activations=activations,
                              epochs=10,
                              batch_size=32,
                              learning_rate=1e-3,
                              data=train_set)
    
    print('fin')

    return 0

study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=10)

[I 2024-05-02 02:14:41,764] A new study created in memory with name: no-name-70850752-0b40-461e-bfcf-ba49f93b2701
[I 2024-05-02 02:14:41,769] Trial 0 finished with value: 0.0 and parameters: {'num_layers': 5, 'poolsize_5': [2, 2, 2, 5, 2], 'channels_0': 9, 'channels_1': 9, 'channels_2': 4, 'channels_3': 10, 'kernel_size_0': 17, 'kernel_size_1': 7, 'kernel_size_2': 16, 'kernel_size_3': 20, 'kernel_size_4': 2, 'dilation_0': 1, 'dilation_1': 1, 'dilation_2': 1, 'dilation_3': 4, 'dilation_4': 4, 'activation_0': 'nn.Softplus', 'activation_1': 'nn.Softplus', 'activation_2': 'nn.Softplus', 'activation_3': 'nn.SiLU', 'activation_4': 'nn.SELU'}. Best is trial 0 with value: 0.0.
[I 2024-05-02 02:14:41,772] Trial 1 finished with value: 0.0 and parameters: {'num_layers': 3, 'poolsize_3': [2, 10, 4], 'channels_0': 5, 'channels_1': 2, 'kernel_size_0': 20, 'kernel_size_1': 16, 'kernel_size_2': 19, 'dilation_0': 4, 'dilation_1': 1, 'dilation_2': 1, 'activation_0': 'nn.Softplus', 'activation_1': 'nn.Si

fin
fin


ValueError: CategoricalDistribution does not support dynamic value space.