In [1]:
import pandas as pd
import numpy as np
import os
import random
from shutil import copyfile
import re
import csv

In [2]:
# Gather the path of every file found beneath each yearly folder (1996 through 2018).
folder_list = ['data/year_data/' + str(year) for year in range(1996, 2019)]
all_file_list = []
for folder in folder_list:
    for path, subdirs, files in os.walk(folder):
        # A directory without files simply contributes nothing.
        all_file_list.extend(os.path.join(path, name) for name in files)

In [4]:
# Keep every path that contains one of the "10-K" spellings. This is a plain
# substring test on the whole path, so longer form names containing one of
# these spellings pass as well.
annual_report_list = [
    d for d in all_file_list
    if any(tag in d for tag in ('10-K', '10K', '10k', '10-k'))
]
print(len(annual_report_list))

295346


In [6]:
# Background on the form variants:
# - Up until March 16, 2009, smaller companies could use Form 10-KSB.
# - Form 10-K405 is an SEC filing to the US Securities and Exchange Commission (SEC)
#   that indicates that an officer or director of a public company failed to file a
#   Form 4 (or related Form 3 or Form 5) on time, in violation of Section 16 - meaning
#   that they did not disclose their insider trading activities within the required
#   time period.
# - Form names carrying an "A" refer to amended filings.

# Keep only paths that contain a 10-K marker immediately followed by "_" and that
# also contain ".txt" (both are substring tests on the whole path).
annual_report_list = [
    d for d in annual_report_list
    if '.txt' in d and any(marker in d for marker in ('10-K_', '10K_', '10_K_'))
]
print(len(annual_report_list))
# Randomise the processing order in place; no seed is set in this cell.
random.shuffle(annual_report_list)

178927


In [11]:
# (output key, pattern) for every header field that is extracted; group 1 of
# each pattern is the value.
#  - Tag patterns capture the text between an opening and closing tag on one line.
#  - "label:" patterns capture the rest of the line after the label. The separator
#    is restricted to whitespace other than a newline so that an empty value yields
#    '' rather than swallowing the following line.
# The key 'fila_name' is spelled this way on purpose: the CSV field names used
# when writing the rows rely on the same spelling.
_BASIC_INFO_FIELDS = (
    ('fila_name', r'<filename>(.*)</filename>'),
    ('GrossFileSize', r'<grossfilesize>(.*)</grossfilesize>'),
    ('netfilesize', r'<netfilesize>(.*)</netfilesize>'),
    ('n_tables', r'<n_tables>(.*)</n_tables>'),
    ('n_exhibits', r'<n_exhibits>(.*)</n_exhibits>'),
    ('submission_type', r'conformed submission type:[^\S\n]*(.*)'),
    ('document_count', r'public document count:[^\S\n]*(.*)'),
    ('confirmed_period', r'conformed period of report:[^\S\n]*(.*)'),
    ('filed_date', r'filed as of date:[^\S\n]*(.*)'),
    ('date_change', r'date as of change:[^\S\n]*(.*)'),
    ('sros', r'sros:[^\S\n]*(.*)'),
    ('company_name', r'company conformed name:[^\S\n]*(.*)'),
    ('cik', r'central index key:[^\S\n]*(.*)'),
    ('fiscal_end', r'fiscal year end:[^\S\n]*(.*)'),
)


def get_basic_information(head_contents):
    """Extract header metadata fields from the beginning of a filing.

    Args:
        head_contents: Header text to search. The patterns are written in
            lower case, so the text is expected to be lower-cased already.

    Returns:
        dict mapping each field key (see ``_BASIC_INFO_FIELDS``) to the first
        matching value as a string, or to None when the field is not found.
        Every key is always present, in the order listed in the table.
    """
    basic_info_dict = {}
    for key, pattern in _BASIC_INFO_FIELDS:
        found = re.search(pattern, head_contents)
        basic_info_dict[key] = found.group(1) if found is not None else None
    return basic_info_dict

