Installation of necessary libraries

In [None]:
# pip install bertopic sec_edgar_downloader

Dataset creation

In [None]:
import os
from sec_edgar_downloader import Downloader
import shutil
import time


def make_output_folder(folder_name):
    """Ensure the output folder exists and return its path.

    Args:
        folder_name: Path of the directory to create. Missing parent
            directories are created as well.

    Returns:
        ``folder_name`` unchanged, so the call can be used inline.

    Raises:
        FileExistsError: If ``folder_name`` exists but is not a directory.
    """
    # exist_ok=True replaces the separate os.path.exists() check, which was
    # racy (check-then-create), and makes repeated calls a no-op.
    os.makedirs(folder_name, exist_ok=True)
    return folder_name


def download_filings(downloader, tickers, limit=6, delay=1.0):
    """Download 10-K filings for every ticker, continuing past failures.

    Args:
        downloader: Object exposing ``get(form, ticker, limit=...)``.
        tickers: Iterable of ticker symbols to request.
        limit: Maximum number of filings requested per ticker.
        delay: Seconds to pause after each successful request; values
            ``<= 0`` disable the pause.

    Returns:
        List of the tickers whose download raised an exception, in the
        order they were attempted.
    """
    failed = []
    for ticker in tickers:
        try:
            downloader.get("10-K", ticker, limit=limit)
        except Exception as e:
            # Best effort: one failing ticker must not abort the whole batch.
            print(f"{ticker}: Failed to download. Error: {e}")
            failed.append(ticker)
        else:
            # Throttle only after a request that actually went through.
            if delay > 0:
                time.sleep(delay)
    return failed


def _accession_year(accession):
    """Return the four-digit year encoded in an accession number, or "unknown"."""
    parts = accession.split("-")
    if len(parts) < 2:
        return "unknown"
    token = parts[1]
    if len(token) != 2 or not (token.isascii() and token.isdigit()):
        return "unknown"
    two_digit = int(token)
    # Two-digit years need a century pivot: 90-99 are read as 19xx, the rest
    # as 20xx (assumes no accession number predates 1990 — TODO confirm).
    century = 1900 if two_digit >= 90 else 2000
    return str(century + two_digit)


def collect_and_rename_files(tickers, output_folder, source_root="sec-edgar-filings"):
    """Copy every downloaded 10-K submission into one flat folder.

    Walks ``<source_root>/<ticker>/10-K/<accession>/full-submission.txt`` and
    copies each file to ``<output_folder>/10K_<ticker>_<year>.txt``, where the
    year is taken from the second field of the accession folder name.

    Args:
        tickers: Iterable of ticker symbols whose download folders are scanned.
        output_folder: Existing directory that receives the renamed copies.
        source_root: Root of the downloaded folder structure.
    """
    # NOTE(review): two accessions of one ticker with the same year token map
    # to the same file name, so the later copy overwrites the earlier one.
    for ticker in tickers:
        base = os.path.join(source_root, ticker, "10-K")
        if not os.path.isdir(base):
            print(f"{ticker}: Folder missing.")
            continue

        for accession in sorted(os.listdir(base)):
            source = os.path.join(base, accession, "full-submission.txt")
            if not os.path.exists(source):
                print(f"{ticker}: Missing file {accession}")
                continue

            year = _accession_year(accession)
            filename = f"10K_{ticker}_{year}.txt"
            shutil.copy(source, os.path.join(output_folder, filename))
            print(f"{ticker}: Saved {filename}")
            
            
def filter_for_years(input_folder="raw_10ks", years=("2020", "2021", "2022", "2023", "2024")):
    """Delete the files in ``input_folder`` whose names mention none of ``years``.

    A file is kept when any of the year strings occurs anywhere in its name.
    Sub-directories are left untouched.

    Args:
        input_folder: Directory whose files are filtered in place.
        years: Years to keep; each entry is converted to ``str`` before the
            substring comparison.

    Returns:
        Sorted list of the file names that were deleted.
    """
    wanted = [str(year) for year in years]
    deleted = []
    for filename in sorted(os.listdir(input_folder)):
        file_path = os.path.join(input_folder, filename)
        # os.remove() cannot delete directories; skip them instead of crashing.
        if not os.path.isfile(file_path):
            continue
        if any(year in filename for year in wanted):
            continue
        os.remove(file_path)
        print(f"Deleted {filename} as it does not match the specified years.")
        deleted.append(filename)
    return deleted


