In [1]:
from __future__ import print_function

import tensorflow as tf
import sys, os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
import tensorflow.keras
import pandas as pd
import numpy as np
import sklearn

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, BatchNormalization, Dropout



def load_joint_space_csv_chunks(file_path):
    """Read a joint-space CSV and return it without the column labelled 18.

    The first line of the file is skipped and no header row is parsed, so the
    columns of the resulting frame carry integer labels.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        pandas.DataFrame with every parsed column except the one labelled 18.

    Raises:
        KeyError: If the parsed file has no column labelled 18.
    """
    joint_frame = pd.read_csv(file_path, header=None, skiprows=1)
    # Discard column 18 before handing the frame back to the caller.
    return joint_frame.drop(18, axis=1)

def load_task_space_csv_chunks(file_path):
    """Read a task-space CSV, skipping its first line.

    No header row is parsed, so the columns of the resulting frame carry
    integer labels.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        pandas.DataFrame with all parsed columns.
    """
    task_frame = pd.read_csv(file_path, header=None, skiprows=1)
    return task_frame

# Dataset locations. The defaults are the original absolute paths; set the
# RH5_LEG_TRAIN_FOLDER / RH5_LEG_TEST_FOLDER environment variables to run the
# notebook on a machine where the data lives somewhere else.
TRAIN_FOLDER = os.environ.get(
    'RH5_LEG_TRAIN_FOLDER',
    '/home/dfki.uni-bremen.de/bmanickavasakan/newdataset_rh5_leg/leg_5steps/')
TEST_FOLDER = os.environ.get(
    'RH5_LEG_TEST_FOLDER',
    '/home/dfki.uni-bremen.de/bmanickavasakan/newdataset_rh5_leg/leg_5steps/test_4steps')

# Both folders are expected to contain the same pair of file names:
# task-space samples (x) and joint-space samples (q).
X_TRAIN_FILE = os.path.join(TRAIN_FOLDER, 'leg_forwardkinematics_x.csv')
Q_TRAIN_FILE = os.path.join(TRAIN_FOLDER, 'leg_sysstate_q.csv')

X_TEST_FILE = os.path.join(TEST_FOLDER, 'leg_forwardkinematics_x.csv')
Q_TEST_FILE = os.path.join(TEST_FOLDER, 'leg_sysstate_q.csv')

# Load the training split and show the first rows as a quick sanity check.
x_train = load_task_space_csv_chunks(X_TRAIN_FILE)
q_train = load_joint_space_csv_chunks(Q_TRAIN_FILE)
print(x_train.head(10))
print(q_train.head(10))

# Load the held-out split with the same loaders.
x_test = load_task_space_csv_chunks(X_TEST_FILE)
q_test = load_joint_space_csv_chunks(Q_TEST_FILE)

print("Training: Loaded X: {} and Q: {} data points".format(x_train.shape, q_train.shape))
print("Testing : Loaded X: {} and Q: {} data points".format(x_test.shape, q_test.shape))

# Inputs and targets are paired row by row, so a row-count mismatch means the
# files are out of sync. Fail here instead of deep inside model.fit().
assert len(x_train) == len(q_train), "training X and Q have different numbers of rows"
assert len(x_test) == len(q_test), "testing X and Q have different numbers of rows"

# One scaler per space. Each is fitted on the training split only and then
# reused unchanged for the test split, so both splits share the same mapping.
x_scaler = MinMaxScaler()
q_scaler = MinMaxScaler()

# From here on x_train / q_train / x_test / q_test are rebound to the scaled
# arrays; the raw frames are no longer reachable under these names.
x_train = x_scaler.fit_transform(x_train)
q_train = q_scaler.fit_transform(q_train)

x_test = x_scaler.transform(x_test)
q_test = q_scaler.transform(q_test)

def model_builder(input_shape, output_shape):
    """Create a factory for fully connected regression networks.

    Args:
        input_shape: Number of input features fed to the first Dense layer.
        output_shape: Number of units in the final (sigmoid) Dense layer.

    Returns:
        A function ``build_model(depth, width, reduction_factor)`` that builds
        and compiles a Keras ``Sequential`` model.
    """
    def build_model(depth, width, reduction_factor):
        """Build and compile the network.

        Hidden layer ``i`` has ``max(int(width * reduction_factor ** i), 4)``
        ReLU units. Every hidden layer is followed by BatchNormalization; all
        hidden layers except the first also get Dropout(0.5) between the Dense
        layer and the normalization. The model is compiled with MSE loss, the
        Adam optimizer and MAE as an additional metric.
        """
        # Width of every hidden layer, never smaller than 4 units.
        layer_widths = [
            max(int(width * (reduction_factor ** level)), 4)
            for level in range(depth)
        ]

        network = Sequential()
        for level, units in enumerate(layer_widths):
            if level > 0:
                network.add(Dense(units, activation='relu'))
                network.add(Dropout(0.5))
            else:
                # Only the first layer declares the input dimensionality.
                network.add(Dense(units, activation='relu', input_shape=(input_shape,)))

            network.add(BatchNormalization())

        network.add(Dense(output_shape, activation='sigmoid'))
        network.compile(loss='mse', optimizer='adam', metrics=["mae"])

        return network
    return build_model

# Keyword arguments forwarded to build_model(depth, width, reduction_factor).
HYPERPARAMETERS = {'depth': 6, 'width': 64, 'reduction_factor':  0.9}

