In [None]:
# we intend to find a way to quantify how "affected by interactions" an optimization problem is

# For example, linear problems are completely free from interactions
# In general, an interaction-free problem has the form F(x) = aggr_func(f_1(x_1), ..., f_n(x_n)), where each f_i only sees its own variable x_i


# Why? 
# non-linear problems are complex, so it's good to know how non-linear a problem is
#  https://link.springer.com/article/10.1007/s10479-007-0172-6
# variable importance is only relevant in interaction-free problems

## Challenges
# There are (approximate) ways of detecting whether a problem is linear, but not whether a problem is separable


## method
# a variable v_i can be separated from the rest if 
"""
I can take 2 solutions that only differ by that variable, and consistently predict which one is better
"""

# explanation of prev sentence
"""

1. variables which interact with others cannot be used on their own to predict dominance
2. "consistently predict" could be done via a model, but here it will simply be some maths
"""


# methodology
"""
def get_separability_of_vi(vi, solutions, fitnesses):
    # sort the solutions by fitness
    solutions_with_fitnesses = sorted(zip(solutions, fitnesses), key=lambda pair: pair[1], reverse=True)
    sorted_solutions, sorted_fitnesses = zip(*solutions_with_fitnesses)
    
    ranked_vi_values = np.array(sorted_solutions)[:, vi]
    
    # if vi is separable, then ranked_vi_values will be something like 11111...000000.....22222    blocks of same values
    # let's measure how "nasty" it is
    
    
    


"""

In [44]:
from Core.FullSolution import FullSolution
from typing import Callable
from Core.SearchSpace import SearchSpace
from BenchmarkProblems.BenchmarkProblem import BenchmarkProblem

class SubProblem:
    """Base class for one component of a composite benchmark problem.

    A sub-problem stores the search space of the variables it covers and is
    expected to score a solution fragment restricted to those variables.
    Concrete subclasses must override :meth:`fitness_function`.
    """
    search_space: SearchSpace

    def __init__(self,
                 search_space: SearchSpace):
        self.search_space = search_space

    def fitness_function(self, solution):
        """Return the fitness of ``solution``, the fragment handled by this sub-problem.

        Raises:
            NotImplementedError: always, in this base class.
        """
        # NotImplemented is a comparison sentinel, not an exception class:
        # calling it (as the code used to) fails with an unrelated TypeError.
        raise NotImplementedError
    
    
class Univariate(SubProblem):
    """Single-variable sub-problem whose fitness is read from a lookup table.

    The search space is built from one entry, ``len(fitnesses)`` (presumably the
    number of values the variable can take), and the fitness of a fragment is
    the table entry indexed by the fragment's first element.
    """
    fitnesses: list[float]

    def __init__(self, fitnesses: list[float]):
        amount_of_values = len(fitnesses)
        super().__init__(SearchSpace([amount_of_values]))
        self.fitnesses = fitnesses

    def fitness_function(self, solution):
        """Return the table entry selected by the value at position 0 of ``solution``."""
        chosen_value = solution[0]
        return self.fitnesses[chosen_value]
    
    
class UnitaryProblem(SubProblem):
    """Sub-problem over ``clique_size`` variables whose fitness depends only on their sum.

    The search space is built from ``[2] * clique_size`` (presumably
    ``clique_size`` binary variables). Subclasses define the shape of the
    landscape by overriding :meth:`unitary_function`.
    """
    clique_size: int

    def __init__(self,
                 clique_size: int):
        super().__init__(SearchSpace([2]*clique_size))
        self.clique_size = clique_size

    def fitness_function(self, solution):
        """Return ``unitary_function`` applied to the sum of the values in ``solution``."""
        ones = sum(solution)
        return self.unitary_function(ones)

    def unitary_function(self, n):
        """Map ``n``, the sum of the fragment's values, to a fitness.

        Raises:
            NotImplementedError: always, in this base class.
        """
        # `raise NotImplemented` fails with a TypeError because NotImplemented
        # is not an exception; NotImplementedError is the intended signal.
        raise NotImplementedError
    