def main():
    """Build the raw dataset: download, flatten and year-filter the 10-K filings."""
    # Five firms per sector; concatenation keeps the original processing order.
    tech = ["AAPL", "GOOGL", "MSFT", "NVDA", "ORCL"]
    financials = ["BAC", "GS", "JPM", "MS", "V"]
    healthcare = ["JNJ", "LLY", "MRK", "PFE", "UNH"]
    tickers = tech + financials + healthcare

    raw_folder = make_output_folder("raw_10ks")
    # Presumably the requester's organisation name and contact e-mail.
    edgar = Downloader("Copenhagen Business School", "daur24ac@student.cbs.dk")

    download_filings(edgar, tickers)
    collect_and_rename_files(tickers, raw_folder)
    filter_for_years()


if __name__ == "__main__":
    main()
    

Extraction of visible text out of 10-K filings

In [None]:
from bs4 import BeautifulSoup
import json
import re
import unicodedata

# Extracts table of contents (TOC) anchor links matching relevant section names
def get_clean_toc_links(soup, max_links=50):
    """Return ``(label, anchor_id)`` pairs for links that name a 10-K section.

    Only the first ``max_links`` ``<a href=...>`` elements of the document are
    inspected. A link qualifies when its lower-cased text contains one of the
    known chapter names.

    Args:
        soup: Parsed document exposing ``find_all("a", href=True)``.
        max_links: Number of leading anchors to inspect.

    Returns:
        List of ``(text, href)`` tuples in document order, where ``text`` is
        the stripped, lower-cased link text and ``href`` is the link target
        with surrounding whitespace and leading ``#`` removed.
    """
    VALID_CHAPTERS = [
        "business", "risk factors", "unresolved staff comments", "cybersecurity", "properties",
        "legal proceedings", "mine safety disclosures", "market for the registrant’s common stock",
        "management’s discussion and analysis", "quantitative and qualitative disclosures",
        "financial statements", "changes in and disagreements", "controls and procedures",
        "other information", "directors", "executive compensation", "security ownership",
        "certain relationships", "principal accountant fees", "exhibits", "form 10-k summary", "signatures"
    ]
    links = []
    for a in soup.find_all("a", href=True)[:max_links]:
        text = a.get_text(strip=True).lower()
        href = a["href"].strip().lstrip("#")
        # Match on a canonical form: the chapter names above use the
        # typographic apostrophe and single spaces, so a label written with a
        # straight apostrophe or non-breaking spaces would otherwise be missed.
        # The returned label itself is left untouched.
        comparable = " ".join(text.replace("'", "’").split())
        if any(valid in comparable for valid in VALID_CHAPTERS):
            links.append((text, href))
    return links


# Returns the final element carrying the given id, or None when there is none
def find_last_tag_by_id(soup, anchor_id):
    """Look up ``anchor_id`` and return the last match in ``find_all`` order.

    Returns ``None`` when no element has that id.
    """
    candidates = soup.find_all(id=anchor_id)
    if not candidates:
        return None
    return candidates[-1]


# Extracts clean visible text between two HTML tags
def extract_visible_section_text(soup, start_tag, stop_tag=None):
    """Collect the text of the elements that follow ``start_tag``.

    Iterates ``start_tag.find_all_next()`` and gathers the text of every
    ``div``, ``span``, ``section``, ``article``, ``p`` and ``li`` element until
    ``stop_tag`` itself is reached (or the iteration ends).

    Args:
        soup: Unused; kept so existing callers keep working.
        start_tag: Element after which collection starts.
        stop_tag: Element at which collection stops, or ``None`` for no stop.

    Returns:
        The collected texts joined by blank lines. Nested elements each
        contribute their own text, so the result can contain repeats.
    """
    # Only the listed element itself is skipped; its descendants are still
    # visited by the iteration below.
    skipped = ("script", "style", "head", "footer", "nav", "table", "form")
    text_bearing = ("div", "span", "section", "article", "p", "li")

    buffer = []
    for tag in start_tag.find_all_next():
        # Identity, not ==: Beautiful Soup defines equality structurally, so an
        # earlier element with the same markup as the stop tag would otherwise
        # end the section too early.
        if stop_tag is not None and tag is stop_tag:
            break
        if tag.name in skipped:
            continue
        if tag.name in text_bearing:
            text = tag.get_text(strip=True, separator=" ")
            if text:
                buffer.append(text)
    return "\n\n".join(buffer).strip()


