In [121]:
# import required libraries
import pandas as pd
import pickle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error

In [122]:
# define function to load data
def load_data(filepath):
    # setting column names of the dataframe
    headers = [
        "X_robot",
        "Y_robot",
        "Orientation_robot",
        "Collision",
        "X_candle1",
        "Y_candle1",
        "X_candle2",
        "Y_candle2",
        "X_candle3",
        "Y_candle3",
        "X_candle4",
        "Y_candle4",
        "X_speed",
        "Y_speed",
    ]

    # load data from file
    df = pd.read_csv(filepath, names=headers)

    return df

In [123]:
# function to calculate Euclidean distance between two points
def euclidean_distance(x1, y1, x2, y2):
    return ((x1 - x2) ** 2 + (y1 - y2) ** 2) ** 0.5


# function that performs feature engineering on the dataset
def feature_engineering(robots_df):
    # calculating the distance between the robot and each candle
    robots_df["distance_candle1"] = euclidean_distance(
        robots_df["X_robot"],
        robots_df["Y_robot"],
        robots_df["X_candle1"],
        robots_df["Y_candle1"],
    )
    robots_df["distance_candle2"] = euclidean_distance(
        robots_df["X_robot"],
        robots_df["Y_robot"],
        robots_df["X_candle2"],
        robots_df["Y_candle2"],
    )
    robots_df["distance_candle3"] = euclidean_distance(
        robots_df["X_robot"],
        robots_df["Y_robot"],
        robots_df["X_candle3"],
        robots_df["Y_candle3"],
    )
    robots_df["distance_candle4"] = euclidean_distance(
        robots_df["X_robot"],
        robots_df["Y_robot"],
        robots_df["X_candle4"],
        robots_df["Y_candle4"],
    )

    # calculating the distance between the robot and each candle in each axis
    robots_df["distance_candle1_x"] = robots_df["X_robot"] - robots_df["X_candle1"]
    robots_df["distance_candle1_y"] = robots_df["Y_robot"] - robots_df["Y_candle1"]
    robots_df["distance_candle2_x"] = robots_df["X_robot"] - robots_df["X_candle2"]
    robots_df["distance_candle2_y"] = robots_df["Y_robot"] - robots_df["Y_candle2"]
    robots_df["distance_candle3_x"] = robots_df["X_robot"] - robots_df["X_candle3"]
    robots_df["distance_candle3_y"] = robots_df["Y_robot"] - robots_df["Y_candle3"]
    robots_df["distance_candle4_x"] = robots_df["X_robot"] - robots_df["X_candle4"]
    robots_df["distance_candle4_y"] = robots_df["Y_robot"] - robots_df["Y_candle4"]

    return robots_df

In [124]:
# function to develop and save the model
def develop(X, y):
    # split the data into training, validation and test sets
    X_train_validate, X_test, y_train_validate, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    X_train, X_validate, y_train, y_validate = train_test_split(
        X_train_validate, y_train_validate, test_size=0.125, random_state=42
    )

    # scale the data
    scaler = StandardScaler().fit(X_train)

    X_train_scaled = scaler.transform(X_train)
    X_validate_scaled = scaler.transform(X_validate)
    X_test_scaled = scaler.transform(X_test)

    X_train_scaled = pd.DataFrame(X_train_scaled, columns=X_train.columns)
    X_validate_scaled = pd.DataFrame(X_validate_scaled, columns=X_validate.columns)
    X_test_scaled = pd.DataFrame(X_test_scaled, columns=X_test.columns)

    # apply PCA to reduce the dimensionality of the data
    pca = PCA(n_components=0.95).fit(X_train_scaled)
    x_train_pca = pca.transform(X_train_scaled)
    x_validate_pca = pca.transform(X_validate_scaled)
    x_test_pca = pca.transform(X_test_scaled)

    # train the model
    hyper_params = {
        "verbose": False,
        "random_state": 33,
        "activation": "relu",
        "hidden_layer_sizes": (12, 24, 48, 96),
        "learning_rate_init": 0.001,
        "batch_size": 24,
    }
    model = MLPRegressor(**hyper_params).fit(x_train_pca, y_train)

    # predict the target on the train, validation and test sets
    y_train_pred = model.predict(x_train_pca)
    y_val_pred = model.predict(x_validate_pca)
    y_test_pred = model.predict(x_test_pca)

    # calculate the mean squared error on the train and validation sets
    train_score_rmse = mean_squared_error(y_train, y_train_pred)
    validate_score_rmse = mean_squared_error(y_validate, y_val_pred)
    test_score_rmse = mean_squared_error(y_test, y_test_pred)

    train_score_mae = mean_absolute_error(y_train, y_train_pred)
    validate_score_mae = mean_absolute_error(y_validate, y_val_pred)
    test_score_mae = mean_absolute_error(y_test, y_test_pred)

    # model props
    model_props = {
        "hidden_layers": model.n_layers_ - 2,
        "architecture": model.hidden_layer_sizes,
        "activation": model.activation,
        "batch_size": model.batch_size,
        "solver": model.solver,
        "learning_rate_init": model.learning_rate_init,
        "learning_rate": model.learning_rate,
        "iterations": model.n_iter_,
        "train_score_rmse": train_score_rmse,
        "val_score_rmse": validate_score_rmse,
        "test_score_rmse": test_score_rmse,
        "train_score_mae": train_score_mae,
        "val_score_mae": validate_score_mae,
        "test_score_mae": test_score_mae,
    }

    # print the model properties
    # print(model_props)

    # print the scores
    print(f"Train score RMSE: {train_score_rmse}, MAE: {train_score_mae}")
    print(f"Validation score RMSE: {validate_score_rmse}, MAE: {validate_score_mae}")
    print(f"Test score RMSE: {test_score_rmse}, MAE: {test_score_mae}")

    # save the scaler
    with open("./out/scaler_3.pkl", "wb") as f:
        pickle.dump(scaler, f)

    # save the pca
    with open("./out/pca_3.pkl", "wb") as f:
        pickle.dump(pca, f)

    # save the model
    with open("./out/model_3.pkl", "wb") as f:
        pickle.dump(model, f)

