In [None]:
import numpy as np
import csv
import pandas as pd
import os
import torch
import matplotlib.pyplot as plt
import camera_tools as ct

from FableAPI.fable_init import api
# Initialise the Fable API (blocking=True is passed to setup) and list the reachable modules.
api.setup(blocking=True)
moduleids = api.discoverModules()
print("Module IDs: ", moduleids)
# Fail with an explicit message instead of a bare IndexError when nothing was discovered.
if len(moduleids) == 0:
    raise RuntimeError("No Fable modules discovered; cannot select a module to control.")
# Every later cell drives the first discovered module.
moduleID = moduleids[0]
print("Battery level:",api.getBattery(moduleID),"%")

In [None]:
#Calibrate the camera to detect green box, if you haven't done this calibration before
# ct.colorpicker() returns the two colour bounds that are printed below.
low_green, high_green = ct.colorpicker()
print(low_green)
print(high_green)
#Check whether the camera detects the green object properly
# NOTE(review): `cam` is never released in this cell, while collectData() calls cam.release()
# after ct.show_camera(cam) -- confirm whether a release is needed here as well.
cam = ct.prepare_camera()
img = ct.capture_image(cam)
ct.show_camera(cam)

In [None]:
# Open the camera, grab one frame and show the camera view (same three calls as the end of the calibration cell).
# NOTE(review): `cam` is not released afterwards -- confirm whether ct.show_camera() takes care of that.
cam = ct.prepare_camera()
img = ct.capture_image(cam)
ct.show_camera(cam)

In [None]:
#Create two files (if they were not already created) to collect data.
# Each file starts with a single header row; existing files are left untouched so that
# previously collected samples are kept.
# newline='' is what the csv module expects for files handed to csv.writer and matches
# the way collectData() opens the same files for appending.
for csv_file_name, header_row in (("xyCoords_2.csv", ["X","Y"]), ("angles_2.csv", ["Y_angle"])):
    if not os.path.exists(csv_file_name):
        with open(csv_file_name, 'w', newline='') as f:
            csv.writer(f).writerow(header_row)

#We use the collectData function to collect the data for training:
#   - the Y (angular) position of the end effector
#   - the x,y coordinates of the end effector in the camera image
#See the video final_project_guidance.mp4
def collectData(desired_angle_change):
    """Sweep the Y motor back and forth and log angle / image-coordinate samples.

    The arm is first commanded to (-90, 90). It is then stepped by
    ``desired_angle_change`` in the decreasing-angle direction until the
    commanded Y angle exceeds 90 in magnitude, and afterwards stepped in the
    increasing-angle direction until that limit is exceeded again. Before each
    step one frame is captured, ``ct.locate`` is run on it and the current Y
    angle is read back from the robot.

    The samples are appended to 'angles_2.csv' and 'xyCoords_2.csv' only when
    both sweeps reached their limit; otherwise nothing is written. The camera
    is released in every case.

    Args:
        desired_angle_change: step between two consecutive commanded Y angles.

    Returns:
        None.
    """
    # Show the camera view first, then reopen the camera for the actual capture.
    cam = ct.prepare_camera()
    ct.show_camera(cam)
    cam.release()
    cam = ct.prepare_camera()
    Y_angle_list = []
    XY_coordinates_list = []
    traversedDirections = 0
    try:
        api.setPos(-90,90,moduleID)
        api.sleep(1.5)
        num_of_iterations = int(np.round(360/desired_angle_change,1))
        # First sweep subtracts the step (anticlockwise), second sweep adds it (clockwise).
        for signed_step in (-desired_angle_change, desired_angle_change):
            limit_reached = False
            for _ in range(num_of_iterations):
                img = ct.capture_image(cam)
                # NOTE(review): getPos() treats a None result from ct.locate as "not detected";
                # such samples are stored here unchanged -- confirm this is acceptable.
                x,y = ct.locate(img)
                currentRobotYAng = int(np.round(api.getPos(1,moduleID),1))
                Y_angle_list.append([currentRobotYAng])
                XY_coordinates_list.append([x,y])
                currentRobotYAng = currentRobotYAng + signed_step
                api.setPos(-90,currentRobotYAng,moduleID)
                api.sleep(1.5)
                if np.abs(currentRobotYAng) > 90:
                    limit_reached = True
                    break
            if not limit_reached:
                # The sweep ran out of iterations before hitting the limit: stop here.
                break
            traversedDirections = traversedDirections + 1
    finally:
        # Release the camera even when the sweep is incomplete or an error occurs.
        cam.release()
    if traversedDirections == 2:
        #Save collected data to files
        with open('angles_2.csv', 'a+', newline ='') as y_angle_file_ptr:
            csv.writer(y_angle_file_ptr).writerows(Y_angle_list)
        with open('xyCoords_2.csv', 'a+', newline ='') as file_xycoords:
            csv.writer(file_xycoords).writerows(XY_coordinates_list)
    return

