Parsing downloaded SEC files


In [None]:
import html
import logging
import os.path
import pathlib
import re

from forms import form_10K, form_10Q

# Configure the root logger (getLogger() without a name) so that messages of
# level INFO and above are processed; DEBUG messages are dropped.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [None]:
# Section to extract from each form type (looked up in form_10K / form_10Q by
# extract_section). If a value is empty, extract_section returns the whole
# parsed document for that form type instead of a single section.
item_10K = "Item 7"
item_10Q = "Item 2"

# When True, a source file is skipped if its parsed output file already exists.
skip_present_files = True

# Root data folder: raw filings are read from "<data_directory>/files" and the
# parsed results are written to "<data_directory>/parsed".
data_directory = "train"
# Parsed documents smaller than this (size of the UTF-8 encoded text) are not written.
min_file_size = 50  # 50 Bytes

In [None]:
def parse_document(document):
    """Reduce the raw text of a downloaded filing to one line of plain text.

    The main 10-Q/10-K document is cut out of the submission, markup is
    stripped, HTML entities are decoded and everything except letters, digits,
    spaces and the characters ``- _ . : , $`` is dropped.

    :param document: complete content of the downloaded file as a string
    :return: cleaned text with all whitespace runs collapsed to single spaces
             (a leading or trailing space may remain)
    """
    # Trimming steps, applied in this order with '.' also matching newlines:
    #   1. everything up to and including the 10-Q/10-K <TYPE> tag
    #   2. the closing </DOCUMENT> tag and everything after it
    #      (only the main document is needed)
    #   3. everything up to and including the <TEXT> tag
    #   4. the XBRL header block
    trimming_patterns = (
        r'^(.*?)<TYPE>(10-Q|10-K)',
        r'</DOCUMENT>(.*)',
        r'^(.*?)<TEXT>',
        r'<ix:header>(.*)</ix:header>',
    )
    for trimming_pattern in trimming_patterns:
        document = re.sub(trimming_pattern, '', document, flags=re.DOTALL)

    # Tables are intentionally kept: the section titles are normally stored in
    # tables, so removing them is only possible if no sections are needed.

    # every remaining tag becomes a space so neighbouring words stay separated
    text = re.sub(r'<[^>]+>', ' ', document)

    # decode html entities such as &amp;
    text = html.unescape(text)

    # new lines, tab stops, ... -> single space
    text = re.sub(r'\s+', ' ', text)

    # drop non-alphanumeric characters except a few special ones
    text = re.sub(r'[^A-Za-z0-9 \-_.:,$]+', '', text)

    # the removal above can leave several spaces in a row; collapse them again
    return re.sub(r'\s+', ' ', text)


def extract_section(document, name):
    """Return the text of the configured section of a parsed filing.

    The form type is detected from ``name``: if it contains "10-K" the section
    ``item_10K`` of ``form_10K`` is used, if it contains "10-Q" the section
    ``item_10Q`` of ``form_10Q``. The section is the text between the heading
    of that item and the heading of the item that follows it in the form
    definition; headings are matched case-insensitively.

    NOTE(review): ``form_10K`` / ``form_10Q`` are assumed to be dicts mapping
    item labels to titles, in the order the items appear in the form - confirm
    against the ``forms`` module.

    :param document: parsed (plain text) document
    :param name: file name of the document, used to detect the form type
    :return: the stripped section text; ``""`` if the section was not found;
             the unchanged ``document`` if the form type is not recognised or
             no item is configured for it
    :raises ValueError: if the configured item is not part of the form definition
    :raises IndexError: if the configured item is the last one of the form
    """
    if "10-K" in name and item_10K:
        item, form = item_10K, form_10K
    elif "10-Q" in name and item_10Q:
        item, form = item_10Q, form_10Q
    else:
        return document

    title = form.get(item)
    form_items = list(form)
    item_following = form_items[form_items.index(item) + 1]
    title_following = form.get(item_following)

    # accepted separators between an item label and its title
    separator = r'(\. |: | | - |\.- )'
    pattern = (re.escape(item) + separator + re.escape(title) + r'(.*?)'
               + re.escape(item_following) + separator + re.escape(title_following))

    # each match is a tuple (separator, section text, separator)
    matches = re.findall(pattern, document, flags=re.IGNORECASE)
    if not matches:
        # report the item and file actually processed by this call
        logger.debug("no match for section '%s' in '%s'", item, name)
        return ""

    # The headings can occur several times (e.g. in a table of contents);
    # use the match with the longest text between the two headings.
    longest = max(matches, key=lambda match: len(match[1]))
    return longest[1].strip()

