In [None]:
# Downloads spaCy's small Portuguese pipeline (`pt_core_news_sm`) into the active environment.
# NOTE(review): the cells visible in this notebook only build `spacy.blank('pt')` with the
# rule-based `sentencizer`, so this download may be unnecessary — confirm before removing.
!python -m spacy download pt_core_news_sm

In [None]:
import json
import logging
import os
import sys
from dataclasses import dataclass
from getpass import getpass
from glob import glob
from typing import List, Dict
from typing import Optional

# NOTE: This is ONLY necessary in jupyter notebook.
# Details: Jupyter runs an event-loop behind the scenes.
#          This results in nested event-loops when we start an event-loop to make async queries.
#          This is normally not allowed, we use nest_asyncio to allow it for convenience.
import nest_asyncio
import spacy
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import tool
from langchain_core.prompts import PromptTemplate
from langchain_groq import ChatGroq
from llama_index.core import Document, QueryBundle
from llama_index.core.postprocessor import SentenceTransformerRerank
from llama_index.core.schema import TextNode
from llama_index.llms.groq import Groq
from llama_index.retrievers.bm25 import BM25Retriever
from thefuzz import process

In [None]:
# Allow nested event loops: Jupyter already runs one, and async queries start another.
nest_asyncio.apply()

# SECURITY: never hard-code API keys in a notebook. The key that used to be committed
# here must be considered leaked and should be revoked in the Groq console.
# Read the key from the environment; ask for it interactively only when it is missing.
if not os.environ.get("GROQ_API_KEY"):
    os.environ["GROQ_API_KEY"] = getpass("GROQ_API_KEY: ")

# Route INFO-level logs to stdout through exactly one handler. Replacing the handler
# list (instead of appending) keeps the cell idempotent when it is re-run.
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.handlers = [logging.StreamHandler(stream=sys.stdout)]

In [None]:
# --- Sliding-window chunking (unit: sentences) ---
SENTENCE_WINDOW_STRIDE: int = 2  # sentences to advance between consecutive windows
SENTENCE_WINDOW_SIZE: int = 5    # maximum sentences joined into one window

# --- Retrieval ---
BM25_TOP_K: int = 1000  # candidates requested from BM25 before reranking
TOP_K: int = 4          # passages kept after reranking

# --- Models ---
RERANKER_MODEL: str = "unicamp-dl/monoptt5-base"
LLM_MODEL: str = "llama3-70b-8192"
LLM_TEMPERATURE: int = 0

In [None]:
def get_transcription_documents(base_transcriptions_path):
    """Load every transcription JSON file matching a glob pattern into Documents.

    Each matched file must contain a JSON list of objects with the keys
    'transcription', 'title', 'publishing_date', 'quadro' and 'hashtag'.
    The transcription becomes the Document text; the other four keys become
    its metadata.

    Args:
        base_transcriptions_path: glob pattern selecting the JSON files.

    Returns:
        A list with one Document per transcription, ordered by file path and
        then by position inside each file. Empty when nothing matches.

    Raises:
        KeyError: if a transcription entry lacks one of the expected keys.
    """
    transcription_documents = []

    # glob() yields files in filesystem order; sort so the result is reproducible.
    for transcriptions_path in sorted(glob(base_transcriptions_path)):
        # Read as UTF-8 explicitly: the platform default encoding (e.g. cp1252 on
        # Windows) would corrupt or reject the accented Portuguese text.
        with open(transcriptions_path, encoding='utf-8') as f:
            transcriptions = json.load(f)

        for transcription in transcriptions:
            transcription_documents.append(
                Document(
                    text=transcription['transcription'],
                    metadata={
                        'title': transcription['title'],
                        'publishing_date': transcription['publishing_date'],
                        'quadro': transcription['quadro'],
                        'hashtag': transcription['hashtag'],
                    },
                )
            )

    return transcription_documents

In [None]:
def sliding_window_split(documents, stride, window_size):
    """Split documents into overlapping windows of consecutive sentences.

    Each document's text is sentence-split with a blank Portuguese spaCy
    pipeline plus the `sentencizer` component. A window starts every `stride`
    sentences and joins up to `window_size` stripped sentences with single
    spaces; windows near the end of a document are shorter.

    Args:
        documents: iterable of objects exposing `text`, `metadata` and `id_`.
        stride: number of sentences between the starts of consecutive windows.
        window_size: maximum number of sentences per window.

    Returns:
        A list of Documents, one per window. Each carries a copy of its
        parent's metadata plus 'parent_document_id' set to the parent's `id_`.
    """
    nlp = spacy.blank('pt')
    nlp.add_pipe('sentencizer')

    window_documents = []
    for source_doc in documents:
        sentences = [span.text.strip() for span in nlp(source_doc.text).sents]

        for start in range(0, len(sentences), stride):
            # Slicing clamps at the end of the list, so no explicit bound is needed.
            chunk = sentences[start:start + window_size]
            metadata = {**source_doc.metadata, 'parent_document_id': source_doc.id_}
            window_documents.append(
                Document(text=' '.join(chunk).strip(), metadata=metadata)
            )

    return window_documents

