In [1]:
from dataclasses import dataclass
from typing import Any, NoReturn
from typing_extensions import Self
from random import randint, choice, choices
from functools import cache
from copy import deepcopy
from itertools import combinations

In [2]:
def bool_as_emoji(content: Any):
    if isinstance(content, bool):        
        return "✅" if content else "❌"
    
    return content

In [3]:
@dataclass
class GraphEntry:
    origin:str
    destination:str
    relationship:str
    active:bool
    
    def __init__(self, origin, destination, relationship, active=True):
        self.origin = origin
        self.destination = destination
        self.relationship = relationship
        self.active = active
        
    def compute_key(self) -> str:
        return f"{self.origin} <{self.relationship}> {self.destination}"
        

@dataclass
class Graph:
    entries:list[GraphEntry]
        
    def active_list(self) -> list[bool]:
        return [entry.active for entry in self.entries]
    
    def get_key_and_active(self) -> list[GraphEntry]:
        return {entry.compute_key():entry.active for entry in self.entries}
    
    def show(self) -> str:
        return "".join(map(bool_as_emoji, self.active_list()))
    
@dataclass
class Chromossome:
    graphs:list[Graph]
    fitness:float
    __all_possible_entry_indexes:list[tuple[int, int]]
    
    def __init__(self, graphs:list[Graph]):
        self.graphs = graphs
        self.__fill_all_possible_entry_indexes() 
        self.__update_fitness()
        
    def __fill_all_possible_entry_indexes(self):
        self.__all_possible_entry_indexes = []
        for graph_index, graph in enumerate(self.graphs):
            for entry_index in range(len(graph.entries)):
                self.__all_possible_entry_indexes.append((graph_index, entry_index)) 
        
    def __update_fitness(self):
        fitness = 0
        dicts = [
            graph.get_key_and_active() for graph in self.graphs
        ]
        for graph1, graph2 in combinations(dicts, 2):
            for key, active in graph1.items():
                if(active):
                    fitness += (1 if key in graph2 else -1)
            for key, active in graph2.items():
                if(active):
                    fitness += (1 if key in graph1 else -1)
        self.fitness = fitness
    
    def activate_all_entries(self) -> Self:
        for graph in self.graphs:
            for entry in graph.entries:
                entry.active = True
        return self
    
    def active(self) -> list[list[bool]]:
        return [graph.active_list() for graph in self.graphs]
    
    def show(self) -> str:
        emojis = "-".join([graph.show() for graph in self.graphs])
        return f"{emojis} -> {self.fitness:+d}"
    
    def __hash__(self):
        return hash(str(self.active()))
    
    def mutate(self) -> Self:
        copy = deepcopy(self)
        n_mutations = max(1, len(copy.__all_possible_entry_indexes)//2)
        available = deepcopy(copy.__all_possible_entry_indexes)
        for _ in range(randint(1, n_mutations)):
            random = choice(range(len(available)))
            graph_index, entry_index = available.pop(random)
            copy.graphs[graph_index].entries[entry_index].active = not copy.graphs[graph_index].entries[entry_index].active
        
        copy.__update_fitness()
        return copy
    
    def crossover(self, other:Self) -> Self:
        copy = deepcopy(self)
        for index, graph in enumerate(self.graphs):
            graph_size = len(graph.entries)
            crop = randint(1, max(graph_size-1, 0))
            for entry_index in range(crop, graph_size):
                copy.graphs[index].entries[entry_index].active = other.graphs[index].entries[entry_index].active
                
        copy.__update_fitness()
        return copy
    
@dataclass
class Population:
    generation:int
    metrics:dict[int, tuple[int,int,float]] #Max fitness, Min fitness, Avg fitness
    chromossomes:list[Chromossome]
    best:Chromossome
    size:int
    
    #Takes in a baseline, all-active chromossome, and generates a population of chromossomes
    def __init__(self, C:Chromossome, size:int=20) -> NoReturn:
        self.size = size
        self.generation = 0
        self.metrics = {}
        self.best=None
                
        C.activate_all_entries()
        self.chromossomes = [C]
        for _ in range(size-1):
            self.chromossomes.append(C.mutate())
            
        self.add_to_metrics()
            
    def active(self) -> list[list[list[bool]]]:
        return [chromossome.active() for chromossome in self.chromossomes]
    
    def show(self, with_chromossomes:bool=False) -> NoReturn:
        max_fitness, min_fitness, avg_fitness = self.metrics.get(self.generation, self.add_to_metrics())
            
        print(f">>>>>>>>>> Generation {self.generation:04d} <<<<<<<<<<")
        print(f"Max: {max_fitness:+d}, Min: {min_fitness:+d}, Avg: {avg_fitness:.2f}")
        print(f"Size: {len(self.chromossomes):d}")
        
        if(with_chromossomes):
            for chromossome in self.chromossomes:
                print(chromossome.show())
            
        print("-"*37)
    
    #Top-10% of the population
    def __selection(self) -> list[Chromossome]:
        chromossomes = sorted(self.chromossomes, key=lambda x: x.fitness, reverse=True)
        best = chromossomes[0]
        
        if(self.best is None or best.fitness > self.best.fitness):
            self.best = best
        return chromossomes[:self.size//10]    
    
    #Maintains the population
    def __substitution(self, selected:list[Chromossome]) -> NoReturn:
        #replaces the population with the selected chromossomes
        self.chromossomes = selected
        self.chromossomes.append(self.best)
        
        len_selected = len(selected)
        
        for _ in range(self.size - len(self.chromossomes)):
            method = choices(["mutate", "crossover"], [0.7, 0.3], k=1)[0]
            if method == "mutate":
                self.chromossomes.append(choice(selected).mutate())
            elif method == "crossover":
                c1, c2 = randint(0,len_selected-1), randint(0,len_selected-1)
                self.chromossomes.append(selected[c1].crossover(selected[c2]))
        pass
    
    def add_to_metrics(self) -> NoReturn:
        fitnesses = [chromossome.fitness for chromossome in self.chromossomes]
        self.metrics[self.generation] = ((max(fitnesses), min(fitnesses), sum(fitnesses)/self.size))
        return self.metrics[self.generation]
    
    def evolve(self, max_iterations:int=100) -> Self:
        history = []
        for _ in range(max_iterations):
            self.show()
            self.generation += 1
            self.__substitution(self.__selection())
            metrics = self.add_to_metrics()
            
            if(len(history) >= 50):
                history.pop(0)
                if all([x[0] == metrics[0] for x in history]):
                    break
            
            history.append(metrics)
            
        self.show()
        return self

In [4]:
import pandas as pd

def load_as_graph(path:str) -> Graph:
    df = pd.read_csv(path)
    return Graph([
        GraphEntry(row["entity1"], row["entity2"], row["relationship"]) for _, row in df.iterrows()
    ])

In [5]:
chromossome = Chromossome(list(map(load_as_graph, ["original_graph.csv", "paraphrase_graph.csv"])))
population = Population(chromossome, 100)

In [6]:
population.evolve(1000)

>>>>>>>>>> Generation 0000 <<<<<<<<<<
Max: +1, Min: -12, Avg: -6.69
Size: 100
-------------------------------------
>>>>>>>>>> Generation 0001 <<<<<<<<<<
Max: +2, Min: -9, Avg: -3.65
Size: 100
-------------------------------------
>>>>>>>>>> Generation 0002 <<<<<<<<<<
Max: +2, Min: -9, Avg: -1.85
Size: 100
-------------------------------------
>>>>>>>>>> Generation 0003 <<<<<<<<<<
Max: +4, Min: -9, Avg: -0.89
Size: 100
-------------------------------------
>>>>>>>>>> Generation 0004 <<<<<<<<<<
Max: +5, Min: -8, Avg: 0.01
Size: 100
-------------------------------------
>>>>>>>>>> Generation 0005 <<<<<<<<<<
Max: +6, Min: -7, Avg: 0.56
Size: 100
-------------------------------------
>>>>>>>>>> Generation 0006 <<<<<<<<<<
Max: +6, Min: -6, Avg: 1.69
Size: 100
-------------------------------------
>>>>>>>>>> Generation 0007 <<<<<<<<<<
Max: +6, Min: -8, Avg: 1.38
Size: 100
-------------------------------------
>>>>>>>>>> Generation 0008 <<<<<<<<<<
Max: +6, Min: -6, Avg: 2.11
Size: 100
-------

Population(generation=54, metrics={0: (1, -12, -6.69), 1: (2, -9, -3.65), 2: (2, -9, -1.85), 3: (4, -9, -0.89), 4: (5, -8, 0.01), 5: (6, -7, 0.56), 6: (6, -6, 1.69), 7: (6, -8, 1.38), 8: (6, -6, 2.11), 9: (6, -9, 1.99), 10: (6, -7, 1.5), 11: (6, -7, 1.87), 12: (6, -8, 0.64), 13: (6, -8, 1.38), 14: (6, -7, 1.95), 15: (6, -9, 1.69), 16: (6, -9, 1.48), 17: (6, -9, 1.3), 18: (6, -11, 0.41), 19: (6, -9, 1.49), 20: (6, -10, 1.62), 21: (6, -8, 1.77), 22: (6, -9, 1.64), 23: (6, -7, 1.55), 24: (6, -6, 2.11), 25: (6, -7, 1.59), 26: (6, -10, 1.41), 27: (6, -9, 2.07), 28: (6, -4, 2.38), 29: (6, -7, 1.91), 30: (6, -8, 0.57), 31: (6, -6, 1.94), 32: (6, -9, 1.36), 33: (6, -7, 1.64), 34: (6, -8, 0.83), 35: (6, -6, 1.74), 36: (6, -8, 0.92), 37: (6, -6, 1.84), 38: (6, -8, 2.37), 39: (6, -9, 1.26), 40: (6, -6, 1.95), 41: (6, -7, 2.04), 42: (6, -6, 1.76), 43: (6, -6, 2.09), 44: (6, -7, 1.31), 45: (6, -10, 1.69), 46: (6, -8, 2.26), 47: (6, -7, 1.01), 48: (6, -8, 1.06), 49: (6, -8, 2.12), 50: (6, -9, 1.67),

In [7]:
population.best.show()

'✅✅❌❌✅❌❌❌❌❌❌-✅✅❌✅❌❌❌❌❌❌ -> +6'