In [None]:
import pandas as pd
import os # to get ahold of the basic operating system functionalities
from glob import glob # to get the filepaths
from constance import table_of_contents, sfcr_sections
import re # to use regular expressions
import pdfplumber #To import text from PDF to pandas Dataframe
import numpy as np #To do basic math operation
import matplotlib.pyplot as plt # to plot the graphs
import pickle # to open and load with pickle
import string #To remove punctation
import collections

# Show up to 100 characters per cell when DataFrames are displayed.
pd.set_option("display.max_colwidth", 100)

In [None]:
# Reporting years processed by this notebook (2019 to 2021 inclusive).
reporting_years = list(range(2019, 2022))

In [None]:
# Collect the PDF report paths per reporting year: sfcrs[year] is the list of
# files matching <project>\reports\<year>\*.pdf.
_project_root = os.path.normpath('C:\\Users\\jakub\\Desktop\\PythonPrecinok\\NLTK\\env\\NLTK\\Scripts\\MLProject')
sfcrs = {}
for reporting_year in reporting_years:
    report_pattern = '\\'.join([_project_root, 'reports', str(reporting_year), '*.pdf'])
    sfcrs[reporting_year] = glob(report_pattern)


In [None]:
# Project root (with trailing separator); the cache paths used by the rest of
# the notebook are derived from os.path.dirname(file).
file = 'C:\\Users\\jakub\\Desktop\\PythonPrecinok\\NLTK\\env\\NLTK\\Scripts\\MLProject\\'

# Create the text_splits cache folder. exist_ok=True makes re-running the cell
# a no-op, while real failures (e.g. missing permissions) are no longer
# swallowed and surface here instead of later when the pickles are written.
os.makedirs(os.path.normpath(os.path.dirname(file) + '\\text_splits'), exist_ok=True)



In [None]:
# Extract the text of every report page by page. The result is one DataFrame
# per reporting year (row label = zero-based page index, column = undertaking
# code parsed from the file path), cached as a pickle in the text_splits folder
# so that the PDFs only have to be parsed once.

def _text_splits_path(filename):
    """Return the normalised path of `filename` inside the text_splits folder."""
    return os.path.normpath(os.path.dirname(file) + '\\text_splits\\' + filename)


extracted_text = dict()
number_of_pages = []
try:
    # NOTE: pickle can execute arbitrary code while loading; only read cache
    # files that were written by this notebook.
    with open(_text_splits_path('number_of_pages.pkl'), 'rb') as f:
        number_of_pages = pickle.load(f)
except FileNotFoundError:
    pass  # first run: nothing cached yet

for reporting_year in reporting_years:
    year_cache_path = _text_splits_path('extracted_text_(' + str(reporting_year) + ').pkl')
    try:
        extracted_text[reporting_year] = pd.read_pickle(year_cache_path).fillna(value=np.nan)
        continue
    except FileNotFoundError:
        pass  # no cache for this year: parse the PDFs below

    # Compiled once per year instead of once per page.
    toc_regex = re.compile(table_of_contents['cz'].replace(' ', '.*'), re.IGNORECASE)
    year_text = pd.DataFrame()

    for filepath in sfcrs[reporting_year]:
        # The undertaking code is the last run of digits in the file path.
        undertaking_code = int(re.findall(r'\d+', filepath)[-1])
        with pdfplumber.open(filepath) as pdf_file:
            toc_page = 0
            toc_text = ''
            try:
                number_of_pages.append(len(pdf_file.pages))
                for page in pdf_file.pages:
                    # A page without extractable text may yield None; treat it
                    # as empty so it cannot abort the search for the table of
                    # contents on the following pages.
                    page_text = page.extract_text() or ''
                    if toc_regex.search(page_text):
                        toc_page = page.page_number - 1  # zero-based index of the table-of-contents page
                        toc_text = page_text
                        break
            except TypeError:
                pass  # best effort: fall back to extracting from the first page

            # If the last number printed on the table-of-contents page, minus
            # one, does not exceed the TOC page index, extraction starts there;
            # otherwise it starts at the TOC page itself (or at page 0 when no
            # table of contents was found).
            actual_page = toc_page
            if toc_page:
                toc_numbers = re.findall(r'\d+', toc_text)
                if toc_numbers and (int(toc_numbers[-1]) - 1 <= toc_page):
                    actual_page = int(toc_numbers[-1]) - 1

            try:
                for page_index, page in enumerate(pdf_file.pages[actual_page:], start=actual_page):
                    year_text.at[page_index, undertaking_code] = page.extract_text()
            except TypeError:
                pass  # best effort: keep the pages extracted so far

    # Drop undertakings for which every page is empty or missing. The columns
    # are collected first so the frame is not modified while iterating over it.
    blank_columns = [
        code for code in year_text.columns
        if (year_text[code].isna() | year_text[code].eq('')).all()
    ]
    year_text = year_text.drop(columns=blank_columns)

    extracted_text[reporting_year] = year_text
    year_text.to_pickle(year_cache_path)

