# **G2P Algorithm for Roboy NTC**
Please check https://miro.com/app/board/uXjVP2DvcmU=/ to get an overview

---



In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
#Imports
import tensorflow as tf
from tensorflow.keras import regularizers
import numpy as np
import pandas as pd
import sklearn
from scipy import signal
from matplotlib import pyplot as plt
from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import MinMaxScaler

In [10]:
#prepare data for training
df = pd.read_csv('/content/drive/MyDrive/Roboy/bench_data_2.csv')

#simple dataset: similar to Ali's approach
kinematics = df[['angle', 'angular_vel', 'angular_acc']].values

#extended dataset with more features for kinematics data #TODO: Eliminate unessesary features 
kinematics_extended = df[[
    'timestamp',
    'angle',
    'kill_switch_pressed',
    'flex_pv_pos_encoder',
    'flex_pv_torque_encoder',
    'flex_pv_current',
    'flex_sp_pwm',
    'flex_active',
    'extend_pv_pos_encoder',
    'extend_pv_torque_encoder',
    'extend_pv_current',
    'extend_sp_pwm',
    'extend_active',
    'angular_acc',
    'angular_vel'
]].values

activations = df[['flex_sp_pwm','extend_sp_pwm']].values

# **Inverse Kinematics (Solving for non-linear regression problem)**

In [None]:
#Training Data
x = kinematics
y = activations

#Scale Data for training
scalerIn = MinMaxScaler()
scalerOut = MinMaxScaler()
x_scaled = scalerIn.fit_transform(x)
y_scaled = scalerOut.fit_transform(y)

#Split unscaled data 
x_train, x_valid, y_train, y_valid = sklearn.model_selection.train_test_split(x, y, test_size=0.2)

#Split scaled data
x_train_scaled, x_valid_scaled, y_train_scaled, y_valid_scaled = sklearn.model_selection.train_test_split(x_scaled, y_scaled, test_size=0.2)

In [None]:
#Approach 0: MLP to solve regression problem (Ali's implementation without significant changes)
def inverse_mapping_fcn0():

  model = tf.keras.Sequential()
  # Adds a densely-connected layer with 15 units to the model:
  model.add(tf.keras.layers.Dense(15, activation='relu'))
  # Add a softmax layer with 3 output units:
  model.add(tf.keras.layers.Dense(2, activation='sigmoid'))
  model.compile(optimizer=tf.keras.optimizers.SGD(),
              loss='mse',       # mean squared error
              metrics=['mse'])  # mean squared error
  #training the model
  history = \
  model.fit(
  x_train_scaled,
  y_train_scaled,
  epochs=20,
  validation_data=(x_valid_scaled, y_valid_scaled))

  return (model, history)
  

def inverse_mapping_fcn0_1():
  model = MLPRegressor(
      hidden_layer_sizes=15,
      activation="logistic",
      verbose=True,
      warm_start=True,
      early_stopping=True)

  history = model.fit(x_train_scaled, y_train_scaled)
 
  return (model, history)

In [None]:
print("################################## Model 0 (Ali's Model) ##################################")

[model0, history0] = inverse_mapping_fcn0()

# Get the validation loss from the model's history
val_loss = history0.history['val_loss']

# Create a figure and axes
fig, ax = plt.subplots()

# Plot the validation loss
ax.plot(val_loss, label='Validation Loss')

# Set the x-axis label
ax.set_xlabel('Epoch')

# Set the y-axis label
ax.set_ylabel('Loss')

# Add a title
ax.set_title('Loss Model 0')

# Add a legend
ax.legend()

# Show the plot
plt.show()

In [None]:
#Approach 1: DNN to solve regression problem
def inverse_mapping_fcn1():

  #Define Model and apply dropout and L2 regularisation to prevent overfitting
  #Use non linear activation (tanh) for non linear regression
  model = tf.keras.Sequential()

  model.add(tf.keras.layers.Dense(32, input_shape=(3,), activation='relu', kernel_regularizer = regularizers.l2(0.01)))
  model.add(tf.keras.layers.Dropout(0.2))
  model.add(tf.keras.layers.Dense(64, activation='relu', kernel_regularizer = regularizers.l2(0.01)))
  model.add(tf.keras.layers.Dropout(0.2))
  model.add(tf.keras.layers.Dense(4, activation='relu', kernel_regularizer = regularizers.l2(0.01)))
  model.add(tf.keras.layers.Dense(units=2, activation='tanh'))

  #Compile the model
  model.compile(loss='mean_squared_error', optimizer= tf.optimizers.Adam(0.001), metrics=['mse'])

  # Define the early stopping callback
  early_stopping = tf.keras.callbacks.EarlyStopping(monitor='mse', patience=10)

  #Train the model for 100 epochs and allow early stopping
  history = model.fit(x_train_scaled, y_train_scaled, epochs=100, validation_data=(x_valid_scaled, y_valid_scaled), callbacks=[early_stopping], batch_size=256)
	
  return (model, history)

