Gunning fog index

In [None]:
import pandas as pd
import nltk
import pyphen # For syllable counting
import string
import re
import numpy as np # For np.nan

# --- NLTK Resource Download Check (ensure 'punkt' is available) ---
# The sentence/word tokenizers used below need the 'punkt' models; look them up
# locally first and only download when the lookup fails.
# NOTE(review): recent NLTK releases reportedly look up 'punkt_tab' instead of
# 'punkt' - confirm against the installed NLTK version.
PUNKT_RESOURCE_ID = 'tokenizers/punkt'
try:
    nltk.data.find(PUNKT_RESOURCE_ID)
except LookupError:  # nltk.data.find raises LookupError when the resource is absent
    print(f"NLTK 'punkt' tokenizer not found ('{PUNKT_RESOURCE_ID}'). Attempting to download...")
    try:
        nltk.download('punkt', quiet=True)
        nltk.data.find(PUNKT_RESOURCE_ID) # Verify after download
        print("'punkt' downloaded successfully.")
    except Exception as e:
        print(f"Failed to download or verify 'punkt': {e}")
        print("Please ensure an internet connection and NLTK can write to its data directory.")
        print("You might need to run Python with administrator privileges or manually download 'punkt'.")
        # Abort with a non-zero status. The bare `exit()` helper is injected by
        # the `site` module for interactive use and reports success (status 0).
        raise SystemExit(1) from e
# --- End NLTK Resource Download Check ---

# Initialize Pyphen for English.
# `dic` is the module-level hyphenation dictionary used by count_syllables().
try:
    dic = pyphen.Pyphen(lang='en_US')
except Exception as e:
    print(f"Error initializing pyphen for 'en_US': {e}")
    print("Please ensure pyphen dictionaries are correctly installed.")
    # Abort with a non-zero status; the `site`-provided `exit()` would report
    # success (status 0) and is not guaranteed to exist outside interactive use.
    raise SystemExit(1) from e

def count_syllables(word):
    """Estimate the syllable count of ``word`` from pyphen hyphenation points.

    The word is lower-cased and stripped of leading/trailing punctuation
    before lookup. Returns 0 when nothing is left after stripping; otherwise
    returns the number of hyphen-separated segments pyphen produces (at
    least 1).
    """
    cleaned = word.lower().strip(string.punctuation)
    if not cleaned:
        return 0

    hyphenated = dic.inserted(cleaned)
    if not hyphenated:
        # Nothing came back from pyphen for a non-empty word: count it as one.
        return 1

    # N hyphens separate N + 1 segments.
    return hyphenated.count('-') + 1


def get_gunning_fog_index(text):
    """Return the Gunning Fog Index of ``text``.

    Computed as ``0.4 * (words per sentence + percentage of complex words)``.
    Only alphanumeric tokens count as words, and a word is "complex" when it
    is longer than two characters and count_syllables() reports three or more
    syllables.

    Returns ``np.nan`` when ``text`` is not a non-blank string, or when
    tokenization yields no sentences or no countable words.
    """
    if not (isinstance(text, str) and text.strip()):
        return np.nan

    # Split into sentences first; words are then tokenized per sentence.
    sentence_list = nltk.sent_tokenize(text)
    if not sentence_list:
        return np.nan

    # Keep only purely alphanumeric tokens (drops punctuation tokens).
    tokens = [
        token
        for sentence in sentence_list
        for token in nltk.word_tokenize(sentence)
        if token.isalnum()
    ]
    if not tokens:
        return np.nan

    total_words = len(tokens)
    words_per_sentence = total_words / len(sentence_list)

    # Tokens of one or two characters are skipped without a syllable lookup.
    hard_word_total = sum(
        1 for token in tokens
        if len(token) > 2 and count_syllables(token) >= 3
    )
    hard_word_percentage = (hard_word_total / total_words) * 100

    return 0.4 * (words_per_sentence + hard_word_percentage)

