In [61]:
from abc import ABC, abstractmethod
from altair import Literal

import chromadb
from chromadb.config import Settings
from chromadb.utils import embedding_functions

from bot import BotConfig


COLLECTION_TYPES = chromadb.api.models.Collection.Collection
VECTOR_DATABASES_TYPES = Literal["chroma"]


def get_collection(vector_database: VECTOR_DATABASES_TYPES):
    _vdbs_mapping = {
        "chroma": ChromaVectorDB,
    }

    if vector_database not in _vdbs_mapping:
        raise VectorDatabaseNotRecognizedError(f"VDB '{vector_database}' not recognized.")

    return _vdbs_mapping[vector_database]().start()


class VectorDB(ABC):
    def __init__(self):
        self.bot_config = BotConfig()

    @abstractmethod
    def start(self) -> COLLECTION_TYPES:
        pass

    @abstractmethod
    def _set_embedder(self):
        pass


class ChromaVectorDB(VectorDB):
    def __init__(self):
        super().__init__()
        self.chroma_client = self._set_client()
        self.embedder = self._set_embedder()

    def start(self) -> chromadb.api.models.Collection.Collection:
        return self.chroma_client.get_collection(
            self.bot_config.EMBEDDING_COLLECTION, embedding_function=self.embedder
        )

    def _set_embedder(self):
        return embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=self.bot_config.HUGGINGFACE_EMBEDDING_MODEL_NAME
        )

    def _set_client(self):
        _settings = Settings(
            allow_reset=True,
            anonymized_telemetry=True,
            persist_directory=self.bot_config.VECTORDATABASE_PERSIST_DIRECTORY,
        )
        return chromadb.HttpClient(
            host=self.bot_config.VECTORDATABASE_HOSTNAME,
            port=self.bot_config.VECTORDATABASE_PORT,
            settings=_settings,
        )


class VectorDatabaseNotRecognizedError(Exception):
    pass


In [64]:
import os
import ast
import re
import json
from importlib import resources
from uuid import uuid4

import chromadb
from chromadb.config import Settings
from chromadb.utils import embedding_functions
import langchain_core
from langchain.chains import LLMChain
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.memory import ConversationSummaryBufferMemory, ChatMessageHistory
from langchain.llms import OpenAI
from langchain.prompts import load_prompt
from loguru import logger
import tiktoken
from bot import BotConfig
from bot._local_memory import LocalMemory


_config = BotConfig()

LLM_CONTEXT_WINDOW_SIZE = 4096
PROMPT_MAX_TOKENS = 3200


def get_prompt(key):
    filepath = str(resources.files("bot.prompts").joinpath(f"{key}.json"))

    return load_prompt(filepath)


