In [1]:
import keras.utils
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Normalization, Input, LSTM
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import os

In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display

In [24]:
# Load the recorded training dataset from a Parquet file into `data`.
# NOTE(review): absolute, machine-specific path — this cell fails on any other
# machine; consider a configurable data directory.
data = pd.read_parquet(path='/Users/dimanu/Developer/Python-Furuta-Pendulum/data/datasets/270820232036_training.parquet')

In [25]:
def update_plot(column_name, xlim):
    """Draw one column of the notebook-level ``data`` frame against its index.

    Args:
        column_name: Label of the column of ``data`` to plot; also used as the
            y-axis label and in the figure title.
        xlim: Iterable unpacked into ``plt.xlim`` to set the visible x-range.
    """
    plt.figure(figsize=(10, 6))

    selected = data[column_name]
    plt.plot(data.index, selected)

    # Labelling calls are independent of each other.
    plt.title(f'Plot of {column_name}')
    plt.ylabel(column_name)
    plt.xlabel('Index')

    plt.xlim(*xlim)
    plt.show()

# Column selector: one entry per column of the loaded frame.
column_dropdown = widgets.Dropdown(
    options=data.columns,
    description='Select Column:',
    disabled=False,
)

# The zoom slider spans the whole index range and starts fully zoomed out.
index_min, index_max = data.index.min(), data.index.max()
initial_zoom_range = (index_min, index_max)
zoom_slider = widgets.FloatRangeSlider(
    value=initial_zoom_range,
    min=index_min,
    max=index_max,
    step=1,
    description='Zoom:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
)

# Redraw the plot whenever either control changes.
plot_controls = {'column_name': column_dropdown, 'xlim': zoom_slider}
output = widgets.interactive_output(update_plot, plot_controls)

display(column_dropdown, zoom_slider, output)