In [None]:
# Command the module to (-90, 90), the same pose collectData() starts its sweep from.
api.setPos(-90,90,moduleID)

In [None]:
#TODO: Call the collectData() function with different values for the 'desired_angle_change' argument and collect a
# sufficient amount
# of data into the angles_2.csv and xyCoords_2.csv files.
# angles = [1, 2, 3, 5, 7, 10, 13, 15, 17, 19, 30, 45, 50, 70, 80]
# for el in angles:
collectData(90)

In [None]:
#Use this function to calculate the Y (angular) position errors
def readAngleFilesAndCollectErrors(fileName):
    """Return the differences between consecutive rows of the first CSV column.

    Element ``k`` of the result is ``row[k+1] - row[k]``, i.e. the current
    Y (angular) position minus the previous one. A file with fewer than two
    data rows yields an empty list.
    """
    angle_frame = pd.read_csv(fileName)
    angle_samples = angle_frame[angle_frame.columns[0]].to_numpy()
    # Pair every sample with its successor and subtract.
    return [later - earlier for earlier, later in zip(angle_samples[:-1], angle_samples[1:])]

#Use this function to calculate errors between x and y coordinates
def readXYCoordsFilesAndCollectErrors(fileName):
    """Return consecutive-row differences for the first two CSV columns.

    The result is ``[x_differences, y_differences]`` where element ``k`` of
    each list is ``row[k+1] - row[k]`` of the corresponding column.
    """
    coords_frame = pd.read_csv(fileName)
    # Resolve both column names up front, first column = x, second column = y.
    column_names = (coords_frame.columns[0], coords_frame.columns[1])
    per_axis_errors = []
    for column_name in column_names:
        samples = coords_frame[column_name].to_numpy()
        per_axis_errors.append(
            [later - earlier for earlier, later in zip(samples[:-1], samples[1:])]
        )
    return per_axis_errors

In [None]:
#Read the provided files and load the calculated errors into the appropriate lists as follows
angle_error_list_1 = readAngleFilesAndCollectErrors("angles_1.csv")
x_pos_error_list_1,y_pos_error_list_1 = readXYCoordsFilesAndCollectErrors("xyCoords_1.csv")

# NOTE(review): only the "_1" files are loaded here; the data that collectData() appends to
# angles_2.csv / xyCoords_2.csv is not read in this cell, so the arrays below contain dataset 1 only.
#angle differences as a numpy array called 'angle_error_array'
angle_error_array = np.array(angle_error_list_1)
#x-coordinate differences as a numpy array called 'x_coord_error_array'
x_coord_error_array = np.array(x_pos_error_list_1)
#y-coordinate differences as a numpy array called 'y_coord_error_array'
y_coord_error_array = np.array(y_pos_error_list_1)

In [None]:
# Here we use 80% of the collected data  as the training set and 20% of the collected data as test set.
#TODO: Assign different proportions of the collected data to the training set and test set and check how the test set error
#of the Neural Network varies
# Inputs: one row per sample with columns (x difference, y difference).
data =  np.vstack((x_coord_error_array,y_coord_error_array)).T
# Targets: one row per sample holding the angle difference.
target = np.vstack(angle_error_array)
data_input_tensor = torch.tensor(data.tolist()).float()
data_target_tensor = torch.tensor(target.tolist()).float()
# Keep inputs and target in the same row so that shuffling keeps them paired.
data_with_target = torch.cat((data_input_tensor,data_target_tensor),1)
#TODO: what is the importance of using DataLoader utility function here?
# With batch_size equal to the number of rows and shuffle=True the loader yields a single
# batch that contains every row in shuffled order.
loader= torch.utils.data.DataLoader(data_with_target,
                                     batch_size=data_with_target.size()[0], shuffle=True,
                                     num_workers=0)
