In [None]:
import socket
import numpy as np

import sys
sys.path.append('..') # Go back to base directory

from modules.differential_drive import DifferentialDrive
from modules.speech_recognizer import SpeechRecognizer
from modules.openai_llm import OpenAILLM

from modules.graph import *
from modules.viewer3d import *

In [14]:
# Robot class
class Robot:
    def __init__(
        self,
        SR=None,
        LLM=None,
        DDR=None
    ):
        self.SR=SR
        self.LLM=LLM
        self.DDR=DDR

In [None]:
SR = SpeechRecognizer(
    language="pt-BR",
    verbose=True
) 

system_role = """
You are an AI that generates commands for a differential drive robot based on the desired trajectory described in natural language. The commands you will generate follow this format:

Drive command: D d t, where d is the distance in meters and t is the time in seconds. Negative values of d will make the robot move backwards.
Turn command: T a t, where a is the angle in degrees and t is the time in seconds. Negative values of a will make the robot turn clockwise.
Circle command: C r a t, where r is the radius of the circle in meters (positive for left, negative for right), a is the angle in degrees of the arc, and t is the time in seconds.
    
Your response should include only the commands, with no additional explanations or text. Try to interpret the received message as best as you can. 

For example, if given the instruction 'make a square trajectory with a length of 1 meter', your output should be:

D 1.0 5.0
T 90.0 2.0
D 1.0 5.0
T 90.0 2.0
D 1.0 5.0
T 90.0 2.0
D 1.0 5.0
"""

LLM = OpenAILLM(
    verbose=True
)

DDR = DifferentialDrive(
    # Robot geometry
    r=0.04, # In m
    s=0.24, # In m
    h=0.04, # in m

    # Time parameters
    dt=50e-3
)

MyRobot = Robot(
    SR,
    LLM,
    DDR
)

Client successfuly created!


In [16]:
def commands_to_signals(
    DDR, 
    commands,
    verbose=False
):
    signals, time_vector = [], []

    # Parsing through lines
    for line in commands.splitlines():
        if not line:
            continue

        tokens = line.split()

        type, args = tokens[0], np.array(tokens[1:]).astype(np.float64)

        try:
            phi_dot_L_signal, phi_dot_R_signal = DDR.command[type](*args)

            signals.append(np.vstack((phi_dot_L_signal, phi_dot_R_signal)))

        except:
            if verbose: print("Command not recognized.")
            continue
    
    if signals:
        signals, time_vector = DDR.concatenate_signals(signals)

    return signals, time_vector

In [17]:
output, poses, positions = [], [], []
recognized_speech = ""

# GoPiGo3 Socket Data
GoPiGo3_address = (socket.gethostbyname("dex"), 25565)

GoPiGo3_socket = socket.socket(
    socket.AF_INET,
    socket.SOCK_DGRAM
)

while not len(output):
    recognized_speech = input("Natural Language Command: ")

    if not recognized_speech:
        continue

    else: 
        print(f"Natural Language Command: {recognized_speech}" )

    commands = MyRobot.LLM.chat_completion(
        system_role=system_role,
        prompt=recognized_speech,
        max_tokens=100
    )

    print("Commands:")
    print(commands)

    GoPiGo3_socket.sendto(commands.encode(), GoPiGo3_address)

    signals, time_vector = commands_to_signals(
        DDR=MyRobot.DDR, 
        commands=commands, 
        verbose=True)

    if not len(signals):
        continue

    phi_dot_L_signal, phi_dot_R_signal = signals
    
    print("Computing solution...")
    for phi_dot_L, phi_dot_R in zip(phi_dot_L_signal, phi_dot_R_signal):
        MyRobot.DDR.kinematic_model(phi_dot_L, phi_dot_R)
        
        output.append(MyRobot.DDR.get_output())
        positions.append(MyRobot.DDR.get_position())
        poses.append(MyRobot.DDR.get_pose())

    if not len(output):
        continue
    
    output = np.hstack(output)
    trajectory = np.hstack(positions)


Natural Language Command: Ande 50 cm
Sending message...
Received response.
Commands:
D 0.5 2.0
Computing solution...


In [18]:
# Create the Scene Viewer
scene = Viewer3D(title=f"Requested:\n'{recognized_speech}'", 
                 size=4) # In m

# Add inertial reference
scene.add_frame(
    transformation=np.eye(4), 
    name="Inertial Reference", 
    axis_size=5
)

# Add robot trajectory 
scene.add_points(points=trajectory, 
                 name='DDR',
                 color=time_vector,
                 colorscale='plasma',
                 range=[time_vector[0], time_vector[-1] + 0.5],
                 colorbar="Time (s)")

# Display poses
n_poses = 5
for pose in poses[::len(poses) // n_poses]:
    # Add poses
    scene.add_frame(
        transformation=pose, 
        axis_size=0.1,
        color='black'
    )

scene.figure.show()