In [10]:
%%capture 
!pip install -r requirements.txt

In [11]:
import torch
# The conditional must be parenthesised: `+` binds tighter than `if/else`, so without
# the parentheses the CPU case would print just 'CPU' instead of the full sentence.
print('Training will be done on the ' + ('GPU' if torch.cuda.is_available() else 'CPU'))

Training will be done on the GPU


In [12]:
from enum import Enum, StrEnum

class Scaling(Enum):
    """Strategy used by `preprocess_rssi_data` to standardise the RSSI columns."""
    # Every RSSI column is standardised on its own (StandardScaler over the columns).
    INDEPENDENT = 1
    # All RSSI values are standardised together with one global mean / standard deviation.
    JOINT = 2

class DatasetType(StrEnum):
    """File-name prefix of the CSV files that `get_preprocessed_dataset` concatenates into its paths."""
    TRAIN = 'trn'
    TEST = 'tst'
    # Same value as TRAIN, so the enum machinery turns VALIDATION into an alias of TRAIN
    # (DatasetType.VALIDATION is DatasetType.TRAIN); validation data is read from 'trn' files.
    VALIDATION = 'trn'

# Global variable to enable debug mode
DEBUG = True

#### Data loading & pre-processing functions

In [13]:
import pandas as pd
import numpy as np
from sklearn import preprocessing


# *Helper function to preprocess the RSSI data
def preprocess_rssi_data(df_rssi: pd.DataFrame, scaling_strategy: Scaling) -> pd.DataFrame:
    """
    Clean and standardise a labelled RSSI frame.

    The last three columns of `df_rssi` are treated as labels and every column
    before them as an RSSI reading. The function:
    1. Separates the RSSI values from the labels.
    2. Replaces all 100 values in the RSSI columns with -110 (ensures continuity of data).
       The labels are left untouched.
    3. Scales the RSSI values to zero mean and unit variance.

    NOTE(review): the scaling statistics are computed from the frame that is passed in,
    so every dataset pre-processed through this function is normalised with its own
    statistics. Confirm that this is intended for the test/validation data.

    Parameters:
    - df_rssi: Frame with the RSSI columns followed by three label columns
    - scaling_strategy: The scaling strategy to be used (INDEPENDENT or JOINT)

    Returns:
    - A new frame (scaled RSSI columns followed by the unchanged label columns)
      with a fresh 0..n-1 index. The input frame is not modified.

    Raises:
    - NotImplementedError: if `scaling_strategy` is neither INDEPENDENT nor JOINT
    """
    # 1. Separate the RSS values from the labels
    rssi_values = df_rssi.iloc[:, :-3]
    labels = df_rssi.iloc[:, -3:]

    # 2. Replace 100 with -110 in the RSSI columns only; running the replacement on the
    #    whole frame would also rewrite a label that happens to equal 100.
    rssi_values = rssi_values.replace(100, -110)

    # 3. Scale the data to have zero mean and unit variance
    # This is done either independently for each AP or jointly for all APs
    if scaling_strategy == Scaling.INDEPENDENT:
        scaler = preprocessing.StandardScaler()

        # Keep the original index: fit_transform returns a bare array, and a frame built
        # from it with a default index would be misaligned with `labels` whenever the
        # input was filtered (non-contiguous index) before being passed in.
        scaled_rss = pd.DataFrame(
            scaler.fit_transform(rssi_values),
            columns=rssi_values.columns,
            index=rssi_values.index,
        )

    elif scaling_strategy == Scaling.JOINT:
        flattened = rssi_values.values.flatten()
        global_mean = np.mean(flattened)
        global_std = np.std(flattened)

        # Constant data has zero spread; divide by 1 instead of producing NaN/inf.
        if global_std == 0:
            global_std = 1.0

        scaled_rss = (rssi_values - global_mean) / global_std

    else:
        raise NotImplementedError("Specified scaling strategy is not implemented, use either Scaling.INDEPENDENT or Scaling.JOINT.")

    # Both strategies return the same layout: aligned rows and a clean positional index.
    return pd.concat([scaled_rss, labels], axis=1).reset_index(drop=True)

# # *Load and pre-process the training data
# def get_preprocessed_training_data(data_path: str, training_months: list[str], num_APs: int, scaling_strategy: Scaling, floor: int) -> pd.DataFrame:
#     """
#     This function loads and preprocesses the training data from the specified training months and floor.

#     Parameters:
#     - data_path: The path to the data
#     - training_months: The list of training months to be used
#     - num_APs: The number of access points
#     - scaling_strategy: The scaling strategy to be used (INDEPENDENT or JOINT)
#     - floor: The floor to be used
#     """
#     # Since the csv files do not have column names, we define these first.
#     list_of_APs = ["AP" + str(i) for i in range(0, num_APs)]

