In [1]:
import numpy as np
%matplotlib notebook
import matplotlib.pyplot as plt
import epics as PV
from logProgress import log_progress
import time
import math

In [2]:
m1 = PV.PV('wym:m1')

In [5]:
m1.put(-10)
while not m1.put_complete:
    print('Moving \r', end = '')
print('Done Moving')

Done Moving


In [6]:
m1.put_complete

True

In [7]:
class gaMotor:
    def __init__(self, pvName, scanStart, scanStop, scanStep):
        self.pvName = pvName
        self.scanStart = scanStart
        self.scanStop = scanStop
        self.scanStep = scanStep

### Define motors

In [8]:
motors = [gaMotor('wym:m1', -1.0, 1.0, 0.01),
         gaMotor('wym:m2', 0.0, 1.0, 0.01),
         gaMotor('wym:m3', -2.0, 0.0, 0.01),
         gaMotor('wym:m4', -4.0, 5.0, 0.01),
         gaMotor('wym:m5', -2.0, 2.0, 0.01),
         gaMotor('wym:m6', -3.0, 3.0, 0.01),
         gaMotor('wym:m7', 0.0, 2.0, 0.01)]

In [9]:
motors1 = [['wym:m1', -1.0, 1.0, 0.01],
         ['wym:m2', 0.0, 1.0, 0.01],
         ['wym:m3', -2.0, 0.0, 0.01],
         ['wym:m4', -4.0, 5.0, 0.01],
         ['wym:m5', -2.0, 2.0, 0.01],
         ['wym:m6', -3.0, 3.0, 0.01],
         ['wym:m7', 0.0, 2.0, 0.01]]

In [10]:
for i in motors1:
    print(i)

['wym:m1', -1.0, 1.0, 0.01]
['wym:m2', 0.0, 1.0, 0.01]
['wym:m3', -2.0, 0.0, 0.01]
['wym:m4', -4.0, 5.0, 0.01]
['wym:m5', -2.0, 2.0, 0.01]
['wym:m6', -3.0, 3.0, 0.01]
['wym:m7', 0.0, 2.0, 0.01]


In [11]:
PV.caput(motors[0].pvName+'.VAL', 2.0)

1

In [12]:
def moveMotors(positions, gaMotor):
    for pos, motor in zip(positions, gaMotor):
        PV.caput(motor.pvName, pos)    #moves done ~simultaneously, though started sequentially

In [13]:
def getMotors(gaMotor):
    positions = []
    for motor in gaMotor:
        pos = PV.caget(motor.pvName+'.RBV')
        positions.append(pos)
    
    return np.asarray(positions)

### Define genes
Based on motors -- each entry in gene list contains an array of all possible values for that gene/motor

In [14]:
genes = [np.arange(motor.scanStart,motor.scanStop+motor.scanStep,motor.scanStep) for motor in motors]

#### Test motor move

In [15]:
moveMotors(np.asarray([np.random.choice(gene) for gene in genes]), motors)

#### Set speed of all motors

In [16]:
def setMotorV(gaMotors, speed):
    speedPVs = ['.S', '.SBAK', '.SBAS']
    for motor in gaMotors:
        [PV.caput(motor.pvName+sPV,speed) for sPV in speedPVs]

In [17]:
setMotorV(motors, speed = 0.1)

#### Check if motors are still moving

In [18]:
def stillMoving(gaMotors):
    # DMOV = 0 while moving
    # DMOV = 1 when move complete
    status = [bool(PV.caget(motor.pvName+'.DMOV')) for motor in gaMotors]
    
    moving = not all(status)
    
    return moving
    

In [19]:
stillMoving(motors)

False

In [21]:
moveMotors(np.asarray([np.random.choice(gene) for gene in genes]), motors)
while stillMoving(motors):
    print("Still Moving \r", end = '')
print("Done Moving ")

Done Moving  


### Initialize population

In [22]:
def init_pop(nPopulation, genes):
    population = []
    for i in range(nPopulation):
        population.append(np.asarray([np.random.choice(gene) for gene in genes]))
        
    return population

### Fitness function

In [23]:
# Creating empty dataframe:
#
# df = pd.DataFrame(columns=['A','B','C','D','E','F','G'])

# Adding to data frame row by row:
#
# df.loc[0 if pd.isnull(df.index.max()) else df.index.max() + 1]=...

