In [95]:
import regex as re
import os
from typing import List, Dict
import requests
import json

from pprint import pprint

GROQ_API_KEY = os.getenv('groq_key')

FILE_PATH = r'C:\Users\fuedj\Documents\Code\RAG_Dr_Voss\llm-case-study-main\src\currency.txt'

In [96]:
def load_txt(file_path: str) -> str:
    """Carrega o conteúdo do arquivo de texto"""
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()

def chunk_text(text: str, chunk_size: int = 800) -> List[str]:
    """Divide o texto em chunks"""
    return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]

In [97]:
text = load_txt('currency.txt')

chunks = chunk_text(text)

In [19]:
analysis_chunk = chunks[0]

In [64]:
len(chunks)

40

In [23]:
class GroqFileRAG:
    def __init__(self, api_key, content: str):
        self.api_url = "https://api.groq.com/openai/v1/chat/completions"
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}"
        }
        self.model = "qwen-qwq-32b"
        self.file_content = content
        #print(len(self.file_content))
        
    def _make_api_call(self, payload, stream=False):
        response = requests.post(
            self.api_url,
            headers=self.headers,
            json=payload,
            stream=stream
        )
        
        if response.status_code != 200:
            raise Exception(f"API Error: {response.status_code} - {response.text}")
        
        if stream:
            return self._handle_stream_response(response)
        return response.json()

    def _handle_stream_response(self, response):
        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith("data: "):
                    data = decoded_line[6:]
                    if data != "[DONE]":
                        yield json.loads(data)

    def answer_with_cot(self, question, stream=False):
        # Step 1: Generate retrieval thoughts
        retrieval_payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": "Analyze the question about a diary of a doctor in a fictional world and generate search thoughts for the document."},
                {"role": "user", "content": f"Question: {question}\n\nGenerate key search terms and thoughts:"}
            ],
            "temperature": 0.5,
            "max_tokens": 256,
            "top_p": 0.9,
            "stream": False
        }
        
        retrieval_response = self._make_api_call(retrieval_payload)
        retrieval_thoughts = retrieval_response['choices'][0]['message']['content']
        
        # Step 2: Create the RAG prompt with document context
        rag_prompt = f"""Document Context:
{self.file_content}

Question: {question}
Retrieval Thoughts: {retrieval_thoughts}

Answer the question using Chain of Thought reasoning:
1. First, identify the relevant section in the document
2. Then extract the specific information needed
3. Finally, formulate a clear answer"""

        # Step 3: Generate final answer
        answer_payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": "You are a RAG assistant. Answer using the document and Chain of Thought."},
                {"role": "user", "content": rag_prompt}
            ],
            "temperature": 0.7,
            "max_tokens": 1024,
            "top_p": 0.95,
            "stream": stream
        }
        
        if stream:
            return self._stream_final_answer(answer_payload)
        else:
            final_response = self._make_api_call(answer_payload)
            return {
                "question": question,
                "retrieval_thoughts": retrieval_thoughts,
                "final_answer": final_response['choices'][0]['message']['content']
            }

    def _stream_final_answer(self, payload):
        for chunk in self._make_api_call(payload, stream=True):
            content = chunk['choices'][0]['delta'].get('content', '')
            print(content, end='', flush=True)
            yield content


In [None]:

# Initialize
rag_system = GroqFileRAG(GROQ_API_KEY, content=analysis_chunk)

# Ask about local currency
print("Answering question about local currency...")
result = rag_system.answer_with_cot("What is the currency of Veridia called?", stream=False)


Answering question about local currency...


