## https://github.com/amarchini5339/AAI-520-Group1

## /src/RouterMain.py
This is the routing agent that calls all of the other agents. It first looks up the company name to determine whether it is an equity. If it is an equity, it pulls SEC filings and financial data from Yahoo Finance. It then merges with the non-equity branch and pulls economic data from FRED and news sites. Finally, it combines all of this data to create a final report and make a final recommendation.

## Main Orchestration Engine - RouterMain.py

###  **Core System Controller**

This cell implements the **central workflow coordinator** for our autonomous financial research system. It defines the main flow, whose responsibilities and patterns are summarized below.

### **Key Responsibilities:**
- **Orchestrates multiple specialized AI agents** in a coordinated sequence
- **Implements intelligent routing logic** based on security type (Equity vs Non-Equity)
- **Manages the complete research pipeline** from company identification to final recommendation
- **Handles error recovery** and maintains state throughout the analysis process

### **Workflow Pattern Implementation:**
- **ROUTING PATTERN**: Directs analysis to appropriate specialists based on security classification
- **SEQUENTIAL EXECUTION**: Coordinates agents in logical order (SEC → Yahoo → FRED → News → Synthesis)
- **ERROR HANDLING**: Graceful degradation when individual agents encounter issues

### **Agent Coordination:**
The flow intelligently routes analysis through four specialized research agents, collects their findings, and uses advanced AI (GPT-5) to synthesize a comprehensive investment recommendation with clear bull/bear cases and macroeconomic impact analysis.

In [None]:
# src/RouterMain.py
# Main flow router for financial analysis using multiple agents
from crewai.flow.flow import Flow, listen, start, router, or_
from crewai import Agent, Crew, Task
from dotenv import load_dotenv
from litellm import completion

# Import yahoo finance ticker finder
from researchers.tools import yahoo_find_ticker as yft

# Import necessary functions and classes from other researchers
from researchers.SECresearcher import run_sec_filing_agent, _safe_parse_json, _load_openai_client
from researchers.News_Agent_Crew import news_agent_crew
from researchers.FREDresearcher import create_crewai_fred_agent
from researchers.YahooFinanceCrew import run_yahoo_finance_agent

# Import logging for debugging
import logging

# Load environment variables from a local .env file (e.g. OPENAI_API_KEY,
# which _load_openai_client reads) before any agent is constructed.
load_dotenv()

# Define the main financial analysis flow
class FinancialAnalysisFlow(Flow):
    """CrewAI flow that turns a company prompt into a rated research report.

    Routing:
      * ``get_ticker`` resolves the prompt to a Yahoo Finance match.
      * ``check_equity`` routes to EQUITY_BRANCH (SEC -> Yahoo -> FRED),
        NON_EQUITY_BRANCH (straight to FRED) or UNKNOWN_BRANCH (stop).
      * FRED is followed by News, then ``finalize`` synthesises the report.

    Agent outputs are kept in ``self.state`` (``sec_result``, ``yahoo_result``,
    ``fred_result``, ``news_result``, ``final_result``); an agent that raises
    stores ``{}`` so the remaining steps can still run.
    """

    # Use a lightweight model for intermediate steps
    model = "gpt-5-mini"

    # Initialize the flow with tracing enabled
    def __init__(self):
        """Enable tracing, configure logging and create the OpenAI client."""
        super().__init__(tracing=True)
        logging.basicConfig(level=logging.INFO)
        self.log = logging.getLogger(__name__)
        self.state['debug'] = False  # default; kickoff inputs may set it to True
        self.client = _load_openai_client()  # used by finalize()

    # Define the start of the flow
    @start()
    def get_ticker(self):
        """Resolve the user prompt (default "Microsoft") to a Yahoo Finance match.

        Stores the match dict in ``state["best"]`` and returns it. A lookup
        exception or an empty lookup result becomes a placeholder match with
        an empty ``typeDisp``, which routes the flow to UNKNOWN_BRANCH.
        """
        self.log.info("Starting flow")
        print(f"Flow State ID: {self.state['id']}")

        # Guarantee the key exists so later steps can index it directly.
        self.state["debug"] = self.state.get("debug", False)

        # Get user-provided prompt or fallback to Microsoft
        prompt = self.state.get("prompt", "Microsoft")
        if self.state["debug"]:
            print(f"Prompt: {prompt}")

        try:
            best = yft.yahoo_find_ticker(prompt)
        except Exception as e:
            best = {"symbol": "ERROR", "name": str(e), "exch": "", "exchDisp": "", "type": "", "typeDisp": ""}
        else:
            if not best:
                # yahoo_find_ticker returns None when nothing matches; without a
                # placeholder, check_equity would fail indexing None.
                best = {"symbol": "ERROR", "name": f"No Yahoo Finance match for {prompt!r}",
                        "exch": "", "exchDisp": "", "type": "", "typeDisp": ""}
            print(best)
        self.state["best"] = best
        return best

    # Define the router for equity check
    @router(get_ticker)
    def check_equity(self):
        """Route on the resolved security type.

        Returns "UNKNOWN_BRANCH" when the type is missing/empty, "EQUITY_BRANCH"
        for equities and "NON_EQUITY_BRANCH" otherwise. The comparison is
        case-insensitive because the fallback autocomplete endpoint's
        ``typeDisp`` is a display string that may not be upper-case.
        """
        security_type = (self.state["best"].get("typeDisp") or "").upper()

        if security_type == "":
            return "UNKNOWN_BRANCH"
        elif security_type == "EQUITY":
            return "EQUITY_BRANCH"
        else:
            return "NON_EQUITY_BRANCH"

    # Define the handler for unknown security types
    @listen('UNKNOWN_BRANCH')
    def handle_unknown(self):
        """Record an error for an unresolvable security and stop the research."""
        symbol = self.state["best"]["symbol"]
        print(f"Unknown branch for {symbol} - cannot proceed with research.")
        self.state["error"] = f"Unknown security type for symbol: {symbol}"
        return 'ERROR_DONE'

    # Define the handler for equity securities
    @listen('EQUITY_BRANCH')
    def get_sec_agent(self):
        """Run the SEC filing analysis; store ``{}`` in ``sec_result`` on failure."""
        symbol = self.state["best"]["symbol"]
        print(f"Equity branch for {symbol} - running SEC research agent...")

        try:
            # Run the SEC filing analysis agent
            result = run_sec_filing_agent({"ticker": symbol})
            self.state["sec_result"] = result
            if self.state["debug"]:
                print(self.state["sec_result"])
            print(f"SEC Research completed for {symbol}")
        except Exception as e:
            print(f"Error running SEC agent: {e}")
            self.state["sec_result"] = {}
        return "SEC_DONE"

    @listen('get_sec_agent')
    def get_yahoo_agent(self):
        """Run the Yahoo Finance agent; store ``{}`` in ``yahoo_result`` on failure.

        Errors are caught like the other agents so the flow can still reach
        FRED, News and the final synthesis.
        """
        symbol = self.state["best"]["symbol"]
        print(f"Yahoo branch for {symbol} - proceeding to do yahoo research")
        try:
            yahoo_result = run_yahoo_finance_agent({"ticker": symbol})
        except Exception as e:
            print(f"Error running Yahoo Finance agent: {e}")
            yahoo_result = {}
        self.state["yahoo_result"] = yahoo_result
        if self.state["debug"]:
            print(self.state["yahoo_result"])
        return 'YAHOO_DONE'

    @listen(or_('NON_EQUITY_BRANCH', 'get_yahoo_agent'))
    def get_fred_agent(self):
        """Run the FRED macro-economic crew for the symbol.

        Stores the crew output in ``fred_result`` (``{}`` on failure and
        returns 'ERROR_DONE').
        """
        symbol = self.state["best"]["symbol"]
        try:
            print(f"FRED branch for {symbol} - running FRED research agent...")
            # Create FRED agent
            fred_agent = create_crewai_fred_agent(Agent)
            # Create a task for economic analysis
            task = Task(
                description=f"Analyze economic indicators from FRED for their impact on {symbol}",
                agent=fred_agent,
                input_payload={"ticker": symbol},
                expected_output="JSON with 1-5 rating and economic analysis")

            # Create and run the crew
            crew = Crew(agents=[fred_agent], tasks=[task])
            self.log.info("Starting economic analysis...")
            result = crew.kickoff()

        except Exception as e:
            print(f"Error running FRED agent: {e}")
            self.state["fred_result"] = {}
            return 'ERROR_DONE'

        self.state["fred_result"] = result
        if self.state["debug"]:
            print(self.state["fred_result"])
        print(f"FRED Research completed for {symbol}")
        return 'FRED_DONE'

    @listen('get_fred_agent')
    def get_news_agent(self):
        """Run the news crew; store ``{}`` in ``news_result`` on failure."""
        symbol = self.state["best"]["symbol"]
        print(f"News branch for {symbol} - proceeding to do news research")
        try:
            # Run the News agent crew
            result = news_agent_crew.kickoff(inputs={"company": symbol})
            self.state["news_result"] = result
            if self.state["debug"]:
                print(self.state["news_result"])
        except Exception as e:
            print(f"Error running News agent: {e}")
            self.state["news_result"] = {}
            return 'ERROR_DONE'
        print(f"NewsAgent Crew completed for {symbol}")
        return 'NEWS_DONE'

    @listen('get_news_agent')
    def finalize(self):
        """Synthesise all agent outputs into a rated report with gpt-5.

        Results that were never produced (SEC and Yahoo are skipped on the
        non-equity branch) are passed as ``{}`` instead of raising KeyError.
        Stores the reply (parsed as JSON when possible) in
        ``state["final_result"]`` and returns the state; returns 'ERROR_DONE'
        if the LLM call fails.
        """
        symbol = self.state["best"]["symbol"]
        print(self.state)
        print(f"Complete analysis for {symbol} - finalizing flow")

        # SEC/Yahoo results only exist on the equity branch.
        sec_result = self.state.get("sec_result", {})
        fred_result = self.state.get("fred_result", {})
        news_result = self.state.get("news_result", {})
        yahoo_result = self.state.get("yahoo_result", {})

        prompt = (
            "Provide a rating from 1 'sell', 2 'underperform', 3 'hold', "
            "4 'outperform', 5 'strong buy' based on the following context. "
            f"SEC Report JSON: {sec_result}. "
            f"FRED JSON: {fred_result}. "
            f"News JSON: {news_result}. "
            f"Yahoo Finance JSON: {yahoo_result}. "
            "Respond with easy to read report only like rating and rationale. "
            "Create a concise rationale for your rating.  List bull and bear cases for investment. "
            "Explain how macroeconomic factors from FRED influenced your rating and how changing interest rates might impact the company's performance."
        )

        try:
            # Final synthesis using advanced model
            response = self.client.chat.completions.create(
                model="gpt-5",  # use advanced model for final synthesis
                messages=[
                    {"role": "system", "content": "You are a financial analysis expert."},
                    {"role": "user", "content": prompt},
                ],
            )
        except Exception as e:
            print(f"Error during final analysis: {e}")
            self.state["final_result"] = {}
            return 'ERROR_DONE'

        final_result = _safe_parse_json(response.choices[0].message.content)
        self.state["final_result"] = final_result
        print(self.state["final_result"])
        print(f"Final analysis completed for {symbol}")
        return self.state

# --- Run the flow ---
flow = FinancialAnalysisFlow()

# Render a visualization of the flow graph before executing it.
flow.plot()