In [None]:
#call function
print("################################## Model 1 ##################################")
[model1, history1] = inverse_mapping_fcn1()

# Get the training and validation loss from the model's history
val_loss = history1.history['val_loss']

# Create a figure and axes
fig, ax1 = plt.subplots()

# Plot the training and validation loss
ax1.plot(val_loss, label='Validation Loss')

# Set the x-axis label
ax1.set_xlabel('Epoch')

# Set the y-axis label
ax1.set_ylabel('Loss')

# Add a title
ax1.set_title('Loss Model 1')

# Add a legend
ax1.legend()

# Show the plot
plt.show()

In [None]:
#Approach 2: Simple MLP to solve regression problem
def inverse_mapping_fcn2():

  model = MLPRegressor(
			hidden_layer_sizes = (50,2),
      max_iter = 300,
      solver = "adam",
			activation = "logistic",
			verbose = True,
			warm_start = True,
			early_stopping = True)
  
  history = model.fit(x_train_scaled,y_train_scaled) #splits data internally

  return (model, history)

In [None]:
#call function
print("################################## Model 2 ##################################")
[model2, history2] = inverse_mapping_fcn2()

# Create figure and subplots
fig, ax2 = plt.subplots()

# Plot loss
ax2.plot(history2.loss_curve_, label='Validation Loss')
ax2.set_title('Loss Model 2')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Loss')
ax2.legend()

# Show the plot
plt.show()

In [None]:
#Approach 3: Deep MLP to solve regression problem
def inverse_mapping_fcn3():

  model = MLPRegressor(
			hidden_layer_sizes = (60,3),
      max_iter = 300,
      solver = "adam",
			activation = "logistic",
			verbose = True,
			warm_start = True,
			early_stopping = True)
  
  history = model.fit(x_train_scaled, y_train_scaled)

  return (model, history)

In [None]:
#call function
print("################################## Model 3 ##################################")
[model3, history3] = inverse_mapping_fcn3()

# Create figure and subplots
fig, ax3 = plt.subplots()

# Plot loss
ax3.plot(history3.loss_curve_, label='Validation Loss')
ax3.set_title('Loss Model 3')
ax3.set_xlabel('Epoch')
ax3.set_ylabel('Loss')
ax3.legend()

# Show the plot
plt.show()

In [None]:
#Approach 4: RNN (LSTM) to solve regression problem

def inverse_mapping_fcn4():

  x_train_lstm = x_train_scaled.reshape(x_train_scaled.shape[0], 1, x_train_scaled.shape[1])
  x_valid_lstm = x_valid_scaled.reshape(x_valid_scaled.shape[0], 1, x_valid_scaled.shape[1])

  # Define the model architecture
  model = tf.keras.Sequential()
  model.add(tf.keras.layers.LSTM(32, input_shape=(1,3), return_sequences=True))
  model.add(tf.keras.layers.LSTM(4, return_sequences=False))
  model.add(tf.keras.layers.Dense(2, activation='tanh'))

  # Define the early stopping callback
  early_stopping = tf.keras.callbacks.EarlyStopping(monitor='mse', patience=10)

  #compile model
  model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='mean_squared_error', metrics=['mse'])

  #Train the model for 100 epochs and allow early stopping
  history = model.fit(x_train_lstm, y_train_scaled, epochs=100, validation_data=(x_valid_lstm, y_valid_scaled), callbacks=[early_stopping], batch_size=256)
	
  return (model, history)

In [None]:
#call function
print("################################## Model 4 ##################################")
[model4, history4] = inverse_mapping_fcn4()

# Create figure and subplots
fig, ax4 = plt.subplots()

# Plot loss
ax4.plot(history4.history['val_loss'], label='Validation Loss')
ax4.set_title('Loss')
ax4.set_title('Loss Model 4')
ax4.set_xlabel('Epoch')
ax4.set_ylabel('Loss')
ax4.legend()

