In [18]:
import numpy as np
from scipy.stats import cauchy


def fitness(individual):
    return np.sum(individual)


def weighted_lehmar_mean(succeding_rates, individual_fitnesses, trial_fitnesses):
    improvements = individual_fitnesses - trial_fitnesses
    total_improvement = np.sum(improvements)
    weights = improvements / total_improvement

    lehmar_mean = np.sum(weights * succeding_rates**2) / np.sum(
        weights * succeding_rates
    )

    return lehmar_mean


problem_dimensionality = 2
generation_number = 1
initial_population_size = 100
population_size = initial_population_size
archive_size = initial_population_size
archive = np.zeros((archive_size, problem_dimensionality))
archive_fitnesses = np.empty(archive_size)
archive_fitnesses.fill(np.nan)

lower_boundary = 0  # per ora keep boundary fixed for all dimensions
upper_boundary = 1  # per ora keep boundary fixed for all dimensions

memory_size = 10  # H
memory = np.zeros((memory_size, 2)) + 0.5

p = 0.2

MEMORY_MCR_INDEX = 0
MEMORY_MMR_INDEX = 1

previous_gen_population = np.random.uniform(
    lower_boundary,
    upper_boundary,
    size=(initial_population_size, problem_dimensionality),
)


max_generations = 100

crossing_rates = np.zeros(population_size)
mutation_rates = np.zeros(population_size)

previous_gen_individuals_fitnesses = np.array(
    [fitness(individual) for individual in previous_gen_population]
)
current_number_of_fitness_evaluations = population_size
memory_index_to_update = 0

max_number_fitness_evaluations = 1000
min_number_of_individuals = 4

best_individual_fitness_index = np.argmin(previous_gen_individuals_fitnesses)
global_best_individual = previous_gen_population[best_individual_fitness_index]
global_best_individual_fitness = previous_gen_individuals_fitnesses[
    best_individual_fitness_index
]