In [None]:
def get_transcriptions_nodes(transcription_documents):
    """Split transcriptions into sentence windows and group the nodes by episode title.

    The documents are chunked with `sliding_window_split` using
    SENTENCE_WINDOW_STRIDE and SENTENCE_WINDOW_SIZE, and every window is
    converted into a TextNode.

    Args:
        transcription_documents: documents whose metadata contains a 'title' key.

    Returns:
        A dict mapping each episode title to the list of its window nodes, in
        the order the windows were produced.

    Raises:
        KeyError: if a window's metadata has no 'title'.
    """
    transcription_window_documents = sliding_window_split(
        transcription_documents, SENTENCE_WINDOW_STRIDE, SENTENCE_WINDOW_SIZE
    )

    transcription_nodes = dict()
    for document in transcription_window_documents:
        # The node identifier field is `id_`. The previous `id=` keyword is not a
        # TextNode field, so the window's id was dropped and a random one generated.
        new_node = TextNode(id_=document.id_, text=document.text, metadata=document.metadata)
        transcription_nodes.setdefault(new_node.metadata['title'], []).append(new_node)

    return transcription_nodes

In [None]:
# Load every transcription JSON file matching the pattern below
# (the path is relative to the notebook's working directory).
transcription_documents = get_transcription_documents("../transcriptions-headless/*.json")

# Display the first loaded document as a quick sanity check.
transcription_documents[0]

In [None]:
# Chunk the transcriptions into sentence windows, grouped by episode title.
transcription_nodes = get_transcriptions_nodes(transcription_documents)

# Display all window nodes of the episode titled 'milton'.
# NOTE(review): raises KeyError if no loaded episode has this title, and the output
# lists every node of the episode — consider slicing if it gets large.
transcription_nodes['milton']

In [None]:
@dataclass
class RAGResponse:
    """Result of one question-answering run: the answer plus the passages gathered for it."""
    # Answer text produced for the question.
    answer: str
    # Texts of the retrieved passages collected while answering.
    contexts: List[str]

In [None]:
class MultiIndexRetriever:
    """Two-stage retriever over several named node collections (indexes).

    Each index gets its own BM25 retriever; a single shared reranker then
    narrows the BM25 candidates down to the final passages.
    """

    def __init__(self, indexes_nodes: Dict[str, List[TextNode]], top_k, bm25_top_k, reranker_model) -> None:
        """Build one BM25 retriever per index and the shared reranker.

        Args:
            indexes_nodes: mapping from index name to the nodes of that index.
            top_k: number of passages the reranker keeps.
            bm25_top_k: maximum number of BM25 candidates handed to the reranker.
            reranker_model: model name passed to SentenceTransformerRerank.
        """
        self.indexes: Dict[str, List[TextNode]] = indexes_nodes

        self.bm25_retrievers: Dict[str, BM25Retriever] = dict()
        for index, nodes in indexes_nodes.items():
            # Never request more candidates than the index holds: an index has far
            # fewer than `bm25_top_k` windows in practice, and some BM25 backends
            # warn or fail when top-k exceeds the corpus size.
            candidate_count = max(1, min(bm25_top_k, len(nodes)))
            self.bm25_retrievers[index] = BM25Retriever.from_defaults(
                nodes=nodes,
                similarity_top_k=candidate_count,
            )

        self.reranker = SentenceTransformerRerank(top_n=top_k, model=reranker_model)

    def retrieve(self, index_name: str, query: str) -> List[str]:
        """Return the texts of the best passages of one index for a query.

        Args:
            index_name: name of the index to search.
            query: the search query.

        Returns:
            The texts of the reranked nodes, in reranker order.

        Raises:
            ValueError: if `index_name` is not a known index.
        """
        if index_name not in self.indexes:
            raise ValueError(f"Index {index_name} not found")

        retriever = self.bm25_retrievers[index_name]

        # Stage 1: lexical candidates. Stage 2: rerank them against the same query.
        retrieved_nodes = retriever.retrieve(query)
        reranked_nodes = self.reranker.postprocess_nodes(
            retrieved_nodes,
            query_bundle=QueryBundle(query),
        )

        return [node.get_text() for node in reranked_nodes]

