In [1]:
import re
import requests
import json
import time
import logging
from datetime import datetime
from bs4 import BeautifulSoup
from typing import List, Dict, Union

# Emit INFO-level messages so progress and skipped filings show up in the notebook output.
logging.basicConfig(level=logging.INFO)

# Pause (in seconds) applied between consecutive requests by the download helpers below.
REQUEST_DELAY_SECONDS = 0.1  # Delay to ensure we don't exceed 10 requests per second as per SEC policies

In [2]:
# get the ciks if provided with a list of tickers
def get_ciks(tickers_or_ciks: List[str], email_as_user_agent: str = None) -> set:
    """Resolve a mixed list of ticker symbols and CIK numbers into a set of CIKs.

    Args:
        tickers_or_ciks: Strings that are either CIK numbers (1-10 digits) or
            ticker symbols (1-5 letters, case-insensitive). Surrounding
            whitespace is ignored.
        email_as_user_agent: Optional value sent as the ``User-Agent`` header
            when the ticker-to-CIK mapping has to be downloaded. When omitted,
            the request is sent without an explicit header, as before.

    Returns:
        A set of CIK strings (a set is used to avoid duplicates). Numeric
        inputs are returned as given; tickers are replaced by the CIK found in
        the SEC ``ticker.txt`` mapping. Inputs that cannot be resolved are
        logged and left out of the result.
    """
    cik_pattern = re.compile(r'\d{1,10}')      # CIKs are numbers of up to 10 digits
    ticker_pattern = re.compile(r'[a-z]{1,5}')  # tickers are 1-5 letters (compared in lower case)

    # normalise once so detection and lookup agree on the exact same value
    normalized_inputs = [(value, value.strip().lower()) for value in tickers_or_ciks]

    # download ticker.txt and read it into a dictionary only if there are any tickers
    ticker_cik_dict = {}
    if any(ticker_pattern.fullmatch(lowered) for _, lowered in normalized_inputs):
        headers = {'User-Agent': email_as_user_agent} if email_as_user_agent else None
        try:
            response = requests.get('https://www.sec.gov/include/ticker.txt', headers=headers, timeout=30)
            response.raise_for_status()
        except requests.RequestException as e:
            # Keep going: purely numeric inputs can still be returned without the mapping
            logging.error(f"Error occurred while downloading the ticker to CIK mapping: {str(e)}")
        else:
            for line in response.text.splitlines():
                parts = line.split('\t')
                # skip blank or malformed lines instead of failing on tuple unpacking
                if len(parts) == 2:
                    ticker_cik_dict[parts[0].strip()] = parts[1].strip()

    ciks = []
    for input_value, input_value_lower in normalized_inputs:
        if cik_pattern.fullmatch(input_value_lower):
            ciks.append(input_value_lower)
        elif ticker_pattern.fullmatch(input_value_lower):
            # look up the corresponding cik in the ticker_cik_dict
            if input_value_lower in ticker_cik_dict:
                ciks.append(ticker_cik_dict[input_value_lower])
            else:
                logging.info(f"CIK not found for ticker {input_value}")
        else:
            # previously such values were dropped without any trace
            logging.warning(f"Input {input_value!r} is neither a valid ticker nor a valid CIK and was skipped")

    # return the set of ciks to avoid duplicates
    return set(ciks)

