# Semantic Memory System with Cognitive Scoring + Dynamic Pruning

**Fases:**
- **Validação**: Update de scores (EMA), decay em memórias fracas, salvamento imediato
- **Teste**: Memória congelada (read-only)

> ⚠️ Nesta versão, **apenas** os mecanismos de update de score (EMA) e de decaimento estão ativos durante a validação.
> Não há geração de novos fatos nem refinamento de fatos existentes.

# 0) Setup

### A) Imports

In [1]:
# Standard library imports
import ast
import os
import re
import time
import sys

# Add parent directory to path for imports
sys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), '..', '..')))
# Add vectorstore & prompts folders to path
sys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), '..', 'vectorstore')))
sys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), '..', 'prompts')))

# Third-party imports
import pandas as pd
from dotenv import load_dotenv
from tqdm import tqdm

# LangChain imports
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_huggingface.embeddings import HuggingFaceEmbeddings
from langchain_ollama import ChatOllama
from langchain_openai import ChatOpenAI

# Memory managers & metadata builders
from cognitive_memory_manager import CognitiveMemoryManager, build_metadata_semantic
from simple_vector_memory import SimpleVectorMemory, build_metadata_reflection

# Prompt templates
from templates import (
    SEMANTIC_TEMPLATE,
    SCORE_TEMPLATE,
)

# Local imports
from utils_notebook import (
    SemanticCleaner,
    calcular_metricas_memoria,
    format_choices,
    parse_simple_score,
    make_question
)
# Shared SemanticCleaner instance (class imported from utils_notebook above).
cleaner = SemanticCleaner()

# Load environment variables from a .env file, if one is found.
load_dotenv()

True

### B) Language Models and Datasets

In [2]:
# Answer labels of the multiple-choice options, in order.
LABELS = list("ABCD")
# Maps each label to its zero-based position among the options.
number = {label: position for position, label in enumerate(LABELS)}

In [3]:
# ARC-Challenge validation and test splits, read from pre-processed CSV files.
valid_df = pd.read_csv("../../dataset/arc_challenge_valid_processed.csv")
test_df = pd.read_csv("../../dataset/arc_challenge_test_processed.csv")

In [4]:
# Model used for answer extraction and post-processing.
gpt5_nano = ChatOpenAI(model='gpt-5-nano-2025-08-07',temperature=0)

In [5]:
# Model served through Ollama; both chains defined in this notebook run on it.
phi2 = ChatOllama(model="phi", temperature=0)

### C) Prompts and Chains

In [6]:
# Answering chain: fills SEMANTIC_TEMPLATE and sends it to phi2.
# No output parser is attached, so callers receive the raw model response.
semantic_prompt = PromptTemplate.from_template(SEMANTIC_TEMPLATE)
rag_chain = semantic_prompt | phi2

# Scoring chain: fills SCORE_TEMPLATE, runs phi2 and parses the reply to a string.
score_prompt = PromptTemplate.from_template(SCORE_TEMPLATE)
score_chain = score_prompt | phi2 | StrOutputParser()

# 1) Utils Functions

### A) Context Formatter

In [7]:
def format_semantic_context(semantic_items, show_scores=False):
    """Render retrieved semantic memories as an indented bullet list.

    Args:
        semantic_items: Sequence of memory dicts. The fact text is read from
            ``item['content']`` and the associated question from
            ``item['metadata']['question']``. A missing ``metadata`` entry, a
            missing key or a ``None`` value is rendered as an empty string
            instead of raising.
        show_scores: Accepted for call-site compatibility; the formatter does
            not use it.

    Returns:
        The sentence ``"No specific reference information found."`` when
        ``semantic_items`` is empty or ``None``; otherwise one
        "Principle #i" block per item, joined by newlines and preceded by a
        blank line.
    """
    if not semantic_items:
        return "No specific reference information found."

    # The leading empty entry makes the joined text start with a blank line.
    formatted_parts = [""]

    for i, fact in enumerate(semantic_items, 1):
        metadata = fact.get('metadata') or {}
        raw_q = str(metadata.get('question') or '')
        # Only the first line of the stored question is shown
        # (presumably the following lines hold the answer options).
        q_text = raw_q.split('\n')[0].strip()
        f_text = str(fact.get('content') or '').strip()

        block = (
            "\n"
            f"    * **Principle #{i}**\n"
            f'        * Context: "{q_text}"\n'
            f'        * Fact: "{f_text}"\n'
        )
        formatted_parts.append(block)

    return "\n".join(formatted_parts)

