# Capstone: Financial Research Copilot

## Step 0: Setup and Configuration
Importing necessary libraries (yfinance, pandas, google-generativeai) and configuring the Google Gemini client using the API key from `.env`.

In [1]:
import os
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from dotenv import load_dotenv
import google.generativeai as genai
import json
from ddgs import DDGS

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# 1. Load environment variables
load_dotenv()

# 2. Get API Key
api_key = os.getenv("GOOGLE_API_KEY")

if not api_key:
    print("Error: GOOGLE_API_KEY not found in .env file.")
else:
    print("API Key loaded successfully.")
    
    # 3. Configure the client
    try:
        genai.configure(api_key=api_key)
        print("Google GenAI Client configured successfully.")
    except Exception as e:
        print(f"Error configuring client: {e}")

API Key loaded successfully.
Google GenAI Client configured successfully.


## Step 1.1: Price History Tool
Defining `get_asset_price_history` to fetch OHLCV data for Stocks and ETFs using `yfinance`. This tool returns structured JSON data that the agent can analyze.


In [3]:
from typing import Dict, Any

def get_asset_price_history(symbol: str, asset_type: str = "equity", period: str = "1mo") -> Dict[str, Any]:
    """
    Fetch historical OHLCV price data for an asset.
    
    Args:
        symbol (str): Ticker symbol (e.g., 'AAPL', 'BTC-USD').
        asset_type (str): Type of asset ('equity', 'crypto', 'etf'). Defaults to 'equity'.
        period (str): Data period to download (e.g., '1mo', '3mo', '1y'). Defaults to '1mo'.
        
    Returns:
        dict: Structured data containing status and price history.
    """
    try:
        # Handle crypto symbols for yfinance (usually needs -USD suffix if not provided)
        if asset_type.lower() == 'crypto' and not symbol.endswith('-USD'):
            symbol = f"{symbol}-USD"
            
        ticker = yf.Ticker(symbol)
        
        # Fetch history
        hist = ticker.history(period=period)
        
        if hist.empty:
            return {
                "status": "error",
                "message": f"No data found for {symbol}. Check ticker or period."
            }
            
        # Format data for the agent (convert to list of dicts)
        # We limit to the last 60 records to keep context small if period is long
        data_records = []
        for date, row in hist.tail(60).iterrows():
            data_records.append({
                "date": date.strftime('%Y-%m-%d'),
                "close": round(row['Close'], 2),
                "volume": int(row['Volume'])
            })
            
        return {
            "status": "success",
            "symbol": symbol,
            "currency": ticker.info.get('currency', 'USD'),
            "data": data_records
        }

    except Exception as e:
        return {"status": "error", "message": str(e)}

## Step 1.2: Fundamentals Tool
Defining `get_asset_fundamentals` to retrieve key financial metrics (P/E, Market Cap, Sector) using `yfinance`. This helps the agent assess value and risk.


In [4]:
def get_asset_fundamentals(symbol: str, asset_type: str = "equity") -> Dict[str, Any]:
    """
    Fetch fundamental data (P/E, Market Cap, Sector, Summary) for an asset.
    
    Args:
        symbol (str): Ticker symbol (e.g., 'AAPL').
        asset_type (str): 'equity' or 'etf'. (Crypto has limited fundamentals in yf).
        
    Returns:
        dict: Key fundamentals or error message.
    """
    try:
        if asset_type.lower() == 'crypto' and not symbol.endswith('-USD'):
             symbol = f"{symbol}-USD"

        ticker = yf.Ticker(symbol)
        info = ticker.info
        
        # Extract only the most relevant fields to save context tokens
        fundamentals = {
            "symbol": symbol,
            "name": info.get("shortName", "Unknown"),
            "sector": info.get("sector", "Unknown"),
            "industry": info.get("industry", "Unknown"),
            "market_cap": info.get("marketCap", "N/A"),
            "trailing_pe": info.get("trailingPE", "N/A"),
            "forward_pe": info.get("forwardPE", "N/A"),
            "dividend_yield": info.get("dividendYield", "N/A"),
            "fifty_two_week_high": info.get("fiftyTwoWeekHigh", "N/A"),
            "fifty_two_week_low": info.get("fiftyTwoWeekLow", "N/A"),
            "summary": info.get("longBusinessSummary", "No summary available")[:300] + "..." # Truncate summary
        }
        
        return {
            "status": "success",
            "data": fundamentals
        }
        
    except Exception as e:
        return {"status": "error", "message": str(e)}