class OneMax(UnitaryProblem):
    """Unitary sub-problem whose fitness is the sum ``n`` itself.

    Construction is inherited unchanged from ``UnitaryProblem``.
    """

    def unitary_function(self, n):
        """Return ``n`` unchanged."""
        return n
    
class RoyalRoad(UnitaryProblem):
    """Unitary sub-problem that only rewards the case ``n == clique_size``.

    Construction is inherited unchanged from ``UnitaryProblem``.
    """

    def unitary_function(self, n):
        """Return ``clique_size`` when ``n`` equals ``clique_size``, otherwise 0."""
        if n == self.clique_size:
            return self.clique_size
        return 0
    
class Parity(UnitaryProblem):
    """Unitary sub-problem whose fitness is the parity of the sum ``n``.

    Construction is inherited unchanged from ``UnitaryProblem``.
    """

    def unitary_function(self, n):
        """Return ``n`` modulo 2."""
        remainder = n % 2
        return remainder
    
    
class TwoPeaks(UnitaryProblem):
    """Unitary sub-problem rewarding whichever of ``n`` and ``clique_size - n`` is larger.

    Construction is inherited unchanged from ``UnitaryProblem``.
    """

    def unitary_function(self, n):
        """Return the larger of ``n`` and ``clique_size - n``."""
        complement = self.clique_size-n
        return max(n, complement)


class Trapk(UnitaryProblem):
    """Unitary trap sub-problem.

    The fitness is ``clique_size`` when ``n == clique_size`` and
    ``clique_size - 1 - n`` for every other ``n``.
    Construction is inherited unchanged from ``UnitaryProblem``.
    """

    def unitary_function(self, n):
        """Return ``clique_size`` at ``n == clique_size``, else ``clique_size - 1 - n``."""
        if n == self.clique_size:
            return self.clique_size
        return self.clique_size-1-n

class NITProblem(BenchmarkProblem):
    """Benchmark problem assembled from sub-problems over consecutive variable ranges.

    The values of a full solution are cut into consecutive fragments, one per
    sub-problem and in the same order; every fragment is scored by its own
    sub-problem and the list of sub-fitnesses is passed to ``aggregation_func``.
    """
    sub_problems: list[SubProblem]
    aggregation_func: Callable

    def __init__(self, sub_problems, aggregation_func):
        self.sub_problems = sub_problems
        self.aggregation_func = aggregation_func
        # the overall search space is obtained by concatenating the sub-problems' spaces, in order
        own_search_spaces = (sub.search_space for sub in self.sub_problems)
        super().__init__(SearchSpace.concatenate_search_spaces(own_search_spaces))

    def fitness_function(self, fs: FullSolution) -> float:
        """Score ``fs`` by aggregating the fitnesses of its per-sub-problem fragments."""
        # phase 1: slice fs.values into one fragment per sub-problem
        fragments = []
        offset = 0
        for sub_problem in self.sub_problems:
            width = sub_problem.search_space.amount_of_parameters
            fragments.append(fs.values[offset:offset + width])
            offset += width

        # phase 2: score each fragment with the sub-problem it belongs to
        sub_fitnesses = []
        for sub_problem, fragment in zip(self.sub_problems, fragments):
            sub_fitnesses.append(sub_problem.fitness_function(fragment))

        return self.aggregation_func(sub_fitnesses)

In [69]:
import numpy as np
import utils
from Core.FullSolution import FullSolution

def sum_of_squares(*values):
    """Square the positional arguments element-wise and return the total of all the squares."""
    squared = np.square(values)
    return np.sum(squared)