# Re-running the cell simply rewrites the same list.
with open(_text_splits_path('number_of_pages.pkl'), 'wb') as f:
    pickle.dump(number_of_pages, f)
print(number_of_pages)

In [None]:
# Build a dictionary {reporting year: DataFrame} holding the quantitative
# features and the target feature. Each year is read from its own sheet of the
# workbook and indexed by the 'Pojistovna_ID' column.
file_path = "Quantitative_features.xlsx"  # same workbook for every year
quantitative_feature_str = {}

for reporting_year in reporting_years:
    year_sheet = pd.read_excel(file_path, sheet_name=str(reporting_year))
    year_sheet = year_sheet.drop(columns='Solvency_ratio_from_next_year')
    quantitative_feature_str[reporting_year] = year_sheet.set_index('Pojistovna_ID')
    

In [None]:
# Drop the features that have more than 40 % NaN values across all reporting years.
import warnings
# NOTE(review): this silences every warning for the rest of the session, which
# also hides deprecation and convergence warnings in the modelling cells below.
warnings.filterwarnings('ignore')

# Stack all years in a single pd.concat call; DataFrame.append was removed in
# pandas 2.0 and copied the whole frame on every iteration.
quantitative_feature_all_years = pd.concat(
    [quantitative_feature_str[reporting_year] for reporting_year in reporting_years]
)

percentage_nan_values_all_years = quantitative_feature_all_years.isna().mean(axis=0) * 100

columns_to_drop = percentage_nan_values_all_years[percentage_nan_values_all_years > 40].index.tolist()
for reporting_year in reporting_years:
    # errors='ignore': a column that exists in only some years is NaN for the
    # other years in the stacked frame and may therefore be flagged for removal
    # although it is absent from this particular year's frame.
    quantitative_feature_str[reporting_year] = quantitative_feature_str[reporting_year].drop(
        columns=columns_to_drop, errors='ignore'
    )


In [None]:
# List of the quantitative columns (taken from the first year's frame), used
# later when the text features are combined with them in the mixed model.
_first_year_features = next(iter(quantitative_feature_str.values()))
columns_of_quant_features = _first_year_features.columns.tolist()
print(columns_of_quant_features)


In [None]:
# Build full_text: one row per (undertaking, year) holding the concatenated
# report text together with that undertaking's quantitative features.
# Rows are collected in a list and turned into a DataFrame once;
# DataFrame.append was removed in pandas 2.0 and copied the whole frame per row.
records = []

for year, values in extracted_text.items():
    if year not in quantitative_feature_str:  # no quantitative features for this year
        continue
    df_qf = quantitative_feature_str[year]
    for undertaking_code, texts in values.items():
        if undertaking_code not in df_qf.index:  # no quantitative features for this undertaking
            continue
        # Missing pages (NaN/None) are skipped; str() would otherwise inject
        # the literal words "nan"/"None" into the document text.
        page_texts = [str(text) for text in texts.dropna()]
        row = {'Undertaking Code': str(undertaking_code) + '_' + str(year), 'Text': " ".join(page_texts)}
        row.update(df_qf.loc[undertaking_code].to_dict())  # add quantitative features to the row
        records.append(row)

if records:
    full_only_text = pd.DataFrame(records)
