In [73]:
import os
import random
from getpass import getpass

import snowflake.snowpark.functions as F
from snowflake.snowpark import Session
from snowflake.snowpark.types import Variant
# Snowflake connection settings.
# Non-secret values can be overridden through SNOWFLAKE_* environment variables
# and otherwise keep the values this notebook has always used.
# The password is never stored in the notebook: it is taken from
# SNOWFLAKE_PASSWORD or, when that is unset/empty, requested interactively.
# NOTE(review): a password used to be committed in this cell; treat it as
# compromised and rotate it.
connection_parameters = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", 'bxrudoe-ea31380'),
    "user": os.environ.get("SNOWFLAKE_USER", 'LLIU'),
    "password": os.environ.get("SNOWFLAKE_PASSWORD") or getpass("Snowflake password: "),
    "role": os.environ.get("SNOWFLAKE_ROLE", 'ACCOUNTADMIN'),
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", 'COMPUTE_WH'),
    "database": os.environ.get("SNOWFLAKE_DATABASE", 'PROJECT_SANTA'),
    "schema": os.environ.get("SNOWFLAKE_SCHEMA", 'ROUTES'),
}
session = Session.builder.configs(connection_parameters).create()


In [124]:
class Path:
    """An ordered sequence of nodes plus its cached fitness values.

    Attributes:
        path: list of nodes. The first and last entries are treated as fixed
            endpoints; ``mutate`` only reorders the entries in between.
        fitness: sum of ``DISTANCE_KM`` over consecutive node pairs, set by
            ``calculate_path_fitness`` (``None`` until then).
        fitness_normalized: slot for a normalized fitness value; this class
            never writes it after construction.
    """

    def __init__(self, path):
        # Own copy: mutate() edits the list in place, so sharing the caller's
        # list would let one Path silently change another (or a saved best).
        self.path = list(path)
        self.fitness = None
        self.fitness_normalized = None

    def mutate(self, mutation_rate):
        """Randomly swap interior nodes in place.

        One swap attempt is made per interior node; each attempt fires with
        probability ``mutation_rate`` and exchanges two distinct interior
        positions. The first and last nodes are never moved.
        """
        # TODO: make mutation depend on distance. If far (or near?) distance then do not mutate
        # Every index except the two endpoints. (The upper bound used to be
        # len - 2, which left the last interior node permanently unswappable.)
        interior = range(1, len(self.path) - 1)
        if len(interior) < 2:
            # Nothing to swap with; random.sample would raise ValueError.
            return
        for _ in interior:
            if random.random() < mutation_rate:
                idx_a, idx_b = random.sample(interior, 2)
                self.path[idx_a], self.path[idx_b] = self.path[idx_b], self.path[idx_a]

    @staticmethod
    def _segment_distance(distances, start_node, end_node):
        """Return DISTANCE_KM of the first row matching start_node -> end_node."""
        match = distances[(distances['START'] == start_node) & (distances['END'] == end_node)]['DISTANCE_KM']
        if match.empty:
            # Same exception type .iloc[0] raised before, but naming the pair.
            raise IndexError(f'no distance entry for {start_node!r} -> {end_node!r}')
        return match.iloc[0]

    def calculate_path_fitness(self, distances):
        """Store the total length of the whole path in ``self.fitness``.

        Args:
            distances: table indexable by the columns ``START``, ``END`` and
                ``DISTANCE_KM`` (used as a pandas DataFrame).

        Raises:
            IndexError: if a consecutive node pair has no row in ``distances``.
        """
        self.fitness = self.calculate_path_subset_fitness(distances, 0, len(self.path))

    def calculate_path_subset_fitness(self, distances, start_idx, end_idx):
        """Return the summed distance along ``path[start_idx] .. path[end_idx - 1]``.

        Only the edges between nodes inside that window are counted, so a
        window of zero or one node yields 0.

        Raises:
            IndexError: if an index is out of range or a node pair has no row
                in ``distances``.
        """
        # Index explicitly (not by slicing) so out-of-range bounds still raise.
        nodes = [self.path[i] for i in range(start_idx, end_idx)]
        return sum(
            self._segment_distance(distances, prev_node, curr_node)
            for prev_node, curr_node in zip(nodes, nodes[1:])
        )

