In [5]:
import random
import numpy as np
import heapq
from IPython.display import display, clear_output

# def initialize_population(size, chromosome_length):
    # return [np.random.randint(0, 2, chromosome_length) for i in range(size)]
def initialize_population(size, chromosome_length, rules_dict):
    population = []
    keys_list = list(rules_dict.keys())

    for _ in range(size):
        word = random.sample(keys_list, chromosome_length)
        population.append(word)

    return population

# def fitness(chromosome):
#     goal = ['M', 'A', 'Z', 'D', 'A']
#     score = 0.0
#     for i in range(len(chromosome)):
#         if chromosome[i] == goal[i]:
#             score += 1
#     score = score/len(chromosome)
#     return score
# def fitness_one_parent(chromosome, rules_dict):
#     score = 0.0
#     for i in range(0, len(chromosome)):
#         current_char = chromosome[i]
#         parent_char = rules_dict[current_char]['parent'][0] if rules_dict[current_char]['parent'] else None

#         if parent_char is None or chromosome.index(parent_char) < i:
#             # If no parent or parent is before the current character in the chromosome
#             score += 1

#     score = score / (len(chromosome))  # Normalize the score
#     return score

# def fitness(chromosome, rules_dict, repetition_penalty):
#     score = 0.0
#     unique_chars = set()
    
#     for i in range(0, len(chromosome)):
#         current_char = chromosome[i]
#         parents = rules_dict[current_char]['parents'] if rules_dict[current_char]['parents'] else []

#         # Check if all parents (if any) are before the current character in the chromosome
#         if all(chromosome.index(parent) < i if parent in chromosome else False for parent in parents):
#             score += 1
#         else:
#             score -= 1
#         # Penalize repetition
#         if current_char in unique_chars:
#             score -= repetition_penalty
#             # score = 0
#             # break
#         else:
#             unique_chars.add(current_char)
#     score = max(0, score) / (len(chromosome))  # Normalize the score
#     return score

def split_into_chunks(chromosome, num_chunks):
    np_chunks = np.array_split(chromosome, num_chunks)
    return [chunk.tolist() for chunk in np_chunks]

def parallel_fitness(chromosome, rules_dict, repetition_penalty, n):
    score = 0.0
    unique_chars = set()
    slots = split_into_chunks(chromosome, n)
    for segment in slots:
        for i in range(0, len(segment)):
            current_char = segment[i]
            parents = rules_dict[current_char]['parents'] if rules_dict[current_char]['parents'] else []

            # Check if all parents (if any) are before the current character in all segments
            rule_check = []
            for seg in slots:
                rule_check.append(all(seg.index(parent) < i if parent in seg else False for parent in parents))
            
            # if all(chromosome.index(parent) < i if parent in chromosome else False for parent in parents):
            if any(rule_check):
                score += 1
            else:
                score -= 1
            # Penalize repetition
            if current_char in unique_chars:
                score -= repetition_penalty
                # score = 0
                # break
            else:
                unique_chars.add(current_char)
    score = max(0, score) / (len(chromosome))  # Normalize the score
    return score


def select_parents(population, scores, k=None):
    if k is None:
        k = len(population)
    return random.choices(population, weights=scores, k=k)

def crossover(parent1, parent2):
    pt = random.randint(1, len(parent1)-2)
    child1 = (parent1[:pt] + parent2[pt:])
    child2 = (parent2[:pt] + parent1[pt:])
    return child1, child2

def mutation(chromosome, rules_dict):
    mutated_chromosome = chromosome.copy()
    index = random.randint(0, len(chromosome) - 1)
    
    # Generate a new random uppercase English letter that is different from the original
    original_char = mutated_chromosome[index]
    new_char = original_char
    allowed_keys = list(rules_dict.keys())
    # while new_char == original_char or new_char in mutated_chromosome:
    while new_char == original_char:
        new_char = random.choice(allowed_keys)
    
    mutated_chromosome[index] = new_char
    return mutated_chromosome

def best_of_gen(population, population_scores, k):
    if k == 0:
        return []
    top_k_ix = heapq.nlargest(k, enumerate(population_scores), key=lambda x: x[1])
    top_k = [population[ix] for ix, _ in top_k_ix]
    return top_k