# --- Main script execution (Modified for Combined Text per Doc_id) ---
if __name__ == "__main__":
    # Load the per-utterance earnings-call data.
    try:
        df = pd.read_csv('llm_data3.csv')
    except FileNotFoundError:
        print("Error: 'llm_data3.csv' not found. Make sure the file is in the same directory.")
        # Non-zero status so callers can see the failure; the `site`-provided
        # `exit()` would report success.
        raise SystemExit(1)

    doc_level_gfi_results = []

    print("Processing documents to calculate Gunning Fog Index on combined company speech...")

    # Group by Doc_id to process each earnings call
    for doc_id, group in df.groupby('Doc_id'):
        # Filter for company representative speech (Speaker_Type == 1).
        # Missing Speech values are dropped first: astype(str) would otherwise
        # turn them into the literal text "nan" and feed it into the score.
        company_speech_segments = (
            group.loc[group['Speaker_Type'] == 1, 'Speech'].dropna().astype(str).tolist()
        )

        # Concatenate all company speech segments into a single string.
        # With no segments this is "", for which get_gunning_fog_index returns NaN.
        combined_company_text = " ".join(company_speech_segments)

        # Calculate Gunning Fog Index on the combined text
        gfi_score_for_doc = get_gunning_fog_index(combined_company_text)

        doc_level_gfi_results.append({'Doc_id': doc_id, 'Gunning_Fog_Combined': gfi_score_for_doc})

    print("Gunning Fog Index calculation complete for all documents.")

    if not doc_level_gfi_results:
        print("No documents processed or no company speech found in any document. Exiting.")
        raise SystemExit(1)

    # Convert list of results to a DataFrame
    final_gfi_df = pd.DataFrame(doc_level_gfi_results)

    # Rename the score column for the output file.
    final_gfi_df.rename(columns={'Gunning_Fog_Combined': 'Gunning_Fog'}, inplace=True)

    # Drop documents without a score (no company speech, or no countable words).
    # The subset must name the column as renamed above; the previous
    # 'unning_Fog' typo raised KeyError here.
    final_gfi_df.dropna(subset=['Gunning_Fog'], inplace=True)

    # Save the results to a new CSV file
    output_filename = 'doc_gunning_fog_scores_combined_fix.csv'
    final_gfi_df.to_csv(output_filename, index=False)

    print(f"\nGunning Fog Index scores (calculated on combined text per Doc_id) saved to '{output_filename}'")
    print("\nFirst few rows of the output:")
    print(final_gfi_df.head())

Processing documents to calculate Gunning Fog Index on combined company speech...
Gunning Fog Index calculation complete for all documents.

Gunning Fog Index scores (calculated on combined text per Doc_id) saved to 'doc_gunning_fog_scores_combined_fix.csv'

First few rows of the output:
    Doc_id  Average_Gunning_Fog
0  1943275            10.718415
1  2038813             9.877681
2  2053228            15.148305
3  2053230            13.077899
4  2056621            13.453787


VADER Sentiment Analysis

In [2]:
import pandas as pd
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import numpy as np # For np.nan

# Build the VADER analyzer once at module level so it is reused for every
# document instead of being re-created per call; get_vader_sentiment_scores()
# reads this global.
analyzer = SentimentIntensityAnalyzer()

def get_vader_sentiment_scores(text):
    """Score ``text`` with the shared VADER analyzer.

    Returns whatever ``analyzer.polarity_scores`` produces for a non-blank
    string. For anything else (non-string, empty, whitespace-only) returns a
    dict with the keys 'neg', 'neu', 'pos' and 'compound' all set to NaN.
    """
    if isinstance(text, str) and text.strip():
        return analyzer.polarity_scores(text)

    # Invalid or empty input: same key layout, every score NaN.
    return dict.fromkeys(('neg', 'neu', 'pos', 'compound'), np.nan)

