FEATURE-BASED NOVELTY SEARCH

In [1]:
import numpy as np
import random
import math
from itertools import permutations
import os
import torch

Set seed for reproducibility

In [2]:
def set_global_seed(seed=42):
    random.seed(seed)

    np.random.seed(seed)

    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    os.environ['PYTHONHASHSEED'] = str(seed)
    os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8'

set_global_seed(42)

Load instances

In [3]:
from sklearn.model_selection import train_test_split

lolib = np.load('/kaggle/input/benchmark-instances/LOLIB_instances.npy')
print(lolib.shape)
random_instances = np.load('/kaggle/input/benchmark-instances/random_instances.npy')
print(random_instances.shape)
instances = np.load('/kaggle/input/benchmark-instances/benchmark_instances.npy')
print(instances.shape)
train_3, test_3 = train_test_split(instances, test_size=0.2, random_state=42)
train = train_3
validation = test_3
test = test_3
set_global_seed(42)
print(train.shape)

(3188, 20, 20)
(1500, 20, 20)
(4688, 20, 20)
(3750, 20, 20)


Define LS algorithms

In [4]:
from numba import njit

@njit
def profit_permutation(A, sigma):
    total = 0.0
    N = len(sigma)
    for i in range(N):
        for j in range(i + 1, N):  # Upper triangle only
            total += A[sigma[i], sigma[j]]
    return total

In [5]:
@njit
def two_opt_ls(A):
    N = A.shape[0]
    sigma = np.arange(N)
    for i in range(N-1, 0, -1):
        j = np.random.randint(0, i+1)
        sigma[i], sigma[j] = sigma[j], sigma[i]

    a = profit_permutation(A, sigma)
    improvement = True

    while improvement:
        improvement = False
        for i in range(N):
            for j in range(i+1, N):
                sigma[i:j+1] = sigma[i:j+1][::-1]
                b = profit_permutation(A, sigma)
                if b > a:
                    a = b
                    improvement = True
                    break  # restart
                else:
                    sigma[i:j+1] = sigma[i:j+1][::-1]
            if improvement:
                break

    return sigma

In [6]:
@njit
def swap_ls(A):
    N = A.shape[0]

    sigma = np.arange(N)
    for i in range(N - 1, 0, -1):
        j = np.random.randint(0, i + 1)
        sigma[i], sigma[j] = sigma[j], sigma[i]

    current_profit = profit_permutation(A, sigma)
    improvement = True

    while improvement:
        improvement = False
        for i in range(N):
            for j in range(i + 1, N):
                # Swap in-place
                sigma[i], sigma[j] = sigma[j], sigma[i]
                new_profit = profit_permutation(A, sigma)

                if new_profit > current_profit:
                    current_profit = new_profit
                    improvement = True
                    break
                else:
                    sigma[i], sigma[j] = sigma[j], sigma[i]
            if improvement:
                break

    return sigma

In [7]:
@njit
def insert_ls(A):
    N = A.shape[0]
    sigma = np.arange(N)
    for i in range(N-1, 0, -1):
        j = np.random.randint(0, i+1)
        sigma[i], sigma[j] = sigma[j], sigma[i]

    a = profit_permutation(A, sigma)
    improvement = True

    while improvement:
        improvement = False
        for i in range(N):
            for j in range(N):
                if i == j:
                    continue
                sigma_copy = sigma.copy()
                temp = sigma_copy[i]
                if i < j:
                    for k in range(i, j):
                        sigma_copy[k] = sigma_copy[k+1]
                else:
                    for k in range(i, j, -1):
                        sigma_copy[k] = sigma_copy[k-1]
                sigma_copy[j] = temp

                b = profit_permutation(A, sigma_copy)
                if b > a:
                    sigma = sigma_copy
                    a = b
                    improvement = True
                    break
            if improvement:
                break

    return sigma

Define the NS algorithm and all the necessary functions

In [8]:
# @title
def profit(A, algorithm):
    sigma = np.array(algorithm(A))
    idx = sigma.astype(int)

    A_sub = A[np.ix_(idx, idx)]
    total_sum = np.sum(np.triu(A_sub, k=1))
    return total_sum

In [9]:
def initialise(D, N):
    # D[0] = amount of instances, D[1] = values for uniform disrtibution (-D[1], D[1])
    num_instances = D[0]
    max_valor = D[1]

    population = np.empty((num_instances, N, N))

    for idx in range(num_instances):
        A = np.random.uniform(-max_valor, max_valor, size=(N, N))
        np.fill_diagonal(A, 0)
        population[idx] = A
    return population


In [10]:
def profit_portfolio(A, portfolio):
    f = np.array([profit(A, algo) for algo in portfolio])
    return f.reshape(-1, 1)

In [11]:
from numba import njit
import numpy as np

