# Preamble

In [1]:
%matplotlib notebook

## Notebook parameters

In [2]:
NAME = 'a_6c_calculate_text_metrics' 
PROJECT = 'covid-empirical'
PYTHON_VERSION = '3.9'
USER = 'linuxBox' ## Note, this notebook is designed to run on Linux.
CONDA_ENVIRONMENT = 'covid-empirical'
USE_EXTERNAL_PIPELINE = True

## Run preamble script

In [3]:
%run -i preamble.py 

----------------------------------------------------------------------------------
The following utility functions are loaded and available through `functions.<..>`:
----------------------------------------------------------------------------------

extract_data_edgar_link, fast_load_json, fast_store_json, flatten_multiindex_column, inDB, recreate_edgar_link

----------------------------------------------------------------
The following modules and functions are imported by preamble.py:
----------------------------------------------------------------

copy, delayed, importlib, json, math, np, orjson, os, pd, plt, pqdm_p, pqdm_t, random, re, requests, sys, time, yaml


## Notebook specific imports

In [4]:
from bs4 import BeautifulSoup
import html
import unidecode
import warnings

### Syllables parser

In [5]:
import pyphen

In [6]:
syllable_parser = pyphen.Pyphen(lang = 'en_US')

### Set up spacy

In [7]:
import spacy

In [9]:
nlp = spacy.load('en_core_web_sm', disable = ['tok2vec', 'tagger' ,'parser' ,'attribute_ruler', 'lemmatizer', 'ner'])
nlp.enable_pipe('senter')
nlp.max_length = 3_000_000

--------------------
# Load processed filings
---------------------

## Sample file

In [20]:
filing_df = pd.read_hdf(Path.cwd() / '2_pipeline' / 'a_6a_download_raw_filings' / 'out' / 'filing_df.h5')

## Index done files

In [21]:
full_done_folder = externalPipelineFolder / 'a_6b_parse_filings' / 'out' / 'full'
split_done_folder = externalPipelineFolder / 'a_6b_parse_filings' / 'out' / 'split'

In [22]:
full_done_dict = {}
full_done_list = []
for folder in os.listdir(full_done_folder):
    full_done_dict[folder] = []
    for file in (full_done_folder / folder).glob('*.json.gz'):
        full_done_dict[file.name] = file
        full_done_list.append(file)
        
print(f'Number of filing done in the full dataset: {len(full_done_list):,}')

Number of filing done in the full dataset: 81,109


In [23]:
split_done_dict = {}
split_done_list = []
for folder in os.listdir(split_done_folder):
    split_done_dict[folder] = []
    for file in (split_done_folder / folder).glob('*.json.gz'):
        split_done_dict[file.name] = file
        split_done_list.append(file)

print(f'Number of filing done in the split dataset: {len(split_done_list):,}')

Number of filing done in the split dataset: 72,996


## Example file

In [24]:
index_to_load = 17
full_data = functions.fast_load_json(full_done_list[index_to_load])
print(full_data['link'])
full_text = full_data['filing_text']
split_data = functions.fast_load_json(split_done_list[index_to_load])
split_text_list = split_data['filing_list']
print(split_data['link'])

https://www.sec.gov/Archives/edgar/data/1750/000110465921118843/air-20210831x10q.htm
https://www.sec.gov/Archives/edgar/data/1750/000110465921118843/air-20210831x10q.htm


------------------------------------
# Text metric logic
------------------------------------

### Development params

In [25]:
text = full_text
filing_details = functions.extract_data_edgar_link(full_data['link'])
verbose = 1

## Covid keywords

https://www.nature.com/articles/s41599-022-01039-1

>  Since this study examines the attention attributed to COVID-19 in the SEC filings, the discovery mechanism of relevant COVID-19 mentions is of central importance. To mitigate susceptibility to errors due to word splitting, stemming and other text preprocessing, we decided for the most simple approach based on the matching of regular expressions. We scanned the reports for the **two relatively unambiguous terms ‘corona’ and ‘covid’**, also accounting for ‘coronavirus’ and ‘covid-19’ without duplication. For this process, the entire text is set to lower case.

In [26]:
def count_covid_words(text:str)->dict:
    covid_keywords = ['corona', 'covid'] 

    text_lower = text.lower() 

    covid_count_dict = {}
    for kw in covid_keywords:
        count = text_lower.count(kw)
        covid_count_dict[kw] = count

    return covid_count_dict

