In [None]:
# First try to use the data to train a model
import numpy as np
import random
import tensorflow as tf
from sklearn import metrics
import matplotlib.pyplot as plt
from keras.layers import Conv1D, MaxPooling1D, LSTM, Dense, Dropout, Flatten, TimeDistributed, Bidirectional, BatchNormalization
from keras import regularizers
from keras.optimizers import SGD
import pandas as pd
import matplotlib as mpl
from tensorflow.keras.optimizers import Adam
import tensorflow.keras.backend as K
import math
from tensorflow.keras import regularizers
import gc
from sklearn.metrics import r2_score

# Ask TensorFlow once for the visible GPU devices, show the raw list,
# then report availability in plain words.
gpu_devices = tf.config.list_physical_devices('GPU')
print(gpu_devices)

gpu = bool(gpu_devices)
print("GPU is", "available" if gpu else "NOT AVAILABLE")

In [None]:
# Set variables

# Acquisition layout, as used in the MATLAB file that produced the data.
reads_per_position = 1000   # number of reads per rotation
rotation_per_position = 16  # number of rotations per position

# Percentage of the data used for training. The remainder is split equally
# between validation and test, e.g. train_size_per = 70 gives
# 70% training / 15% validation / 15% testing.
train_size_per = 80

In [None]:
# Load the dataset from the file (both CSVs are read without a header row)

SensorValues = pd.read_csv('mySensorData.csv', header=None)
LocationValues = pd.read_csv('myLocationData.csv', header=None)

# From here on the notebook works on the plain NumPy arrays
sensor_arr, location_arr = SensorValues.values, LocationValues.values

# Sanity report: both array shapes, then the raw sensor value range
print(sensor_arr.shape, location_arr.shape, sep="\n")
print(np.min(sensor_arr), np.max(sensor_arr), sep="\n")


In [None]:
def inverse_min_max_scaling(data, min_val, max_val):
    """Undo a min-max scaling.

    Args:
        data: Scaled value(s); anything supporting ``*`` and ``+`` with the bounds.
        min_val: Lower bound of the original range.
        max_val: Upper bound of the original range.

    Returns:
        ``(max_val - min_val) * data + min_val``.
    """
    span = max_val - min_val
    return span * data + min_val

In [None]:
def min_max_scaler(data):
    """Scale ``data`` to the [0, 1] range using its global minimum and maximum.

    The minimum and maximum are taken over the whole input (not per column),
    so every element is scaled with the same offset and range.

    Args:
        data: Array-like of numeric values.

    Returns:
        ``(data - min) / (max - min)``. When every value is identical the
        range is zero; an all-zero float array of the same shape is returned
        instead of dividing by zero.
    """
    min_val = np.min(data)
    max_val = np.max(data)
    value_range = max_val - min_val

    # Constant input: avoid 0/0 (NaN) and map everything to 0.
    if value_range == 0:
        return np.zeros_like(data, dtype=float)

    return (data - min_val) / value_range

In [None]:
# Preview the location of the real data points (column 0 vs column 1 of the labels)
fig, ax = plt.subplots()
ax.scatter(location_arr[:, 0], location_arr[:, 1], color='blue', label='Real Points')

# Axis labels, title and legend
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_title('Full dataset points')
ax.legend()

# Show the plot
plt.show()
        


In [None]:
# Number of consecutive rows that belong to one physical position
total_values_per_position = rotation_per_position * reads_per_position

# Features: sensor readings scaled to [0, 1] with the global min/max
X = min_max_scaler(sensor_arr)

# Labels: the first three location columns (x, y, rotation)
Y_loc_0 = location_arr[:, 0]
Y_loc_1 = location_arr[:, 1]
Y_rot = location_arr[:, 2]

# Assemble the (n_samples, 3) float label matrix in one vectorised step
# instead of filling it row by row in a Python loop.
Y = np.column_stack((Y_loc_0, Y_loc_1, Y_rot)).astype(np.float64, copy=False)

data_sen_size = int(X.shape[0])
data_sen_size_2 = int(X.shape[1])

data_loc_size = int(Y.shape[0])
data_loc_size_2 = int(Y.shape[1])