#     # Load the training data from all specified training sets.  
#     df_rss = pd.concat([pd.read_csv(data_path + training_set + 'trn01rss.csv', names=list_of_APs) for training_set in training_months])
#     df_rss = df_rss.reset_index(drop=True)
    
#     # Get all x,y,floor labels (gotten from data_path + training_month + 'trn01crd.csv')
#     df_labels = pd.concat([pd.read_csv(data_path + training_set + 'trn01crd.csv', names=['x', 'y', 'floor']) for training_set in training_months])
#     df_labels = df_labels.reset_index(drop=True)

#     # Add the labels to the pre-processed data
#     df_labeled = pd.concat([df_rss, df_labels], axis=1)
    
#     # Filter the data to only include the specified floor
#     df_labeled = df_labeled[df_labeled['floor'] == floor]

#     # Pre-processing of the training data
#     df_train = preprocess_rssi_data(df_labeled, scaling_strategy)
    
#     return df_train

# *Load and pre-process the data
def get_preprocessed_dataset(data_path: str, months: list[str], sets: list[str], type: DatasetType, num_APs: int, scaling_strategy: Scaling, floor: int) -> pd.DataFrame:
    """
    Load, label, filter and pre-process the RSSI data of the requested months and sets.

    For every (month, set) combination two header-less CSV files are read:
    `<data_path><month>/<type><set>rss.csv` (RSSI values) and
    `<data_path><month>/<type><set>crd.csv` (x, y, floor labels).

    Parameters:
    - data_path: The path to the data (used as a plain string prefix, so it must end with a separator)
    - months: The list of months to be used
    - sets: The list of set numbers to be used
    - type: The type of dataset to be made (TRAIN, TEST or VALIDATION)
    - num_APs: The number of access points (number of RSSI columns in the rss files)
    - scaling_strategy: The scaling strategy to be used (INDEPENDENT or JOINT)
    - floor: The floor to be used

    Returns:
    - The pre-processed frame produced by `preprocess_rssi_data` for the rows of the requested floor.

    Raises:
    - ValueError: if `months` or `sets` is empty, or if the RSSI and label files
      do not contain the same number of rows.
    """
    if len(months) == 0 or len(sets) == 0:
        raise ValueError("At least one month and one set are required to build a dataset.")

    # Since the csv files do not have column names, we define these first.
    list_of_APs = ["AP" + str(i) for i in range(0, num_APs)]
    label_columns = ['x', 'y', 'floor']

    # Common path prefix of the rss/crd file pair of every requested (month, set) combination.
    # `type` is a StrEnum member (a str subclass), so plain concatenation works.
    file_prefixes = [data_path + month + '/' + type + set_id for month in months for set_id in sets]

    # Load the RSSI values of all requested files.
    df_rss = pd.concat([pd.read_csv(prefix + 'rss.csv', names=list_of_APs) for prefix in file_prefixes])
    df_rss = df_rss.reset_index(drop=True)

    # Get all x,y,floor labels
    df_labels = pd.concat([pd.read_csv(prefix + 'crd.csv', names=label_columns) for prefix in file_prefixes])
    df_labels = df_labels.reset_index(drop=True)

    # The two frames are glued together by position, so they must describe the same rows.
    if len(df_rss) != len(df_labels):
        raise ValueError(f"RSSI rows ({len(df_rss)}) and label rows ({len(df_labels)}) do not match.")

    # Add the labels to the RSSI data
    df_labeled = pd.concat([df_rss, df_labels], axis=1)

    # Filter the data to only include the specified floor. The index is reset so that the
    # pre-processing step receives a contiguous positional index.
    df_floor = df_labeled[df_labeled['floor'] == floor].reset_index(drop=True)

    # Pre-processing (value replacement and scaling) of the selected rows
    return preprocess_rssi_data(df_floor, scaling_strategy)
    

#### SETUP

In [14]:
# Configuration of the training set. data_path, num_APs, scaling_strategy and floor
# are reused by the test and validation cells further down.
data_path = './data/'
training_months = ['01', '02', '03', '04', '05']
sets = ['01']
type = DatasetType.TRAIN  # NOTE: this global shadows the built-in `type`
num_APs = 620
scaling_strategy = Scaling.JOINT
floor = 3


# Load the 'trn' files of set 01 for every training month and split features from targets.
df_train_full = get_preprocessed_dataset(data_path, training_months, sets, type, num_APs, scaling_strategy, floor)
df_train_x = df_train_full.iloc[:, :-3] # Just the RSSI values
df_train_y = df_train_full.iloc[:, -3:-1] # Just the x and y coordinates (no floor)

if DEBUG: print('df_train_full:', df_train_full.shape)


df_train_full: (1440, 623)


In [15]:
# Test set: the 'tst' files of set 01 for all five months.
months = ['01', '02', '03', '04', '05']
sets = ['01'] # 01 Corresponds to the same locations as the training set
type = DatasetType.TEST