In [27]:
count_covid_words(full_text)

{'corona': 1, 'covid': 7}

## Syllables logic

In [28]:
def get_syllables(word, syllable_parser=syllable_parser):
    if not isinstance(word, str):
        word = str(word)
    syllables = syllable_parser.inserted(word.lower()).split("-")
    return syllables

In [29]:
word = nlp('University')

In [30]:
get_syllables(word)

['uni', 'ver', 'si', 'ty']

## Complexity

### Check validity of sentence

In [31]:
def check_validity_sentence(sen):
    sen_str = str(sen)
    token_list  = [token for token in sen if not token.is_punct]
    all_chars = ''.join([str(token) for token in token_list])

    valid_sen = True

    ## If too many uppercase characters, not a valid sentence. 
    num_char = len(all_chars)
    num_upper = len([x for x in all_chars if x.isupper()])
    
    if not num_char:
        valid_sen = False

    if valid_sen:
        perc_upper = num_upper / num_char 
        if perc_upper > 0.33:
            valid_sen = False

    ## Require a sentence to have at least 10 characters
    if valid_sen:
        if num_char < 10:
            valid_sen = False
        
    ## If starts with Yes --> false
    if valid_sen:
        if sen_str[:3] == 'Yes':
            valid_sen = False
        
    ## Check for valid sentence endings
    valid_text_endings = ['.', '?', '!', "'", '"', ":"]
    if valid_sen:
        if sen_str[-1] not in valid_text_endings:
            valid_sen = False
            
    ## Check for sentence start
    if valid_sen:
        if sen_str[0] in ['<', '(', '[', '{']:
            valid_sen = False
            
    ## Check for count of special characters
    for special_char in ['*', ':', ';', '-', '_']:
        if valid_sen:
            if sen_str.count(special_char) > 5:
                valid_sen = False
                
    return valid_sen

### Functions to split into tokens and sentences

In [32]:
def seg_and_token_spacy(document):
    processed_doc = nlp(document)
    sen_list_raw = list(processed_doc.sents)
    sen_list = []
    for sen in sen_list_raw:
        if check_validity_sentence(sen):
            sen_list.append([token for token in sen if not token.is_punct])
        
    return sen_list

### Calculate text metrics

In [33]:
def calc_metrics_for_sentence(sen_comp, debug = False):
    sen_dict = {
        'number_of_tokens' : len(sen_comp),
        'number_of_words' : np.nan,
        'number_of_numbers' : 0,
        'number_of_complex_words' : 0
    }

    tokens_word, tokens_number = [], []
    for token in sen_comp:
        if token.is_alpha:
            tokens_word.append(token)
        elif token.is_digit:
            tokens_number.append(token)
            
    sen_dict['number_of_words'] = len(tokens_word)
    sen_dict['number_of_numbers'] = len(tokens_number)

    syl_list = []
    for sub_l in [get_syllables(token) for token in tokens_word]:
        num_valid_syl = 0
        for i, item in enumerate(sub_l):
            if len(sub_l) != 3:
                num_valid_syl += 1
            else:
                if item not in ['es', 'ed']:
                    num_valid_syl += 1
        syl_list.append(num_valid_syl)
    
    if debug:
        for i, token in enumerate(tokens_word):
            print(token, syl_list[i])
        print()

    sen_dict['number_of_complex_words'] = np.count_nonzero([x > 2 for x in syl_list])
    
    return sen_dict

In [34]:
def calc_metrics_for_text(text:str, filing_details:dict, include_covid:bool=False)->dict:
    full_metric_dict = {
        'uniqueID' : filing_details['uniqueID'],
        'number_of_sentences' : 0, 
        'number_of_tokens' : 0,
        'number_of_words' : 0,
        'number_of_numbers' : 0,
        'number_of_complex_words' : 0,
        'average_sen_length' : np.nan,
        'perc_complex_words' : np.nan,
        'fog_index' : np.nan,
    }

    sentence_components = seg_and_token_spacy(text)
    full_metric_dict['number_of_sentences'] = len(sentence_components)

    ## Sentence level metrics
    metric_list = []
    for sen_comp in sentence_components:
        metric_list.append(calc_metrics_for_sentence(sen_comp, debug=False))

    ## Aggregate to document level

    for metric_dict in metric_list:
        for k,v in metric_dict.items():
            full_metric_dict[k] += v

    ## Complexity metrics for FOG
    if full_metric_dict['number_of_sentences'] and full_metric_dict['number_of_words']:
        full_metric_dict['average_sen_length'] =  full_metric_dict['number_of_words'] / full_metric_dict['number_of_sentences']
        full_metric_dict['perc_complex_words'] =  full_metric_dict['number_of_complex_words'] / full_metric_dict['number_of_words']
        full_metric_dict['fog_index'] =  0.4 * (full_metric_dict['average_sen_length'] + 100 * full_metric_dict['perc_complex_words'])
    
    if include_covid:
        ## Covid words
        covid_dict = count_covid_words(text)
        total_covid = sum([v for k,v in covid_dict.items()])
        full_metric_dict['number_of_covid_words'] = total_covid 

    return full_metric_dict

