In [1]:
import pandas as pd
import numpy as np
import pickle


from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score,
    average_precision_score,
    confusion_matrix,
)

In [None]:
pickle_file_path = (
    "./output/beers/raha-baran-results-beers/error-detection/detection.dataset"
)

with open(pickle_file_path, "rb") as file:
    dataset_object = pickle.load(file)

print("Attributes of the loaded object:")
for key, value in vars(dataset_object).items():
    print(f"{key}: {type(value)}")

Attributes of the loaded object:
name: <class 'str'>
path: <class 'str'>
dataframe: <class 'pandas.core.frame.DataFrame'>
has_ground_truth: <class 'bool'>
clean_path: <class 'str'>
clean_dataframe: <class 'pandas.core.frame.DataFrame'>
dictionary: <class 'dict'>
results_folder: <class 'str'>
labeled_tuples: <class 'dict'>
labeled_cells: <class 'dict'>
labels_per_cluster: <class 'dict'>
detected_cells: <class 'dict'>
strategy_profiles: <class 'list'>
column_features: <class 'list'>
clusters_k_j_c_ce: <class 'dict'>
cells_clusters_k_j_ce: <class 'dict'>
sampled_tuple: <class 'numpy.int32'>
extended_labeled_cells: <class 'dict'>


In [None]:
import pickle
import pandas as pd


def save_annotated_results(pickle_file_path, original_dataset_path):
    """
    Load a pickled .dataset file, generate an annotated CSV based on errors,
    and save it with columns and shape from the original dataset.
    """
    try:
        # Load the pickled object
        with open(pickle_file_path, "rb") as file:
            dataset_object = pickle.load(file)

        # Load the original dataset to get the shape and columns
        original_df = pd.read_csv(original_dataset_path)

        # Assuming 'detected_cells' is a dictionary where keys are (row, column) tuples
        if hasattr(dataset_object, "detected_cells"):
            detected_cells = dataset_object.detected_cells  # Access the dictionary

            # Create a dictionary to map column index to column name from original_df
            col_name_mapping = {i: col for i, col in enumerate(original_df.columns)}

            # Create the annotated DataFrame with zeros based on the original dataset's shape and columns
            annotated_df = pd.DataFrame(
                0, index=original_df.index, columns=original_df.columns
            )

            # Populate the DataFrame with 1 where there are errors
            for key, value in detected_cells.items():
                row, col = key  # Directly using the tuple key (row, col)

                # Use the column index (col) to get the actual column name from the mapping
                col_name = col_name_mapping.get(col, None)
                if col_name is not None:
                    annotated_df.at[row, col_name] = 1  # Mark the detected error with 1

            # Save to a CSV file
            output_path = "./annotated_output.csv"
            annotated_df.to_csv(output_path, index=False, header=True)
            print(f"Annotated data saved to {output_path}")
        else:
            print("'detected_cells' attribute not found in the object.")

    except Exception as e:
        print(f"An error occurred: {e}")

In [None]:
save_annotated_results(
    "./output/beers/raha-baran-results-beers/error-detection/detection.dataset",
    "./output/beers/clean.csv",
)

Annotated data saved to ./annotated_cells.csv


In [5]:
def annotate_errors(
    df_fixed: pd.DataFrame, df_with_errors: pd.DataFrame
) -> pd.DataFrame:
    # Check if the dataframes have the same shape and columns after sorting
    if df_fixed.shape != df_with_errors.shape or not all(
        df_fixed.columns == df_with_errors.columns
    ):

        raise ValueError("Both dataframes must have the same structure.")

    # Convert both dataframes to strings for datatype-agnostic comparison
    df_fixed_str = df_fixed.astype(str)
    df_with_errors_str = df_with_errors.astype(str)

    # Create the annotation dataframe by comparing the two dataframes
    error_annotation = (df_fixed_str != df_with_errors_str).astype(int)

    return error_annotation

