In [None]:
import math
import re
from itertools import combinations

import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import cross_val_predict
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import train_test_split

In [None]:
# Load AnnualReports1718.csv from a path relative to this notebook.
df = pd.read_csv("../document/AnnualReports1718.csv")
df.head()

In [None]:
# pip install spacy

### Text processing

In [None]:
import nltk
import spacy 
from spacy.lang.en import English
from spacy.lang.en.stop_words import STOP_WORDS
from nltk.stem import WordNetLemmatizer

In [None]:
def count_sentences(text):
    """Return how many sentences NLTK's sentence tokenizer finds in ``text``.

    A missing value (anything ``pd.isnull`` reports as null) counts as
    zero sentences.
    """
    return 0 if pd.isnull(text) else len(nltk.sent_tokenize(text))

def delete_first_sentence(text, n_sentences=2):
    """Drop the leading sentences of ``text``.

    Despite its name, this function has always removed the first *two*
    sentences; that count is now the ``n_sentences`` parameter and two is
    its default, so existing callers behave exactly as before.

    Parameters
    ----------
    text : str
        Text to trim. Any non-string value (``None``, NaN, ...) is returned
        unchanged instead of being passed to the tokenizer.
    n_sentences : int, default 2
        Number of leading sentences to remove.

    Returns
    -------
    The remaining sentences joined with single spaces, or ``text`` itself
    when it contains ``n_sentences`` sentences or fewer.
    """
    if not isinstance(text, str):
        return text
    sentences = nltk.sent_tokenize(text)
    if len(sentences) > n_sentences:
        return ' '.join(sentences[n_sentences:])
    return text

# Count the sentences of every 'item7' text.
df['sentence_count'] = df['item7'].apply(count_sentences)

# Keep only the rows whose 'item7' has more than 10 sentences. The .copy()
# makes the filtered result an independent frame, so the column assignment
# below does not raise pandas' SettingWithCopyWarning.
df = df[df['sentence_count'] > 10].copy()

# Strip the leading sentences (delete_first_sentence drops the first two).
df['item7'] = df['item7'].apply(delete_first_sentence)

# Drop rows that contain any missing value and renumber the index from 0.
df = df.dropna().reset_index(drop=True)

print(df.item7)

In [None]:
# Preview the first 10 rows of df.
df.head(10)

In [None]:
# Print the 'item7' text of the row labelled 3 as a spot check.
print(df.item7[3])

In [None]:
# Basic normalisation of the 'item7' text, in this order:
#   1. collapse line breaks into a single space (removing them outright
#      would glue the last word of one line to the first word of the next),
#   2. lowercase,
#   3. remove the literal "item 7." heading -- this has to happen before
#      step 4, which deletes the digit and the dot the heading contains,
#   4. strip digits, dots, commas and every other non-word, non-space character.
item7_clean = (
    df['item7']
    .str.replace(r'[\r\n]+', ' ', regex=True)
    .str.lower()
    .str.replace(r'item 7\.', '', regex=True)
    .str.replace(r'[\d.,]+|[^\w\s]', '', regex=True)
)

# Further preprocessing: remove stop words and lemmatize every remaining token.
# English() is spaCy's blank English pipeline (tokenizer plus language
# defaults such as the stop-word list); it is built once and reused for
# every document.
nlp = English()

# NOTE: this is a WordNet lemmatizer, not a stemmer; the variable name is
# kept unchanged. lemmatize() is called with its default part of speech.
stemmer = WordNetLemmatizer()

documents = []
for text in item7_clean:
    kept_words = [token.text for token in nlp(text) if not token.is_stop]
    documents.append(' '.join(stemmer.lemmatize(word) for word in kept_words))

df['item7'] = documents
df.head()

### nrc list

In [None]:
# Read the tab-separated NRC emotion lexicon. Column names are supplied
# explicitly, so the first line of the file is read as data.
nrc = pd.read_csv(
    '../word_list/NRC-Emotion-Lexicon.txt',
    sep='\t',
    names=['term', 'category', 'associated'],
)
nrc.head()

In [None]:
# Rearrange the lexicon into one list of terms per category, keeping only
# the rows whose 'associated' flag equals 1.
category_list = nrc['category'].unique().tolist()
is_associated = nrc['associated'] == 1
filtered_df = nrc.loc[is_associated]
grouped_df = filtered_df.groupby('category')['term'].apply(list)
grouped_df

In [None]:
# Pull each category's term list out of grouped_df by its category label.
# (The original assigned anti_list twice; the duplicate is removed.)
nrc_pos_list = grouped_df.loc['positive']
nrc_neg_list = grouped_df.loc['negative']
ang_list = grouped_df.loc['anger']
anti_list = grouped_df.loc['anticipation']
dis_list = grouped_df.loc['disgust']
joy_list = grouped_df.loc['joy']
fear_list = grouped_df.loc['fear']
sad_list = grouped_df.loc['sadness']
surp_list = grouped_df.loc['surprise']
tru_list = grouped_df.loc['trust']

### McDonald list

In [None]:
# Load the Loughran-McDonald master dictionary and lowercase its 'Word'
# column so it can be compared with the lowercased report text.
mcd = pd.read_csv(
    '../word_list/Loughran-McDonald_MasterDictionary_1993-2023.csv'
).assign(Word=lambda frame: frame['Word'].str.lower())
mcd.head()

In [None]:
def _mcd_words(flag_column):
    """Return the set of dictionary words whose ``flag_column`` value is non-zero."""
    return set(mcd.loc[mcd[flag_column] != 0, 'Word'])


# One word set per sentiment column (these are sets despite the *_list names).
neg_list = _mcd_words('Negative')
pos_list = _mcd_words('Positive')
unc_list = _mcd_words('Uncertainty')
lit_list = _mcd_words('Litigious')
stg_list = _mcd_words('Strong_Modal')
weak_list = _mcd_words('Weak_Modal')
ctr_list = _mcd_words('Constraining')
Comp_list = _mcd_words('Complexity')

### count sentiment increment

In [None]:
def sentiment_score(text, sen_list):
    """Score each document by the share of its words found in a lexicon.

    Each document is split on whitespace and every token is looked up in the
    lexicon, so only whole words match. The previous substring count matched
    "able" inside "table" and counted a word once per lexicon entry it
    contained. The score is the number of matching tokens divided by the
    total number of tokens (previously: divided by the character length).

    Parameters
    ----------
    text : iterable
        Documents to score; entries that are not strings score 0.
    sen_list : iterable of str
        Lexicon words. Matching is exact and case-sensitive; entries that
        are not strings are ignored.

    Returns
    -------
    list
        One score per document, in input order; 0 for an empty or
        whitespace-only document.
    """
    # A set gives constant-time membership tests and drops duplicate entries.
    lexicon_words = {word for word in sen_list if isinstance(word, str)}

    scores = []
    for doc in text:
        tokens = doc.split() if isinstance(doc, str) else []
        if not tokens:
            scores.append(0)
            continue
        hits = sum(1 for token in tokens if token in lexicon_words)
        scores.append(hits / len(tokens))
    return scores

In [None]:
# Start from a copy of the text column and add one lexicon score column per
# word list: McDonald positive, McDonald negative and NRC anticipation.
sen_df = df[['item7']].copy()
for score_column, word_source in (
    ('Pos_Dic', pos_list),
    ('Neg_Dic', neg_list),
    ('Anti_Dic', anti_list),
):
    sen_df[score_column] = sentiment_score(df['item7'], word_source)
sen_df.head()

In [None]:
# Express the positive and negative scores relative to the anticipation score.
# NOTE(review): rows where Anti_Dic is 0 produce inf or NaN here.
anti_score = sen_df['Anti_Dic']
sen_df['pos_anti_increment'] = (sen_df['Pos_Dic'] + anti_score).div(anti_score)
sen_df['neg_anti_increment'] = (anti_score - sen_df['Neg_Dic']).div(anti_score)
sen_df.head()

In [None]:
# Use the positive increment where the positive score exceeds the negative
# score, and the negative increment everywhere else.
pos_exceeds_neg = sen_df['Pos_Dic'] > sen_df['Neg_Dic']
sen_df['result'] = sen_df['pos_anti_increment'].where(
    pos_exceeds_neg, sen_df['neg_anti_increment']
)
sen_df.head()

### Effect of prior returns on the tone increment

In [None]:
# Single-feature regression: 'pre_alpha' as predictor, 'result' as target.
# Both are taken as (n, 1) NumPy arrays.
X = df[['pre_alpha']].to_numpy()
y = sen_df[['result']].to_numpy()
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

model = LinearRegression()
model.fit(X_train, y_train)

In [None]:
# Slope of the fitted line as a plain float; ravel() copes with coef_ being
# either 1-D or 2-D (it is 2-D when the target was passed as a column vector).
coefficient = float(model.coef_.ravel()[0])
print("Coefficient:", coefficient)

# Evaluate on the held-out test split.
y_pred = model.predict(X_test)

mse = mean_squared_error(y_test, y_pred)
print("Mean Squared Error:", mse)

r2 = r2_score(y_test, y_pred)
print("R-squared:", r2)

# p-value of the slope, from a statsmodels OLS fit on the same training split.
X_train_sm = sm.add_constant(X_train)  # add the intercept column to X_train
model_sm = sm.OLS(y_train, X_train_sm)
results = model_sm.fit()
# Position 0 is the intercept and position 1 the slope; np.asarray makes the
# positional lookup valid whether pvalues is an ndarray or a Series.
p_value = np.asarray(results.pvalues)[1]
print("P-value:", p_value)

Firms with poor prior returns may express a more positive tone about the future.

### Effect of uncertainty factors on the anticipation increment

In [None]:
# Start from the 'result' column and add one lexicon score per
# Loughran-McDonald word set. The insertion order below fixes the column
# order of df2.
df2 = sen_df[['result']].copy()
mcd_score_sources = {
    'unc_Dic': unc_list,
    'stg_Dic': stg_list,
    'weak_Dic': weak_list,
    'lit_Dic': lit_list,
    'ctr_Dic': ctr_list,
}
for score_column, word_set in mcd_score_sources.items():
    df2[score_column] = sentiment_score(df['item7'], word_set)

# Composite measures built from the individual scores.
df2['unc_risk'] = df2['unc_Dic'].add(df2['weak_Dic']).sub(df2['stg_Dic'])
df2['lit_risk'] = df2['lit_Dic'].add(df2['ctr_Dic'])


In [None]:
# Preview the first rows of df2.
df2.head()

In [None]:
# Features are every column from 'unc_Dic' through 'lit_risk' (a label slice,
# inclusive at both ends); the target is 'result'. Hold out 20% for testing.
features = df2.loc[:, 'unc_Dic':'lit_risk']
X, y = features, df2['result']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

In [None]:
results = []

# Fit one single-feature OLS model (with intercept) per column of `features`
# and evaluate each on the held-out test split.
for feature in features:
    # has_constant='add' always appends the intercept column, even when the
    # feature itself happens to be constant within a split.
    train_design = sm.add_constant(X_train[[feature]], has_constant='add')
    results_single = sm.OLS(y_train, train_design).fit()

    test_design = sm.add_constant(X_test[[feature]], has_constant='add')
    y_pred = results_single.predict(test_design)

    mse = mean_squared_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    # params and pvalues are Series labelled ['const', feature]; look the
    # slope up by label, because positional integer lookups on a
    # label-indexed Series are deprecated in pandas.
    coef = results_single.params[feature]
    p_value = results_single.pvalues[feature]

    results.append({'Feature': feature,'Coefficient': coef, 'P-value': p_value, 'MSE': mse, 'R2 Score': r2})

# Collect the per-feature results, ordered by p-value and then by R2 score.
results_df = pd.DataFrame(results)
sorted_results = results_df.sort_values(by=['P-value', 'R2 Score'], ascending=[True, True])
print(sorted_results)

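Weak modal tone and uncertainty risk have a significant positive effect on the anticipation increment, whereas strong modal tone weakens it.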

- Index section

In [None]:
# Read the tab-separated Future.txt lexicon, supplying the three column names.
lexicon = pd.read_csv(
    '../word_list/Future.txt',
    sep='\t',
    names=['term', 'category', 'associated'],
)
lexicon.head()

In [None]:
# Lowercase every term so matching against the lowercased text is consistent.
lexicon = lexicon.assign(term=lexicon['term'].str.lower())
lexicon.head()

In [None]:
# Show 10 example positive terms. random_state pins the draw so the cell
# produces the same output on every run.
is_positive_term = (lexicon['category'] == 'positive') & (lexicon['associated'] == 1)
list(lexicon.loc[is_positive_term, 'term'].sample(10, random_state=42))

In [None]:
def _terms_in_category(category):
    """Return the lexicon terms of ``category`` whose 'associated' flag is 1."""
    selected = (lexicon['category'] == category) & (lexicon['associated'] == 1)
    return list(lexicon.loc[selected, 'term'])


# Store the qualifying terms of each category in its own list.
future_list = _terms_in_category('future')
past_list = _terms_in_category('past')
present_list = _terms_in_category('present')

positive_list = _terms_in_category('positive')
negative_list = _terms_in_category('negative')

# Report the size of every list.
for term_list in (future_list, past_list, present_list, positive_list, negative_list):
    print(len(term_list))


In [None]:
import re

def _tense_pattern(term):
    """Compile one lexicon term into a whole-word regex.

    A ``*`` in the term matches any run of word characters; every other
    character is matched literally.
    """
    body = r'\w*'.join(re.escape(part) for part in term.split('*'))
    # The lookarounds require that the match is not embedded in a longer word.
    return re.compile(r'(?<!\w)' + body + r'(?!\w)')


def tense_count(text, tense_list):
    """Count lexicon-term occurrences in each document.

    Fixes over the previous implementation:
    * ``*`` used to become ``.*``, which greedily consumed the rest of the
      document, so a wildcard term was counted at most once per document;
      it now extends only to the end of the current word.
    * Terms match whole words only ("will" no longer matches "goodwill").
    * Terms are regex-escaped and compiled once, not once per document.

    Parameters
    ----------
    text : iterable
        Documents to scan; entries that are not strings count as 0.
    tense_list : iterable of str
        Lexicon terms, optionally containing ``*`` wildcards. Entries that
        are not strings, or that are empty once whitespace and ``*`` are
        removed, are ignored.

    Returns
    -------
    list of int
        For each document, the sum over all terms of that term's matches.
    """
    patterns = [
        _tense_pattern(term.strip())
        for term in tense_list
        if isinstance(term, str) and term.strip().strip('*')
    ]

    counts = []
    for doc in text:
        if not isinstance(doc, str):
            counts.append(0)
            continue
        counts.append(sum(len(pattern.findall(doc)) for pattern in patterns))
    return counts

In [None]:
# Add one '<label>_count' column per term list, counting matches in 'item7'.
for count_label, term_list in (
    ('future', future_list),
    ('past', past_list),
    ('present', present_list),
    ('positive', positive_list),
    ('negative', negative_list),
):
    df[f'{count_label}_count'] = tense_count(df['item7'], term_list)

df.head()

In [None]:
# Total word count of each document.

def count_words_without_punctuation(text):
    """Return the number of whitespace-separated tokens in ``text``.

    No punctuation is removed inside this function; it only splits on
    whitespace and counts the pieces.
    """
    tokens = text.split()
    return len(tokens)

# Make sure every entry is a string, then store each document's word total.
df['item7'] = df['item7'].astype(str)
word_totals = df['item7'].map(count_words_without_punctuation)
df['word_counts'] = word_totals

print(df['word_counts'])

In [None]:
# Print the first 50 word counts.
print(df.word_counts[:50])

In [None]:
# Percentage of future words per row (0-100).
# Item assignment is required: `df.percent_of_future_words = ...` only sets a
# Python attribute on the DataFrame object and does not create a column.
# Rows with word_counts == 0 yield inf or NaN.
df['percent_of_future_words'] = 100 * df['future_count'] / df['word_counts']
df['percent_of_future_words'].head(10)

In [None]:
# Percentage of past words per row (0-100).
# Item assignment is required: `df.percent_of_past_words = ...` only sets a
# Python attribute on the DataFrame object and does not create a column.
# Rows with word_counts == 0 yield inf or NaN.
df['percent_of_past_words'] = 100 * df['past_count'] / df['word_counts']
df['percent_of_past_words'].head(10)

In [None]:
# Percentage of present words per row (0-100).
# Item assignment is required: `df.percent_of_present_words = ...` only sets a
# Python attribute on the DataFrame object and does not create a column.
# Rows with word_counts == 0 yield inf or NaN.
df['percent_of_present_words'] = 100 * df['present_count'] / df['word_counts']
df['percent_of_present_words'].head(10)

In [None]:
# Percentage of positive words per row (0-100).
# Item assignment is required: `df.percent_of_positive_words = ...` only sets
# a Python attribute on the DataFrame object and does not create a column.
# Rows with word_counts == 0 yield inf or NaN.
df['percent_of_positive_words'] = 100 * df['positive_count'] / df['word_counts']
df['percent_of_positive_words'].head(10)

In [None]:
# Percentage of negative words per row (0-100).
# Item assignment is required: `df.percent_of_negative_words = ...` only sets
# a Python attribute on the DataFrame object and does not create a column.
# Rows with word_counts == 0 yield inf or NaN.
df['percent_of_negative_words'] = 100 * df['negative_count'] / df['word_counts']
df['percent_of_negative_words'].head(10)

In [None]:
# Future words vs. past/present words index (FvsP), one value per row of df:
#   log((1 + %future) / (1 + %present + %past))
import math

FvsP = [
    math.log((1 + future_pct) / (1 + present_pct + past_pct))
    for future_pct, present_pct, past_pct in zip(
        df.percent_of_future_words,
        df.percent_of_present_words,
        df.percent_of_past_words,
    )
]

print(FvsP[:50])

In [None]:
# Positive vs. negative emotion words index (PvsN), one value per row of df:
#   log((1 + %positive) / (1 + %negative))

PvsN = [
    math.log((1 + positive_pct) / (1 + negative_pct))
    for positive_pct, negative_pct in zip(
        df.percent_of_positive_words,
        df.percent_of_negative_words,
    )
]

print(PvsN[:50])


In [None]:
# Regression of market_abnormal_return on the FvsP index (OLS with intercept).
df['FvsP'] = FvsP

X = df['FvsP']
y = df['market_abnormal_return']

# Add a constant term to the regression
X_wContant = sm.add_constant(X)
# OLS is solved in closed form, so no iteration limit is passed to fit().
Model_all_index = sm.OLS(endog=y, exog=X_wContant).fit()

print(Model_all_index.summary())

In [None]:
# Regression of market_abnormal_return on the PvsN index (OLS with intercept).
df['PvsN'] = PvsN

X = df['PvsN']
y = df['market_abnormal_return']

# Add a constant term to the regression
X_wContant = sm.add_constant(X)
# OLS is solved in closed form, so no iteration limit is passed to fit().
Model_all_index = sm.OLS(endog=y, exog=X_wContant).fit()

print(Model_all_index.summary())

In [None]:
# Regression of market_abnormal_return on FvsP + PvsN + control variables,
# keeping the candidate specification with the lowest AIC.
INDEX_VARS = ['FvsP', 'PvsN']
CONTROL_VARS = ['nasdq', 'market_value', 'btm', 'pre_alpha', 'pre_rmse', 'InstOwn_Perc', 'log_share']
N_INDEX_VARS = 2    # index variables included in each candidate model
N_CONTROL_VARS = 7  # control variables included in each candidate model

X = df[INDEX_VARS]
C = df[CONTROL_VARS]
y = df['market_abnormal_return']

best_model = None
best_features = None
best_aic = float('inf')

# Enumerate candidate variable subsets. With both sizes equal to the number
# of available columns there is exactly one candidate (the full model);
# lower N_INDEX_VARS / N_CONTROL_VARS to search smaller subsets.
for x in combinations(INDEX_VARS, N_INDEX_VARS):
    for c in combinations(CONTROL_VARS, N_CONTROL_VARS):
        candidate_features = list(x) + list(c)
        # Build the design matrix under its own name so X is not rebound
        # while the loop is iterating over combinations derived from it.
        design = sm.add_constant(df[candidate_features])
        model = sm.OLS(y, design).fit()
        aic = model.aic
        # Keep the candidate with the lowest AIC seen so far.
        if aic < best_aic:
            best_model = model
            best_features = candidate_features
            best_aic = aic

# Report the best model.
if best_model is not None:
    print(best_model.summary())
    print("Best features:", best_features)
else:
    print("No model found.")

In [None]:
# Regression of the 'result' tone measure on the FvsP index (OLS with intercept).

X = df['FvsP']
y = df2['result']

# Add a constant term to the regression
X_wContant = sm.add_constant(X)
# OLS is solved in closed form, so no iteration limit is passed to fit().
Model_all_index = sm.OLS(endog=y, exog=X_wContant).fit()

print(Model_all_index.summary())

In [None]:
# Regression of the 'result' tone measure on the PvsN index (OLS with intercept).

X = df['PvsN']
y = df2['result']

# Add a constant term to the regression
X_wContant = sm.add_constant(X)
# OLS is solved in closed form, so no iteration limit is passed to fit().
Model_all_index = sm.OLS(endog=y, exog=X_wContant).fit()

print(Model_all_index.summary())

In [None]:
# Regression of the 'result' tone measure on FvsP + PvsN + control variables,
# keeping the candidate specification with the lowest AIC.
INDEX_VARS = ['FvsP', 'PvsN']
CONTROL_VARS = ['nasdq', 'market_value', 'btm', 'pre_alpha', 'pre_rmse', 'InstOwn_Perc', 'log_share']
N_INDEX_VARS = 2    # index variables included in each candidate model
N_CONTROL_VARS = 7  # control variables included in each candidate model

X = df[INDEX_VARS]
C = df[CONTROL_VARS]
y = df2['result']

best_model = None
best_features = None
best_aic = float('inf')

# Enumerate candidate variable subsets. With both sizes equal to the number
# of available columns there is exactly one candidate (the full model);
# lower N_INDEX_VARS / N_CONTROL_VARS to search smaller subsets.
for x in combinations(INDEX_VARS, N_INDEX_VARS):
    for c in combinations(CONTROL_VARS, N_CONTROL_VARS):
        candidate_features = list(x) + list(c)
        # Build the design matrix under its own name so X is not rebound
        # while the loop is iterating over combinations derived from it.
        design = sm.add_constant(df[candidate_features])
        model = sm.OLS(y, design).fit()
        aic = model.aic
        # Keep the candidate with the lowest AIC seen so far.
        if aic < best_aic:
            best_model = model
            best_features = candidate_features
            best_aic = aic

# Report the best model.
if best_model is not None:
    print(best_model.summary())
    print("Best features:", best_features)
else:
    print("No model found.")