In [3]:
# get the filings given a list of ciks over a certain date range
def get_filings(cik_numbers: List[str], filing_date_start: str, filing_date_end: str, email_as_user_agent: str, form_type: str = None) -> Dict[str, Dict]:
    """Fetch the recent filings of each CIK and keep those inside a date range.

    Args:
        cik_numbers: CIK strings; each is zero-padded to 10 digits for the URL.
        filing_date_start: Inclusive lower bound, formatted ``YYYY-MM-DD``.
        filing_date_end: Inclusive upper bound, formatted ``YYYY-MM-DD``.
        email_as_user_agent: Value sent as the ``User-Agent`` header.
        form_type: When given, only filings whose form equals this value are kept.

    Returns:
        A dict keyed by CIK. Each value mirrors the ``filings.recent`` section
        of the submissions JSON (a dict of parallel lists) reduced to the
        matching entries. CIKs with no match or with a failed request are
        omitted. Only the ``recent`` section is examined. An invalid date
        range is logged and yields an empty dict without any request being made.
    """
    filings = {}

    # Parse the date range once, up front: a malformed date is a caller error
    # and must not cost one request (and one log line) per CIK.
    try:
        start_date = datetime.strptime(filing_date_start, '%Y-%m-%d')
        end_date = datetime.strptime(filing_date_end, '%Y-%m-%d')
    except (TypeError, ValueError) as e:
        logging.error(f"Invalid filing date range ({filing_date_start!r} to {filing_date_end!r}): {str(e)}")
        return filings

    headers = {
        'User-Agent': email_as_user_agent
    }

    for cik_number in cik_numbers:
        try:
            url = f"https://data.sec.gov/submissions/CIK{cik_number.zfill(10)}.json"
            # timeout so a stalled connection cannot hang the whole run
            response = requests.get(url, headers=headers, timeout=30)

            if response.status_code != 200:
                # previously ignored silently, which made missing CIKs hard to diagnose
                logging.error(f"Request for CIK {cik_number} returned HTTP status {response.status_code}")
                continue

            original_dict = response.json()['filings']['recent']

            # Indices of filings that meet the date and form type criteria
            indices = [
                i for i, date_str in enumerate(original_dict['filingDate'])
                if start_date <= datetime.strptime(date_str, '%Y-%m-%d') <= end_date
                and (form_type is None or original_dict['form'][i] == form_type)
            ]

            # New dictionary, where each key's value is a list of entries that meet the criteria
            new_dict = {
                key: [values[i] for i in indices]
                for key, values in original_dict.items()
            }

            # If no filings are found for a specific date range and form type, log this information
            if not new_dict['form']:
                logging.info(f"No filings found for CIK {cik_number} from {filing_date_start} to {filing_date_end} for form type {form_type}")
            else:
                filings[cik_number] = new_dict

        except requests.RequestException as e:
            logging.error(f"Error occurred while making a request: {str(e)}")
        except Exception as e:
            logging.error(f"Unexpected error occurred: {str(e)}")
        finally:
            # always pause between requests, whatever the outcome of this one
            time.sleep(REQUEST_DELAY_SECONDS)

    return filings

In [4]:
# Extract the starting position of the readable portion of the 10K document
def extract_starting_position(input_html):
    """Return the index from which a filing should be processed.

    The first "Part I" / "PART I" heading that is immediately followed by a
    tag is located, and the position of the closest ``<div`` opening before it
    is returned, so that the enclosing element is kept intact.

    Args:
        input_html: Raw filing content as a string.

    Returns:
        The index of the ``<div`` preceding the first heading match, or 0 when
        no heading is found or no ``<div`` precedes it (the whole document is
        then processed). The result is never negative, so it is always safe to
        use as a slice start.
    """
    regex = re.compile(r'(>Part(\s|&#160;|&nbsp;)(I)(?=<))|(PART\s(I)(?=<))')

    # Only the first occurrence matters, so a single search is enough
    match = regex.search(input_html)
    if match is None:
        logging.warning('No "PART I" heading found. The filing will be processed from the beginning.')
        return 0

    # Look backwards from the match for the opening of the enclosing div
    enclosing_div_start = input_html.rfind('<div', 0, match.start())
    if enclosing_div_start == -1:
        # rfind signals "not found" with -1, which would select only the last
        # character if used as a slice start; fall back to the full document
        logging.warning('No enclosing div found before "PART I". The filing will be processed from the beginning.')
        return 0

    logging.info(f"Filing will be processed from the following position onwards: {enclosing_div_start}. All preceding text will be ignored.")
    return enclosing_div_start

