In [1]:
# ==========================================
# CELL 1: SETUP
# ==========================================
import os
import json
import re
from typing import List, Dict, Any
from tavily import TavilyClient
import time
from langchain_groq import ChatGroq
from langchain_core.messages import SystemMessage, HumanMessage
from dotenv import load_dotenv

# --- CONFIGURATION ---
# Load Key from Environment.
# override=True asks python-dotenv to let values from the .env file replace
# variables that are already set in the kernel's process environment.
load_dotenv(override=True)
TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY")

# Search Parameters: passed as keyword arguments to the Tavily search call.
SEARCH_CONFIG = {
    "search_depth": "advanced",      # Deep search for quality
    "topic": "general",              # General knowledge
    "max_results": 5,                # Top 5 sources
    "include_answer": True,          # Get the AI-generated summary
    "include_raw_content": True,     # Get full page text
    "include_images": False,
    "chunks_per_source": 3
}

# Processing Config
MAX_RAW_CHARS = 4000  # Truncate raw content per source to avoid context overflow
MODEL_NAME = "llama-3.3-70b-versatile" # 100k TPD
# MODEL_NAME = "openai/gpt-oss-120b" # 200k TPD

# Report the real state of the configuration: a missing key is surfaced here
# instead of only when the scout is constructed later in the notebook.
if TAVILY_API_KEY:
    print("✅ Tavily Configuration Loaded.")
else:
    print("⚠️ Tavily Configuration Loaded, but TAVILY_API_KEY is not set.")

✅ Tavily Configuration Loaded.


In [2]:
# ==========================================
# CELL 2: DEEP WEB SCOUT ENGINE (TAVILY)
# ==========================================

