##### 0 - Requirementes and configurations

In [74]:
# Set autoreload
%load_ext autoreload
%autoreload 2

# Python modules
import logging
import pandas as pd

# Data modules
from src.data_handler.crypto_price_fetcher import get_crypto_data
from src.data_handler.news_processor import process_news


# Set logger
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', force=True)
logger = logging.getLogger(__name__)


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
# load btc data
btc_data = get_crypto_data('BTC', 'USDT', 'hour', 40)

# Load 1000, events examples
sample = 1000
news = pd.read_csv('data/news/raw/news_btc.csv', nrows=sample)
events = process_news(sample=sample)
del(news)

2025-03-10 02:47:02,621 - INFO - Loading cached data from data/prices/BTC_USDT_hour_40days.csv
2025-03-10 02:47:02,626 - INFO - Data retrieved successfully with 961 records


Processed 1000 news articles




In [None]:
# Self-Improving LLM Agent with Multi-Memory Framework
# Based on the provided diagram

# Installing required packages
!pip install langchain langchain-openai pydantic pandas matplotlib

import os
import json
import datetime
import pandas as pd
import matplotlib.pyplot as plt
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field

from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory
from langchain.chains import LLMChain
from langchain.schema import HumanMessage, AIMessage

# You need to set your API key
os.environ["OPENAI_API_KEY"] = "your-api-key-here"

# Define the data structures for our memory components
class NewsItem(BaseModel):
    """A news item with its features."""
    content: str
    date: str
    source: str
    category: str
    sentiment: Optional[float] = 0.0
    
class PriceData(BaseModel):
    """Price data point."""
    asset: str
    price: float
    date: str
    
