In [1]:
import numpy as np
import numpy.linalg.linalg as LA

import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.lines import Line2D
from matplotlib.transforms import Affine2D
import matplotlib.patches as patches
from matplotlib.text import Annotation

#from matplotlib.mlab import find

from mpl_toolkits.mplot3d import Axes3D
from IPython.display import clear_output

import scipy.stats as stats
from scipy.spatial import ConvexHull, convex_hull_plot_2d
from scipy.spatial.distance import euclidean

import math
import random

twoPi = np.pi * 2

def find(condition):
    res, = np.nonzero(np.ravel(condition))
    return res

class MultipleNRV(object):
    """Multiple independent normal random variables."""
    def __init__(self, size, loc=0., scale=1.):
        self.size = size
        self.mean, self.std = loc, scale
        self.twoVariance = 2 * self.std ** 2
    
    def pdf(self, xs):
        """Returns the probability density function value for a particular
        vector."""
        twoVar = self.twoVariance
        if twoVar == 0:
            return 1 if xs == self.mean else 0
        else:
            delta2 = (xs - self.mean) ** 2
            return np.product( np.exp( -delta2 / twoVar ) / np.sqrt( twoVar * np.pi) )        
            
    def sample(self):
        """Returns a vector sampled from the PDF."""
        loc, scale, n = self.mean, self.std, self.size
        return loc if scale == 0 else np.random.normal(loc, scale, size=self.size)                                                                        

