Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

added initial version

  • Loading branch information...
commit 7d266e54e9a6afc48f0418c8cff0a3537148ff1f 0 parents
@WoLpH authored
Showing with 698 additions and 0 deletions.
  1. +1 −0  data
  2. +593 −0 ga.py
  3. +43 −0 generate_charts.sh
  4. +61 −0 readme.rst
1  data
593 ga.py
@@ -0,0 +1,593 @@
+#!/usr/bin/env python
+
+import sys
+import random
+import optparse
+import multiprocessing
+
+try:
+ # Since we like pretty colours we see if the fabulous module is installed.
+ # If it is, than we can color-code the cities to present a nice graphical
+ # representation of the evolution.
+ from fabulous.color import bg256, fg256
+except ImportError:
+ fg256 = bg256 = lambda color, text: text
+ print >>sys.stderr, 'Fabulous not installed, no color output available'
+
+def generate_colours(interval):
+ r, g, b = 255, 0, 0
+ step = 256 / interval
+ out = []
+ for g in range(step - 1, 256, step):
+ out.append('#%02x%02x%02x' % (r, g, b))
+
+ for r in range(255, 0, -step):
+ out.append('#%02x%02x%02x' % (r, g, b))
+
+ for b in range(step - 1, 256, step):
+ out.append('#%02x%02x%02x' % (r, g, b))
+
+ for g in range(255, 0, -step):
+ out.append('#%02x%02x%02x' % (r, g, b))
+
+ for r in range(step - 1, 256, step):
+ out.append('#%02x%02x%02x' % (r, g, b))
+
+ for b in range(255, 0, -step):
+ out.append('#%02x%02x%02x' % (r, g, b))
+
+ return out
+
+colours = generate_colours(16)
+
+def colour(max, i):
+ out = '%02d' % i
+ value = i * (len(colours) / max)
+ return str(fg256('#000', bg256(colours[value], out)))
+
+def maximize(self, a, b):
+ '''Tell the individual to maximize towards the solution'''
+ return cmp(b.score, a.score)
+
+def minimize(self, a, b):
+ '''Tell the individual to minimize towards the solution'''
+ return cmp(a.score, b.score)
+
+def pickpivots(individual):
+ '''
+ Get two lists of random pivots in the chromosome for the given individual
+ '''
+ left = random.randrange(1, individual.length - 2)
+ right = random.randrange(left, individual.length - 1)
+ return left, right
+
+
+class CrossoverMixin(object):
+ def crossover(self, other):
+ 'A crossover method which returns a list of newly created offspring'
+ return []
+
+
+class TwopointCrossoverMixin(CrossoverMixin):
+ '''
+ Simple twopoint crossover method, not that suited for TSP since it
+ requires a smart/complex repair method which beats the purpose
+
+ Would be useful for many other problems and/or for the TSP problem given
+ a different encoding
+ '''
+ def crossover(self, other):
+ '''Return a twopoint crossover for the given chromosomes'''
+ left, right = pickpivots(self)
+
+ def mate(self, other):
+ chromosome = self.chromosome
+ chromosome[left:right] = other.chromosome[left:right]
+ child = self.copy(chromosome)
+ if child.repair:
+ child.repair(self, other)
+ return child
+
+ return mate(self, other), mate(other, self)
+
+
+class EdgeRecombinationCrossoverMixin(CrossoverMixin):
+ '''
+ Implemented as specified here:
+ http://en.wikipedia.org/wiki/Edge_recombination_operator
+ '''
+ def get_adjacency_matrix(self, matrix):
+ for i, g in enumerate(self.chromosome):
+ connections = matrix.get(g, {})
+ # Store the next and previous node as connections
+ # We use the modulo operator to do a simple wraparound
+ connections[self.chromosome[(i + 1) % self.length]] = None
+ connections[self.chromosome[
+ (i - 1 + self.length) % self.length]] = None
+ matrix[g] = connections
+
+ return matrix
+
+ def get_distance_matrix(self, matrix):
+ for from_, cities in matrix.items():
+ for to in cities:
+ cities[to] = self.env.cities[from_][to]
+
+ matrix[from_] = [k for k, v
+ in sorted(cities.iteritems(), key=lambda (k, v): (v, k))]
+
+ return matrix
+
+ def get_closest_neighbour(self, neighbours, unavailable):
+ for neighbour in neighbours:
+ if neighbour not in unavailable:
+ return neighbour
+
+ def crossover(self, other):
+ return self.mate(self, other), self.mate(other, self)
+
+ def mate(*parents):
+ self = parents[0]
+
+ # Build the adjacency matrix for the parents
+ matrix = {}
+ for parent in parents:
+ matrix = parent.get_adjacency_matrix(matrix)
+
+ matrix = self.get_distance_matrix(matrix)
+
+ # pick the first node of a random parent
+ n = random.choice(parents).chromosome[0]
+
+ k = []
+ unavailable = {}
+ for _ in range(self.length - 1):
+ k.append(n)
+ unavailable[n] = True
+
+ # Try to get the closest neighbour from our distance matrix
+ neighbour = self.get_closest_neighbour(matrix[n], unavailable)
+
+ # Couldn't find anything, let's try the neighbours of our
+ # neighbours
+ if neighbour is None:
+ neighbours = matrix[n][:]
+ random.shuffle(neighbours)
+ for i in neighbours:
+ neighbour = self.get_closest_neighbour(
+ matrix[i],
+ unavailable,
+ )
+ if neighbour is not None:
+ break
+
+ # We're really desperate now... let's try any random missing node
+ if neighbour is None and len(k) < self.length:
+ missing = list(set(self.alleles) - set(k))
+ neighbour = random.choice(list(missing))
+
+ assert neighbour is not None, 'Unable to get a full length ' \
+ 'chromosome: %s' % unavailable.keys()
+ n = neighbour
+
+ k.append(n)
+ child = self.copy(k)
+ return child
+
+
+class MutatorMixin(object):
+ def mutate(self, gene):
+ 'Mutate the individual'
+
+
+class RandomMutatorMixin(MutatorMixin):
+ '''Mutator which just picks a fully random gene'''
+ def mutate(self, gene):
+ 'Mutate by random mutation'
+ self.chromosome[gene] = random.choice(self.alleles)
+
+
+class RandomSwapMutatorMixin(MutatorMixin):
+ '''
+ Quite similar to the RandomMutator except that this mutator swaps 2
+ genes which makes it suitable for permutation based chromosomes
+ '''
+ def mutate(self, gene):
+ 'Mutate by random swap mutation'
+ c = self.chromosome
+ chromosome = len(c)
+ if chromosome > 2:
+ index = gene
+ while index == gene:
+ index = random.randrange(chromosome)
+
+ c[gene], c[index] = c[index], c[gene]
+
+class EvaluatorMixin(object):
+ def evaluate(self):
+ 'Update the score for the individual'
+
+
+class SumEvaluatorMixin(object):
+ '''Simple evaluator which calculates the sum of the chromosome'''
+ def evaluate(self):
+ self.score = sum(self.chromosome)
+
+
+class IndividualBase(object):
+ length = 30
+ alleles = (0,1)
+ optimization = minimize
+
+ # Enable a crossover/mutate/repair method by subclassing the
+ # Mutator/Crossover/Repair/Evaluate mixins
+ crossover = None
+ mutate = None
+ repair = None
+ evaluate = None
+
+ def __init__(self, env, chromosome=None):
+ self.env = env
+ self.chromosome = chromosome
+
+ def set_chromosome(self, chromosome):
+ '''
+ Set or create a new chromosome. If no chromosome is given, create
+ one based on the chromosome length based on the available alleles
+ '''
+ if not chromosome:
+ # By skipping the first allele we can fix the starting point
+ # chromosome = self.alleles[1:]
+ # random.shuffle(chromosome)
+ # chromosome.insert(0, 0)
+ chromosome = self.alleles[:]
+ random.shuffle(chromosome)
+ self._chromosome = chromosome
+
+ # With a new chromosome we need score of 0
+ self.score = None
+ # Recalculate the score
+ self.evaluate()
+
+ def get_chromosome(self):
+ '''Return the current chromosome'''
+ return self._chromosome
+
+ chromosome = property(get_chromosome, set_chromosome)
+
+ def __repr__(self):
+ chromosome = self.chromosome[:]
+ chromosome = ' '.join(colour(self.length, i) for i in chromosome)
+
+ 'Return the representation of the individual'
+ return (u'<%s[%s]: %s>' % (
+ self.__class__.__name__,
+ self.score,
+ chromosome,
+ )).encode('utf-8')
+
+ def __cmp__(self, other):
+ return self.optimization(self, other)
+
+ def copy(self, chromosome=None):
+ chromosome = chromosome or self.chromosome
+ twin = self.__class__(self.env, chromosome[:])
+ return twin
+
+
+class SelectorMixin(object):
+ def select(self):
+ 'A selection method to return a individual from our population'
+
+
+class TournamentSelectorMixin(SelectorMixin):
+ '''
+ The tournament selector creates a tournament with a specific size of
+ random individuals from the population.
+
+ After creating the tournament we have a certain probability (default 0.9)
+ of selecting the best individual in the tournament. Otherwise we simply
+ take a random competitor.
+ '''
+ def __init__(self, tournament_size=8, tournament_choosebest=0.9, **kwargs):
+ self.tournament_size = tournament_size
+ self.tournament_choosebest = tournament_choosebest
+ super(TournamentSelectorMixin, self).__init__(**kwargs)
+
+ def select(self):
+ competitors = [random.choice(self.population)
+ for i in range(self.tournament_size)]
+
+ competitors.sort()
+ if random.random() < self.tournament_choosebest:
+ return competitors[0]
+ else:
+ return random.choice(competitors[1:])
+
+class Environment(object):
+ '''The environment contains the population and manages the generations.'''
+ # Subclass this and set this to one of the selectors
+ select = None
+
+ def __init__(self, kind, population=None, population_size=100,
+ max_generations=100, crossover_rate=0.90, mutation_rate=0.01,
+ print_interval=10, elitism=1, **kwargs):
+ self.generation = 0
+ self.elitism = elitism
+ self.kind = kind
+ self.population_size = population_size
+ self.population = population
+ self.crossover_rate = crossover_rate
+ self.mutation_rate = mutation_rate
+ self.max_generations = max_generations
+ self.print_interval = print_interval
+
+ for individual in self.population:
+ individual.evaluate()
+
+ def get_population(self):
+ return self._population
+
+ def set_population(self, population):
+ if not population:
+ population = [self.kind(env=self)
+ for individual in range(self.population_size)]
+ self._population = population
+
+ population = property(get_population, set_population)
+
+ def run(self):
+ for generation in self:
+ pass
+
+ def __iter__(self):
+ '''Evolve until we've reached the max generations'''
+ self.generation = 0
+ while self.generation < self.max_generations:
+ yield self.next()
+
+ def next(self):
+ '''Next evolution'''
+ self.population.sort()
+ self.crossover()
+ self.generation += 1
+ if self.print_interval and self.generation % self.print_interval == 0:
+ print >>sys.stderr, repr(self)
+
+ return self
+
+ def crossover(self):
+ '''Crossover every individual in the population with the provided
+ crossover rate.'''
+ next_population = []
+ for i in range(self.elitism):
+ next_population.append(self.population[i].copy())
+
+ while len(next_population) < self.population_size:
+ a = self.select()
+
+ # Depending on the crossover rate, crossover or simply keep our
+ # individual
+ offspring = [a.copy()]
+ if random.random() < self.crossover_rate:
+ b = self.select()
+ if a != b:
+ offspring = a.crossover(b)
+
+ # Mutate all the individuals and store the scores
+ for individual in offspring:
+ self.mutate(individual)
+
+ # Evaluate the results for the individual and add it to the
+ # list of next populations
+ individual.evaluate()
+ next_population.append(individual)
+
+ # Keep only the latest results
+ self.population = next_population[:self.population_size]
+
+ def mutate(self, individual):
+ '''Mutate all genes for the given individual with the provided
+ mutation rate'''
+ for gene in range(individual.length):
+ if random.random() < self.mutation_rate:
+ individual.mutate(gene)
+
+ @property
+ def best(self):
+ 'individual with best fitness score in population.'
+ return self.population[0]
+
+ def __repr__(self):
+ # return 'generation %s, %s' % (
+ # self.generation,
+ # self.population,
+ # )
+ return 'generation %s, best: %s' % (
+ self.generation,
+ self.best,
+ )
+
+
+class TSPEnv(TournamentSelectorMixin, Environment):
+ '''The Traveling Salesman Problem environment.
+
+ Uses the tournament selector for selecting crossover individuals and
+ generates a distance table for all closests cities from every other city.
+ '''
+ def __init__(self, **kwargs):
+ cities = kwargs['cities']
+ kind = kwargs['kind']
+
+ # As alleles we use the indexes of the cities array
+ kind.alleles = range(len(cities))
+ kind.length = len(cities)
+ self.cities = cities
+ self.closest = self.get_closest(cities)
+
+ super(TSPEnv, self).__init__(**kwargs)
+
+ def get_closest(self, cities):
+ closest = {}
+ for distances in cities:
+ # Sort the cities by distances
+ distances = sorted(enumerate(distances), key=lambda x: x[::-1])
+ # Drop the score and keep the index only
+ distances = [k for k, _ in distances]
+ closest[distances[0]] = distances[1:]
+
+ return closest
+
+class PermutationCrossoverRepairMixin(object):
+ '''If the normal (e.g. Two Point Crossover) operators are used than we
+ need a smart repair method. Since this wasn't really the way to go we
+ replaced this method and the crossover with the Edge Recombination
+ Crossover.
+ '''
+ def repair(self, mother, father):
+ chromosome = set(self.chromosome)
+ missing = sorted(set(self.alleles) - chromosome)
+
+ if not missing:
+ # No need for repair, all chromosomes are there :)
+ return
+
+ unique_alleles = {}
+ for i, allele in enumerate(self.chromosome):
+ if allele in unique_alleles:
+ for missing_allele in missing:
+ for close_allele in self.env.closest:
+ if close_allele not in chromosome:
+ unique_alleles[close_allele] = True
+ self.chromosome[i] = close_allele
+ else:
+ unique_alleles[allele] = True
+
+class TSP(EdgeRecombinationCrossoverMixin, RandomSwapMutatorMixin,
+ IndividualBase):
+ '''The Traveling Salesman Problem class combines the Edge Recombination
+ with a simple Random Swap to provide for a permutation based chromosome.
+
+ The chromosome will be a list of city numbers where the order decides the
+ order in which the cities are visited.
+ '''
+ optimization = minimize
+
+ def evaluate(self):
+ score = 0
+ cities = self.env.cities
+
+ # calculate the cost by using the chromosome values as indexes for the
+ # cities array
+ for i in range(len(cities) - 1):
+ x, y = self.chromosome[i:i+2]
+ score += cities[x][y]
+
+ self.score = score
+
+def get_cities(count):
+ '''Get the cities for the given count from the data/<cities>_cities.txt
+ file'''
+ fh = open('data/%s_cities.txt' % count)
+
+ # The first line contains the amount of cities, not needed with Python
+ # lists so we ignore it
+ fh.next()
+
+ matrix = []
+ for i, distances in enumerate(fh):
+ matrix.append([])
+ for distance in distances.split():
+ matrix[i].append(float(distance))
+
+ return matrix
+
+def run_sample((sample, kwargs)):
+ print >>sys.stderr, 'Starting sample %s with %s cities' % (
+ sample,
+ len(kwargs['cities']),
+ )
+ data = []
+ env = TSPEnv(kind=TSP, **kwargs)
+ for generation in env:
+ if csv and generation.generation % csv == 0:
+ data.append((
+ generation.generation,
+ sample,
+ generation.best.score,
+ ))
+ data.append(())
+
+ return data
+
+if __name__ == '__main__':
+ parser = optparse.OptionParser()
+ parser.add_option('-g', '--max-generations', type='int', default=1000,
+ help='Continue till we have reached MAX_GENERATIONS')
+ parser.add_option('-p', '--population-size', type='int', default=10,
+ help='Store POPULATION_SIZE different individuals')
+ parser.add_option('-c', '--crossover-rate', type='float', default=0.9,
+ help='Set crossover probability to CROSSOVER_RATE (between 0 and 1)')
+ parser.add_option('-m', '--mutation-rate', type='float', default=0.01,
+ help='Set mutation probability to MUTATION_RATE (between 0 and 1)')
+ parser.add_option('-e', '--elitism', type='int', default=1,
+ help='Enable elitism for the top ELITISM results')
+ parser.add_option('--print-interval', type='int', default=100,
+ help='Print intermediate results for every PRINT_INTERVAL '
+ 'generations. Use 0 for no intermediate output')
+ parser.add_option('--csv', type='int', default=0,
+ help='Return csv output for easy plotting every CSV generation')
+ parser.add_option('--csv-file', type='string',
+ help='Where to write the csv output to (defaults to STDOUT)')
+ parser.add_option('-s', '--samples', type='int', default=1,
+ help='The amount of samples to use. Very useful with csv output')
+ parser.add_option('--processes', type='int',
+ help='By default the samples are calculated in parallel, you can '
+ 'change the amount of simultaneous processes with PROCESSES')
+
+ options, city_counts = parser.parse_args()
+ kwargs = dict(options.__dict__)
+
+ if not city_counts:
+ parser.print_help()
+ sys.exit(1)
+
+ csv = kwargs['csv']
+ samples = kwargs['samples']
+ generations = kwargs['max_generations']
+
+ data = []
+ # To allow for faster processing we use the Python Multiprocessing module
+ # to use all difference CPU's. If needed this can easily be extended to a
+ # distributed version where multiple servers are helping with the
+ # calculations.
+ pool = multiprocessing.Pool(kwargs['processes'])
+ for city_count in city_counts:
+ cities = get_cities(city_count)
+ print 'Processing %d cities' % len(cities)
+ kwargs['cities'] = cities
+
+ if samples > 1:
+ data += sum(
+ pool.map(
+ run_sample,
+ [(sample, kwargs) for sample in range(samples)],
+ chunksize=1,
+ ),
+ []
+ )
+ else:
+ data += run_sample((0, kwargs))
+
+ if csv:
+ if kwargs['csv_file']:
+ csv_fh = open(kwargs['csv_file'], 'w')
+ else:
+ csv_fh = sys.stdout
+
+ for generation in data:
+ if generation:
+ print >>csv_fh, ';'.join(map(str, generation))
+ else:
+ print >>csv_fh, ''
+
43 generate_charts.sh
@@ -0,0 +1,43 @@
+#!/bin/sh
+
+cities=15
+args="$cities -g 1000 -s 25 --csv 5 --print-interval 0"
+
+mkdir -p csv images
+
+echo "Warning, processing takes a long time."
+echo "On my 3.7GHz Intel Core I7 it took over 2 hours."
+echo
+echo "Do you want to continue? (ctrl+c to exit)"
+read
+
+for elitism in 0 1; do
+ for population in 10 20 50 100; do
+ for mutation in 0.01 0.02 0.05 0.1 0.2 0.3 0.4 0.5 0.6; do
+ for crossover in 0.1 0.3 0.5 0.7 0.9; do
+ file="elitism_${elitism}_population_${population}_mutation_${mutation}_crossover_${crossover}"
+ echo "Generating $file"
+ ./ga.py $args \
+ -e $elitism \
+ -p $population \
+ -m $mutation \
+ -c $crossover \
+ --samples 10 \
+ --csv-file csv/$file.csv
+
+ gnuplot << EOF
+ set zlabel 'Path length'
+ set xlabel 'Generation'
+ set ylabel 'Sample'
+ set datafile separator ';'
+ set title 'Elitism $elitism, population: $population, mutation: $mutation, crossover: $crossover'
+ set terminal png
+ set output 'images/$file.png'
+ splot 'csv/$file.csv' with pm3d
+EOF
+
+ done
+ done
+ done
+done
+
61 readme.rst
@@ -0,0 +1,61 @@
+Genetic Algorithm implementation in Python for the Traveling Salesman Problem
+==============================================================================
+
+Contents:
+
+.. toctree::
+ :maxdepth: 4
+
+The main script is the `ga.py` file which contains base classes for creating
+your own Genetic Algorithm (i.e.
+`import ga; env = ga.Environment(kind=YourKind)` where `YourKind` is some
+class inheriting `IndividualBase`) or for simply running the provided
+implementation of the Traveling Salesman Problem.
+
+Usage for the latter:
+
+::
+
+ Usage: ga.py [options]
+
+ Options:
+ -h, --help show this help message and exit
+ -g MAX_GENERATIONS, --max-generations=MAX_GENERATIONS
+ Continue till we have reached MAX_GENERATIONS
+ -p POPULATION_SIZE, --population-size=POPULATION_SIZE
+ Store POPULATION_SIZE different individuals
+ -c CROSSOVER_RATE, --crossover-rate=CROSSOVER_RATE
+ Set crossover probability to CROSSOVER_RATE (between 0
+ and 1)
+ -m MUTATION_RATE, --mutation-rate=MUTATION_RATE
+ Set mutation probability to MUTATION_RATE (between 0
+ and 1)
+ -e ELITISM, --elitism=ELITISM
+ Enable elitism for the top ELITISM results
+ --print-interval=PRINT_INTERVAL
+ Print intermediate results for every PRINT_INTERVAL
+ generations. Use 0 for no intermediate output
+ --csv=CSV Return csv output for easy plotting every CSV
+ generation
+ --csv-file=CSV_FILE Where to write the csv output to (defaults to STDOUT)
+ -s SAMPLES, --samples=SAMPLES
+ The amount of samples to use. Very useful with csv
+ output
+ --processes=PROCESSES
+ By default the samples are calculated in parallel, you
+ can change the amount of simultaneous processes with
+ PROCESSES
+
+To generate all output automatically there is also a script called
+`generate_charts.sh` available which automatically try all kinds of different
+values for elitism, population, mutation and crossover.
+
+
+If you have any questions, feel free to mail me at: `Rick _at_ Fawo _dot_ nl`
+
+Indices and tables
+==================
+
+* :ref:`genindex`
+* :ref:`search`
+
Please sign in to comment.
Something went wrong with that request. Please try again.