### IMPORTS

In [1]:
import whisper
import speech_recognition as sr
import torch
import numpy as np
import wave
import ollama
import json
import msvcrt
import pybullet as p
import pybullet_data
import time
import math 
import threading

### LLM MODEL

In [2]:
LLM_MODEL = "mistral"

### VOICE RECOGNITION

In [3]:
MODEL_NAME = "turbo" 

device = "cuda" if torch.cuda.is_available() else "cpu" 
print(f"Loading {MODEL_NAME} model on {device}...")
model = whisper.load_model(MODEL_NAME, device=device)

def audio_by_user():
    recognizer = sr.Recognizer()
    with sr.Microphone(sample_rate=16000) as source: # Force 16kHzl, Trust me its better for voice cerognition, I learned it the hard way
        print("\n--- RECORDING ---")
        print("Say: 'Robot, move forward one meter.'")
        
        # Adjust for noise to prevent early timeouts
        recognizer.adjust_for_ambient_noise(source, duration=0.5)
        print("Listening...")
        
        try:
            audio = recognizer.listen(source, timeout=8, phrase_time_limit=10)
            
            # Optional: Save debug audio
            # with open("debug_audio.wav", "wb") as f:
            #     f.write(audio.get_wav_data())

            print("Transcribing...")
            
            # Prepare audio for Whisper
            audio_data = np.frombuffer(audio.get_raw_data(), np.int16).flatten().astype(np.float32) / 32768.0
            
            # Use the model instance passed to the function
            result = model.transcribe(audio_data, language="en", fp16=torch.cuda.is_available())
            text = result['text'].strip()
            
            # CRITICAL FIX: Return the text, don't assign the print() result
            print(f"Whisper Result: \"{text}\"")
            return text
        
        except sr.WaitTimeoutError:
            print("[ERROR] Listening timed out. No speech detected.")
            return None
        except Exception as e:
            print(f"[ERROR] An error occurred: {e}")
            return None


Loading turbo model on cuda...


In [4]:
# audio_by_user()

### LLM PROCESSING FOR ROBOT CODE GENERATION

In [5]:
def get_mission_plan(user_input):
    print(f"[BRAIN] Processing logic with {LLM_MODEL}...")

    # STRICT System Prompt
    # We tell the LLM exactly what objects exist so it doesn't hallucinate "yellow_box"
    system_manual = """
    You are a Robot Controller.
    OBJECTS: 'red_box', 'green_box', 'blue_box'.
    LOCATIONS: 'home', 'drop_zone'.
    ACTIONS: 
    - 'move_to': requires 'target'
    - 'magnet': requires 'state' ('on'/'off')
    
    Task: Convert command to JSON list of actions.
    Example: "Grab red box" -> [{"action": "move_to", "target": "red_box"}, {"action": "magnet", "state": "on"}, {"action": "move_to", "target": "home"}]
    """

    try:
        response = ollama.generate(
            model=LLM_MODEL,
            system=system_manual,
            prompt=f"User Command: {user_input}",
            format="json",  # Forces JSON output mode in Ollama
            options={"temperature": 0} # 0 makes it deterministic (logic > creativity)
        )
        
        raw_response = response['response']
        print(f"[BRAIN] Raw Model Output: {raw_response}") # Debug print
        
        # Parse JSON
        data = json.loads(raw_response)
        
        # Normalize: Try to find a list of actions
        if "plan" in data:
            return data["plan"]
        elif isinstance(data, list):
            return data
        else:
            # If it returns a single object, wrap it in a list
            return [data]

    except Exception as e:
        print(f"[BRAIN] Error: {e}")
        return []

### INITIALIZE SIMULATION IN PYBULLET

In [6]:
ROBOT_ID = None
END_EFFECTOR_INDEX = 6
OBJECTS = {}   # Stores ID of red_box, blue_box, etc.
LOCATIONS = {} # Stores coordinates of home, drop_zone
MAGNET_CONSTRAINT = None # Stores the ID of the connection when holding an object
SIM_ACTIVE = True

