In [None]:
# @title Install libraries

!pip install -q sec_edgar_downloader==5.0.3 faiss-cpu==1.11.0.post1 langchain==0.3.27 langchain-community==0.3.27 langchain-cohere==0.4.4 cohere==5.15.0

In [3]:
# @title Imports

import os
import re
import time
import json
import shutil
import logging
import requests
import numpy as np
import pandas as pd
from google.colab import userdata
from IPython.display import display
from sec_edgar_downloader import Downloader
from concurrent.futures import ThreadPoolExecutor, as_completed

# LangChain & LangChain Community
import faiss
from langchain.chains import RetrievalQA
from langchain_core.documents import Document
from langchain_community.vectorstores import FAISS

# Cohere integrations
from langchain_cohere import ChatCohere
from langchain_cohere import CohereEmbeddings

In [4]:
# @title Set-up variables and files for notebook

# Set up header for downloading SEC data
# NOTE: placeholder identity -- replace with your own name and e-mail before running
headers = {"User-Agent": "YourName YourEmail@example.com"}

# Set up logging configuration for excel file downloads
log_file = '/content/download_excels.log'

# force=True removes AND closes any handlers already attached to the root
# logger, so re-running this cell does not leave earlier log file handles open
logging.basicConfig(
    filename=log_file,
    filemode='a',
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    force=True,
)

if not os.path.exists(log_file):
    print("Log file creation failed!")

In [5]:
# @title Get accession numbers for downloading files and download files -- fetch_accession_numbers(cik) & get_excel(company, cik, accession_num)

def fetch_accession_numbers(cik):
    """Return the sorted accession numbers of a company's recent 10-K filings.

    Requests the SEC submissions JSON for ``cik`` using the module-level
    ``headers`` and a 10 second timeout.

    Args:
        cik: CIK string interpolated into the submissions URL as-is.

    Returns:
        A sorted list of the accession numbers whose form type is exactly
        "10-K" (may be empty), or ``None`` when the request fails, the server
        answers with an error status, or the body is not valid JSON.
    """
    url = f"https://data.sec.gov/submissions/CIK{cik}.json"

    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError):
        # Only network/HTTP/JSON problems mean "no data"; anything else
        # (KeyboardInterrupt, programming errors) must not be swallowed.
        return None

    filings = data.get('filings', {}).get('recent', {})
    acc_numbers = filings.get('accessionNumber', [])
    form_types = filings.get('form', [])

    # The two lists are parallel: position i describes the same filing
    return sorted(acc for acc, form in zip(acc_numbers, form_types) if form == "10-K")


def get_excel(company, cik, accession_num):
    """Download one filing's Financial_Report.xlsx into the working directory.

    The workbook is saved as ``{company}_{accession_num}.xlsx``.

    Returns:
        ("success", accession_num) when the server answers 200 and the file
        was written, ("not_found", accession_num) on a 404, otherwise
        ("error", accession_num, message) where message is either
        "Status code: <code>" or the text of the raised exception.
    """
    target_path = f"{company}_{accession_num}.xlsx"

    # The archive path uses the accession number without its dashes
    dashless = accession_num.replace("-", "")
    report_url = f"https://www.sec.gov/Archives/edgar/data/{cik}/{dashless}/Financial_Report.xlsx"

    try:
        reply = requests.get(report_url, headers=headers, timeout=10)
        code = reply.status_code

        if code == 404:
            return ("not_found", accession_num)

        if code != 200:
            return ("error", accession_num, f"Status code: {code}")

        with open(target_path, 'wb') as sink:
            sink.write(reply.content)
        return ("success", accession_num)

    except Exception as err:
        return ("error", accession_num, str(err))

In [6]:
# @title Download excel files using CIKs -- download_excels(company, cik)

