This notebook contains the results from the validation and testing of the latent diffusion models. 

In [None]:
import os
import cv2
import wandb
import pickle
import shutil
import textwrap
import numpy as np
import pandas as pd
import seaborn as sns
from pathlib import Path
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report

workdir = "Generative Diffusion Models for 3D Geometric Objects"
if workdir in os.getcwd() and os.path.basename(os.getcwd()) != workdir:
    os.chdir("..")

from utils.data_processing import recalculate_holographic_features

## 1. Validation of the Model Training

In a first step, the logged metrics during training and validation are downloaded from Wandb.

In [None]:
# Selected LDM Variants
model_names = [
    "ldm_cls_8_512_e5",
    "ldm_clstbl_8_512_e5",
    "ldm_tbl_8_512_e5x",
]

The subsequent code downloads the ldm metrics for the different variants and saves them as csv.

In [None]:
# Initialize the wandb API
api = wandb.Api()

# Get the runs for a specific project
entity = "simonluder"
project_name = "MSE_P8"
runs = api.runs(f'{entity}/{project_name}')
run_names = [f"{model_name}_ldm" for model_name in model_names]

# Loop through the runs and download the one with the specific name
for run in runs:
    
    if run.name in run_names:

        if not os.path.exists(f'holographic_pollen/{run.name.replace("_ldm", "")}'):

            print(f"Downloading: {run.name}")
            run_id = run.id

            # download metrics from the history artifact
            artifact = api.artifact(name=f'{entity}/{project_name}/run-{run_id}-history:latest', type='wandb-history')
            filename = artifact.file("temp/wandb/")

            
            # create csv
            Path(f'holographic_pollen/{run.name.replace("_ldm", "")}').mkdir(parents=True, exist_ok=True) 
            df = pd.read_parquet(filename)
            df.to_csv(f'holographic_pollen/{run.name.replace("_ldm", "")}/metrics.csv', index=False)

            shutil.rmtree("temp/wandb/")


Now the metrics can be loaded was pandas dataframe for easy processing.

In [None]:
metrics = []

for model_name in model_names:
    df = pd.read_csv(f"holographic_pollen/{model_name}/metrics.csv")
    df.loc[:, "model"] = model_name.replace("_e5x", "").replace("_e5", "")
    df.loc[:, "model_name"] = model_name

    metrics.append(df)

metrics = pd.concat(metrics).reset_index(drop=True)

metrics.head(3)

# 12 epochs of trainin max
metrics = metrics.loc[metrics["epoch"] <= 12]

The visualization below shows the mean squared error between the predicted noise and the ground truth on the validation set.

In [None]:
validation_metrics = metrics.loc[metrics["val_epoch_reconstructon_loss"].notna()]

plt.figure(figsize=(12,5))
sns.lineplot(data=validation_metrics, x="step", y="val_epoch_reconstructon_loss", hue="model")
plt.title("Mean Squared Error od the predicted noise on the validation set")
plt.ylabel("Mean Squared Error")
plt.yscale("log")
plt.show()

The ldms were run every 10000 batches on the validation dataset. The following shows when the models performed best on the validation dataset.

In [None]:
min_idx = validation_metrics.groupby("model_name")["val_epoch_reconstructon_loss"].idxmin()
best_val_epoch = validation_metrics.loc[min_idx]

best_val_epoch = best_val_epoch[["model_name", "val_epoch_reconstructon_loss", "epoch", "_step"]]
best_val_epoch = best_val_epoch.rename(columns={"model_name":"model", 
                                                "val_epoch_reconstructon_loss":"validation loss",
                                                "_step":"step"})

best_val_epoch["step"] = best_val_epoch["step"].astype(int)
best_val_epoch["epoch"] = best_val_epoch["epoch"].astype(int)

best_val_epoch
# print(best_val_epoch.to_latex())

The model weights of the best validation epoch were then taken for testing.

## 2. Testing

### 2.1 MSE and LPIPS Errors on the testset

In [None]:
files = ["ldm_cls_8_512_e5", 
        "ldm_clstbl_8_512_e5", 
         "ldm_tbl_8_512_e5x"]