## Step 1.3: Technical Analysis Tool
Defining `calculate_technical_indicators` to compute Volatility (Standard Deviation) and Moving Averages. This gives the agent quantitative data to assess risk.


In [5]:
def calculate_technical_indicators(symbol: str, period: str = "3mo") -> Dict[str, Any]:
    """
    Calculate technical indicators (Volatility, SMA) for a symbol.
    
    Args:
        symbol (str): Ticker symbol.
        period (str): Data lookback period (default '3mo').
        
    Returns:
        dict: Calculated metrics including annualized volatility and current trend.
    """
    try:
        # We reuse our first tool logic to get the raw dataframe usually, 
        # but calling yfinance directly here is cleaner for the standalone tool.
        ticker = yf.Ticker(symbol)
        hist = ticker.history(period=period)
        
        if hist.empty:
             return {"status": "error", "message": "No data for calculations."}
        
        # 1. Calculate Daily Returns
        hist['Returns'] = hist['Close'].pct_change()
        
        # 2. Annualized Volatility (Standard Deviation of returns * sqrt(252 trading days))
        volatility = hist['Returns'].std() * np.sqrt(252)
        
        # 3. Simple Moving Average (50-day) - requiring enough data
        if len(hist) >= 50:
            sma_50 = hist['Close'].rolling(window=50).mean().iloc[-1]
        else:
            sma_50 = "Not enough data"
            
        current_price = hist['Close'].iloc[-1]
        
        # Determine trend
        trend = "Neutral"
        if isinstance(sma_50, (int, float)):
            if current_price > sma_50:
                trend = "Uptrend (Above 50d SMA)"
            else:
                trend = "Downtrend (Below 50d SMA)"
        
        return {
            "status": "success",
            "symbol": symbol,
            "current_price": round(current_price, 2),
            "annualized_volatility": round(volatility * 100, 2), # as percentage
            "sma_50": round(sma_50, 2) if isinstance(sma_50, float) else sma_50,
            "trend": trend
        }

    except Exception as e:
        return {"status": "error", "message": str(e)}


## Step 1.4: Market Opportunity Scanner Tool
Defining `scan_market_opportunities` to filter a watchlist of major assets for potential "Buys". It looks for:
1. **Value**: Low P/E ratio (< 25).
2. **Momentum**: Strong recent performance.
3. **Dip Buys**: Assets down significantly from their 52-week high.


In [6]:
def search_market_trends(query: str) -> Dict[str, Any]:
    """
    Search the web for financial news, trending stocks, or market analysis.
    Use this to find NEW ticker ideas (e.g., "undervalued AI stocks 2025").
    
    Args:
        query (str): Search query (e.g., "best biotech stocks under $50").
        
    Returns:
        dict: Top 5 search results with titles, snippets, and URLs.
    """
    try:
        results = []
        # specific 'news' backend is often good for markets, but 'text' is broader
        with DDGS() as ddgs:
            # Fetch top 5 results
            ddg_results = list(ddgs.text(query, max_results=5))
            
            if not ddg_results:
                return {"status": "error", "message": "No results found."}
            
            for r in ddg_results:
                results.append({
                    "title": r.get('title'),
                    "snippet": r.get('body'), # DDGS uses 'body' for snippet
                    "link": r.get('href')
                })
                
        return {
            "status": "success",
            "query": query,
            "results": results
        }

    except Exception as e:
        return {"status": "error", "message": str(e)}

## Step 2: Initialize Agent with Tools
We define the `tools` list containing our three functions and configure the Gemini Pro model to use them. We then start a chat session (Agent) that knows about these tools.


In [7]:
# 1. Define the Tool List
# We pass the actual function objects to the GenAI SDK. 
# The SDK automatically inspects the docstrings to understand how to use them.
my_tools = [
    get_asset_price_history,
    get_asset_fundamentals,
    calculate_technical_indicators
]

# 2. Initialize the Model with Tools
# We use 'gemini-1.5-flash' or 'gemini-1.5-pro' for best tool performance.
# 'auto' function calling mode is default, meaning the model decides when to use tools.
model = genai.GenerativeModel(
    model_name='gemini-2.5-flash', # Or 'gemini-1.5-pro' if you have access
    tools=my_tools
)

# 3. Create the Agent (Chat Session)
# enable_automatic_function_calling=True means the SDK handles the loop:
# Model asks for tool -> SDK runs tool -> SDK gives answer back to Model -> Model answers user.
agent = model.start_chat(enable_automatic_function_calling=True)



## Step 3: Defining the Financial Advisor Persona
We re-initialize the agent with a **System Instruction**. This tells the model to act as a professional Investment Advisor, mandates that it *must* use tools before answering, and defines risk rules (e.g., "High volatility > 30% requires a warning").


