### CNN classifier

In [None]:
import os

from IPython.display import clear_output

In [None]:
import sys

sys.path.insert(1, "/home/vinicius/storage1/projects/CNN_mimi/")

In [None]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import xarray as xr
from matplotlib import cm
from PIL import Image
from torch import nn
from torchvision import transforms
from tqdm import tqdm

#### Auxiliary functions

In [None]:
path_to_video = "/home/projeto_mimi/videos/video2A.avi"

#### Loading and labeling data

In [None]:
from src.video_processing import extract_video_frames, label_frames

In [None]:
frames = extract_video_frames(path_to_video)

n_frames, H, W, d = frames.shape

In [None]:
# Select 500 random frames
training_frames = np.random.choice(
    np.arange(3000, 6000, dtype=int), size=500, replace=False
)

In [None]:
# Label every selected frame with the project helper `label_frames`.
# A progress counter is printed before each call and wiped afterwards so the
# cell output never grows beyond one line.
n_selected = len(training_frames)
collected = []
for counter, frame_idx in enumerate(training_frames, start=1):
    print(f"Frame ({counter}/{n_selected})")
    collected.append(label_frames(frames, frame_idx))
    clear_output(wait=True)
labels = np.asarray(collected)

In [None]:
training_dataset = xr.DataArray(
    frames[training_frames],
    dims=("frames", "height", "width", "depth"),
    coords={"frames": labels},
)

In [None]:
plt.figure(figsize=(15, 4))

# Row titles, indexed by class label (the value stored in the "frames"
# coordinate of `training_dataset`).
text = ["No rat", "Rat", "Rat in the box"]

# Number of example frames shown per class (one grid row each).
n_examples = 10

# Only `transforms` is imported from torchvision in this notebook; the bare
# name `torchvision` is undefined and would raise NameError.
resize = transforms.Resize((300, 300))

for i, row_title in enumerate(text):

    # First frames whose label equals i; a class with fewer than
    # `n_examples` frames simply leaves the rest of its row empty.
    plot_frames = training_dataset.sel(frames=i)[:n_examples]

    for f, frame in enumerate(plot_frames):
        # Subplot slots are numbered row by row, starting at 1.
        plt.subplot(len(text), n_examples, i * n_examples + f + 1)
        img = Image.fromarray(frame.data)
        plt.imshow(resize(img))
        plt.xticks([])
        plt.yticks([])
        if f == 0:
            plt.ylabel(row_title)

#### Organize training and testing dataset

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

from src.utils import apply_transforms

In [None]:
training_dataset = xr.load_dataarray("/home/projeto_mimi/dataset/training_dataset.nc")

In [None]:
# Define transformations
trfs = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(p=0.7),
        transforms.GaussianBlur((3, 3), sigma=(1, 2)),
        transforms.ToTensor(),
        transforms.Resize((300, 300)),
    ]
)

In [None]:
# Inputs and labels: every frame of the dataset, only resized to 300x300.
X, y = apply_transforms(
    training_dataset, fraction=None, trfs=transforms.Resize((300, 300)), verbose=True
)
# Augment dataset: run the random flip / blur / resize pipeline `trfs`.
# NOTE(review): presumably `fraction=0.5` draws half of the frames of the
# *whole* dataset. X_aug is later appended to X_train after train_test_split,
# so augmented copies of held-out test frames may leak into training —
# confirm against apply_transforms.
X_aug, y_aug = apply_transforms(training_dataset, fraction=0.5, trfs=trfs, verbose=True)
# Transpose due to random flip
# NOTE(review): this swaps axes 1 and 2 of the augmented batch only; the
# plotting cell below treats both X[10] and X_aug[10] as (channels, H, W).
# Confirm the axis order apply_transforms returns for `trfs`.
X_aug = np.transpose(X_aug, (0, 2, 1, 3))

In [None]:
X_aug.shape

In [None]:
plt.subplot(1, 2, 1)
plt.imshow(np.transpose(X[10], (1, 2, 0)))
plt.title("Original Frame")
plt.axis("off")
plt.subplot(1, 2, 2)
plt.title("Augmented Frame")
plt.imshow(np.transpose(X_aug[10], (1, 2, 0)))
plt.axis("off")

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=True)

