# The Travelling Coffee Drinker - Genetic Algorithm

Solving a travelling salesman problem for United Kingdom Starbucks Cafés

## 1. Load and transform data

The data comes from Kaggle, which is accessed using the API wrapper.

The transformation needed is just to filter only GB Starbucks restaurants with a valid lon/lat pair.

In [101]:
!pip install pygad==2.17

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [76]:
!pip install -q kaggle

In [77]:
from google.colab import files
files.upload() # upload a Kaggle JSON file to make request for data 

KeyboardInterrupt: ignored

In [None]:
!mkdir kaggle 

In [None]:
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
!kaggle datasets download kukuroo3/starbucks-locations-worldwide-2021-version -p /content/sample_data/ --unzip

In [165]:
import pandas as pd 

# read in data and check column names 
data = pd.read_csv('/content/sample_data/startbucks.csv')
data.columns

Index(['Unnamed: 0', 'storeNumber', 'countryCode', 'ownershipTypeCode',
       'schedule', 'slug', 'latitude', 'longitude', 'streetAddressLine1',
       'streetAddressLine2', 'streetAddressLine3', 'city',
       'countrySubdivisionCode', 'postalCode', 'currentTimeOffset',
       'windowsTimeZoneId', 'olsonTimeZoneId'],
      dtype='object')

In [166]:
df = data[data['countryCode']=='GB']
df.reset_index(inplace=True)

# check for invalid lon/lat pairs
len(df.dropna(subset=['latitude', 'longitude'])) - len(df)

0

## 2. Exploratory analysis

Find the distribution of cafés across the United Kingdom. 

How are restaurants distributed across towns?
What does a geospatial representation of the data look like?

### 2.1 Distribution of cafés by town

In [167]:
import plotly.express as px
vis = df.groupby('city').storeNumber.count().reset_index()
px.bar(vis, x='city', y='storeNumber', template='seaborn')


### 2.2 Map of cafés in the UK

In [168]:
import folium

In [169]:
map = folium.Map(location=[51.509685, -0.118092], zoom_start=6, tiles="stamentoner")

In [170]:
for _, r in df.iterrows():
  folium.Marker(
      [r['latitude'], r['longitude']], popup=f'<i>{r["storeNumber"]}</i>'
  ).add_to(map)

In [171]:
map

## 3. Testing the distance methodology

To assess how good each solution is there needs to be a measure of fitness. For the purpose of this example the distance 'as the crow flies' is used without taking into account actual road distances however this could be explored in future.

In [173]:
from geopy.distance import geodesic

The tested origin is the first Starbucks in the data and the destination is the second Starbucks in the dataset.

In [174]:
origin = (df['latitude'][0], df['longitude'][0])
dest = (df['latitude'][100], df['longitude'][100])

The distance between the two points as the crow flies in kilometres is given below.

In [175]:
geodesic(origin, dest).kilometers

81.63683980420957

## 4. Preparing data structures

The data structures needed for testing solutions are the "genes" or store options to select from named *genes*

A lookup to access these genes known as *stores* 

A *check_range* which is used to check that every option is given in a solution (a key criteria in the TSP).


In [176]:
test = df.head(10)
genes = {store_num:[lat, lon] for store_num, lat, lon in zip(test['storeNumber'], test['latitude'], test['longitude'])}
stores = list(genes.keys())
check_range = [i for i in range(0, 10)]

## 5. Defining functions 

The algorithm requires a set of functions to be pre-defined as the out of the box genetic algorithm does not support a TSP.

 1. build_population: builds a population of chromosomes to test with proper restrictions applied
 2. fitness_func: Used to test a solution to see how well it performs, in this case the fitness_func will be assessed based on the distance as the crow flies between each successive point
 3. pmx_crossover: performs the crossover of a parent and child with proper Partially Matched Crossover (PMX) logic
 4. crossover_func: applies the crossover
 5. on_crossover: applies the mutation after crossover
 6. on_generation: used to print the progress and results at each generation

