In [1]:
# !python -m pip install --upgrade pip

In [2]:
# !pip install --quiet langchain langchain-community langchain-mistralai

In [3]:
# !pip install --quiet google-api-python-client

In [4]:
# !pip install --quiet peewee

In [5]:
# !pip install RealtimeTTS

In [6]:
# !pip install duckduckgo-search

Collecting duckduckgo-search
  Downloading duckduckgo_search-6.1.4-py3-none-any.whl.metadata (18 kB)
Collecting pyreqwest-impersonate>=0.4.7 (from duckduckgo-search)
  Downloading pyreqwest_impersonate-0.4.7-cp310-none-win_amd64.whl.metadata (9.8 kB)
Downloading duckduckgo_search-6.1.4-py3-none-any.whl (24 kB)
Downloading pyreqwest_impersonate-0.4.7-cp310-none-win_amd64.whl (2.6 MB)
   ---------------------------------------- 0.0/2.6 MB ? eta -:--:--
    --------------------------------------- 0.0/2.6 MB 1.9 MB/s eta 0:00:02
   --- ------------------------------------ 0.2/2.6 MB 2.8 MB/s eta 0:00:01
   -------- ------------------------------- 0.5/2.6 MB 4.2 MB/s eta 0:00:01
   ----------------- ---------------------- 1.1/2.6 MB 6.3 MB/s eta 0:00:01
   ------------------------- -------------- 1.6/2.6 MB 8.1 MB/s eta 0:00:01
   -------------------------------- ------- 2.1/2.6 MB 7.8 MB/s eta 0:00:01
   ---------------------------------------  2.6/2.6 MB 8.6 MB/s eta 0:00:01
   ----------

Callbacks


In [1]:
from queue import SimpleQueue
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult
from typing import Any, Union, Dict, List
from RealtimeTTS import TextToAudioStream, GTTSEngine, SystemEngine
from langchain_core.messages import AIMessage, HumanMessage

# Speech output objects from RealtimeTTS: `stream` wraps `engine`, and the
# callback handler defined below feeds LLM tokens into `stream`.
engine = SystemEngine()
stream = TextToAudioStream(engine)

# Queue that receives every streamed token, followed by `job_done`.
q = SimpleQueue()
# Module-level conversation log; the callback handler appends each finished
# answer to it as an AIMessage.
chat_history = []

# Unique sentinel object put on the queue when an LLM run ends or errors.
job_done = object()

class StreamingGradioCallbackHandler(BaseCallbackHandler):
    """Forward streamed LLM tokens to a queue and to the text-to-speech stream.

    Every token is put on ``q`` and fed to the module-level ``stream``. When a
    run finishes (or fails) the ``job_done`` sentinel is put on the queue; on a
    successful finish the accumulated answer is also appended to the
    module-level ``chat_history`` as an ``AIMessage``.
    """

    def __init__(self, q: SimpleQueue):
        """Store the output queue and start with an empty answer buffer."""
        self.q = q
        # Text of the answer currently being streamed.
        self.jarvis_answer = ""

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts running. Clean the queue and the answer buffer."""
        # chat_history.append(HumanMessage(content=prompts[-1].split('Human:')[-1]))
        # TODO: split the prompt on "\n", look for the chunks that contain
        #       "Human:" and take their content.
        # print("CHAT_HISTORY", chat_history)

        # Drop any partial answer left behind by a run that did not finish
        # cleanly, so it cannot leak into this run's answer.
        self.jarvis_answer = ""
        while not self.q.empty():
            try:
                self.q.get(block=False)
            except Exception as e:
                print(f"Ecception in on_llm_start, {e}")
                continue

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run on new LLM token. Only available when streaming is enabled."""
        self.q.put(token)
        stream.feed(token)
        self.jarvis_answer += token
        # TODO: do not respond when only symbols arrive, only for real text.
        if not stream.is_playing():
            stream.play_async()

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running."""
        self.q.put(job_done)
        # A turn that only requests a tool call streams no text; recording it
        # would put an empty AIMessage into the history.
        if self.jarvis_answer:
            chat_history.append(AIMessage(content=self.jarvis_answer))
        self.jarvis_answer = ""

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Run when LLM errors."""
        self.q.put(job_done)
        # The partial answer is incomplete; discard it instead of keeping it
        # around for the next run.
        self.jarvis_answer = ""

Main Jarvis Model

In [1]:
from langchain_mistralai.chat_models import ChatMistralAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages
from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser
from langchain.agents import AgentExecutor
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from dotenv import load_dotenv
import os

# load_dotenv() has to run before the lookup below so that values coming from
# a .env file (if one is present) are visible in the environment.
load_dotenv()

# Mistral API key, read from the MISTRAL_API variable; None when it is unset.
MISTRAL_API_KEY = os.environ.get("MISTRAL_API")

