In [17]:
import numpy as np
import itertools
import import_ipynb
import sys
import os

sys.path.append(os.path.abspath("../"))  # Go up one level and into 'data'
from data_synthesis.benchmark_functions import *  # Import function and class
from data_synthesis.loss_functions import *  # Import function and class

In [18]:
# Defines the limits of the input itself; e.g. for a function that depends on x, we restrict x to [0, 1].
# It only sets the grid of the space; IT HAS NO PROPERTIES*.
#class TargetSpace:
#class SampleSpace:
#class OriginalSpace:
class ConcreteSpace:
    """Box-shaped domain for the inputs of the function under study.

    Dimension ``i`` is bounded by the ``(low, high)`` pair ``limits[i]`` and is
    discretised into ``space_shape[i]`` points. The class only produces
    coordinates (per-axis points, their Cartesian grid, uniform random samples).

    Attributes:
        space_shape: number of points per dimension, as passed in.
        limits: ``np.ndarray`` built from ``limits``; column 0 holds the lower
            bounds and column 1 the upper bounds.
        sample_method: ``"same"`` or ``"monte_carlo"``.
        space: list with one 1-D array per dimension, ``None`` until
            ``generate_space()`` has run.
        grid: array of every coordinate combination, ``None`` until
            ``generate_grid()`` has run.
        samples: result of the last ``generate_random_samples()`` call.
    """

    def __init__(self, space_shape, limits, sample_method="same"):
        """Store the per-dimension point counts and bounds.

        Args:
            space_shape: sequence with the number of points along each dimension.
            limits: sequence of ``(low, high)`` pairs, one per dimension.
            sample_method: ``"same"`` for evenly spaced points or
                ``"monte_carlo"`` for uniform random points. It is validated
                lazily by ``generate_space()``, not here.

        Raises:
            AssertionError: if ``space_shape`` and ``limits`` differ in length.
        """
        assert len(space_shape) == len(limits), "Each dimension must have corresponding limits"

        self.space_shape = space_shape
        self.limits = np.array(limits)
        self.sample_method = sample_method
        self.space = None  # Per-dimension points
        self.grid = None   # Full grid of combinations
        self.samples = None

    @staticmethod
    def _is_given(value):
        """Return True when an optional sequence argument was actually supplied.

        ``None`` and empty sequences count as "not supplied". ``len`` is used
        instead of truthiness because ``bool(ndarray)`` raises for arrays with
        more than one element.
        """
        return value is not None and len(value) > 0

    def generate_space(self):
        """Build ``self.space``: one 1-D array of points per dimension.

        Raises:
            ValueError: if ``sample_method`` is neither ``"same"`` nor ``"monte_carlo"``.
        """
        if self.sample_method == "same":
            # Equally spaced points per dimension (both bounds included)
            self.space = [np.linspace(low, high, num) for (low, high), num in zip(self.limits, self.space_shape)]
        elif self.sample_method == "monte_carlo":
            # Uniform random points per dimension, drawn from NumPy's global RNG
            self.space = [np.random.uniform(low, high, num) for (low, high), num in zip(self.limits, self.space_shape)]
        else:
            raise ValueError("Invalid sample_method. Choose 'same' or 'monte_carlo'.")

    def generate_grid(self):
        """Build ``self.grid`` as the Cartesian product of the per-dimension points.

        The number of rows is the product of the per-dimension point counts, so
        memory use grows quickly with the number of dimensions.

        Raises:
            RuntimeError: if ``generate_space()`` has not been called yet.
        """
        if self.space is None:
            raise RuntimeError("Space must be generated first. Call generate_space()")

        # Every combination of points across all dimensions, one row per combination
        self.grid = np.array(list(itertools.product(*self.space)))

    def get_space(self):
        """Return the per-dimension points (``None`` if not generated yet)."""
        return self.space

    def get_grid(self):
        """Return the full grid.

        Raises:
            RuntimeError: if ``generate_grid()`` has not been called yet.
        """
        if self.grid is None:
            # The grid is produced by generate_grid(), not generate_space()
            raise RuntimeError("Grid has not been generated. Call generate_grid() first.")
        return self.grid

    def change(self, new_shape=None, new_limits=None, new_method=None):
        """Update shape, limits and/or sampling method, then regenerate.

        All arguments are validated before any attribute is modified, so a
        failed call leaves the object untouched. ``new_shape`` and
        ``new_limits`` are checked against each other when both are given,
        which allows changing the number of dimensions in a single call.

        The per-dimension space is always regenerated. The grid is regenerated
        only if one already existed, so ``get_grid()`` never returns a grid
        built from outdated settings.

        Args:
            new_shape: new point counts per dimension (``None``/empty keeps the current one).
            new_limits: new ``(low, high)`` pairs (``None``/empty keeps the current ones).
            new_method: ``"same"`` or ``"monte_carlo"`` (falsy keeps the current one).

        Raises:
            AssertionError: on a dimension mismatch or an unknown method.
        """
        shape_given = self._is_given(new_shape)
        limits_given = self._is_given(new_limits)

        shape = new_shape if shape_given else self.space_shape
        limits = np.array(new_limits) if limits_given else self.limits

        if shape_given:
            assert len(shape) == len(limits), "New shape must match dimensions of limits"
        if limits_given:
            assert len(limits) == len(shape), "New limits must match number of dimensions"
        if new_method:
            assert new_method in ["same", "monte_carlo"], "Invalid method"

        # Commit only after every check has passed
        self.space_shape = shape
        self.limits = limits
        if new_method:
            self.sample_method = new_method

        had_grid = self.grid is not None
        self.generate_space()
        if had_grid:
            self.generate_grid()

    def generate_random_samples(self, n):
        """Draw ``n`` uniform random points inside the limits.

        The result has shape ``(n, number_of_dimensions)``; it is also stored
        in ``self.samples``.
        """
        num_dimensions = len(self.space_shape)
        self.samples = np.random.uniform(self.limits[:, 0], self.limits[:, 1], size=(n, num_dimensions))
        return self.samples

    def generate_one_sample(self):
        """Draw a single uniform random point; returns a 1-D array (not stored)."""
        num_dimensions = len(self.space_shape)
        sample = np.random.uniform(self.limits[:, 0], self.limits[:, 1], size=(1, num_dimensions))
        return sample[0]

    def is_within_limits(self, sample):
        """Return whether every coordinate of ``sample`` lies within its bounds (inclusive)."""
        sample = np.array(sample)
        return np.all((sample >= self.limits[:, 0]) & (sample <= self.limits[:, 1]))