#training set
train_set = []
#test set
test_set = []
# The first 80% of the rows of the shuffled batch become the training set, the rest the test set.
for i in iter(loader):
    train_set_index = (int)(np.round(i.shape[0]*0.8))
    train_set = i[:train_set_index,:]
    test_set = i[train_set_index:,:]

print(train_set.shape)
print(test_set.shape)

In [None]:
#Neural network model definition
class NN(torch.nn.Module):
    """Fully connected network with two sigmoid hidden layers and a linear output layer."""

    def __init__(self,n_feature,n_hidden1,n_hidden2,n_output):
        super().__init__()
        # The attribute names (hidden1, hidden2, predict) are the state_dict keys
        # used by the saved checkpoints, and the layers are created input-to-output.
        self.hidden1 = torch.nn.Linear(in_features=n_feature, out_features=n_hidden1)
        self.hidden2 = torch.nn.Linear(in_features=n_hidden1, out_features=n_hidden2)
        self.predict = torch.nn.Linear(in_features=n_hidden2, out_features=n_output)

    def forward(self,x):
        """Map a batch of inputs to predictions; no activation on the output layer."""
        first_activation = torch.sigmoid(self.hidden1(x))
        second_activation = torch.sigmoid(self.hidden2(first_activation))
        return self.predict(second_activation)

In [None]:
# The first two columns of the training set are the network inputs.
train_set_inputs = train_set[:,:2]
#TODO: calculate the mean value of the train_set_inputs.
# Column-wise mean (dimension 0 runs over the samples).
mean_of_train_input = torch.mean(train_set_inputs,0)
#standard deviation of the train set inputs.
std_of_the_train_input = torch.std(train_set_inputs,0)
#here we normalize the inputs of the neural network. What is the importance of that?
# NOTE(review): a zero standard deviation in a column would make this division produce inf/nan.
normalized_train_set_inputs = (train_set_inputs - mean_of_train_input)/std_of_the_train_input
#targets of the training set
# Third column, reshaped to one column so that it matches the network output shape.
train_set_targets = train_set[:,2][:,np.newaxis]
print(normalized_train_set_inputs.shape)
print(train_set_targets.shape)

In [None]:
#instantiate the Neural Network
# Other layer sizes that were tried: (17, 7), (31, 13), (70, 49)
n_hidden1 = 17
n_hidden2 = 7
lr = 0.15
model = NN(n_feature=2,n_hidden1=n_hidden1,n_hidden2=n_hidden2, n_output=1)


#Define loss function :
# here we use Mean Square Error as the loss function
loss_func = torch.nn.MSELoss()

#Define the optimizer that should be used in training the Neural Network.
# Here 'lr' is the learning rate
optimizer = torch.optim.Adam(model.parameters(),lr=lr)

#TODO: train the Neural network model by changing the hyper parameters such as learning rate, number of epochs, number of neurons in hidden layers of the neural network.
# What is the minimum mean square error that you can achieve as your neural network converges for the training set.
#  (you will be able to achieve a MSE of less than 10 as the Neural network converges.)
num_epochs = 20000
# Report the loss only every `log_every` epochs so the cell output stays readable.
log_every = 1000
losslist = []
for epoch in range(num_epochs):
    prediction = model(normalized_train_set_inputs) # Forward pass prediction. Saves intermediary values required for backwards pass
    loss = loss_func(prediction, train_set_targets) # Mean squared error over the whole training set
    optimizer.zero_grad() # Clears gradients from previous iteration
    loss.backward() # Backpropagation of errors through the network
    optimizer.step() # Updating weights
    # Store a plain Python float; every epoch is still recorded for the loss plot.
    loss_value = loss.item()
    losslist.append(loss_value)
    if epoch % log_every == 0 or epoch == num_epochs - 1:
        print("Loss: ", loss_value)

In [None]:
#plot the mean square error in each epoch/iteration
# The qt backend shows the figure in a separate window instead of inline in the notebook.
%matplotlib qt
plt.plot(np.arange(len(losslist)),losslist)
plt.show()

