### LLM analysis of financial statements

In this notebook, I preprocess the downloaded reports as far as required in order to feed them into the LLM to obtain a buy/sell recommendation.

In [1]:
import pandas as pd
import numpy as np 
from google import genai
from google.genai import types
import json
import re
from tqdm import tqdm
import time

In [2]:
# Import Gemini API key
with open("../google_api_key.txt", "r") as f:
    key = f.read().strip()    

In [None]:
# Read in company names to look up buy/sell recommendations for
sp400_companies = pd.read_csv("../data/sp400_companies.csv", dtype={"CIK": str})
sp500_companies = pd.read_csv("../data/sp500_companies.csv", dtype={"CIK": str})
sp600_companies = pd.read_csv("../data/sp600_companies.csv", dtype={"CIK": str})

# Combine all CIKs into a single list
sp400_ciks = sp400_companies["CIK"].tolist()
sp500_ciks = sp500_companies["CIK"].tolist()
sp600_ciks = sp600_companies["CIK"].tolist()
ciks = sp400_ciks + sp500_ciks + sp600_ciks

# Read in RICs to match with CIKs
rics = pd.read_csv("data/rics.csv", dtype={"Instrument": str})
# Ranem Instrument to CIK for easier merging
rics.rename(columns={"Instrument": "CIK"}, inplace=True)

- Import and further process balance sheet df

In [None]:
first_balance_sheets = pd.read_csv("../data/balance_sheets.csv", dtype={"CIK": str})
missing_balance_sheets = pd.read_csv("../data/missing_balance_sheets.csv", dtype={"CIK": str})
balance_sheets = pd.concat([first_balance_sheets, missing_balance_sheets], ignore_index=True)
# Delete entries where STD Balance Sheet All and FCC Item Name are missing
balance_sheets = balance_sheets.dropna(subset=["STD Balance Sheet All", "FCC Item Name"])
balance_sheets.head()

Unnamed: 0,Date,STD Balance Sheet All,FCC Item Name,RIC,Statement
4,2016-06-30,332000000.0,TR.F.CashSTInvst,AA.N,balance_sheet
5,2016-06-30,332000000.0,TR.F.CashCashEquiv,AA.N,balance_sheet
6,2016-06-30,74000000.0,TR.F.DerivFinInstrHedgeST,AA.N,balance_sheet
7,2016-06-30,607000000.0,TR.F.LoansRcvblNetST,AA.N,balance_sheet
8,2016-06-30,426000000.0,TR.F.TradeAcctTradeNotesRcvblNet,AA.N,balance_sheet


In [5]:
# Adjust Item Name to give a clearer indication of the position
unique_balance_sheet_positions = balance_sheets["FCC Item Name"].unique()

In this cell, I employ Gemini to return actual official balance sheet position labels for the Thomson Reuters abbreviations contained in financial statements downloaded via the LSEG Data Library for Python.

In [3]:
client = genai.Client(api_key = key)

In [None]:
balance_sheet_labels = client.models.generate_content(
    model="gemini-2.5-flash",
    config=types.GenerateContentConfig(
        temperature=0.1,
        system_instruction="You are a financial analyst, that provides concise and accurate answers.",
        thinking_config=types.ThinkingConfig(thinking_budget=0),# Disables thinking
    ),
    contents=[f"""The following is a list of abbreviations used by Thomsom Reuters in their reporting database. For every given abbreviation,
              please provide the actual financial position this refers to in a balance sheet. Example: TR.F.TotAssets: Total Assets.
              Here are the abbreviations: {unique_balance_sheet_positions.tolist()} Provide your answer in a dictionary style format.
              """],  
)

In [None]:
# Regex to extract JSON from the response
# First I extract the JSON string from the response text
json_str = re.search(r'{.*}', balance_sheet_labels.text, re.DOTALL).group()
# Then I parse the JSON string into a Python dictionary
data = json.loads(json_str)
# Save the dictionary to a file
with open("../data/balance_sheet_labels.json", "w") as f:
    json.dump(data, f, indent=4)  # indent makes JSON-format more readable

In [16]:
# Finally, this dictionary can be used to map the abbreviations to the actual financial positions in the balance sheets.
balance_sheets["position_label"] = balance_sheets["FCC Item Name"].map(data)

In [17]:
# Merge on RIC to include CIK and RIC in the balance sheets
balance_sheets = balance_sheets.merge(rics, left_on="RIC", right_on="RIC", how="left")
balance_sheets.head(10)