### B) Answer Extractor 

In [8]:
import re
from typing import List, Optional

def extract_answer(
    small_llm_model, 
    model_text_output: str, 
    valid_labels: List[str] = None,
    debug: bool = False,
    question: Optional[str] = None,
) -> str:
    """Extract the chosen option letter from a model's free-text answer.

    High-confidence regular expressions are tried first, in priority order.
    For the first pattern that matches, its last occurrence in the text is
    taken; the letter is returned if it is an accepted label, otherwise the
    next pattern is tried. When nothing qualifies, or when the text looks
    like code, the decision is delegated to ``_llm_judge``.

    Args:
        small_llm_model: Model handed to ``_llm_judge`` for the fallback.
        model_text_output: Raw text produced by the answering model.
        valid_labels: Accepted letters; defaults to ``['A', 'B', 'C', 'D', 'E']``.
        debug: When true, print which extraction path was taken.
        question: Optional question text forwarded to the judge as context.

    Returns:
        The upper-case letter found by a pattern, ``"N/A"`` for empty input,
        or whatever ``_llm_judge`` returns on the fallback path.
    """
    accepted = ['A', 'B', 'C', 'D', 'E'] if valid_labels is None else valid_labels

    if not model_text_output:
        return "N/A"

    answer_text = model_text_output.strip()

    # Code-like outputs are not parsed with regexes; the judge handles them.
    contains_code = any(
        marker in answer_text for marker in ("```python", "def solution")
    )
    if contains_code:
        if debug:
            print("‚ö† [CODE DETECTED] Enviando para LLM Judge.")
        return _llm_judge(small_llm_model, answer_text, accepted, debug, question)

    # Ordered from the most explicit phrasing to the weakest signal.
    high_confidence_patterns = (
        r"\\boxed\s*\{\s*([A-H])\s*\}",
        r"(?:Final|Correct)\s+Answer\s*[:\-]?\s*(?:is)?\s*(?:Option)?\s*[\(\[]([A-H])[\)\]](?![a-z])",
        r"The\s+(?:correct\s+)?(?:answer|option|choice)\s+is\s*(?:Option)?\s*[:\-]?\s*[\(\[]([A-H])[\)\]](?![a-z])",
        r"(?:Final|Correct)\s+Answer\s*[:\-]\s*(?:is\s+)?(?:Option\s+)?([A-H])(?=\s|\.|,|!|\?|$)(?![A-Za-z])",
        r"(?:Therefore|Thus|Hence|So),\s*(?:the\s+answer\s+is\s*)?(?:Option)?\s*[\(\[]([A-H])[\)\]](?![a-z])",
        r"(?:^|\n)\s*Answer\s*:\s*([A-H])(?=\s|$|\.|\,)(?![A-Za-z])",
        r"\*\*([A-H])\*\*(?![a-z])",
        r"^([A-H])$",
    )

    for regex in high_confidence_patterns:
        # Walk every occurrence so that only the last one is kept.
        final_hit = None
        for final_hit in re.finditer(regex, answer_text, re.IGNORECASE):
            pass
        if final_hit is None:
            continue

        letter = final_hit.group(1).upper()
        if letter in accepted:
            if debug:
                print(f"‚úì [REGEX] Padr√£o encontrado: '{regex}' -> {letter}")
            return letter

    if debug:
        print("‚úó [REGEX] Falha nos padr√µes fortes. Chamando LLM Judge.")
    return _llm_judge(small_llm_model, answer_text, accepted, debug, question)