In [163]:
import random
class GeneticsAlgorithm:
    """Genetic search for a short round trip that starts and ends at the north pole.

    Args:
        locations: table whose ``COORDS`` column lists the nodes to visit
            (read with ``locations['COORDS'].tolist()``); it must not contain
            the north pole itself.
        lookup_table: pairwise distance table handed to
            ``Path.calculate_path_fitness`` / ``calculate_path_subset_fitness``.
        mutation_rate: per-attempt swap probability passed to ``Path.mutate``.
        generations: number of select/crossover/mutate rounds to run.
        population_size: number of paths kept in every generation.

    After ``execute()`` the shortest path seen is available as ``best_path``.
    """

    def __init__(self, locations, lookup_table, mutation_rate, generations, population_size):
        # These two assignments used to be crossed, so execute() looked for
        # the COORDS column in the distance table.
        self.locations = locations
        self.lookup_table = lookup_table
        self.mutation_rate = mutation_rate
        self.generations = generations
        self.population_size = population_size
        self.north_pole = f'POINT({0} {90})'

        self.METHOD = 'GENETICS'
        self.START_COL = 'START'
        self.END_COL = 'END'
        self.DISTANCE_COL = 'DISTANCE_KM'
        self.COORDS_COL = 'COORDS'
        self.WKT_FMT_COL = 'WKT'

        self.population = []
        self.best_path = None

    def generate_random_path(self, root_path):
        '''
        Given a list of nodes (that does not contain the north pole), generate
        a random list of nodes and append the north pole to the start and end of the
        generated path
        '''
        random_path = random.sample(root_path, len(root_path))
        random_path.insert(0, self.north_pole)
        random_path.append(self.north_pole)

        return random_path

    def generate_population(self, root_path):
        """Append ``population_size`` random round trips to ``self.population``."""
        for _ in range(self.population_size):
            # generate_random_path adds the north pole itself; passing it as a
            # second argument raised TypeError.
            rand_path = Path(self.generate_random_path(root_path))
            self.population.append(rand_path)

    def _track_best(self, path):
        """Remember ``path`` as ``self.best_path`` if it is the shortest seen so far."""
        if self.best_path is None or path.fitness < self.best_path.fitness:
            # Snapshot the node list so later in-place edits cannot alter it.
            self.best_path = Path(list(path.path))
            self.best_path.fitness = path.fitness

    def calculate_population_fitness(self):
        """Compute the fitness of every path in the population and update ``best_path``."""
        for path in self.population:
            path.calculate_path_fitness(self.lookup_table)
            self._track_best(path)

    def select_from_population(self):
        """Draw ``len(population)`` paths with replacement, weighted by ``fitness_normalized``."""
        probabilities = [path.fitness_normalized for path in self.population]
        new_population = random.choices(self.population, probabilities, k = len(self.population))
        return new_population

    def crossover(self, population):
        """Breed ``population_size`` children from ``population``.

        For each child two parents are drawn, a random interior window is
        chosen, and the parent whose window is shorter donates that window;
        the remaining interior slots are filled, in order, with the other
        parent's nodes that are not yet present in the child.
        """
        new_population = []
        for _ in range(self.population_size):
            path_a = random.choice(population)
            # Selection is with replacement, so every entry can be the same
            # object; pair the path with itself instead of crashing.
            mates = [path for path in population if path is not path_a]
            path_b = random.choice(mates) if mates else path_a
            path_len = len(path_a.path)

            if path_len < 4:
                # Fewer than two interior nodes: no window can be chosen.
                new_population.append(Path(list(path_a.path)))
                continue

            start_idx = random.choice(range(1, path_len - 2))
            end_idx = random.choice(range(start_idx + 1, path_len - 1))

            path_a_subset_fitness = path_a.calculate_path_subset_fitness(self.lookup_table, start_idx, end_idx)
            path_b_subset_fitness = path_b.calculate_path_subset_fitness(self.lookup_table, start_idx, end_idx)

            new_path = [0] * path_len
            new_path[0], new_path[-1] = path_a.path[0], path_a.path[0]

            if path_a_subset_fitness < path_b_subset_fitness:
                seed_path = path_a.path
                filler_path = path_b.path
            else:
                seed_path = path_b.path
                filler_path = path_a.path

            new_path[start_idx:end_idx] = seed_path[start_idx:end_idx]
            for i in [i for i in range(1, path_len - 1) if i not in range(start_idx, end_idx)]:
                new_path[i] = next(node for node in filler_path if node not in new_path)

            new_population.append(Path(new_path))

        return new_population

    def simulate_generations(self):
        """Run ``generations`` rounds of selection, crossover and mutation."""
        for i in range(self.generations):
            print(f'simulating {i} of {self.generations} generations')
            normalize_fitness(self.population)
            selected_population = self.select_from_population()
            new_population = self.crossover(selected_population)
            for path in new_population:
                path.mutate(self.mutation_rate)
                path.calculate_path_fitness(self.lookup_table)
                # Must update the instance attribute; a bare local named
                # best_path raised UnboundLocalError here.
                self._track_best(path)

            self.population = new_population

    def execute(self):
        """Build the initial population, score it, and evolve it."""
        root_path = self.locations[self.COORDS_COL].tolist()

        self.generate_population(root_path)
        self.calculate_population_fitness()
        self.simulate_generations()

        print('simulation complete')