# Expected split sizes: the non-training remainder is halved between
# validation and test.
val_size = int(data_sen_size * (((100 - train_size_per)/100) * 1/2))
test_size = int(data_sen_size * (((100 - train_size_per)/100) * 1/2))
train_size = int(data_sen_size * (train_size_per/100))

print("train_size: ", train_size)
print("val_size: ", val_size)
print("test_size: ", test_size)



In [None]:
# Number of distinct positions in the dataset (each owns a contiguous run
# of total_values_per_position rows).
data_size_rotation = int(data_sen_size / total_values_per_position)

# Number of positions held out for validation + test
val_test_size_per = int(data_size_rotation * ((100 - train_size_per)/100))

# Draw the held-out positions without replacement.
random_numbers = random.sample(range(0, data_size_rotation), val_test_size_per)

# random.sample already returns a random order; the shuffle is retained so
# the sequence of random draws is the same as before.
random.shuffle(random_numbers)

halfway = len(random_numbers) // 2

test_random = random_numbers[:halfway]
val_random = random_numbers[halfway:]


def _expand_positions(position_ids):
    """Return every row index of the given positions as an (n, 1) float array."""
    starts = np.asarray(position_ids, dtype=np.int64).reshape(-1, 1) * total_values_per_position
    offsets = np.arange(total_values_per_position)
    return (starts + offsets).reshape(-1, 1).astype(np.float64)


# The arrays are sized from the positions actually drawn. Preallocating them
# from val_size/test_size overflowed (or left spurious zero indices) whenever
# the halves were uneven or the percentages did not divide exactly.
val_numbers = _expand_positions(val_random)
test_numbers = _expand_positions(test_random)

val_sorted = np.sort(val_numbers, axis=0)
test_sorted = np.sort(test_numbers, axis=0)

In [None]:
import csv

# Set to True to write the freshly generated validation/test row indices to
# the CSV files. Leave False (default) to load a previously saved split from
# those files and use it instead of the one generated above.
SAVE_SPLIT_TO_CSV = False

if SAVE_SPLIT_TO_CSV:
    # One index per row, same layout that the loader below expects.
    with open('mySortedVal153.csv', 'w', newline='') as file:
        csv.writer(file).writerows(val_sorted)

    with open('mySortedTest153.csv', 'w', newline='') as file:
        csv.writer(file).writerows(test_sorted)
else:
    SortedVal = pd.read_csv('mySortedVal153.csv', header = None)
    SortedTest = pd.read_csv('mySortedTest153.csv', header = None)

    val_sorted = SortedVal.values
    test_sorted = SortedTest.values

In [None]:
# Assign every row to exactly one split using boolean masks.
# A row listed in val_sorted goes to validation; otherwise a row listed in
# test_sorted goes to test; everything else is training. Row order within
# each split follows the original dataset order.
row_ids = np.arange(data_sen_size)

val_mask = np.isin(row_ids, val_sorted)
test_mask = np.isin(row_ids, test_sorted) & ~val_mask   # validation takes precedence
train_mask = ~(val_mask | test_mask)

X_val, Y_val = X[val_mask], Y[val_mask]
X_test, Y_test = X[test_mask], Y[test_mask]
X_train, Y_train = X[train_mask], Y[train_mask]

# Row counters kept under their original names
n_val, n_test, n_train = X_val.shape[0], X_test.shape[0], X_train.shape[0]

# The splits are now sized from the index lists themselves. Report any
# disagreement with the percentage-based sizes computed earlier, which
# previously produced all-zero padding rows or an IndexError.
for split_name, actual, expected in (("train", n_train, train_size),
                                     ("val", n_val, val_size),
                                     ("test", n_test, test_size)):
    if actual != expected:
        print("WARNING:", split_name, "split has", actual, "rows, expected", expected)

In [None]:
# Scatter the (x, y) labels of the training split
fig, ax = plt.subplots()
ax.scatter(Y_train[:, 0], Y_train[:, 1], color='blue', label='Real Points')

# Axis labels, title and legend
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_title('Y train dataset points')
ax.legend()

# Show the plot
plt.show()