# Composite benchmark: two table-lookup variables followed by five unitary cliques,
# with the sub-fitnesses combined by sum_of_squares.
problem = NITProblem(
    [
        Univariate([2, 3]),
        Univariate([0, 10]),
        RoyalRoad(4),
        Trapk(5),
        OneMax(4),
        Parity(3),
        TwoPeaks(4),
    ],
    aggregation_func=sum_of_squares,
)

# Generate 1000 solutions with FullSolution.random and score each of them.
random_solutions = [FullSolution.random(problem.search_space) for _ in range(1000)]
fitnesses = list(map(problem.fitness_function, random_solutions))

# Order the (solution, fitness) pairs by utils.second (presumably the fitness), largest first.
sorted_solutions_with_fitnesses = list(zip(random_solutions, fitnesses))
sorted_solutions_with_fitnesses.sort(key=utils.second, reverse=True)

# Split the ordered pairs back into a tuple of solutions and a tuple of fitnesses.
sorted_solutions, sorted_fitnesses = zip(*sorted_solutions_with_fitnesses)
# sorted_solutions = np.array([solution.values for solution in sorted_solutions])
# sorted_fitnesses = np.array(sorted_fitnesses)



In [72]:
from BenchmarkProblems.BenchmarkProblem import BenchmarkProblem


def measure_separability_of_var(vi: int, sorted_solutions: np.ndarray, sorted_fitnesses: np.ndarray):
    """Estimate, from a fitness-ranked sample, how poorly variable ``vi`` predicts the ranking.

    For every pair of symbols ``(a, b)`` the function counts the pairs of sampled
    solutions where one holds ``a`` at position ``vi``, the other holds ``b``, and
    the ``b`` solution appears earlier in the ranking at a *different* fitness.
    Solutions with equal fitness are never compared with each other. The counts
    are normalised into rates ``p`` (with ``1 - p`` for the reversed pair) and
    each pair of symbols receives the error ``p * (1 - p)``: 0 when one symbol is
    always ranked ahead of the other, 0.25 when they are ahead equally often.
    (``measure_error`` in this notebook multiplies the same quantity by 4.)

    Args:
        vi: column of ``sorted_solutions`` holding the variable of interest.
        sorted_solutions: 2-D integer array, one row per solution, rows already
            ordered by fitness (the function does not sort).
        sorted_fitnesses: fitness of each row, in the same order.

    Returns:
        1-D float array with one entry per symbol ``0 .. max observed symbol``:
        the error of that symbol averaged over the other symbols. An entry is NaN
        when the symbol was never compared against some other symbol.
    """
    sorted_symbols = sorted_solutions[:, vi]
    # the alphabet size is inferred from the largest symbol present in the sample
    cardinality = np.max(sorted_symbols)+1

    # symbol counts over solutions at earlier fitness levels / at the current fitness level
    seen_vector = np.zeros(cardinality, dtype=int)
    seen_during_ties = np.zeros(cardinality, dtype=int)

    # wins[a, b]: number of (a-solution, earlier-ranked b-solution) pairs; the
    # orientation is irrelevant because the error below is symmetric in a and b
    wins = np.zeros(shape=(cardinality, cardinality), dtype=int)

    prev_fitness = None
    for symbol, fitness in zip(sorted_symbols, sorted_fitnesses):
        if prev_fitness is not None and fitness != prev_fitness:
            # a new fitness level starts: the previous tie group becomes "earlier"
            # and the tie buffer is emptied so it is never counted twice
            seen_vector += seen_during_ties
            seen_during_ties[:] = 0
        wins[symbol] += seen_vector
        seen_during_ties[symbol] += 1
        prev_fitness = fitness

    # normalise counts into rates; pairs that were never compared (always the
    # diagonal) stay NaN instead of triggering a 0/0 RuntimeWarning
    totals = wins + wins.T
    win_rates = np.full(wins.shape, np.nan)
    np.divide(wins, totals, out=win_rates, where=totals > 0)
    loss_rates = win_rates.T
    errors = (win_rates * loss_rates) / (win_rates + loss_rates)
    np.fill_diagonal(errors, 0)

    # a single-symbol variable has no "other" symbols; avoid dividing by zero
    amount_of_other_symbols = max(cardinality-1, 1)
    average_error_per_symbol = np.sum(errors, axis=0) / amount_of_other_symbols

    return average_error_per_symbol

