# Code 7.2 - Multi-Agent System for Market Intelligence
## Introduction

This notebook builds a multi-agent system that automates market intelligence gathering. A **Manager Agent** orchestrates a team of specialist agents that collect and analyze data, then synthesizes their results into an intelligence briefing.

The lab is aimed at a professional audience and illustrates four design principles:

* **Agent-Based Architecture:** Decomposing a complex task into smaller, manageable responsibilities handled by specialized agents.
* **Hybrid Execution:** Running independent agents in parallel and dependent agents in sequence, so that data dependencies are respected without unnecessary waiting.
* **AI-Powered Synthesis:** Using an LLM to derive qualitative insights from raw data.
* **Robustness:** Falling back to mock data when no API key is provided, and reporting agent failures through a dedicated exception instead of crashing.

**How to Use:** Run each cell sequentially from top to bottom. The notebook will prompt you for an OpenAI API key but is fully functional without one, using mock data as a fallback. The final, enriched report will be displayed and saved to a dynamically named markdown file in the `OUTPUT` directory.

## 1. Setup and Configuration
This first block handles all setup: it imports the required libraries, defines a custom exception class for cleaner error handling, and prompts for an OpenAI API key. The key is optional. Without one, the LLM Analyst falls back to mock data, so the notebook runs either way.

In [1]:
import os
import json
import getpass
import concurrent.futures
import time
from datetime import datetime
from typing import Dict, List, Any, Optional

# --- Custom Exception ---
class AgentError(Exception):
    """Custom exception for agent-related failures."""
    pass

def setup_environment():
    """Handles user input for API key configuration."""
    openai_key = None
    use_openai = input("Do you have an OpenAI API key to use for this lab? (yes/no): ").strip().lower()

    if use_openai in ['yes', 'y']:
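        try:
            # Treat an empty or whitespace-only entry as "no key" so the status message is accurate.
            openai_key = getpass.getpass("Please enter your OpenAI API Key: ").strip() or None
            if openai_key:
                print("Status: OpenAI API key received.")
            else:
                print("Status: Empty key entered. The LLM Analyst will use mock data.")
        except Exception as e:
            print(f"Warning: Could not read key ({e}). Falling back to mock data.")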
    else:
        print("Status: No OpenAI key provided. The LLM Analyst will use mock data.")
    return openai_key

# Run the setup function to get the API key from the user.
openai_key = setup_environment()


Do you have an OpenAI API key to use for this lab? (yes/no):  no


Status: No OpenAI key provided. The LLM Analyst will use mock data.
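For unattended runs, the interactive prompt can be skipped by checking an environment variable first. The sketch below is an assumption layered on top of the notebook, not part of it: `resolve_api_key` is a hypothetical helper, and `OPENAI_API_KEY` is the variable name the OpenAI Python SDK reads by default.

```python
import os
from typing import Mapping, Optional


def resolve_api_key(env: Mapping[str, str] = os.environ,
                    var_name: str = "OPENAI_API_KEY") -> Optional[str]:
    """Return a non-empty key from the environment, or None to trigger the mock fallback.

    Hypothetical helper: the notebook itself only uses the interactive prompt.
    """
    key = env.get(var_name, "").strip()
    return key or None


# Usage: pass a plain dict to see both outcomes without touching the real environment.
print(resolve_api_key({"OPENAI_API_KEY": "sk-example"}))  # the key is returned
print(resolve_api_key({}))                                 # None, so mock data would be used
```

Returning `None` for a missing or blank value keeps the downstream check (`if api_key:`) unchanged.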


## 2. Simulated Data Sources
In a real-world application, agents would query live databases and external APIs. To ensure this lab is self-contained and reproducible, we simulate these sources with Python dictionaries. This includes a fallback database for the LLM agent in case an API key is not provided.

In [2]:
# --- Data Sources (Simulated) ---
mock_news_db = {
    "InnovateCorp": {"status": "ok", "data": ["InnovateCorp launches groundbreaking new AI product.", "Company announces record R&D spending."]},
    "GlobalConnect": {"status": "ok", "data": ["GlobalConnect expands operations into South America.", "Stock prices soar after positive quarterly earnings report."]}
}
mock_financial_db = {
    "InnovateCorp": {"status": "ok", "data": {"stock_price": 150.75, "change_pct": -5.2}},
    "GlobalConnect": {"status": "ok", "data": {"stock_price": 88.20, "change_pct": 7.8}}
}
mock_sentiment_db = {
    "InnovateCorp": {"status": "ok", "data": {"sentiment_score": 0.85}},
    "GlobalConnect": {"status": "ok", "data": {"sentiment_score": 0.92}}
}
mock_llm_analysis_db = {
    "InnovateCorp": {
        "theme": "The company's core strategy appears to be aggressive technological innovation, focusing on capturing the AI market.",
        "risk": "High R&D spending could negatively impact short-term profitability if the new AI product fails to gain market traction quickly."
    },
    "GlobalConnect": {
        "theme": "The company is focused on a strategy of aggressive global expansion and market penetration in emerging economies.",
        "risk": "Expansion into new regions carries significant logistical and regulatory risks, which could impact operational stability."
    }
}
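Because each agent looks a company up independently, a company missing from one dictionary only surfaces as an agent error at run time. A coverage check can catch such gaps up front. This is a minimal sketch: `missing_coverage`, `demo_sources`, and the `"ShellCo"` entry are hypothetical names invented for illustration.

```python
from typing import Dict, List, Mapping


def missing_coverage(sources: Mapping[str, Mapping[str, object]]) -> Dict[str, List[str]]:
    """Map each company to the list of source names that lack an entry for it."""
    all_companies = set()
    for db in sources.values():
        all_companies.update(db)

    gaps: Dict[str, List[str]] = {}
    for company in sorted(all_companies):
        absent = [name for name, db in sources.items() if company not in db]
        if absent:
            gaps[company] = absent
    return gaps


# Hypothetical miniature sources; "ShellCo" is invented to show a gap.
demo_sources = {
    "news": {"InnovateCorp": {}, "ShellCo": {}},
    "financial": {"InnovateCorp": {}},
}
print(missing_coverage(demo_sources))  # {'ShellCo': ['financial']}
```

An empty result means every company appears in every source.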

## 3. Specialist Agent Definitions
The foundation of the system is the team of specialist agents. Each agent is a function with a single responsibility that returns a `Dict` with the same three keys: `source`, `status`, and `data`. This modular design makes the system easy to test, maintain, and extend. The `LLMAnalystAgent` differs from the others in that it contains conditional logic to operate in one of two modes: live API calls or mock data fallback.
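The shared return format can be made explicit with a typed envelope. The sketch below is one possible formalization, not the notebook's implementation: `AgentResult`, `make_lookup_agent`, and `demo_db` are hypothetical names introduced for illustration.

```python
from typing import Any, Callable, Dict, TypedDict


class AgentResult(TypedDict):
    """Envelope shared by every agent: who answered, whether it worked, and the payload."""
    source: str
    status: str  # "success" or "error"
    data: Any


def make_lookup_agent(name: str, db: Dict[str, Dict[str, Any]],
                      error_message: str) -> Callable[[str], AgentResult]:
    """Build a single-responsibility agent that reads one simulated data source."""
    def agent(company_name: str) -> AgentResult:
        response = db.get(company_name)
        if response and response["status"] == "ok":
            return {"source": name, "status": "success", "data": response["data"]}
        return {"source": name, "status": "error", "data": error_message}
    return agent


# Hypothetical one-entry data source for the demonstration.
demo_db = {"InnovateCorp": {"status": "ok", "data": {"sentiment_score": 0.85}}}
sentiment_agent = make_lookup_agent("SentimentAgent", demo_db, "Sentiment score not available.")

print(sentiment_agent("InnovateCorp"))
print(sentiment_agent("Unknown Inc"))
```

A factory like this trades a little indirection for less repetition when several agents differ only in their data source and error message.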

In [3]:
# --- Specialist Agent Definitions ---

def NewsAgent(company_name: str) -> Dict[str, Any]:
    """Fetches top news headlines from a data source."""
    print("[NewsAgent] Fetching headlines...")
    time.sleep(0.5)
    response = mock_news_db.get(company_name)
    if response and response["status"] == "ok":
        return {"source": "NewsAgent", "status": "success", "data": response["data"]}
    return {"source": "NewsAgent", "status": "error", "data": "No news found."}

def FinancialAgent(company_name: str) -> Dict[str, Any]:
    """Fetches financial data from a data source."""
    print("[FinancialAgent] Fetching financial data...")
    time.sleep(0.5)
    response = mock_financial_db.get(company_name)
    if response and response["status"] == "ok":
        return {"source": "FinancialAgent", "status": "success", "data": response["data"]}
    return {"source": "FinancialAgent", "status": "error", "data": "Financial data not available."}

def SentimentAgent(company_name: str) -> Dict[str, Any]:
    """Fetches a sentiment score from a data source."""
    print("[SentimentAgent] Fetching sentiment score...")
    time.sleep(0.5)
    response = mock_sentiment_db.get(company_name)
    if response and response["status"] == "ok":
        return {"source": "SentimentAgent", "status": "success", "data": response["data"]}
    return {"source": "SentimentAgent", "status": "error", "data": "Sentiment score not available."}

def LLMAnalystAgent(headlines: List[str], company_name: str, api_key: Optional[str]) -> Dict[str, Any]:
    """
    Analyzes news headlines for strategic insights.
    Uses the OpenAI API if a key is provided, otherwise falls back to mock data.
    """
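    if api_key:
        print("[LLMAnalystAgent] Sending headlines to OpenAI for analysis...")
        # Import lazily so the mock-data path works without the openai package installed.
        try:
            import openai
        except ImportError as e:
            return {"source": "LLMAnalystAgent (Live OpenAI API)", "status": "error", "data": f"The openai package is not installed: {e}"}
        client = openai.OpenAI(api_key=api_key)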
        prompt = (
            f"Based on the following news headlines about '{company_name}', perform a brief strategic analysis.\n"
            f"Headlines: {headlines}\n\n"
            "Provide your output *only* as a JSON object with two keys:\n"
            '1. "theme": A single sentence summarizing the company\'s apparent strategic theme.\n'
            '2. "risk": A single sentence identifying a potential risk implied by the headlines.'
        )
        try:
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}],
                response_format={"type": "json_object"},
                temperature=0.7,
                max_tokens=150
            )
            analysis = json.loads(response.choices[0].message.content)
            return {"source": "LLMAnalystAgent (Live OpenAI API)", "status": "success", "data": analysis}
        except Exception as e:
            return {"source": "LLMAnalystAgent (Live OpenAI API)", "status": "error", "data": f"API call failed: {e}"}
    else:
        print("[LLMAnalystAgent] Generating mock analysis (no API key)...")
        time.sleep(1.0)
        analysis = mock_llm_analysis_db.get(company_name)
        if analysis:
            return {"source": "LLMAnalystAgent (Mock Data)", "status": "success", "data": analysis}
        return {"source": "LLMAnalystAgent (Mock Data)", "status": "error", "data": "No mock analysis found."}
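A model can return valid JSON that still lacks the requested keys. One way to guard against that is sketched here, under the assumption that both `theme` and `risk` must be non-empty strings. `parse_analysis` and `REQUIRED_KEYS` are hypothetical names, not part of the notebook.

```python
import json
from typing import Dict

REQUIRED_KEYS = ("theme", "risk")


def parse_analysis(raw: str) -> Dict[str, str]:
    """Parse the model's reply and verify it has the two expected string fields.

    Raises ValueError on malformed JSON or on a missing or empty field.
    """
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Reply is not valid JSON: {exc}") from exc
    if not isinstance(parsed, dict):
        raise ValueError("Reply must be a JSON object.")
    for key in REQUIRED_KEYS:
        value = parsed.get(key)
        if not isinstance(value, str) or not value.strip():
            raise ValueError(f"Reply is missing a usable '{key}' field.")
    # Keep only the expected keys and trim surrounding whitespace.
    return {key: parsed[key].strip() for key in REQUIRED_KEYS}


print(parse_analysis('{"theme": "Expansion.", "risk": "Regulatory exposure."}'))
```

Calling a validator like this in place of a bare `json.loads` would turn an incomplete reply into an explicit error rather than a report that silently shows "Not available."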

## 4. The Manager Agent
The `ManagerAgent` is the brain of the operation. It is implemented as a class to cleanly manage the orchestration process. Its key responsibilities are:

* **Orchestration (`_run_agent_team`):** Executes the specialist agents using a hybrid parallel/sequential model to efficiently handle data dependencies.
* **Synthesis (`_synthesize_report`):** Aggregates the data from all agents and synthesizes it into a rich, analytical markdown report. It also calculates a "Conflict Score" to quantify any discrepancy between market sentiment and financial performance.
* **Output (`_save_report`):** Saves the final report to a dynamically named markdown file for easy distribution and archiving.
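As a worked example of the Conflict Score, the sketch below applies the formula used in `_synthesize_report` (the absolute difference between the sentiment score and one tenth of the percentage stock change) to the two mock companies. The standalone `conflict_score` and `is_conflict` functions and the `threshold` parameter name are illustrative; the 0.5 cutoff is the one the report logic uses.

```python
def conflict_score(sentiment_score: float, change_pct: float) -> float:
    """Absolute gap between sentiment (-1.0 to 1.0) and the stock change scaled by 1/10."""
    return abs(sentiment_score - change_pct / 10)


def is_conflict(score: float, threshold: float = 0.5) -> bool:
    """True when the gap exceeds the cutoff used for the 'Conflict Detected' branch."""
    return score > threshold


# Values taken from the notebook's mock sentiment and financial data.
for company, sentiment, change in [("InnovateCorp", 0.85, -5.2), ("GlobalConnect", 0.92, 7.8)]:
    score = conflict_score(sentiment, change)
    label = "conflict" if is_conflict(score) else "aligned"
    print(f"{company}: {score:.2f} ({label})")
# InnovateCorp: 1.37 (conflict)
# GlobalConnect: 0.14 (aligned)
```

The division by 10 puts a percentage change on roughly the same scale as the sentiment score, so a 5.2% drop counts as -0.52.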

In [4]:
# --- Manager Agent ---

class ManagerAgent:
    """
    Orchestrates an agent team, synthesizes an enriched report, and saves it.
    """
    def __init__(self, company_name: str, api_token: Optional[str]):
        if not company_name:
            raise ValueError("Company name cannot be empty.")
        self.company_name = company_name
        self.api_token = api_token

    def _run_agent_team(self) -> Dict[str, Any]:
        """
        Executes agents, handling dependencies between them.
        Financial and Sentiment agents run in parallel. The News agent's output
        is required before the LLM Analyst agent can run.
        """
        results = {}
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_financials = executor.submit(FinancialAgent, self.company_name)
            future_sentiment = executor.submit(SentimentAgent, self.company_name)

            news_result = NewsAgent(self.company_name)
            results["news"] = news_result
            
            if news_result["status"] == "success":
                llm_result = LLMAnalystAgent(news_result["data"], self.company_name, self.api_token)
                results["llm_analysis"] = llm_result
            else:
                results["llm_analysis"] = {"source": "LLMAnalystAgent", "status": "error", "data": "Skipped due to NewsAgent failure."}

            results["financials"] = future_financials.result()
            results["sentiment"] = future_sentiment.result()
        return results

    def _synthesize_report(self, results: Dict[str, Any]) -> str:
        """Synthesizes a detailed markdown report from agent results."""
        for result in results.values():
            if result["status"] == "error":
                raise AgentError(f"The '{result['source']}' failed: \"{result['data']}\"")

        news_data = results["news"]["data"]
        financial_data = results["financials"]["data"]
        sentiment_data = results["sentiment"]["data"]
        llm_analysis = results["llm_analysis"]["data"]
        llm_source = results["llm_analysis"]["source"]
        
        stock_change = financial_data['change_pct']
        sentiment_score = sentiment_data['sentiment_score']
        
        # A simple score to quantify the difference between sentiment and financial performance.
        # A higher score indicates a larger discrepancy.
        conflict_score = abs(sentiment_score - (stock_change / 10))

        report_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        report = f"# Intelligence Briefing: {self.company_name}\n"
        report += f"**Report Generated:** {report_date}\n"
        report += f"**Analysis Engine:** {llm_source}\n\n"
        report += "---\n\n"
        report += "## Strategic Summary\n"
        report += f"**Strategic Imperative:** {llm_analysis.get('theme', 'Not available.')}\n\n"
        report += f"**Identified Risk:** {llm_analysis.get('risk', 'Not available.')}\n\n"
        report += "## Key Data Points\n"
        report += "- **Recent Headlines:**\n"
        for headline in news_data:
            report += f"  - \"{headline}\"\n"
        report += f"- **Public Sentiment Score:** {sentiment_score:.2f} (from -1.0 to 1.0)\n"
        report += f"- **Current Stock Price:** ${financial_data['stock_price']:,.2f}\n"
        report += f"- **24-Hour Stock Change:** {stock_change}%\n\n"
        report += "---\n\n"
        report += "## Manager's Synthesis\n"
        
        if conflict_score > 0.5:
            report += (
                f"**Conflict Detected (Score: {conflict_score:.2f}):** A significant discrepancy exists between public perception and market reaction. "
                "While news and sentiment are positive, the stock has underperformed. This suggests the market is pricing in the identified strategic risks "
                "more heavily than the positive developments, or is reacting to broader unobserved factors."
            )
        else:
            report += (
                f"**Alignment Confirmed (Score: {conflict_score:.2f}):** Public perception and market performance are well-aligned. "
                "Positive sentiment is supported by strong stock performance, indicating high market confidence in the company's current strategy."
            )
        return report

    def _save_report(self, report_content: str, output_dir: str):
        """Saves the report to a dynamically named markdown file."""
        os.makedirs(output_dir, exist_ok=True)  # create the folder if needed; no error if it exists
        
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        file_name = f"{self.company_name}_Report_{timestamp}.md"
        file_path = os.path.join(output_dir, file_name)
        
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(report_content)
            print(f"\nStatus: Report successfully saved to '{file_path}'")
        except IOError as e:
            print(f"\nError: Could not save the report file. Reason: {e}")

    def generate_report(self):
        """Orchestrates the full process from execution to file saving."""
        print(f"\n[ManagerAgent] Assembling agent team for {self.company_name}.")
        start_time = time.time()
        
        try:
            specialist_results = self._run_agent_team()
            print("[ManagerAgent] All specialists have reported.")
            
            final_report = self._synthesize_report(specialist_results)
            self._save_report(final_report, "./OUTPUT")
            
            end_time = time.time()
            print(f"[ManagerAgent] Process completed in {end_time - start_time:.2f} seconds.")
            return final_report
        except AgentError as e:
            print(f"\n[ManagerAgent] CRITICAL FAILURE: {e}")
            return None
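The timing benefit of the hybrid model can be checked in isolation. In this sketch, `slow_task` is a hypothetical stand-in for an agent call and the sleep durations are arbitrary. Two independent tasks run on a thread pool while a dependent two-step chain runs on the calling thread, mirroring the structure of `_run_agent_team`.

```python
import concurrent.futures
import time


def slow_task(name: str, seconds: float) -> str:
    """Hypothetical stand-in for an agent: sleep to simulate I/O, then return a label."""
    time.sleep(seconds)
    return name


start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Independent tasks start immediately on worker threads.
    future_a = executor.submit(slow_task, "financials", 0.2)
    future_b = executor.submit(slow_task, "sentiment", 0.2)

    # The dependent chain runs sequentially on the calling thread in the meantime.
    news = slow_task("news", 0.2)
    analysis = slow_task(f"analysis of {news}", 0.4)

    # Collect the parallel results; result() blocks only if a task is still running.
    results = [future_a.result(), future_b.result(), news, analysis]
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s (the sleeps add up to 1.00s if run one after another)")
```

The total time is bounded below by the dependent chain (0.6 s here), because the parallel tasks finish while that chain is still running.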

## 5. Execution
This is the final step, where the system is put into action. We instantiate the `ManagerAgent` for a target company and call its `generate_report` method, which runs the full orchestration and synthesis process and saves the final report to a file. The returned report is then printed as a preview directly in the notebook output.

In [5]:
# --- Main Execution ---
def main():
    """Main execution function to run the agent system."""
    # We run the "InnovateCorp" case because its positive sentiment and falling stock price trigger the conflict branch.
    manager = ManagerAgent("InnovateCorp", openai_key)
    report = manager.generate_report()

    if report:
        print("\n" + "="*60)
        print("                FINAL REPORT PREVIEW")
        print("="*60)
        print(report)
        print("="*60)

if __name__ == "__main__":
    main()


[ManagerAgent] Assembling agent team for InnovateCorp.
[FinancialAgent] Fetching financial data...
[SentimentAgent] Fetching sentiment score...
[NewsAgent] Fetching headlines...
[LLMAnalystAgent] Generating mock analysis (no API key)...
[ManagerAgent] All specialists have reported.

Status: Report successfully saved to './OUTPUT/InnovateCorp_Report_20250923_025938.md'
[ManagerAgent] Process completed in 1.50 seconds.

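============================================================
                FINAL REPORT PREVIEW
============================================================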
# Intelligence Briefing: InnovateCorp
**Report Generated:** 2025-09-23 02:59:38
**Analysis Engine:** LLMAnalystAgent (Mock Data)

---

## Strategic Summary
**Strategic Imperative:** The company's core strategy appears to be aggressive technological innovation, focusing on capturing the AI market.

**Identified Risk:** High R&D spending could negatively impact short-term profitability if the new AI product fails to gain market traction quickly.

## Key Data Points
- **Recent Headlines:**
  - "InnovateCorp launches groundbreaking new AI