In [2]:
from dotenv import load_dotenv
import os

load_dotenv()

os.environ['GROQ_API_KEY'] = os.getenv('GROQ_API_KEY')
os.environ['LANGCHAIN_API_KEY'] = os.getenv('LANGCHAIN_API_KEY')
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ['LANGCHAIN_PROJECT'] = "BuffetBot"
os.environ['GOOGLE_API_KEY'] = os.getenv('GOOGLE_API_KEY')
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")

#### Tools

In [4]:
import ssl
from urllib.request import urlopen
import certifi
import json
from langchain_core.tools import tool
from typing import List, Optional, TypedDict, Annotated

class FinancialRatios(TypedDict):
    symbol: str
    date: str
    operatingCashFlowPerShare: Optional[float]
    interestCoverageRatio: Optional[float]
    workingCapital: Optional[float]
    daysSalesOutstanding: Optional[float]
    bookValuePerShare: Optional[float]
    dividendYield: Optional[float]
    currentRatio: Optional[float]
    payablesTurnover: Optional[float]

class FinancialKeyMetrics(TypedDict):
    symbol: str
    date: str
    grossProfitMargin: Optional[float]
    debtEquityRatio: Optional[float]
    debtRatio: Optional[float]
    operatingProfitMargin: Optional[float]
    netProfitMargin: Optional[float]
    returnOnAssets: Optional[float]
    returnOnEquity: Optional[float]
    returnOnCapitalEmployed: Optional[float]
    totalDebtToCapitalization: Optional[float]
    cashFlowCoverageRatios: Optional[float]
    quickRatio: Optional[float]
    cashRatio: Optional[float]
    assetTurnover: Optional[float]
    inventoryTurnover: Optional[float]
    receivablesTurnover: Optional[float]
    cashConversionCycle: Optional[float]
    priceEarningsRatio: Optional[float]
    daysOfPayablesOutstanding: Optional[float]

# Function 1
@tool
def get_financial_key_metrics(ticker: str, observations: int = 1) -> List[FinancialRatios]:
    """
    Retrieve and filter key financial metrics for a specified company.

    This function fetches financial data from the Financial Modeling Prep API
    and filters it to include specific key metrics. It's designed to provide
    a focused set of financial indicators for analysis.

    Parameters:
    ticker (str): The stock symbol of the company (e.g., "AAPL" for Apple Inc.).
    observations (int, optional): The number of most recent periods to return. 
                                  Defaults to 1.

    Returns:
    List[FinancialRatios]: A list of FinancialRatio objects, each containing key metrics
                          for the specified number of observations.
    """
    api_key = os.getenv('FINANCIAL_MODELING_PREP_API_KEY')
    url = f"https://financialmodelingprep.com/api/v3/key-metrics/{ticker}?apikey={api_key}"
    
    context = ssl.create_default_context(cafile=certifi.where())
    
    with urlopen(url, context=context) as response:
        data = response.read().decode("utf-8")
    
    json_data = json.loads(data)
    
    filtered_data = []
    for item in json_data:
        filtered_item = FinancialRatios(
            symbol=item['symbol'],
            date=item['date'],
            operatingCashFlowPerShare=item.get('operatingCashFlowPerShare'),
            interestCoverageRatio=item.get('interestCoverage'),
            workingCapital=item.get('workingCapital'),
            daysSalesOutstanding=item.get('daysSalesOutstanding'),
            bookValuePerShare=item.get('bookValuePerShare'),
            dividendYield=item.get('dividendYield'),
            currentRatio=item.get('currentRatio'),
            payablesTurnover=item.get('payablesTurnover')
        )
        filtered_data.append(filtered_item)
    
    return filtered_data[:observations]

