First, we import the libraries we need:

In [33]:
import gym, itertools, numpy as np, pandas as pd, random

Next, we divide each observation variable's value range into equal-width intervals and map the observation vector to bits according to those intervals:

In [34]:
def observation_ranges(bits):
    """Return the interval edges used to discretise a CartPole observation.

    Each of the four observation variables gets ``bits + 1`` evenly spaced
    edges (i.e. ``bits`` intervals) spanning a symmetric range around zero.

    Args:
        bits: Number of intervals per observation variable.

    Returns:
        A list of four NumPy arrays, in the order cart position,
        cart velocity, pole angle, pole angular velocity.
    """
    # Fixed bound chosen from the largest absolute cart velocity / pole
    # angular velocity that was observed.
    velocity_limit = 5

    half_widths = (
        2.4,             # cart position: given range (-2.4, 2.4)
        velocity_limit,  # cart velocity
        0.2095,          # pole angle: given range (-0.2095 rad, 0.2095 rad)
        velocity_limit,  # pole angular velocity
    )
    return [np.linspace(-limit, limit, num=bits + 1) for limit in half_widths]


def mapping_observation_to_bits(observation, lst_observations):
    """Encode an observation vector as a flat list of 0/1 cells.

    For every observation variable one cell per interval is emitted; the
    cell of the first interval that contains the value
    (lower edge <= value <= upper edge) is 1 and all others are 0. A value
    outside every interval yields only zeros. Each variable's cells are
    followed by 5 zero "break" cells.

    Args:
        observation: Sequence of observed values, indexed like
            ``lst_observations``.
        lst_observations: One sequence of interval edges per variable.

    Returns:
        A flat list of ints (0 or 1).
    """
    separator = [0] * 5  # 5 break cells between observation values
    encoded = []

    for dimension, edges in enumerate(lst_observations):
        interval_count = len(edges) - 1
        cells = [0] * max(interval_count, 0)

        # First interval whose closed bounds contain the observed value.
        hit = next(
            (
                k
                for k in range(interval_count)
                if edges[k] <= observation[dimension] <= edges[k + 1]
            ),
            None,
        )
        if hit is not None:
            cells[hit] = 1

        encoded.extend(cells)
        encoded.extend(separator)

    return encoded


A single rule (one candidate solution of the genetic algorithm) is represented by a class:

In [35]:
class rules:
    """One cellular-automaton rule, i.e. one individual of the population.

    The genotype ``geno`` maps every possible neighbourhood configuration
    (a string of ``num_neighbours`` characters, each '0' or '1') to the
    output bit (0 or 1) the rule produces for that neighbourhood.

    Attributes are plain and public: ``geno``, ``fitness``,
    ``num_neighbours`` and ``configurations`` (all neighbourhood strings in
    ``itertools.product`` order).
    """

    def __init__(self, geno=None, fitness=0, num_neighbours=5):
        """Create a rule.

        Args:
            geno: Optional mapping configuration -> output bit. It is copied,
                so the rule never shares its genotype with another object.
            fitness: Initial fitness value.
            num_neighbours: Size of the neighbourhood window.
        """
        # A mutable default ({}) would be shared by every instance created
        # without an explicit genotype, so children produced by reproduce()
        # would all write into the same dict. Build a fresh dict instead.
        self.geno = {} if geno is None else dict(geno)
        self.fitness = fitness
        self.num_neighbours = num_neighbours

        # Every possible neighbourhood as a bit string, e.g. '00000' ... '11111'.
        self.configurations = [
            "".join(map(str, bits))
            for bits in itertools.product([0, 1], repeat=num_neighbours)
        ]

    def update_geno(self, gene):
        """Assign ``gene[i]`` as the output bit of the i-th configuration.

        Raises:
            IndexError: If ``gene`` has fewer entries than there are
                configurations.
        """
        for position, configuration in enumerate(self.configurations):
            self.geno[configuration] = gene[position]

    def random_initiation(self):
        """Fill an empty genotype with random bits; keep a non-empty one."""
        if self.geno:
            return
        self.geno = {
            configuration: random.choice([0, 1])
            for configuration in self.configurations
        }

    def update_fitness(self, new_fitness):
        """Replace the stored fitness value."""
        self.fitness = new_fitness

    def reproduce(self, other, method='uniform'):
        """Create a child rule by crossing this rule with ``other``.

        Args:
            other: The second parent.
            method: 'uniform' (each gene taken from a randomly chosen
                parent), 'one_point' (split in the middle) or
                'random_one_point' (split at a random position).

        Returns:
            A new rule of the same class and neighbourhood size.

        Raises:
            ValueError: If ``method`` is not one of the supported names.
        """
        own_genes = list(self.geno.values())
        partner_genes = list(other.geno.values())

        if method == 'random_one_point':
            split_point = random.randrange(len(own_genes))
            genotype = own_genes[:split_point] + partner_genes[split_point:]
        elif method == 'one_point':
            # Integer division: a float cannot be used as a slice index.
            split_point = len(own_genes) // 2
            genotype = own_genes[:split_point] + partner_genes[split_point:]
        elif method == 'uniform':
            genotype = [
                random.choice(pair) for pair in zip(own_genes, partner_genes)
            ]
        else:
            raise ValueError(f"unknown crossover method: {method!r}")

        # The child must use the same neighbourhood size as its parents,
        # otherwise its configurations would not match the genotype length.
        child = type(self)(num_neighbours=self.num_neighbours)
        child.update_geno([int(gene) for gene in genotype])
        return child

    def mutate(self, mutation_probability):
        """Flip each gene independently with ``mutation_probability``.

        The fitness is reset to 0 because the genotype may have changed.

        Returns:
            This rule (mutated in place), to allow chaining.
        """
        genotype = [
            1 - int(gene) if random.random() < mutation_probability else int(gene)
            for gene in self.geno.values()
        ]

        self.update_geno(genotype)
        self.update_fitness(0)
        return self

