In [None]:
!pip install sec_api

In [None]:
# -*- coding: utf-8 -*-
"""
SEC Filing Notes Parser
This script extracts notes 2, 3, 4, and the last note from SEC filings and processes their readability.
"""

import asyncio
import csv
import hashlib
import os
import re
import warnings
from datetime import datetime
from typing import Dict, List, Optional, Tuple

from bs4 import BeautifulSoup
from openai import AsyncOpenAI
from sec_api import ExtractorApi

async def asynchronous_llm(
    messages,
    temperature=0.7,
    model=None,
    provider="openai",
    verbose=False,
    max_tokens=2048,
    **kwargs,
):
    """
    Asynchronous function to interact with different LLM providers.

    Only the "openai" provider is implemented. The API key is read from the
    OPENAI_API_KEY environment variable on every call.

    Parameters:
    - messages: chat messages, forwarded as the request's ``messages`` field
    - temperature: forwarded as the request's ``temperature`` field
    - model: model name; when None, the provider's entry in DEFAULT_MODELS is used
    - provider: name of the backend to call
    - verbose: when True return the full response object, otherwise only the
      message content of the first choice
    - max_tokens: forwarded as the request's ``max_tokens`` field
    - **kwargs: extra request fields; on a key collision they override the
      fields listed above

    Returns:
    - LLM response (either full response object or content, depending on 'verbose'),
      or None when the provider is unsupported or the request fails
    """

    DEFAULT_MODELS = {
        "openai": "gpt-4o-mini",
    }

    if provider != "openai":
        # An unknown provider used to fall off the end of the function and
        # return None without any hint; keep the None but say why.
        print(f"Error: unsupported provider '{provider}'")
        return None

    if model is None:
        model = DEFAULT_MODELS[provider]

    try:
        request = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        request.update(kwargs)

        # The context manager closes the client's HTTP connections when the
        # call is done instead of leaving one open client per request behind.
        async with AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) as openai_client:
            response = await openai_client.chat.completions.create(**request)

        if not verbose:
            return response.choices[0].message.content
        return response

    except Exception as e:
        # Best-effort by design: callers receive None instead of an exception.
        print(f"Error: {e}")
        return None

# Readability scoring: asks the LLM to rate a note's text on a 1-5 difficulty scale
async def get_readability_score(text: str) -> Optional[float]:
    """Ask the LLM how hard *text* is to understand and return the rating as a number.

    The model is prompted to answer with a single score from 1 (very easy to
    understand) to 5 (very difficult to understand or process).

    Args:
        text: The note content to rate.

    Returns:
        The score as a float, or None when the LLM call returned nothing or
        the reply contains no usable score.
    """
    system_prompt = """Imagine you're an average individual investor. Please read the following excerpts from 10-K filings and rate each one on a scale from 1 to 5 based on how easy it is to understand.

1 = Very easy to understand
5 = Very difficult to understand or process

Do not output anything else, just the score."""

    user_prompt = f"""note: {text}"""

    chat = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]

    res = await asynchronous_llm(
        model="gpt-4o-mini",
        messages=chat,
        temperature=0.2
    )

    if res is None:
        return None

    # The declared return type is float, but the LLM replies with text.
    # Convert it here so callers get a number they can compare or aggregate.
    try:
        return float(res.strip())
    except ValueError:
        pass

    # The model sometimes wraps the score in words (e.g. "Score: 3");
    # fall back to the first standalone number in the 1-5 range.
    found = re.search(r'\b[1-5](?:\.\d+)?\b', res)
    if found is None:
        return None
    return float(found.group())

def get_cache_filename(url):
    """Map a URL to its cache file name: ``cache_<md5 hex digest>.html``."""
    # MD5 is used only to get a short, filesystem-safe name for the URL.
    digest = hashlib.md5(url.encode())
    return "cache_{}.html".format(digest.hexdigest())

def get_html_content(url, section_num, api_key, cache_dir="html_cache"):
    """Get the HTML of one filing section, from the local cache or the SEC API.

    Args:
        url: URL of the filing.
        section_num: Section identifier; passed to the API as a string.
        api_key: SEC API key used to build the ExtractorApi client.
        cache_dir: Directory holding cached responses; created if missing.

    Returns:
        The section HTML. A non-empty API response is written to the cache
        before being returned; an empty response is returned as-is and not
        cached.

    Note:
        Cache entries are keyed on the URL *and* the section. Files written
        by an earlier version that keyed on the URL alone are not reused.
    """
    # Create cache directory if it doesn't exist
    os.makedirs(cache_dir, exist_ok=True)

    # Include the section in the key: keyed on the URL alone, a request for a
    # second section of the same filing would return the first section's HTML.
    cache_key = f"{url}#section={section_num}"
    cache_file = os.path.join(cache_dir, get_cache_filename(cache_key))

    # Try to read from cache first
    if os.path.exists(cache_file):
        print(f"Reading from cache: {cache_file}")
        with open(cache_file, 'r', encoding='utf-8') as f:
            return f.read()

    # If not in cache, fetch from API
    print(f"Fetching from SEC API: {url}")
    extractorApi = ExtractorApi(api_key)
    html = extractorApi.get_section(url, str(section_num), "html")

    # Save to cache if content was received
    if html:
        print(f"Saving to cache: {cache_file}")
        with open(cache_file, 'w', encoding='utf-8') as f:
            f.write(html)

    return html

