In [5]:
from nltk import accuracy
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, precision_recall_fscore_support
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
import torch
from transformers import BertTokenizer, BertForSequenceClassification
from tqdm import tqdm

import evaluate

# `evaluate.load` needs the name (or path) of the metric to load; calling it with no
# argument fails before any later cell can run.
# NOTE(review): "accuracy" is assumed because the notebook reports accuracy below —
# confirm the intended metric.
metric = evaluate.load("accuracy")

# Seed handed to estimators so that fits are reproducible across runs.
random_state = 42

In [6]:
import pandas as pd

def flatten_financial_dataset(financial_dataset):
    """Collapse each run of 5 consecutive rows into one wide row.

    The first 5 columns of ``financial_dataset`` are treated as per-object
    metadata and every remaining column as a per-row value. For each complete
    group of 5 rows the output holds:

    * one cell per metadata column — the first row's value, except for
      ``'Fiscal Period'``, whose 5 values are joined with ``';'``;
    * the value columns of row 1, then row 2, ... named ``<column>_<row number>``.

    Metadata cells are produced in the same order as the metadata headers, so
    the result stays correctly labelled whatever the column order of the input.
    A trailing group with fewer than 5 rows is discarded.

    Args:
        financial_dataset: DataFrame whose rows are grouped in consecutive
            blocks of 5 per object.

    Returns:
        A new DataFrame with one row per complete group.
    """
    object_length_in_rows = 5
    metadata_columns_length = 5

    per_object_columns = list(financial_dataset.columns[:metadata_columns_length].values)
    value_columns = financial_dataset.columns[metadata_columns_length:]

    # Row-major naming (all value columns of row 1, then row 2, ...) mirrors the
    # order produced by `.values.flatten()` below.
    new_columns = per_object_columns + [
        f'{col}_{i + 1}' for i in range(object_length_in_rows) for col in value_columns
    ]

    rows = []
    for start in range(0, len(financial_dataset), object_length_in_rows):
        group = financial_dataset.iloc[start:start + object_length_in_rows]
        if len(group) < object_length_in_rows:
            # Incomplete trailing object: nothing more to flatten.
            break

        # Build the metadata in header order instead of a fixed order, so values
        # cannot end up under the wrong column name.
        metadata = [
            ';'.join(group[col].astype(str).values) if col == 'Fiscal Period'
            else group[col].iloc[0]
            for col in per_object_columns
        ]
        values = group.drop(columns=per_object_columns).values.flatten()

        rows.append(metadata + values.tolist())

    # A DataFrame built from a list of rows already has a fresh 0..n-1 index.
    return pd.DataFrame(rows, columns=new_columns)


def get_train_val_test_split(X, y):
    """Partition features and labels by the value of X's 'subset' column.

    Rows whose 'subset' is 'train', 'val' or 'test' go to the matching split;
    the 'subset' column itself is removed from the returned feature frames.

    Args:
        X: feature DataFrame containing a 'subset' column.
        y: labels indexed like ``X``.

    Returns:
        Tuple ``(X_train, y_train, X_val, y_val, X_test, y_test)``.
    """
    pieces = []
    for subset_name in ('train', 'val', 'test'):
        in_subset = X['subset'] == subset_name
        pieces.append(X[in_subset].drop('subset', axis=1))
        pieces.append(y[in_subset])
    return tuple(pieces)


def get_multimodal_data(drop_all_columns=False):
    """Load the numerical and textual CSVs and split each into train/val/test.

    The numerical CSV is flattened with ``flatten_financial_dataset`` and its
    'cik', 'ticker' and 'Fiscal Period' columns are dropped. The textual CSV
    has its 'cik' column dropped. In both cases 'label' is the target and the
    split is delegated to ``get_train_val_test_split``.

    Args:
        drop_all_columns: when True, 'ticker' and 'report_datetime' are also
            removed from the textual features.

    Returns:
        A 12-tuple: the six numerical splits (X_train, y_train, X_val, y_val,
        X_test, y_test) followed by the six textual splits in the same order.
    """
    # --- numerical modality ---
    numerical_flat = flatten_financial_dataset(pd.read_csv('numerical_dataset_version5_original.csv'))
    numerical_features = numerical_flat.drop(['cik', 'ticker', 'Fiscal Period'], axis=1)
    numerical_splits = get_train_val_test_split(
        numerical_features.drop('label', axis=1),
        numerical_features['label'],
    )

    # --- textual modality ---
    textual_features = pd.read_csv('textual_data_version6_original.csv').drop(['cik'], axis=1)
    X_textual = textual_features.drop('label', axis=1)
    if drop_all_columns:
        X_textual = X_textual.drop(['ticker', 'report_datetime'], axis=1)
    textual_splits = get_train_val_test_split(X_textual, textual_features['label'])

    return (*numerical_splits, *textual_splits)

def get_test_data():
    """Return only the test splits, with textual identifiers pulled out.

    Calls ``get_multimodal_data()`` with its defaults, so the textual test
    frame still carries 'ticker' and 'report_datetime'; those two columns are
    returned separately and removed from the textual features.

    Returns:
        Tuple ``(X_numerical_test, y_numerical_test, X_textual_test,
        y_textual_test, tickers_test, report_dates_test)``.
    """
    # Only the test entries of the 12-tuple are needed; the rest are discarded.
    (_, _, _, _, X_numerical_test, y_numerical_test,
     _, _, _, _, X_textual_test, y_textual_test) = get_multimodal_data()

    tickers_test = X_textual_test['ticker']
    report_dates_test = X_textual_test['report_datetime']
    X_textual_test = X_textual_test.drop(columns=['ticker', 'report_datetime'])

    return (
        X_numerical_test,
        y_numerical_test,
        X_textual_test,
        y_textual_test,
        tickers_test,
        report_dates_test,
    )

