<a href="https://colab.research.google.com/github/MengqinShen/Computer-Vision-with-openCV/blob/main/Intelligent_Home_Assistant_Robot_with_Conversational_AI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Intelligent Home Assistant Robot with Conversational AI

## Project Overview

This project develops an autonomous home assistant robot that integrates advanced conversational AI capabilities with ROS2 (Robot Operating System 2) for distributed system architecture and real-time communication. The robot can navigate autonomously, respond to voice commands, control smart home devices, and provide intelligent assistance.

## System Architecture

### Core Components
- **ROS2 Framework**: Distributed system architecture with real-time communication
- **Conversational AI Engine**: Natural language processing and generation
- **Navigation System**: SLAM, path planning, and obstacle avoidance
- **Smart Home Integration**: IoT device control and automation
- **Voice Interface**: Speech recognition and text-to-speech
- **Computer Vision**: Object recognition and scene understanding

## Hardware Requirements

### Minimum Setup
- **Computing Platform**: NVIDIA Jetson Xavier NX or Raspberry Pi 4B (8GB)
- **LiDAR**: RPLiDAR A1M8 or similar
- **Camera**: Intel RealSense D435i depth camera
- **Microphone Array**: ReSpeaker 4-Mic Array
- **Speaker**: Portable Bluetooth speaker
- **Motors**: Differential drive system with encoders
- **IMU**: 9-DOF inertial measurement unit
- **Wi-Fi Module**: For smart home connectivity

### Enhanced Setup
- **Computing**: NVIDIA Jetson AGX Orin
- **LiDAR**: Velodyne VLP-16 or Ouster OS1-64
- **Cameras**: Multiple RGB-D cameras for 360° vision
- **Manipulator**: 6-DOF robotic arm (optional)
- **Touch Display**: For visual interaction

## Software Stack

### ROS2 Packages and Dependencies

```bash
# Core ROS2 packages
sudo apt update
sudo apt install ros-humble-desktop-full
sudo apt install ros-humble-navigation2
sudo apt install ros-humble-nav2-bringup
sudo apt install ros-humble-slam-toolbox
sudo apt install ros-humble-robot-localization
sudo apt install ros-humble-joint-state-publisher
sudo apt install ros-humble-robot-state-publisher

# Computer Vision
sudo apt install ros-humble-vision-msgs
sudo apt install ros-humble-image-transport
sudo apt install ros-humble-cv-bridge

# Audio Processing
sudo apt install portaudio19-dev
sudo apt install espeak espeak-data
pip install pyaudio speechrecognition pyttsx3

# AI and ML Libraries
pip install openai anthropic
pip install torch torchvision torchaudio
pip install transformers
pip install rasa
```

### ROS2 Package Structure

```
home_assistant_robot/
├── src/
│   ├── ha_robot_bringup/
│   ├── ha_robot_description/
│   ├── ha_robot_navigation/
│   ├── ha_robot_ai/
│   ├── ha_robot_voice/
│   ├── ha_robot_vision/
│   └── ha_robot_smart_home/
├── launch/
├── config/
└── maps/
```

## Implementation

### 1. Robot Description (URDF)

