In [16]:
import re, os, sys

def extract_tool_requests(response):

    # Regex to match  the tool blocks 
    pattern = r'<tool\$(.*?)>(.*?)</tool\$>'
    matches = re.findall(pattern=pattern,       # find the regex in input data 
                         string=response,       # input data 
                         flags=re.DOTALL        # flags allows the . to match newline characters as well.
                         )
    
    tool_usages = []

    for match in matches:
        # print("match", match)
        attributes, body = match 
        # print("attributes: ", attributes)
        # print("body: ", body)
        tool_dir = {}

        # Parse attributes 
        for attr in re.findall(r'(\w+)="([^"]+)"', attributes):     # This line finds all substrings in response that match the defined pattern and return them as a list of tuples.
            # print("attr1: ", attr[1])
            tool_dir[attr[0]] = attr[1]
            # print("tool_dir", tool_dir)


        # Add body content 
        tool_dir["body"] = body.strip()
        tool_usages.append(tool_dir)
        # print("tool_usage", tool_usages)


    return tool_usages


# Example 
response = """
<tool$tool="search" query="example query">This is a search tool request.</tool$>
<tool$tool="image" query="example image">This is an image tool request.</tool$>
"""

tool_requests = extract_tool_requests(response)
tool_requests

[{'tool': 'search',
  'query': 'example query',
  'body': 'This is a search tool request.'},
 {'tool': 'image',
  'query': 'example image',
  'body': 'This is an image tool request.'}]

In [17]:
import os, re, sys

def read_file(relative_path, **kwargs):
    absolute_path = get_abs_path(relative_path)  # Construct the absolute path to the target file
    with open(absolute_path) as f:
        content = f.read()
    # Replace placeholders with values from kwargs
    for key, value in kwargs.items():
        placeholder = "{{" + key + "}}"
        strval = str(value)
        content = content.replace(placeholder, strval)
    return content 

def get_abs_path(*relative_paths):
    return os.path.join(get_base_dir(), *relative_paths)

def get_base_dir():
    # Use the current working directory as the base directory
    base_dir = os.getcwd()
    return base_dir

# Example usage
result = read_file('file.txt', name='Alice', place='Wonderland')
print(result)


Hello Alice
welcome to Wonderland


In [18]:
import os, re, sys 

def read_file(relative_path, **kwargs):

    absolute_path = get_abs_path(relative_path)
    with open(absolute_path) as f:
        content = f.read()

    # Replace placeholder with values from kwargs 
    for key, value in kwargs.items():
        placeholder = "{{" + key + "}}"
        strval = str(value)
        content = content.replace(placeholder, strval)

    return content


def get_abs_path(*relative_path):
    return os.path.join(get_base_dir(), *relative_path)



def get_base_dir():

    # Use the current working directory as the base directory 
    base_dir = os.getcwd()
    return base_dir



## Print Style

In [19]:
import webcolors

class PrintStyle:
    last_endline=True
    
    def __init__(self, bold=False, italic=False, underline=False, font_color="default", background_color="default", padding=False):
        self.bold = bold
        self.italic = italic
        self.underline = underline
        self.font_color = font_color
        self.background_color = background_color
        self.padding = padding
        self.padding_added = False  # Flag to track if padding was added



    def _get_rgb_color_code(self, color, is_background=False):
        try:
            if color.startswith("#") and len(color) == 7:
                # Convert hex color to RGB
                r = int(color[1:3], 16)
                g = int(color[3:5], 16)
                b = int(color[5:7], 16)
            else:
                # Convert named color to RGB
                rgb_color = webcolors.name_to_rgb(color)
                r, g, b = rgb_color.red, rgb_color.green, rgb_color.blue
            
            if is_background:
                return f"\033[48;2;{r};{g};{b}m"
            else:
                return f"\033[38;2;{r};{g};{b}m"
        except ValueError:
            # Fallback to default color
            return "\033[49m" if is_background else "\033[39m"
        


    def _get_styled_text(self, text):
        start = ""
        end = "\033[0m"  # Reset ANSI code
        if self.bold:
            start += "\033[1m"
        if self.italic:
            start += "\033[3m"
        if self.underline:
            start += "\033[4m"
        start += self._get_rgb_color_code(self.font_color)
        start += self._get_rgb_color_code(self.background_color,True)
        return start + text + end
    


    def _add_padding_if_needed(self):
        if self.padding and not self.padding_added:
            print()  # Print an empty line for padding
            if not PrintStyle.last_endline: print() # add one more if last print was streamed
            self.padding_added = True


            
    def get(self, *args, sep=' ', **kwargs):
        text = sep.join(map(str, args))
        return self._get_styled_text(text)
    
    
        
    def print(self, *args, sep=' ', **kwargs):
        self._add_padding_if_needed()
        styled_text = self.get(*args, sep=sep, **kwargs)
        print(styled_text, end='\n', flush=True)
        PrintStyle.last_endline = True



    def stream(self, *args, sep=' ', **kwargs):
        self._add_padding_if_needed()
        styled_text = self.get(*args, sep=sep, **kwargs)
        print(styled_text, end='', flush=True)
        PrintStyle.last_endline = False




