Imports

In [1]:
import math
import time
import sys
import ast
import openai
import tdmclient.notebook
import tdmclient.tools
from threading import Thread, Lock
from object_detection import detect_objects, detection
from copy import deepcopy

Connect Robot, Thymio Suite must be open and thymio must be connected to pc!

In [2]:
await tdmclient.notebook.start()

## Code that will run on the Thymio

In [3]:
%%run_python

# State 0, detect foward colisions and stop
# State 1, detect backwards colisions and stop
# State 2, Tracking or pushing object - colisions are ignored

state = 0

@onevent
def move(distance):
    global motor_left_target, motor_right_target, timer_period, state
    if state == 2:
        state = 0
    if distance > 0:
        timer_period[0] = 160 * distance
        motor_left_target = 200
        motor_right_target = 200
    if distance < 0:
        timer_period[0] = 160 * -distance
        motor_left_target = -200
        motor_right_target = -200
    if distance == 0:
        timer_period[0] = 100
    emit("motion", 1)

@onevent
def rotate(angle):
    global motor_left_target, motor_right_target, timer_period, state
    if state == 2:
        state = 0
    if angle > 0:
        timer_period[0] = angle * 26
        motor_left_target = -100
        motor_right_target = 100
    if angle < 0:
        timer_period[0] = -angle * 26
        motor_left_target = 100
        motor_right_target = -100
    if angle == 0:
        timer_period[0] = 100
    emit("motion", 1)


@onevent
def prox():
    global motor_left_target, motor_right_target, prox_horizontal, state
    #if state == 0:
    #    for i in range(5):
    #        if prox_horizontal[i] > 2000:
    #            state = 1
    #            timer_period[0] = 10000
    #            motor_left_target = 0
    #            motor_right_target = 0
    #            emit("problem", 0)
    if state == 2:
        motor_right_target = (prox_horizontal[0] + prox_horizontal[1]) // 20
        motor_left_target = (prox_horizontal[3] + prox_horizontal[4]) // 20


@onevent
def turn(right_speed, left_speed, timer):
    global motor_left_target, motor_right_target, timer_period, state
    if state == 2:
        state = 0
    motor_left_target = left_speed
    motor_right_target = right_speed
    if timer == 0:
        timer_period[0] = 100
    else:
        timer_period[0] = timer
    emit("motion", 1)


@onevent
def push(timer):
    global state
    state = 2
    timer_period[0] = timer * 1000

@onevent
def timer0():
    global motor_left_target, motor_right_target, state
    if state == 0 or state == 2 or state == 3:
        timer_period[0] = 0
        motor_left_target = 0
        motor_right_target = 0
        emit("done", 0)
        state = 0
    if state == 1:
        state = 0
    
    
@onevent
def tracking():
    global state
    state = 3
    
    
@onevent
def senddone():
    global state
    emit("done",0)


## Code to run on computer

Controllers and logic auxiliar to the main loop

In [4]:
def detectUniqueObject(detection, mutex_detection, shared_obj_arr: list[str], mutex_arr):
    """Detects unique objects from the detected objects, and saves them in shared memory

    Args:
        detection: The detected objects.
        mutex_detection: A mutex for accessing the detected objects.
        shared_obj_arr (list[str]): A shared array to hold the unique objects.
        mutex_arr: A mutex for accessing the shared array.
    """
    
    while True:
         
        mutex_detection.acquire()
        obj_detect = deepcopy(detection[0])
        mutex_detection.release()
        
        if obj_detect is not None:
            for cat in obj_detect.categories:
                mutex_arr.acquire()
                if cat[0] not in shared_obj_arr[0]:
                    shared_obj_arr[0].append(cat[0])
                mutex_arr.release()
                
        time.sleep(0.05)

def check_for_object(detection, mutex, category):
    """Checks for the presence of a specific object in the detected objects.

    Args:
        detection: The detected objects.
        mutex: A mutex for accessing the detected objects.
        category: The category of the object to check.

    Returns:
        bool: True if the object is detected, False otherwise.
    """

    for _ in range(20):
        mutex.acquire()
        obj = detection[0]
        mutex.release()
        if obj is not None:
            for cat in obj.categories:
                if cat[0] == category:
                    return True
        time.sleep(0.1)            
    
    return False