# Show the plot
plt.show()

In [None]:
# Approach 5: RNN to learn correlation between samples. This completly new approach is supposed to learn how to connect multiple samples simultaniously.

def inverse_mapping_fcn5(seq_len):

  # Get the number of sequences
  num_sequences_train = x_train_scaled.shape[0] // seq_len
  num_sequences_valid = x_valid_scaled.shape[0] // seq_len

  # Reshape x_train_scaled
  x_train_lstm = x_train_scaled[:num_sequences_train * seq_len, :]
  x_train_lstm = x_train_lstm.reshape(-1, seq_len, 3)

  # Reshape y_train_scaled
  y_train_lstm = y_train_scaled[:num_sequences_train * seq_len, :]
  y_train_lstm = y_train_lstm.reshape(-1, seq_len, 2)

  # Reshape x_valid_scaled
  x_valid_lstm = x_valid_scaled[:num_sequences_valid * seq_len, :]
  x_valid_lstm = x_valid_lstm.reshape(-1, seq_len, 3)

  # Reshape x_valid_scaled
  y_valid_lstm = y_valid_scaled[:num_sequences_valid * seq_len, :]
  y_valid_lstm = y_valid_lstm.reshape(-1, seq_len, 2)

  print("Shape of x_train_lstm: ", x_train_lstm.shape)
  print("Shape of y_train_lstm: ", y_train_lstm.shape)
  print("Shape of x_valid_lstm: ", x_valid_lstm.shape)
  print("Shape of y_valid_lstm: ", y_valid_lstm.shape)

  # Define the model architecture
  model = tf.keras.Sequential()
  model.add(tf.keras.layers.LSTM(64, input_shape=(seq_len, x_train_scaled.shape[1]), return_sequences=True))
  model.add(tf.keras.layers.LSTM(32, return_sequences=True))
  model.add(tf.keras.layers.LSTM(8, return_sequences=True))
  model.add(tf.keras.layers.Dense(2, activation='tanh'))

  # Define the early stopping callback
  early_stopping = tf.keras.callbacks.EarlyStopping(monitor='mse', patience=10)

  #compile model
  model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='mean_squared_error', metrics=['mse'])

  #Train the model for 100 epochs and allow early stopping
  history = model.fit(x_train_lstm, y_train_lstm, epochs=100, validation_data=(x_valid_lstm, y_valid_lstm), callbacks=[early_stopping], batch_size=256)
	
  return (model, history)

In [None]:
#call function
print("################################## Model 5 ##################################")
[model5, history5] = inverse_mapping_fcn5(10)

# Create figure and subplots
fig, ax5 = plt.subplots()

# Plot loss
ax5.plot(history5.history['val_loss'], label='Validation Loss')
ax5.set_title('Loss')
ax5.set_title('Loss Model 5')
ax5.set_xlabel('Epoch')
ax5.set_ylabel('Loss')
ax5.legend()

# Show the plot
plt.show()

In [None]:
# Comparing predictions for specific example (line 42) to show how to use ANN to get Motor Activation Values
input = np.expand_dims(kinematics[42], axis=0)
expected_output = activations[42]

#print(df.loc[13])
#print("Input: ", input)
#print("Expected: ", expected_output)

#model0
#predicted_output_0 = scalerOut.inverse_transform(model0.predict(input))
#print("Prediction 0: ", predicted_output_0)

#model1
#predicted_output_1 = scalerOut.inverse_transform(model1.predict(input))
#print("Prediction 1: ", predicted_output_1)

#model2
#predicted_output_2 = scalerOut.inverse_transform(model2.predict(input))
#print("Prediction 2: ", predicted_output_2)

#model3
#prediction3 = scalerOut.inverse_transform(model3.predict(input))
#print("Prediction 3: ", prediction3)

#model4
#input_lstm = input.reshape(input.shape[0], 1, input.shape[1])
#prediction4 = scalerOut.inverse_transform(model4.predict(input_lstm))
#print("Prediction 4: ", prediction4)

#model5
input_lstm = input.reshape(input.shape[0], 1, input.shape[1])
prediction5 = scalerOut.inverse_transform(model5.predict(input_lstm))
print("Prediction 5: ", prediction5)

# **Motor Babbling**

In [None]:
### Motor Babbling for NTC Test Bench ###