class World(object):
    
    def __init__(self, sensor_angles=(0,), luminance=1.0, light_coords=(10, 0, -0.1), v_max=1.0, agent_radius=0.5, sensor_noise=0, motor_noise=0, dt=0.1, seed=None):
        self.sensors = np.array(sensor_angles)
        self.light_pos = np.array(light_coords)
        self.v_max = v_max
        self.agent_radius = agent_radius
        self.luminance = luminance
        self.dt = dt
        
        if seed is not None:
            np.random.seed(seed)

        # set up noise random variables
        sensor_sigma = sensor_noise * np.sqrt(dt)
        motor_sigma = motor_noise * np.sqrt(dt)
        

        self.sensor_rv = MultipleNRV(size=len(sensor_angles), scale = sensor_sigma)
        self.motor_rv = MultipleNRV(size=2, scale = motor_sigma)
                
    def sensor_pos(self, state):
        """Returns an array corresponding to a list of (x, y, 0) sensor 
        positions in world coordinates."""
        sensors, r = self.sensors, self.agent_radius
        x, y, theta = state
        n = len(sensors)
        
        result = np.zeros( (n, 3) )
        # copy robot x, y into sensors
        result[:,0:2] = state[0:2]
        
        angles = theta + sensors
        result[:,0] = r * np.cos(angles) + x
        result[:,1] = r * np.sin(angles) + y
        
        return result
        
    def sensor_input(self, state):
        """Returns an array of raw sensor input values for a particular
        agent state (position and orientation). These are calculated
        according to an inverse square distance law, and the agent's body
        can occlude a sensor reducing its input to zero.
        """        
        # various relevant parameters
        r, K = self.agent_radius, self.luminance
        # light position
        l_pos = self.light_pos        

        # unpack 3D position and heading from (x, y, theta) state
        pos, theta = np.array(tuple(state[0:2]) + (0,)), state[-1]        

        # positions in world coordinates of each sensor
        s_pos = self.sensor_pos(state)
        # array of distances of sensors from light source
        d_s = LA.norm(l_pos - s_pos, axis=1)
        
        # distance of light from robot's centre
        d_0 = LA.norm(l_pos - pos)

        # array of zeros or ones for each sensor according to whether the 
        # agent's body lies between the sensor and the light source
        not_occluded = (d_0**2 >= r**2 >= (d_s**2 - d_0**2))
        
        # light reaching each sensor
        return not_occluded * K / d_s **2                
        
    def sensor_transform(self, activation):
        """Returns a vector of sensor readings for a particular sensor input 
        value (activation) vector. Noise is usually applied to the activation 
        before applying the transform."""
        # rescale to (0, 1) interval, assuming activation is positive
        #return activation / (1 + activation)
        K, l_pos = self.luminance, self.light_pos
        # minimum distance is z coordinate of the light position
        d_min = l_pos[-1]
        
        # rescale activation to range between 0 and a_max
        # with midpoint around 
        a_max = K / ( d_min ** 2 )
        a = a_max / (1 + np.exp(5*(K/4 - activation)))
        
        #return 1 / (1 + np.exp(-activation))
        return activation
        #return np.sqrt(K / a)
    
    def sensor_inverse_transform(self, reading):
        """Returns the vector of sensor input values (activations) that would be 
        needed to produce the specified sensor reading. """
        return reading / (1 - reading)
    
    
    def sense(self, state):
        """Returns a vector of sensor reading values for a 
        particular agent state (position and orientation). 
        Noise is added to the raw luminance at the sensor's location
        and the result is rescaled to the (0, 1) interval.
        """
        activation = self.sensor_input(state) + self.sensor_rv.sample()
                        
        # and rescale to (0, 1) interval
        return self.sensor_transform(activation)
    
    def p_sensation(self, state, sensation):
        """Returns a probability density value for the likelihood of a 
        particular sensor reading vector given a particular agent state."""
        # invert rescaling operation to find the original activations 
        sensor_activation = self.sensor_inverse_transform(sensation)
        # determine the actual luminance at the sensors
        sensor_input = self.sensor_input(state)        
        
        # interrogate the RV object to get the PDF value
        return self.sensor_rv.pdf(sensor_input - sensor_activation)        

    def act(self, state, action):
        """Applies a motor activation vector to an agent state, and simulates 
        the consequences using Euler integration over a dt interval."""
        # noisily map the action values to a (-1, +1) interval
        motor_out = self.v_max * np.tanh(action) + self.motor_rv.sample()
        
        # calculate the linear speed and angular speed
        v = motor_out.mean()
        omega = (motor_out[1] - motor_out[0]) / (2.0 * self.agent_radius)
        
        # calculate time derivative of state
        theta = state[-1]
        deriv = [ v * np.cos(theta), v * np.sin(theta), omega ]
        
        # perform Euler integration
        return self.dt * np.array(deriv) + state              
        
    # Function for changing the light position, here we can take the light, move it for N timestamps (in this case 100 or 10 seconds) and move it back
    def light_change(self,original_state,active_token,stamp,light_poses):
        
        if active_token == 2: # If the token given is 2 initially we won't get a light change
            pass
        
        if active_token == 0: # change light from its intiailised location to a random point along the x=10 axis, as it was found, too far away from the light,t he robot will nto get distracted
            original_state = self.light_pos # initialisied light location for replacement
            light_coords = [10,random.uniform(-5,5),-0.1]
            self.light_pos = np.array(light_coords) # officially change light position in simulation
            new_active_token = 1 #change token for next round
            light_poses.append(light_coords) # add new coordinates to light locations

            
        if active_token == 1: # if the light had been moved here we move the light back to its initialised location
            self.light_pos = original_state # officially change light location in code
            new_active_token = 0 # change our token back
            self.luminance = 1 

        stamp = stamp + 100
   
        return(original_state,new_active_token,stamp,light_poses,self.light_pos)        
        
    def simulate(self, controller,active_token, interval=500.0):
        """Simulates the agent-environment system for the specified interval
        (in simulated time units) starting from a random state. Returns
        a (poses, sensations, actions, states) tuple where poses is a time array 
        of agent poses (position and orientation), sensations is a time array of 
        sensory readings, actions is a time array of motor activations, and
        states is a list of arbitrary internal controller state objects.
        
        Must be called with a controller function of the form 
        controller(sensation, state, dt) that returns a (action, state) tuple
        outputting motor activations and updated internal state in
        response to sensor readings.
        """
        poses = [ self.random_state() ]

        states = [ None ]
        sensations = [ ]
        actions = [ ]
        
        reached_source = 0 # here we indicate that we have reached the light source, in BTDR variations of th code we have
                           # a 'reached home' token but with motor reverse we have no ieda of our bearings

        motor_history = [] # lets give a motor history we can use to reverse our path after we are done
        
        original_state = self.light_pos # set our light home positions so we can move it temporarily and back
        
        light_poses = [] # list to keep track of new light positions for plotting
        
        stamp = 0 # timestamp is a placeholder that allows us to move the light for a specific number of time

        for i in range(int( interval / self.dt )):
            
            if i%1000 == 0 and active_token == 0:# here every 100 seconds the light will move 
                original_state,active_token,stamp,light_poses,self.light_pos = self.light_change(original_state,active_token,i,light_poses)
                
            if stamp == i  and active_token == 1:# here we call and move the light back when the allotted decoy time is up
                original_state,active_token,stamp,light_poses,self.light_pos = self.light_change(original_state,active_token,i,light_poses)

            
            sensations.append(self.sense(poses[-1])) 
            action, motor_history, reached_source,state = controller(sensations[-1], states[-1], self.dt,sensations,reached_source,motor_history) ###############
            actions.append(action)
            states.append(state)
            poses.append(self.act(poses[-1], actions[-1]))

        return np.array(poses), np.array(sensations), np.array(actions), states,self.light_pos,light_poses 
    
            
        
    def random_state(self):
        """Returns a random initial state."""
        result = np.zeros(3)
        result[-1] = np.random.rand() * twoPi

        return result
        
        
    def task1fitness(self, poses):
        """Returns the fitness of the trajectory described by poses on 
        assignment task 1 (reaching the light source)."""
        return -self.reached_light_at(poses)
        
    def task2fitness(self, poses):
        """Returns the fitness of the trajectory described by poses on 
        assignment task 1 (reaching the light source and returning to base)."""
        light_time = self.reached_light_at(poses)
        if light_time == np.inf:
            return -np.inf
        return -self.first_reached(poses, np.array([0, 0]), after=light_time) 
        
    def first_reached(self, poses, xy, after = 0, within = 1.5):
        after_index = int(np.floor(after / self.dt))
        ds = LA.norm(xy - poses[after_index:,0:2], axis=1)
        indices = np.nonzero(ds < within)[0]
        
        if len(indices) == 0:
            return np.inf
        
        return (indices[0] + after_index) * self.dt
        
    def reached_light_at(self, poses):
        return self.first_reached(poses, self.light_pos[0:2])
    
    
    def animate(self, poses, sensations, speedup=5):
        r, l_pos = self.agent_radius, self.light_pos
        x, y, theta = poses[0]
        
        # use an Ellipse to visually represent the agent's body
        body = patches.Ellipse(xy=(0, 0), width=2 * r, height=2 * r, fc='w', ec='k')
        # use a black dot to visually represent each sensor
        sensors = [ patches.Ellipse(xy=(r * np.cos(theta), r * np.sin(theta)), width=0.2, height=0.2, fc='b') for theta in self.sensors ]
        # use small rectangles to visually represent the motors
        motors = [ patches.Rectangle((-0.5*r, y), width = r, height = 0.2*r, color="black") for y in (-1.1*r, 0.9*r) ]
        # use a line to indicate the agent's orientation
        line = Line2D( (x, x + r * np.cos(theta)), (y, y + r * np.sin(theta)) )
        line = Line2D( (0, r), (0, 0) )
        # draw a line showing the agent's "trail"
        trail = Line2D( [], [], color='r') 
        # display a clock
        clock = Annotation('', (0.8, 0.9), xycoords='axes fraction')    
        # use a yellow circle to visually represent the light
        light_r = patches.Ellipse(xy=l_pos[0:2], width=1, height=1, fc='y', ec='none')
        light = patches.Ellipse(xy=l_pos[0:2], width=0.25, height=0.25, fc='b')
        
        fig, (ax1, ax2) = plt.subplots(2, 1, gridspec_kw = {'height_ratios': [10, 1] } )
        ax1.axis("equal")
        ax1.set_xlim([-15, 15])
        ax1.set_ylim([-15, 15])
        ax1.set_title("Click On Main Display To Pause / Unpause")
        
        ax2.set_title("Click On Sensor Graph To Change Time")
        
        tracker = ax2.axvline(0, 0, 1)
        paused = [ False ]
        last_index = [ -1 ]
        t_index = [ 0 ]
        
        if sensations is not None:
            times = np.arange(0, self.dt * len(sensations), self.dt)
            # plot the sensor values
            ax2.plot(times, sensations, 'k');
            # plot the ideal (noiseless) sensor values
            ideal = np.array([self.sensor_transform(self.sensor_input(pose)) for pose in poses[:-1]])
            ax2.plot(times, ideal, 'r')
    
        def draw(index):
            if not paused[0]:
                t_index[0] = t_index[0] + (index - last_index[0])
                t_index[0] = t_index[0] % len(poses)
    
            last_index[0] = index
                
            x, y, theta = poses[t_index[0]]
            tr = Affine2D().rotate(theta).translate(x, y) + ax1.transData
            
            agent_patches = (body, line) + tuple(sensors) + tuple(motors)
            
            for patch in agent_patches:
                patch.set_transform(tr);
                
            trail.set_data( poses[:t_index[0], 0], poses[:t_index[0], 1] )
            
            time = t_index[0] * self.dt
            tracker.set_xdata([time, time])
                
            clock.set_text("Time: %.02f" % time)
                    
            return (trail, light_r, light, clock, tracker) + agent_patches
        
        def init():
            result = draw(0)
            for artist in result:
                if artist is not tracker:
                    ax1.add_artist(artist)
            return result
    
        def onclick(event):
            if event.button == 1:
                # pause if the user clicks on the main figure
                if event.inaxes is ax1:
                    paused[0] = not paused[0]
                # edit time directly if the user clicks on the graph over time
                elif event.inaxes is ax2:
                    t_index[0] = (int) (event.xdata / self.dt)            
            
        def anim(index):
            return draw(index)
        
        
        ani = FuncAnimation(fig, anim, init_func=init, frames = None, interval=1000*self.dt/speedup, blit=True, save_count=len(poses))
        
        plt.show()
    
        fig.canvas.mpl_connect('button_press_event', onclick)
            
        return ani            
                                                  


