In [None]:
import speech_recognition as sr
import warnings
import pyaudio
#Create a PyAudio instance.
#NOTE(review): `p` is not referenced again in the visible code and is never terminated - confirm whether it is still needed.
p = pyaudio.PyAudio()

#Silence Python warnings that originate from the pyaudio module.
warnings.filterwarnings('ignore', module='pyaudio')

#Print every microphone name reported by SpeechRecognition together with its index,
#so the right `device_index` can be picked when constructing sr.Microphone.
for index, name in enumerate(sr.Microphone.list_microphone_names()):
    print("Microphone with name \"{1}\" found for `Microphone(device_index={0})`".format(index, name))

In [None]:

from kokoro import KPipeline
from IPython.display import display, Audio

#Kokoro text-to-speech pipeline; say() calls it to synthesise every spoken message.
#NOTE(review): lang_code='b' is presumably British English (say() uses the 'bf_emma' voice) - confirm against the Kokoro docs.
pipeline_TTS = KPipeline(lang_code='b',repo_id = 'hexgrad/Kokoro-82M')


import speech_recognition as sr
import time
import re
from typing import List, Optional, Tuple
from rapidfuzz import fuzz

from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

import rospy
import numpy as np
import open3d as o3d
import sys
import moveit_commander
import moveit_msgs.msg
from configparser import ConfigParser

import pyautogui
import threading


from utils import CameraProcessor, PointCloudViewer, Inspector


#Print small floats in fixed-point notation instead of scientific notation.
np.set_printoptions(suppress=True)

#NOTE: this parser is replaced by a fresh ConfigParser() further down, before config.ini is read.
config = ConfigParser() #For config.ini


#Register this process as a ROS node (anonymous=True lets rospy make the node name unique).
rospy.init_node("Cognitive_Motion", anonymous=True)

#MoveIt handles: robot model, planning scene and the planning group that is driven below.
moveit_commander.roscpp_initialize(sys.argv)
robot = moveit_commander.RobotCommander()
scene = moveit_commander.PlanningSceneInterface()
group_name = "panda_manipulator"
move_group = moveit_commander.MoveGroupCommander(group_name) #we'll pass it on while calling functions



#NOTE(review): these look like default Open3D view-control parameters (zoom/front/lookat/up);
#they are not referenced in the visible code - confirm where they are consumed.
zoom_def = 0.78
front_def = [ -0.1202421863757063, -0.98750918258911113, -0.10182058199487866 ]
lookat_def = [ 0.23267809182614518, 0.058752367596889288, 0.42837016799860811 ]
up_def = [ -0.018570743687429749, -0.10030934011800337, 0.99478297319766518 ]


#NOTE(review): presumably the folder where generated coordinates are saved; not used in the visible code.
saved_path = "VI_appdata/Saved_coordinates/"


#Names of the stored initial poses.
#NOTE(review): pose_list is not used in the visible code; presumably these are keys of the 'Init_Pose' config section like the *_joint names below - confirm.
pose_list = ['initial_coordinates_down_med', 'initial_coordinates_down_high', 'initial_coordinates_front_low',
             'initial_coordinates_front_med', 'initial_coordinates_left_origin_low', 'initial_coordinates_left_origin_med',
             'initial_coordinates_left_extended_low', 'initial_coordinates_left_extended_med', 'initial_coordinates_right_origin_low',
             'initial_coordinates_right_origin_med', 'initial_coordinates_right_extended_low', 'initial_coordinates_right_extended_med',
             'initial_coordinates_current_pose']

#Joint-space counterparts of the names above; an entry of this list is used as the key
#into the 'Init_Pose' section of config.ini when the robot is sent to its initial position.
pose_list_joint = ['initial_coordinates_down_med_joint', 'initial_coordinates_down_high_joint', 'initial_coordinates_front_low_joint',
                   'initial_coordinates_front_med_joint', 'initial_coordinates_left_origin_low_joint', 'initial_coordinates_left_origin_med_joint',
                   'initial_coordinates_left_extended_low_joint', 'initial_coordinates_left_extended_med_joint', 'initial_coordinates_right_origin_low_joint',
                   'initial_coordinates_right_origin_med_joint', 'initial_coordinates_right_extended_low_joint','initial_coordinates_right_extended_med_joint',
                   'initial_coordinates_current_pose_joint']