class PID:
    """A class that implements a simple PID controller.

    Attributes:
        kp (float): The proportional gain.
        ki (float): The integral gain.
        kd (float): The derivative gain.
        integral (float): The integral term.
        last_value (float): The last controlled value.
        last_error (float): The last error value.
        last_update (int): The timestamp of the last update.
        max_integral (int): The maximum allowable integral term.
    """
    
    def __init__(self, kp, ki, kd) -> None:
        self.kp = kp
        self.ki = ki
        self.kd = kd
        
        self.integral = 0
        self.last_value = 0
        self.last_error = 0
        self.last_update = time.time_ns()
        self.max_integral = 100
        
    def compute(self, setpoint, value):
        """Computes the PID control output.

        Args:
            setpoint (float): The desired setpoint.
            value (float): The current value.

        Returns:
            float: The control output.
        """
        
        now = time.time()
        dt = now - self.last_update
        
        # compute error
        error = setpoint - value
        
        # compute integral and derivative terms
        self.integral += error * dt
        if self.integral > self.max_integral:
            self.integral = self.max_integral
            
        derivative = (error - self.last_error) / dt
        
        output = self.kp * error + self.ki * self.integral + self.kd * derivative
        
        self.last_error = error
        self.last_update = now
        
        return output

def calculate_turn(radius, angle):
    """Calculates the turn of the robot based on the radius and angle.

    Args:
        radius (float): The radius of the turn.
        angle (float): The angle of the turn in degrees.

    Returns:
        tuple: The calculated motor target speeds and the expected duration of the turn.
    """
    wheel_distance = 10.1
    velocity = 5.8
    
    radian_angle = math.radians(angle)  # Convert angle from degrees to radians
    arc_length = radius * abs(radian_angle)  # Calculate the arc length the robot needs to travel

    # Calculate the target velocities for each wheel
    if angle == 0:
        motor_left_target = motor_right_target = velocity
    else:
        motor_right_target = velocity * (1 + wheel_distance / (2 * radius)) if angle > 0 else velocity * (1 - wheel_distance / (2 * radius))
        motor_left_target = velocity * (1 - wheel_distance / (2 * radius)) if angle > 0 else velocity * (1 + wheel_distance / (2 * radius))

    return to_int(motor_left_target * 100/3.2), to_int(motor_right_target * 100/3.2), abs(to_int((arc_length / velocity) * 1000))


def get_expected_time(event_name, args):
    """Gets the expected time for an event based on its name and arguments.

    Args:
        event_name (str): The name of the event.
        args (list): The arguments of the event.

    Returns:
        float: The expected time for the event.
    """
    match event_name:
        case "move":
            return 0.16 * args[0]
        case "rotate":
            return 0.026 * args[0]

def clamp(value, min_value, max_value):
    return max(min(value, max_value), min_value)

# Event handling exceptions

class Done(Exception):
    """An exception that is raised when a task is done."""
    pass

class Problem(Exception):
    """An exception that is raised when a problem occurs."""
    pass

class EventTimeout(Exception):
    """An exception that is raised when an event times out."""
    pass

class ObjectNotFound(Exception):
    """An exception that is raised when an object is not found."""
    pass

# In case the robot fails to receive an event (happens alot :( ) raise this expection
def timeout_handler(sig, NULL):
    """A handler function for a timeout signal.

    Args:
        sig: The signal number.
        NULL: Not used.

    Raises:
        EventTimeout: An exception indicating that a timeout has occurred.
    """
    raise EventTimeout

def parse_input(task_str):
    """Parses an input string into a list of actions.

    Args:
        task_str (str): The input string.

    Returns:
        list: A list of actions.
    """
    actions = ast.literal_eval(task_str)
    return actions

def to_int(str):
    """Converts a string to an integer, rounding it if necessary.

    Args:
        str (str): The string to convert.

    Returns:
        int: The converted integer.
    """
    return int(round(float(str)))
    

Unchecked Raise(exc=Name(id='EventTimeout', ctx=Load()))


ChatGPT Integration