class DeepWebScout:
    """
    Wraps a Tavily client and reshapes each search response into two views:
    a prompt-ready context string and a list of per-source records.
    """

    # Number of characters of cleaned raw text previewed per source in the
    # agent context (the curation records keep the full cleaned text).
    EXTRACT_PREVIEW_CHARS = 500

    def __init__(self):
        """
        Build the Tavily client from the module-level TAVILY_API_KEY.

        Raises:
            ValueError: if TAVILY_API_KEY is empty or unset.
        """
        if not TAVILY_API_KEY:
            raise ValueError("❌ TAVILY_API_KEY not found in environment variables.")
        self.client = TavilyClient(api_key=TAVILY_API_KEY)

    def _clean_raw_content(self, text: str) -> str:
        """
        Helper to clean and truncate the massive raw_content strings.

        Collapses every run of whitespace into a single space, strips the ends,
        and cuts the result at MAX_RAW_CHARS characters, appending a
        "... [TRUNCATED]" marker when a cut happened. Falsy input (None, "")
        yields an empty string.
        """
        if not text:
            return ""
        # Collapse whitespace
        clean = re.sub(r'\s+', ' ', text).strip()
        # Truncate to keep context window healthy
        if len(clean) > MAX_RAW_CHARS:
            return clean[:MAX_RAW_CHARS] + "... [TRUNCATED]"
        return clean

    def search_and_extract(self, sub_query: str) -> Dict[str, Any]:
        """
        Executes Advanced Search and formats the JSON for the Agent/Curator.

        Args:
            sub_query: The search string sent to Tavily together with SEARCH_CONFIG.

        Returns:
            On success, a dict with:
              - "status": "success"
              - "tavily_answer": the response's "answer" field ("" when absent/null)
              - "agent_context": newline-joined text block (summary + one entry per source)
              - "curation_data": list of dicts with url, title, relevance_score,
                snippet and full_text (cleaned raw content)
              - "original_response": the untouched client response
            On any exception, {"status": "failed", "content": <error text>}.
        """
        print(f"   🔎 Scouting External Cortex for: '{sub_query}'...")

        try:
            # 1. CALL TAVILY API
            response = self.client.search(query=sub_query, **SEARCH_CONFIG)

            # 2. EXTRACT THE "ADVANCED ANSWER" (The Executive Summary)
            # `or ""` also covers a key that is present but null.
            ai_summary = response.get("answer") or ""

            # 3. PROCESS THE EVIDENCE (The "Results" List)
            results = response.get("results") or []

            formatted_context = []
            curation_data = []

            # If we have an AI summary, put it at the very top of the context
            if ai_summary:
                formatted_context.append(f"★ EXECUTIVE SUMMARY (AI GENERATED):\n{ai_summary}\n{'-'*40}")

            print(f"   👀 Retrieved {len(results)} high-fidelity sources...")

            for position, res in enumerate(results, start=1):
                # Extract Metadata. Null values fall back to the same defaults
                # as missing keys so the ':.2f' format below cannot blow up on
                # None and abort the whole search.
                title = res.get("title") or "Unknown Title"
                url = res.get("url") or "No URL"
                score = res.get("score") or 0.0

                snippet = res.get("content") or ""
                raw_text = self._clean_raw_content(res.get("raw_content", ""))

                # 4. CONSTRUCT CONTEXT STRING (For the Agent)
                # We prioritize the Title/URL/Snippet.
                entry_lines = [
                    f"SOURCE [{position}]: {title}",
                    f"LINK: {url} (Relevance: {score:.2f})",
                    f"SUMMARY: {snippet}",
                ]
                # We append a chunk of raw text only if it adds value: sources
                # without raw content get no EXTRACT line, and the ellipsis is
                # shown only when the preview really cut something off.
                if raw_text:
                    preview = raw_text[:self.EXTRACT_PREVIEW_CHARS]
                    ellipsis = "..." if len(raw_text) > len(preview) else ""
                    entry_lines.append(f"EXTRACT: {preview}{ellipsis}")
                entry_lines.append('-' * 40)
                formatted_context.append("\n".join(entry_lines))

                # 5. PREPARE CURATION OBJECT (For the JSON File)
                # The Curator gets the FULL (cleaned) raw text.
                curation_data.append({
                    "url": url,
                    "title": title,
                    "relevance_score": score,
                    "snippet": snippet,
                    "full_text": raw_text
                })

            return {
                "status": "success",
                "tavily_answer": ai_summary,      # The direct answer
                "agent_context": "\n".join(formatted_context), # The string for the Prompt
                "curation_data": curation_data,   # The list for the JSON file
                "original_response": response     # Keep full metadata just in case
            }

        except Exception as e:
            # Best-effort by design: the caller branches on "status" instead of
            # handling exceptions.
            print(f"   ❌ Search Engine Error: {e}")
            return {"status": "failed", "content": str(e)}

# Initialize the shared scout instance used by the pipeline cell below.
# Raises ValueError here if TAVILY_API_KEY was not loaded.
scout = DeepWebScout()
print("🚀 Tavily Advanced Scout Ready.")

🚀 Tavily Advanced Scout Ready.