df_errors = list()
for file in files:
    with open(f"holographic_pollen/{file}/test/lpips_scores.pkl", "rb") as f:
        df = pd.DataFrame.from_dict(pickle.load(f))
        df["model"] = file.replace("_e5x", "").replace("_e5", "")
        df_errors.append(df)
df_errors = pd.concat(df_errors)

In [None]:
plt.figure(figsize=(8,4))
sns.boxplot(x='model', y='mse_scores', data=df_errors, flierprops={'marker': 'x', 'markersize': 4})
plt.title('Mean squared error on generated images for the testset')
plt.ylabel('Mean squared error')
plt.xlabel('model')
plt.xticks(rotation=90)
plt.show()

print(df_errors.groupby("model")["mse_scores"].quantile([0.25, 0.5, 0.75]))

In [None]:
plt.figure(figsize=(8,4))
sns.boxplot(x='model', y='lpips_scores', data=df_errors, flierprops={'marker': 'x', 'markersize': 4})
plt.title('LPIPS error on generated images for the testset')
plt.ylabel('LPIPS error')
plt.xlabel('model')
plt.xticks(rotation=90)
plt.show()

print(df_errors.groupby("model")["lpips_scores"].quantile([0.25, 0.5, 0.75]))

### 2.2 Testing the extracted visual features

The visual features extracted with `skimage.measure.regionprops` are subsequently tested. For this purpose, these measured values are extracted from the test images as well as from the generated images of the LDM variants. The difference between the extracted metrics from the generated images to the test is then determined.

The subsequent list shows the extracted metrics

In [None]:
measurement_columns = [
    'area',
    'bbox_area',
    'convex_area',
    'major_axis_length',
    'minor_axis_length',
    'eccentricity',
    'solidity',
    'perimeter',
    'perimeter_crofton',
    'equivalent_diameter',
    'orientation',
    'feret_diameter_max',
    'max_intensity',
    'min_intensity',
    'mean_intensity'
]

The first step is to recalculate the metrics on the test set.

In [None]:
df_test_gt = pd.read_csv("labels_test.csv")
df_test_gt.loc[:,"dataset_id"] = "."

In [None]:
df_test_gt = recalculate_holographic_features(df_test_gt, image_path = "data/holographic_pollen/test_images_256")

The metrics are then recalculated from the generated images of the different ldm variants.

In [None]:
df_test_pred_dict = dict()
for model_name in model_names:
    print(model_name)
    df_test_pred = df_test_gt[["event_id", "dataset_id", "label", "rec_path", "filenames", "class_id"]].copy(deep=True)
    df_test_pred.loc[:,"dataset_id"] = "."
    df_test_pred_dict[model_name] = recalculate_holographic_features(df_test_pred, image_path = f"holographic_pollen/{model_name}/test/images/")

The Mean Absolute Percentage Error (MAPE) and the Mean Absolute Error (MAE) between these metrics are then calculated. The MAE is only used for the evaluation orientation. As the MAPE may be outliers, images with an error in the area outside the 99.5th percentile are also removed from the evaluation (12 per model variant).

In [None]:
def drop_percentiles(df, groupby, cols, percentile=.99):
    df = df.copy()
    threshold = df.groupby(groupby)[cols].transform(lambda s: s.quantile(percentile))
    return df[df[cols].lt(threshold).all(axis=1)], df[df[cols].ge(threshold).all(axis=1)]

# def mse_error(df, columns):

#     ground_truth_cols = [col_name + "_gt" for col_name in columns]
#     predicted_cols = [col_name + "_pred" for col_name in columns]

#     # Calculate squared errors for each pair of columns
#     squared_errors = (df[ground_truth_cols].values - df[predicted_cols].values) ** 2

#     df = pd.DataFrame(columns=columns, data=squared_errors)

#     return df


def mae_error(df, columns):

    ground_truth_cols = [col_name + "_gt" for col_name in columns]
    predicted_cols = [col_name + "_pred" for col_name in columns]

    # Calculate squared errors for each pair of columns
    squared_errors = (df[ground_truth_cols].values - df[predicted_cols].values)

    df = pd.DataFrame(columns=columns, data=squared_errors).apply(abs)

    return df

