In [1]:
import os
# Point both the PySpark driver and its workers at the same Python 3 interpreter,
# so the workers do not pick up Python 2 when both versions are installed.
os.environ.update({
    'PYSPARK_DRIVER_PYTHON': '/usr/bin/python3.8',
    'PYSPARK_PYTHON': '/usr/bin/python3.8',
})

In [2]:
import numpy as np
import random 
import matplotlib.pyplot as plt
import pandas as pd
import sys

In [3]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import Row 
import pyspark.sql.functions as F
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType, LongType, FloatType
from pyspark.sql.window import Window


In [4]:
# Reuse the already-running context when this cell is executed again; calling the
# SparkContext constructor a second time in the same kernel raises an error.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName('TSP3'))



In [22]:
# SQL entry point bound to the SparkContext `sc`; initialPopulation uses it to
# turn the local population list into a DataFrame.
sqlContext = SQLContext(sparkContext=sc)


In [5]:
def distance(city1, city2):
    """Return the Euclidean distance between two cities.

    Parameters
    ----------
    city1, city2 : sequence of numbers
        Coordinates of each city. Coordinates are paired positionally; when the
        two sequences differ in length, the unmatched trailing values are ignored.

    Returns
    -------
    numpy.float64
        Square root of the sum of squared coordinate differences.
    """
    squared_gaps = [(a - b) ** 2 for a, b in zip(city1, city2)]
    return np.sqrt(sum(squared_gaps))
    
def routeDistance(route):
    """Return the total length of the closed tour that visits `route` in order.

    The tour walks every consecutive pair of cities and then closes the loop
    between the first and the last city.

    Parameters
    ----------
    route : sequence of cities
        Each city is a coordinate sequence accepted by `distance`. The number
        of cities is taken from the route itself rather than being fixed.

    Returns
    -------
    number
        Sum of the leg lengths; 0 for an empty route.
    """
    n_cities = len(route)
    if n_cities == 0:
        # Nothing to travel; also avoids indexing into an empty route.
        return 0

    dist = 0
    for i in range(n_cities - 1):
        dist += distance(route[i], route[i + 1])

    # Closing leg between the first and the last city of the tour.
    dist += distance(route[0], route[-1])
    return dist
                   
    
def routeFitness(route):
    """Return the fitness of `route`: the reciprocal of its tour length.

    Shorter tours therefore score higher. A tour of length zero raises
    ZeroDivisionError.
    """
    tour_length = float(routeDistance(route))
    return 1 / tour_length

In [78]:
   
def createRoute(cityList):
    """Return a new list holding every city of `cityList` in random order.

    The input sequence is left unmodified. An RDD-based variant
    (``cityList.takeSample(False, cityList.count())``) was sketched for this
    function but is not used; `cityList` is handled as a local sequence.
    """
    n_cities = len(cityList)
    return random.sample(cityList, n_cities)
       
    
def initialPopulation(popSize, cityList):
    """Build `popSize` random routes and expose them as both an RDD and a DataFrame.

    Each individual is a two-element list ``[[index], route]`` where `route`
    comes from `createRoute(cityList)`.

    Returns
    -------
    tuple
        ``(rdd, df)``: the individuals parallelized through the global `sc`,
        and the same individuals as a DataFrame built by the global `sqlContext`.
    """
    individuals = [[[idx], createRoute(cityList)] for idx in range(popSize)]

    population_rdd = sc.parallelize(individuals)
    population_df = sqlContext.createDataFrame(individuals)
    return population_rdd, population_df

def rankRoutes(rdd, df):
    """Score every route by fitness and return both views sorted best-first.

    Parameters
    ----------
    rdd : RDD
        Elements are indexable, with the route at position 1.
    df : DataFrame
        Individual index in column `_1`, route in column `_2`.

    Returns
    -------
    tuple
        ``(ordered_rdd, ordered_df)``. `ordered_rdd` holds ``(fitness, route)``
        pairs sorted by fitness descending. `ordered_df` has the columns
        `_1`, `_2` and `RouteFitness`, ordered by `RouteFitness` descending.
    """
    fitness_udf = F.udf(routeFitness, FloatType())

    # Attach the fitness to each row directly. Joining the frame to itself on the
    # route column would emit one row per matching pair, duplicating individuals
    # that share the same route, and would cost an extra shuffle.
    scored_df = df.withColumn('RouteFitness', fitness_udf(df._2))
    ordered_df = (
        scored_df
        .select('_1', '_2', 'RouteFitness')
        .orderBy('RouteFitness', ascending=False)
    )

    # RDD view: key each route by its fitness and sort on that key.
    ordered_rdd = (
        rdd
        .map(lambda row: (routeFitness(row[1]), row[1]))
        .sortByKey(ascending=False)
    )

    return ordered_rdd, ordered_df