In [19]:
# Defines the parameters available for the model to sample; e.g. in a linear model, a and b may each vary over [0, 1].
# It only sets the grid of the space; IT HAS NO PROPERTIES*.
#class DualSpace:
#class SearchSpace:
class AbstractSpace:
    """Box-shaped domain for the parameters a model may take.

    Dimension ``i`` is bounded by the ``(low, high)`` pair ``limits[i]`` and is
    discretised into ``space_shape[i]`` points. The class only produces
    coordinates (per-axis points, their Cartesian grid, uniform random samples).

    Attributes:
        space_shape: number of points per dimension, as passed in.
        limits: ``np.ndarray`` built from ``limits``; column 0 holds the lower
            bounds and column 1 the upper bounds.
        sample_method: ``"same"`` or ``"monte_carlo"``.
        space: list with one 1-D array per dimension, ``None`` until
            ``generate_space()`` has run.
        grid: array of every coordinate combination, ``None`` until
            ``generate_grid()`` has run.
        samples: result of the last ``generate_random_samples()`` call.
    """

    def __init__(self, space_shape, limits, sample_method="same"):
        """Store the per-dimension point counts and bounds.

        Args:
            space_shape: sequence with the number of points along each dimension.
            limits: sequence of ``(low, high)`` pairs, one per dimension.
            sample_method: ``"same"`` for evenly spaced points or
                ``"monte_carlo"`` for uniform random points. It is validated
                lazily by ``generate_space()``, not here.

        Raises:
            AssertionError: if ``space_shape`` and ``limits`` differ in length.
        """
        assert len(space_shape) == len(limits), "Each dimension must have corresponding limits"

        self.space_shape = space_shape
        self.limits = np.array(limits)
        self.sample_method = sample_method
        self.space = None  # Per-dimension points
        self.grid = None   # Full grid of combinations
        self.samples = None

    @staticmethod
    def _is_given(value):
        """Return True when an optional sequence argument was actually supplied.

        ``None`` and empty sequences count as "not supplied". ``len`` is used
        instead of truthiness because ``bool(ndarray)`` raises for arrays with
        more than one element.
        """
        return value is not None and len(value) > 0

    def generate_space(self):
        """Build ``self.space``: one 1-D array of points per dimension.

        Raises:
            ValueError: if ``sample_method`` is neither ``"same"`` nor ``"monte_carlo"``.
        """
        if self.sample_method == "same":
            # Equally spaced points per dimension (both bounds included)
            self.space = [np.linspace(low, high, num) for (low, high), num in zip(self.limits, self.space_shape)]
        elif self.sample_method == "monte_carlo":
            # Uniform random points per dimension, drawn from NumPy's global RNG
            self.space = [np.random.uniform(low, high, num) for (low, high), num in zip(self.limits, self.space_shape)]
        else:
            raise ValueError("Invalid sample_method. Choose 'same' or 'monte_carlo'.")

    def generate_grid(self):
        """Build ``self.grid`` as the Cartesian product of the per-dimension points.

        The number of rows is the product of the per-dimension point counts, so
        memory use grows quickly with the number of dimensions.

        Raises:
            RuntimeError: if ``generate_space()`` has not been called yet.
        """
        if self.space is None:
            raise RuntimeError("Space must be generated first. Call generate_space()")

        # Every combination of points across all dimensions, one row per combination
        self.grid = np.array(list(itertools.product(*self.space)))

    def get_space(self):
        """Return the per-dimension points (``None`` if not generated yet)."""
        return self.space

    def get_grid(self):
        """Return the full grid.

        Raises:
            RuntimeError: if ``generate_grid()`` has not been called yet.
        """
        if self.grid is None:
            # The grid is produced by generate_grid(), not generate_space()
            raise RuntimeError("Grid has not been generated. Call generate_grid() first.")
        return self.grid

    def change(self, new_shape=None, new_limits=None, new_method=None):
        """Update shape, limits and/or sampling method, then regenerate.

        All arguments are validated before any attribute is modified, so a
        failed call leaves the object untouched. ``new_shape`` and
        ``new_limits`` are checked against each other when both are given,
        which allows changing the number of dimensions in a single call.

        The per-dimension space is always regenerated. The grid is regenerated
        only if one already existed, so ``get_grid()`` never returns a grid
        built from outdated settings.

        Args:
            new_shape: new point counts per dimension (``None``/empty keeps the current one).
            new_limits: new ``(low, high)`` pairs (``None``/empty keeps the current ones).
            new_method: ``"same"`` or ``"monte_carlo"`` (falsy keeps the current one).

        Raises:
            AssertionError: on a dimension mismatch or an unknown method.
        """
        shape_given = self._is_given(new_shape)
        limits_given = self._is_given(new_limits)

        shape = new_shape if shape_given else self.space_shape
        limits = np.array(new_limits) if limits_given else self.limits

        if shape_given:
            assert len(shape) == len(limits), "New shape must match dimensions of limits"
        if limits_given:
            assert len(limits) == len(shape), "New limits must match number of dimensions"
        if new_method:
            assert new_method in ["same", "monte_carlo"], "Invalid method"

        # Commit only after every check has passed
        self.space_shape = shape
        self.limits = limits
        if new_method:
            self.sample_method = new_method

        had_grid = self.grid is not None
        self.generate_space()
        if had_grid:
            self.generate_grid()

    def generate_random_samples(self, n):
        """Draw ``n`` uniform random points inside the limits.

        The result has shape ``(n, number_of_dimensions)``; it is also stored
        in ``self.samples``.
        """
        num_dimensions = len(self.space_shape)
        self.samples = np.random.uniform(self.limits[:, 0], self.limits[:, 1], size=(n, num_dimensions))
        return self.samples

    def generate_one_sample(self):
        """Draw a single uniform random point; returns a 1-D array (not stored)."""
        num_dimensions = len(self.space_shape)
        sample = np.random.uniform(self.limits[:, 0], self.limits[:, 1], size=(1, num_dimensions))
        return sample[0]

    def is_within_limits(self, sample):
        """Return whether every coordinate of ``sample`` lies within its bounds (inclusive)."""
        sample = np.array(sample)
        return np.all((sample >= self.limits[:, 0]) & (sample <= self.limits[:, 1]))