def _llm_judge(
    small_llm_model, 
    text: str, 
    valid_labels: List[str],
    debug: bool = False,
    question: Optional[str] = None,
) -> str:
    context_block = ""
    if question:
        context_block = f"""
### CONTEXT (Use this to infer the answer letter):
{question}
"""

    prompt = f"""You are an Answer Extraction Bot. 
Your ONLY job is to identify which option the following "Model Output" concluded is correct.

{context_block}

### Model Output to Analyze:
{text}

### Instructions:
1. Look for an explicit answer (e.g., "Answer: A").
2. If NO explicit letter is found, read the conclusion of the "Model Output" and match it against the Options in the Context. **Infer the letter.**
   - Example: If Context has "(A) 5 (B) 10" and Model Output says "The result is 10", you must output B.
3. Do NOT calculate or solve the problem yourself. Trust the "Model Output".
4. If the model refuses to answer or is unclear, return "E".

Output format: Just the single letter (A, B, C, D, or E). No other text."""

    try:
        response = small_llm_model.invoke(prompt) 
        output = response.content if hasattr(response, 'content') else str(response)
        clean_cand = re.sub(r"[^A-E]", "", output.strip().upper())
        
        if len(clean_cand) > 1:
            clean_cand = clean_cand[-1]
            
        if clean_cand in valid_labels:
            if debug: print(f"‚úì [LLM JUIZ] Inferido: {clean_cand}")
            return clean_cand
            
        return "N/A"

    except Exception as e:
        if debug: print(f"‚úó [ERRO JUIZ] {e}")
        return "N/A"

### C) Evaluation