else:
    # Keep the expected columns so set_index below still works without data.
    full_only_text = pd.DataFrame(columns=['Undertaking Code', 'Text'])

full_text = full_only_text.set_index('Undertaking Code')

In [None]:
# Training subset: the first 48 rows of full_text by position. The rows from
# position 48 onwards are used as the test set further down in the notebook.
full_text_train=full_text.iloc[:48,]

In [None]:
# Fill missing values in the quantitative columns of ALL rows with the column
# means computed on the training rows only.
# NOTE(review): columns_of_quant_features appears to include the target column
# 'Solvency_ratio_bucket' as well — confirm that mean-imputing it is intended.
full_text[columns_of_quant_features] = full_text[columns_of_quant_features].fillna(full_text_train[columns_of_quant_features].mean()) #Filling missing values in indicator columns by mean

In [None]:
# Read the stop-word list from stopwords.txt (UTF-8).
with open('stopwords.txt', 'r', encoding='UTF-8') as file_stopwords:
    text = file_stopwords.read()

# Any whitespace separates entries, so the file may hold one word per line or
# several words per line.
stop_words = text.split()

In [None]:
import nltk
#nltk.download('punkt')


# Remove punctuation
def remove_punctuation(txt):
    """Return `txt` without any character listed in string.punctuation."""
    kept_chars = []
    for char in txt:
        if char in string.punctuation:
            continue
        kept_chars.append(char)
    return "".join(kept_chars)

# Tokenize text using the nltk library
def tokenize(txt):
    """Return the tokens produced by nltk.word_tokenize for `txt`."""
    return nltk.word_tokenize(txt)

# Convert every token to lower case
def to_lower(tokens):
    """Return a new list holding the lower-cased version of each token."""
    lowered = []
    for token in tokens:
        lowered.append(token.lower())
    return lowered

# Remove tokens with fewer than 3 characters and tokens that contain a digit
def remove_short(tokens):
    """Return the tokens that are longer than 2 characters and contain no digit."""
    kept_tokens = []
    for token in tokens:
        if len(token) <= 2:
            continue
        if any(char.isdigit() for char in token):
            continue
        kept_tokens.append(token)
    return kept_tokens

# Remove the stop words stored in the txt file
def remove_stopwords(tokens, stop_words):
    """Return the tokens that are not contained in `stop_words`.

    `stop_words` may be any collection of strings. It is converted to a set
    once per call so that each membership test is O(1) instead of scanning
    the whole stop-word list for every token.
    """
    if isinstance(stop_words, (set, frozenset)):
        stop_set = stop_words
    else:
        stop_set = set(stop_words)
    text_no_stop = [token for token in tokens if token not in stop_set]
    return text_no_stop

def first_five_letters(token_list):
    """Return a new list with every token truncated to its first five characters."""
    truncated = []
    for token in token_list:
        truncated.append(token[:5])
    return truncated

In [None]:
# Step 1 of the text pipeline: strip punctuation from the raw report text.
full_text['Text_nopunct'] = full_text['Text'].apply(remove_punctuation)

In [None]:
# Step 2: split the punctuation-free text into tokens.
full_text['Text_tokenized'] = full_text['Text_nopunct'].apply(tokenize)

In [None]:
# Step 3: lower-case every token.
full_text['lower_only'] = full_text['Text_tokenized'].apply(to_lower)

In [None]:
# Step 4: drop tokens shorter than 3 characters and tokens containing digits.
full_text['No_single_letter'] = full_text['lower_only'].apply(remove_short)

In [None]:
# Step 5: drop the stop words loaded from stopwords.txt.
full_text['No_stop_words'] = full_text['No_single_letter'].apply(remove_stopwords, args=(stop_words,))

In [None]:
# Step 6: truncate every token to its first five characters.
full_text["five_letters"] = full_text["No_stop_words"].apply(first_five_letters)

In [None]:
# One space-joined string of truncated tokens per document, in row order.
text_list = [" ".join(tokens) for tokens in full_text["five_letters"]]

In [None]:
# Concatenate all documents into one corpus string.
text_no_stop_five_lett = ' '.join(text_list)