Create a list of rules, each of which maps every combination of 5 neighbouring bits to a randomly chosen 0 or 1:

In [36]:
def generate_ruleset(size):
    """Build the initial population of randomly initialised rules.

    Args:
        size: Number of rules to create.

    Returns:
        A list of ``size`` rule objects, each with a random genotype.
    """
    def _random_rule():
        # Create one rule and give it a random genotype.
        candidate = rules()
        candidate.random_initiation()
        return candidate

    return [_random_rule() for _ in range(size)]

We define the function `action`, which chooses the agent's action by applying a given rule to the encoded observation:

In [37]:
def action(number_of_bits, mapped_observation, rule):
    """Choose an action by running the rule as a cellular automaton.

    The encoded observation is treated as a ring of cells. For as many
    generations as there are cells, every cell is replaced by the rule's
    output for the 5-cell window centred on it. All cells of a generation
    are computed from the previous generation (synchronous update).

    Args:
        number_of_bits: Number of intervals per observation variable. Kept
            for interface compatibility; the ring size is taken from
            ``mapped_observation`` itself.
        mapped_observation: Flat list of 0/1 cells. It is not modified.
        rule: Object whose ``geno`` maps 5-character '0'/'1' strings to
            0 or 1.

    Returns:
        1 if more than half of the final cells are 1, otherwise 0.
    """
    # Use the real length of the encoding as the ring size so the wrap-around
    # can never disagree with the data (it equals number_of_bits * 4 + 4 * 5
    # for encodings with 4 variables and 5 break cells each).
    total_bits = len(mapped_observation)

    oldline = list(mapped_observation)
    for _ in range(total_bits):
        cells = [str(cell) for cell in oldline]
        # A fresh buffer per generation: reusing the previous one would make
        # old and new generation the same list, so later windows would read
        # cells that were already overwritten in this generation.
        newline = [0] * total_bits
        for bit in range(total_bits):
            combination = "".join(
                cells[(bit + offset) % total_bits] for offset in range(5)
            )
            # The window starts at `bit`, so its centre is `bit + 2`.
            newline[(bit + 2) % total_bits] = rule.geno[combination]
        oldline = newline

    return 1 if oldline.count(1) > (total_bits / 2) else 0

Candidates for reproduction and mutation are selected using fitness-proportionate selection:

In [38]:
def evolve_ruleset(ruleset, offspring_proportion, mutation_probability):
    """Produce the next generation of rules.

    The best ``m - children`` rules survive unchanged; the remaining slots
    are filled with mutated children whose parents are drawn with
    fitness-proportionate probability via ``select_parents``.

    Args:
        ruleset: Current population; every rule needs a ``fitness``
            attribute and ``reproduce`` / ``mutate`` methods.
        offspring_proportion: Fraction of the population to replace with
            children. The resulting child count is clamped to
            ``[0, len(ruleset)]`` so the population size stays constant.
        mutation_probability: Passed to each child's ``mutate``.

    Returns:
        A new list: surviving rules (best first) followed by the children.
    """
    m = len(ruleset)
    num_children_needed = max(0, min(m, round(m * offspring_proportion)))
    sorted_ruleset = sorted(ruleset, key=lambda x: x.fitness, reverse=True)
    best_performing = sorted_ruleset[:m - num_children_needed]

    if num_children_needed == 0:
        return best_performing

    # The probabilities must be in the same order as the list the parents
    # are drawn from, i.e. the sorted population.
    fitnesses = [rule.fitness for rule in sorted_ruleset]
    total_fitness = sum(fitnesses)
    rules_with_fitness = sum(1 for fitness in fitnesses if fitness > 0)

    if total_fitness > 0 and rules_with_fitness >= 2:
        selection_probs = [fitness / total_fitness for fitness in fitnesses]
    else:
        # Drawing two distinct parents needs at least two non-zero
        # probabilities, and an all-zero population cannot be normalised:
        # fall back to uniform selection in both cases.
        selection_probs = [1 / m] * m

    offsprings = []
    for _ in range(num_children_needed):
        p1, p2 = select_parents(sorted_ruleset, selection_probs)
        offspring = p1.reproduce(p2)
        offsprings.append(offspring.mutate(mutation_probability))

    return best_performing + offsprings