In [None]:
class IndexDetector:
    """Uses an LLM to work out which episode (index name) a query refers to."""

    def __init__(self, llm_model: str, index_names: List[str], GROQ_API_KEY: Optional[str] = None):
        """Create the detector.

        Args:
            llm_model: Groq model name.
            index_names: the valid index names (episode titles).
            GROQ_API_KEY: API key; when omitted it is read from the
                GROQ_API_KEY environment variable at construction time.

        Raises:
            KeyError: if no key is given and GROQ_API_KEY is not set.
        """
        # Note the trailing space after the quoted query: without it the two
        # sentences were glued together ("'...'.You MUST").
        self._base_prompt = \
            "Given the following Portuguese query, regarding an episode of a podcast, please tell me the title of the episode. " \
            "The query starts now: '{query}'. " \
            "You MUST answer with only the title of the episode."

        # Resolve the key here rather than in the signature: a default of
        # os.environ[...] is evaluated once, when the class is defined, so it
        # ignores later changes and fails at definition time if the variable is unset.
        if GROQ_API_KEY is None:
            GROQ_API_KEY = os.environ['GROQ_API_KEY']

        self.llm = Groq(model=llm_model, api_key=GROQ_API_KEY, temperature=LLM_TEMPERATURE)
        self.llm_model_name = llm_model
        self.index_names = index_names

    def detect_index(self, query: str) -> str:
        """Return the index name that best matches the episode mentioned in `query`.

        The LLM's answer is stripped and lower-cased. An exact match with a
        known index name is returned as is; otherwise the closest known name
        (fuzzy match) is returned. If there are no index names to match
        against, the normalized LLM answer is returned unchanged.
        """
        prompt = self._base_prompt.format(query=query)
        raw_llm_guess = self.llm.complete(prompt).text
        llm_guess = raw_llm_guess.strip().lower()

        # Exact hit: nothing to repair. (This condition used to be inverted, so
        # unknown guesses were returned raw and exact hits were fuzzy-matched.)
        if llm_guess in self.index_names:
            return llm_guess

        best_match = process.extractOne(llm_guess, self.index_names)
        if best_match is None:
            return llm_guess

        # extractOne returns a tuple whose first element is the matched choice.
        return best_match[0]

In [None]:
class RAGGenerator:
    """Generates a Portuguese answer to a question from retrieved context passages."""

    def __init__(self, llm_model, GROQ_API_KEY: Optional[str] = None):
        """Create the generator.

        Args:
            llm_model: Groq model name.
            GROQ_API_KEY: API key; when omitted it is read from the
                GROQ_API_KEY environment variable at construction time.

        Raises:
            KeyError: if no key is given and GROQ_API_KEY is not set.
        """
        # The first sentence now ends with a space: the two instructions used to
        # be glued together ("...question.You MUST...").
        self._base_prompt = \
            "Consider the following context passages of a podcast episode and answer the given question. " \
            "You MUST answer the question only in Portuguese." \
            "\n\n" \
            "{context_passages}" \
            "\n\n" \
            "If there is not enough information in the context passages, answer \"Não há informação suficiente no episódio.\"." \
            "\n\n" \
            "Question: {query}"

        # Resolve the key at call time: a signature default of os.environ[...] is
        # evaluated once at class definition and breaks if the variable is unset then.
        if GROQ_API_KEY is None:
            GROQ_API_KEY = os.environ['GROQ_API_KEY']

        self.llm = Groq(model=llm_model, api_key=GROQ_API_KEY, temperature=LLM_TEMPERATURE)
        self.llm_model_name = llm_model

    def generate_answer(self, query: str, contexts: List[str]) -> str:
        """Ask the LLM to answer `query` using only the given passages.

        Args:
            query: the user's question.
            contexts: passage texts; they are numbered from 1 in the prompt.

        Returns:
            The LLM's answer with surrounding whitespace removed.
        """
        context_passages = "\n\n".join(
            f"Context {i}: {context}" for i, context in enumerate(contexts, 1)
        )
        prompt = self._base_prompt.format(query=query, context_passages=context_passages)
        return self.llm.complete(prompt).text.strip()