In [177]:
import random
import numpy as np
from geopy.distance import geodesic

Assess the quality or fitness of a solution so that only the fittest are selected for the next generation and to breed.

In [178]:
def build_population(size, chromosome_size):
  population = []
  for i in range(size):
    home_city = 0
    added = {home_city:'Added'}
    chromosome = [home_city]

    while len(chromosome) < chromosome_size:
      proposed_gene = random.randint(0, chromosome_size-1)
      if added.get(proposed_gene) is None:
        chromosome.append(proposed_gene)
        added.update({proposed_gene:'Added'})
      else:
        pass

    chromosome.append(home_city)

    population.append(chromosome)

  return np.array(population)

In [179]:
population = build_population(100, 10)
population.shape

(100, 11)

In [180]:
def fitness_func(solution, solution_idx):
  # loop through the length of the chromosome finding the distance between each
  # gene added 

  # to increment
  total_dist = 0

  for gene in range(0, len(solution)):

    # get the lon lat of the two points
    a = genes.get(stores[solution[gene]])
    
    try:
      b = genes.get(stores[solution[gene + 1]])

      # find the distance (crow flies)
      dist = geodesic(a, b).kilometers

    except IndexError:
      dist = 0

    total_dist += dist

  # to optimise this value in the positive direction the inverse of dist is used
  fitness = 1 / total_dist

  return fitness 

In [181]:
def pmx_crossover(parent1, parent2, sequence_start, sequence_end):
  # initialise a child
  child = np.zeros(parent1.shape[0])

  # get the genes for parent one that are passed on to child one
  parent1_to_child1_genes = parent1[sequence_start:sequence_end]

  # get the position of genes for each respective combination
  parent1_to_child1 =  np.isin(parent1,parent1_to_child1_genes).nonzero()[0]

  for gene in parent1_to_child1:
    child[gene] = parent1[gene]

  # gene of parent 2 not in the child
  genes_not_in_child = parent2[np.isin(parent2, parent1_to_child1_genes, invert=True).nonzero()[0]]
  
  # if the gene is not already
  if genes_not_in_child.shape[0] >= 1:
    for gene in genes_not_in_child:
      if gene >= 1:
        lookup = gene
        not_in_sequence = True

        while not_in_sequence:
          position_in_parent2 = np.where(parent2==lookup)[0][0]

          if position_in_parent2 in range(sequence_start, sequence_end):
            lookup = parent1[position_in_parent2]

          else:
            child[position_in_parent2] = gene
            not_in_sequence = False

  return child

In [182]:
def crossover_func(parents, offspring_size, ga_instance):
  offspring = []
  idx = 0
  while len(offspring) != offspring_size[0]:

    # locate the parents
    parent1 = parents[idx % parents.shape[0], :].copy()
    parent2 = parents[(idx + 1) % parents.shape[0], :].copy()

    # find gene sequence in parent 1 
    sequence_start = random.randint(1, parent1.shape[0]-4)
    sequence_end = random.randint(sequence_start, parent1.shape[0]-1)

    # perform crossover
    child1 = pmx_crossover(parent1, parent2, sequence_start, sequence_end)
    child2 = pmx_crossover(parent2, parent1, sequence_start, sequence_end)    

    offspring.append(child1)
    offspring.append(child2)


    idx += 1

  return np.array(offspring)

The mutation function chosen is inversion as it does not invalidate the solution.

In [183]:
def mutation_func(offspring, ga_instance):

  for chromosome_idx in range(offspring.shape[0]):
    # define a sequence of genes to reverse
    sequence_start = random.randint(1, offspring[chromosome_idx].shape[0] - 2)
    sequence_end = random.randint(sequence_start, offspring[chromosome_idx].shape[0] - 1)
    
    genes = offspring[chromosome_idx, sequence_start:sequence_end]

    # start at the start of the sequence assigning the reverse sequence back to the chromosome
    index = 0
    if len(genes) > 0:
      for gene in range(sequence_start, sequence_end):

          offspring[chromosome_idx, gene] = genes[index]

          index += 1

    return offspring