import sys
sys.path.append("../catkin_ws/devel/lib/python3/dist-packages")

import rospy
from bench.msg import BenchState, BenchMotorControl, BenchRecorderControl
import csv
import time
import random
import numpy as np

def babbling_fcn(babbling_seconds=300, kinematics_activations_show=True):

	#local variables
	timestep=0.01
	run_samples=int(np.round(babbling_seconds/timestep))
	pass_chance = timestep
	skip_rows = 200

	#TODO: Normalizing current values between 0,1. Might have to change that so that data is normalized in ANN implementation
	max_in = 1
	min_in = 0

	#Generating random activations in range (min_in, max_in)
	motor1_act = systemID_input_gen_fcn(babbling_seconds, pass_chance, max_in, min_in, timestep)
	motor2_act = systemID_input_gen_fcn(babbling_seconds, pass_chance, max_in, min_in, timestep)
 	
	babbling_activations = np.transpose(np.concatenate([[motor1_act],[motor2_act]], axis=0))
 
	#Save resulting kinematics data
	if kinematics_activations_show:
		kinematics_activations_show_fcn(activations=babbling_activations)
	[babbling_kinematics, babbling_activations, chassis_pos] = run_activations_fcn(babbling_activations, timestep)
 
	#return kinematics,activations map and skip skip_rows rows (setup phase)
	return babbling_kinematics[skip_rows:,:], babbling_activations[skip_rows:,:]

def systemID_input_gen_fcn(signal_duration_in_seconds, pass_chance, max_in, min_in, timestep):

	number_of_samples = int(np.round(signal_duration_in_seconds/timestep))
	samples = np.linspace(0, signal_duration_in_seconds, number_of_samples)
 
	gen_input = np.zeros(number_of_samples,) * min_in

	#generating number_of_samples random activations
	for i in range(1, number_of_samples):
		pass_rand = np.random.uniform(0,1,1)
	
		if pass_rand < pass_chance:
			gen_input[i] = ((max_in-min_in)*np.random.uniform(0,1,1)) + min_in
		else:
			gen_input[i] = gen_input[i-1]

	return gen_input


def kinematics_activations_show_fcn(vs_time=False, timestep=0.01, **kwargs):

	#plotting the resulting kinematics or activations
	sample_no_kinematics=0
	sample_no_activations=0

	#setting up variables
	if ("kinematics" in kwargs):
		kinematics = kwargs["kinematics"]
		sample_no_kinematics = kinematics.shape[0]

	if ("activations" in kwargs):
		activations = kwargs["activations"]
		sample_no_activations = activations.shape[0]

	if not (("kinematics" in kwargs) or ("activations" in kwargs)):
		raise NameError('Either kinematics or activations needs to be provided')
	
	if (sample_no_kinematics!=0) & (sample_no_activations!=0) & (sample_no_kinematics!=sample_no_activations):
		raise ValueError('Number of samples for both kinematics and activation matrices should be equal and not zero')
	
	else:
		number_of_samples = np.max([sample_no_kinematics, sample_no_activations])
		if vs_time:
			x = np.linspace(0,timestep*number_of_samples,number_of_samples)
		else:
			x = range(number_of_samples)
	 
	#plotting kinematics: angle degree, angular velocity, angular acceleration
	if ("kinematics" in kwargs):
		plt.figure()
	
		plt.subplot(3, 1, 1)
		plt.plot(x, kinematics[:,0])
		plt.ylabel('q0 (rads)')
	
		plt.subplot(3, 1, 2)
		plt.plot(x, kinematics[:,1])
		plt.ylabel('q0 dot (rads/s)')
	
		plt.subplot(3, 1, 3)
		plt.plot(x, kinematics[:,2])
		plt.ylabel('q0 double dot (rads/s^2)')
	
		plt.xlabel('motor 1 activation values')
	
	#plotting activation values
	if ("activations" in kwargs):
		plt.figure()
	
		plt.subplot(2, 1, 1)
		plt.plot(x, activations[:,0])
		plt.ylabel('motor 1 activation values')
	
		plt.subplot(2, 1, 2)
		plt.plot(x, activations[:,1])
		plt.ylabel('motor 2 activation values')

	plt.show(block=True)
 
def run_activations_fcn(activations, timestep):
#todo

# **Reinforcement Learning to refine Inverse Kineatics**