def randomNumber():
    """Return a random float obtained by scaling `random.random()` by 100."""
    return random.random() * 100


def selection(ordered_df, ordered_rdd, eliteSize):
    """Add cumulative and relative fitness columns to the ranked population.

    Parameters
    ----------
    ordered_df : DataFrame
        Must contain a numeric `RouteFitness` column.
    ordered_rdd : RDD
        Returned unchanged.
    eliteSize : int
        Currently unused.

    Returns
    -------
    tuple
        ``(cumulative_df, cumulative_rdd)``. `cumulative_df` is `ordered_df`
        plus `cumSum` (running total of `RouteFitness`, best route first) and
        `RelativeFitness` (`cumSum` as a percentage of the total fitness).

    TODO: the selection itself is not implemented yet. Keeping the `eliteSize`
    best rows and drawing the remaining individuals against `RelativeFitness`
    with a random number in [0, 100) are still missing.
    """
    fitnessTotal = ordered_df.groupBy().sum('RouteFitness').collect()[0][0]

    # The running sum needs an explicit ordering; without one the accumulation
    # order depends on how the rows happen to be laid out.
    running_window = (
        Window.partitionBy()
        .orderBy(F.desc('RouteFitness'))
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )
    cumulative_sum = ordered_df.withColumn(
        'cumSum', F.sum('RouteFitness').over(running_window)
    )

    # Plain column arithmetic: no Python UDF is needed for a scale-and-divide.
    cumulative_sum = cumulative_sum.withColumn(
        'RelativeFitness',
        (F.col('cumSum') * 100 / fitnessTotal).cast(FloatType()),
    )

    cumulative_rdd = ordered_rdd

    return cumulative_sum, cumulative_rdd

                                                       
                              
"""
def selection(ordered_df, eliteSize):
    
    selectionResults = []
    relativeFitness = []
    
    fitnessTotal = np.sum(popRanked, axis = 0)
    cumulative_sum = np.cumsum(popRanked, axis =0)
    
    for i in range(len(popRanked)):
        relativeFitness.append(100*cumulative_sum[i][1]/fitnessTotal[1])
    
    for i in range(0, eliteSize):
        selectionResults.append(popRanked[i][0])
    
    for i in range(0, len(popRanked)-eliteSize):
        pick = 100*random.random()
        for i in range(0, len(popRanked)):
            if pick <= relativeFitness[i]: 
                selectionResults.append(popRanked[i][0])
                break
                
    return selectionResults
"""

def matingPool(population, selectionResults):
    """Return the individuals of `population` picked by `selectionResults`.

    `selectionResults` is a sequence of indices (or keys) into `population`;
    the result keeps their order and repeats an individual once for every
    time it was selected.
    """
    return [population[picked] for picked in selectionResults]


def breed(parent1, parent2):
    """Create a child route from two parents.

    A random contiguous slice of `parent1` (possibly empty, possibly the whole
    route) opens the child; the cities of `parent2` that the child does not
    contain yet are then appended in `parent2` order.

    Neither parent is modified; the child is a new sequence obtained by
    slicing `parent1`.
    """
    n_genes = len(parent1)
    cut_a = random.randint(0, n_genes)
    cut_b = random.randint(0, n_genes)
    start, stop = min(cut_a, cut_b), max(cut_a, cut_b)

    child = parent1[start:stop]
    for gene in parent2:
        if gene not in child:
            child.append(gene)

    return child

def breedPopulation(matingpool, eliteSize):
    """Build the next population from a mating pool.

    The first `eliteSize` members of `matingpool` are carried over as they are
    (the same objects, not copies). The remaining slots are filled by breeding
    pairs taken from opposite ends of a shuffled copy of the pool.
    """
    pool_size = len(matingpool)
    shuffled = random.sample(matingpool, pool_size)

    elites = [matingpool[i] for i in range(eliteSize)]
    offspring = [
        breed(shuffled[i], shuffled[pool_size - i - 1])
        for i in range(pool_size - eliteSize)
    ]
    return elites + offspring