In [3]:
# ==========================================
# CELL 3: KNOWLEDGE CURATOR (UPDATED)
# ==========================================
class KnowledgeCurator:
    """
    Turns a successful scout result into a JSON "knowledge artifact" via an LLM
    call and appends it to a pending-review JSON file.
    """

    # How many of the leading scout sources are merged into the prompt.
    MAX_SOURCES = 3
    # Upper bound on the characters of combined source text sent to the LLM.
    MAX_CONTEXT_CHARS = 6000

    def __init__(self, pending_file="./models/pending_knowledge.json", model_name="llama-3.3-70b-versatile"):
        """
        Args:
            pending_file: Path of the JSON file holding the list of pending artifacts.
            model_name: Groq model identifier handed to ChatGroq.
        """
        self.pending_file = pending_file
        self.llm = ChatGroq(
            temperature=0, 
            model_name=model_name, 
            api_key=os.environ.get("GROQ_API_KEY"),
            model_kwargs={"response_format": {"type": "json_object"}}
        )

    def curate(self, query: str, scout_result: Dict):
        """
        Takes the scout result, analyzes the 'curation_data', and saves a knowledge artifact.

        Args:
            query: The original search query; stored on the artifact and sent to the LLM.
            scout_result: Dict as produced by the scout. Only results whose
                "status" is "success" and that carry at least one source are processed.

        Returns:
            The artifact dict that was saved, or None when nothing was saved
            (non-success result, no sources, or any error during the LLM call,
            JSON parsing or file write - errors are printed, not raised).
        """
        if scout_result.get("status") != "success":
            return None

        # Prepare a rich context from the top results for the Curator
        best_sources = (scout_result.get("curation_data") or [])[:self.MAX_SOURCES]
        if not best_sources:
            # Without any source text the LLM could only invent content, so
            # skip the call (and the token spend) entirely.
            print("   ⚠️ No sources to curate; skipping.")
            return None

        print("   🧠 Curating knowledge from raw content...")

        # We combine the raw text from the best sources
        combined_text = "\n\n".join(
            f"Source ({s.get('url', 'No URL')}): {s.get('full_text', '')}" for s in best_sources
        )
        
        sys_msg = """
            You are the **Graph RAG Knowledge Architect**.
            Your goal is to transform raw, noisy web content into a pristine, structured Knowledge Artifact optimized for both vector search and graph traversal.

            ### INSTRUCTIONS

            1. **VECTOR CONTENT (The Summary)**:
            - Synthesize a **dense, information-rich paragraph** that directly answers the User Query based *only* on the Scraped Content.
            - Remove conversational fluff ("The article states...", "It is important to note...").
            - Focus on factual density: include dates, numbers, names, and specific technical details.
            - This text will be embedded; ensure it is semantically complete and self-contained.

            2. **GRAPH TRIPLES (The Knowledge Graph)**:
            - Extract 5-15 semantic triples: `{"head": "Subject", "relation": "Predicate", "tail": "Object"}`.
            - **Entity Rules (Head/Tail)**: Use precise Proper Nouns or technical concepts. Keep them atomic (e.g., "Elon Musk" instead of "The CEO of Tesla Elon Musk").
            - **Relation Rules**: Use active, directed verbs (e.g., "founded", "acquired", "located_in", "author_of"). Avoid generic relations like "is" or "has" if a more specific one exists.
            - **Canonicalization**: Resolve pronouns and aliases to their full names (e.g., replace "he" with the person's name).

            3. **METADATA**:
            - `confidence_score`: 0.0 (Irrelevant/Garbage) to 1.0 (Perfect, Factual Match).
            - `category`: Classify the content into one specific domain tag (e.g., "Market Data", "Technical Documentation", "Biography", "News").

            ### OUTPUT SCHEMA (Strict JSON)
            {
                "vector_content": "Dense text summary...",
                "graph_triples": [
                    {"head": "Entity A", "relation": "relationship_verb", "tail": "Entity B"},
                    {"head": "Entity B", "relation": "relationship_verb", "tail": "Entity C"}
                ],
                "metadata": {
                    "confidence_score": 0.85,
                    "category": "Domain Tag"
                }
            }
            """
        
        try:
            response = self.llm.invoke([
                SystemMessage(content=sys_msg),
                # Context limit: only the first MAX_CONTEXT_CHARS characters are sent.
                HumanMessage(content=f"QUERY: {query}\n\nCONTENT:\n{combined_text[:self.MAX_CONTEXT_CHARS]}")
            ])
            artifact_data = json.loads(response.content)
            
            final_artifact = {
                "status": "pending_review",
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "original_query": query,
                "data": artifact_data
            }
            
            self._save(final_artifact)
            print(f"   💾 Knowledge Artifact Saved to {self.pending_file}")
            return final_artifact
            
        except Exception as e:
            # Best-effort: one failed curation must not stop the pipeline loop.
            print(f"   ❌ Curation Failed: {e}")
            return None

    def _save(self, artifact):
        """
        Append `artifact` to the JSON list stored at self.pending_file.

        An existing file that cannot be read as a JSON list is moved aside to
        "<pending_file>.corrupt" instead of being silently overwritten, so
        previously saved artifacts are never destroyed by a parse error.
        The new list is written to a temporary sibling file and then swapped
        in with os.replace, so an interrupted write cannot leave a half-written
        pending file behind.

        Raises:
            OSError: if the directory, backup or output file cannot be written.
        """
        data = []
        if os.path.exists(self.pending_file):
            try:
                with open(self.pending_file, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
                if not isinstance(loaded, list):
                    raise ValueError("pending file does not contain a JSON list")
                data = loaded
            except ValueError as e:
                # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
                backup_path = self.pending_file + ".corrupt"
                os.replace(self.pending_file, backup_path)
                print(f"   ⚠️ Unreadable pending file moved to {backup_path}: {e}")
                data = []
        data.append(artifact)

        # The default path lives in ./models, which may not exist yet.
        parent_dir = os.path.dirname(self.pending_file)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        tmp_path = self.pending_file + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, self.pending_file)

