Script to run current injection experiments. All results are saved to pdf

In [1]:
%matplotlib inline
%load_ext autoreload
%autoreload 2
    
import numpy as np
import matplotlib.pyplot as plt

import central_complex
import cx_rate
import trials
import plotter

save_figs=True



In [2]:
def update_cells_w_inject(heading, velocity, tb1, memory, cx, filtered_steps=0.0, inject=None, inject_val=1.0):
    """Generate activity for all cells, based on previous activity and current
    motion."""
    tl2 = cx.tl2_output(heading)
    if 'tl2' in inject:
        tl2[inject['tl2']] = inject_val

    cl1 = cx.cl1_output(tl2)
    if 'cl1' in inject:
        cl1[inject['cl1']] = inject_val

    tb1 = cx.tb1_output(cl1, tb1)
    if 'tb1' in inject:
        tb1[inject['tb1']] = inject_val

    # Speed
    flow = cx.get_flow(heading, velocity)
    tn1 = cx.tn1_output(flow)
    if 'tn1' in inject:
        tn1[inject['tn1']] = inject_val
    tn2 = cx.tn2_output(flow)
    if 'tn2' in inject:
        tn2[inject['tn2']] = inject_val

    # Update memory for distance just travelled
    memory = cx.cpu4_update(memory, tb1, tn1, tn2)
    if 'memory' in inject:
        memory[inject['memory']] = inject_val
    cpu4 = cx.cpu4_output(memory)
    if 'cpu4' in inject:
        cpu4[inject['cpu4']] = inject_val

    # Steer based on memory and direction
    cpu1 = cx.cpu1_output(tb1, cpu4)
    if 'cpu1' in inject:
        cpu1[inject['cpu1']] = inject_val
    motor = cx.motor_output(cpu1)

    return tl2, cl1, tb1, tn1, tn2, memory, cpu4, cpu1, motor

In [3]:
import bee_simulator
def homing_w_inject(T, tb1, memory, cx, acceleration=0.15, drag=0.15,
           current_heading=0.0, current_velocity=np.array([0.0, 0.0]),
           turn_sharpness=1.0, inject=None, start_inject=250, stop_inject=1000,
           inject_val=1.0):
    """Based on current state, return home. First is duplicate"""
    # TODO: Check some serious off by one errors here! on a small course best
    headings = np.empty(T + 1)
    headings[0] = current_heading
    velocity = np.empty([T + 1, 2])
    velocity[0, :] = current_velocity

    cx_log = trials.CXLogger(0, T + 1, cx)

    for t in range(1, T + 1):
        tl2, cl1, tb1, tn1, tn2, memory, cpu4, cpu1, motor = update_cells_w_inject(
            heading=headings[t - 1],  # Remove sign to use proportionate shift
            velocity=velocity[t - 1, :],
            tb1=tb1,
            memory=memory,
            cx=cx,
            inject=inject if t>start_inject and t<stop_inject else {},
            inject_val=inject_val)

        
        cx_log.update_log(t, tl2, cl1, tb1, tn1, tn2, memory, cpu4, cpu1, motor)

        rotation = turn_sharpness * motor

        headings[t], velocity[t, :] = bee_simulator.get_next_state(
            headings[t - 1], velocity[t - 1, :], rotation, acceleration, drag)
    return headings, velocity, cx_log

In [4]:
T_outbound = 500
T_inbound = 1500

# TEST CODE
memories = []
tb1_snapshots = []

cx = cx_rate.CXRatePontin()

start_inject_out = 50
stop_inject_out = 250
start_inject_in = 550
stop_inject_in = 750
v_out = np.ones([T_outbound, 2]) * -0.5
v_out[:,0] = 0
h_out = np.ones(T_outbound) * np.pi
inject_val = 1.0

labels = ['TB1:1', 'TB1:2', 'TB1:3', 'TB1:4', 'TB1:5', 'TB1:6', 'TB1:7', 'TB1:8']

experiments = [{'name':'tb1_injection_out', 'start':start_inject_out, 'stop':stop_inject_out,
                'injects':[{'tb1':[0]}, {'tb1':[1]}, {'tb1':[2]}, {'tb1':[3]},
                           {'tb1':[4]}, {'tb1':[5]}, {'tb1':[6]}, {'tb1':[7]}],
                'labels':['TB1:1', 'TB1:2', 'TB1:3', 'TB1:4', 'TB1:5', 'TB1:6', 'TB1:7', 'TB1:8']},
               {'name':'tb1_injection_in', 'start':start_inject_in, 'stop':stop_inject_in,
                'injects':[{'tb1':[0]}, {'tb1':[1]}, {'tb1':[2]}, {'tb1':[3]},
                           {'tb1':[4]}, {'tb1':[5]}, {'tb1':[6]}, {'tb1':[7]}],
                'labels':['TB1:1', 'TB1:2', 'TB1:3', 'TB1:4', 'TB1:5', 'TB1:6', 'TB1:7', 'TB1:8']},
               {'name':'tn2_injection_out', 'start':start_inject_out, 'stop':stop_inject_out,
                'injects':[{'tn2':[0]}, {'tn2':[1]}],
                'labels':['TN2:1', 'TN2:2']},
               {'name':'tn2_injection_in', 'start':start_inject_in, 'stop':stop_inject_in,
                'injects':[{'tn2':[0]}, {'tn2':[1]}],
                'labels':['TN2:1', 'TN2:2']},
               {'name':'cpu1_injection_out', 'start':start_inject_out, 'stop':stop_inject_out,
                'injects':[{'cpu1':[0]}, {'cpu1':[0, 1]}, {'cpu1':[0, 1, 2]}, {'cpu1':[0, 1, 2, 3]}],
                'labels':['CPU1:1', 'CPU1:1,2', 'CPU1:1-3', 'CPU1:1-4']},
               {'name':'cpu1_injection_in', 'start':start_inject_in, 'stop':stop_inject_in,
                'injects':[{'cpu1':[0]}, {'cpu1':[0, 1]}, {'cpu1':[0, 1, 2]}, {'cpu1':[0, 1, 2, 3]}],
                'labels':['CPU1:1', 'CPU1:1,2', 'CPU1:1-3', 'CPU1:1-4']}]

