In [1]:
import os
import warnings
from typing import Tuple, Union

import pandas as pd
from tabulate import tabulate
import tkinter as tk
from tkinter import filedialog

from config import extract_log_name, get_file_path

# Suppress every warning for the remainder of the kernel session.
# NOTE(review): a blanket "ignore" filter also hides deprecation notices emitted by
# libraries such as pandas - consider narrowing it to specific warning categories.
warnings.filterwarnings("ignore")


In [2]:
def calculate_completely_correct_cases(predicted_df: pd.DataFrame, correct_df: pd.DataFrame) -> float:
    """
    Compute the percentage of true Case ID values that were reconstructed exactly.

    A Case ID counts as exactly reconstructed when every row carrying it in ``correct_df`` has the same value as
    'Determined Case ID' in ``predicted_df``, and every row of ``predicted_df`` carrying it as 'Determined Case ID'
    has it as 'Case ID' in ``correct_df``. Rows are paired through their index labels.

    :param predicted_df: DataFrame containing Determined Case ID values.
    :param correct_df: DataFrame containing Case ID values.
    :return: Percentage (0-100) of distinct Case ID values that are exactly reconstructed; 0 if either frame is empty.
    """
    if predicted_df.empty or correct_df.empty:
        return 0

    def _is_exact_match(case_id) -> bool:
        # Direction 1: every row that truly belongs to the case was assigned to it.
        truth_rows = correct_df[correct_df['Case ID'] == case_id]
        assigned_for_truth = predicted_df.loc[truth_rows.index]
        if not all(truth_rows['Case ID'] == assigned_for_truth['Determined Case ID']):
            return False

        # Direction 2: every row assigned to the case truly belongs to it.
        assigned_rows = predicted_df[predicted_df['Determined Case ID'] == case_id]
        truth_for_assigned = correct_df.loc[assigned_rows.index]
        return all(assigned_rows['Determined Case ID'] == truth_for_assigned['Case ID'])

    distinct_case_ids = correct_df['Case ID'].unique()
    exact_cases = [case_id for case_id in distinct_case_ids if _is_exact_match(case_id)]

    return len(exact_cases) / len(distinct_case_ids) * 100


def calculate_correct_case_different_naming(predicted_df: pd.DataFrame, correct_df: pd.DataFrame,
                                            calculation: bool = True) -> Union[pd.DataFrame, float]:
    """
    Calculate the proportion of factually completely correct Case ID values.

    A factually completely correct Case ID value is one whose rows all received the same Determined Case ID, where
    that Determined Case ID differs from the Case ID, is not missing, and is not assigned to any row belonging to a
    different Case ID. Rows of both frames are paired through their index labels.

    :param predicted_df: DataFrame containing Determined Case ID values.
    :param correct_df: DataFrame containing Case ID values.
    :param calculation: If True, calculate the proportion of factually completely correct Case ID values.
                        If False, return the rows of factually completely correct Case ID values.
    :return: Percentage (0-100) of factually completely correct Case ID values if calculation is True,
             otherwise the rows of ``correct_df`` that belong to those Case ID values. For empty input the result
             is 0.0 or an empty DataFrame respectively.
    """
    if predicted_df.empty or correct_df.empty:
        return 0.0 if calculation else pd.DataFrame()

    determined = predicted_df['Determined Case ID']
    actual = correct_df['Case ID']
    case_ids = actual.unique()

    correct_cases = []

    for case_id in case_ids:
        labels = determined.loc[correct_df.index[actual == case_id]].unique()

        # All rows of the case must carry one single determined label.
        if len(labels) != 1:
            continue

        label = labels[0]

        # A missing label is "unassigned", not a renaming; an identical label is an exact match,
        # which is counted by a different metric.
        if pd.isna(label) or label == case_id:
            continue

        # The label must not occur on rows of any other case. Test the boolean mask itself:
        # checking the truthiness of the filtered cells would miss rows holding falsy values such as 0.
        used_by_other_case = ((determined == label) & (actual != case_id)).any()

        if not used_by_other_case:
            correct_cases.append(case_id)

    if not calculation:
        return correct_df[actual.isin(correct_cases)]

    return len(correct_cases) / len(case_ids) * 100