df_test_full = get_preprocessed_dataset(data_path, months, sets, type, num_APs, scaling_strategy, floor)
df_test_x = df_test_full.iloc[:, :-3] # Just the RSSI values
df_test_y = df_test_full.iloc[:, -3:-1] # Just the x and y coordinates (no floor)

if DEBUG: print('df_test_full:', df_test_full.shape)

df_test_full: (1440, 623)


In [16]:
# Validation set: DatasetType.VALIDATION has the value 'trn', so this reads the
# 'trn' files of sets 02-04 of month 01.
months = ['01']
sets = ['02', '03', '04']
type = DatasetType.VALIDATION

df_val_full = get_preprocessed_dataset(data_path, months, sets, type, num_APs, scaling_strategy, floor)
df_val_x = df_val_full.iloc[:, :-3] # Just the RSSI values
df_val_y = df_val_full.iloc[:, -3:-1] # Just the x and y coordinates (no floor)

if DEBUG: print('df_val_full:', df_val_full.shape)

df_val_full: (864, 623)


In [17]:
import torch
from torch.utils.data import DataLoader, TensorDataset

# The models are implemented in PyTorch, so each feature/target frame is turned
# into a float32 tensor (features first, targets second for every split).
X_train_tensor, y_train_tensor = (
    torch.tensor(frame.values, dtype=torch.float32) for frame in (df_train_x, df_train_y)
)
X_test_tensor, y_test_tensor = (
    torch.tensor(frame.values, dtype=torch.float32) for frame in (df_test_x, df_test_y)
)
X_val_tensor, y_val_tensor = (
    torch.tensor(frame.values, dtype=torch.float32) for frame in (df_val_x, df_val_y)
)

# Pair features with targets so that DataLoaders can be built from each split
t_training = TensorDataset(X_train_tensor, y_train_tensor)
t_test = TensorDataset(X_test_tensor, y_test_tensor)
t_val = TensorDataset(X_val_tensor, y_val_tensor)

# train_loader = DataLoader(t_training, batch_size=16, shuffle=True)
# test_loader = DataLoader(t_test, batch_size=16, shuffle=True)
# val_loader = DataLoader(t_val, batch_size=16, shuffle=True)



# MultiLayer Perceptrons

#### Full-Input MLP
This network takes the full input of 620 features to perform x,y predictions.

In [18]:
import torch.nn as nn