In [8]:
# Define the System Prompt (Persona & Rules)
advisor_instruction = advisor_instruction = """
You are an expert Financial Research Assistant and Investment Advisor.

YOUR GOAL:
Help the user analyze assets and construct portfolios based on data.

RULES:
1. ALWAYS fetch real data using your tools before answering. Do not guess.
2. If a user asks about a stock, get its Fundamentals AND Technicals.
3. RISK ASSESSMENT:
   - If Annualized Volatility is > 30%, label the asset as "High Risk".
   - If Volatility is < 15%, label it as "Stable/Low Risk".
4. RECOMMENDATIONS: Cite specific metrics to support your view.

FORMATTING REQUIREMENTS:
- You MUST present data in two distinct tables:
  
  **Table 1: Fundamentals**
  | Metric | Value |


  **Table 2: Technicals**
  | Metric | Value |

- Make sure the table is well format in a uniform spaced manner with its borders
- Do not leave tables empty. If data is missing, write "N/A".
"""

# Re-initialize model with the system instruction
advisor_model = genai.GenerativeModel(
    model_name='gemini-2.5-flash',
    tools=my_tools,
    system_instruction=advisor_instruction
)

# Start the Advisor Chat
advisor_agent = advisor_model.start_chat(enable_automatic_function_calling=True)

print("Advisor Agent initialized with System Instructions!")


Advisor Agent initialized with System Instructions!


## Step 4: Portfolio Allocation Tool
Defining `allocate_portfolio` to simulate investment execution. This tool takes a list of assets and a budget, fetches real-time prices, and calculates an equal-weighted allocation (share counts and values).


In [9]:
# --- TOOL 1.4: Portfolio Planner (Draft Only) ---
import yfinance as yf
from typing import Dict, Any, List

def allocate_portfolio(tickers: List[str], total_amount: float, weights: Dict[str, float] = None) -> Dict[str, Any]:
    """
    Create a DRAFT portfolio allocation plan. 
    Does NOT save to memory. (Agent must call update_portfolio_memory for that).
    """
    try:
        if not tickers or total_amount <= 0:
            return {"status": "error", "message": "Invalid tickers or amount."}
        
        # Strategy Logic
        strategy_name = "Custom Risk-Adjusted"
        if not weights:
            strategy_name = "Equal Weight"
            w_val = 1.0 / len(tickers)
            weights = {t: w_val for t in tickers}
            
        # Check weights sum
        total_weight = sum(weights.values())
        if not (0.95 <= total_weight <= 1.05):
             return {"status": "error", "message": f"Weights sum to {total_weight}, must be ~1.0"}

        allocation = []
        total_spent = 0.0
        
        # Execution Loop
        for symbol in tickers:
            # Heuristics
            search_symbol = symbol
            if symbol in ["BTC", "ETH"] or (symbol.isupper() and len(symbol) <= 4 and "USD" not in symbol and symbol not in ["AAPL", "MSFT", "TSLA", "NVDA", "GOOG", "AMZN"]): 
                 pass
            
            # Get Price
            ticker_obj = yf.Ticker(search_symbol)
            try:
                price = ticker_obj.fast_info['last_price']
            except:
                hist = ticker_obj.history(period="1d")
                price = hist['Close'].iloc[-1] if not hist.empty else 0.0
            
            if price > 0:
                asset_budget = total_amount * weights.get(symbol, 0.0)
                shares = round(asset_budget / price, 4)
                cost = round(shares * price, 2)
                
                allocation.append({
                    "symbol": symbol,
                    "price": round(price, 2),
                    "shares": shares,
                    "value": cost,
                    "weight_allocated": f"{round((cost / total_amount) * 100, 1)}%"
                })
                total_spent += cost
                
        remaining_cash = round(total_amount - total_spent, 2)
        
        # Return result purely as data (No saving)
        return {
            "status": "success",
            "strategy": strategy_name,
            "total_investment": total_amount,
            "allocation": allocation,
            "total_spent": round(total_spent, 2),
            "remaining_cash": remaining_cash
        }

    except Exception as e:
        return {"status": "error", "message": str(e)}


## Step 5: Agent Memory
Updating the agent memory to maintain a portfolio of stocks and their prices. The memory is stored in a JSON file and is updated every time the agent makes a decision to buy or sell a stock.


In [10]:
MEMORY_FILE = "user_portfolio.json"