In [None]:
#Reinforement Learning for particular Learning to refine inverse mapping
#model: ANN trained during Motor Babbling
#cum_kinematics: kinematics as part of the cummulative training_set collected during Motor Babbling and RL -> Refine ANN
#cum_activations: activations as part of the cummulative training_set collected during Motor Babbling and RL -> Refine ANN
#penalty_thresh: predefined penalty_threshold to decide which Policy should be choosen
#energy_cost_weight: predefined weight for calculating activation values during execution in experiment
#refinement: True when ANN should be refined


 def learn_to_move_2_fcn(model, cum_kinematics, cum_activations, penalty_thresh=42, energy_cost_weight=0, refinement = False): #need to set default for penalty_thresh
	
	#initializing local variables
	exploration_limit = 100
	exploitation_limit = 15
	prev_penalty = np.array([0])
	lowest_penalty_so_far = np.array([0])
	best_model= model
	all_penaltys = []
	exploration_run_no = 0
	exploitation_run_no = 0
	new_features = gen_features_fcn(penalty_thresh=penalty_thresh, lowest_penalty_so_far=lowest_penalty_so_far, feat_vec_length=10) #might have to change feat_vec_length
	best_features_so_far = new_features
	exploration_limit_reached = False

	#Check necessaty condition for both exploration phase and exploitation pahse
	while exploitation_run_no <= exploitation_limit and not exploration_limit_reached:

		#count iterations for both exploration and exploitation phase
		if lowest_penalty_so_far < penalty_thresh:
			exploration_run_no += 1
		else:
			exploitation_run_no += 1

		#generate new feature vector F_k. Depending on lowest_penalty_so_far the function gen_features_fcn will choose the correct Policy
		new_features = gen_features_fcn(penalty_thresh=penalty_thresh, lowest_penalty_so_far=lowest_penalty_so_far, prev_penalty=prev_penalty, best_features_so_far=best_features_so_far)

		#Execute generated kinematics to interact with environment and receive estimations and real data as well as the penalty
		[prev_penalty, est_attempt_kinematics, est_attempt_activations, real_attempt_kinematics, real_attempt_activations] = feat_to_run_attempt_fcn(features=new_features, model=model, energy_cost_weight=energy_cost_weight, feat_show=False)
		
		#concatinate data to train model and refine
		[cum_kinematics, cum_activations] = concatinate_data_fcn(cum_kinematics, cum_activations, real_attempt_kinematics, real_attempt_activations, throw_percentage = 0.20)
	
		#Append new penalty
		all_penaltys = np.append(all_penaltys, prev_penalty)

		if prev_penalty > lowest_penalty_so_far:
			lowest_penalty_so_far = prev_penalty
			best_features_so_far = new_features
			best_model = copy_model_fcn(model)

		if refinement:
			model = inverse_mapping_fcn1(cum_kinematics, cum_activations, prior_model=model)

		print("best penalty so far: ", lowest_penalty_so_far)
		if exploration_run_no == exploration_limit and lowest_penalty_so_far < penalty_thresh:
			exploration_limit_reached = True

	#update after run
	[prev_penalty_best, attempt_kinematics_best, est_attempt_activations_best, real_attempt_kinematics_best, real_attempt_activations_best]= \
	feat_to_run_attempt_fcn(features = best_features_so_far, model=best_model, energy_cost_weight=energy_cost_weight, feat_show=False)

	print("all_penalty: ", all_penaltys)
	print("prev_penalty_best: ", prev_penalty_best)

	return lowest_penalty_so_far, all_penaltys, best_features_so_far, real_attempt_activations, exploration_run_no

In [None]:
#Policy1 + Policy2
def gen_features_fcn(penalty_thresh, lowest_penalty_so_far, **kwargs):

	#local variables to generate new vectors
	feat_min = 0.4
	feat_max = 0.9

	if ("best_features_so_far" in kwargs):
		best_features_so_far = kwargs["best_features_so_far"]
	elif ("feat_vec_length" in kwargs):
		best_features_so_far = np.random.uniform(feat_min,feat_max,kwargs["feat_vec_length"])
	else:
		raise NameError('Either best_features_so_far or feat_vec_length needs to be provided')
	
	if lowest_penalty_so_far < penalty_thresh:
		#Policy 1: Generate random kinematics data
		new_features = np.random.uniform(feat_min, feat_max, best_features_so_far.shape[0])
	else:
		#Policy 2: Use Multivariate Gaussian Distribution Based Stochastic Search
		sigma= np.max([(2*penalty_thresh - lowest_penalty_so_far)/100, 0.01])# should be inversly proportional to penalty
		new_features = np.zeros(best_features_so_far.shape[0],)
	
		for i in range(0,len(new_features)):
			new_features[i] = np.random.normal(best_features_so_far[i], sigma)
	 
	 	#Building new F_k using the new_features from above
		new_features = np.maximum(new_features, feat_min * np.ones(best_features_so_far.shape[0],))
		new_features = np.minimum(new_features, feat_max * np.ones(best_features_so_far.shape[0],))
	
	return new_features