# Example run: research Apple with verbose debug output enabled.
example_inputs = {"prompt": "Apple", "debug": True}
result = flow.kickoff(inputs=example_inputs)


In [10]:
from IPython.display import HTML, display
import markdown

# Show the captured terminal log (stored as Markdown) inline as HTML.
with open("TerminalOutput.md", "r", encoding="utf-8") as md_file:
    md_content = md_file.read()

html = markdown.markdown(md_content)
display(HTML(html))

![Alt text](crewflow.png)

## /src/researchers/tools/yahoo_find_ticker.py
File that looks up a company name on Yahoo Finance and attempts to find its stock ticker.

This function was generated entirely with ChatGPT.

## Company Ticker Lookup Service - yahoo_find_ticker.py

### 🔍 **Intelligent Company Identification**

This cell implements a **robust ticker symbol resolution service** that converts company names into official stock symbols and security information.

### **Key Features:**

**Dual-API Architecture:**
- **Primary Endpoint**: Uses Yahoo Finance's modern search API for accurate symbol resolution
- **Fallback Endpoint**: Automatically switches to legacy autocomplete API if primary fails
- **Retry Logic**: Implements intelligent retry mechanism with exponential backoff for network resilience

**Advanced Filtering & Normalization:**
- **Security Type Filtering**: Can filter by EQUITY, ETF, or other security types
- **Exchange Filtering**: Supports filtering by specific exchanges (NYSE, NASDAQ, etc.)
- **Data Normalization**: Standardizes output format across different API responses
- **Error Handling**: Comprehensive exception handling with meaningful error messages


In [None]:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Optional, List, Dict

# Yahoo Finance v1 search API (response key "quotes"); queried first.
SEARCH_URL = "https://query2.finance.yahoo.com/v1/finance/search"   # preferred
# Legacy autocomplete API (response key "ResultSet"/"Result"); queried when the
# search API fails or yields no candidates after filtering.
AUTOC_URL  = "https://autoc.finance.yahoo.com/autoc"                # fallback

def _session() -> requests.Session:
    """Build a requests Session for the Yahoo Finance endpoints.

    The session sends a browser-like User-Agent and JSON Accept header, and
    retries GET requests on https:// up to 3 times (backoff factor 0.3) when
    the server answers 429, 500, 502, 503 or 504.
    """
    retry_policy = Retry(
        total=3,
        backoff_factor=0.3,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
    )
    session = requests.Session()
    # Some Yahoo endpoints 404/403 without a UA
    session.headers.update(
        {
            "User-Agent": "Mozilla/5.0 (compatible; ticker-lookup/1.0)",
            "Accept": "application/json,text/javascript,*/*;q=0.1",
        }
    )
    session.mount("https://", HTTPAdapter(max_retries=retry_policy))
    return session

def yahoo_find_ticker(
    company_name: str,
    exchanges: Optional[List[str]] = None,   # e.g. ["NYSE", "NasdaqGS"]
    types: Optional[List[str]] = None,       # e.g. ["EQUITY", "ETF"]
    return_all: bool = False
) -> Optional[Dict]:
    """
    Find a ticker by company name using Yahoo Finance.

    The v1 search endpoint is tried first. If it fails for any transport/HTTP
    reason, returns undecodable JSON, or yields no candidates after filtering,
    the legacy autocomplete endpoint is queried instead.

    Args:
        company_name: free-text company name (or symbol) to search for.
        exchanges: optional allow-list matched against ``exchDisp`` or ``exch``.
        types: optional allow-list of security types (case-insensitive).
        return_all: return every candidate instead of only the first one.

    Returns:
        The best match dict (keys symbol, name, exch, exchDisp, type,
        typeDisp), or a list of such dicts when ``return_all`` is True
        (despite the ``Optional[Dict]`` annotation). ``None`` (``[]`` with
        ``return_all``) when the fallback endpoint finds nothing either.

    Raises:
        RuntimeError: the fallback endpoint responded with an HTTP error status.
    """
    # Normalize a v1/search quote into the common result shape.
    def norm(q):
        return {
            "symbol": q.get("symbol"),
            "name": q.get("shortname") or q.get("longname") or q.get("quoteType"),
            "exch": q.get("exchange"),
            "exchDisp": q.get("exchDisp") or q.get("fullExchangeName"),
            "type": q.get("quoteType"),
            "typeDisp": q.get("quoteType"),
        }

    # The context manager closes the pooled connections when we are done.
    with _session() as s:
        # --- Preferred endpoint ---
        try:
            r = s.get(SEARCH_URL, params={"q": company_name, "lang": "en-US", "region": "US"}, timeout=10)
            r.raise_for_status()
            data = r.json()
            quotes = data.get("quotes", []) or []
        except (requests.RequestException, ValueError):
            # HTTP errors, connection failures, timeouts, exhausted retries
            # (RetryError) and undecodable JSON all fall through to the fallback.
            quotes = []

        candidates = [norm(q) for q in quotes if q.get("symbol")]
        if types:
            tset = {t.upper() for t in types}
            candidates = [c for c in candidates if (c["type"] or "").upper() in tset]
        if exchanges:
            eset = set(exchanges)
            candidates = [c for c in candidates if c["exchDisp"] in eset or c["exch"] in eset]

        if candidates:
            return candidates if return_all else candidates[0]

        # --- Fallback endpoint (older autocomplete) ---
        try:
            r = s.get(AUTOC_URL, params={"query": company_name, "region": 1, "lang": "en"}, timeout=10)
            r.raise_for_status()
        except requests.HTTPError as e:
            # Surface the original 404/403 with a friendly hint
            raise RuntimeError(
                f"Yahoo lookup failed ({e.response.status_code}). "
                "Try the newer search API, ensure a real User-Agent, or check your network/proxy."
            ) from e

        # `or {}` guards against "ResultSet": null in the payload.
        result_set = (r.json() or {}).get("ResultSet") or {}
        rs = result_set.get("Result", []) or []
        candidates = [{
            "symbol": x.get("symbol"),
            "name": x.get("name"),
            "exch": x.get("exch"),
            "exchDisp": x.get("exchDisp"),
            "type": x.get("type"),
            "typeDisp": x.get("typeDisp"),
        } for x in rs if x.get("symbol")]

        if types:
            tset = {t.upper() for t in types}
            candidates = [c for c in candidates if (c["typeDisp"] or "").upper() in tset or (c["type"] or "").upper() in tset]
        if exchanges:
            eset = set(exchanges)
            candidates = [c for c in candidates if c["exchDisp"] in eset or c["exch"] in eset]

        if not candidates:
            return [] if return_all else None
        return candidates if return_all else candidates[0]

# --- Example ---
if __name__ == "__main__":
    # Best single match for a well-known company.
    apple_match = yahoo_find_ticker("Apple")
    print(apple_match)            # -> {'symbol': 'AAPL', ...}
    # First three candidates for an ambiguous name.
    delta_matches = yahoo_find_ticker("Delta", return_all=True)
    print(delta_matches[:3])

## /src/researchers/SECresearcher.py
Agent file that runs the tools in sec_tools.py to create a financial assessment based on the most recent 10-K/10-Q SEC filing.


In [None]:
import json
import os
from typing import Any, Dict

from crewai import Agent, Crew, Process, Task
from crewai.flow.flow import Flow, and_, listen, start
from dotenv import load_dotenv
from openai import OpenAI

try:
    # Import sec_tools from the tools package if available
    from tools.sec_tools import (
        calc_debt_to_equity,
        calc_positive_netincome,
        calc_profit,
        calc_yoy_rev,
        get_recent_facts,
        get_risks_mna as fetch_risks_mna,
        ticker_to_cik,
    )
except ImportError:
    # Fallback import if tools is not a package
    from researchers.tools.sec_tools import (  # type: ignore
        calc_debt_to_equity,
        calc_positive_netincome,
        calc_profit,
        calc_yoy_rev,
        get_recent_facts,
        get_risks_mna as fetch_risks_mna,
        ticker_to_cik,
    )

# === Helper functions ===
def _load_openai_client() -> OpenAI:
    """Return an OpenAI client authenticated with ``OPENAI_API_KEY``.

    Environment variables are (re)loaded from a .env file first.

    Raises:
        RuntimeError: if OPENAI_API_KEY is unset or empty.
    """
    load_dotenv()
    api_key = os.getenv("OPENAI_API_KEY")
    if api_key:
        return OpenAI(api_key=api_key)
    raise RuntimeError("OPENAI_API_KEY environment variable is not set.")


def _safe_parse_json(raw_content: str) -> Any:
    try:
        return json.loads(raw_content)
    except json.JSONDecodeError:
        return raw_content

from openai import OpenAI

# === SEC Filing Analysis Flow ===
class SECFilingAnalysis:
    """Rate a company from its SEC filings.

    ``run`` resolves the ticker to a CIK, pulls the most recent XBRL facts,
    computes rule-based financial ratings, asks gpt-5 to rate the 10-K risk
    factors and MD&A, and finally asks gpt-5 for a combined 1-5 rating.
    Intermediate results are kept in ``self.state``.
    """

    def __init__(self):
        # OpenAI client shared by every LLM call made by this instance.
        self.client = _load_openai_client()
        self.state = {}

    # === Public API ===
    def run(self, ticker: str):
        """Analyse ``ticker`` and return final, financial and risk/MD&A ratings.

        Raises:
            ValueError: if ``ticker`` is empty or cannot be resolved/analysed.
        """
        if not ticker:
            raise ValueError("No ticker provided. Example: run('AAPL')")

        cik = self._get_cik(ticker)
        financial_ratings = self._calc_financial_ratings(self._get_facts(cik))
        risk_mna_rating = self._get_risks_mna(cik)
        return {
            "final_result": self._get_final_report(financial_ratings, risk_mna_rating),
            "financial_ratings": financial_ratings,
            "risk_mna_rating": risk_mna_rating,
        }

    # === Individual steps ===

    def _ask_for_rating(self, prompt: str):
        """Send ``prompt`` to gpt-5 as the analyst and parse the reply as JSON."""
        reply = self.client.chat.completions.create(
            model="gpt-5",
            messages=[
                {"role": "system", "content": "You are a financial analysis expert."},
                {"role": "user", "content": prompt},
            ],
        )
        return _safe_parse_json(reply.choices[0].message.content)

    def _get_cik(self, ticker: str):
        """Resolve ``ticker`` to its SEC CIK and cache it in state."""
        cik = ticker_to_cik(ticker)
        if cik:
            self.state["cik"] = cik
            return cik
        raise ValueError(f"Unable to resolve CIK for ticker '{ticker}'.")

    def _get_facts(self, cik: str):
        """Fetch the latest filing's facts for ``cik`` and cache them in state."""
        facts = get_recent_facts(cik)
        if facts:
            self.state["facts"] = facts
            return facts
        raise ValueError(f"No recent facts returned for CIK '{cik}'.")

    def _calc_financial_ratings(self, facts):
        """Compute the four rule-based ratings from ``facts``."""
        ratings = {}
        ratings["yoy"] = calc_yoy_rev(facts)
        ratings["profit"] = calc_profit(facts)
        ratings["debt"] = calc_debt_to_equity(facts)
        ratings["income"] = calc_positive_netincome(facts)
        self.state["financial_ratings"] = ratings
        return ratings

    def _get_risks_mna(self, cik: str):
        """Ask the LLM to rate the 10-K risk factors and MD&A text."""
        risks, mna = fetch_risks_mna(cik)
        prompt = (
            "Provide a rating from 1 'sell', 2 'underperform', 3 'hold', "
            "4 'outperform', 5 'strong buy' for the following based on risk factors: "
            f"{risks} and management discussion and analysis: {mna}. "
            "Respond with JSON only like {'rating': 4, 'rationale': 'text'}."
        )
        rating = self._ask_for_rating(prompt)
        self.state["risk_mna_rating"] = rating
        return rating

    def _get_final_report(self, financial_ratings, risk_mna_rating):
        """Ask the LLM for a weighted overall rating from all sub-ratings."""
        prompt = (
            "Provide a rating from 1 'sell', 2 'underperform', 3 'hold', "
            "4 'outperform', 5 'strong buy' based on the following context. "
            f"Financial ratings: {financial_ratings}. "
            f"Risk/MNA rating: {risk_mna_rating}. "
            "Respond with JSON only like {'rating': 4, 'rationale': 'text'}. "
            "Give 20% weight for YoY, 20% for profit, 15% for debt, "
            "15% for income, 30% for risk/mna."
        )
        return self._ask_for_rating(prompt)