In [None]:
#save the best neural network model you have obtained.
# The weights are stored as a state_dict; the two hidden-layer sizes are stored separately
# because they are needed to rebuild an NN instance before the weights can be loaded.
torch.save(model.state_dict(), 'best_nn_model_DEMO.pth')
torch.save(n_hidden1, 'best_nn_hidden1_DEMO.pth')
torch.save(n_hidden2, 'best_nn_hidden2_DEMO.pth')
#Save the mean and standard deviation of the train set inputs because we need to use them at test time.
torch.save(mean_of_train_input, 'best_mean_DEMO.pth')
torch.save(std_of_the_train_input, 'best_std_DEMO.pth')

In [None]:
#reload your best neural network model with the saved parameters
# NOTE(review): none of the file names loaded in this cell match the '*_DEMO.pth' names written by
# the save cell of this notebook, so these files must already exist on disk -- confirm which
# checkpoint set is intended.
n_hidden1 = torch.load('best_nn_hidden.pth')
n_hidden2 = torch.load('best_nn_hidden22.pth')
# NOTE(review): no cell visible in this notebook saves 'best_nn_lr.pth', and `lr` is not used after this point.
lr = torch.load('best_nn_lr.pth')
NN_model = NN(n_feature=2,n_hidden1=n_hidden1,n_hidden2=n_hidden2, n_output=1)
NN_model.load_state_dict(torch.load('best_nn_model1.pth'))
#TODO: Extract inputs of the test_set
test_set_inputs = test_set[:,:2]
#TODO: Extract test set targets from the test_set
test_set_targets = test_set[:,2][:,np.newaxis]
#TODO: Normalize test set inputs by using the mean and standard deviation of the inputs of the training set
mean_training_inputs = torch.load('best_mean1.pth')
std_training_inputs = torch.load('best_std1.pth')
normalized_test_set_inputs = (test_set_inputs - mean_training_inputs)/std_training_inputs
#TODO: feed the normalized test set inputs to the Neural Network model and obtain the prediction for the test set.
prediction_test = NN_model(normalized_test_set_inputs)
print(prediction_test.shape)

In [None]:
#plot the prediction error of the test set
# Signed error per test sample: prediction minus target.
test_set_prediction_error = prediction_test - test_set_targets
plt.plot(np.arange(len(test_set_prediction_error.tolist())),test_set_prediction_error.tolist())

In [None]:
# In the example model trained with about 600 data, from a test set of 165 samples,
# 159 samples are predicted with prediction error less than 10.

In [None]:
#TODO: Based on the prediction error of the test set, you can try to train the neural network again by changing the hyper parameters mentioned above.
# Also Try to add Dropout layers to the Neural network and check whether test prediction errors can be reduced further.

Useful functions

In [None]:
# Grab an image and locate the largest green object:
def getPos():
    """Return the averaged image position reported by ``ct.locate``.

    Frames are captured until ``ct.locate`` reports a detection (x is not
    None). Ten further frames are then captured and the detected positions
    are averaged; frames without a detection are skipped. If none of the ten
    frames contains a detection, the position from the first detection is
    used instead. The camera is released in every case.

    Returns:
        (X, Y): the mean x and y positions, rounded to the nearest integer.
    """
    cam = ct.prepare_camera()
    try:
        # Keep capturing until the object is detected for the first time.
        while True:
            img = ct.capture_image(cam)
            x, y = ct.locate(img)
            if x is not None:
                break
        first_x, first_y = x, y
        X,Y = [],[]
        for _ in range(10):
            img = ct.capture_image(cam)
            x, y = ct.locate(img)
            # A missed detection would otherwise put None into the mean below.
            if x is None or y is None:
                continue
            X.append(x)
            Y.append(y)
    finally:
        # Release the camera even if capturing or locating raised.
        cam.release()
    if not X:
        X, Y = [first_x], [first_y]
    X = np.rint(np.mean(np.asarray(X))).astype(int)
    Y = np.rint(np.mean(np.asarray(Y))).astype(int)
    return (X,Y)

# DETECTING THE OBJECT AT TESTING PHASE