## Identify sections for `risk factors` and `mda`

In [35]:
def identify_sections_of_interest(filing_list:list)->dict:
    sections = []
    for item in filing_list:
        if item['section_label'] not in sections:
            sections.append(item['section_label'])

    #### Identify risk factors

    risk_factor_section, mda_section = '', ''
    for section in sections:
        section_lower = section.lower()
        if not risk_factor_section:
            if all([term in section_lower for term in ['risk', 'factors']]) and not any([term in section_lower for term in ['quantitative', 'summary']]):
                risk_factor_section = section
        if not mda_section:
            if all([term in section_lower for term in ['management', 'discussion', 'analysis']]):
                mda_section = section
                
    return {
        'risk_factor_label' : risk_factor_section,
        'mda_label' : mda_section
    }

## Risk factors

In [36]:
def calc_metric_risk_factor(filing_list:list, label_dict:dict, filing_details:dict):
    full_metric_dict = {
        'uniqueID' : filing_details['uniqueID'],
        'number_of_sentences' : np.nan, 
        'number_of_tokens' : np.nan,
        'number_of_words' : np.nan,
        'number_of_numbers' : np.nan,
        'number_of_complex_words' : np.nan,
        'average_sen_length' : np.nan,
        'perc_complex_words' : np.nan,
        'fog_index' : np.nan,
        'number_of_covid_words' : np.nan,
        'number_of_risk_factors' : np.nan
    }

    risk_factor_label = label_dict['risk_factor_label']
    if risk_factor_label:
        rf_items = [item for item in filing_list if item['section_label'] == risk_factor_label]

        ## Identify the text and risk factor headers
        all_text = ' '.join([item['clean_text'] for item in rf_items])
        valid_text = ' '.join([item['clean_text'] for item in rf_items if item['type'] in ['text']])
        risk_factor_list = []
        for i, item in enumerate(rf_items):
            if item['type'] in ['header', 'sub-header']:
                header_text = item['clean_text']
                header_text_lower  = header_text.lower()
                if header_text not in risk_factor_list and not re.search('item \d', header_text_lower):
                    if not all([term in header_text_lower for term in ['risk', 'factors']]):
                        if not all([term in header_text_lower for term in ['face', 'materially', 'risks']]):
                            if i < (len(rf_items) - 1):
                                if rf_items[i+1]['type'] not in ['sub-header', 'header']: ## This prevents group headers from counting as a risk factor
                                    risk_factor_list.append(item['clean_text'])
                    
        ## Calculate metrics
        
        ### Covid
        covid_dict = count_covid_words(all_text)
        total_covid = sum([v for k,v in covid_dict.items()])
        full_metric_dict['number_of_covid_words'] = total_covid 
        
        ### Risk factors
        full_metric_dict['number_of_risk_factors'] = len(risk_factor_list)
        
        ### Text statistics
        for k,v in calc_metrics_for_text(valid_text, filing_details, include_covid=False).items():
            if k not in ['uniqueID']:
                full_metric_dict[k] = v
        
    
    return full_metric_dict

## MD&A