def find_mode_percentage(fitness_scores):
    # Round fitness scores to one decimal place
    rounded_scores = np.round(fitness_scores, 2)
    
    # Count the occurrences of each unique rounded score
    unique_scores, counts = np.unique(rounded_scores, return_counts=True)
    
    # Find the rounded score with the maximum count
    mode_index = np.argmax(counts)
    mode_score = unique_scores[mode_index]
    
    # Calculate the percentage of the mode score
    mode_percentage = counts[mode_index] / len(fitness_scores)
    
    return mode_score, np.round(mode_percentage, 1)

def ga_algorithm(n_gen, population_size, rules_dict, parallel_n, chromosome_length=10, crossover_rate=0.9, mutation_rate=0.01, top_k=0, repetition_penalty=0):
    population = initialize_population(population_size, chromosome_length, rules_dict)
    population_scores = [parallel_fitness(c, rules_dict, repetition_penalty, parallel_n) for c in population]
    for generation in range(n_gen):
        parents = select_parents(population, population_scores)
        offspring = []
        for i in range(0, population_size, 2):
            parent1, parent2 = parents[i], parents[i+1]
            # crossover
            if random.random() < crossover_rate:
                child1, child2 = crossover(parent1, parent2)
            else:
                child1, child2 = parent1.copy(), parent2.copy()    
            # mutation
            if random.random() < mutation_rate:
                child1 = mutation(child1, rules_dict)
            if random.random() < mutation_rate:
                child2 = mutation(child2, rules_dict)
            offspring.extend([child1, child2])
        # guarantee to pass down the best two samples of this generation to the next generation
        if top_k != 0:
            best_indiv = best_of_gen(population, population_scores, top_k)
            offspring[:-top_k].extend(best_indiv)
        population = offspring
        population_scores = [parallel_fitness(c, rules_dict, repetition_penalty, parallel_n) for c in population]
            
    return population, population_scores


def adaptive_ga_algorithm(max_generations, population_size, rules_dict, parallel_n, 
                          chromosome_length=10, initial_crossover_rate=0.9, initial_mutation_rate=0.01, top_k=0, 
                          repetition_penalty=0, fitness_threshold=1.0, crossover_rate_range=[0.5, 0.9], mutation_rate_range=[0.01, 0.1], 
                          fitness_tolerance=10):
    
    population = initialize_population(population_size, chromosome_length, rules_dict)
    population_scores = [parallel_fitness(c, rules_dict, repetition_penalty, parallel_n) for c in population]
    
    # Initialize rates
    crossover_rate = initial_crossover_rate
    mutation_rate = initial_mutation_rate
    
    fitness_stall_counter = 0
    old_fitness_mode = 0
    
    for generation in range(max_generations):
        parents = select_parents(population, population_scores)
        offspring = []

        for i in range(0, population_size, 2):
            parent1, parent2 = parents[i], parents[i + 1]

            # Crossover
            if random.random() < crossover_rate:
                child1, child2 = crossover(parent1, parent2)
            else:
                child1, child2 = parent1.copy(), parent2.copy()

            # Mutation
            if random.random() < mutation_rate:
                child1 = mutation(child1, rules_dict)
            if random.random() < mutation_rate:
                child2 = mutation(child2, rules_dict)

            offspring.extend([child1, child2])

        # Guarantee to pass down the best samples of this generation to the next generation
        if top_k != 0:
            best_indiv = best_of_gen(population, population_scores, top_k)
            offspring[:-top_k].extend(best_indiv)

        population = offspring
        population_scores = [parallel_fitness(c, rules_dict, repetition_penalty, parallel_n) for c in population]

        # Update rates based on improvement and variance
        best_fitness = max(population_scores)
        fitness_mode = find_mode_percentage(population_scores)

        # Stop if the threshold is reached
        if best_fitness > fitness_threshold:
            print(f"reached generation {generation}, Best Fitness Score: {best_fitness}")
            break
    
        
        if fitness_mode == old_fitness_mode and fitness_mode[1]>=0.7:
            fitness_stall_counter += 1
        else:
            old_fitness_mode = fitness_mode
            
        # Adjust rates based on improvement and variance
        if fitness_stall_counter>=fitness_tolerance:
            # Reduce crossover rate and increase mutation rate for local maxima
            clear_output(wait=True)
            display(f"G({generation}); fitness stall threshold has reached! {fitness_stall_counter}: {fitness_mode}! CV: {np.var(population_scores)/np.mean(population_scores)}, Best Fitness Score: {best_fitness}")
            crossover_rate = max(crossover_rate_range[0], min(crossover_rate_range[1], crossover_rate - 0.1))
            mutation_rate = max(mutation_rate_range[0], min(mutation_rate_range[1], mutation_rate + 0.001))
            
            fitness_stall_counter = 0
        elif np.var(population_scores)/np.mean(population_scores) >= 0.04: # this is variance coefficient >=1 considered as high variance
            # Increase crossover rate and decrease mutation rate for lack of convergence
            display(f"variance is too high, CV: {np.var(population_scores)/np.mean(population_scores)}")
            crossover_rate = max(crossover_rate_range[0], min(crossover_rate_range[1], crossover_rate + 0.01))
            mutation_rate = max(mutation_rate_range[0], min(mutation_rate_range[1], mutation_rate - 0.001))

    return population, population_scores