class Jarvis:
    """Tool-calling chat agent built on the Mistral chat model.

    Wires a prompt, the LLM with bound tools and an output parser into an
    agent, and wraps that agent in an ``AgentExecutor``.
    """

    def __init__(self, tools: list, chat_history: list[AIMessage | HumanMessage] | None = None):
        """Build the LLM, prompt, agent pipeline and executor.

        Args:
            tools: tools bound to the LLM and handed to the executor.
            chat_history: earlier messages inserted into the prompt. When
                omitted, each instance gets its own new empty list.
        """
        # A literal `[]` default would be a single list shared by every
        # instance created without an explicit history.
        self.chat_history = chat_history if chat_history is not None else []

        self.llm = ChatMistralAI(
            mistral_api_key=MISTRAL_API_KEY,
            temperature=0,
            model="mistral-large-latest",
            timeout=30,
            max_tokens=5000,
            callbacks=[StreamingGradioCallbackHandler(q), StreamingStdOutCallbackHandler()],
            streaming=True)

        self.prompt = ChatPromptTemplate.from_messages(
            [
                ("system", "You are powerful assistant named 'Jarvis'"),
                MessagesPlaceholder(variable_name="chat_history"),
                ("user", "{input}"),
                MessagesPlaceholder(variable_name="agent_scratchpad"),
            ]
        )

        self.llm_with_tools = self.llm.bind_tools(tools, tool_choice="auto")

        # Map the executor's input dict onto the prompt variables, then run
        # prompt -> LLM with tools -> tool-call output parser.
        self.agent = (
            {
                "input": lambda x: x['input'],
                "agent_scratchpad": lambda x: format_to_openai_tool_messages(
                    x["intermediate_steps"]
                ),
                "chat_history": lambda x: x['chat_history'],
            } | self.prompt | self.llm_with_tools | OpenAIToolsAgentOutputParser()
        )

        self.agent_executor = AgentExecutor(
            agent=self.agent,
            tools=tools,
            verbose=False, # TODO: True
            max_iterations=5,
            handle_parsing_errors=self._handle_error)

    @staticmethod
    def _handle_error(error) -> str:
        """Log a parsing error raised inside the executor and return it as text.

        Declared static because it is passed as ``self._handle_error`` and then
        called with the error as its only argument; without this the bound
        call would pass two positional arguments to a one-parameter function.
        """
        print('\n\n\n\n\nERROR INSIDE LLM AGENT EXECUTOR', error)
        return str(error)

    def get_answer(self, user_prompt: str):
        """Yield the chunks streamed by the agent executor for ``user_prompt``.

        The instance's ``chat_history`` is sent along with the prompt.
        """
        print(self.chat_history, "CHAT HISTORY")
        yield from self.agent_executor.stream({"input": user_prompt, "chat_history": self.chat_history})

Notes functionality

In [3]:
import os
import peewee
import datetime
from playhouse.shortcuts import model_to_dict

# SQLite database object ('notes.db') that the note models below are bound to.
db = peewee.SqliteDatabase('notes.db')

# NOTE(review): a later cell imports a pydantic `BaseModel`, which rebinds this
# name at module level once that cell has run.
class BaseModel(peewee.Model):
    """Base peewee model that binds its subclasses to the module-level `db`."""
    class Meta:
        database = db

class NoteModel(BaseModel):
    """A stored note: auto-generated id, creation time, unique title, content."""
    # AutoField is the current name of peewee's auto-incrementing integer
    # primary key; PrimaryKeyField is its deprecated alias.
    id = peewee.AutoField()
    # The callable is evaluated when a row is created, not at class definition.
    created_date = peewee.DateTimeField(default=datetime.datetime.now)
    # Titles must be unique, so a second note with the same title is rejected.
    title = peewee.CharField(unique=True)
    content = peewee.TextField()


class Singleton(type):
    """Metaclass that hands out one shared instance per class.

    The first call of a class builds the instance with the given arguments and
    caches it; every later call returns the cached object and ignores its
    arguments.
    """

    # Cache of already-built instances, keyed by class.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        try:
            return cls._instances[cls]
        except KeyError:
            pass
        instance = super().__call__(*args, **kwargs)
        cls._instances[cls] = instance
        return instance


