In [1]:
import opensim
import math
import numpy as np
import sys
import os
from osim.env import OsimEnv
import os.path

In [2]:
from GaitEnv import GaitEnv
from ArmEnv import ArmEnv
from BioInspiredHierarchicalLearning import BioInspiredHierarchicalLearning

In [3]:
class TwitchingEnv(ArmEnv):
    """Arm environment driven by caller-supplied activations, with CSV logging.

    At every step the requested activations are written into the muscles,
    the muscle sensors (lengths and finite-difference length rates) and the
    actuator values are appended to ``saved_sensors.csv`` and
    ``saved_actuators.csv``, and the simulation is integrated by one step.
    """

    def __init__(self, visualize = False, musclesName = '', iterDuration = 4, twitchDuration = 0.05, stepsize = 0.01):
        """Open the log files, write their headers and initialise the base env.

        Args:
            visualize: forwarded to the base-class constructor.
            musclesName: sequence of muscle names used to look up actuators
                and to build the CSV headers.
            iterDuration: accepted for signature compatibility; not used by
                this class.
            twitchDuration: accepted for signature compatibility; not used by
                this class.
            stepsize: integration step used by ``_step`` and by the
                finite-difference length rate.
        """
        print("Parameters copy...")
        self.model_path = './arm2dof6musc.osim'
        self.musclesName = musclesName
        self.current_step = 0
        # Honour the requested step size; the argument used to be dropped
        # silently, leaving whatever value the base class provides.
        # NOTE(review): assumes the base class does not overwrite
        # ``self.stepsize`` in its own __init__ - confirm against ArmEnv.
        self.stepsize = stepsize

        print("Files loading...")
        self.sensorsFile = open('saved_sensors.csv', 'w')
        self.actuatorsFile = open('saved_actuators.csv', 'w')

        # Previous lengths used for the finite difference; starting at zero
        # means the very first logged rate is length/stepsize, not a real rate.
        self.musclesLength = [ 0 for i in range(len(musclesName))]

        print("Headers wirting...")

        self.writeHeaders([m + '_len' for m in self.musclesName] + [m + '_dlen' for m in self.musclesName],self.sensorsFile)
        self.writeHeaders([m + '_a' for m in self.musclesName],self.actuatorsFile)

        print("GaitEnv __init__ calling...")
        super(TwitchingEnv, self).__init__(visualize = visualize)


    def get_observations(self):
        """Refresh ``sensorsValues`` / ``actuatorsValues`` and return the base observation.

        Returns:
            Whatever the base class ``get_observation()`` returns.
        """
        # State variable 1 of each actuator is what gets logged as "<muscle>_len".
        musclesLength_new = [self.osim_model.model.getActuators().get(m).getStateVariableValues(self.osim_model.state).get(1)
                             for m in self.musclesName]
        # Backward finite difference against the lengths seen on the previous call.
        musclesdLength = [ (x-y)/self.stepsize
                          for x,y in zip( musclesLength_new, self.musclesLength) ]

        # State variable 0 of each actuator is what gets logged as "<muscle>_a".
        self.actuatorsValues = [self.osim_model.model.getActuators().get(m).getStateVariableValues(self.osim_model.state).get(0)
                           for m in self.musclesName]

        self.sensorsValues = musclesLength_new + musclesdLength
        self.musclesLength = musclesLength_new

        return super(TwitchingEnv, self).get_observation()


    def writeHeaders(self,variable_names,file):
        """Write ``variable_names`` as one comma-separated header line to ``file``."""
        file.write("{}\n".format(",".join(variable_names)))
        # Flush so that a later cell reading the CSV sees complete lines even
        # though the file object is kept open for the lifetime of the env.
        file.flush()

    def writeContent(self,variables,file):
        """Write ``variables`` (converted with ``str``) as one comma-separated line to ``file``."""
        file.write("{}\n".format(",".join([str(x) for x in variables])))
        file.flush()


    def activate_muscles(self, action):
        """Set the activation of muscle ``j`` to ``action[j]`` for every model output.

        ``action`` must provide at least ``self.noutput`` values.
        """
        for j in range(self.noutput):
            self.osim_model.model.getMuscles().get(j).setActivation(self.osim_model.state, action[j])


    def compute_reward(self):
        """Return a constant reward of 0; no reward is computed in this environment."""
        #reward = np.norm(musclesLength - targetsLength)
        #reward = reward - self.meanReward
        #self.meanReward = self.meanReward*(N-1)/N

        return 0

    def _step(self, action):
        """Apply ``action``, log sensors/actuators, then integrate one step.

        Returns:
            ``[observation, reward, done, info]``. On an integration error the
            result is ``(observation, -500, True, {})``.
        """
        self.activate_muscles(action)
        # NOTE(review): the observation is taken before integrating, so the
        # value returned below describes the pre-step state - confirm intended.
        obs = self.get_observations()

        # Logging
        self.writeContent(self.sensorsValues,self.sensorsFile)
        self.writeContent(self.actuatorsValues,self.actuatorsFile)

        # Integrate one step
        self.osim_model.manager.setInitialTime(self.stepsize * self.istep)
        self.osim_model.manager.setFinalTime(self.stepsize * (self.istep + 1))

        try:
            self.osim_model.manager.integrate(self.osim_model.state)
        except Exception:
            print ("Exception raised")
            return self.get_observation(), -500, True, {}

        self.istep = self.istep + 1

        res = [ obs, self.compute_reward(), self.is_done(), {} ]
        return res

