In [1]:
import numpy as np
import math
import matplotlib.pyplot as plt
from tqdm import tqdm
from numba import njit, jit

In [2]:
from numba.np.extensions import cross2d
# from sec_cross import is_crossing

class Track:
    """Race track made of wall segments, a fixed start point and a finish line.

    Attributes:
        walls: array indexed as ``walls[k][0]`` / ``walls[k][1]`` -- the two
            end points of wall ``k``.
        start: starting position, always ``(0, 0)``.
        stop: the finish-line segment, ``stop[0]`` and ``stop[1]`` being its
            end points.
    """

    def __init__(self, walls:np.ndarray, stop:np.ndarray):
        self.walls = walls
        self.start = (0, 0)
        self.stop = stop

    # https://stackoverflow.com/questions/563198/how-do-you-detect-where-two-line-segments-intersect
    def is_crossing(self, vector:np.ndarray):
        """Return True if the segment ``vector`` properly crosses any wall.

        ``vector[0]`` and ``vector[1]`` are the end points of the move.
        Merely touching a wall at an end point does not count.
        """
        return self._is_crossing(self.walls, vector)

    def is_finishing(self, vector:np.ndarray):
        """Return True if the segment ``vector`` properly crosses the finish line."""
        return self._is_finishing(self.stop, vector)

    @staticmethod
    @njit
    def _is_crossing(walls, vector):
        """Segment/segment intersection test of ``vector`` against every wall.

        Each wall is parametrised as ``p + t*r`` and the move as ``q + u*s``
        with ``t, u`` in the open interval (0, 1).
        """
        # Loop-invariant: the move's origin and direction.
        q = vector[0]
        s = vector[1] - vector[0]

        for wall in walls:
            p = wall[0]
            r = wall[1] - wall[0]

            rs_cross = cross2d(r, s)
            qpr_cross = cross2d(q-p, r)

            if rs_cross == 0 and qpr_cross == 0:
                # Collinear: project the move onto the wall and compare the
                # intervals [t0, t1] and (0, 1).
                rr = np.dot(r, r)
                if rr == 0:
                    # Degenerate zero-length wall: it cannot be crossed, and
                    # dividing by rr would raise ZeroDivisionError.
                    continue
                t0 = np.dot(q-p, r) / rr
                t1 = np.dot(s, r) / rr + t0
                # Open-interval overlap test.  Unlike testing each end point
                # separately, this also detects a move that covers the wall
                # while sharing an end point with it (e.g. t0 == 0, t1 == 1).
                if min(t0, t1) < 1 and max(t0, t1) > 0:
                    return True
            elif rs_cross != 0:
                t = cross2d(q-p, s) / rs_cross
                u = qpr_cross / rs_cross
                if (t > 0 and t < 1 and u > 0 and u < 1):
                    return True
            # rs_cross == 0 and qpr_cross != 0: parallel, never intersecting.

        return False

    @staticmethod
    def _is_finishing(stop, vector):
        """Test ``vector`` against the single segment ``stop``.

        The finish line is handled as a one-element wall list so that the
        intersection logic lives in exactly one place (``_is_crossing``).
        """
        return Track._is_crossing(np.asarray(stop)[np.newaxis], vector)
            

