In [4]:
import os
import sqlite3
import pyttsx3
import ollama
import speech_recognition as sr
from fuzzywuzzy import process
import pyttsx3
from docx import Document
import ollama
import os
import pandas as pd
import subprocess

In [5]:

# Module-level pyttsx3 text-to-speech engine; speak() below reads this
# global, so it must exist before any function in this cell talks.
engine = pyttsx3.init()


# Voice output helper
def speak(text):
    """Say ``text`` aloud through the module-level TTS ``engine``.

    The text is queued with ``say`` and the queue is then processed with
    ``runAndWait`` before this function returns.
    """
    tts = engine
    tts.say(text)
    tts.runAndWait()

# Capture one spoken command from the microphone
def listen(mic_index=None):
    """Record one utterance and return its lower-cased transcription.

    Args:
        mic_index: Device index handed to ``sr.Microphone``; ``None`` selects
            the default microphone.

    Returns:
        The recognised text in lower case, or ``""`` when the speech could
        not be understood or the recognition request failed (the user is
        told which of the two happened).
    """
    recognizer = sr.Recognizer()
    if mic_index is None:
        microphone = sr.Microphone()
    else:
        microphone = sr.Microphone(device_index=mic_index)

    with microphone as source:
        print("Listening for command...")
        recognizer.adjust_for_ambient_noise(source)
        captured = recognizer.listen(source)

        try:
            heard = recognizer.recognize_google(captured)
        except sr.UnknownValueError:
            apology = "Sorry, I didn't catch that."
        except sr.RequestError:
            apology = "Sorry, there was a network error."
        else:
            print(f"You said: {heard}")
            return heard.lower()

        # Only reached when recognition failed.
        speak(apology)
        return ""




# Intent recognition using LLM
def recognize_intent_with_llm(command):
    """Classify a spoken document-editing command into an intent label.

    Args:
        command: Transcribed user utterance.

    Returns:
        The model's reply, lower-cased, with surrounding whitespace, quotes,
        back-ticks and punctuation removed. The prompt asks for one of:
        add_text, delete_line, read_document, read_line, next_line,
        previous_line, edit_line, exit. Returns ``""`` without contacting the
        model when ``command`` is blank.
    """
    if not command or not command.strip():
        # A failed transcription arrives here as "". Asking the model to
        # classify nothing lets it invent an intent (possibly "exit").
        return ""

    prompt = (
        f"User command: \"{command}\"\n"
        "Identify the intent of the command and respond with one of these labels:\n"
        "add_text, delete_line, read_document, read_line, next_line, previous_line, edit_line, exit.\n"
        "Respond only with the intent label and nothing else."
    )
    # Send prompt to LLM
    stream = ollama.chat(
        model='llama3.1:8b',
        messages=[{'role': 'user', 'content': prompt}],
        stream=True
    )

    reply = "".join(chunk['message']['content'] for chunk in stream)

    # Models often answer `"add_text"` or `add_text.`; peel that decoration
    # off so the caller's exact string comparison still matches.
    return reply.strip(" \t\r\n'\"`.,!").lower()



# Function to extract line number using LLM
def get_line_number_with_llm(command):
    """Ask the LLM which line number a spoken phrase refers to.

    Args:
        command: Transcribed user utterance, e.g. "read line three".

    Returns:
        A string. When the model's reply contains digits, only the first run
        of digits is returned (so "Line 3." yields "3"). Otherwise the
        stripped reply is returned unchanged, which makes a caller's ``int()``
        raise ``ValueError`` as before. Returns ``""`` without contacting the
        model when ``command`` is blank.
    """
    import re

    if not command or not command.strip():
        return ""

    prompt = (
        f"User input: \"{command}\"\n"
        "Identify and respond with only the numeric line number mentioned in the input. "
        "For example, if the input is 'read line three,' respond with '3'. No extra text, just the number."
    )
    # Send prompt to LLM
    stream = ollama.chat(
        model='llama3.1:8b',
        messages=[{'role': 'user', 'content': prompt}],
        stream=True
    )

    reply = "".join(chunk['message']['content'] for chunk in stream).strip()

    # Tolerate replies that wrap the number in extra words or punctuation.
    digits = re.search(r"\d+", reply)
    return digits.group() if digits else reply