# Build, train and save the network on the CPU device.
with tf.device('/cpu:0'):
    # 9 input features, 18 output features.
    model = model_builder(9, 18)(**HYPERPARAMETERS)
    # NOTE(review): the test split is also passed as validation data here, so
    # the per-epoch validation numbers are computed on the same data that is
    # evaluated below.
    model.fit(x_train, q_train, epochs = 20, batch_size = 128, verbose = 1, validation_data=(x_test, q_test))
    model.save("NEW_inverse-kinematics-rh5-leg-5steps.h5")

# Predict on the scaled test inputs, then map both ground truth and
# predictions back through q_scaler for the un-normalised error report.
q_pred = model.predict(x_test, verbose=0)
q_unnorm = q_scaler.inverse_transform(q_test)
q_pred_unnorm = q_scaler.inverse_transform(q_pred)

# Overall MAE, computed on the scaled (normalised) values.
global_mae = mean_absolute_error(q_test, q_pred)

print("Testing MAE: {:.5f}".format(global_mae))

# Compute MAE for each output independently, both in scaled and in
# un-normalised units, and print it next to the feature's training data range.
for i in range(q_test.shape[1]):
    norm_mae_i = mean_absolute_error(q_test[:, i], q_pred[:, i])
    mae_i = mean_absolute_error(q_unnorm[:, i], q_pred_unnorm[:, i])
    print("Q feature {} has unnorm MAE: {:.4f} (Range {:.4f} to {:.4f}) normalized MAE: {:.4f}".format(i, mae_i, q_scaler.data_min_[i], q_scaler.data_max_[i], norm_mae_i))

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


          0         1         2         3         4         5         6  \
0  0.398812  0.431706 -0.499640 -0.207795 -0.753744  0.415787  0.464561   
1  0.273052  0.597029 -0.465943 -0.081651 -0.790698  0.448379  0.408767   
2  0.106506  0.717667 -0.421317  0.046245 -0.810690  0.471352  0.344204   
3 -0.086611  0.783325 -0.369571  0.173150 -0.813291  0.484214  0.272257   
4 -0.289816  0.788397 -0.315122  0.296341 -0.798447  0.486689  0.194468   
5  0.320593  0.494222 -0.523277 -0.148793 -0.734773  0.363151  0.553257   
6  0.180242  0.636809 -0.485670 -0.022604 -0.761616  0.409540  0.501708   
7  0.006076  0.731316 -0.439002  0.104069 -0.772120  0.447143  0.439396   
8 -0.187041  0.769676 -0.387257  0.228510 -0.766060  0.475153  0.367657   
9 -0.382626  0.748617 -0.334850  0.348049 -0.743565  0.492970  0.288031   

          7         8  
0  0.000080  0.000012  
1  0.000080  0.000012  
2  0.000080  0.000012  
3  0.000080  0.000012  
4  0.000080  0.000012  
5  0.000087  0.000013  
6  0.0

In [7]:
from keras_uncertainty.models import MCDropoutRegressor

def test_mcdropout_regressor(x_test_values, q_test_values, model, data_scaler, num_samples=10):
    """Evaluate `model` with MC-Dropout sampling and report the test MAE.

    The model is wrapped in an ``MCDropoutRegressor`` and queried for a
    predictive mean and standard deviation. The MAE between `q_test_values`
    and the predicted mean is printed.

    Args:
        x_test_values: Model inputs, in the same (scaled) form used for training.
        q_test_values: Ground-truth targets in scaled form; compared directly
            against the predicted mean.
        model: Trained Keras model to wrap.
        data_scaler: Fitted scaler whose ``inverse_transform`` maps the
            predicted mean back to the original target units.
        num_samples: Number of stochastic forward passes requested from the
            MC-Dropout wrapper. Defaults to 10, the value previously hard-coded.

    Returns:
        Tuple ``(q_pred_unnormalised, std)``: the predicted mean mapped back
        through `data_scaler`, and the standard deviation exactly as returned
        by the wrapper (i.e. still in scaled units).

    NOTE(review): if `model` contains BatchNormalization layers, confirm how
    MCDropoutRegressor treats them while sampling.
    """
    mc_model = MCDropoutRegressor(model)

    mean, std = mc_model.predict(x_test_values, num_samples=num_samples)

    q_pred_unnormalised = data_scaler.inverse_transform(mean)

    # `std` is deliberately left in scaled units. Do not run it through
    # data_scaler.inverse_transform(): that would also apply the scaler's
    # offset, which is meaningless for a spread. To express it in original
    # units, only the per-feature scale factor should be applied.

    # MAE is computed in scaled space, against the scaled ground truth.
    global_mae = mean_absolute_error(q_test_values, mean)

    print("Testing MAE: {:.5f}".format(global_mae))

    return q_pred_unnormalised, std

In [8]:
# Run the MC-Dropout evaluation on the scaled test split. `mean` receives the
# predictions already mapped back through q_scaler; `std` is the standard
# deviation as returned by the MC-Dropout wrapper (not inverse-transformed).
mean, std = test_mcdropout_regressor(x_test, q_test, model, q_scaler)
# Ground truth in original units, for comparison against `mean`.
q_test_unorm = q_scaler.inverse_transform(q_test)
# Wrap the standard deviations in a DataFrame (one column per output feature)
# and print the smallest value of the first column.
sd_test = pd.DataFrame(std)
print(sd_test[0].min())

Testing MAE: 0.28872
0.002311056