In [2]:
# This function defines the circulatiry of a set of coordinates, the more circular a path, the more aimless
# a search strategy appeasr to be, thus circularity is a good measure of purpose (or lack thereof)

def circul(x,y,graph):

    def x_y_tuple(x,y):
        list_coords_tuple = []
        for i in range(len(x)):
            list_coords_tuple.append([x[i], y[i]])
        return(list_coords_tuple)

    def perim(points,graph):
        hull = ConvexHull(points)
        if graph ==1 :
            plt.plot(points[:,0], points[:,1], 'o')
            for simplex in hull.simplices:
                plt.plot(points[simplex, 0], points[simplex, 1], 'r')
        perimeter = hull.area
        return(perimeter)
    
    def area_cal(pts):
        hull = ConvexHull(pts) 
        area = hull.volume
        return(area)
    
    pts = np.array(x_y_tuple(x,y))
    perimeter = perim(pts,graph)
    area = area_cal(pts)

    circularity = 4*math.pi*area/((perimeter**2))
    
    return(area, perimeter, circularity)


In [3]:
def BTMR(sensors, state, dt,sensations_list,reached,motor_history):
    
        if len(sensations_list) > 50 and sensations_list[-1] > 4*sum(sensations_list[:45]) and reached == 0:
            reached = 1 # After an arbitrary time period has occured and the threshold value is met we say we have arrived
                        # at location, here the threshold is 4 times the sum of the first 45 timesteps which is about 1.5 turns 
                        # this was found to be a satisfactory value though doesnt adapt to changing circumstances

        if reached == 0: # If we havent reached our target (the light) we implment our 'tumble/run' behaviour

            threshold_value = 0.01 # through trial and error this local threshold was found to be satisfactory for a range of sensor noise
                                   # though once again, it doesnt change to adapting circumstances


            if sensors[0]<threshold_value: # If our threshold value isnt met by the sensors we Tumble
                output = [(6),(-6)]

            else : 
                output = [5,5] # if it is met we 'Run'
                
            motor_history.append(output) # Keep track of our motor actions
                
        else :

            if len(motor_history)<1 : # If we have reversed all our motor steps, spiral out in the hopes we find home

                output = [3*(len(sensations_list))/(100),1]

            else: # while the motor history list isnt empty we

                output =[-motor_history[-1][0],-motor_history[-1][1]] # do teh inverse of the most recent motor command

                motor_history = motor_history[:-1] # remove this command from the list

        return (output, motor_history,reached,None)     

    