diagnosticPV = 'wym:userCalc3.VAL'

def get_fitness(pop, motors, diagPV = diagnosticPV, OM = False, minFit = 0):
    fitness = []
    for j, p in enumerate(pop):
        if OM:
            tempFit = minFit
            rePop = False
        moveMotors(p, motors)
        while stillMoving(motors):          
            if OM:
                currP = getMotors(motors)
                currFit = PV.caget(diagnosticPV)
                if currFit > tempFit:
                    tempP = currP
                    tempFit = currFit
                    rePop = True
                    
        finalFit = PV.caget(diagnosticPV)
        if OM and rePop:
            if tempFit > finalFit:
                finalFit = tempFit
                pop[j] = tempP
        fitness.append(finalFit)

    return pop, fitness

In [25]:
testPop = init_pop(10, genes)
print('Initial Population:')
for p in testPop:
    print(p)
_, testFitness = get_fitness(testPop, motors, OM = False)
print('test Fitness')
print(testFitness)
print('******************************************************')
newPop, newFitness = get_fitness(testPop, motors, OM = True)
print('New Population:')
for p in newPop:
    print(p)
print('New Fitness:')
print(newFitness)


Initial Population:
[-0.78  0.67 -1.21  1.71  1.66 -0.23  0.76]
[ 0.02  0.39 -0.81  4.93 -0.12 -1.8   1.84]
[ 0.07  0.04 -0.6   2.17 -1.55  2.34  0.9 ]
[ 0.55  0.79 -0.96  2.42  1.88  1.53  1.26]
[-0.33  0.62 -1.65  1.5   0.92 -0.64  0.87]
[-0.5   0.02 -0.62 -3.58  1.81 -1.89  1.85]
[ 0.75  0.72 -1.04  4.15  1.44 -0.61  0.73]
[ 0.14  0.09 -1.08 -1.4  -1.33  2.99  0.4 ]
[-0.85  0.23 -1.58  2.08 -0.95 -0.43  1.34]
[ 0.74  0.01 -0.33  3.13  1.87  2.79  1.78]
test Fitness
[0.006822699938896822, 7.098163033777284e-11, 1.8201619214668494e-05, 4.989448910914959e-05, 0.061964105818369024, 2.0759855876916813e-11, 9.558977296166334e-08, 3.745651814006564e-07, 0.00876227241122591, 2.3323254963584248e-09]
******************************************************
New Population:
[-0.78  0.67 -1.21  1.71  1.66 -0.02  0.76]
[-0.52  0.41 -0.95  1.97  1.4  -0.49  1.02]
[ 0.07  0.04 -0.6   2.17 -1.55  0.98  0.9 ]
[ 0.55  0.79 -0.96  2.42  0.01  1.53  1.26]
[-0.33  0.62 -1.65  1.5   0.92 -0.03  0.87]
[-0.33

In [None]:
#def get_fitness(pop, genes = genes):
#    fitness = []
#    peaks = np.asarray([np.mean(g) for g in genes])
#    for chromosome in pop:
#        fitness.append(np.prod(np.exp(-1.*(chromosome-peaks)**2)))
#    
#    return fitness
                       

### Ranking by fitness

In [26]:
def rank_pop(fitness):
    return np.argsort(fitness)

### Select Mating Pool

In [27]:
def mating_pool_probs(fitness):
    return [fit/sum(fitness) for fit in fitness]

In [28]:
def get_mating_pool(population, rankings, probs, nElites, nPopulation):
    rankedProbs = [probs[i] for i in rankings]
    
    #include elites
    #remainder based on probabilities from fitness_i/sum(fitness_i)
    breedingPop = []
    breedingPop[0:nElites] = rankings[::-1][0:nElites]
    breedingPop[nElites:nPopulation-nElites] = np.random.choice(rankings, size = nPopulation-nElites, p = rankedProbs)
    
    return breedingPop

### Breed (crossover)

In [29]:
def get_offspring(nGenes, nPopulation, population, breedingPop, Pc):
    #iterate through pairs i, i+1 (N/2 times) and if P < Pc swap genes 1 through randomly choose of 2,..,N-1)
    offspring = []
    for i in range(int(nPopulation/2)):
        parent1 = np.copy(population[breedingPop[2*i]])
        parent2 = np.copy(population[breedingPop[2*i+1]])
        
        child1 = np.copy(parent1)
        child2 = np.copy(parent2)
        if np.random.random() < Pc:
            crossover = np.random.choice(nGenes-1) + 1
            child1[crossover:]=parent2[crossover:]
            child2[crossover:]=parent1[crossover:]
        offspring.append(child1)
        offspring.append(child2)
    return offspring

### Mutate

In [30]:
def mutate_pop(pop, Pm, genes):
    #iterate through all genes on all chromosomes, if P < Pm pick randomly from gene's interval
    mutatedOffspring = np.copy(pop)
    for i, ind in enumerate(pop):
        for j, gene in enumerate(ind):
            if np.random.random() < Pm:
                mutatedOffspring[i][j] = np.random.choice(genes[j])
            
    
    return mutatedOffspring

### Put together

In [36]:
def gaScan(motors, nPopulation = 10, Pc = 0.8, Pm = 0.05, nGenerations = 100, nElite = 5, OM = True, critVal = 1.0):
    print('Observer Mode: {}'.format(OM))
    startTime = time.time()
    genes = [np.arange(motor.scanStart,motor.scanStop+motor.scanStep,motor.scanStep) for motor in motors]
    nGenes = len(genes)
    pop = init_pop(nPopulation, genes)
    peakFit = []
    aveFit = []
    for i in log_progress(range(nGenerations), name = "Generations"):
        pop, popFitness = get_fitness(pop, motors, OM = OM)
        peakFit.append(max(popFitness))
        aveFit.append(np.mean(popFitness))
        
        if peakFit[-1] < critVal:
            print('Current Peak Fitness: {}; Goal Value: {} \r'.format(peakFit[-1], critVal), end = '')
            rankings = rank_pop(popFitness) # list of indexes
            probs = mating_pool_probs(popFitness)
        
            breedingPop = get_mating_pool(pop, rankings, probs, nElite, nPopulation)
            offspring = get_offspring(nGenes, nPopulation, pop, breedingPop, Pc)
            pop = mutate_pop(offspring, Pm, genes)
        else:
            print('Final Peak Fitness: {}'.format(peakFit[-1]))
            print('Minimum fitness criteria met')
            break
    
    print(' ')
    pop, finalFitness = get_fitness(pop, motors, OM = False)
    peakFit.append(max(finalFitness))
    aveFit.append(np.mean(finalFitness))

    endTime = time.time()
    print('Total processing time {} s'.format(math.floor(endTime-startTime)))
    
    fig, ax = plt.subplots()
    line1, = ax.plot(peakFit, color = 'r', label = 'Peak Fitness')
    line2, = ax.plot(aveFit, color = 'b', label = 'Ave Fitness')
    ax.legend(loc='lower right')
    ax.set_title('Population = {}, Elites = {}, Pc = {}, Pm = {}, OM = {}'.format(nPopulation, nElites, 
                                                                                  Pc, Pm, OM))
    ax.set_xlabel('Generation')
    ax.set_ylabel('Fitness')
    plt.show
    
    return aveFit, peakFit
          
    
    

In [35]:
nPop = 10
Pc = 0.8
Pm = 0.05
nElites = 2
nGen = 100
setMotorV(motors, speed = 2.0)

aveFit, peakFit = gaScan(motors, nPopulation = nPop, Pc = Pc, Pm = Pm, 
                         nGenerations = nGen, nElite = nElites, OM = False, critVal = 0.9)

Observer Mode: False


Total processing time 444 s2968126698785; Goal Value: 0.9  


<IPython.core.display.Javascript object>

### OMEA - Oberver Mode Evolutionary Algorithm

In practice will have nPopulation = 10, but will track fitness during motor moves and replace individual/chromosome if "fitter" individual found on the way to destination.



In [44]:
#Randomize start position
moveMotors(np.asarray([np.random.choice(gene) for gene in genes]), motors)

In [45]:
nPop = 10
Pc = 0.8
Pm = 0.05
nElites = 2
nGen = 100
setMotorV(motors, speed = 2.0)

aveFit, peakFit = gaScan(motors, nPopulation = nPop, Pc = Pc, Pm = Pm, nGenerations = nGen, 
                         nElite = nElites, OM = True, critVal = 0.99)

Observer Mode: True


 urrent Peak Fitness: 0.8979867192887904; Goal Value: 0.99  
Total processing time 416 s


<IPython.core.display.Javascript object>