#Coordinate-frame meshes located at the origin: a small one (size 0.1) and a larger one (size 0.3).
mesh = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1,origin=[0, 0, 0])
World_mesh_big = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.3,origin=[0, 0, 0])

config = ConfigParser() #recreate the object
file_path = 'VI_appdata/'
file_name = 'config.ini'


#ConfigParser.read() returns the list of files it managed to parse; an empty list means
#config.ini is missing or unreadable. Fail right here with a clear message: every setting
#below is required, so continuing would only crash later with an unrelated NameError.
if not config.read(file_path+file_name):
    raise FileNotFoundError(f"File not Found! Could not read config file: {file_path+file_name}")

#Load all variables from the [Settings] section.
samples = max(config.getint('Settings', 'samples'),1)      #at least 1 sample
spacing = max(config.getfloat('Settings', 'spacing'),0)    #never negative
offset_y = config.getfloat('Settings', 'offset_y')
offset_z = config.getfloat('Settings', 'offset_z')
trim_base = config.getfloat('Settings', 'trim_base')
manual_offset = config.getfloat('Settings', 'manual_offset')
cluster_centered = config.getboolean('Settings', 'Cluster_centered')
cluster_idx = config.getint('Settings', 'Cluster_idx')
cluster_discard = config.getint('Settings', 'Cluster_discard')
eps = config.getfloat('Settings', 'eps')
min_points = config.getint('Settings', 'min_points')
cluster_trim = config.getfloat('Settings', 'Cluster_trim')
tgt_coord_samples = max(config.getint('Settings', 'TGT_coord_Samples'),3)  #at least 3 samples
tgt_final_trim = config.getfloat('Settings', 'TGT_final_trim')
tgt_reverse = config.getboolean('Settings', 'TGT_reverse')
tgt_preview = config.getboolean('Settings', 'TGT_preview')
z_offset = config.getfloat('Settings', 'z_offset')
coord_skip = max(config.getint('Settings', 'coord_skip'),0)+1 #+1 to match the for loop's skip method.
tgt_motion_delay = config.getfloat('Settings', 'TGT_motion_delay')
tgt_save = config.getboolean('Settings', 'TGT_save')
dbug = config.getboolean('Settings', 'Dbug')


#Path handed to PointCloudViewer / Inspector by the code blocks in the system prompt.
#NOTE(review): presumably a saved Open3D camera/view file - confirm.
view_cam_parameters = "VI_appdata/view.json"


#Build the camera/robot helper from the settings read out of config.ini.
camera_processor = CameraProcessor(samples=samples, offset_y=offset_y, offset_z=offset_z, trim_base=trim_base, manual_offset=manual_offset, 
                                   cluster_discard=cluster_discard, spacing=spacing, eps=eps, min_points=min_points, cluster_trim=cluster_trim, 
                                   tgt_coord_samples=tgt_coord_samples, tgt_final_trim=tgt_final_trim, tgt_reverse=tgt_reverse, tgt_preview=tgt_preview, 
                                   z_offset=z_offset, coord_skip=coord_skip, tgt_motion_delay=tgt_motion_delay, tgt_save=tgt_save, dbug=dbug, robo=True)





#Read the joint values stored under pose_list_joint[1] ('initial_coordinates_down_high_joint')
#in the 'Init_Pose' section and move the arm there.
#NOTE(review): eval() executes whatever text is in config.ini; if the stored value is a plain
#list literal, ast.literal_eval would be the safer choice - confirm the stored format first.
selected_pose_joint = eval(config.get('Init_Pose', pose_list_joint[1])) #eval turns the stored string back into a Python object.
print("Moving to initial position")
camera_processor.go_to_joint_state(move_group, selected_pose_joint)