In [None]:
#Execute kinematics to interact with environment
def feat_to_run_attempt_fcn(features, model,energy_cost_weight=0, feat_show=False):
	[q0_filtered, q1_filtered] = feat_to_positions_fcn(features, show=feat_show)
 
	step_kinematics = positions_to_kinematics_fcn(q0_filtered, q1_filtered, timestep = 0.01)
 
	attempt_kinematics = step_to_attempt_kinematics_fcn(step_kinematics=step_kinematics)
 
	est_attempt_activations = estimate_activations_fcn(model=model, desired_kinematics=attempt_kinematics)
 
 	#not implemented yet. Returns Hardcoded values
	[real_attempt_kinematics, real_attempt_activations, chassis_pos]=run_activations_fcn(MuJoCo_model_name=MuJoCo_model_name, est_activations=est_attempt_activations, Mj_render=Mj_render)

	real_attempt_energy = np.square(real_attempt_activations).sum(0).sum(0)
 
	prev_penalty = chassis_pos[-1] - energy_cost_weight*real_attempt_energy

	return prev_penalty, attempt_kinematics, est_attempt_activations, real_attempt_kinematics, real_attempt_activations

In [None]:
def step_to_attempt_kinematics_fcn(step_kinematics, number_of_steps_in_an_attempt = 10):
	attempt_kinematics=np.matlib.repmat(step_kinematics,number_of_steps_in_an_attempt,1)
	return(attempt_kinematics)

In [None]:
#Merges new data to the current dataset after throwing out the first few samples (defined by throw_percentage)
def concatinate_data_fcn( cum_kinematics, cum_activations, kinematics, activations, throw_percentage = 0.20):
	size_of_incoming_data = kinematics.shape[0]
	samples_to_throw = int(np.round(throw_percentage * size_of_incoming_data))
 
	#concatenate after skipping the firs #samples_to_throw rows
	cum_kinematics = np.concatenate([cum_kinematics, kinematics[samples_to_throw:,:]])
	cum_activations = np.concatenate([cum_activations, activations[samples_to_throw:,:]])
 
	return cum_kinematics, cum_activations

In [None]:
def copy_model_fcn(original_model):
	config=original_model.get_config()
	new_model=tf.keras.Sequential.from_config(config)
	new_model.set_weights(original_model.get_weights())
	new_model.compile(optimizer=tf.train.AdamOptimizer(0.01),loss='mse',metrics=['mse'])  # mean squared error
	return new_model