# Open and initialize document
def open_document(filename):
    """Load a document so it can be read and edited in memory.

    Args:
        filename: Path whose extension selects the loader.

    Returns:
        A ``Document`` object for ``.docx`` files, a list with one string per
        line for ``.txt`` files, or ``None`` for any other extension.
    """
    if filename.endswith(".docx"):
        return Document(filename)
    if filename.endswith(".txt"):
        # Plain read mode: nothing is written here, so a read-only file must
        # still be openable (the previous "r+" demanded write permission).
        with open(filename, "r") as file:
            return file.read().splitlines()  # Treat each line as an entry in a list
    return None


# Persist the in-memory document back to disk
def save_document(content, filename):
    """Write ``content`` to ``filename`` in the format implied by its extension.

    ``.docx``: ``content.save(filename)`` is called.
    ``.txt``: ``content`` is treated as a list of lines and written joined by
    newline characters (no newline after the last line).
    Any other extension: nothing is written.
    """
    if filename.endswith(".docx"):
        content.save(filename)
        return

    if not filename.endswith(".txt"):
        return

    with open(filename, "w") as handle:
        handle.write("\n".join(content))


# Append new text at the end of the document
def add_text(content, filename, new_text):
    """Append ``new_text`` to the open document and confirm aloud.

    A ``.docx`` document receives a new paragraph; a ``.txt`` line list
    receives a new entry. For any other extension nothing is appended, but the
    confirmation is still spoken.
    """
    if filename.endswith(".docx"):
        appender = content.add_paragraph
    elif filename.endswith(".txt"):
        appender = content.append
    else:
        appender = None

    if appender is not None:
        appender(new_text)

    speak("Text added successfully.")

# Delete text by line number
def delete_line(content, line_num, filename):
    """Delete one line of the open document and announce the outcome.

    Args:
        content: ``Document`` object (.docx) or list of lines (.txt).
        line_num: Zero-based index of the line to delete.
        filename: Path of the document; its extension selects the behaviour.

    Speaks "Invalid line number." when the index is negative or past the end.
    """
    if line_num < 0:
        # Python would count a negative index from the end, so a spoken
        # "line zero" (index -1) used to delete the last line silently.
        speak("Invalid line number.")
        return

    try:
        if filename.endswith(".docx"):
            paragraph = content.paragraphs[line_num]
            # NOTE(review): clear() empties the paragraph's content; the
            # paragraph itself appears to remain in the document — confirm
            # this is the intended meaning of "delete" for .docx files.
            paragraph.clear()
        elif filename.endswith(".txt"):
            content.pop(line_num)
        speak("Line deleted successfully.")
    except IndexError:
        speak("Invalid line number.")

# Use LLM to edit a specific line
def edit_line_with_llm(original_line, edit_instruction):
    """Rewrite one line of text according to a spoken instruction.

    Args:
        original_line: Current text of the line.
        edit_instruction: What the user asked to change.

    Returns:
        The model's reply with surrounding whitespace removed, or
        ``original_line`` unchanged when ``edit_instruction`` is blank.
    """
    if not edit_instruction or not edit_instruction.strip():
        # No instruction (e.g. speech recognition failed): keep the line as
        # it is rather than letting the model rewrite it arbitrarily.
        return original_line

    prompt = (
        f"Original line: \"{original_line}\"\n"
        f"Edit request: {edit_instruction}\n"
        "Provide only the updated line with no extra text:"
    )
    # Send prompt to LLM
    stream = ollama.chat(
        model='llama3.1:8b',
        messages=[{'role': 'user', 'content': prompt}],
        stream=True
    )

    edited_line = "".join(chunk['message']['content'] for chunk in stream)

    return edited_line.strip()