In [9]:
def avaliar_dataset_dynamic_pruning(
    df, 
    chain=None,
    manager_semantic=None,
    score_chain=None,
    k_semantic=3,
    threshold_semantic=0.24,
    semantic_weight=0.7,
    update_memory=False,
    decay_batch_size=False,
    decay_frequency=50,
    backup_frequency=160,
    backup_path=None,
    desc="Dynamic Pruning"
):
    """Evaluate a multiple-choice dataset using the semantic memory.

    For every row the function retrieves ranked memories, formats them as
    context, invokes ``chain`` and extracts the answer letter with
    ``extract_answer`` (which uses the module-level ``gpt5_nano`` model as
    its fallback judge). When ``update_memory`` is true, each retrieved
    memory is scored through ``score_chain`` and the scores are passed to
    ``manager_semantic.update_memories_feedback`` (described in this notebook
    as the EMA score update); decay cycles run periodically. No new facts are
    generated and no existing facts are refined here.

    Args:
        df: DataFrame with the columns ``question``, ``choices`` (a Python
            literal string) and ``answerKey``; rows are also passed to
            ``make_question``.
        chain: Answering chain invoked with ``question`` and ``similar_facts``.
        manager_semantic: Memory manager providing ``retrieve_ranked_memories``,
            ``update_memories_feedback`` and ``apply_decay_cycle``.
        score_chain: Chain that scores one memory for one answered question.
            Required when ``update_memory`` is true.
        k_semantic: ``k`` forwarded to ``retrieve_ranked_memories``.
        threshold_semantic: ``threshold`` forwarded to the retriever.
        semantic_weight: ``semantic_weight`` forwarded to the retriever.
        update_memory: Enables score feedback and decay cycles.
        decay_batch_size: Despite its name it is used as a flag: when truthy
            (and ``update_memory`` is set) one extra decay cycle runs after
            the loop.
        decay_frequency: Run a decay cycle every this many processed rows;
            values <= 0 disable periodic decay.
        backup_frequency: Write a partial-results CSV every this many
            processed rows; values <= 0 disable backups.
        backup_path: Path of the final results file. Backups go to the same
            location with ``_backup`` inserted before the extension.
        desc: Progress-bar label, also stored in the ``source`` column.

    Returns:
        A DataFrame with one row per input row. Rows that raised contain only
        ``index``, ``error``, ``is_correct`` (False) and zeroed retrieval
        metrics.

    Raises:
        ValueError: If ``update_memory`` is true but ``score_chain`` is None.
    """
    # Without a scorer every feedback would silently fall back to 0 and be
    # written into the memory scores, so fail before touching the store.
    if update_memory and score_chain is None:
        raise ValueError("score_chain is required when update_memory=True")

    resultados = []
    acertos = 0
    erros = 0
    total = len(df)
    decay_counter = 0
    backup_counter = 0

    # Prepare the backup directory and file name.
    backup_file = None
    if backup_path:
        backup_dir = os.path.dirname(backup_path)
        if backup_dir:
            os.makedirs(backup_dir, exist_ok=True)
        # splitext guarantees the backup never has the same name as the final
        # file, even when the path lacks a ".csv" suffix.
        backup_root, backup_ext = os.path.splitext(backup_path)
        backup_file = f"{backup_root}_backup{backup_ext or '.csv'}"
    
    loop = tqdm(df.iterrows(), total=total, desc=desc)

    # `step` is the 1-based row position; schedules must not depend on the
    # DataFrame index labels, which may be non-contiguous or non-numeric.
    for step, (idx, row) in enumerate(loop, start=1):
        try:
            full_question = make_question(row, inline=False)[0]
            question_for_retriever = re.sub(r'\([A-Z]\)\s*', '', row['question'])
            
            # Retrieve semantic memories (ranked with the cognitive score).
            items_semantic, _ = manager_semantic.retrieve_ranked_memories(
                query=question_for_retriever, 
                k=k_semantic,
                threshold=threshold_semantic,
                semantic_weight=semantic_weight,
                show_scores=False
            )

            cognitive_context_formatted = format_semantic_context(items_semantic)

            count_sem, avg_sim_sem, _ = calcular_metricas_memoria(items_semantic)

            response_obj = chain.invoke({
                "question": full_question,
                "similar_facts": cognitive_context_formatted, 
            })

            response_text = response_obj.content if hasattr(response_obj, "content") else str(response_obj)
            
            pred = extract_answer(
                small_llm_model=gpt5_nano, 
                model_text_output=response_text,
                question=full_question,
            )

            # Anything outside A-D is recorded as 'E' and counted as an
            # extraction error.
            if pred not in ['A', 'B', 'C', 'D']:
                pred = 'E'
                erros += 1
            
            target = row['answerKey']
            is_correct = (pred == target)
            acertos += int(is_correct)
            
            # === SCORE UPDATE (EMA) ===
            if update_memory and items_semantic and pred != 'E':
                used_ids = [m['doc_id'] for m in items_semantic]
                
                feedback_scores = []
                choices = ast.literal_eval(row['choices'])
                
                try:
                    correct_txt = choices['text'][number[target]] if target in number else str(target)
                except (IndexError, KeyError):
                    correct_txt = str(target)
                
                try:
                    chosen_txt = choices['text'][number[pred]] if pred in number else str(pred)
                except (IndexError, KeyError):
                    chosen_txt = str(pred)
                
                status = "SUCCESS" if is_correct else "FAILURE"
                
                for memory in items_semantic:
                    fact_text = memory['content']
                    try:
                        score_response = score_chain.invoke({
                            "question": full_question,
                            "chosen": chosen_txt,
                            "correct": correct_txt,
                            "outcome_status": status,
                            "fact": fact_text,
                            "reasoning": response_text
                        })
                        _, score = parse_simple_score(score_response)
                        feedback_scores.append(score)
                    except Exception as score_err:
                        # `Exception` (not a bare except) lets Ctrl-C and
                        # SystemExit propagate. 0 stays as the fallback score
                        # so ids and scores remain aligned.
                        tqdm.write(f"Score fallback (0) at row {step}: {score_err}")
                        feedback_scores.append(0)
                
                # Apply the feedback to the memories that were used.
                manager_semantic.update_memories_feedback(used_ids, feedback_scores)
            
            # === PERIODIC DECAY ===
            if update_memory and decay_frequency > 0 and step % decay_frequency == 0:
                decay_counter += 1
                tqdm.write(f"üåô [Q{step}/{total}] Aplicando ciclo de decay #{decay_counter}...")
                manager_semantic.apply_decay_cycle(decay_threshold=0.0)
            
            # Refresh the progress bar.
            acc_atual = (acertos / step) * 100
            loop.set_postfix(
                acc=f"{acc_atual:.2f}%", 
                sem=count_sem, 
                decay=decay_counter,
                bkp=backup_counter,
                erros=erros
            )

            resultados.append({
                'index': idx,
                'question': full_question,
                'retrieved_context': cognitive_context_formatted,
                'retrieved_count_semantic': count_sem,
                'avg_similarity_semantic': avg_sim_sem,
                'raw_output': response_text,
                'pred': pred,
                'target': target,
                'is_correct': is_correct,
                'source': desc,
                'erros': erros
            })

        except Exception as e:
            # Per-row boundary: log, record the failure and keep going.
            tqdm.write(f"Erro no √≠ndice {idx}: {e}")
            resultados.append({
                'index': idx, 
                'error': str(e), 
                'is_correct': False, 
                'retrieved_count_semantic': 0,
                'avg_similarity_semantic': 0.0,
            })

        # === PERIODIC BACKUP ===
        # Runs after the row was recorded so the snapshot includes it, and
        # outside the row-level try so an I/O failure cannot replace a valid
        # result with an error row.
        if backup_file and backup_frequency > 0 and step % backup_frequency == 0:
            try:
                pd.DataFrame(resultados).to_csv(backup_file, index=False)
                backup_counter += 1
            except OSError as backup_err:
                tqdm.write(f"Backup failed at row {step}: {backup_err}")
    
    # === FINAL DECAY ===
    if update_memory and decay_batch_size:
        decay_counter += 1
        print(f"\nüåô [FINAL] Aplicando ciclo de decay final (#{decay_counter})...")
        manager_semantic.apply_decay_cycle(decay_threshold=0.0)
    
    # Final summary.
    if update_memory:
        print(f"\nüìä SUM√ÅRIO DE MEM√ìRIA:")
        print(f"   Ciclos de decay: {decay_counter}")
        print(f"   Mem√≥rias sem√¢nticas finais: {manager_semantic._collection.count()}")
    
    if backup_path and backup_counter > 0:
        print(f"üíæ Total de backups salvos: {backup_counter}")
    
    return pd.DataFrame(resultados)