# run SEC Filing Analysis as an Agent
def run_sec_filing_agent(inputs: dict):
    """Run the SEC analysis for ``inputs["ticker"]`` and return its result dict."""
    return SECFilingAnalysis().run(inputs.get("ticker"))

# Define the SEC Filing Agent
# CrewAI Agent wrapping run_sec_filing_agent. It is constructed at import time,
# so importing this module fails if crewai rejects any of these arguments.
# NOTE(review): `name`, `description`, `expected_output`, `model` and
# `run_function` may not be recognised crewai Agent fields (model selection is
# typically configured via `llm=`) — verify against the crewai Agent docs.
# RouterMain imports and calls run_sec_filing_agent directly, not this Agent.
sec_filing_agent = Agent(
    role="Flow Runner",
    goal="Run the SEC filing analysis and return raw output",
    name="SEC Filing Agent",
    description="Executes SEC analysis and returns unmodified JSON",
    backstory="This agent runs the SEC filing analysis flow and returns the results as-is.",
    expected_output="Full JSON returned from sec_filing_flow, do not modify.",
    model="gpt-5-mini",
    run_function=run_sec_filing_agent,
)

if __name__ == "__main__":
    # Smoke test: run the full SEC analysis for Apple and print the result.
    sec_report = run_sec_filing_agent({"ticker": "AAPL"})
    print(sec_report)

## /src/researchers/tools/sec_tools.py

In [None]:
import os
import pandas as pd
from dotenv import load_dotenv
from openai import OpenAI

load_dotenv()

from pathlib import Path
from sec_cik_mapper import StockMapper

from bs4 import BeautifulSoup
import re
import requests
import json
from datetime import datetime

def ticker_to_cik(ticker: str) -> str:
    """Look up the SEC CIK for a stock ticker using ``sec_cik_mapper``.

    The lookup is case-insensitive (the ticker is upper-cased before both
    the membership test and the index, which previously disagreed).

    Returns:
        The CIK string, or ``None`` when the ticker is not in the mapping
        (previously this raised UnboundLocalError). SECFilingAnalysis._get_cik
        treats a falsy result as "not found".
    """
    mapper = StockMapper()
    key = ticker.upper()
    if key not in mapper.ticker_to_cik:
        print(f"Ticker {ticker} not found")
        return None
    cik = mapper.ticker_to_cik[key]
    print(f"Ticker {ticker} CIK={cik}")
    return cik