In [6]:
def inspect_classification(
    true_dataset: pd.DataFrame, pred_dataset: pd.DataFrame, input_dataset: pd.DataFrame
):
    true_dataset.reset_index(drop=True)
    pred_dataset.reset_index(drop=True)
    input_dataset.reset_index(drop=True)

    true_dataset.columns = input_dataset.columns
    pred_dataset.columns = input_dataset.columns

    calc = true_dataset.add(2)
    calc_out = pred_dataset.copy()
    calc_out[calc_out == 0] = -1

    calc = calc.add(calc_out)

    # True positive calculation
    tp = calc == 4
    true_positive_df = input_dataset[tp].astype(str)
    true_positive_df = true_positive_df.replace(to_replace="nan", value=0)
    true_positive_df = true_positive_df.reset_index(drop=True)  # Remove index

    # False positive calculation
    fp = calc == 3
    false_positive_df = input_dataset[fp].astype(str)
    false_positive_df = false_positive_df.replace(to_replace="nan", value=0)
    false_positive_df = false_positive_df.reset_index(drop=True)  # Remove index

    # False negative calculation
    fn = calc == 2
    false_negative_df = input_dataset[fn].astype(str)
    false_negative_df = false_negative_df.replace(to_replace="nan", value=0)
    false_negative_df = false_negative_df.reset_index(drop=True)  # Remove index

    all_errors_df = input_dataset[fp | fn].astype(str)
    all_errors_df = all_errors_df.replace(to_replace="nan", value=0)
    all_errors_df = all_errors_df.reset_index(drop=True)  # Remove index

    return true_positive_df, false_positive_df, false_negative_df, all_errors_df

In [None]:
def calculate_metrics(true_dataset: pd.DataFrame, pred_dataset: pd.DataFrame):
    # Flatten the dataframes to 1D arrays
    y_true = true_dataset.values.flatten()
    y_pred = pred_dataset.values.flatten()

    # Basic metrics
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)

    # Class-specific accuracy
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    class_0_accuracy = tn / (tn + fp)  # Accuracy for class 0
    class_1_accuracy = tp / (tp + fn)  # Accuracy for class 1

    # AUC scores
    roc_auc = roc_auc_score(y_true, y_pred)
    pr_auc = average_precision_score(y_true, y_pred)

    # Count of 1s in true and predicted labels
    true_positives_count = sum(y_true == 1)  # Total 1s in true labels
    predicted_positives_count = tp  # Total 1s in predictions

    return {
        "accuracy": float(accuracy),
        "precision": float(precision),
        "recall": float(recall),
        "f1_score": float(f1),
        "roc_auc": float(roc_auc),
        "pr_auc": float(pr_auc),
        "true_positives_count": int(true_positives_count),
        "predicted_positives_count": int(predicted_positives_count),
        "false_positives_count": int(fp),
    }

In [None]:
folder = "flights"

input_dataset = pd.read_csv(f"../../datasets/{folder}/{folder}.csv")
pred_dataset = pd.read_csv(f"./output/{folder}/annotated_output.csv")

true_dataset = pd.read_csv(f"../../datasets/{folder}/clean.csv")

error_annotation = annotate_errors(true_dataset, input_dataset)

true_positive_df, false_positive_df, false_negative_df, all_errors_df = (
    inspect_classification(error_annotation, pred_dataset, input_dataset)
)

# true_positive_df.to_csv(f"../datasets/{folder}/tp.csv")
# false_positive_df.to_csv(f"../datasets/{folder}/fp.csv")
# false_negative_df.to_csv(f"../datasets/{folder}/fn.csv")
# all_errors_df.to_csv(f"../datasets/{folder}/all_errors.csv")

print(calculate_metrics(error_annotation, pred_dataset))

{'accuracy': 0.8805916305916306, 'precision': 0.7602909865152591, 'recall': 0.8709349593495935, 'f1_score': 0.8118605532398636, 'roc_auc': 0.8777915916966548, 'pr_auc': 0.7003434126139427, 'true_positives_count': 4920, 'predicted_positives_count': 4285, 'false_positives_count': 1351}