In [None]:
X_test.shape

In [None]:
X_train = np.concatenate((X_train, X_aug), 0)
y_train = np.concatenate((y_train, y_aug), 0)

In [None]:
# Convert to PyTorch tensor
X_train, y_train = torch.Tensor(X_train), torch.Tensor(y_train)
X_test, y_test = torch.Tensor(X_test), torch.Tensor(y_test)

In [None]:
# Shuffle the training batches every epoch: X_train is the original split
# followed by all augmented samples, so a fixed order would always present
# the augmented frames as one contiguous run at the end of each epoch.
trainloader = torch.utils.data.DataLoader(
    torch.utils.data.TensorDataset(X_train, y_train), batch_size=32, shuffle=True
)
# Evaluation does not depend on sample order, so the test loader stays
# sequential.
testloader = torch.utils.data.DataLoader(
    torch.utils.data.TensorDataset(X_test, y_test), batch_size=32
)

#### CNN class definition

In [None]:
from src.model import CNNclassifer
from src.training import train

In [None]:
cnn = CNNclassifer(
    in_dim=300,
    n_classes=3,
    in_channels=3,
    n_filters=[32, 16, 16, 8],
    n_neurons=[1000, 500, 200],
    kernel_size=[(3, 3)] * 4,
    pool_size=(2, 2),
    dropout=0.2,
)

In [None]:
cnn

In [None]:
train(
    cnn,
    trainloader,
    testloader,
    device="auto",
    epochs=100,
    criterion=nn.CrossEntropyLoss,
    optimizer=torch.optim.Adam,
    verbose=True,
)

In [None]:
# Predict the held-out set in inference mode: eval() switches dropout off and
# no_grad() stops autograd from recording a graph for the whole test batch.
# NOTE(review): eval() changes the module's mode; later train() calls are
# presumably expected to switch it back — confirm in src.training.
cnn.eval()
# Use whatever device train(device="auto") left the model on instead of
# assuming a GPU is present.
device = next(cnn.parameters()).device
with torch.no_grad():
    logits = cnn(X_test.to(device))
# softmax is monotonic, so the argmax of the logits is the predicted class.
y_prev = logits.argmax(-1).cpu().numpy()

In [None]:
y_test_cpu = y_test.to("cpu").detach().numpy()

In [None]:
accuracy_score(y_prev, y_test_cpu)

#### Cross Validation Score

In [None]:
from sklearn.model_selection import KFold

In [None]:
def CrossValidationAccuracy(
    cnn,
    X,
    y,
    k=10,
    epochs=100,
    batch_size=128,
    device="auto",
    criterion=nn.CrossEntropyLoss,
    optimizer=torch.optim.Adam,
    verbose=False,
):
    """Run shuffled k-fold cross-validation and tabulate the per-fold scores.

    For every fold the train/test subsets are wrapped in ``DataLoader``
    objects and handed to the project's ``train`` function together with
    ``cnn``; the four values it returns are collected into one table row.

    Parameters
    ----------
    cnn
        Model passed unchanged to ``train`` for every fold.
    X, y
        Inputs and labels; must support indexing with integer index arrays.
    k : int
        Number of folds.
    epochs, device, criterion, optimizer
        Forwarded to ``train``.
    batch_size : int
        Batch size of both the train and the test loader.
    verbose : bool
        When true, show a tqdm progress bar over the folds and forward
        ``verbose=True`` to ``train``.

    Returns
    -------
    pandas.DataFrame
        One row per fold (index named ``"fold"``) with columns
        ``loss_train``, ``acc_train``, ``loss_test`` and ``acc_test``, filled
        from ``train``'s return values 0-3 in that order.

    Notes
    -----
    NOTE(review): the same ``cnn`` instance is trained in every fold. Unless
    ``train`` re-initialises the weights, folds after the first start from
    parameters already fitted on samples that are now in their test split,
    which would inflate the reported accuracies — confirm in src.training.
    """

    # Creating data folds
    kf = KFold(n_splits=k, shuffle=True)

    cv_acc = []
    cv_loss = []
    cv_acc_train = []
    cv_loss_train = []

    folds = kf.split(X)
    if verbose:
        # The split generator has no length, so tell tqdm the fold count.
        folds = tqdm(folds, total=k)

    for fold, (train_index, test_index) in enumerate(folds, start=1):

        # Only the tqdm wrapper has set_description; with verbose=False the
        # iterable is a plain generator and the call would raise.
        if verbose:
            folds.set_description(f"Training in fold {fold}/{k}")

        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        X_train, y_train = torch.Tensor(X_train), torch.Tensor(y_train)
        X_test, y_test = torch.Tensor(X_test), torch.Tensor(y_test)

        trainloader = torch.utils.data.DataLoader(
            torch.utils.data.TensorDataset(X_train, y_train), batch_size=batch_size
        )
        testloader = torch.utils.data.DataLoader(
            torch.utils.data.TensorDataset(X_test, y_test), batch_size=batch_size
        )

        out = train(
            cnn,
            trainloader,
            testloader,
            device=device,
            epochs=epochs,
            criterion=criterion,
            optimizer=optimizer,
            verbose=verbose,
            return_scores=True,
            return_train_scores=True,
        )

        cv_loss_train.append(out[0])
        cv_acc_train.append(out[1])
        cv_loss.append(out[2])
        cv_acc.append(out[3])

    # Stack the four score lists column-wise: one row per fold.
    df = pd.DataFrame(
        np.stack((cv_loss_train, cv_acc_train, cv_loss, cv_acc), -1),
        columns=["loss_train", "acc_train", "loss_test", "acc_test"],
    )
    df.index.name = "fold"

    return df

