In [None]:
# cleaned_list contains the cik of all public US firms
import csv
with open("C:/Users/25280/Downloads/myproject/cleaned_list.csv", 'w', newline='', encoding = "utf-8") as csvfile:
    spamwriter = csv.writer(csvfile)
    spamwriter.writerow(cleaned_list)

Main program for extracting exhibits

In [12]:
import os
import re
import requests
import concurrent.futures
import pandas as pd
import html
from urllib.parse import urlparse
from io import StringIO
from bs4 import BeautifulSoup
from pathlib import Path

# ---------------------------
# Configuration
# ---------------------------
HEADERS = {"User-Agent": "ResearchBot/1.0 (your_email@example.com)"}
SEC_BASE = "https://www.sec.gov/Archives/edgar/full-index"
DOWNLOAD_ROOT = "C://Users//25280//Downloads//myproject//ex10_downloads"
FORMS = {"10-K", "10-Q", "8-K"}
MAX_WORKERS = 12
#KEYWORDS = [
#    "cybersecurity", "cyber risk", "data breach", "information security",
#    "network intrusion", "hacker", "malware", "ransomware", "phishing",
#    "incident response", "security controls", "vulnerability"
#]
OUTPUT_CSV = "cyber_clauses_ex10.csv"

CYBER_KEYWORDS = [
    "cyber", "data breach", "information security", "network intrusion",
    "data protection", "information systems", "data privacy",
    "personal data", "privacy", "data processing", "security incident",
    "private data", "information technology security", "security breach", "data restoration",
    " SOC ", "IT asset", "Privacy Legal Requirements", "unauthorized access"
]


# ---------------------------
# Global session
# ---------------------------
session = requests.Session()
session.headers.update(HEADERS)

# ---------------------------
# Utilities
# ---------------------------
def robust_get_text(url):
    try:
        r = session.get(url, timeout=30)
        if r.status_code == 200:
            return r.text
    except requests.RequestException:
        pass
    return None

def fetch_master_idx(year, quarter):
    url = f"{SEC_BASE}/{year}/QTR{quarter}/master.idx"
    return robust_get_text(url)

def parse_master_idx_text(idx_text):
    lines = idx_text.splitlines()
    header_idx = next((i for i, ln in enumerate(lines[:80])
                       if ln.strip().startswith("CIK|Company Name|Form Type|Date Filed|Filename")), None)
    if header_idx is None:
        return pd.DataFrame(columns=["cik","company","form","date","filename"])
    data = "\n".join(lines[header_idx+1:])
    df = pd.read_csv(StringIO(data), sep="|", header=None,
                     names=["cik","company","form","date","filename"],
                     dtype=str, engine="python")
    df["cik"] = df["cik"].str.strip().str.zfill(10)
    df["company"] = df["company"].str.strip()
    df["form"] = df["form"].str.strip()
    df["filename"] = df["filename"].str.strip()
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    return df[["cik","company","form","date","filename"]]

def filing_index_headers_url(filename):
    parts = filename.split("/")
    cik = parts[2]
    accession_file = parts[-1].replace(".txt","")
    accession = "".join(accession_file.split("-"))
    return f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession}/{accession_file}-index-headers.html"

def find_ex10_links(url_index):
    html = robust_get_text(url_index)
    if not html:
        return []
    EX10 = re.compile(r"ex[-_\.]?\s*10[\w\.\-_]*\.(htm|html)", re.IGNORECASE)
    base = url_index.rsplit("/", 1)[0]
    links = []
    for match in re.finditer(r'href="([^"]*ex[-_\.]?\s*10[\w\.\-_]*\.(htm|html))"', html, flags=re.IGNORECASE):
        h = match.group(1)
        full_url = h if h.startswith("http") else base + "/" + h
        links.append(full_url)
    return list(dict.fromkeys(links))

def download_exhibit(url, cik, company):
    safe_name = "".join(c for c in company if c.isalnum() or c in (" ", "_")).strip().replace(" ", "_")
    firm_folder = os.path.join(DOWNLOAD_ROOT, f"{cik}_{safe_name}")
    os.makedirs(firm_folder, exist_ok=True)
    path = os.path.normpath(urlparse(url).path)  # normalize slashes
    parts = path.split(os.sep)
    fname = parts[-2] + "+" + parts[-1]
    #fname = os.path.basename(urlparse(url).path).split("?")[0]
    save_path = os.path.join(firm_folder, fname)

    if os.path.exists(save_path):
        return save_path
    txt = robust_get_text(url)
    if not txt:
        return None
    with open(save_path, "w", encoding="utf-8", errors="replace") as f:
        f.write(txt)
    return save_path