###====================================================================================================================================================================

def strip_code_block_old(text):
    """Legacy cleaner: unwrap Markdown code fences and inline backticks.

    Triple-backtick blocks (with an optional language tag on the opening
    line) are replaced by their inner text first; afterwards every
    single-backtick span is unwrapped the same way. The result is returned
    with leading and trailing whitespace removed.
    """
    fenced = re.compile(r"```(?:\w+)?\n(.*?)```", flags=re.DOTALL)
    inline = re.compile(r"`([^`]*)`")

    without_fences = fenced.sub(r"\1", text)
    without_ticks = inline.sub(r"\1", without_fences)
    return without_ticks.strip()


def strip_code_block(text):
    """Return the body of the first <code>...</code> pair found in *text*.

    The tag contents (which may span several lines) are returned with
    surrounding whitespace stripped. When no tag pair is present, the whole
    input is returned stripped instead.
    """
    found = re.search(r"<code>(.*?)</code>", text, re.DOTALL)
    if found is None:
        return text.strip()
    return found.group(1).strip()


def update_msg(msg: list, role="user", content="") -> list:
    """
    Build a copy of a chat history with one extra message at the end.

    Parameters:
    - msg (list): Existing message dictionaries ({"role": ..., "content": ...}).
    - role (str): Role of the message to add (default "user").
    - content (str): Text of the message to add.

    Returns:
    - list: A new list holding the original entries followed by the new
      message; the list passed in is not modified.
    """
    return [*msg, {"role": role, "content": content}]


#============================================================================================================== To detect if generated response is code or text...
PYTHON_KEYWORDS = ["print(", "say(", "len(", "thread_handle."]

def is_python_code(text, heuristic_threshold=1):
    """Heuristically decide whether *text* looks like Python code.

    The score is the number of lines containing any entry of
    PYTHON_KEYWORDS, plus one point for each of these signals that holds for
    at least one line: ends with ':', contains '=' but not '==', starts with
    four spaces or a tab, contains a bracket character; plus one point when
    the text has more than one line. Returns True when the score reaches
    *heuristic_threshold*.
    """
    rows = text.strip().split('\n')

    #One point per line that mentions a known call prefix.
    keyword_rows = 0
    for row in rows:
        if any(kw in row for kw in PYTHON_KEYWORDS):
            keyword_rows += 1

    #Each structural signal contributes at most one point.
    signals = (
        any(row.strip().endswith(':') for row in rows),
        any('=' in row and '==' not in row for row in rows),
        any(row.startswith(('    ', '\t')) for row in rows),
        any(re.search(r'[\[\]\(\)\{\}]', row) for row in rows),
        len(rows) > 1,
    )
    return keyword_rows + sum(signals) >= heuristic_threshold


#Candidate checkpoints (switch by editing model_name below):
#Qwen/Qwen2.5-1.5B-Instruct
#Qwen/Qwen2.5-Coder-1.5B-Instruct
#Qwen/Qwen2.5-Coder-3B-Instruct

#Load tokenizer and model once at start-up; the weights are loaded in bfloat16 and moved to the GPU.
#The main loop reloads both from the same model_name when a "reload model" system command is received.
model_name = "Qwen/Qwen2.5-Coder-1.5B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(model_name, trust_remote_code=True, torch_dtype=torch.bfloat16).cuda()