def download_excels(company, cik, max_attempts=5, backoff_seconds=75):
    """Download the Financial_Report workbook of every recent 10-K of a company.

    Each accession number returned by ``fetch_accession_numbers`` is fetched
    with ``get_excel``. A "Status code: 429" answer is retried with a linearly
    growing pause (``backoff_seconds * attempt``); every other outcome is final.
    All outcomes are written to the root logger once the company is done and a
    one-line summary is printed.

    Args:
        company: Company title; sanitised before being used in file names.
        cik: CIK string passed on to the fetch/download helpers.
        max_attempts: Maximum number of download attempts per filing
            (values below 1 are treated as 1).
        backoff_seconds: Base pause, in seconds, between 429 retries.

    Returns:
        None.
    """
    # Improve company name for better file handling
    company = re.sub(r'[^\w\-_]', '_', company).strip('_')

    # Fetch 10-K accession numbers from cik
    accession_numbers = fetch_accession_numbers(cik)
    if not accession_numbers:
        logging.error(f"{company}, {cik}, , Error, No 10-K accessions found")
        return

    max_attempts = max(1, max_attempts)

    # To keep track of failed and successful downloads
    failed_downloads, success_downloads = [], []

    for acc in accession_numbers:
        for attempt in range(1, max_attempts + 1):
            result = get_excel(company, cik, acc)
            status, acc_num = result[0], result[1]
            # Only "error" results carry a third element
            detail = result[2] if len(result) > 2 else ""

            if status == "success":
                success_downloads.append([company, cik, acc_num, "success", ""])
                break

            if status == "not_found":
                failed_downloads.append([company, cik, acc_num, "not_found", ""])
                break

            if detail != "Status code: 429":
                failed_downloads.append([company, cik, acc_num, "error", detail])
                break

            # Rate limited: give up immediately after the last attempt instead
            # of sleeping once more for a retry that will never happen
            if attempt == max_attempts:
                failed_downloads.append([company, cik, acc_num, "denied", detail])
                break

            print(f"TRY NUMBER: {attempt + 1} for {company.upper()}")
            time.sleep(backoff_seconds * attempt)

    # Log entries in the log file
    for entry in success_downloads:
        logging.info(f"{entry[0]}, {entry[1]}, {entry[2]}, {entry[3]}, {entry[4]}")

    for entry in failed_downloads:
        logging.error(f"{entry[0]}, {entry[1]}, {entry[2]}, {entry[3]}, {entry[4]}")

    print(f"{company} -- Successful downloads: {len(success_downloads)} | Failed downloads: {len(failed_downloads)}")


In [7]:
# @title Correct the values in dataframe according to share and usd multipliers -- process_df_values(df, share_multiplier, usd_multiplier)