# Read and navigate through the document
def read_document(content, filename, line_num=None):
    """Speak the whole document, or a single line of it.

    Args:
        content: ``Document`` object (.docx) or list of lines (.txt).
        filename: Path of the document; its extension selects how ``content``
            is interpreted.
        line_num: Zero-based index of the line to read, or ``None`` to read
            everything.

    An unsupported extension or an index outside the document is announced
    aloud instead of raising.
    """
    if filename.endswith(".docx"):
        lines = [p.text for p in content.paragraphs]
    elif filename.endswith(".txt"):
        lines = content
    else:
        # Previously this path fell through to an unbound local variable.
        speak("Unsupported file type.")
        return

    if line_num is None:
        speak("\n".join(lines))
    elif 0 <= line_num < len(lines):
        speak(lines[line_num])
    else:
        # Covers both past-the-end and negative indices (which would
        # otherwise wrap around and read from the end of the document).
        speak("Invalid line number.")



def _document_length(content, filename):
    """Return how many paragraphs (.docx) or lines (.txt) the document holds."""
    if filename.endswith(".docx"):
        return len(content.paragraphs)
    return len(content)


def _ask_line_index(mic_index, question, total_lines):
    """Ask ``question`` aloud and return the spoken 1-based line as a 0-based index.

    Raises:
        ValueError: The reply could not be converted to an integer.
        IndexError: The number lies outside ``1..total_lines``.
    """
    speak(question)
    line_num = int(get_line_number_with_llm(listen(mic_index)))
    if not 1 <= line_num <= total_lines:
        raise IndexError(f"line {line_num} is outside 1..{total_lines}")
    return line_num - 1


def edit_document(mic_index, file_path):
    """Run the voice-driven editing loop for one document.

    Args:
        mic_index: Microphone device index passed on to ``listen``.
        file_path: Path of the ``.docx`` or ``.txt`` file to open.

    The loop listens for a command, classifies it with the LLM and dispatches
    to the matching action until the "exit" intent is recognised, at which
    point the document is saved. Returns ``None``.
    """
    # The path may come straight from an LLM reply, so drop stray whitespace
    # and surrounding quotes before touching the file system.
    filename = file_path.strip().strip("'\"")

    if not os.path.exists(filename):
        speak("File not found. Please try again.")
        return

    # Open the document
    content = open_document(filename)
    if content is None:
        # open_document returns None for extensions it does not handle.
        speak("Sorry, I can only open text and Word documents.")
        return
    current_line = 0

    speak("File opened successfully. You can say commands like add text, delete line, edit line, or read document.")

    while True:
        command = listen(mic_index)
        if not command:
            # listen() has already apologised; do not ask the LLM to
            # classify an empty utterance (it might answer "exit").
            continue
        intent = recognize_intent_with_llm(command)
        total_lines = _document_length(content, filename)

        if intent == "exit":
            save_document(content, filename)
            speak("Changes saved. Goodbye!")
            break

        elif intent == "add_text":
            speak("What text would you like to add?")
            new_text = listen(mic_index)
            if new_text:
                add_text(content, filename, new_text)

        elif intent == "delete_line":
            try:
                index = _ask_line_index(
                    mic_index, "Which line number would you like to delete?", total_lines
                )
            except (ValueError, IndexError):
                speak("Invalid line number.")
            else:
                delete_line(content, index, filename)
                # Keep the navigation cursor inside the (possibly shorter) document.
                remaining = _document_length(content, filename)
                current_line = max(0, min(current_line, remaining - 1))

        elif intent == "read_document":
            read_document(content, filename)

        elif intent == "read_line":
            try:
                index = _ask_line_index(
                    mic_index, "Which line number would you like to hear?", total_lines
                )
            except (ValueError, IndexError):
                speak("Invalid line number.")
            else:
                read_document(content, filename, index)

        elif intent == "next_line":
            if current_line + 1 < total_lines:
                current_line += 1
                read_document(content, filename, current_line)
            else:
                # Stepping past the end used to raise an uncaught IndexError.
                speak("There are no more lines.")

        elif intent == "previous_line":
            if total_lines == 0:
                speak("The document is empty.")
            else:
                current_line = max(0, current_line - 1)
                read_document(content, filename, current_line)

        elif intent == "edit_line":
            try:
                index = _ask_line_index(
                    mic_index, "Which line number would you like to edit?", total_lines
                )
            except (ValueError, IndexError):
                speak("Invalid line number or edit command.")
                continue

            is_docx = filename.endswith(".docx")
            original_line = content.paragraphs[index].text if is_docx else content[index]

            speak("What changes would you like to make?")
            edit_instruction = listen(mic_index)
            if not edit_instruction:
                # Nothing was understood; leave the line untouched.
                continue

            # Get edited line from LLM
            edited_line = edit_line_with_llm(original_line, edit_instruction)

            # Update the document with the edited line
            if is_docx:
                content.paragraphs[index].text = edited_line
            elif filename.endswith(".txt"):
                content[index] = edited_line

            speak("Line edited successfully.")

        else:
            speak("Sorry, I didn't understand that command. Please try again.")