# --- Main script execution (Modified for Combined Text per Doc_id) ---
if __name__ == "__main__":
    # Load the per-utterance earnings-call data.
    try:
        df = pd.read_csv('llm_data3.csv')
    except FileNotFoundError:
        print("Error: 'llm_data3.csv' not found. Make sure the file is in the same directory.")
        # Non-zero status so callers can see the failure; the `site`-provided
        # `exit()` would report success.
        raise SystemExit(1)

    doc_level_vader_results = []

    print("Processing documents to calculate VADER sentiment on combined company speech...")

    # Group by Doc_id to process each earnings call
    for doc_id, group in df.groupby('Doc_id'):
        # Filter for company representative speech (Speaker_Type == 1).
        # Missing Speech values are dropped first: astype(str) would otherwise
        # turn them into the literal text "nan", so a document with only
        # missing speech would get a real score instead of NaN.
        company_speech_segments = (
            group.loc[group['Speaker_Type'] == 1, 'Speech'].dropna().astype(str).tolist()
        )

        # Concatenate all company speech segments into a single string.
        # With no segments this is "", for which get_vader_sentiment_scores
        # returns an all-NaN record.
        combined_company_text = " ".join(company_speech_segments)

        # Calculate VADER sentiment on the combined text
        vader_scores_for_doc = get_vader_sentiment_scores(combined_company_text)

        # Add Doc_id to the scores dictionary and append to results list
        result_record = {'Doc_id': doc_id, **vader_scores_for_doc}
        doc_level_vader_results.append(result_record)

    print("VADER sentiment calculation complete for all documents.")

    if not doc_level_vader_results:
        print("No documents processed or no company speech found in any document. Exiting.")
        raise SystemExit(1)

    # Convert list of results to a DataFrame
    final_vader_df = pd.DataFrame(doc_level_vader_results)

    # Rename columns for the output file; the '_Combined' suffix marks scores
    # computed on the concatenated text of a document.
    final_vader_df.rename(columns={
        'neg': 'VADER_Negative_Combined',
        'neu': 'VADER_Neutral_Combined',
        'pos': 'VADER_Positive_Combined',
        'compound': 'VADER_Compound_Combined'
    }, inplace=True)

    # Drop documents without a score (no usable company speech).
    final_vader_df.dropna(subset=['VADER_Compound_Combined'], inplace=True)

    # Save the results to a new CSV file
    output_filename = 'doc_vader_sentiment_scores_fix.csv'
    final_vader_df.to_csv(output_filename, index=False)

    print(f"\nVADER sentiment scores (calculated on combined text per Doc_id) saved to '{output_filename}'")
    print("\nFirst few rows of the output:")
    print(final_vader_df.head())

Processing documents to calculate VADER sentiment on combined company speech...
VADER sentiment calculation complete for all documents.

VADER sentiment scores (calculated on combined text per Doc_id) saved to 'doc_vader_sentiment_scores_fix.csv'

First few rows of the output:
    Doc_id  VADER_Negative_Combined  VADER_Neutral_Combined  \
0  1943275                    0.031                   0.817   
1  2038813                    0.026                   0.845   
2  2053228                    0.025                   0.802   
3  2053230                    0.017                   0.820   
4  2056621                    0.013                   0.817   

   VADER_Positive_Combined  VADER_Compound_Combined  
0                    0.152                   1.0000  
1                    0.129                   0.9998  
2                    0.173                   1.0000  
3                    0.164                   0.9999  
4                    0.170                   1.0000  


Lexicon Based Analysis, LM Dictionary

In [3]:
import pandas as pd
import nltk
import re
import numpy as np

# --- NLTK Resource Download Check (ensure 'punkt' is available) ---
# Look the 'punkt' models up locally and download them only when missing.
# NOTE(review): the lexicon code visible in this cell tokenizes with `re`, not
# NLTK - confirm whether this check is still needed here.
PUNKT_RESOURCE_ID = 'tokenizers/punkt'
try:
    nltk.data.find(PUNKT_RESOURCE_ID)
except LookupError:  # nltk.data.find raises LookupError when the resource is absent
    print(f"NLTK 'punkt' tokenizer not found ('{PUNKT_RESOURCE_ID}'). Attempting to download...")
    try:
        nltk.download('punkt', quiet=True)
        nltk.data.find(PUNKT_RESOURCE_ID)  # Verify after download
        print("'punkt' downloaded successfully.")
    except Exception as e:
        print(f"Failed to download or verify 'punkt': {e}")
        # Abort with a non-zero status. The bare `exit()` helper is injected by
        # the `site` module for interactive use and reports success (status 0).
        raise SystemExit(1) from e
# --- End NLTK Resource Download Check ---

# --- Define Lexicons ---
# CSV file holding the Loughran-McDonald master dictionary.
LM_DICTIONARY_FILE = "Loughran-McDonald_MasterDictionary_1993-2024.csv"

# Dictionary category column -> prefix used for that category's score name.
LM_CATEGORIES_TO_LOAD = dict(
    Negative="lm_negative",
    Positive="lm_positive",
    Uncertainty="lm_uncertainty",
    Litigious="lm_litigious",
    Strong_Modal="lm_strong_modal",
    Weak_Modal="lm_weak_modal",
    Constraining="lm_constraining",
)

