In [2]:
!pip install deap

Collecting deap
  Downloading deap-1.4.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (135 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.4/135.4 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: deap
Successfully installed deap-1.4.1


### Setup

In [3]:
from deap import base, creator, tools
import random
import string
import itertools

signals = list(string.ascii_lowercase)  # Characters used to make words
meanings = list(range(0, 1000))  # 100 possible meanings

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", dict, fitness=creator.FitnessMax)

toolbox = base.Toolbox()

# def create_word(min_length=1, max_length=3):
#     length = random.randint(min_length, max_length)
#     return ''.join(random.choice(signals) for _ in range(length))

def create_word(min_length=1, max_length=3):
    length = random.randint(min_length, max_length)
    return str(random.choices(signals, weights=(8.12,1.49,2.71,4.32,12.02,2.30,2.03,5.92,7.31,0.10,0.69,3.98,2.61,6.95,7.68,1.82,0.11,6.02,6.28,9.10,2.88,1.11,2.09,0.17,2.11,0.07),k=length))


def create_individual():
    individual = creator.Individual()
    individual.local_state = random.choice(meanings)
    individual.age = 0
    for meaning in meanings:

        word = create_word()
        individual[meaning] = word

    return individual

toolbox.register("individual", create_individual)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)


### Evaluation

In [4]:
GLOBAL_STATE = random.choice(meanings)

"""
Adds newly heard word to individual's vocabulary
if a randomly guessed meaning matches that of the
speaker.
"""
def learn(individual, new_word, speakers_meaning):
    random_meaning = random.choice(meanings)
    if random_meaning == speakers_meaning:
        individual[random_meaning] = new_word

def pairwise_communication(speaker, listener, GLOBAL_STATE):
    fitness = 0
    # speaker encodes signals
    local_meaning = speaker.local_state
    global_meaning = GLOBAL_STATE

    semantic_local = speaker[local_meaning]
    semantic_global = speaker[global_meaning]

    # only update if listener has words in dictionary
    decoded_local = -100
    decoded_global = -101

    localWordPresentinListener = 0
    globalWordPresentinListener = 0
    # listener decodes local signal or possibly learns new word
    for meaning in meanings:
        if semantic_local == listener[meaning]:
            decoded_local = meaning
            localWordPresentinListener = 1
        if semantic_global == listener[meaning]:
            decoded_global = meaning
            globalWordPresentinListener = 1

    if localWordPresentinListener == 1:
        if (random.random() < 1):
            learn(listener, semantic_local, local_meaning)

    if globalWordPresentinListener == 1:
        if (random.random() < 1):
            learn(listener, semantic_global, global_meaning)

    if (decoded_local == local_meaning) & (localWordPresentinListener == 1):
        fitness += 1
    if (decoded_global == global_meaning) & (globalWordPresentinListener == 1):
        fitness += 1

    # penalty if different ideas are represented by the same word
    if local_meaning != global_meaning:
        if semantic_global == semantic_local:
            fitness -= 1
        if decoded_local == decoded_global:
            fitness -= 1

    return fitness

def survival_task(individual, GROUP_SIZE):
    penalty = 0
    for i in range(2 * (GROUP_SIZE - 1)):
        meaning1, meaning2 = random.sample(meanings, 2) # two different meanings
        if individual[meaning1] == individual[meaning2]: # same word
            penalty += 2

    return penalty

# evaluate group
def evaluate_group(group, GROUP_SIZE):
    GLOBAL_STATE = random.choice(meanings)
    # reset fitness
    for ind in group:
        # ind.fitness.values = (0,)  # reset fitness
        ind.local_state = random.choice(meanings)

    # speak
    for speaker, listener in itertools.permutations(group, 2):
        fitness_bonus = pairwise_communication(speaker, listener, GLOBAL_STATE)
        speaker.fitness.values = (speaker.fitness.values[0] + fitness_bonus,)
        listener.fitness.values = (listener.fitness.values[0] + fitness_bonus,)

    # individual survival
    for ind in group:
        penalty = survival_task(ind, GROUP_SIZE)
        ind.fitness.values = (ind.fitness.values[0] - penalty,)


### Declare population + initial eval

In [5]:
# create pop
population = toolbox.population(n=100)

for ind in population:
    ind.fitness.values = (0,)

toolbox.register("evaluate", evaluate_group)

GROUP_SIZE = 10

# for i in range(500):
groups = [population[i:i + GROUP_SIZE] for i in range(0, len(population), GROUP_SIZE)]
for group in groups:
    toolbox.evaluate(group, GROUP_SIZE)

count = 0
for ind in population:
    if ind.fitness.values[0] > 0:
        print(ind, ind.fitness.values[0])
        count += 1
print(count)

0


### Params

In [6]:
from deap import algorithms
import numpy

# Parameters
NUM_GENERATIONS = 10000
CXPB = 0.5  # Crossover probability
MUTPB = 0.01  # Mutation probability (seems better at 0 for now or .01)

# mut for signals
def mutateInd(individual, indpb):
    for i in range(len(meanings)):
        if random.random() < indpb:
            individual[i] = create_word()

    return individual,

# tools
toolbox.register("mate", tools.cxUniform, indpb=0.5)
toolbox.register("mutate", mutateInd, indpb=0.1)
toolbox.register("select", tools.selRoulette) # inspired by paper
toolbox.register("tourn", tools.selTournament)
toolbox.register("randsel", tools.selRandom)
toolbox.register("stochastic_select", tools.selStochasticUniversalSampling)



stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", numpy.mean)
stats.register("min", numpy.min)
stats.register("max", numpy.max)