System_prompt_no_reason = """Imagine we are working on HRC based path planning system using a manipulator robot. The robotic arm has a depth camera attached to its end effector. I would like you to assist me in interacting with the system and sending commands to this robot. 
There are five main steps that are required to complete the Inspection.
STEP 1: Fetch point cloud
STEP 2: Cluster point cloud
STEP 3: Generate Inspection path
STEP 4: Create robot targets
STEP 5: Run through targets  OR   plan and execute path
If the user asks what to do, briefly explain in few lines these steps on higher level without giving function names or generating any code. Do not tell how to call functions. Instead, tell user the simple commands that can be called to complete the steps.

For example1:
Me: Who are you?
You: I am an AI assistant programmed to assist you with HRC based path planning system using a manipulator robot. 

For example2:
Me: What should I do first?
You: To get started with the HRC based path planning system using a manipulator robot, you can begin by loading the point cloud data from the depth camera by instructing me to `fetch point cloud`.

For example3:
Me: What should I do next?
You: Next, you can cluster the point cloud into individual objects or points by instructing me to `cluster point cloud`. This step helps in identifying different Objects in the frame and help is selecting correct object for inspection.

At any point, you have access to the following set of functions and coding blocks that starts with <code> tag and ends with </code> tag.
All functions and codes are in Python language and thier use is explained in comment that starts with #. You are not to use any hypothetical functions. Do not include code comments in codes.


<code>
say("fetching pointcloud.") #Notify the user of fetching point cloud.
pointcloud = camera_processor.load_point_cloud(samples, offset_y, offset_z, manual_offset, spacing, trim_base, Hide_prev=False, Dbug=dbug, eval_tag=False) #This function when called with all parameters as listed, fetches a point cloud.
</code>


<code>
thread_handle.exit_viewer() #This function when called, closes the opened point cloud viewer or selector. 
</code>

<code>
thread_handle.next_pc()     #This function when called, shows the next object or point cloud.
</code>

<code>
thread_handle.prev_pc()         #This function when called, shows the previous object or point cloud.
</code>

<code>
thread_handle.rotate_object()    #This function when called, applies rotation animation loop to the object or point cloud.
</code>

<code>
thread_handle.stop_rotation() #This function when called, stops rotation of the object or point cloud.
</code>

<code>
thread_handle.reset_view()  #This function when called, resets the view of the object or point cloud.
</code>

<code>
thread_handle.select_current()  #This function when called, selects the current object or point cloud.
</code>

<code>
thread_handle.deselect_current() #This function when called, deselects the current object or point cloud.
</code>

<code>
thread_handle.select_centered_profile_around_y() #This function when called, selects the centered profile around y axis.
</code>

<code>
thread_handle.select_centered_profile_around_x() #This function when called, selects the centered profile around x axis.
</code>

<code>
thread_handle.select_centered_profile_around_z() #This function when called, selects the centered profile around z axis.
</code>

<code>
thread_handle.select_specific_profile_around_y(p) #This function takes specific profile number p as input and returns that specific profile around y axis as output.
</code>

<code>
thread_handle.select_specific_profile_around_x(p) #This function takes specific profile number p as input and returns that specific profile around x axis as output.
</code>

<code>
thread_handle.select_specific_profile_around_z(p) #This function takes specific profile number p as input and returns that specific profile around z axis as output.
</code>

<code>
thread_handle.select_multiple_profiles_around_y(n=k) #This function takes number of profiles k as input and returns that many profiles around y axis as output.
</code>

<code>
thread_handle.select_multiple_profiles_around_x(n=k) #This function takes number of profiles k as input and returns that many profiles around x axis as output.
</code>

<code>
thread_handle.select_multiple_profiles_around_z(n=k) #This function takes number of profiles k as input and returns that many profiles around z axis as output.
</code>

<code>
thread_handle.select_profile_with_angle(angle_degrees=45, direction='left') #This function takes angle in degrees and direction 'left' or 'right' as input and returns single profile as output. Here, it selects a single profile 45 degrees towards left of the object.
</code>

<code>
thread_handle.select_multiple_profiles_with_angle(angle_degrees=45, direction='right', n=10) #This function takes angle in degrees, direction 'left' or 'right' and number of profiles n as input and returns n number of profiles as output. Here, it selects 10 profiles 45 degrees towards right of the object.
</code>

<code>
thread_handle.preview_points_order(profiles, delay=0.05) #This function takes list of profiles and delay in seconds as input and shows a preview of profiles.
</code>



The following block of code that starts with <code> and ends with </code> tag clusters the point cloud.
<code>
#cluster pointcloud or object 
clouds = camera_processor.cluster_point_cloud(pointcloud, eps=eps, min_points=min_points) 
if cluster_discard > 0:
    cld_idx_remove = []
    for cld_idx in range (len(clouds)):
        if len(np.array(clouds[cld_idx].points)) <= cluster_discard:
            cld_idx_remove.append(cld_idx)
    clouds = np.delete(clouds, cld_idx_remove, axis=0)

if len(clouds)==1:
    results = clouds[0]
    say("Clustering completed. One object found!")
else:
    thread_handle = None
    def start_viewer():
        global thread_handle
        thread_handle = PointCloudViewer(clouds, results, say, view_cam_parameters)
        thread_handle.run()
    say("There are " + str(len(clouds)) + " objects available. Please select the desired objects to inspect!")
    results = []
    thread = threading.Thread(target=start_viewer)
    thread.start()
</code>


The following block of code that starts with <code> and ends with </code> tag generates inspection profiles.
<code>
#generate inspection profiles
thread_handle = None
profiles = []
def start_inspector():
    global thread_handle
    thread_handle = Inspector(results, spacing, profiles, say, view_cam_parameters)
    thread_handle.run()

say("Please select the desired inspection paths around the selected object.")
say("You have the option to choose a path that passes through the center of the object around any axis. Alternatively, you can select a specific profile around an axis to serve as a reference for path generation. You can also direct me to inspect the object from any angle to the left or right.")
thread = threading.Thread(target=start_inspector)
thread.start()
</code>

<code>
say("creating robot targets")
Batch_Profiles = camera_processor.create_robot_targets(move_group, profiles) #This function when called with parameters as listed, creates robot targets. 
</code>

The following block of code that starts with <code> and ends with </code> tag creates cartesian plan and execute.
<code>
say("planning")
res,fra = camera_processor.plan_cartesian_path(move_group, Batch_Profiles, eef_step=0.01, jump_threshold=0.0, velocity_scale=0.1, acceleration_scale=0.1)  
say(f"Planned {fra * 100:.2f}% of the path.")
say("Executing plan!")
camera_processor.execute_plan(move_group, res)
</code>

The following block of code that starts with <code> and ends with </code> tag moves the robot end effector through the targets gennerated using create_robot_targets() function.
<code>
#run through targets
say("moving through targets.")
for coords in Batch_Profiles:
    cam_tgt = coords[0]
    eef_tgt = coords[1]
    for id_x in range(0,len(eef_tgt)):  
        camera_processor.publish_coordinates([cam_tgt[id_x]], "world", 'Camera_Target', static = False)   
        camera_processor.go_to_coord_goal(move_group, eef_tgt[id_x])
        print("Moving to Target:",id_x+1)
        time.sleep(tgt_motion_delay)
</code>


Note that when user gives specific commands like cluster point cloud you will have to generate that specific code block from provided section. Note that code sections start with <code> tag and ends with </code> tag. Also include variable that stores result. Do not include code comments in codes.

All of your outputs need to be identified by one of the following tags: 
<code>Output code command that achieves the desired goal</code>

For example1:
Me: rotate object
You:
<code>
thread_handle.rotate_object()
</code>

For example2:
Me: select center profile around y axis
You:
<code>
thread_handle.select_centered_profile_around_y()
</code>

For example3:
Me: generate inspection profiles
You:
<code>
thread_handle = None
profiles = []
def start_inspector():
    global thread_handle
    thread_handle = Inspector(results, spacing, profiles, say, view_cam_parameters)
    thread_handle.run()

say("Please select the desired inspection paths around the selected object.")
say("You have the option to choose a path that passes through the center of the object around any axis. Alternatively, you can select a specific profile around an axis to serve as a reference for path generation. You can also direct me to inspect the object from any angle to the left or right.")
thread = threading.Thread(target=start_inspector)
thread.start()
</code>

Whenever user asks a question, then answer using say function.
example1:
Me: How many clusters or objects are available?
You:
<code>
say(f"There are {len(clouds)} clusters available")
</code>

example2:
Me: How many profiles are available around x axis?
You:
<code>
say(f"There are {thread_handle.profiles_available('x')} profiles around x axis available for selection.") #use say() function to print number of profiles available around x axis.
</code>

example3:
Me: How many profiles are available around x and y axis?
You:
<code>
say(f"There are {thread_handle.profiles_available('x')} profiles around x axis and {thread_handle.profiles_available('y')} around y axis available for selection.")  #use say() function to print number of profiles available around x and y axis.
</code>

""".strip()