In [None]:
class RAGPipeline:
    """Fixed three-step RAG flow: detect the episode, retrieve passages, generate the answer."""

    def __init__(
        self,
        retriever: MultiIndexRetriever,
        index_detector: IndexDetector,
        generator: RAGGenerator,
    ) -> None:
        """Store the three collaborating components."""
        self.index_detector = index_detector
        self.retriever = retriever
        self.generator = generator

    def __call__(self, query: str) -> RAGResponse:
        """Answer `query` and return the answer together with the passages used."""
        episode = self.index_detector.detect_index(query)
        passages = self.retriever.retrieve(episode, query)
        return RAGResponse(self.generator.generate_answer(query, passages), passages)

In [None]:
# Smoke test of the episode detector: the valid index names are the episode titles
# collected in `transcription_nodes`.
index_detector = IndexDetector(llm_model=LLM_MODEL, index_names=list(transcription_nodes.keys()))
index_name = index_detector.detect_index('No episódio \'mário\', quem foi diagnosticado com câncer de bexiga na história?')
# Display the detected index name.
index_name

In [None]:
# Smoke test of the answer generator with hand-written context passages
# (no retrieval involved in this cell).
example_contexts = [
    "Eu sou Bia Suzuki, tenho 30 anos, moro em São José do Rio Preto, interior de São Paulo... eu recebi o diagnóstico de câncer de intestino aos 25 anos...",
    "O câncer colorretal é a terceira neoplasia mais frequente e a segunda de maior mortalidade no mundo...",
    "Eu achei que não é impossível viver bem com bolsinha e eu sou uma prova disso."
]
example_question = "No episódio 'bia', quem é a Bia e o que aconteceu com ela?"

rag_generator = RAGGenerator(llm_model=LLM_MODEL)
# Display the generated answer.
rag_generator.generate_answer(example_question, example_contexts)

In [None]:
# Build the retriever over all episodes: one BM25 index per title plus a shared reranker.
multi_index_retriever = MultiIndexRetriever(
    indexes_nodes=transcription_nodes,
    top_k=TOP_K,
    bm25_top_k=BM25_TOP_K,
    reranker_model=RERANKER_MODEL
)
# `index_detector` and `rag_generator` are created again here with the same arguments
# as in the demo cells, so this cell does not depend on those cells having been run.
index_detector = IndexDetector(llm_model=LLM_MODEL, index_names=list(transcription_nodes.keys()))
rag_generator = RAGGenerator(llm_model=LLM_MODEL)

# Wire the three components into the fixed detect -> retrieve -> generate pipeline.
rag_pipeline = RAGPipeline(
    retriever=multi_index_retriever,
    index_detector=index_detector,
    generator=rag_generator,
)

In [None]:
# Prompt adapted from a combination of the following prompts:
#   - https://github.com/run-llama/llama_index/blob/a87b63fce3cc3d24dc71ae170a8d431440025565/llama_index/agent/react/prompts.py
#   - https://smith.langchain.com/hub/hwchase17/react-chat
#
# Template placeholders: {tools}, {tool_names}, {input} and {agent_scratchpad}.

REACT_CHAT_SYSTEM_HEADER = """\
You are designed to help answering questions regarding the content of the Não Inviabilize podcast episodes.
You are a Large Language Model trained by Meta AI. As a language model, you are able to generate human-like text based on the input you receive, allowing yourself to engage in natural-sounding conversations and provide responses that are coherent and relevant to the topic at hand.
You are constantly learning and improving, and your capabilities are constantly evolving. You are able to process and understand large amounts of text, and can use this knowledge to provide accurate and informative responses to a wide range of questions.
Additionally, you are able to generate your own text based on the input it receives, allowing yourself to engage in discussions and provide explanations and descriptions on a wide range of topics.
Overall, you are a powerful tool that can help with a wide range of tasks and provide valuable insights and information on a wide range of topics. 
When the user needs help with a specific question about something that was discussed on an specific episode of the Não Inviabilize podcast, you are here to assist.

TOOLS:
------
You have access to a variety of tools that can help you get information to answer questions.
You are responsible for using the tools in any sequence you deem appropriate to complete the task at hand.
This may require breaking the task into subtasks and using different tools to complete each subtask.
You have access to the following tools:
{tools}

To use a tool, please use the following format:
```
Thought: Do I need to use a tool? Yes
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
```

When you have a response to say to the Human, or if you do not need to use a tool, you MUST use the format:
```
Thought: Do I need to use a tool? No
Final Answer: [your response here]
```
But be careful, you MUST answer the question in Portuguese only!

If there is not enough information in the context passages, answer "Não há informação suficiente no episódio."

Begin!
User question: {input}

{agent_scratchpad}
"""