words = text_no_stop_five_lett.split() # every token of the corpus, in order

# Vocabulary of the whole corpus (distinct truncated tokens).
unique_words = set(words)

In [None]:
# Count how often each token occurs in the whole corpus and collect the tokens
# that occur exactly once.
words = text_no_stop_five_lett.split()
word_counts = collections.Counter(words)

one_word_occurrence = [word for word in word_counts if word_counts[word] == 1]

In [None]:
# Remove the tokens that occur only once in the whole corpus from every document.
# The membership test runs against a set: testing every token against the
# original list scans that list once per token.
rare_words = set(one_word_occurrence)
text_list_ready = [
    ' '.join(word for word in text.split() if word not in rare_words)
    for text in text_list
]

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

# TF-IDF over word trigrams of the cleaned documents.
# NOTE(review): the vectorizer is fitted on all documents, including the rows
# that are later held out as the test set — confirm this is intended.
tfidf = TfidfVectorizer(ngram_range=(3,3))
tfidf_matrix = tfidf.fit_transform(text_list_ready) # learn the vocabulary/IDF and build the TF-IDF matrix in one pass

# get_feature_names() was removed in scikit-learn 1.2; get_feature_names_out()
# is its replacement and returns the names in the same column order.
tfidf_df = pd.DataFrame(tfidf_matrix.toarray(), columns=tfidf.get_feature_names_out(), index=full_text.index) # dense DataFrame, one row per document

In [None]:
# Combine the TF-IDF features with the quantitative columns; both frames share
# the index of full_text, which is what the join aligns on.
tfidf_df_final = tfidf_df.join(full_text[columns_of_quant_features])

In [None]:
# Test set: every row from position 48 onwards.
tfidf_df_final_test=tfidf_df_final.iloc[48:,]

In [None]:
# Training set: the first 48 rows. This overwrites tfidf_df_final, so the test
# split above has to be taken before this line runs.
tfidf_df_final=tfidf_df_final.iloc[:48,]

In [None]:
# TF-IDF matrix of the training documents only (first 48 rows), produced with
# the already fitted vectorizer.
subset_text_list_ready = text_list_ready[:48]
subset_tfidf_matrix = tfidf.transform(subset_text_list_ready)

# get_feature_names() was removed in scikit-learn 1.2; get_feature_names_out()
# is its replacement and returns the names in the same column order.
subset_tfidf_df = pd.DataFrame(subset_tfidf_matrix.toarray(), columns=tfidf.get_feature_names_out())


In [None]:
# Display the repr of the training TF-IDF matrix.
subset_tfidf_matrix

In [None]:
# Training features and target. Index.difference returns the remaining column
# labels in sorted order, so the columns of X are sorted by name.
y = tfidf_df_final['Solvency_ratio_bucket']
X = tfidf_df_final.loc[:, tfidf_df_final.columns.difference(['Solvency_ratio_bucket'])]

# Class balance of the training target.
y.value_counts()

In [None]:
# Test features and target, built exactly like the training pair so that the
# (sorted) column order of X_test matches X.
y_test = tfidf_df_final_test['Solvency_ratio_bucket']
X_test = tfidf_df_final_test.loc[:, tfidf_df_final_test.columns.difference(['Solvency_ratio_bucket'])]

# Class balance of the test target.
y_test.value_counts()

In [None]:
# Display the training feature matrix.
X

## Logistic regression

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV, LeaveOneOut, cross_val_score

# Tune a logistic regression with an exhaustive grid search; every candidate
# is scored by leave-one-out cross-validation on the training rows.
# NOTE(review): per the LogisticRegression documentation not every
# penalty/solver pair in this grid is supported (e.g. 'elasticnet' with
# 'lbfgs' or 'liblinear') — confirm how the failed fits are treated.
lr = LogisticRegression()

param_grid = {
    'penalty': ['none', 'l1', 'l2', 'elasticnet'],
    'C': [0.1, 1, 2, 3, 4, 10],
    'solver': ['lbfgs', 'liblinear'],
}

grid_search = GridSearchCV(lr, param_grid, cv=LeaveOneOut())
grid_search.fit(X, y)