#Chat history template: a list of dictionaries, each with two keys:
#  'role'    -> who is speaking ('system', 'user', 'assistant', ...)
#  'content' -> the text of that message
#Access example: msg[2]['role']

#Initial history: only the system prompt. The main loop extends it after each LLM reply
#and resets it back to this single entry when the model is reloaded.
msg_system = [{'role': 'system', 'content': System_prompt_no_reason}]


#================================================================================================= TTS KOKORO

def say(text):
    """Speak *text* aloud through the Kokoro TTS pipeline.

    The text is split on blank-line/newline runs by the pipeline; each
    resulting audio chunk is shown as an autoplaying notebook Audio widget,
    and the function then sleeps for the chunk's duration so chunks do not
    overlap.

    NOTE(review): 24000 is used both as the playback rate and to compute the
    duration - assumes that is the sample rate of the audio Kokoro yields.
    """
    sample_rate = 24000
    #Each item is (graphemes, phonemes, audio); only the audio is needed here.
    for _graphemes, _phonemes, audio in pipeline_TTS(text, voice='bf_emma', speed=1, split_pattern=r'\n+'):
        seconds = len(audio) / sample_rate
        display(Audio(data=audio, rate=sample_rate, autoplay=True))
        time.sleep(seconds)


#============================================================================================  Speech Recognition