def get_recent_facts(CIK: str, timeout: float = 30):
    """
    Fetches the most recent 10-Q or 10-K filing facts from SEC's companyfacts API.

    The "most recent filing" is the accession number attached to the us-gaap
    fact with the latest ``filed`` date among facts whose form is exactly
    "10-Q" or "10-K". Every us-gaap fact reported under that accession is
    returned; these may span several end dates (calc_yoy_rev compares the
    earliest and latest of them).

    Args:
        CIK: SEC Central Index Key; zero-padded to 10 digits for the URL.
        timeout: seconds to wait for the SEC response. Without a timeout the
            request could block indefinitely.

    Returns:
        A list of dicts with keys concept, unit, value, end_date, form, accn.
        An empty list if the request fails or no 10-Q/10-K facts exist.
    """
    # 1. Configuration
    URL = f"https://data.sec.gov/api/xbrl/companyfacts/CIK{str(CIK).zfill(10)}.json"
    # SEC requires a descriptive User-Agent with contact details; set
    # SEC_USER_AGENT in the environment instead of using the placeholder.
    USER_AGENT = os.getenv("SEC_USER_AGENT", "Your Name (your.email@example.com)")

    # 2. Fetch Data
    try:
        headers = {"User-Agent": USER_AGENT}
        response = requests.get(URL, headers=headers, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as e:
        print(f"[Error] Could not fetch data for CIK {CIK}: {e}")
        return []

    # 3. Identify the most recent 10-Q or 10-K accession number
    latest_filing_accn = None
    latest_filed_date = datetime.min
    latest_form = None

    facts = data.get("facts", {}).get("us-gaap", {})

    for concept in facts.values():
        if "units" not in concept:
            continue
        for unit_facts in concept["units"].values():
            for fact in unit_facts:
                form = fact.get("form", "")
                # Skip other forms not 10-Q or 10-K
                if form not in ("10-Q", "10-K"):
                    continue

                filed_date_str = fact.get("filed")
                if not filed_date_str:
                    continue

                try:
                    filed_date = datetime.strptime(filed_date_str, "%Y-%m-%d")
                except ValueError:
                    continue

                if filed_date > latest_filed_date:
                    latest_filed_date = filed_date
                    latest_filing_accn = fact.get("accn")
                    latest_form = form

    if not latest_filing_accn:
        print(f"[Warning] No 10-Q or 10-K filings found for CIK {CIK}.")
        return []

    print(f"Most recent filing for CIK {CIK}: Form {latest_form}, Accession {latest_filing_accn}, Filed on {latest_filed_date.date()}")

    # 4. Extract all facts from the most recent filing
    most_recent_facts = []
    for concept_name, concept in facts.items():
        if "units" not in concept:
            continue
        for unit_name, unit_facts in concept["units"].items():
            for fact in unit_facts:
                if fact.get("accn") == latest_filing_accn:
                    most_recent_facts.append({
                        "concept": concept_name,
                        "unit": unit_name,
                        "value": fact.get("val"),
                        "end_date": fact.get("end"),
                        "form": fact.get("form"),
                        "accn": fact.get("accn")
                    })
    return most_recent_facts

# Calculate year-over-year revenue growth
def calc_yoy_rev(facts):
    """Rate year-over-year revenue growth on a 1-5 scale.

    Uses every fact whose concept name starts with "revenue"
    (case-insensitive). Values at the latest ``end_date`` are summed as the
    current revenue and values at the earliest ``end_date`` as the previous
    revenue. If only one end date exists, growth is 0.

    Rating: 5 above 15 % growth, 1 below 5 %, and linear from 1 (at 5 %) to
    5 (at 15 %) in between, so the scale is continuous. The previous formula
    ``growth / 5`` jumped from 3 to 5 at 15 %.

    Args:
        facts: list of dicts with at least "concept", "value", "end_date"
            (as produced by get_recent_facts).

    Returns:
        dict with "yoy_growth" (percent; inf when previous revenue <= 0 and
        current > 0), "rating" (previously misspelled "ratintg"), "description".

    Raises:
        ValueError: if no revenue facts are present.
    """
    revenue_data = [item for item in facts if item['concept'].lower().startswith('revenue')]
    if not revenue_data:
        raise ValueError("No revenue facts found; cannot compute YoY revenue growth.")

    # end_date is assumed to be an ISO "YYYY-MM-DD" string, so string max/min
    # is chronological.
    curr_end = max(item['end_date'] for item in revenue_data)
    prev_end = min(item['end_date'] for item in revenue_data)

    # NOTE(review): all revenue concepts/periods sharing an end date are summed;
    # if a filing reports overlapping concepts or both quarterly and
    # year-to-date values for one end date, this over-counts — confirm.
    revenue_prev = sum(item['value'] for item in revenue_data if item['end_date'] == prev_end)
    revenue_curr = sum(item['value'] for item in revenue_data if item['end_date'] == curr_end)

    # Calculate the YoY growth
    if revenue_prev > 0:
        yoy_growth = ((revenue_curr - revenue_prev) / revenue_prev) * 100
    else:
        yoy_growth = float('inf') if revenue_curr > 0 else 0

    # 5 if greater than 15, 1 if less than 5, linear interpolation between
    if yoy_growth > 15:
        return_val = 5
    elif yoy_growth < 5:
        return_val = 1
    else:
        return_val = 1 + 4 * (yoy_growth - 5) / 10

    return {'yoy_growth': yoy_growth, 'rating': return_val, 'description': "Year over year revenue growth as a percentage"}

# Calculate profit percent
def calc_profit(facts):
    """Rate the net profit margin on a 1-5 scale.

    Current revenue is the sum of "revenue*" facts (case-insensitive prefix) at
    their latest end date. Net income is the sum of facts whose concept
    contains "netincome" at their own latest end date.

    Rating: 5 above a 10 % margin, 1 below 5 %, and linear from 1 (at 5 %) to
    5 (at 10 %) in between, so the scale is continuous. The previous formula
    ``margin / 5`` jumped from 2 to 5 at 10 %.

    Returns:
        dict with "net_profit_margin" (percent), "rating", "description".

    Raises:
        ValueError: if revenue or net income facts are missing.
        ZeroDivisionError: if current revenue sums to zero.
    """
    revenue_data = [item for item in facts if item['concept'].lower().startswith('revenue')]
    if not revenue_data:
        raise ValueError("No revenue facts found; cannot compute net profit margin.")
    rev_end = max(item['end_date'] for item in revenue_data)
    revenue_curr = sum(item['value'] for item in revenue_data if item['end_date'] == rev_end)

    profit_data = [item for item in facts if 'netincome' in item['concept'].lower()]
    if not profit_data:
        raise ValueError("No net income facts found; cannot compute net profit margin.")
    income_end = max(item['end_date'] for item in profit_data)
    net_income = sum(item['value'] for item in profit_data if item['end_date'] == income_end)

    net_profit_margin = (net_income / revenue_curr) * 100

    if net_profit_margin > 10:
        return_val = 5
    elif net_profit_margin < 5:
        return_val = 1
    else:
        return_val = 1 + 4 * (net_profit_margin - 5) / 5

    return {'net_profit_margin': net_profit_margin, 'rating': return_val, 'description': "Net profit margin as a percentage"}

# Calculate debt to equity ratio
def calc_debt_to_equity(facts):
    """Rate the debt-to-equity ratio on a 1-5 scale.

    Total debt sums LongTermDebtNoncurrent, LongTermDebtCurrent and
    ShortTermBorrowings (substring match, case-insensitive). Each component
    is taken at its own latest end date and only counts when exactly one
    fact matches there; otherwise it contributes 0. Equity is the
    StockholdersEquity fact at its own latest end date. Previously that date
    was borrowed from the debt loop, which raised NameError when no debt
    facts existed.

    Rating: 5 below 0.5, 1 above 1.0, linear (-8 * ratio + 9) in between.
    Negative equity is rated 1; previously its negative ratio scored 5.

    Returns:
        dict with "debt_to_equity", "rating", "description".

    Raises:
        ValueError: if a single StockholdersEquity value cannot be determined
            or it is zero.
    """
    total_debt = 0
    for item_name in ['LongTermDebtNoncurrent', 'LongTermDebtCurrent', 'ShortTermBorrowings']:
        needle = item_name.lower()
        debt_data = [item for item in facts if needle in item['concept'].lower()]
        if debt_data:
            debt_end = max(item['end_date'] for item in debt_data)
            debt_data = [item for item in debt_data if item['end_date'] == debt_end]
        # Missing or ambiguous (multiple facts) components contribute nothing.
        total_debt += debt_data[0]['value'] if len(debt_data) == 1 else 0

    se_data = [item for item in facts if item['concept'].lower() == 'stockholdersequity']
    if not se_data:
        raise ValueError("No StockholdersEquity facts found; cannot compute debt to equity.")
    se_end = max(item['end_date'] for item in se_data)
    se_data = [item for item in se_data if item['end_date'] == se_end]
    se = se_data[0]['value'] if len(se_data) == 1 else 0
    if not se:
        raise ValueError("Stockholders' equity is zero or ambiguous; debt to equity is undefined.")

    debt_to_equity = total_debt / se

    if se < 0:
        # Negative equity signals heavy leverage; never reward it.
        return_val = 1
    elif debt_to_equity < 0.5:
        return_val = 5
    elif debt_to_equity > 1:
        return_val = 1
    else:
        return_val = (-8 * debt_to_equity) + 9

    return {'debt_to_equity': debt_to_equity, 'rating': return_val, 'description': "Debt to equity ratio"}

# Calculate if net income is positive
def calc_positive_netincome(facts):
    """Rate the sign of net income: 5 if positive, 1 if negative, 3 if zero.

    Sums the values of facts whose concept contains "netincomeloss"
    (case-insensitive) at the latest end date among them. Missing (None)
    values are skipped rather than crashing ``sum``.

    Returns:
        dict with "net_income", "rating", "description".

    Raises:
        ValueError: if no NetIncomeLoss facts are present.
    """
    income_data = [item for item in facts if 'netincomeloss' in item['concept'].lower()]
    if not income_data:
        raise ValueError("No NetIncomeLoss facts found; cannot rate net income.")

    curr_end = max(item['end_date'] for item in income_data)
    net_income = sum(
        item['value'] for item in income_data
        if item['end_date'] == curr_end and item['value'] is not None
    )

    if net_income > 0:
        return_val = 5
    elif net_income < 0:
        return_val = 1
    else:
        return_val = 3

    return {'net_income': net_income, 'rating': return_val, 'description': "Positive net income"}

def get_latest_10k_text_url(cik: str, user_agent: str, timeout: float = 30):
    """Locate the newest 10-K filing for ``cik`` in EDGAR.

    Reads the company's submissions JSON, takes the first row of
    ``filings.recent`` whose form is exactly "10-K" (amendments "10-K/A" are
    excluded), and builds archive URLs for it.

    Args:
        cik: SEC Central Index Key (string or int; zero-padded for the API).
        user_agent: User-Agent header value sent to the SEC.
        timeout: seconds to wait for the SEC response. Without a timeout the
            request could block indefinitely.

    Returns:
        dict with html_url (primary document), txt_url (full submission text
        file), accession, primary_document.

    Raises:
        requests.HTTPError: if the submissions request fails.
        ValueError: if no 10-K filing is listed.
    """
    # Step 1: fetch the submissions JSON
    padded = str(cik).zfill(10)
    submissions_url = f"https://data.sec.gov/submissions/CIK{padded}.json"
    headers = {"User-Agent": user_agent}
    resp = requests.get(submissions_url, headers=headers, timeout=timeout)
    resp.raise_for_status()
    data = resp.json()

    # Step 2: find the latest 10-K filing metadata
    filings = pd.DataFrame(data['filings']['recent'])
    # NOTE(review): assumes "recent" is ordered newest first, so iloc[0] is the latest.
    tenk_rows = filings[filings['form'] == '10-K']
    if tenk_rows.empty:
        raise ValueError("No 10-K filings found for this CIK")
    latest = tenk_rows.iloc[0]
    accession = latest['accessionNumber']
    primary_doc = latest['primaryDocument']

    # Step 3: build paths; the archive folder uses the unpadded CIK and the
    # accession number without dashes.
    accession_nodash = accession.replace('-', '')
    cik_int = int(cik)
    base = f"https://www.sec.gov/Archives/edgar/data/{cik_int}/{accession_nodash}/"

    # The primary HTML (.htm) document:
    html_url = base + primary_doc

    # The full submission text file is named after the dashed accession
    # number, e.g. "0000320193-24-000123.txt".
    txt_filename = f"{accession}.txt"
    txt_url = base + txt_filename

    return {
        "html_url": html_url,
        "txt_url": txt_url,
        "accession": accession,
        "primary_document": primary_doc
    }

def _extract_10k_section(text, start_pattern, end_pattern, start_label, end_label, occurrence=1):
    """Slice *text* from a section heading up to the next following end heading.

    ``occurrence`` (0-based) picks which start match to use. The default of 1
    skips the first match, which in a 10-K is normally the table-of-contents
    entry. When fewer matches exist, the last available one is used. Returns ""
    (and prints a message) when no start heading, or no end heading after it,
    is found.
    """
    starts = [m.start() for m in start_pattern.finditer(text)]
    if not starts:
        print(f"No {start_label} occurrences found.")
        return ""
    start_pos = starts[min(occurrence, len(starts) - 1)]
    end_pos = next((m.start() for m in end_pattern.finditer(text) if m.start() > start_pos), None)
    if end_pos is None:
        print(f"No {end_label} occurrences found.")
        return ""
    return text[start_pos:end_pos]


# Get risks and MD&A from most recent 10-K since these are often not listed in 10-Q
def get_risks_mna(CIK: str):
    """Return ``(risk_text, mda_text)`` extracted from the company's latest 10-K.

    Downloads the full submission ``.txt`` located by
    ``get_latest_10k_text_url`` and strips markup with BeautifulSoup. It then
    slices out Item 1A (Risk Factors, ending at Item 1B or Item 2) and Item 7
    (MD&A, ending at Item 8). A section that cannot be located comes back as ""
    with a printed message instead of raising.

    Raises:
        requests.HTTPError: if the EDGAR download fails.
        ValueError: propagated from ``get_latest_10k_text_url`` when no 10-K exists.
    """
    # NOTE(review): EDGAR expects a real contact in the User-Agent; this placeholder may be throttled.
    user_agent = "Your Name (your.email@example.com)"
    txt_url = get_latest_10k_text_url(cik=CIK, user_agent=user_agent)["txt_url"]

    # Step 4: download the filing and reduce it to plain text
    resp = requests.get(txt_url, headers={"User-Agent": user_agent}, timeout=60)
    resp.raise_for_status()
    clean_text = BeautifulSoup(resp.text, "html.parser").get_text(separator=' ', strip=True)

    # "Item 1A." up to the next "Item 1B" / "Item 2"
    risk_text = _extract_10k_section(
        clean_text,
        re.compile(r"(?i)item\s*1A\.[^a-zA-Z0-9]{0,10}"),
        re.compile(r"(?i)item\s*(1B|2)[^a-zA-Z0-9]{0,10}"),
        start_label="Item 1A – Risk Factors",
        end_label="Item 1B or Item 2",
    )

    # "Item 7." up to the next "Item 8." (the trailing "\." keeps "Item 7A" out of the start match)
    mda_text = _extract_10k_section(
        clean_text,
        re.compile(r"(?i)item\s*7\.[^a-zA-Z0-9]{0,10}"),
        re.compile(r"(?i)item\s*8\.[^a-zA-Z0-9]{0,10}"),
        start_label="Item 7",
        end_label="Item 8",
    )
    return risk_text, mda_text

# This function was used in development but is not currently called in the flow, left for debugging
def prompt(risk_text: str, mda_text: str):
    """Ask ``gpt-5-mini`` for a 1-5 rating based on 10-K risk factors and MD&A text.

    Args:
        risk_text: Risk Factors (Item 1A) text, e.g. from ``get_risks_mna``.
        mda_text: MD&A (Item 7) text, e.g. from ``get_risks_mna``.

    Returns:
        The raw message content. The prompt requests a JSON object shaped like
        ``{"rating": 4, "rationale": "text"}``; compliance is not checked here.
    """
    load_dotenv()
    openai_api_key = os.getenv("OPENAI_API_KEY")
    client = OpenAI(api_key=openai_api_key)
    # The example must use double quotes: a single-quoted example is not valid JSON,
    # and main() parses the reply with json.loads.
    user_message = (
        "Provide a rating from 1 'sell', 2 'underperform', 3 'hold', 4 'outperform', 5 'strong buy' "
        f"for the following based on risk factors: {risk_text} "
        f"and management discussion and analysis: {mda_text}. "
        'Respond with valid JSON only, like {"rating": 4, "rationale": "text"}'
    )
    response = client.chat.completions.create(
        model="gpt-5-mini",
        messages=[
            {"role": "system", "content": "You are a financial analysis expert."},
            {"role": "user", "content": user_message},
        ],
    )
    return response.choices[0].message.content

# This function was used in development but is not currently called in the flow, left for debugging
def final_rating(risk_rating, yoy_rating, profit_rating, debt_equity_rating, net_income_rating):
    """Combine five 1-5 component ratings into a weighted score and a recommendation.

    Weights are risk 0.30, yoy 0.20, profit 0.20, debt_equity 0.15 and
    net_income 0.15. They sum to 1, so the score stays on the inputs' 1-5 scale.
    Each argument may be a number or a dict carrying the number under
    ``'rating'`` (the shape the net-income helper returns).

    Recommendation bands: >= 4 'strong buy', >= 3.5 'outperform',
    >= 2.5 'hold', >= 1.5 'underperform', otherwise 'sell'.

    Returns:
        tuple: (final_score, recommendation)
    """
    def as_rating(value):
        # Unwrap {'rating': x, ...} result dicts; pass bare numbers through.
        return value["rating"] if isinstance(value, dict) else value

    # Set weights for calculation
    weights = {
        'risk': 0.3,
        'yoy': 0.2,
        'profit': 0.2,
        'debt_equity': 0.15,
        'net_income': 0.15
    }

    # Calculate final score with weights (same summation order as always)
    final_score = (
        as_rating(risk_rating) * weights['risk'] +
        as_rating(yoy_rating) * weights['yoy'] +
        as_rating(profit_rating) * weights['profit'] +
        as_rating(debt_equity_rating) * weights['debt_equity'] +
        as_rating(net_income_rating) * weights['net_income']
    )

    # Compare on a rounded score so float error just below a threshold can't drop a band.
    banded = round(final_score, 9)
    if banded >= 4:
        recommendation = 'strong buy'
    elif banded >= 3.5:
        recommendation = 'outperform'
    elif banded >= 2.5:
        recommendation = 'hold'
    elif banded >= 1.5:
        recommendation = 'underperform'
    else:
        recommendation = 'sell'

    return final_score, recommendation

def main(ticker: str):
    """Development harness: rate *ticker* from SEC facts plus an LLM read of its 10-K.

    Returns ``{'rating': <weighted score>, 'recommendation': <label>}``, or None
    when no CIK can be found for *ticker*.
    """
    CIK = ticker_to_cik(ticker)
    if CIK is None:
        print(f"Could not find CIK for ticker {ticker}. Exiting.")
        return
    print(f"Using CIK {CIK} for ticker {ticker}")

    def as_rating(value):
        # Some calc_* helpers return {'rating': ..., ...} instead of a bare number
        # (the net-income check returns {'net_income', 'rating', 'description'}).
        return value["rating"] if isinstance(value, dict) else value

    facts = get_recent_facts(CIK)
    yoy_rating = as_rating(calc_yoy_rev(facts))
    profit_rating = as_rating(calc_profit(facts))
    debt_equity_rating = as_rating(calc_debt_to_equity(facts))
    net_income_rating = as_rating(calc_positive_netincome(facts))

    risk_text, mda_text = get_risks_mna(CIK)
    raw_reply = (prompt(risk_text, mda_text) or "").strip()
    # Chat models often wrap JSON in ``` fences (optionally tagged "json"); strip them first.
    if raw_reply.startswith("```"):
        raw_reply = raw_reply.strip("`").strip()
        if raw_reply.lower().startswith("json"):
            raw_reply = raw_reply[4:].strip()
    risk_dict = json.loads(raw_reply)

    risk_rating = risk_dict['rating']

    final_score, recommendation = final_rating(
        risk_rating,
        yoy_rating,
        profit_rating,
        debt_equity_rating,
        net_income_rating
    )
    print(f"rating {final_score}, recommendation: {recommendation} rationale: {risk_dict.get('rationale')}")
    return {'rating': final_score, 'recommendation': recommendation}

if __name__ == "__main__":
    # Development smoke test of the SEC-based rating pipeline.
    main(ticker="AAPL")

## /src/researchers/YahooFinanceCrew.py

In [None]:
# YahooFinanceAgent v2 
import json
import time
from datetime import datetime, timedelta
from dateutil import parser as date_parser

import numpy as np
import pandas as pd
import yfinance as yf

# optional technical indicators library
try:
    from ta.momentum import RSIIndicator
except Exception:
    RSIIndicator = None

# Simple in-memory cache for fetched data.
# Maps key -> {"ts": <time.time() when stored>, "value": <cached object>}.
_YF_CACHE = {}

def _cache_get(key, ttl=300):
    """Return the value cached under *key* if it is younger than *ttl* seconds, else None.

    Expired entries are evicted on lookup so stale objects are not retained.
    A cached value of None is indistinguishable from a miss.
    """
    entry = _YF_CACHE.get(key)
    if entry is None:
        return None
    if time.time() - entry["ts"] < ttl:
        return entry["value"]
    # Stale: drop it so the dict stops holding the old object.
    _YF_CACHE.pop(key, None)
    return None

# set cache value
def _cache_set(key, value):
    """Store *value* under *key*, stamped with the current time."""
    _YF_CACHE[key] = {"ts": time.time(), "value": value}

# Yahoo Finance Analysis Agent
class YahooFinanceAgent:
    """Heuristic equity analyst built on yfinance price history and ``Ticker`` data.

    ``analyze(symbol)`` returns a payload dict with keys ``symbol``, ``rating``
    (int 1-5), ``confidence`` (float 0-1), ``timestamp`` (UTC, ``...Z`` suffix),
    ``source`` and ``context``. A failed price download or unusable price data
    degrades to a neutral rating of 3 with an ``error`` entry in ``context``.
    """

    SOURCE = "YahooFinanceAgent"

    # Initialize agent
    def __init__(self, session_name=None):
        self.session_name = session_name or "default"

    # Fetch price history from Yahoo Finance
    def _fetch_price_history(self, symbol, period_days=365):
        """Download roughly ``period_days`` (+7) of auto-adjusted daily bars for *symbol*.

        Results are memoised via ``_cache_get``/``_cache_set`` under a
        ``prices::<symbol>::<period_days>`` key. The returned DataFrame has a
        ``date`` column plus yfinance's price columns, which may be a MultiIndex
        such as ``('Close', 'AAPL')``.

        Raises:
            ValueError: if yfinance returns no rows.
        """
        key = f"prices::{symbol}::{period_days}"
        cached = _cache_get(key)
        if cached is not None:
            return cached
        today = datetime.now().date()
        start = today - timedelta(days=period_days + 7)
        # yfinance treats ``end`` as exclusive; ask for tomorrow so today's bar is included.
        end = today + timedelta(days=1)
        # set auto_adjust explicitly to avoid FutureWarning
        df = yf.download(symbol, start=start.isoformat(), end=end.isoformat(),
                         progress=False, threads=False, auto_adjust=True)
        if df is None or df.empty:
            raise ValueError(f"No price data fetched for {symbol}")
        df = df.reset_index().rename(columns={"Date": "date"})
        df["date"] = pd.to_datetime(df["date"])
        _cache_set(key, df)
        return df

    # Fetch ticker object from yfinance
    def _fetch_ticker(self, symbol):
        """Return a (cached) ``yf.Ticker`` for *symbol*."""
        key = f"ticker::{symbol}"
        cached = _cache_get(key)
        if cached is not None:
            return cached
        t = yf.Ticker(symbol)
        _cache_set(key, t)
        return t

    # Safe attribute getter
    def _safe_get(self, obj, attr, default=None):
        """``getattr`` that returns *default* for a None object or when attribute access raises."""
        try:
            return getattr(obj, attr, default) if obj is not None else default
        except Exception:
            return default

    # Extract a clean close-price series from a price DataFrame
    def _close_series(self, price_df):
        """Return closing prices as a float Series indexed by date, with NaNs removed.

        Accepts single-level columns (``Close`` or a case-variant of ``close``)
        and MultiIndex columns whose first level is ``Close`` (e.g. ``('Close', 'AAPL')``).

        Raises:
            ValueError: if no close column exists or no numeric close remains.
        """
        df = price_df.copy().set_index("date").sort_index()
        cols = df.columns
        if isinstance(cols, pd.MultiIndex):
            # first level compared case-sensitively
            close_cols = [c for c in cols if c[0] == "Close"]
            if not close_cols:
                raise ValueError("Price DataFrame missing 'Close' column (multiindex)")
            close = df[close_cols[0]]
        elif "Close" in cols:
            close = df["Close"]
        else:
            low = [c for c in cols if str(c).lower() == "close"]
            if not low:
                raise ValueError("Price DataFrame missing 'Close' column")
            close = df[low[0]]
        if isinstance(close, pd.DataFrame):
            # duplicated column labels select a frame; use its first column
            close = close.iloc[:, 0]
        close = pd.to_numeric(close, errors="coerce").dropna()
        if close.empty:
            raise ValueError("Empty close series after cleaning")
        return close.astype(float)

    # Compute technical indicators from price DataFrame
    def _compute_indicators(self, price_df):
        """Compute price-based indicators from a frame shaped like ``_fetch_price_history`` output.

        Returns a dict containing:
          - ``latest_close``.
          - ``first_date``/``last_date``: ISO strings of the first/last bar with a valid close.
          - ``7d``/``30d``/``90d_return``: measured in bars (trading days).
          - ``1y_return``: vs. the first bar; needs >= 252 bars.
          - ``sma_20``/``sma_50``/``sma_200``: min_periods=1, so short histories average what exists.
          - ``price_vs_sma20``: +1/-1.
          - ``volatility_30d``: std of daily returns over 21 bars.
          - ``max_drawdown``.
          - ``rsi_14``: None when it cannot be computed.

        Returns are fractions (0.05 == +5%).

        Raises:
            ValueError: if the frame has no usable close prices.
        """
        close = self._close_series(price_df)
        close_vals = close.to_numpy()
        n = len(close_vals)

        out = {"latest_close": float(close_vals[-1])}
        try:
            out["first_date"] = close.index[0].isoformat()
            out["last_date"] = close.index[-1].isoformat()
        except Exception:
            out["first_date"] = None
            out["last_date"] = None

        def pct_by_indices(latest_idx, prior_idx):
            # Simple return between two bar positions; None if out of range or prior is 0.
            if prior_idx < 0 or latest_idx < 0 or prior_idx >= n or latest_idx >= n:
                return None
            prior = close_vals[prior_idx]
            if prior == 0:
                return None
            return float((close_vals[latest_idx] / prior) - 1)

        # Calculate returns over various periods
        out["7d_return"] = pct_by_indices(n - 1, n - 8) if n >= 8 else None
        out["30d_return"] = pct_by_indices(n - 1, n - 31) if n >= 31 else None
        out["90d_return"] = pct_by_indices(n - 1, n - 91) if n >= 91 else None
        out["1y_return"] = pct_by_indices(n - 1, 0) if n >= 252 else None

        # Positional (RangeIndex) series for rolling ops
        close_series_clean = pd.Series(close_vals)

        # Simple Moving Averages and related indicators
        out["sma_20"] = float(close_series_clean.rolling(window=20, min_periods=1).mean().iat[-1])
        out["sma_50"] = float(close_series_clean.rolling(window=50, min_periods=1).mean().iat[-1])
        out["sma_200"] = float(close_series_clean.rolling(window=200, min_periods=1).mean().iat[-1])
        out["price_vs_sma20"] = 1 if out["latest_close"] > out["sma_20"] else -1
        out["volatility_30d"] = float(close_series_clean.pct_change().rolling(window=21, min_periods=1).std().iat[-1])

        # max drawdown (most negative drop from a running peak)
        roll_max = close_series_clean.cummax()
        drawdown = (close_series_clean - roll_max) / roll_max
        out["max_drawdown"] = float(drawdown.min())

        # RSI 14 (safe fallback if ta not installed)
        try:
            if RSIIndicator is not None:
                rsi = RSIIndicator(close_series_clean, window=14)
                out["rsi_14"] = float(rsi.rsi().iat[-1])
            else:
                # Fallback uses simple 14-bar means of gains/losses; ta's RSIIndicator
                # presumably uses Wilder smoothing, so values may differ slightly.
                delta = close_series_clean.diff().dropna()
                gains = delta.clip(lower=0).rolling(14).mean()
                losses = (-delta).clip(lower=0).rolling(14).mean()
                # No losses -> rs = inf -> RSI 100; no movement at all -> 0/0 = NaN -> None
                rs = gains / losses
                last_rs = rs.iat[-1] if len(rs) > 0 else np.nan
                out["rsi_14"] = float(100 - (100 / (1 + last_rs))) if not np.isnan(last_rs) else None
        except Exception:
            out["rsi_14"] = None

        return out

    # Fetch fundamental data from ticker object
    def _fetch_fundamentals(self, ticker_obj):
        """Read a few valuation fields from ``ticker_obj.info``; missing fields come back as None."""
        out = {}
        try:
            info = ticker_obj.info or {}
        except Exception:
            info = {}
        # common fields, may be missing
        out["market_cap"] = info.get("marketCap")
        out["trailing_pe"] = info.get("trailingPE")
        out["forward_pe"] = info.get("forwardPE")
        out["peg_ratio"] = info.get("pegRatio")
        out["beta"] = info.get("beta")
        return out

    @staticmethod
    def _coerce_dates(values):
        """Convert date-like values (str, datetime, Timestamp, date) to ``datetime.date``; skip junk."""
        dates = []
        for value in values:
            try:
                ts = pd.to_datetime(value, errors="coerce")
            except Exception:
                continue
            if isinstance(ts, pd.Timestamp) and not pd.isna(ts):
                dates.append(ts.date())
        return dates

    def _dates_from_source(self, source):
        """Extract earnings dates from one yfinance payload.

        Handles a DataFrame/Series indexed by event datetimes (``get_earnings_dates`` /
        ``earnings_dates``), an older calendar DataFrame with an ``Earnings Date`` row
        or column, a dict calendar with an ``Earnings Date`` entry, and a list of
        dicts keyed by ``startdatetime``/``Earnings Date``/``date``.
        """
        if source is None:
            return []
        if isinstance(source, (pd.DataFrame, pd.Series)):
            if isinstance(source.index, pd.DatetimeIndex):
                return self._coerce_dates(source.index)
            if isinstance(source, pd.DataFrame):
                if "Earnings Date" in source.index:
                    return self._coerce_dates(np.ravel(source.loc["Earnings Date"].values))
                if "Earnings Date" in source.columns:
                    return self._coerce_dates(source["Earnings Date"])
            return []
        if isinstance(source, dict):
            value = source.get("Earnings Date")
            if value is None:
                return []
            return self._coerce_dates(value if isinstance(value, (list, tuple)) else [value])
        if isinstance(source, (list, tuple)):
            raw = []
            for item in source:
                if isinstance(item, dict):
                    d = item.get("startdatetime") or item.get("Earnings Date") or item.get("date")
                    if d is not None:
                        raw.append(d)
            return self._coerce_dates(raw)
        return []

    def _last_earnings_date(self, ticker_obj, as_of):
        """Return the most recent earnings date on or before *as_of*, or None.

        Sources are queried lazily in order (``get_earnings_dates``, the
        ``earnings_dates`` property, ``calendar``), since each accessor may hit
        the network; the first one yielding a past date wins.
        """
        accessors = (
            lambda: ticker_obj.get_earnings_dates(limit=12),
            lambda: ticker_obj.earnings_dates,
            lambda: ticker_obj.calendar,
        )
        for fetch in accessors:
            try:
                source = fetch()
            except Exception:
                # best-effort: a missing accessor or failed request just means "try the next one"
                continue
            past = [d for d in self._dates_from_source(source) if d <= as_of]
            if past:
                return max(past)
        return None

    @staticmethod
    def _window_return(window):
        """Return last/first - 1 over *window*; None with < 2 points or a zero starting price."""
        if len(window) < 2:
            return None
        first = float(window.iloc[0])
        if first == 0:
            return None
        return float(window.iloc[-1]) / first - 1

    # Compute returns around last earnings event
    def _earnings_event_returns(self, ticker_obj, price_df):
        """Price reaction around the most recent past earnings date.

        ``pre7_return`` spans the closes from 10 to 1 calendar days before the
        event, and ``post7_return`` those from 1 to 10 days after it (roughly 7
        trading days each). Values are None when unavailable. "Past" is judged
        against the last date in *price_df* (today if it has no usable closes).
        """
        try:
            close = self._close_series(price_df)
        except Exception:
            close = None
        as_of = close.index[-1].date() if close is not None else datetime.now().date()

        event_date = self._last_earnings_date(ticker_obj, as_of)
        if event_date is None:
            return {"last_earnings_date": None, "pre7_return": None, "post7_return": None}

        result = {"last_earnings_date": event_date.isoformat(), "pre7_return": None, "post7_return": None}
        if close is None:
            return result
        try:
            day = close.index.date
            pre = close[(day >= event_date - timedelta(days=10)) & (day <= event_date - timedelta(days=1))]
            post = close[(day >= event_date + timedelta(days=1)) & (day <= event_date + timedelta(days=10))]
            result["pre7_return"] = self._window_return(pre)
            result["post7_return"] = self._window_return(post)
        except Exception:
            # best-effort: keep the event date even if the window returns can't be computed
            pass
        return result

    # Score and confidence calculation
    def _score_and_confidence(self, indicators, fundamentals, earnings_event):
        """Turn indicators, fundamentals and earnings reaction into ``(rating, confidence, evidence)``.

        Starts at a neutral score of 3.0 and confidence 0.5 and applies additive
        rule adjustments. The score is clamped to [1, 5] and rounded to an int
        rating; confidence is clamped to [0, 1]. ``evidence`` names the rules
        that fired. P/E and PEG count only when they are positive numbers.
        """
        def as_number(value):
            # Real numbers only; bools, strings and other payloads -> None, NaN -> None.
            if isinstance(value, bool) or not isinstance(value, (int, float, np.integer, np.floating)):
                return None
            value = float(value)
            return None if np.isnan(value) else value

        # Base score 3 neutral
        score = 3.0
        confidence = 0.5
        evidence = []

        # Momentum: 30d and 90d
        r30 = indicators.get("30d_return")
        r90 = indicators.get("90d_return")
        if r30 is not None:
            if r30 > 0.08:
                score += 0.8; confidence += 0.08; evidence.append("strong_30d_momentum")
            elif r30 > 0.02:
                score += 0.35; confidence += 0.04; evidence.append("mild_30d_momentum")
            elif r30 < -0.08:
                score -= 0.9; confidence += 0.07; evidence.append("strong_30d_down")
            elif r30 < -0.02:
                score -= 0.35; confidence += 0.03; evidence.append("mild_30d_down")
        if r90 is not None and r90 > 0.20:
            score += 0.4; confidence += 0.03; evidence.append("90d_strong_up")

        # SMA position
        if indicators.get("price_vs_sma20") == 1:
            score += 0.25; confidence += 0.03; evidence.append("above_sma20")
        else:
            score -= 0.15; confidence += 0.02; evidence.append("below_sma20")

        # Fundamentals: a non-positive P/E (losses) or PEG is not evidence of cheapness
        pe = as_number(fundamentals.get("trailing_pe"))
        peg = as_number(fundamentals.get("peg_ratio"))
        if pe is not None and pe > 0:
            if pe < 10:
                score += 0.4; confidence += 0.03; evidence.append("cheap_pe")
            elif pe > 60:
                score -= 0.5; confidence += 0.03; evidence.append("high_pe")
        if peg is not None and 0 < peg < 1:
            score += 0.25; confidence += 0.02; evidence.append("low_peg")

        # Earnings event behavior
        post = earnings_event.get("post7_return")
        if post is not None:
            if post > 0.05:
                score += 0.4; confidence += 0.04; evidence.append("earnings_post_positive")
            elif post < -0.05:
                score -= 0.6; confidence += 0.05; evidence.append("earnings_post_negative")

        # Volatility impact on confidence
        vol = indicators.get("volatility_30d") or 0.0
        if vol > 0.06:
            confidence -= 0.12; evidence.append("high_volatility")
        elif vol < 0.02:
            confidence += 0.04; evidence.append("low_volatility")

        # RSI extreme adjustments
        rsi = indicators.get("rsi_14")
        if rsi is not None:
            if rsi > 75:
                score -= 0.25; evidence.append("rsi_overbought")
            elif rsi < 25:
                score += 0.25; evidence.append("rsi_oversold")

        # Data availability boosts confidence
        if fundamentals.get("market_cap"):
            confidence += 0.03
        if indicators.get("1y_return") is not None:
            confidence += 0.02

        # clamp and convert
        confidence = max(0.0, min(1.0, confidence))
        score = max(1.0, min(5.0, score))
        rating = int(round(score))
        rating = max(1, min(5, rating))
        return rating, float(confidence), evidence

    def analyze(self, symbol, period_days=365):
        """Run the full Yahoo Finance analysis for *symbol* and return the payload dict.

        If the price download fails or the prices are unusable, the payload
        carries rating 3, confidence 0.12 and ``context["error"]`` describing
        the failure.
        """
        # Local import: this cell's top-level datetime import only brings datetime/timedelta.
        from datetime import timezone

        start_ts = datetime.now(timezone.utc)
        # Same "<naive ISO>Z" shape as before, but now the clock really is UTC.
        timestamp = start_ts.replace(tzinfo=None).isoformat() + "Z"

        def neutral_payload(error):
            # Neutral, low-confidence result when no analysis could be produced.
            return {
                "symbol": symbol,
                "rating": 3,
                "confidence": 0.12,
                "timestamp": timestamp,
                "source": self.SOURCE,
                "context": {"error": error},
            }

        try:
            prices = self._fetch_price_history(symbol, period_days)
        except Exception as e:
            return neutral_payload(f"price_fetch_failed: {str(e)}")

        # Gather data
        ticker = self._fetch_ticker(symbol)
        try:
            indicators = self._compute_indicators(prices)
        except ValueError as e:
            return neutral_payload(f"indicator_compute_failed: {e}")
        fundamentals = self._fetch_fundamentals(ticker)
        earnings_event = self._earnings_event_returns(ticker, prices)
        rating, confidence, evidence = self._score_and_confidence(indicators, fundamentals, earnings_event)

        # Create rationale
        rationale = []
        if evidence:
            rationale.append(" ; ".join(evidence))
        if fundamentals.get("trailing_pe") is not None:
            rationale.append(f"pe={fundamentals.get('trailing_pe')}")
        if earnings_event.get("post7_return") is not None:
            rationale.append(f"earn_post7={earnings_event.get('post7_return'):.3f}")

        # Create context for payload
        context = {
            "key_indicators": indicators,
            "fundamentals": fundamentals,
            "earnings_event": earnings_event,
            "rationale": " | ".join(rationale) if rationale else None,
            "fetch_seconds": round((datetime.now(timezone.utc) - start_ts).total_seconds(), 2),
            "data_source": "yfinance"
        }

        # Create payload
        payload = {
            "symbol": symbol,
            "rating": rating,
            "confidence": confidence,
            "timestamp": timestamp,
            "source": self.SOURCE,
            "context": context
        }
        return payload

# run Yahoo Finance Analysis as an Agent
def run_yahoo_finance_agent(inputs: dict) -> dict:
    """Router entry point: analyze ``inputs["ticker"]`` with a fresh YahooFinanceAgent.

    Returns the agent's payload dict (see ``YahooFinanceAgent.analyze``).
    """
    return YahooFinanceAgent().analyze(inputs.get("ticker"))

if __name__ == "__main__":
    # Manual smoke test: analyze AAPL and pretty-print the resulting payload.
    test_inputs = dict(ticker="AAPL")
    output = run_yahoo_finance_agent(test_inputs)
    print(json.dumps(output, indent=2))

## /src/researchers/FREDresearcher.py

In [None]:
from __future__ import annotations
from openai import OpenAI
from dotenv import load_dotenv
import os
import inspect
import json
import logging
from typing import Any, Callable, Dict, Optional, TypedDict

# CrewAI tool base
try:
    from pydantic import BaseModel, Field
    from crewai.tools import BaseTool
except Exception as e:  # pragma: no cover
    raise ImportError(
        "CrewAI and Pydantic are required. Install with: pip install crewai pydantic"
    ) from e

# Import FRED tools
try:
    from tools import fred_tools as fred_agent
except ImportError:
    from researchers.tools import fred_tools as fred_agent

# Setup logging
# Module-level logger; nothing else in this cell currently logs through it.
log = logging.getLogger(__name__)

# load OpenAI client
def _load_openai_client() -> OpenAI:
    """Load ``.env``, read OPENAI_API_KEY and build an OpenAI client from it.

    Raises:
        RuntimeError: if OPENAI_API_KEY is unset or empty.
    """
    load_dotenv()
    key = os.getenv("OPENAI_API_KEY")
    if key:
        return OpenAI(api_key=key)
    raise RuntimeError("OPENAI_API_KEY environment variable is not set.")

# Define FRED State TypedDict
# Keys (all optional because total=False): query, context, result.
FREDState = TypedDict(
    "FREDState",
    {"query": str, "context": Dict[str, Any], "result": Any},
    total=False,
)

# Define FREDAgentAdapter
class FREDAgentAdapter:
    """Adapter exposing a FRED tools module's ``main(ticker)`` through ``run(query)``.

    Construction also builds ``self.client`` via ``_load_openai_client``, so it
    raises RuntimeError when OPENAI_API_KEY is not configured.
    """

    def __init__(self, module=fred_agent):
        self._module = module
        self._entrypoint = module.main
        self.client = _load_openai_client()

    def run(self, query: str, context=None):
        """Call the wrapped entry point with *query* as the ticker; *context* is accepted and ignored."""
        entrypoint = self._entrypoint
        return entrypoint(query)

# Create CrewAI FRED Agent
def create_crewai_fred_agent(agent_class):
    """
    Create a CrewAI agent for FRED economic analysis.

    Args:
        agent_class: Agent class to instantiate (presumably ``crewai.Agent``; it
            must accept the name/role/goal/backstory/allow_delegation/tools
            keyword arguments passed below).

    Returns:
        An ``agent_class`` instance whose only tool, ``fred_analysis``, forwards
        its query to ``FREDAgentAdapter.run``, i.e. to the FRED tools module's
        ``main``.

    Raises:
        RuntimeError: from ``FREDAgentAdapter()`` when OPENAI_API_KEY is unset.
    """
    # One adapter per created agent; the tool class below closes over it.
    agent = FREDAgentAdapter()
    
    class FREDAnalysisTool(BaseTool):
        # Tool metadata exposed to CrewAI
        name: str = "fred_analysis"
        description: str = "Analyzes economic indicators from FRED to assess their impact on a given stock ticker"

        def _run(self, query: str) -> Dict[str, Any]:
            # FREDAgentAdapter.run treats the query as a ticker symbol.
            return agent.run(query)

        async def _arun(self, query: str) -> Dict[str, Any]:
            # NOTE(review): delegates to the synchronous _run, so this blocks the event loop.
            return self._run(query)

    # Create the agent instance
    return agent_class(
        name="FRED Economic Analyst",
        role="Economic Research Specialist",
        goal="Analyze macroeconomic indicators from FRED to assess their impact on specific stocks",
        backstory="""You are an expert economic analyst with deep knowledge of how 
        macroeconomic indicators affect stock performance. You use FRED data to provide 
        insights about the economic environment's impact on specific companies.""",
        allow_delegation=False,
        tools=[FREDAnalysisTool()]
    )


## /src/researchers/tools/fred_tools.py

In [None]:
"""
fred_tools.py - Core FRED analysis functionality
"""

import os
from datetime import datetime, timedelta
import pandas as pd
from fredapi import Fred
import openai
from typing import Dict, Any, Optional

def get_fred_data(ticker: str, api_key: Optional[str] = None) -> Dict[str, Any]:
    """
    Fetch recent FRED macro indicators and have them analyzed for the given ticker.

    Pulls the last 365 days of CPIAUCSL, UNRATE, FEDFUNDS, GDP and INDPRO. For
    each series it records the latest and previous valid observations and the
    percent change between them, then delegates to ``analyze_economic_data``.

    Args:
        ticker (str): The stock symbol to analyze
        api_key (str, optional): FRED API key. If not provided, read from FRED_API_KEY.

    Returns:
        dict: ``rating``/``analysis``/``details``. Any error while fetching or
        analyzing yields a neutral rating of 3 with the error under ``details``.

    Raises:
        ValueError: if no FRED API key is available.
    """
    # Initialize FRED API
    fred_key = api_key or os.getenv('FRED_API_KEY')
    if not fred_key:
        raise ValueError("FRED API key must be provided or set in FRED_API_KEY environment variable")

    fred = Fred(api_key=fred_key)

    # Core economic indicators
    indicators = {
        'CPIAUCSL': 'Consumer Price Index',
        'UNRATE': 'Unemployment Rate',
        'FEDFUNDS': 'Federal Funds Rate',
        'GDP': 'Gross Domestic Product',
        'INDPRO': 'Industrial Production Index'
    }

    end_date = datetime.now()
    start_date = end_date - timedelta(days=365)

    economic_data = {}

    try:
        # Fetch economic indicators
        for series_id, description in indicators.items():
            series = fred.get_series(
                series_id,
                observation_start=start_date.strftime('%Y-%m-%d'),
                observation_end=end_date.strftime('%Y-%m-%d')
            )
            # Missing observations may come back as NaN; ignore them so "latest" is a real value.
            series = series.dropna()
            if series.empty:
                continue

            latest_value = float(series.iloc[-1])
            previous_value = float(series.iloc[-2]) if len(series) > 1 else None
            # A None or 0 previous value leaves the percent change undefined.
            pct_change = ((latest_value - previous_value) / previous_value * 100) if previous_value else None

            economic_data[series_id] = {
                'description': description,
                'latest_value': latest_value,
                'previous_value': previous_value,
                'pct_change': pct_change,
                'last_updated': series.index[-1].strftime('%Y-%m-%d')
            }

        # Generate analysis using OpenAI
        return analyze_economic_data(economic_data, ticker)

    except Exception as e:
        return {
            "rating": 3,  # Neutral rating on error
            "analysis": f"Error fetching FRED data: {str(e)}",
            "details": {
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }
        }

def analyze_economic_data(economic_data: Dict[str, Any], ticker: str) -> Dict[str, Any]:
    """Ask an OpenAI chat model to rate how *economic_data* affects *ticker*.

    Args:
        economic_data: Mapping of FRED series id -> dict with ``description``,
            ``latest_value``, ``pct_change`` (may be None) and ``last_updated``,
            as built by ``get_fred_data``.
        ticker: Stock symbol named in the prompt.

    Returns:
        dict with ``rating`` (int 1-5), ``analysis`` (str) and ``details``.
        A reply without RATING/ANALYSIS lines, or any API error, yields rating 3
        with the error recorded under ``details``.

    Raises:
        ValueError: if OPENAI_API_KEY is not set.
    """
    import re  # local: re is not among this module's top-level imports

    openai_key = os.getenv('OPENAI_API_KEY')
    if not openai_key:
        raise ValueError("OpenAI API key must be set in OPENAI_API_KEY environment variable")

    # Prepare economic data for the prompt; pct_change is None when it is undefined.
    data_points = []
    for data in economic_data.values():
        latest = data.get('latest_value')
        change = data.get('pct_change')
        latest_txt = f"{latest:.2f}" if latest is not None else "n/a"
        change_txt = f"{change:.2f}%" if change is not None else "n/a"
        data_points.append(
            f"{data['description']}: Current value: {latest_txt}, "
            f"Change: {change_txt} (as of {data['last_updated']})"
        )

    economic_context = "\n".join(data_points)

    prompt = f"""As an economic analyst, analyze how these core economic indicators might impact {ticker} stock:

    Economic Indicators:
    {economic_context}

    Provide two things:
    1. A rating from 1-5 (where 1 is very unfavorable and 5 is very favorable) based on how the current economic environment affects {ticker}'s prospects.

    2. A brief analysis explaining the rating and key economic factors affecting {ticker}.

    Format your response as:
    RATING: [number 1-5]
    ANALYSIS: [your brief explanation]"""

    try:
        openai.api_key = openai_key
        client = openai.OpenAI(api_key=openai_key)
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are an expert economic analyst providing insights based on FRED economic data."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=300,
            temperature=0.7
        )

        response_text = response.choices[0].message.content or ""

        # Labels may be indented or wrapped in markdown (e.g. "**RATING:** 4").
        # The rating must be a single digit 1-5; the analysis is everything after
        # "ANALYSIS:", including later lines and any further colons.
        rating_match = re.search(r"^[\s*#]*RATING\b[^0-9\n]*([1-5])(?!\d)", response_text, re.MULTILINE)
        analysis_match = re.search(r"^[\s*#]*ANALYSIS\b[^:\n]*:(.*)", response_text, re.MULTILINE | re.DOTALL)
        if rating_match is None or analysis_match is None:
            return {
                "rating": 3,
                "analysis": f"Error parsing analysis: {response_text}",
                "details": {
                    "error": "Response did not contain RATING (1-5) and ANALYSIS lines",
                    "timestamp": datetime.now().isoformat()
                }
            }

        return {
            "rating": int(rating_match.group(1)),
            "analysis": analysis_match.group(1).strip().strip("*").strip(),
            "details": {
                "indicators": economic_data,
                "timestamp": datetime.now().isoformat()
            }
        }

    except Exception as e:
        return {
            "rating": 3,
            "analysis": f"Error generating analysis: {str(e)}",
            "details": {
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }
        }