In [None]:
#Here we implement the control loop which has the Neural Network as the controller.
#In this case we do not integrate the CMAC into the control loop
def ControlLoopWithNNWithoutCMAC(target__x_coordinate,target__y_coordinate,max_iterations=10,error_threshold=20):
    """Drive the Y motor towards an image target using only the neural network.

    In every iteration the image-space error (target minus current position)
    is normalised with the saved training statistics and fed to the network;
    the predicted increment is subtracted from the current Y angle and the
    result is sent to the robot.

    Args:
        target__x_coordinate: target x position in the camera image.
        target__y_coordinate: target y position in the camera image.
        max_iterations: maximum number of control steps (default 10, the
            value that was previously hard-coded).
        error_threshold: the target counts as reached when both |x error| and
            |y error| are below this value (default 20).

    Returns:
        The number of control steps that were executed.
    """
    number_of_iterations_for_convergence = 0
    #Initialize the neural network model and load the saved parameters
    # n_hidden1 / n_hidden2 are read from the notebook's global state.
    NN_model = NN(n_feature=2,n_hidden1=n_hidden1,n_hidden2=n_hidden2, n_output=1)
    NN_model.load_state_dict(torch.load('best_nn_model1.pth'))
    mean_training_inputs = torch.load('best_mean1.pth')
    std_training_inputs = torch.load('best_std1.pth')

    #Obtain the x and y coordinates of the green box placed on the end effector of the robot
    robot_current_X_pos, robot_current_Y_pos = getPos()
    for _ in range(max_iterations):
        print("Curr_pos: ",robot_current_X_pos, robot_current_Y_pos)
        x_coord_error = target__x_coordinate - robot_current_X_pos
        y_coord_error = target__y_coordinate - robot_current_Y_pos
        print("Error_y: ",y_coord_error)
        #If both errors are below the threshold we assume the robot has reached the target.
        if (np.abs(x_coord_error) < error_threshold and np.abs(y_coord_error) < error_threshold):
            print("Number of iterations for convergence = ", number_of_iterations_for_convergence)
            break

        # Normalize the input to the Neural network model using the mean and standard deviation of the training set inputs
        xy_input_nn_model = torch.tensor([x_coord_error,y_coord_error]).float()
        normalize_xy_input_nn_model = (xy_input_nn_model - mean_training_inputs)/std_training_inputs
        # Predict; no gradients are needed and the robot API gets a plain float instead of a tensor.
        with torch.no_grad():
            prediction_for_Y_pos_increment = NN_model(normalize_xy_input_nn_model)[0].item()
        print("Increment_y: ",prediction_for_Y_pos_increment)
        # Get current motorY angle
        currAngY = api.getPos('Y',moduleID)
        print("Curr_angleY: ",currAngY)
        # Set the next position of the robot using the setPos function of the fable.
        # NOTE(review): the increment is subtracted here, whereas ControlLoopWithBothNNandCMAC adds it -- confirm the sign.
        api.setPos(-90,currAngY - prediction_for_Y_pos_increment, moduleID)
        api.sleep(1.5)
        print("AngleY after: ",api.getPos('Y',moduleID))
        # Get current position of the robot in the camera frame
        robot_current_X_pos, robot_current_Y_pos = getPos()
        number_of_iterations_for_convergence = number_of_iterations_for_convergence + 1
    return number_of_iterations_for_convergence

In [None]:
#TODO: Detect the target object and obtain the coordinates of the object in the image
# NOTE(review): target_x / target_y are only assigned in the commented-out lines below, yet the
# control-loop call in the next cell reads them; on a fresh kernel that call fails unless they are defined.
api.setPos(-90,0,moduleID)
# cam = ct.prepare_camera()
# img = ct.capture_image(cam)
# ct.show_camera(cam)
# target_x, target_y = getPos()
# print(target_x,target_y)

In [None]:
#TODO: Call the control loop for a target which is detected. Record the number of iterations that the control loop spent for convergence.
# Each entry is the iteration count returned by one control-loop run.
iteretionList = []
# iteretionList.append(ControlLoopWithNNWithoutCMAC(torch.tensor(target_x).float(),torch.tensor(target_y).float()))
iteretionList.append(ControlLoopWithNNWithoutCMAC(target_x,target_y))

#TODO: change your target location and try again. You may change the target 4-5 times and check how the control loop works.
#Record the number of iterations that the control loop spent for convergence.