def calculate_factual_matching_proportion(predicted_df: pd.DataFrame, correct_df: pd.DataFrame) -> float:
    """
    Compute the percentage of rows that belong to a factually correct (renamed) case.

    The rows are those returned by ``calculate_correct_case_different_naming`` in row mode
    (``calculation=False``); their count is divided by the number of rows in ``predicted_df``.

    :param predicted_df: DataFrame containing Determined Case ID values.
    :param correct_df: DataFrame containing Case ID values.
    :return: Percentage (0-100) of factually matching rows; 0 if either frame is empty.
    """
    if predicted_df.empty or correct_df.empty:
        return 0

    factual_rows = calculate_correct_case_different_naming(predicted_df, correct_df, False)
    total_rows = len(predicted_df)

    return len(factual_rows) / total_rows * 100


def calculate_matching_proportion(predicted_df: pd.DataFrame, correct_df: pd.DataFrame) -> float:
    """
    Compute the percentage of rows whose Determined Case ID equals the true Case ID.

    :param predicted_df: DataFrame containing Determined Case ID values.
    :param correct_df: DataFrame containing Case ID values.
    :return: Percentage (0-100) of rows with an identical value in both columns; 0 if either frame is empty.
    """
    if predicted_df.empty or correct_df.empty:
        return 0

    # Row-wise comparison of the determined value against the ground truth.
    row_is_match = predicted_df['Determined Case ID'] == correct_df['Case ID']
    matched_count = len(predicted_df.loc[row_is_match])
    total_rows = len(predicted_df)

    return matched_count / total_rows * 100


def evaluate_accuracy(predicted_df: pd.DataFrame, complete_df: pd.DataFrame) -> dict:
    """
    Collect the row-level and case-level accuracy metrics of a repaired log.

    The two "Overall" entries are the sum of the corresponding exact and factual metric.

    :param predicted_df: DataFrame containing the determined log.
    :param complete_df: DataFrame containing complete log.
    :return: Dictionary mapping each metric name to its value.
    """
    # Row-level metrics.
    row_exact = calculate_matching_proportion(predicted_df, complete_df)
    row_factual = calculate_factual_matching_proportion(predicted_df, complete_df)

    # Case-level metrics.
    case_exact = calculate_completely_correct_cases(predicted_df, complete_df)
    case_factual = calculate_correct_case_different_naming(predicted_df, complete_df)

    result = {}
    result["Accuracy"] = row_exact
    result["Factual Accuracy"] = row_factual
    result["Overall Accuracy"] = row_exact + row_factual
    result["Real Case Accuracy"] = case_exact
    result["Factual Case Accuracy"] = case_factual
    result["Overall Case Accuracy"] = case_exact + case_factual

    return result


In [3]:
def evaluate_completeness(df: pd.DataFrame, column: str = 'Determined Case ID') -> float:
    """
    Measure how completely a column of the log is filled.

    :param df: DataFrame containing the log.
    :param column: Column to evaluate completeness. Default is 'Determined Case ID'.
    :return: Percentage (0-100) of non-missing values in ``column``, or negative infinity if the column
             does not exist in ``df``.
    """
    if column in df.columns:
        values = df[column]
        missing_share = values.isna().sum() / len(values)
        return (1 - missing_share) * 100

    # Sentinel for "column not present".
    return float('-inf')


In [4]:
def evaluate_repaired_logs(folder_path: str, log_name: str, complete_log: pd.DataFrame) -> pd.DataFrame:
    """
    Evaluate the repaired logs.

    Looks in ``folder_path`` for files named ``determined_<log_name>_iteration_<i>.csv``, counts them and
    evaluates iterations ``1..count`` in order, skipping numbers whose file does not exist. The first iteration
    additionally yields an "Iteration 0" row holding only the completeness of its 'Original Case ID' column.

    :param folder_path: Path to the folder containing the repaired logs.
    :param log_name: Name of the log.
    :param complete_log: DataFrame containing the complete log.
    :return: DataFrame containing the quality metrics, one row per evaluated iteration. It is empty (but has
             all metric columns) when the folder does not exist or holds no matching file.
    """
    metric_columns = [
        "Iteration", "Completeness", "Accuracy", "Factual Accuracy", "Overall Accuracy", "Real Case Accuracy",
        "Factual Case Accuracy", "Overall Case Accuracy"
    ]

    # Rows are collected in a plain list and turned into a DataFrame once at the end;
    # DataFrame.append no longer exists in pandas >= 2.0.
    records = []

    # isdir (rather than exists) so that a path pointing at a regular file does not crash os.listdir.
    if os.path.isdir(folder_path):
        file_prefix = f"determined_{log_name}_iteration_"
        num_iterations = sum(1 for f in os.listdir(folder_path)
                             if f.endswith('.csv') and f.startswith(file_prefix))

        for i in range(1, num_iterations + 1):
            log_file = os.path.join(folder_path, f"{file_prefix}{i}.csv")

            if not os.path.exists(log_file):
                continue

            predicted_log = pd.read_csv(log_file)

            if i == 1:
                # Baseline row: completeness of the log before any repair iteration.
                records.append({
                    "Iteration": 0,
                    "Completeness": evaluate_completeness(predicted_log, 'Original Case ID')
                })

            iteration_metrics = {"Iteration": i, "Completeness": evaluate_completeness(predicted_log)}
            iteration_metrics.update(evaluate_accuracy(predicted_log, complete_log))
            records.append(iteration_metrics)

    return pd.DataFrame(records, columns=metric_columns)