def main(ticker: str) -> Dict[str, Any]:
    """
    Entry point for the FRED analysis (what FREDAgentAdapter.run ends up calling).

    Args:
        ticker (str): Stock ticker to analyze

    Returns:
        dict: ``rating``, ``analysis`` and ``details`` exactly as produced by get_fred_data.
    """
    analysis = get_fred_data(ticker)
    return analysis


## /src/researchers/News_Agent_Crew.py

In [None]:
import os
import json
from dotenv import load_dotenv
from crewai import Agent, Task, Crew, Process, LLM

try:
    from News_Agent import NewsAgent  
except ImportError:
    from researchers.News_Agent import NewsAgent

# Pull API keys (G_API_KEY, NEWS_API_KEY, ...) from a local .env into the environment
load_dotenv()

# Shared Gemini 2.5 Flash model used as the CrewAI agent's reasoning LLM
llm = LLM(model="gemini/gemini-2.5-flash", api_key=os.getenv("G_API_KEY"))

# Define function to call your custom NewsAgent
def run_news_agent(company: str):
    """
    Run the NewsAgent workflow for *company* and return its result as parsed JSON.

    A dict result is returned as-is. A string result is parsed after stripping
    optional ``` fences (with an optional "json" tag). Anything unparseable
    yields ``{"error": "Invalid JSON returned by NewsAgent", "raw_result": <result>}``.
    """
    agent = NewsAgent(
        gemini_api_key=os.getenv("G_API_KEY"),
        news_api_key=os.getenv("NEWS_API_KEY")
    )
    result = agent.run(company)
    if isinstance(result, dict):
        return result

    text = (result if isinstance(result, str) else str(result)).strip()
    # LLM-produced JSON is often fenced like ```json ... ```
    if text.startswith("```"):
        text = text.strip("`").strip()
        if text.lower().startswith("json"):
            text = text[4:].strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"error": "Invalid JSON returned by NewsAgent", "raw_result": result}