# for steady state
def select_next_generation(parents, offspring, max_age=3):
    combined = parents + offspring
    filtered = [ind for ind in combined if ind.age < max_age]
    selected = toolbox.tourn(filtered, len(population), 2)
    return selected



### run algorithm

In [7]:
for gen in range(NUM_GENERATIONS):

    #for ES:
    MU = 100
    LAMBDA = 150

    """
    Mating selection
    """
    #offspring = toolbox.randsel(population, LAMBDA)
    #offspring = toolbox.select(population, len(population))
    #offspring = toolbox.tourn(population, len(population), tournsize=2)
    offspring = toolbox.randsel(population, len(population)) # random sel

    offspring = list(map(toolbox.clone, offspring))

    # mutation and crossover
    for child1, child2 in zip(offspring[::2], offspring[1::2]):
        if random.random() < CXPB:
            toolbox.mate(child1, child2)
            del child1.fitness.values
            del child2.fitness.values

    for mutant in offspring:
        if random.random() < MUTPB:
            toolbox.mutate(mutant)
            del mutant.fitness.values

    """
    Evaluate Fitness
    """
    random.shuffle(offspring)
    for ind in offspring:
        ind.fitness.values = (0,)
    groups = [offspring[i:i + GROUP_SIZE] for i in range(0, len(offspring), GROUP_SIZE)]
    for group in groups:
        toolbox.evaluate(group, GROUP_SIZE)

    for ind in population:
        ind.age += 1

    """
    Survivor Selection
    """
    # ES:
    #population[:] = toolbox.stochastic_select(offspring, MU)

    # steady state:
    population [:] = select_next_generation(population, offspring)

    # generational:
    #population[:] = offspring


    fits = [ind.fitness.values[0] for ind in population]


    from collections import Counter

    def count_duplicate_values(individual):
        value_count = Counter(individual.values())
        duplicates = [value for value, count in value_count.items() if count > 1]
        return len(duplicates)  # Return the number of unique values that are duplicates

    dup_list = []

    for individual in population:
        num_duplicates = count_duplicate_values(individual)
        dup_list.append(num_duplicates)

    sum_dups = 0
    for dup in dup_list:
        sum_dups += dup
    #print("Dup Avg:", sum_dups / len(ind) / len(population))



    length = len(population)
    mean = sum(fits) / length
    sum2 = sum(x*x for x in fits)
    std = abs(sum2 / length - mean**2)**0.5

    print(f"Generation {gen}: Min {min(fits)}, Max {max(fits)}, Avg {mean}, Std {std}, Avg % of duplicate genes across participants {sum_dups / len(ind) / len(population)}")

# print final pop
for ind in population:
    print(ind, ind.fitness.values)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Generation 5100: Min 5.0, Max 30.0, Avg 16.16, Std 5.543861470130724, Avg % of duplicate genes across participants 0.11757999999999999
Generation 5101: Min 1.0, Max 33.0, Avg 24.99, Std 9.28815912869714, Avg % of duplicate genes across participants 0.11718999999999999
Generation 5102: Min 4.0, Max 34.0, Avg 25.78, Std 7.836555365720318, Avg % of duplicate genes across participants 0.11750999999999999
Generation 5103: Min 5.0, Max 33.0, Avg 23.88, Std 7.7656680330799635, Avg % of duplicate genes across participants 0.11831
Generation 5104: Min 5.0, Max 34.0, Avg 23.94, Std 7.973481046569311, Avg % of duplicate genes across participants 0.11787
Generation 5105: Min 5.0, Max 35.0, Avg 25.29, Std 7.3079340445847, Avg % of duplicate genes across participants 0.11721
Generation 5106: Min 4.0, Max 34.0, Avg 22.94, Std 8.131199173553677, Avg % of duplicate genes across participants 0.11733
Generation 5107: Min 14.0, Max 34.0, Avg

### Final population

In [8]:
for ind in population:
    print(ind)

{0: "['v', 'b']", 1: "['t', 'v']", 2: "['t', 'u']", 3: "['r']", 4: "['o']", 5: "['p', 'o']", 6: "['e', 'e', 'v']", 7: "['o']", 8: "['h', 't']", 9: "['f']", 10: "['l', 's', 'o']", 11: "['r', 'e']", 12: "['f', 's', 'h']", 13: "['i', 't', 'u']", 14: "['t']", 15: "['e', 'f', 'r']", 16: "['r', 'u']", 17: "['d', 'i']", 18: "['t', 'n']", 19: "['u']", 20: "['l', 'o']", 21: "['i', 'h', 'r']", 22: "['a', 'h']", 23: "['d']", 24: "['t']", 25: "['n', 'r', 'w']", 26: "['g', 'a']", 27: "['h', 'k']", 28: "['i', 'p']", 29: "['e', 'i', 'e']", 30: "['t', 'o', 't']", 31: "['e', 'r', 'a']", 32: "['l', 'v']", 33: "['n']", 34: "['n']", 35: "['h', 't', 's']", 36: "['i', 'c', 'd']", 37: "['o']", 38: "['t', 'i']", 39: "['p']", 40: "['r']", 41: "['a']", 42: "['f', 't']", 43: "['t', 'a']", 44: "['a']", 45: "['h']", 46: "['d']", 47: "['n', 'v', 'o']", 48: "['n', 'd']", 49: "['p', 'a']", 50: "['t', 'e']", 51: "['p', 'r', 'p']", 52: "['o']", 53: "['e']", 54: "['n', 'a', 'i']", 55: "['s', 'o', 'e']", 56: "['j', 'h']"