# Function 2
@tool
def get_financial_ratios(ticker: str, observations: int = 1) -> List[FinancialKeyMetrics]:
    """
    Retrieve and filter financial ratios for a specified company.

    This function fetches financial ratio data from the Financial Modeling Prep API
    and filters it to include specific key ratios. It's designed to provide
    a focused set of financial indicators for analysis.

    Parameters:
    ticker (str): The stock symbol of the company (e.g., "AAPL" for Apple Inc.).
    api_key (str): Your Financial Modeling Prep API key for authentication.
    observations (int, optional): The number of most recent periods to return. 
                                  Defaults to 1.

    Returns:
    List[FinancialKeyMetrics]: A list of FinancialKeyMetric objects, each containing key ratios
                              for the specified number of observations.
    """
    api_key = os.getenv('FINANCIAL_MODELING_PREP_API_KEY')
    url = f"https://financialmodelingprep.com/api/v3/ratios/{ticker}?&apikey={api_key}"
    
    context = ssl.create_default_context(cafile=certifi.where())
    
    with urlopen(url, context=context) as response:
        data = response.read().decode("utf-8")
    
    json_data = json.loads(data)
    
    filtered_data = []
    for item in json_data:
        filtered_item = FinancialKeyMetrics(
            symbol=item['symbol'],
            date=item['date'],
            grossProfitMargin=item.get('grossProfitMargin'),
            debtEquityRatio=item.get('debtEquityRatio'),
            debtRatio=item.get('debtRatio'),
            operatingProfitMargin=item.get('operatingProfitMargin'),
            netProfitMargin=item.get('netProfitMargin'),
            returnOnAssets=item.get('returnOnAssets'),
            returnOnEquity=item.get('returnOnEquity'),
            returnOnCapitalEmployed=item.get('returnOnCapitalEmployed'),
            totalDebtToCapitalization=item.get('totalDebtToCapitalization'),
            cashFlowCoverageRatios=item.get('cashFlowCoverageRatios'),
            quickRatio=item.get('quickRatio'),
            cashRatio=item.get('cashRatio'),
            assetTurnover=item.get('assetTurnover'),
            inventoryTurnover=item.get('inventoryTurnover'),
            receivablesTurnover=item.get('receivablesTurnover'),
            cashConversionCycle=item.get('cashConversionCycle'),
            priceEarningsRatio=item.get('priceEarningsRatio'),
            daysOfPayablesOutstanding=item.get('daysOfPayablesOutstanding')
        )
        filtered_data.append(filtered_item)
    
    return filtered_data[:observations]

# Function 3
# Will add more functions as needed



##### Debugging



#### State

In [5]:
#State
from langgraph.graph.message import AnyMessage, add_messages
from langchain_core.documents import Document

class State(TypedDict):
    ticker: Optional[str] = None
    observations: Optional[int] = None
    financial_key_metrics: Optional[List[FinancialKeyMetrics]] = None
    financial_ratios: Optional[List[FinancialRatios]] = None
    company_summary: Optional[str] = None
    chunked_docs: Optional[List[Document]] = None

#### Router

In [6]:
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI

# Data model
class RouteQuery(BaseModel):
    """Route a user query to the appropriate action."""

    action: Literal["company_specific", "general_knowledge"] = Field(
        ...,
        description="Given a user question, choose whether it's company-specific or general knowledge.",
    )

# LLM with function call
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm_router = llm.with_structured_output(RouteQuery)

# Prompt
system = """You are an expert at determining whether a query is about a specific company or financial data, or if it's a general knowledge question.
For queries about specific companies, financial metrics, stock performance, or company-specific data, use 'company_specific'.
For general knowledge questions, including explanations of financial concepts not tied to a specific company, use 'general_knowledge'."""

route_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}"),
    ]
)

question_router = route_prompt | structured_llm_router

def router(state: State) -> Literal["company_specific", "general_knowledge"]:
    last_message = state["messages"][-1]["content"] if state["messages"] else ""
    result = question_router.invoke({"question": last_message})
    return result.action

##### Test the router function 

In [19]:
# Test the router function

# Sample queries
test_queries = [
    "What are Apple's financial ratios?",
    "How do I calculate return on investment?",
    "Can you explain Tesla's recent stock performance?",
    "What is the difference between stocks and bonds?",
    "What was Amazon's revenue last quarter?",
    "What factors affect a company's price-to-earnings ratio?",
    "How has the tech sector performed over the last year?",
]

# Create a mock state for testing
def create_mock_state(query):
    return State(
        messages=[{"role": "human", "content": query}],
        ticker="",
        observations=0,
        financial_key_metrics=None,
        financial_ratios=None,
        company_summary=None
    )

# Run tests
for query in test_queries:
    mock_state = create_mock_state(query)
    result = router(mock_state)
    print(f"Query: {query}")
    print(f"Decision: {result}")
    print("---")

Query: What are Apple's financial ratios?
Decision: company_specific
---
Query: How do I calculate return on investment?
Decision: general_knowledge
---
Query: Can you explain Tesla's recent stock performance?
Decision: company_specific
---
Query: What is the difference between stocks and bonds?
Decision: general_knowledge
---
Query: What was Amazon's revenue last quarter?
Decision: company_specific
---
Query: What factors affect a company's price-to-earnings ratio?
Decision: general_knowledge
---
Query: How has the tech sector performed over the last year?
Decision: general_knowledge
---


#### Knowledge Base Retriever

In [7]:
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser


embeddings = OpenAIEmbeddings()

vector_store = FAISS.load_local(
    "knowledge_base", embeddings, allow_dangerous_deserialization=True
)
retriever = vector_store.as_retriever(search_type="mmr", search_kwargs={"k": 5})

