# GANomaly Notebook for Individual ExperimentResults

## Initial Configurations

### Libraries import

In [None]:
import os
import sys
import random
import numpy as np
import pandas as pd
from scipy import stats
import matplotlib.pyplot as plt
import umap
from sklearn.decomposition import PCA
import cv2
from IPython.display import display, clear_output
from ipywidgets import interact, IntSlider

sys.path.append("../../")

In [None]:
from utils.metrics import dagostinoPearson_test, andersonDarling_test, shapiroWilks_test, chiSquare_test, fOneWay_test
from utils.metrics import brownForsythe_test, levene_test, bartlett_test
from utils.metrics import mannWhitney_test, kruskalWallis_test, kolmogorovSmirnov_test
from utils.savers import generate_qq_plot

### Experiment selection

In [None]:
experiment_id = "0002"
root_path = "/home/jefelitman/Saved_Models/Anomaly_parkinson/"
for i in sorted(os.listdir("/home/jefelitman/Saved_Models/Anomaly_parkinson/")):
    if experiment_id in i:
        experiment_folder = os.path.join(root_path, i)
experiment_folder

## Quantitative metrics

### Metrics loading

In [None]:
train_metrics = pd.read_csv(os.path.join(experiment_folder, "metrics/train.csv"))
test_metrics = pd.read_csv(os.path.join(experiment_folder, "metrics/test.csv"))
print("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}".format(
    train_metrics.loc[train_metrics.shape[0] - 1 ,"gen_error"],
    train_metrics.loc[train_metrics.shape[0] - 1 ,"disc_error"],
    test_metrics.loc[test_metrics.shape[0] - 1 ,"accuracy"],
    test_metrics.loc[test_metrics.shape[0] - 1 ,"precision"],
    test_metrics.loc[test_metrics.shape[0] - 1 ,"recall"],
    test_metrics.loc[test_metrics.shape[0] - 1 ,"specificity"],
    test_metrics.loc[test_metrics.shape[0] - 1 ,"f1_score"],
    test_metrics.loc[test_metrics.shape[0] - 1 ,"auc"]
))

### Train Graphics