def mutate(individual, mutationRate):
    """Mutate a route in place by random swaps (Partial Shuffle Mutation).

    Every position is visited once; with probability `mutationRate` its city
    is swapped with the city at a randomly drawn position (which may be the
    same position).

    Returns the same `individual` object that was passed in.
    """
    size = len(individual)
    for pos in range(size):
        if random.random() < mutationRate:
            partner = int(random.random() * size)
            individual[pos], individual[partner] = individual[partner], individual[pos]
    return individual

def mutatePopulation(population, mutationRate):
    """Apply `mutate` to every individual and return the results as a new list.

    The outer list is new, but its elements are whatever `mutate` returns for
    each individual.
    """
    return [mutate(member, mutationRate) for member in population]


def nextGeneration(currentGen, eliteSize, mutationRate):
    """Produce the next population: rank, select, build a mating pool, breed, mutate.

    NOTE(review): the calls below do not match the definitions in this file.
    `rankRoutes` takes `(rdd, df)` and `selection` takes
    `(ordered_df, ordered_rdd, eliteSize)` and returns a (DataFrame, RDD) pair,
    whereas `matingPool` indexes `population` with each entry of
    `selectionResults`. Presumably left over from a list-based version of the
    pipeline -- confirm and port before calling.
    """
    popRanked = rankRoutes(currentGen)
    selectionResults = selection(popRanked, eliteSize)
    matingpool = matingPool(currentGen, selectionResults)
    children = breedPopulation(matingpool, eliteSize)
    # Local name shadows this function's own name inside the body only.
    nextGeneration = mutatePopulation(children, mutationRate)
    return nextGeneration

def geneticAlgorithmPlot(population, popSize, eliteSize, mutationRate, generations):
    """Run the genetic algorithm for `generations` steps and plot one value per generation.

    `population` is forwarded to `initialPopulation` as its city list. The
    recorded series is plotted with the axes labelled 'Distance' and 'Generation'.

    NOTE(review): `initialPopulation` in this file returns an `(rdd, df)` pair and
    `rankRoutes` takes two arguments, so `rankRoutes(pop)[0][1]` does not match
    those definitions -- presumably written against a list-based version; confirm.
    """
    pop = initialPopulation(popSize, population)
    progress = []
    # Presumably [0][1] is the best route's fitness, making 1/fitness its distance -- confirm.
    progress.append(1 / rankRoutes(pop)[0][1])
    
    for i in range(0, generations):
        pop = nextGeneration(pop, eliteSize, mutationRate)
        progress.append(1 / rankRoutes(pop)[0][1])
    
    plt.plot(progress)
    plt.ylabel('Distance')
    plt.xlabel('Generation')
    plt.show()
    
    

    


In [79]:
# 15 cities, each an [x, y] pair of random integer coordinates in 0..100 (inclusive).
cityList = [
    [random.randint(0, 100), random.randint(0, 100)]
    for _ in range(15)
]


In [80]:
# Build a starting population of 5 random routes; returned as both an RDD and a DataFrame.
rdd, df = initialPopulation(5, cityList)

In [81]:
# Show the DataFrame's schema summary, then print its first 5 rows.
display(df)
df.show(5)

DataFrame[_1: array<bigint>, _2: array<array<bigint>>]