# CrewAI agent wrapper for news analysis; reasons with the Gemini `llm` defined above
news_agent_wrapper = Agent(
    llm=llm,
    role="News Analysis Agent",
    goal="Fetch and analyze recent news for a given company and summarize sentiment.",
    backstory="This agent uses NewsAgent to collect and analyze company-related news articles.",
)



# Define CrewAI Task 
# Single task for the news agent. The {company} placeholders are presumably filled
# from Crew.kickoff(inputs={"company": ...}), as done in the test run below.
news_agent_task = Task(
    description="Run NewsAgent workflow for {company} and return analysis result.",
    expected_output=""" 
    Required JSON format:
    {
  "company": "{company}",
  "date": "<ISO8601 timestamp of analysis>",
  "sentiment_summary": {
      "overall_sentiment": "<Positive | Neutral | Negative>",
      "positive_articles": <number>,
      "neutral_articles": <number>,
      "negative_articles": <number>,
      "main_topic": ["theme 1", "theme 2", "..."],
      "good_news": ["point 1", "point 2", "..."],
      "bad_news": ["point 1", "point 2", "..."],
      "neutral_news": ["point 1", "point 2", "..."]
  },
  "feedback": "<short summary evaluating completeness and quality of analysis>"
} 

Rules:
1. Always return valid JSON only — no explanations, markdown, or extra text.
2. Always include all keys exactly as shown.
3. Count the number of articles correctly for each sentiment.
4. Summaries (main_topic, good_news, bad_news, feedback) should be concise and informative.
5. Dates must be in ISO8601 format (YYYY-MM-DDTHH:MM:SS).
""",
    agent=news_agent_wrapper,
    # NOTE(review): confirm crewai.Task actually uses `function`; news_agent_wrapper is
    # created without tools, so nothing visible here guarantees run_news_agent runs.
    function= run_news_agent,
    )