Used in the genetic algorithm flow to apply the custom mutation after crossover

In [184]:
def on_crossover(ga_instance, offspring_crossover):
    # apply mutation to ensure uniqueness 
    offspring_mutation  = mutation_func(offspring_crossover, ga_instance)

    # save the new offspring set as the parents of the next generation
    ga_instance.last_generation_offspring_mutation = offspring_mutation

Added for debugging and assessing progress by generation at runtime

In [185]:
def on_generation(ga):
    print("Generation", ga.generations_completed)
    print(ga.population)

## 6. Executing the algorithm

The genetic algorithm is set up as instance and at initialisation several parameters are given. 

The algorithm then runs to find the best solution for a set number of generations.

In [186]:
import pygad

### 6.1 Example Initialising the algorithm

The algorithm is initialised below.

Notable parameters include:
  - The use of gene space to limit the possible genes chosen to just be those in the TSP range
  - Mutations being turned off temporarily
  - Implementation of custom on_ functions 
  - Allow duplication of genes parameter set to false to ensure any newly introduced chromosomes/chromosomes created as population is initialised have no duplicate genes

In [187]:
ga_instance = pygad.GA(num_generations=100,
                       num_parents_mating=40,
                       fitness_func=fitness_func,
                       sol_per_pop=200,
                       initial_population=population,
                       gene_space=range(0, 10),
                       gene_type=int,
                       mutation_type=mutation_func,
                       on_generation=on_generation,
                       crossover_type=crossover_func, 
                       keep_parents=6,
                       mutation_probability=0.4)

### 6.2 Running the algorithm 

The genetic algorithm is run with a simple function call

In [188]:
ga_instance.run()

Generation 1
[[0 3 2 ... 4 5 0]
 [0 3 6 ... 1 2 0]
 [0 8 3 ... 6 1 0]
 ...
 [0 9 5 ... 7 4 0]
 [0 2 7 ... 8 6 0]
 [0 3 5 ... 6 8 0]]
Generation 2
[[0 1 2 ... 8 6 0]
 [0 1 9 ... 8 3 0]
 [0 3 2 ... 4 5 0]
 ...
 [0 3 6 ... 1 2 0]
 [0 3 6 ... 1 2 0]
 [0 3 1 ... 6 2 0]]
