In [1]:
# Standard library imports
import os
import datetime as dt
import traceback

# Third-party imports
import pandas as pd
import yfinance as yf
from ta.momentum import RSIIndicator, StochasticOscillator
from ta.trend import SMAIndicator, EMAIndicator, MACD
from ta.volume import volume_weighted_average_price
from dotenv import load_dotenv

# LangGraph and LangChain imports
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import (
    SystemMessage,
    AIMessage,
    HumanMessage,
    BaseMessage,
)
from langchain_core.tools import tool

# Typing imports
from typing import Union, Dict, Set, List, TypedDict, Annotated, Any, Sequence

# Load environment variables
load_dotenv()

# Purpose: this notebook combines LangGraph, LangChain, and the `ta`
# technical-analysis library (imported above; not TA-Lib) to build an
# AI-driven analysis and messaging system for financial data.

Python-dotenv could not parse statement starting at line 19


True

In [2]:
from typing import Dict, Any, Sequence
from langchain_core.messages import BaseMessage
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages

# Function to merge two dictionaries, prioritizing the second dictionary's keys in case of conflict
def merge_dicts(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    """
    Combine two dictionaries into a new one; on key conflicts `b` wins.

    Neither input is modified.

    Args:
        a (Dict[str, Any]): Base dictionary.
        b (Dict[str, Any]): Dictionary whose entries take precedence.

    Returns:
        Dict[str, Any]: A new dictionary holding the keys of both inputs.
    """
    combined = dict(a)
    combined.update(b)
    return combined

# Define the agent's state structure using TypedDict with type annotations
class AgentState(TypedDict):
    """
    TypedDict describing the state shared between the graph's nodes.

    Each field is declared with `Annotated[type, reducer]`. LangGraph uses the
    reducer to combine a node's returned value with the existing state value
    instead of overwriting it. Without a reducer the last write simply wins.

    Attributes:
        messages: Sequence of BaseMessage objects, combined with `add_messages`.
        data: Dictionary holding the agent's data, combined with `merge_dicts`
            (keys from the newer update override existing keys).
        metadata: Dictionary holding additional metadata, combined with
            `merge_dicts`.
    """
    messages: Annotated[Sequence[BaseMessage], add_messages]
    data: Annotated[Dict[str, Any], merge_dicts]
    metadata: Annotated[Dict[str, Any], merge_dicts]

# Create the graph builder, passing AgentState as the state definition.
# No nodes or edges are registered in this cell.
graph_builder = StateGraph(AgentState)


In [4]:
import os
from typing import Dict, Any, List
import pandas as pd
import requests

import requests

def get_financial_metrics(
    ticker: str,
    report_period: str,
    period: str = 'ttm',
    limit: int = 1
) -> List[Dict[str, Any]]:
    """
    Retrieve financial metrics for a ticker from api.financialdatasets.ai.

    The API key is read from the FINANCIAL_DATASETS_API_KEY environment
    variable and sent in the X-API-KEY header.

    Args:
        ticker (str): Stock ticker symbol.
        report_period (str): Sent as the `report_period_lte` query parameter.
        period (str): Sent as the `period` query parameter (default: 'ttm').
        limit (int): Sent as the `limit` query parameter (default: 1).

    Returns:
        List[Dict[str, Any]]: The non-empty "financial_metrics" value of the
        JSON response.

    Raises:
        Exception: If the response status code is not 200.
        ValueError: If "financial_metrics" is missing or empty.
    """
    api_key = os.environ.get("FINANCIAL_DATASETS_API_KEY")
    endpoint = (
        f"https://api.financialdatasets.ai/financial-metrics/"
        f"?ticker={ticker}&report_period_lte={report_period}"
        f"&limit={limit}&period={period}"
    )
    reply = requests.get(endpoint, headers={"X-API-KEY": api_key})
    if reply.status_code == 200:
        metrics = reply.json().get("financial_metrics")
        if metrics:
            return metrics
        raise ValueError("No financial metrics returned")
    raise Exception(
        f"Error fetching data: {reply.status_code} - {reply.text}"
    )

def search_line_items(
    ticker: str,
    line_items: List[str],
    period: str = 'ttm',
    limit: int = 1
) -> List[Dict[str, Any]]:
    """
    Search a ticker's financials for the requested line items.

    Sends a POST request to the line-items search endpoint of
    api.financialdatasets.ai. The API key is read from the
    FINANCIAL_DATASETS_API_KEY environment variable.

    Args:
        ticker (str): Stock ticker symbol (sent as a one-element "tickers" list).
        line_items (List[str]): Names of the line items to request.
        period (str): Sent as "period" in the request body (default: 'ttm').
        limit (int): Sent as "limit" in the request body (default: 1).

    Returns:
        List[Dict[str, Any]]: The non-empty "search_results" value of the
        JSON response.

    Raises:
        Exception: If the response status code is not 200.
        ValueError: If "search_results" is missing or empty.
    """
    auth_headers = {"X-API-KEY": os.environ.get("FINANCIAL_DATASETS_API_KEY")}
    payload = dict(
        tickers=[ticker],
        line_items=line_items,
        period=period,
        limit=limit,
    )
    reply = requests.post(
        "https://api.financialdatasets.ai/financials/search/line-items",
        headers=auth_headers,
        json=payload,
    )
    if reply.status_code == 200:
        results = reply.json().get("search_results")
        if results:
            return results
        raise ValueError("No search results returned")
    raise Exception(
        f"Error fetching data: {reply.status_code} - {reply.text}"
    )

def get_insider_trades(
    ticker: str,
    end_date: str,
    limit: int = 5,
) -> List[Dict[str, Any]]:
    """
    Retrieve insider trades for a ticker from api.financialdatasets.ai.

    Args:
        ticker (str): Stock ticker symbol.
        end_date (str): Sent as the `filing_date_lte` query parameter.
        limit (int): Sent as the `limit` query parameter (default: 5).

    Returns:
        List[Dict[str, Any]]: The non-empty "insider_trades" value of the
        JSON response.

    Raises:
        Exception: If the response status code is not 200.
        ValueError: If "insider_trades" is missing or empty.
    """
    api_key = os.environ.get("FINANCIAL_DATASETS_API_KEY")
    endpoint = (
        f"https://api.financialdatasets.ai/insider-trades/"
        f"?ticker={ticker}&filing_date_lte={end_date}&limit={limit}"
    )
    reply = requests.get(endpoint, headers={"X-API-KEY": api_key})
    if reply.status_code == 200:
        trades = reply.json().get("insider_trades")
        if trades:
            return trades
        raise ValueError("No insider trades returned")
    raise Exception(
        f"Error fetching data: {reply.status_code} - {reply.text}"
    )

def get_market_cap(
    ticker: str,
) -> float:
    """
    Fetch a company's market cap from the company-facts endpoint.

    The API key is read from the FINANCIAL_DATASETS_API_KEY environment
    variable and sent in the X-API-KEY header.

    Args:
        ticker (str): Stock ticker symbol.

    Returns:
        float: The "market_cap" entry of the response's "company_facts"
        object (a single value, not a list).

    Raises:
        Exception: If the response status code is not 200.
        ValueError: If "company_facts" is missing/empty, or if it carries no
            "market_cap" key.
    """
    headers = {"X-API-KEY": os.environ.get("FINANCIAL_DATASETS_API_KEY")}
    url = (
        'https://api.financialdatasets.ai/company/facts'
        f'?ticker={ticker}'
    )

    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        raise Exception(
            f"Error fetching data: {response.status_code} - {response.text}"
        )
    company_facts = response.json().get('company_facts')
    if not company_facts:
        raise ValueError("No company facts returned")
    # Fail loudly rather than returning None, so callers never store a
    # missing market cap as if it were a real value.
    if 'market_cap' not in company_facts:
        raise ValueError("No market cap data returned")
    return company_facts['market_cap']

def get_prices(
    ticker: str,
    start_date: str,
    end_date: str
) -> List[Dict[str, Any]]:
    """
    Retrieve daily price records for a ticker from api.financialdatasets.ai.

    The request always asks for `interval=day` with `interval_multiplier=1`.

    Args:
        ticker (str): Stock ticker symbol.
        start_date (str): Sent as the `start_date` query parameter.
        end_date (str): Sent as the `end_date` query parameter.

    Returns:
        List[Dict[str, Any]]: The non-empty "prices" value of the JSON response.

    Raises:
        Exception: If the response status code is not 200.
        ValueError: If "prices" is missing or empty.
    """
    api_key = os.environ.get("FINANCIAL_DATASETS_API_KEY")
    endpoint = (
        f"https://api.financialdatasets.ai/prices/"
        f"?ticker={ticker}&interval=day&interval_multiplier=1"
        f"&start_date={start_date}&end_date={end_date}"
    )
    reply = requests.get(endpoint, headers={"X-API-KEY": api_key})
    if reply.status_code == 200:
        price_rows = reply.json().get("prices")
        if price_rows:
            return price_rows
        raise ValueError("No price data returned")
    raise Exception(
        f"Error fetching data: {reply.status_code} - {reply.text}"
    )

def prices_to_df(prices: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Build a date-indexed DataFrame from a list of price records.

    Each record must provide a "time" field plus "open", "close", "high",
    "low" and "volume". The "time" values are parsed into a "Date" index,
    the five price/volume columns are coerced to numbers (unparseable
    values become NaN), and rows are sorted by date.

    Args:
        prices (List[Dict[str, Any]]): Price records.

    Returns:
        pd.DataFrame: Frame indexed by "Date", sorted ascending.
    """
    frame = pd.DataFrame(prices)
    frame = frame.assign(Date=pd.to_datetime(frame["time"])).set_index("Date")
    coerced = {
        column: pd.to_numeric(frame[column], errors="coerce")
        for column in ("open", "close", "high", "low", "volume")
    }
    return frame.assign(**coerced).sort_index()

# Convenience wrapper: fetch the raw prices, then convert them to a DataFrame.
def get_price_data(
    ticker: str,
    start_date: str,
    end_date: str
) -> pd.DataFrame:
    """
    Fetch price records via `get_prices` and return them as a DataFrame.

    Args:
        ticker (str): Stock ticker symbol.
        start_date (str): Passed through to `get_prices`.
        end_date (str): Passed through to `get_prices`.

    Returns:
        pd.DataFrame: Result of `prices_to_df` applied to the fetched records.
    """
    return prices_to_df(get_prices(ticker, start_date, end_date))

In [11]:
get_financial_metrics(ticker='MSFT', report_period='2024-09-30')

[{'ticker': 'MSFT',
  'calendar_date': '2024-09-30',
  'report_period': '2024-09-30',
  'period': 'ttm',
  'currency': 'USD',
  'market_cap': 3198436415344.0,
  'enterprise_value': 3190020415344.0,
  'price_to_earnings_ratio': 35.337,
  'price_to_book_ratio': 11.116,
  'price_to_sales_ratio': 12.583,
  'enterprise_value_to_ebitda_ratio': 22.865,
  'enterprise_value_to_revenue_ratio': 12.516174575490775,
  'free_cash_flow_yield': 0.02271797546182734,
  'peg_ratio': 15961.667003365506,
  'gross_margin': 0.693,
  'operating_margin': 0.4475549785593454,
  'net_margin': 0.356,
  'return_on_equity': 0.346,
  'return_on_assets': 0.182,
  'return_on_invested_capital': 0.504,
  'asset_turnover': 0.511,
  'inventory_turnover': 156.32841328413284,
  'receivables_turnover': 5.029879689726136,
  'days_sales_outstanding': 0.19881191234903026,
  'operating_cycle': 182.04241450866763,
  'working_capital_turnover': 1.6766984495535995,
  'current_ratio': 1.301,
  'quick_ratio': 1.2873263888888888,
  'ca

In [12]:


def _shift_months_back(moment: dt.datetime, months: int = 3) -> dt.datetime:
    """Return `moment` moved back by `months` calendar months, clamping the day to the target month's length."""
    year, month_index = divmod(moment.year * 12 + (moment.month - 1) - months, 12)
    month = month_index + 1
    # The day before the first of the following month is the target month's last day.
    if month == 12:
        next_month_start = dt.date(year + 1, 1, 1)
    else:
        next_month_start = dt.date(year, month + 1, 1)
    last_day = (next_month_start - dt.timedelta(days=1)).day
    # Clamp so that e.g. May 31 maps to Feb 28/29 instead of raising ValueError.
    return moment.replace(year=year, month=month, day=min(moment.day, last_day))


def market_data_agent(state: "AgentState"):
    """
    Gather market data for the ticker in `state["data"]`.

    Reads "ticker" (required) plus optional "start_date" / "end_date"
    ('%Y-%m-%d' strings) from `state["data"]`. A missing or empty end date
    defaults to today. A missing or empty start date defaults to three
    calendar months before the end date.

    It then fetches prices, financial metrics, insider trades, market cap and
    selected financial line items through the helper functions defined in
    this notebook.

    Args:
        state: Agent state providing "messages" and "data".

    Returns:
        dict: The unchanged "messages" and a "data" dictionary containing the
        original entries plus "prices", "start_date", "end_date",
        "financial_metrics", "insider_trades", "market_cap" and
        "financial_line_items".

    Raises:
        KeyError: If `state["data"]` has no "ticker".
        ValueError: If the end date is not in '%Y-%m-%d' format while the
            start date has to be derived from it.
        Any exception raised by the data-fetching helpers is propagated.
    """
    messages = state["messages"]
    data = state["data"]
    ticker = data["ticker"]

    # Resolve the date window; .get() lets callers omit the optional keys.
    # `dt` is the notebook's `import datetime as dt` alias.
    end_date = data.get("end_date") or dt.datetime.now().strftime('%Y-%m-%d')
    start_date = data.get("start_date")
    if not start_date:
        end_date_obj = dt.datetime.strptime(end_date, '%Y-%m-%d')
        start_date = _shift_months_back(end_date_obj, 3).strftime('%Y-%m-%d')

    # Get the historical price data
    prices = get_prices(
        ticker=ticker,
        start_date=start_date,
        end_date=end_date,
    )

    # Get the financial metrics
    financial_metrics = get_financial_metrics(
        ticker=ticker,
        report_period=end_date,
        period='ttm',
        limit=1,
    )

    # Get the insider trades
    insider_trades = get_insider_trades(
        ticker=ticker,
        end_date=end_date,
        limit=5,
    )

    # Get the market cap
    market_cap = get_market_cap(
        ticker=ticker,
    )

    # Get the line_items
    financial_line_items = search_line_items(
        ticker=ticker,
        line_items=["free_cash_flow", "net_income", "depreciation_and_amortization", "capital_expenditure", "working_capital"],
        period='ttm',
        limit=2,
    )

    return {
        "messages": messages,
        "data": {
            **data,
            "prices": prices,
            "start_date": start_date,
            "end_date": end_date,
            "financial_metrics": financial_metrics,
            "insider_trades": insider_trades,
            "market_cap": market_cap,
            "financial_line_items": financial_line_items,
        }
    }

In [5]:
import os
from typing import Dict, Any, List
import pandas as pd
import requests


def get_financial_metrics(
    ticker: str,
    report_period: str,
    period: str = "ttm",
    limit: int = 1,
) -> List[Dict[str, Any]]:
    """
    Retrieve financial metrics for a ticker from api.financialdatasets.ai.

    Args:
        ticker (str): Stock ticker symbol.
        report_period (str): Sent as the `report_period_lte` query parameter.
        period (str): Sent as the `period` query parameter (default: "ttm").
        limit (int): Sent as the `limit` query parameter (default: 1).

    Returns:
        List[Dict[str, Any]]: The non-empty "financial_metrics" value of the
        JSON response.

    Raises:
        ValueError: If "financial_metrics" is missing or empty.
        Whatever `response.raise_for_status()` raises for an error status.
    """
    auth_headers = {"X-API-KEY": os.environ.get("FINANCIAL_DATASETS_API_KEY")}
    endpoint = (
        f"https://api.financialdatasets.ai/financial-metrics/"
        f"?ticker={ticker}"
        f"&report_period_lte={report_period}"
        f"&limit={limit}"
        f"&period={period}"
    )
    reply = requests.get(endpoint, headers=auth_headers)
    reply.raise_for_status()
    metrics = reply.json().get("financial_metrics")
    if metrics:
        return metrics
    raise ValueError("No financial metrics returned")


def search_line_items(
    ticker: str,
    line_items: List[str],
    period: str = "ttm",
    limit: int = 1,
) -> List[Dict[str, Any]]:
    """
    Search a ticker's financial statements for specific line items.

    Args:
        ticker (str): Stock ticker symbol (sent as a one-element "tickers" list).
        line_items (List[str]): Names of the line items to request.
        period (str): Sent as "period" in the request body (default: "ttm").
        limit (int): Sent as "limit" in the request body (default: 1).

    Returns:
        List[Dict[str, Any]]: The non-empty "search_results" value of the
        JSON response.

    Raises:
        ValueError: If "search_results" is missing or empty.
        Whatever `response.raise_for_status()` raises for an error status.
    """
    auth_headers = {"X-API-KEY": os.environ.get("FINANCIAL_DATASETS_API_KEY")}
    payload = {
        "tickers": [ticker],
        "line_items": line_items,
        "period": period,
        "limit": limit,
    }
    reply = requests.post(
        "https://api.financialdatasets.ai/financials/search/line-items",
        headers=auth_headers,
        json=payload,
    )
    reply.raise_for_status()
    results = reply.json().get("search_results")
    if results:
        return results
    raise ValueError("No search results returned")


def get_insider_trades(ticker: str, end_date: str, limit: int = 5) -> List[Dict[str, Any]]:
    """
    Retrieve insider trades for a ticker from api.financialdatasets.ai.

    Args:
        ticker (str): Stock ticker symbol.
        end_date (str): Sent as the `filing_date_lte` query parameter
            (format: YYYY-MM-DD).
        limit (int): Sent as the `limit` query parameter (default: 5).

    Returns:
        List[Dict[str, Any]]: The non-empty "insider_trades" value of the
        JSON response.

    Raises:
        ValueError: If "insider_trades" is missing or empty.
        Whatever `response.raise_for_status()` raises for an error status.
    """
    auth_headers = {"X-API-KEY": os.environ.get("FINANCIAL_DATASETS_API_KEY")}
    endpoint = (
        f"https://api.financialdatasets.ai/insider-trades/"
        f"?ticker={ticker}"
        f"&filing_date_lte={end_date}"
        f"&limit={limit}"
    )
    reply = requests.get(endpoint, headers=auth_headers)
    reply.raise_for_status()
    trades = reply.json().get("insider_trades")
    if trades:
        return trades
    raise ValueError("No insider trades returned")


def get_market_cap(ticker: str) -> float:
    """
    Retrieve a company's market capitalization from the company-facts endpoint.

    Args:
        ticker (str): Stock ticker symbol.

    Returns:
        float: The "market_cap" entry of the response's "company_facts" object.

    Raises:
        ValueError: If "company_facts" is missing/empty or has no
            "market_cap" key.
        Whatever `response.raise_for_status()` raises for an error status.
    """
    auth_headers = {"X-API-KEY": os.environ.get("FINANCIAL_DATASETS_API_KEY")}
    endpoint = f"https://api.financialdatasets.ai/company/facts?ticker={ticker}"
    reply = requests.get(endpoint, headers=auth_headers)
    reply.raise_for_status()
    facts = reply.json().get("company_facts")
    if facts and "market_cap" in facts:
        return facts["market_cap"]
    raise ValueError("No market cap data returned")


def get_prices(ticker: str, start_date: str, end_date: str) -> List[Dict[str, Any]]:
    """
    Retrieve daily price records for a ticker from api.financialdatasets.ai.

    The request always asks for `interval=day` with `interval_multiplier=1`.

    Args:
        ticker (str): Stock ticker symbol.
        start_date (str): Sent as the `start_date` query parameter
            (format: YYYY-MM-DD).
        end_date (str): Sent as the `end_date` query parameter
            (format: YYYY-MM-DD).

    Returns:
        List[Dict[str, Any]]: The non-empty "prices" value of the JSON response.

    Raises:
        ValueError: If "prices" is missing or empty.
        Whatever `response.raise_for_status()` raises for an error status.
    """
    auth_headers = {"X-API-KEY": os.environ.get("FINANCIAL_DATASETS_API_KEY")}
    endpoint = (
        f"https://api.financialdatasets.ai/prices/"
        f"?ticker={ticker}"
        f"&interval=day"
        f"&interval_multiplier=1"
        f"&start_date={start_date}"
        f"&end_date={end_date}"
    )
    reply = requests.get(endpoint, headers=auth_headers)
    reply.raise_for_status()
    price_rows = reply.json().get("prices")
    if price_rows:
        return price_rows
    raise ValueError("No price data returned")


def prices_to_df(prices: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Turn a list of price records into a date-indexed DataFrame.

    The "time" field of each record is parsed into a "Date" index, the
    "open", "close", "high", "low" and "volume" columns are coerced to
    numbers (unparseable values become NaN), and rows are sorted by date.

    Args:
        prices (List[Dict[str, Any]]): Price records.

    Returns:
        pd.DataFrame: Frame indexed by "Date", sorted ascending.
    """
    table = pd.DataFrame(prices)
    table["Date"] = pd.to_datetime(table["time"])
    table = table.set_index("Date")
    for column in ("open", "close", "high", "low", "volume"):
        table[column] = pd.to_numeric(table[column], errors="coerce")
    return table.sort_index()


def get_price_data(ticker: str, start_date: str, end_date: str) -> pd.DataFrame:
    """
    Fetch price records via `get_prices` and return them as a DataFrame.

    Args:
        ticker (str): Stock ticker symbol.
        start_date (str): Start date for the query (format: YYYY-MM-DD).
        end_date (str): End date for the query (format: YYYY-MM-DD).

    Returns:
        pd.DataFrame: Result of `prices_to_df` applied to the fetched records.
    """
    raw_rows = get_prices(ticker, start_date, end_date)
    frame = prices_to_df(raw_rows)
    return frame