class NewsBot:
    def __init__(self, local_filepath: str):
        self.local_filepath = local_filepath

        self.llm = ChatOpenAI(model_name=_config.LLM_MODEL_NAME, temperature=0)
        self.llm_chat = ChatOpenAI(
            model_name=_config.LLM_MODEL_NAME,
            temperature=0,
            model_kwargs={"stop": ["HUMAN_INPUT", "IA:"]},
        )

        self.embeddings = HuggingFaceEmbeddings(
            model_name=_config.HUGGINGFACE_EMBEDDING_MODEL_NAME
        )
        self.embedder = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=_config.HUGGINGFACE_EMBEDDING_MODEL_NAME
        )

        self.verbose_chains = True

        # Load utilitary prompts and chains
        self.prompt_intention = get_prompt("prompt_user_intention")
        self.prompt_standalone_question = get_prompt("prompt_standalone_question")

        # Load chat prompts and chains
        self.prompt_greeting = get_prompt("prompt_greeting")
        self.prompt_query = get_prompt("prompt_query")

        self.collection = get_collection("chroma")

        self.memory = ConversationSummaryBufferMemory(
            llm=OpenAI(temperature=0),
            chat_history=ChatMessageHistory(),
            return_messages=True,
            memory_key="chat_history",
            input_key="human_input",
            human_prefix="Human",
            ai_prefix="AI",
        )

        self.local_memory = LocalMemory(
            self.local_filepath, message_history=self.memory.chat_memory
        )

        self.memory.chat_memory = self.local_memory.message_history


    def predict(
        self, collection, query, n_neighbors: int = 1000, n_results: int = 10, **kwargs
    ):
        def f(key, value, k):
            def limit(x):
                if isinstance(x, list):
                    return x[:k]
                return x

            if isinstance(value, list):
                return (key, [limit(x) for x in value])
            return (key, None)

        res = collection.query(
            query_texts=query,
            n_results=n_neighbors,
            **kwargs
            # where={"metadata_field": "is_equal_to_this"},
            # where_document={"$contains":"search_string"}
        )

        return dict([f(key, value, n_results) for key, value in res.items()])

    def _set_local_context(self, content, meta):
        return (
            f"<data>{meta.get('date', '')}</data>\n"
            f"<titulo>{meta.get('title', '')}</titulo>\n"
            f"<autor>{meta.get('author', '')}</autor>\n"
            f"<link>{meta.get('link', '')}</link>\n"
            f"<conteudo>{content}</conteudo>\n"
        )

    def count_tokens(self, context: str) -> int:
        encoding = tiktoken.encoding_for_model(_config.LLM_MODEL_NAME)
        num_tokens = len(encoding.encode(context))

        return num_tokens

    def _set_query_content(self, documents, metadata):
        context_list = [
            self._set_local_context(content, meta)
            for content, meta in zip(documents, metadata)
        ]

        tokens = LLM_CONTEXT_WINDOW_SIZE
        max_tokens = PROMPT_MAX_TOKENS
        while tokens > max_tokens:
            tokens = self.count_tokens("\n".join(context_list))
            if tokens >= max_tokens:
                context_list.pop()

        return "\n".join(sorted(context_list))

    def get_content(self, query, n_results=10, n_neighbors=1000):
        result = self.predict(
            self.collection,
            query,
            n_results=n_results,
            n_neighbors=n_neighbors,
        )

        return self._set_query_content(result["documents"][0], result["metadatas"][0])

    def format_history_message(
        self, messages, human_prefix: str = "Human", ai_prefix: str = "AI"
    ):
        for message in messages:
            if isinstance(message, langchain_core.messages.human.HumanMessage):
                yield f"{human_prefix}: {message.content}\n"

            elif isinstance(message, langchain_core.messages.ai.AIMessage):
                yield f"{ai_prefix}: {message.content}\n"

    def get_standalone_question(self, message: str, history: str, *args, **kwargs) -> str:
        self.chain_standalone_question = LLMChain(
            llm=self.llm,
            prompt=self.prompt_standalone_question,
            verbose=self.verbose_chains,
        )

        result = self.chain_standalone_question.predict(
            history=history, human_input=message
        )

        return result

    def get_user_intention(self, message: str, history: str, *args, **kwargs) -> str:
        self.chain_intention = LLMChain(
            llm=self.llm, prompt=self.prompt_intention, verbose=self.verbose_chains
        )

        return self.chain_intention.predict(history=history, human_input=message)

    def handler_start_conversation(self, message: str, history: str, *args, **kwargs) -> str:
        self.chain_greeting = LLMChain(
            llm=self.llm_chat,
            prompt=self.prompt_greeting,
            verbose=self.verbose_chains,
            memory=self.memory,
        )

        result = self.chain_greeting.predict(history=history, human_input=message)

        self.local_memory.update(message_history=self.memory.chat_memory)
        return result

    def handler_query(self, message: str, history: str, *args, **kwargs) -> str:
        self.chain_query = LLMChain(
            llm=self.llm_chat,
            prompt=self.prompt_query,
            verbose=self.verbose_chains,
            memory=self.memory,
        )

        context = self.get_content(query=message)

        for i in range(3):
            response = self.chain_query.predict(
                history=history, human_input=message, context=context
            )
            self.local_memory.update(message_history=self.memory.chat_memory)
            resp_dict = extract_dict_from_string(response)
            if len(resp_dict) > 0:
                _resposta = f'{resp_dict.get("resposta", "")}'
                _link = (
                    f'\n\nlink da noticia: {resp_dict.get("link", "")}'
                    if resp_dict.get("link", None)
                    else ""
                )
                return _resposta + _link
            continue

        return response

    def handler_fallback(self, *args, **kwargs) -> str:
        return (
            "Desculpe, mas não posso responder a essa pergunta. "
            "Algo em que possa ajudar sobre notícias de Poços de Caldas e região?"
        )

    def execute(self, message: str):
        chat_history = "".join(
            self.format_history_message(self.memory.chat_memory.messages)
        )

        standalone_question = self.get_standalone_question(
            message=message, history=chat_history
        )
        logger.debug(f"Pergunta original: {message}")
        logger.debug(f"Pergunta melhorada: {standalone_question}")

        intention = self.get_user_intention(
            message=standalone_question, history=chat_history
        )
        logger.debug(f"Intenção: {intention}")

        handlers = {
            "inicio de conversa": self.handler_start_conversation,
            "consulta de conteudo": self.handler_query,
            "": self.handler_fallback,
        }

        response = None
        for category, handler in handlers.items():
            if category in intention.lower().replace("ú", "u").replace("í", "i"):
                response = handler(
                    message=message, memory=self.memory, history=chat_history
                )
                break

        self.local_memory.update(message_history=self.memory.chat_memory)

        return dict(response=response, execution_id=uuid4().hex)