# 2) Construção do Banco de Dados Semântico

In [10]:
# Load the expanded scientific facts; missing cells become the "N/A" marker.
scientific_facts = pd.read_csv("../../scientific_facts_expanded.csv").fillna("N/A")

# Semantic data: keep rows that carry a fact, then drop repeated facts
# (the first occurrence of each fact is kept).
df_semantic = scientific_facts.loc[
    scientific_facts['scientific_fact'] != "N/A"
].drop_duplicates(subset=['scientific_fact'], keep='first')

In [11]:
FORCE_RESET = False  # Set to True to recreate the vector stores from scratch
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2") 

# Build the cognitive memory manager on top of the deduplicated facts.
# The 'scientific_fact' column is the stored content; reset_db follows FORCE_RESET.
manager_semantic = CognitiveMemoryManager(
    db_path="vectorstores/dynamic_pruning/chroma_semantic",
    embedding_model=embedding_model, alpha=0.1, decay_lambda=0.99, forget_threshold=-0.6
).init_from_dataframe(df=df_semantic, content_col='scientific_fact',
                      id_prefix='semantic', metadata_func=build_metadata_semantic, reset_db=FORCE_RESET)

üî® Construindo nova mem√≥ria cognitiva com 3993 itens...
‚úÖ Mem√≥ria inicializada com 3993 itens.


# 3) Testing in one question

In [14]:
# Pick one test question (row i) for a manual end-to-end check.
i = 0
k_semantic = 3  # number of memories requested from the retriever
row = test_df.iloc[i]

In [15]:
# Build the full question text and strip the "(A)"-style option markers
# before using it as the retrieval query.
q_text = make_question(row, inline=False)[0]
question_clean = re.sub(r'\([A-Z]\)\s*', '', q_text)

# Retrieve semantic memories
items_semantic, _ = manager_semantic.retrieve_ranked_memories(
    query=question_clean, 
    k=k_semantic, 
    threshold=0.24,
    semantic_weight=0.9,
    show_scores=True
)

semantic_context = format_semantic_context(items_semantic, show_scores=True)

print("Question: \n", q_text)
print("\nSemantic Context: \n", semantic_context)

# Retrieval metrics for this single question.
count_sem, avg_sim_sem, raw_sims_sem = calcular_metricas_memoria(items_semantic)
print("\nM√©tricas Sem√¢nticas: Count:", count_sem, "AVG_SIM:", avg_sim_sem)