def get_portfolio_state() -> Dict[str, Any]:
    """
    Retrieve portfolio AND calculate current real-time value and profit/loss.
    """
    try:
        if not os.path.exists(MEMORY_FILE):
            return {"status": "empty", "message": "No portfolio found. Start fresh."}
            
        with open(MEMORY_FILE, 'r') as f:
            data = json.load(f)
            
        # --- NEW: Calculate Live Value ---
        holdings = data.get('holdings', {})
        cash = data.get('cash', 0.0)
        total_invested_cost = data.get('total_value', 0.0) # This tracks original cost basis
        
        current_holdings_value = 0.0
        asset_details = []
        
        for symbol, shares in holdings.items():
            # Fetch live price
            try:
                ticker = yf.Ticker(symbol)
                # fast_info is most efficient
                price = ticker.fast_info['last_price']
            except:
                price = 0.0
                
            val = shares * price
            current_holdings_value += val
            
            asset_details.append({
                "symbol": symbol,
                "shares": shares,
                "current_price": round(price, 2),
                "current_value": round(val, 2)
            })
            
        total_portfolio_value = current_holdings_value + cash
        
        # Calculate Profit/Loss
        # Note: 'total_value' in JSON was used as 'Total Original Investment' in our logic
        # If it's missing, we assume cost basis ~ current value (approx)
        cost_basis = total_invested_cost if total_invested_cost > 0 else total_portfolio_value
        
        profit_loss = total_portfolio_value - cost_basis
        pct_return = (profit_loss / cost_basis) * 100 if cost_basis > 0 else 0.0
        
        return {
            "status": "success",
            "summary": {
                "total_value": round(total_portfolio_value, 2),
                "cash_available": round(cash, 2),
                "cost_basis": round(cost_basis, 2),
                "total_profit_loss": round(profit_loss, 2),
                "return_pct": f"{round(pct_return, 2)}%"
            },
            "holdings": asset_details
        }

    except Exception as e:
        return {"status": "error", "message": str(e)}




In [11]:
# --- Keep update_portfolio_memory unchanged (it works fine) ---
def update_portfolio_memory(action: str, details: Dict[str, Any]) -> Dict[str, Any]:
    """
    Update portfolio memory. 
    Supports: 'deposit_cash', 'buy_asset', 'batch_update' (Merge), 'set_portfolio' (Overwrite).
    """
    try:
        if os.path.exists(MEMORY_FILE):
            with open(MEMORY_FILE, 'r') as f:
                portfolio = json.load(f)
        else:
            portfolio = {"cash": 0.0, "holdings": {}, "total_value": 0.0}
            
        if action == "deposit_cash":
            amount = float(details.get('amount', 0))
            portfolio['cash'] += amount
            portfolio['total_value'] += amount
            msg = f"Deposited ${amount}."
            
        elif action == "buy_asset":
            # Single asset buy
            sym = details['symbol']
            qty = details['shares']
            cost = details['cost']
            portfolio['cash'] -= cost
            portfolio['holdings'][sym] = portfolio['holdings'].get(sym, 0) + qty
            msg = f"Bought {qty} of {sym}."

        elif action == "batch_update":
            # MERGE new allocation into existing portfolio
            # details should contain: {'allocation': [...], 'total_spent': ...}
            new_items = details.get('allocation', [])
            total_cost = details.get('total_spent', 0)
            
            # Deduct cash
            portfolio['cash'] -= total_cost
            # Update cost basis (we treat this as new investment usage)
            # If cash came from 'deposit', we already added to total_value. 
            # If we are just spending existing cash, total_value doesn't change.
            
            for item in new_items:
                sym = item['symbol']
                qty = item['shares']
                portfolio['holdings'][sym] = portfolio['holdings'].get(sym, 0) + qty
                
            msg = f"Added {len(new_items)} assets to portfolio."
            
        elif action == "set_portfolio":
            # HARD RESET / OVERWRITE
            # Use this only if the user says "Sell everything and buy X"
            portfolio['cash'] = details.get('remaining_cash', 0)
            portfolio['total_value'] = details.get('total_investment', 0)
            new_holdings = {}
            if 'allocation' in details:
                for item in details['allocation']:
                    new_holdings[item['symbol']] = item['shares']
            portfolio['holdings'] = new_holdings
            msg = "Portfolio overwritten."

        with open(MEMORY_FILE, 'w') as f:
            json.dump(portfolio, f, indent=2)
            
        return {"status": "success", "message": msg}

    except Exception as e:
        return {"status": "error", "message": str(e)}


## Step 6: Final Agent Assembly
This is the master cell for the final agent assembly which decides the final tools and system instruction to be used.


