# Importing Necessary Libraries and Modules

In this first cell, we import all the required libraries and modules for the project. 
This includes the following:
- **`crewai` framework**: For defining agents, tasks, and tools in our robotic system.
- **`requests` library**: To handle REST API calls.
- **`pydantic`**: For data validation.
- **`dotenv`**: To load environment variables securely.
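- **`os` and `time.sleep`**: Standard-library helpers for reading environment variables and pausing between commands.
- **`warnings`**: To suppress warnings for cleaner output during video recording.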

These libraries will enable us to program, execute, and control the robotic servos.

In [11]:
from crewai.tools import BaseTool
from crewai import Agent, Task, Crew
import requests
from time import sleep
from pydantic import Field
from dotenv import load_dotenv
import os

# Warning control
import warnings
warnings.filterwarnings('ignore')

# Defining the Servo Control Tool

In this cell, we define the `ServoControlTool`, a custom tool designed to control servo motors using REST services hosted on an ESP32 device. Here's what the tool can do:
- **Move individual servos**: Specify the servo and the target angle, ensuring it's within the allowed range.
- **Perform predefined actions**: Execute actions like "blink" or "double blink".
- **Execute complex sequences**: Handle lists of commands, including servo movements, actions, and pauses.

The tool includes:
1. **Validation of servo names and angles**: Ensures commands won't cause errors or unsafe movements.
2. **Error handling**: Manages network issues and invalid inputs gracefully.
3. **Synchronous and asynchronous entry points**: `_run` executes the commands; `_arun` is an async wrapper that delegates to `_run`, so it still blocks while the commands run.

The servo ranges are predefined, and the tool interacts with the ESP32 via HTTP requests.
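
As a rough illustration of those HTTP requests, the sketch below builds the URLs the tool ends up requesting, using only the standard library instead of `requests`. The helper names `build_move_url` and `build_action_url` are hypothetical and not part of the notebook; the `mover` path, the query parameters, and the space-to-underscore rule are taken from the `ServoControlTool` code in the next cell.

```python
from urllib.parse import urlencode

BASE_URL = "http://192.168.1.147/"  # default base_url used by ServoControlTool


def build_move_url(servo_name: str, angle: int, base_url: str = BASE_URL) -> str:
    """Hypothetical helper: URL requested when moving one servo."""
    query = urlencode({"servo": servo_name, "angle": angle})
    return f"{base_url}mover?{query}"


def build_action_url(action: str, base_url: str = BASE_URL) -> str:
    """Hypothetical helper: URL requested for a predefined action."""
    return f"{base_url}{action.replace(' ', '_')}"


print(build_move_url("Xarm", 90))        # http://192.168.1.147/mover?servo=Xarm&angle=90
print(build_action_url("double blink"))  # http://192.168.1.147/double_blink
```

Pasting such a URL into a browser on the same network is a quick way to check that the ESP32 responds before involving any agent.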

In [12]:


class ServoControlTool(BaseTool):
    name: str = "servo_control_tool"
    description: str = (
        "Tool to control servo motors and perform predefined actions via HTTP requests. "
        "Specify the servo name and angle, or send an action command like 'blink' or 'double blink'. "
        "Can also process a list of commands, including pauses."
    )
    base_url: str = Field(default="http://192.168.1.147/", description="Base URL of the ESP32.")
    
    servo_ranges: dict = Field(
        default={
            "top_left": (55, 100),
            "bottom_left": (55, 100),
            "top_right": (55, 105),
            "bottom_right": (65, 100),
            "Xarm": (40, 140),
            "Yarm": (60, 115),
        },
        description="Dictionary defining servo names and their movement ranges.",
    )

    def move_servo(self, servo_name: str, angle: int) -> str:
        """
        Move a servo to the specified angle.

        Args:
            servo_name (str): Name of the servo to move.
            angle (int): Target angle for the servo.

        Returns:
            str: Status message about the result of the operation.
        """
        if servo_name not in self.servo_ranges:
            return f"Error: Servo '{servo_name}' not recognized."

        min_angle, max_angle = self.servo_ranges[servo_name]
        if angle < min_angle or angle > max_angle:
            return (
                f"Error: Angle {angle} is out of range for servo '{servo_name}' "
                f"(allowed range: {min_angle}-{max_angle})."
            )

        try:
            # timeout (5 s, an assumed value) keeps an unreachable ESP32 from blocking forever
            response = requests.get(
                f"{self.base_url}mover",
                params={"servo": servo_name, "angle": angle},
                timeout=5,
            )
            if response.status_code == 200:
                return f"Servo '{servo_name}' successfully moved to {angle} degrees."
            else:
                return f"Error moving servo '{servo_name}': HTTP {response.status_code}."
        except requests.exceptions.RequestException as e:
            return f"Connection error while moving servo '{servo_name}': {e}"

    def perform_action(self, action: str) -> str:
        """
        Perform a predefined action like 'blink' or 'double blink'.

        Args:
            action (str): Name of the action to perform.

        Returns:
            str: Status message about the result of the operation.
        """
        valid_actions = ["blink", "double blink"]
        if action not in valid_actions:
            return f"Error: Action '{action}' not recognized. Valid actions: {valid_actions}."

        # Direct endpoint for actions
        try:
            # timeout (5 s, an assumed value) keeps an unreachable ESP32 from blocking forever
            response = requests.get(f"{self.base_url}{action.replace(' ', '_')}", timeout=5)
            if response.status_code == 200:
                return f"Action '{action}' successfully performed."
            else:
                return f"Error performing action '{action}': HTTP {response.status_code}."
        except requests.exceptions.RequestException as e:
            return f"Connection error while performing action '{action}': {e}"

    def execute_commands(self, commands: list) -> str:
        """
        Execute a list of commands, including servo movements, actions, and pauses.

        Args:
            commands (list): List of commands. Each command is a dictionary with keys:
                - "type": "servo", "action", or "pause"
                - Additional keys depending on the type

        Returns:
            str: Combined status messages of all executed commands.
        """
        results = []
        for command in commands:
            # .get() avoids a KeyError when a command has no "type" key
            command_type = command.get("type")
            if command_type == "servo":
                servo_name = command.get("servo_name")
                angle = command.get("angle")
                if servo_name and angle is not None:
                    results.append(self.move_servo(servo_name, angle))
                else:
                    results.append("Error: Missing 'servo_name' or 'angle' in servo command.")

            elif command_type == "action":
                action = command.get("action")
                if action:
                    results.append(self.perform_action(action))
                else:
                    results.append("Error: Missing 'action' in action command.")

            elif command_type == "pause":
                duration = command.get("duration")
                if duration is not None:
                    sleep(duration)
                    results.append(f"Paused for {duration} seconds.")
                else:
                    results.append("Error: Missing 'duration' in pause command.")

            else:
                results.append(f"Error: Unknown command type '{command_type}'.")
            sleep(0.1)  # brief gap between consecutive commands

        return "\n".join(results)

    def _run(self, commands: list = None) -> str:
        """
        Synchronously execute a list of commands.

        Args:
            commands (list): List of commands to execute.

        Returns:
            str: Status message about the result of the operation.
        """
        if commands:
            return self.execute_commands(commands)
        else:
            return "Error: No commands provided."

    async def _arun(self, commands: list = None) -> str:
        """
        Asynchronously execute a list of commands.

        Args:
            commands (list): List of commands to execute.

        Returns:
            str: Status message about the result of the operation.
        """
        return self._run(commands)
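
The command format accepted by `execute_commands` is easier to see with a concrete list. The sketch below is a hypothetical dry-run checker (`check_commands` is not part of the notebook) that applies the same kind of rules as the tool, namely known servo name, angle within range, and known action, without sending any HTTP request. The two servo ranges are copied from the class above; the status strings are illustrative and differ from the tool's own messages.

```python
SERVO_RANGES = {"Xarm": (40, 140), "Yarm": (60, 115)}  # subset of servo_ranges above
VALID_ACTIONS = ["blink", "double blink"]


def check_commands(commands: list) -> list:
    """Hypothetical dry run: one status string per command, no hardware needed."""
    results = []
    for command in commands:
        kind = command.get("type")
        if kind == "servo":
            name, angle = command.get("servo_name"), command.get("angle")
            if name not in SERVO_RANGES:
                results.append(f"Error: unknown servo '{name}'")
                continue
            low, high = SERVO_RANGES[name]
            if angle is None or not low <= angle <= high:
                results.append(f"Error: angle {angle} outside {low}-{high} for '{name}'")
            else:
                results.append(f"OK: move '{name}' to {angle}")
        elif kind == "action":
            action = command.get("action")
            if action in VALID_ACTIONS:
                results.append(f"OK: action '{action}'")
            else:
                results.append(f"Error: unknown action '{action}'")
        elif kind == "pause":
            duration = command.get("duration")
            if duration is not None:
                results.append(f"OK: pause {duration}s")
            else:
                results.append("Error: missing duration")
        else:
            results.append(f"Error: unknown command type '{kind}'")
    return results


commands = [
    {"type": "servo", "servo_name": "Xarm", "angle": 90},
    {"type": "action", "action": "blink"},
    {"type": "pause", "duration": 0.5},
    {"type": "servo", "servo_name": "Yarm", "angle": 150},  # out of range on purpose
]
for line in check_commands(commands):
    print(line)
```

The same `commands` list could be passed to `ServoControlTool()._run(commands)` once the ESP32 is reachable; the last entry would then be rejected by the range check in `move_servo`.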


### Defining the Text-to-Morse Tool

In this cell, we define the **TextToMorseTool**, a custom tool designed to convert plain text into **Morse code**. This tool translates letters, numbers, and common punctuation into their Morse equivalents, enabling communication in Morse format.  

#### **Capabilities of the Tool**
- **Letter and number translation**: Converts each character (A-Z, 0-9) into its corresponding Morse representation.
- **Support for basic punctuation**: Translates `.` and `,`; any other punctuation falls back to the placeholder described below.
- **Handles spaces**: Replaces spaces with `/` to separate words in Morse code.
- **Graceful fallback for unsupported characters**: Uses a placeholder (`?`) for any unrecognized characters.

#### **Key Features**
- **Predefined Morse dictionary**: Ensures accurate and standardized translation.
- **Efficient text processing**: Iterates through the input text and dynamically generates the Morse code output.
- **Lightweight and adaptable**: Can be used as a standalone tool or integrated into larger multi-agent systems.

This tool provides a simple yet effective way to encode messages into Morse code, allowing seamless integration into systems requiring encoded communication or legacy signal processing.


In [13]:
from typing import ClassVar
from crewai.tools import BaseTool

class TextToMorseTool(BaseTool):
    name: str = "text_to_morse_tool"
    description: str = (
        "Tool to translate plain text into Morse code. "
        "Each letter, number, and common punctuation is converted into its Morse equivalent."
    )
    
    morse_dict: ClassVar[dict] = {
        'A': '.-', 'B': '-...', 'C': '-.-.', 'D': '-..', 'E': '.', 'F': '..-.', 'G': '--.',
        'H': '....', 'I': '..', 'J': '.---', 'K': '-.-', 'L': '.-..', 'M': '--', 'N': '-.',
        'O': '---', 'P': '.--.', 'Q': '--.-', 'R': '.-.', 'S': '...', 'T': '-', 'U': '..-',
        'V': '...-', 'W': '.--', 'X': '-..-', 'Y': '-.--', 'Z': '--..', 
        '1': '.----', '2': '..---', '3': '...--', '4': '....-', '5': '.....',
        '6': '-....', '7': '--...', '8': '---..', '9': '----.', '0': '-----', 
        ' ': '/', '.': '.-.-.-', ',': '--..--'
    }
    
    def _run(self, text: str) -> str:
        morse = []
        for char in text.upper():
            if char in self.morse_dict:
                morse.append(self.morse_dict[char])
            else:
                morse.append('?')  # Placeholder for unsupported characters
        return ' '.join(morse)
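
To try the translation without CrewAI, the `_run` logic can be mirrored in a plain function. This is a minimal sketch: `text_to_morse` is a hypothetical name, and the dictionary is abbreviated to the characters used in the examples.

```python
# Abbreviated copy of morse_dict: only the entries these examples need
MORSE = {"B": "-...", "Y": "-.--", "E": ".", "H": "....", "I": "..", " ": "/"}


def text_to_morse(text: str) -> str:
    """Hypothetical stand-alone version of TextToMorseTool._run."""
    return " ".join(MORSE.get(char, "?") for char in text.upper())


print(text_to_morse("Bye"))     # -... -.-- .
print(text_to_morse("Hi Bye"))  # .... .. / -... -.-- .
print(text_to_morse("Hi!"))     # .... .. ?
```

Because symbols are joined with single spaces, a word break appears in the output as a slash with a space on each side.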


### Defining the Morse-to-Blink Tool

In this cell, we define the **MorseToBlinkTool**, a custom tool designed to **convert Morse code into servo commands**, enabling visual communication through blinking sequences. This tool translates dots, dashes, and pauses into predefined servo actions.

#### **Capabilities of the Tool**
- **Morse-to-servo translation**: Maps each Morse symbol to a corresponding servo action.
- **Dot (`.`) → Blink**: Represents a short flash using a single blink.
- **Dash (`-`) → Double Blink**: Represents a longer flash using a double blink.
- **Space (` `) → Short Pause**: Inserts a **0.6s pause** between letters.
- **Slash (`/`) → Word Pause**: Inserts a **1.0s pause** between words.

#### **Key Features**
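- **Command generation**: Builds the list of action and pause commands that `ServoControlTool` later executes; the tool itself sends nothing to the hardware, and symbols it does not recognize are skipped.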
- **Predefined mapping**: Ensures a standardized and intuitive visual representation of Morse code.
- **Modular and adaptable**: Can be integrated into larger multi-agent systems for gesture-based or encoded communication.

This tool provides a **seamless way to visualize Morse code** through servo movements, making it useful for accessibility applications, silent signaling, or interactive robotics projects.


In [14]:
class MorseToBlinkTool(BaseTool):
    name: str = "morse_to_blink_tool"
    description: str = (
        "Tool to convert Morse code into servo commands for blinking. "
        "Maps dots, dashes, and pauses to servo actions."
    )
    
    def _run(self, morse_code: str) -> list:
        commands = []
        for symbol in morse_code:
            if symbol == '.':
                commands.append({"type": "action", "action": "blink"})
            elif symbol == '-':
                commands.append({"type": "action", "action": "double blink"})
            elif symbol == '/':
                commands.append({"type": "pause", "duration": 1.0})  # Pause between words
            elif symbol == ' ':
                commands.append({"type": "pause", "duration": 0.6})  # Pause between letters
        return commands
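
One consequence of this mapping is worth spelling out: `TextToMorseTool` joins symbols with spaces, so a word break arrives as a slash surrounded by spaces and produces three consecutive pauses (0.6 s, 1.0 s, 0.6 s). The sketch below mirrors `_run` in a plain function and adds up the pause time. `morse_to_commands` and `total_pause` are hypothetical names, and the total ignores the HTTP round trips and the extra 0.1 s that `execute_commands` sleeps after every command.

```python
def morse_to_commands(morse_code: str) -> list:
    """Hypothetical stand-alone version of MorseToBlinkTool._run."""
    mapping = {
        ".": {"type": "action", "action": "blink"},
        "-": {"type": "action", "action": "double blink"},
        "/": {"type": "pause", "duration": 1.0},  # pause between words
        " ": {"type": "pause", "duration": 0.6},  # pause between letters
    }
    # Symbols outside the mapping (for example '?') are skipped, as in the tool
    return [dict(mapping[s]) for s in morse_code if s in mapping]


def total_pause(commands: list) -> float:
    """Sum the durations of all pause commands, in seconds."""
    return sum(c["duration"] for c in commands if c["type"] == "pause")


commands = morse_to_commands(".... .. / -... -.-- .")  # Morse for "HI BYE"
actions = [c for c in commands if c["type"] == "action"]
print(len(commands), len(actions))       # 21 15
print(round(total_pause(commands), 1))   # 4.0
```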


# Configuring Environment Variables for OpenAI API

In this cell, we:
1. **Load environment variables**: Using `load_dotenv()` to securely retrieve sensitive information like the API key from a `.env` file.
2. **Set the API key**: Retrieve the `OPENAI_API_KEY` and set it as an environment variable for authentication.
3. **Configure the OpenAI model**: Specify the model to use, such as `gpt-4o` or `gpt-3.5-turbo`. This configuration determines the AI's capabilities during execution.

> **Note**: Using `.env` files ensures sensitive information isn't hardcoded, improving security and flexibility.


In [15]:
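load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
if api_key is None:
    # os.environ only accepts strings, so fail early with a clear message
    raise RuntimeError("OPENAI_API_KEY is not set; add it to your .env file.")

# Set API key and model to use gpt-4o
os.environ["OPENAI_API_KEY"] = api_key
os.environ["OPENAI_MODEL_NAME"] = "gpt-4o"
#os.environ["OPENAI_MODEL_NAME"] = "gpt-3.5-turbo"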

### Defining Agents and Tasks for Morse Code Blinking

In this cell, we define:
1. **Agents**: The system includes an agent responsible for interpreting Morse code and translating it into servo movements:
   - **Role**: `"Blink Interpreter"` – Responsible for processing text and converting it into blinking sequences.
   - **Goal**: `"Convert text into Morse code and then into servo movements for blinking."`
   - **Backstory**: `"This agent translates input text to Morse code, then converts Morse to blinking commands."`

2. **Tasks**: Each task defines a specific action for the agent:
   - **Description**: `"Translate '{text}' into servo movements for Morse code blinking."`
   - **Expected Output**: `"Commands executed for blinking Morse code."`
   - **Tools**:
     - `TextToMorseTool()`: Converts plain text into Morse code.
     - `MorseToBlinkTool()`: Maps Morse symbols (dots, dashes, and pauses) to servo actions.
     - `ServoControlTool()`: Executes the servo movements to create the blinking pattern.

> **Example Execution**:
- The **morse_agent** receives a text input such as `"HELLO"`.
- The **TextToMorseTool** converts it to `.... . .-.. .-.. ---` (Morse code).
- The **MorseToBlinkTool** translates each Morse symbol into corresponding servo commands.
- The **ServoControlTool** executes the blinking sequence, visually encoding the Morse message.

This setup enables **dynamic, natural language-driven Morse code blinking**, making it adaptable for accessibility solutions, silent communication, or interactive robotic displays.


In [18]:
morse_agent = Agent(
    role="Blink Interpreter",
    goal="Convert text into Morse code and then into servo movements for blinking.",
    backstory="This agent translates input text to Morse code, then converts Morse to blinking commands."
)


morse_task = Task(
    description="Translate '{text}' into servo movements for Morse code blinking.",
    tools=[TextToMorseTool(), MorseToBlinkTool(), ServoControlTool()],
    expected_output="Commands executed for blinking Morse code.",
    agent=morse_agent
)
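
The `{text}` placeholder in the task description is filled in from the inputs supplied when the crew runs. CrewAI performs this substitution internally; as a rough analogy only, plain `str.format` gives the same result for this description:

```python
description = "Translate '{text}' into servo movements for Morse code blinking."
inputs = {"text": "Bye"}

# Analogy only: CrewAI's own interpolation is not called here
print(description.format(**inputs))
# Translate 'Bye' into servo movements for Morse code blinking.
```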

### Defining the Crew for Morse Code Interpretation

In this cell, we define the `Crew`, which acts as the orchestrator for the system. The crew **coordinates the agents and their assigned tasks**, ensuring a structured and efficient workflow. 

#### **Crew Structure**
1. **Agents**:  
   - The crew includes the **Morse Agent (`morse_agent`)**, responsible for converting text into Morse code and then into servo blinking sequences.

2. **Tasks**:  
   - The **Morse Task (`morse_task`)** handles the **full transformation pipeline**, from text input to servo commands:
     1. **Convert text to Morse code** (`TextToMorseTool`).
     2. **Map Morse symbols to servo movements** (`MorseToBlinkTool`).
     3. **Execute the blinking sequence** (`ServoControlTool`).

3. **Additional parameters**:
   - **`verbose=True`**: Enables detailed logs during execution, making it easier to debug or observe agent interactions.
   - **`memory=False`**: Ensures that the system does not retain previous states, processing each input independently.

#### **Why This Approach?**
- This **modular architecture** allows new agents or tasks to be added effortlessly.
- The **crew handles coordination**, letting individual tools focus on their specific tasks.
- The setup makes it possible to extend functionality beyond blinking—potentially integrating new communication methods, audio signals, or physical gestures.

This implementation showcases the power of **naXo's scalable, multi-agent approach**, demonstrating how **hardware can dynamically adapt to new tasks with minimal reconfiguration**.


In [19]:
# Crew
crew = Crew(
    agents=[morse_agent],
    tasks=[morse_task],
    verbose=True,
    memory=False
)

# Inputs and execution



In [21]:
inputs = {
    "text": "Bye",  # alternative inputs: 'asleep', 'surprised'
}

result = crew.kickoff(inputs=inputs)

print(result)

# Agent: Blink Interpreter
## Task: Translate 'Bye' into servo movements for Morse code blinking.


# Agent: Blink Interpreter
## Thought: Thought: To translate 'Bye' into servo movements, I need to first convert it to Morse code and then translate the Morse code into blinking commands using the tools available to me.
## Using tool: text_to_morse_tool
## Tool Input:
"{\"text\": \"Bye\"}"
## Tool Output:
-... -.-- .


# Agent: Blink Interpreter
## Thought: Thought: Now that I have the Morse code for 'Bye' as '-... -.-- .', I need to convert it into blinking commands using the morse_to_blink_tool.
## Using tool: morse_to_blink_tool
## Tool Input:
"{\"morse_code\": \"-... -.-- .\"}"
## Tool Output:
[{'type': 'action', 'action': 'd