def select_one(ruleset):
    """Return one element of ``ruleset`` drawn uniformly at random."""
    picked = np.random.choice(ruleset)
    return picked

def select_parents(ruleset, selection_probs):
    """Draw two distinct parents from ``ruleset``.

    Args:
        ruleset: Candidates to draw from.
        selection_probs: Selection probability of each candidate, in the
            same order as ``ruleset``.

    Returns:
        A NumPy array holding the two chosen candidates.
    """
    parent_count = 2
    chosen = np.random.choice(
        ruleset,
        size=parent_count,
        replace=False,  # the two parents must be different candidates
        p=selection_probs,
    )
    return chosen

Prepare the environment, get the values of hyperparameters and generate rulesets:

In [39]:
# Hyperparameters of the experiment.

# Number of intervals each observation variable is split into
# (best to keep number_of_bits at 10).
number_of_bits = 10
# Number of generations the population is evolved for.
generations = 20
# Population size (number of rules per generation).
number_of_rules = 50
# NOTE: despite the name, the simulation loop calls env.step() once per
# iteration over this range, so this is a budget of environment steps per
# rule, not a number of episodes.
number_of_episodes = 500
# Step limit passed to the TimeLimit wrapper; an episode is truncated there.
available_maxscore = 200
# Fraction of the population replaced by children in each generation.
offspring_proportion = 0.9
# Currently unused:
# crossover_proportion = 0.8
# Per-gene probability of flipping a bit when a child is mutated.
mutation_probability = 0.05

ruleset = generate_ruleset(number_of_rules)
# NOTE(review): this rebinds the name `observation_ranges` from the function
# to its return value. Running this cell a second time fails because the name
# no longer refers to a callable; re-run the cell defining the function first.
observation_ranges = observation_ranges(number_of_bits)

Running the simulation for a given number of generations:

In [40]:
# Run the genetic algorithm: evaluate every rule in the environment, record
# the results of the generation, then evolve the population.
env = gym.make("CartPole-v1", render_mode="human")
# Episodes are truncated after reaching available_maxscore steps.
env = gym.wrappers.TimeLimit(env, available_maxscore)
scoreboard = pd.DataFrame()

try:
    for generation in range(1, generations + 1):
        rule_list = []
        score_list = []

        for rule in ruleset:
            # Every rule starts from a fresh episode; otherwise it would
            # inherit the half-played episode of the previous rule and its
            # first score would be skewed.
            observation, info = env.reset()
            score = 0
            maxscore = 0

            # number_of_episodes is the number of environment steps each rule
            # is evaluated for; several episodes may fit into that budget.
            for _ in range(number_of_episodes):
                mapped_observation = mapping_observation_to_bits(observation, observation_ranges)
                chosen_action = action(number_of_bits, mapped_observation, rule)
                observation, reward, terminated, truncated, info = env.step(chosen_action)
                score += reward

                if terminated or truncated:
                    observation, info = env.reset()
                    maxscore = max(maxscore, score)
                    score = 0

            # Also count the episode that was still running when the step
            # budget ran out; its score is a lower bound on its final score.
            maxscore = max(maxscore, score)

            rule.update_fitness(int(maxscore))
            # Plain ints keep the text form stable even if genes are NumPy scalars.
            rule_list.append(str([int(gene) for gene in rule.geno.values()]))
            score_list.append(int(maxscore))

        # Save the rules of this generation and their results to a separate file.
        df = pd.DataFrame({'Rules': rule_list, 'Fitness': score_list})
        df.to_csv(f'results_generation_{generation}.csv', index=False, header=False, sep=";")

        scoreboard[f'Generation_{generation}'] = score_list
        # Each generation, the ruleset is evolved based on each rule's fitness.
        ruleset = evolve_ruleset(ruleset, offspring_proportion, mutation_probability)

    # At the end, the scores of all generations are saved into one CSV file.
    # `generations` is used instead of the loop variable, which does not exist
    # when the loop body never ran.
    scoreboard.to_csv(f'scores_after_{generations}_generations.csv', index=False, header=False)
finally:
    # Release the environment (and its render window) even if a run fails.
    env.close()