In [37]:
class Chrom:
    """One candidate solution: a sequence of gene codes plus its loss.

    Every gene is an integer 1..9 that maps (see ``code``) to a per-axis
    change in {-1, 0, 1}.  ``get_poss`` adds each change to the velocity and
    the velocity to the position, so the genes act as accelerations.

    Attributes:
        genes: sequence of gene codes.
        loss: value computed by the last ``update_loss`` call (0 initially).
        off_track_pen: base of the per-step off-track penalty.
        finish_rew: base of the finish reward.
    """

    # gene code -> (dx, dy) change applied to the velocity
    code = {1 : (-1, -1), 
            2 : (0, -1),
            3 : (1, -1), 
            4 : (-1, 0), 
            5 : (0, 0), 
            6 : (1, 0), 
            7 : (-1, 1), 
            8 : (0, 1), 
            9 : (1, 1)}
    
    # (dx, dy) -> gene code
    inv_code = dict(map(reversed, code.items()))

    def __init__(self, genes, off_track_pen, finish_rew):
        self.genes = genes
        self.loss = 0
        self.off_track_pen = off_track_pen
        self.finish_rew = finish_rew

    def update_loss(self, track:"Track"):
        """Recompute ``self.loss`` by replaying the trajectory on ``track``.

        ``track`` must provide ``is_crossing(segment)`` and
        ``is_finishing(segment)``.  For step ``i`` (1-based):

        * every wall crossing toggles the "out of track" state;
        * while out of track, ``off_track_pen ** i`` is added;
        * the first on-track step that crosses the finish line subtracts
          ``finish_rew ** i`` (only once).
        """
        loss = 0
        poss = self.get_poss()
        out_of_track = False
        finished = False
        for i in range(1, len(poss)):
            # Build the step segment once; it is used for both checks.
            segment = np.array([poss[i], poss[i-1]])
            if track.is_crossing(segment):
                out_of_track = not out_of_track
            if out_of_track:
                loss += math.pow(self.off_track_pen, i)
            elif not finished and track.is_finishing(segment):
                finished = True
                loss -= math.pow(self.finish_rew, i)

        self.loss = loss

    def get_vectors_from_changes(self, start:np.ndarray) -> list:
        """Unimplemented placeholder; currently returns None."""
        pass

    def get_poss(self):
        """Return the visited positions as a float64 array of shape (n + 1, 2).

        Row 0 is the origin (0, 0); row ``i + 1`` is the position after gene
        ``i``.  Velocity is the running sum of the decoded changes and
        position is the running sum of the velocity, hence the double cumsum.
        """
        # reshape keeps the empty-genes case two-dimensional: shape (0, 2).
        changes = np.asarray(self.decode(self.genes), dtype=np.float64).reshape(-1, 2)
        poss = np.zeros((len(changes) + 1, 2), dtype=np.float64)
        poss[1:] = np.cumsum(np.cumsum(changes, axis=0), axis=0)
        return poss

    def decode(self, code):
        """Translate gene codes into an array of (dx, dy) changes.

        Raises:
            ValueError: if a gene is not a key of ``Chrom.code``.
        """
        table = self.code
        try:
            return np.array([table[c] for c in code])
        except KeyError as exc:
            # Fail loudly instead of letting a None slip into the array.
            raise ValueError(
                f"unknown gene code {exc.args[0]!r}; expected one of {sorted(table)}"
            ) from None
    
    def __len__(self):
        return len(self.genes)
    
    def __getitem__(self, i):
        return self.genes[i]
    
    def __setitem__(self, key, item):
        self.genes[key] = item

In [128]:
import random