In [None]:
acc = CrossValidationAccuracy(
    cnn, X_train, y_train, k=5, epochs=100, batch_size=32, device="auto", verbose=True
)

In [None]:
median = acc["acc_test"].median()
sigma = acc["acc_test"].std()
print(f"{median:.3f} +- {sigma:.3f}")

In [None]:
# Inputs and labels
X, y = apply_transforms(
    training_dataset, fraction=None, trfs=transforms.Resize((300, 300)), verbose=True
)
# Augment dataset
X_aug, y_aug = apply_transforms(training_dataset, fraction=0.5, trfs=trfs, verbose=True)
# Transpose due to random flip
X_aug = np.transpose(X_aug, (0, 2, 1, 3))

# Build the cross-validation set from the arrays computed in this cell.
# Concatenating onto the existing `X_train`/`y_train` instead would reuse
# tensors from the earlier cells (already extended with a previous
# augmentation batch) and leave the freshly computed X and y unused.
X_train = np.concatenate((X, X_aug), 0)
y_train = np.concatenate((y, y_aug), 0)

In [None]:
cnn_1_nodrop = CNNclassifer(
    in_dim=300,
    n_classes=3,
    in_channels=3,
    n_filters=[32, 16],
    n_neurons=[1000, 200],
    kernel_size=[(3, 3), (3, 3)],
    pool_size=(2, 2),
    dropout=0.0,
)

cnn_1 = CNNclassifer(
    in_dim=300,
    n_classes=3,
    in_channels=3,
    n_filters=[32, 16],
    n_neurons=[1000, 200],
    kernel_size=[(3, 3), (3, 3)],
    pool_size=(2, 2),
    dropout=0.2,
)

In [None]:
cv_cnn_1 = CrossValidationAccuracy(
    cnn_1, X_train, y_train, k=5, epochs=100, batch_size=32, device="auto", verbose=True
)

cv_cnn_1_nodrop = CrossValidationAccuracy(
    cnn_1_nodrop,
    X_train,
    y_train,
    k=5,
    epochs=100,
    batch_size=32,
    device="auto",
    verbose=True,
)

In [None]:
cnn_2 = CNNclassifer(
    in_dim=300,
    n_classes=3,
    in_channels=3,
    n_filters=[32, 16, 16, 8],
    n_neurons=[1000, 500, 500, 200],
    kernel_size=[(3, 3)] * 4,
    pool_size=(2, 2),
    dropout=0.2,
)


cnn_2_nodrop = CNNclassifer(
    in_dim=300,
    n_classes=3,
    in_channels=3,
    n_filters=[32, 16, 16, 8],
    n_neurons=[1000, 500, 500, 200],
    kernel_size=[(3, 3)] * 4,
    pool_size=(2, 2),
    dropout=0.0,
)