Question: 
 An astronomer observes that a planet rotates faster after a meteorite impact. Which is the most likely effect of this increase in rotation?
(A) Planetary density will decrease.
(B) Planetary years will become longer.
(C) Planetary days will become shorter.
(D) Planetary gravity will become stronger.

Semantic Context: 
 

    * **Principle #1**
        * Context: "Why is it winter in North America when it is summer in South America?"
        * Fact: "Axial precession causes the Earth's axis to change its orientation relative to its orbit around the sun over a period of approximately 26,000 years."


    * **Principle #2**
        * Context: "Michael learned that the movement of Earth in the solar system causes changes that can be seen on the planet. Which change could be seen on Earth in the time it takes Earth to rotate once on its axis?"
        * Fact: "This apparent movement and day-night change are due to Earth's axial tilt and its revolution around the Sun."


    * *

In [16]:
import time 

# Time one call of the answering chain for the question selected above.
a = time.time()
semantic_context = format_semantic_context(items_semantic, show_scores=False)
response_obj = rag_chain.invoke({
    "question": q_text,
    "similar_facts": semantic_context, 
})
print("Time to answer: ", round(time.time()-a,2))
# Use .content when the response object has it, otherwise its string form.
response_text = response_obj.content if hasattr(response_obj, "content") else str(response_obj)

# Extract the chosen letter; debug=True prints which extraction path was used.
pred = extract_answer(
    small_llm_model=gpt5_nano, 
    model_text_output=response_text,
    question=q_text,
    debug=True
)

# Compare the prediction with the answer key of the selected row.
target = row['answerKey']
is_correct = (pred == target)

print("# Raw Response: \n", response_text)
print(f"# Alternative chosen: {pred} (Correta: {target})")
print(f"# Resultado: {'CORRETO' if is_correct else 'ERRO'}")

Time to answer:  2.66
‚úì [REGEX] Padr√£o encontrado: '(?:Final|Correct)\s+Answer\s*[:\-]?\s*(?:is)?\s*(?:Option)?\s*[\(\[]([A-H])[\)\]](?![a-z])' -> C
# Raw Response: 
  
The correct answer is (C) Planetary days will become shorter.

Reasoning:
1. The rotation of a planet determines the length of its day, so an increase in rotation would result in shorter planetary days.
2. This change could be due to the impact causing a shift in the planet's mass distribution, which affects its rotational speed.
3. The other options are not supported by the given information.

# Alternative chosen: C (Correta: C)
# Resultado: CORRETO


# 4) Running

In [17]:
# VALIDATION: memory is ACTIVE (score update + decay)
df_resultado_valid_dp = avaliar_dataset_dynamic_pruning(
    df=valid_df, 
    chain=rag_chain,
    manager_semantic=manager_semantic,
    score_chain=score_chain,
    k_semantic=3,
    semantic_weight=0.7,
    update_memory=True,
    decay_batch_size=True,
    decay_frequency=100,
    backup_frequency=130,
    backup_path="../../results/dynamic_pruning_valid.csv",
    desc="Valida√ß√£o (Score Update + Decay Only)"
)

Valida√ß√£o (Score Update + Decay Only):   0%|          | 0/299 [00:01<?, ?it/s]


KeyboardInterrupt: 

In [None]:
# TEST: memory is FROZEN (retrieval only)
df_resultado_test_dp = avaliar_dataset_dynamic_pruning(
    df=test_df, 
    chain=rag_chain,
    manager_semantic=manager_semantic,
    score_chain=score_chain,
    k_semantic=3,
    semantic_weight=0.7,
    update_memory=False,
    decay_batch_size=False,
    backup_frequency=150,
    backup_path="../../results/dynamic_pruning_test.csv",
    desc="Teste (Memory Frozen)"
)

In [None]:
# Save the complete result tables of both runs to CSV (without the index column).
df_resultado_test_dp.to_csv("../../results/dynamic_pruning_test.csv", index=False)
df_resultado_valid_dp.to_csv("../../results/dynamic_pruning_valid.csv", index=False)