Unnamed: 0,Date,STD Balance Sheet All,FCC Item Name,RIC,Statement,position_label,CIK
0,2016-06-30,332000000.0,TR.F.CashSTInvst,AA.N,balance_sheet,Cash and Short-Term Investments,1675149
1,2016-06-30,332000000.0,TR.F.CashCashEquiv,AA.N,balance_sheet,Cash and Cash Equivalents,1675149
2,2016-06-30,74000000.0,TR.F.DerivFinInstrHedgeST,AA.N,balance_sheet,"Derivative Financial Instruments, Hedging, Sho...",1675149
3,2016-06-30,607000000.0,TR.F.LoansRcvblNetST,AA.N,balance_sheet,"Loans and Receivables, Net, Short-Term",1675149
4,2016-06-30,426000000.0,TR.F.TradeAcctTradeNotesRcvblNet,AA.N,balance_sheet,"Trade Accounts and Trade Notes Receivable, Net",1675149
5,2016-06-30,181000000.0,TR.F.RcvblOthTot,AA.N,balance_sheet,"Receivables, Other, Total",1675149
6,2016-06-30,1166000000.0,TR.F.InvntTot,AA.N,balance_sheet,"Inventory, Total",1675149
7,2016-06-30,835000000.0,TR.F.InvntRawMaterial,AA.N,balance_sheet,"Inventory, Raw Material",1675149
8,2016-06-30,176000000.0,TR.F.InvntWorkInProg,AA.N,balance_sheet,"Inventory, Work In Progress",1675149
9,2016-06-30,155000000.0,TR.F.InvntFinGoods,AA.N,balance_sheet,"Inventory, Finished Goods",1675149


In [None]:
# Save to CSV
balance_sheets.to_csv("../data/balance_sheets_with_labels.csv", index=False)

---
- Import and further process Cash Flow statement df

In [None]:
first_cash_flow_statements = pd.read_csv("../data/cash_flow_statements.csv", dtype={"CIK": str})
missing_cash_flow_statements = pd.read_csv("../data/missing_cash_flow_statements.csv", dtype={"CIK": str})
cash_flow_statements = pd.concat([first_cash_flow_statements, missing_cash_flow_statements], ignore_index=True)
cash_flow_statements.head()

Unnamed: 0,Date,STD Cash Flow All,FCC Item Name,RIC,statement,Statement
0,2011-09-30,,,AA.N,cashflow,
1,2012-09-30,,,AA.N,cashflow,
2,2013-09-30,,,AA.N,cashflow,
3,2014-09-30,,,AA.N,cashflow,
4,2016-06-30,-19000000.0,TR.F.ProfLossStartingLineCF,AA.N,cashflow,


In [5]:
# Fix wrong column names
cash_flow_statements = cash_flow_statements.drop(columns = "Statement")
cash_flow_statements.rename(columns={"statement": "Statement"}, inplace=True)
# Delete entries where both STD Cash Flow All and FCC Item Name is NaN
cash_flow_statements = cash_flow_statements.dropna(subset=["STD Cash Flow All", "FCC Item Name"], how="all")
cash_flow_statements.head()

Unnamed: 0,Date,STD Cash Flow All,FCC Item Name,RIC,Statement
4,2016-06-30,-19000000.0,TR.F.ProfLossStartingLineCF,AA.N,cashflow
5,2016-06-30,245000000.0,TR.F.NonCashItemsReconcAdjCF,AA.N,cashflow
6,2016-06-30,16000000.0,TR.F.EqIncLossInNetEarnCF,AA.N,cashflow
7,2016-06-30,103000000.0,TR.F.OthNonCashItemsReconcAdjCF,AA.N,cashflow
8,2016-06-30,178000000.0,TR.F.DeprDeplAmortInclImpairCF,AA.N,cashflow


In [6]:
# Get unique cash flow positions
unique_cash_flow_positions = cash_flow_statements["FCC Item Name"].unique()

- As before, Gemini is employed to infer the correct cash flow labels from the FCC Item Name Codes

In [9]:
cash_flow_labels = client.models.generate_content(
    model="gemini-2.5-flash",
    config=types.GenerateContentConfig(
        temperature=0.1,
        system_instruction="You are a financial analyst, that provides concise and accurate answers.",
        thinking_config=types.ThinkingConfig(thinking_budget=0),# Disables thinking
    ),
    contents=[f"""The following is a list of abbreviations used by Thomsom Reuters in their reporting database. For every given abbreviation,
              please provide the actual financial position this refers to in a cash flow statement. Example: TR.F.TotAssets: Total Assets.
              Here are the abbreviations: {unique_cash_flow_positions.tolist()} Provide your answer in a dictionary style format.
              """],  
)

In [10]:
# Regex to extract JSON from the response
# First I extract the JSON string from the response text
json_str = re.search(r'{.*}', cash_flow_labels.text, re.DOTALL).group()
# Then I parse the JSON string into a Python dictionary
data = json.loads(json_str)
# Save the dictionary to a file
with open("data/cash_flow_labels.json", "w") as f:
    json.dump(data, f, indent=4)  

In [11]:
# Map dictionary to cash flow statements
cash_flow_statements["position_label"] = cash_flow_statements["FCC Item Name"].map(data)

In [12]:
# Merge on RIC to include CIK and RIC in the cash flow statements
cash_flow_statements = cash_flow_statements.merge(rics, left_on="RIC", right_on="RIC", how="left")
cash_flow_statements.head(10)