In [33]:
# Convert the html to readable text, cleaning up the document and stripping out unnecessary characters
def extract_readable_text(input_html):
    """Turn filing HTML into plain text, one block of text per paragraph.

    Steps, in order:
      1. Parse the HTML and delete script, style, a, img and xbrl elements.
      2. Delete divs whose style attribute is exactly 'text-align:center;' and
         p elements that are centered with a 7.5pt or 8pt font size (treated
         as page numbers).
      3. Walk whichever of the p / div tag families is more numerous (div on a
         tie), skipping tags that contain another tag of the same name.
      4. For each tag, take the text of its first span if that span is bold,
         otherwise the text of the whole tag; clean it and append it to the
         output followed by a blank line.

    A tag holding only '$' is merged into the following tag's text, and a tag
    holding only ')', '%' or ')%' is glued onto the end of the previous text.

    Args:
        input_html: HTML markup as a string.

    Returns:
        The extracted text, each block terminated by two newline characters.
    """
    soup = BeautifulSoup(input_html, 'html.parser')

    # Elements that never contribute readable text
    for unwanted in soup.find_all(['script', 'style', 'a', 'img', 'xbrl']):
        unwanted.decompose()

    # Page numbers rendered as centered divs
    for centered_div in soup.find_all('div', {'style': 'text-align:center;'}):
        centered_div.decompose()

    # Page numbers rendered as small, centered paragraphs
    for paragraph in soup.find_all('p'):
        paragraph_style = paragraph.attrs.get('style', '')
        is_centered = 'text-align:center;' in paragraph_style
        is_small_font = 'font-size:7.5pt;' in paragraph_style or 'font-size:8pt;' in paragraph_style
        if is_centered and is_small_font:
            paragraph.decompose()

    # Work with the tag family the document relies on most
    remaining_paragraphs = soup.find_all('p')
    remaining_divs = soup.find_all('div')
    tags = remaining_paragraphs if len(remaining_paragraphs) > len(remaining_divs) else remaining_divs

    # Fragments that belong to the end of the previously emitted text
    trailing_fragments = (')', '%', ')%')

    output_text = ''
    pending_dollar = False

    for tag in tags:
        # Containers of same-named tags are skipped; their children are visited on their own
        if tag.find(tag.name):
            continue

        # A bold leading span stands in for the whole tag; otherwise use the full tag text
        first_span = tag.find('span')
        if first_span and 'font-weight:bold' in first_span.attrs.get('style', ''):
            text = first_span.text
        else:
            text = tag.text

        # Non-breaking spaces become plain spaces; bullet glyphs are dropped
        text = text.replace('\xa0', ' ').replace('•', '').replace('◦', '')

        fragment = text.strip()

        # A lone '$' is held back and prefixed to the next emitted text
        if fragment == '$':
            pending_dollar = True
            continue

        # A lone ')', '%' or ')%' is attached to the previous text
        if fragment in trailing_fragments:
            output_text = output_text.rstrip('\n\n') + fragment + '\n\n'
            continue

        if pending_dollar:
            text = '$' + text
            pending_dollar = False

        # Only non-empty text is emitted, followed by two newline characters
        text = text.strip()
        if text:
            output_text += text + '\n\n'

    return output_text

In [34]:
def retrieve_content(url: str, headers: Dict[str, str], timeout: float = 30.0) -> str:
    """Download a URL and return the response body as text.

    Args:
        url: Address to fetch.
        headers: HTTP headers sent with the request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the download loop forever.

    Returns:
        The response text, or an empty string when the request fails or the
        server answers with an error status (the error is logged).
    """
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as e:
        logging.error(f"Error occurred while retrieving content from {url}: {str(e)}")
        return ""

    return response.text

def save_content(content: str, filename: str):
    """Write text to a file, replacing any existing file of that name.

    The file is always written as UTF-8 so that non-ASCII characters in the
    content are stored the same way on every platform instead of depending on
    the locale's default encoding.

    Args:
        content: Text to write.
        filename: Path of the file to create or overwrite.

    Errors are logged rather than raised, so one failed write does not stop
    the caller.
    """
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(content)
    except Exception as e:
        logging.error(f"Error occurred while saving content to {filename}: {str(e)}")