```xml
<!-- ha_robot_description/urdf/robot.urdf.xacro -->
<?xml version="1.0"?>
<robot xmlns:xacro="http://www.ros.org/wiki/xacro" name="home_assistant_robot">

  <!-- Base Properties -->
  <xacro:property name="base_width" value="0.35"/>
  <xacro:property name="base_length" value="0.4"/>
  <xacro:property name="base_height" value="0.1"/>
  <xacro:property name="wheel_radius" value="0.08"/>
  <xacro:property name="wheel_width" value="0.04"/>

  <!-- Base Link -->
  <link name="base_link">
    <visual>
      <geometry>
        <box size="${base_length} ${base_width} ${base_height}"/>
      </geometry>
      <material name="blue">
        <color rgba="0.2 0.2 0.8 1.0"/>
      </material>
    </visual>
    <collision>
      <geometry>
        <box size="${base_length} ${base_width} ${base_height}"/>
      </geometry>
    </collision>
    <inertial>
      <mass value="15.0"/>
      <inertia ixx="0.5" iyy="0.5" izz="0.5" ixy="0" ixz="0" iyz="0"/>
    </inertial>
  </link>

  <!-- Wheels -->
  <xacro:macro name="wheel" params="prefix y_reflect">
    <link name="${prefix}_wheel">
      <visual>
        <geometry>
          <cylinder radius="${wheel_radius}" length="${wheel_width}"/>
        </geometry>
        <material name="black">
          <color rgba="0.1 0.1 0.1 1.0"/>
        </material>
      </visual>
      <collision>
        <geometry>
          <cylinder radius="${wheel_radius}" length="${wheel_width}"/>
        </geometry>
      </collision>
      <inertial>
        <mass value="2.0"/>
        <inertia ixx="0.1" iyy="0.1" izz="0.1" ixy="0" ixz="0" iyz="0"/>
      </inertial>
    </link>

    <joint name="${prefix}_wheel_joint" type="continuous">
      <parent link="base_link"/>
      <child link="${prefix}_wheel"/>
      <origin xyz="0 ${y_reflect*base_width/2} 0" rpy="-1.57 0 0"/>
      <axis xyz="0 0 1"/>
    </joint>
  </xacro:macro>

  <xacro:wheel prefix="left" y_reflect="1"/>
  <xacro:wheel prefix="right" y_reflect="-1"/>

  <!-- Sensors -->
  <link name="lidar_link">
    <visual>
      <geometry>
        <cylinder radius="0.05" length="0.03"/>
      </geometry>
    </visual>
  </link>

  <joint name="lidar_joint" type="fixed">
    <parent link="base_link"/>
    <child link="lidar_link"/>
    <origin xyz="0.1 0 0.3" rpy="0 0 0"/>
  </joint>

</robot>
```

### 2. Navigation Configuration