In [6]:
# Initialize the text-to-speech engine.
# This rebinds the module-level `engine` that an earlier cell also assigns;
# the speak() defined in this cell uses whichever binding ran last.
engine = pyttsx3.init()

# Database Initialization and File Indexing
# class FileIndexer:
#     def __init__(self, db_name="file_index.db"):
#         self.db_name = db_name
#         self.initialize_database()
#         self.index_files()

#     def initialize_database(self):
#         """Creates a database to index system files."""
#         conn = sqlite3.connect(self.db_name)
#         cursor = conn.cursor()
#         cursor.execute('''CREATE TABLE IF NOT EXISTS files (id INTEGER PRIMARY KEY, name TEXT, path TEXT)''')
#         conn.commit()
#         conn.close()

#     def index_files(self):
#         """Indexes all files in the system."""
#         conn = sqlite3.connect(self.db_name)
#         cursor = conn.cursor()
#         for root, dirs, files in os.walk("E:/AIHCI-Blind/test"):  # Change path as needed
#             for file in files:
#                 print(file)
#                 cursor.execute("INSERT INTO files (name, path) VALUES (?, ?)", (file, os.path.join(root, file)))
#         conn.commit()
#         conn.close()

# Microphone discovery and selection
class MicrophoneHandler:
    """Console helper for listing audio devices and choosing one by index."""

    def list_microphones(self):
        """Print every device name reported by ``sr.Microphone`` and return the list."""
        device_names = sr.Microphone.list_microphone_names()
        print("Available microphones:")
        for position, device in enumerate(device_names):
            print(f"{position}: {device}")
        return device_names

    def select_microphone(self):
        """Ask on the console which device to use.

        Returns:
            The chosen index when it is a valid position in the device list,
            otherwise ``None`` (meaning: use the default microphone).
        """
        available = self.list_microphones()
        try:
            choice = int(input("Enter the microphone index you want to use: "))
        except ValueError:
            print("Invalid input, using the default microphone.")
            return None

        if 0 <= choice < len(available):
            return choice

        print("Invalid index, using the default microphone.")
        return None

# Speech input/output helpers
def speak(text):
    """Read ``text`` aloud using the module-level TTS ``engine``."""
    synthesizer = engine
    synthesizer.say(text)
    synthesizer.runAndWait()