class Population:
    """A population of chromosomes evolved by crossover and mutation.

    Attributes:
        npop: number of chromosomes requested at construction.
        track: track every chromosome is scored against.
        off_track_pen, finish_rew: forwarded to every new ``Chrom``.
        chroms: the chromosomes (an object array built by ``generate_pop``).
        p_mute: four mutation probabilities, in the order
            (up, right, down, left).
    """

    # gene code -> (dx, dy); same table as Chrom.code
    decoder = {1 : (-1, -1), 
            2 : (0, -1),
            3 : (1, -1), 
            4 : (-1, 0), 
            5 : (0, 0), 
            6 : (1, 0), 
            7 : (-1, 1), 
            8 : (0, 1), 
            9 : (1, 1)}
    # (dx, dy) -> gene code
    coder = dict(map(reversed, decoder.items()))

    # One entry per p_mute slot: (delta added to the gene code, guard that
    # keeps the code inside the 3x3 grid 1..9).
    _MUTATIONS = (
        (3, lambda gene: gene < 7),        # up
        (1, lambda gene: gene % 3 != 0),   # right
        (-3, lambda gene: gene > 3),       # down
        (-1, lambda gene: gene % 3 != 1),  # left
    )

    def __init__(self, npop, l_min, l_max, track:"Track", off_track_pen, finish_rew, p_mute:tuple):
        self.npop = npop
        self.track = track
        self.off_track_pen = off_track_pen
        self.finish_rew = finish_rew
        self.chroms = self.generate_pop(npop, l_min, l_max)
        self.p_mute = p_mute

    def generate_pop(self, npop, l_min, l_max):
        """Create ``npop`` random chromosomes and score each on ``self.track``.

        The length of every chromosome is drawn with
        ``np.random.randint(l_min, l_max)``, so ``l_max`` itself is excluded.
        """
        chroms = np.empty(npop, dtype=Chrom)
        for i in range(npop):
            l = np.random.randint(l_min, l_max)
            chrom_genes = np.random.randint(3, size=(l, 2)) - 1
            chrom = Chrom(genes = self.encode(chrom_genes), off_track_pen=self.off_track_pen, finish_rew = self.finish_rew)
            chroms[i] = chrom
            chrom.update_loss(track = self.track)
        return chroms
    
    def next_gen(self):
        """Run one generation: pick two parents, cross them over, mutate."""
        # float64 is required: losses that are all the integer 0 would give
        # an integer array on which normalisation fails.
        losses = np.array([c.loss for c in self.chroms], dtype=np.float64)
        probs = self._selection_probs(losses)
        parent1 = self.select_parent(probs)
        parent2 = self.select_parent(probs)
        self.replace_with_children(parent1, parent2)
        self.mutate()

    @staticmethod
    def _selection_probs(losses):
        """Turn losses into roulette-wheel probabilities that sum to 1.

        Probabilities are proportional to the loss.  Negative losses (finish
        rewards) are handled by shifting so the smallest weight is 0, and a
        zero or non-finite total falls back to a uniform distribution.

        NOTE(review): weighting by loss favours high-loss chromosomes while
        ``get_best`` minimises the loss -- confirm this is intended.
        """
        weights = np.asarray(losses, dtype=np.float64)
        if weights.size == 0:
            return weights
        lowest = weights.min()
        if lowest < 0:
            weights = weights - lowest
        total = weights.sum()
        if not np.isfinite(total) or total <= 0:
            return np.full(weights.shape, 1.0 / weights.size)
        return weights / total

    def select_parent(self, probs):
        """Roulette-wheel selection: return the chromosome whose cumulative
        probability first reaches a uniform random draw."""
        spin = random.random()
        cumulative_probability = 0.0

        for i, probability in enumerate(probs):
            cumulative_probability += probability
            if spin <= cumulative_probability:
                return self.chroms[i]
        # Rounding can leave the cumulative sum marginally below the draw;
        # return the last chromosome instead of falling through to None.
        return self.chroms[-1]

    def replace_with_children(self, parent1, parent2):
        """Single-point crossover performed in place on both parents.

        The genes before a random cut point are swapped and both losses are
        recomputed.  Nothing happens when the shorter parent has fewer than
        two genes, because no interior cut point exists.
        """
        min_length = min(len(parent1), len(parent2))
        if min_length < 2:
            # random.randint(1, min_length - 1) would raise ValueError.
            return
        c_point = random.randint(1, min_length - 1)

        # Copy the head first: the next assignment overwrites it.
        head = list(parent1[:c_point])
        parent1[:c_point] = parent2[:c_point]
        parent2[:c_point] = head
        parent1.update_loss(self.track)
        parent2.update_loss(self.track)

    def mutate(self):
        """Mutate every chromosome in place according to ``self.p_mute``."""
        self._mutate(self.chroms, self.track, self.p_mute)

    def get_best(self) -> "Chrom":
        """Return the chromosome with the smallest loss."""
        argmin = np.argmin([c.loss for c in self.chroms])
        return self.chroms[argmin]
    
    def encode(self, changes):
        """Map (dx, dy) pairs to gene codes."""
        return [self.coder.get(tuple(change)) for change in changes]
    
    def decode(self, code):
        """Map gene codes to ``[dx, dy]`` lists."""
        return [list(self.decoder.get(c)) for c in code]

    def plot(self):
        """Draw the walls, start point, finish line and the best trajectory.

        Returns the matplotlib axes that were drawn on.
        """
        fig, ax = plt.subplots(figsize = (7, 7))
        ax.set_aspect('equal')
        for wall in self.track.walls:
            ax.plot(wall[:, 0], wall[:, 1], 'k-', linewidth=2)
        ax.scatter(self.track.start[0], self.track.start[1])
        ax.plot(self.track.stop[:, 0], self.track.stop[:, 1])
        vectors = self.get_best().get_poss()
        ax.plot(vectors[:, 0], vectors[:, 1], 'ob-', linewidth=1)
        ax.grid()
        return ax
    
    def __repr__(self):
        # Kept for existing ``repr(pop)`` callers: draws the plot as a side
        # effect and returns an empty string.
        self.plot()
        return ""
    
    @staticmethod
    def _mutate(chroms, track, p_mute):
        """Apply the four directional mutations to each chromosome.

        For every chromosome and every direction an independent random draw
        is compared with the matching ``p_mute`` entry; on success one random
        gene is shifted if it stays inside the 3x3 code grid.  (A single draw
        shared by the whole population would make each generation mutate
        either every chromosome or none.)  The loss is recomputed once per
        chromosome, and only if a gene actually changed.

        Plain Python on purpose: the loop calls methods on Python objects,
        which numba's nopython mode cannot compile.
        """
        for chrom in chroms:
            n_genes = len(chrom)
            if n_genes == 0:
                # No gene to pick; random.randint(0, -1) would raise.
                continue
            changed = False
            for slot, (delta, allowed) in enumerate(Population._MUTATIONS):
                if random.random() < p_mute[slot]:
                    gene_idx = random.randint(0, n_genes - 1)
                    if allowed(chrom[gene_idx]):
                        chrom[gene_idx] += delta
                        changed = True
            if changed:
                chrom.update_loss(track)

        