In [None]:
# Scatter the (x, y) labels of the validation split
fig, ax = plt.subplots()
ax.scatter(Y_val[:, 0], Y_val[:, 1], color='blue', label='Real Points')

# Axis labels, title and legend
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_title('Y val dataset points')
ax.legend()

# Show the plot
plt.show()

In [None]:
# Scatter the (x, y) labels of the test split
fig, ax = plt.subplots()
ax.scatter(Y_test[:, 0], Y_test[:, 1], color='blue', label='Real Points')

# Axis labels, title and legend
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_title('Y test dataset points')
ax.legend()

# Show the plot
plt.show()

In [None]:
# Overlay the three splits on one set of axes, one colour per split
fig, ax = plt.subplots()
for labels, colour, split_name in (
    (Y_train, 'blue', 'Train'),
    (Y_val, 'red', 'Validation'),
    (Y_test, 'green', 'Test'),
):
    ax.scatter(labels[:, 0], labels[:, 1], color=colour, label=split_name)

# Axis labels, title and legend
ax.set_xlabel('X (meters)')
ax.set_ylabel('Y (meters)')
ax.set_title('Dataset points')
ax.legend()

# Show the plot
plt.show()

In [None]:
# Machine Learning - Euclidean Loss Functions for X and Y

def loss_function_xy_euclidean(y_true, y_pred):
    """Mean Euclidean distance between predicted and true (x, y) positions.

    Args:
        y_true: Tensor indexed as ``[:, 0]`` (x) and ``[:, 1]`` (y).
        y_pred: Tensor indexed the same way as ``y_true``.

    Returns:
        Scalar tensor: the mean over samples of ``sqrt(dy**2 + dx**2)``.
    """
    dx = y_pred[:, 0] - y_true[:, 0]
    dy = y_pred[:, 1] - y_true[:, 1]
    per_sample_distance = K.sqrt(K.square(dy) + K.square(dx))
    return tf.reduce_mean(per_sample_distance)

In [None]:
# Machine Learning - Euclidean Metric Functions for X and Y

def metric_function_xy_euclidean (y_true, y_pred):
    """Metric: mean Euclidean distance between predicted and true (x, y).

    Same computation as ``loss_function_xy_euclidean``, registered separately
    so it can be reported as a metric.

    Args:
        y_true: Tensor indexed as ``[:, 0]`` (x) and ``[:, 1]`` (y).
        y_pred: Tensor indexed the same way as ``y_true``.

    Returns:
        Scalar tensor with the mean per-sample distance.
    """
    true_x, true_y = y_true[:, 0], y_true[:, 1]
    pred_x, pred_y = y_pred[:, 0], y_pred[:, 1]
    squared_distance = K.square(pred_y - true_y) + K.square(pred_x - true_x)
    return tf.reduce_mean(K.sqrt(squared_distance))

In [None]:
# Machine Learning - Loss Functions for theta

def loss_function_theta(y_true, y_pred):
    """Mean squared angular error with wrap-around.

    Args:
        y_true: Tensor of true angles (assumed radians — the wrap uses sin/cos).
        y_pred: Tensor of predicted angles, broadcastable against ``y_true``.

    Returns:
        Scalar tensor: mean of the squared wrapped difference.
    """
    angle_error = y_pred - y_true
    # atan2(sin, cos) folds the raw difference into the [-pi, pi] range
    sin_err, cos_err = tf.sin(angle_error), tf.cos(angle_error)
    folded_error = tf.atan2(sin_err, cos_err)
    return tf.reduce_mean(tf.square(folded_error))

In [None]:
def metric_function_theta(y_true, y_pred):
    """Metric: mean absolute angular error with wrap-around.

    Args:
        y_true: Tensor of true angles (assumed radians — the wrap uses sin/cos).
        y_pred: Tensor of predicted angles, broadcastable against ``y_true``.

    Returns:
        Scalar tensor: mean of the absolute wrapped difference.
    """
    angle_error = y_pred - y_true
    # atan2(sin, cos) folds the raw difference into the [-pi, pi] range
    sin_err, cos_err = tf.sin(angle_error), tf.cos(angle_error)
    folded_error = tf.atan2(sin_err, cos_err)
    return tf.reduce_mean(K.abs(folded_error))