# RAG Fusion
from langchain.prompts import ChatPromptTemplate

#------------------------------
# RAG-Fusion Question Generating Prompt
template = """You are a financial assistant that takes questions for Warren Buffet and generates multiple queries, so that Warren understands them properly \n
Generate multiple search queries related to: {question} \n
Output (4 queries):"""
#------------------------------

prompt_rag_fusion = ChatPromptTemplate.from_template(template)

generate_queries = (
    prompt_rag_fusion 
    | ChatOpenAI(temperature=0)
    | StrOutputParser() 
    | (lambda x: x.split("\n"))
)

# Reciprocal Ranking
from langchain.load import dumps, loads

def format_docs(docs):
    if not isinstance(docs, list):
        raise ValueError("Error: Expected a list of documents")
    
    formatted_docs = []
    for i, item in enumerate(docs):
        if not (isinstance(item, tuple) and len(item) == 2 and hasattr(item[0], 'page_content')):
            raise ValueError(f"Error: Item at index {i} is not in the expected RAG Fusion format")
        formatted_docs.append(item[0].page_content)
    
    return "\n\n".join(formatted_docs)

def reciprocal_rank_fusion(results: list[list], k=60, n=5):

    fused_scores = {}

    # Iterate through each list of ranked documents
    for docs in results:
        # Iterate through each document in the list, with its rank (position in the list)
        for rank, doc in enumerate(docs):
            # Convert the document to a string format to use as a key (assumes documents can be serialized to JSON)
            doc_str = dumps(doc)
            # If the document is not yet in the fused_scores dictionary, add it with an initial score of 0
            if doc_str not in fused_scores:
                fused_scores[doc_str] = 0
            # Retrieve the current score of the document, if any
            previous_score = fused_scores[doc_str]
            # Update the score of the document using the RRF formula: 1 / (rank + k)
            fused_scores[doc_str] += 1 / (rank + k)

    # Sort the documents based on their fused scores in descending order to get the final reranked results
    reranked_results = [
        (loads(doc), score)
        for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    ]

    # Return the reranked results as a list of tuples, each containing the document and its fused score
    return reranked_results[:5]

retrieval_chain_rag_fusion = generate_queries | retriever.map() | reciprocal_rank_fusion

def retrieve_docs(state: State) -> State:
    last_message = state["messages"][-1]["content"] if state["messages"] else ""
    retrieved_docs = retrieval_chain_rag_fusion.invoke({"question": last_message})
    return {**state, "retrieved_documents": retrieved_docs}

##### Test RAG Fusion

In [23]:
test_query = "What is the difference between stocks and bonds?"
test_results = retrieval_chain_rag_fusion.invoke({"question": test_query})
print(test_results)

[(Document(metadata={'source': 'intelligent_investor.pdf', 'page': 18}, page_content='els. That was too good to be true. At long last the stock market has\n“returned to normal,” in the sense that both speculators and stock\ninvestors must again be prepared to experience significant and per-\nhaps protracted falls as well as rises in the value of their holdings.\nIn the area of many secondary and third-line common stocks,\nespecially recently floated enterprises, the havoc wrought by thelast market break was catastrophic. This was nothing new initself—it had happened to a similar degree in 1961–62—but there\nwas now a novel element in the fact that some of the investment\nfunds had large commitments in highly speculative and obviouslyovervalued issues of this type. Evidently it is not only the tyro who\nneeds to be warned that while enthusiasm may be necessary for\ngreat accomplishments elsewhere, on Wall Street it almost invari-ably leads to disaster.\nThe major question we shall have 

In [16]:
from typing import Optional
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI

# Data model for ticker extraction
class TickerExtraction(BaseModel):
    """Extract the relevant stock ticker from a user query."""

    ticker: Optional[str] = Field(
        None,
        description="The stock ticker mentioned or implied in the query. Use None if no specific company is mentioned.",
    )

# LLM with function call
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm_ticker_extractor = llm.with_structured_output(TickerExtraction)

# Prompt
system = """You are an expert at identifying company stock tickers in financial queries.
If a specific company is mentioned or strongly implied, provide its stock ticker.
If no specific company is mentioned or implied, return None for the ticker.
Always provide a brief explanation for your decision."""

ticker_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}"),
    ]
)

ticker_extractor = ticker_prompt | structured_llm_ticker_extractor

def extract_ticker(state: State) -> State:
    last_message = state["messages"][-1]["content"] if state["messages"] else ""
    result = ticker_extractor.invoke({"question": last_message})
    
    # Create a new state with the updated ticker
    new_state = dict(state)
    new_state["ticker"] = result.ticker
    
    return new_state

In [19]:
# Test the ticker extractor