print("Best hyperparameters:", grid_search.best_params_)

# Leave-one-out accuracy of the selected configuration (one 0/1 score per row).
best_lr = grid_search.best_estimator_
accuracy_scores = cross_val_score(best_lr, X, y, cv=LeaveOneOut())

print("Accuracy score:", accuracy_scores)
print("Mean accuracy:", np.mean(accuracy_scores))
print("Accuracy std:", np.std(accuracy_scores))

In [None]:
from sklearn.metrics import accuracy_score

# Score the tuned logistic regression on the held-out test rows.
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)

print("Accuracy on testing data:", accuracy)

## KNN

In [None]:
from sklearn.neighbors import KNeighborsClassifier

# Tune a k-nearest-neighbours classifier (number of neighbours, weighting and
# Minkowski power p) with leave-one-out cross-validation.
knn = KNeighborsClassifier()

param_grid = {
    'n_neighbors': [2, 3, 4, 5, 6, 7, 8, 9, 13, 21, 34],
    'weights': ['uniform', 'distance'],
    'p': [1, 2, 3],
}

grid_search = GridSearchCV(knn, param_grid, cv=LeaveOneOut())
grid_search.fit(X, y)

print("Best hyperparameters:", grid_search.best_params_)

# Leave-one-out accuracy of the selected configuration.
best_knn = grid_search.best_estimator_
accuracy_scores = cross_val_score(best_knn, X, y, cv=LeaveOneOut())

print("Accuracy score:", accuracy_scores)
print("Mean accuracy:", np.mean(accuracy_scores))
print("Accuracy std:", np.std(accuracy_scores))

In [None]:
from sklearn.metrics import accuracy_score

# Score the tuned KNN classifier on the held-out test rows.
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)

print("Accuracy on testing data:", accuracy)

## SVM

In [None]:
from sklearn.model_selection import cross_val_score
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, make_scorer

# Baseline linear SVM fitted on all training rows; its coefficients are used
# for feature selection in the next cell.
svm_model = SVC(kernel='linear')
svm_model.fit(X, y)

# 7-fold cross-validated accuracy of the same (unfitted, cloned) configuration.
accuracy = make_scorer(accuracy_score)
accuracy_scores = cross_val_score(svm_model, X, y, scoring = accuracy, cv = 7)

print("accuracy score:", accuracy_scores)
# The mean was printed under the same label as the per-fold scores; it now uses
# the label of the other model cells.
print("Mean accuracy:", accuracy_scores.mean())
print("Accuracy std:", np.std(accuracy_scores))


In [None]:
# Select the 80 % of features with the largest absolute linear-SVM coefficient.
#
# svm_model was fitted on X, so coef_ holds one weight per column of X in X's
# column order. X's columns are sorted by name and exclude the target, so the
# names must come from X.columns. Rebuilding them as "tfidf vocabulary +
# quantitative columns" pairs coefficients with the wrong features and can put
# the target column into the selection.
# NOTE(review): coef_[0] is the single weight vector of a two-class problem;
# with more than two classes it is only the first pairwise classifier — confirm.
assert svm_model.coef_.shape[1] == len(X.columns), "coefficients do not match the columns of X"

features_coef = dict(zip(X.columns, svm_model.coef_[0])) # feature name -> coefficient
top_features = sorted(features_coef.items(), key=lambda x: abs(x[1]), reverse=True)[:int(len(features_coef)*0.8)] # sort by absolute coefficient value and keep the top 80 %
top_feature_names = [x[0] for x in top_features] # names of the selected features

X_train_top_x = X[top_feature_names] # training data restricted to the selected features



In [None]:
from sklearn.neighbors import KNeighborsClassifier

# Tune an SVM (C, kernel, gamma) on the selected top features with
# leave-one-out cross-validation.
svm = SVC()

param_grid = {
    'C': [0.1, 1, 5, 10, 100],
    'kernel': ['linear', 'rbf', 'poly', 'sigmoid'],
    'gamma': ['scale', 'auto', 0.1, 1, 10],
}
grid_search = GridSearchCV(svm, param_grid, cv=LeaveOneOut(), verbose=1)
grid_search.fit(X_train_top_x, y)