In [52]:
# First-stage heading detector. Reading the pattern from left to right:
#   1. one or more prefix tokens: a single character from the class
#      (- | ' " ) . I V ] or a digit), or one of the literal fragments
#      "licable", "one", "ontents";
#   2. optional whitespace, optional digits, then at least one whitespace character;
#   3. the word "item" in one of three casings (item / ITEM / Item);
#   4. optional whitespace, then either two digits (optionally separated by
#      whitespace) or a possibly empty run of word characters;
#   5. optional whitespace, optional punctuation from the class (. | : -),
#      optional whitespace;
#   6. one or more uppercase A-Z letters followed by any word characters.
# NOTE(review): the prefix fragments look like the tails of words such as
# "applicable", "None" and "Contents" that may precede a heading - confirm.
regex = re.compile(r'([\-|\'|\"|\)|\.|I|V|\]|\d]|(licable)|one|ontents)+(\s)*(\d)*(\s)+(item|ITEM|Item)(\s)*((\d\s*\d)|(\w)*)(\s)*([\.|\:|-])*(\s)*([A-Z])+(\w)*')
def get_first_stage_match_result(demo_file):
    """Find candidate "Item ..." headings in one filing and measure each span.

    Args:
        demo_file: Path of the text file to read.

    Returns:
        A 4-tuple ``(content, clean_result_list, new_number_section_list,
        basic_info_dict)``:

        * content: the file text after removing 'Table of Contents' and
          repairing a few split spellings of "Item".
        * clean_result_list: one dict per kept heading with keys 'left' and
          'right' (offsets into ``content``), 'sentence' (heading text starting
          at the word "item"), 'end_pos' (offset of the next kept heading, or
          None for the last one) and 'char_num' (length up to 'end_pos', or
          the heading length for the last one).
        * new_number_section_list: the number parsed from each kept heading;
          entries can be None only when most headings carry no digits.
        * basic_info_dict: result of ``get_basic_information`` applied to the
          lower-cased first 150 lines of the file.
    """
    # NOTE(review): the file is opened with the platform default encoding -
    # confirm that this suits the corpus.
    with open(demo_file, 'r') as f:
        lines = f.readlines()
    content = ''.join(lines)
    content = (content.replace('Table of Contents', '')
               .replace('It em ', 'Item ')
               .replace('I t e m ', 'Item ')
               .replace('Ite m ', 'Item ')
               .replace('I tem ', 'Item '))
    matches = regex.finditer(content)

    head_contents = ''.join(lines[:150]).lower()
    basic_info_dict = get_basic_information(head_contents)

    result_list = []
    number_section_list = []
    none_number_count = 0
    for match in matches:
        left_pos, right_pos = match.span()
        tmp_sentence = content[left_pos: right_pos]
        # Drop the prefix tokens matched before the word "item".
        item_begins = re.search(r'(item|ITEM|Item)', tmp_sentence).span()[0]
        tmp_sentence = tmp_sentence[item_begins:]
        tmp_result = {'left': left_pos+item_begins, 'right': right_pos, 'sentence': tmp_sentence}
        # Extra check: if the next line itself starts with "item", discard this record.
        next_sentence = content[right_pos: right_pos+100]
        item_next_begins = re.match(r'.*\n(\s|\d)*(Item|ITEM|item)', next_sentence)
        if item_next_begins is not None:
            continue
        # Extra check: drop headings preceded by "see " (cross-references).
        # max(0, ...) keeps the look-behind window from wrapping around to the
        # end of the text when the match lies within the first 50 characters.
        pre_sentence = content[max(0, left_pos-50): left_pos].lower()
        if 'see ' in pre_sentence:
            continue
        result_list.append(tmp_result)

        number_result = re.search(r'\d+\s*\d*', tmp_sentence)
        if number_result is not None:
            # Remove every whitespace character that \s may have matched between
            # the digits, so int() never sees embedded whitespace.
            number = int(re.sub(r'\s+', '', number_result.group()))
            number_section_list.append(number)
        else:
            number_section_list.append(None)
            none_number_count += 1

    clean_result_list = []
    new_number_section_list = []
    if none_number_count >= int(0.8*len(result_list)):
        # Mostly unnumbered headings: very likely "item one, two, ..." style,
        # so keep everything as found.
        clean_result_list = result_list
        new_number_section_list = number_section_list
    else:
        # Keep numbered headings only.
        numbered = [(res, num) for res, num in zip(result_list, number_section_list)
                    if num is not None]
        result_list = [res for res, _ in numbered]
        number_section_list = [num for _, num in numbered]
        last_index = len(result_list) - 1
        for i in range(len(result_list)):
            tmp_number = number_section_list[i]
            is_edge = (i == 0) or (i == last_index)
            # The first and last headings are always kept. An interior heading
            # is dropped only when its number is below the previous heading's
            # and above the next heading's.
            if (is_edge
                    or tmp_number >= number_section_list[i-1]
                    or number_section_list[i+1] >= tmp_number):
                clean_result_list.append(result_list[i])
                new_number_section_list.append(tmp_number)

    for i in range(0, len(clean_result_list)):
        if i == len(clean_result_list)-1:
            # No following heading: the end is unknown, so only the heading
            # itself is measured.
            clean_result_list[i]['end_pos'] = None
            clean_result_list[i]['char_num'] = clean_result_list[i]['right'] - clean_result_list[i]['left']
        else:
            clean_result_list[i]['end_pos'] = clean_result_list[i+1]['left']
            clean_result_list[i]['char_num'] = clean_result_list[i]['end_pos'] - clean_result_list[i]['left']
    return content, clean_result_list, new_number_section_list, basic_info_dict



# Shared pieces of the item-heading patterns below.
# NOTE(review): inside the suffix character class `|-|` is parsed as a range
# from '|' to '|', so a literal '-' is not matched, and `A-Z` cannot match text
# that was lower-cased before searching - confirm the intent before changing it.
_item_prefix = r'item(\s)*'
_item_suffix = r'[(\s)|.|-|:|A-Z]+'