In [None]:
#calculate 
def feat_to_positions_fcn(features, timestep=0.01, cycle_duration_in_seconds = 1.3, show=False):
	number_of_features = features.shape[0]
	each_feature_length =  int(np.round((cycle_duration_in_seconds/number_of_features)/timestep))
	feat_angles = np.linspace(0, 2*np.pi*(number_of_features/(number_of_features+1)), number_of_features)
	q0_raw = features*np.sin(feat_angles)
	q1_raw = features*np.cos(feat_angles)
	q0_scaled = (q0_raw*np.pi/3)
	q1_scaled = -1*((-1*q1_raw+1)/2)*(np.pi/2) # since the mujoco model goes from 0 to -pi/2
	q0_scaled_extended = np.append(q0_scaled, q0_scaled[0])
	q1_scaled_extended = np.append(q1_scaled, q1_scaled[0])

	q0_scaled_extended_long = np.array([])
	q1_scaled_extended_long = np.array([])
	for ii in range(features.shape[0]):
		q0_scaled_extended_long = np.append(
			q0_scaled_extended_long, np.linspace(
				q0_scaled_extended[ii], q0_scaled_extended[ii+1], each_feature_length))
		q1_scaled_extended_long = np.append(
			q1_scaled_extended_long, np.linspace(
				q1_scaled_extended[ii], q1_scaled_extended[ii+1], each_feature_length))
	q0_scaled_extended_long_3 = np.concatenate(
		[q0_scaled_extended_long[:-1], q0_scaled_extended_long[:-1], q0_scaled_extended_long])
	q1_scaled_extended_long_3 = np.concatenate(
		[q1_scaled_extended_long[:-1], q1_scaled_extended_long[:-1], q1_scaled_extended_long])

	fir_filter_length = int(np.round(each_feature_length/(1)))
	b=np.ones(fir_filter_length,)/fir_filter_length # a simple moving average filter > users can 
	#change these if they need smoother pattern
	a=1
	q0_filtered_3 = signal.filtfilt(b, a, q0_scaled_extended_long_3)
	q1_filtered_3 = signal.filtfilt(b, a, q1_scaled_extended_long_3)

	q0_filtered = q0_filtered_3[q0_scaled_extended_long.shape[0]:2*q0_scaled_extended_long.shape[0]-1] # length = 1999 (the 
	#very last was ommited since it is going to be the first one on the next cycle)
	q1_filtered = q1_filtered_3[q1_scaled_extended_long.shape[0]:2*q1_scaled_extended_long.shape[0]-1]
	if show:
		plt.figure()
		plt.scatter(q0_scaled, q1_scaled)
		plt.plot(q0_filtered, q1_filtered)
		plt.xlabel("q0")
		plt.ylabel("q1")
		plt.show(block=True)
	return q0_filtered, q1_filtered

In [None]:
def positions_to_kinematics_fcn(q0, q1, timestep = 0.01):
	kinematics=np.transpose(
	np.concatenate(
		(
			[[q0],
			[np.gradient(q0)/timestep],
			[np.gradient(np.gradient(q0)/timestep)/timestep],
			[q1],
			[np.gradient(q1)/timestep],
			[np.gradient(np.gradient(q1)/timestep)/timestep]]),
		axis=0
		)
	)
	return kinematics

In [None]:
def estimate_activations_fcn(model, desired_kinematics):

	print("running the model")
 
	est_activations = model.predict(desired_kinematics)

	return est_activations

In [None]:
# results from Hardware/Simulation. Returns hardcoded values at the moment
def run_activations_fcn(est_activations, timestep=0.01):
	real_attempt_kinematics = [1543, 119.78934140629462, 597.8981090205831]
	real_attempt_activations = [2.7492387, 0.87028396]
	chassis_pos = [0,0]
	return real_attempt_kinematics, real_attempt_activations, chassis_pos

# **Main**

In [None]:
# next is to add accel and see the difference
# add stiffness too
import tensorflow as tf
import numpy as np
from matplotlib import pyplot as plt
import pickle
from warnings import simplefilter

simplefilter(action='ignore', category=FutureWarning)

experiment_ID = "rl_transfer_learning_1"

mc_run_number = 1
babbling_time = 3
number_of_refinements = 0
errors_all_A_A = np.zeros([2, number_of_refinements+1, mc_run_number])
errors_all_A_B = np.zeros([2, number_of_refinements+1, mc_run_number])
errors_all_B_B = np.zeros([2, number_of_refinements+1, mc_run_number])

stiffness_version_A = 8
MuJoCo_model_name_A="nmi_leg_w_chassis_air_v{}.xml".format(stiffness_version_A)
MuJoCo_model_name_A_walk="nmi_leg_w_chassis_air_v{}_walk.xml".format(stiffness_version_A)
stiffness_version_B = 3
MuJoCo_model_name_B="nmi_leg_w_chassis_air_v{}.xml".format(stiffness_version_B)
MuJoCo_model_name_B_walk="nmi_leg_w_chassis_air_v{}_walk.xml".format(stiffness_version_B)

random_seed = -1