In [4]:
print("Beginning")

# TODO ? minutes : Increase simulation duration time to M*T = 9*500
# Names of the six arm muscles; used for the actuator lookups and CSV headers.
muscle_names = ["TRIlong", "TRIlat", "TRImed", "BIClong", "BICshort", "BRA"]

Beginning


In [19]:
def per(n):
    """Return every binary pattern of length ``n`` as a list of 0/1 lists.

    Patterns are ordered like counting in binary, most significant bit first:
    ``per(2) == [[0, 0], [0, 1], [1, 0], [1, 1]]``.

    Each pattern is a real ``list`` (not a ``map`` iterator), so the result can
    be turned into a numeric array on both Python 2 and Python 3.
    ``per(0)`` returns ``[[]]``, the single empty pattern.
    """
    # Bit k of the pattern (from the left) is bit (n-1-k) of the counter i.
    return [[(i >> (n - 1 - k)) & 1 for k in range(n)] for i in range(1 << n)]
# All 2**6 on/off activation patterns, one row per pattern, stored as floats.
sequence = np.array(per(6), dtype=float)

In [6]:
## Time parameters

# NOTE(review): current_step is not read again in the visible cells.
current_step = 0
time_step = 0.01 # [sec]

# Musculoskeletal environment
env = TwitchingEnv(visualize=False, musclesName = muscle_names, stepsize = time_step)
# Neural network model
#net = Network(...)

# Total number of steps of one twitch + rest cycle
N_tot = 2000
# Twitching duration [nb of steps]
N_twitch = 3
# Resting duration [nb of steps]
N_rest = N_tot-N_twitch

def get_time(N_steps, time_step):
    """Convert a number of simulation steps into a duration.

    Args:
        N_steps [int]: number of steps
        time_step [float]: duration of a single step
    Returns:
        duration [float]: total duration of ``N_steps`` steps
    """
    duration = N_steps * time_step
    return duration

## Environment properties
# nb of muscles
N_muscle = 6
# nb of sensors (one length and one length-rate column per muscle)
N_sensors = 12


# SMA sequence
# Buffer with one row per sensor; sized for N_muscle twitches plus N_muscle+1 rest periods.
# NOTE(review): nothing in the visible cells writes into `record` - confirm it is still needed.
record = np.zeros((N_sensors,N_muscle*N_twitch+(N_muscle+1)*N_rest))
# Action with every muscle activation at zero
no_action = np.zeros((N_muscle,))

# Step function
def step(action):
    """Advance the global ``env`` by one step and return its sensor values.

    The value returned by ``env.step`` is ignored; the sensors are read from
    ``env.sensorsValues`` after the step.
    """
    # To do : modify step
    env.step(action)
    #actuators = env.actuatorsValues
    #action = net.step(inputs)
    return env.sensorsValues
    
## SMA process
print('Environment stabililzation...')

# Let the model settle: 2*N_tot steps with all activations at zero.
for i in range(2*N_tot):
    sensors = step(no_action)

# One twitch + rest cycle per activation pattern in `sequence`.
for s in sequence:
    
    print('Twitching '+str(s))
    action = s*1.0
    
    # Twitch: hold the pattern for N_twitch steps
    for i in range(N_twitch):
        sensors = step(action)
    
    # Rest: zero activation for the remaining N_rest steps of the cycle
    for i in range(N_rest):
        sensors = step(no_action)