while (generation_number <= max_generations) and (
    current_number_of_fitness_evaluations < max_number_fitness_evaluations
):

    succeding_crossover_rates = np.empty(population_size)
    succeding_crossover_rates.fill(np.nan)
    succeding_mutation_rates = np.empty(population_size)
    succeding_mutation_rates.fill(np.nan)
    succeding_trials = np.empty((population_size, problem_dimensionality))
    succeding_trial_fitnesses = np.empty(population_size)
    succeding_trials_count = 0
    trail_individuals = np.empty((population_size, problem_dimensionality))
    trial_fitnesses = np.empty(population_size)
    are_individuals_surpassed = np.zeros(population_size)
    next_gen_individuals = previous_gen_population.copy()
    next_gen_individuals_fitnesses = previous_gen_individuals_fitnesses.copy()

    for individual_index in range(population_size):
        memory_index = np.random.randint(0, memory_size)
        if np.isnan(memory[memory_index, MEMORY_MCR_INDEX]):
            crossing_rates[individual_index] = 0
        else:
            crossing_rates[individual_index] = np.clip(
                np.random.normal(memory[memory_index, MEMORY_MCR_INDEX], 0.1), 0, 1.0
            )

        generated_mutation_rate = 0
        while generated_mutation_rate <= 0:
            generated_mutation_rate = np.clip(
                cauchy.rvs(memory[memory_index, MEMORY_MMR_INDEX], 0.1), None, 1.0
            )
        mutation_rates[individual_index] = generated_mutation_rate

        number_of_best_individuals = round(population_size * p)
        random_best_individual_index = np.random.randint(0, number_of_best_individuals)

        best_individuals_indices = np.argpartition(
            previous_gen_individuals_fitnesses, number_of_best_individuals
        )[:number_of_best_individuals]

        best_individual = previous_gen_population[
            best_individuals_indices[random_best_individual_index]
        ]

        number_individuals_in_archive = np.sum(~np.isnan(archive_fitnesses))
        individual_indices_not_current = [
            index
            for index in range(population_size + number_individuals_in_archive)
            if index != individual_index
        ]
        random_individual_indices = np.random.choice(
            individual_indices_not_current, 2, replace=False
        )

        random_individual_1 = (
            previous_gen_population[random_individual_indices[0]]
            if random_individual_indices[0] < population_size
            else archive[random_individual_indices[0] - population_size]
        )
        random_individual_2 = (
            previous_gen_population[random_individual_indices[1]]
            if random_individual_indices[1] < population_size
            else archive[random_individual_indices[1] - population_size]
        )
        current_individual = previous_gen_population[individual_index]

        mutant = (
            current_individual
            + mutation_rates[individual_index] * (best_individual - current_individual)
            + mutation_rates[individual_index]
            * (random_individual_1 - random_individual_2)
        )

        for gene_index in range(problem_dimensionality):
            if mutant[gene_index] < lower_boundary:
                mutant[gene_index] = (
                    lower_boundary + current_individual[gene_index]
                ) / 2
            elif mutant[gene_index] > upper_boundary:
                mutant[gene_index] = (
                    upper_boundary + current_individual[gene_index]
                ) / 2

        random_gene_index_to_mutate = np.random.randint(0, problem_dimensionality)
        crossover_gene_index = (
            np.random.rand(problem_dimensionality) < crossing_rates[individual_index]
        )
        crossover_gene_index[random_gene_index_to_mutate] = True

        trail_individual = np.where(crossover_gene_index, mutant, current_individual)
        trail_individuals[individual_index] = trail_individual
        trial_fitnesses[individual_index] = fitness(trail_individual)
        current_number_of_fitness_evaluations = (
            current_number_of_fitness_evaluations + 1
        )

    for individual_index in range(population_size):

        if (
            trial_fitnesses[individual_index]
            <= previous_gen_individuals_fitnesses[individual_index]
        ):
            next_gen_individuals[individual_index] = trail_individuals[individual_index]
            next_gen_individuals_fitnesses[individual_index] = trial_fitnesses[
                individual_index
            ]

            if (
                trial_fitnesses[individual_index]
                < previous_gen_individuals_fitnesses[individual_index]
            ):
                if number_individuals_in_archive < archive_size:
                    archive[number_individuals_in_archive] = trail_individuals[
                        individual_index
                    ]
                    archive_fitnesses[number_individuals_in_archive] = trial_fitnesses[
                        individual_index
                    ]
                else:
                    random_element_index = np.random.randint(0, archive_size)
                    archive[random_element_index] = trail_individuals[individual_index]
                    archive_fitnesses[random_element_index] = trial_fitnesses[
                        individual_index
                    ]

                succeding_mutation_rates[succeding_trials_count] = mutation_rates[
                    individual_index
                ]
                succeding_crossover_rates[succeding_trials_count] = crossing_rates[
                    individual_index
                ]

                succeding_trials[succeding_trials_count] = trail_individuals[
                    individual_index
                ]
                succeding_trial_fitnesses[succeding_trials_count] = trial_fitnesses[
                    individual_index
                ]

                are_individuals_surpassed[individual_index] = 1

                succeding_trials_count += 1

                if trial_fitnesses[individual_index] < global_best_individual_fitness:
                    global_best_individual = trail_individuals[individual_index]
                    global_best_individual_fitness = trial_fitnesses[individual_index]

    if succeding_trials_count > 0:
        if np.nanmax(succeding_mutation_rates) <= 0 or np.isnan(
            memory[memory_index_to_update, MEMORY_MCR_INDEX]
        ):
            memory[memory_index_to_update, MEMORY_MCR_INDEX] = np.nan
        else:
            memory[memory_index_to_update, MEMORY_MCR_INDEX] = weighted_lehmar_mean(
                succeding_crossover_rates[:succeding_trials_count],
                previous_gen_individuals_fitnesses[are_individuals_surpassed == 1],
                succeding_trial_fitnesses[:succeding_trials_count],
            )

        memory[memory_index_to_update, MEMORY_MMR_INDEX] = weighted_lehmar_mean(
            succeding_mutation_rates[:succeding_trials_count],
            previous_gen_individuals_fitnesses[are_individuals_surpassed == 1],
            succeding_trial_fitnesses[:succeding_trials_count],
        )

        memory_index_to_update = (memory_index_to_update + 1) % memory_size

    next_gen_population_size = round(
        (
            (min_number_of_individuals - initial_population_size)
            / max_number_fitness_evaluations
        )
        * current_number_of_fitness_evaluations
        + initial_population_size
    )

    if next_gen_population_size < population_size:
        if next_gen_population_size < min_number_of_individuals:
            next_gen_individuals = np.empty((0, 2))
            next_gen_individuals_fitnesses = np.empty(0)
        else:
            number_of_individuals_to_delete = population_size - next_gen_population_size
            worst_individuals_indices = np.argpartition(
                next_gen_individuals_fitnesses, -number_of_individuals_to_delete
            )[:number_of_individuals_to_delete]

            next_gen_individuals = np.delete(
                next_gen_individuals, worst_individuals_indices, axis=0
            )
            next_gen_individuals_fitnesses = np.delete(
                next_gen_individuals_fitnesses, worst_individuals_indices
            )
            population_size = next_gen_population_size

    previous_gen_population = next_gen_individuals
    previous_gen_individuals_fitnesses = next_gen_individuals_fitnesses
    print(generation_number)
    generation_number = generation_number + 1