In [129]:
# Seed both random generators used by the genetic algorithm (the stdlib
# `random` module and numpy's global RNG) so that a re-run of this cell
# reproduces the same evolution.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)

# Finish line: one segment given by its two end points.
stop = np.array([[21, -4], [21, 0]], dtype=np.float64)

# Track walls: one [[x0, y0], [x1, y1]] segment per row.
walls = np.array(
              [[[-1, -1], [3, -1]],
                [[-1, -1], [-1, 9]],
               [[-1, 9], [13, 9]],
               [[13, 9], [13, 0]],
               [[13, 0], [21, 0]],
               [[3, -1], [3, 5]],
               [[3, 5], [9, 5]],
               [[9, 5], [9, -4]],
               [[9, -4], [21, -4]]
               ], dtype=np.float64)

track = Track(stop = stop, walls = walls)

npop = 10**4
# Each generation only crosses over one pair of parents, hence the large
# number of generations relative to the population size.
n_generations = 10**2 * npop

pop = Population(npop=npop, l_min=1, l_max=4, track = track, off_track_pen=0.3, finish_rew = 0.1, p_mute = (0.03, 0.03, 0.03, 0.03))
for _ in tqdm(range(n_generations)):
    pop.next_gen()
# Population.__repr__ draws the track and the best trajectory as a side effect.
repr(pop)

  0%|          | 325/1000000 [00:23<10:15:20, 27.08it/s]

In [33]:
# Scratch check of the encode/decode building blocks.
# Only the last expression is shown as the cell output.
list(map(tuple, [[1, 2], [4, 5]]))
[list(pop.decoder.get(gene)) for gene in (9, 4, 1)]

[[1, 1], [-1, 0], [-1, -1]]