def normalize_fitness(population):
    """Set ``fitness_normalized`` on every path to its share of the total inverse fitness.

    Each path's weight is ``1 / fitness``, so a smaller fitness gives a larger
    share. The stored values are those weights divided by their sum.
    """
    inverse_fitness = [1 / member.fitness for member in population]
    inverse_total = sum(inverse_fitness)
    for member, inverse in zip(population, inverse_fitness):
        member.fitness_normalized = inverse / inverse_total

def selection(population):
    """Resample the population with replacement, weighted by ``fitness_normalized``.

    Returns a list of the same length as ``population``; entries are the
    original path objects and may repeat.
    """
    weights = []
    for member in population:
        weights.append(member.fitness_normalized)
    return random.choices(population, weights=weights, k=len(population))

def crossover(population, population_size, distances):
    """Breed ``population_size`` child paths from ``population``.

    For each child two parents are drawn at random and a random interior
    window ``[start_idx, end_idx)`` is chosen. The parent whose window is
    shorter (per ``calculate_path_subset_fitness``) donates that window; the
    remaining interior slots are filled, in order, with the other parent's
    nodes that are not yet present in the child. Both endpoints are copied
    from the first parent's starting node.

    Args:
        population: list of paths to draw parents from (may contain the same
            object several times).
        population_size: number of children to produce.
        distances: distance table forwarded to ``calculate_path_subset_fitness``.

    Returns:
        A list of ``population_size`` new ``Path`` objects.
    """
    new_population = []
    for _ in range(population_size):
        path_a = random.choice(population)
        # After weighted resampling every entry can be the same object; pair
        # the path with itself rather than calling random.choice on [].
        mates = [path for path in population if path is not path_a]
        path_b = random.choice(mates) if mates else path_a
        path_len = len(path_a.path)

        if path_len < 4:
            # Fewer than two interior nodes: the index ranges below would be
            # empty, so pass the parent through as a copy.
            new_population.append(Path(list(path_a.path)))
            continue

        start_idx = random.choice(range(1, path_len - 2))
        end_idx = random.choice(range(start_idx + 1, path_len - 1))

        path_a_subset_fitness = path_a.calculate_path_subset_fitness(distances, start_idx, end_idx)
        path_b_subset_fitness = path_b.calculate_path_subset_fitness(distances, start_idx, end_idx)

        new_path = [0] * path_len
        new_path[0], new_path[-1] = path_a.path[0], path_a.path[0]

        if path_a_subset_fitness < path_b_subset_fitness:
            seed_path = path_a.path
            filler_path = path_b.path
        else:
            seed_path = path_b.path
            filler_path = path_a.path
        new_path[start_idx:end_idx] = seed_path[start_idx:end_idx]
        for i in [i for i in range(1, path_len - 1) if i not in range(start_idx, end_idx)]:
            new_path[i] = next(node for node in filler_path if node not in new_path)
        new_population.append(Path(new_path))
    return new_population
        

In [173]:
# Takes a table of coordinates with a COORDS column and generates a path that travels to all points
# using a "genetic" approach starting from the North Pole.

# Column names used in the Snowpark / pandas tables below, plus run settings.
# NOTE(review): METHOD and WKT_FMT_COL are not referenced again in this cell.
METHOD = 'GENETICS'
START_COL = 'START'
END_COL = 'END'
DISTANCE_COL = 'DISTANCE_KM'
COORDS_COL = 'COORDS'
WKT_FMT_COL = 'WKT'
north_pole = f'POINT({0} {90})'
population_size = 100
generations = 50
mutation_rate = 0.05

# Source points, read from the session's current database/schema.
coordinates = session.table('test_points')
# NOTE(review): num_coords is not used anywhere below in this cell.
num_coords = coordinates.count()

# Two copies of the point table, with the COORDS column renamed to START and END,
# so that joining them yields ordered (START, END) pairs.
a = coordinates.rename(F.col(COORDS_COL), START_COL)
b = coordinates.rename(F.col(COORDS_COL), END_COL)

# Keep only pairs for which POINTS_EQUAL is not true.
# NOTE(review): POINTS_EQUAL is invoked by name; presumably a function defined in the
# Snowflake account -- confirm it exists in the target schema.
joined = a.join(b, F.call_builtin('POINTS_EQUAL', F.col(START_COL), F.col(END_COL)) != True)