def clean_text(text):
    """Trim *text* and collapse every run of whitespace into one space.

    Falsy input (None or an empty string) yields an empty string.
    """
    if text:
        trimmed = text.strip()
        return re.sub(r'\s+', ' ', trimmed)
    return ""

def table_to_markdown(table):
    """Render an HTML table element as a markdown table.

    The first ``<tr>`` is treated as the header and followed by a separator
    row; every later ``<tr>`` becomes a data row. Rows without any cell are
    skipped. Returns an empty string when the table has no rows, otherwise
    the markdown text followed by a blank line.
    """
    all_rows = table.find_all('tr')
    if not all_rows:
        return ""

    def cell_texts(row, tag_names):
        # Empty cells become a single space so the column is still emitted.
        return [clean_text(cell.get_text()) or " " for cell in row.find_all(tag_names)]

    def as_line(cells):
        return "| " + " | ".join(cells) + " |"

    lines = []

    # Header row plus the separator line markdown expects under it
    heading = cell_texts(all_rows[0], ['th', 'td'])
    if heading:
        lines.append(as_line(heading))
        lines.append("|-" * len(heading) + "|")

    # Remaining rows are data
    for data_row in all_rows[1:]:
        cells = cell_texts(data_row, ['td', 'th'])
        if cells:
            lines.append(as_line(cells))

    return "\n".join(lines) + "\n\n"

def _select_target_notes(all_notes: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Keep the notes numbered 2, 3 or 4 plus the final note, in document order."""
    last_index = len(all_notes) - 1
    selected = []
    for index, (title, content) in enumerate(all_notes):
        # Case-insensitive, like the header detection: otherwise a header such
        # as "NOTE 2 – ..." is collected but never selected.
        number_match = re.match(r'Note\s+(\d+)', title, re.IGNORECASE)
        is_wanted_number = bool(number_match) and int(number_match.group(1)) in (2, 3, 4)
        # The final note is kept whatever its identifier is, so a
        # letter-labelled last note is not silently dropped.
        if is_wanted_number or index == last_index:
            selected.append((title, content))
    return selected


def extract_notes(html_content) -> List[Tuple[str, str]]:
    """Extract notes 2, 3, 4, and the last note from the HTML content.

    A note starts at any ``div``/``p``/``table`` element whose cleaned text
    begins with "Note <id> – <title>" (hyphen or en dash, any letter case).
    Everything up to the next such header is that note's content: tables are
    converted to markdown, other elements contribute their text followed by a
    blank line. Headers with no content after them are discarded.

    Args:
        html_content: HTML markup of the filing section.

    Returns:
        ``(title, content)`` tuples in document order, where ``title`` is the
        full cleaned header text.
    """
    soup = BeautifulSoup(html_content, 'html.parser')

    all_notes = []
    current_note = None
    current_content = []

    # Look for note headers and their content
    # NOTE(review): find_all also returns elements nested inside other matched
    # elements, so text inside a <div> that wraps <p>/<table> children may be
    # collected more than once — verify against real filings.
    for element in soup.find_all(['div', 'p', 'table']):
        text = clean_text(element.get_text())

        # Check if this is a note header (handle both number and letter formats)
        note_match = re.match(r'Note\s+(\d+|\w+)\s*[–-]\s*(.+)', text, re.IGNORECASE)
        if note_match:
            # If we were processing a previous note, save it
            if current_note and current_content:
                all_notes.append((current_note, ''.join(current_content)))

            # Start new note
            current_note = text
            current_content = []
            continue

        # Anything seen before the first header belongs to no note
        if not current_note:
            continue

        if element.name == 'table':
            # Convert table to markdown
            table_md = table_to_markdown(element)
            if table_md:
                current_content.append(table_md)
        elif text:
            current_content.append(text + "\n\n")

    # Don't forget to add the last note
    if current_note and current_content:
        all_notes.append((current_note, ''.join(current_content)))

    # Filter for notes 2, 3, 4, and the last note
    return _select_target_notes(all_notes)

async def process_notes_with_scores(notes: List[Tuple[str, str]]) -> List[Tuple[str, str, float]]:
    """Score every note concurrently and return ``(title, content, score)`` tuples.

    One scoring task per note is started up front, then the tasks are awaited
    in input order, so the result order matches ``notes``.
    """
    # Start all scoring tasks before awaiting any of them
    pending = [
        (title, content, asyncio.create_task(get_readability_score(content)))
        for title, content in notes
    ]

    # Collect the scores in the original note order
    return [(title, content, await job) for title, content, job in pending]

async def process_filing(url: str, section_num: str, api_key: str) -> List[Tuple[str, str, float]]:
    """Process a specific filing and return notes with their readability scores.

    Args:
        url: URL of the filing.
        section_num: Section of the filing to fetch.
        api_key: SEC API key.

    Returns:
        ``(title, content, score)`` tuples as produced by
        ``process_notes_with_scores``. An empty list is returned when no notes
        are found or when any step fails; failures are printed, not raised.
    """
    try:
        # Get HTML content (either from cache or API).
        # The fetch does blocking network and disk I/O, so it runs in the
        # default executor; called directly it would stall the event loop and
        # serialise filings that are meant to be processed in parallel.
        loop = asyncio.get_running_loop()
        html = await loop.run_in_executor(
            None, get_html_content, url, section_num, api_key
        )

        if not html:
            raise ValueError(f"No content found in section {section_num}")

        # Extract notes from HTML
        notes = extract_notes(html)

        if not notes:
            print(f"Warning: No notes found in section {section_num}")
            return []

        print(f"Found {len(notes)} notes to process")
        for title, _ in notes:
            print(f"  - {title}")

        # Process notes and get readability scores
        return await process_notes_with_scores(notes)

    except Exception as e:
        # Boundary for a single filing: report and carry on with the others.
        print(f"Error processing filing: {str(e)}")
        return []

def save_results(company: str, results: List[Tuple[str, str, float]], output_dir: str = "results"):
    """Save results to ``<output_dir>/<company>_results.txt`` as CSV rows.

    Each row is ``note_title, content_preview, score``. The preview is the
    first 200 characters of the content with newlines and commas replaced by
    spaces, followed by "...". An existing file is overwritten.

    Args:
        company: Label used to build the output file name.
        results: ``(title, content, score)`` tuples to write.
        output_dir: Target directory; created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, f"{company}_results.txt")

    # newline='' hands line-ending control to the csv writer.
    with open(output_file, 'w', encoding='utf-8', newline='') as f:
        # csv.writer quotes fields that contain commas or quotes. The old
        # hand-built line left the title unquoted, so a comma in a note title
        # shifted every following column.
        writer = csv.writer(f, lineterminator='\n')
        for title, content, score in results:
            content_preview = content[:200].replace('\n', ' ').replace(',', ' ')  # Basic content preview
            # The score is formatted as text so a missing score still shows as "None".
            writer.writerow([title, f"{content_preview}...", f"{score}"])