Dropdown(description='Select Column:', options=('time', 'set_point_rotary_arm', 'control_law', 'position_rotar…

FloatRangeSlider(value=(0.0, 900005.0), continuous_update=False, description='Zoom:', max=900005.0, step=1.0)

Output()

# Load Data

In [None]:
# Folder handed to LoaderFactory as `folder_path`.
# NOTE(review): absolute Windows path tied to one machine — confirm or make configurable.
data_folder_path = r'C:\PROGRAMACION\PENDULO INVERTIDO\Pendulo Invertido Diego\Python-Furuta-Pendulum\data\datasets'

In [None]:
# Build a loader for `data_folder_path` and hand it to a reader.
# NOTE(review): LoaderFactory and DataReader are not imported in any visible
# cell — confirm the missing project import, otherwise this raises NameError.
loader = LoaderFactory(folder_path=data_folder_path)
data_reader = DataReader(data_loader=loader)

In [None]:
# Read the dataset through the reader. Presumably a pandas DataFrame, since
# later cells call .describe(), .shape and .columns on it — TODO confirm.
raw_data = data_reader.read_data()

In [None]:
# Summary statistics, one row per column.
raw_data.describe().T

In [None]:
# Quick structural overview of the raw frame: dimensions, then column labels.
print(f'Shape: {raw_data.shape}', f'Columns: {raw_data.columns}', sep='\n')

# Manage Raw Data

In [None]:
# Splitter built over the raw dataset.
# NOTE(review): DataSplitter is not imported in any visible cell — confirm.
data_splitter = DataSplitter(dataset=raw_data)

In [None]:
# Unpack the six results of split_data(); by their names these are the
# features and targets of the train, validation and test splits.
train_data, train_target, validation_data, validation_target, test_data, test_target = data_splitter.split_data()

In [None]:
# Report the dimensions of every split, features first and targets second.
split_shapes = (
    ("Forma de X_train:", train_data),
    ("Forma de y_train:", train_target),
    ("Forma de X_val:", validation_data),
    ("Forma de y_val:", validation_target),
    ("Forma de X_test:", test_data),
    ("Forma de y_test:", test_target),
)
for caption, split in split_shapes:
    print(caption, split.shape)

In [None]:
def plot_data(data: pd.DataFrame, title: str, init: int = 0, end:int = 200) -> None:
    """Plot a slice of every column of ``data`` in its own subplot.

    The grid is four subplots wide and has as many rows as the column count
    requires, with a minimum of two rows, so frames with up to eight columns
    keep the 2x4 layout. Frames with more than eight columns no longer
    overflow the grid with an IndexError. Subplots without a column are hidden.

    Args:
        data: Frame whose columns are plotted, one subplot per column.
        title: Figure-level title.
        init: Start of the slice applied to each column.
        end: End (exclusive) of the slice applied to each column.
    """
    grid_width = 4
    n_columns = len(data.columns)
    # Ceiling division, never fewer than the original two rows.
    grid_height = max(2, -(-n_columns // grid_width))

    # Five inches per row keeps the 10x10 figure for the two-row layout.
    fig, axs = plt.subplots(nrows=grid_height, ncols=grid_width,
                            figsize=(10, 5 * grid_height))
    axs = axs.flatten()

    for ax, column in zip(axs, data.columns):
        ax.plot(data[column][init:end])
        ax.set_title(column)

    # Blank out the trailing subplots that received no column.
    for spare_ax in axs[n_columns:]:
        spare_ax.set_visible(False)

    fig.subplots_adjust(hspace=0.4, wspace=0.6)
    fig.suptitle(title)

In [None]:
# Plot the slice [0:5000] of every raw column (init keeps its default of 0).
plot_data(raw_data, 'Raw Data', end=5000)

# Apply MinMaxScaler to Data

In [None]:
# Scaler configured with a (0, 1) feature range.
# NOTE(review): DataScaler is not imported in any visible cell — confirm.
min_max_scaler = DataScaler(feature_range=(0, 1))

In [None]:
# Fit the scaler on the training split only; validation and test data are not passed here.
min_max_scaler.fit_scaler(train_data=train_data)

In [None]:
# Apply the fitted scaler to each split, in train / validation / test order.
scaled_train_data, scaled_validation_data, scaled_test_data = (
    min_max_scaler.transform(data=split)
    for split in (train_data, validation_data, test_data)
)

In [None]:
# columns_to_scale = [column for column in data.columns]

# min_max_scaler = MinMaxScaler(feature_range=(0, 1))
# data_min_max_scaled = pd.DataFrame(min_max_scaler.fit_transform(data), column =columns_to_scale)

# scaled_train_data, scaled_test_data, train_label, test_label = train_test_split(data_min_max_scaled, labels, test_size=0.2, shuffle=False)
# scaled_train_data, scaled_val_data, train_label, val_label = train_test_split(scaled_train_data, train_label, test_size=0.2, shuffle=False)

In [None]:
# Summary statistics of the scaled training features, one row per column.
scaled_train_data.describe().T

In [None]:
# Report the dimensions of the scaled feature splits next to their targets.
scaled_split_shapes = (
    ("Forma de X_train:", scaled_train_data),
    ("Forma de y_train:", train_target),
    ("Forma de X_val:", scaled_validation_data),
    ("Forma de y_val:", validation_target),
    ("Forma de X_test:", scaled_test_data),
    ("Forma de y_test:", test_target),
)
for caption, split in scaled_split_shapes:
    print(caption, split.shape)

# Apply StandardScaler to Data

In [None]:
# Standardise every column of `data` and keep the original column labels.
# `columns_to_scale` was previously only assigned inside a commented-out cell,
# so this cell raised NameError on a fresh kernel; it is now defined here the
# same way that commented-out cell defined it.
# NOTE(review): the scaler is fitted on the whole frame before any train/test
# split — confirm this is intended.
columns_to_scale = list(data.columns)

standard_scaler = StandardScaler()
data_standard_scaled = pd.DataFrame(
    standard_scaler.fit_transform(data),
    columns=columns_to_scale,
)

In [None]:
# Show the column labels of the standardised frame.
data_standard_scaled.columns

In [None]:
# Remove the pendulum position/speed and rotary-arm speed columns when present.
excluded_columns = ['position_pendulum', 'speed_pendulum', 'speed_rotary_arm']
columns_to_drop = data_standard_scaled.columns[data_standard_scaled.columns.isin(excluded_columns)]
important_standard_scaled_data = data_standard_scaled.drop(columns=columns_to_drop)

# Two ordered (unshuffled) 80/20 splits: first carve out the test set, then
# carve the validation set out of the remaining training part.
# NOTE(review): `labels` is not defined in any visible cell — confirm where it
# comes from before running this cell.
split_options = dict(test_size=0.2, shuffle=False)
standard_scaled_train_data, standard_scaled_test_data, train_label, test_label = train_test_split(
    important_standard_scaled_data, labels, **split_options
)
standard_scaled_train_data, standard_scaled_val_data, train_label, val_label = train_test_split(
    standard_scaled_train_data, train_label, **split_options
)

In [None]:
# Report the dimensions of the standardised splits and their labels.
standard_split_shapes = (
    ("Forma de X_train:", standard_scaled_train_data),
    ("Forma de y_train:", train_label),
    ("Forma de X_val:", standard_scaled_val_data),
    ("Forma de y_val:", val_label),
    ("Forma de X_test:", standard_scaled_test_data),
    ("Forma de y_test:", test_label),
)
for caption, split in standard_split_shapes:
    print(caption, split.shape)

In [None]:
# train_dataset = complete_data.sample(frac= 0.8, random_state=0)
# train_labels = train_dataset.pop('control_law')
# test_dataset = complete_data.drop(train_dataset.index)
# test_labels = test_dataset.pop('control_law')

# Build, Compile and Train the Model

In [8]:
def build_model():
    """Assemble the sequential regression network.

    The network takes a 4-feature input, passes it through ReLU dense layers
    of 16, 8 and 4 units, and ends in a single linear output unit.

    Returns:
        The uncompiled ``Sequential`` model.
    """
    # (units, activation) for each dense layer, in stacking order.
    dense_specs = (
        (16, 'relu'),
        (8, 'relu'),
        (4, 'relu'),
        (1, 'linear'),
    )

    network = Sequential()
    network.add(Input(shape=(4, )))
    for n_units, activation_name in dense_specs:
        network.add(Dense(units=n_units, activation=activation_name))

    return network

In [6]:
def build_advanced_model():
    """Build the smaller network with the functional API.

    The network takes a 4-feature input, passes it through ReLU dense layers
    of 8 and 4 units, and ends in a single linear output unit.

    The model class is taken from ``tf.keras`` so that it comes from the same
    Keras implementation as the ``tensorflow.keras`` layers used here; the
    standalone ``keras`` package may be a different implementation depending
    on the installed versions.

    Returns:
        The uncompiled functional model.
    """
    inputs = Input(shape=(4, ))
    x = Dense(units=8,
              activation='relu')(inputs)
    x = Dense(units=4,
              activation='relu')(x)
    outputs = Dense(units=1,
                    activation='linear')(x)
    model = tf.keras.Model(inputs=inputs,
                           outputs=outputs)
    return model

In [None]:
# advanced_model = build_advanced_model()
# keras.utils.plot_model(advanced_model, show_shapes=True)

In [9]:
# Instantiate the sequential network defined by build_model().
model = build_model()
# keras.utils.plot_model(model, show_shapes=True)

In [10]:
# Print the layer-by-layer summary with output shapes and parameter counts.
model.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 16)                80        
                                                                 
 dense_1 (Dense)             (None, 8)                 136       
                                                                 
 dense_2 (Dense)             (None, 4)                 36        
                                                                 
 dense_3 (Dense)             (None, 1)                 5         
                                                                 
Total params: 257 (1.00 KB)
Trainable params: 257 (1.00 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [None]:
# Regression setup: mean-squared-error loss, Adam with a 0.001 learning rate,
# and mean absolute error tracked as the only extra metric.
model.compile(loss=tf.keras.losses.mse,
              optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
              metrics=['mae'])

In [None]:
# advanced_model.compile(loss=tf.keras.losses.mse,
#               optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
#               metrics=['accuracy', 'mae'])

In [None]:
# The checkpoint location is derived from the model name.
model_name = 'swing_up_real'
checkpoint_dir = rf'C:\PROGRAMACION\PENDULO INVERTIDO\Pendulo Invertido Diego\Python-Furuta-Pendulum\tmp\checkpoint_{model_name}'

# Only overwrite the checkpoint when the monitored quantity improves.
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_dir,
    save_best_only=True,
)

# Stop training once `val_loss` has gone 5 epochs without improving.
early_stop_callback = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    verbose=1,
)

In [None]:
# Training hyper-parameters passed to model.fit().
EPOCHS = 100
BATCH_SIZE = 256
# When True, the next cell loads weights from `checkpoint_dir` before fitting.
retrain_model = False

In [None]:
# When resuming, start from the weights stored at the checkpoint path.
if retrain_model:
    model.load_weights(checkpoint_dir)

In [None]:
# Train the network and keep the returned history for the plots further down.
# NOTE(review): this fits on the unscaled `train_data` / `validation_data`;
# the `scaled_*` frames produced earlier are not used — confirm this is intended.
history = model.fit(train_data.values,
                    train_target.values,
                    epochs=EPOCHS,
                    batch_size=BATCH_SIZE,
                    validation_data=(validation_data.values, validation_target.values),
                    callbacks=[checkpoint_callback, early_stop_callback],
                    verbose=1,
                    shuffle=False)  # training samples are not shuffled

In [None]:
# Inspect the model's weight arrays.
model.get_weights()

In [None]:
# Evaluate on the test split. The model is compiled with an MSE loss and
# `metrics=['mae']`, so the second value is the mean absolute error — the
# printed label previously called it "MSE". Arrays are passed as `.values`,
# matching the fit() and predict() calls.
test_loss, test_mae = model.evaluate(test_data.values, test_target.values)
print(f'Loss = {test_loss}\n MAE = {test_mae}')

In [None]:
# Run the model on the test features.
predictions = model.predict(test_data.values)

In [None]:
# Replace the target's index with a fresh default one; the old index is dropped.
test_label_series = test_target.reset_index(drop=True)

In [None]:
# Display the re-indexed test targets.
test_label_series

In [None]:
# Wrap the prediction array in a DataFrame; index=None keeps the default index.
predictions_dataframe = pd.DataFrame(predictions, index=None)

In [None]:
# Display the predictions frame.
predictions_dataframe

In [None]:
# Side-by-side table of true targets and predictions.
# Each dict value has to be 1-D, so the first (only) column of the predictions
# frame is selected as a Series instead of passing the 2-D DataFrame itself.
# Both sides carry a default 0..n-1 index, so rows align by position.
compare_test_predict = pd.DataFrame({
    'label': test_label_series,
    'prediction': predictions_dataframe.iloc[:, 0],
})

In [None]:
# Display the label/prediction comparison table.
compare_test_predict

In [None]:
# Destination folder and file name for the saved model.
# NOTE(review): absolute Windows path tied to one machine.
model_folder_path = r'C:\PROGRAMACION\PENDULO INVERTIDO\Pendulo Invertido Diego\Python-Furuta-Pendulum\data\models'
# NOTE(review): this reassigns `model_name`, which the checkpoint cell set to
# 'swing_up_real'; re-running that cell afterwards would change the checkpoint path.
model_name = 'swing_up_real.h5'

In [None]:
# Full path of the model file inside the models folder.
model_save_path = os.path.join(model_folder_path, model_name)

In [None]:
# Persist the trained model; the `.h5` suffix presumably selects the HDF5 format — TODO confirm.
model.save(model_save_path)

In [None]:
# tf.saved_model.save(model, r'C:\Users\diego\OneDrive\Documentos\PENDULO INVERTIDO\Pendulo Invertido Diego\PenduloInvertidoGit\Python\models')

In [None]:
def plot_loss_comparation(history):
    """Plot training loss against validation loss, epoch by epoch.

    Args:
        history: Object whose ``history`` attribute is a mapping that holds
            the ``'loss'`` and ``'val_loss'`` sequences (as returned by
            ``model.fit``).
    """
    recorded = history.history
    train_curve = recorded.get('loss')
    val_curve = recorded.get('val_loss')
    epoch_axis = range(1, len(train_curve) + 1)

    # (values, line format, legend label) for each curve, in drawing order.
    curves = (
        (train_curve, 'r--', 'Training loss'),
        (val_curve, 'b', 'Validation loss'),
    )
    for values, line_format, legend_label in curves:
        plt.plot(epoch_axis, values, line_format, label=legend_label)

    plt.title('Training loss vs Validation loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')

    plt.legend()
    plt.show()

In [None]:
def plot_accuracy_comparation(history):
    """Plot training MAE against validation MAE, epoch by epoch.

    The validation curve is read from the ``'val_mae'`` key. The key was
    previously mistyped as ``'val_,ae'``, so the lookup always returned
    ``None`` and no validation curve could be drawn.

    Args:
        history: Object whose ``history`` attribute is a mapping that holds
            the ``'mae'`` and ``'val_mae'`` sequences (as returned by
            ``model.fit`` for a model compiled with ``metrics=['mae']``).
    """
    accuracy = history.history.get('mae')
    validation_accuracy = history.history.get('val_mae')
    epochs = range(1, len(accuracy) + 1, 1)

    plt.plot(epochs, accuracy, 'r--', label='Training MAE')
    plt.plot(epochs, validation_accuracy, 'b', label='Validation MAE')
    plt.ylabel('MAE')
    plt.xlabel('Epochs')
    plt.title('Training MAE vs Validation MAE')

    plt.legend()
    plt.show()

In [None]:
# Training vs validation loss for the run stored in `history`.
plot_loss_comparation(history=history)

In [None]:
# Training vs validation MAE for the run stored in `history`.
plot_accuracy_comparation(history=history)

In [None]:
# NOTE(review): the loaded object is not assigned to a name, and this path
# differs from `model_save_path` used when saving — confirm what this cell is
# meant to load.
tf.saved_model.load(r'C:\Users\diego\OneDrive\Documentos\PENDULO INVERTIDO\Pendulo Invertido Diego\PenduloInvertidoGit\Python\source')