class ReActRAGPipeline:
    """ReAct agent that answers questions about podcast episodes with two tools.

    The agent can (1) detect which episode a question is about and (2) fetch
    relevant passages of an episode. Every passage fetched during a run is
    recorded and returned alongside the final answer.
    """

    def __init__(
        self,
        retriever: MultiIndexRetriever,
        index_detector: IndexDetector,
        verbose: bool = False,
    ) -> None:
        """Build the LLM, the tools and the agent executor.

        Args:
            retriever: passage retriever used by the passage-search tool.
            index_detector: episode detector used by the title-search tool.
            verbose: forwarded to AgentExecutor.
        """
        self.index_detector: IndexDetector = index_detector
        self.retriever: MultiIndexRetriever = retriever

        # Passages returned by the passage-search tool during the current run.
        self._search_history = []

        base_prompt = PromptTemplate.from_template(REACT_CHAT_SYSTEM_HEADER)
        llm = ChatGroq(
            temperature=LLM_TEMPERATURE,
            model_name=LLM_MODEL,
            api_key=os.environ['GROQ_API_KEY']
        )

        tools = self._build_tools()
        agent = create_react_agent(llm, tools, base_prompt)
        self.agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=verbose, handle_parsing_errors=True)

    def _build_tools(self) -> list:
        """Create the agent's tools as closures bound to this instance.

        `@tool` wraps a plain function; applied directly to a method it treats
        `self` as one of the tool's arguments. Defining the tools here lets them
        capture `self` instead. Both tools take one string, because the ReAct
        text format hands the `Action Input` to the tool as a single string.
        """
        @tool
        def search_episode_title(question: str) -> str:
            """
            Returns the title of the episode that is being asked about in the given question.
            In order to perform this search, you must provide the question in Portuguese.
            """
            return self._search_episode_title(question)

        @tool
        def search_relevant_passages(query: str):
            """
            Returns a sequence of relevant contexts passages for the given question about a specific episode in the podcast.
            In order to perform this search, you must provide the title of the episode and the question in Portuguese
            as a single JSON object: {"episode_title": "<title of the episode>", "question": "<question in Portuguese>"}.
            """
            try:
                episode_title, question = self._parse_passage_query(query)
                return self._search_relevant_passages(episode_title, question)
            except ValueError as error:
                # Hand the problem back to the agent as an observation so it can
                # correct its input instead of aborting the whole run.
                return f"Error: {error}"

        return [search_episode_title, search_relevant_passages]

    @staticmethod
    def _parse_passage_query(tool_input: str):
        """Extract `(episode_title, question)` from the passage tool's JSON input.

        Raises:
            ValueError: if the input is not a JSON object holding non-empty
                'episode_title' and 'question' values.
        """
        cleaned = tool_input.strip().strip('`').strip()
        try:
            payload = json.loads(cleaned)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            payload = None

        if isinstance(payload, dict):
            episode_title = str(payload.get('episode_title', '')).strip()
            question = str(payload.get('question', '')).strip()
            if episode_title and question:
                return episode_title, question

        raise ValueError(
            'Action Input must be a JSON object with the keys "episode_title" and "question".'
        )

    def _search_relevant_passages(self, episode_title: str, question: str) -> List[str]:
        """Retrieve passages of one episode and record them in the search history.

        Raises:
            ValueError: propagated from the retriever when the episode is unknown.
        """
        contexts = self.retriever.retrieve(episode_title, question)
        self._search_history += contexts
        return contexts

    def _search_episode_title(self, question: str) -> str:
        """Return the episode title (index name) that the question refers to."""
        return self.index_detector.detect_index(question)

    def generate_answer(self, question: str) -> RAGResponse:
        """Run the agent on `question`.

        Returns:
            A RAGResponse with the agent's final answer and every passage the
            passage-search tool returned during this run.
        """
        self._search_history = []
        # Use the caller's question (a hard-coded example question used to be sent).
        output = self.agent_executor.invoke({"input": question})

        answer = output['output']
        contexts = self._search_history

        # Start the next run with a fresh list; `contexts` keeps this run's passages.
        self._search_history = []

        return RAGResponse(answer, contexts)

In [None]:
# Build the ReAct agent pipeline on top of the retriever and detector created above
# (verbose keeps its default of False).
react_rag_pipeline = ReActRAGPipeline(
    retriever=multi_index_retriever, 
    index_detector=index_detector,
)
# Ask an example question; the returned RAGResponse is displayed as the cell output.
react_rag_pipeline.generate_answer("No episódio 'bia', quem é a Bia e o que aconteceu com ela?")