### SEC FILINGS ANALYSIS
#### Ground-truth financial health and event triggers
Filing Types:
* 10-K: Annual report, comprehensive overview (financials, business operations, risks). Filed within 60–90 days after fiscal year-end.
* 10-Q: Quarterly report, less detailed but includes financials and updates. Filed within 40–45 days after quarter-end.
* 8-K: Current report for material events (e.g., acquisitions, executive changes, earnings releases). Filed within 4 business days of the event.

In [1]:
import re
import os
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
from datetime import datetime
from transformers import pipeline
from lxml import etree
import xml.etree.ElementTree as ET

### 8-K Analysis

In [2]:
def parse_sec_header(file_content):
    """Extract metadata from the SEC-HEADER section of a submission file.

    Args:
        file_content: Full text of the submission as a single string.

    Returns:
        dict with the keys 'accession_number', 'company_name',
        'filing_date', 'report_date', 'cik' and 'sic' (each a stripped
        string, or None when the field is absent or blank) plus 'items',
        a list of the stripped ITEM INFORMATION values (empty when none
        are present). Dates are returned as the raw 8-digit strings.
        When no <SEC-HEADER> section exists, every field keeps its default.
    """
    header_data = {
        'accession_number': None,
        'company_name': None,
        'filing_date': None,
        'report_date': None,
        'items': [],
        'cik': None,
        'sic': None
    }

    # Extract header section
    header_match = re.search(r'<SEC-HEADER>(.*?)</SEC-HEADER>', file_content, re.DOTALL)
    if not header_match:
        return header_data

    header_text = header_match.group(1)

    # `[ \t]*` (rather than `\s*`) keeps each match on the label's own line,
    # so a blank field cannot swallow text from the following header line.
    field_patterns = {
        'accession_number': r'ACCESSION NUMBER:[ \t]*(\S+)',
        'company_name': r'COMPANY CONFORMED NAME:[ \t]*(.+)',
        'filing_date': r'FILED AS OF DATE:[ \t]*(\d{8})',
        'report_date': r'CONFORMED PERIOD OF REPORT:[ \t]*(\d{8})',
        'cik': r'CENTRAL INDEX KEY:[ \t]*(\S+)',
        'sic': r'STANDARD INDUSTRIAL CLASSIFICATION:[ \t]*.+\[(\d+)\]',
    }

    # Search each pattern once and reuse the match object.
    for field, pattern in field_patterns.items():
        match = re.search(pattern, header_text)
        if match:
            # `.` also matches '\r', so strip CRLF residue and padding;
            # a value that is only whitespace counts as missing.
            header_data[field] = match.group(1).strip() or None

    # Extract item information (one ITEM INFORMATION line per reported item)
    raw_items = re.findall(r'ITEM INFORMATION:[ \t]*(.+)', header_text)
    header_data['items'] = [item.strip() for item in raw_items if item.strip()]

    return header_data

def parse_body_content(file_content):
    """Return the plain narrative text of the first <DOCUMENT>'s <TEXT> section.

    The markup inside the section is parsed as HTML, <script> and <style>
    elements are discarded, and all runs of whitespace are collapsed to a
    single space. An empty string is returned when no
    <DOCUMENT>...<TEXT>...</TEXT> section is found.
    """
    document = re.search(r'<DOCUMENT>.*?<TEXT>(.*?)</TEXT>', file_content, re.DOTALL)
    if document is None:
        return ""

    markup = BeautifulSoup(document.group(1), 'html.parser')

    # Drop elements whose contents are code or styling rather than narrative.
    for tag in markup.find_all(["script", "style"]):
        tag.decompose()

    raw_text = markup.get_text(separator=" ", strip=True)

    # Normalise every whitespace run to one space and trim the ends.
    return re.sub(r'\s+', ' ', raw_text).strip()

def process_8k_filings(root_dir):
    """Traverse a download directory and parse every 8-K filing found.

    Expected layout:
        <root_dir>/<stock>/8-K/<accession_number>/full-submission.txt
    Entries that do not fit this layout are skipped.

    Args:
        root_dir: Path of the directory holding one sub-directory per stock.

    Returns:
        pandas.DataFrame with one row per filing and the columns 'stock',
        'accession_number', 'company_name', 'filing_date', 'report_date',
        'cik', 'sic', 'items' and 'body_text'. The columns are present
        even when no filing is found, so downstream column access works
        on an empty result.
    """
    columns = [
        'stock', 'accession_number', 'company_name', 'filing_date',
        'report_date', 'cik', 'sic', 'items', 'body_text',
    ]
    filings_data = []

    # os.listdir returns entries in arbitrary order; sorting makes the row
    # order reproducible across runs.
    for stock_name in sorted(os.listdir(root_dir)):
        stock_path = os.path.join(root_dir, stock_name, '8-K')
        # isdir (not exists) so a stray file named '8-K' is skipped instead
        # of making os.listdir raise.
        if not os.path.isdir(stock_path):
            continue

        for accession_number in sorted(os.listdir(stock_path)):
            file_path = os.path.join(stock_path, accession_number, 'full-submission.txt')
            if not os.path.isfile(file_path):
                continue

            # Undecodable bytes are dropped rather than aborting the whole run.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()

            # Parse header and body
            header = parse_sec_header(content)
            body_text = parse_body_content(content)

            filings_data.append({
                'stock': stock_name,
                'accession_number': header['accession_number'],
                'company_name': header['company_name'],
                'filing_date': header['filing_date'],
                'report_date': header['report_date'],
                'cik': header['cik'],
                'sic': header['sic'],
                'items': header['items'],
                'body_text': body_text
            })

    return pd.DataFrame(filings_data, columns=columns)