print(global_best_individual)
print(global_best_individual_fitness)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
[3.22722294e-05 9.36677822e-05]
0.00012594001161723043


In [17]:
np.empty((0, 2))

array([], shape=(0, 2), dtype=float64)

In [5]:
rho_norms

array([ 7.81024968, 16.2788206 , 24.75883681])

In [2]:
import numpy as np

type_count = 2

initial_population = np.array(
    [[1, 2, 3, 4, 5, 6], [7, 8, 9, 10, 11, 12], [13, 14, 15, 16, 17, 18]]
)

for i in range(type_count):
    rho_indices = [type_count + i * type_count + j for j in range(type_count)]
    rho_values = initial_population[:, rho_indices]
    rho_norms = np.linalg.norm(rho_values, axis=1)
    rho_values = rho_values / rho_norms[:, np.newaxis]

    initial_population[:, rho_indices] = rho_values

initial_population

# L'idea iu veloce è lasciarle generate cosi e se hanno norma > 1 scalarle con epsilon drawn
# from a uniform distribution in [0, 1]

array([[ 1,  2,  0,  0,  0,  0],
       [ 7,  8,  0,  0,  0,  0],
       [13, 14,  0,  0,  0,  0]])

In [35]:
import numpy as np
from numba import njit
from numba.typed import List


@njit
def negative_exponential_effect(
    beta_mn: float, end_time: float, times_n: np.ndarray
) -> float:
    return np.sum(1 - np.exp(-beta_mn * (end_time - times_n)))


@njit
def loglikelihood_negative_exponential_contribution(
    alphas_mn: np.ndarray,
    betas_mn: np.ndarray,
    end_time: float,
    events_times: List[np.ndarray],
) -> float:
    num_events = len(events_times)
    negative_exponential_effects = np.empty(num_events, dtype=np.float64)

    for n in range(num_events):
        negative_exponential_effects[n] = negative_exponential_effect(
            betas_mn[n], end_time, events_times[n]
        )

    return np.sum(alphas_mn / betas_mn * negative_exponential_effects)