print("Best hyperparameters:", grid_search.best_params_)

# Leave-one-out accuracy of the selected configuration.
best_svm = grid_search.best_estimator_
accuracy_scores = cross_val_score(best_svm, X_train_top_x, y, cv=LeaveOneOut())

print("Accuracy score:", accuracy_scores)
print("Mean accuracy:", np.mean(accuracy_scores))
print("Accuracy std:", np.std(accuracy_scores))

In [None]:
from sklearn.metrics import accuracy_score

# Score the tuned SVM on the held-out test rows, restricted to the features it
# was trained on.
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test[top_feature_names])
accuracy = accuracy_score(y_test, y_pred)

print("Accuracy on testing data:", accuracy)

## Decision trees

In [None]:
from sklearn.tree import DecisionTreeClassifier

# Baseline decision tree with default hyperparameters and a fixed random state.
dtc = DecisionTreeClassifier(random_state=42)

accuracy = make_scorer(accuracy_score)

# 7-fold cross-validated accuracy on the training rows.
accuracy_scores = cross_val_score(dtc, X, y, scoring=accuracy, cv=7)

print("Accuracy score:", accuracy_scores)
# The mean was printed under the same label as the per-fold scores; it now uses
# the label of the other model cells.
print("Mean accuracy:", accuracy_scores.mean())
print("Accuracy std:", np.std(accuracy_scores))


In [None]:
from sklearn.feature_selection import SelectFromModel

# Fit the baseline tree on all training rows and keep the 20 % of features
# with the highest importance.
importances = dtc.fit(X, y).feature_importances_

n_features = len(importances)
n_keep = int(n_features * 0.2) # number of features to keep

sorted_idx = np.argsort(importances)[::-1] # column positions, most important first
top_idx = sorted_idx[:n_keep] # positions of the n_keep most important features

X_dtc_top_X = X.iloc[:, top_idx]

print(f"Original number of features: {n_features}")
print(f"Number of features to keep: {n_keep}")


In [None]:
from sklearn.tree import DecisionTreeClassifier

# Tune a decision tree on the selected top features with leave-one-out
# cross-validation.
dt = DecisionTreeClassifier(random_state=42)

param_grid = {
    'criterion': ['gini', 'entropy', 'log_loss'],
    'max_depth': [None, 5, 10, 15, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4, 8],
    'max_features': [None, 'sqrt', 'log2'],
}
grid_search = GridSearchCV(dt, param_grid, cv=LeaveOneOut(), verbose=1)
grid_search.fit(X_dtc_top_X, y)

print("Best hyperparameters:", grid_search.best_params_)

# Leave-one-out accuracy of the selected configuration.
best_tree = grid_search.best_estimator_
accuracy_scores = cross_val_score(best_tree, X_dtc_top_X, y, cv=LeaveOneOut())

print("Accuracy score:", accuracy_scores)
print("Mean accuracy:", np.mean(accuracy_scores))
print("Accuracy std:", np.std(accuracy_scores))

In [None]:
from sklearn.metrics import accuracy_score

# Score the tuned decision tree on the held-out test rows, restricted to the
# same column positions that were selected on the training data.
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test.iloc[:, top_idx])
accuracy = accuracy_score(y_test, y_pred)

print("Accuracy on testing data:", accuracy)

## Naive Bayes, Feature selection

In [None]:
from sklearn.feature_selection import SelectPercentile, f_classif

# Keep the top `percentile` % of TF-IDF features ranked by ANOVA F-value. The
# selector is fitted on the training documents only and then applied to the
# TF-IDF matrix of all documents.
percentile = 50
selector = SelectPercentile(f_classif, percentile=percentile).fit(subset_tfidf_matrix, y)
tfidf_selected = selector.transform(tfidf_matrix)

In [None]:
# Integer column positions (within the TF-IDF matrix) of the features kept by the selector.
selected_indices = selector.get_support(indices=True) # Get the indices of the selected features