def process_df_values(df: pd.DataFrame, share_multiplier: int, usd_multiplier: int) -> pd.DataFrame:
    """Scale the numeric cells of a statement frame and render them as text.

    Expected layout: column 0 holds the category, a 'Description' column holds
    the row label and the value columns start at position 2.

    Row handling:
      * Rows whose description cannot be lower-cased are skipped.
      * Rows whose value columns are all null act as headings: their
        description becomes the running category prefix (joined with " | "
        when the previous row index was a heading too).
      * Other rows receive the running prefix as category. Share rows become
        "<value * share_multiplier> shares"; other rows become "USD <value>",
        scaled by ``usd_multiplier`` and shortened to million/billion unless
        the row is a per-share/explicit-USD amount.
      * Rows whose values cannot be converted are dropped, as are rows that
        still contain nulls outside the first column.

    NOTE(review): ``index`` is used both as a label and as a position, so a
    default 0..n-1 index is assumed -- confirm against callers.
    The input frame is modified in place while it is processed.

    Args:
        df: Statement frame in the layout described above.
        share_multiplier: Factor applied to share counts.
        usd_multiplier: Factor applied to USD amounts.

    Returns:
        The cleaned frame with a fresh 0..n-1 index.
    """
    pre_string = ''
    # Heading rows seen so far (set: only membership is ever tested)
    nan_index, text_index = set(), []

    # Some common terms to be taken care of when converting dataframe values
    common_averages = ["in share", "average share", "average common share"]
    usd_terms = ["in usd", "in dollars", "income per share"]

    for index, row in df.iterrows():
        try:
            # Get row description for appropriate value conversion
            row_desc = row.loc['Description'].lower().rstrip(':')
        except (AttributeError, KeyError, TypeError):
            # Missing or non-text description: nothing to classify
            continue

        if row.isnull().all():
            pre_string = ''
            nan_index.add(index)

        elif row.iloc[2:].isnull().all():
            if index - 1 in nan_index:
                pre_string = pre_string + " | " + row_desc
            else:
                pre_string = row_desc
            nan_index.add(index)

        else:
            try:
                # If value can't be converted to float, then skip that row
                float(row.iloc[2])

                if pre_string:
                    df.iloc[index, 0] = pre_string

                # These only depend on the row, not on the column
                is_share_row = any(term in row_desc or term in pre_string for term in common_averages)
                is_unscaled_usd = any(term in row_desc or term in pre_string for term in usd_terms)

                for col in df.columns[2:]:
                    val = row[col]

                    if is_share_row:
                        val = float(val) * share_multiplier
                        val = str(val) + " shares"
                    else:
                        if not is_unscaled_usd:
                            val = float(val) * usd_multiplier
                            if abs(val) >= 1000000000:
                                val = str(val/1000000000) + " billion"
                            elif abs(val) >= 1000000:
                                val = str(val/1000000) + " million"

                        val = "USD " + str(val)

                    df.at[index, col] = val
            except Exception:
                # Best effort: any conversion/assignment failure marks the row
                # as text (but interrupts are no longer swallowed)
                text_index.append(index)
                continue

    # Drop rows that contain text and not numbers
    if text_index:
        df = df.drop(text_index)

    # Drop empty rows
    df = df.dropna(subset=df.columns[1:])

    return df.reset_index(drop=True)


In [8]:
# @title Set column headings and find share and usd multipliers -- process_dataframe(df)

def process_dataframe(df: pd.DataFrame):
    """Normalise a raw statement sheet and hand it to ``process_df_values``.

    Steps:
      1. Read the share / USD multipliers from the first column header
         (e.g. "shares in thousand", "$ in million"); default is 1.
      2. Turn empty / whitespace-only cells into NaN.
      3. Drop columns with fewer than 5 distinct non-null values.
      4. Rename the first column to "Description" and the next three columns
         to the year given by the last four characters of their first-row cell.
      5. Drop that first row and insert an empty 'Category' column in front.

    Args:
        df: Sheet as read from the workbook.

    Returns:
        The processed frame, or ``None`` when the sheet has no columns or the
        three year columns cannot be identified.
    """
    # An empty sheet has no header to inspect
    if df.shape[1] == 0:
        return None

    # Identify multipliers from the first row of the sheet
    # (str() guards against non-text headers such as None or numbers)
    multiplier_identifier = str(df.columns[0]).lower()

    # Checked largest first, mirroring billion -> million -> thousand priority
    scales = (("billion", 1000000000), ("million", 1000000), ("thousand", 1000))
    share_multiplier = next(
        (factor for word, factor in scales if f"shares in {word}" in multiplier_identifier), 1
    )
    usd_multiplier = next(
        (factor for word, factor in scales if f"$ in {word}" in multiplier_identifier), 1
    )

    # Rename columns with None heading for proper column renaming
    df.columns = [f'new_column_{i}' if col is None else col for i, col in enumerate(df.columns)]

    # Replace empty dataframe values with NaN for better processing
    # (DataFrame.map is the current name; applymap is its older spelling)
    cell_mapper = pd.DataFrame.map if hasattr(pd.DataFrame, "map") else pd.DataFrame.applymap
    df = cell_mapper(
        df,
        lambda x: np.nan if (x == '' or x is None or (isinstance(x, str) and x.strip() == "")) else x,
    )

    # Delete columns with very few unique values as they do not contain good data
    low_unique_cols = [col for col in df.columns if df[col].nunique(dropna=True) < 5]
    df = df.drop(columns=low_unique_cols)

    # Rename columns with their respective years
    try:
        year_labels = {df.columns[pos]: int(df.iloc[0, pos][-4:]) for pos in (1, 2, 3)}
        df = df.rename(columns={df.columns[0]: "Description", **year_labels})
    except (IndexError, TypeError, ValueError):
        # Fewer than four columns / no rows, a non-text cell, or no trailing year
        return None

    # Remove first row as it is not required and add a new column 'Category' to dataframe
    df = df.iloc[1:].reset_index(drop=True)
    df.insert(0, 'Category', '')

    return process_df_values(df, share_multiplier, usd_multiplier)