Unnamed: 0,Date,STD Cash Flow All,FCC Item Name,RIC,Statement,position_label,CIK
0,2016-06-30,-19000000.0,TR.F.ProfLossStartingLineCF,AA.N,cashflow,Profit/Loss Starting Line (Cash Flow),1675149
1,2016-06-30,245000000.0,TR.F.NonCashItemsReconcAdjCF,AA.N,cashflow,Non-Cash Items Reconciliation Adjustments (Cas...,1675149
2,2016-06-30,16000000.0,TR.F.EqIncLossInNetEarnCF,AA.N,cashflow,Equity Income/Loss in Net Earnings (Cash Flow),1675149
3,2016-06-30,103000000.0,TR.F.OthNonCashItemsReconcAdjCF,AA.N,cashflow,Other Non-Cash Items Reconciliation Adjustment...,1675149
4,2016-06-30,178000000.0,TR.F.DeprDeplAmortInclImpairCF,AA.N,cashflow,"Depreciation, Depletion, Amortization, Includi...",1675149
5,2016-06-30,178000000.0,TR.F.DeprDeplPPECF,AA.N,cashflow,"Depreciation, Depletion of Property, Plant, an...",1675149
6,2016-06-30,-28000000.0,TR.F.DefIncTaxIncTaxCreditsCF,AA.N,cashflow,Deferred Income Tax and Income Tax Credits (Ca...,1675149
7,2016-06-30,-34000000.0,TR.F.AssetsSaleGLCF,AA.N,cashflow,Assets Sale Gain/Loss (Cash Flow),1675149
8,2016-06-30,10000000.0,TR.F.ShrBasedPaymtCF,AA.N,cashflow,Share-Based Payment (Cash Flow),1675149
9,2016-06-30,226000000.0,TR.F.CashFlowOpBefChgInWkgCap,AA.N,cashflow,Cash Flow from Operations Before Changes in Wo...,1675149


In [None]:
# Save to CSV
cash_flow_statements.to_csv("../data/cash_flow_statements_with_labels.csv", index=False)

---
- Import and further process income statements

In [None]:
first_income_statements = pd.read_csv("../data/income_statements.csv", dtype={"CIK": str})
missing_income_statements = pd.read_csv("../data/missing_income_statements.csv", dtype={"CIK": str})
income_statements = pd.concat([first_income_statements, missing_income_statements], ignore_index=True)
income_statements.head()

Unnamed: 0,Date,STD Income Statement All,FCC Item Name,RIC,statement,Statement
0,2011-09-30,,,AA.N,income_statement,
1,2012-09-30,,,AA.N,income_statement,
2,2013-09-30,,,AA.N,income_statement,
3,2014-09-30,,,AA.N,income_statement,
4,2016-06-30,2323000000.0,TR.F.RevGoodsSrvc,AA.N,income_statement,


In [23]:
# Drop observations, where both STD Income Statement All and FCC Item Name is NaN
income_statements = income_statements.dropna(subset=["STD Income Statement All", "FCC Item Name"], how="all")
# Get rid of Statement column, which is result of wrong spelling
income_statements = income_statements.drop(columns = "Statement")

In [17]:
# Unique positions in income statements
unique_income_positions = income_statements["FCC Item Name"].unique()

- One last time, Gemini is employed to map the FCC Item Name Codes to more official labels used in Cash Flow statements

In [18]:
income_statement_labels = client.models.generate_content(
    model="gemini-2.5-flash",
    config=types.GenerateContentConfig(
        temperature=0.1,
        system_instruction="You are a financial analyst, that provides concise and accurate answers.",
        thinking_config=types.ThinkingConfig(thinking_budget=0),# Disables thinking
    ),
    contents=[f"""The following is a list of abbreviations used by Thomsom Reuters in their reporting database. For every given abbreviation,
              please provide the actual financial position this refers to in an income statement. Example: TR.F.TotAssets: Total Assets.
              Here are the abbreviations: {unique_income_positions.tolist()} Provide your answer in a dictionary style format.
              """],  
)

In [None]:
# Regex to extract JSON from the response
# First I extract the JSON string from the response text
json_str = re.search(r'{.*}', income_statement_labels.text, re.DOTALL).group()
# Then I parse the JSON string into a Python dictionary
data = json.loads(json_str)
# Save the dictionary to a file
with open("../data/income_statement_labels.json", "w") as f:   
    json.dump(data, f, indent=4)

In [24]:
# Map dictionary to cash flow statements
income_statements["position_label"] = income_statements["FCC Item Name"].map(data)
income_statements.head()

Unnamed: 0,Date,STD Income Statement All,FCC Item Name,RIC,statement,position_label
4,2016-06-30,2323000000.0,TR.F.RevGoodsSrvc,AA.N,income_statement,Revenue from Goods and Services
5,2016-06-30,2323000000.0,TR.F.SalesOfGoodsSrvcNetUnclassif,AA.N,income_statement,"Sales of Goods and Services, Net, Unclassified"
6,2016-06-30,2323000000.0,TR.F.TotRevenue,AA.N,income_statement,Total Revenue
7,2016-06-30,2119000000.0,TR.F.CostOfOpRev,AA.N,income_statement,Cost of Operating Revenue
8,2016-06-30,2119000000.0,TR.F.COGSTot,AA.N,income_statement,"Cost of Goods Sold, Total"


In [25]:
# Merge on RIC to include CIK and RIC in the cash flow statements
income_statements = income_statements.merge(rics, left_on="RIC", right_on="RIC", how="left")
income_statements.head(10)

Unnamed: 0,Date,STD Income Statement All,FCC Item Name,RIC,statement,position_label,CIK
0,2016-06-30,2323000000.0,TR.F.RevGoodsSrvc,AA.N,income_statement,Revenue from Goods and Services,1675149
1,2016-06-30,2323000000.0,TR.F.SalesOfGoodsSrvcNetUnclassif,AA.N,income_statement,"Sales of Goods and Services, Net, Unclassified",1675149
2,2016-06-30,2323000000.0,TR.F.TotRevenue,AA.N,income_statement,Total Revenue,1675149
3,2016-06-30,2119000000.0,TR.F.CostOfOpRev,AA.N,income_statement,Cost of Operating Revenue,1675149
4,2016-06-30,2119000000.0,TR.F.COGSTot,AA.N,income_statement,"Cost of Goods Sold, Total",1675149
5,2016-06-30,1941000000.0,TR.F.COGSUnclassif,AA.N,income_statement,"Cost of Goods Sold, Unclassified",1675149
6,2016-06-30,178000000.0,TR.F.DeprInCOGS,AA.N,income_statement,Depreciation in Cost of Goods Sold,1675149
7,2016-06-30,204000000.0,TR.F.GrossProfIndPropTot,AA.N,income_statement,"Gross Profit, Industrial Property, Total",1675149
8,2016-06-30,97000000.0,TR.F.SGATot,AA.N,income_statement,"Selling, General and Administrative Expenses, ...",1675149
9,2016-06-30,90000000.0,TR.F.SGAUnclassif,AA.N,income_statement,"Selling, General and Administrative Expenses, ...",1675149


In [None]:
# Save to CSV
income_statements.to_csv("../data/income_statements_with_labels.csv", index=False)

---

- Function to fetch most recent company filings for a given CIK and Date

In [None]:
# Load analyst ratings to contrast with LLM recommendations
analyst_ratings = pd.read_csv("../data/sp1500_sell_side_recommendations.csv", dtype={"CIK": str})

In [4]:
labeled_balance_sheets = pd.read_csv("../data/balance_sheets_with_labels.csv", dtype={"CIK": str})
labeled_income_statements = pd.read_csv("../data/income_statements_with_labels.csv", dtype={"CIK": str})
labeled_cash_flow_statements = pd.read_csv("../data/cash_flow_statements_with_labels.csv", dtype={"CIK": str})

  labeled_income_statements = pd.read_csv("../data/income_statements_with_labels.csv", dtype={"CIK": str})
  labeled_cash_flow_statements = pd.read_csv("../data/cash_flow_statements_with_labels.csv", dtype={"CIK": str})


In [5]:
def get_llm_ratings(cik: str, balance_sheets = None, income_statements = None, cash_flow_statements = None):
    
    """
    Function that returns a DataFrame with LLM ratings for a given CIK.
    For every reporting date, the function fetches the most recent financial statements, i.e.
    - Balance Sheet
    - Cash Flow Statement
    - Income Statement
    and calls the LLM to get a buy/sell/hold recommendation. In order to avoid issues with reports that were filed slightly apart,
    a window of 10 days around a given reporting date is used. This helps to ensure that the LLM has access to all relevant financial information for a given reporting date.
    Furthermore, reports that were filed slightly apart will not lead to recommendations that are based on partial information only and will also not cause multiple
    recommendations that only lie within the window of 10 days around a given reporting date.
    
    Parameters: 
    cik: str, CIK of the company (Can be looked up on the SEC website)
    balance_sheets: Balance Sheets DataFrame with columns: STD Balance Sheet All, FCC Item Name, CIK and Date (among others)
    income_statements: Income Statements DataFrame with columns: STD Income Statement All, FCC Item Name, CIK and Date (among others)
    cash_flow_statements: Cash Flow Statements DataFrame with columns: STD Cash Flow All, FCC Item Name, CIK and Date (among others)
    """
    
    # First filter dfs for input CIK
    balance_sheets = balance_sheets[balance_sheets["CIK"] == cik].copy()
    income_statements = income_statements[income_statements["CIK"] == cik].copy()
    cash_flow_statements = cash_flow_statements[cash_flow_statements["CIK"] == cik].copy()
    
    # Convert the date columns to datetime objects
    for df in [balance_sheets, income_statements, cash_flow_statements]:
        df["Report Date"] = pd.to_datetime(df["Date"])

    # Determine unique dates
    reporting_dates = pd.concat([
        balance_sheets["Report Date"],
        income_statements["Report Date"],
        cash_flow_statements["Report Date"]
    ]).unique()

    # Sort dates just to be safe
    reporting_dates = np.sort(reporting_dates)

    # In order to handle reports, that were filed slighty apart, a window of 10 days around a given reporting date is used
    window = pd.Timedelta(days=10)

    # Loop over reporting dates to obtain LLM ratings
    llm_ratings = []
    for date in reporting_dates:

        # Subset all financial statements for the given date +- window days
        bs = balance_sheets[(balance_sheets["Report Date"] >= date - window) & (balance_sheets["Report Date"] <= date + window)]
        is_ = income_statements[(income_statements["Report Date"] >= date - window) & (income_statements["Report Date"] <= date + window)]
        cf = cash_flow_statements[(cash_flow_statements["Report Date"] >= date - window) & (cash_flow_statements["Report Date"] <= date + window)]
    
        # If no reports are available for the given date, skip to next date
        if bs.empty or is_.empty or cf.empty:
            continue
        
        # Concatenate reports into a string with correct labels
        bs_str = "\n".join(bs.apply(lambda row: f"{row['position_label']}: {row['STD Balance Sheet All']}", axis=1).astype(str))
        is_str = "\n".join(is_.apply(lambda row: f"{row['position_label']}: {row['STD Income Statement All']}", axis=1).astype(str))
        cf_str = "\n".join(cf.apply(lambda row: f"{row['position_label']}: {row['STD Cash Flow All']}", axis=1).astype(str))

        # Call the LLM to get the rating
        response = client.models.generate_content(
            model="gemini-2.5-flash", # "gemini-2.5-flash"
            config=types.GenerateContentConfig(
                temperature=0, # Deterministic ouput
                system_instruction="""You are an experienced, data-driven financial analyst, that provides concise and accurate answers.""",
                
                thinking_config=types.ThinkingConfig(thinking_budget=0),# Disables thinking, but only required for Gemini 2.5
            ),
            
            contents=[f"""
            Based on the following financial reports only, please provide an investment recommendation for the underlying company.
                      
            Balance Sheet: 
            {bs_str}

            Income Statement: 
            {is_str}

            Cash Flow Statement: 
            {cf_str}

            Provide your answer using only one of the following signals: 'strong buy', 'buy', 'hold', 'sell', or 'strong sell'.
            """]
        )

        # Extract rating from the response
        rating = response.text.strip().lower()
        llm_ratings.append({
            "CIK": cik,
            "Report Date": date,
            "Rating": rating
        })
    
    # Convert the list of dictionaries to a DataFrame
    llm_ratings_df = pd.DataFrame(llm_ratings)

    # Convert Report Date to datetime
    llm_ratings_df["Report Date"] = pd.to_datetime(llm_ratings_df["Report Date"]).dt.date

    # Sort by Report Date
    llm_ratings_df.sort_values(by="Report Date", inplace=True)

    # Reset index
    llm_ratings_df.reset_index(drop=True, inplace=True)
    
    return llm_ratings_df

- Finally, I can use this function to loop over all company CIKs 

In [6]:
# First determine unique CIKs
ciks = set(labeled_balance_sheets["CIK"].unique()) | set(labeled_income_statements["CIK"].unique()) | set(labeled_cash_flow_statements["CIK"].unique())

# Convert to DataFrame to make slicing into equal parts easier
ciks = pd.DataFrame(ciks, columns=["CIK"])

In [None]:
# To avoid hitting rate limits, internet connection issues, or other problems, I will only partly loop over the CIKs and concatenate the results later
# Split into 10 equal parts
fraction = (len(ciks) // 10)
ciks1 = ciks[:fraction]
ciks2 = ciks[fraction:2*fraction]
ciks3 = ciks[2*fraction:3*fraction]
ciks4 = ciks[3*fraction:4*fraction]
ciks5 = ciks[4*fraction:5*fraction]
ciks6 = ciks[5*fraction:6*fraction]
ciks7 = ciks[6*fraction:7*fraction]
ciks8 = ciks[7*fraction:8*fraction]
ciks9 = ciks[8*fraction:9*fraction]
ciks10 = ciks[9*fraction:]

# Check if all CIKs are included
len(ciks) == len(ciks1) + len(ciks2) + len(ciks3) + len(ciks4) + len(ciks5) + len(ciks6) + len(ciks7) + len(ciks8) + len(ciks9) + len(ciks10)

True

---
Testing the function

In [7]:
ciks1 = ciks[:100] # Example sublist for testing, can be adjusted as needed
ciks2 = ciks[100:200] # Example sublist for testing, can be adjusted as needed
ciks3 = ciks[200:300] # Example sublist for testing, can be adjusted as needed
ciks4 = ciks[300:400] # Example sublist for testing, can be adjusted as needed
ciks5 = ciks[400:500] # Example sublist for testing, can
ciks6 = ciks[500:600] # Example sublist for testing, can be adjusted as needed
ciks7 = ciks[600:700] # Example sublist for testing, can
ciks8 = ciks[700:800] # Example sublist for testing, can be adjusted as needed
ciks9 = ciks[800:900] # Example sublist for testing, can be adjusted as needed
ciks10 = ciks[900:1000] # Example sublist for testing, can be adjusted as needed
ciks11 = ciks[1000:1100] # Example sublist for testing, can be adjusted as needed
ciks12 = ciks[1100:1200] # Example sublist for testing, can be adjusted as needed
ciks13 = ciks[1200:1300] # Example sublist for testing, can be adjusted as needed
ciks14 = ciks[1300:1400] # Example sublist for testing, can be adjusted as needed
ciks15 = ciks[1400:1500] # Example sublist for testing

In [None]:
ciks1_ratings = []
progress_bar = tqdm(ciks1["CIK"], desc="Processing CIKs")

for i, cik in enumerate(progress_bar):
    # Progress message
    progress_bar.set_description(f"Processing CIK {i+1}/{len(ciks1)}: {cik} | Time: {pd.Timestamp.now().strftime('%H:%M:%S')}")
    # Get LLM ratings for the given CIK
    ratings = get_llm_ratings(cik, labeled_balance_sheets, labeled_income_statements, labeled_cash_flow_statements)
    ciks1_ratings.append(ratings)
# Concatenate all ratings into a single DataFrame
ciks_test_ratings_df = pd.concat(ciks1_ratings, ignore_index=True)
# Save to CSV
ciks_test_ratings_df.to_csv("data/ciks1_ratings.csv", index=False)

Processing CIK 100/100: 0001396009 | Time: 18:19:23: 100%|██████████| 100/100 [1:14:11<00:00, 44.52s/it]


In [17]:
ciks2_ratings = []
progress_bar = tqdm(ciks2["CIK"], desc="Processing CIKs")

for i, cik in enumerate(progress_bar):
    # Progress message
    progress_bar.set_description(f"Processing CIK {i+1}/{len(ciks2)}: {cik} | Time: {pd.Timestamp.now().strftime('%H:%M:%S')}")
    # Get LLM ratings for the given CIK
    ratings = get_llm_ratings(cik, labeled_balance_sheets, labeled_income_statements, labeled_cash_flow_statements)
    ciks2_ratings.append(ratings)
# Concatenate all ratings into a single DataFrame
ciks2_ratings_df = pd.concat(ciks2_ratings, ignore_index=True)
# Save to CSV
ciks2_ratings_df.to_csv("data/ciks2_ratings.csv", index=False)

Processing CIK 100/100: 0001096752 | Time: 12:11:31: 100%|██████████| 100/100 [1:03:30<00:00, 38.10s/it]


In [None]:
ciks3_ratings = []
progress_bar = tqdm(ciks3["CIK"], desc="Processing CIKs")

for i, cik in enumerate(progress_bar):
    # Progress message
    progress_bar.set_description(f"Processing CIK {i+1}/{len(ciks3)}: {cik} | Time: {pd.Timestamp.now().strftime('%H:%M:%S')}")
    # Get LLM ratings for the given CIK
    ratings = get_llm_ratings(cik, labeled_balance_sheets, labeled_income_statements, labeled_cash_flow_statements)
    ciks3_ratings.append(ratings)
# Concatenate all ratings into a single DataFrame
ciks3_ratings_df = pd.concat(ciks3_ratings, ignore_index=True)
# Save to CSV
ciks3_ratings_df.to_csv("../data/ciks3_ratings.csv", index=False)

In [8]:
ciks4_ratings = []
progress_bar = tqdm(ciks4["CIK"], desc="Processing CIKs")

for i, cik in enumerate(progress_bar):
    # Progress message
    progress_bar.set_description(f"Processing CIK {i+1}/{len(ciks4)}: {cik} | Time: {pd.Timestamp.now().strftime('%H:%M:%S')}")
    # Get LLM ratings for the given CIK
    ratings = get_llm_ratings(cik, labeled_balance_sheets, labeled_income_statements, labeled_cash_flow_statements)
    ciks4_ratings.append(ratings)
# Concatenate all ratings into a single DataFrame
ciks4_ratings_df = pd.concat(ciks4_ratings, ignore_index=True)
# Save to CSV
ciks4_ratings_df.to_csv("../data/ciks4_ratings.csv", index=False)

Processing CIK 100/100: 0000890926 | Time: 17:44:45: 100%|██████████| 100/100 [1:01:25<00:00, 36.85s/it]


In [9]:
ciks4_ratings_df.sort_values(by="Report Date", inplace=True)
ciks4_ratings_df["Rating"].value_counts()

Rating
sell           3719
hold           3452
buy             683
strong sell      60
sell.            20
hold.             9
strong buy        3
Name: count, dtype: int64

In [None]:
Rating
hold           3407
sell           3311
buy             763
strong sell      72
sell.            21
hold.             9
strong buy        2
Name: count, dtype: int64

Major differences between the LLM recommendations and sell-side analyst recommendations can be observed. 
The LLM appears to be more cautious in its ouput, signaling buy only 5 times, where as the analysts recommended buy over 200 times.

It is important to note, that the analyst recommendations are available on a monthly basis, leading to larger absolute numbers. However,
even relatively speaking, the share of buy signals in sell-side analyst answers is very large.


---


# Batched API requests

Since the number of individual requests that would be sent using the previously written function would exceed the daily request limit by far, batch mode will be used.

In [9]:
def create_llm_requests(cik_list, labeled_balance_sheets, labeled_income_statements, labeled_cash_flow_statements):
    requests_data = []
    for i, cik in enumerate(cik_list["CIK"]):
        # Progress message
        print(f"Processing CIK {cik} ({i+1}/{len(cik_list)})")
        
        # First filter dfs for input CIK
        balance_sheets = labeled_balance_sheets[labeled_balance_sheets["CIK"] == cik].copy()
        income_statements = labeled_income_statements[labeled_income_statements["CIK"] == cik].copy()
        cash_flow_statements = labeled_cash_flow_statements[labeled_cash_flow_statements["CIK"] == cik].copy()
        
        # Convert the date columns to datetime objects
        for df in [balance_sheets, income_statements, cash_flow_statements]:
            df["Report Date"] = pd.to_datetime(df["Date"])

        # Determine unique dates
        reporting_dates = pd.concat([
            balance_sheets["Report Date"],
            income_statements["Report Date"],
            cash_flow_statements["Report Date"]
        ]).unique()

        # Sort dates just to be safe
        reporting_dates = np.sort(reporting_dates)

        # In order to handle reports, that were filed slighty apart, a window of 10 days around a given reporting date is used
        window = pd.Timedelta(days=10)

        # Loop over reporting dates to obtain LLM ratings
        for date in reporting_dates:

            # Subset all financial statements for the given date +- window days
            bs = balance_sheets[(balance_sheets["Report Date"] >= date - window) & (balance_sheets["Report Date"] <= date + window)]
            is_ = income_statements[(income_statements["Report Date"] >= date - window) & (income_statements["Report Date"] <= date + window)]
            cf = cash_flow_statements[(cash_flow_statements["Report Date"] >= date - window) & (cash_flow_statements["Report Date"] <= date + window)]
        
            # If no reports are available for the given date, skip to next date
            if bs.empty or is_.empty or cf.empty:
                continue
            
            # Concatenate reports into a string with correct labels
            bs_str = "\n".join(bs.apply(lambda row: f"{row['position_label']}: {row['STD Balance Sheet All']}", axis=1).astype(str))
            is_str = "\n".join(is_.apply(lambda row: f"{row['position_label']}: {row['STD Income Statement All']}", axis=1).astype(str))
            cf_str = "\n".join(cf.apply(lambda row: f"{row['position_label']}: {row['STD Cash Flow All']}", axis=1).astype(str))

            # Construct the request for batch processing
            requests_data.append({
                "key": f"request_{i}_{cik}_{pd.Timestamp(date).strftime('%d_%m_%Y')}",
                "request": {
                    "contents": [{
                        "parts": [{
                            "text": f"""
                            Based on the following financial reports only, please provide an investment recommendation for the underlying company.

                            Balance Sheet: 
                            {bs_str}

                            Income Statement: 
                            {is_str}

                            Cash Flow Statement: 
                            {cf_str}

                            Provide your answer using only one of the following signals: 'strong buy', 'buy', 'hold', 'sell', or 'strong sell'.
                            """.strip()  # strip removes leading and trailing whitespace
                        }]
                    }],
                    "generation_config": {"temperature": 0,
                                          "thinking_config":{"thinking_budget": 0}
                    },
                    "system_instruction": {
                        "parts": [{"text": "You are an experienced, data-driven financial analyst, that provides concise and accurate answers."}]
                    }
                }
            })

                     

                    

                      # Disables thinking, but only required for Gemini 2.5
            
    return requests_data


In [10]:
cik_sublist = ciks1[:100]
ciks1_requests = create_llm_requests(cik_sublist, labeled_balance_sheets, labeled_income_statements, labeled_cash_flow_statements)

Processing CIK 0001996862 (1/100)
Processing CIK 0001049502 (2/100)
Processing CIK 0001071739 (3/100)
Processing CIK 0001164863 (4/100)
Processing CIK 0000717605 (5/100)
Processing CIK 0000008947 (6/100)
Processing CIK 0001341439 (7/100)
Processing CIK 0001955520 (8/100)
Processing CIK 0000775158 (9/100)
Processing CIK 0001166003 (10/100)
Processing CIK 0001360901 (11/100)
Processing CIK 0000822416 (12/100)
Processing CIK 0000066740 (13/100)
Processing CIK 0000089089 (14/100)
Processing CIK 0000889331 (15/100)
Processing CIK 0000876427 (16/100)
Processing CIK 0000104894 (17/100)
Processing CIK 0000788784 (18/100)
Processing CIK 0000038777 (19/100)
Processing CIK 0000805676 (20/100)
Processing CIK 0000089439 (21/100)
Processing CIK 0001025378 (22/100)
Processing CIK 0001532961 (23/100)
Processing CIK 0001845815 (24/100)
Processing CIK 0000913142 (25/100)
Processing CIK 0001569650 (26/100)
Processing CIK 0000882184 (27/100)
Processing CIK 0000106640 (28/100)
Processing CIK 0001095565 (29

In [81]:
# Create a sample JSONL file
with open("my-batch-requests.jsonl", "w") as f:
    for req in ciks1_requests:
        f.write(json.dumps(req) + "\n")


In [82]:
# Read in file to check if it was created correctly
with open("my-batch-requests.jsonl", "r") as f:
    requests = [json.loads(line) for line in f]

In [83]:
uploaded_file = client.files.upload(
    file='my-batch-requests.jsonl',
    config=types.UploadFileConfig(display_name='my-batch-requests', mime_type='jsonl')
)

In [84]:
print(f"Uploaded file: {uploaded_file.name}")

Uploaded file: files/14jmhiosat1d


In [85]:
# Assumes `uploaded_file` is the file object from the previous step
file_batch_job = client.batches.create(
    model="gemini-2.5-flash",
    src=uploaded_file.name,
    config={
        'display_name': "file-upload-job-1",
    },
)

print(f"Created batch job: {file_batch_job.name}")


ClientError: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits.', 'status': 'RESOURCE_EXHAUSTED', 'details': [{'@type': 'type.googleapis.com/google.rpc.Help', 'links': [{'description': 'Learn more about Gemini API quotas', 'url': 'https://ai.google.dev/gemini-api/docs/rate-limits'}]}]}}

In [66]:
# Use the name of the job you want to check
# e.g., inline_batch_job.name from the previous step
job_name = file_batch_job.name  # (e.g. 'batches/your-batch-id')
batch_job = client.batches.get(name=job_name)

completed_states = set([
    'JOB_STATE_SUCCEEDED',
    'JOB_STATE_FAILED',
    'JOB_STATE_CANCELLED',
])

print(f"Polling status for job: {job_name}")
batch_job = client.batches.get(name=job_name) # Initial get
while batch_job.state.name not in completed_states:
  print(f"Current state: {batch_job.state.name}")
  time.sleep(15) # Wait for 30 seconds before polling again
  batch_job = client.batches.get(name=job_name)

print(f"Job finished with state: {batch_job.state.name}")
if batch_job.state.name == 'JOB_STATE_FAILED':
    print(f"Error: {batch_job.error}")


Polling status for job: batches/omc1mqgq4vbwmhbe01uzo2oxn09ai4rlc5ok
Job finished with state: JOB_STATE_SUCCEEDED


In [67]:
import json

# Use the name of the job you want to check
# e.g., inline_batch_job.name from the previous step
job_name = file_batch_job.name
batch_job = client.batches.get(name=job_name)

if batch_job.state.name == 'JOB_STATE_SUCCEEDED':

    # If batch job was created with a file
    if batch_job.dest and batch_job.dest.file_name:
        # Results are in a file
        result_file_name = batch_job.dest.file_name
        print(f"Results are in file: {result_file_name}")

        print("Downloading result file content...")
        file_content = client.files.download(file=result_file_name)
        # Process file_content (bytes) as needed
        print(file_content.decode('utf-8'))

    # If batch job was created with inline request
    elif batch_job.dest and batch_job.dest.inlined_responses:
        # Results are inline
        print("Results are inline:")
        for i, inline_response in enumerate(batch_job.dest.inlined_responses):
            print(f"Response {i+1}:")
            if inline_response.response:
                # Accessing response, structure may vary.
                try:
                    print(inline_response.response.text)
                except AttributeError:
                    print(inline_response.response) # Fallback
            elif inline_response.error:
                print(f"Error: {inline_response.error}")
    else:
        print("No results found (neither file nor inline).")
else:
    print(f"Job did not succeed. Final state: {batch_job.state.name}")
    if batch_job.error:
        print(f"Error: {batch_job.error}")


Results are in file: files/batch-omc1mqgq4vbwmhbe01uzo2oxn09ai4rlc5ok
Downloading result file content...
{"response":{"candidates":[{"content":{"parts":[{"text":"Hold"}],"role":"model"},"index":0,"finishReason":"STOP"}],"usageMetadata":{"candidatesTokenCount":1,"totalTokenCount":3960,"promptTokensDetails":[{"modality":"TEXT","tokenCount":3959}],"promptTokenCount":3959},"responseId":"6eN4aIvLOvPB1MkPq4XgyQE","modelVersion":"gemini-2.5-flash"},"key":"request_0_0001295401_30_06_2004"}
{"response":{"candidates":[{"finishReason":"STOP","content":{"parts":[{"text":"Hold"}],"role":"model"},"index":0}],"responseId":"6eN4aOadJO-n1MkPs6OcsAc","usageMetadata":{"promptTokensDetails":[{"modality":"TEXT","tokenCount":4096}],"candidatesTokenCount":1,"totalTokenCount":4097,"promptTokenCount":4096},"modelVersion":"gemini-2.5-flash"},"key":"request_0_0001295401_30_09_2004"}



check differences between temperature and thinking budget, but parameters seem to be working

----


Testing