In [5]:
class ChatApp:
    """A class to represent a chat interface for robot navigation.
    
    If manual mode is enabled, responses must be copy pasted into input.

    Attributes:
        OBJECT_NOT_FOUND (str): A string to show when an object was not found.
        PROBLEM (str): A string to show when a problem is encountered.
        messages (list): A list to hold the chat messages.
        manual_mode (bool): A boolean to decide if manual mode is enabled or not.
    """
    
    OBJECT_NOT_FOUND = """The object you were trying to follow was not in front of you.
    Please give the robot new movement instructions in the form of a list. Include the previous
    follow instruction at the end please.
    """
    
    PROBLEM = """The robot encountered an obstacle in front of it.
    Please give the robot new movement instructions to avoid the obstacle in front of it.
    
    You can give a list of actions for the robot to perform, for example:
    [("turn",10,90),("rotate",45),("move",-10)]
    
    The robot will resume the previous actions afterwards.
    """
    
    
    messages = []
    
    manual_mode = False
    
    
    def __init__(self, manual = False):
        with open("apikey", "r") as file:
            key = file.read()
        openai.api_key = key
        
        if manual:
            self.manual_mode = True
    
    def begin(self):
        """Starts a chat session and gets a response from the chatbot based on the provided message.

        Returns:
            str: The response from the chatbot.
        """
        
        message = [
        {"role": "user", "content": 
         """Imagine you are a two wheeled robot with a wheelabase of 10 cm that can move in a XY plane,
          you move through a differential drive system. The main functions you can use
          to move arround are:
          move(x: int): Moves the robot linearly either x cm to the front or x cm backwards.
          rotate(angle: int): Rotates the robot by a number of degrees without changing the XY position. Positive angles are to the right.
          turn(radius where radius > 5, angle): Performs a turn with the given radius and with the correspoding arc to the angle. Radius cannot be negative and must greater than 5. Positive angles are right turns
          follow(object): Follows the object if it is in front of the robot.
          push(time: int): Pushes an object directly in front of the robot. Time is given in seconds.
          whereiam(none): Asks ChatGPT where the robot is based on a list of unique objects found, must include none as an argument
          
          You can give a list of actions for the robot to perform, for example:
          [("turn",10,90),("rotate",45),("move",-10)]

          Only respond in these lists

          Here is your task:

          """  + input() + """\nONLY RESPOND IN LIST OF TASKS""",
          }
        ]
        
        if self.manual_mode:
            reponse = input("Please insert ChatGPT response here")
            return reponse
        else:
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages= message
            )

            self.messages.append(message[0])
            self.messages.append({"role": "assistant", "content": response["choices"][0]["message"].content})

            print(response["choices"][0]["message"].content)

            return response["choices"][0]["message"].content
    
    def __return_nice_list(self, objects):
        """Formats a list of objects into a string.

        Args:
            objects (list): A list of objects to format.

        Returns:
            str: A formatted string of the objects.
        """

        objs = ""
        for o in objects:
            objs += str(o) + ", "
        return str(objs)

    def ask_where(self, list_objects):
        """Asks the chatbot about the possible location of the robot based on a list of found objects.

        Args:
            list_objects (list): A list of objects found by the robot.
        """
        
        ASK_WHERE = [
        {"role": "user", "content": 
         f"""Consider that a small two wheeled robot just did a patrol arround a room, and
          the following unique objects were found in the room:
          
          {self. __return_nice_list(list_objects)}

          In what kind of room or place could the robot be in?""",
          }
        ]        
        
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages= ASK_WHERE
        )

        
        print(response["choices"][0]["message"].content)


        
    def chat(self, message):
        """Sends a message to the chatbot and gets a response.

        Args:
            message (str): The message to send to the chatbot.

        Returns:
            str: The response from the chatbot.
        """

        if self.manual_mode:
            reponse = input("Please insert ChatGPT response here")
            return reponse
        else:
            self.messages.append({"role": "user", "content": message})
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=self.messages
            )
            self.messages.append({"role": "assistant", "content": response["choices"][0]["message"].content})
            return response["choices"][0]["message"].content

Robot main logic

In [6]:
import signal

@tdmclient.notebook.sync_var
def update_speed(left, right):
    """Updates the speed of the robot's motors.
    
    Args:
        left (int): Desired speed of the left motor.
        right (int): Desired speed of the right motor.
    """
    set_var(motor_left_target = left, motor_right_target = right)

@tdmclient.notebook.sync_var
def get_current_speed():
    """Gets the current speed of the robot's motors.
    
    Returns:
        tuple: Current speeds of left and right motors.
    """
    return motor_left_target, motor_right_target

@tdmclient.notebook.sync_var
def get_prox():
    """Gets the robot's current proximity reading.
    
    Returns:
        int: Current proximity reading of the robot.
    """
    return prox_horizontal