In [7]:
# Load both modalities once. With drop_all_columns=True the textual features
# also lose their 'ticker' and 'report_datetime' columns.
(
    X_numerical_train, y_numerical_train,
    X_numerical_val, y_numerical_val,
    X_numerical_test, y_numerical_test,
    X_textual_train, y_textual_train,
    X_textual_val, y_textual_val,
    X_textual_test, y_textual_test,
) = get_multimodal_data(drop_all_columns=True)

In [8]:
# Entropy-based tree capped at depth 10. The class_weight mapping weights the
# True class 30x relative to False.
decisionTreeClassifier = DecisionTreeClassifier(
    criterion='entropy',
    max_depth=10,
    min_samples_split=2,
    min_samples_leaf=1,
    class_weight={True: 30, False: 1},
    random_state=random_state,
)

# Fit on the numerical training split; as the cell's last expression this also
# displays the fitted estimator.
decisionTreeClassifier.fit(X_numerical_train, y_numerical_train)

In [9]:
# Restore the tokenizer and the sequence-classification model from the same
# local checkpoint directory, so that both come from one saved artifact.
tokenizer = BertTokenizer.from_pretrained("./saved_model/finbert")
model = BertForSequenceClassification.from_pretrained("./saved_model/finbert")

In [None]:
def HybridModelPredict(X_numerical, X_textual, labels, decision_tree_weight=0.7, finbert_weight=0.3):
    """Score a weighted vote of the decision tree and the FinBERT classifier.

    The decision tree predicts from ``X_numerical`` and FinBERT (argmax over
    the logits) predicts from ``X_textual``. The two predictions are combined
    as ``round(decision_tree_weight * dt + finbert_weight * finbert)`` and the
    result is compared against ``labels``.

    NOTE(review): when both predictions are 0/1 and the weights sum to 1, the
    rounded vote always equals the prediction of the model with the larger
    weight — confirm that this is the intended ensemble behaviour.

    Args:
        X_numerical: numerical features accepted by ``decisionTreeClassifier.predict``.
        X_textual: text input passed to ``tokenizer``.
        labels: ground-truth labels, one per sample.
        decision_tree_weight: vote weight of the decision tree (default 0.7).
        finbert_weight: vote weight of FinBERT (default 0.3).

    Returns:
        Tuple ``(accuracy, precision)``; precision is the 'weighted' average
        computed with ``zero_division=1``.
    """
    dt_predictions = np.asarray(decisionTreeClassifier.predict(X_numerical))

    # Pure inference: disable autograd so that no graph or activations are kept.
    with torch.no_grad():
        encoded = tokenizer(X_textual, padding=True, truncation=True, return_tensors="pt")
        logits = model(**encoded).logits
    finbert_predictions = torch.argmax(logits, dim=-1).cpu().numpy()

    # Weighted voting, rounded to a hard integer decision.
    hybrid_predictions = np.round(
        decision_tree_weight * dt_predictions + finbert_weight * finbert_predictions
    ).astype(int)

    accuracy = accuracy_score(labels, hybrid_predictions)
    precision, _, _, _ = precision_recall_fscore_support(
        labels, hybrid_predictions, average='weighted', zero_division=1
    )

    return accuracy, precision


In [15]:
# Evaluate the weighted decision-tree / FinBERT vote on the test split, one
# sample at a time, accumulating a confusion matrix.
# NOTE(review): the numerical and textual test splits are indexed by the same
# position `i`, which assumes they are row-aligned and equally long — confirm.
accuracies = []
precisions = []

tp = 0
tn = 0
fp = 0
fn = 0

# Vote weights do not change between samples, so they are set once.
# NOTE(review): with 0/1 predictions and these weights, the rounded vote always
# equals the FinBERT prediction — confirm this is intended.
decision_tree_weight = 0.3
finbert_weight = 0.7

for i in tqdm(range(len(X_numerical_test))):

    X_numerical = X_numerical_test[i:i+1]
    X_textual = X_textual_test['text'][i:i+1].values.tolist()
    labels = y_textual_test[i:i+1].values.astype(int)

    dt_predictions = decisionTreeClassifier.predict(X_numerical)

    # Cast the tree output to int so it can take part in the weighted sum.
    dt_predictions_int = np.asarray(dt_predictions).astype(int)

    # Pure inference: disable autograd so that no graph or activations are kept.
    with torch.no_grad():
        logits = model(**tokenizer(X_textual, padding=True, truncation=True, return_tensors="pt")).logits
    finbert_predictions = torch.argmax(logits, dim=-1).cpu().numpy()

    # Weighted voting, rounded to a hard integer decision.
    hybrid_predictions = np.round(
        decision_tree_weight * dt_predictions_int + finbert_weight * finbert_predictions
    ).astype(int)

    # Update the confusion-matrix counters.
    tp += np.sum(np.logical_and(hybrid_predictions == 1, labels == 1))
    tn += np.sum(np.logical_and(hybrid_predictions == 0, labels == 0))
    fp += np.sum(np.logical_and(hybrid_predictions == 1, labels == 0))
    fn += np.sum(np.logical_and(hybrid_predictions == 0, labels == 1))


# Guard the ratios: an empty test set or a run without any positive prediction
# would otherwise divide by zero.
total = tp + tn + fp + fn
predicted_positive = tp + fp
accuracy = (tp + tn) / total if total else float('nan')
precision = tp / predicted_positive if predicted_positive else float('nan')
print(f"accuracy: {accuracy}")
print(f"precision: {precision}")

  1%|          | 5/678 [34:01<76:20:20, 408.35s/it] 


KeyboardInterrupt: 