def setup_world():
    global ROBOT_ID, OBJECTS, LOCATIONS
    
    # Initialize
    p.connect(p.GUI)
    p.setAdditionalSearchPath(pybullet_data.getDataPath())
    p.setGravity(0, 0, -9.81)

    # Load Floor
    p.loadURDF("plane.urdf")
    
    # Load Table (Rotated 90 deg, set at proper height)
    p.loadURDF("table/table.urdf", basePosition=[0.5, 0, -0.65], baseOrientation=p.getQuaternionFromEuler([0, 0, 1.57]))

    # Load Robot (Kuka IIWA)
    # Mounted at the edge (0.1, 0, 0) sitting ON the table
    ROBOT_ID = p.loadURDF("kuka_iiwa/model.urdf", basePosition=[0.1, 0, 0], useFixedBase=True)
    
    # Define Locations
    LOCATIONS = {
        "home": [0.3, 0, 0.6],
        "drop_zone": [0.0, 0.5, 0.4] 
    }

    # Spawn Boxes (Helper function called inside)
    OBJECTS["red_box"] = spawn_box([0.6, 0.2, 0.1], [1, 0, 0, 1])
    OBJECTS["green_box"] = spawn_box([0.6, 0.0, 0.1], [0, 1, 0, 1])
    OBJECTS["blue_box"] = spawn_box([0.6, -0.2, 0.1], [0, 0, 1, 1])

def spawn_box(pos, color_rgba):
    # Spawns a box and colors it
    box_id = p.loadURDF("cube.urdf", basePosition=pos, globalScaling=0.08)
    p.changeVisualShape(box_id, -1, rgbaColor=color_rgba)
    return box_id

def get_coordinate(name):
    # Returns [x,y,z] for a name (box or location)
    if name in OBJECTS:
        pos, _ = p.getBasePositionAndOrientation(OBJECTS[name])
        return list(pos)
    elif name in LOCATIONS:
        return LOCATIONS[name]
    return None

def move_arm(target_pos):
    print(f"[ROBOT] Moving to {target_pos}...")
    
    # Orientation: Gripper pointing down
    orn = p.getQuaternionFromEuler([0, 3.14, 0])
    
    # Inverse Kinematics calculation
    joint_poses = p.calculateInverseKinematics(
        ROBOT_ID, 
        END_EFFECTOR_INDEX, 
        target_pos, 
        orn,
        maxNumIterations=100
    )

    # Apply movement to motors
    for i in range(len(joint_poses)):
        p.setJointMotorControl2(ROBOT_ID, i, p.POSITION_CONTROL, joint_poses[i], force=500)
    
    # Small blocking delay to let it travel
    time.sleep(1.5)

def toggle_magnet(state):
    global MAGNET_CONSTRAINT
    
    # Get current position of the gripper
    ee_pos = p.getLinkState(ROBOT_ID, END_EFFECTOR_INDEX)[0]
    
    if state == "on":
        # Find closest object
        closest_obj = None
        min_dist = 0.2 # 20cm search radius

        for name, obj_id in OBJECTS.items():
            obj_pos, _ = p.getBasePositionAndOrientation(obj_id)
            dist = np.linalg.norm(np.array(ee_pos) - np.array(obj_pos))
            if dist < min_dist:
                min_dist = dist
                closest_obj = obj_id
        
        # If found and not already holding something
        if closest_obj is not None and MAGNET_CONSTRAINT is None:
            print(f"[ROBOT] Magnet ON. Attached to object {closest_obj}")
            # Create a fixed link between gripper and box
            MAGNET_CONSTRAINT = p.createConstraint(
                ROBOT_ID, END_EFFECTOR_INDEX, 
                closest_obj, -1, 
                p.JOINT_FIXED, [0, 0, 0], [0, 0, 0], [0, 0, 0]
            )
        else:
            print("[ROBOT] Magnet ON. No object found nearby.")

    elif state == "off":
        if MAGNET_CONSTRAINT is not None:
            print("[ROBOT] Magnet OFF. Object dropped.")
            p.removeConstraint(MAGNET_CONSTRAINT)
            MAGNET_CONSTRAINT = None