Generation 3
[[0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 ...
 [0 1 9 ... 8 3 0]
 [0 3 2 ... 4 5 0]
 [0 9 2 ... 8 3 0]]
Generation 4
[[0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 ...
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]]
Generation 5
[[0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 ...
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]]
Generation 6
[[0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 ...
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]]
Generation 7
[[0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 ...
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]]
Generation 8
[[0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]
 [0 1 2 ... 8 6 0]

## 7. Assessing results 

The result solution can be checked and analysed using the ga_instance itself

In [189]:
solution, solution_fitness, solution_idx = ga_instance.best_solution()

In [190]:
solution, solution_fitness, solution_idx = ga_instance.best_solution()
print(f'Generation of best solution: {ga_instance.best_solution_generation}')
print("Fitness value of the best solution = {solution_fitness}".format(solution_fitness=solution_fitness))
print("Index of the best solution : {solution_idx}".format(solution_idx=solution_idx))

Generation of best solution: 1
Fitness value of the best solution = 0.010681933534441102
Index of the best solution : 0


In [191]:
if ga_instance.best_solution_generation != -1:
    print("Best fitness value reached after {best_solution_generation} generations.".format(best_solution_generation=ga_instance.best_solution_generation))


Best fitness value reached after 1 generations.


### 7.1 Verifying a solution

For a solution to be valid it needs to have:
 - A maximum gene value that matches the total number of stores 
 - A minimum gene value of 0 
 - Each gene must be unique

In [192]:
def verify_solution(solution, max_gene):
  if min(solution) != 0:
    print('Failed values below 0')

  if max(solution) != max_gene:
    print('Failed values less than or above max possible value')

  if len(set(solution)) - len(solution) != -1:
    print(len(set(solution)) - len(solution))
    print('Failed solution does not contain unique values')

In [193]:
verify_solution(solution, 9)

In [194]:
solution

array([0, 1, 2, 3, 4, 5, 7, 9, 8, 6, 0])

### 7.2 Interpreting the result 

The result sequence can be used to access latitude and longitude for each store in the solution.

In [195]:
points = [genes.get(stores[id]) + [stores[id]] for id in solution]
points[:5]

[[51.483556, -1.557143, '9155-152277'],
 [51.482387, -1.555109, '22194-218828'],
 [51.481264, -1.556526, '18362-190424'],
 [51.481177, -1.557422, '9136-152279'],
 [51.562617, -1.798111, '47832-260044']]

In [196]:
import folium 

map = folium.Map(location=[51.509685, -0.118092], zoom_start=6, tiles="stamentoner")

for point in range(0, len(points)):
  folium.Marker(
      [points[point][0], points[point][1]], popup=f'<i>{points[point][2]}</i>'
  ).add_to(map)

  try:
    folium.PolyLine([(points[point][0], points[point][1]), 
                      (points[point+1][0], points[point+1][1])],
                  color='red',
                  weight=5,
                  opacity=0.8).add_to(map)

  except IndexError:
    pass
  

In [197]:
map

The map shows the shortest path that has been found. So that the travelling coffee drinker can maximise the time on coffee and minimise the time on travelling.

Now the algorithm can be scaled up for the whole of the UK, or tailored to just one town. An example of the solution scaled to the UK is given below.

## 8. Scaling up the solution

This is where the fun begins!

In [157]:
df = df[df['city'] == 'London']
genes = {store_num:[lat, lon] for store_num, lat, lon in zip(df['storeNumber'], df['latitude'], df['longitude'])}
stores = list(genes.keys())
len(stores)

165

In [155]:
population = build_population(200, 165)
len(population[0])

166

### 8.1 Building the final algorithm

The code to build the algorithm has to be re-run with the above data structures altered.

In [108]:
def fitness_func(solution, solution_idx):
  # loop through the length of the chromosome finding the distance between each
  # gene added 

  # to increment
  total_dist = 0

  for gene in range(0, len(solution)):

    # get the lon lat of the two points
    a = genes.get(stores[solution[gene]])
    
    try:
      b = genes.get(stores[solution[gene + 1]])

      # find the distance (crow flies)
      dist = geodesic(a, b).kilometers

    except IndexError:
      dist = 0

    total_dist += dist

  # to optimise this value in the positive direction the inverse of dist is used
  fitness = 1 / total_dist

  return fitness 

In [109]:
def pmx_crossover(parent1, parent2, sequence_start, sequence_end):
  # initialise a child
  child = np.zeros(parent1.shape[0])

  # get the genes for parent one that are passed on to child one
  parent1_to_child1_genes = parent1[sequence_start:sequence_end]

  # get the position of genes for each respective combination
  parent1_to_child1 =  np.isin(parent1,parent1_to_child1_genes).nonzero()[0]

  for gene in parent1_to_child1:
    child[gene] = parent1[gene]

  # gene of parent 2 not in the child
  genes_not_in_child = parent2[np.isin(parent2, parent1_to_child1_genes, invert=True).nonzero()[0]]
  
  if genes_not_in_child.shape[0] >= 1:
    for gene in genes_not_in_child:
      if gene >= 1:
        lookup = gene
        not_in_sequence = True

        while not_in_sequence:
          position_in_parent2 = np.where(parent2==lookup)[0][0]

          if position_in_parent2 in range(sequence_start, sequence_end):
            lookup = parent1[position_in_parent2]

          else:
            child[position_in_parent2] = gene
            not_in_sequence = False

  return child

In [130]:
def crossover_func(parents, offspring_size, ga_instance):
  offspring = []
  idx = 0
  while len(offspring) != offspring_size[0]:

    # locate the parents
    parent1 = parents[idx % parents.shape[0], :].copy()
    parent2 = parents[(idx + 1) % parents.shape[0], :].copy()

    # find gene sequence in parent 1 
    sequence_start = random.randint(1, parent1.shape[0]-4)
    sequence_end = random.randint(sequence_start, parent1.shape[0]-1)

    # perform crossover
    child1 = pmx_crossover(parent1, parent2, sequence_start, sequence_end)
    child2 = pmx_crossover(parent2, parent1, sequence_start, sequence_end)
    

    offspring.append(child1)
    offspring.append(child2)

    idx += 1

  return np.array(offspring)

In [144]:
def mutation_func(offspring, ga_instance):

  for chromosome_idx in range(offspring.shape[0]):
    # define a sequence of genes to reverse
    sequence_start = random.randint(1, offspring[chromosome_idx].shape[0] - 2)
    sequence_end = random.randint(sequence_start, offspring[chromosome_idx].shape[0] - 1)
    
    genes = offspring[chromosome_idx, sequence_start:sequence_end]

    # start at the start of the sequence assigning the reverse sequence back to the chromosome
    index = 0
    if len(genes) > 0:
      for gene in range(sequence_start, sequence_end):

          offspring[chromosome_idx, gene] = genes[index]

          index += 1

    return offspring

In [126]:
def on_crossover(ga_instance, offspring_crossover):
    # apply mutation to ensure uniqueness 
    offspring_mutation  = mutation_func(offspring_crossover, ga_instance)

    # save the new offspring set as the parents of the next generation
    ga_instance.last_generation_offspring_mutation = offspring_mutation

In [127]:
def on_generation(ga):
    print("Generation", ga.generations_completed)
    print(ga.population)

In [145]:
ga_instance = pygad.GA(num_generations=100,
                       num_parents_mating=40,
                       fitness_func=fitness_func,
                       sol_per_pop=200,
                       initial_population=population,
                       gene_space=range(0, 165),
                       gene_type=int,
                       mutation_type=mutation_func,
                       on_generation=on_generation,
                       crossover_type=crossover_func, 
                       keep_parents=6,
                       mutation_probability=0.4)

In [146]:
ga_instance.run()

Generation 1
[[  0   1 111 ... 127 108   0]
 [  0  62 141 ...  26 161   0]
 [  0 137 155 ... 158   3   0]
 ...
 [  0 142 162 ...   2 159   0]
 [  0 161 159 ... 112  66   0]
 [  0 152 108 ...  72  58   0]]
Generation 2
[[  0   1 111 ... 127 108   0]
 [  0   1 111 ... 127 108   0]
 [  0  62 141 ...  26 161   0]
 ...
 [  0 137 155 ... 135  76   0]
 [  0 137 155 ... 158   3   0]
 [  0  96  40 ... 135   5   0]]
Generation 3
[[  0   1 145 ...  26  94   0]
 [  0   1 111 ... 127 108   0]
 [  0   1 111 ... 127 108   0]
 ...
 [  0  89 155 ... 158  32   0]
 [  0   1 110 ... 127  94   0]
 [  0  96  40 ...  64  90   0]]
Generation 4
[[  0   1  56 ...  26  81   0]
 [  0   1 164 ... 127 108   0]
 [  0   1 110 ... 127  94   0]
 ...
 [  0   1 111 ... 127 108   0]
 [  0   1 145 ...  26  22   0]
 [  0   1  77 ... 127 142   0]]
Generation 5
[[  0   1 111 ... 127 108   0]
 [  0   1 164 ... 127 108   0]
 [  0   1  56 ... 127  81   0]
 ...
 [  0   1  60 ... 127 118   0]
 [  0   1 154 ... 127   7   0]
 [  0  

## 8.2 Evaluating the final algorithm 

The overall solution can now be assessed.

In [147]:
solution, solution_fitness, solution_idx = ga_instance.best_solution()
print(f'Generation of best solution: {ga_instance.best_solution_generation}')
print("Fitness value of the best solution = {solution_fitness}".format(solution_fitness=solution_fitness))
print("Index of the best solution : {solution_idx}".format(solution_idx=solution_idx))

Generation of best solution: 25
Fitness value of the best solution = 0.0010087414431375688
Index of the best solution : 0


In [148]:
verify_solution(solution, len(stores))
solution

Failed values less than or above max possible value


array([  0,   1, 164,  19,  77,  23,  10,   9, 154, 158, 157,  26,  22,
        92,  42, 137,  75, 143, 149,  12, 100,  85,  86, 124, 128, 135,
       147,  54,  24,   3,  58, 123, 153,  51,  29,  69,  20, 110,  59,
        95, 113, 115, 121,  91,  36,  64,  65,  32,  53,  35, 105,  52,
        21,  34, 133, 109,  47,  71,  98, 106, 131,  89, 108,  56, 152,
       150,   7,  38,  43,  94,   8, 132, 155,   4,  16,  84,  90,  27,
         2, 144, 151,  39,  45, 159, 125,  79, 156,  40,   6,  74, 139,
       141, 145,  76, 104,  50,  37, 129, 130,  72, 142,  97,  25,  93,
       134, 126, 138, 140, 148, 120,  96,  28, 160, 116,  18, 112,  31,
        41,  55,  63,  73, 122, 162, 161, 163,  66, 107,  17,  87, 103,
        80,  81,  88,  83,  82,  14,  33,  11,  46,  61,  60, 136, 146,
        15,  70,  44,  48,  67,  78, 111,  13,  62,  30, 118, 114,  99,
       102,   5,  68,  49,  57, 101, 117, 127, 119,   0])

In [150]:
points = [genes.get(stores[id]) + [stores[id]] for id in solution]
points[:5]

[[51.877854, -0.376379, '12851-253386'],
 [51.877854, -0.376289, '7187-253385'],
 [51.655847, -0.277039, '47771-259784'],
 [51.51402, -0.13925, '12021-10341'],
 [51.54541, -0.16269, '12158-22023']]

In [151]:
map = folium.Map(location=[51.509685, -0.118092], zoom_start=6, tiles="stamentoner")

for point in range(0, len(points)):
  folium.Marker(
      [points[point][0], points[point][1]], popup=f'<i>{points[point][2]}</i>'
  ).add_to(map)

  try:
    folium.PolyLine([(points[point][0], points[point][1]), 
                      (points[point+1][0], points[point+1][1])],
                  color='red',
                  weight=5,
                  opacity=0.8).add_to(map)

  except IndexError:
    pass

In [152]:
map

## 10. Total result 

The total resulting distance around London after optimising the solution is:

In [153]:
def distance(solution):
  # loop through the length of the chromosome finding the distance between each
  # gene added 

  # to increment
  total_dist = 0

  for gene in range(0, len(solution)):

    # get the lon lat of the two points
    a = genes.get(stores[solution[gene]])
    
    try:
      b = genes.get(stores[solution[gene + 1]])

      # find the distance (crow flies)
      dist = geodesic(a, b).kilometers

    except IndexError:
      dist = 0

    
    total_dist += dist

  # to optimise this value in the positive direction the inverse of dist is used

  return total_dist 

In [154]:
distance(solution)

991.3343075204886

Which is not too bad for 975 cups of joe. 🥤