In [None]:
rules_dict = {
	'X': {'parents': None}, 
    'A': {'parents': ['X']}, 
	'B': {'parents': ['A']}, 
	'C': {'parents': ['X']}, 
	'D': {'parents': ['X']}, 
	'E': {'parents': ['X']}, 
	'F': {'parents': ['E', 'B']}, 
	'G': {'parents': ['X']}, 
	'H': {'parents': ['E', 'A']}, 
	'I': {'parents': ['F']}, 
	'J': {'parents': ['D']}, 
	'K': {'parents': ['D']}, 
	'L': {'parents': ['K', 'E', 'A']}, 
	'M': {'parents': ['B']}, 
	'N': {'parents': ['X']}, 
	'O': {'parents': ['J', 'A']}, 
	'P': {'parents': ['C']}, 
	'Q': {'parents': ['C', 'N', 'H']}, 
	'R': {'parents': ['P', 'C', 'G']}, 
	'S': {'parents': ['C', 'F', 'N']}, 
	'T': {'parents': ['I', 'B', 'Q']}
}

# population, fit_score = ga_algorithm(n_gen=600, 
#                                         population_size=1000, 
#                                         rules_dict=rules_dict, 
#                                         parallel_n = 2,
#                                         chromosome_length=len(rules_dict), 
#                                         crossover_rate=0.80, 
#                                         mutation_rate=0.2, 
#                                         top_k=50,
#                                         repetition_penalty=len(rules_dict)/10)
population, fit_score = adaptive_ga_algorithm(max_generations=1000, 
                                            population_size=3000, 
                                            rules_dict=rules_dict, 
                                            parallel_n=2, 
                                            chromosome_length=len(rules_dict), 
                                            initial_crossover_rate=0.9, 
                                            initial_mutation_rate=0.01, 
                                            top_k=100, 
                                            repetition_penalty=len(rules_dict)/10, 
                                            fitness_threshold=0.99, 
                                            crossover_rate_range=[0.5, 
                                            0.9], 
                                            mutation_rate_range=[0.01, 
                                            0.3], 
                                            fitness_tolerance=10
)

'G(546); fitness stall threshold has reached! 10: (0.52, 0.7)! CV: 0.01034934074313877, Best Fitness Score: 0.5190476190476191'

In [None]:
counter = 0
for i in range(len(population)):
    if(fit_score[i]>=0.9):
        counter += 1
        print(counter, population[i], fit_score[i])
print(counter)

In [None]:
# consider task_time only when the score is 1
rules_dict = {'A': {'parents': None}, 'B': {'parents': ['A','C']}, 'C': {'parents': ['A']}, }
population = initialize_population(3, 3, rules_dict)
for p in population:
    print(mutation(p, rules_dict) )

In [None]:
split_into_chunks(population[0], 2)

In [None]:
a = ['X', 'C', 'D', 'P', 'F', 'M', 'N', 'G', 'T', 'H', 'R', 'Q', 'E', 'A', 'B', 'J', 'I', 'K', 'L', 'O', 'S']
# a = ['X', 'O', 'Q', 'F', 'A', 'E', 'D', 'S', 'G', 'H', 'P', 'N', 'J', 'L', 'K', 'C', 'R', 'M', 'I', 'B', 'T']
uni = set()
for e in a:
    if e in uni:
        print("Repetition Found!", e)
    else:
        uni.add(e)
split_into_chunks(a, 2)

In [None]:
import matplotlib.pyplot as plt

def plot_fitness_distribution(fitness_scores):
    plt.hist(fitness_scores, bins=20, color='skyblue', edgecolor='black')
    plt.xlabel('Fitness Score')
    plt.ylabel('Frequency')
    plt.title('Distribution of Fitness Scores')
    plt.grid(True)
    plt.show()

plot_fitness_distribution(fit_score)