# Shared curator instance used by the pipeline cell; uses the model chosen
# in the configuration cell.
curator = KnowledgeCurator(model_name=MODEL_NAME)
print("📚 Curator Ready.")

📚 Curator Ready.


In [4]:
# ==========================================
# CELL 4: RUNNING THE PIPELINE
# ==========================================
# Example atomic questions that might fail RAG
missing_tasks = [
    "current net worth Beyonce 2025",
    "Beyonce recent tour gross earnings 2024"
]

# Maximum number of agent-context characters echoed into the notebook output.
PREVIEW_CHARS = 2000

print("🚨 RAG GAP DETECTED. ACTIVATING EXTERNAL CORTEX.")
print("="*60)

for task in missing_tasks:
    print(f"\n🌐 TASK: {task}")
    
    # 1. Search & Extract
    result = scout.search_and_extract(task)
    
    # 2. Display the 'Agent View' (What the LLM sees)
    if result.get('status') == 'success':
        print("\n📄 AGENT CONTEXT BLOCK (Preview):")
        print("-" * 40)
        # This includes the Tavily 'Advanced Answer' + Source Snippets
        agent_context = result['agent_context']
        if len(agent_context) > PREVIEW_CHARS:
            # Only claim truncation when the preview really dropped text.
            print(agent_context[:PREVIEW_CHARS] + "...\n[Truncated for view]")
        else:
            print(agent_context)
        
        # 3. Curate & Memorize
        curator.curate(task, result)
    else:
        print("❌ Task Failed.")
    
    print("-" * 60)

print("\n✅ PIPELINE COMPLETE.")

🚨 RAG GAP DETECTED. ACTIVATING EXTERNAL CORTEX.

🌐 TASK: current net worth Beyonce 2025
   🔎 Scouting External Cortex for: 'current net worth Beyonce 2025'...
   👀 Retrieved 5 high-fidelity sources...

📄 AGENT CONTEXT BLOCK (Preview):
----------------------------------------
★ EXECUTIVE SUMMARY (AI GENERATED):
Beyoncé's net worth is estimated to be around $1 billion in 2025. She became the fifth billionaire musician, recognized by Forbes. Her success spans music, fashion, and business ventures.
----------------------------------------
SOURCE [1]: Beyoncé declared a billionaire by Forbes - BBC
LINK: https://www.bbc.com/news/articles/cn09091zw34o (Relevance: 1.00)
SUMMARY: Earlier this month, Forbes estimated Beyoncé's net worth of $800m (£593m) and predicted she would cross the billionaire threshold for the first time following years of success.

Her 2023 Renaissance World Tour grossed nearly $600m, making her one of the biggest pop music icons in the world alongsid