def ape_error(df, columns):
    ground_truth_cols = [col_name + "_gt" for col_name in columns]
    predicted_cols = [col_name + "_pred" for col_name in columns]

    # Calculate absolute percentage errors for each pair of columns
    absolute_percentage_errors = np.abs((df[ground_truth_cols].values - df[predicted_cols].values) / (df[ground_truth_cols].values + 1e-9)) * 100

    # Create a DataFrame to store the APE values
    ape_df = pd.DataFrame(columns=columns, data=absolute_percentage_errors)

    return ape_df


def calculate_metrics(df_gt, df_pred, model_names, measurement_columns, metric):
    # Initialize an empty list to store the MSE DataFrames
    df_list = []

    for model_name in model_names:
        # Merge the ground truth and predicted DataFrames on the 'filenames' column
        merged_df = pd.merge(df_gt, df_pred[model_name], 
                             left_on='filenames', right_on='filenames', 
                             how='outer', suffixes=["_gt", "_pred"]).copy()
        
        # Calculate the MSE for the merged DataFrame
        result = metric(merged_df, columns=measurement_columns)

        result["event_id"] = df_gt["event_id"]
        result["rec_path"] = df_gt["rec_path"]
        result["dataset_id"] = df_gt["dataset_id"]
        
        
        # Add the model name to the DataFrame
        result["model_name"] = model_name

        # Append the DataFrame to the list
        df_list.append(result)

    # Concatenate all the MSE DataFrames
    df = pd.concat(df_list)
    
    # # Clean up the model names
    df["model"] = df["model_name"].apply(lambda x: x.replace("_e5x", "").replace("_e5", ""))

    return df


df_mae = calculate_metrics(df_test_gt, df_test_pred_dict, model_names, measurement_columns, mae_error )
# filter out rows above 99.5 percentile
df_mae, df_mae_drop = drop_percentiles(df_mae, groupby="model", cols=["area"], percentile=0.995)

df_mae_mean = df_mae.groupby("model")[measurement_columns].mean()
df_mae_std = df_mae.groupby("model")[measurement_columns].std()
print(len(df_mae_drop))

df_ape = calculate_metrics(df_test_gt, df_test_pred_dict, model_names, measurement_columns, ape_error)
# filter out rows above 99.5 percentile
df_ape, df_ape_drop  = drop_percentiles(df_ape, groupby="model", cols=["area"], percentile=0.995)
df_ape_mean = df_ape.groupby("model")[measurement_columns].mean()
df_ape_std = df_ape.groupby("model")[measurement_columns].std()
print(len(df_ape_drop))

For the sake of transparency, these filtered out images are subsequently visualized.