# Simplifies a paragraph for deduplication
def normalize_paragraph(p):
    """Return a canonical form of ``p`` that is used only as a comparison key.

    The key is lower-cased, NFKC-normalized, free of mark symbols and
    punctuation, and has runs of spaces/tabs collapsed to one space.

    Args:
        p: Paragraph text.

    Returns:
        The normalized key; an empty string when nothing but whitespace,
        symbols or punctuation remains.
    """
    # Remove mark symbols before NFKC: the normalization would otherwise
    # expand "™" into the letters "TM", which then survive as text.
    p = re.sub(r"[®™†*]+", "", p)
    p = unicodedata.normalize("NFKC", p)
    p = p.lower()
    p = re.sub(r"[^\w\s]", "", p)
    # Collapse whitespace last so that gaps left by removed symbols or
    # punctuation do not produce different keys for the same text.
    p = re.sub(r"[ \xa0\t]+", " ", p)
    return p.strip()


# Keeps the first occurrence of every paragraph, compared by normalized form
def remove_duplicate_paragraphs(text):
    """Drop paragraphs whose normalized form was already seen.

    Paragraphs are the pieces of ``text`` separated by blank lines. Pieces
    that normalize to an empty string are dropped. The surviving paragraphs
    are stripped and re-joined with blank lines in their original order.
    """
    # dict preserves insertion order; setdefault keeps the first paragraph
    # recorded for each normalized key.
    first_by_key = {}
    for chunk in text.split("\n\n"):
        key = normalize_paragraph(chunk)
        if key:
            first_by_key.setdefault(key, chunk.strip())
    return "\n\n".join(first_by_key.values()).strip()


# Pipeline to process 10-K filings
def process_10k_file(file_path):
    """Split one 10-K filing into its table-of-contents sections.

    Args:
        file_path: Path of the filing (HTML/SGML text).

    Returns:
        Dict mapping the lower-cased TOC link text to the de-duplicated
        text of that section. A label that occurs more than once keeps the
        text of its last occurrence.
    """
    # errors="replace": a stray byte that is not valid UTF-8 should cost one
    # replacement character, not abort the whole batch with UnicodeDecodeError.
    with open(file_path, "r", encoding="utf-8", errors="replace") as f:
        html = f.read()

    soup = BeautifulSoup(html, "lxml")

    # Resolve every TOC anchor once and keep only links whose target exists.
    resolved = []
    for label, anchor_id in get_clean_toc_links(soup):
        tag = find_last_tag_by_id(soup, anchor_id)
        if tag is not None:
            resolved.append((label, tag))

    doc_sections = {}
    for i, (label, start_tag) in enumerate(resolved):
        # Stop at the next resolvable anchor that is a different element.
        # Using the immediately following link even when its target is
        # missing (or identical to the start) would let the section run to
        # the end of the document.
        stop_tag = next(
            (tag for _, tag in resolved[i + 1:] if tag is not start_tag),
            None,
        )
        raw = extract_visible_section_text(soup, start_tag, stop_tag)
        doc_sections[label.lower()] = remove_duplicate_paragraphs(raw)

    return doc_sections


# Folder holding the raw filings and folder receiving the per-filing JSON
INPUT_FOLDER = "raw_10ks"
OUTPUT_FOLDER = "processed_10ks"
os.makedirs(OUTPUT_FOLDER, exist_ok=True)


# Convert every .txt filing, in sorted name order, into a JSON file of sections
txt_filenames = [name for name in sorted(os.listdir(INPUT_FOLDER)) if name.endswith(".txt")]
for filename in txt_filenames:
    print(f"Processing {filename}...")
    input_path = os.path.join(INPUT_FOLDER, filename)
    cleaned_sections = process_10k_file(input_path)

    output_path = os.path.join(OUTPUT_FOLDER, filename.replace(".txt", ".json"))
    with open(output_path, "w", encoding="utf-8") as out:
        json.dump(cleaned_sections, out, indent=2)