In [None]:
cv_cnn_2 = CrossValidationAccuracy(
    cnn_2, X_train, y_train, k=5, epochs=100, batch_size=32, device="auto", verbose=True
)

cv_cnn_2_nodrop = CrossValidationAccuracy(
    cnn_2_nodrop,
    X_train,
    y_train,
    k=5,
    epochs=100,
    batch_size=32,
    device="auto",
    verbose=True,
)

In [None]:
cnn_1

In [None]:
cnn_2

In [None]:
# Tag every cross-validation table with its model name and a dropout flag
# (1 for the models built with dropout=0.2, 0 for dropout=0.0), then stack
# the four tables into one long frame.
tagged_tables = [
    (cv_cnn_1, "cnn1", 1),
    (cv_cnn_1_nodrop, "cnn1_nodrop", 0),
    (cv_cnn_2, "cnn2", 1),
    (cv_cnn_2_nodrop, "cnn2_nodrop", 0),
]
for table, model_name, drop_flag in tagged_tables:
    table["model"] = model_name
    table["drop"] = drop_flag
cv_scores = pd.concat([table for table, _, _ in tagged_tables])

In [None]:
cv_scores

In [None]:
import seaborn as sns
from scipy.stats import mannwhitneyu

In [None]:
def convert_pvalue_to_asterisks(pvalue):
    """Translate a p-value into a significance marker.

    Returns ``"****"``, ``"***"``, ``"**"`` or ``"*"`` for p-values at or
    below 0.0001, 0.001, 0.01 and 0.05 respectively, and ``"ns"`` otherwise.
    """
    # Cut-offs ordered from strictest to loosest; the first one met wins.
    markers = (
        (0.0001, "****"),
        (0.001, "***"),
        (0.01, "**"),
        (0.05, "*"),
    )
    for cutoff, stars in markers:
        if pvalue <= cutoff:
            return stars
    return "ns"


def add_stats_annot(pval, x1, x2, y, h, col):
    """Draw a significance bracket from ``x1`` to ``x2`` with its marker.

    The bracket rises from height ``y`` to ``y + h`` and is drawn with
    pyplot's current-figure interface in colour ``col``. The marker text
    for ``pval`` is centred just above the bracket.
    """
    top = y + h
    bracket_x = [x1, x1, x2, x2]
    bracket_y = [y, top, top, y]
    plt.plot(bracket_x, bracket_y, lw=1.5, c=col)

    marker = convert_pvalue_to_asterisks(pval)
    midpoint = (x1 + x2) * 0.5
    plt.text(midpoint, top, marker, ha="center", va="bottom", color=col)


def mwhitney(x, y, boot=1000, rng=None):
    """Return the one-sided Mann-Whitney U p-value for ``x`` greater than ``y``.

    Parameters
    ----------
    x, y
        One-dimensional samples to compare.
    boot : int or None
        Number of values drawn with replacement from each sample before
        testing. ``None`` skips the resampling and tests ``x`` and ``y``
        as given.
    rng : numpy.random.Generator, optional
        Source of randomness for the resampling. When omitted, NumPy's
        global random state is used (the original behaviour); pass a
        seeded generator for reproducible p-values.

    Returns
    -------
    float
        p-value of ``mannwhitneyu(..., alternative="greater")``.

    Notes
    -----
    Resampling hands the test ``boot`` observations per group no matter how
    many were actually measured, so the p-value shrinks as ``boot`` grows.
    Use ``boot=None`` to test the raw samples.
    """
    if boot is None:
        sample_x, sample_y = x, y
    else:
        # Both the legacy module and Generator expose choice(a, size=...).
        sampler = np.random if rng is None else rng
        sample_x = sampler.choice(x, size=boot)
        sample_y = sampler.choice(y, size=boot)
    _, p = mannwhitneyu(sample_x, sample_y, alternative="greater")
    return p

In [None]:
ax = plt.subplot(111)
sns.boxplot(data=cv_scores, x="model", y="acc_test", hue="drop", showfliers=False)
sns.boxplot(
    data=cv_scores,
    x="model",
    y="acc_train",
    hue="drop",
    showfliers=False,
    color="lightgray",
)