In [None]:
def plot_images(df, num_cols=2):
    num_rows = int(np.ceil(len(df) / num_cols))
    fig, ax = plt.subplots(num_rows, num_cols * 2, figsize=(4 * num_cols, 3 * num_rows))

    models = df["model"].unique()
    model_colors = {model: plt.cm.get_cmap("tab10")(i) for i, model in enumerate(models)}

    for model_idx, model in enumerate(models):
        col_idx = (model_idx % num_cols) * 2
        fig.text(0.5 * col_idx / num_cols + 0.5 / num_cols, 1.02, model, ha='center', fontsize=12, weight='bold', color=model_colors[model])

    for i, row in df.reset_index().iterrows():
        gt_file = os.path.join("data/holographic_pollen/test_images_256", row["rec_path"])
        pred_file = os.path.join("holographic_pollen", row["model_name"], "test/images", row["rec_path"])

        img_gt = cv2.imread(gt_file)
        img_pred = cv2.imread(pred_file)

        col_idx = (i // num_rows) * 2
        row_idx = i % num_rows

        bg_color = model_colors[row["model"]]

        ax[row_idx, col_idx].imshow(img_gt)
        ax[row_idx, col_idx].set_title("Ground Truth", size=10)
        ax[row_idx, col_idx + 1].imshow(img_pred)
        ax[row_idx, col_idx + 1].set_title("Generated", size=10)
        
        for axis in [ax[row_idx, col_idx], ax[row_idx, col_idx + 1]]:
            axis.tick_params(left=False, right=False, labelleft=False, labelbottom=False, bottom=False)
            for spine in axis.spines.values():
                spine.set_edgecolor(bg_color)

        fig.text(0.5 * col_idx / num_cols + 0.5 / num_cols, row_idx / (num_rows + 0.2) + 1 / num_rows, 
                 textwrap.fill(row["rec_path"], 40, ), ha='center', fontsize=10)

    plt.tight_layout()
    plt.show()

# Example call to the function
plot_images(df_mae_drop, num_cols=3)

In [None]:
plot_images(df_ape_drop.groupby("model").head(12), num_cols=3)

In [None]:
df_ape.value_counts("model_name")

In [None]:
df_mae_drop.value_counts("model_name")

The following table shows the MAE for the extracted metrics. The standard deviation is also shown.

In [None]:
def find_non_zero_decimal_pos(number):
    # Convert the number to a string
    num_str = str(number)
    
    # Find the position of the decimal point
    decimal_position = num_str.find('.')
    
    # Iterate through the characters after the decimal point
    for i, string in enumerate(num_str):
        if string != '0' and string != '.':
            return i - decimal_position if i > decimal_position else 0
    
    # If no non-zero decimal is found, return -1
    return -1

# Create an empty dataframe to store the result
result_df = pd.DataFrame()
# Iterate through columns
for col in df_mae_mean.columns:
    smallest_number = df_mae_mean[col].min()
    dec_round = find_non_zero_decimal_pos(smallest_number)
    # Combine elements from df1 and df2 with the desired format
    result_df[col] = [f"{df_mae_mean.loc[model, col].round(dec_round)}±{df_mae_std.loc[model, col].round(dec_round)}" for model in df_mae_mean.index]
    result_df.index = df_mae_mean.index
    
result_df = result_df.T

result_df["improvement"] = (df_mae_mean.T["ldm_tbl_8_512"] / df_mae_mean.T["ldm_clstbl_8_512"]).round(1)
result_df.loc[result_df.index=="orientation"]
# Display the result
# result_df.T
# result_df

And similarly with the MAPE bellow

In [None]:
# Create an empty dataframe to store the result
result_df = pd.DataFrame()
# Iterate through columns
for col in df_ape_mean.columns:
    smallest_number = df_ape_mean[col].min()
    dec_round = find_non_zero_decimal_pos(smallest_number)
    # Combine elements from df1 and df2 with the desired format
    result_df[col] = [f"{df_ape_mean.loc[model, col].round(dec_round+1)}±{df_ape_std.loc[model, col].round(dec_round+1)}" for model in df_ape_mean.index]
    result_df.index = df_ape_mean.index
    
result_df = result_df.T
result_df
# print(result_df.to_latex())

### 2.3 Quantitative Evaluation

The generated images are subsequently visualized in order to compare them visually with the ground truth from the test data set.

In [None]:
# load the ground truth labels from the testset
ground_truh_labels = pd.read_csv("labels_test.csv")

# Select samples from test set
sample_filenames = ground_truh_labels.groupby("label").head(1).sort_values(by="label")["rec_path"].values

In [None]:
# load the test images
test_results = []
for model_name in model_names:
    df = pd.read_csv(f"labels_test_{model_name}.csv")
    df.loc[:, "model_name"] = model_name
    df.loc[:, "model"] = model_name.replace("_e5x", "").replace("_e5", "")
    df["dataset_id"] = df["filenames"].apply(lambda x: os.path.split(x)[0])
    df["rec_path"] = df["filenames"].apply(lambda x: os.path.split(x)[1])
    test_results.append(df)
    
test_results = pd.concat(test_results).reset_index(drop=True)

test_results.head(3)

In [None]:
# # Add ground truth labels to the dataframe
test_results = pd.merge(test_results, ground_truh_labels[['event_id', 'rec_path', 'filenames']], on=['event_id', 'rec_path'], how='left', suffixes=("", "_gt"))

In [None]:

# Number of samples to display
num_samples = len(set(test_results["label"]))

# Filter the test results for the selected samples
samples = test_results[test_results["rec_path"].isin(sample_filenames)].reset_index()

# Calculate the number of subplots required
samples_per_subplot = 8  # Number of samples per subplot
num_subplots = (num_samples + samples_per_subplot - 1) // samples_per_subplot  # Calculate the number of subplots needed
num_cols = len(samples[samples["rec_path"] == sample_filenames[0]]) + 1
num_rows = samples_per_subplot

# Iterate through each subplot
for subplot_idx in range(num_subplots):
    # Create a figure for each subplot
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(2 * num_cols, 2.3 * num_rows))
    
    # Iterate through the samples for the current subplot
    for row_idx in range(num_rows):
        sample_idx = subplot_idx * samples_per_subplot + row_idx
        if sample_idx >= num_samples:
            break
        
        sample_filename = sample_filenames[sample_idx]
        # Get the subset of samples for the current filename
        sample_subset = samples[samples["rec_path"] == sample_filename].reset_index()
        
        # # Display the ground truth image
        gt_path = os.path.join("Z:/marvel/marvel-fhnw/data/Poleno", sample_subset.at[0, "filenames_gt"])
        ground_truth_image = cv2.imread(gt_path)
        axes[row_idx, 0].imshow(ground_truth_image)
        axes[row_idx, 0].set_title("Ground Truth", size=12)
        axes[row_idx, 0].tick_params(left=False, right=False, labelleft=False, labelbottom=False, bottom=False)
        
        # Display the generated images for each sample
        for col_idx, row in sample_subset.iterrows():
            rel_path = f'holographic_pollen/{row["model_name"]}/test/images/{row["rec_path"]}'
            generated_image = cv2.imread(rel_path)
            axes[row_idx, col_idx + 1].imshow(generated_image)
            axes[row_idx, col_idx + 1].set_title(row["model"], size=12)
            axes[row_idx, col_idx + 1].tick_params(left=False, right=False, labelleft=False, labelbottom=False, bottom=False)
        
        # Set the label as subtitle for each row
        fig.text(-0.1, (num_rows - row_idx - 0.5) / num_rows, sample_subset.at[0, "label"], ha='center', va='center', fontsize=12, fontweight='bold')
    
    # Adjust layout and show the plot
    plt.tight_layout()
    plt.show()


