# In [3]:
import copy, random
from collections import defaultdict
import numpy as np
from sklearn import pipeline
# from lime import lime_text
from sklearn.metrics import accuracy_score, classification_report, average_precision_score
import pandas as pd


def import_data(file_name_start, glob, pd, **kargs):
    """Read every ``<file_name_start>*.txt`` match and stack the results.

    Args:
        file_name_start: Prefix to which ``*.txt`` is appended to form the
            search pattern.
        glob: Callable that turns the pattern into an iterable of files
            (presumably ``glob.glob`` -- confirm against callers).
        pd: Object providing ``read_csv`` and ``concat`` (presumably the
            pandas module -- confirm against callers).
        **kargs: Passed through unchanged to each ``pd.read_csv`` call.

    Returns:
        The ``pd.concat`` of all per-file results, with the index reset and
        the old index discarded.
    """
    pattern = f'{file_name_start}*.txt'
    frames = [pd.read_csv(path, **kargs) for path in glob(pattern)]
    combined = pd.concat(frames)
    return combined.reset_index(drop=True)

def call_spacy_nlp(nlp, x):
    """Call ``nlp`` on ``x`` and hand back whatever it produces.

    Args:
        nlp: Any callable taking one argument (presumably a loaded spaCy
            pipeline -- confirm against callers).
        x: The value passed to ``nlp``.

    Returns:
        The return value of ``nlp(x)``.
    """
    processed = nlp(x)
    return processed

def build_tokens_list(doc):
    """Collect the ``text`` attribute of each item in ``doc``, in order.

    Args:
        doc: Iterable of objects exposing a ``text`` attribute.

    Returns:
        A list with one ``text`` value per item.
    """
    texts = []
    for tok in doc:
        texts.append(tok.text)
    return texts

def build_lemmas_list(doc):
    """Collect the ``lemma_`` attribute of each item in ``doc``, in order.

    Args:
        doc: Iterable of objects exposing a ``lemma_`` attribute.

    Returns:
        A list with one ``lemma_`` value per item.
    """
    return list(map(lambda tok: tok.lemma_, doc))

def remove_not_alpha_list(doc):
    """Return the lemmas of ``doc`` that consist solely of alphabetic characters.

    Args:
        doc: Iterable of objects exposing a string ``lemma_`` attribute.

    Returns:
        A list of the ``lemma_`` values for which ``str.isalpha()`` is true,
        in their original order.
    """
    lemmas = (tok.lemma_ for tok in doc)
    return [lemma for lemma in lemmas if lemma.isalpha()]

def _attach_bow_features(frame, bow_matrix, feature_names, drop_cols):
    """Replace ``drop_cols`` of ``frame`` with dense bag-of-words columns."""
    bow_df = pd.DataFrame(bow_matrix.toarray(), columns=feature_names)
    # Both sides get a fresh 0..n-1 index so the column-wise concat lines rows
    # up by position instead of by the caller's original index labels.
    kept = frame.drop(columns=drop_cols).reset_index(drop=True)
    return pd.concat([kept, bow_df], axis=1)


def process_data(vectorizer, X_train, X_test, y_train, y_test, suffix=''):
    """Vectorize the lemma-string column and build the model feature frames.

    The vectorizer is fitted on the training text only and then applied to
    the test text. The raw text columns, ``account_id`` and ``snapnum`` are
    removed from each frame and the dense bag-of-words columns are appended
    to whatever columns remain.

    The id columns are removed *before* the bag-of-words columns are
    appended, so a vocabulary term that happens to be called ``account_id``
    or ``snapnum`` keeps its feature column.

    Args:
        vectorizer: Object providing ``fit_transform``, ``transform``,
            ``vocabulary_`` and ``get_feature_names_out`` (presumably a
            scikit-learn text vectorizer -- confirm against callers).
        X_train: Training frame containing every dropped column listed below.
        X_test: Test frame with the same columns.
        y_train: Unused; kept so existing callers keep working.
        y_test: Unused; kept so existing callers keep working.
        suffix: Unused; kept so existing callers keep working.

    Returns:
        Tuple ``(X_train_features, X_test_features, vectorizer, X_train_bow)``
        where ``X_train_bow`` is the raw output of ``fit_transform``.

    Raises:
        KeyError: If any of the dropped columns is missing from a frame.
    """
    text_col = 'meas_action_comment_concat_3m_lemmas_string'
    X_train_bow = vectorizer.fit_transform(X_train[text_col])
    X_test_bow = vectorizer.transform(X_test[text_col])

    print('Vocabulary size:', len(vectorizer.vocabulary_))

    feature_names = vectorizer.get_feature_names_out()
    drop_cols = ['meas_action_comment_concat_0', 'meas_action_comment_concat_-1',
                 'meas_action_comment_concat_-2', 'meas_action_comment_concat_3m',
                 'meas_action_comment_concat_3m_doc', 'meas_action_comment_concat_3m_lemmas',
                 'meas_action_comment_concat_3m_lemmas_string',
                 'account_id', 'snapnum']

    X_train_bow_df_fin = _attach_bow_features(X_train, X_train_bow, feature_names, drop_cols)
    X_test_bow_df_fin = _attach_bow_features(X_test, X_test_bow, feature_names, drop_cols)

    return X_train_bow_df_fin, X_test_bow_df_fin, vectorizer, X_train_bow