def extract_clean_text_from_html(html_path):
    """Extract all paragraph text cleanly from the HTML file."""
    with open(html_path, 'r', encoding='utf-8', errors='ignore') as f:
        html = f.read()

        # find every <p>...</p> block exactly as written
        p_blocks = re.findall(r'<p[^>]*>(.*?)</p>', html, flags=re.I | re.S)

        clean_paragraphs = []
        for block in p_blocks:
           # remove all tags inside
            text = re.sub(r'<[^>]+>', '', block)
            text = text.replace('\xa0', ' ')
            text = text.replace('&nbsp', ' ')
            text = re.sub(r'\s+', ' ', text).strip()
            if text and len(text) > 20:
                clean_paragraphs.append(text)
    return clean_paragraphs

def find_cyber_clauses(paragraphs):
    """
    Extract all cyber-related clauses:
    - Captures full 'Cybersecurity and Data Privacy' sections (excluding the heading)
    - Splits subclauses (a), (b), (c)... into separate items
    - Also picks up isolated cyber paragraphs outside sections
    """
    results = []
    i = 0
    for i in range(len(paragraphs)):
        para = paragraphs[i].strip()
        lower_para = para.lower()
        # --- 3. Detect isolated cyber-related paragraphs ---
        if any(kw in lower_para for kw in CYBER_KEYWORDS):
            results.append(para)
    # Final cleanup: remove duplicates and junk
    cleaned = []
    for clause in results:
        clause = re.sub(r'\s+', ' ', clause).strip()
        if len(clause) > 80:  # discard trivial lines
            cleaned.append(clause)

    return cleaned


def extract_contract_type(html_path):
    """
    Reads the top part of an SEC EX-10 exhibit file and tries to extract
    the contract title (e.g., 'Credit Agreement', 'Purchase Agreement', etc.)
    purely from plain text, without HTML parsing.
    """
    try:
        with open(html_path, "r", encoding="utf-8", errors="ignore") as f:
            # Read top 150 KB for safety
            raw = f.read(150000)

        # Remove HTML tags and unescape entities
        text = re.sub(r"<[^>]+>", " ", raw)
        text = html.unescape(text)
        text = re.sub(r"\s+", " ", text).strip()

        # Split into candidate "lines" based on typical breaks
        lines = re.split(r"(?:\s{2,}|[\r\n]+|<br>|<br/>|<BR>)", raw)
        candidates = []

        for line in lines[:300]:  # only first ~300 fragments
            clean = re.sub(r"<[^>]+>", " ", line)
            clean = html.unescape(clean)
            clean = re.sub(r"\s+", " ", clean).strip()
            if not clean:
                continue
            # must contain "agreement"
            if re.search(r"\bagreement\b", clean, re.I):
                # ignore money lines or exhibit references
                if re.search(r"\$\s*\d", clean) or re.search(r"exhibit\s*10", clean, re.I):
                    continue
                if len(clean.split(" ")) > 1:
                # looks like a valid title
                    candidates.append(clean)
        
        #print(candidates[:5])

        if not candidates:
            return "No Type"

        # Choose the shortest plausible line (most likely the true title)
        title = min(candidates, key=len)
        # Basic cleanup
        title = re.sub(r"^\W+|\W+$", "", title)
        title = re.sub(r"\s{2,}", " ", title)
        return title.strip()

    except Exception:
        return ""