def physics_thread():
    # This runs in the background to keep gravity working
    while SIM_ACTIVE:
        p.stepSimulation()
        time.sleep(1./240.)

### MAIN LOOP THAT CONTROLS ALL 3 PARTS 

In [7]:
if __name__ == "__main__":
    
    # A. Setup for Whisper
    print("--- 1. Loading Whisper Model ---")
    whisper_model = whisper.load_model(MODEL_NAME, device=device)
    
    print("--- 2. Setting up PyBullet ---")
    setup_world()
    
    # B. Start Physics in Background Thread
    t = threading.Thread(target=physics_thread)
    t.start()
    
    # Move to starting position, for me it was convinient for keep the robot in the home position
    move_arm(LOCATIONS["home"])

    try:
        while True:
            # To make The code observations easier it stops here until ENTER is pressed to make it ready.
            input("\n[USER] >>> Press ENTER to start listening... (or Ctrl+C to exit)")

            # Listen
            user_text = audio_by_user()
            
            if user_text:
                print(f"[USER] Heard: \"{user_text}\"")
                
                if "exit" in user_text.lower():
                    break
                
                # D. Plan
                plan = get_mission_plan(user_text)
                
                # E. Execute
                if plan:
                    print(f"[PLAN] Executing {len(plan)} steps...")
                    for step in plan:
                        action = step.get('action')
                        
                        if action == "move_to":
                            target = step.get('target')
                            coords = get_coordinate(target)
                            
                            if coords:
                                if "box" in target: 
                                    coords[2] += 0.05
                                move_arm(coords)
                            else:
                                print(f"[ERROR] Unknown target: {target}")

                        elif action == "magnet":
                            state = step.get('state')
                            toggle_magnet(state)
                            
                        time.sleep(0.5)
                    
                    print("[SYSTEM] Execution finished.")

    except KeyboardInterrupt:
        print("\n[SYSTEM] Stopping...")

    finally:
        SIM_ACTIVE = False
        t.join()
        p.disconnect()

--- 1. Loading Whisper Model ---
--- 2. Setting up PyBullet ---
[ROBOT] Moving to [0.3, 0, 0.6]...

--- RECORDING ---
Say: 'Robot, move forward one meter.'
Listening...
Transcribing...
Whisper Result: "Go to red box."
[USER] Heard: "Go to red box."
[BRAIN] Processing logic with mistral...
[BRAIN] Raw Model Output: {"action": "move_to", "target": "red_box"}
[PLAN] Executing 1 steps...
[ROBOT] Moving to [0.5999970930564358, 0.2003142621971403, 0.08998860621253653]...
[SYSTEM] Execution finished.

--- RECORDING ---
Say: 'Robot, move forward one meter.'
Listening...
Transcribing...
Whisper Result: "Turn on turn on magnet"
[USER] Heard: "Turn on turn on magnet"
[BRAIN] Processing logic with mistral...
[BRAIN] Raw Model Output: {"action": "magnet", "state": "on"}
[PLAN] Executing 1 steps...
[ROBOT] Magnet ON. Attached to object 3
[SYSTEM] Execution finished.

--- RECORDING ---
Say: 'Robot, move forward one meter.'
Listening...
Transcribing...
Whisper Result: "Go to home position."
[USER] Hea

Exception in thread Thread-3 (physics_thread):
Traceback (most recent call last):
  File "C:\Users\Userl\AppData\Local\Programs\Python\Python312\Lib\threading.py", line 1075, in _bootstrap_inner
    self.run()
  File "d:\envoirment1\Lib\site-packages\ipykernel\ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "C:\Users\Userl\AppData\Local\Programs\Python\Python312\Lib\threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Userl\AppData\Local\Temp\ipykernel_53236\1649454323.py", line 113, in physics_thread
pybullet.error: Not connected to physics server.



--- RECORDING ---
Say: 'Robot, move forward one meter.'
Listening...
[ERROR] Listening timed out. No speech detected.

[SYSTEM] Stopping...


error: Not connected to physics server.