In [12]:
# --- ULTIMATE MASTER AGENT (Merged Logic) ---

# 1. Tools (All capabilities)
final_tools = [
    get_asset_price_history,        # Research
    get_asset_fundamentals,         # Research
    calculate_technical_indicators, # Analysis
    allocate_portfolio,             # Planning (Draft Only - No Auto-Save)
    search_market_trends,           # Discovery
    get_portfolio_state,            # Memory Read
    update_portfolio_memory         # Memory Write (Execution)
]

final_instruction = """
You are the "Master Financial Orchestrator" (Powered by Gemini 2.5).

YOUR RESPONSIBILITY:
Manage the user's portfolio with a 3-step lifecycle: Analyze -> Advise -> Execute.

### 1. STATE: ANALYSIS (User asks "How is my portfolio?", "Future outlook?")
- **Action**: Call `get_portfolio_state`.
- **Output**:
  - **üí∞ Portfolio Status**: Current Value vs Cost Basis.
  - **Health Check**: Warn if any holding is High Risk (>40% vol) or Low Growth.
  - **Future Outlook**: Bullish/Bearish based on `calculate_technical_indicators`.

### 2. STATE: ADVISORY (User asks "Invest $X", "Suggest stocks", "Rebalance")
- **Action 1 (Discovery)**: Call `search_market_trends` for new ideas if needed.
- **Action 2 (Planning)**: Call `allocate_portfolio` to generate a **DRAFT PLAN**. 
  - *DO NOT SAVE yet.*
- **Output Format (Strictly follow this)**:
  1. **üåç Market Insight**: "Based on trends, I focused on..."
  2. **üìä Core Portfolio Analysis**:
     - Fundamentals Table.
     - Technicals Table.
     - *Brief reasoning.*
  3. **üìã Proposed Allocation** (The Plan):
     - Show the table from `allocate_portfolio`.
  4. **üíé Underrated Stocks to Consider**:
     - List 1-2 high-potential stocks NOT in the plan.
  5. **Glossary**: Define terms.
  6. **Confirmation Ask**: "Does this plan look good? Say 'Yes' to execute."

### 3. STATE: EXECUTION (User says "Yes", "Confirm")
- **Action**: 
  - If adding new investments to existing ones: Call `update_portfolio_memory(action='batch_update', details=...)`.
  - If replacing everything: Call `update_portfolio_memory(action='set_portfolio', details=...)`.
- **Output**: "‚úÖ Done! Added new positions. Total Portfolio Value: $X."

### 4. STATE: REFINEMENT (User says "Change X to Y", "Remove BTC")
- **Action**: Re-run `allocate_portfolio` with new constraints.
- **Output**: Show the **Revised Plan** (using the same Advisory Format above) and ask for confirmation again.

DISCLAIMER: "This is a simulation for educational purposes."
"""

# 3. Initialize
final_model = genai.GenerativeModel(
    model_name='gemini-2.5-flash', 
    tools=final_tools,
    system_instruction=final_instruction
)

analyst_desk = final_model.start_chat(enable_automatic_function_calling=True)

print("Ultimate Master Agent is Ready!")
print("Type 'exit' to stop.\n")

while True:
    user_input = input("üë§ You: ")
    if user_input.lower() in ["exit", "quit"]:
        break
    
    try:
        print(f"User :{user_input}\n")
        print("Processing...")
        response = analyst_desk.send_message(user_input)
        print(f"\nAnalyst:\n{response.text}\n")
    except Exception as e:
        print(f"Error: {e}")


Ultimate Master Agent is Ready!
Type 'exit' to stop.



User :help me invest 23000 in both crypto and stocks

Processing...

Analyst:
This is a simulation for educational purposes.

üåç Market Insight: Based on trends, I focused on a diversified approach across established cryptocurrencies (Bitcoin, Ethereum) and leading technology stocks (Apple, Microsoft, Nvidia), which are poised for continued growth.

üìä Core Portfolio Analysis:

| Asset      | P/E Ratio (Trailing/Forward) | Market Cap          | Sector        | Annualized Volatility (3mo) | 50-Day SMA Trend   |
|------------|------------------------------|---------------------|---------------|-----------------------------|--------------------|
| **BTC-USD**| N/A / N/A                    | $1.83 Trillion      | Unknown       | 31.9%                       | Downtrend          |
| **ETH-USD**| N/A / N/A                    | $366.44 Billion     | Unknown       | 53.8% (High Risk)           | Downtrend          |
| **AAPL**   | 37.33 / 33.56                | $4.14 Trillion      | Technol