#Phrases that open a prompt session (fuzzy-matched in callback_franka).
#NOTE(review): the "Franco" variants are presumably common mis-transcriptions of "hey Franka" - confirm.
WAKE_WORDS = ["hey Franka", "Here Franco", "Here, Franco"]
#Phrase that shuts the assistant down.
EXIT_WORDS = ["goodbye Franka"]

#Phrase that arms system-command mode, and the commands accepted while it is armed.
System_cmd = ["System command"]
System_WORDS = ["reload model"]


#Flags shared between the background listener callback and the main loop.
prompt_flag = False   #set by callback_franka on a wake word; cleared by listen_and_trigger after a successful transcription
llm_flag=False        #set by listen_and_trigger when a prompt was transcribed; cleared by the main loop after handling it
system_flag = False   #set when the system wake phrase is heard; cleared once the following phrase has been processed
reload_model = False  #set by the "reload model" system command; cleared by the main loop after reloading
Exit_speech = False  #Exit_speech flag for background listener.
say_flag=False        #set together with prompt_flag/system_flag so the main loop speaks its acknowledgement once

#NOTE(review): callback_franka and listen_and_trigger keep their transcriptions in local variables
#(their `global transcription` lines are commented out), so this module-level value stays "".
transcription = ""


#Normalize and clean transcription
def normalize_text(text: str) -> str:
    """Lower-case and trim *text*, then drop every character that is not a-z or a space."""
    lowered = text.lower().strip()
    return re.sub(r'[^a-z ]+', '', lowered)