In [20]:
# IMPORTANT: The search space is an N-dimensional space whose parameters represent some operation in a model; the operations are the PROPERTIES attributed to
# that search space. One search space can have many PROPERTIES, but one PROPERTY can only be defined in the specific dimensions of that search space.
# If the AI model has the exact complexity of the object we want to understand, the search space and the sample space have the same dimension.
# If the AI model has the same properties as the object we want to understand, the properties of that space are the same; in this case we have a perfect model.
# A perfect model means that, given some input in the search and sample spaces, the output will be the same.
# With that in mind, perhaps oversized models can still avoid overfitting if the specific output is orthogonal to the effects of the extra parameters.
class ModelProcessor:
    """Evaluates a model over concrete inputs and abstract parameter sets.

    ``model_function`` is always invoked as
    ``model_function(*concrete_item, *abstract_item)``: the coordinates of one
    concrete point followed by the values of one abstract (parameter) point.
    ``loss_function`` is invoked as ``loss_function(a, b)`` on two model outputs.

    Wherever an abstract item / target model argument is ``None``, the
    ``default_abstraction`` given to the constructor is used instead.
    """

    def __init__(self, model_function, loss_function, default_abstraction=None):
        """Store the model, the loss and the optional default abstract item."""
        self.model_function = model_function
        self.loss_function = loss_function
        self.default_abstraction = default_abstraction

    def _resolve_abstraction(self, abstract_item):
        """Return ``abstract_item``, or the default abstraction when it is ``None``.

        Raises:
            ValueError: if ``abstract_item`` is ``None`` and no default was set.
        """
        if abstract_item is not None:
            return abstract_item
        if self.default_abstraction is None:
            raise ValueError("No abstract item provided and no default model set.")
        return self.default_abstraction

    def _losses(self, outputs_a, outputs_b, return_mean):
        """Pairwise losses between two output sequences; optionally their mean."""
        loss_values = [self.loss_function(a, b) for a, b in zip(outputs_a, outputs_b)]
        return np.mean(loss_values) if return_mean else loss_values

    def process_single(self, concrete_item, abstract_item=None):
        """Evaluate the model at one concrete point with one abstract item.

        Raises:
            ValueError: if ``abstract_item`` is ``None`` and no default was set.
        """
        abstract_item = self._resolve_abstraction(abstract_item)
        return self.model_function(*concrete_item, *abstract_item)

    def process_concrete_full_abstract_one(self, concrete_grid, abstract_item=None):
        """Evaluate one abstract item at every point of ``concrete_grid``.

        Returns:
            list with one model output per concrete point, in iteration order.

        Raises:
            ValueError: if ``abstract_item`` is ``None`` and no default was set.
        """
        abstract_item = self._resolve_abstraction(abstract_item)
        return [self.model_function(*concrete, *abstract_item) for concrete in concrete_grid]

    def process_abstract_full_concrete_one(self, concrete_item, abstract_grid):
        """Evaluate every abstract item of ``abstract_grid`` at one concrete point.

        Returns:
            list with one model output per abstract item, in iteration order.
        """
        return [self.model_function(*concrete_item, *abstract) for abstract in abstract_grid]

    def compare_models(self, concrete_grid, abstract_model_1, abstract_model_2, return_mean=False):
        """Loss between two abstract items, evaluated point by point on ``concrete_grid``.

        Returns:
            list of per-point losses, or their ``np.mean`` when ``return_mean`` is true.
        """
        # The grid is walked twice; materialise it so one-shot iterators work too
        concrete_points = list(concrete_grid)
        results_1 = self.process_concrete_full_abstract_one(concrete_points, abstract_model_1)
        results_2 = self.process_concrete_full_abstract_one(concrete_points, abstract_model_2)
        return self._losses(results_1, results_2, return_mean)

    def evaluate_against_target(self, concrete_grid, abstract_grid, target_model=None):
        """Mean loss of every abstract candidate against a target abstract item.

        The target outputs do not depend on the candidate, so they are computed
        once up front instead of once per candidate. This yields the same
        values as long as ``model_function`` returns the same output for the
        same arguments.

        Returns:
            list with one mean loss per item of ``abstract_grid``.

        Raises:
            ValueError: if ``target_model`` is ``None`` and no default was set.
        """
        target_model = self._resolve_abstraction(target_model)

        concrete_points = list(concrete_grid)  # reused for every candidate
        target_outputs = self.process_concrete_full_abstract_one(concrete_points, target_model)

        loss_results = []
        for abstract_candidate in abstract_grid:
            candidate_outputs = self.process_concrete_full_abstract_one(concrete_points, abstract_candidate)
            loss_results.append(self._losses(target_outputs, candidate_outputs, True))

        return loss_results

    def compare_concrete_models(self, concrete_model_1, concrete_model_2, abstract_grid, return_mean=False):
        """Loss between two concrete points, evaluated across every item of ``abstract_grid``.

        Returns:
            list of per-abstract-item losses, or their ``np.mean`` when ``return_mean`` is true.
        """
        # The grid is walked twice; materialise it so one-shot iterators work too
        abstract_points = list(abstract_grid)
        results_1 = self.process_abstract_full_concrete_one(concrete_model_1, abstract_points)
        results_2 = self.process_abstract_full_concrete_one(concrete_model_2, abstract_points)
        return self._losses(results_1, results_2, return_mean)

    def compare_concrete_models_iteration(self, concrete_model_1, abstract_grid, target_model=None, return_mean=False):
        """Loss of every abstract item against the target, at one concrete point.

        The target output is ``model(concrete_model_1, target_model)``; each
        item of ``abstract_grid`` is evaluated at the same concrete point and
        compared with it as ``loss_function(target, candidate)``.

        Returns:
            list of per-abstract-item losses, or their ``np.mean`` when ``return_mean`` is true.

        Raises:
            ValueError: if ``target_model`` is ``None`` and no default was set.
        """
        target_model = self._resolve_abstraction(target_model)

        # NOTE(review): original author's remark here was "Do not pass the grid,
        # pass only the correct model" — intent unclear, TODO confirm.
        results_1 = self.process_abstract_full_concrete_one(concrete_model_1, abstract_grid)
        target = self.process_single(concrete_model_1, target_model)

        loss_values = [self.loss_function(target, r1) for r1 in results_1]
        return np.mean(loss_values) if return_mean else loss_values

    def evaluate_concrete_against_target(self, concrete_grid, abstract_grid, target_model=None):
        """Mean loss over ``abstract_grid`` against the target, for every concrete point.

        Returns:
            list with one mean loss per point of ``concrete_grid``.

        Raises:
            ValueError: if ``target_model`` is ``None`` and no default was set.
        """
        target_model = self._resolve_abstraction(target_model)

        abstract_points = list(abstract_grid)  # reused for every concrete point
        return [
            self.compare_concrete_models_iteration(concrete_candidate, abstract_points, target_model, return_mean=True)
            for concrete_candidate in concrete_grid
        ]

