In [1]:
import os, sys, math, pickle, time
from zmqRemoteApi import RemoteAPIClient
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from random import randint as ri
from random import uniform as ru

from keras.models import Sequential, load_model
from keras.layers import Dense
from keras.losses import Huber
from keras.optimizers import Adam

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler
from sklearn.metrics import mean_squared_error

### Funciones previas de learning

In [84]:
#Function to split the training data into X,y datasets
def load_dataset(scene_in = "modular02a", date = "2023_15_11"):
    # Defining usefull variables
    path = os.getcwd() + "\\training_data\\" + date
    file_list = os.listdir(path)
    scene_files_list = [item for item in file_list if scene_in in item and "pkl" in item]


    file = open(path + "\\" + scene_files_list[0], "rb")
    training_data = pickle.load(file)
    training_df = pd.DataFrame(training_data)

    #List to store the name for every joint data column
    increments_columns = []
    prev_j_positions_columns = []
    post_j_positions_columns = []

    #Creates the name for every column
    num_joints = len(training_data[-1]["increments"])
    for joint_n in range(num_joints):
        #List to split inputs per joint
        joint_inc_col_name = "increments_" + str(joint_n)
        increments_columns.append(joint_inc_col_name)

        prev_joint_pos_col_name = "prev_j_positions_" + str(joint_n)
        prev_j_positions_columns.append(prev_joint_pos_col_name)

        #List to split outputs per joint
        post_joint_pos_col_name = "post_j_positions_" + str(joint_n)
        post_j_positions_columns.append(post_joint_pos_col_name)


    #Input columns per joint
    increments_df = pd.DataFrame(training_df['increments'].to_list())
    increments_df.columns = increments_columns


    prev_j_positions_df = pd.DataFrame(training_df['prev_j_positions'].to_list())
    prev_j_positions_df.columns = prev_j_positions_columns


    #Builds the X dataframe
    X_df = pd.concat([increments_df, prev_j_positions_df, 
                    training_df["prev_pos_x"], training_df["prev_pos_y"], training_df["prev_pos_z"]], 
                    axis="columns")


    #Output columns per joint
    post_j_positions_df = pd.DataFrame(training_df['post_j_positions'].to_list())
    post_j_positions_df.columns = post_j_positions_columns


    #Builds the y dataframe
    y_df = pd.concat([post_j_positions_df, 
                    training_df["post_pos_x"], training_df["post_pos_y"], training_df["post_pos_z"]], 
                    axis="columns")

    return(X_df,y_df)

In [76]:
def adapt_X_y_02(X_a,y_a):
    X_a_cols = ['current_state_x', 'current_state_y', 'current_state_z', 'joint0_current_state_rad', 'joint1_current_state_rad', 'joint0_actions_rad', 'joint1_actions_rad']

    X_a = pd.concat([X_a["prev_pos_x"], X_a["prev_pos_y"] , X_a["prev_pos_z"],
                X_a["prev_j_positions_0"], X_a["prev_j_positions_1"],
                X_a["increments_0"], X_a["increments_1"]], 
                        axis="columns")

    X_a.columns = X_a_cols

    y_a_cols = ['future_state_x', 'future_state_y', 'future_state_z', 'joint0_future_state_rad', 'joint1_future_state_rad']

    y_a = pd.concat([y_a["post_pos_x"], y_a["post_pos_y"] , y_a["post_pos_z"],
                y_a["post_j_positions_0"], y_a["post_j_positions_1"]], 
                        axis="columns")
    y_a.columns = y_a_cols
    return X_a, y_a

In [77]:
def adapt_X_y_03(X_a,y_a):
    X_a_cols = ['current_state_x', 'current_state_y', 'current_state_z', 'joint0_current_state_rad', 'joint1_current_state_rad', 'joint2_current_state_rad'
                , 'joint0_actions_rad', 'joint1_actions_rad', 'joint2_actions_rad']

    X_a = pd.concat([X_a["prev_pos_x"], X_a["prev_pos_y"] , X_a["prev_pos_z"],
                X_a["prev_j_positions_0"], X_a["prev_j_positions_1"], X_a["prev_j_positions_2"],
                X_a["increments_0"], X_a["increments_1"], X_a["increments_2"]], 
                        axis="columns")

    X_a.columns = X_a_cols

    y_a_cols = ['future_state_x', 'future_state_y', 'future_state_z', 'joint0_future_state_rad', 'joint1_future_state_rad', 'joint2_future_state_rad']

    y_a = pd.concat([y_a["post_pos_x"], y_a["post_pos_y"] , y_a["post_pos_z"],
                y_a["post_j_positions_0"], y_a["post_j_positions_1"], y_a["post_j_positions_2"]], 
                        axis="columns")
    y_a.columns = y_a_cols
    return X_a, y_a

In [86]:
def plot_pred_vs_test(test, pred):
    columns = test.columns.tolist()
    max_list = []
    min_list = []
    mse_list = []
    perc_mse_list = []
    for column_num in range(len(columns)):
        plt.figure()
        plt.scatter(test.iloc[:,column_num], pred.iloc[:,column_num])

        #Getting important values
        minim = min(pred.iloc[:,column_num])
        min_list.append(minim)
        maxim = max(pred.iloc[:,column_num])
        max_list.append(maxim)
        mse = mean_squared_error(test.iloc[:,column_num], pred.iloc[:,column_num])
        mse_list.append(round(mse,6))
        rang = (maxim-minim)
        perc = mse/rang *100
        perc_mse_list.append(round(perc,3))
        title = columns[column_num] + " - mse: " + str(round(mse,6))
        plt.title(title)
        plt.xlabel('True Values')
        plt.ylabel('Predictions')
        # Para tener una linea recta con la cual comparar los valores y que no altere
        # los limites de la figura se grafican los valores reales con ellos mismos
        plt.plot(test.iloc[:,column_num],test.iloc[:,column_num])
        plt.grid()
    data = {"Perception": columns, "Max.": max_list, "Min.": min_list, "Mse": mse_list, "Mse perc.": perc_mse_list}
    data_df = pd.DataFrame(data)
    return(data_df)