### 2.3 Siamese Classifier Results

#### Validation

At first the validation accuracy of the siamese classifier during training is visualized

In [None]:

# Initialize the wandb API
api = wandb.Api()

# Get the runs for a specific project
entity = "simonluder"
project_name = "MSE_P8"
runs = api.runs(f'{entity}/{project_name}')
run_names = [f"siamese_classifier_genus_l" ]

# Loop through the runs and download the one with the specific name
for run in runs:
    
    if run.name in run_names:

        print(f"Downloading: {run.name}")
        run_id = run.id

        # download metrics from the history artifact
        artifact = api.artifact(name=f'{entity}/{project_name}/run-{run_id}-history:latest', type='wandb-history')
        filename = artifact.file("temp/wandb/")

        # create csv
        Path(f'holographic_pollen/{run.name}').mkdir(parents=True, exist_ok=True) 
        df = pd.read_parquet(filename)
        df.to_csv(f'holographic_pollen/{run.name}/metrics.csv', index=False)

        shutil.rmtree("temp/wandb/")

In [None]:
metrics = []

for model_name in run_names:
    df = pd.read_csv(f"holographic_pollen/{model_name}/metrics.csv")
    df.loc[:, "model"] = model_name.replace("_l", "")
    df.loc[:, "model_name"] = model_name

    metrics.append(df)

metrics = pd.concat(metrics).reset_index(drop=True)

plt.figure(figsize=(12,5))
sns.lineplot(data=metrics, x="step", y="val_accuracy", hue="model")
plt.title("Classification Accuracy on the validation set")
plt.ylabel("Mean Accuracy")
plt.show()

#### Testing

Load the ground truth

In [None]:
df_ground_truth = pd.read_csv("labels_test.csv")
df_ground_truth["genus"] = df_ground_truth["label"].apply(lambda x: x.split()[0])
id_to_genus = df_ground_truth.drop_duplicates().set_index('genus_id')['genus'].to_dict()

Load the results