In [125]:
# function to evaluate the model
def evaluate(X, y):
    scaler = pickle.load(open("./out/scaler_3.pkl", "rb"))
    model = pickle.load(open("./out/model_3.pkl", "rb"))
    pca = pickle.load(open("./out/pca_3.pkl", "rb"))

    # scale the data
    X_scaled = scaler.transform(X)
    X_scaled = pd.DataFrame(X_scaled, columns=X.columns)

    # apply PCA to reduce the dimensionality of the data
    x_pca = pca.transform(X_scaled)

    # predict the target on the test set
    y_pred = model.predict(x_pca)

    # evaluate the model
    test_score_rmse = mean_squared_error(y, y_pred)
    test_score_mae = mean_absolute_error(y, y_pred)

    # model props
    model_props = {
        "hidden_layers": model.n_layers_ - 2,
        "architecture": model.hidden_layer_sizes,
        "activation": model.activation,
        "batch_size": model.batch_size,
        "solver": model.solver,
        "learning_rate_init": model.learning_rate_init,
        "learning_rate": model.learning_rate,
        "iterations": model.n_iter_,
        "test_score_rmse": test_score_rmse,
        "test_score_mae": test_score_mae,
    }

    # print the model properties
    print(model_props)

    # print the model scores
    print(f"Test score  RMSE: {test_score_rmse}, MAE: {test_score_mae}")

In [126]:
# the main function that runs the program
def main(mode="develop", file_path="./data/RobotData.csv"):

    # Load data
    df = load_data(file_path)

    # data augmentation needs to be done here, if required

    # Perform feature engineering
    df_fe = feature_engineering(df)

    # print head of the dataframe
    # print(df_fe.head())

    # selected features
    selected_features = [
        "distance_candle1_x",
        "distance_candle1_y",
        "distance_candle2_x",
        "distance_candle2_y",
        "distance_candle3_x",
        "distance_candle3_y",
        "distance_candle4_x",
        "distance_candle4_y",
        "Orientation_robot",
        "Collision",
    ]

    # target variables
    target = ["X_speed", "Y_speed"]

    # get feature and target vectors
    X = df_fe[selected_features]
    y = df_fe[target]

    # scale the data
    if mode == "develop":
        develop(X, y)
    else:
        evaluate(X, y)

In [127]:
# run the main function
main("develop", "./data/RobotData.csv")

Train score RMSE: 0.006276799679741773, MAE: 0.06424411402468536
Validation score RMSE: 0.006518804555387753, MAE: 0.06563259968476423
Test score RMSE: 0.006686550020099288, MAE: 0.06618931976577525


In [128]:
# run main in evaluate mode
main("evaluate", "./data/RobotData.csv")

{'hidden_layers': 4, 'architecture': (12, 24, 48, 96), 'activation': 'relu', 'batch_size': 50, 'solver': 'adam', 'learning_rate_init': 0.001, 'learning_rate': 'constant', 'iterations': 17, 'test_score_rmse': 0.006382960851495031, 'test_score_mae': 0.06477205653316212}
Test score  RMSE: 0.006382960851495031, MAE: 0.06477205653316212