# One independent, initially empty word set per score prefix; filled when the
# dictionary file is loaded.
LM_WORD_LISTS = {score_prefix: set() for score_prefix in LM_CATEGORIES_TO_LOAD.values()}

def load_lm_category_words(lm_df, lm_category_column_name):
    """Return the lower-cased words flagged for one LM dictionary category.

    Args:
        lm_df: DataFrame of the dictionary; expected to have a 'Word' column
            plus one column per category.
        lm_category_column_name: Name of the category column to read. A row
            belongs to the category when the column is True (bool columns),
            greater than zero (numeric columns), or truthy after
            ``fillna(False).astype(bool)`` (any other dtype).

    Returns:
        A set of lower-cased words. Empty, with a printed warning, when the
        category column or the 'Word' column is missing, when the column
        cannot be converted to booleans, or when no row is flagged.
    """
    if lm_category_column_name not in lm_df.columns:
        print(f"Warning: LM Category column '{lm_category_column_name}' not found.")
        return set()

    # Check for 'Word' before indexing it. Previously the check came after the
    # lookup, so a missing column raised KeyError and this warning never ran.
    if 'Word' not in lm_df.columns:
        print(f"Warning: 'Word' column missing in LM dict for '{lm_category_column_name}'.")
        return set()

    category_column = lm_df[lm_category_column_name]
    if category_column.dtype == 'bool':
        membership_mask = category_column
    elif pd.api.types.is_numeric_dtype(category_column):
        # Only strictly positive values mark membership; zero and negatives do not.
        membership_mask = category_column > 0
    else:
        try:
            membership_mask = category_column.fillna(False).astype(bool)
        except ValueError:
            print(f"Warning: LM Column '{lm_category_column_name}' type {category_column.dtype} failed bool conversion.")
            return set()

    # Drop missing words so astype(str) cannot add the bogus entry "nan".
    category_words = lm_df.loc[membership_mask, 'Word'].dropna()
    if category_words.empty:
        print(f"Warning: No words found for LM category '{lm_category_column_name}'.")
        return set()

    words_set = set(category_words.astype(str).str.lower())
    print(f"Successfully loaded {len(words_set)} LM '{lm_category_column_name}' words.")
    return words_set

# Read the LM master dictionary and fill LM_WORD_LISTS category by category.
# Failures are reported but not fatal: the word sets simply stay empty.
try:
    lm_df_full = pd.read_csv(LM_DICTIONARY_FILE)

    # Words are lower-cased inside load_lm_category_words, not here.
    if 'Word' not in lm_df_full.columns:
        print(f"CRITICAL WARNING: 'Word' column not found in {LM_DICTIONARY_FILE}.")

    for lm_col_name, score_prefix in LM_CATEGORIES_TO_LOAD.items():
        LM_WORD_LISTS[score_prefix] = load_lm_category_words(lm_df_full, lm_col_name)
except FileNotFoundError:
    print(f"ERROR: LM dictionary '{LM_DICTIONARY_FILE}' not found.")
except Exception as e:
    print(f"ERROR: Could not parse LM dictionary '{LM_DICTIONARY_FILE}': {e}")