def listen(mic_index=None):
    """Record one utterance from the chosen microphone and transcribe it.

    Args:
        mic_index: Device index for ``sr.Microphone``; ``None`` means the
            default device.

    Returns:
        The lower-cased transcription, or ``""`` when recognition failed
        (an apology is spoken in that case).
    """
    recognizer = sr.Recognizer()
    mic_kwargs = {} if mic_index is None else {"device_index": mic_index}

    with sr.Microphone(**mic_kwargs) as source:
        print("Listening...")
        recognizer.adjust_for_ambient_noise(source)
        recording = recognizer.listen(source)

        try:
            transcript = recognizer.recognize_google(recording)
        except sr.UnknownValueError:
            speak("Sorry, I didn't catch that.")
            return ""
        except sr.RequestError:
            speak("Sorry, there was a network error.")
            return ""

        print(f"You said: {transcript}")
        return transcript.lower()

# LLM Interface
def interact_with_ollama(prompt):
    """Send ``prompt`` to the local Ollama model and return its reply.

    Args:
        prompt: Full text of the user message.

    Returns:
        The concatenated streamed reply with leading and trailing whitespace
        removed, so callers can use it directly as a path or label. On any
        failure the error is printed and the fixed string
        "Sorry, something went wrong." is returned instead.
    """
    try:
        stream = ollama.chat(
            model='llama3.1:8b',
            messages=[{'role': 'user', 'content': prompt}],
            stream=True
        )
        # Strip here once: the reply was previously printed stripped but
        # returned with its trailing newline, which broke os.chdir() and
        # os.path.exists() in callers that used it as a path.
        response = "".join(chunk['message']['content'] for chunk in stream).strip()
    except Exception as e:
        # Boundary with an external service: report and fall back so the
        # voice loop keeps running.
        print(f"An error occurred: {e}")
        return "Sorry, something went wrong."

    print("Response from interact with Ollama:", response)
    return response
    

# Intent Classification
def get_intent(command):
    """Classify a spoken file-navigation command.

    Args:
        command: Transcribed user utterance.

    Returns:
        The model's reply, lower-cased, with surrounding whitespace, quotes
        and punctuation removed. The prompt asks for one of
        'changedirectory', 'list', 'read' or 'search'. Returns ``""`` without
        contacting the model when ``command`` is blank.
    """
    if not command or not command.strip():
        return ""

    prompt = f"Classify this command as one of the following: 'changedirectory', 'list', 'read' or 'search'. Reply with only one word from this list, without any extra text, punctuation, or phrases, based on the command given after the colon:'{command}'"
    response = interact_with_ollama(prompt)

    # The prompt shows the labels in single quotes, and models tend to echo
    # them back that way ('list') or add a full stop; peel that off so the
    # caller's exact comparison still matches.
    return response.strip(" \t\r\n'\"`.,!").lower()

# Command Execution Functions
def change_directory(command):
    """Change the working directory to the path the LLM derives from ``command``.

    Args:
        command: Prompt forwarded verbatim to ``interact_with_ollama``; the
            reply is used as the target directory.

    Returns:
        The new working directory on success, or an "Error: ..." string when
        the directory could not be entered. The outcome is also spoken.
    """
    # Remove whitespace and quotes the model may have wrapped the path in.
    directory = interact_with_ollama(command).strip().strip("'\"")
    print("Directory to change to:", directory)
    try:
        os.chdir(directory)
        current_dir = os.getcwd()
    except (OSError, ValueError) as e:
        speak(f"Error changing directory: {e}")
        return f"Error: {e}"

    # Open the same directory in Windows Explorer. The path is passed as its
    # own argument rather than spliced into a command string, and a failure
    # to launch Explorer (e.g. not on Windows) no longer gets reported as a
    # failed directory change.
    try:
        subprocess.Popen(["explorer", current_dir])
    except OSError as e:
        print(f"Could not open Explorer: {e}")

    speak(f"Changed directory to {current_dir}")
    return current_dir