@njit
def compute_descriptors(solution_set):
    pop_size, N, _ = solution_set.shape
    data_matrix = np.zeros((pop_size, 12))

    tril_i = []
    tril_j = []
    for i in range(N):
        for j in range(i):
            tril_i.append(i)
            tril_j.append(j)
    tril_i = np.array(tril_i)
    tril_j = np.array(tril_j)

    for t in range(pop_size):
        A = solution_set[t].copy()

        # Set diagonal to zero
        for i in range(N):
            A[i, i] = 0

        data_matrix[t, 0] = np.mean(A)
        data_matrix[t, 1] = np.var(A)
        data_matrix[t, 2] = np.max(A)
        data_matrix[t, 3] = np.min(A)

        B = np.empty(len(tril_i))
        for idx in range(len(tril_i)):
            i = tril_i[idx]
            j = tril_j[idx]
            B[idx] = (A[i, j] - A[j, i]) / 2

        if B.size > 0:
            data_matrix[t, 4] = np.mean(B)
            data_matrix[t, 5] = np.var(B)
            data_matrix[t, 6] = np.max(B)
            data_matrix[t, 7] = np.min(B)
        else:
            data_matrix[t, 4:8] = 0

        row_col_diff = np.empty(N)
        for i in range(N):
            row_sum = 0.0
            col_sum = 0.0
            for j in range(N):
                row_sum += A[i, j]
                col_sum += A[j, i]
            row_col_diff[i] = (row_sum - col_sum) / (2 * N)

        data_matrix[t, 8] = np.mean(row_col_diff)
        data_matrix[t, 9] = np.var(row_col_diff)
        data_matrix[t,10] = np.max(row_col_diff)
        data_matrix[t,11] = np.min(row_col_diff)

    return data_matrix

In [12]:
def novelty_score(population, archive, k):
    combined_set = np.concatenate((archive, population), axis=0)

    all_descriptors = compute_descriptors(combined_set)

    archive_size = len(archive)
    pop_size = len(population)
    D = all_descriptors[archive_size:, :]

    scores = np.zeros((pop_size, 1))

    for t in range(pop_size):
        distances = np.linalg.norm(all_descriptors - D[t, :], axis=1)

        self_index = archive_size + t
        distances[self_index] = np.inf

        sorted_dist = np.sort(distances)
        finite_dists = sorted_dist[np.isfinite(sorted_dist)]
        k_eff = min(k, len(finite_dists))

        scores[t] = np.mean(finite_dists[:k_eff])

    return scores


In [13]:
def performance_score(population, portfolio, R=10):
    amount_algorithms = len(portfolio)
    pop_size = population.shape[0]

    performance_scores = np.zeros((pop_size, amount_algorithms))

    for i in range(pop_size):
        cumulative_profit = np.zeros(amount_algorithms)

        for r in range(R):
            profit_vec = profit_portfolio(population[i], portfolio)
            profit_vec = profit_vec.flatten()
            cumulative_profit += profit_vec

        performance_scores[i, :] = cumulative_profit / R

    for i in range(pop_size):
        target_value = performance_scores[i, 0]
        other_values = performance_scores[i, 1:]
        performance_scores[i, 0] = target_value - np.max(other_values)

    return performance_scores

In [14]:
def evaluate(population, archive, portfolio, k, phi, R=10):
    pop_size = population.shape[0]

    novelty = novelty_score(population, archive, k)
    performance = performance_score(population, portfolio, R)

    novelty = np.asarray(novelty).flatten()

    target_performance = np.asarray(performance[:, 0]).flatten()

    novelty_std = (novelty - np.mean(novelty)) / np.std(novelty)
    target_performance_std = (target_performance - np.mean(target_performance)) / np.std(target_performance)

    fitness = phi * target_performance_std + (1 - phi) * novelty_std

    return fitness.reshape(-1, 1)

In [15]:
def cross(instance_1, instance_2, N):
    mask = ~np.eye(N, dtype=bool)
    flat_1 = instance_1[mask].flatten()
    flat_2 = instance_2[mask].flatten()

    dimension = N * N - N
    crossover_mask = np.random.randint(0, 2, dimension)

    offspring1 = np.where(crossover_mask, flat_1, flat_2)
    offspring2 = np.where(crossover_mask, flat_2, flat_1)

    return offspring1, offspring2


In [16]:
def mutation(instance, mutation_rate=0.01):
    instance = instance.copy()
    mask = np.random.rand(len(instance)) < mutation_rate
    instance[mask] = np.random.uniform(-1, 1, np.sum(mask))
    return instance

In [17]:
def array_to_matrix(array, N):
    matrix = np.zeros((N, N))
    idx = 0
    for i in range(N):
        for j in range(N):
            if i != j:
                matrix[i, j] = array[idx]
                idx += 1
    return matrix