In [35]:
def download_filings(
        tickers_or_ciks: List[str], 
        filing_date_start: str, 
        filing_date_end: str, 
        email_as_user_agent: str, 
        form_type: str = None, 
        parse_xml: bool = True):
    """Download filings for the given companies and save each one as a .txt file.

    Args:
        tickers_or_ciks: Ticker symbols and/or CIK numbers.
        filing_date_start: Inclusive lower bound, formatted ``YYYY-MM-DD``.
        filing_date_end: Inclusive upper bound, formatted ``YYYY-MM-DD``.
        email_as_user_agent: Value sent as the ``User-Agent`` header.
        form_type: When given, only filings of this form type are downloaded.
        parse_xml: When True, the filing is trimmed to start at "PART I" and
            converted to readable text (saved as ``...-cleaned.txt``);
            otherwise the raw content is saved (``...-raw.txt``).

    Files are written to the current working directory and named
    ``{cik}-{accession number}-{form}-{cleaned|raw}.txt``, where ``form`` is
    the form type of that particular filing. A filing that cannot be
    downloaded or parsed is logged and skipped; the remaining ones are still
    processed.
    """
    headers = {
        'User-Agent': email_as_user_agent
    }
    
    logging.info(
            "Use of this API is subject to the SEC terms and conditions "
            "governing the EDGAR database. You should conduct your own "
            "review of the terms to make sure they are acceptable for your "
            "use case before proceeding."
    )
    
    cik_numbers = get_ciks(tickers_or_ciks)
    filings = get_filings(cik_numbers, filing_date_start, filing_date_end, email_as_user_agent, form_type)

    for cik_number, filing_data in filings.items():
        accession_numbers = filing_data.get('accessionNumber', [])
        forms = filing_data.get('form', [])

        for index, accession_number in enumerate(accession_numbers):
            url = f'https://www.sec.gov/Archives/edgar/data/{cik_number}/{accession_number}.txt'
            content = retrieve_content(url, headers)

            # Pause after every request, including failed ones, so that a run
            # of errors cannot push the request rate above the limit
            time.sleep(REQUEST_DELAY_SECONDS)

            if not content:
                continue

            # Use the form type of this filing (the lists are parallel), not
            # the one of the first filing; '/' (as in "10-K/A") would be read
            # as a directory separator in the file name
            form = forms[index] if index < len(forms) else (form_type or 'unknown')
            form_for_filename = str(form).replace('/', '_')

            if parse_xml:
                try:
                    # Get the starting char of "PART I" - the juicy part of the 10-K.
                    # A negative ("not found") position is clamped to 0 so the
                    # slice below never collapses to the tail of the document.
                    start_character = max(extract_starting_position(content), 0)

                    # Clean up the text
                    output = extract_readable_text(content[start_character:])
                except Exception as e:
                    # One unparsable filing must not abort the whole batch
                    logging.error(f"Error occurred while parsing filing {accession_number} for CIK {cik_number}: {str(e)}")
                    continue
                suffix = 'cleaned'
            else:
                output = content
                suffix = 'raw'

            filename = f'{cik_number}-{accession_number}-{form_for_filename}-{suffix}.txt'
            save_content(output, filename)
            logging.info(f"File saved as {filename}\n")

In [37]:
### Example usage. Running this cell downloads the matching filings and saves the cleaned .txt files in your working folder. It should take 10-20 secs per filing.
tickers_or_ciks = ['META', 'GOOG', 'AAPL', 'F']  # tickers and/or CIK numbers to look up
filing_date_start = '2019-05-01'  # first filing date to include (YYYY-MM-DD)
filing_date_end = '2020-05-01'  # last filing date to include (YYYY-MM-DD)
email_as_user_agent = 'nick@gmail.com'  # sent as the User-Agent header; replace with your own contact email
form_type = '10-K'  # only 10-K is supported at this time
parse_xml = True  # True saves cleaned text, False saves the raw filing

# To inspect the intermediate results, get_ciks(tickers_or_ciks) and
# get_filings(cik_numbers, filing_date_start, filing_date_end, email_as_user_agent, form_type)
# can also be called on their own.
download_filings(
    tickers_or_ciks=tickers_or_ciks,
    filing_date_start=filing_date_start,
    filing_date_end=filing_date_end,
    email_as_user_agent=email_as_user_agent,
    form_type=form_type,
    parse_xml=parse_xml,
)