+---+--------------------+
| _1|                  _2|
+---+--------------------+
|[0]|[[18, 50], [92, 3...|
|[1]|[[94, 26], [55, 1...|
|[2]|[[94, 26], [55, 1...|
|[3]|[[69, 73], [99, 8...|
|[4]|[[99, 81], [95, 5...|
+---+--------------------+



In [82]:
# Show the RDD's repr, then fetch the first 5 rows of the DataFrame.
# NOTE(review): the second line reads from `df`, not `rdd` -- confirm that is intended.
display(rdd)
df.take(5)

ParallelCollectionRDD[221] at readRDDFromFile at PythonRDD.scala:247

[Row(_1=[0], _2=[[18, 50], [92, 37], [99, 81], [94, 26], [79, 56], [39, 81], [31, 22], [95, 58], [57, 59], [60, 4], [88, 77], [45, 45], [15, 45], [55, 12], [69, 73]]),
 Row(_1=[1], _2=[[94, 26], [55, 12], [79, 56], [88, 77], [45, 45], [18, 50], [92, 37], [15, 45], [31, 22], [39, 81], [60, 4], [99, 81], [69, 73], [57, 59], [95, 58]]),
 Row(_1=[2], _2=[[94, 26], [55, 12], [92, 37], [60, 4], [88, 77], [31, 22], [39, 81], [69, 73], [99, 81], [95, 58], [79, 56], [45, 45], [18, 50], [57, 59], [15, 45]]),
 Row(_1=[3], _2=[[69, 73], [99, 81], [92, 37], [79, 56], [55, 12], [95, 58], [88, 77], [15, 45], [94, 26], [60, 4], [31, 22], [18, 50], [57, 59], [45, 45], [39, 81]]),
 Row(_1=[4], _2=[[99, 81], [95, 58], [39, 81], [69, 73], [31, 22], [15, 45], [18, 50], [45, 45], [92, 37], [57, 59], [60, 4], [88, 77], [94, 26], [79, 56], [55, 12]])]

In [83]:
# Score every route and sort both views by fitness, best first.
ordered_rdd, ordered_df = rankRoutes(rdd, df)

In [84]:
# Print the 5 fittest individuals from the DataFrame view.
ordered_df.show(5)

+---+--------------------+------------+
| _1|                  _2|RouteFitness|
+---+--------------------+------------+
|[3]|[[69, 73], [99, 8...|0.0016066377|
|[4]|[[99, 81], [95, 5...|0.0014728141|
|[2]|[[94, 26], [55, 1...|0.0014720978|
|[1]|[[94, 26], [55, 1...|0.0013865575|
|[0]|[[18, 50], [92, 3...|0.0012287076|
+---+--------------------+------------+



In [85]:
# Fetch the 5 fittest (fitness, route) pairs from the RDD view.
ordered_rdd.take(5)

[(0.0016066377343552168,
  [[69, 73],
   [99, 81],
   [92, 37],
   [79, 56],
   [55, 12],
   [95, 58],
   [88, 77],
   [15, 45],
   [94, 26],
   [60, 4],
   [31, 22],
   [18, 50],
   [57, 59],
   [45, 45],
   [39, 81]]),
 (0.0014728141609162016,
  [[99, 81],
   [95, 58],
   [39, 81],
   [69, 73],
   [31, 22],
   [15, 45],
   [18, 50],
   [45, 45],
   [92, 37],
   [57, 59],
   [60, 4],
   [88, 77],
   [94, 26],
   [79, 56],
   [55, 12]]),
 (0.0014720978104754495,
  [[94, 26],
   [55, 12],
   [92, 37],
   [60, 4],
   [88, 77],
   [31, 22],
   [39, 81],
   [69, 73],
   [99, 81],
   [95, 58],
   [79, 56],
   [45, 45],
   [18, 50],
   [57, 59],
   [15, 45]]),
 (0.001386557501615706,
  [[94, 26],
   [55, 12],
   [79, 56],
   [88, 77],
   [45, 45],
   [18, 50],
   [92, 37],
   [15, 45],
   [31, 22],
   [39, 81],
   [60, 4],
   [99, 81],
   [69, 73],
   [57, 59],
   [95, 58]]),
 (0.0012287075714732519,
  [[18, 50],
   [92, 37],
   [99, 81],
   [94, 26],
   [79, 56],
   [39, 81],
   [31, 22],
 

In [90]:
# Add cumulative / relative fitness columns. The last argument is eliteSize,
# which `selection` accepts but does not use.
cumulative_df, cumulative_rdd = selection(ordered_df, ordered_rdd, 5)

In [91]:
# Print the ranked population together with its cumSum and RelativeFitness columns.
cumulative_df.show()

+---+--------------------+------------+--------------------+---------------+
| _1|                  _2|RouteFitness|              cumSum|RelativeFitness|
+---+--------------------+------------+--------------------+---------------+
|[3]|[[69, 73], [99, 8...|0.0016066377|0.001606637728400...|      22.417738|
|[4]|[[99, 81], [95, 5...|0.0014728141|0.003079451853409...|       42.96821|
|[2]|[[94, 26], [55, 1...|0.0014720978|0.004551549674943...|      63.508682|
|[1]|[[94, 26], [55, 1...|0.0013865575|0.005938107147812843|        82.8556|
|[0]|[[18, 50], [92, 3...|0.0012287076| 0.00716681475751102|          100.0|
+---+--------------------+------------+--------------------+---------------+



In [None]:
# Bring every element of the RDD back to the driver (this is `ordered_rdd`, returned unchanged by `selection`).
cumulative_rdd.collect()