# Hand-picked weak modal verbs, scored separately from the LM 'Weak_Modal' list.
# Every entry also appears in HEDGING_LEXICON_CUSTOM below.
WEAK_MODAL_VERBS_CUSTOM = {"may", "might", "could", "would", "should"}
# Custom hedging lexicon: lower-case single words followed by multi-word phrases.
# It is split by the presence of a space into HEDGING_WORDS_CUSTOM and
# HEDGING_PHRASES_CUSTOM right after the literal.
HEDGING_LEXICON_CUSTOM = {
    "about", "almost", "apparently", "approximately", "around", "assume", "assumed", "assumes",
    "assumption", "believe", "believed", "believes", "broadly", "cautiously", "conceivably",
    "could", "estimate", "estimated", "estimates", "fairly", "feel", "felt", "frequently",
    "generally", "guess", "guessed", "guesses", "hopefully", "indicate", "indicated", "indicates",
    "largely", "likely", "mainly", "may", "maybe", "might", "mostly", "often", "overall",
    "partially", "perhaps", "plausibly", "possibly", "potential", "potentially", "presumably",
    "presume", "probable", "probably", "quite", "rather", "relatively", "roughly", "seems", "seemed",
    "should", "sometimes", "somewhat", "suggest", "suggested", "suggests", "suppose", "supposed",
    "tend", "tended", "tends", "typically", "uncertain", "unclear", "unlikely", "usually", "virtually",
    "would", "according to", "appear to be", "appears to be", "as far as i can tell", "as far as we know",
    "based on", "can be seen as", "could be", "doubtful that", "effective as", "expected to",
    "expected to be", "feels like", "from our perspective", "highly likely", "i believe", "i feel",
    "i think", "in general", "in most cases", "in my opinion", "in our opinion", "in our view",
    "it appears", "it could be that", "it is conceivable", "it is likely", "it is possible",
    "it is probable", "it may be", "it might be", "it seems", "it seems that", "it would appear",
    "it would seem", "looks like", "may be", "might be", "more or less", "my impression is",
    "not necessarily", "on balance", "our understanding is", "point of view", "points to",
    "possible that", "presumed to be", "seems to", "so to speak", "suggests that", "tend to",
    "tends to", "there is a chance", "there is a possibility", "to some extent", "we assume",
    "we believe", "we estimate", "we feel", "we guess", "we suggest", "we think", "will likely",
    "would argue", "would assume", "would guess", "would suggest"
}
# Single-token entries (no space): matched against individual tokens.
HEDGING_WORDS_CUSTOM = {word for word in HEDGING_LEXICON_CUSTOM if ' ' not in word}
# Multi-word entries (contain a space): matched against the lower-cased text.
HEDGING_PHRASES_CUSTOM = {phrase for phrase in HEDGING_LEXICON_CUSTOM if ' ' in phrase}
# --- End Define Lexicons ---

def preprocess_text_for_lexicon(text):
    """Lower-case ``text`` and split it into word tokens.

    Returns a ``(tokens, lowered_text)`` pair, where ``tokens`` are the runs
    of word characters found in the lower-cased text. Non-string, empty or
    whitespace-only input yields ``([], "")``.
    """
    if isinstance(text, str) and text.strip():
        lowered = text.lower()
        return re.findall(r'\b\w+\b', lowered), lowered
    return [], ""

def calculate_lexicon_scores(speech_text):
    """Compute lexicon hit ratios for one text.

    Args:
        speech_text: Text to score.

    Returns:
        A dict with one ``<prefix>_ratio`` entry per key of LM_WORD_LISTS,
        plus 'custom_weak_modal_ratio', 'custom_hedging_word_ratio',
        'custom_hedging_phrase_ratio' and 'total_words_for_lex_analysis'.
        Each ratio is a hit count divided by the number of word tokens.
        For a non-string or blank input every ratio is NaN and the word
        total is 0. For text that contains no word tokens every ratio is 0.0.
    """
    scores = {f"{prefix}_ratio": np.nan for prefix in LM_WORD_LISTS}
    scores.update({
        'custom_weak_modal_ratio': np.nan,
        'custom_hedging_word_ratio': np.nan,
        'custom_hedging_phrase_ratio': np.nan,
        'total_words_for_lex_analysis': 0
    })

    if not isinstance(speech_text, str) or not speech_text.strip():
        return scores

    tokens, lower_speech_text = preprocess_text_for_lexicon(speech_text)
    num_total_words = len(tokens)
    scores['total_words_for_lex_analysis'] = num_total_words

    if num_total_words == 0:
        # Text without any word token (e.g. only punctuation): ratios are 0.0.
        for key in scores:
            if key.endswith("_ratio"):
                scores[key] = 0.0
        return scores

    # An empty word list gives a count of 0, hence a ratio of 0.0.
    # NOTE(review): a category whose words failed to load is therefore
    # indistinguishable from a true zero - confirm this is intended.
    for score_prefix, word_list in LM_WORD_LISTS.items():
        count = sum(1 for token in tokens if token in word_list)
        scores[f"{score_prefix}_ratio"] = count / num_total_words

    weak_modal_custom_count = sum(1 for token in tokens if token in WEAK_MODAL_VERBS_CUSTOM)
    scores['custom_weak_modal_ratio'] = weak_modal_custom_count / num_total_words

    hedging_word_custom_count = sum(1 for token in tokens if token in HEDGING_WORDS_CUSTOM)
    scores['custom_hedging_word_ratio'] = hedging_word_custom_count / num_total_words

    # Match phrases on word boundaries, allowing any whitespace between words.
    # Plain substring counting also matched inside longer words ("tend to" in
    # "intend to", "may be" in "may become") and missed phrases split across
    # a line break. Each phrase is still counted independently, so nested
    # phrases ("it seems" / "it seems that") each contribute, as before.
    hedging_phrase_custom_count = 0
    for phrase in HEDGING_PHRASES_CUSTOM:
        phrase_pattern = r"\b" + r"\s+".join(re.escape(part) for part in phrase.split()) + r"\b"
        hedging_phrase_custom_count += len(re.findall(phrase_pattern, lower_speech_text))
    scores['custom_hedging_phrase_ratio'] = hedging_phrase_custom_count / num_total_words

    return scores

