In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container{ width:100% !important; }</style>"))

# Define Environment
The environment is defined using a transition matrix, where each transition relates to a linear distance. 

In [42]:
import numpy as np 
import glob
from random import uniform, randint, random

dist_mat = np.array([
    #  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.  11 
    [0.0, 1.0, 0.0, 0.0, 0.0, 2.5, 0.0, 3.0, 0.0, 3.0, 0.0], # 1
    [1.0, 0.0, 1.0, 0.0, 0.0, 1.5, 0.0, 0.0, 0.0, 0.0, 0.0], # 2
    [0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0], # 3
    [0.0, 0.0, 1.0, 0.0, 1.0, 1.5, 0.0, 0.0, 0.0, 0.0, 0.0], # 4
    [0.0, 0.0, 0.0, 1.0, 0.0, 1.5, 1.0, 0.0, 0.0, 0.0, 0.0], # 5
    [2.5, 1.5, 1.0, 1.5, 1.5, 0.0, 1.5, 2.0, 0.0, 0.0, 0.0], # 6
    [0.0, 0.0, 0.0, 0.0, 1.0, 1.5, 0.0, 1.5, 1.0, 0.0, 0.0], # 7
    [3.0, 0.0, 0.0, 0.0, 0.0, 2.0, 1.5, 0.0, 1.5, 1.5, 0.0], # 8
    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.5, 0.0, 2.0, 2.0], # 9
    [3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.5, 2.0, 0.0, 2.0], # 10 
    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 2.0, 0.0], # 11
])

Each transition has a probability of success, probability of failure, and probability that the robot returns to the same state. These probabilities are randomly computer using the Random_Probability function. This function checks to ensure that all probabilities equal 1.00. 

In [29]:
# Create python data structure for the distance matrix
def Random_Probabilities(s1, s2, ratio):
    total = 0
    
    while total != 1.00:
        success = uniform(s1, s2)
        ret = (1 - success) * ratio
        fail = 1 - success - ret

        success = np.round(success, 2)
        ret = np.round(ret, 2)
        fail = np.round(fail, 2)
        
        total = success + fail + ret
        
    return success, ret, fail

Using the transition matrix, create a data structure where each node acts as a parent and all connections are present. In addition to the connections, compile an array of probabilities using the Random_Probability function.

The structure takes the form:

    nodes = {
                1 : {
                        2  : {'Distance': 1.0, 'Success': 0.74, 'Return': 0.21, 'Fail': 0.05},
                        6  : {'Distance': 2.5, 'Success': 0.73, 'Return': 0.22, 'Fail': 0.05},
                        8  : {'Distance': 3.0, 'Success': 0.75, 'Return': 0.20, 'Fail': 0.05},
                        10 : {'Distance': 3.0, 'Success': 0.61, 'Return': 0.31, 'Fail': 0.08}
                    }
                2 : {
                        ....
                    }
            }

In [43]:
nodes = {}
for i in range(dist_mat.shape[0]):
    nodes[i+1] = dict()
    for j in range(dist_mat.shape[1]):
        if dist_mat[j,i] != 0.0:       
            success, ret, fail = Random_Probabilities(0.5, 0.9, 0.80) # create random probability values
            nodes[i+1][j+1] = {"Distance" : dist_mat[j,i], "Success" : success, "Return" : ret, "Fail" : fail}

When located in a specific state, a number of actions equal to the number of transitions exist, the optimal policy occurring when the correct action is selected. To enable optimisation, we create either a single or multiple solutions corresponding to a set of randomly generated actions. This is performed using the generate_solution method, which returns an action set. 

In [None]:
[len(nodes[node]) for node in nodes]

In [5]:
def generate_soltuions(nodes, num_solutions):
    num_actions = [len(nodes[node]) for node in nodes]
    action_array = np.zeros(shape=(num_solutions, len(num_actions)), dtype=np.int32)

    for j, action in enumerate(action_array):
        for i, val in enumerate(action):
            max_action = num_actions[i]
            action = randint(1, max_action)
            action_array[j,i] = action
    
    return action_array

In [46]:
nodes[11]

{9: {'Distance': 2.0, 'Success': 0.7, 'Return': 0.24, 'Fail': 0.06},
 10: {'Distance': 2.0, 'Success': 0.5, 'Return': 0.4, 'Fail': 0.1}}

# PRISM Code Generation