@tdmclient.notebook.sync_var
def set_timer(time, i=0):
    """Sets a timer for a certain amount of time.
    
    Args:
        time (int): Time (in seconds) to set the timer for.
        i (int, optional): A parameter that is not used in this function.
    """
    timer_period = time * 1000
    set_var(timer_period = timer_period)


# Initialize global variables
DETECTED_OBJECTS = [None]
MUTEX = Lock()
UNIQUE_OBJECTS = [[]]
MUTEX_ARR = Lock()
RobotGPT = ChatApp()

@tdmclient.notebook.sync_var
def object_tracking(object_category: str):
    """Performs object tracking based on object_category.

    Args:
        object_category (str): Category of object to track.
    """

    # Max turn speed
    max_speed = 50
    min_speed = -50

    # PID Controller
    pid = PID(0.1,0,0.1)
    
    # Event timeout 
    timer = 0
    
    # Going towards an object flag
    towards = False

    while True:
        MUTEX.acquire()
        detect_ref = DETECTED_OBJECTS[0]
        if detect_ref is not None:
            detect = deepcopy(detect_ref)
        else:
            detect = None
        DETECTED_OBJECTS[0] = None
        MUTEX.release()
        
        left_speed, right_speed = get_current_speed()                     
        
        if detect is not None:
            # Get the current object position (x, y) from the camera
            object_x, x_left, x_right = detect.get_position_of_object(object_category)
            if object_x is not None:
                x_size = x_right - x_left
                obj_size = x_size / 480

                # Apply proportional control
                speed_adjustment = pid.compute((480//2 - 65), object_x)
            
                if abs(speed_adjustment) < 5 + 10 * obj_size and not towards:
                    towards = True
                    update_speed(200,200)
                    time.sleep(0.1)
                elif towards:
                    if abs(speed_adjustment) > 5 + 10 * obj_size:
                        towards = False
                        update_speed(0,0)
                        time.sleep(0.1)
                    else:
                        time.sleep(0.1)
                else:                
                    # Update the wheel speeds based on the control algorithm
                    new_left_speed = to_int(clamp(left_speed - speed_adjustment, min_speed, max_speed))
                    new_right_speed = to_int(clamp(right_speed + speed_adjustment, min_speed, max_speed))

                    update_speed(new_left_speed, new_right_speed)
                    
                    time.sleep(0.1)
                timer = 0
            else:
                timer += 1
                if timer == 25:
                    towards = False
                    timer = 0
                    update_speed(0,0)
                    break
                time.sleep(0.1)
        else:
            timer += 1
            if timer == 25:
                towards = False
                timer = 0
                update_speed(0,0)
                break
            time.sleep(0.1)
        # Sleep for a while to let the robot move and adjust its position
    return "Done"
        

def pop_and_send(event_list, past_events) -> bool:
    """Sends event commands to the robot, pops events off of the list, 
    and handles events based on their names.

    Args:
        event_list (list): List of events to be processed.
        past_events (list): List of events that have been processed.

    Returns:
        bool: True if there are events left in the list, False otherwise.
    """
    
    if len(event_list) > 0:
        event = event_list.pop(0)
        past_events.append(event)
        event_name = event[0]
        print(event_name)
        match event_name:
            case "turn":
                right_speed, left_speed, timer = calculate_turn(event[1], event[2])
                expected_time = timer / 1000
                print(right_speed, left_speed, timer)
                send_event("turn", right_speed, left_speed, timer)
                print("Sent turn event")
            case "move":
                expected_time = get_expected_time(event[0], event[1:])
                send_event(event[0], to_int(event[1]))
                print("Sent move event")
            case "rotate":
                expected_time = get_expected_time(event[0], event[1:])
                send_event(event[0], to_int(event[1]))
                print("Sent rotate event")
            case "follow":
                expected_time = 0
                object_name = event[1]
                if check_for_object(DETECTED_OBJECTS, MUTEX, object_name):
                    send_event("tracking")
                    object_tracking(object_name)
                    send_event("senddone")
                else:
                    raise ObjectNotFound
            case "whereiam":
                expected_time = 0
                MUTEX_ARR.acquire()
                un_objects = UNIQUE_OBJECTS[0]
                MUTEX_ARR.release()
                print(un_objects)
                print(RobotGPT.ask_where(un_objects))
                send_event("senddone")
            case "push":
                expected_time = event[1] * 100
                send_event("push", event[1])
                print("Sent push event")
            case _:
                expected_time = get_expected_time(event[0], event[1:])
                send_event(event[0], *event[1:])
                print("Sent event")
        # Set an alarm with the aproximate time the robot will take to complete
        # the action plus a 1 second delay
        if expected_time != 0:
            signal.alarm(int(expected_time + 2))
        return True
    else:
        return False

def execute_actions(actions): 
    """Executes a series of actions. 

    Args:
        actions (list): List of actions to be executed.

    Returns:
        tuple: Status message and any remaining actions.
    """
    
    # Define handler for the alarm
    signal.signal(signal.SIGALRM, timeout_handler)
    event_list = actions.copy()
    past_events = []

    def on_event_data(node, event_name):
        """Handles event data received from the robot.

        Args:
            node (str): The node from which the event data was received.
            event_name (str): The name of the event.

        Raises:
            Done: An exception indicating that all actions have been completed.
            Problem: An exception indicating that the robot encountered a problem.
            Exception: An exception for unknown events.
        """
        data = get_event_data(event_name)
        if event_name == "done":
            clear_event_data(event_name)
            success = pop_and_send(event_list, past_events)
            if not success:
                signal.alarm(0)  # Cancel the alarm
                raise Done
        elif event_name == "problem":
            signal.alarm(0)
            clear_event_data(event_name)
            raise Problem
        elif event_name == "motion":
            clear_event_data(event_name)
        else:
            raise Exception("Unknown event")
    try:
        pop_and_send(event_list, past_events) 
        tdmclient.notebook.process_events(all_nodes=False, on_event_data=on_event_data)
    except Done:
        print("Robot task completed successfully")
        return "done", None
    except Problem:
        print("Robot encounted a problem")
        return "problem", event_list
    except EventTimeout:
        print("Missed event, resending")
        event_list.insert(0, past_events[-1])
        return execute_actions(event_list)
    except ObjectNotFound:
        print("Couldn't find object, asking ChatGPT for directions")
        response = RobotGPT.chat(RobotGPT.OBJECT_NOT_FOUND)
        print(response)
        new_actions = parse_input(response)
        return execute_actions(new_actions)


@tdmclient.notebook.sync_var
def main(response=None):
    """
    Manages TensorFlow object detection and performs robot actions.
    """
    
    # Initiate thread that will receive tensorflow detections
    tensor_flow_client = Thread(target=detect_objects, args=(DETECTED_OBJECTS, MUTEX))
    # Initiate thread that will save unique objects found
    unique_object_detection = Thread(target=detectUniqueObject, args=(DETECTED_OBJECTS, MUTEX, UNIQUE_OBJECTS, MUTEX_ARR))
    try:
        tensor_flow_client.start()
        unique_object_detection.start()
        quit = False
        if response is None:
            tasks = [parse_input(input("Please input a list of actions for the robot to perform"))]
        else:
            chatgpt_response = RobotGPT.begin()
            tasks = [parse_input(chatgpt_response)]
        while not quit:
            if len(tasks) > 0:
                response, left_over_events = execute_actions(tasks.pop())
                match response:
                    case "done":
                        if len(tasks) == 0:
                            quit = True
                    case "problem":
                        if left_over_events is not None:
                            tasks.append(left_over_events)
                        response = RobotGPT.chat(RobotGPT.PROBLEM)
                        print(response)
                        if len(tasks) > 2:
                            tasks.pop()
                        tasks.append(parse_input(response))
                        
            else:
                break
    except KeyboardInterrupt:
        sys.exit()

Unchecked Raise(exc=Name(id='ObjectNotFound', ctx=Load()))
Unchecked MatchAs()


Stop the robot

In [7]:
update_speed(0,0)

Initiate robot

In [8]:
main(1)

trying to connect
sucessfully connected
[("follow", "bottle"), ("push", 5), ("turn", 10, 180), ("whereiam", "none")]
follow
push
Sent push event
turn
90 273 5417
Sent turn event
whereiam
['bottle', 'dining table', 'vase', 'bed']
Connection error
trying to connect
sucessfully connected
Missed event, resending
whereiam
['bottle', 'dining table', 'vase', 'bed']
It is difficult to determine the type of room or place the robot is in based solely on the objects it found during its patrol. However, based on the objects listed, it is likely that the robot is in a bedroom or a living room of a house or apartment.
None
Robot task completed successfully


Connection error
trying to connect
Connection not sucessfull
trying to connect
Connection not sucessfull
trying to connect
sucessfully connected