# Root of the downloaded filings. process_8k_filings looks beneath it for
# <stock>/8-K/<accession_number>/full-submission.txt files.
root_dir = 'sec_finra_data/sec_filings/sec-edgar-filings'
filings_df = process_8k_filings(root_dir)

In [4]:
# Save the parsed filings (one row per filing) without the DataFrame index.
filings_df.to_csv('processed_8k_filings.csv', index=False)

* **Risk Score:** A numerical score (1 = low, 2 = medium, 3 = high) based on the potential impact of the filing's event types on the stock price.
* **Sentiment Analysis:** FinBERT is used to analyze the sentiment of the filing's narrative text, which indicates the tone of the disclosure.
* **Date Lag:** The number of days between the report date and the filing date; long delays could suggest intentional timing to influence markets.
* **Event Frequency:** The number of 8-K filings per stock per month; a high filing frequency may indicate attempts to manipulate market perception through frequent news.

In [5]:
# Load FinBERT for sentiment analysis
# No `device` argument is passed here; the captured cell output further down
# reports that the model was therefore placed on the CPU.
sentiment_analyzer = pipeline("sentiment-analysis", model="yiyanghkust/finbert-tone")

def assign_risk_score(items):
    """Return the highest risk score among a filing's event types.

    Args:
        items: Iterable of item-description strings for one filing; may be
            empty or None.

    Returns:
        3 if any item is in the high-risk list, otherwise 2 if any item is
        in the medium-risk list, otherwise 1. The result does not depend
        on the order of the items.
    """
    high_risk = ("Results of Operations and Financial Condition", "Material Impairments", "Regulation FD Disclosure")
    medium_risk = ("Departure of Directors or Certain Officers", "Entry into a Material Definitive Agreement")
    # NOTE(review): matching is exact; real header descriptions may carry
    # extra clauses beyond these labels -- verify against actual filings.

    if items is None:
        return 1

    score = 1
    for item in items:
        # Tolerate padding / carriage returns left over from header parsing.
        label = item.strip() if isinstance(item, str) else item
        if label in high_risk:
            return 3  # nothing can outrank a high-risk item
        if label in medium_risk:
            # Keep scanning: a later item may still be high-risk.
            score = 2
    return score

def get_sentiment(text):
    """Get the FinBERT sentiment label and score for a piece of text.

    Args:
        text: Narrative text to classify. Only the first 512 characters
            are sent to the model.

    Returns:
        Tuple (label, score) taken from the first pipeline result, or
        ("neutral", 0.0) when the text is missing/blank or the model call
        fails.
    """
    import logging

    # Nothing to classify: skip the model call for missing or blank text.
    if not isinstance(text, str) or not text.strip():
        return "neutral", 0.0

    try:
        # The slice is by characters, which is only a rough stand-in for the
        # model's token limit.
        result = sentiment_analyzer(text[:512])[0]  # FinBERT has token limits
        return result['label'], result['score']
    except Exception:
        # Best-effort fallback, but record the failure instead of hiding it;
        # KeyboardInterrupt / SystemExit are deliberately not caught.
        # NOTE(review): confirm the fallback label's casing matches the
        # labels the model emits before grouping on this column.
        logging.getLogger(__name__).exception("Sentiment analysis failed; returning neutral fallback")
        return "neutral", 0.0

# Feature engineering
# Header dates are 8-digit YYYYMMDD strings; date_lag is the number of days
# from the report date to the filing date.
filings_df['filing_date'] = pd.to_datetime(filings_df['filing_date'], format='%Y%m%d')
filings_df['report_date'] = pd.to_datetime(filings_df['report_date'], format='%Y%m%d')
filings_df['date_lag'] = (filings_df['filing_date'] - filings_df['report_date']).dt.days
filings_df['risk_score'] = filings_df['items'].apply(assign_risk_score)

# Sentiment analysis on body text
# Collect the (label, score) pairs first and split them per column; unlike
# unpacking zip(*...), this also works when there are no rows.
sentiment_pairs = [get_sentiment(text) for text in filings_df['body_text']]
filings_df['sentiment_label'] = [label for label, _ in sentiment_pairs]
filings_df['sentiment_score'] = [score for _, score in sentiment_pairs]

# Event frequency per stock per month
filings_df['month'] = filings_df['filing_date'].dt.to_period('M')
event_freq = filings_df.groupby(['stock', 'month']).size().reset_index(name='event_count')
# Left merge attaches each (stock, month) count to every filing in that group.
filings_df = filings_df.merge(event_freq, on=['stock', 'month'], how='left')

filings_df.to_csv('engineered_8k_filings.csv', index=False)

config.json:   0%|          | 0.00/533 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/439M [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/226k [00:00<?, ?B/s]

Hardware accelerator e.g. GPU is available in the environment, but no `device` argument is passed to the `Pipeline` object. Model will be on CPU.