In [47]:
def Create_Model(nodes, actions, folder, start_location=1, end_location=9, iter_save_model=True):   
    PREAMBLE = list()
    WORKFLOW = list()
    REWARD_DISTANCE = list()

    # Create the preamble
    PREAMBLE.append("// Code generaetion for preamble.\n")
    PREAMBLE.append("mdp\n\n")

    # model parameters
    PREAMBLE.append("// Model parameters\n")
    PREAMBLE.append(f"const int start = {start_location};\n")
    PREAMBLE.append(f"const int final = {end_location}; \n")
    PREAMBLE.append("\n")

    # begin action synthesis
    PREAMBLE.append("// Create action selections\n")
    for i in range(len(nodes)):
        act = actions[i]
        PREAMBLE.append(f"const int a_s{i+1} = {act}; \t// Selected action in the range 1 to {len(nodes[i+1])};\n")

    # create WORKFLOW module 
    WORKFLOW.append("\n\n\n")
    WORKFLOW.append("module workflow\n")
    WORKFLOW.append(f"\tend : bool init false;\n")
    WORKFLOW.append(f"\ts : [0..{len(nodes)}] init {start_location};\n\n")
    for node in nodes:
        act = 0 # start action counter
        for trans in nodes[node]:
            act += 1
            # Each state/action in the workflow is comprised of four parts (condition, success, return, and fail)
            WORKFLOW.append(f"\t[s{node}_s{trans}] (s={node}) & (a_s{node}={act}) & (!end) -> ")  # Condition
            WORKFLOW.append(f"{nodes[node][trans]['Success']}:(s'={trans}) + ")                  # Success state
            WORKFLOW.append(f"{nodes[node][trans]['Return']}:(s'={node}) + ")                    # Return state
            WORKFLOW.append(f"{nodes[node][trans]['Fail']}:(s'=0); \n")                          # Fail state

            # Each of the state/action needs to also have a reward structure
            REWARD_DISTANCE.append(f'\t[s{node}_s{trans}] true : {nodes[node][trans]["Distance"]};\n')

    WORKFLOW.append("\n\t[end] (!end) & (s=0 | s=final) -> (end'=true);\n")
    WORKFLOW.append("\nendmodule\n\n\n")

    # reward structure
    REWARD_DISTANCE.insert(0, '\nrewards "distance" \n')
    REWARD_DISTANCE.append("endrewards")
    
    # Compile the PRISM model and save based on the input argument
    if iter_save_model:
        # We want to iteratively output the model. Therefore we will compile an adaptable name and save the model 
        # as they are created.
        FOLDER = folder
        SUB = "Model/"
        N_FILES = len(glob.glob1(FOLDER+SUB, "*.prism"))
        MODEL_NAME = f"Model_{N_FILES}"
        
        with open(FOLDER + SUB + MODEL_NAME + ".prism", 'w') as f:
            # write the preamble of the script
            for row in PREAMBLE:
                f.write(row)

            # write the main workflow
            for row in WORKFLOW:
                f.write(row)

            # write the reward structure
            for row in REWARD_DISTANCE:
                f.write(row)
                
    else: 
        # WE will compile a single master model. This model will updated and re-written during creation.
        FOLDER = folder
        SUB = "Model/"
        MODEL_NAME = f"Model"
        
        with open(FOLDER + SUB + MODEL_NAME + ".prism", 'w') as f:
            # write the preamble of the script
            for row in PREAMBLE:
                f.write(row)

            # write the main workflow
            for row in WORKFLOW:
                f.write(row)

            # write the reward structure
            for row in REWARD_DISTANCE:
                f.write(row)
            
    return FOLDER, SUB, MODEL_NAME

# Random Search Algorithm

In [None]:
import subprocess 
PRISM_PATH = '/Users/jordanhamilton/Documents/PRISM/bin/prism'
FOLDER = "Models/1. Random Search/3. 11_3/"

PCTL = "Distance"

solutions = list() 
actions = generate_soltuions(nodes, 50)
data = dict()