In [None]:
# Now integrate the CMAC to the previous control loop which had only the Neural Network. 
#The implementation of the CMAC can be found in code given for second week exercises.
#TODO: Implement the control loop with both neural network and CMAC. 
from cmac2 import CMAC
# Initialize CMAC
# Module-level CMAC instance with two inputs, each given the range 172..485.
# NOTE(review): the arguments are presumably (number of receptive fields, per-input minimum,
# per-input maximum, learning rate) -- confirm against the cmac2 module.
n_rfs = 3
xmin = [172,172]
xmax = [485,485]
cmac = CMAC(n_rfs, xmin, xmax, 0.1)
def ControlLoopWithBothNNandCMAC(target__x_coordinate,target__y_coordinate,max_iterations=50,error_threshold=20):
    """Drive the Y motor towards an image target using the neural network plus the CMAC.

    In every iteration the image-space error (target minus current position)
    is normalised with the saved training statistics and fed to the network.
    The module-level ``cmac`` is queried with the current and target y
    positions, is then trained with the network output, and the sum of both
    contributions is added to the current Y angle.

    The module-level ``cmac`` is used on purpose: its input range is given in
    the same units as the image coordinates passed to ``predict``, and what it
    learns is kept between calls. The previous code rebuilt a CMAC with a
    -pi..pi input range on every call.

    Args:
        target__x_coordinate: target x position in the camera image.
        target__y_coordinate: target y position in the camera image.
        max_iterations: maximum number of control steps (default 50, the
            value that was previously hard-coded).
        error_threshold: the target counts as reached when both |x error| and
            |y error| are below this value (default 20).

    Returns:
        The number of control steps that were executed.
    """
    number_of_iterations_for_convergence = 0

    # Initialize NN; n_hidden1 / n_hidden2 are read from the notebook's global state.
    # NOTE(review): ControlLoopWithNNWithoutCMAC loads 'best_nn_model1.pth', 'best_mean1.pth' and
    # 'best_std1.pth' -- confirm which checkpoint set is intended here.
    NN_model = NN(n_feature=2,n_hidden1=n_hidden1,n_hidden2=n_hidden2, n_output=1)
    NN_model.load_state_dict(torch.load('best_nn_model.pth'))
    mean_training_inputs = torch.load('best_mean.pth')
    std_training_inputs = torch.load('best_std.pth')
    # Get x and y coordinates of the green box placed on the end effector of the robot
    robot_current_X_pos, robot_current_Y_pos = getPos()

    for _ in range(max_iterations):
        print("Curr_pos: ",robot_current_X_pos, robot_current_Y_pos)
        x_coord_error = target__x_coordinate - robot_current_X_pos
        y_coord_error = target__y_coordinate - robot_current_Y_pos
        print("Error_y: ",y_coord_error)
        if (np.abs(x_coord_error) < error_threshold and np.abs(y_coord_error) < error_threshold):
            print("Number of iterations for convergence = ", number_of_iterations_for_convergence)
            break
        # The errors must be a float tensor: subtracting the mean tensor from a plain list fails.
        xy_input_nn_model = torch.tensor([x_coord_error,y_coord_error]).float()
        # Normalize the input to the Neural network model using the mean and standard deviation of the training set inputs.
        normalize_xy_input_nn_model = (xy_input_nn_model - mean_training_inputs)/std_training_inputs
        # No gradients are needed; the CMAC and the robot API get a plain float instead of a tensor.
        with torch.no_grad():
            deltaY_nn = NN_model(normalize_xy_input_nn_model)[0].item()
        print("DeltaY_nn: ",deltaY_nn)
        # Query the CMAC first, then train it with the network output (same order as before).
        deltaY_cmac = cmac.predict([robot_current_Y_pos, target__y_coordinate])
        print("DeltaY_cmac: ",deltaY_cmac)
        cmac.learn(deltaY_nn)
        # Get current motorY angle
        currAngY = api.getPos('Y',moduleID)
        print("Curr_angleY: ",currAngY)
        # Set new angleY
        # NOTE(review): the increment is added here, whereas ControlLoopWithNNWithoutCMAC subtracts it -- confirm the sign.
        robot_next_Y_pos = deltaY_nn + deltaY_cmac + currAngY
        api.setPos(-90,robot_next_Y_pos, moduleID)
        api.sleep(1.5)
        print("AngleY after: ",api.getPos('Y',moduleID))
        # Get current position of the robot in the camera frame
        robot_current_X_pos, robot_current_Y_pos = getPos()
        number_of_iterations_for_convergence = number_of_iterations_for_convergence + 1
    return number_of_iterations_for_convergence

In [None]:
#TODO: Compare the number of iteration it takes for convergence in the control loop with 
# neural network only and with both CMAC and neural network.