def extract_dict_from_string(string):
    """
    Extrai um elemento JSON de uma string.

    Args:
    string: A string que contém o elemento JSON.

    Returns:
    O elemento JSON como um dicionário Python, ou None se não for possível extrair.
    """

    match = re.search(r"{([^}]+)}", string)
    if match is None:
        return dict()

    json_content = match.group(1)

    try:
        json_dict = json.loads("{" + json_content + "}")
        return json_dict
    except json.JSONDecodeError as err:
        logger.error(str(err))
        return dict()


In [62]:
collection = get_collection("chroma")

In [67]:
news_bot = NewsBot(local_filepath="./teste.json")



In [126]:
from abc import ABC, abstractmethod
from langchain.chains import LLMChain
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.memory.chat_memory import BaseChatMemory
from langchain.chains.base import Chain
from langchain_core.language_models.chat_models import BaseChatModel

class Answerer(ABC):
    prompts_folder: str = "bot.prompts"

    def __init__(self):
        pass

    def get_prompt(self, key: str) -> PromptTemplate:
        filepath = str(resources.files(self.prompts_folder).joinpath(f"{key}.json"))
        return load_prompt(filepath)

    @abstractmethod
    def predict(self):  # add return typehint
        pass

    @property
    @abstractmethod
    def llm(self) -> BaseChatModel:
        pass

    @property
    @abstractmethod
    def prompt_key(self) -> str:
        pass

    @property
    @abstractmethod
    def temperature(self) -> float:
        pass

    @property
    @abstractmethod
    def verbose(self) -> bool:
        pass

    @property
    def prompt(self) -> PromptTemplate:
        return self.get_prompt(self.prompt_key)

    @property
    @abstractmethod
    def chain(self) -> Chain:
        pass


class StandaloneAnswerer(Answerer):
    prompt_key: str = "prompt_standalone_question"
    temperature: float = 0
    verbose: bool = True
    
    def __init__(self, llm_model: str):
        self.llm_model = llm_model

    def predict(self, message: str, memory):
        return self.chain.predict(
            human_input=message,
            history=history,
        )

    @property
    def llm(self):
        return ChatOpenAI(
            model_name=self.llm_model,
            temperature=self.temperature
        )

    @property
    def chain(self) -> Chain:
        return LLMChain(
            llm=self.llm,
            prompt=self.prompt,
            verbose=self.verbose,
        )

In [127]:
_config = BotConfig()

standalone_answerer = StandaloneAnswerer(llm_model=_config.LLM_MODEL_NAME)

In [130]:
standalone_answerer.prompt

PromptTemplate(input_variables=['history', 'human_input'], template='Voce vai receber uma pergunta de follow up do usuario, que pode nao ter muitos detalhes.\nO usuario quer realizar consultas na base de dados que contem noticias de Pocos de Caldas e regiao.\nREESCREVA a frase do HUMAN INPUT para que ela possa ser considerada uma pergunta independente.\nInclua o maximo de detalhes possivel a partir do CHAT HISTORY, como contexto, data, titulo e autor da noticia.\nNAO RESPONDA AO USUARIO, APENAS REESCREVA A PERGUNTA.\nSua resposta deve conter apenas a pergunta reescrita.\n\n## CHAT HISTORY\n```{history}```\n\n## HUMAN INPUT\n`{human_input}`\n\nIA:')

In [None]:
BaseChatMemory

In [134]:
standalone_answerer.llm

ChatOpenAI(client=<openai.resources.chat.completions.Completions object at 0x7ff924a8fcd0>, async_client=<openai.resources.chat.completions.AsyncCompletions object at 0x7ff924aa2690>, temperature=0.0, openai_api_key='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', openai_proxy='')

In [128]:
standalone_answerer.predict("Olá")


NameError: name 'history' is not defined

In [None]:
ConversationSummaryBufferMemory

In [73]:
from importlib import resources


def get_prompt(key):
    filepath = str(resources.files("bot.prompts").joinpath(f"{key}.json"))
    return load_prompt(filepath)

In [74]:
from langchain.prompts import load_prompt

get_prompt("prompt_standalone_question")

PromptTemplate(input_variables=['history', 'human_input'], template='Voce vai receber uma pergunta de follow up do usuario, que pode nao ter muitos detalhes.\nO usuario quer realizar consultas na base de dados que contem noticias de Pocos de Caldas e regiao.\nREESCREVA a frase do HUMAN INPUT para que ela possa ser considerada uma pergunta independente.\nInclua o maximo de detalhes possivel a partir do CHAT HISTORY, como contexto, data, titulo e autor da noticia.\nNAO RESPONDA AO USUARIO, APENAS REESCREVA A PERGUNTA.\nSua resposta deve conter apenas a pergunta reescrita.\n\n## CHAT HISTORY\n```{history}```\n\n## HUMAN INPUT\n`{human_input}`\n\nIA:')