#for n in range(N_muscle):
    
#    print('Twitching muscle '+str(n+1))
#    action = np.zeros((N_muscle,))
#    action[n] = 1.0
    
#    for i in range(N_twitch):
#        sensors = step(action)
    
#    for i in range(N_rest):
#        sensors = step(no_action)

Parameters copy...
Files loading...
Headers wirting...
GaitEnv __init__ calling...
Environment stabililzation...
Twitching [0, 0, 0, 0, 0, 0]
Twitching [0, 0, 0, 0, 0, 1]
Twitching [0, 0, 0, 0, 1, 0]
Twitching [0, 0, 0, 0, 1, 1]
Twitching [0, 0, 0, 1, 0, 0]
Twitching [0, 0, 0, 1, 0, 1]
Twitching [0, 0, 0, 1, 1, 0]
Twitching [0, 0, 0, 1, 1, 1]
Twitching [0, 0, 1, 0, 0, 0]
Twitching [0, 0, 1, 0, 0, 1]
Twitching [0, 0, 1, 0, 1, 0]
Twitching [0, 0, 1, 0, 1, 1]
Twitching [0, 0, 1, 1, 0, 0]
Twitching [0, 0, 1, 1, 0, 1]
Twitching [0, 0, 1, 1, 1, 0]
Twitching [0, 0, 1, 1, 1, 1]
Twitching [0, 1, 0, 0, 0, 0]
Twitching [0, 1, 0, 0, 0, 1]
Twitching [0, 1, 0, 0, 1, 0]
Twitching [0, 1, 0, 0, 1, 1]
Twitching [0, 1, 0, 1, 0, 0]
Twitching [0, 1, 0, 1, 0, 1]
Twitching [0, 1, 0, 1, 1, 0]
Twitching [0, 1, 0, 1, 1, 1]
Twitching [0, 1, 1, 0, 0, 0]
Twitching [0, 1, 1, 0, 0, 1]
Twitching [0, 1, 1, 0, 1, 0]
Twitching [0, 1, 1, 0, 1, 1]
Twitching [0, 1, 1, 1, 0, 0]
Twitching [0, 1, 1, 1, 0, 1]
Twitching [0, 1, 

In [8]:
import pandas as pd

# Load the sensor log written by the environment.
sensors = pd.read_csv('saved_sensors.csv')
# DataFrame.get_values() was deprecated in pandas 0.25 and removed in 1.0;
# .values returns the same ndarray for this all-numeric frame.
data = sensors.values
sx,sy = data.shape
# Drop the last row before converting to float.
# NOTE(review): presumably because the final line may be incomplete while the
# log file is still open - confirm.
data = np.delete(data,sx-1,0).astype(float)

In [7]:
# Per-column extrema of the samples recorded after the 2*N_tot stabilisation steps.
print(data[2*N_tot:,:].max(axis=0))
print(data[2*N_tot:,:].min(axis=0))

[ 0.15181873  0.08710606  0.09287816  0.15412324  0.14900839  0.09145565
  0.47008624  0.93630293  1.25178026  1.00669784  0.30690825  0.39276072]
[ 0.13322543  0.06693752  0.06185069  0.13487658  0.13629819  0.07997199
 -0.31889741 -0.293035   -0.293035   -0.9039047  -0.33125757 -0.54261208]


In [23]:
# Equilibrium = last sample of the stabilisation phase. Defined here as well so
# the cell also works when the notebook is run top to bottom (the original
# relied on a definition that only appears in a later cell).
equ_position = data[2*N_tot-1,:]
# Largest and smallest absolute deviation from equilibrium, per column.
print(np.max(abs(data[2*N_tot:,:]-equ_position),axis=0))
print(np.min(abs(data[2*N_tot:,:]-equ_position),axis=0))

[ 0.02549461  0.0269353   0.03166716  0.03037798  0.03190358  0.0226281
  1.46476511  1.61340098  1.40349505  1.67646206  1.47195913  0.78534519]
[  9.91200000e-09   5.18100007e-10   8.56900009e-10   7.61499999e-09
   1.02000020e-10   1.36200010e-10   7.06420480e-09   1.81073906e-09
   1.70297109e-10   3.03120029e-09   2.14975260e-10   1.06641085e-10]


In [9]:
# Equilibrium positions: the last sample of the 2*N_tot stabilisation steps.
equ_position = data[2*N_tot - 1]

In [12]:
import matplotlib.pyplot as plt
# Figure 1: columns 0-5 (muscle lengths); figure 2: columns 6-11 (length rates).
fig1, ax1 = plt.subplots()
fig2, ax2 = plt.subplots()
for i in range(6):
    # Deviation from equilibrium, skipping the stabilisation steps plus 110000
    # further samples; each trace is shifted vertically so the curves do not overlap.
    ax1.plot(data[2*N_tot+110000:,i]-equ_position[i] + i*0.03)
    ax2.plot(data[2*N_tot+110000:,6+i]-equ_position[6+i] + i)
fig1.savefig('sensors1.png')
fig2.savefig('sensors2.png')

In [None]:
import matplotlib.pyplot as plt

# Plot the `record` buffer: rows 0-5 on figure 1, rows 6-11 on figure 2,
# skipping the first two samples and offsetting each trace vertically.
# NOTE(review): no visible cell writes into `record`, so this presumably plots
# zeros - confirm. The file names are the same as in the previous plotting
# cell, so running both overwrites the earlier images.
fig1, ax1 = plt.subplots()
fig2, ax2 = plt.subplots()
for i in range(6):
    ax1.plot(record[i,2:]+i*0.05)
    ax2.plot(record[6+i,2:]+i*0.5)
fig1.savefig('sensors1.png')
fig2.savefig('sensors2.png')

In [24]:
# Number of activation patterns. `seq` only exists inside per(); the
# module-level array is called `sequence`.
len(sequence)

64

### Generation of length_0

In [None]:
### DO NOT TOUCH THIS ###
# Six reference lengths, subtracted from the first six sensor columns when the
# weights are computed. NOTE(review): presumably one resting length per muscle
# in the order of `muscle_names` - confirm.
length_0 = [0.14216617288226033, 0.08269375662806208, 0.07766732577665353, 
            0.13944816053740705, 0.1410019757515655, 0.08559110518023604]


#### Twitches Collection

In [None]:
# Build a fresh environment for the scheduled-twitch experiment.
# NOTE(review): later cells read env.n_iter_waiting and env.iterDuration, which
# only the second TwitchingEnv definition in this notebook sets - run that
# definition before this cell.
print("Environment Generation")
env = TwitchingEnv(visualize=False, musclesName = muscle_names, stepsize = 0.01)


In [None]:
# Reset the environment before running the twitching experiment.
env.reset()

In [None]:
# Sanity check: muscle names, current step counter and integration step size.
print(env.musclesName)
print(env.istep)
print(env.stepsize)

In [None]:
# Total simulated time: the waiting iterations plus one iteration per muscle.
duration = (len(muscle_names)+env.n_iter_waiting)*(env.iterDuration) 

print("Twitching Experiment")

# Float floor division loses a step here (e.g. 44 // 0.01 == 4399.0 because
# 0.01 is not exactly representable), so round the quotient instead.
n_steps = int(round(duration / env.stepsize))

for i in range(n_steps):
    # The action value is a placeholder; the scheduled-twitch environment
    # decides by itself which muscle to activate.
    (obs, reward, done, nothing) = env.step([0])

In [None]:
# Actuator and sensor values captured during the last step.
print(env.actuatorsValues)
print(env.sensorsValues)

In [None]:
# Display whatever is currently bound to `sensors` (the name is reused across cells).
sensors

#### Correlation Matrix Computation

In [None]:
class TwitchingEnv(ArmEnv):
    """Arm environment that twitches one muscle at a time on a fixed schedule.

    Simulated time is cut into iterations of ``iterDuration`` seconds. The
    first ``n_iter_waiting`` iterations apply no twitch; after that, iteration
    ``k`` twitches muscle ``k - n_iter_waiting`` during its first
    ``twitchDuration`` seconds. Sensors and actuator values are logged to
    ``sensors_arm.csv`` and ``actuators_arm.csv`` at every step.
    """

    def __init__(self, visualize = False, musclesName = '', iterDuration = 4, twitchDuration = 0.05, stepsize = 0.01):
        """Store the schedule, open the log files and initialise the base env.

        Args:
            visualize: forwarded to the base-class constructor.
            musclesName: sequence of muscle names used to look up actuators
                and to build the CSV headers.
            iterDuration: duration of one iteration, in the same time unit as
                ``stepsize``.
            twitchDuration: time at the start of an iteration during which the
                scheduled muscle is activated.
            stepsize: integration step used by ``_step``.
        """
        print("Parameters copy...")
        self.model_path = './arm2dof6musc.osim'
        self.musclesName = musclesName
        self.iterNumber = 0

        # Honour the requested step size; the argument used to be dropped
        # silently, leaving whatever value the base class provides.
        # NOTE(review): assumes the base class does not overwrite
        # ``self.stepsize`` in its own __init__ - confirm against ArmEnv.
        self.stepsize = stepsize

        self.iterDuration = iterDuration

        self.twitchDuration = twitchDuration
        self.twitchStartPercentage = 0 # not used

        # Number of leading iterations without any twitch
        self.n_iter_waiting = 5

        self.twitchingEpisodeEnded = True

        print("Files loading...")
        self.sensorsFile = open('sensors_arm.csv', 'w')
        self.actuatorsFile = open('actuators_arm.csv', 'w')

        # Previous lengths used for the finite difference; starting at zero
        # means the very first logged rate is length/stepsize, not a real rate.
        self.musclesLength = [ 0 for i in range(len(musclesName))]
        self.musclesLengthConverged = [ 0 for i in range(len(musclesName))]

        print("Headers wirting...")

        self.writeHeaders([m + '_len' for m in self.musclesName] + [m + '_dlen' for m in self.musclesName],self.sensorsFile)
        self.writeHeaders([m + '_a' for m in self.musclesName],self.actuatorsFile)

        print("GaitEnv __init__ calling...")
        super(TwitchingEnv, self).__init__(visualize = visualize)



    def get_observation(self):
        """Return the base-class observation.

        The previous version called the parent but dropped its result, so
        ``_step`` handed ``None`` back as the observation.
        """
        return super(TwitchingEnv, self).get_observation()

        #m1_length = self.osim_model.model.getActuators().get(1).getStateVariableValues(self.osim_model.state).get(1);
        #m1_name = self.osim_model.model.getActuators().get(1).getStateVariableNames().get(1)

    def get_values(self):
        """Refresh ``sensorsValues``, ``actuatorsValues`` and ``musclesLength`` from the model state."""
        # State variable 1 of each actuator is what gets logged as "<muscle>_len".
        musclesLength_new = [self.osim_model.model.getActuators().get(m).getStateVariableValues(self.osim_model.state).get(1)
                             for m in self.musclesName]
        # Backward finite difference against the lengths seen on the previous call.
        musclesdLength = [ (x-y)/self.stepsize
                          for x,y in zip( musclesLength_new, self.musclesLength) ]

        # State variable 0 of each actuator is what gets logged as "<muscle>_a".
        self.actuatorsValues = [self.osim_model.model.getActuators().get(m).getStateVariableValues(self.osim_model.state).get(0)
                           for m in self.musclesName]

        # Sensors = lengths followed by length rates
        self.sensorsValues = musclesLength_new + musclesdLength

        self.musclesLength = musclesLength_new

    def writeHeaders(self,variable_names,file):
        """Write ``variable_names`` as one comma-separated header line to ``file``."""
        file.write("{}\n".format(",".join(variable_names)))
        # Flush so that a later cell reading the CSV sees complete lines even
        # though the file object is kept open for the lifetime of the env.
        file.flush()

    def writeContent(self,variables,file):
        """Write ``variables`` (converted with ``str``) as one comma-separated line to ``file``."""
        file.write("{}\n".format(",".join([str(x) for x in variables])))
        file.flush()


    def activate_muscles(self, action, twitch, muscleNum): # the argument action is not used
        """Set muscle ``muscleNum`` to activation 0.99 when ``twitch`` is true.

        Nothing is written when ``twitch`` is false or when ``muscleNum`` does
        not match any of the ``self.noutput`` muscles.
        """
        muscleSet = self.osim_model.model.getMuscles()
        for j in range(self.noutput):
            muscle = muscleSet.get(j)
            if j == muscleNum and twitch == True:
                muscle.setActivation(self.osim_model.state, 0.99) # action ?

    def compute_reward(self):
        """Return a constant reward of 0; no reward is computed in this environment."""
        return 0

    def _step(self, action):
        """Apply the scheduled twitch, log sensors/actuators and integrate one step.

        ``action`` is stored in ``self.last_action`` but does not influence
        which muscle is activated.

        Returns:
            ``[observation, reward, done, info]``. On an integration error the
            result is ``(observation, -500, True, {})``.
        """
        self.last_action = action

        # Elapsed simulated time and position inside the current iteration.
        totalTime = self.istep*self.stepsize
        iter_time = math.fmod(totalTime,self.iterDuration)

        # Index of the current iteration
        iter_episode = int(totalTime/self.iterDuration)

        # Muscle scheduled for this iteration; negative while still waiting
        self.muscleNumber = iter_episode - self.n_iter_waiting


        #no muscle is activated
        if self.muscleNumber < 0:
            self.activate_muscles(action,False,0)
        else:
            # NOTE(review): these exact float comparisons only drive the
            # progress messages and may miss because of rounding in fmod.
            if iter_time == 0:
                print("Beginning of twitching for muscle number " + str(int(self.muscleNumber)))
            if iter_time == self.twitchDuration:
                print("End of stimulation " + str(self.muscleNumber))
            if(iter_time <= self.twitchDuration):
                self.activate_muscles(action,True,self.muscleNumber)
            else:
                self.activate_muscles(action,False,self.muscleNumber)

        self.get_values()

        # Logging
        self.writeContent(self.sensorsValues,self.sensorsFile)
        self.writeContent(self.actuatorsValues,self.actuatorsFile)

        # Integrate one step
        self.osim_model.manager.setInitialTime(self.stepsize * self.istep)
        self.osim_model.manager.setFinalTime(self.stepsize * (self.istep + 1))


        try:
            self.osim_model.manager.integrate(self.osim_model.state)
        except Exception:
            print ("Exception raised")
            return self.get_observation(), -500, True, {}

        self.istep = self.istep + 1
        #TT = self.osim_model.model.getActuators().get(1)

        res = [ self.get_observation(), self.compute_reward(), self.is_done(), {} ]
        return res

In [None]:
# Twitching process
import pandas as pd

# Load the sensor and actuator logs written during the twitching experiment.
sensors = pd.read_csv('sensors_arm.csv')
actuators = pd.read_csv('actuators_arm.csv')
# Sanity check of the reference lengths
print(length_0)
print(len(length_0))

In [None]:
def print_full(x):
    """Print ``x`` with pandas' row limit raised to ``len(x)``.

    The ``display.max_rows`` option is reset to its default afterwards, even
    when printing raises.
    """
    pd.set_option('display.max_rows', len(x))
    try:
        print(x)
    finally:
        # Always restore the option so a failed print cannot leave the
        # notebook with an unbounded row limit.
        pd.reset_option('display.max_rows')
#print_full(sensors)

In [None]:
# DataFrame.get_values() was deprecated in pandas 0.25 and removed in 1.0;
# .values returns the same ndarray for this all-numeric frame.
data = sensors.values
sx,sy = data.shape
# Drop the first two rows and the last row.
# NOTE(review): presumably start-up samples and a possibly incomplete final
# line - confirm.
data = np.delete(data,[0,1,sx-1],0)

In [None]:
# NOTE(review): `data1` is not defined in any visible cell; this line raises
# NameError on a fresh kernel - confirm which array was meant.
print(np.nonzero(data1))
# Row/column indices of NaN entries in `data`
print(np.nonzero(np.isnan(data)))

In [None]:
# Per-column minimum and maximum of the sensor samples.
mins = data.min(axis=0)
maxs = data.max(axis=0)

print(mins,maxs)

In [None]:
# Display the sensors table loaded from the CSV log.
sensors

In [None]:
# Display the actuators table loaded from the CSV log.
actuators

In [None]:
# The weight computation indexes both tables by the same sample index, so
# their row counts should match.
print(sensors.shape)
print(actuators.shape)

In [None]:
# Compute the weights for only one leg
# Reference lengths repeated on every row so they can be subtracted sample-wise.
n_rows = (sensors.values)[:-1,0:6].shape[0]
length_0_mat = np.tile(np.asarray(length_0[0:6], dtype=float), (n_rows, 1))

# Inputs of the learning rule (last logged row dropped):
# length deviation, length rate and stimulation.
delta_length = (sensors.values)[:-1,0:6] - length_0_mat
rate_length = (sensors.values)[:-1,6:12]
stim = actuators.values[:-1,:]

# One 6x6 weight matrix per sensor type, plus a combined one.
weight_1, weight_2, all_weight = (np.zeros((6,6)) for _ in range(3))

# Time step and learning rate of the update rule
dt = 0.01
lr = 0.01

In [None]:
def computeActivation(stim, dt):
    """Filter a stimulation signal into an activation signal.

    Each sample is one explicit Euler step of
    ``d(activation)/dt = gain * (stim - activation)``, starting from zero,
    with ``gain = 15`` while ``stim > 0`` and ``gain = 50`` otherwise.

    Args:
        stim: sequence of stimulation samples.
        dt: time step between two samples.
    Returns:
        numpy array of the same length as ``stim``.
    """
    n_samples = len(stim)
    activation = np.zeros(n_samples)
    previous = 0

    for k in range(n_samples):
        gain = 15 if stim[k] > 0 else 50
        derivative = gain * ( -previous + stim[k] )
        activation[k] = previous + dt * derivative
        previous = activation[k]

    return activation

In [None]:
import matplotlib.pyplot as plt

In [None]:
### Test Data plot
plt.figure()

## weird: 0 is always activated
# Top panel: stimulation of the six muscles
plt.subplot(311)
for muscle_col in range(6):
    plt.plot(stim[:,muscle_col])
plt.ylabel('stim')

# Middle and bottom panels: length deviation of the first two muscles
for panel, column, label in ((312, 0, 'TRILong'), (313, 1, 'TRILat')):
    plt.subplot(panel)
    plt.plot(delta_length[:,column])
    plt.ylabel(label)

plt.show()

In [None]:
print("Computation of the weights...")
# Sample s is paired with the sensor reading at s+1, so s+1 must stay inside
# both sensor arrays.
n_samples = min(len(delta_length), len(rate_length))

# i indexes the twitched muscle (row of the weight matrices),
# j indexes the sensor whose response is learned (column).
for i in range(6):

    #% Filter the activation
    ## should we invert line and column ? No
    activation = computeActivation(stim[:,i], dt);

    # Samples where muscle i is noticeably active. This does not change inside
    # the loops below, so compute it once; samples whose successor would fall
    # outside the arrays are skipped instead of raising IndexError.
    active_samples = [s for s in np.where(activation > 0.05)[0] if s + 1 < n_samples]

    #% Repeated passes over the same twitching data
    # NOTE(review): the original comment said 40 replications but the loop runs 4000 - confirm.
    for l in range(4000):
        for j in range(6):
            for s in active_samples:
                #% Anti-Oja rule
                weight_1[i,j] = weight_1[i,j] - lr * activation[s] * (delta_length[s+1,j]+activation[s]*weight_1[i,j])
                weight_2[i,j] = weight_2[i,j] - lr * activation[s] * (rate_length[s+1,j]+activation[s]*weight_2[i,j])

print("Weights computed.")

                #why do we use blkdiag ?!
#% Weights' matrices for both legs
#weight_1 =  blkdiag(weight_1,weight_1);
#weight_2 =  blkdiag(weight_2,weight_2);

#% Saving weights' matrices
#dlmwrite('weight_1.csv',weight_1,',');
#dlmwrite('weight_2.csv',weight_2,',');

I think these weights are not relevant: the activation is always greater than 0, which introduces noise.

In [None]:
# Weights learned from the length deviations
print(weight_1)

In [None]:
# Weights learned from the length rates
print(weight_2)

In [None]:
# Sample indices where the most recently computed `activation` exceeds 0.05
print(np.where(activation > 0.05))

### Attempted computation through matrix diagonalization

In [None]:
for i in range(6):

    #% Filter the activation
    ## should we invert line and column ? No
    activation = computeActivation(stim[:,i], dt);

    # Length deviations of all six sensors while muscle i is active.
    active_rows = delta_length[np.where(activation > 0.05)[0],:]

    # 6x6 (uncentred) second-moment matrix of those samples and its
    # eigen-decomposition. It does not depend on the sensor index, so it is
    # computed once per muscle instead of six identical times.
    covariant_matrix = np.dot(np.transpose(active_rows), active_rows)
    e_v, e_vect = np.linalg.eig(covariant_matrix)

    # The Anti-Oja updates of weight_1 / weight_2 from the iterative version
    # are intentionally not applied here.

In [None]:
# np.linalg.eig returns eigenvectors as COLUMNS: e_vect[:, k] belongs to
# eigenvalue e_v[k]. `i` is the index left over from the loop above.
e_vect[:, i]