# Test it out
w = World(sensor_noise =0.3,motor_noise=0.3)

act_token = 0

poses, sensations, actions, states,light_pos,light_poses = w.simulate(BTMR,active_token=act_token)

print("Fitness on task 1: %f" % w.task1fitness(poses))
print("Fitness on task 2: %f" % w.task2fitness(poses))

plt.ion()
%matplotlib qt

if np.isinf(w.task1fitness(poses)) ==True:
    plt.plot(poses[:,0],poses[:,1],label = 'Outward Journey')
    
else:
    plt.plot(poses[:int(-w.task1fitness(poses)*10)][:,0], poses[:int(-w.task1fitness(poses)*10)][:,1],c='b',label = 'Outward Journey')
    plt.plot(poses[int(-w.task1fitness(poses)*10):][:,0], poses[int(-w.task1fitness(poses)*10):][:,1],c='r',label = 'Return Journey')

if len(light_poses)>0:
    plt.scatter((np.array(light_poses))[:,0],(np.array(light_poses))[:,1],c='y',label='decoy light')

plt.scatter(light_pos[0], light_pos[1],c='r',s=100,label = 'Final_light')

circ_num =circul(poses[:,0],poses[:,1],0)


plt.title('Bacterial tumbling and reverse motor controller, path of robot \n Circularity = '+str(round(circ_num[2],3)))
plt.legend(fontsize=10)