def is_similar_word(text: str, keywords: List[str], threshold: int = 80) -> Tuple[bool, Optional[str]]:
    """Fuzzy-match *text* against a list of keywords.

    Both sides are passed through normalize_text() and compared with
    fuzz.ratio; the first keyword (in list order) whose score reaches
    *threshold* wins.

    Args:
        text (str): The input text to compare.
        keywords (List[str]): Candidate phrases, e.g. wake words.
        threshold (int, optional): Minimum score to accept. Defaults to 80.

    Returns:
        Tuple[bool, Optional[str]]: (True, keyword) for the first match,
        (False, None) when nothing matches or either argument is empty.
    """
    if not (text and keywords):
        return False, None

    cleaned = normalize_text(text)
    hit = next(
        (kw for kw in keywords if fuzz.ratio(cleaned, normalize_text(kw)) >= threshold),
        None,
    )
    return (hit is not None), hit

def listen_and_trigger(recognizer):
    """Record one spoken prompt from mic2 and transcribe it with faster-whisper.

    On success the transcription is returned, llm_flag is set and
    prompt_flag is cleared. When listening times out (20 s) or recognition
    fails, a message is printed, the flags are left untouched and None is
    returned.
    """
    global mic2
    global llm_flag, prompt_flag

    with mic2 as source:
        print("\n[Trigger Listening...]")
        try:
            captured = recognizer.listen(source, timeout=20)
            #use internal faster whisper...
            heard = recognizer.recognize_faster_whisper(audio_data=captured, model="faster-whisper-small.en", show_dict=False)
        except sr.WaitTimeoutError:
            print("Timeout reached.")
            return None
        except sr.UnknownValueError:
            print("Could not understand audio.")
            return None
        except sr.RequestError as e:
            print(f"Speech recognition error: {e}")
            return None

        print(f"Trigger STT: {heard}")
        llm_flag = True
        print("llm_flag set\n")
        prompt_flag = False
        return heard

    



#this is called from the background thread
def callback_franka(recognizer, audio):
    """Background-listener callback: transcribe one phrase and update the control flags.

    Called by Recognizer.listen_in_background for every captured phrase.
    The phrase is transcribed with faster-whisper and then handled as:

    - system mode armed (system_flag): the phrase is matched against
      System_WORDS; "reload model" sets reload_model, anything else is
      reported as invalid. system_flag is cleared either way.
    - system wake phrase (System_cmd): arms system mode and asks the main
      loop (via say_flag) to prompt for the command.
    - wake word (WAKE_WORDS): sets prompt_flag and say_flag so the main loop
      starts a prompt session.
    - exit word (EXIT_WORDS): sets Exit_speech.

    Args:
        recognizer: The sr.Recognizer that captured the audio.
        audio: The captured audio data.

    Recognition errors are caught and reported here: an exception escaping
    this callback would terminate the background listening thread and
    silently disable wake-word detection.
    """
    global Exit_speech
    global prompt_flag
    global system_flag
    global reload_model
    global say_flag

    #use internal faster whisper...
    try:
        transcription = recognizer.recognize_faster_whisper(audio_data=audio, model="faster-whisper-small.en", show_dict=False)
    except sr.UnknownValueError:
        print("Could not understand audio.")
        return
    except sr.RequestError as e:
        print(f"Speech recognition error: {e}")
        return

    #Nothing (or only whitespace) recognised: ignore this phrase.
    if not transcription or not transcription.strip():
        return

    print(f"Background STT: {transcription}")

    if system_flag:
        #System mode is armed: this phrase is the command itself.
        cmd = is_similar_word(transcription, System_WORDS)[1]

        if cmd == "reload model":
            print(f"System cmd: {cmd}")
            reload_model = True
        else:
            print("Invalid System command.")

        #One attempt per arming, whether or not the command was valid.
        system_flag = False
        print("system_flag reset","\n")

    elif is_similar_word(transcription, System_cmd)[0]:
        print(f"System Wake word: {transcription}")
        system_flag = True
        prompt_flag = False
        say_flag = True
        print("What is your command?")
        print()

    elif is_similar_word(transcription, WAKE_WORDS)[0]:
        print(f"Wake word: {transcription}")
        prompt_flag = True
        say_flag = True

    elif is_similar_word(transcription, EXIT_WORDS)[0]:
        print(f"Exit word: {transcription}")
        print("Exiting...")
        Exit_speech = True