# ---------------------------
# Main processing
# ---------------------------
def process_cik(cik, start_year, end_year):
    cik_z = str(cik).zfill(10)

    # load previous output CSV if exists to skip reprocessing
    if os.path.exists(OUTPUT_CSV):
        existing_df = pd.read_csv(OUTPUT_CSV, encoding="utf-8-sig")
        processed_files = set(existing_df["htm_file"].tolist())
    else:
        processed_files = set()
        existing_df = pd.DataFrame()

    frames = []
    for year in range(start_year, end_year+1):
        for q in range(1,5):
            idx = fetch_master_idx(year, q)
            if not idx:
                continue
            df = parse_master_idx_text(idx)
            if df.empty:
                continue
            sel = df[(df["cik"] == cik_z) & (df["form"].isin(FORMS))]
            if not sel.empty:
                # filter by year
                sel = sel[sel["date"].dt.year.between(start_year, end_year)]
                if not sel.empty:
                    frames.append(sel)
    if not frames:
        #print(f"[{cik_z}] no filings found.")
        return

    filings_df = pd.concat(frames, ignore_index=True)
    filings_df["index_url"] = filings_df["filename"].apply(filing_index_headers_url)

    # fetch EX-10 links concurrently
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        ex10_lists = list(executor.map(find_ex10_links, filings_df["index_url"]))
    
    #print(ex10_lists)
    download_tasks = []
    for i, links in enumerate(ex10_lists):
        company = filings_df.iloc[i]["company"]
        filing_date = filings_df.iloc[i]["date"]
        for url in links:
            safe_name = "".join(c for c in company if c.isalnum() or c in (" ", "_")).strip().replace(" ", "_")
            firm_folder = os.path.join(DOWNLOAD_ROOT, f"{cik_z}_{safe_name}")
            #print(urlparse(url).path)
            path = os.path.normpath(urlparse(url).path)  # normalize slashes
            parts = path.split(os.sep)
            fname = parts[-2] + "+" + parts[-1]
            #fname = os.path.basename(urlparse(url).path).split("?")[0]
            save_path = os.path.join(firm_folder, fname)
            #print(save_path)
            if save_path in processed_files:
                continue
            download_tasks.append((url, cik_z, company, filing_date))

    extracted = []
    def handle_task(task):
        url, cik_z, company, filing_date = task
        htm_path = download_exhibit(url, cik_z, company)
        if not htm_path:
            return []
        #print(htm_path)
        paragraphs = extract_clean_text_from_html(htm_path)
        clauses = find_cyber_clauses(paragraphs)
        #print(len(clauses))
        contract_type = extract_contract_type(htm_path)
        rows = []
        for clause in clauses:
            rows.append({
                "cik": cik_z,
                "company": company,
                "filing_date": filing_date,
                "htm_file": htm_path,
                "contract_type": contract_type,
                "clause": clause
            })
        return rows

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for rows in executor.map(handle_task, download_tasks):
            if rows:
                extracted.extend(rows)

    # append new results to CSV
    if extracted:
        df_new = pd.DataFrame(extracted)
        df_out = pd.concat([existing_df, df_new], ignore_index=True)
        df_out.to_csv(OUTPUT_CSV, index=False, encoding="utf-8-sig")
        print(f"[{cik_z}] Saved {len(df_new)} new clauses, total CSV rows: {len(df_out)}")
    #else:
        #print(f"[{cik_z}] No new cyber clauses found.")


In [56]:
# ---------------------------
# Example usage
# ---------------------------
if __name__ == "__main__":
    target_ciks = ["1047862"]  # sample firm
    process_cik("1047862", 2021, 2022)

[]
['STYLE="white-space:nowrap">364-DAY SENIOR UNSECURED TERM LOAN CREDIT AGREEMENT dated as of']
['PURCHASE AND SALE AGREEMENT']
['STYLE="white-space:nowrap">364-DAY SENIOR UNSECURED TERM LOAN CREDIT AGREEMENT dated as of']
[0001047862] Saved 7 new clauses, total CSV rows: 8


In [None]:
import time
# ---------------------------
# Run for year 2024
# ---------------------------
if __name__ == "__main__":
    start_time = time.perf_counter()
    for i in range(1211, len(cleaned_list)):
        TARGET_CIK = cleaned_list[i]
        process_cik(TARGET_CIK, 2024, 2024)
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(f"代码块执行时间为: {execution_time} 秒")

Statistics for each year

In [19]:
import os
import shutil
from datetime import datetime

# Root folder where you want to search
root_dir = r"C:\Users\25280\Downloads\myproject\ex10_downloads"

# Directory where you want to save matching files
output_dir = r"D:\EDGAR\ex10_downloads\2024"
os.makedirs(output_dir, exist_ok=True)

# Target creation time
target_time1 = datetime(2025, 11, 10, 4, 0)
target_time2 = datetime(2025, 11, 10, 21, 0)

# Time tolerance (seconds). You can set to 0 for exact match.
time_tolerance = 1  # e.g., within 1 second
count = 0
for root, dirs, files in os.walk(root_dir):
    for file in files:
        file_path = os.path.join(root, file)
        
        # Get creation time (Windows-specific)
        ctime = os.path.getctime(file_path)
        file_creation_time = datetime.fromtimestamp(ctime)

        # Filter by creation time (exact or within tolerance)
        #if (file_creation_time - target_time1).total_seconds() >= time_tolerance and (file_creation_time - target_time2).total_seconds() <= time_tolerance:
        if (file_creation_time - target_time2).total_seconds() >= time_tolerance:
            count += 1
            rel_path = os.path.relpath(root, root_dir)
            firm_folder = rel_path.split(os.sep)[0]  # e.g., "0001552198_WhiteHorse_Finance_Inc"

            # Prepare destination folder
            dest_dir = os.path.join(output_dir, firm_folder)
            os.makedirs(dest_dir, exist_ok=True)

            # Copy file
            dest_path = os.path.join(dest_dir, file)
            shutil.copy2(file_path, dest_path)
print(count)


15638