def measure_error(vi: int, solutions: list[FullSolution], fitnesses: list[float], problem: BenchmarkProblem):
    """Measure how inconsistently the value of variable ``vi`` decides a one-variable comparison.

    Every solution is compared with copies of itself in which only variable
    ``vi`` has been changed, once for each other value in the variable's domain
    (for a binary variable this is the single flip ``1 - original_value``).
    ``wins[a, b]`` counts the comparisons in which value ``a`` was credited over
    value ``b``. Counts are normalised into rates ``p`` and each pair of values
    gets the error ``4 * p * (1 - p)``: 0 when one value is always credited,
    1 when both are credited equally often.

    NOTE(review): a tie between the original and the perturbed fitness is
    credited to the perturbed value, exactly as before; confirm this is the
    intended treatment of ties.

    Args:
        vi: index of the variable to perturb.
        solutions: ``FullSolution`` instances to perturb.
        fitnesses: fitness of each solution, in the same order (not recomputed).
        problem: provides ``search_space.cardinalities`` and the
            ``fitness_function`` used to score the perturbed copies.

    Returns:
        1-D float array with one entry per value of variable ``vi``: the error
        of that value averaged over the other values. An entry is NaN when some
        pair of values was never compared (e.g. ``solutions`` is empty).
    """
    cardinality = problem.search_space.cardinalities[vi]

    wins = np.zeros(shape=(cardinality, cardinality), dtype=int)

    for solution, fitness in zip(solutions, fitnesses):
        assert isinstance(solution, FullSolution)
        original_value = solution.values[vi]
        # try every alternative value, not just the binary flip, so that
        # variables with more than two values are handled correctly
        for different_value in range(cardinality):
            if different_value == original_value:
                continue
            perturbed = solution.with_different_value(vi, different_value)
            perturbed_fitness = problem.fitness_function(perturbed)
            if fitness > perturbed_fitness:
                wins[original_value, different_value] += 1
            else:
                # also reached on ties (see NOTE in the docstring)
                wins[different_value, original_value] += 1

    # normalise counts into rates; pairs that were never compared (always the
    # diagonal) stay NaN instead of triggering a 0/0 RuntimeWarning
    totals = wins + wins.T
    win_rates = np.full(wins.shape, np.nan)
    np.divide(wins, totals, out=win_rates, where=totals > 0)
    loss_rates = win_rates.T
    # p * (1 - p) peaks at 0.25, so the factor 4 rescales the error to [0, 1]
    errors = (win_rates * loss_rates) / (win_rates + loss_rates) * 4
    np.fill_diagonal(errors, 0)

    # a single-valued variable has no "other" values; avoid dividing by zero
    amount_of_other_values = max(cardinality-1, 1)
    average_error_per_symbol = np.sum(errors, axis=0) / amount_of_other_values

    return average_error_per_symbol


# Print, for each variable index, its per-symbol error truncated to an integer percentage.
for var in range(problem.search_space.amount_of_parameters):
    scores = measure_error(var, sorted_solutions, sorted_fitnesses, problem)
    print(var, (scores * 100).astype(int))

  wins = wins / (wins + wins.T)


0 [0 0]
1 [0 0]
2 [98 98]
3 [99 99]
4 [98 98]
5 [99 99]
6 [25 25]
7 [22 22]
8 [27 27]
9 [25 25]
10 [23 23]
11 [0 0]
12 [0 0]
13 [0 0]
14 [0 0]
15 [99 99]
16 [99 99]
17 [99 99]
18 [99 99]
19 [99 99]
20 [99 99]
21 [99 99]