```yaml
# config/nav2_params.yaml
amcl:
  ros__parameters:
    use_sim_time: False
    alpha1: 0.2
    alpha2: 0.2
    alpha3: 0.2
    alpha4: 0.2
    alpha5: 0.2
    base_frame_id: "base_link"
    beam_skip_distance: 0.5
    beam_skip_error_threshold: 0.9
    beam_skip_threshold: 0.3
    do_beamskip: false
    global_frame_id: "map"
    lambda_short: 0.1
    laser_likelihood_max_dist: 2.0
    laser_max_range: 100.0
    laser_min_range: -1.0
    laser_model_type: "likelihood_field"
    max_beams: 60
    max_particles: 2000
    min_particles: 500
    odom_frame_id: "odom"
    pf_err: 0.05
    pf_z: 0.99
    recovery_alpha_fast: 0.0
    recovery_alpha_slow: 0.0
    resample_interval: 1
    robot_model_type: "nav2_amcl::DifferentialMotionModel"
    save_pose_rate: 0.5
    sigma_hit: 0.2
    tf_broadcast: true
    transform_tolerance: 1.0
    update_min_a: 0.2
    update_min_d: 0.25
    z_hit: 0.5
    z_max: 0.05
    z_rand: 0.5
    z_short: 0.05

bt_navigator:
  ros__parameters:
    use_sim_time: False
    global_frame: map
    robot_base_frame: base_link
    odom_topic: /odom
    bt_loop_duration: 10
    default_server_timeout: 20
    enable_groot_monitoring: True
    groot_zmq_publisher_port: 1666
    groot_zmq_server_port: 1667
    plugin_lib_names:
    - nav2_compute_path_to_pose_action_bt_node
    - nav2_compute_path_through_poses_action_bt_node
    - nav2_smooth_path_action_bt_node
    - nav2_follow_path_action_bt_node
    - nav2_spin_action_bt_node
    - nav2_wait_action_bt_node
    - nav2_assisted_teleop_action_bt_node
    - nav2_back_up_action_bt_node
    - nav2_drive_on_heading_bt_node
    - nav2_clear_costmap_service_bt_node
    - nav2_is_stuck_condition_bt_node
    - nav2_goal_reached_condition_bt_node
    - nav2_goal_updated_condition_bt_node
    - nav2_globally_updated_goal_condition_bt_node
    - nav2_is_path_valid_condition_bt_node
    - nav2_initial_pose_received_condition_bt_node
    - nav2_reinitialize_global_localization_service_bt_node
    - nav2_rate_controller_bt_node
    - nav2_distance_controller_bt_node
    - nav2_speed_controller_bt_node
    - nav2_truncate_path_action_bt_node
    - nav2_truncate_path_local_action_bt_node
    - nav2_goal_updater_node_bt_node
    - nav2_recovery_node_bt_node
    - nav2_pipeline_sequence_bt_node
    - nav2_round_robin_node_bt_node
    - nav2_transform_available_condition_bt_node
    - nav2_time_expired_condition_bt_node
    - nav2_path_expiring_timer_condition
    - nav2_distance_traveled_condition_bt_node
    - nav2_single_trigger_bt_node
    - nav2_goal_updated_controller_bt_node
    - nav2_is_battery_low_condition_bt_node
    - nav2_navigate_through_poses_action_bt_node
    - nav2_navigate_to_pose_action_bt_node
    - nav2_remove_passed_goals_action_bt_node
    - nav2_planner_selector_bt_node
    - nav2_controller_selector_bt_node
    - nav2_goal_checker_selector_bt_node
    - nav2_controller_cancel_bt_node
    - nav2_path_longer_on_approach_bt_node
    - nav2_wait_cancel_bt_node
    - nav2_spin_cancel_bt_node
    - nav2_back_up_cancel_bt_node
    - nav2_assisted_teleop_cancel_bt_node
    - nav2_drive_on_heading_cancel_bt_node
    - nav2_is_battery_charging_condition_bt_node

controller_server:
  ros__parameters:
    use_sim_time: False
    controller_frequency: 20.0
    min_x_velocity_threshold: 0.001
    min_y_velocity_threshold: 0.5
    min_theta_velocity_threshold: 0.001
    failure_tolerance: 0.3
    progress_checker_plugin: "progress_checker"
    goal_checker_plugins: ["general_goal_checker"]
    controller_plugins: ["FollowPath"]

    progress_checker:
      plugin: "nav2_controller::SimpleProgressChecker"
      required_movement_radius: 0.5
      movement_time_allowance: 10.0

    general_goal_checker:
      stateful: True
      plugin: "nav2_controller::SimpleGoalChecker"
      xy_goal_tolerance: 0.25
      yaw_goal_tolerance: 0.25

    FollowPath:
      plugin: "dwb_core::DWBLocalPlanner"
      debug_trajectory_details: True
      min_vel_x: 0.0
      min_vel_y: 0.0
      max_vel_x: 0.26
      max_vel_y: 0.0
      max_vel_theta: 1.0
      min_speed_xy: 0.0
      max_speed_xy: 0.26
      min_speed_theta: 0.0
      acc_lim_x: 2.5
      acc_lim_y: 0.0
      acc_lim_theta: 3.2
      decel_lim_x: -2.5
      decel_lim_y: 0.0
      decel_lim_theta: -3.2
      vx_samples: 20
      vy_samples: 5
      vtheta_samples: 20
      sim_time: 1.7
      linear_granularity: 0.05
      angular_granularity: 0.025
      transform_tolerance: 0.2
      xy_goal_tolerance: 0.25
      trans_stopped_velocity: 0.25
      short_circuit_trajectory_evaluation: True
      stateful: True
      critics: ["RotateToGoal", "Oscillation", "BaseObstacle", "GoalAlign", "PathAlign", "PathDist", "GoalDist"]
      BaseObstacle.scale: 0.02
      PathAlign.scale: 32.0
      PathAlign.forward_point_distance: 0.1
      GoalAlign.scale: 24.0
      GoalAlign.forward_point_distance: 0.1
      PathDist.scale: 32.0
      GoalDist.scale: 24.0
      RotateToGoal.scale: 32.0
      RotateToGoal.slowing_factor: 5.0
      RotateToGoal.lookahead_time: -1.0
```

### 3. Conversational AI Core