def list_files():
    """Announce and return the entries of the current working directory.

    Returns:
        The entry names joined by newlines, "No files found in this
        directory." when there are none, or "Error: Permission denied." when
        the directory cannot be listed.
    """
    try:
        entries = os.listdir()
        if entries:
            result = "\n".join(entries)
        else:
            result = "No files found in this directory."
        speak(f"Files in current directory are: {result}")
        return result
    except PermissionError:
        speak("Permission denied. Unable to list the contents of this directory.")
        return "Error: Permission denied."

# def search_files(query):
#     """Searches indexed files using a query."""
#     conn = sqlite3.connect("file_index.db")
#     cursor = conn.cursor()
#     cursor.execute("SELECT name, path FROM files")
#     files = cursor.fetchall()
#     conn.close()

#     search_term = " ".join(query.split()[1:])
#     matched_files = process.extract(search_term, [file[0] for file in files], limit=5)
#     results = []

#     for match in matched_files:
#         file_name, path = next((f for f in files if f[0] == match[0]), (None, None))
#         if path:
#             results.append(f"{file_name} located at {path}")
#             speak(f"{file_name} located at {path}")

#     return "\n".join(results) if results else "No matching files found."




def search_files(file_name, file_df):
    """Fuzzy-search the file index and announce the best matches.

    Args:
        file_name: Text to match against the indexed file names.
        file_df: DataFrame with a 'File name' column and a 'Path' column.

    Returns:
        The paths of up to three best-matching rows joined by newlines, or
        "No matching files found." when the query is blank or the index is
        empty.
    """
    names = file_df['File name'].tolist()
    paths = file_df['Path'].tolist()

    if not names or not file_name or not file_name.strip():
        return "No matching files found."

    # Key the candidates by row position. With a mapping, fuzzywuzzy returns
    # (name, score, key) triples, so two files that share a name each resolve
    # to their own path instead of both resolving to the first such row.
    candidates = dict(enumerate(names))
    matched_files = process.extract(file_name, candidates, limit=3)

    results = []
    for name, _score, position in matched_files:
        file_path = paths[position]
        results.append(file_path)
        speak(f"{name} located at {file_path}")

    return "\n".join(results) if results else "No matching files found."

# Command Routing
def execute_command(command,mic_index,file_df):
    """Classify ``command`` and run the matching action.

    Args:
        command: Transcribed user utterance.
        mic_index: Microphone device index, used for follow-up questions.
        file_df: File index DataFrame handed to ``search_files``.

    Returns:
        Whatever the selected action returns: a directory or error string for
        'changedirectory', the listing for 'list', the matching paths for
        'search', the result of ``edit_document`` (or "Document not found.")
        for 'read', and "Invalid command" for anything else.
    """
    intent = get_intent(command)
    print("Intent:", intent)

    if intent == "changedirectory":
        print("I am in the change directory intent")
        # change_directory() sends this prompt to the LLM and uses the reply
        # as the argument to os.chdir.
        prompt = f"Translate the following request into only the directory path needed for os.chdir, without any extra words, explanations, or punctuation: '{command}'"
        return change_directory(prompt)

    if intent == "list":
        # list_files() takes no argument; it always lists the current directory.
        return list_files()

    if intent == "search":
        # Let the LLM reduce the utterance to a search term first. Previously
        # the prompt text itself was fuzzy-matched against the file names.
        prompt = f"Translate the following request into a search query containing only the file name to look for, without any extra text, words, or punctuation: '{command}'"
        query = interact_with_ollama(prompt).strip()
        return search_files(query, file_df)

    if intent == "read":
        prompt = f"Extract only the file name from the following command without any extra text, words, or punctuation and if there is dot in the command change it it '.': '{command}'"
        document_name = interact_with_ollama(prompt).strip()

        print("Document name:", document_name)
        file_paths = search_files(document_name, file_df)  # Search for the document in the indexed files
        print("File paths from db search:", file_paths)

        # search_files returns this sentinel sentence instead of paths when
        # nothing matched; otherwise one path per line (possibly repeated).
        if file_paths == "No matching files found.":
            candidates = []
        else:
            candidates = list(dict.fromkeys(p for p in file_paths.split("\n") if p.strip()))

        if not candidates:
            speak("Document not found.")
            return "Document not found."

        if len(candidates) == 1:
            # A single hit needs no follow-up question.
            file_path = candidates[0]
        else:
            speak("Multiple files found. Please specify the file you want to read.")
            filenumber = listen(mic_index)
            # pass the list to llama to get the file path based on our input number/ choice
            prompt = f"Translate the following request into a file path corresponding to the file number in the list of gile given you want to read from {file_paths} only return the corrsponding file pathwithout any extra text, words, or punctuation : '{filenumber}'"
            file_path = interact_with_ollama(prompt).strip().strip("'\"")
            print("File path from llama:", file_path)

        print("File path:", file_path)

        if file_path:
            return edit_document(mic_index, file_path)  # Call the document reader function
        speak("Document not found.")
        return "Document not found."

    speak("Sorry, I can only execute 'cd', 'ls', or 'search' commands.")
    return "Invalid command"