# One-agent, one-task crew executed sequentially
news_agent_crew = Crew(
    process=Process.sequential,
    agents=[news_agent_wrapper],
    tasks=[news_agent_task],
)

# Optional test run: kick off the crew for Microsoft and pretty-print the parsed JSON
if __name__ == "__main__":
    print(" Running NewsAgent CrewAI pipeline...\n")
    result = news_agent_crew.kickoff(inputs={"company": "Microsoft"})
    print("\n Final Result:")
    try:
        if not hasattr(result, "raw"):
            # Not a CrewOutput-like object: print it as-is
            parsed = result
        else:
            # Drop surrounding ``` fences and an optional leading "json" tag, then parse
            clean_output = result.raw.strip('`').strip()
            if clean_output.startswith("json"):
                clean_output = clean_output[4:].strip()
            parsed = json.loads(clean_output)
    except json.JSONDecodeError:
        parsed = {"error": "Invalid JSON returned by NewsAgent", "raw_result": getattr(result, "raw", str(result))}

    print(json.dumps(parsed, indent=2))


## /src/researchers/News_Agent.py

In [None]:
import os
import requests
from datetime import datetime, timedelta
import google.generativeai as genai
from dotenv import load_dotenv
import json
load_dotenv() 

class NewsAgent:
    """Research agent that gathers company headlines and summarizes sentiment.

    The workflow (see ``plan_steps``) fetches headlines from NewsAPI, or a
    canned placeholder list when no NewsAPI key is configured. It then asks a
    Gemini model to classify their sentiment, has the model critique that
    analysis, and optionally refines it with a larger batch of headlines.
    Each result is cached per company name in ``self.memory`` and reused as
    the starting point on later runs.
    """

    # NewsAPI "everything" endpoint queried by fetch_news.
    NEWS_API_URL = "https://newsapi.org/v2/everything"
    # Seconds to wait for NewsAPI; without a timeout a stalled connection
    # would block the whole agent indefinitely.
    REQUEST_TIMEOUT = 15
    # Number of headlines fetched *and* analyzed by the refinement pass.
    REFINE_ARTICLE_COUNT = 15

    def __init__(self, gemini_api_key, news_api_key=None):
        """Initialize agent with memory and APIs.

        Args:
            gemini_api_key: Key passed to ``genai.configure`` for the model.
            news_api_key: NewsAPI key; when falsy, ``fetch_news`` returns
                placeholder headlines instead of calling the API.
        """
        self.news_api_key = news_api_key
        genai.configure(api_key=gemini_api_key)
        self.model = genai.GenerativeModel("gemini-2.5-flash")
        self.memory = {}  # company name -> result dict from the last run()

    # Planning: decide workflow steps
    def plan_steps(self, company):
        """Return the ordered step names executed by ``run``.

        ``company`` is currently unused; every company gets the same plan.
        """
        return ["fetch_news", "analyze_sentiment", "self_reflect", "iterate_if_needed"]

    # Tool: fetch news
    def fetch_news(self, company, max_articles=10):
        """Fetch recent news for the specified company, filtering only relevant articles.

        Returns:
            A list of at most ``max_articles`` strings formatted as
            ``"Title (Source)"``. Failures and empty results are reported
            in-band as a one-element list holding a message, so callers always
            receive a list of strings.
        """
        if not self.news_api_key:
            # Dummy fallback if no API key provided
            return [
                f"{company} quarterly earnings positive",
                f"{company} launches new product",
                f"{company} market analysis favorable",
                f"{company} competitor developments",
                f"{company} regulatory updates"
            ][:max_articles]

        company_name = company.lower()
        # Let requests URL-encode the query: names containing "&", "#" or
        # spaces would otherwise corrupt a hand-built query string.
        params = {
            "q": company_name,
            "sortBy": "publishedAt",
            "language": "en",
            # Over-fetch so enough articles survive the relevance filter.
            "pageSize": max_articles * 2,
        }
        # Send the key as a header rather than in the URL: request exceptions
        # include the URL in their message, and that message is returned
        # below as an "article" (then printed, sent to the LLM and stored).
        headers = {"X-Api-Key": self.news_api_key}
        try:
            response = requests.get(
                self.NEWS_API_URL,
                params=params,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            data = response.json()
            if data.get("status") != "ok":
                return [f"NewsAPI error: {data.get('message', 'Unknown error')}"]

            articles = data.get("articles") or []
            relevant = self._format_relevant_articles(articles, company_name, max_articles)
            if not relevant:
                return [f"No recent news articles found for {company}"]
            return relevant

        except Exception as e:
            # Best-effort tool boundary: report the failure as text so the
            # rest of the workflow can still run.
            return [f"Error fetching news: {str(e)}"]

    @staticmethod
    def _format_relevant_articles(articles, company_name, max_articles):
        """Format up to ``max_articles`` articles mentioning ``company_name``.

        ``company_name`` must already be lower-case. NewsAPI can return
        explicit ``null`` for ``title``, ``description`` or ``source``, and
        ``dict.get`` defaults only cover *missing* keys, so every field falls
        back with ``or``.
        """
        relevant = [
            article for article in articles
            if company_name in (article.get("title") or "").lower()
            or company_name in (article.get("description") or "").lower()
        ]
        formatted = []
        for article in relevant[:max_articles]:
            title = article.get("title") or "No title"
            source = (article.get("source") or {}).get("name") or "Unknown source"
            formatted.append(f"{title} ({source})")
        return formatted

    #  Analyze sentiment and summarize
    def analyze_sentiment(self, news_articles, max_articles=10):
        """Ask the model to classify and summarize the first ``max_articles`` headlines.

        Returns the model's free-text response.
        """
        text = "\n".join(news_articles[:max_articles])
        prompt = f"""
You are a financial news sentiment analyzer.
1. Classify each article as Positive, Negative, or Neutral.
2. Extract key risks and opportunities.
3. Summarize overall sentiment for the company.

Articles:
{text}
"""
        response = self.model.generate_content(prompt)
        return response.text

    # Reanalyze only new or uncovered information
    def reanalyze_sentiment(self, new_articles, previous_summary, max_articles=10):
        """Ask the model for insights in ``new_articles`` not already in ``previous_summary``.

        Only the first ``max_articles`` headlines are sent. Returns the
        model's free-text response.
        """
        text = "\n".join(new_articles[:max_articles])
        prompt = f"""
    You are a financial news sentiment analyzer refining a previous analysis.

    Your goal:
- Identify ONLY truly new insights, opportunities, risks, or sentiment shifts not mentioned before.
- show the article that provides the new insight
- Avoid repeating or restating anything already covered.
- Keep it short, plain, and non-academic (no jargon).

    Previous summary:
    {previous_summary}

    New articles:
    {text}
    """
        response = self.model.generate_content(prompt)
        return response.text

    # Self-reflection: evaluate analysis quality
    def self_reflect(self, company, analysis):
        """Ask the model to critique ``analysis`` for ``company``; return its feedback text."""
        prompt = f"""
You are a financial research agent.
Evaluate the following analysis for {company}. 
- Does it cover key risks and opportunities?
- Is the overall sentiment clear?
- Suggest improvements if needed.

Analysis:
{analysis}
"""
        response = self.model.generate_content(prompt)
        feedback = response.text
        return feedback

    # Iteration: refine based on feedback
    def iterate(self, company, feedback, previous_analysis=None):
        """Refine the analysis when the self-reflection feedback asks for it.

        Refinement is triggered by the substring "improve" (which also matches
        "improvement(s)") anywhere in ``feedback``.

        Returns:
            The refined analysis text, or ``None`` when ``feedback`` is empty
            or does not request improvement.
        """
        if not feedback or "improve" not in feedback.lower():
            return None
        # Fetch a larger batch and analyze all of it; truncating back to the
        # default 10 would discard the extra headlines this pass exists to add.
        count = self.REFINE_ARTICLE_COUNT
        news = self.fetch_news(company, max_articles=count)
        return self.reanalyze_sentiment(
            news, previous_summary=previous_analysis or "", max_articles=count
        )

    # Run the full agent workflow
    def run(self, company):
        """Execute the planned workflow for ``company`` and return a JSON string.

        If ``self.memory`` holds a previous result for the same company name
        (exact string match), its articles are merged with the fresh fetch
        and its refined analysis is reused instead of a new sentiment pass.
        The new result replaces the memory entry.

        Returns:
            ``json.dumps`` of a dict with keys ``agent``, ``company``,
            ``timestamp``, ``articles``, ``sentiment_summary``, ``feedback``
            and ``refined_analysis``.
        """
        print(f" Planning steps for: {company}")
        steps = self.plan_steps(company)

        # Check memory first
        if company in self.memory:
            print(f"Found previous analysis for {company} in memory. Using it as base.")
            previous_result = self.memory[company]
            previous_news = previous_result.get("articles", [])
            previous_analysis = previous_result.get("refined_analysis", None)
        else:
            previous_news = []
            previous_analysis = None

        news = []
        analysis = previous_analysis
        feedback = None
        refined = None

        for step in steps:
            if step == "fetch_news":
                print("→ Fetching news...")
                news = self.fetch_news(company)
                if previous_news:
                    # Re-fetches largely return the same headlines; keep only
                    # the first occurrence so nothing is counted twice.
                    news = list(dict.fromkeys(previous_news + news))
                print(f"  Found {len(news)} articles:")
                for i, article in enumerate(news, 1):
                    print(f"    Article {i}: {article}\n")

            elif step == "analyze_sentiment":
                print("→ Analyzing sentiment...")
                if not analysis:
                    analysis = self.analyze_sentiment(news)
                print("  Sentiment analysis complete. Details:\n")
                print(analysis)
            elif step == "self_reflect":
                print("→ Self-reflection in progress...")
                feedback = self.self_reflect(company, analysis)
                print("  Self-reflection complete. Feedback:\n")
                print(feedback)

            elif step == "iterate_if_needed":
                print("→ Iterating if needed...")
                refined = self.iterate(company, feedback, previous_analysis=analysis)
                if refined:
                    analysis = refined
                    print("  Refined analysis applied:\n")
                    print(refined)

        # Prepare JSON-like output
        result = {
            "agent": "NewsAgent",
            "company": company,
            "timestamp": datetime.now().isoformat(),
            "articles": news,
            "sentiment_summary": analysis,
            "feedback": feedback,
            "refined_analysis": refined or analysis
        }

        # Save to memory
        self.memory[company] = result
        print(f" Workflow complete. Memory updated for {company}.")

        # Return as JSON string (for easy integration later)
        return json.dumps(result, indent=2)


if __name__ == "__main__":
    # Demo run: both API keys are read from environment variables.
    gemini_key = os.getenv("G_API_KEY")
    newsapi_key = os.getenv("API_KEY")
    demo_agent = NewsAgent(gemini_api_key=gemini_key, news_api_key=newsapi_key)
    result_json = demo_agent.run("Microsoft")


Coding assistants such as Gemini and ChatGPT were used to help write the code in this project.