def choose_separator(element1, element2):
    """Pick the string to place between two consecutive pieces.

    Args:
        element1: The left-hand piece.
        element2: The right-hand piece.

    Returns:
        ``''`` when both pieces are ``'#'`` or when the left piece is ``'['``
        or ``'xxxxς'``; a single space otherwise.
    """
    glued = (element1 == '#' and element2 == '#') or element1 in ('[', 'xxxxς')
    return '' if glued else ' '

def convert_list_to_string(x):
    """Join a sequence of string pieces using ``choose_separator`` for each gap.

    Each adjacent pair ``(previous, current)`` is passed to
    ``choose_separator`` to decide what goes between the two pieces.

    Args:
        x: Sequence of strings supporting ``len``, indexing and slicing
            (e.g. a list).

    Returns:
        The joined string; ``''`` when ``x`` is empty (previously an
        ``IndexError``).
    """
    if len(x) == 0:
        return ''
    # Pair every piece with its predecessor directly; this avoids building
    # (and then throwing away) a wrap-around pair of last and first element.
    tail = ''.join(
        choose_separator(previous, current) + current
        for previous, current in zip(x, x[1:])
    )
    return x[0] + tail


def find_index(lst, string):
    """Locate the first entry of ``lst`` whose element 0 equals ``string``.

    Args:
        lst: Iterable of indexable entries (e.g. tuples).
        string: Value compared against ``entry[0]``.

    Returns:
        The zero-based position of the first match, or ``-1`` if none match.
    """
    for position, entry in enumerate(lst):
        if entry[0] == string:
            return position
    return -1


def model_eval(model, X_train, y_train, X_test, y_test, K=20):
    """Fit ``model`` and score it on the test split as classifier and ranker.

    Args:
        model: Estimator providing ``fit``, ``predict`` and ``predict_proba``.
            It is fitted in place.
        X_train: Training features.
        y_train: Training labels; must expose ``.values`` (pandas object).
        X_test: Test features.
        y_test: Test labels; must expose ``.values`` (pandas object). The
            classification report is requested for labels ``0`` and ``1``.
        K: Cut-off for precision@K and recall@K.

    Returns:
        Tuple ``(recall_1, precision_1, f1_1, accuracy, pr_auc,
        precision_at_k, recall_at_k, r_precision)``. ``precision_at_k`` is
        ``nan`` when ``K`` exceeds the number of test rows; ``recall_at_k``
        and ``r_precision`` are ``0.0`` when the test split has no positives.
    """
    model.fit(X_train, y_train.values.ravel())

    predictions = model.predict(X_test)
    # Column 1 of predict_proba is used as the positive-class score.
    pos_prob = model.predict_proba(X_test)[:, 1]

    # Flatten the labels once instead of on every metric call.
    y_true = y_test.values.ravel()

    # Classifier metrics (positive class is reported under the name "1").
    accuracy = accuracy_score(y_true, predictions)
    report = classification_report(y_true, predictions, labels=[0, 1], target_names=["0", "1"],
                                   output_dict=True)
    prec_1 = report["1"]["precision"]
    rec_1 = report["1"]["recall"]
    f1_1 = report["1"]["f1-score"]

    # Ranker metrics.
    pr_auc = average_precision_score(y_true, pos_prob)

    # Row positions ordered from highest to lowest positive-class score.
    sorted_indices = np.argsort(pos_prob)[::-1]

    top_k_true = y_true[sorted_indices[:K]]
    precision_at_k = np.mean(top_k_true) if K <= len(pos_prob) else np.nan

    # int() matters: summing float-typed labels yields a float, which is not
    # a valid slice bound for the R-precision cut below.
    num_positives = int(np.sum(y_true))
    recall_at_k = (np.sum(top_k_true) / num_positives) if num_positives > 0 else 0.0

    # R-precision: precision at R, where R is the number of positives.
    if num_positives > 0:
        r_precision = np.mean(y_true[sorted_indices[:num_positives]])
    else:
        r_precision = 0.0

    return rec_1, prec_1, f1_1, accuracy, pr_auc, precision_at_k, recall_at_k, r_precision