In [None]:
# variants = ["test", "ldm_cls_8_512_e5", "ldm_clstbl_8_512_e5"]
# variants = ["test", "test_256", "test_256_16", "validation", "validation_256_16", "ldm_cls_8_512_e5", "ldm_clstbl_8_512_e5", "ldm_tbl_8_512_e5x"]
variants = [ "test_256", "ldm_cls_8_512_e5", "ldm_clstbl_8_512_e5", "ldm_tbl_8_512_e5x"]
# classifier = "resnet_classifier_005"
classifier = "siamese_classifier_genus_l"
# classifier = "siamese_classifier_mini"
# classifier = "siamese_classifier_genus_complete"
ground_truth = "test_256"

labels = dict()
predictions = dict()

df_predictions = []

for variant in variants:
    # labels_dir = f"holographic_pollen/{classifier}/validation/{9999}/labels.npy"
    # predictions_dir = f"holographic_pollen/{classifier}/validation/{9999}/predictions.npy"
    labels_dir = f"holographic_pollen/{classifier}/test/{variant}/labels.npy"
    predictions_dir = f"holographic_pollen/{classifier}/test/{variant}/predictions.npy"
    gt_dir = f"holographic_pollen/{classifier}/test/{ground_truth}/predictions.npy"
    # labels_dir = f"C:/Users/simon/Downloads/{classifier}/test/{variant}/labels.npy"
    # predictions_dir = f"C:/Users/simon/Downloads/{classifier}/test/{variant}/predictions.npy"

    
    df = pd.DataFrame({"true_id":np.load(labels_dir), "pred_id":np.load(predictions_dir), "equal_pred":np.load(predictions_dir)==np.load(gt_dir)})
    
    df["variant"] = variant.replace("_256", "").replace("_16", "").replace("_e5x", "").replace("_e5", "")
    df["true_genus"] = df["true_id"].apply(lambda x: id_to_genus[x])
    df["pred_genus"] = df["pred_id"].apply(lambda x: id_to_genus[x])

    ground_truth
    

    df_predictions.append(df)

df_predictions = pd.concat(df_predictions)
df_predictions["correct"] = df_predictions["pred_id"] == df_predictions["true_id"]
df_predictions.head(3)

In [None]:
# path = "C:/Users/simon/Downloads/"
# classifier  = "siamese_classifier_genus_complete"

# variants = os.listdir(f"{path}/{classifier}/validation")
# for ckpt in os.listdir(f"{path}/{classifier}/validation"):

#     labels_dir = f"{path}/{classifier}/validation/{ckpt}/labels.npy"
#     predictions_dir = f"{path}/{classifier}/validation/{ckpt}/predictions.npy"
#     labels[ckpt] = np.load(labels_dir)
#     predictions[ckpt] = np.load(predictions_dir)

# ckpt = "7800"
# (labels[ckpt] == predictions[ckpt]).sum() / len(labels[ckpt])

Show the model accuracy

In [None]:
print(df_predictions.groupby("variant")["correct"].mean())

Visualization of the confusion matrix

In [None]:
df = df_predictions

# Number of variants
variants = list(df["variant"].drop_duplicates())
print(variants)

# Create subplots
fig, axes = plt.subplots(1, len(variants), figsize=(8 * len(variants), 6))

for idx, variant in enumerate(variants):
    sub_df = df.loc[df["variant"]==variant]
    conf_matrix = confusion_matrix(sub_df["true_id"], sub_df["pred_id"], normalize="true")

    sub_df
    labels = list(sub_df.sort_values("true_id")["true_genus"].drop_duplicates())

    df_cm = pd.DataFrame(data=conf_matrix, index=labels, columns=labels)

    ax = axes[idx]
    sns.heatmap(df_cm, annot=False, fmt='g', cmap='Blues', cbar=False, ax=ax, vmin=0, vmax=1)
    ax.set_title(variant)
    ax.set_xlabel('Predicted')

    if idx == 0:
        ax.set_ylabel('True')

    if idx == len(variants)-1:
        cbar = fig.colorbar(axes[0].collections[0], ax=axes, location='right', pad=0.01)

# plt.tight_layout()
plt.show()

In [None]:
for idx, variant in enumerate(reversed(variants)):
    print(variant)
    sub_df = df.loc[df["variant"]==variant]
    print(classification_report(sub_df["true_id"], sub_df["pred_id"]))