print("All files processed and saved.")


Topic modelling of risk sections

In [None]:
from bertopic import BERTopic
import json
import pandas as pd
from sentence_transformers import SentenceTransformer

# Set up constants
SECTION_KEY = "risk factors"      # section looked up in each processed JSON file
OUTPUT_TOPICS = "risk_topics.csv"  # CSV written by topic_model()
# Ticker -> industry. Filings of tickers missing from this map are skipped by
# load_clean_paragraphs(), so every downloaded ticker must be listed here.
# "V" is part of the download list and was missing; "WFC" is kept so that
# already processed files for it remain usable.
INDUSTRY_MAP = {
    "AAPL": "Technology", "MSFT": "Technology", "GOOGL": "Technology",
    "NVDA": "Technology", "ORCL": "Technology",
    "JPM": "Financials", "BAC": "Financials", "GS": "Financials",
    "WFC": "Financials", "MS": "Financials", "V": "Financials",
    "JNJ": "Healthcare", "PFE": "Healthcare", "MRK": "Healthcare",
    "LLY": "Healthcare", "UNH": "Healthcare"
}


def extract_paragraphs(text, min_words=5):
    """Split ``text`` on blank lines and keep the substantive paragraphs.

    A paragraph is dropped when it has fewer than ``min_words`` words, or
    when it has fewer than 10 words and matches the heading/boilerplate
    pattern below (form or item references, a four-digit number, ...).

    Returns:
        List of the kept paragraphs, stripped, in their original order.
    """
    boilerplate = re.compile(
        r"Form\s+10-K|\bItem\b|\bTable of Contents\b|\d{4}|\bRisk Factors\b",
        re.IGNORECASE,
    )
    kept = []
    for chunk in text.split("\n\n"):
        candidate = chunk.strip()
        word_total = len(candidate.split())
        too_short = word_total < min_words
        heading_like = word_total < 10 and boilerplate.search(candidate) is not None
        if not (too_short or heading_like):
            kept.append(candidate)
    return kept


def load_clean_paragraphs(folder, section_key, industry_map):
    """Load one section from every processed filing as a paragraph table.

    File names are expected to look like ``<prefix>_<firm>_<year>.json``.
    Files with another extension, fewer than three name fields, a firm that
    is not in ``industry_map`` or a non-numeric year are skipped.

    Args:
        folder: Directory containing the processed JSON files.
        section_key: Key of the section to read from each JSON document.
        industry_map: Mapping from firm ticker to industry name.

    Returns:
        DataFrame with the columns ``firm``, ``year``, ``industry`` and
        ``text`` (one row per paragraph). The columns are present even when
        no paragraph was found.
    """
    columns = ["firm", "year", "industry", "text"]
    rows = []
    print("Loading and cleaning paragraphs...")
    # Sorted so the row order does not depend on the directory listing order.
    for filename in sorted(os.listdir(folder)):
        if not filename.endswith(".json"):
            continue
        parts = filename.replace(".json", "").split("_")
        if len(parts) < 3:
            continue
        firm, year = parts[1], parts[2]
        industry = industry_map.get(firm)
        if not industry:
            continue
        try:
            year_number = int(year)
        except ValueError:
            # A non-numeric year token (e.g. "unknown") must not abort the load.
            print(f"Skipping {filename}: year '{year}' is not numeric.")
            continue
        with open(os.path.join(folder, filename), "r", encoding="utf-8") as f:
            data = json.load(f)
        section_text = data.get(section_key, "")
        if section_text:
            for p in extract_paragraphs(section_text):
                rows.append({
                    "firm": firm,
                    "year": year_number,
                    "industry": industry,
                    "text": p.strip()
                })
    print(f"Extracted {len(rows)} clean paragraphs.")
    # Explicit columns keep later column-based steps working on an empty result.
    return pd.DataFrame(rows, columns=columns)