if act_token == 0:
    str1 = 'Bacterial tumbling and reverse motor controller with decoys, path of robot'
    
if act_token == 2:
    str1 = 'Bacterial tumbling and reverse motor controller without decoys, path of robot'
    
if np.isinf(w.task1fitness(poses)) == False and np.isinf(w.task2fitness(poses)) == False:
    str2 = 'success'
else:
    str2 = 'Failure'
plt.savefig(str1+str2)

ani = w.animate(poses, sensations)



Fitness on task 1: -inf
Fitness on task 2: -inf


In [4]:
# Here we can run the simulation many times to get a pcolor plot of successes with different levels of noise with 
# or without decoy lights (please be aware this may take some time to run ~ 40 Minutes)

def test_func(act_token):
    
    res = 20
    
    sens_vars = np.linspace(0.1,1,res)
    mot_vars = np.linspace(0.1,1,res)
    
    results_outward =np.zeros((res,res))
    results_return =np.zeros((res,res))

    k = 0
    for s in sens_vars:
        l = 0 
        for m in mot_vars:
            w = World(sensor_noise =s,motor_noise=m)
            
            avg_o = 0
            avg_r = 0
            
            for i in range(5):
                
                poses, sensations, actions, states,light_pos,light_poses = w.simulate(BTMR,active_token=act_token)
                fit_1 = w.task1fitness(poses)
                fit_2 = w.task2fitness(poses)
    
                if np.isinf(fit_1)==True:
                    fit_1 = -1000
                
                if np.isinf(fit_2)==True:
                    fit_2 = -1000
                    
                avg_r+=fit_2
                avg_o+=fit_1
                
            results_outward[k][l] = avg_o/5
            results_return[k][l] = avg_r/5
            
            l+=1
        k+=1
        
        clear_output()
        print(k*100/(res),'% Complete')

    
    fig, (ax0,ax1) = plt.subplots(1,2)

    c = ax0.pcolor(sens_vars,mot_vars,results_outward, cmap='RdBu')
    ax0.set_title('Outward journey successes')
    ax0.set_xlabel('Motor Noise')
    ax0.set_ylabel('Sensor Noise')
    ax0.axis([0.1, 1, 0.1, 1])
    fig.colorbar(c , ax=ax0)
            
    c = ax1.pcolor(sens_vars,mot_vars,results_return, cmap='RdBu')
    ax1.set_title('Return journey successes')
    ax1.set_xlabel('Motor Noise')
    ax1.set_ylabel('Sensor Noise')
    ax1.axis([0.1, 1, 0.1, 1])
    fig.colorbar(c , ax=ax1)
    
    if act_token == 0 :
        fig.suptitle('Bacterial tumbling with reverse motor command performance (w/ Decoy light)')
        fig.savefig('Bacterialtumblingwithreversemotorcommandperformance(wDecoylight).PNG')
        
    if act_token == 2 :
        fig.suptitle('Bacterial tumbling with reverse motor command performance (No decoy light)')
        fig.savefig('Bacterialtumblingwithreversemotorcommandperformance(Nodecoylight).PNG')
test_func(0)            

KeyboardInterrupt: 

In [None]:
test_func(2)