
# Agentic Financial Analysis Notebook

This notebook integrates **news analysis** and **stock market data** using an agentic approach.  
The agent iterates through decision-making steps, ensuring logical execution of financial tasks.

## Agent Workflow:
1. **Fetch relevant financial news** based on a query.
2. **Retrieve historical stock prices** for the related company.
3. **Analyze the correlation between news sentiment and stock performance.**
4. **Decide next steps iteratively based on previous responses.**

### Libraries Used:
- `requests` for API calls
- `pandas` for data manipulation
- `yfinance` for stock data
- `google-generativeai` (Gemini) for LLM-based decision-making and analysis


In [1]:
# Install python-dotenv and google-generativeai quietly (-q).
# `%pip` is used so the packages go into the running kernel's environment.
%pip install python-dotenv -q
%pip install google-generativeai -q

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
from dotenv import load_dotenv

# Pull the variables declared in a local .env file into the process environment
load_dotenv()

# Gemini API key; None when GEMINI_API_KEY is not set
api_key = os.environ.get("GEMINI_API_KEY")

In [3]:
import google.generativeai as genai

# Configure the Gemini API
# The key read from the environment in the previous cell is registered first;
# the model handle is created afterwards.
genai.configure(api_key=api_key)
#client = genai.GenerativeModel("gemini-1.5-pro")
# `client` is the module-level model handle: analyze_data and financial_agent
# both call client.generate_content(...) on it.
client = genai.GenerativeModel("gemini-2.0-flash")


  from .autonotebook import tqdm as notebook_tqdm


In [4]:

import os
import requests
import pandas as pd
from datetime import datetime, timedelta
import yfinance as yf
from dotenv import load_dotenv

# Make sure the .env values are present in the process environment
load_dotenv()

# GNews token used by get_news_gnews; None when GNEWS_API_KEY is not set
GNEWS_API_KEY = os.environ.get("GNEWS_API_KEY")



In [5]:

def get_news_gnews(query,from_date,to_date):
    """Fetch news headlines from the GNews search API for a date window.

    Args:
        query: Search keywords, e.g. a company name.
        from_date: Start of the window, either ``YYYY-MM-DD`` or a full
            ISO 8601 timestamp. Surrounding whitespace is ignored.
        to_date: End of the window, same formats as ``from_date``.

    Returns:
        pandas.DataFrame: One row per article with the columns ``Date`` (the
        first 10 characters of ``publishedAt``), ``Title`` and ``URL``. An
        empty DataFrame is returned when the request fails, the API answers
        with a non-200 status, the body is not valid JSON, or no articles are
        found.
    """
    def _to_timestamp(value, clock):
        # The GNews docs describe `from`/`to` as full ISO 8601 timestamps, so
        # a bare YYYY-MM-DD date is expanded to the start/end of that day (UTC).
        text = str(value).strip()
        return f"{text}T{clock}Z" if len(text) == 10 else text

    # Passing the values through `params` lets requests URL-encode them, so
    # queries containing spaces, '&' or '#' no longer corrupt the URL.
    request_params = {
        "q": str(query).strip(),
        "from": _to_timestamp(from_date, "00:00:00"),
        "to": _to_timestamp(to_date, "23:59:59"),
        "token": GNEWS_API_KEY,
        "max": 100,
    }
    headers = {"User-Agent": "Mozilla/5.0"}

    try:
        # A timeout keeps a stalled connection from hanging the agent loop.
        response = requests.get(
            "https://gnews.io/api/v4/search",
            params=request_params,
            headers=headers,
            timeout=30,
        )
    except requests.RequestException as exc:
        print(f"Error Response: {exc}")
        return pd.DataFrame()

    if response.status_code != 200:
        print(f"Error Response: {response.text}")
        return pd.DataFrame()  # Return empty DataFrame on error

    try:
        data = response.json()
    except ValueError as exc:
        print(f"Error Response: {exc}")
        return pd.DataFrame()

    articles = data.get("articles") or []
    if not articles:
        print("No articles found in API response.")
        return pd.DataFrame()

    # `or ""` also covers fields that are present but null in the payload.
    rows = [
        (
            (article.get("publishedAt") or "")[:10],
            article.get("title") or "",
            article.get("url") or "",
        )
        for article in articles
    ]
    return pd.DataFrame(rows, columns=['Date', 'Title', 'URL'])



In [6]:
# Quick manual check of the news fetcher (uncomment to run):
# print(get_news_gnews("Apple", "2024-03-10", "2024-03-28"))

In [7]:

def get_stock_data(ticker,from_date,to_date):
    """Fetch historical closing prices from Yahoo Finance.

    Args:
        ticker: Ticker symbol, e.g. ``"AAPL"``. Surrounding whitespace is ignored.
        from_date: Start date (``YYYY-MM-DD``), passed to ``yf.download`` as ``start``.
        to_date: End date (``YYYY-MM-DD``), passed to ``yf.download`` as ``end``.

    Returns:
        pandas.DataFrame: Columns ``Date`` (``YYYY-MM-DD`` strings) and
        ``Close``. An empty DataFrame with those two columns is returned when
        the download yields no rows.
    """
    # Arguments arrive from a comma-split LLM response and may carry spaces.
    symbol = str(ticker).strip()
    start = str(from_date).strip()
    end = str(to_date).strip()

    prices = yf.download(symbol, start=start, end=end)
    if prices is None or prices.empty:
        return pd.DataFrame(columns=["Date", "Close"])

    # NOTE(review): recent yfinance releases appear to return (Price, Ticker)
    # MultiIndex columns even for one ticker. Drop the ticker level when it
    # holds a single value so callers always see a flat "Close" column.
    if isinstance(prices.columns, pd.MultiIndex) and prices.columns.get_level_values(-1).nunique() == 1:
        prices = prices.droplevel(-1, axis=1)

    closes = prices[['Close']].reset_index()
    closes.columns.name = None
    closes['Date'] = closes['Date'].dt.strftime('%Y-%m-%d')  # Format date
    return closes


In [8]:
def analyze_data(news_df, stock_df):
    """Ask the Gemini model how the news headlines may relate to the price moves.

    Args:
        news_df: pandas DataFrame of news headlines.
        stock_df: pandas DataFrame of stock prices.

    Returns:
        str: The model's summary, or a short message when the inputs are not
        DataFrames, either frame is empty, or no text could be generated.
    """
    # Ensure inputs are DataFrames
    if not isinstance(news_df, pd.DataFrame) or not isinstance(stock_df, pd.DataFrame):
        return "Error: Inputs should be Pandas DataFrames."

    if news_df.empty or stock_df.empty:
        return "Insufficient data for analysis."

    # Serialise as CSV instead of embedding the DataFrame repr: the repr
    # elides rows and shortens long cells, so the model would only see a
    # truncated view of the headlines and prices.
    news_table = news_df.to_csv(index=False)
    stock_table = stock_df.to_csv(index=False)

    analysis_prompt = f"""
    Given the following financial news headlines and stock price movements, analyze the potential correlation.
    News (CSV):
{news_table}
    Stock Prices (CSV):
{stock_table}
    Provide a summary of how the news might have influenced stock prices.
    """

    response = client.generate_content(analysis_prompt)

    try:
        text = response.text
    except ValueError:
        # NOTE(review): the Gemini SDK appears to raise ValueError from `.text`
        # when the response carries no usable text part - confirm.
        text = None

    return text.strip() if text else "Analysis could not be generated."


In [9]:
import pandas as pd
import json
from io import StringIO

# Store actual DataFrames
# Shared cache between tool calls: function_caller writes the results of
# fetch_news / fetch_stock here and reads them back when analyze_data is
# requested, instead of using the string arguments the LLM sends.
stored_data = {
    "news_df": None,   # set by function_caller after a fetch_news call
    "stock_df": None   # set by function_caller after a fetch_stock call
}

def function_caller(func_name, *params):
    """Dispatch a tool call requested by the agent to the matching function.

    Args:
        func_name: One of ``"fetch_news"``, ``"fetch_stock"`` or
            ``"analyze_data"``.
        *params: Positional arguments for the tool. String arguments are
            stripped of surrounding whitespace. For ``"analyze_data"`` they
            are ignored and the frames cached in ``stored_data`` are used.

    Returns:
        The tool's result. For the fetch tools this is also cached in
        ``stored_data``. An explanatory string is returned instead when the
        tool name is unknown, the arguments do not fit the tool's signature,
        or the cached frames needed for analysis are missing or empty.
    """
    import inspect

    function_map = {
        "fetch_news": get_news_gnews,
        "fetch_stock": get_stock_data,
        "analyze_data": analyze_data
    }

    if func_name not in function_map:
        return f"Function {func_name} not found"

    tool = function_map[func_name]

    if func_name == "analyze_data":
        # The keys always exist (initialised to None), so a `.get` default
        # would never apply; check for None explicitly before using `.empty`.
        news_df = stored_data.get("news_df")
        stock_df = stored_data.get("stock_df")

        for frame in (news_df, stock_df):
            if frame is None or getattr(frame, "empty", True):
                return "Error: One or both data sources are empty or invalid."

        try:
            return tool(news_df, stock_df)
        except ValueError as e:
            return f"Error: Analysis failed - {str(e)}"

    cleaned = [p.strip() if isinstance(p, str) else p for p in params]

    # Report a malformed call back to the agent instead of crashing the loop.
    try:
        inspect.signature(tool).bind(*cleaned)
    except TypeError as e:
        return f"Error: Invalid arguments for {func_name} - {str(e)}"

    result = tool(*cleaned)
    if func_name == "fetch_news":
        stored_data["news_df"] = result  # Store news DataFrame
    else:
        stored_data["stock_df"] = result  # Store stock DataFrame
    return result


In [10]:
def financial_agent(stock_name):
    """Run the print-traced agent loop for one stock.

    Each iteration sends the system prompt, the task and the history of
    earlier tool calls to the LLM ``client``. The reply is either a tool call
    (executed through ``function_caller``) or a final answer (ends the loop).

    Note: a later cell defines another function with the same name. Once both
    cells have run, that later definition replaces this one.

    Args:
        stock_name: Company or stock name to analyze.

    Returns:
        str | None: The text after ``FINAL_ANSWER:``, or ``None`` when the
        iteration limit is reached without a final answer.
    """
    from datetime import date, timedelta

    max_iterations = 4
    iteration_response = []

    # The model cannot know the current date, so the concrete window is
    # computed here and written into the prompt.
    today = date.today()
    end_date = today.strftime("%Y-%m-%d")
    start_date = (today - timedelta(days=30)).strftime("%Y-%m-%d")

    system_prompt = f"""You are a financial analysis agent. Respond with EXACTLY ONE of these formats:
    1. FUNCTION_CALL: python_function_name|input
    2. FINAL_ANSWER: [string]

    where python_function_name is one of the following:
    1. fetch_news(query,start_date,end_date) and fetch_news returns pandas dataframe by the name news_df
    2. fetch_stock(ticker,start_date,end_date) and fetch_stock returns pandas dataframe by the name stock_df
    3. analyze_data|news_df,stock_df

    Date Rules:
    - `end_date` MUST be today's date in YYYY-MM-DD format.
    - `start_date` MUST be exactly 30 days before `end_date` (YYYY-MM-DD format).
    - Today is {end_date}, so:
    - `start_date = {start_date}`
    - `end_date = {end_date}`


    DO NOT include multiple responses. Give ONE response at a time.


    Execute only one step per iteration and do not skip steps."""

    query = "Analyze the impact of " + str(stock_name) + "'s financial news on its stock price."

    for iteration in range(max_iterations):
        print(f"\n--- Iteration {iteration + 1} ---")

        # Rebuild the query from the base task each time so the history is
        # included exactly once rather than re-appended on every pass.
        if iteration_response:
            current_query = query + "\n\n" + " ".join(iteration_response) + " What should I do next?"
        else:
            current_query = query

        # Generate agent's response
        prompt = f"{system_prompt}\n\nQuery: {current_query}"
        response = client.generate_content(contents=prompt)
        response_text = response.text.strip()
        print(f"LLM Response: {response_text}")

        # The final answer is checked at the top level so it ends the loop.
        if response_text.startswith("FINAL_ANSWER:"):
            print("\n=== Agent Execution Complete ===")
            return response_text.split(":", 1)[1].strip()

        if not response_text.startswith("FUNCTION_CALL:"):
            # Neither format matched: ask again on the next iteration.
            continue

        # Execute function call
        function_info = response_text.split(":", 1)[1]
        func_name, _, params = [part.strip() for part in function_info.partition("|")]
        param_list = [item.strip() for item in params.split(",") if item.strip()]

        iteration_result = function_caller(func_name, *param_list)
        print(f" *** Result: {iteration_result}")
        iteration_response.append(
            f"Iteration {iteration + 1}: Called {func_name} with {params}, returned {iteration_result}."
        )

    return None

In [None]:
import datetime

def financial_agent(stock_name, max_iterations=4, client=None, function_caller=None):
    """
    Modular financial analysis agent that analyzes stock and news data.

    Each iteration sends the system prompt, the task and the history of
    earlier tool calls to the LLM. The reply is either a tool call (executed
    through ``function_caller``) or a final answer (ends the loop).

    Args:
        stock_name (str): Name of the stock to analyze
        max_iterations (int, optional): Maximum number of iterations. Defaults to 4.
        client (object, optional): LLM client exposing ``generate_content``.
            Defaults to the module-level ``client``.
        function_caller (function, optional): Callable ``(name, *params)``
            that runs a tool. Defaults to the module-level ``function_caller``.

    Returns:
        dict: ``iterations`` (one entry per processed LLM response, including
        the final-answer one), ``final_answer`` (str or None) and ``error``
        (message of the exception that stopped the loop, or None).

    Raises:
        ValueError: If ``stock_name`` is empty, or no client / function_caller
            is passed in or defined at module level.
    """
    # Local import: does not depend on how `datetime` is bound at module level
    # (the notebook binds it both as a class and as a module).
    from datetime import date, timedelta

    # Validate input parameters
    if not stock_name:
        raise ValueError("Stock name must be provided")

    # Fall back to the notebook-level objects when none are injected.
    llm = client if client is not None else globals().get("client")
    run_tool = function_caller if function_caller is not None else globals().get("function_caller")
    if llm is None or run_tool is None:
        raise ValueError("Client and function_caller must be provided")

    # Compute dates up front: the model cannot know today's date, so the
    # concrete window is written into the prompt.
    today = date.today()
    end_date = today.strftime("%Y-%m-%d")
    start_date = (today - timedelta(days=30)).strftime("%Y-%m-%d")

    # Prepare system prompt
    system_prompt = f"""You are a financial analysis agent. Respond with EXACTLY ONE of these formats:
    1. FUNCTION_CALL: python_function_name|input
    2. FINAL_ANSWER: [string]

    where python_function_name|input is one of the following:
    1. fetch_news|query,start_date,end_date and fetch_news returns pandas dataframe by the name news_df
    2. fetch_stock|ticker,start_date,end_date and fetch_stock returns pandas dataframe by the name stock_df
    3. analyze_data|news_df,stock_df

    Date Rules:
    - `end_date` MUST be today's date in YYYY-MM-DD format.
    - `start_date` MUST be exactly 30 days before `end_date` (YYYY-MM-DD format).
    - Today is {end_date}, so:
    - `start_date = {start_date}`
    - `end_date = {end_date}`

    DO NOT include multiple responses. Give ONE response at a time.
    Execute only one step per iteration and do not skip steps."""

    # Initialize variables
    iteration_response = []
    results = {
        'iterations': [],
        'final_answer': None,
        'error': None
    }

    # Prepare initial query
    query = f"Analyze the impact of {stock_name}'s financial news on its stock price."

    for iteration in range(1, max_iterations + 1):
        try:
            # Prepare current query
            if iteration_response:
                current_query = query + "\n\n" + " ".join(iteration_response) + " What should I do next?"
            else:
                current_query = query

            # Generate agent's response
            prompt = f"{system_prompt}\n\nQuery: {current_query}"
            response = llm.generate_content(contents=prompt)
            response_text = response.text.strip()

            # Store iteration details
            iteration_details = {
                'iteration': iteration,
                'response': response_text
            }

            # Check for final answer
            if response_text.startswith("FINAL_ANSWER:"):
                final_answer = response_text.split(":", 1)[1].strip()
                results['final_answer'] = final_answer
                iteration_details['final_answer'] = final_answer
                # Record the closing iteration before leaving the loop.
                results['iterations'].append(iteration_details)
                break

            # Process function call
            if response_text.startswith("FUNCTION_CALL:"):
                function_info = response_text.split(":", 1)[1]
                func_name, _, params = [part.strip() for part in function_info.partition("|")]

                # Strip each argument: the model often writes "a, b, c".
                param_list = [item.strip() for item in params.split(",") if item.strip()]

                # Add start and end dates to function parameters if not present
                if func_name in ['fetch_news', 'fetch_stock'] and len(param_list) == 1:
                    param_list.extend([start_date, end_date])

                # Call function
                iteration_result = run_tool(func_name, *param_list)

                iteration_details['function_call'] = {
                    'name': func_name,
                    'params': param_list,
                    'result': iteration_result
                }

                iteration_response.append(
                    f"Iteration {iteration}: Called {func_name} with {param_list}, returned {iteration_result}."
                )

            results['iterations'].append(iteration_details)

        except Exception as e:
            # Surface the failure to the caller instead of raising mid-loop.
            results['error'] = str(e)
            break

    return results

In [12]:
# Run the agent end-to-end for Apple.
financial_agent("Apple")

ValueError: Client and function_caller must be provided

In [None]:
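# This is the initial draft of the agent loop, kept commented out for reference only.
# The `financial_agent` function defined above supersedes it.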

# max_iterations = 3
# last_response = None
# iteration = 0
# iteration_response = []

# news_df = None
# stock_df = None

# system_prompt = """You are a financial analysis agent. 

# Respond with EXACTLY ONE of the following formats per iteration:
# 1. FUNCTION_CALL: fetch_news|query,start_date,end_date
# 2. FUNCTION_CALL: fetch_stock|ticker,start_date,end_date
# 3. FUNCTION_CALL: analyze_data|news_df,stock_df
# 4. FINAL_ANSWER: [insight]

# where:
# - `end_date` is today's date.
# - `start_date` is 30 days before today's date.

# Execute only one step per iteration and do not skip steps."""


# query = "Analyze the impact of Apple's financial news on its stock price."

# while iteration < max_iterations:
#     print(f"\n--- Iteration {iteration + 1} ---")
    
#     if last_response is None:
#         current_query = query
#     else:
#         current_query += "\n\n" + " ".join(iteration_response)
#         current_query += " What should I do next?"

#     # Generate agent's response
#     prompt = f"{system_prompt}\n\nQuery: {current_query}"
#     response = client.generate_content(contents=prompt)

#     response_text = response.text.strip()
#     print(f"LLM Response: {response_text}")
    
#     # Simulating LLM response (Replace with actual API call if needed)
#     # if iteration == 0:
#     #     response_text = f"FUNCTION_CALL: fetch_news|Apple,{start_date_str},{today_str}"
#     # elif iteration == 1:
#     #     response_text = f"FUNCTION_CALL: fetch_stock|AAPL,{start_date_str},{today_str}"
#     # elif iteration == 2:
#     #     response_text = "FUNCTION_CALL: analyze_data|news_df,stock_df"
#     # else:
#     #     response_text = "FINAL_ANSWER: [Apple's stock price was correlated with negative news sentiment.]"

#     #print(f"Agent Response: {response_text}")

#     # Execute function call
#     if response_text.startswith("FUNCTION_CALL:"):
#         _, function_info = response_text.split(":", 1)
#         func_name, params = [x.strip() for x in function_info.split("|", 1)]
#         param_list = params.split(",")

#         # if func_name == "fetch_news":
#         #     news_df = get_news_gnews(param_list[0], param_list[1], param_list[2], GNEWS_API_KEY)
#         #     iteration_result = news_df
#         # elif func_name == "fetch_stock":
#         #     stock_df = get_stock_data(param_list[0], param_list[1], param_list[2])
#         #     iteration_result = stock_df
#         # elif func_name == "analyze_data":
#         #     if news_df is not None and stock_df is not None:
#         #         iteration_result = analyze_data(news_df, stock_df)
#         #     else:
#         #         iteration_result = "Error: Missing data for analysis. Ensure news and stock data are fetched first."

#         if response_text.startswith("FUNCTION_CALL:"):
#             response_text = response.text.strip()
#             print("response_text",response_text)
#             _, function_info = response_text.split(":", 1)
#             print("_",_)
#             print("function_info",function_info)

#             func_name, params = [x.strip() for x in function_info.split("|", 1)]
#             print("func_name",func_name)
#             print("params",params)

#             iteration_result = function_caller(func_name, params)
#             print("iteration_result",iteration_result)

#         # Check if it's the final answer
#         elif response_text.startswith("FINAL_ANSWER:"):
#             print("\n=== Agent Execution Complete ===")
#             break

#     print(f"  Result: {iteration_result}")
#     last_response = iteration_result
#     iteration_response.append(f"Iteration {iteration + 1}: Called {func_name} with {params}, returned {iteration_result}.")

#     iteration += 1