def topic_model(input_df, top_n_keywords=3):
    """Fit BERTopic on the ``text`` column and label every paragraph.

    Adds the columns ``topic`` and ``topic_label`` to ``input_df`` in place,
    writes the frame to ``OUTPUT_TOPICS`` and returns it together with the
    topic-id -> label mapping. Topic ``-1`` is labelled "Other"; every other
    topic is labelled with its first ``top_n_keywords`` keywords.
    """
    print("Running BERTopic on", len(input_df), "paragraphs...")
    encoder = SentenceTransformer("paraphrase-MiniLM-L3-v2")
    model = BERTopic(embedding_model=encoder, calculate_probabilities=False, verbose=True)

    documents = input_df["text"].tolist()
    assignments, _ = model.fit_transform(documents)
    input_df["topic"] = assignments

    topic_labels = {}
    for topic in model.get_topic_freq().Topic:
        if topic == -1:
            topic_labels[topic] = "Other"
            continue
        keywords = [kw for kw, _ in model.get_topic(topic)[:top_n_keywords]]
        topic_labels[topic] = ", ".join(keywords)

    input_df["topic_label"] = input_df["topic"].map(topic_labels)
    input_df.to_csv(OUTPUT_TOPICS, index=False)
    print("Saved:", OUTPUT_TOPICS)
    return input_df, topic_labels


def main():
    """Load the risk paragraphs, drop exact repeats and run the topic model.

    Returns:
        The ``(DataFrame, topic_labels)`` pair produced by ``topic_model``.
    """
    paragraphs = load_clean_paragraphs(OUTPUT_FOLDER, SECTION_KEY, INDUSTRY_MAP)
    # The same text may only appear once per firm and year.
    deduplicated = paragraphs.drop_duplicates(subset=["firm", "year", "text"])
    print("Paragraphs cleaned and ready for topic modeling.")
    labelled_df, labels = topic_model(deduplicated)
    return labelled_df, labels


# Capture results for further use
df_with_topics, topic_labels = main()


Visualization of topic modelling

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
# Use seaborn's "whitegrid" style for the plots created below.
sns.set(style="whitegrid")


def find_top_topics(df, top_n=10, output_path="top_topics_total.csv"):
    """Write the ``top_n`` most frequent topic labels and their counts to CSV.

    Rows labelled "Other" are ignored.

    Args:
        df: DataFrame with a ``topic_label`` column.
        top_n: Number of labels to keep.
        output_path: Destination of the CSV file.

    Returns:
        DataFrame with the columns ``topic_label`` and ``count``, most
        frequent label first.
    """
    labelled = df[df["topic_label"] != "Other"]
    # rename_axis + reset_index(name=...) names both columns explicitly.
    # Renaming {"index": ..., "topic_label": "count"} afterwards depends on how
    # value_counts() names its result, and under pandas >= 2.0 it yields two
    # columns that are both called "count".
    top_overall = (
        labelled["topic_label"]
        .value_counts()
        .head(top_n)
        .rename_axis("topic_label")
        .reset_index(name="count")
    )
    top_overall.to_csv(output_path, index=False)
    return top_overall


def plot_top_overall(df, top_n=10):
    """Plot the ``top_n`` most frequent topic labels as a bar chart.

    Rows labelled "Other" are ignored. The chart is saved to
    ``top_10_most_frequent_risk_topics.png``, shown and then closed.
    """
    labelled = df[df["topic_label"] != "Other"]
    frequencies = labelled["topic_label"].value_counts().head(top_n)

    plt.figure(figsize=(10, 5))
    # Counts on the x-axis, one bar per topic label on the y-axis.
    sns.barplot(x=frequencies.values, y=frequencies.index, palette="Blues_d")
    plt.xlabel("Paragraph Count")
    plt.ylabel("Topic Label")
    plt.tight_layout()
    # Save before show(): the figure is closed right afterwards.
    plt.savefig("top_10_most_frequent_risk_topics.png")
    plt.show()
    plt.close()