```python
# ha_robot_ai/src/conversational_ai_node.py
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import PoseStamped
from nav2_msgs.action import NavigateToPose
from rclpy.action import ActionClient
import json
import openai
import threading
import time

class ConversationalAI(Node):
    def __init__(self):
        super().__init__('conversational_ai')

        # Publishers
        self.speech_pub = self.create_publisher(String, 'robot_speech', 10)
        self.smart_home_pub = self.create_publisher(String, 'smart_home_command', 10)

        # Subscribers
        self.voice_sub = self.create_subscription(String, 'voice_input', self.voice_callback, 10)
        self.vision_sub = self.create_subscription(String, 'vision_objects', self.vision_callback, 10)

        # Action clients
        self.navigate_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')

        # AI Configuration
        self.openai_client = openai.OpenAI()  # Configure with your API key

        # Robot state
        self.current_location = "living_room"
        self.known_locations = {
            "kitchen": {"x": 2.0, "y": 1.0, "z": 0.0},
            "living_room": {"x": 0.0, "y": 0.0, "z": 0.0},
            "bedroom": {"x": -2.0, "y": 2.0, "z": 0.0},
            "bathroom": {"x": 1.0, "y": -1.5, "z": 0.0}
        }

        self.conversation_history = []
        self.detected_objects = []

        self.get_logger().info("Conversational AI node initialized")

    def voice_callback(self, msg):
        """Process voice input and generate appropriate response"""
        user_input = msg.data
        self.get_logger().info(f"Received voice input: {user_input}")

        # Add to conversation history
        self.conversation_history.append({"role": "user", "content": user_input})

        # Process with AI
        threading.Thread(target=self.process_conversation, args=(user_input,)).start()

    def vision_callback(self, msg):
        """Update detected objects from vision system"""
        try:
            self.detected_objects = json.loads(msg.data)
        except json.JSONDecodeError:
            self.get_logger().warn("Invalid JSON in vision message")

    def process_conversation(self, user_input):
        """Main conversation processing with AI"""
        try:
            # Create system prompt with robot capabilities
            system_prompt = self.get_system_prompt()

            # Prepare conversation for AI
            messages = [{"role": "system", "content": system_prompt}]
            messages.extend(self.conversation_history[-10:])  # Keep last 10 exchanges

            # Get AI response
            response = self.openai_client.chat.completions.create(
                model="gpt-4",
                messages=messages,
                temperature=0.7,
                max_tokens=500
            )

            ai_response = response.choices[0].message.content

            # Parse response for actions
            self.parse_and_execute_response(ai_response)

            # Add AI response to history
            self.conversation_history.append({"role": "assistant", "content": ai_response})

        except Exception as e:
            self.get_logger().error(f"Error in conversation processing: {str(e)}")
            self.speak("I'm sorry, I encountered an error processing your request.")

    def get_system_prompt(self):
        """Generate system prompt with current robot state"""
        return f"""You are an intelligent home assistant robot with the following capabilities:

NAVIGATION: You can move to these locations: {list(self.known_locations.keys())}
SMART HOME: You can control lights, temperature, music, and other IoT devices
VISION: You can see and identify objects in your environment
CURRENT LOCATION: {self.current_location}
DETECTED OBJECTS: {self.detected_objects}

RESPONSE FORMAT:
Always respond in a conversational manner. If you need to take actions, include them as JSON in your response:

For navigation: {{"action": "navigate", "location": "kitchen"}}
For smart home: {{"action": "smart_home", "device": "lights", "command": "turn_on", "room": "living_room"}}
For speech only: Just respond naturally without JSON.

Be helpful, friendly, and proactive in assisting with home tasks."""

    def parse_and_execute_response(self, response):
        """Parse AI response and execute any actions"""
        try:
            # Extract JSON actions from response
            import re
            json_matches = re.findall(r'\{[^}]+\}', response)

            for json_str in json_matches:
                try:
                    action = json.loads(json_str)
                    self.execute_action(action)
                except json.JSONDecodeError:
                    continue

            # Always speak the response (remove JSON parts)
            clean_response = re.sub(r'\{[^}]+\}', '', response).strip()
            if clean_response:
                self.speak(clean_response)

        except Exception as e:
            self.get_logger().error(f"Error parsing response: {str(e)}")

    def execute_action(self, action):
        """Execute specific actions based on AI response"""
        action_type = action.get("action")

        if action_type == "navigate":
            location = action.get("location")
            if location in self.known_locations:
                self.navigate_to_location(location)
            else:
                self.speak(f"I don't know how to get to {location}")

        elif action_type == "smart_home":
            # Publish smart home command
            command_msg = String()
            command_msg.data = json.dumps(action)
            self.smart_home_pub.publish(command_msg)

    def navigate_to_location(self, location):
        """Navigate to a specific location"""
        if location not in self.known_locations:
            self.get_logger().error(f"Unknown location: {location}")
            return

        # Create navigation goal
        goal_msg = NavigateToPose.Goal()
        goal_msg.pose.header.frame_id = "map"
        goal_msg.pose.header.stamp = self.get_clock().now().to_msg()

        coords = self.known_locations[location]
        goal_msg.pose.pose.position.x = coords["x"]
        goal_msg.pose.pose.position.y = coords["y"]
        goal_msg.pose.pose.position.z = coords["z"]
        goal_msg.pose.pose.orientation.w = 1.0

        # Send goal
        self.navigate_client.wait_for_server()
        send_goal_future = self.navigate_client.send_goal_async(goal_msg)

        self.speak(f"Navigating to {location}")
        self.current_location = location

    def speak(self, text):
        """Publish text for speech synthesis"""
        msg = String()
        msg.data = text
        self.speech_pub.publish(msg)

def main(args=None):
    rclpy.init(args=args)
    node = ConversationalAI()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
```