In [9]:
# @title Convert excel data to dataframe -- excel_to_df(excel_file_path)

def excel_to_df(excel_file_path: str):
    """Read the income-statement sheet of a workbook and process it.

    The first sheet whose lower-cased name contains one of the known keywords
    is parsed and passed to ``process_dataframe``.

    Args:
        excel_file_path: Path of the workbook to read.

    Returns:
        Whatever ``process_dataframe`` returns for the matched sheet, or a
        message string when no sheet matches or the workbook cannot be read.
    """
    # Description of sheet names that need to be read from excel files
    sheet_keywords = [
        'consolidated statements of oper',
        'consolidated statements of inco',
        'consolidated statement of incom',
        'income statements',
        'consolidated statements of earn',
        'consolidated income statement',
        'statement of earnings (loss)',
        'consolidated statement of earni',
        'statements of consolidated earn',
        'consolidated statements of comp',
        'statements of consolidated inco',
        'statement of consolidated opera',
        'statements of consolidated oper',
        'consolidated statement of opera',
        'statements_of_operations',
        'consolidated results of operati',
        'consolidated and combined state',
        'statement_of_income',
        'statement of income',
        # The comma below matters: without it the next two literals are
        # concatenated into one keyword that never matches anything
        'statements of operations',
        'income_statements',                # For Ishares_gold_trust
    ]

    try:
        # Context manager closes the workbook handle; the sheet is parsed from
        # the already-open workbook instead of opening the file a second time
        with pd.ExcelFile(excel_file_path, engine='openpyxl') as workbook:

            # Find the first sheet name matching any keyword (case-insensitive)
            matched_sheet = next(
                (sheet for sheet in workbook.sheet_names
                 if any(keyword in sheet.lower() for keyword in sheet_keywords)),
                None
            )

            if not matched_sheet:
                return f"No matching sheet found in {excel_file_path}."

            df = workbook.parse(sheet_name=matched_sheet)

    except Exception as e:
        return f"Could not read the excel file {excel_file_path}: {e}"

    return process_dataframe(df)

In [10]:
# @title Convert dataframe to langchain Documents -- df_to_document(df, company_name)

def df_to_document(df, company_name):
    """Turn every (row, value column) cell of ``df`` into a LangChain Document.

    Column 0 is read as the category, column 1 as the row description and all
    remaining columns as per-year values. Each Document carries one sentence
    describing a single value plus ``company`` / ``year`` metadata.
    """
    value_columns = df.columns[2:]
    documents = []

    # One sentence per cell so each value can be embedded on its own
    for _, record in df.iterrows():
        category, description = record.iloc[0], record.iloc[1]

        if category:
            prefix = f"For {company_name} {description} in {category} category for year"
        else:
            prefix = f"For {company_name} {description} for year"

        documents.extend(
            Document(
                page_content=f"{prefix} {year} is {record[year]}.",
                metadata={"company": company_name, "year": year},
            )
            for year in value_columns
        )

    return documents


In [11]:
# @title Remove duplicate years between dataframes to reduce redundancy -- sync_years(df, years)