@njit
def r_mn(
    beta_mn: float,
    current_t_m: float,
    previous_t_m: float,
    previous_r_mn: float,
    intermediate_t_n_values: np.ndarray,
) -> float:
    return np.exp(-beta_mn * (current_t_m - previous_t_m)) * previous_r_mn + np.sum(
        np.exp(-beta_mn * (current_t_m - intermediate_t_n_values))
    )


@njit
def counting_process_integral_subfunction(
    mu_m: float,
    alphas_mn: np.ndarray,
    rs_mn: np.ndarray,
) -> float:
    return np.log(mu_m + np.sum(alphas_mn * rs_mn))


@njit
def loglikelihood_m(
    m_index: int,
    mu_m: float,
    negative_exponential_contribution: float,
    alphas_mn: np.ndarray,
    betas_mn: np.ndarray,
    end_time: float,
    events_times: List[np.ndarray],
) -> float:
    num_events = len(events_times)

    dt_integral = -mu_m * end_time - negative_exponential_contribution

    recursive_part_effects = np.zeros(num_events, dtype=np.float64)
    end_indices_times_events = np.zeros(num_events, dtype=np.float64)

    for n in range(num_events):
        end_indices_times_events[n] = np.searchsorted(
            events_times[n], events_times[m_index][0], side="left"
        )

        recursive_part_effects[n] = r_mn(
            betas_mn[n],
            events_times[m_index][0],
            0,
            recursive_part_effects[n],
            events_times[n][: end_indices_times_events[n]],
        )

    counting_process_integral = counting_process_integral_subfunction(
        mu_m,
        alphas_mn,
        recursive_part_effects,
    )

    for k in range(1, len(events_times[m_index])):
        start_indices_times_events = end_indices_times_events.copy()

        for n in range(num_events):
            current_t_m = events_times[m_index][k]
            previous_t_m = events_times[m_index][k - 1]

            end_indices_times_events[n] = (
                np.searchsorted(
                    events_times[n][start_indices_times_events[n] :],
                    current_t_m,
                    side="left",
                )
                + start_indices_times_events[n]
            )

            recursive_part_effects[n] = r_mn(
                betas_mn[n],
                current_t_m,
                previous_t_m,
                recursive_part_effects[n],
                events_times[n][
                    start_indices_times_events[n] : end_indices_times_events[n]
                ],
            )

        counting_process_integral += counting_process_integral_subfunction(
            mu_m,
            alphas_mn,
            recursive_part_effects,
        )

    return dt_integral + counting_process_integral


@njit
def loglikelihood(
    mu: np.ndarray,
    alphas: np.ndarray,
    betas: np.ndarray,
    end_time: float,
    events_times: List[np.ndarray],
) -> float:
    num_events = len(events_times)
    loglikelihood_value = 0

    negative_exponential_contributions = np.empty(num_events, dtype=np.float64)

    for n in range(num_events):
        negative_exponential_contributions[n] = (
            loglikelihood_negative_exponential_contribution(
                alphas[n], betas[n], end_time, events_times[n]
            )
        )

    for m in range(num_events):
        loglikelihood_value += loglikelihood_m(
            m,
            mu[m],
            negative_exponential_contributions[m],
            alphas[m],
            betas[m],
            end_time,
            events_times,
        )

    return loglikelihood_value


@njit
def l1_penalty(
    alphas: np.ndarray, betas: np.ndarray, regularization_param: float
) -> float:
    return regularization_param * (np.sum(np.abs(alphas)) + np.sum(np.abs(betas)))


@njit
def instability_penalty(rhos: np.ndarray, instability_param: float) -> float:
    kernel_spectral_norm = np.linalg.norm(rhos, ord=2)
    if kernel_spectral_norm > 1:
        return instability_param * kernel_spectral_norm
    else:
        return 0

In [18]:
import numpy as np

np.random.seed(42)

# 2 mu, 4 rho, 4 beta
event_counts_for_type = [11, 13]
initial_population_size = 10
training_time_in_seconds = 30
gene_upper_boundaries = [0.5, 0.5, 0.3, 0.3, 0.3, 0.3, 100, 100, 100, 100]