In [5]:
def format_metrics(metrics: pd.DataFrame) -> pd.DataFrame:
    """
    Format the quality metrics DataFrame for display.

    Every column except 'Iteration' is rendered as a percentage string with two decimals (missing values become
    an empty string) and 'Iteration' is cast to int. The input frame is left untouched; a formatted copy is
    returned.

    :param metrics: DataFrame containing the quality metrics. Must contain an 'Iteration' column unless empty.
    :return: Formatted copy of the DataFrame.
    """
    if metrics.empty:
        return metrics.copy()

    def _as_percent(value) -> str:
        return '{:.2f}%'.format(value) if pd.notnull(value) else ""

    # Work on a copy so the caller's numeric metrics are not overwritten with strings.
    formatted = metrics.copy()

    # Per-column Series.map instead of the deprecated DataFrame.applymap.
    for column in formatted.columns.drop('Iteration'):
        formatted[column] = formatted[column].map(_as_percent)

    formatted['Iteration'] = formatted['Iteration'].astype(int)

    return formatted


In [6]:
def get_input() -> Tuple[pd.DataFrame, str, str]:
    """
    Get the complete log, its name and the path to the folder containing the repaired logs.

    The complete log path comes from ``get_file_path``. If that path does not exist, nothing is read and empty
    values are returned. Otherwise the folder is chosen through a Tk directory dialog when a ``DISPLAY``
    environment variable is set, and through a console prompt when it is not.

    :return: Tuple ``(complete_log, log_name, folder_path)``; ``(empty DataFrame, "", "")`` if the complete log
             path does not exist.
    :raises ValueError: If no folder was selected or entered.
    """
    complete_log_path = get_file_path("preprocessed complete log")

    if not os.path.exists(complete_log_path):
        return pd.DataFrame(), "", ""

    complete_log = pd.read_csv(complete_log_path)
    print("CSV file successfully read.")
    log_name = extract_log_name(complete_log_path)

    if "DISPLAY" in os.environ:
        root = tk.Tk()
        try:
            root.withdraw()
            folder_path = filedialog.askdirectory(
                title="Select the folder that contains the repaired logs after each iteration")
        finally:
            # Release the hidden root window; otherwise it lingers in the kernel process.
            root.destroy()
    else:
        raw_path = input("Enter the path to the folder that contains the repaired logs after each iteration: ")
        # Normalise before the emptiness check so that input such as '""' is rejected as well.
        folder_path = raw_path.strip().strip('"')

    if not folder_path:
        raise ValueError("Error: No file selected.")

    print("Folder path successfully read.")

    return complete_log, log_name, folder_path

In [7]:
complete_log, log_name, folder_path = get_input()

# An empty folder path means get_input returned its "nothing loaded" fallback, so there is nothing to evaluate.
if folder_path:
    metrics = format_metrics(evaluate_repaired_logs(folder_path, log_name, complete_log))
    print(tabulate(metrics, headers='keys', tablefmt='grid', showindex=False))

CSV file successfully read.
Folder path successfully read.
+-------------+----------------+------------+--------------------+--------------------+----------------------+-------------------------+-------------------------+
|   Iteration | Completeness   | Accuracy   | Factual Accuracy   | Overall Accuracy   | Real Case Accuracy   | Factual Case Accuracy   | Overall Case Accuracy   |
|           0 | 80.95%         |            |                    |                    |                      |                         |                         |
+-------------+----------------+------------+--------------------+--------------------+----------------------+-------------------------+-------------------------+
|           1 | 97.62%         | 83.33%     | 0.00%              | 83.33%             | 33.33%               | 0.00%                   | 33.33%                  |
+-------------+----------------+------------+--------------------+--------------------+----------------------+----------------