ModuleNotFoundError: No module named 'sec_api'

In [None]:
async def main():
    """Score the notes of a fixed list of 10-K filings and save one result file per filing.

    Section "8" of every filing is processed concurrently. The label used for
    the result file is the last URL path segment cut at its first '-' or '.'
    (e.g. "msft-10k_20230630.htm" gives "msft"). A failure for one filing is
    printed and does not stop the others.
    """
    # SEC API Key: taken from the environment so the secret stays out of the
    # source; "_" remains the fallback placeholder.
    SEC_API_KEY = os.getenv("SEC_API_KEY", "_")

    # List of SEC filings to process
    filings = [
        # Apple Inc. 10-K 2018
        "https://www.sec.gov/Archives/edgar/data/320193/000032019318000145/a10-k20189292018.htm",
        # Microsoft 10-K 2023
        "https://www.sec.gov/Archives/edgar/data/789019/000156459023029823/msft-10k_20230630.htm",
        # Tesla 10-K 2023
        "https://www.sec.gov/Archives/edgar/data/1318605/000095017023001409/tsla-20221231.htm"
    ]

    # Process all filings in parallel
    tasks = []
    for url in filings:
        company = url.split('/')[-1].split('-')[0].split('.')[0]
        print(f"\nProcessing {company}...")

        task = asyncio.create_task(process_filing(url, "8", SEC_API_KEY))
        tasks.append((company, task))

    # Wait for all filings to be processed
    for company, task in tasks:
        try:
            results = await task
            if results:
                save_results(company, results)
                print(f"Results saved for {company}")
        except Exception as e:
            print(f"Error processing {company}: {str(e)}")

if __name__ == "__main__":
    # asyncio.run() refuses to start when an event loop is already running in
    # this thread, which is the case inside a Jupyter kernel.
    try:
        _active_loop = asyncio.get_running_loop()
    except RuntimeError:
        _active_loop = None

    if _active_loop is None:
        # Plain script execution: start a loop and run to completion.
        asyncio.run(main())
    else:
        # Already inside a loop: schedule main() on it. The reference is kept
        # so the task is not garbage-collected before it finishes.
        _main_task = _active_loop.create_task(main())