In [18]:
def offspring(population, archive, portfolio, k, phi, mutation_rate=0.01, R=10):
    pop_size, N, _ = population.shape
    flat_size = N * N - N

    fitness_values = evaluate(population, archive, portfolio, k, phi, R)
    indexes = np.arange(pop_size)

    parents = []
    for _ in range(2 * pop_size):
        a, b = random.choices(indexes, k=2)
        winner = a if fitness_values[a] > fitness_values[b] else b
        parents.append(winner)
    parents = np.array(parents).reshape(pop_size, 2)

    offspring_flat = np.zeros((pop_size, flat_size))
    for i in range(0, pop_size, 2):
        p1 = population[parents[i, 0]]
        p2 = population[parents[i, 1]]
        o1, o2 = cross(p1, p2, N)
        o1 = mutation(o1, mutation_rate)
        o2 = mutation(o2, mutation_rate)
        offspring_flat[i] = o1
        offspring_flat[i + 1] = o2

    offspring_matrix = np.array([array_to_matrix(vec, N) for vec in offspring_flat])
    return offspring_matrix

In [19]:
def update_archive(population, archive, portfolio, k, phi, ta, R=10):
    pop_size = len(population)
    pop_novelty = novelty_score(population, archive, k)
    pop_perf = performance_score(population, portfolio, R)

    for i in range(pop_size):
        if random.random() < 0.01:
            archive = np.append(archive, [population[i]], axis=0)
        elif pop_perf[i, 0] > 0 and pop_novelty[i] >= ta:
            archive = np.append(archive, [population[i]], axis=0)

    return archive

In [20]:
def update_ss(population, solution_set, portfolio, phi, tss, R=10):
    pop_size = len(population)
    pop_perf = performance_score(population, portfolio, R)

    for i in range(pop_size):
        if pop_perf[i, 0] > 0:
            novelty = novelty_score(np.array([population[i]]), solution_set, k=1)[0]
            if novelty >= tss:
                solution_set = np.append(solution_set, [population[i]], axis=0)

    return solution_set

In [21]:
def novelty_search(D, N, k, phi, generations, portfolio, ta, tss, R=10, mutation_rate=0.01, initial_archive = [], population = []):
    if len(population) == 0:
        population = initialise(D, N)
    else:
        population = population

    if len(initial_archive) == 0:
        archive = np.array([random.choice(population)])
    else:
        archive = initial_archive

    solution_set = archive
    archive = update_archive(population, archive, portfolio, k, phi, ta, R)
    solution_set = update_ss(population, solution_set, portfolio, phi, tss, R)

    for i in range(generations):
        offspring_pop = offspring(population, archive, portfolio, k, phi, mutation_rate, R)

        combined = np.concatenate((population, offspring_pop), axis=0)
        fitness = evaluate(combined, archive, portfolio, k, phi, R).flatten()

        fitness_parents = fitness[:len(population)]
        fitness_offspring = fitness[len(population):]

        new_population = []
        for j in range(len(population)):
            if fitness_offspring[j] > fitness_parents[j]:
                new_population.append(offspring_pop[j])
            else:
                new_population.append(population[j])
        population = np.array(new_population)

        archive = update_archive(population, archive, portfolio, k, phi, ta, R)
        solution_set = update_ss(population, solution_set, portfolio, phi, tss, R)

        
        if (i+1) in [250, 500, 750, 1000]:
            name = portfolio[0].py_func.__name__
            np.save(f"{name}_{i+1}_gens_NS1_0.npy", solution_set)
            print(f"[Checkpoint] Saved in generation {i+1}")


    return solution_set

Run the algorithm and save the results for 250, 500, 750 and 1000 generations

In [None]:
set_global_seed(42)

D = [50, 1]
N = 20
k = 3
phi = 0.85
generations = 1000
ta = 0.03
ts = 0.03
R=3

training_set, population_set = train_test_split(train,test_size=D[0] / len(train),random_state=42)

print(training_set.shape)
print(population_set.shape)

portfolio_1 = np.array([insert_ls, two_opt_ls, swap_ls])
portfolio_2 = np.array([swap_ls, two_opt_ls, insert_ls])
portfolio_3 = np.array([two_opt_ls, swap_ls, insert_ls])

solution_set_1 = novelty_search(D, N, k, phi, generations, portfolio_1, ta, ts, R, 1 / ((N * N) - N), [], population_set)
solution_set_2 = novelty_search(D, N, k, phi, generations, portfolio_2, ta, ts, R, 1 / ((N * N) - N), [], population_set)
solution_set_3 = novelty_search(D, N, k, phi, generations, portfolio_3, ta, ts, R, 1 / ((N * N) - N), [], population_set)

size_1 = len(solution_set_1)
size_2 = len(solution_set_2)
size_3 = len(solution_set_3)
print(f"Tamaños: {size_1}, {size_2}, {size_3}")

(3700, 20, 20)
(50, 20, 20)
[Checkpoint] Saved in generation 250
[Checkpoint] Saved in generation 500