class MLP(nn.Module):
    """
    Fully-connected network that maps an input vector to two outputs (x, y).

    Every hidden layer is a Linear -> ReLU -> Dropout group; the network ends with
    a Linear layer with two outputs and no activation.

    Parameters:
    - hidden_layer_sizes: Iterable with the width of each hidden layer, in order
    - dropout_rate: Dropout probability used after every hidden layer
    - input_dim: Number of input features (defaults to 620)
    """

    def __init__(self, hidden_layer_sizes, dropout_rate, input_dim=620):
        super().__init__()

        # Layer widths from the input onwards; each consecutive pair describes one Linear layer.
        widths = [input_dim, *hidden_layer_sizes]

        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack += [nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(dropout_rate)]

        # The output layer consumes whatever the last hidden layer (or the input) produces.
        stack.append(nn.Linear(widths[-1], 2)) # x,y output

        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Run `x` through the layer stack and return the raw two-column prediction."""
        return self.model(x)
    

def train_MLP(model, train_loader, val_loader, criterion, optimizer, epochs):
    """
    Train `model` for `epochs` epochs and validate it after every epoch.

    The model is moved to the GPU when one is available. After each epoch the
    average (per-batch) training and validation losses are printed.

    Parameters:
    - model: The network to train
    - train_loader: Iterable of (inputs, labels) batches used for training
    - val_loader: Iterable of (inputs, labels) batches used for validation
    - criterion: Loss function called as criterion(outputs, labels)
    - optimizer: Optimizer over the model parameters; torch.optim.LBFGS is
      driven through a closure, every other optimizer through a plain step()
    - epochs: Number of passes over `train_loader` (must be at least 1)

    Returns:
    - The average validation loss (mean of the per-batch losses) of the final epoch.
      The model is left in evaluation mode.

    Raises:
    - ValueError: if `epochs` is smaller than 1 or one of the loaders has no batches
    """
    # Without at least one epoch and one batch there is no loss to average or return.
    if epochs < 1:
        raise ValueError('epochs must be at least 1')
    if len(train_loader) == 0 or len(val_loader) == 0:
        raise ValueError('train_loader and val_loader must each contain at least one batch')

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    uses_lbfgs = isinstance(optimizer, torch.optim.LBFGS)

    for epoch in range(epochs):
        model.train() # Sets the model to training mode
        running_loss = 0.0 # Sum of the batch losses for the current epoch

        # Actual training loop
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device) # Move data to GPU if available

            if uses_lbfgs:
                # LBFGS needs a closure that re-evaluates the loss and its gradients
                def closure():
                    optimizer.zero_grad()
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    loss.backward()
                    return loss

                # step() returns the loss of its first closure evaluation, so there is no
                # need for a second, separate forward/backward pass just to read the loss.
                loss = optimizer.step(closure)

            else:
                optimizer.zero_grad() # Reset gradients from last iteration
                outputs = model(inputs) # Forward pass
                loss = criterion(outputs, labels) # Loss between the predictions and the ground-truth labels
                loss.backward() # Perform backpropagation
                optimizer.step() # Update model parameters based on the computed gradients

            running_loss += loss.item()

        # Validation time
        model.eval()
        val_loss = 0.0 # Accumulated validation loss

        with torch.no_grad(): # No need to compute gradients during validation
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device) # Move data to GPU if available

                outputs = model(inputs) # Forward pass to get predictions
                loss = criterion(outputs, labels)
                val_loss += loss.item()

        # Print the loss for this epoch
        print(f'Epoch {epoch+1}/{epochs} - Avg Training Loss: {running_loss/len(train_loader)} - Avg Validation Loss: {val_loss/len(val_loader)}')

    print('Finished Training')
    return val_loss/len(val_loader) # Average validation loss of the final epoch

#### Reduced-Input MLP
Use either a stacked or a deep autoencoder to reduce the input space before training an MLP network.

In [19]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Simple auto-encoder class with a single hidden layer
class Autoencoder(nn.Module):
    """
    Autoencoder with one hidden layer.

    The encoder is Linear(input_dim, hidden_dim) + ReLU and the decoder is
    Linear(hidden_dim, input_dim) + ReLU.

    NOTE(review): because the decoder ends in a ReLU, reconstructions are never
    negative. Confirm this is intended when the inputs are standardised values,
    which can be negative.

    Parameters:
    - input_dim: Number of input (and reconstructed) features
    - hidden_dim: Size of the encoded representation
    """

    def __init__(self, input_dim, hidden_dim):
        super().__init__()

        # Encoder - Compress input data
        self.encoder = nn.Sequential(nn.Linear(input_dim, hidden_dim), nn.ReLU())

        # Decoder - Reconstruct input data
        self.decoder = nn.Sequential(nn.Linear(hidden_dim, input_dim), nn.ReLU())

    def forward(self, x):
        """Return the tuple (encoded representation, reconstruction) for `x`."""
        latent = self.encoder(x)
        return latent, self.decoder(latent)



# Function to train a *single* autoencoder
def train_autoencoder(autoencoder, data_loader, criterion, optimizer, epochs):
    """
    Train a *single* autoencoder to reconstruct its own input.

    Uses the module-level `device`. Prints the average per-batch loss after every epoch.

    Parameters:
    - autoencoder: Model whose forward pass returns (encoded, reconstructed)
    - data_loader: Iterable of (inputs, _) batches; the second element is ignored
    - criterion: Loss called as criterion(reconstruction, inputs)
    - optimizer: Optimizer over the autoencoder parameters
    - epochs: Number of passes over `data_loader`
    """
    autoencoder.to(device) # Move model to GPU if available

    for epoch_idx in range(epochs):
        autoencoder.train() # Enable training mode
        epoch_loss = 0.0

        # The targets are not needed: the input itself is what has to be reconstructed.
        for batch, _ in data_loader:
            batch = batch.to(device)

            optimizer.zero_grad()

            # Only the reconstruction (second output) takes part in the loss.
            reconstruction = autoencoder(batch)[1]
            batch_loss = criterion(reconstruction, batch)

            batch_loss.backward()
            optimizer.step()

            epoch_loss += batch_loss.item() # item() extracts the Python number from the tensor

        print(f'Epoch {epoch_idx+1}/{epochs} - Avg Training Loss: {epoch_loss/len(data_loader)}')

# Function to, sequentially, train a stack of autoencoders
def train_stacked_autoencoders(train_data, input_dim, num_encoders, epochs=20):
    """
    Sequentially train a stack of autoencoders.

    Each autoencoder is trained on the encoded output of the previous one
    (the first one on `train_data`), with Adam (lr=0.001), an MSE
    reconstruction loss and batches of 64 shuffled samples.

    Parameters:
    - train_data: Tensor with the input data of the first autoencoder
    - input_dim: Number of features of `train_data`
    - num_encoders: Iterable with the encoded size of each autoencoder, in order
      (despite the name this is not a count)
    - epochs: Number of training epochs per autoencoder

    Returns:
    - The list of trained autoencoders, in stacking order.
    """
    def _reconstruction_loader(tensor):
        # Autoencoders are unsupervised, so the input data is also the target data
        return DataLoader(TensorDataset(tensor, tensor), batch_size=64, shuffle=True)

    loader = _reconstruction_loader(train_data)
    trained = []
    layer_input_dim = input_dim

    for layer_output_dim in num_encoders:
        autoencoder = Autoencoder(layer_input_dim, layer_output_dim).to(device)
        criterion = nn.MSELoss()
        optimizer = torch.optim.Adam(autoencoder.parameters(), lr=0.001)

        train_autoencoder(autoencoder, loader, criterion, optimizer, epochs)
        trained.append(autoencoder)

        # The next autoencoder learns from what this one produces
        # (this pass also runs after the last autoencoder, where its result is unused).
        loader = _reconstruction_loader(get_encoded_data(autoencoder, loader))
        layer_input_dim = layer_output_dim

    return trained

# Utility function to get the encoded data from the autoencoder
def get_encoded_data(autoencoder, data_loader):
    """
    Encode every batch of `data_loader` with `autoencoder`.

    The autoencoder is switched to evaluation mode and no gradients are tracked.
    Uses the module-level `device`.

    Returns:
    - One tensor with the encoded batches concatenated along dimension 0,
      in the order in which the loader yields them.
    """
    autoencoder.eval() # Set model to evaluation mode

    with torch.no_grad(): # No need to compute gradients during inference
        # Only the encoded half of the (encoded, decoded) pair is kept; targets are ignored.
        codes = [autoencoder(batch.to(device))[0] for batch, _ in data_loader]

    return torch.cat(codes, dim=0)

def stacked_encode_data(data, encoders):
    """
    Pass `data` through a stack of already trained autoencoders.

    Each autoencoder receives the encoded output of the previous one; the
    computation runs on the module-level `device` without gradient tracking.

    Parameters:
    - data: The data to be encoded
    - encoders: The stack of trained autoencoders to be used (provided as ordered list)

    Returns:
    - The output of the last encoder, moved back to the CPU.
    """
    encoded = data
    with torch.no_grad():
        for stage in encoders:
            # Index 0 is the encoded representation; the reconstruction is discarded.
            encoded = stage(encoded.to(device))[0]

    return encoded.cpu()

#### Hyper-parameter tuning

In [20]:
import optuna

def MLP_full_optimize(trial, optim : str = 'Adam') -> float:
    """
    Optuna objective for the full-input MLP.

    Samples the hidden layer size and count, dropout rate, learning rate, batch
    size and number of epochs from `trial`, trains an MLP on `t_training` and
    returns the validation loss obtained on `t_val`.

    Parameters:
    - trial: Optuna trial used to sample the hyper-parameters
    - optim: Optimizer name, 'adam' or 'lbfgs' (case-insensitive)

    Raises:
    - ValueError: if `optim` names an unknown optimizer
    """
    # A single categorical over tuples of sizes does not work due to an Optuna limitation,
    # so the layer width and the number of layers are sampled separately:
    #! hidden_layer_sizes = trial.suggest_categorical('hidden_layer_sizes', [ (v,) * i for v in [700, 512, 256, 128] for i in range(2, 5)])
    width = trial.suggest_categorical('hidden_layer_size', [700, 512, 256, 128])
    depth = trial.suggest_int('hidden_layer_count', 2, 4) # inclusive
    hidden_layer_sizes = (width,) * depth

    dropout_rate = trial.suggest_float('dropout_rate', 0.4, 0.6)
    lr = trial.suggest_float('lr', 0.001, 0.01)
    batch_size = trial.suggest_int('batch_size', 16, 512, step=16)
    epochs = trial.suggest_int('epochs', 50, 150)

    model = MLP(hidden_layer_sizes, dropout_rate)

    # Map the (case-insensitive) optimizer name onto its class
    optimizer_classes = {'adam': torch.optim.Adam, 'lbfgs': torch.optim.LBFGS}
    optimizer_cls = optimizer_classes.get(optim.lower())
    if optimizer_cls is None:
        raise ValueError('Unknown optimizer')
    optimizer = optimizer_cls(model.parameters(), lr=lr)

    criterion = nn.MSELoss()

    # Build the loaders with the sampled batch size
    train_loader = DataLoader(t_training, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(t_val, batch_size=batch_size, shuffle=True)

    # The validation loss of the final epoch is the value Optuna optimizes
    return train_MLP(model, train_loader, val_loader, criterion, optimizer, epochs)




def MLP_SAE_optimize(trial, SAE, input_size, optim : str = 'Adam') -> float:
    """
    Optuna objective for the reduced-input MLP.

    The training and validation features are first encoded with the stacked
    autoencoders in `SAE`; an MLP with `input_size` inputs is then trained on
    the encoded data and its validation loss is returned.

    Parameters:
    - trial: Optuna trial used to sample the hyper-parameters
    - SAE: Ordered list of trained autoencoders
    - input_size: Number of features produced by the last autoencoder
    - optim: Optimizer name, 'adam' or 'lbfgs' (case-insensitive)

    Raises:
    - ValueError: if `optim` names an unknown optimizer
    """
    width = trial.suggest_categorical('hidden_layer_size', [256, 128, 64, 32, 16])
    depth = trial.suggest_int('hidden_layer_count', 2, 4) # inclusive
    hidden_layer_sizes = (width,) * depth

    dropout_rate = trial.suggest_float('dropout_rate', 0.4, 0.6)
    lr = trial.suggest_float('lr', 0.001, 0.01)
    batch_size = trial.suggest_int('batch_size', 16, 512, step=16)
    epochs = trial.suggest_int('epochs', 50, 150)

    model = MLP(hidden_layer_sizes, dropout_rate, input_size)

    # Map the (case-insensitive) optimizer name onto its class
    optimizer_classes = {'adam': torch.optim.Adam, 'lbfgs': torch.optim.LBFGS}
    optimizer_cls = optimizer_classes.get(optim.lower())
    if optimizer_cls is None:
        raise ValueError('Unknown optimizer')
    optimizer = optimizer_cls(model.parameters(), lr=lr)

    criterion = nn.MSELoss()

    # Encode training and validation data using the stacked autoencoders in SAE
    train_data_encoded = stacked_encode_data(X_train_tensor, SAE)
    val_data_encoded = stacked_encode_data(X_val_tensor, SAE)

    # Loaders over the encoded features, using the sampled batch size
    train_loader = DataLoader(TensorDataset(train_data_encoded, y_train_tensor), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(val_data_encoded, y_val_tensor), batch_size=batch_size, shuffle=True)

    # The validation loss of the final epoch is the value Optuna optimizes
    return train_MLP(model, train_loader, val_loader, criterion, optimizer, epochs)

  from .autonotebook import tqdm as notebook_tqdm


In [21]:
# Use the following booleans to enable or disable the hyper-parameter search for the different MLP models.
# (The messages call it a "grid search"; the search itself is run through optuna.create_study / study.optimize.)
# After running the search, train the final models with the best hyperparameters.

SEARCH_MLP_FULL = False
SEARCH_MLP_REDUCED_256 = False
SEARCH_MLP_REDUCED_128 = False

In [27]:
def pretty_print_study(study):
    """
    Print a short report of an Optuna study.

    The report shows the number of trials in `study.trials` and, for
    `study.best_trial`, its duration in seconds, its value and its parameters.
    """
    print('====================================')
    print('Number of finished trials:', len(study.trials))
    print('Best trial:')
    best = study.best_trial
    summary = (
        ('     Duration: ', best.duration.total_seconds()),
        ('     Value: ', best.value),
    )
    for label, shown in summary:
        print(label, shown)
    print('     Params: ')
    for name in best.params:
        print(f'         {name}: {best.params[name]}')

# Full-input MLP: search only when the corresponding flag is enabled.
if SEARCH_MLP_FULL:
    print('Starting MLP full grid search')

    # Optuna study object and direction (minimize validation loss)
    study = optuna.create_study(direction='minimize')
    # The objective is wrapped in a lambda so the optimizer name can be passed along.
    study.optimize(lambda trial: MLP_full_optimize(trial, 'Adam'), n_trials=2)

    pretty_print_study(study)

    
else: print('Skipping SEARCH_MLP_FULL')

print('\n====================================')
print('V results for SEARCH_MLP_REDUCED_256 V')
print('====================================\n')


# Reduced-input MLP on top of a 620 -> 512 -> 256 stack of autoencoders.
if SEARCH_MLP_REDUCED_256:
    print('Starting MLP reduced grid search for 512-256 SAE')

    # The autoencoders are trained once and shared by all trials of this study.
    encoders = train_stacked_autoencoders(X_train_tensor, 620, [512, 256], 20)
    
    study = optuna.create_study(direction='minimize')
    study.optimize(lambda trial: MLP_SAE_optimize(trial, encoders, 256, 'Adam'), n_trials=2)
    
    pretty_print_study(study)

else: print('Skipping SEARCH_MLP_REDUCED_256') 

print('\n====================================')
print('V results for SEARCH_MLP_REDUCED_128 V')
print('====================================\n')

# Reduced-input MLP on top of a 620 -> 512 -> 256 -> 128 stack of autoencoders.
if SEARCH_MLP_REDUCED_128:
    print('Starting MLP reduced grid search for 512-256-128 SAE')

    # The autoencoders are trained once and shared by all trials of this study.
    encoders = train_stacked_autoencoders(X_train_tensor, 620, [512, 256, 128], 20)
    
    study = optuna.create_study(direction='minimize')
    study.optimize(lambda trial: MLP_SAE_optimize(trial, encoders, 128, 'Adam'), n_trials=2)
    
    pretty_print_study(study)

else: print('Skipping SEARCH_MLP_REDUCED_128')

Skipping SEARCH_MLP_FULL

V results for SEARCH_MLP_REDUCED_256 V

Skipping SEARCH_MLP_REDUCED_256

V results for SEARCH_MLP_REDUCED_128 V

Skipping SEARCH_MLP_REDUCED_128


# Kolmogorov-Arnold Networks

#### Full-Input KAN
This network takes the full input of 620 features to perform x,y predictions.

In [23]:
def train_KAN(kan_model, train_loader, val_loader, criterion, optimizer, epochs):
    """
    Train `kan_model` for `epochs` epochs and validate it after every epoch.

    The model is moved to the GPU when one is available. After each epoch the
    average (per-batch) training and validation losses are printed.

    Parameters:
    - kan_model: The network to train
    - train_loader: Iterable of (inputs, labels) batches used for training
    - val_loader: Iterable of (inputs, labels) batches used for validation
    - criterion: Loss function called as criterion(outputs, labels)
    - optimizer: Optimizer over the model parameters; torch.optim.LBFGS is
      driven through a closure, every other optimizer through a plain step()
    - epochs: Number of passes over `train_loader` (must be at least 1)

    Returns:
    - The average validation loss (mean of the per-batch losses) of the final
      epoch, used for hyperparameter optimization. The model is left in evaluation mode.

    Raises:
    - ValueError: if `epochs` is smaller than 1 or one of the loaders has no batches
    """
    # Without at least one epoch and one batch there is no loss to average or return.
    if epochs < 1:
        raise ValueError('epochs must be at least 1')
    if len(train_loader) == 0 or len(val_loader) == 0:
        raise ValueError('train_loader and val_loader must each contain at least one batch')

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    kan_model.to(device) # Move model to GPU if available

    uses_lbfgs = isinstance(optimizer, torch.optim.LBFGS)

    # Training loop
    for epoch in range(epochs):
        kan_model.train() # Sets the model to training mode
        running_loss = 0.0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device) # Move data to GPU if available

            if uses_lbfgs:
                # LBFGS needs a closure that re-evaluates the loss and its gradients
                def closure():
                    optimizer.zero_grad()
                    outputs = kan_model(inputs)
                    loss = criterion(outputs, labels)
                    loss.backward()
                    return loss

                # step() returns the loss of its first closure evaluation, so there is no
                # need for a second, separate forward/backward pass just to read the loss.
                loss = optimizer.step(closure)
            else:
                optimizer.zero_grad()
                outputs = kan_model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

            running_loss += loss.item() # Accumulate the loss for this epoch

        kan_model.eval()
        val_loss = 0.0

        # Validation loop
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = kan_model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

        print(f'Epoch {epoch+1}/{epochs} - Avg Training Loss: {running_loss/len(train_loader)} - Avg Validation Loss: {val_loss/len(val_loader)}')

    print('Finished Training')
    return val_loss/len(val_loader) # Used for hyperparameter optimization

In [30]:
import optuna
from fastkan import FastKAN as KAN

def KAN_full_optimize(trial, optim : str = 'Adam') -> float:
    """
    Optuna objective for the full-input KAN.

    Samples the hidden layer count and size, learning rate, batch size and
    number of epochs from `trial`, trains a FastKAN model on `t_training` and
    returns the validation loss obtained on `t_val`.

    Parameters:
    - trial: Optuna trial used to sample the hyper-parameters
    - optim: Optimizer name, 'adam' or 'lbfgs' (case-insensitive)

    Raises:
    - ValueError: if `optim` names an unknown optimizer
    """
    depth = trial.suggest_int('hidden_layer_count', 2, 4) # inclusive
    # Within one trial Optuna returns the same value every time the same parameter
    # name is suggested, so all hidden layers share one sampled width.
    width = trial.suggest_categorical('hidden_layer_size', [700, 620, 512, 256, 128, 64, 32, 16])
    kan_layers = [620] + [width] * depth + [2] # Ensure correct input/output size

    learning_rate = trial.suggest_float('lr', 0.001, 0.01)
    batch_size = trial.suggest_int('batch_size', 16, 512, step=16)
    epochs = trial.suggest_int('epochs', 50, 150)

    model = KAN(kan_layers) # We use the FastKAN implementation.

    # Map the (case-insensitive) optimizer name onto its class
    optimizer_classes = {'adam': torch.optim.Adam, 'lbfgs': torch.optim.LBFGS}
    optimizer_cls = optimizer_classes.get(optim.lower())
    if optimizer_cls is None:
        raise ValueError('Unknown optimizer')
    optimizer = optimizer_cls(model.parameters(), lr=learning_rate)

    criterion = nn.MSELoss() # As we are doing regression

    # Build the loaders with the sampled batch size
    train_loader = DataLoader(t_training, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(t_val, batch_size=batch_size, shuffle=True)

    # The validation loss of the final epoch is the value Optuna optimizes
    return train_KAN(model, train_loader, val_loader, criterion, optimizer, epochs)



def KAN_SAE_optimize(trial, SAE, input_size, optim : str = 'Adam') -> float:
    """
    Optuna objective for the reduced-input KAN.

    The training and validation features are first encoded with the stacked
    autoencoders in `SAE`; a FastKAN model with `input_size` inputs is then
    trained on the encoded data and its validation loss is returned.

    Parameters:
    - trial: Optuna trial used to sample the hyper-parameters
    - SAE: Ordered list of trained autoencoders
    - input_size: Number of features produced by the last autoencoder
    - optim: Optimizer name, 'adam' or 'lbfgs' (case-insensitive)

    Raises:
    - ValueError: if `optim` names an unknown optimizer
    """
    # Hyper-parameters to be optimized
    hidden_layer_count = trial.suggest_int('hidden_layer_count', 2, 4) # inclusive
    # Within one trial Optuna returns the same value every time the same parameter
    # name is suggested, so all hidden layers share one sampled width.
    hidden_layer_size = trial.suggest_categorical('hidden_layer_size', [512, 256, 128, 64, 32, 16])
    kan_layers = [input_size] + [hidden_layer_size] * hidden_layer_count + [2] # Ensure correct input/output size

    lr = trial.suggest_float('lr', 0.001, 0.01)
    batch_size = trial.suggest_int('batch_size', 16, 512, step=16)
    epochs = trial.suggest_int('epochs', 50, 150)

    # Initialize the model
    model = KAN(kan_layers)

    if optim.lower() == 'adam': optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    elif optim.lower() == 'lbfgs': optimizer = torch.optim.LBFGS(model.parameters(), lr=lr)
    else : raise ValueError('Unknown optimizer')

    criterion = nn.MSELoss()

    # Encode training and validation data using the stacked autoencoders in SAE
    train_data_encoded = stacked_encode_data(X_train_tensor, SAE)
    val_data_encoded = stacked_encode_data(X_val_tensor, SAE)

    train_loader = DataLoader(TensorDataset(train_data_encoded, y_train_tensor), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(val_data_encoded, y_val_tensor), batch_size=batch_size, shuffle=True)

    # Train through the KAN training routine (the MLP routine was called here before),
    # in line with KAN_full_optimize, and return the validation loss.
    val_loss = train_KAN(model, train_loader, val_loader, criterion, optimizer, epochs)

    return val_loss

In [24]:
# Use the following booleans to enable or disable the hyper-parameter search for the different KAN models.
# (The messages call it a "grid search"; the search itself is run through optuna.create_study / study.optimize.)
# After running the search, train the final models with the best hyperparameters.

SEARCH_KAN_FULL = False 
SEARCH_KAN_REDUCED_256 = False
SEARCH_KAN_REDUCED_128 = False 

In [29]:
# Full-input KAN: search only when the corresponding flag is enabled.
if SEARCH_KAN_FULL:
    print('Starting KAN full grid search')

    # Optuna study object and direction (minimize validation loss)
    study = optuna.create_study(direction='minimize')
    # Optimize the KAN objective; this study used to run the MLP objective by mistake.
    study.optimize(lambda trial: KAN_full_optimize(trial, 'Adam'), n_trials=2)

    pretty_print_study(study)

else: print('Skipping SEARCH_KAN_FULL')

print('\n====================================')
print('V results for SEARCH_KAN_REDUCED_256 V')
print('====================================\n')

# Reduced-input KAN on top of a 620 -> 512 -> 256 stack of autoencoders.
if SEARCH_KAN_REDUCED_256:
    print('Starting KAN reduced search for 512-256 SAE')

    # The autoencoders are trained once and shared by all trials of this study.
    encoders = train_stacked_autoencoders(X_train_tensor, 620, [512, 256], 20)
    
    study = optuna.create_study(direction='minimize')
    study.optimize(lambda trial: KAN_SAE_optimize(trial, encoders, 256, 'Adam'), n_trials=2)
    
    pretty_print_study(study)

else: print('Skipping SEARCH_KAN_REDUCED_256') 

print('\n====================================')
print('V results for SEARCH_KAN_REDUCED_128 V')
print('====================================\n')

# Reduced-input KAN on top of a 620 -> 512 -> 256 -> 128 stack of autoencoders.
if SEARCH_KAN_REDUCED_128:
    print('Starting KAN reduced grid search for 512-256-128 SAE')

    # The autoencoders are trained once and shared by all trials of this study.
    encoders = train_stacked_autoencoders(X_train_tensor, 620, [512, 256, 128], 20)
    
    study = optuna.create_study(direction='minimize')
    study.optimize(lambda trial: KAN_SAE_optimize(trial, encoders, 128, 'Adam'), n_trials=2)
    
    pretty_print_study(study)

else: print('Skipping SEARCH_KAN_REDUCED_128')

Skipping SEARCH_KAN_FULL

V results for SEARCH_KAN_REDUCED_256 V

Skipping SEARCH_KAN_REDUCED_256

V results for SEARCH_KAN_REDUCED_128 V

Skipping SEARCH_KAN_REDUCED_128


# Evaluation