In [37]:
def calc_metric_mda(filing_list:list, label_dict:dict, filing_details:dict):

    full_metric_dict = {
            'uniqueID' : filing_details['uniqueID'],
            'number_of_sentences' : np.nan, 
            'number_of_tokens' : np.nan,
            'number_of_words' : np.nan,
            'number_of_numbers' : np.nan,
            'number_of_complex_words' : np.nan,
            'average_sen_length' : np.nan,
            'perc_complex_words' : np.nan,
            'fog_index' : np.nan,
            'number_of_covid_words' : np.nan,
        }

    mda_label = label_dict['mda_label']
    if mda_label:
        mda_items = [item for item in filing_list if item['section_label'] == mda_label]

        ## Identify the text and risk factor headers
        all_text = ' '.join([item['clean_text'] for item in mda_items])
        valid_text = ' '.join([item['clean_text'] for item in mda_items if item['type'] in ['text']])

        ## Calculate metrics

        ### Covid
        covid_dict = count_covid_words(all_text)
        total_covid = sum([v for k,v in covid_dict.items()])
        full_metric_dict['number_of_covid_words'] = total_covid 

        ### Text statistics
        for k,v in calc_metrics_for_text(valid_text, filing_details, include_covid=False).items():
            if k not in ['uniqueID']:
                full_metric_dict[k] = v
                
    return full_metric_dict

## Combo function

In [38]:
def calc_all_for_filing(filename:str, verbose:int=0)->dict:
    file_path_full = full_done_dict[filename]
    file_path_split, sections_extracted =  None, False
    if filename in split_done_dict.keys():
        file_path_split = split_done_dict[filename]
        sections_extracted = True

    ## Load data
    full_data = functions.fast_load_json(file_path_full)
    if file_path_split:
        split_data = functions.fast_load_json(file_path_split)

    filing_details = functions.extract_data_edgar_link(full_data['link'])
    if verbose > 0:
        print(filing_details['link'])

    filing_metrics = {}
    for key in ['link', 'uniqueID', 'cik', 'cik_padded']:
        filing_metrics[key]  = filing_details[key]

    filing_metrics['sections_extracted'] = sections_extracted
    
    ## ---------------------
    ## Stats for full filing
    ## ---------------------

    full_text = full_data['filing_text']
    full_metrics = calc_metrics_for_text(full_text,filing_details, include_covid=True)

    ## Add to filing level metrics
    for k,v in full_metrics.items():
        if k not in ['uniqueID']:
            filing_metrics['full_'+k]  = v

    ## ------------------
    ## Stats for sections
    ## ------------------
    
    if file_path_split:
        filing_list = split_data['filing_list']

        label_dict = identify_sections_of_interest(filing_list)

        rf_metrics = calc_metric_risk_factor(filing_list, label_dict, filing_details)
        mda_metrics = calc_metric_mda(filing_list, label_dict, filing_details)

        ## Add to filing level metrics
        for k,v in rf_metrics.items():
            if k not in ['uniqueID']:
                filing_metrics['rf_'+k]  = v

        for k,v in mda_metrics.items():
            if k not in ['uniqueID']:
                filing_metrics['mda_'+k]  = v
    
    return filing_metrics

---------------------
## Debug and testing
---------------------

In [39]:
filename = full_done_list[3201].name
print(filename)
calc_all_for_filing(filename,verbose=1)

745732-=-000074573217000034-=-rost-20171028x10q-=-htm.json.gz
https://www.sec.gov/Archives/edgar/data/745732/000074573217000034/rost-20171028x10q.htm


{'link': 'https://www.sec.gov/Archives/edgar/data/745732/000074573217000034/rost-20171028x10q.htm',
 'uniqueID': '745732-=-000074573217000034-=-rost-20171028x10q-=-htm',
 'cik': '745732',
 'cik_padded': '0000745732',
 'sections_extracted': False,
 'full_number_of_sentences': 357,
 'full_number_of_tokens': 9634,
 'full_number_of_words': 8810,
 'full_number_of_numbers': 562,
 'full_number_of_complex_words': 1936,
 'full_average_sen_length': 24.677871148459385,
 'full_perc_complex_words': 0.21975028376844494,
 'full_fog_index': 18.661159810121553,
 'full_number_of_covid_words': 0}

-------------------------
# Run
-------------------------

In [42]:
def combo_func(file):
    with warnings.catch_warnings():
        try:
            warnings.simplefilter("ignore")
            filename  = file.name
            return True, calc_all_for_filing(filename,verbose = 0)
        except Exception as e:
            return False, str(e)

In [43]:
todo_files = full_done_list

In [44]:
res_list = pqdm_p(todo_files, combo_func, n_jobs = 20)

QUEUEING TASKS | :   0%|          | 0/81109 [00:00<?, ?it/s]