# Main Pipeline
def main():
    """Run the voice assistant until the user says a phrase containing "exit".

    Loads the file index from ``file_list.pkl``, lets the user pick a
    microphone on the console, then loops: listen, route the command through
    ``execute_command``, print and speak the result.
    """
    try:
        # NOTE: unpickling can execute arbitrary code; only load an index
        # file that was created on this machine by a trusted script.
        file_df = pd.read_pickle("file_list.pkl")
    except FileNotFoundError:
        # Without the index nothing can be searched; say so instead of
        # crashing silently for a user who relies on audio feedback.
        speak("The file index file_list.pkl was not found. Please create it and start me again.")
        return

    mic_handler = MicrophoneHandler()
    mic_index = mic_handler.select_microphone()  # Select microphone

    speak("Hello! I am ready to assist you with basic command line tasks. Please tell me what you want to do.")

    while True:
        user_command = listen(mic_index)
        if not user_command:
            # listen() already told the user it heard nothing usable; do not
            # send an empty command to the LLM.
            continue
        if "exit" in user_command:
            speak("Goodbye!")
            break

        result = execute_command(user_command,mic_index,file_df)
        print(result)
        # NOTE(review): most handlers already speak their outcome, so this
        # repeats it; it is kept because the "no match" search result is only
        # announced here.
        if result:
            speak(result)

# Start the assistant only when this module is the program entry point
# (its __name__ is "__main__"), not when it is imported.
if __name__ == "__main__":
    main()


Available microphones:
0: Microsoft Sound Mapper - Input
1: Headset Microphone (Oculus Virt
2: Microphone (G435 Wireless Gamin
3: Microphone (Realtek USB Audio)
4: Microphone (3- USB PnP Audio De
5: Microphone (Steam Streaming Mic
6: Headset Microphone (2- Big bro)
7: Microsoft Sound Mapper - Output
8: Speakers (2- Big bro)
9: Realtek Digital Output (Realtek
10: Headphones (Realtek USB Audio)
11: ASUS VG32V (NVIDIA High Definit
12: Headphones (Oculus Virtual Audi
13: Headset Earphone (G435 Wireless
14: Speakers (Steam Streaming Speak
15: Speakers (Steam Streaming Micro
16: U32J59x (NVIDIA High Definition
17: Primary Sound Capture Driver
18: Headset Microphone (Oculus Virtual Audio Device)
19: Microphone (G435 Wireless Gaming Headset)
20: Microphone (Realtek USB Audio)
21: Microphone (3- USB PnP Audio Device)
22: Microphone (Steam Streaming Microphone)
23: Headset Microphone (2- Big bro)
24: Primary Sound Driver
25: Speakers (2- Big bro)
26: Headphones (Realtek USB Audio)
27: ASUS VG32V

KeyboardInterrupt: 