# Example usage 
style = PrintStyle(bold=True, font_color="red", background_color="yellow", padding=True)
style.print("Hello world")
style.stream("Hello world@")



[1m[38;2;255;0;0m[48;2;255;255;0mHello world[0m
[1m[38;2;255;0;0m[48;2;255;255;0mHello world@[0m

## Rate Limiter

In [20]:
import time 
from collections import deque



def rate_limiter(max_requests_per_minute,
                 max_tokens_per_mintire):
    

    execution_times = deque()
    token_counts = deque()

    def limit(tokens):
        if tokens > max_tokens_per_mintire:
            raise ValueError("Number of tokens execeds the maximum allowed per minute.")
        current_time = time.time()



        # cleanup old execution times and token counts 
        while execution_times and current_time - execution_times[0] > 60:
            execution_times.popleft()
            token_counts.popleft()
        total_tokens = sum(token_counts)



        if len(execution_times) < max_requests_per_minute and total_tokens + tokens <= max_tokens_per_mintire:
            execution_times.append(current_time)
            token_counts.append(tokens)


        else:
            sleep_time = max(
                60 - (current_time - execution_times[0]),
                60 - (current_time - execution_times[0]) if total_tokens + tokens > max_requests_per_minute else 0 
            )


            PrintStyle(font_color="yellow", padding=True).print(f"Rate limiter: sleeping for {sleep_time} seconds...")
            time.sleep(sleep_time)
            current_time = time.time()
            execution_times.append(current_time)
            token_counts.append(tokens)

    return limit



# Example usage 
rate_limit = rate_limiter(5, 10)  # Allow max 5 requests and 10 tokens per minute 
# rate_limit


# Simulate making requests 
for i in range(10):
    try:
        rate_limit(2)   # each request uses 2 tokens 
        print(f"Request {i + 1} processed.")

    except ValueError as e:
        print(e)


Request 1 processed.
Request 2 processed.
Request 3 processed.
Request 4 processed.
Request 5 processed.


[38;2;255;255;0m[49mRate limiter: sleeping for 60.0 seconds...[0m
Request 6 processed.
Request 7 processed.
Request 8 processed.
Request 9 processed.
Request 10 processed.


## Vector Data base

In [21]:
from langchain.storage import InMemoryByteStore, LocalFileStore
from langchain.embeddings import CacheBackedEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
import uuid 
from sentence_transformers import SentenceTransformer

class VectorDB:

    def __init__(self, embeddings_model, in_memory=False, cache_dir="./cache"):
        print("Initializing vectorDB...")
        self.embeddings_model = embeddings_model
        em_cache = get_abs_path(cache_dir, "embeddings")
        db_cache = get_abs_path(cache_dir, "database")

        if in_memory:
            self.store = InMemoryByteStore()
        else:
            self.store = LocalFileStore(em_cache)

        # Setup the embeddings model with the chosen cache storage 
        self.embedder = CacheBackedEmbeddings.from_bytes_store(
            underlying_embeddings=embeddings_model,
            document_embedding_cache=self.store,
            namespace=getattr(embeddings_model, 'model', getattr(embeddings_model, 'model_name', "default"))
        )

        self.db = Chroma(embedding_function=self.embedder, persist_directory=db_cache)

    def search_similarity(self, query, results=3):
        return self.db.similarity_search(query, results)

    def search_max_rel(self, query, results=3):
        return self.db.max_marginal_relevance_search(query, results)

    def delete_documents(self, query):
        score_limit = 1
        k = 2 
        tot = 0 
        while True:
            # Perform similarity search with score 
            docs = self.db.similarity_search_with_score(query, k=k)
            # Extract document IDs and filter based on score 
            document_ids = [result[0].metadata["id"] for result in docs if result[1] < score_limit]
            # Delete documents with IDs over the threshold score 
            if document_ids:
                fnd = self.db.get(where={"id": {"$in": document_ids}})
                if fnd["ids"]:
                    self.db.delete(ids=fnd["ids"])
                    tot += len(fnd["ids"])
                # If fewer than k document IDs, break the loop 
                if len(document_ids) < k:
                    break
        return tot 

    def insert_document(self, data):
        id = str(uuid.uuid4())
        # Generate embeddings using the encode method
        embeddings = self.embeddings_model.encode([data])
        # Add the embeddings to your vector store
        self.db.add_documents(documents=[Document(page_content=data, metadata={"id": id, "embeddings": embeddings.tolist()})])
        return id

# Example usage 
embeddings_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

# Initialize the vectorDB 
vector_db = VectorDB(embeddings_model=embeddings_model, in_memory=True)

# Insert a document
doc_id = vector_db.insert_document("This is the first sentence")
print(f"Document ID: {doc_id}")


  from tqdm.autonotebook import tqdm, trange







Initializing vectorDB...


  attn_output = torch.nn.functional.scaled_dot_product_attention(


AttributeError: 'SentenceTransformer' object has no attribute 'embed_documents'

## code Executation

In [2]:
import os, json, contextlib
from io import StringIO
import ast 


In [4]:
def wrap_code_with_return_and_function(code):

    # Parse the code into a AST 
    parsed_code = ast.parse(code)

    # filter out only executable statements, ignoring comments and empty lines 
    executable_statements = [stmt for stmt in parsed_code.body if not isinstance(stmt, ast.Pass)]

    if not executable_statements:
        raise Exception("There are no executable statments in the code.")
    
    else:

        # GEt the last executable statment in the code 
        last_statement = executable_statements[-1]

        # check if the last statement is an expression (including function calls)
        if isinstance(last_statement, ast.Expr):

            # Convert the expression into a return statement 
            return_stmt = ast.Return(value=last_statement.value)
            return_stmt.lineno = last_statement.lineno
            return_stmt.col_offset = last_statement.col_offset
            parsed_code.body[parsed_code.body.index(last_statement)] = return_stmt


        # wrap the entire code in a function definition 
        function_def = ast.FunctionDef(
            name="isolate",
            args=ast.arguments(
                posonlyargs=[], args=[], kwonlyargs=[], defaults=[] 
            ),
            body=parsed_code.body,
            decorator_list=[],
            lineno=1,
            col_offset=0

        )    # type: ignore 

    # Create a new module with the function definition 
    module = ast.Module(body=[function_def],
                        type_ignores=[])
    

    # convert the AST back to source code 
    wrapped_code = compile(module, filename="<ast>", mode="exec")

    return wrapped_code



# Example usage 

code = """ 

a = 4 
b = 5
print(a + b)

"""

wrapped_code = wrap_code_with_return_and_function(code)
# execute wrapped code 
exec(wrapped_code)

# call the function 
# result = isolate()


In [15]:
def execute_user_code(code):
    try:
        wrapped_code = wrap_code_with_return_and_function(code)
        exec_globals = {}
        exec_locals = {}
        exec(wrapped_code, exec_globals, exec_locals)

        os.chdir(get_abs_path("./work_dir"))   # change cwd(change working directory) to work dir 
        try:
            return exec_locals.get('isolate', lambda: None)()
        
        except Exception as e:
            import traceback
            error_info = traceback.format_exc()
            return json.dumps({"error": str(e),
                               "details": error_info})
        


    except Exception as e:
        return "Error: " + str(e)
    

# Example usage 
code = """ 

a = 4 
b = 5
print(a + b)

"""

result = execute_user_code(code)
print(result)


Error: name 'get_abs_path' is not defined


In [7]:
import ast 

code = """ 

def hell_world():
    print("hello world")

"""

parsed_code = ast.parse(code)
# print(parsed_code)
print(ast.dump(parsed_code, indent=4))

Module(
    body=[
        FunctionDef(
            name='hell_world',
            args=arguments(
                posonlyargs=[],
                args=[],
                kwonlyargs=[],
                kw_defaults=[],
                defaults=[]),
            body=[
                Expr(
                    value=Call(
                        func=Name(id='print', ctx=Load()),
                        args=[
                            Constant(value='hello world')],
                        keywords=[]))],
            decorator_list=[],
            type_params=[])],
    type_ignores=[])


In [14]:
add_ten = lambda x: x + 4 
print(add_ten(4))

add_numbers = lambda x, y: x + y 
print(add_numbers(2, 3))

numbers = [2, 3, 4, 5, 6]
squared_numbers = list(map(lambda x: x ** 2, numbers))
print(squared_numbers)

# using lambda with filter to get even numbers 
even_numbers = list(filter(lambda x: x % 2 == 0, numbers))
print(even_numbers)

8
5
[4, 9, 16, 25, 36]
[2, 4, 6]


In [1]:
import os, json, contextlib, time, importlib, inspect
from io import StringIO
from typing import Optional, Dict

from langchain.schema import AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage
from langchain_core.language_models.chat_models import BaseChatModel



class Agent:

    paused = False 
    streaming_agent = None 


    @staticmethod
    def configure(model_chat, model_embedding, memory_subdir = "", memory_results = 3):

        # save configuration 
        Agent.model_chat = model_chat

        # initialize memory tool 

        from tools import memory_tool
        memory_tool.initialize(
            embeddings_model=model_embedding,
            messages_returned=memory_results,
            subdir=memory_subdir
        )




    def __init__(self, 
                 system_prompt: Optional[str] = None,
                 tools_prompt: Optional[str] = None,
                 superjior: Optional['Agent'] = None,
                 number = 0
                 ):
        
        self.number = number
        self.name = f"Agent {self.number}"
        if system_prompt is None: tools_prompt = read_file("./prompts/agent.system.md")
        if tools_prompt is None: tools_prompt = read_file("./prompts/agent.tools.md")

        self.system_prompt = system_prompt.replace("{", "{{").replace("}", "}}")
        self.tools_prompt = tools_prompt.replace("{", "{{").replace("}", "}}")

        self.superior = Optional['Agent'] = None 

        self.history = []
        self.last_message = ""
        self.message_for_superior = ""
        self.intervention_message = ""
        self.intervention_status = False 


        self.prompt = ChatPromptTemplate.from_messages([
            ("system", self.system_prompt + "\n\n" + self.tools_prompt),
            MessagesPlaceholder(variable_name="messages")
        ])



    def process_message(self, msg: str):

        try:
            self.append_message(msg, human=True)  # Append the user's input to the history 
            printer = PrintStyle(italic=True, font_color="#b3ffd9", padding=False)


            while True:   # let the agent iterate on his thoughts untill be stops by using a tool 
                Agent.streaming_agent = self   # mark self as current streamer 
                agent_response = ""
                self.intervention_status = False   # reset intervention status 

                try:

                    inputs = {"input": msg, "messages": self.history}
                    chain = self.prompt | Agent.model_chat
                    formatted_inputs = self.prompt.format(**inputs)

                    # wait for rate limiter - A helpful rule of thumb is that one token generally corresponds to ~4 characters of text for common English text. This translates to roughtly % of a word ( so 100 tokens ~= 75 words. )
                
                    rate_limit(len(formatted_inputs) / 4)  

                    # output that the agent is starting 
                    PrintStyle(bold=True, font_color="green", padding=True, background_color="white").print(f"{self.name}: Starting a message: ")

                    for chunk in chain.stream(inputs):
                        
                        if self.handle_intervention(agent_response):
                            break   # wait for intervention and handle it if paused.


                        if chunk.content is not None and chunk.content != '':
                            printer.stream(chunk.content)  # output the agent response steam 

                            agent_response += chunk.content 


                    if not self.handle_intervention(agent_response):

                        # if assist_response is the same as last messge in history, let him know 
                        if self.last_message == agent_response:
                            agent_response = read_file("./prompts/fw.msg_repeat.md")
                            PrintStyle(font_color="orange", padding=True).print(agent_response)

                        self.last_message = agent_response

                        self.append_message(agent_response)
                        self.process_message(agent_response)


                        # break the execution if there is a message for superior agent 
                        if self.message_for_superior and not self.intervention_status:
                            msg = self.message_for_superior
                            self.message_for_superior = ""
                            return msg 
                        

                # forward errors to the LLm, maybe he can fix them 
                except Exception as e:
                    msg_response = read_file("./prompts/fw.error.md", error=str(e))
                    self.append_message(msg_response, human=True)
                    PrintStyle(font_color="red", padding=True).print(msg_response)


        finally:
            Agent.streaming_agent = None    # unset current streamer 


            


In [2]:
def append_message(self, msg: str, human: bool = False):

    message_type = "human" if human else "ai"
    if self.history and self.history[-1].type == message_type:
        self.history[-1].content += "\n\n" + msg 


    else:
        new_message = HumanMessage(content=msg) if human else AIMessage(content=msg)
        self.history.append(new_message)
        self.cleanup_history(5, 10)

    if message_type == "ai":
        self.last_message = msg 



    

In [3]:
def cleanup_history(self, x, y):
    if len(self.history) <= x + y:
        return self.history 
    

    first_x = self.history[:x]
    last_y = self.histyr[-y:]

    cleanup_prompt = read_file("./prompts/fw.msg_cleanup.md")
    middle_values = [AIMessage(content=cleanup_prompt)]

    self.history = first_x + middle_values + last_y


    

In [4]:
def handle_intervention(self, progress: str = "") -> bool:

    while self.paused: time.sleep(0.1)  # wait if paused 

    # if there is an intervention message, but not yet processed 
    if self.intervention_message and not self.intervention_status:
        if progress.strip(): 
            self.append_message(progress)

        # format the user intervention template 
        user_msg = read_file("./prompts/fw.intervention.md", user_message= self.intervention_message)
        self.append_message(user_msg, human=True)
        self.intervention_messgae = ""
        self.intervention_status = True


    return self.intervention_status

In [1]:
def process_tools(self, msg: str):

    # search for tool usage requests in agent message 
    tool_requests = extract_tool_requests(msg)

    for tool_request in tool_requests:
        if self.handle_intervention():
            break;


        tool_name = tool_requests["name"]
        tool_function = self.get_tool(tool_name)


        if callable(tool_function):

            short_params = {k: v for k, v in tool_request.item() if k != "name" and k != "body"}  # only extra parameters to output to console 
            PrintStyle(font_color="#1B4F72", padding=True, background_color="white", bold=True).print(f"{self.name}: Using tool {tool_name}: ")
            PrintStyle(font_color="#85C1E9").print(short_params, tool_request['body'], sep="\n") if short_params else PrintStyle(font_color="85C1E9").print(tool_request["body"])

            tool_response = tool_function(self, tool_request["body"], **tool_request) or "" # call tool function with all parameters, body parameters separated for convenience 
            Agent.streaming_agent = self   # mark self as current streamer again, it may have changed during tool use 

            if self.handle_intervention(): 
                break

            msg_response = read_file("./prompts/fw.tool_response.md", tool_name=tool_name, tool_request=tool_request)
            self.append_message(msg_response, human=True)

            PrintStyle(font_color="#1B4F72", background_color="white", padding=True, bold=True).print(f"{self.name}: Response from {tool_name}: ")
            PrintStyle(font_color="#85C1E9").print(tool_request)


        else:
            if self.handle_intervention():
                break;

            msg_response = read_file("./prompts/fw.tool_not_found.md", tool_name=tool_name, tools_prompt=self.tools_prompt)
            self.append_message(msg_response, True)
            PrintStyle(font_color="orange", padding=True).print(msg_response)


            



In [2]:
def get_tool(self, name:str):
    module = importlib.import_module("tools." + name) # Import the module 
    function_list = {name: func for name, func in inspect.getmembers(module, inspect.isfunction)}  # get all function in the module 

    if "execute" in function_list: 
        return function_list["execute"]  # check if the module contains a function named "execute"
    
    if function_list:
        return next(iter(function_list.values()))  # Return the first function if no "execute" function is found 
    
    return None   # Return None if no function are found 


## Models

In [13]:
from dotenv import load_dotenv
from langchain_community.llms import Ollama
from langchain_openai import ChatOpenAI, OpenAI, OpenAIEmbeddings
from langchain_anthropic import ChatAnthropic
from langchain_groq import ChatGroq
from langchain_community.embeddings import HuggingFaceEmbeddings

import os

DEFAULT_TEMPERATURE = 0.0
load_dotenv()

True

In [7]:





# utility functions to get API keys from environment variables
def get_api_key(service):
    return os.getenv(f"API_key_{service.upper()}")


get_api_key("openai")



In [16]:
def get_openai_gpt35(api_key=None, temperature=DEFAULT_TEMPERATURE):
    api_key = api_key or get_api_key("openai")
    return ChatOpenAI(model_name="gpt-3.5-turbo", temperature=temperature, api_key=api_key)


    


In [19]:
from langchain.embeddings import HuggingFaceEmbeddings

def get_embedding_hf(model_name="sentence-transformers/all-MiniLM-L6-v2"):
    return HuggingFaceEmbeddings(model_name=model_name)

# Example usage
# Initialize the embedding model
embedding_model = get_embedding_hf()

# Example text to embed
text = "This is a sample sentence for embedding."

# Get the embeddings
embeddings = embedding_model.embed_documents([text])

print(f"Embeddings for the text: {embeddings}")




Embeddings for the text: [[0.019295422360301018, 0.01642497256398201, 0.07273902744054794, 0.03308621421456337, 0.059720929712057114, 0.06992031633853912, 0.03729814291000366, -0.005818242207169533, 0.03765908628702164, -0.047984614968299866, 0.045444898307323456, -0.03459855541586876, 0.04935990646481514, 0.00853636022657156, 0.014929299242794514, 0.06183437630534172, 0.08225728571414948, 0.02568216808140278, -0.0424724705517292, -0.019732292741537094, -0.009228910319507122, 0.03725957125425339, 0.08623967319726944, -0.05810059234499931, -0.004230275750160217, -0.0016023291973397136, -0.019658543169498444, 0.06552005559206009, 0.11449294537305832, 0.001924963085912168, 0.033311571925878525, -0.009164861403405666, 0.015894128009676933, 0.04008613899350166, 0.05279970169067383, 0.054821159690618515, -0.010900660417973995, 0.04938409477472305, -0.026740295812487602, 0.02009173482656479, 0.04378341883420944, 0.005599518306553364, 0.010202351026237011, 0.05376637354493141, 0.00587010430172

  attn_output = torch.nn.functional.scaled_dot_product_attention(


## Main 

In [23]:
import threading, sys, time, readline
from ansio import application_keypad, mouse_input
from ansio.input import InputEvent, get_input_event
# from tools.helpers.print_style import PrintStyle


input_lock = threading.Lock()

In [27]:
# Main conversation loop

def chat():

    # chat model used for agents 
    chat_llm = get_openai_gpt35()

    # embedding model used for memory 
    embedding_llm = get_embedding_hf()

    # initial configuration 
    Agent.configure(
        model_chat=chat_llm,
        model_embedding=
        embedding_llm
    )


    # create the first agent 
    agent0 = Agent()


    # start the conversation loop 
    while True:

        # ask user for message 
        PrintStyle(background_color="#6C3483", font_color="white", bold=True, padding=True).print(f"User message ('exit' to leave): ")

        with input_lock:
            user_input = input("> ").strip()

        # exit the conversation when the user types 'exit'
        if user_input.lower() == 'exit':
            break 


        # send message to agent0 
        assistant_response = agent0.process_message(user_input)


        # print agent0 response 
        PrintStyle(font_color="white", background_color="#1D8348", bold=True, padding=True).print(f"{agent0.name}: response: ")
        PrintStyle(font_color="white").print(f"{assistant_response}")


        



In [24]:
# user intervention during agent streaming 
def intervention():

    if Agent.streaming_agent and not Agent.paused:
        Agent.paused = True   # stop agent streaming 

        PrintStyle(background_color="#6C3483", font_color="white", bold=True, padding=True).print(f"User intervention ('exit' to leave, empty to continoue) :")

        user_input = input("> ").strip()
        if user_input.lower() == "exit":
            sys.exit()  # exit the conversation when the user types 'exit' 

        if user_input:
            Agent.streaming_agent.intervention_message = user_input   # set intervention message if non-empty 

        Agent.paused = False   # continue agent streaming 


In [25]:
# capture keybord input to trigger user intervention 
def capture_keys():

    global input_lock
    intervent = False 

    while True:
        if intervent:
            intervention()

        intervent = False

        if Agent.streaming_agent:
            with input_lock, application_keypad, mouse_input:
                event: InputEvent | None = get_input_event(timeout=0.1)
                if event and (event.shortcut.isalpha() or event.shortcut.isspace()):
                    intervent = True
                    continue

In [None]:
if __name__ == "__main__":
    print("Initializing framework....")

    # start the key capture thread for user intervention during agent streaming 
    threading.Thread(target=capture_keys, daemon=True).start()

    # start the chat 
    chat()