In [None]:
def loss_function_xy_mse(y_true, y_pred):
    """Mean squared Euclidean distance between predicted and true (x, y).

    Args:
        y_true: Tensor indexed as ``[:, 0]`` (x) and ``[:, 1]`` (y).
        y_pred: Tensor indexed the same way as ``y_true``.

    Returns:
        Scalar tensor: the mean over samples of ``dy**2 + dx**2``.
    """
    dx = y_pred[:, 0] - y_true[:, 0]
    dy = y_pred[:, 1] - y_true[:, 1]
    # Use the squared distance directly. Taking sqrt and squaring again gives
    # the same value but an undefined (NaN) gradient when a sample's error is
    # exactly zero, because d/ds sqrt(s) is infinite at s = 0.
    squared_distance = K.square(dy) + K.square(dx)
    return tf.reduce_mean(squared_distance)


In [None]:
def loss_function_xy_mae(y_true, y_pred):
    """Mean absolute Euclidean distance between predicted and true (x, y).

    Args:
        y_true: Tensor indexed as ``[:, 0]`` (x) and ``[:, 1]`` (y).
        y_pred: Tensor indexed the same way as ``y_true``.

    Returns:
        Scalar tensor: the mean over samples of ``|sqrt(dy**2 + dx**2)|``.
    """
    dx = y_pred[:, 0] - y_true[:, 0]
    dy = y_pred[:, 1] - y_true[:, 1]
    per_sample_distance = K.sqrt(K.square(dy) + K.square(dx))
    return tf.reduce_mean(K.abs(per_sample_distance))

In [None]:
def loss_function_xy_huber(y_true, y_pred, delta=1.0):
    """Huber loss on the per-sample Euclidean (x, y) distance.

    The Huber function is applied to each sample's distance and the results
    are then averaged, matching how ``loss_function_theta_huber`` treats each
    angular error. Previously the distances were averaged first and Huber was
    applied to that single batch mean, so large individual errors were not
    treated linearly.

    Args:
        y_true: Tensor indexed as ``[:, 0]`` (x) and ``[:, 1]`` (y).
        y_pred: Tensor indexed the same way as ``y_true``.
        delta: Distance at which the loss switches from quadratic to linear.

    Returns:
        Scalar tensor: mean Huber loss over the samples.
    """
    dx = y_pred[:, 0] - y_true[:, 0]
    dy = y_pred[:, 1] - y_true[:, 1]

    # Per-sample distance (one value per row, no reduction yet).
    # NOTE: the sqrt has an undefined gradient where the distance is exactly 0.
    error = K.sqrt(K.square(dy) + K.square(dx))

    is_small_error = K.abs(error) < delta
    squared_loss = 0.5 * K.square(error)
    linear_loss = delta * (K.abs(error) - 0.5 * delta)
    return tf.reduce_mean(tf.where(is_small_error, squared_loss, linear_loss))

In [None]:
def loss_function_theta_mse(y_true, y_pred):
    """Mean squared wrapped angular difference.

    Args:
        y_true: Tensor of true angles (assumed radians — the wrap uses sin/cos).
        y_pred: Tensor of predicted angles, broadcastable against ``y_true``.

    Returns:
        Scalar tensor: mean of the squared difference after wrapping it with
        ``atan2(sin, cos)``.
    """
    angle_error = y_pred - y_true
    folded_error = tf.atan2(tf.sin(angle_error), tf.cos(angle_error))
    squared_error = K.square(folded_error)
    return tf.reduce_mean(squared_error)

In [None]:
def loss_function_theta_mae(y_true, y_pred):
    """Mean absolute wrapped angular difference.

    Args:
        y_true: Tensor of true angles (assumed radians — the wrap uses sin/cos).
        y_pred: Tensor of predicted angles, broadcastable against ``y_true``.

    Returns:
        Scalar tensor: mean of the absolute difference after wrapping it with
        ``atan2(sin, cos)``.
    """
    angle_error = y_pred - y_true
    folded_error = tf.atan2(tf.sin(angle_error), tf.cos(angle_error))
    absolute_error = K.abs(folded_error)
    return tf.reduce_mean(absolute_error)