### 4. Voice Interface

```python
# ha_robot_voice/src/voice_interface_node.py
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import speech_recognition as sr
import pyttsx3
import threading
import queue
import time

class VoiceInterface(Node):
    def __init__(self):
        super().__init__('voice_interface')

        # Publishers
        self.voice_pub = self.create_publisher(String, 'voice_input', 10)

        # Subscribers
        self.speech_sub = self.create_subscription(String, 'robot_speech', self.speak_callback, 10)

        # Speech recognition setup
        self.recognizer = sr.Recognizer()
        self.microphone = sr.Microphone()

        # Text-to-speech setup
        self.tts_engine = pyttsx3.init()
        self.tts_engine.setProperty('rate', 150)
        self.tts_engine.setProperty('volume', 0.8)

        # Speech queue and flags
        self.speech_queue = queue.Queue()
        self.is_speaking = False
        self.listening = True

        # Wake words
        self.wake_words = ["robot", "assistant", "hey robot"]

        # Calibrate microphone
        with self.microphone as source:
            self.recognizer.adjust_for_ambient_noise(source)

        # Start threads
        threading.Thread(target=self.listen_continuous, daemon=True).start()
        threading.Thread(target=self.speech_worker, daemon=True).start()

        self.get_logger().info("Voice interface initialized. Say 'robot' or 'hey robot' to wake me up.")

    def listen_continuous(self):
        """Continuously listen for voice commands"""
        while rclpy.ok():
            try:
                with self.microphone as source:
                    # Listen for audio with timeout
                    audio = self.recognizer.listen(source, timeout=1, phrase_time_limit=5)

                try:
                    # Recognize speech
                    text = self.recognizer.recognize_google(audio).lower()
                    self.get_logger().info(f"Heard: {text}")

                    # Check for wake words
                    if any(wake_word in text for wake_word in self.wake_words):
                        # Remove wake word and process command
                        command = text
                        for wake_word in self.wake_words:
                            command = command.replace(wake_word, "").strip()

                        if command:
                            self.process_voice_command(command)
                        else:
                            self.add_to_speech_queue("Yes, how can I help you?")

                except sr.UnknownValueError:
                    # Speech was unintelligible
                    pass
                except sr.RequestError as e:
                    self.get_logger().error(f"Could not request results; {e}")

            except sr.WaitTimeoutError:
                # No speech detected, continue listening
                pass
            except Exception as e:
                self.get_logger().error(f"Error in speech recognition: {str(e)}")
                time.sleep(1)

    def process_voice_command(self, command):
        """Process recognized voice command"""
        if not command.strip():
            return

        self.get_logger().info(f"Processing command: {command}")

        # Publish voice command
        msg = String()
        msg.data = command
        self.voice_pub.publish(msg)

    def speak_callback(self, msg):
        """Handle text-to-speech requests"""
        text = msg.data
        self.add_to_speech_queue(text)

    def add_to_speech_queue(self, text):
        """Add text to speech queue"""
        self.speech_queue.put(text)

    def speech_worker(self):
        """Worker thread for text-to-speech"""
        while rclpy.ok():
            try:
                # Get text from queue (blocking)
                text = self.speech_queue.get(timeout=1)

                if text:
                    self.is_speaking = True
                    self.get_logger().info(f"Speaking: {text}")

                    # Use TTS to speak
                    self.tts_engine.say(text)
                    self.tts_engine.runAndWait()

                    self.is_speaking = False

            except queue.Empty:
                continue
            except Exception as e:
                self.get_logger().error(f"Error in speech synthesis: {str(e)}")
                self.is_speaking = False

def main(args=None):
    rclpy.init(args=args)
    node = VoiceInterface()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
```