In [21]:
class ModelComparison:
    """Compares the outputs of two model processors.

    ``model_1`` supplies the target outputs (and the default target abstract
    item via its ``default_abstraction``); ``model_2`` supplies the candidate
    outputs. Both objects are expected to expose
    ``process_single``, ``process_concrete_full_abstract_one`` and
    ``process_abstract_full_concrete_one``.

    Losses are computed with the ``loss_function`` given to the constructor;
    when it is ``None``, ``model_1.loss_function`` is used.
    """

    def __init__(self, model_1, model_2, loss_function=None):
        """Store both processors and the optional loss override."""
        self.model_1 = model_1
        self.model_2 = model_2
        self.loss_function = loss_function

    def _loss(self, output_1, output_2):
        """Apply the configured loss, falling back to ``model_1.loss_function``."""
        loss_fn = self.loss_function if self.loss_function is not None else self.model_1.loss_function
        return loss_fn(output_1, output_2)

    def _resolve_target(self, target_model):
        """Return ``target_model``, or ``model_1.default_abstraction`` when it is ``None``.

        Raises:
            ValueError: if ``target_model`` is ``None`` and model_1 has no default.
        """
        if target_model is not None:
            return target_model
        if self.model_1.default_abstraction is None:
            raise ValueError("No abstract item provided and no default model set.")
        return self.model_1.default_abstraction

    def _losses(self, outputs_1, outputs_2, return_mean):
        """Pairwise losses between two output sequences; optionally their mean.

        Pairing uses ``zip``, so the longer sequence is truncated to the
        length of the shorter one.
        """
        loss_values = [self._loss(r1, r2) for r1, r2 in zip(outputs_1, outputs_2)]
        return np.mean(loss_values) if return_mean else loss_values

    def compare_models(self, concrete_grid_1, concrete_grid_2, abstract_model_1, abstract_model_2, return_mean=False):
        """Loss between model_1 and model_2 outputs, paired point by point.

        model_1 is evaluated on ``concrete_grid_1`` with ``abstract_model_1``,
        model_2 on ``concrete_grid_2`` with ``abstract_model_2``.

        Returns:
            list of per-pair losses, or their ``np.mean`` when ``return_mean`` is true.
        """
        results_1 = self.model_1.process_concrete_full_abstract_one(concrete_grid_1, abstract_model_1)
        results_2 = self.model_2.process_concrete_full_abstract_one(concrete_grid_2, abstract_model_2)
        return self._losses(results_1, results_2, return_mean)

    def evaluate_against_target(self, concrete_grid_1, concrete_grid_2, abstract_grid_2, target_model=None):
        """Mean loss of every model_2 candidate against model_1's target outputs.

        The target outputs do not depend on the candidate, so they are computed
        once up front instead of once per candidate. This yields the same
        values as long as model_1 returns the same output for the same
        arguments.

        Returns:
            list with one mean loss per item of ``abstract_grid_2``.

        Raises:
            ValueError: if ``target_model`` is ``None`` and model_1 has no default.
        """
        target_model = self._resolve_target(target_model)

        candidate_points = list(concrete_grid_2)  # reused for every candidate
        target_outputs = self.model_1.process_concrete_full_abstract_one(concrete_grid_1, target_model)

        loss_results = []
        for abstract_candidate in abstract_grid_2:
            candidate_outputs = self.model_2.process_concrete_full_abstract_one(candidate_points, abstract_candidate)
            loss_results.append(self._losses(target_outputs, candidate_outputs, True))

        return loss_results

    def compare_concrete_models_iteration(self, concrete_model_1, concrete_model_2, abstract_grid, target_model=None, return_mean=False):
        """Loss of every abstract item (on model_2) against model_1's target output.

        The target is ``model_1`` evaluated at ``concrete_model_1`` with
        ``target_model``; each item of ``abstract_grid`` is evaluated by
        ``model_2`` at ``concrete_model_2`` and compared as ``loss(target, candidate)``.

        Returns:
            list of per-abstract-item losses, or their ``np.mean`` when ``return_mean`` is true.

        Raises:
            ValueError: if ``target_model`` is ``None`` and model_1 has no default.
        """
        target_model = self._resolve_target(target_model)

        # NOTE(review): original author's remark here was "Do not pass the grid,
        # pass only the correct model" — intent unclear, TODO confirm.
        results_1 = self.model_2.process_abstract_full_concrete_one(concrete_model_2, abstract_grid)
        target = self.model_1.process_single(concrete_model_1, target_model)

        loss_values = [self._loss(target, r1) for r1 in results_1]
        return np.mean(loss_values) if return_mean else loss_values

    def evaluate_concrete_against_target(self, concrete_grid_1, concrete_grid_2, abstract_grid, target_model=None):
        """Mean loss over ``abstract_grid`` for every pair of concrete points.

        Points of ``concrete_grid_1`` and ``concrete_grid_2`` are paired with
        ``zip`` (extra points of the longer grid are ignored).

        Returns:
            list with one mean loss per concrete pair.

        Raises:
            ValueError: if ``target_model`` is ``None`` and model_1 has no default.
        """
        target_model = self._resolve_target(target_model)

        abstract_points = list(abstract_grid)  # reused for every concrete pair
        return [
            self.compare_concrete_models_iteration(
                concrete_candidate_1, concrete_candidate_2, abstract_points, target_model, return_mean=True
            )
            for concrete_candidate_1, concrete_candidate_2 in zip(concrete_grid_1, concrete_grid_2)
        ]