In [None]:
# Restrict the TF-IDF frame to the selected columns and attach the quantitative
# columns again. This rebuilds tfidf_df_final for all rows (train and test).
tfidf_df_selected = tfidf_df.iloc[:, selected_indices]
tfidf_df_final = tfidf_df_selected.join(full_text[columns_of_quant_features])

In [None]:
# Display the combined frame (selected TF-IDF features + quantitative columns) for all rows.
tfidf_df_final

In [None]:
# Test set: every row from position 48 onwards.
tfidf_df_final_test=tfidf_df_final.iloc[48:,]

In [None]:
# Training set: the first 48 rows. This overwrites tfidf_df_final, so the test
# split above has to be taken before this line runs.
tfidf_df_final=tfidf_df_final.iloc[:48,]

In [None]:
# Display the training frame after the split.
tfidf_df_final

In [None]:
# Training features and target after feature selection. Index.difference
# returns the remaining column labels in sorted order.
y = tfidf_df_final['Solvency_ratio_bucket']
X = tfidf_df_final.loc[:, tfidf_df_final.columns.difference(['Solvency_ratio_bucket'])]

# Class balance of the training target.
y.value_counts()

In [None]:
# Test features and target after feature selection, built exactly like the
# training pair so that the column order of X_test matches X.
y_test = tfidf_df_final_test['Solvency_ratio_bucket']
X_test = tfidf_df_final_test.loc[:, tfidf_df_final_test.columns.difference(['Solvency_ratio_bucket'])]

# Class balance of the test target.
y_test.value_counts()

In [None]:
from sklearn.naive_bayes import GaussianNB

# Tune the variance smoothing of a Gaussian naive Bayes classifier with
# leave-one-out cross-validation.
nb = GaussianNB()

param_grid = {'var_smoothing': [1e-9, 1e-8, 1e-7, 1e-6, 1e-5]}
grid_search = GridSearchCV(nb, param_grid, cv=LeaveOneOut(), verbose=1)
grid_search.fit(X, y)

print("Best hyperparameters:", grid_search.best_params_)

# Leave-one-out accuracy of the selected configuration.
best_nb = grid_search.best_estimator_
accuracy_scores = cross_val_score(best_nb, X, y, cv=LeaveOneOut())

print("Accuracy score:", accuracy_scores)
print("Mean accuracy:", np.mean(accuracy_scores))
print("Accuracy std:", np.std(accuracy_scores))

In [None]:
from sklearn.metrics import accuracy_score

# Score the tuned naive Bayes classifier on the held-out test rows.
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)

print("Accuracy on testing data:", accuracy)

## Neural network

In [None]:
from keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import GridSearchCV, LeaveOneOut
from tensorflow import keras


def create_model(optimizer='adam', activation='relu', hidden1=128, hidden2=64):
    """Build and compile a feed-forward network with two hidden layers.

    The input width is read from the notebook-level X at call time. The single
    sigmoid output unit is trained with binary cross-entropy and reports accuracy.
    """
    model = keras.Sequential()
    model.add(keras.layers.Dense(hidden1, activation=activation, input_shape=(X.shape[1],)))
    model.add(keras.layers.Dense(hidden2, activation=activation))
    model.add(keras.layers.Dense(1, activation='sigmoid'))
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    return model


# Wrap the builder so that scikit-learn's GridSearchCV can drive it.
keras_clf = KerasClassifier(build_fn=create_model)

# Training settings (batch size, epochs) and model settings (optimizer,
# activation, hidden layer widths) explored by the search.
param_grid = {
    'batch_size': [64, 128],
    'epochs': [5, 10],
    'optimizer': ['adam', 'sgd'],
    'activation': ['relu', 'tanh'],
    'hidden1': [64, 128],
    'hidden2': [64],
}

# Leave-one-out grid search on the training rows.
grid_search = GridSearchCV(estimator=keras_clf, param_grid=param_grid, cv=LeaveOneOut(), verbose=1)
grid_search.fit(X, y)

print("Best parameters:", grid_search.best_params_)
print("Accuracy score:", grid_search.best_score_)


In [None]:
from sklearn.metrics import accuracy_score

# Score the tuned neural network on the held-out test rows.
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)

print("Accuracy on testing data:", accuracy)