### 5. Smart Home Integration

```python
# ha_robot_smart_home/src/smart_home_node.py
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import json
import requests
import paho.mqtt.client as mqtt
from typing import Dict, Any

class SmartHomeController(Node):
    def __init__(self):
        super().__init__('smart_home_controller')

        # Subscribers
        self.command_sub = self.create_subscription(
            String, 'smart_home_command', self.command_callback, 10)

        # Publishers
        self.status_pub = self.create_publisher(String, 'smart_home_status', 10)

        # Device configurations
        self.devices = {
            "lights": {
                "living_room": {"type": "philips_hue", "id": "1"},
                "kitchen": {"type": "philips_hue", "id": "2"},
                "bedroom": {"type": "philips_hue", "id": "3"},
                "bathroom": {"type": "philips_hue", "id": "4"}
            },
            "thermostat": {
                "main": {"type": "nest", "id": "thermostat_1"}
            },
            "music": {
                "speakers": {"type": "spotify", "device_id": "main_speakers"}
            },
            "tv": {
                "living_room": {"type": "samsung", "ip": "192.168.1.100"}
            }
        }

        # API configurations (load from config file)
        self.philips_hue_bridge = "192.168.1.50"
        self.philips_hue_token = "your_hue_token_here"
        self.nest_token = "your_nest_token_here"
        self.spotify_token = "your_spotify_token_here"

        # MQTT setup for other IoT devices
        self.mqtt_client = mqtt.Client()
        self.mqtt_client.on_connect = self.on_mqtt_connect
        self.mqtt_client.on_message = self.on_mqtt_message

        try:
            self.mqtt_client.connect("localhost", 1883, 60)
            self.mqtt_client.loop_start()
        except Exception as e:
            self.get_logger().warn(f"Could not connect to MQTT broker: {e}")

        self.get_logger().info("Smart Home Controller initialized")

    def command_callback(self, msg):
        """Process smart home commands"""
        try:
            command = json.loads(msg.data)
            self.get_logger().info(f"Received smart home command: {command}")

            device_type = command.get("device")
            action = command.get("command")
            room = command.get("room", "main")

            if device_type == "lights":
                self.control_lights(action, room, command)
            elif device_type == "thermostat":
                self.control_thermostat(action, command)
            elif device_type == "music":
                self.control_music(action, command)
            elif device_type == "tv":
                self.control_tv(action, room, command)
            else:
                self.get_logger().warn(f"Unknown device type: {device_type}")

        except json.JSONDecodeError:
            self.get_logger().error("Invalid JSON in smart home command")
        except Exception as e:
            self.get_logger().error(f"Error processing smart home command: {str(e)}")

    def control_lights(self, action: str, room: str, command: Dict[str, Any]):
        """Control Philips Hue lights"""
        if room not in self.devices["lights"]:
            self.get_logger().warn(f"No lights configured for room: {room}")
            return

        light_id = self.devices["lights"][room]["id"]

        try:
            if action == "turn_on":
                payload = {"on": True}
                if "brightness" in command:
                    payload["bri"] = int(command["brightness"] * 2.54)  # 0-255 scale
                if "color" in command:
                    # Convert color name to hue value
                    color_map = {