PROCESSING TASKS | :   0%|          | 0/81109 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/81109 [00:00<?, ?it/s]

In [45]:
full_metric_list = []
fail_list = []
for status, res in res_list:
    if status:
        full_metric_list.append(res)
    else:
        fail_list.append(res)
print(len(full_metric_list))

81109


In [46]:
full_metric_df = pd.DataFrame(full_metric_list)

## Add meta data

In [47]:
full_metric_df = pd.merge(full_metric_df, filing_df[['uniqueID', 'filingDate', 'reportDate', 'form']], on = 'uniqueID', how = 'left')

## Deal with risk factors in 10-Q

Often the company does not include risk factors in the 10-Q, only by reference

In [48]:
new_list = []
for index, row in full_metric_df.iterrows():
    if row['rf_number_of_sentences'] <= 5:
        for k,v in row.iteritems():
            if 'rf_'  == k[:3]:
                row[k]  = np.nan
    
    new_list.append(row.to_dict())

In [49]:
full_metric_df = pd.DataFrame(new_list)

## Stats

In [50]:
full_metric_df[full_metric_df.form == '10-K'].sort_values('full_fog_index').describe().T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
full_number_of_sentences,20083.0,1776.259374,733.293568,32.0,1337.0,1675.0,2087.0,11659.0
full_number_of_tokens,20083.0,53824.337549,23530.824854,499.0,39409.5,50404.0,64103.0,351271.0
full_number_of_words,20083.0,50985.993377,22314.469008,466.0,37250.0,47740.0,60792.5,325301.0
full_number_of_numbers,20083.0,1501.465269,826.719945,9.0,1046.0,1348.0,1756.5,16641.0
full_number_of_complex_words,20083.0,11052.724842,5073.199625,136.0,8015.5,10298.0,13091.5,76774.0
full_average_sen_length,20083.0,28.46886,2.248223,10.355556,26.962952,28.36234,29.89367,40.606082
full_perc_complex_words,20083.0,0.216518,0.011421,0.170551,0.209099,0.215977,0.223295,0.299351
full_fog_index,20083.0,20.048261,0.925704,14.009707,19.428891,20.035036,20.638801,24.229627
full_number_of_covid_words,20083.0,19.674102,29.987178,0.0,0.0,1.0,34.0,590.0
rf_number_of_sentences,17840.0,342.532399,202.532248,6.0,202.0,290.0,437.0,2053.0


In [54]:
full_metric_df[full_metric_df.form == '10-Q'].sort_values('full_fog_index').describe().T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
full_number_of_sentences,61026.0,646.564137,351.69586,2.0,409.0,555.0,789.0,3501.0
full_number_of_tokens,61026.0,20200.684446,11417.590669,31.0,12488.0,17140.0,24506.0,119607.0
full_number_of_words,61026.0,18861.441877,10908.286108,27.0,11541.25,15853.5,22805.75,109952.0
full_number_of_numbers,61026.0,723.459755,399.08633,3.0,479.0,641.0,858.0,6175.0
full_number_of_complex_words,61026.0,3909.694819,2334.625314,9.0,2353.0,3261.0,4707.0,23210.0
full_average_sen_length,61026.0,28.882377,2.533456,13.5,27.111195,28.719866,30.520224,45.137931
full_perc_complex_words,61026.0,0.206291,0.012691,0.140384,0.197877,0.206333,0.214586,0.333333
full_fog_index,61026.0,19.804603,1.133181,15.662289,19.012903,19.757491,20.548404,25.770986
full_number_of_covid_words,61026.0,13.420837,22.200112,0.0,0.0,0.0,22.0,318.0
rf_number_of_sentences,22263.0,207.65135,278.111279,6.0,15.0,37.0,387.0,1359.0


## Final clean for merge

### Convert link to fname

In [51]:
full_metric_df['fname'] = full_metric_df['link'].apply(lambda x: functions.extract_data_edgar_link(x)['fname'])

## Drop columns

In [52]:
full_metric_df = full_metric_df.drop(['link','filingDate', 'reportDate'], axis=1, errors = 'ignore')

## Store

### Stata

In [53]:
full_metric_df.to_stata(pipeline / 'out'/ 'text_statistics.dta', write_index=False)

### Excel

In [11]:
full_metric_df.to_excel(pipeline / 'out'/ 'text_statistics.xlsx', index=False)