# Sample queries
test_queries = [
    "What are Apple's financial ratios?",
    "How do I calculate return on investment?",
    "Can you explain Tesla's recent stock performance?",
]
# Create a mock state for testing
def create_mock_state(query):
    return State(
        messages=[{"role": "human", "content": query}],
        ticker=None,
        observations=None,
        financial_key_metrics=None,
        financial_ratios=None,
        company_summary=None
    )

# Run tests
for query in test_queries:
    initial_state = create_mock_state(query)
    updated_state = extract_ticker(initial_state)
    print(f"Query: {query}")
    print(f"Initial State Ticker: {initial_state.get('ticker')}")
    print(f"Updated State Ticker: {updated_state.get('ticker')}")
    print("---")

Query: What are Apple's financial ratios?
Initial State Ticker: None
Updated State Ticker: AAPL
---
Query: How do I calculate return on investment?
Initial State Ticker: None
Updated State Ticker: None
---
Query: Can you explain Tesla's recent stock performance?
Initial State Ticker: None
Updated State Ticker: TSLA
---


In [23]:
# Metric Extractor
from typing import List, Optional, Literal
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI

class MetricExtraction(BaseModel):
    """Extract relevant financial metrics based on a user query."""
    financial_ratios: List[Literal[tuple(FinancialRatios.__annotations__.keys())]] = Field(
        ...,
        description="List of relevant financial ratios needed to answer the query.",
    )
    financial_key_metrics: List[Literal[tuple(FinancialKeyMetrics.__annotations__.keys())]] = Field(
        ...,
        description="List of relevant financial key metrics needed to answer the query.",
    )
    explanation: str = Field(
        ...,
        description="A brief explanation of why these metrics were chosen.",
    )

# LLM with function call
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm_metric_extractor = llm.with_structured_output(MetricExtraction)

# Prompt
system = """You are an expert financial analyst. Your task is to identify the most relevant financial metrics needed to answer a given query.

Available Financial Ratios:
{financial_ratios}

Available Financial Key Metrics:
{financial_key_metrics}

Analyze the query and select the most relevant metrics from each category. Provide a brief explanation for your choices."""

metric_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}"),
    ]
)

def extract_metrics(state: State) -> State:
    last_message = state["messages"][-1]["content"] if state["messages"] else ""
    
    # Prepare the list of available metrics
    financial_ratios = "\n".join(f"- {key}" for key in FinancialRatios.__annotations__.keys() if key not in ['symbol', 'date'])
    financial_key_metrics = "\n".join(f"- {key}" for key in FinancialKeyMetrics.__annotations__.keys() if key not in ['symbol', 'date'])
    
    # Create the extractor chain
    metric_extractor = metric_prompt.format(
        financial_ratios=financial_ratios,
        financial_key_metrics=financial_key_metrics
    ) | structured_llm_metric_extractor
    
    # Include the question parameter in the invoke call
    result = metric_extractor.invoke({"question": last_message})
    
    # Create a new state with the updated metrics
    new_state = dict(state)
    
    # Update financial_ratios
    if new_state["financial_ratios"] is None:
        new_state["financial_ratios"] = []
    new_state["financial_ratios"] = [
        FinancialRatios(symbol="", date="", **{metric: None for metric in result.financial_ratios})
    ]
    
    # Update financial_key_metrics
    if new_state["financial_key_metrics"] is None:
        new_state["financial_key_metrics"] = []
    new_state["financial_key_metrics"] = [
        FinancialKeyMetrics(symbol="", date="", **{metric: None for metric in result.financial_key_metrics})
    ]
    
    return new_state

# Test the metric extractor

test_queries = [
    "What is the company's ability to pay off its short-term debts?",
    "How efficiently is the company using its assets to generate sales?",
    "What's the company's profitability in relation to its total assets?",
    "How well is the company managing its inventory?",
    "What's the company's overall debt situation?",
]

# Create a mock state for testing
def create_mock_state(query):
    return State(
        messages=[{"role": "human", "content": query}],
        ticker=None,
        observations=None,
        financial_key_metrics=None,
        financial_ratios=None,
        company_summary=None
    )

# Run tests
for query in test_queries:
    initial_state = create_mock_state(query)
    updated_state = extract_metrics(initial_state)
    print(f"Query: {query}")
    print(f"Relevant Financial Ratios: {updated_state['financial_ratios'][0] if updated_state['financial_ratios'] else None}")
    print(f"Relevant Financial Key Metrics: {updated_state['financial_key_metrics'][0] if updated_state['financial_key_metrics'] else None}")
    print("---")

KeyError: 'question'

In [None]:
# Summarizer 

In [None]:
# Buffet Agent

In [None]:
# Graph