line_colors = ['purple', 'orange', 'gold', 'olivedrab', 'lightgreen', 'green', 'turquoise', 'blue', 'darkblue']

for experiment in experiments:
    fig, ax = plt.subplots()
    injects = experiment['injects']
    start_inject = experiment['start']
    stop_inject = experiment['stop']
    labels = experiment['labels']
    handles = []
    
    for i, inject in enumerate(injects):
        #Initialize memory stuff
        tb1 = np.zeros(central_complex.N_TB1)
        memory = 0.5 * np.ones(central_complex.N_CPU4)
        log_out = trials.CXLogger(T_outbound, 0, cx)

        for t in range(T_outbound):
            tl2, cl1, tb1, tn1, tn2, memory, cpu4, cpu1, motor = update_cells_w_inject(
                heading=h_out[t], velocity=v_out[t], tb1=tb1, memory=memory,
                cx=cx, inject=inject if t>start_inject and t<stop_inject else {},
                inject_val=inject_val)
            log_out.update_log(t, tl2, cl1, tb1, tn1, tn2, memory, cpu4, cpu1, motor)

        tb1 = log_out.tb1[:, -1]
        memory = log_out.memory[:, -1]
        cpu4_snapshot = memory.copy()
        memories.append(cpu4_snapshot)

        h_in, v_in, log_in = homing_w_inject(T=T_inbound, tb1=tb1, memory=cpu4_snapshot, cx=cx, acceleration=0.1,
                                             current_heading=h_out[-1], current_velocity=v_out[-1], drag=0.15,
                                             inject=inject, start_inject=start_inject-T_outbound,
                                             stop_inject=stop_inject-T_outbound, inject_val=inject_val)
        h = np.hstack([h_out, h_in])
        v = np.vstack([v_out, v_in])

        log = log_out + log_in

        # Zoomed for spiral observation
        ax.set_aspect('equal')
        ax.set_xlim([-200, 200])
        ax.set_aspect('equal')
        ax.set_xticks([-200, -100, 0, 100, 200])
        
        xy = np.vstack([np.array([0.0, 0.0]), np.cumsum(v, axis=0)])
        x, y = xy[:, 0], xy[:, 1]
        
        # Plot line out
        if i==0:
            line_out, = ax.plot(x[0:T_outbound+1], y[0:T_outbound+1], lw=0.5,
                            color='purple', label='Outbound')
            handles.append(line_out)
        
        # Plot line in
        line_in, = ax.plot(x[T_outbound:], y[T_outbound:], lw=0.5,
                            color=line_colors[i+1], label=labels[i])
        handles.append(line_in)


        # Plot injection
        ax.plot(x[start_inject:stop_inject], y[start_inject:stop_inject], color='cyan')
        
        if experiment['name'] == 'tb1_injection_in':
            tb1_snapshots.append(log.tb1[:, start_inject_in+50])
        
    
        if experiment['name'] == 'tn2_injection_out' or experiment['name'] == 'tn2_injection_in':
            fig2, ax2 = plotter.plot_traces(
                    log, include=['TN1', 'TN2', 'TB1', 'CPU4', 'CPU1', 'motor'])
            plotter.save_plot(fig2, 'current_injection_{0}{1}'.format(experiment['name'], i))
                
    l = ax.legend(handles=handles,
                  loc='best',
                  fontsize=10,
                  handlelength=0,
                  handletextpad=0)
    for handle in l.legendHandles:
        handle.set_visible(False)
    for i, text in enumerate(l.get_texts()):
        text.set_color(line_colors[i])
    l.draw_frame(False)
    
    if save_figs:
        plotter.save_plot(fig, 'current_injection_{0}'.format(experiment['name']))
        
    if experiment['name'] == 'cpu1_injection_in':
        ax.set_xlim([-50, 50])
        ax.set_ylim([-300, -200])
        plotter.save_plot(fig, 'current_injection_cpu1_injection_in_zoomed')
        
plt.close('all')