In [None]:
epochs = 500
portions = epochs // 100
columns = 5
rows = (epochs // portions) // columns
path = os.path.join(experiment_folder, "outputs/graphics/quantitative/")
for metric in ["gen_error", "disc_error", "accuracy", "specificity"]:
    fig, axs = plt.subplots(rows, columns, figsize=(30, 25))
    fig.suptitle('{} over Epochs'.format(metric))
    for i in range(rows):
        for j in range(columns):
            axs[i, j].plot(train_metrics[metric][(i*columns+j)*portions:(i*columns+j+1)*portions])
            filename = 'train_{}.png'.format(metric)
    fig.savefig(path+filename)
    plt.close(fig)

### Test graphics

In [None]:
epochs = 500
portions = epochs // 100
columns = 5
rows = (epochs // portions) // columns
path = os.path.join(experiment_folder, "outputs/graphics/quantitative/")
for metric in test_metrics.columns[1:]:
    fig, axs = plt.subplots(rows, columns, figsize=(30, 25))
    fig.suptitle('{} over Epochs'.format(metric))
    for i in range(rows):
        for j in range(columns):
            axs[i, j].plot(test_metrics[metric][(i*columns+j)*portions:(i*columns+j+1)*portions])
            filename = 'test_{}.png'.format(metric)
    fig.savefig(path+filename)
    plt.close(fig)

## Qualitative metrics

### Errors loading

In [None]:
base_path = os.path.join(experiment_folder, "outputs/errors/")
for t in ["encoder", "contextual", "adversarial"]:
        for c in ["normal", "abnormal"]:
            globals()["all_{}_{}".format(t, c)] = np.r_[[]]
            
for t in ["encoder", "contextual", "adversarial"]:
    for m in ["train", "test"]:
        classes = ["normal"] if m == "train" else ["normal", "abnormal"]
        for c in classes:
            all_data = "all_{}_{}".format(t, c)
            errors = np.load(os.path.join(base_path, t, m, c+".npy"))
            globals()["{}_{}_{}".format(m, t, c)] = errors
            globals()[all_data] = np.concatenate([globals()[all_data], errors])

### Qualitative metrics

In [None]:
for g in ["train", "test", "all"]:
    print("-------------- {} ---------------------".format(g))
    classes = ["normal"] if g == "train" else ["normal", "abnormal"]
    for c in classes:
        for t in ["encoder", "contextual", "adversarial"]:
            data = globals()["{}_{}_{}".format(g, t,c)]
            m = np.mean(data)
            s = np.std(data)
            print("{} error ({}): {}\t{}\t{}\t{}\t{}\t{}".format(
                t,
                c,
                m,
                s,
                stats.skew(data),
                stats.kurtosis(data),
                int(m < 2*s),
                1 - stats.norm(m, s).cdf(0)
            ))
            print("")

### Normality tests

In [None]:
for g in ["train", "test", "all"]:
    print("-------------- {} ---------------------".format(g))
    classes = ["normal"] if g == "train" else ["normal", "abnormal"]
    for c in classes:
        for t in ["encoder", "contextual", "adversarial"]:
            data = globals()["{}_{}_{}".format(g, t,c)]
            norm_dist = stats.norm.rvs(loc=np.mean(data), scale=np.std(data), size=data.shape, random_state=8128)
            chi_test = chiSquare_test(data, norm_dist)
            print("{} tests ({}): {}\t{}\t{}\t{}\t{}\t{}\t{}\t{}".format(
                t,
                c,
                int(brownForsythe_test(data, norm_dist)),
                int(levene_test(data, norm_dist)),
                int(bartlett_test(data, norm_dist)),
                int(dagostinoPearson_test(data)),
                int(andersonDarling_test(data)),
                int(shapiroWilks_test(data)),
                "{} ({})".format(int(chi_test[0]), round(chi_test[1], 5)),
                int(fOneWay_test(data, norm_dist))
            ))
            print("")

### Grouping tests

In [None]:
for t in ["encoder", "contextual", "adversarial"]:
    print("-------------- {} ---------------------".format(t))
    for g1, g2 in [("train", "test"), ("test", "all"), ("train", "all")]:
        data1 = globals()["{}_{}_normal".format(g1, t)]
        data2 = globals()["{}_{}_normal".format(g2, t)]
        #norm_dist = stats.norm.rvs(loc=np.mean(data), scale=np.std(data), size=data.shape, random_state=8128)
        print("{} vs {}: {}\t{}\t{}\t{}\t{}\t{}\t{}".format(
            g1,
            g2,
            int(brownForsythe_test(data1, data2)),
            int(levene_test(data1, data2)),
            int(bartlett_test(data1, data2)),
            int(mannWhitney_test(data1, data2)),
            int(kruskalWallis_test(data1, data2)),
            int(kolmogorovSmirnov_test(data1, data2)),
            int(fOneWay_test(data1, data2))
        ))
        print("")

### Classing tests

In [None]:
for g in ["test", "all"]:
    print("-------------- {} ---------------------".format(g))
    for t in ["encoder", "contextual", "adversarial"]:
        data1 = globals()["{}_{}_normal".format(g, t)]
        data2 = globals()["{}_{}_abnormal".format(g, t)]
        line = "{} normal vs abnormal: {}\t{}\t{}".format(
            t,
            int(brownForsythe_test(data1, data2)),
            int(levene_test(data1, data2)),
            int(bartlett_test(data1, data2)),
        )
        if g == "test":
            line += "\t{}\t{}\t{}\t{}".format(
                int(mannWhitney_test(data1, data2)),
                int(kruskalWallis_test(data1, data2)),
                int(kolmogorovSmirnov_test(data1, data2)),
                int(fOneWay_test(data1, data2))
            )
        else:
            chi_1_test = chiSquare_test(data1, data2)
            chi_2_test = chiSquare_test(data2, data1)
            line += "\t{}\t{}\t{}\t{}\t{}\t{}".format(
                "{} ({})".format(int(chi_1_test[0]), round(chi_1_test[1], 5)),
                "{} ({})".format(int(chi_2_test[0]), round(chi_2_test[1], 5)),
                int(mannWhitney_test(data1, data2)),
                int(kruskalWallis_test(data1, data2)),
                int(kolmogorovSmirnov_test(data1, data2)),
                int(fOneWay_test(data1, data2))
            )
        print(line)
        print("")

### Train graphics

#### QQ plots

In [None]:
save_path = os.path.join(experiment_folder, "outputs/graphics/qualitative/")
for t in ["encoder", "contextual", "adversarial"]:
    for c in ["normal"]:
        data = globals()["train_{}_{}".format(t,c)]
        generate_qq_plot(
            data, 
            save_path, 
            "train_{}_{}".format(t,c),
            ".png",
            np.mean(data),
            np.std(data)
        )

### Test graphics

#### Box plots

In [None]:
save_path = os.path.join(experiment_folder, "outputs/graphics/qualitative/")
rows = 1
columns = 3
fig, axs = plt.subplots(rows, columns, figsize=(15, 10))
for i, t in enumerate(["encoder", "contextual", "adversarial"]):
    data = "test_{}_".format(t)
    axs[i].boxplot([globals()[data+"normal"], globals()[data+"abnormal"]], labels=['Normal', 'Abnormal'])
    axs[i].set_title("{} errors".format(t))

filename = 'test_boxplot.png'
fig.savefig(save_path+filename)
plt.close(fig)

#### QQ plots

In [None]:
save_path = os.path.join(experiment_folder, "outputs/graphics/qualitative/")
for t in ["encoder", "contextual", "adversarial"]:
    for c in ["normal", "abnormal"]:
        data = globals()["test_{}_{}".format(t,c)]
        generate_qq_plot(
            data, 
            save_path, 
            "test_{}_{}".format(t,c),
            ".png",
            np.mean(data),
            np.std(data)
        )

### All data graphics

#### Box plots

In [None]:
save_path = os.path.join(experiment_folder, "outputs/graphics/qualitative/")
rows = 1
columns = 3
fig, axs = plt.subplots(rows, columns, figsize=(15, 10))
for i, t in enumerate(["encoder", "contextual", "adversarial"]):
    data = "all_{}_".format(t)
    axs[i].boxplot([globals()[data+"normal"], globals()[data+"abnormal"]], labels=['Normal', 'Abnormal'])
    axs[i].set_title("{} errors".format(t))

filename = 'all_data_boxplot.png'
fig.savefig(save_path+filename)
plt.close(fig)

#### QQ plots

In [None]:
save_path = os.path.join(experiment_folder, "outputs/graphics/qualitative/")
for t in ["encoder", "contextual", "adversarial"]:
    for c in ["normal", "abnormal"]:
        data = globals()["all_{}_{}".format(t,c)]
        generate_qq_plot(
            data, 
            save_path, 
            "all_{}_{}".format(t,c),
            ".png",
            np.mean(data),
            np.std(data)
        )

## Visual results

### Loading latent vectors

In [None]:
base_path = os.path.join(experiment_folder, "outputs/latent_vectors/")
for n in ["generator", "discriminator"]:
    for t in ["input", "output"]:
        for m in ["train", "test"]:
            classes = ["normal"] if m == "train" else ["normal", "abnormal"]
            for c in classes:
                all_data = "all_{}_{}_{}".format(n, t, c)
                if all_data not in globals().keys():
                    globals()[all_data] = []
            
                data = "{}_{}_{}_{}".format(m, n, t, c)
                globals()[data] = []
                path = os.path.join(base_path, t + "_" + n, m, c)
                for file in sorted(os.listdir(path)):
                    vector = np.load(os.path.join(path, file))
                    globals()[data].append(vector)
                    globals()[all_data].append(vector)
                globals()[data] = np.r_[globals()[data]]
for n in ["generator", "discriminator"]:
    for t in ["input", "output"]:
        for c in ["normal", "abnormal"]:
            all_data = "all_{}_{}_{}".format(n, t, c)
            globals()[all_data] = np.r_[globals()[all_data]]

### UMAP latent vectors

In [None]:
save_path = os.path.join(experiment_folder, "outputs/graphics/visuals/")
for n in ["generator", "discriminator"]:
    for t in ["input", "output"]:
        for m in ["train", "test", "all"]:
            classes = ["normal"] if m == "train" else ["normal", "abnormal"]
            globals()["{}_mapped_{}_{}".format(m, n, t)] = umap.UMAP(random_state = 8128).fit_transform(np.concatenate(
                    [globals()["{}_{}_{}_{}".format(m, n, t, c)] for c in classes], axis=0
                )
            )

            divisor = globals()["{}_{}_{}_normal".format(m, n, t)].shape[0]
            plt.scatter(globals()["{}_mapped_{}_{}".format(m, n, t)][:divisor,0],
                globals()["{}_mapped_{}_{}".format(m, n, t)][:divisor,1], color="blue", s=5, label="Normal"
            )
            if "abnormal" in classes:
                plt.scatter(globals()["{}_mapped_{}_{}".format(m, n, t)][divisor:,0],
                    globals()["{}_mapped_{}_{}".format(m, n, t)][divisor:,1], color="red", s=5, label="Abnormal"
                )
            plt.legend()
            plt.title("UMAP for {} {} data in {}".format(n, t, m))
            filename = '{}_umap_{}_{}.png'.format(m, n, t)
            plt.savefig(save_path+filename)
            plt.close()

### PCA latent vectors

In [None]:
save_path = os.path.join(experiment_folder, "outputs/graphics/visuals/")
for n in ["generator", "discriminator"]:
    for t in ["input", "output"]:
        for m in ["train", "test", "all"]:
            classes = ["normal"] if m == "train" else ["normal", "abnormal"]
            globals()["{}_mapped_{}_{}".format(m, n, t)] = PCA(n_components=2, random_state=8128).fit_transform(np.concatenate(
                    [globals()["{}_{}_{}_{}".format(m, n, t, c)] for c in classes], axis=0
                )
            )

            divisor = globals()["{}_{}_{}_normal".format(m, n, t)].shape[0]
            plt.scatter(globals()["{}_mapped_{}_{}".format(m, n, t)][:divisor,0],
                globals()["{}_mapped_{}_{}".format(m, n, t)][:divisor,1], color="blue", s=5, label="Normal"
            )
            if "abnormal" in classes:
                plt.scatter(globals()["{}_mapped_{}_{}".format(m, n, t)][divisor:,0],
                    globals()["{}_mapped_{}_{}".format(m, n, t)][divisor:,1], color="red", s=5, label="Abnormal"
                )
            plt.legend()
            plt.title("PCA for {} {} data in {}".format(n, t, m))
            filename = '{}_pca_{}_{}.png'.format(m, n, t)
            plt.savefig(save_path+filename)
            plt.close()

### Video comparision

In [None]:
def show_all_videos(real_videos, fake_videos, substraction_videos):
    assert len(real_videos) == len(fake_videos) == len(substraction_videos)
    for i in range(len(real_videos)):
        assert real_videos[i].shape == fake_videos[i].shape == substraction_videos[i].shape
    
    frame_slider = IntSlider(min=1, max=real_videos[0].shape[0], step=1)
    volume_slider = IntSlider(min=1, max=len(real_videos), step=1)

    def update_frame_max(*args):
        frame_slider.max = real_videos[volume_slider.value].shape[0]
    volume_slider.observe(update_frame_max, 'value')
    
    interact(lambda volume, frame: plt.imshow(real_videos[volume-1][frame-1]),
        volume=volume_slider, frame=frame_slider
    )
    interact(lambda volume, frame: plt.imshow(fake_videos[volume-1][frame-1]),
        volume=volume_slider, frame=frame_slider
    )
    interact(lambda volume, frame: plt.imshow(substraction_videos[volume-1][frame-1]),
        volume=volume_slider, frame=frame_slider
    )

In [None]:
base_path = os.path.join(experiment_folder, "outputs/samples/")
for t in ["real", "fake", "substraction"]:
    for c in ["normal", "abnormal"]:
        globals()["{}_{}_videos".format(t, c)] = []

for t in ["real", "fake", "substraction"]:
    for m in ["train", "test"]:
        classes = ["normal"] if m == "train" else ["normal", "abnormal"]
        for c in classes:
            videos_path = os.path.join(base_path, t, m, c)
            for video_folder in sorted(os.listdir(videos_path)):
                video = []
                video_path = os.path.join(videos_path, video_folder)
                for frame in sorted(os.listdir(video_path)):
                    video.append(cv2.cvtColor(
                        cv2.imread(
                            os.path.join(video_path, frame)
                        ), cv2.COLOR_BGR2RGB
                    ))
                globals()["{}_{}_videos".format(t, c)].append(np.r_[video])
len(real_normal_videos)

In [None]:
videos = "normal"
show_all_videos(
    globals()["real_{}_videos".format(videos)],
    globals()["fake_{}_videos".format(videos)],
    globals()["substraction_{}_videos".format(videos)],
)