type_count = len(event_counts_for_type)
square_matrix_size = type_count * type_count
parameters_count = 2 * square_matrix_size + type_count

initial_population = np.zeros((initial_population_size, parameters_count))

# fill the initial population with random rho values
for i, j in [(i, j) for i in range(type_count) for j in range(type_count)]:
    current_rho_max_value = min(
        0.2 * event_counts_for_type[i] / event_counts_for_type[j], 0.5
    )

    initial_population[:, type_count + i * type_count + j] = np.random.uniform(
        0,
        min(
            gene_upper_boundaries[type_count + i * type_count + j],
            current_rho_max_value,
        ),
        initial_population_size,
    )


# fill the initial population with random mu values
for i in range(type_count):
    current_mu_max_value = 0.2 * event_counts_for_type[i] / training_time_in_seconds
    initial_population[:, i] = np.random.uniform(
        0,
        min(gene_upper_boundaries[i], current_mu_max_value),
        initial_population_size,
    )


# rescale the rho values for individuals with spectral norm > 1
for individual_index in range(initial_population_size):
    rho_values = initial_population[
        individual_index, type_count : type_count + square_matrix_size
    ].reshape((type_count, type_count))
    kernel_spectral_norm = np.linalg.norm(rho_values, ord=2)
    if kernel_spectral_norm > 1:
        random_factor = np.random.uniform(0, 1)

        rescaled_rho_matrix = rho_values * (random_factor / kernel_spectral_norm)

        initial_population[
            individual_index, type_count : type_count + square_matrix_size
        ] = rescaled_rho_matrix.flatten()

# fill the initial population with random beta values
for i, j in [(i, j) for i in range(type_count) for j in range(type_count)]:
    bernoulli_samples = np.random.binomial(n=1, p=0.5, size=initial_population_size)

    random_values = np.where(
        bernoulli_samples == 0,
        np.random.uniform(0, 1, size=initial_population_size),
        np.random.uniform(0, 100, size=initial_population_size),
    )

    initial_population[:, type_count + square_matrix_size + i * type_count + j] = (
        random_values
    )

initial_population

array([[8.94947056e-03, 8.40306677e-02, 7.49080238e-02, 3.48352980e-03,
        1.44619775e-01, 1.21508970e-01, 7.06857344e-01, 6.33403757e+01,
        9.42909704e-01, 6.45172790e+01],
       [3.63129734e-02, 6.71781780e-02, 1.90142861e-01, 1.64138590e-01,
        3.29712762e-02, 3.41048247e-02, 3.25183322e+01, 2.49292229e-01,
        3.23202932e-01, 8.35302496e-01],
       [2.52182488e-03, 8.14232416e-02, 2.68682248e-01, 2.58544401e-01,
        1.26730191e-01, 2.38775313e-02, 7.71270347e-01, 8.03672077e+01,
        6.09564334e+01, 6.90937738e+01],
       [6.66834962e-02, 7.75517037e-02, 1.42883068e-01, 4.28825844e-02,
        1.03338589e-01, 2.26472488e-01, 6.37557471e+01, 7.55551139e-01,
        5.02679023e+01, 3.86735346e+01],
       [1.89771987e-02, 5.18179982e-02, 3.12037281e-02, 3.07703791e-02,
        1.07798360e-01, 1.93126407e-01, 3.58465729e-01, 2.28798165e-01,
        3.63629602e-01, 4.07751416e-02],
       [4.85849675e-02, 7.98957670e-02, 1.03413450e-01, 1.02879070e-01,
   

In [11]:
import numpy as np

array

array([0, 1, 2, 4, 8])

In [27]:
index = np.searchsorted(betas_mn, 2, side="left")
print(index)
print(betas_mn[index - 1])
print(betas_mn[index])

1
1
2