class NoteManager(metaclass=Singleton):
    """Singleton helper that stores notes in, and reads notes from, the database."""

    def __init__(self, db_obj: peewee.SqliteDatabase, tables: list[peewee.ModelBase]) -> None:
        """Remember the database and make sure the given tables exist.

        Args:
            db_obj: database to work with; must be a ``peewee.SqliteDatabase``.
            tables: model classes whose tables should exist.

        Raises:
            TypeError: if ``db_obj`` is not a ``peewee.SqliteDatabase``.
        """
        if not isinstance(db_obj, peewee.SqliteDatabase):
            raise TypeError("For now only peewee.SqliteDatabase is supported")

        self.db = db_obj

        # An existing database file does not guarantee that the tables are in
        # it, so create them unconditionally; safe=True emits
        # CREATE TABLE IF NOT EXISTS and leaves existing tables untouched.
        self.db.create_tables(tables, safe=True)

    def get_last_notes(self, amount: int = 1) -> list:
        """Return up to ``amount`` most recently created notes as dicts, newest first."""
        query = (NoteModel
                 .select()
                 .where(NoteModel.created_date <= datetime.datetime.now())
                 .order_by(NoteModel.created_date.desc())
                 .limit(amount))

        return [model_to_dict(note) for note in query]

    def update_note(self):
        """Placeholder; not implemented yet."""
        pass

    def get_note_by_title(self):
        """Placeholder; not implemented yet."""
        pass

    def add_note(self, content: str, title: str = "") -> str:
        """Create a note and return a human-readable status message.

        Args:
            content: note text.
            title: note title; titles are unique, so reusing one (including
                the empty default) yields the error message instead.

        Returns:
            A success message with the new note's id, title and creation date,
            or an error message when the insert violates a constraint.
        """
        try:
            note = NoteModel.create(title=title, content=content)
            return f"Запись успешно добавлена: ID={note.id}, Title={note.title}, Created Date={note.created_date}"
        except peewee.IntegrityError as e:
            return f"There is an error occured when adding new note error: {e}"

# Module-level manager instance; the tool definitions further down wrap its methods.
note_manager = NoteManager(db, [NoteModel])
# note_manager.add_note("Hello this is my second note", "Second note second")
# note_manager.get_last_notes(5)

Tools available to Jarvis

In [4]:
from langchain.tools import StructuredTool
from langchain.pydantic_v1 import BaseModel, Field
from langchain.tools import DuckDuckGoSearchRun
from langchain.agents import Tool


# Argument schema for the AddNote tool: an optional title (defaults to "")
# and the required note content.
class AddNotesSchema(BaseModel):
    title: str = Field(description="Note title", default="")
    content: str = Field(description="Note content")

# Argument schema for the GetLastNotes tool: how many notes to return (defaults to 1).
class GetLastNotesSchema(BaseModel):
    amount: int = Field(description="How many notes to get", default=1)

# DuckDuckGo search runner; its `run` method backs `search_tool` below.
ddg_search = DuckDuckGoSearchRun()

# Tool wrapping note_manager.get_last_notes; arguments follow GetLastNotesSchema.
get_last_notes = StructuredTool.from_function(
    func=note_manager.get_last_notes,
    name="GetLastNotes",
    description="Getting few last notes or single last one",
    args_schema=GetLastNotesSchema,
    return_direct=False,
)
# Tool wrapping note_manager.add_note; arguments follow AddNotesSchema.
add_note = StructuredTool.from_function(
    func=note_manager.add_note,
    name="AddNote",
    description="Adding or saving one new note to database",
    args_schema=AddNotesSchema,
    return_direct=False,
)

# Tool that passes its input to ddg_search.run.
search_tool = Tool(
    name="DuckDuckGo_Search",
    func=ddg_search.run,
    description="Useful to browse information from the Internet."
)

In [10]:
from voice_listener import get_text_from_speech
# Build the agent with the note tools and the search tool.
# NOTE(review): no chat_history is passed, so Jarvis uses its own default list
# rather than the module-level `chat_history` the callback handler appends to
# - confirm whether the two are meant to be the same list.
jarvis = Jarvis(tools=[add_note, get_last_notes, search_tool])
# Obtain the user's request from voice_listener.get_text_from_speech().
prompt = get_text_from_speech()
print(f"{prompt=}")
# Print each chunk the agent yields as it is produced.
for chunk in jarvis.get_answer(user_prompt=prompt):
    print(chunk)

You can speak now ... 
Did you say  hey jarvis how tall is burj khalifa
prompt='hey jarvis how tall is burj khalifa'
[] CHAT HISTORY
Opening stream
Opening stream
Opening stream
Opening stream
Opening stream
Opening stream
Opening stream
Opening stream
{'actions': [ToolAgentAction(tool='DuckDuckGo_Search', tool_input='burj khalifa height', log='\nInvoking: `DuckDuckGo_Search` with `burj khalifa height`\n\n\n', message_log=[AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'id': 'WJSk3wBDX', 'function': {'name': 'DuckDuckGo_Search', 'arguments': '{"__arg1": "burj khalifa height"}'}}]}, id='run-6c53edb4-c0a7-40a1-8f93-28e677a688ee', tool_calls=[{'name': 'DuckDuckGo_Search', 'args': {'__arg1': 'burj khalifa height'}, 'id': 'WJSk3wBDX'}], tool_call_chunks=[{'name': 'DuckDuckGo_Search', 'args': '{"__arg1": "burj khalifa height"}', 'id': 'WJSk3wBDX', 'index': None}])], tool_call_id='WJSk3wBDX')], 'messages': [AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'id': 'WJ

Opening stream


Text-to-speech with RealtimeTTS