# LIBS

In [8]:
import random
import copy
import time
import pyspark

# UTILS

In [9]:
cities = None
def fitness(state):
    total = 0
    for i in range(len(state)-1):
        a = state[i+1] - 1
        b = state[i] - 1
        cityPosition1 = cities[a]
        cityPosition2 = cities[b]
        dist = (cityPosition1[0] - cityPosition2[0])**2 + (cityPosition1[1] - cityPosition2[1])**2
        total += dist
    a = state[0] - 1
    b = state[-1] - 1
    cityPosition1 = cities[a]
    cityPosition2 = cities[b]
    dist = (cityPosition1[0] - cityPosition2[0])**2 + (cityPosition1[1] - cityPosition2[1])**2
    total += dist
    return total


def generateRandomPosition(N):
    l = [c+1 for c in range(N)]
    position = []
    for j in range(N):
        indexChosen = random.randint(0,len(l)-1)
        position.append(l[indexChosen])
        del l[indexChosen]
    return position

def generateRandomSwap(N):
    numOfSwaps = random.randint(1,N//2)
    swaps = []
    for _ in range(numOfSwaps):
        i = random.randint(0,N-1)
        j = random.randint(0,N-1)
        swaps.append((i,j))
    return swaps


class Position():
    def __init__(self, values):
        self.type = 'position'
        self.values = values
    def __add__(self, other):
        if self.type == 'position' and other.type == 'velocity':
            returnObj = copy.deepcopy(self)
            for swap in other.values:
                i,j = swap
                returnObj.values[i], returnObj.values[j] = returnObj.values[j], returnObj.values[i]
            return returnObj
        
        if self.type == 'velocity' and other.type == 'position':
            returnObj = copy.deepcopy(other)
            for swap in self.values:
                i,j = swap
                returnObj.values[i], returnObj.values[j] = returnObj.values[j], returnObj.values[i]
            return returnObj
        
        if self.type == 'velocity' and other.type == 'velocity':
            returnObj = copy.deepcopy(self)
            returnObj.values.extend(other.values)
            return returnObj
        
        print("DiscreteDimension(Class): operation not specified")
        return 
    
    def __sub__(self, other):
        if self.type == 'position' and self.type == 'position':
            returnObj = copy.deepcopy(self)
            returnObj.values = []
            returnObj.type = 'velocity'
            auxObj = copy.deepcopy(other)
            for i in range(len(self.values)):
                if auxObj.values[i] != self.values[i]:
                    j = auxObj.values.index(self.values[i])
                    swap = (i,j)
                    returnObj.values.append(swap)
                    auxObj.values[i], auxObj.values[j] = auxObj.values[j], auxObj.values[i]
            return returnObj
    
    # def __mul__(self, other):
    #     same as rmul

    def __rmul__(self, other):
        if self.type == 'velocity':
            if other < 0:
                print("DiscreteDimension(Class): operation not specified: multiplication by negative number")
                return 
            if other > 1:
                returnObj = copy.deepcopy(self)
                integerPart = int(other)
                fracPart = int(other*10)%10
                returnObj.values = returnObj.values * integerPart
                returnObj.values.extend(self.values[:fracPart])
                return returnObj
            if other == 0:
                returnObj = copy.deepcopy(self)
                returnObj.values = []
                return returnObj
            
            fracPart = int(other*10)%10
            returnObj = copy.deepcopy(self)
            returnObj.values = returnObj.values[:fracPart]
            return returnObj
    
    def __str__(self):
        return f"{self.values}"
        

class Velocity():
    def __init__(self, values):
        self.type = 'velocity'
        self.values = values
    def __add__(self, other):
        if self.type == 'position' and other.type == 'velocity':
            returnObj = copy.deepcopy(self)
            for swap in other.values:
                i,j = swap
                returnObj.values[i], returnObj.values[j] = returnObj.values[j], returnObj.values[i]
            return returnObj
        
        if self.type == 'velocity' and other.type == 'position':
            returnObj = copy.deepcopy(other)
            for swap in self.values:
                i,j = swap
                returnObj.values[i], returnObj.values[j] = returnObj.values[j], returnObj.values[i]
            return returnObj
        
        if self.type == 'velocity' and other.type == 'velocity':
            returnObj = copy.deepcopy(self)
            returnObj.values.extend(other.values)
            return returnObj
        
        print("DiscreteDimension(Class): operation not specified")
        return 
    
    def __sub__(self, other):
        if self.type == 'position' and self.type == 'position':
            returnObj = copy.deepcopy(self)
            returnObj.values = []
            returnObj.type = 'velocity'
            auxObj = copy.deepcopy(other)
            for i in range(len(self.values)):
                if auxObj.values[i] != self.values[i]:
                    j = auxObj.values.index(self.values[i])
                    swap = (i,j)
                    returnObj.values.append(swap)
                    auxObj.values[i], auxObj.values[j] = auxObj.values[j], auxObj.values[i]
            return returnObj
    
    # def __mul__(self, other):
    #     same as rmul

    def __rmul__(self, other):
        if self.type == 'velocity':
            if other < 0:
                print("DiscreteDimension(Class): operation not specified: multiplication by negative number")
                return 
            if other > 1:
                returnObj = copy.deepcopy(self)
                integerPart = int(other)
                fracPart = int(other*10)%10
                returnObj.values = returnObj.values * integerPart
                returnObj.values.extend(self.values[:fracPart])
                return returnObj
            if other == 0:
                returnObj = copy.deepcopy(self)
                returnObj.values = []
                return returnObj
            
            fracPart = int(other*10)%10
            returnObj = copy.deepcopy(self)
            returnObj.values = returnObj.values[:fracPart]
            return returnObj
    
    def __str__(self):
        return f"{self.values}"


class Particle:
    def __init__(self, id, data):
        self.id = id
        self.N = data["nOfParticles"]
        self.position = data["position"]
        self.velocity = data["velocity"]
        self.pbest = (data["pbest"], data["pbest_fitness"])
        self.gbest = data["gbest"]
        self.gbest_fitness = data["gbest_fitness"]

        self.omega = 0.7
        self.alfa = 1.2
        self.beta = 2.3
    
    def update(self):
        self.r1 = random.uniform(0,1)
        self.r2 = random.uniform(0,1)
        
        inertiaFactor = self.omega*self.velocity
        cognitiveFactor = self.alfa*self.r1*(self.pbest[0] - self.position)
        socialFactor = self.beta*self.r2*(self.gbest - self.position)

        self.velocity = inertiaFactor + cognitiveFactor + socialFactor
        self.position = self.position + self.velocity

        if fitness(self.position.values) < self.pbest[1]:
            self.pbest = (self.position, fitness(self.position.values))
            if  self.pbest[1] < self.gbest_fitness:
                self.gbest = self.pbest[0]
                self.gbest_fitness = self.pbest[1]
    
    def make_message(self):
        a = str(-1)
        b = str(self.position.values)
        c = str(self.velocity.values)
        d = str(fitness(self.position.values))
        e = str(self.pbest[0].values)
        f = str(self.pbest[1])
        g = str(self.gbest.values)
        h = str(self.gbest_fitness)
        return ':'.join([a,b,c,d,e,f,g,h])
    
    def getParticleInfo(self):
        a = str(self.N)
        b = str(self.position.values)
        c = str(self.velocity.values)
        d = str(fitness(self.position.values))
        e = str(self.pbest[0].values)
        f = str(self.pbest[1])
        g = str(self.gbest.values)
        h = str(self.gbest_fitness)
        return ':'.join([a,b,c,d,e,f,g,h])
        

def parseData(data):
    nOfParticles, position, velocity, fitness, pbest_particle, pbest_fitness, gbest_particle, gbest_fitness = data.split(':')

    return {
        "nOfParticles": int(nOfParticles),
        "position": Position(eval(position)),
        "velocity": Velocity(eval(velocity)),
        "fitness": float(fitness),
        "pbest": Position(eval(pbest_particle)),
        "pbest_fitness": float(fitness),
        "gbest": Position(eval(gbest_particle)),
        "gbest_fitness": float(gbest_fitness),
    }

def ParticleUpdateGBest(data, new_gbest, new_gbest_fitness):
    a = str(data["nOfParticles"])
    b = str(data["position"])
    c = str(data["velocity"])
    d = str(data["fitness"])
    e = str(data["pbest"])
    f = str(data["pbest_fitness"])
    g = str(new_gbest)
    h = str(new_gbest_fitness)
    return ':'.join([a,b,c,d,e,f,g,h])

## CITY

In [10]:
def GenerateCities(numberOfCities):
    cities = []
    for x in range(numberOfCities//2):
        cities.append((x,0))
        cities.append((x,1))
    return cities 


## GenerateParticles

In [11]:
def GenerateParticle(numCities, numParticles):
    position = generateRandomPosition(numCities)
    velocity = generateRandomSwap(numCities)
    custo = fitness(position)
    string = str(numParticles) + ":" + str(position) + ":" 
    string += str(velocity) + ":" + str(custo) + ":"
    string += str(position) + ":" + str(custo) + ":"
    string += str(position) + ":" + str(custo)

    return string

# MAP

In [12]:
def mapper(mapperInput):
    a="test"
    if type(mapperInput) == type(a):
        mapperInput = eval(mapperInput)
    id, stringfiedData = mapperInput
    
    # parse data
    data = parseData(stringfiedData)

    # update particle velocity
    particleObj = Particle(id, data)
    particleObj.update()
    
    emits = []
    
    # Emit message if new value is better than gbest
    #if particleObj.gbest_fitness > particleObj.pbest[1]:
    message = particleObj.make_message()
    for i in range(1, data["nOfParticles"]+1):
        if i != id:
            emits.append((i, message))
    
    newParticleInfo = particleObj.getParticleInfo()

    emits.append((id, newParticleInfo))
    return emits

# REDUCE

In [13]:
def reducer(value1,value2):
    # Parser dos values
    data1 = parseData(value1)
    data2 = parseData(value2)

    # Quando value1 eh mensagem, 
    # considere o value2 como particula
    if data1["nOfParticles"] == -1:
        # Verifica se o global do value1 e melhor
        if data1["gbest_fitness"] < data2["gbest_fitness"]:
            return ParticleUpdateGBest(data2, data1["gbest"], data1["gbest_fitness"])
        else:
            return value2
    # Quando value1 eh particula
    else:
        # Verifica se o global do value2 e melhor
        if data2["gbest_fitness"] < data1["gbest_fitness"]:
            return ParticleUpdateGBest(data1, data2["gbest"], data2["gbest_fitness"])
        else:
            return value1

# MAP-REDUCE

In [14]:
# Constants
numberOfParticles = 60
numberOfCities = 100

# Generate cities
cities = GenerateCities(numberOfCities)

# Generate swarm
swarm = [(i,GenerateParticle(numberOfCities, numberOfParticles)) for i in range(1, numberOfParticles+1)]

# Transform swarm to RDD object
swarmRDD = sc.parallelize(swarm)

# Start timer
start = time.time()

# Run 100 iterations
for _ in range(100):
    swarmRDD = swarmRDD.flatMap(lambda word: mapper(word))\
                      .reduceByKey(lambda a,b: reducer(a,b))

# Finish timer
end = time.time()

# Show results 
print("Time taked to run 100 iterations:", end - start)

Time taked to run 100 iterations: 3.9194729328155518