### Pruebas de modelos

In [79]:
class modular_scene():
    def __init__(self, i_scene, i_date, i_model, i_scaler):
        self.scene = i_scene
        self.date = i_date
        self.model = i_model
        self.scaler = i_scaler

In [11]:
scene = "modular03"
date = "2023_15_11"
models_path = os.getcwd() + "\\models\\" + date
file_list = os.listdir(models_path)
model_files_list = [item for item in file_list if scene in item and "model" in item]
scaler_files_list = [item for item in file_list if scene in item and "scaler" in item]
model_files_list, scaler_files_list

(['model_modular03_2023_15_11.keras'], ['scaler_modular03_2023_15_11.pkl'])

In [14]:
print(models_path)

c:\Users\carlo\OneDrive\Imágenes\Documentos\GitHub\EMERGE\Morphology\Multimodular\models\2023_15_11


In [52]:
models = {}
for model in model_files_list:
    model_path = models_path + "\\" + model
    loaded_model = load_model(model_path)
    models[model_files_list.index(model)] = loaded_model
print("Moldels loaded:")
for key in models.keys():
    print(f"    {key}")

Moldels loaded:
    0


In [35]:
# Load the scaler object using pickle
def load_scaler(file_name): #'standard_scaler.pkl'
    with open(file_name, 'rb') as file:
        model_scaler = pickle.load(file)
    return model_scaler

In [53]:
scalers = {}
for scaler in scaler_files_list:
    scaler_path = models_path + "\\" + scaler
    loaded_scaler = load_scaler(scaler_path)
    scalers[scaler_files_list.index(scaler)] = loaded_scaler
print("Scalers loaded:")
for key in scalers.keys():
    print(f"    {key}")

Scalers loaded:
    0


In [80]:
# Load the scaler object using pickle
def load_scaler(file_name): #'standard_scaler.pkl'
    with open(file_name, 'rb') as file:
        model_scaler = pickle.load(file)
    return model_scaler

# Load the model and scaler of a scene
def load_scene(scene = "modular03", date = "2023_15_11"):
    models_path = os.getcwd() + "\\models\\" + date
    file_list = os.listdir(models_path)
    model_files_list = [item for item in file_list if scene in item and "model" in item]
    scaler_files_list = [item for item in file_list if scene in item and "scaler" in item]
    model_files_list, scaler_files_list

    # Load models
    models = {}
    for model in model_files_list:
        model_path = models_path + "\\" + model
        loaded_model = load_model(model_path)
        models[model_files_list.index(model)] = loaded_model
    print(f"Moldels loaded: {len(models.keys())}")

    # Load scalers
    scalers = {}
    for scaler in scaler_files_list:
        scaler_path = models_path + "\\" + scaler
        loaded_scaler = load_scaler(scaler_path)
        scalers[scaler_files_list.index(scaler)] = loaded_scaler
    print(f"Scalers loaded: {len(scalers.keys())}")
    
    latest_model = len(models.keys()) - 1
    latest_scaler = len(scalers.keys()) - 1
    
    # Creates scene class with latest model/scaler
    scene_class = modular_scene(scene, date, models[latest_model], scalers[latest_scaler])
    print(scene_class.__dict__)
    return scene_class

In [81]:
modular = load_scene("modular03", "2023_15_11")

Moldels loaded: 1
Scalers loaded: 1
{'scene': 'modular03', 'date': '2023_15_11', 'model': <keras.src.engine.sequential.Sequential object at 0x0000020E27BC3950>, 'scaler': MinMaxScaler()}


In [85]:
scene = "modular03"
date = "2023_15_11"
modular = load_scene(scene, date)
X, y =load_dataset(scene, date)
if "03" in modular.scene:
    X, y = adapt_X_y_03(X,y)
else:
    X, y = adapt_X_y_02(X,y)

X_scaled = modular.scaler.transform(X)

Moldels loaded: 1
Scalers loaded: 1
{'scene': 'modular03', 'date': '2023_15_11', 'model': <keras.src.engine.sequential.Sequential object at 0x0000020E28E4EF10>, 'scaler': MinMaxScaler()}


In [None]:
test_predictions = modular.model.predict(X_scaled)
df_test_predictions = pd.DataFrame(test_predictions)

In [None]:
plot_pred_vs_test(y, df_test_predictions)

In [87]:
def test_model(scene, date):
    modular = load_scene(scene, date)
    X, y =load_dataset(scene, date)
    if "03" in modular.scene:
        X, y = adapt_X_y_03(X,y)
    else:
        X, y = adapt_X_y_02(X,y)

    X_scaled = modular.scaler.transform(X)

    test_predictions = modular.model.predict(X_scaled)
    df_test_predictions = pd.DataFrame(test_predictions)
    
    plot_pred_vs_test(y, df_test_predictions)
    return modular, X, y

In [88]:
scene = "modular03"
date = "2023_15_11"
#test_model(scene, date)