In [None]:
def loss_function_theta_huber(y_true, y_pred, delta=1.0):
    """Huber loss on the wrapped angular difference.

    Args:
        y_true: Tensor of true angles (assumed radians — the wrap uses sin/cos).
        y_pred: Tensor of predicted angles, broadcastable against ``y_true``.
        delta: Error magnitude at which the loss switches from quadratic to linear.

    Returns:
        Scalar tensor: mean of the element-wise Huber loss.
    """
    angle_error = y_pred - y_true
    error = tf.atan2(tf.sin(angle_error), tf.cos(angle_error))
    magnitude = K.abs(error)

    # Quadratic below delta, linear from delta upwards
    quadratic_branch = 0.5 * K.square(error)
    linear_branch = delta * (magnitude - 0.5 * delta)
    per_element_loss = tf.where(magnitude < delta, quadratic_branch, linear_branch)
    return tf.reduce_mean(per_element_loss)


In [None]:
# Early stopping: watch the validation loss, give up after 10 epochs without
# improvement, and roll the model back to the best weights seen.
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

In [None]:
# Model checkpoint: keep the weights of the epoch with the lowest validation loss.
# NOTE(review): newer Keras releases require weights-only checkpoint paths to
# end in `.weights.h5` — confirm against the installed version.

model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath='best_model.h5',
    save_weights_only=True,
    monitor='val_loss',
    # Without this the file is overwritten after every epoch, so
    # 'best_model.h5' would simply hold the most recent weights.
    save_best_only=True,
)

In [None]:
# Learning Rate Schedule

def lr_schedule(epoch):
    """Return the learning rate for the given epoch index.

    The rate drops by a factor of ten at epochs 2, 10, 20, 40 and 75,
    starting from 1e-4 and ending at 1e-9.

    Args:
        epoch: Epoch index.

    Returns:
        The learning rate as a float.
    """
    # (exclusive upper epoch bound, learning rate), checked in order
    steps = (
        (2, 0.0001),
        (10, 0.00001),
        (20, 0.000001),
        (40, 0.0000001),
        (75, 0.00000001),
    )
    for upper_bound, rate in steps:
        if epoch < upper_bound:
            return rate
    return 0.000000001
    
# Callback that asks lr_schedule for the learning rate of each epoch
lr_scheduler = tf.keras.callbacks.LearningRateScheduler(schedule=lr_schedule)

In [None]:
# TensorBoard logging: write per-epoch histograms, the graph and weight images
# into ./logs. This callback is defined here but is not part of the callbacks
# list used by the training cell.
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir='logs', histogram_freq=1, write_graph=True, write_images=True)

In [None]:
# randomize the data
#
# Shuffle a list of row indices and apply it to both arrays, so features and
# labels stay aligned. This avoids materialising every (row, label) pair as
# Python tuples and rebuilding the arrays from them. random.shuffle applies a
# permutation that does not depend on the list contents, so for a given
# random state the resulting order is the same as shuffling the zipped pairs.
shuffled_order = list(range(len(X_train)))
random.shuffle(shuffled_order)

X_train = np.asarray(X_train)[shuffled_order]
Y_train = np.asarray(Y_train)[shuffled_order]

In [None]:
# Machine Learning - Build the model
#
# Shared fully-connected trunk (480 -> 960 -> 480, ReLU) feeding two linear
# heads: 'xy' (2 units) and 'theta' (1 unit).

inputs = tf.keras.Input(shape=(data_sen_size_2,))

x = inputs
for units in (480, 960, 480):
    x = tf.keras.layers.Dense(units, activation='relu')(x)

xy_output = tf.keras.layers.Dense(2, activation='linear', name='xy')(x)
theta_output = tf.keras.layers.Dense(1, activation='linear', name='theta')(x)

model = tf.keras.Model(inputs=inputs, outputs=[xy_output, theta_output])

model.summary()