def eval_results(models, X_train, y_train, X_test, y_test, K=20):
    """Train a copy of every model and gather its evaluation results.

    All metrics come from ``model_eval``; the predictions and probabilities
    stored alongside them are obtained from the estimator that ``model_eval``
    fitted in place.

    Args:
        models: Iterable of entries whose element 0 is the model name and
            element 1 the estimator. The entries are deep-copied, so the
            objects passed in are not fitted.
        X_train: Training features.
        y_train: Training labels (passed to ``model_eval``).
        X_test: Test features.
        y_test: Test labels (passed to ``model_eval``).
        K: Cut-off for precision@K / recall@K; also used in the result keys
            ``precision_at_<K>`` and ``recall_at_<K>``.

    Returns:
        Tuple ``(results, fitted_models)``. ``results`` maps each model name
        to a dict with the keys ``accuracy``, ``recall``, ``precision``,
        ``f1_score``, ``predictions``, ``probs``, ``precision_at_<K>``,
        ``recall_at_<K>``, ``r_precision`` and ``pr_auc``. ``probs`` holds
        only the second column when ``predict_proba`` returns two columns.
        ``fitted_models`` is the deep copy of ``models`` after fitting.
    """
    results_bow = defaultdict(dict)
    models_to_train = copy.deepcopy(models)

    for model in models_to_train:
        name, estimator = model[0], model[1]

        # model_eval returns eight metrics and fits `estimator` in place.
        (rec, prec, f1, acc, pr_auc,
         precision_at_k, recall_at_k, r_precision) = model_eval(
            estimator, X_train, y_train, X_test, y_test, K=K)

        # The estimator is fitted now, so it can be queried for the raw
        # outputs that model_eval does not return.
        predictions = estimator.predict(X_test)
        prob = estimator.predict_proba(X_test)

        # If prob is a 2D two-column array, keep the positive class only.
        if prob.ndim == 2 and prob.shape[1] == 2:
            prob = prob[:, 1]

        results_bow[name] = {
            'accuracy': acc,
            'recall': rec,
            'precision': prec,
            'f1_score': f1,
            'predictions': predictions,
            'probs': prob,
            f'precision_at_{K}': precision_at_k,
            f'recall_at_{K}': recall_at_k,
            'r_precision': r_precision,
            'pr_auc': pr_auc
        }

    return results_bow, models_to_train



# def lime_explainer(model_name, models, results, X_test, y_test, y_train, vectorizer, prob_threshold=0.7, num_features=3):
#     # find indices where both model and ground truth are positive
#     positive_indices =  [i for i, (x,y,z) in enumerate(zip(results[model_name]['predictions'].tolist(), results[model_name]['probs'][:,1].tolist(), y_test.values[:,0].tolist())) if (x == z == 1 and y > 0.7)]
#     negative_indices =  [i for i, (x,y,z) in enumerate(zip(results[model_name]['predictions'].tolist(), results[model_name]['probs'][:,0].tolist(), y_test.values[:,0].tolist())) if (x == z == 0 and y > 0.7)]

#     ## select observation
#     i = random.choice(positive_indices) if len(positive_indices) > 0 else random.choice(negative_indices)

#     model_index = find_index(models, model_name)

#     model = pipeline.Pipeline([("vectorizer", vectorizer),
#                                ("classifier", models[model_index][1])])

#     txt_instance = X_test["meas_action_comment_concat_3m_lemmas_string"].iloc[i]

#     ## find explanation
#     explainer = lime_text.LimeTextExplainer(class_names=
#                  np.unique(y_train))
#     explained = explainer.explain_instance(txt_instance,
#                  model.predict_proba, num_features=3)

#     return i, txt_instance, explained