class Decision(BaseModel):
    """A decision made by the agent."""
    decision_id: str = Field(default_factory=lambda: f"decision_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}")
    recommendation: str
    confidence: float
    reasoning: str
    timestamp: str = Field(default_factory=lambda: datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    outcome: Optional[str] = None
    reward: Optional[float] = None
    
class Fact(BaseModel):
    """A long term fact the agent has learned."""
    fact: str
    source: str
    confidence: float
    category: str
    timestamp: str = Field(default_factory=lambda: datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# Memory Components as shown in the diagram
class SensoryMemory:
    """
    Sensory Memory: Most recent news + features and prices
    """
    def __init__(self):
        self.news: List[NewsItem] = []
        self.prices: List[PriceData] = []
        self.max_items = 10  # Only keep the most recent items
        
    def add_news(self, news_item: NewsItem):
        """Add a news item to sensory memory."""
        self.news.append(news_item)
        if len(self.news) > self.max_items:
            self.news.pop(0)  # Remove oldest item
            
    def add_price(self, price_data: PriceData):
        """Add price data to sensory memory."""
        self.prices.append(price_data)
        if len(self.prices) > self.max_items:
            self.prices.pop(0)  # Remove oldest item
            
    def get_formatted(self) -> str:
        """Return formatted sensory memory for use in prompts."""
        news_str = "\n".join([f"- {item.date}: {item.content} (Source: {item.source}, Category: {item.category})" 
                             for item in self.news])
        price_str = "\n".join([f"- {item.date}: {item.asset} = ${item.price}" 
                              for item in self.prices])
        
        return f"## CURRENT SENSORY INPUT\n### Recent News:\n{news_str}\n\n### Current Prices:\n{price_str}"

class ShortTermMemory:
    """
    Short Term Memory: News + features (15 old news) and prices (averages and others)
    """
    def __init__(self):
        self.news: List[NewsItem] = []
        self.prices: Dict[str, List[PriceData]] = {}  # Keyed by asset
        self.max_news = 15
        
    def add_news(self, news_item: NewsItem):
        """Add a news item to short term memory."""
        self.news.append(news_item)
        if len(self.news) > self.max_news:
            self.news.pop(0)  # Remove oldest item
            
    def add_price(self, price_data: PriceData):
        """Add price data to short term memory."""
        if price_data.asset not in self.prices:
            self.prices[price_data.asset] = []
        self.prices[price_data.asset].append(price_data)
        
        # Keep only recent 30 prices per asset
        if len(self.prices[price_data.asset]) > 30:
            self.prices[price_data.asset].pop(0)
            
    def get_price_averages(self) -> Dict[str, float]:
        """Calculate average prices for each asset."""
        averages = {}
        for asset, price_list in self.prices.items():
            if price_list:
                averages[asset] = sum(p.price for p in price_list) / len(price_list)
        return averages
    
    def get_formatted(self) -> str:
        """Return formatted short term memory for use in prompts."""
        news_str = "\n".join([f"- {item.date}: {item.content}" for item in self.news])
        
        # Calculate averages and create price summary
        averages = self.get_price_averages()
        price_summary = "\n".join([f"- {asset}: Average=${avg:.2f}, Latest=${self.prices[asset][-1].price:.2f}"
                                  for asset, avg in averages.items()])
        
        return f"## SHORT TERM MEMORY\n### Recent Historical News (Past few days):\n{news_str}\n\n### Price Trends:\n{price_summary}"

class ProceduralMemory:
    """
    Procedural Memory: Procedures as a prompt on how to analyze and make decisions
    """
    def __init__(self):
        self.procedures: Dict[str, str] = {
            "news_analysis": """
To analyze news items:
1. Identify key entities (companies, markets, sectors)
2. Assess sentiment (positive, negative, neutral)
3. Categorize impact (short-term, long-term, speculative)
4. Cross-reference with other news for confirmation
5. Evaluate source reliability
            """,
            
            "price_analysis": """
To analyze price movements:
1. Calculate percentage changes (daily, weekly)
2. Identify trends (upward, downward, sideways)
3. Compare to historical averages
4. Check for correlation with news events
5. Note unusual volume or volatility
            """,
            
            "decision_making": """
To make recommendation decisions:
1. Assess all available news and price data
2. Consider short-term and long-term implications
3. Evaluate risk factors
4. Form a hypothesis backed by specific data points
5. Assign a confidence level based on data quality and consistency
6. Make a clear recommendation with supporting rationale
            """
        }
        
    def add_procedure(self, name: str, procedure: str):
        """Add a new procedure or update an existing one."""
        self.procedures[name] = procedure
        
    def get_procedure(self, name: str) -> str:
        """Get a specific procedure by name."""
        return self.procedures.get(name, "Procedure not found.")
    
    def get_formatted(self) -> str:
        """Return all procedures formatted for use in prompts."""
        procedures_str = "\n\n".join([f"### {name.upper()}:\n{proc}" 
                                     for name, proc in self.procedures.items()])
        
        return f"## PROCEDURAL MEMORY - HOW TO ANALYZE AND DECIDE\n{procedures_str}"

class LongTermMemory:
    """
    Long Term Memory: Facts that the agent has learned
    """
    def __init__(self):
        self.facts: List[Fact] = []
        
    def add_fact(self, fact: Fact):
        """Add a new fact to long term memory."""
        self.facts.append(fact)
        
    def get_facts_by_category(self, category: str) -> List[Fact]:
        """Retrieve facts by category."""
        return [f for f in self.facts if f.category == category]
    
    def get_formatted(self, max_facts: int = 15) -> str:
        """Return formatted facts for use in prompts, limited to most relevant."""
        # In a real system, you would implement relevance ranking
        # For simplicity, we'll just take the most recent facts
        recent_facts = sorted(self.facts, key=lambda x: x.timestamp, reverse=True)[:max_facts]
        
        facts_str = "\n".join([f"- {fact.fact} (Confidence: {fact.confidence}, Source: {fact.source})" 
                              for fact in recent_facts])
        
        return f"## LONG TERM MEMORY - ESTABLISHED FACTS\n{facts_str}"

class AutobiographicalMemory:
    """
    Autobiographical Memory: Logs the agent's own decision history and outcomes
    """
    def __init__(self):
        self.decisions: List[Decision] = []
        
    def add_decision(self, decision: Decision):
        """Add a new decision to autobiographical memory."""
        self.decisions.append(decision)
        
    def update_outcome(self, decision_id: str, outcome: str, reward: float):
        """Update a decision with its outcome and reward."""
        for decision in self.decisions:
            if decision.decision_id == decision_id:
                decision.outcome = outcome
                decision.reward = reward
                break
                
    def get_formatted(self, max_decisions: int = 10) -> str:
        """Return formatted decision history for use in prompts."""
        # Get the most recent decisions
        recent_decisions = sorted(self.decisions, key=lambda x: x.timestamp, reverse=True)[:max_decisions]
        
        decisions_str = "\n".join([
            f"- Decision {d.decision_id}: {d.recommendation} (Confidence: {d.confidence})\n  "
            f"Reasoning: {d.reasoning}\n  "
            f"Outcome: {d.outcome or 'Pending'}, Reward: {d.reward or 'N/A'}"
            for d in recent_decisions
        ])
        
        return f"## AUTOBIOGRAPHICAL MEMORY - PREVIOUS DECISIONS AND OUTCOMES\n{decisions_str}"

class WorkingMemory:
    """
    Working Memory: ReAct framework for reasoning and acting
    """
    def __init__(self):
        self.thought_process: List[str] = []
        self.max_thoughts = 5  # Only keep recent thought steps
        
    def add_thought(self, thought: str):
        """Add a thought to the reasoning process."""
        self.thought_process.append(thought)
        if len(self.thought_process) > self.max_thoughts:
            self.thought_process.pop(0)
            
    def clear(self):
        """Clear working memory for a new reasoning session."""
        self.thought_process = []
        
    def get_formatted(self) -> str:
        """Return formatted working memory for use in prompts."""
        thoughts_str = "\n".join([f"{i+1}. {thought}" for i, thought in enumerate(self.thought_process)])
        
        return f"## WORKING MEMORY - CURRENT REASONING PROCESS\n{thoughts_str}"

class ProspectiveMemory:
    """
    Prospective Memory: Future considerations to keep in mind
    """
    def __init__(self):
        self.considerations: List[str] = []
        
    def add_consideration(self, consideration: str):
        """Add a consideration for future decisions."""
        self.considerations.append(consideration)
        
    def get_formatted(self) -> str:
        """Return formatted prospective memory for use in prompts."""
        if not self.considerations:
            return "## PROSPECTIVE MEMORY - FUTURE CONSIDERATIONS\nNo specific future considerations at this time."
            
        considerations_str = "\n".join([f"- {consideration}" for consideration in self.considerations])
        
        return f"## PROSPECTIVE MEMORY - FUTURE CONSIDERATIONS\n{considerations_str}"

# The main LLM Agent that integrates all memory components
class LLMAgent:
    def __init__(self, model_name="gpt-3.5-turbo"):
        # Initialize memory components
        self.sensory_memory = SensoryMemory()
        self.short_term_memory = ShortTermMemory()
        self.procedural_memory = ProceduralMemory()
        self.long_term_memory = LongTermMemory()
        self.autobiographical_memory = AutobiographicalMemory()
        self.working_memory = WorkingMemory()
        self.prospective_memory = ProspectiveMemory()
        
        # Initialize LLM
        self.llm = ChatOpenAI(model_name=model_name, temperature=0.7)
        
        # Define the agent's system prompt
        self.system_prompt = """You are an advanced AI financial analyst with multiple memory systems.
Your goal is to provide investment recommendations based on news and price data.
As you process information, you will build knowledge and learn from your past decisions.

You should use the different memory systems to inform your analysis and recommendations.
Think step by step and show your reasoning process before making final recommendations.
"""

    def _build_full_prompt(self, query: str) -> str:
        """Build a complete prompt combining all memory components."""
        prompt_parts = [
            self.sensory_memory.get_formatted(),
            self.short_term_memory.get_formatted(),
            self.procedural_memory.get_formatted(),
            self.long_term_memory.get_formatted(),
            self.autobiographical_memory.get_formatted(),
            self.working_memory.get_formatted(),
            self.prospective_memory.get_formatted(),
            f"\n## NEW QUERY\n{query}\n\n## RESPONSE\nLet me think through this step by step:"
        ]
        
        return "\n\n".join(prompt_parts)
    
    def react_step(self, query: str) -> str:
        """
        Implement the ReAct framework for a single step of reasoning.
        """
        # Build the prompt with all memory components
        full_prompt = self._build_full_prompt(query)
        
        # Get response from LLM
        response = self.llm.invoke([HumanMessage(content=full_prompt)])
        
        # Add the reasoning to working memory
        self.working_memory.add_thought(response.content)
        
        return response.content
    
    def make_recommendation(self, query: str) -> Decision:
        """
        Make a recommendation based on all available memory and the current query.
        """
        # Clear working memory for fresh reasoning
        self.working_memory.clear()
        
        # First, use react steps to build reasoning
        self.react_step(f"Analyze the latest news and price data to answer: {query}")
        
        # Final recommendation prompt with specific format request
        recommendation_prompt = f"""
Based on your previous analysis, make a final recommendation regarding: {query}

Your response should be structured as:
RECOMMENDATION: [Clear statement of recommendation]
CONFIDENCE: [Numeric value between 0-1]
REASONING: [Concise summary of key factors that led to this recommendation]
"""
        
        # Get structured recommendation
        full_prompt = self._build_full_prompt(recommendation_prompt)
        recommendation_response = self.llm.invoke([HumanMessage(content=full_prompt)])
        
        # Parse the response to extract recommendation, confidence, and reasoning
        response_text = recommendation_response.content
        
        # Simple parsing - in a production system you'd use more robust methods
        recommendation = ""
        confidence = 0.5  # Default
        reasoning = ""
        
        for line in response_text.split('\n'):
            if line.startswith("RECOMMENDATION:"):
                recommendation = line.replace("RECOMMENDATION:", "").strip()
            elif line.startswith("CONFIDENCE:"):
                confidence_str = line.replace("CONFIDENCE:", "").strip()
                try:
                    confidence = float(confidence_str)
                except ValueError:
                    # Handle text confidence levels
                    if "high" in confidence_str.lower():
                        confidence = 0.8
                    elif "medium" in confidence_str.lower():
                        confidence = 0.5
                    elif "low" in confidence_str.lower():
                        confidence = 0.3
            elif line.startswith("REASONING:"):
                reasoning = line.replace("REASONING:", "").strip()
        
        # Create a decision object
        decision = Decision(
            recommendation=recommendation,
            confidence=confidence,
            reasoning=reasoning
        )
        
        # Add to autobiographical memory
        self.autobiographical_memory.add_decision(decision)
        
        return decision
    
    def process_feedback(self, decision_id: str, outcome: str, reward: float):
        """
        Process feedback on a past decision and update memories accordingly.
        """
        # Update the decision in autobiographical memory
        self.autobiographical_memory.update_outcome(decision_id, outcome, reward)
        
        # Use the feedback to potentially learn new facts
        feedback_prompt = f"""
I received feedback on my recommendation (ID: {decision_id}):
Outcome: {outcome}
Reward: {reward}

Based on this feedback, what's one important fact I should remember for future decisions?
Format your response as:
NEW FACT: [concise statement of a fact to remember]
CATEGORY: [category for this fact]
CONFIDENCE: [numeric value between 0-1]
"""
        
        full_prompt = self._build_full_prompt(feedback_prompt)
        learning_response = self.llm.invoke([HumanMessage(content=full_prompt)])
        
        # Parse the response to extract the new fact
        response_text = learning_response.content
        
        fact_text = ""
        category = "general"
        confidence = 0.5
        
        for line in response_text.split('\n'):
            if line.startswith("NEW FACT:"):
                fact_text = line.replace("NEW FACT:", "").strip()
            elif line.startswith("CATEGORY:"):
                category = line.replace("CATEGORY:", "").strip()
            elif line.startswith("CONFIDENCE:"):
                try:
                    confidence = float(line.replace("CONFIDENCE:", "").strip())
                except ValueError:
                    pass
        
        if fact_text:
            # Add new fact to long-term memory
            new_fact = Fact(
                fact=fact_text,
                source=f"Feedback on decision {decision_id}",
                confidence=confidence,
                category=category
            )
            self.long_term_memory.add_fact(new_fact)
            
            # Also add a consideration to prospective memory
            self.prospective_memory.add_consideration(
                f"Consider the outcome of similar situations to decision {decision_id} ({fact_text})"
            )
        
        return learning_response.content
    
    def update_with_news(self, news_items: List[NewsItem]):
        """Update memory with new news items."""
        for item in news_items:
            # Add to sensory memory
            self.sensory_memory.add_news(item)
            
            # Add to short-term memory
            self.short_term_memory.add_news(item)
    
    def update_with_prices(self, price_data: List[PriceData]):
        """Update memory with new price data."""
        for item in price_data:
            # Add to sensory memory
            self.sensory_memory.add_price(item)
            
            # Add to short-term memory
            self.short_term_memory.add_price(item)

# Example usage of the agent
def generate_sample_data():
    """Generate sample news and price data for demonstration."""
    news_items = [
        NewsItem(
            content="Federal Reserve hints at potential interest rate cut next month",
            date="2023-09-01",
            source="Financial Times",
            category="Monetary Policy"
        ),
        NewsItem(
            content="Tech stocks rally as investors anticipate lower borrowing costs",
            date="2023-09-02",
            source="Bloomberg",
            category="Markets"
        ),
        NewsItem(
            content="Oil prices drop 3% on weak demand forecasts",
            date="2023-09-03",
            source="Reuters",
            category="Commodities"
        ),
        NewsItem(
            content="Major retailer reports better than expected Q3 earnings",
            date="2023-09-04",
            source="CNBC",
            category="Earnings"
        ),
        NewsItem(
            content="Housing starts decline for third consecutive month",
            date="2023-09-05",
            source="Wall Street Journal",
            category="Economic Data"
        )
    ]
    
    price_data = [
        PriceData(asset="SPY", price=450.23, date="2023-09-01"),
        PriceData(asset="SPY", price=452.78, date="2023-09-02"),
        PriceData(asset="SPY", price=453.15, date="2023-09-03"),
        PriceData(asset="SPY", price=451.89, date="2023-09-04"),
        PriceData(asset="SPY", price=454.20, date="2023-09-05"),
        PriceData(asset="QQQ", price=370.45, date="2023-09-01"),
        PriceData(asset="QQQ", price=375.23, date="2023-09-02"),
        PriceData(asset="QQQ", price=378.67, date="2023-09-03"),
        PriceData(asset="QQQ", price=380.12, date="2023-09-04"),
        PriceData(asset="QQQ", price=382.98, date="2023-09-05"),
        PriceData(asset="XLE", price=88.23, date="2023-09-01"),
        PriceData(asset="XLE", price=87.45, date="2023-09-02"),
        PriceData(asset="XLE", price=85.67, date="2023-09-03"),
        PriceData(asset="XLE", price=84.32, date="2023-09-04"),
        PriceData(asset="XLE", price=83.78, date="2023-09-05")
    ]
    
    return news_items, price_data

# Add some sample facts to long-term memory
def populate_long_term_memory(agent: LLMAgent):
    """Add some sample facts to the agent's long-term memory."""
    sample_facts = [
        Fact(
            fact="Interest rate cuts typically lead to rallies in growth stocks",
            source="Historical market analysis",
            confidence=0.85,
            category="Monetary Policy"
        ),
        Fact(
            fact="Energy sector tends to underperform in periods of weak global demand",
            source="Sector rotation studies",
            confidence=0.75,
            category="Sectors"
        ),
        Fact(
            fact="Housing data is a leading indicator for consumer discretionary spending",
            source="Economic research",
            confidence=0.7,
            category="Economic Indicators"
        )
    ]
    
    for fact in sample_facts:
        agent.long_term_memory.add_fact(fact)

# Simulate running the agent
def run_simulation():
    """Run a simulation of the agent over multiple days."""
    # Initialize the agent
    agent = LLMAgent()
    
    # Generate sample data
    news_items, price_data = generate_sample_data()
    
    # Add sample facts
    populate_long_term_memory(agent)
    
    # Update with initial data
    agent.update_with_news(news_items)
    agent.update_with_prices(price_data)
    
    # Make initial recommendation
    print("MAKING INITIAL RECOMMENDATION")
    decision = agent.make_recommendation("Should investors increase their allocation to technology stocks given current market conditions?")
    
    print(f"\nRECOMMENDATION: {decision.recommendation}")
    print(f"CONFIDENCE: {decision.confidence}")
    print(f"REASONING: {decision.reasoning}")
    print(f"DECISION ID: {decision.decision_id}")
    
    # Simulate feedback
    print("\n\nPROCESSING FEEDBACK")
    learning_output = agent.process_feedback(
        decision_id=decision.decision_id,
        outcome="The recommendation was correct. Technology stocks rose 2.5% in the following week.",
        reward=0.8
    )
    
    print(f"\nLEARNING OUTPUT: {learning_output}")
    
    # Add new data (simulating next day)
    print("\n\nUPDATING WITH NEW DATA")
    new_news = [
        NewsItem(
            content="Inflation data comes in lower than expected",
            date="2023-09-06",
            source="Bloomberg",
            category="Economic Data"
        ),
        NewsItem(
            content="Major tech company announces new AI initiative",
            date="2023-09-06",
            source="TechCrunch",
            category="Technology"
        )
    ]
    
    new_prices = [
        PriceData(asset="SPY", price=456.75, date="2023-09-06"),
        PriceData(asset="QQQ", price=385.43, date="2023-09-06"),
        PriceData(asset="XLE", price=82.56, date="2023-09-06")
    ]
    
    agent.update_with_news(new_news)
    agent.update_with_prices(new_prices)
    
    # Make a new recommendation
    print("\nMAKING UPDATED RECOMMENDATION")
    new_decision = agent.make_recommendation("Given the latest inflation data and tech announcement, should investors further increase their allocation to growth stocks?")
    
    print(f"\nRECOMMENDATION: {new_decision.recommendation}")
    print(f"CONFIDENCE: {new_decision.confidence}")
    print(f"REASONING: {new_decision.reasoning}")
    print(f"DECISION ID: {new_decision.decision_id}")
    
    # Display memory contents for demonstration
    print("\n\nMEMORY CONTENTS:")
    print("\nSENSORY MEMORY:")
    print(agent.sensory_memory.get_formatted())
    
    print("\nSHORT TERM MEMORY:")
    print(agent.short_term_memory.get_formatted())
    
    print("\nLONG TERM MEMORY:")
    print(agent.long_term_memory.get_formatted())
    
    print("\nAUTOBIOGRAPHICAL MEMORY:")
    print(agent.autobiographical_memory.get_formatted())
    
    print("\nPROSPECTIVE MEMORY:")
    print(agent.prospective_memory.get_formatted())
    
    return agent

# Run simulation if this script is executed directly
if __name__ == "__main__":
    agent = run_simulation()
    
    # Visualize the agent's decision history
    decisions = agent.autobiographical_memory.decisions
    if decisions:
        plt.figure(figsize=(10, 6))
        decision_ids = [d.decision_id[-6:] for d in decisions]  # Use shortened IDs for display
        confidence_values = [d.confidence for d in decisions]
        rewards = [d.reward if d.reward is not None else 0 for d in decisions]
        
        bar_width = 0.35
        index = range(len(decisions))
        
        plt.bar([i - bar_width/2 for i in index], confidence_values, bar_width, label='Confidence')
        plt.bar([i + bar_width/2 for i in index], rewards, bar_width, label='Reward')
        
        plt.xlabel('Decisions')
        plt.ylabel('Value')
        plt.title('Agent Decision Confidence vs Rewards')
        plt.xticks(index, decision_ids)
        plt.legend()
        
        plt.tight_layout()
        plt.show()