# Both heads contribute equally to the total loss
optimizer = Adam(learning_rate=0.00001)
model.compile(
    optimizer=optimizer,
    loss={'xy': loss_function_xy_huber, 'theta': loss_function_theta_mae},
    loss_weights={'xy': 0.5, 'theta': 0.5},
    metrics={'xy': metric_function_xy_euclidean, 'theta': metric_function_theta},
)

In [None]:
# Run a garbage-collection pass before training starts.
gc.collect()

In [None]:
# Machine Learning - Train the model

# Each output head gets its own slice of the label matrix:
# columns 0-1 for "xy", column 2 for "theta".
train_targets = {"xy": Y_train[:, :2], "theta": Y_train[:, 2]}
val_targets = {"xy": Y_val[:, :2], "theta": Y_val[:, 2]}

training_callbacks = [model_checkpoint, lr_scheduler, early_stopping]

# NOTE: despite its name, `predict` holds the History object returned by fit().
predict = model.fit(
    X_train,
    train_targets,
    validation_data=(X_val, val_targets),
    epochs=100,
    batch_size=512,
    verbose=1,
    callbacks=training_callbacks,
)

In [None]:
# Pull the per-epoch total loss for both splits out of the training history
history_values = predict.history
training_loss, validation_loss = history_values['loss'], history_values['val_loss']