for mc_counter in range(mc_run_number):
	random_seed+=1
	# train model_A
	np.random.seed(random_seed) # change the seed for different initial conditions
	tf.random.set_random_seed(random_seed)
	[babbling_kinematics, babbling_activations] =\
		babbling_fcn(
			MuJoCo_model_name=MuJoCo_model_name_A,
			simulation_minutes=babbling_time,
			kinematics_activations_show=False)
	model_A_babble = inverse_mapping_fcn2(
		kinematics=babbling_kinematics,
		activations=babbling_activations,
		log_address="./logs/{}/scalars/babble_A_mc_run{}/".format(experiment_ID, mc_counter),
		early_stopping=False)
	cum_kinematics_A_babble = babbling_kinematics
	cum_activations_A_babble = babbling_activations
	#A_A test
	np.random.seed(random_seed) # change the seed for different initial conditions
	tf.random.set_random_seed(random_seed)
	model_A_refined_A=tf.keras.models.clone_model(model_A_babble)
	model_A_refined_A.compile(optimizer=tf.train.AdamOptimizer(0.01),
          loss='mse',       # mean squared error
          metrics=['mse'])  # mean squared error
	[model_A_refined_A, errors, cum_kinematics_A_refined, cum_activations_A_refined] =\
		in_air_adaptation_fcn(
			MuJoCo_model_name=MuJoCo_model_name_A,
			model=model_A_refined_A,
			babbling_kinematics=cum_kinematics_A_babble,
			babbling_activations=cum_activations_A_babble,
			number_of_refinements=number_of_refinements,
			log_address="./logs/{}/scalars/refine_A_A_mc_run{}/".format(experiment_ID, mc_counter),
			error_plots_show=False,
			Mj_render=False)
	#import pdb; pdb.set_trace()
	errors_all_A_A[:,:, mc_counter] = [errors[0],errors[1]]
	#A_B test
	np.random.seed(random_seed) # change the seed for different initial conditions
	tf.random.set_random_seed(random_seed)
	model_A_refined_B=tf.keras.models.clone_model(model_A_babble)
	model_A_refined_B.compile(optimizer=tf.train.AdamOptimizer(0.01),
      loss='mse',       # mean squared error
      metrics=['mse'])  # mean squared error
	[model_A_refined_B, errors, cum_kinematics_B_refined, cum_activations_B_refined] =\
		in_air_adaptation_fcn(
			MuJoCo_model_name=MuJoCo_model_name_B,
			model=model_A_refined_B,
			babbling_kinematics=cum_kinematics_A_babble,
			babbling_activations=cum_activations_A_babble,
			number_of_refinements=number_of_refinements,
			log_address="./logs/{}/scalars/refine_A_B_mc_run{}/".format(experiment_ID, mc_counter),
			error_plots_show=False,
			Mj_render=False)
	errors_all_A_B[:,:, mc_counter] = [errors[0],errors[1]]
## saving the results
np.save("./results/{}/errors_all_A_A".format(experiment_ID),errors_all_A_A)
np.save("./results/{}/errors_all_A_B".format(experiment_ID),errors_all_A_B)
np.save("./results/{}/errors_all_B_B".format(experiment_ID),errors_all_B_B)

np.random.seed(random_seed+2) # change the seed for different initial conditions
tf.random.set_random_seed(random_seed+2)
## Higher level learning (RL)
[ lowest_penalty_so_far, all_penaltys, best_features_so_far, real_attempt_activations ]=\
 learn_to_move_fcn(
 	MuJoCo_model_name=MuJoCo_model_name_A_walk,
 	model=model_A_refined_A,
 	cum_kinematics=cum_kinematics_A_babble,
 	cum_activations=cum_activations_A_babble,
 	penalty_thresh=6,
 	refinement=False,
 	Mj_render=False)
#import pdb; pdb.set_trace()
[penaltyA_A, _, _, _, _]=\
feat_to_run_attempt_fcn(MuJoCo_model_name=MuJoCo_model_name_A_walk, features=best_features_so_far, model=model_A_refined_A, feat_show=True, Mj_render=True)
[penaltyA_B, _, _, _, _]=\
feat_to_run_attempt_fcn(MuJoCo_model_name=MuJoCo_model_name_B_walk, features=best_features_so_far, model=model_A_refined_A, feat_show=True, Mj_render=False)
[penaltyAB_B, _, _, _, _]=\
feat_to_run_attempt_fcn(MuJoCo_model_name=MuJoCo_model_name_B_walk, features=best_features_so_far, model=model_A_refined_B, feat_show=True, Mj_render=False)
print("penaltyA_A: ", penaltyA_A)
print("penaltyA_B: ", penaltyA_B)
print("penaltyAB_B: ", penaltyAB_B)


## printing the results
# print("errors_mean: ",errors_all_A_A.mean(1))
# print("errors_std: ",errors_all_A_A.std(1))
# print("errors_mean: ",errors_all_A_B.mean(1))
# print("errors_std: ",errors_all_A_B.std(1))
#import pdb; pdb.set_trace()