# --- Main script execution (Modified for Combined Text per Doc_id) ---
if __name__ == "__main__":
    # Load the per-utterance earnings-call data.
    try:
        df = pd.read_csv('llm_data3.csv')
    except FileNotFoundError:
        print("Error: 'llm_data3.csv' not found. Make sure the earnings call data file is in the same directory.")
        # Non-zero status so callers can see the failure; the `site`-provided
        # `exit()` would report success.
        raise SystemExit(1)

    doc_level_lexicon_results = []

    print("Processing documents to calculate lexicon scores on combined company speech...")

    for doc_id, group in df.groupby('Doc_id'):
        # Company representative speech only (Speaker_Type == 1). Missing
        # Speech values are dropped first: astype(str) would otherwise turn
        # them into the literal token "nan" and inflate the word total.
        company_speech_segments = (
            group.loc[group['Speaker_Type'] == 1, 'Speech'].dropna().astype(str).tolist()
        )

        # With no segments the joined text is "", for which
        # calculate_lexicon_scores returns NaN ratios and a word total of 0,
        # so no separate "no company speech" branch is needed.
        combined_company_text = " ".join(company_speech_segments)
        current_doc_scores = calculate_lexicon_scores(combined_company_text)

        result_record = {'Doc_id': doc_id, **current_doc_scores}
        doc_level_lexicon_results.append(result_record)

    print("Lexicon score calculation complete for all documents.")

    if not doc_level_lexicon_results:
        print("No documents processed or no company speech found. Exiting.")
        raise SystemExit(1)

    final_lexicon_df = pd.DataFrame(doc_level_lexicon_results)

    # The ratio columns are the '*_ratio' keys of the default score record.
    # Drop only documents where every one of them is NaN.
    score_ratio_columns = [key for key in calculate_lexicon_scores("") if key.endswith("_ratio")]
    final_lexicon_df.dropna(subset=score_ratio_columns, how='all', inplace=True)

    output_filename = 'doc_lexicon_based_scores_fix.csv'
    final_lexicon_df.to_csv(output_filename, index=False)

    print(f"\nLexicon-based scores (calculated on combined text per Doc_id) saved to '{output_filename}'")
    print("\nFirst few rows of the output:")
    print(final_lexicon_df.head())

Successfully loaded 2345 LM 'Negative' words.
Successfully loaded 347 LM 'Positive' words.
Successfully loaded 297 LM 'Uncertainty' words.
Successfully loaded 903 LM 'Litigious' words.
Successfully loaded 19 LM 'Strong_Modal' words.
Successfully loaded 27 LM 'Weak_Modal' words.
Successfully loaded 184 LM 'Constraining' words.
Processing documents to calculate lexicon scores on combined company speech...
Lexicon score calculation complete for all documents.

Lexicon-based scores (calculated on combined text per Doc_id) saved to 'doc_lexicon_based_scores_fix.csv'

First few rows of the output:
    Doc_id  lm_negative_ratio  lm_positive_ratio  lm_uncertainty_ratio  \
0  1943275           0.006918           0.013836              0.007588   
1  2038813           0.006372           0.008850              0.014159   
2  2053228           0.009234           0.021622              0.009685   
3  2053230           0.007192           0.019521              0.008562   
4  2056621           0.004623  