def sync_years(df, years):
    """Drop value columns whose year was already collected from another frame.

    Columns from position 2 onward are inspected: a column is dropped when its
    label is not convertible to ``int`` or when that year is already in
    ``years``; otherwise the year is appended to ``years``.

    Args:
        df: Frame whose first two columns are kept unconditionally.
        years: List of years seen so far; extended in place.

    Returns:
        ``[frame_without_dropped_columns, years]`` (``years`` is the same list
        object that was passed in).
    """
    drop_cols = []

    for column in df.columns[2:]:
        try:
            year = int(column)
        except (TypeError, ValueError, OverflowError):
            # Label is not a year (e.g. leftover text header)
            drop_cols.append(column)
            continue

        if year in years:
            drop_cols.append(column)
        else:
            years.append(year)

    return [df.drop(columns=drop_cols), years]


In [None]:
# @title MAIN

# Get tickers of SEC companies
# (timeout added so a stalled connection cannot hang the notebook)
response = requests.get("https://www.sec.gov/files/company_tickers.json", headers=headers, timeout=10)
response.raise_for_status()
tickers = tuple(response.json().values())

# Fetch required number companies for which data needs to be downloaded
number_of_companies = 500
sec_top_companies = tickers[:number_of_companies]

cohere_api_key = userdata.get('COHERE_API_KEY3')
embeddings = CohereEmbeddings(cohere_api_key=cohere_api_key, model="embed-english-v3.0")
# The store is seeded with a placeholder document so it exists before any real data
# NOTE(review): the placeholder is never removed, so it may surface in retrieval results
vectorstore = FAISS.from_documents([Document(page_content="dummy_data")], embeddings)

file_success = "Success companies.txt"
file_failed = "Failed companies.txt"

# Number of companies whose documents are embedded together
embed_batch_size = 50
excel_suffixes = ('.xlsx', '.xls')

docs = []
for index, ticker_desc in enumerate(sec_top_companies):
    company_title = ticker_desc['title']
    company_cik = str(ticker_desc['cik_str']).zfill(10)

    download_excels(company_title, company_cik)

    # Only the downloaded workbooks in the current directory are of interest
    files = [name for name in os.listdir() if name.endswith(excel_suffixes)]

    years = []
    for file in files:
        try:
            df = excel_to_df(excel_file_path = file)
        except Exception as exc:
            # One malformed workbook must not abort the whole run
            logging.error(f"{company_title}, {company_cik}, {file}, parse_error, {exc}")
            df = None
        finally:
            # Always remove the workbook so it is not attributed to the next company
            os.remove(file)

        # Check if df is indeed a dataframe
        if isinstance(df, pd.DataFrame):
            df, years = sync_years(df, years)
            docs.extend(df_to_document(df = df, company_name = company_title))

    # Save company name - number of workbooks processed in a file
    if years:
        with open(file_success, 'a') as f:
            f.write(f"{company_title} - {len(files)} - {sorted(years)}\n")
    else:
        with open(file_failed, 'a') as f:
            f.write(f"{company_title} - {len(files)}\n")

    # Embed docs once per batch of companies to reduce API requests
    if (index + 1) % embed_batch_size == 0 and docs:
        vectorstore.add_documents(docs)
        docs = []

# Embed whatever is left from the last, incomplete batch
if docs:
    vectorstore.add_documents(docs)
    docs = []

# LLM Chain Creation
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
llm = ChatCohere(cohere_api_key=cohere_api_key)
qa_chain = RetrievalQA.from_chain_type(llm=llm, retriever=retriever, return_source_documents=False)

In [None]:
# Save the vectorstore locally in Colab
# Persists the FAISS index under the relative path "faiss_index".
# NOTE(review): presumably lost when the Colab runtime is recycled -- copy it elsewhere to keep it
vectorstore.save_local("faiss_index")

In [None]:
# @title Query to Cohere

# Build the retrieval-QA chain: the 5 nearest documents are handed to the chat model
retriever = vectorstore.as_retriever(
    search_kwargs={"k": 5},
)
llm = ChatCohere(
    cohere_api_key=cohere_api_key,
)
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    return_source_documents=False,
)

# Ask a question and show only the generated answer text
query = "Nvidia net income"
response = qa_chain.invoke({"query": query})

print("\nAnswer:")
print(response["result"])