In [8]:
# particleswarm.py
# python 3.4.3
# Based on code from https://jamesmccaffrey.wordpress.com/2015/06/09/particle-swarm-optimization-using-python/

import random
import copy    # array-copying convenience
import sys     # max float

# ------------------------------------

## Setup

Would like a wrapper around the component sizing problem which can just call the problem for a given set of component sizes and initial states.

Use this in grid search to check basic feasibility.

Then call the same as the error function of the PSO problem.

Want to load a radio file as a separate input to the simulator, so that we can simulate anything independently.


Process:

Create a model
   - Load a radio file
   - Create the initial system variables

Simulate a given set of parameters:
   - c, b, p, s, V_0, SOC_0, 
   - Get the results as a given cost: 
   
   
component_model object:
- Initialize(self, infile=Radio.csv): Define all variables, bounds, create initial model
- Simulate(variables={dict}, stopOnError=True, returnDf=False)


Load radio file
Simulate 


In [1]:
def error(position):
    # position = [c, V_0, p]
    
    return 1* position[0] + 1.52 * position[2]

def simulate(position, minx, maxx):
    # Returns the cost of the system if feasible, or float('inf') if it results in infeasibility
    # position: an n-d array of the initial variables
    # minx, maxx: n-d arrays of the limits on the variables 
    
    # Check to ensure that the initial position is feasible
    # Simulate forwards. Check that the constraints are respected.
    # If the simulation ends without violating constraints, return the system cost.
    
    inf = float('inf')
    
    # Check the initial position for feasibility
    for i in range(len(minx)):
        if position[i] < minx[i]:
            return inf
        if position[i] > maxx[i]:
            return inf
        
    # Simulate forwards
    # In this initial case, we assume 
    #   c*I_c = I_r
    #   I_c = c_1 * (V_0 - V_1)
    I_r = 2
    c_1 = 1
    c = position[0]
    V_0 = position[1]
    p = position[2]
    T_0 = 2
    z   = 0.5
    
    try:
#         I_c = I_r / c
#         V_1 = V_0 - I_c / c_1
        V_1 = (p * T_0 + c*c_1*V_0 - I_r) / (c*c_1 + p*z)
        I_c = c_1 * (V_0 - V_1)
        I_t = T_0 - z * V_1
    
    except:
        return inf
        
    if V_1 < minx[1]:
        return inf
    if V_1 > maxx[1]:
        return inf
    
    return error(position)

# ------------------------------------

class Particle:
    def __init__(self, minx, maxx, seed):
        self.rnd = random.Random(seed)
        dim = len(minx)
        self.position = [0.0 for i in range(dim)]
        self.velocity = [0.0 for i in range(dim)]
        self.best_part_pos = [0.0 for i in range(dim)]

        for i in range(dim):
            self.position[i] = ((maxx[i] - minx[i]) * self.rnd.random() + minx[i])
            self.velocity[i] = ((maxx[i] - minx[i]) * self.rnd.random() + minx[i])

        self.error = simulate(self.position, minx, maxx) # curr error
        self.best_part_pos = copy.copy(self.position) 
        self.best_part_err = self.error # best error

    def set_position(self, newPos):
        self.position = newPos
        self.error = simulate(self.position, minx, maxx)
        self.best_part_pos = copy.copy(self.position) 
        self.best_part_err = self.error # best error

def Solve(max_epochs, n, minx, maxx):
    # max_epochs: Number of simulation epochs, i.e. flight time steps
    # n : Number of particles
    # dim: dimensionality of Rastriggin's function
    # minx, maxx: Assuming that the simulation is in a hypercube defined by the range (minx, maxx) in each dimension

    
    ## Initialization
    w = 0.729    # inertia
    c1 = 1.49445 # cognitive (particle)
    c2 = 1.49445 # social (swarm)


    # create n random particles, stored in an array named Swarm
    swarm = [Particle(minx, maxx, i) for i in range(n)]
    swarm[0].set_position([2.5,1.5,1])  # Ensure that there is a feasible (non-optimal) point

    ## Identify the best value reported from the initial batch
    best_swarm_err = sys.float_info.max # High initial value    
    for i in range(n): # See what the actual best position is so far
        if swarm[i].error < best_swarm_err:
            best_swarm_err = swarm[i].error
            best_swarm_pos = copy.copy(swarm[i].position) 

    rnd = random.Random(0)
    dim = len(minx)
    epoch = 0
    while epoch < max_epochs:

        if epoch % 10 == 0:
            print("Epoch = " + str(epoch) + " best error = %.3f" % best_swarm_err)

        for i in range(n): # process each particle

            # compute new velocity of curr particle, in each dimension
            for k in range(dim): 
                r1 = rnd.random()    # randomizations
                r2 = rnd.random()

                # New velocity = w * inertia + c1 * own best + c2 * swarm best
                swarm[i].velocity[k] = ( (w * swarm[i].velocity[k]) + 
                                         (c1 * r1 * (swarm[i].best_part_pos[k] - swarm[i].position[k])) +  
                                         (c2 * r2 * (best_swarm_pos[k] - swarm[i].position[k])) )  

                # Make sure that the particles stay within the (minx, maxx) bounds in each dimension
                if (maxx[k] - swarm[i].position[k]) < swarm[i].velocity[k]:
                      swarm[i].velocity[k] = maxx[k] - swarm[i].position[k]
                elif (minx[k] - swarm[i].position[k]) > swarm[i].velocity[k]:
                      swarm[i].velocity[k] = minx[k] - swarm[i].position[k]

            # compute new position using new velocity
            for k in range(dim): 
                swarm[i].position[k] += swarm[i].velocity[k]

            # compute error of new position
            swarm[i].error = simulate(swarm[i].position, minx, maxx)

            # is new position a new best for the particle?
            if swarm[i].error < swarm[i].best_part_err:
                swarm[i].best_part_err = swarm[i].error
                swarm[i].best_part_pos = copy.copy(swarm[i].position)

            # is new position a new best overall?
            if swarm[i].error < best_swarm_err:
                best_swarm_err = swarm[i].error
                best_swarm_pos = copy.copy(swarm[i].position)

        # for-each particle
        epoch += 1
    
    return best_swarm_pos
# end Solve

In [121]:
num_particles = 500
max_epochs = 100
print("\nStarting PSO algorithm\n")

# x = [c, V_0, p]
minx = [  0, 1, 0]
maxx = [100, 2, 100]

best_position = Solve(max_epochs, num_particles, minx, maxx)

print("\nBest solution found:" + str(best_position))
print("Cost of best solution = %.6f" % error(best_position))

print("\nEnd particle swarm demo\n")


Starting PSO algorithm

Epoch = 0 best error = 4.020
Epoch = 10 best error = 2.027
Epoch = 20 best error = 2.027
Epoch = 30 best error = 2.027
Epoch = 40 best error = 2.027
Epoch = 50 best error = 2.027
Epoch = 60 best error = 2.027
Epoch = 70 best error = 2.027
Epoch = 80 best error = 2.027
Epoch = 90 best error = 2.027

Best solution found:[0.0, 1.1923380542768878, 1.3333333341632598]
Cost of best solution = 2.026667

End particle swarm demo



In [3]:
import pickle

a = pickle.load(open('swarm_20160712.pkl','rb'))

TypeError: 'str' does not support the buffer interface