# Draw both curves on one set of axes
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(training_loss, label='Training Loss')
ax.plot(validation_loss, label='Validation Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.set_title('Training vs Validation Loss')
ax.legend()
plt.show()

In [None]:
# Save the trained model (architecture + weights) to an HDF5 file.
# NOTE(review): the model was compiled with custom loss/metric functions, so
# reloading it presumably needs them passed via `custom_objects` — confirm.
model_path = './models/Best_models/model.5x6.h5'
model.save(model_path)

In [None]:
# Free up memory: run a garbage-collection pass after saving the model.
gc.collect()

In [None]:
# Machine Learning - Evaluate the model
#
# model.evaluate returns a list of scalars (the total loss followed by the
# per-output losses and metrics), not a mean squared error; the labels for
# each entry are given by model.metrics_names.
print("Values reported by evaluate, in order: ", model.metrics_names)

losses = model.evaluate(X_train, {'xy': Y_train[:, :2], 'theta': Y_train[:, 2]}, batch_size=512)
print("Losses and metrics for Train Dataset: ", losses)
gc.collect()
losses = model.evaluate(X_val, {'xy': Y_val[:, :2], 'theta': Y_val[:, 2]}, batch_size=32)
print("Losses and metrics for Validation Dataset: ", losses)
gc.collect()
losses = model.evaluate(X_test, {'xy': Y_test[:, :2], 'theta': Y_test[:, 2]}, batch_size=32)
print("Losses and metrics for Test Dataset: ", losses)


In [None]:
# Run a garbage-collection pass before generating predictions.
gc.collect()

In [None]:
# Machine Learning - Make predictions

y_pred, theta_pred = model.predict(X_test)

y_pred_x = y_pred[:, 0]
y_pred_y = y_pred[:, 1]
# The theta head has a single unit, so flatten its (n, 1) output to (n,)
y_pred_theta = np.asarray(theta_pred).reshape(-1)

# Combine the predicted x and y values into a single (n, 2) array
y_pred = np.stack((y_pred_x, y_pred_y), axis=1)

# Root-mean-square error over the x and y coordinates together
mse = metrics.mean_squared_error(Y_test[:,:2], y_pred)
print("RMSE for test data (per x/y coordinate): ", math.sqrt(mse), "meters")

# Angular error, wrapped to the [-pi, pi] range so that predictions that are
# off by a full turn are not counted as large errors (the training loss and
# metric wrap the difference in the same way).
delta_angles = Y_test[:, 2] - y_pred_theta
wrapped_diff = np.arctan2(np.sin(delta_angles), np.cos(delta_angles))

mse = float(np.mean(np.square(wrapped_diff)))
print("RMSE for test data (angle, wrapped): ", math.sqrt(mse), "radians")

print("#############################################################################################################")

theta_err = float(np.mean(np.abs(wrapped_diff)))
print("Mean absolute error for test data (angle): ", theta_err, "radians")

# Per-sample Euclidean distance between true and predicted position
distance = np.sqrt(np.square(Y_test[:, 0] - y_pred[:, 0]) + np.square(Y_test[:, 1] - y_pred[:, 1]))

err = float(np.mean(distance))
print("Mean error for test data (distance): ", err, "meters")

fig, ax = plt.subplots()

print(len(distance))

sample_index = np.arange(len(distance))
ax.scatter(sample_index, distance)

ax.set_xlabel('Sample')
ax.set_ylabel('Error (meters)')
ax.set_title('Sample Error plot')

# Connect adjacent samples with a single line artist instead of creating one
# separate line per pair of samples.
ax.plot(sample_index, distance)

# Show the plot
plt.show()


In [None]:
import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator

# Per-sample position errors computed in the prediction cell
errors = np.asarray(distance)

maxNumber = np.max(errors)
# Tick spacing along x: one tick per histogram bin (fallback avoids a zero step)
multiple = maxNumber / 10 if maxNumber > 0 else 1.0


def _plot_error_hist(title, ylabel, y_tick_step, **hist_kwargs):
    """Draw a 10-bin histogram of `errors` over [0, max error] with a grid."""
    plt.figure(figsize=(8, 6))
    plt.hist(errors, bins=10, color='b', alpha=1, range=(0, maxNumber), **hist_kwargs)
    # The plotted quantity is the Euclidean distance error, not a signed residual
    plt.xlabel('Position error (meters)')
    plt.ylabel(ylabel)
    plt.title(title)
    # No legend: the histogram has no labelled artists, so plt.legend() only
    # produced a warning.
    plt.gca().xaxis.set_major_locator(MultipleLocator(multiple))
    plt.gca().yaxis.set_major_locator(MultipleLocator(y_tick_step))
    plt.grid(True)
    plt.show()


# Absolute counts per error bin
_plot_error_hist('Error Histogram', 'Frequency', errors.shape[0]/50)

# Empirical cumulative distribution of the error
_plot_error_hist('CDF Histogram', 'Cumulative fraction of samples', 0.1,
                 cumulative=True, density=True)

In [None]:
# Coefficient of determination (R^2) of the predicted (x, y) positions on the test set

y_pred, theta_pred = model.predict(X_test)
y_pred_theta = theta_pred

# Keep the per-axis prediction columns available under their own names
y_pred_x, y_pred_y = y_pred[:, 0], y_pred[:, 1]

# Re-assemble the two columns into a single (n, 2) array of (x, y) pairs
y_pred = np.stack((y_pred_x, y_pred_y), axis=1)

r_squared = r2_score(Y_test[:, :2], y_pred)
print("R-squared (R^2):", r_squared)

# Signed per-axis residuals (true minus predicted)
residuals_x = Y_test[:, 0] - y_pred[:, 0]
residuals_y = Y_test[:, 1] - y_pred[:, 1]



In [None]:

y_pred, theta_pred = model.predict(X_test)

x_pred = y_pred[:, 0]
y_pred = y_pred[:, 1]

number_for_plot = int(x_pred.shape[0]/4)
print(number_for_plot)

# Plot every 10th test sample, up to 32000 points. The index list is clipped
# to the size of the test set, so a smaller test set no longer raises an
# IndexError.
PLOT_STRIDE = 10
MAX_PLOT_POINTS = 32000
plot_idx = np.arange(0, Y_test.shape[0], PLOT_STRIDE)[:MAX_PLOT_POINTS]

# One scatter call per series (instead of two calls per sample); the labels
# give the legend something to show.
plt.scatter(Y_test[plot_idx, 0], Y_test[plot_idx, 1], color='blue', label='Real Points')
plt.scatter(x_pred[plot_idx], y_pred[plot_idx], color='red', label='Predicted Points')

# Set the x-axis label to "X"
plt.xlabel('X(meters)')

# Set the y-axis label to "Y"
plt.ylabel('Y(meters)')

# Set the title of the plot to "Real vs Predicted Points"
plt.title('Real vs Predicted Points')

# Add a legend to the plot
plt.legend()

# Show the plot
plt.show()