def top_topics_per_industry(df, top_n=5):
    """Export and plot the ``top_n`` most frequent topics of each industry.

    Rows labelled "Other" are ignored. The ranking of all industries is
    written to ``top_topics_per_industry.csv``; one bar chart per industry is
    saved for Financials, Healthcare and Technology when that industry has
    any rows.
    """
    labelled = df[df["topic_label"] != "Other"]
    pair_counts = labelled.groupby(["industry", "topic_label"]).size().reset_index(name="count")
    # Within each industry the most frequent topic comes first.
    ranked = pair_counts.sort_values(["industry", "count"], ascending=[True, False])
    top_per_industry = ranked.groupby("industry").head(top_n)
    top_per_industry.to_csv("top_topics_per_industry.csv", index=False)

    chart_files = {
        "Financials": "top_5_risk_topics_financials.png",
        "Healthcare": "top_5_risk_topics_healthcare.png",
        "Technology": "top_5_risk_topics_technology.png",
    }
    for industry, filename in chart_files.items():
        subset = top_per_industry[top_per_industry["industry"] == industry]
        if subset.empty:
            continue
        plt.figure(figsize=(8, 4))
        sns.barplot(x="count", y="topic_label", data=subset, palette="Set2")
        plt.xlabel("Paragraph Count")
        plt.ylabel("Topic")
        plt.tight_layout()
        plt.savefig(filename)
        plt.show()
        plt.close()


def plot_industry_risk_profile(df, min_topic_count=50):
    """Plot each industry's share of paragraphs per topic as a heatmap.

    Only topics with more than ``min_topic_count`` paragraphs in total are
    kept; the shares are computed over the kept topics, so every industry
    sums to 1. Saved to ``relative_emphasis_on_risk_topics.png``.
    """
    # Rows: industries, columns: topic labels, values: paragraph counts.
    table = df.groupby(["industry", "topic_label"]).size().unstack(fill_value=0)
    frequent_enough = table.sum() > min_topic_count
    table = table.loc[:, frequent_enough]

    industry_totals = table.sum(axis=1)
    shares = table.div(industry_totals, axis=0)

    plt.figure(figsize=(12, 6))
    # Transposed so that topics run down the y-axis.
    sns.heatmap(shares.T, cmap="YlGnBu", annot=True, fmt=".2f")
    plt.xlabel("Industry")
    plt.ylabel("Topic")
    plt.tight_layout()
    plt.savefig("relative_emphasis_on_risk_topics.png")
    plt.show()
    plt.close()