# Numbered headings.
re_item1 = _item_prefix + r'1' + _item_suffix
re_item1a = _item_prefix + r'1(\s)*a' + _item_suffix
re_item2 = _item_prefix + r'2' + _item_suffix
re_item7 = _item_prefix + r'7' + _item_suffix
re_item7a = _item_prefix + r'7(\s)*a' + _item_suffix
# Spelled-out headings.
re_item_one = _item_prefix + r'one' + _item_suffix
re_item_seven = _item_prefix + r'seven' + _item_suffix


def get_second_stage_data(content, result_list):
    """Pick, for each tracked section, the last heading that identifies it.

    The tracked sections are item1, item1a, item2, item7 and item7a.

    Args:
        content: Full filing text that the 'left'/'right' offsets index into.
        result_list: Heading dicts providing 'sentence', 'left', 'right' and
            'end_pos'.

    Returns:
        dict with keys 'item1', 'item1a', 'item2', 'item7', 'item7a'; each
        value is the last heading dict collected for that section, or None
        when nothing was collected.
    """
    collected = {'item1': [], 'item1a': [], 'item2': [], 'item7': [], 'item7a': []}

    for result in result_list:
        lowered = result['sentence'].lower()
        # Numbered headings, tried in this fixed order. A heading is claimed by
        # the first pattern that matches, but only when its end position is
        # known; otherwise the spelled-out checks in the else branch still run.
        numbered_patterns = (
            ('item1', re_item1),
            ('item1a', re_item1a),
            ('item2', re_item2),
            ('item7', re_item7),
            ('item7a', re_item7a),
        )
        for key, pattern in numbered_patterns:
            if re.search(pattern, lowered) is not None and result['end_pos'] is not None:
                collected[key].append(result)
                break
        else:
            # Spelled-out headings ("item one" / "item seven"): look at the
            # heading plus the 40 characters after it, flattened to one line,
            # and use a keyword to tell the section apart.
            window = content[result['left']: result['right'] + 40].lower().replace('\n', ' ')
            if re.search(re_item_one, window) is not None:
                if 'business' in window:
                    collected['item1'].append(result)
                elif 'risk' in window:
                    collected['item1a'].append(result)
            elif re.search(re_item_seven, window) is not None:
                if 'manage' in window:
                    collected['item7'].append(result)
                elif 'quanti' in window:
                    collected['item7a'].append(result)

    # The last collected heading wins for every section.
    return {key: (found[-1] if found else None) for key, found in collected.items()}


def get_item_content(content, section_result):
    """Cut the text of every selected section out of the filing.

    Args:
        content: Full filing text.
        section_result: Mapping of item name to a heading dict with 'left' and
            'end_pos' offsets, or to None when the item was not found.

    Returns:
        dict with the same keys; each value is the slice
        ``content[left:end_pos]`` with newlines and tabs turned into spaces
        and carriage returns removed, or None for items that were not found.
    """
    def _flatten(span):
        # Missing sections stay None.
        if span is None:
            return None
        text = content[span['left']: span['end_pos']]
        return text.replace('\n', ' ').replace('\t', ' ').replace('\r', '')

    return {item: _flatten(span) for item, span in section_result.items()}

def check_valid(number_section_list):
    """Check a sequence of item numbers for values isolated from both neighbours.

    An interior element is flagged when it differs by at least 2 from the
    element before it and from the element after it.

    Args:
        number_section_list: Sequence of numbers.

    Returns:
        ``(is_valid, flags)``. For sequences of length 2 or less this is
        ``(True, None)``. Otherwise ``flags`` is a boolean array with one entry
        per interior element and ``is_valid`` is true when no entry is flagged.
    """
    if len(number_section_list) <= 2:
        return True, None

    middle = np.array(number_section_list[1:-1])
    before = np.array(number_section_list[:-2])
    after = np.array(number_section_list[2:])

    gap_to_before = np.abs(middle - before)
    gap_to_after = np.abs(middle - after)

    flags = (gap_to_after >= 2) & (gap_to_before >= 2)
    return flags.sum() < 0.5, flags

In [32]:
# Write the output file with csv.DictWriter: one row per filing holding the
# header metadata followed by the extracted item texts.

fieldnames = [
    'fila_name', 'GrossFileSize', 'netfilesize', 'n_tables', 'n_exhibits',
    'submission_type', 'document_count', 'confirmed_period', 'filed_date',
    'date_change', 'sros', 'company_name', 'cik', 'fiscal_end',
] + ['item1', 'item1a', 'item2', 'item7', 'item7a']

with open('all_10_k_data.csv', 'w', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    for i, tmp_file in enumerate(annual_report_list):
        # Progress marker every 1000 files.
        if i % 1000 == 0:
            print(i)
        content, result_list, number_section_list, basic_info_dict = get_first_stage_match_result(tmp_file)
        section_result = get_second_stage_data(content, result_list)
        content_result = get_item_content(content, section_result)
        # Merge the item texts into the metadata dict so one row carries both.
        basic_info_dict.update(content_result)
        writer.writerow(basic_info_dict)