In [None]:
files_directory = os.path.join(data_directory, "files")
parsed_directory = os.path.join(data_directory, "parsed")

# Parse every file found in "<files_directory>/<sub directory>/" and write the
# extracted section to the same relative path below parsed_directory.
# The module-level logging.* functions are used on purpose: they set up a
# default handler if none is configured, so INFO messages become visible.
for cik_directory in os.listdir(files_directory):
    directory_path = os.path.join(files_directory, cik_directory)
    if not os.path.isdir(directory_path):
        # a stray file next to the sub directories would make os.listdir fail
        continue

    parsed_directory_path = os.path.join(parsed_directory, cik_directory)

    # create parsed cik directory, if needed
    pathlib.Path(parsed_directory_path).mkdir(parents=True, exist_ok=True)

    for file_name in os.listdir(directory_path):
        parsed_file_path = os.path.join(parsed_directory_path, file_name)
        if skip_present_files and os.path.exists(parsed_file_path):
            logging.debug("Skipping parsing, file already exists: %s", parsed_file_path)
            continue

        file_path = os.path.join(directory_path, file_name)
        with open(file=file_path, mode="rt", encoding="utf-8") as file:
            try:
                parsed_document = parse_document(file.read())
            except UnicodeDecodeError:
                # file is not valid UTF-8: log the traceback and go on with the next file
                logging.exception('Exception during document parsing. File=%s', file_path)
                continue

        parsed_document = extract_section(parsed_document, file_name)
        if not parsed_document:
            logging.warning('Parsed document is empty. File=%s', file_path)
            continue

        document_size = len(parsed_document.encode("utf-8"))
        if document_size < min_file_size:
            logging.warning('Parsed document has less than %s Bytes. Size=%s Bytes. File=%s',
                            min_file_size, document_size, file_path)
            continue

        # "w" creates the file if it is missing and overwrites it otherwise
        with open(file=parsed_file_path, mode="w", encoding="utf-8") as parsed_file:
            parsed_file.write(parsed_document)

        logging.info("file %s parsed to %s ", file_path, parsed_file_path)

INFO:root:file train\files\100885\2016-02-05_10-K.txt parsed to train\parsed\100885\2016-02-05_10-K.txt 
INFO:root:file train\files\100885\2017-02-03_10-K.txt parsed to train\parsed\100885\2017-02-03_10-K.txt 
INFO:root:file train\files\100885\2018-02-09_10-K.txt parsed to train\parsed\100885\2018-02-09_10-K.txt 
INFO:root:file train\files\100885\2019-02-08_10-K.txt parsed to train\parsed\100885\2019-02-08_10-K.txt 
INFO:root:file train\files\100885\2020-02-07_10-K.txt parsed to train\parsed\100885\2020-02-07_10-K.txt 
INFO:root:file train\files\100885\2021-02-05_10-K.txt parsed to train\parsed\100885\2021-02-05_10-K.txt 
INFO:root:file train\files\100885\2022-02-04_10-K.txt parsed to train\parsed\100885\2022-02-04_10-K.txt 
INFO:root:file train\files\1022079\2009-07-28_10-Q.txt parsed to train\parsed\1022079\2009-07-28_10-Q.txt 
INFO:root:file train\files\1022079\2009-10-27_10-Q.txt parsed to train\parsed\1022079\2009-10-27_10-Q.txt 
INFO:root:file train\files\1022079\2010-04-26_10-Q.

In [None]:
# Count the parsed documents and the directories that contain at least one of
# them; directories without any file are deleted.
files = 0
folders = 0

# The walk is collected completely before any directory is removed. Its first
# entry is parsed_directory itself, which is neither counted nor removed.
walk_entries = list(os.walk(parsed_directory))
for directory_path, _sub_directories, file_names in walk_entries[1:]:
    if not file_names:
        # remove directory
        os.rmdir(directory_path)
        continue
    folders += 1
    files += len(file_names)

logging.info(f'{files} documents parsed for {folders} companies')

INFO:root:17978 documents parsed for 338 companies