{'question': 'What is the currency of Veridia called?',
 'retrieval_thoughts': '\n<think>\nOkay, the user is asking about the currency in Veridia from a fictional doctor\'s diary. Let me start by breaking down the question. The main points are "currency" and "Veridia." Since it\'s a fictional world, the information might not be in standard economic terms. First, I need to think about possible names for currency in fictional settings. Often, they have unique names based on the world\'s lore. Maybe the diary entries mention transactions or economic activities. The doctor might refer to paying for services, buying supplies, or something related to money. \n\nI should consider keywords like "Veridian currency," "money in Veridia," or terms like "coins," "notes," "economic system." Also, maybe the currency has a specific name, like "Veridian Crowns" or "Eco," something that fits the setting\'s theme. Since it\'s a diary, the doctor might write about daily expenses, which could hint at the c

In [None]:
pprint(result)

In [None]:
# Exemplo de uso:
#resultado = analyze_chunk_with_deepseek(chunk_text="Texto do chunk...", question="Qual a moeda local?", api_key=GROQ_API_KEY)
#print(resultado)

In [None]:
import aiohttp
import asyncio
from typing import List, Optional

class AsyncAgentSystem:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.found_answer = asyncio.Event()
        self.result = None

    async def analyze_chunk(self, session: aiohttp.ClientSession, chunk: str) -> Optional[str]:
        if self.found_answer.is_set():
            return None

        prompt = f"""Analise este texto e responda APENAS se encontrar informação sobre a moeda local:
        
        {chunk[:1000]}

        Se não encontrar, responda 'NÃO ENCONTRADO'
        Se encontrar, responda 'ENCONTREI'"""
        
        
        try:
            async with session.post(
                "https://api.groq.com/openai/v1/chat/completions",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={
                    "model": "qwen-qwq-32b",
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.1
                },
                timeout=10
            ) as response:
                data = await response.json()
                answer = data['choices'][0]['message']['content']
                print(session)
                print(answer)
                if "NÃO ENCONTRADO" not in answer:
                    self.found_answer.set()
                    self.result = answer
                    return answer

        except Exception as e:
            print(f"Erro no chunk: {str(e)}")
        return None

    async def process_chunks(self, chunks: List[str]) -> str:
        async with aiohttp.ClientSession() as session:
            tasks = [self.analyze_chunk(session, chunk) for chunk in chunks]
            await asyncio.gather(*tasks)
            
            if self.found_answer.is_set():
                return self.result
            return "Informação não encontrada em nenhum chunk"

# Função que pode ser chamada diretamente no Jupyter
async def run_agent_analysis():
    for i in range(0, len(chunks), 3):
        batch = chunks[i:i+3]
        print(f"\nProcessando lote {i//3 + 1}: chunks {i} a {i+2}")
        
        new_chunks = batch  # ← Substitua pelos seus chunks

        agent = AsyncAgentSystem(GROQ_API_KEY)  # Cria a instância do agente
        result = await agent.process_chunks(new_chunks)
        print("Resultado:", result)
        if result != 'Informação não encontrada em nenhum chunk':
            return result

# Executa no Jupyter Notebook
res = await run_agent_analysis()  # ← Note o 'await' diretamente na célula


Processando lote 1: chunks 0 a 2
<aiohttp.client.ClientSession object at 0x000002396E1A7250>

<think>
Okay, let's see. The user wants me to analyze the given text and check if there's any information about the local currency. The instruction is clear: if I find it, I should respond with "ENCONTREI"; otherwise, "NÃO ENCONTRADO". 

First, I'll read through the text carefully. The text talks about Veridia going through turbulent times, mentions leaders integrating with everyday life, sketching drone patterns, Seraphina transforming adversity into opportunity, and the balance between past and future. The user is looking for mentions of the local currency.

Looking at the keywords related to currency: words like "moeda local" (local currency) in Portuguese, or terms like money, coins, currency, economy, etc. in English. Scanning the text again, I notice words like "legacy", "drone patterns", "spectacle", "tradition and innovation", "snow-draped world". There's no mention of any currency or

In [79]:
res

'Informação não encontrada em nenhum chunk'

In [67]:
pprint(res)

'Informação não encontrada em nenhum chunk'


In [76]:
for i in range(0, len(chunks), 3):
    batch = chunks[i:i+3]
    print(len(batch))
    print(batch)


3
["23rd Day of Verdelight 1855\nThe echoes of last night's festivities lingered in my mind as I awoke to the gentle illumination of\nthe winter dawn. The Solstice, with its orchestration of tradition and innovation, left me pondering\nthe future with renewed curiosity and resolve. As the morning sun crept lazily across the rooftops,\nI decided to venture to the Veridian Archives, feeling an insatiable pull towards the stories and\nhistories housed within its storied halls.\nThe  path  to  the  Archives  was  lined  with  the  remnants  of  last  night's  celebrations.  Confetti\nsparkled on the cobblestones like fallen stars, and the air still carried the faint aroma of mulled\nspices  and  candle  wax.  I  couldn't  help  but  smile  at  the  memory  of  the  students  I  had  spoken\nto—their eager ", "faces an indelible mark of hope in this ever-evolving world of ours. \nUpon  arriving,  I  was  greeted  by  the  familiar  scent  of  aged  parchment  and  polished  oak.  The\nArchi

In [71]:
i

39

In [None]:
import aiohttp
import asyncio
from typing import List, Optional

class AsyncAgentSystem:
    def __init__(self, api_key: str, question: str):
        self.api_key = api_key
        self.found_answer = asyncio.Event()
        self.result = None
        self.question = question

    async def analyze_chunk(self, session: aiohttp.ClientSession, chunk: str) -> Optional[Dict]:
        """Analyzes a single chunk with structured JSON response"""
        if self.found_answer.is_set():
            return None

        prompt = """Analyze the following text from a doctor's diary in a fictional world and respond in strict JSON format:

{
    "instructions": {
        "purpose": "Find information relevant to the question",
        "rules": [
            "Respond only in valid JSON",
            "Include exact excerpts when available",
            "Provide confidence score (0.0-1.0)",
            "Be specific about missing information"
        ],
        "response_format": {
            "contains_answer": "boolean",
            "answer": "string or null",
            "confidence": "float",
            "excerpt": "string or null",
            "reason": "string (if contains_answer=false)"
        }
    },
    "examples": {
        "positive": {
            "contains_answer": true,
            "answer": "Verdelight Dollar",
            "confidence": 0.85,
            "excerpt": "The Verdelight Dollar (VGD) became the official currency in 1992, as recorded in my diary on page 142"
        },
        "negative": {
            "contains_answer": false,
            "answer": null,
            "confidence": 0.0,
            "excerpt": null,
            "reason": "No currency mentions found in patient treatment records"
        }
    },
    "task": {
        "question": \"""" + self.question + """\",
        "text_to_analyze": \"""" + chunk[:1000] + """\"
    }
}

Provide your analysis in this exact JSON structure:"""
        try:
            async with session.post(
                "https://api.groq.com/openai/v1/chat/completions",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={
                    "model": "qwen-qwq-32b",
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.1,
                    "response_format": {"type": "json_object"},
                    "max_tokens": 200
                },
                timeout=10
            ) as response:
                data = await response.json()
                response_content = data['choices'][0]['message']['content']
                
                try:
                    result = json.loads(response_content)
                    if result.get('positive', False):
                        self.found_answer.set()
                        self.result = result
                        return result
                    return None
                
                except json.JSONDecodeError:
                    print("Error: Invalid JSON response")
                    return None

        except Exception as e:
            print(f"API Error: {str(e)}")
            return None

    async def process_chunks(self, chunks: List[str]) -> str:
        async with aiohttp.ClientSession() as session:
            tasks = [self.analyze_chunk(session, chunk) for chunk in chunks]
            await asyncio.gather(*tasks)
            
            if self.found_answer.is_set():
                return self.result
            return "Informação não encontrada em nenhum chunk"

question = "What is the currency of Veridia called?"
chunks = chunks[:3]
# Função que pode ser chamada diretamente no Jupyter
async def run_agent_analysis():
    for i in range(0, len(chunks), 3):
        batch = chunks[i:i+3]
        print(f"\nProcessando lote {i//3 + 1}: chunks {i} a {i+2}")
        
        new_chunks = batch  # ← Substitua pelos seus chunks

        agent = AsyncAgentSystem(GROQ_API_KEY, question=question)  # Cria a instância do agente
        result = await agent.process_chunks(new_chunks)
        print("Resultado:", result)
        if result != 'Informação não encontrada em nenhum chunk':
            return result

# Executa no Jupyter Notebook
res = await run_agent_analysis()  # ← Note o 'await' diretamente na célula


Processando lote 1: chunks 0 a 2
API Error: 'choices'
API Error: 'choices'
API Error: 'choices'
Resultado: Informação não encontrada em nenhum chunk

Processando lote 2: chunks 3 a 5
API Error: 'choices'
Resultado: Informação não encontrada em nenhum chunk


In [100]:
import aiohttp
import asyncio
import json
from typing import List, Dict, Optional

class AsyncAgentSystem:
    def __init__(self, api_key: str, question: str):
        self.api_key = api_key
        self.found_answer = asyncio.Event()
        self.result = None
        self.question = question

    async def analyze_chunk(self, session: aiohttp.ClientSession, chunk: str) -> Optional[Dict]:
        """Analyzes a single chunk with structured JSON response"""
        if self.found_answer.is_set():
            return None

        prompt = f"""
        Analyze the following text from a doctor's diary in a fictional world and respond in strict JSON format:

{{
    "instructions": {{
        "purpose": "Find information relevant to the question",
        "rules": [
            "Respond only in valid JSON",
            "Include exact excerpts when available",
            "Provide confidence score (0.0-1.0)",
            "Be specific about missing information"
        ],
        "response_format": {{
            "contains_answer": "boolean",
            "answer": "string or null",
            "confidence": "float",
            "excerpt": "string or null",
            "reason": "string (if contains_answer=false)"
        }}
    }},
    "examples": {{
        "positive": {{
            "contains_answer": true,
            "answer": "Verdelight Dollar",
            "confidence": 0.85,
            "excerpt": "The Verdelight Dollar (VGD) became the official currency in 1992, as recorded in my diary on page 142"
        }},
        "negative": {{
            "contains_answer": false,
            "answer": null,
            "confidence": 0.0,
            "excerpt": null,
            "reason": "No currency mentions found in patient treatment records"
        }}
    }},
    "task": {{
        "question": "{self.question}",
        "text_to_analyze": "{chunk[:1000]}"
    }}
}}

        Provide your analysis in this exact JSON structure
        """

        try:
            async with session.post(
                "https://api.groq.com/openai/v1/chat/completions",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={
                    "model": "qwen-qwq-32b",
                    "messages": [{"role": "user", "content": json.dumps(prompt, ensure_ascii=False)}],
                    "temperature": 0.1,
                    "response_format": {"type": "json_object"}
                },
                timeout=15  # Increased timeout
            ) as response:
                response.raise_for_status()
                data = await response.json()
                response_content = data['choices'][0]['message']['content']
                
                try:
                    result = json.loads(response_content)
                    print(f"Result: {result}")
                    if result.get('contains_answer', False):  # Fixed key from 'positive' to 'contains_answer'
                        self.found_answer.set()
                        self.result = result
                        return result
                    return None
                
                except json.JSONDecodeError as e:
                    print(f"JSON Error: {str(e)}")
                    print(f"Raw response: {response_content}")
                    return None

        except Exception as e:
            print(f"API Error: {str(e)}")
            return None

    async def process_chunks(self, chunks: List[str]) -> Optional[Dict]:
        """Process chunks and return the first valid answer found"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.analyze_chunk(session, chunk) for chunk in chunks]
            results = await asyncio.gather(*tasks)
            
            for result in results:
                if result is not None:
                    return result
            return None

async def run_agent_analysis(api_key: str, question: str, chunks: List[str], batch_size: int = 3):
    """Process chunks in batches and return the first valid answer"""
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        print(f"\nProcessing batch {i//batch_size + 1} (chunks {i}-{i+len(batch)-1})")
        
        agent = AsyncAgentSystem(api_key, question=question)
        result = await agent.process_chunks(batch)
        
        if result:
            print("✅ Answer found!")
            return result
        print("\nNo answer in this batch")
    
    print("\nNo answer found in any chunk")
    return None

result = await run_agent_analysis(GROQ_API_KEY, question, chunks)  # Process first 3 chunks
if result:
    print(f"\nCurrency: {result.get('answer')}")
    print(f"Confidence: {result.get('confidence', 0)*100:.0f}%")
    print(f"Excerpt: {result.get('excerpt')}")


Processing batch 1 (chunks 0-2)
Result: {'contains_answer': False, 'answer': None, 'confidence': 0.0, 'excerpt': None, 'reason': 'No references to currency or economic systems were found in the analyzed diary entry'}
Result: {'contains_answer': False, 'answer': None, 'confidence': 0.0, 'excerpt': None, 'reason': 'No references to currency or monetary systems were found in the analyzed diary excerpt'}
Result: {'contains_answer': False, 'answer': None, 'confidence': 0.0, 'excerpt': None, 'reason': "No references to currency systems or monetary units were found in the diary entry. Mentions of 'Verdelight' relate to a date rather than economic systems."}

No answer in this batch

Processing batch 2 (chunks 3-5)
Result: {'contains_answer': False, 'answer': None, 'confidence': 0.0, 'excerpt': None, 'reason': 'No currency mentions found in diary entry text'}
Result: {'contains_answer': False, 'answer': None, 'confidence': 0.0, 'excerpt': None, 'reason': 'No currency mentions found in the pro

In [102]:
result

{'contains_answer': True,
 'answer': 'Veridian Crown',
 'confidence': 0.8,
 'excerpt': 'the Veridian Crown, the currency exchanged in this thriving metropolis',
 'reason': None}