for i in range(actions.shape[0]):
    # create new model
    path, sub, model = Create_Model(nodes, actions[i,:], FOLDER, start_location=11, end_location=3)
    
    # Model inputs and outputs
    MODEL_PATH = path+sub+model+'.prism'
    POLICY_PATH = path+'Policy/'+model+'.tra'
    STATES_PATH = path+'States/'+model+'.sta'
    
    # Create prism expression
    if PCTL == "Distance":
        EXPRESSION = [f"{PRISM_PATH}", f"{MODEL_PATH}",  "-pctl", 'R{"distance"}min=? [F (end)]', 
                      "-exportadv", f"{POLICY_PATH}", "-exportmodel", f"{STATES_PATH}"]

    elif PCTL == "Prob"
        EXPRESSION = [f"{PRISM_PATH}", f"{MODEL_PATH}", "-pctl", "Pmax=? [F (end & s=final)]", 
                      "-exportadv", f"{POLICY_PATH}", "-exportmodel", f"{STATES_PATH}"]
    
    # Run PRISM command line interface command using subprocess
    process = subprocess.Popen(EXPRESSION, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = process.communicate()
    out = out.decode('utf-8')
    out = out.split()

    # Locate the result amongst the output
    for j, entry in enumerate(out):
        if entry == 'Result:':
            RESULT = float(out[j+1])
            
            # When adding entry to the data, use the length of the data structure as the index for the next entry.
            # this prevent previous data from being overwritten. 
            data[len(data)] = {
                "Solution" : RESULT,
                "Actions" : list(actions[i]),
                "Model"   : MODEL_PATH,
                "States"  : STATES_PATH,
                "Policy"  : POLICY_PATH,
            }
            
            solutions.append([list(actions[i]), RESULT])
            
        

In [None]:
import pickle

with open(path+"nodes.pickle", 'wb') as f:
    pickle.dump(nodes, f, pickle.HIGHEST_PROTOCOL)

with open(path+"data.pickle", 'wb') as f:
    pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
    
with open(path+"sols.pickle", 'wb') as f:
    pickle.dump(solutions, f, pickle.HIGHEST_PROTOCOL)

In [None]:
min_val = 10000
ind_min = 0
min_act = 0
for i in range(len(data)):
    if data[i]['Solution'] < min_val:
        min_val = data[i]['Solution']
        ind_min = i
        min_act = data[i]['Actions']

print("Minuminim Value ", min_val, "occurred at index ", ind_min, "with actions: ", min_act)

max_val = 0
ind_max = 0
max_act = 0
for i in range(len(data)):
    if data[i]['Solution'] > max_val:
        max_val = data[i]['Solution']
        ind_max = i
        max_act = data[i]['Actions']

print("Minuminim Value ", max_val, "occurred at index ", ind_max, "with actions: ", max_act)

# Bat Algorithm Optimisation

In [48]:
def Simulate_Prism(node, action, FOLDER):
    PCTL = "Distance"
    
    # create new model based on the current bat
    path, sub, model = Create_Model(nodes, action, FOLDER, 
                                    start_location=11, end_location=3, iter_save_model=False)
    
    # Model inputs and outputs
    MODEL_PATH = path+sub+model+'.prism'
    POLICY_PATH = path+'Policy/'+model+'.tra'
    STATES_PATH = path+'States/'+model+'.sta'
    
    # Create prism expression
    if PCTL == "Distance":
        EXPRESSION = [f"{PRISM_PATH}", f"{MODEL_PATH}",  "-pctl", 'R{"distance"}min=? [F (end)]', 
                      "-exportadv", f"{POLICY_PATH}", "-exportmodel", f"{STATES_PATH}"]

    elif PCTL == "Prob":
        EXPRESSION = [f"{PRISM_PATH}", f"{MODEL_PATH}", "-pctl", "Pmax=? [F (end & s=final)]", 
                      "-exportadv", f"{POLICY_PATH}", "-exportmodel", f"{STATES_PATH}"]
    
    # Run PRISM command line interface command using subprocess
    process = subprocess.Popen(EXPRESSION, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = process.communicate()
    out = out.decode('utf-8')
    out = out.split()

    # Locate the result amongst the output
    for j, entry in enumerate(out):
        if entry == 'Result:':
            fitness = float(out[j+1])
            
    return fitness
            

In [49]:
import subprocess 
import random 

PRISM_PATH = '/Users/jordanhamilton/Documents/PRISM/bin/prism'
FOLDER = "Models/2. Bat Algorithm/1. Distance/"

PCTL = "Distance"

Max_Acts = [len(nodes[node]) for node in nodes]

# Output arrays
solutions = list() 
data = dict()

# Bat algorithm settings -- Population and Dynamics
Population_Size = 20
Population = generate_soltuions(nodes, Population_Size)
Loudness   = np.ones(shape=(Population_Size, 1), dtype=np.float32)*0.75
Pulse_Rate = np.zeros(shape=(Population_Size, 1), dtype=np.float32)
Fitness    = np.ones(shape=(Population_Size, 1), dtype=np.float32)
Frequency  = np.zeros(shape=(Population_Size, Population.shape[1]), dtype=np.int32)
Velocity   = np.zeros(shape=(Population_Size, Population.shape[1]), dtype=np.float32)

# Bat algorithm settings -- Algoritm Settings
Max_Movements    = 30
Act_Movements    = 0
Frequency_Range  = np.array([0, 1], dtype=np.float32)
Loudness_Decay   = 0.5
Loudness_Limit   = 0.01
Pulse_Rate_Decay = 0.5
Gamma            = 0.5
Fitness_Stop     = 1e-3

# Optimisation Results
Best_Position = np.empty(shape=(1, Population.shape[1]), dtype=np.float32)
Best_Fitness = np.inf
Best_Bat = 0

# Iterate through initial population
for bat in range(Population_Size):
    Fitness[bat,:] = Simulate_Prism(nodes, Population[bat,:], FOLDER)
    
    # Locate the fittest individual
    Best_Bat = Fitness.argmin()
    Best_Fitness = Fitness[Best_Bat,:]
    Best_Position = Population[Fitness.argmin(), :]
    
# Iterate thorough each movement
for move in range(Max_Movements):
    
    # Iterature through each bat
    for bat in range(Population_Size):
        
        # Update the dynamics of the bat
        freq_array = np.array([[0, 1][random.random() < 0.75] for x in range(Frequency.shape[1])]).reshape(Frequency.shape[1])
        sign_array = np.array([[-1,1][random.random() < 0.75] for x in range(Frequency.shape[1])]).reshape(Frequency.shape[1])
        Frequency[bat,:] = (freq_array * sign_array)
        Velocity[bat,:] = (Best_Position - Population[bat,:]) + Frequency[0,:]
        Position = Population[bat,:] + Velocity[bat,:]
        
        # Perform Random Walk
        if random.random() > Pulse_Rate[bat,:]:
            walk_array = np.array([random.randint(-1, 1) for x in range(Population.shape[1])]).reshape(Population.shape[1])
            conf_array = np.array([[0, 1][random.random() < 0.75] for x in range(Population.shape[1])]).reshape(Population.shape[1])
            Position += walk_array * conf_array
        
        # Check to make sure the position is within the search space
        # Position[Position<1] = 1 # minimum action should 1
        for i in range(Position.shape[0]):
            if Position[i] > Max_Acts[i]:
                Position[i] = Max_Acts[i]
            
            elif Position[i] < 1:
                Position[i] = 1
                
        # Evaluate the fitness of the bat
        evaluation = Simulate_Prism(nodes, Position.astype(np.int32), FOLDER)
        
        if evaluation < Fitness[bat,:]:
            Fitness[bat,:] = evaluation
            Population[bat,:] = Position
            Pulse_Rate[bat,:] = Pulse_Rate_Decay*(1-np.exp(-Gamma*move))
            
            if Loudness[bat,:] > Loudness_Limit:
                Loudness[bat,:] *= Loudness_Decay
            else:
                Loudness[bat,:] = Loudness_Limit
             
            if evaluation < Best_Fitness:
                Best_Fitness = evaluation
                Best_Position = Position
                Best_Bat = bat
                
    
    print(f"Iteration {move} -- Best Bat {Best_Bat} -- Fitness {Best_Fitness} -- Loudness {Loudness.mean()}")
    
    
        
        
    

Iteration 0 -- Best Bat 3 -- Fitness [8.390138] -- Loudness 0.637499988079071
Iteration 1 -- Best Bat 7 -- Fitness 7.914880028440463 -- Loudness 0.6000000238418579
Iteration 2 -- Best Bat 5 -- Fitness 6.895214226041294 -- Loudness 0.515625
Iteration 3 -- Best Bat 5 -- Fitness 6.895214226041294 -- Loudness 0.4781250059604645
Iteration 4 -- Best Bat 5 -- Fitness 6.895214226041294 -- Loudness 0.3890624940395355
Iteration 5 -- Best Bat 1 -- Fitness 6.895210762749509 -- Loudness 0.3843750059604645
Iteration 6 -- Best Bat 1 -- Fitness 6.895210762749509 -- Loudness 0.3375000059604645
Iteration 7 -- Best Bat 1 -- Fitness 6.895210762749509 -- Loudness 0.328125
Iteration 8 -- Best Bat 1 -- Fitness 6.895210762749509 -- Loudness 0.328125
Iteration 9 -- Best Bat 1 -- Fitness 6.895210762749509 -- Loudness 0.3257812559604645
Iteration 10 -- Best Bat 1 -- Fitness 6.895210762749509 -- Loudness 0.2578125
Iteration 11 -- Best Bat 1 -- Fitness 6.895210762749509 -- Loudness 0.25312501192092896
Iteration 12

KeyboardInterrupt: 