def plot_topic_trends_over_time(df, top_n=10):
    """Plot yearly paragraph counts of the ``top_n`` most frequent topics.

    Rows labelled "Other" are ignored. One line per topic; the figure is
    saved to ``top_10_risk_topics_over_time.png``.
    """
    labelled = df[df["topic_label"] != "Other"]
    leading = labelled["topic_label"].value_counts().head(top_n).index
    leading_rows = labelled[labelled["topic_label"].isin(leading)]
    # Rows: years, columns: topic labels, values: paragraph counts.
    yearly = leading_rows.groupby(["year", "topic_label"]).size().unstack(fill_value=0)

    ax = yearly.plot(kind="line", marker="o", figsize=(10, 5))
    # One tick per year present in the labelled data.
    years = sorted(labelled["year"].unique())
    ax.set_xticks(years)
    ax.set_xticklabels(years, rotation=0)
    plt.xlabel("Year")
    plt.ylabel("Paragraph Count")
    plt.legend(title="Topic", bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.savefig("top_10_risk_topics_over_time.png")
    plt.show()
    plt.close()


def plot_topic_trends_by_industry(df, top_n=5):
    """Plot, per industry, the yearly counts of its ``top_n`` topics.

    Rows labelled "Other" are ignored. One line chart is saved for each of
    Financials, Healthcare and Technology; an industry without any rows is
    skipped.

    Args:
        df: DataFrame with ``topic_label``, ``industry`` and ``year`` columns.
        top_n: Number of topics plotted per industry.
    """
    df = df[df["topic_label"] != "Other"]
    # All charts share the same year ticks, taken from the whole data set.
    years = sorted(df["year"].unique())
    for industry, filename in zip(
        ["Financials", "Healthcare", "Technology"],
        [
            "top_5_risk_topics_over_time_financials.png",
            "top_5_risk_topics_over_time_healthcare.png",
            "top_5_risk_topics_over_time_technology.png"
        ]
    ):
        df_ind = df[df["industry"] == industry]
        if df_ind.empty:
            # An empty pivot cannot be plotted; skip the industry instead of
            # aborting the remaining charts.
            continue
        top_labels = df_ind["topic_label"].value_counts().head(top_n).index
        df_top = df_ind[df_ind["topic_label"].isin(top_labels)]
        pivot = df_top.groupby(["year", "topic_label"]).size().unstack(fill_value=0)
        ax = pivot.plot(kind="line", marker="o", figsize=(10, 5), title=f"{industry} – Top {top_n} Topics Over Time")
        ax.set_xticks(years)
        ax.set_xticklabels(years, rotation=0)
        plt.xlabel("Year")
        plt.ylabel("Paragraph Count")
        plt.legend(title="Topic", bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.tight_layout()
        plt.savefig(filename)
        plt.show()
        plt.close()


def plot_total_paragraph_volume(df):
    """Plot the number of paragraphs per year as a bar chart.

    All rows of ``df`` are counted. Saved to ``risk_paragraph_count.png``.
    """
    per_year = df.groupby("year").size()
    yearly_counts = per_year.reset_index(name="paragraph_count")

    plt.figure(figsize=(8, 4))
    sns.barplot(x="year", y="paragraph_count", data=yearly_counts, palette="muted")
    plt.xlabel("Year")
    plt.ylabel("Paragraph Count")
    plt.tight_layout()
    plt.savefig("risk_paragraph_count.png")
    plt.show()
    plt.close()


def plot_risk_section_length(df):
    """Plot the average risk-section word count per industry and year.

    Word counts are summed per firm and year first, then averaged over the
    firms of each industry. Saved to ``risk_section_length.png``.

    Args:
        df: DataFrame with ``firm``, ``year``, ``industry`` and ``text``
            columns. It is not modified.
    """
    # assign() returns a new frame, so the caller's DataFrame does not
    # silently gain a word_count column.
    counted = df.assign(word_count=df["text"].str.split().str.len())
    lengths = counted.groupby(["firm", "year", "industry"])["word_count"].sum().reset_index()
    avg_by_industry = lengths.groupby(["industry", "year"])["word_count"].mean().reset_index()

    plt.figure(figsize=(10, 5))
    sns.lineplot(data=avg_by_industry, x="year", y="word_count", hue="industry", marker="o")
    plt.xlabel("Year")
    plt.ylabel("Average Word Count")
    plt.tight_layout()
    plt.savefig("risk_section_length.png")
    plt.show()
    plt.close()
    
    
if __name__ == "__main__":
    # Every report step receives the labelled paragraph table; the second
    # tuple element holds the step's extra keyword arguments.
    report_steps = (
        (find_top_topics, {}),
        (plot_top_overall, {}),
        (top_topics_per_industry, {}),
        (plot_industry_risk_profile, {}),
        (plot_topic_trends_over_time, {"top_n": 10}),
        (plot_topic_trends_by_industry, {"top_n": 5}),
        (plot_total_paragraph_volume, {}),
        (plot_risk_section_length, {}),
    )
    for report_step, extra_kwargs in report_steps:
        report_step(df_with_topics, **extra_kwargs)


In [None]:
def list_top_risk_disclosures(df, top_n=10):
    """Return the ``top_n`` firm-years with the longest risk disclosures.

    Args:
        df: DataFrame with ``firm``, ``year``, ``industry`` and ``text``
            columns. It is not modified.
        top_n: Number of rows to return.

    Returns:
        DataFrame with the columns ``firm``, ``year``, ``industry`` and
        ``word_count`` (total words of all paragraphs), longest first.
    """
    # assign() returns a new frame, so the caller's DataFrame does not
    # silently gain a word_count column.
    counted = df.assign(word_count=df["text"].str.split().str.len())
    totals = counted.groupby(["firm", "year", "industry"])["word_count"].sum().reset_index()
    return totals.sort_values("word_count", ascending=False).head(top_n)

# NOTE: the bare call on the last line relies on notebook cell display to show
# the table; run as a plain script, its return value is discarded.
print("Top companies by total risk disclosure length:")
list_top_risk_disclosures(df_with_topics, top_n=30)