# Pairwise distance lookup table as a pandas DataFrame: ST_DISTANCE / 1000 is stored
# as DISTANCE_KM (presumably metres -> kilometres; confirm the COORDS column type),
# and START / END are converted to WKT text via ST_ASWKT.
distances = joined \
    .with_column(DISTANCE_COL, F.call_builtin('ST_DISTANCE', F.col(START_COL), F.col(END_COL))/1000) \
    .with_column(START_COL, F.call_builtin('ST_ASWKT', F.col(START_COL))) \
    .with_column(END_COL, F.call_builtin('ST_ASWKT', F.col(END_COL))) \
    .to_pandas()

# Every point except the north pole, with COORDS as WKT text, as a pandas DataFrame.
nodes = coordinates.filter(F.call_builtin('POINTS_EQUAL', F.col(COORDS_COL), F.call_builtin('TO_GEOGRAPHY', north_pole)) == False) \
    .with_column(COORDS_COL, F.call_builtin('ST_ASWKT', F.col(COORDS_COL))) \
    .to_pandas()

root_path = nodes[COORDS_COL].tolist()
# NOTE(review): no module-level generate_population(root_path, population_size, north_pole)
# is defined in the visible cells (only GeneticsAlgorithm.generate_population, which has a
# different signature). Presumably it is left over from an earlier kernel state -- confirm
# this cell still runs after Restart Kernel -> Run All.
population = generate_population(root_path, population_size, north_pole)
best_path = None

# Calculate the fitness of the initial population and remember the shortest path.
for path in population:
    path.calculate_path_fitness(distances)
    if best_path is None or path.fitness < best_path.fitness:
        best_path = Path(path.path)
        best_path.fitness = path.fitness

print(best_path.fitness)

def iterate_generations(generations, population, best_path, mutation_rate, distances, population_size=None):
    """Evolve ``population`` for ``generations`` rounds and return the best path seen.

    Each round normalizes fitness, resamples the population, breeds a new
    population by crossover, mutates and scores every child, and prints the
    round index with the best fitness so far.

    Args:
        generations: number of rounds to run.
        population: list of paths whose ``fitness`` is already computed.
        best_path: best path known before the first round, or ``None``.
        mutation_rate: probability forwarded to ``Path.mutate``.
        distances: distance table forwarded to crossover and fitness scoring.
        population_size: number of children bred per round. Defaults to
            ``len(population)`` so the function no longer depends on a
            notebook-level global of the same name.

    Returns:
        The path with the lowest fitness encountered (``best_path`` itself if
        no child improved on it).
    """
    if population_size is None:
        population_size = len(population)

    for i in range(generations):
        normalize_fitness(population)
        selected_population = selection(population)
        new_population = crossover(selected_population, population_size, distances)
        for path in new_population:
            path.mutate(mutation_rate)
            path.calculate_path_fitness(distances)
            if best_path is None or path.fitness < best_path.fitness:
                # Snapshot the node list so the record cannot be altered by
                # later in-place edits of the child.
                best_path = Path(list(path.path))
                best_path.fitness = path.fitness

        population = new_population

        print(i, best_path.fitness)
    return best_path

# Evolve the initial population; iterate_generations returns the best path it saw.
best_path = iterate_generations(generations, population, best_path, mutation_rate, distances)



20258.133131475006
0 20258.133131475006
1 20258.133131475006
2 20136.623366703876
3 20136.623366703876
4 20136.623366703876
5 20136.623366703876
6 20136.623366703876
7 20136.623366703876
8 20136.623366703876
9 20136.623366703876
10 20136.623366703876
11 20136.623366703876
12 20136.623366703876
13 20136.623366703876
14 20136.623366703876
15 20136.623366703876
16 19405.944788746714
17 19405.944788746714
18 19405.944788746714
19 19405.944788746714
20 19405.944788746714
21 19405.944788746714
22 19405.944788746714
23 19405.944788746714
24 19405.944788746714
25 19405.944788746714
26 19405.944788746714
27 19405.944788746714
28 19405.944788746714
29 19405.944788746714
30 19405.944788746714
31 19405.944788746714
32 19405.944788746714
33 19405.944788746714
34 19405.944788746714
35 19405.944788746714
36 19405.944788746714
37 19405.944788746714
38 19405.944788746714
39 19104.539116679873
40 19104.539116679873
41 19104.539116679873
42 19104.539116679873
43 19104.539116679873
44 19104.539116679873
4

In [174]:
# Show the node sequence of the best path found.
print(best_path.path)

['POINT(0 90)', 'POINT(-87.8684039 42.8821595)', 'POINT(-73.9869641 40.9390043)', 'POINT(-73.7826309 41.0598463)', 'POINT(-78.8431931 33.7269719)', 'POINT(-88.1137198 30.7283717)', 'POINT(-90.9264951 30.5447972)', 'POINT(-95.5720002 29.770199)', 'POINT(-111.996804 33.6055745)', 'POINT(-105.0953399 40.4085544)', 'POINT(-115.8093979 33.4883332)', 'POINT(0 90)']