#One recognizer shared by the background listener and listen_and_trigger().
recognizer = sr.Recognizer()
mic = sr.Microphone()   #source for the background wake-word listener
mic2 = sr.Microphone()  #source opened by listen_and_trigger() to record the prompt

with mic as source:
    recognizer.adjust_for_ambient_noise(source)  #we only need to calibrate once, before we start listening

print("Listening...")
#start listening in the background; callback_franka is invoked for every captured phrase
stop_listening = recognizer.listen_in_background(mic, callback_franka) #`stop_listening` is now a function that, when called, stops background listening


#Main-thread dispatcher: polls the flags set by callback_franka() / listen_and_trigger()
#and performs the matching action (reload model, acknowledge, run the LLM, exit).
while True:

    #Reload LLM model
    if reload_model:
        del tokenizer
        del model
        torch.cuda.empty_cache()
        time.sleep(5)
        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        model = AutoModelForCausalLM.from_pretrained(model_name, trust_remote_code=True, torch_dtype=torch.bfloat16).cuda()

        msg_system = [{'role': 'system', 'content': System_prompt_no_reason}]  #Reset context

        reload_model = False
        print("model reloaded!")
        say("model reloaded!")
        print()

    #System wake phrase heard: ask (once) for the command.
    elif system_flag and say_flag:
        say("what is your command?")
        say_flag=False

    #Wake word heard: acknowledge (once) and record the prompt.
    elif prompt_flag and say_flag:
        say("yes?")
        say_flag=False
        prompt = listen_and_trigger(recognizer)

    elif llm_flag:
        #Skip empty prompts and the lone "You" that Whisper tends to emit for silence/noise.
        #The check is made on `prompt` (what listen_and_trigger returned); the module-level
        #`transcription` is never updated, so testing it would let everything through.
        if prompt and prompt.strip() not in ("", "You"):
            print(f"Prompt: {prompt}")
            print("Running LLM...")

            msg = update_msg(msg_system, role="user", content = prompt)
            inputs = tokenizer.apply_chat_template(msg, add_generation_prompt=True, return_tensors="pt").to(model.device)
            outputs = model.generate(inputs, max_new_tokens=4096, do_sample=False, top_k=50, num_return_sequences=1, eos_token_id=tokenizer.eos_token_id)
            #Decode only the newly generated tokens (everything after the prompt tokens).
            response = tokenizer.decode(outputs[0][len(inputs[0]):], skip_special_tokens=True)
            print("=========================== Response:")
            print("\n",response,"\n")
            print()

            #Keep context: store the user turn together with the reply so the history
            #alternates user/assistant. (will increase context size and VRAM usage...)
            msg_system = update_msg(msg, role="assistant", content = response)

            try:
                code_block = strip_code_block(response)
                if not (is_python_code(code_block)):
                    #Plain text: speak it directly. Building a say("...") source string and
                    #exec-ing it breaks on quotes/newlines and would run reply text as code.
                    say(response)
                else:
                    say("Okay!")
                    #SECURITY NOTE: this executes model-generated code with full access to the
                    #robot and the interpreter; only commands from the system prompt are expected.
                    exec(code_block)
                    say("done!")
            except Exception as e:
                print("Error in executing command!")
                print(e)
                say("Error in executing command!")

        llm_flag = False
        print("llm_flag reset\n")


    #Stop background listener, delete model... Exit_speech flag set by callback function.
    elif Exit_speech:
        say("Goodbye!")
        del tokenizer
        del model
        del pipeline_TTS
        torch.cuda.empty_cache()
        stop_listening(wait_for_stop=False)
        break

    else:
        #Nothing to do: yield briefly instead of spinning a CPU core at 100%.
        time.sleep(0.05)