add_stats_annot(
    mwhitney(
        cv_scores.loc[cv_scores.model == "cnn1_nodrop"].acc_test,
        cv_scores.loc[cv_scores.model == "cnn1"].acc_test,
    ),
    0.2,
    0.8,
    0.9,
    0.005,
    "k",
)

add_stats_annot(
    mwhitney(
        cv_scores.loc[cv_scores.model == "cnn2"].acc_test,
        cv_scores.loc[cv_scores.model == "cnn2_nodrop"].acc_test,
    ),
    2.2,
    2.8,
    0.97,
    0.005,
    "k",
)

add_stats_annot(
    mwhitney(
        cv_scores.loc[cv_scores.model == "cnn2"].acc_test,
        cv_scores.loc[cv_scores.model == "cnn1"].acc_test,
    ),
    0.2,
    2.2,
    0.96,
    0.005,
    "k",
)

ax.legend().remove()
# Hide the top and right frame lines (a plain loop, since the calls are made
# only for their side effect).
for side in ("top", "right"):
    ax.spines[side].set_visible(False)


plt.xticks(
    [0.2, 0.8, 2.2, 2.8],
    ["CNN1", "CNN1 (no drop.)", "CNN2", "CNN2 (no drop.)"],
    rotation=45,
)
plt.ylabel("CV accuracies")
plt.xlabel("")
# savefig raises FileNotFoundError when the target directory is missing.
os.makedirs("figures", exist_ok=True)
plt.savefig("figures/cv_acc.png", bbox_inches="tight", transparent=True, dpi=600)

In [None]:
torch.save(cnn_2, "cnn_2")

#### Classify video

In [None]:
import cv2

from src.utils import apply_transforms
from src.video_processing import extract_video_frames

In [None]:
path_to_video = "/home/projeto_mimi/videos/video2B.avi"

frames = extract_video_frames(path_to_video)

_, H, W, d = frames.shape

In [None]:
# The array is unpacked above as (n_frames, H, W, d), so name the axes in
# that order, with the same dimension names as the training dataset
# ("frames", "height", "width", "depth") rather than a swapped "W"/"H" pair.
frames = xr.DataArray(frames, dims=("frames", "height", "width", "depth")).isel(
    frames=slice(4000, 6000)
)

n_frames = len(frames)

In [None]:
frames, _ = apply_transforms(
    frames, fraction=None, trfs=transforms.Resize((300, 300)), verbose=True
)

In [None]:
frames = torch.Tensor(frames)

In [None]:
model = torch.load("cnn_2")
model.eval().to("cuda")

In [None]:
# Classify the video one frame at a time. no_grad() stops autograd from
# recording a graph for each forward pass, which is never needed here.
device = next(model.parameters()).device  # follow the model, not a fixed "cuda"
labels = np.empty(n_frames)
with torch.no_grad():
    for i in tqdm(range(n_frames)):
        # [None, ...] adds the leading batch axis for a single frame.
        out = model(frames[i, ...][None, ...].to(device))
        # softmax is monotonic, so the argmax of the raw output is the class.
        labels[i] = out.argmax(-1).item()

In [None]:
labels = labels.astype(int)

In [None]:
frames = frames.to("cpu").detach().numpy()

In [None]:
# Border colour for each predicted class. An unexpected label now raises
# KeyError instead of silently reusing the previous frame's bordered image.
# NOTE(review): OpenCV treats colours and image channels as BGR and writes
# 8-bit JPEGs; confirm the channel order and value range of `frames`.
border_colors = {
    1: [255, 0, 0],
    2: [0, 255, 0],
    0: [0, 0, 255],
}

# cv2.imwrite does not create missing directories, so make sure the output
# folder exists before writing.
os.makedirs("labeled_frames", exist_ok=True)

for i in range(n_frames):
    # (channels, H, W) -> (H, W, channels), the layout OpenCV works with.
    img = np.transpose(frames[i], (1, 2, 0))
    frame = cv2.copyMakeBorder(
        img, 5, 5, 5, 5, cv2.BORDER_CONSTANT, value=border_colors[labels[i]]
    )
    cv2.imwrite(f"labeled_frames/frame_{labels[i]}_{i}.jpg", frame)