<a href="https://colab.research.google.com/github/hc07180011/testing-cv/blob/main/flicker-detection/flicker-detection/CNN%2BLSTM_Demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Relevant Object Classes

- TensorFlow feature extractor
- Custom data loader (main preprocessing step)
- PyTorch LSTM model
- Custom F1 / precision / recall aggregation object
- Statistical evaluation wrapper

In [35]:
import gc
import json
import logging
import os
import random
import re
from io import StringIO
from typing import Callable, Tuple

import cv2
import torch
import numpy as np
import tensorflow as tf
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import NearMiss
from imblearn.pipeline import Pipeline
from sklearn.metrics import confusion_matrix, precision_recall_curve, roc_curve, auc, roc_auc_score, f1_score, classification_report
from torch.nn import functional as F

from tensorflow.keras import Model
from tensorflow.keras.applications import resnet


class BaseCNN:
    """
    Thin wrapper that builds a Keras application model and uses it to turn
    images into embeddings.

    adaptive pooling sample:
    https://ideone.com/cJoN3x
    """
    # These two calls run once, when the class body is evaluated: they seed
    # TensorFlow's random generators and request deterministic op execution.
    tf.keras.utils.set_random_seed(12345)
    tf.config.experimental.enable_op_determinism()

    def __init__(self) -> None:
        """Set the resize target, create the distribution strategy and set TF log level."""
        self.__target_shape = (200, 200)
        # Populated by ``extractor``; the embedding methods need it to be set first.
        self.__embedding = None
        self.strategy = tf.distribute.MirroredStrategy()
        tf.get_logger().setLevel('INFO')

    def get_embedding(self, images: np.ndarray, batched=True) -> np.ndarray:
        """
        Resize ``images`` with TensorFlow and run them through the model's ``predict``.

        Args:
            images: image batch, or a single image when ``batched`` is False
            batched: when False a leading batch axis is added first

        Returns:
            whatever the embedding model's ``predict`` returns for the batch
        """
        batch = images if batched else np.expand_dims(images, axis=0)
        with self.strategy.scope():
            # check resize differences
            resized = tf.image.resize(
                batch,
                self.__target_shape,
                tf.image.ResizeMethod.NEAREST_NEIGHBOR,
            )
            model_input = resnet.preprocess_input(resized)
            return self.__embedding.predict(model_input)

    def get_embed_cpu(self, images: np.ndarray, batched=True) -> np.ndarray:
        """
        Resize ``images`` with OpenCV (bicubic) and call the model directly.

        Args:
            images: image batch, or a single image when ``batched`` is False
            batched: when False a leading batch axis is added first

        Returns:
            the model output converted with ``.numpy()``
        """
        batch = images if batched else np.expand_dims(images, axis=0)
        resized_frames = []
        for frame in batch:
            resized_frames.append(
                cv2.resize(frame, dsize=self.__target_shape,
                           interpolation=cv2.INTER_CUBIC)
            )
        stacked = np.array(resized_frames)
        with self.strategy.scope():
            as_tensor = tf.convert_to_tensor(stacked, np.float32)
            model_input = resnet.preprocess_input(as_tensor)
            return self.__embedding(model_input).numpy()

    def extractor(self, extractor: Model, weights: str = "imagenet", pooling: str = "Max") -> Model:
        """
        Instantiate ``extractor`` without its classification top and keep it
        as the embedding model.

        Args:
            extractor: callable that builds the model (e.g. a Keras application constructor)
            weights: forwarded to ``extractor``
            pooling: forwarded to ``extractor``
                NOTE(review): Keras application constructors document None,
                'avg' and 'max'; confirm how the capitalised default "Max"
                is treated before relying on pooled output.

        Returns:
            the constructed model
        """
        build_kwargs = dict(
            weights=weights,
            input_shape=self.__target_shape + (3,),
            include_top=False,
            pooling=pooling,
        )
        with self.strategy.scope():
            self.__embedding = extractor(**build_kwargs)
            return self.__embedding



class Streamer(object):
    """
    Iterator that streams batches of chunked embeddings from ``.npy`` files.

    The list of embedding files is split into ``mem_split`` groups. Whenever
    the internal buffers are empty the next group is loaded from ``data_dir``,
    cut into chunks of ``chunk_size`` rows, optionally re-sampled, and divided
    into batches of ``batch_size`` chunks that ``__next__`` returns one by one.

    Reference:
    https://jamesmccaffrey.wordpress.com/2021/03/08/working-with-huge-training-data-files-for-pytorch/
    """

    def __init__(self,
                 embedding_list_train: list,
                 label_path: str,
                 data_dir: str,
                 mem_split: int,
                 chunk_size: int,
                 batch_size: int,
                 sampler: Callable = None,
                 multiclass: bool = False,
                 overlap_chunking: bool = False,
                 ) -> None:
        """
        Args:
            embedding_list_train: file names (relative to ``data_dir``) of the embeddings
            label_path: JSON file mapping a file stem to a list of labelled row indices
            data_dir: directory holding the ``.npy`` embedding files
            mem_split: number of groups the file list is split into
            chunk_size: number of rows per chunk
            batch_size: number of chunks per returned batch
            sampler: object with ``fit_resample`` (or a list of pipeline steps), or None
            multiclass: label a chunk with its count of labelled rows instead of 0/1
            overlap_chunking: additionally emit one window per labelled row
        """
        self.multiclass = multiclass
        self.overlap_chunking = overlap_chunking

        self.embedding_list_train = embedding_list_train
        self.chunk_embedding_list = np.array_split(
            embedding_list_train, mem_split)
        self.data_dir = data_dir
        # Context manager so the label file handle is closed right away.
        with open(label_path, "r") as label_file:
            self.raw_labels = json.load(label_file)

        self.mem_split = mem_split
        self.chunk_size = chunk_size
        self.batch_size = batch_size
        self.sampler = sampler
        self.sampling_params = None

        self.cur_chunk = 0
        self.X_buffer, self.y_buffer = (), ()

    def __len__(self) -> int:
        # FIX ME: this is not the number of batches (that is only known after
        # the files have been loaded and chunked); kept as-is for callers.
        return len(self.embedding_list_train)*len(self.chunk_embedding_list)

    def __iter__(self):
        return self

    def __next__(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Return the next ``(X, y)`` batch with its rows shuffled.

        Raises:
            StopIteration: when every file group has been consumed
        """
        # Loop (rather than a single ``if``) so a file group that yields no
        # chunks is skipped instead of failing on ``pop`` from an empty list.
        while not self.X_buffer or not self.y_buffer:
            if self.cur_chunk == len(self.chunk_embedding_list):
                gc.collect()
                raise StopIteration

            self.X_buffer, self.y_buffer = (), ()
            self._load_embeddings(
                self.chunk_embedding_list[self.cur_chunk])
            self.cur_chunk += 1
            X, y = self._re_sample()
            self.X_buffer, self.y_buffer = self._batch_sample(
                X, y, self.batch_size)
            gc.collect()

        X, y = self.X_buffer.pop(), self.y_buffer.pop()
        idx = np.arange(X.shape[0])
        random.shuffle(idx)
        return torch.from_numpy(X[idx]).float(), torch.from_numpy(y[idx]).long()

    def _re_sample(self,) -> Tuple[np.ndarray, np.ndarray]:
        """
        Stack the buffers into arrays and apply the sampler when it is useful.

        The sampler is skipped when none is configured or when the loaded
        group contains no non-zero label.
        """
        X, y = np.array(self.X_buffer), np.array(self.y_buffer)
        # The former ``self.ipca`` check referenced an attribute that is never
        # defined (AttributeError whenever ``sampler`` was None) and did not
        # influence the result, so it is dropped.
        if not self.sampler or not np.any(y):
            return X, y
        return self._sampling(X, y, self.sampler)

    def _load_embeddings(
        self,
        embedding_list_train: list,
        mov_dif: bool = False,
    ) -> None:
        """
        Load every file of one group and append its chunks/labels to the buffers.

        Args:
            embedding_list_train: file names of the group to load
            mov_dif: currently unused
        """
        for key in embedding_list_train:
            real_filename = key.replace("reduced_", "").replace(".npy", "")
            loaded = np.load(os.path.join(self.data_dir, key))

            # Stored indices are shifted down by one
            # (presumably 1-based labels — TODO confirm against the label file).
            flicker_idxs = np.array(
                self.raw_labels[real_filename], dtype=np.uint16) - 1
            if self.overlap_chunking:
                # One positive window per labelled row; those rows are then
                # removed so the regular chunks below are all labelled 0.
                self.X_buffer += tuple(self._overlap_chunks(
                    loaded, flicker_idxs, self.chunk_size))
                self.y_buffer += (1,)*flicker_idxs.size
                loaded = np.delete(loaded, flicker_idxs, axis=0)
                flicker_idxs = np.array([])

            buf_label = np.zeros(loaded.shape[0])
            buf_label[flicker_idxs.tolist()] = 1
            self.X_buffer += tuple(
                self._get_chunk_array(loaded, self.chunk_size))
            self.y_buffer += tuple(
                sum(x) if self.multiclass else 1 if sum(x) else 0
                for x in self._get_chunk_array(buf_label, self.chunk_size)
            )
            gc.collect()

    def _shuffle(self) -> None:
        """Shuffle the file list in place, re-split it and reset the iteration state."""
        random.shuffle(self.embedding_list_train)
        self.chunk_embedding_list = np.array_split(
            self.embedding_list_train, self.mem_split)
        self.cur_chunk = 0
        self.X_buffer, self.y_buffer = (), ()
        gc.collect()

    @staticmethod
    def _mov_dif_chunks(
        input_arr: np.ndarray,
    ) -> np.ndarray:
        """
        Return the differences along the last axis rescaled to 0..255.

        The result is ``uint8`` (an ``int8`` cast would wrap every value above
        127); a constant difference yields all zeros instead of dividing by 0.
        """
        difference = np.diff(input_arr, axis=-1)
        value_range = np.ptp(difference)
        if value_range == 0:
            return np.zeros(difference.shape, dtype=np.uint8)
        return (255*(difference - np.min(difference))/value_range).astype(np.uint8)

    @staticmethod
    def _overlap_chunks(
        input_arr: np.ndarray,
        labels: np.ndarray,
        chunk_size: int
    ) -> np.ndarray:
        """
        Cut one ``chunk_size`` window per entry of ``labels``.

        ``input_arr`` is zero-padded by ``chunk_size`` rows in total (split
        between front and back) and a window starting at each label index is
        taken from the padded array.
        """
        vid_pad = np.zeros(
            (input_arr.shape[0]+chunk_size, *input_arr.shape[1:]))
        vid_pad[chunk_size//2:-chunk_size//2] = input_arr
        return np.array([
            vid_pad[idx:idx+chunk_size]
            for idx in labels
        ])

    @staticmethod
    def _get_chunk_array(input_arr: np.array, chunk_size: int) -> list:
        """
        Split ``input_arr`` into consecutive ``chunk_size``-row chunks.

        The last chunk is zero-padded to ``chunk_size`` rows. When the length
        is an exact multiple of ``chunk_size`` the split leaves an empty tail,
        so an all-zero chunk is appended (existing behaviour, kept).
        """
        chunks = np.array_split(
            input_arr,
            list(range(
                chunk_size,
                input_arr.shape[0] + 1,
                chunk_size
            ))
        )
        # Pad against the requested chunk size, not ``chunks[0].shape``: for an
        # input shorter than ``chunk_size`` the first chunk is itself short.
        i_pad = np.zeros((chunk_size, *input_arr.shape[1:]))
        i_pad[:len(chunks[-1])] = chunks[-1]
        chunks[-1] = i_pad
        return chunks

    @staticmethod
    def _sampling(
        X_train: np.array,
        y_train: np.array,
        sampler: Callable,
    ) -> Tuple[np.array, np.array]:
        """
        Re-sample the data with ``sampler.fit_resample``.

        Samples are flattened to 2-D for the sampler and reshaped back to
        their original per-sample shape afterwards. A list is wrapped in a
        ``Pipeline`` first.

        batched alternative:
        https://imbalanced-learn.org/stable/references/generated/imblearn.keras.BalancedBatchGenerator.html
        """
        if isinstance(sampler, list):
            sampler = Pipeline(sampler)
        original_X_shape = X_train.shape
        X_train, y_train = sampler.fit_resample(
            np.reshape(X_train, (-1, np.prod(original_X_shape[1:]))),
            y_train
        )
        X_train = np.reshape(X_train, (-1,) + original_X_shape[1:])
        return X_train, y_train

    @staticmethod
    def _batch_sample(
        X: np.ndarray,
        y: np.ndarray,
        batch_size: int,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Slice ``X`` and ``y`` into lists of consecutive ``batch_size`` pieces."""
        X = [
            X[i:i+batch_size]
            for i in range(0, len(X), batch_size)
        ]
        y = [
            y[i:i+batch_size]
            for i in range(0, len(y), batch_size)
        ]
        return X, y


class LSTM(torch.nn.Module):
    """
    LSTM followed by two linear layers; ``forward`` returns the output of the
    last time step.
    """

    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        hidden_dim: int,
        layer_dim: int,
        bidirectional=False,
    ) -> None:
        """
        Args:
            input_dim: feature size of each time step
            output_dim: size of the final linear layer's output
            hidden_dim: LSTM hidden size
            layer_dim: number of stacked LSTM layers
            bidirectional: build a bidirectional LSTM
        """
        super().__init__()
        # Hidden dimensions
        self.hidden_dim = hidden_dim
        # Number of hidden layers
        self.layer_dim = layer_dim
        # Output dim classes
        self.output_dim = output_dim
        self.n_directions = 2 if bidirectional else 1

        # LSTM Layer
        self.lstm = torch.nn.LSTM(input_size=input_dim, hidden_size=hidden_dim, num_layers=layer_dim,
                                  batch_first=True, bidirectional=bidirectional)
        # Linear Dense
        self.fc1 = torch.nn.Linear(hidden_dim*self.n_directions, hidden_dim//2)
        # Linear Dense
        self.fc2 = torch.nn.Linear(hidden_dim//2, self.output_dim)
        # initialize weights & bias with stdv -> 0.05
        self.initialization()

    def init_hidden(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Build zero initial hidden and cell states for the batch in ``x``.

        The states are created on ``x``'s device with ``x``'s dtype, so the
        model runs on CPU as well as on GPU.
        """
        state_shape = (
            self.layer_dim*self.n_directions,
            x.size(0),
            self.hidden_dim,
        )
        h0 = torch.zeros(
            state_shape, device=x.device, dtype=x.dtype).requires_grad_()
        # Initialize cell state
        c0 = torch.zeros(
            state_shape, device=x.device, dtype=x.dtype).requires_grad_()
        return h0, c0

    def forward(self, x) -> torch.Tensor:
        """Run the LSTM and both linear layers; return the last time step's output."""
        out, _ = self.lstm(x, self.init_hidden(x))
        # Dense lstm
        out = self.fc1(out)
        # Dense for softmax
        out = self.fc2(out)
        return out[:, -1]

    def initialization(self) -> None:
        """
        Initialise the parameters in place.

        Linear layers: weights and biases drawn from a normal distribution
        with std 0.05. LSTM layers: each of the four equal row blocks of every
        input-hidden / hidden-hidden weight gets its own Xavier-uniform
        initialisation; biases are zeroed.
        """
        # no_grad so the row-block slices are plain views that may be written in place.
        with torch.no_grad():
            for m in self.modules():
                if isinstance(m, torch.nn.Linear):
                    # Iterate the layer's own parameters; previously this branch
                    # used a ``param`` left over from the LSTM loop.
                    for param in m.parameters():
                        torch.nn.init.normal_(param, std=0.05)
                elif isinstance(m, torch.nn.LSTM):
                    for name, param in m.named_parameters():
                        if 'weight_ih' in name or 'weight_hh' in name:
                            mul = param.shape[0]//4
                            for i in range(4):
                                torch.nn.init.xavier_uniform_(
                                    param[i*mul:(i+1)*mul])
                        elif 'bias' in name:
                            torch.nn.init.zeros_(param)



class F1Score(torch.nn.Module):
    """
    Class for f1 calculation in Pytorch.
    """

    def __init__(self, average: str = 'weighted'):
        """
        Init.

        Args:
            average: averaging method; one of None, 'micro', 'macro', 'weighted'

        Raises:
            ValueError: if ``average`` is not one of the accepted values
        """
        # Required so the object is a fully initialised Module (repr, .to(), ...).
        super().__init__()
        if average not in [None, 'micro', 'macro', 'weighted']:
            raise ValueError('Wrong value of average parameter')
        self.average = average

    @staticmethod
    def calc_f1_micro(predictions: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
        """
        Calculate f1 micro.

        Computed as the fraction of positions where prediction equals label.

        Args:
            predictions: tensor with predictions
            labels: tensor with original labels

        Returns:
            f1 score
        """
        true_positive = torch.eq(labels, predictions).sum().float()
        return torch.div(true_positive, len(labels))

    @staticmethod
    def calc_f1_count_for_label(predictions: torch.Tensor,
                                labels: torch.Tensor, label_id: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Calculate f1 and true count for the label

        Args:
            predictions: tensor with predictions
            labels: tensor with original labels
            label_id: id of current label

        Returns:
            f1 score and true count for label
        """
        # label count
        true_count = torch.eq(labels, label_id).sum()

        # true positives: labels equal to prediction and to label_id
        true_positive = torch.logical_and(torch.eq(labels, predictions),
                                          torch.eq(labels, label_id)).sum().float()
        # precision for label
        precision = torch.div(true_positive, torch.eq(
            predictions, label_id).sum().float())
        # replace nan values with 0 (label never predicted)
        precision = torch.where(torch.isnan(precision),
                                torch.zeros_like(precision).type_as(
                                    true_positive),
                                precision)

        # recall for label
        recall = torch.div(true_positive, true_count)
        # f1
        f1 = 2 * precision * recall / (precision + recall)
        # replace nan values with 0 (precision + recall == 0, or no true samples)
        f1 = torch.where(torch.isnan(f1), torch.zeros_like(
            f1).type_as(true_positive), f1)
        return f1, true_count

    def forward(self, predictions: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
        """
        Calculate f1 score based on averaging method defined in init.

        Per-class scores are computed for the class ids that actually occur in
        ``labels`` (they need not be ``0..k-1``). With ``average=None`` no
        aggregation branch applies and the zero accumulator is returned.

        Args:
            predictions: tensor with predictions
            labels: tensor with original labels

        Returns:
            f1 score
        """

        # simpler calculation for micro
        if self.average == 'micro':
            return self.calc_f1_micro(predictions, labels)

        present_labels = labels.unique()
        total = torch.zeros((), dtype=torch.float32, device=labels.device)
        if present_labels.numel() == 0:
            # Nothing to score; avoid a 0/0 division below.
            return total

        for label_id in present_labels.tolist():
            f1, true_count = self.calc_f1_count_for_label(
                predictions, labels, label_id)

            if self.average == 'weighted':
                total = total + f1 * true_count
            elif self.average == 'macro':
                total = total + f1

        if self.average == 'weighted':
            total = torch.div(total, len(labels))
        elif self.average == 'macro':
            total = torch.div(total, present_labels.numel())

        return total


class F1_Loss(torch.nn.Module):
    '''Soft F1 loss: ``1 - mean(per-class F1)``. Can work with gpu tensors

    Per-class true/false positives and false negatives are accumulated from
    the softmax probabilities, which keeps the value differentiable. The
    number of classes is taken from the second dimension of ``y_pred``.

    The original implmentation is written by Michal Haltuf on Kaggle.

    Returns
    -------
    torch.Tensor
        scalar loss; each per-class F1 is clamped to
        [epsilon, 1 - epsilon] before averaging

    Reference
    ---------
    - https://www.kaggle.com/rejpalcz/best-loss-function-for-f1-score-metric
    #sklearn.metrics.f1_score
    - https://scikit-learn.org/stable/modules/generated/sklearn.metrics.f1_score.html
    - https://discuss.pytorch.org/t/calculating-precision-recall-and-f1-score-in-case-of-multi-label-classification/28265/6
    - http://www.ryanzhang.info/python/writing-your-own-loss-function-module-for-pytorch/
    '''

    def __init__(self, epsilon=1e-7):
        super().__init__()
        # Small constant guarding the divisions and bounding the clamp.
        self.epsilon = epsilon

    def forward(self, y_pred, y_true,):
        """
        Args:
            y_pred: 2-D tensor of raw scores, one column per class
            y_true: 1-D integer tensor of class ids

        Returns:
            scalar tensor ``1 - mean(F1)``
        """
        assert y_pred.ndim == 2
        assert y_true.ndim == 1
        # One column per class of y_pred instead of a hard-coded 2.
        y_true = F.one_hot(y_true, y_pred.shape[1]).to(torch.float32)
        y_pred = F.softmax(y_pred, dim=1)

        tp = (y_true * y_pred).sum(dim=0).to(torch.float32)
        fp = ((1 - y_true) * y_pred).sum(dim=0).to(torch.float32)
        fn = (y_true * (1 - y_pred)).sum(dim=0).to(torch.float32)

        precision = tp / (tp + fp + self.epsilon)
        recall = tp / (tp + fn + self.epsilon)

        f1 = 2 * (precision*recall) / (precision + recall + self.epsilon)
        f1 = f1.clamp(min=self.epsilon, max=1-self.epsilon)
        return 1 - f1.mean()


class Evaluation(object):
    """
    Plotting and reporting helpers for classifier evaluation.

    https://onlineconfusionmatrix.com/
    https://discuss.pytorch.org/t/bce-loss-vs-cross-entropy/97437/3
    """
    # Runs once when the class body is evaluated: raise the PIL PNG plugin
    # logger to WARNING.
    logging.getLogger('PIL.PngImagePlugin').setLevel(logging.WARNING)

    def __init__(self,
                 plots_folder: str = "plots/",
                 classes: int = 2,
                 f1_metric: F1Score = F1Score(average='macro'),
                 ) -> None:
        """
        Args:
            plots_folder: directory the ROC/PR/confusion-matrix images are written to
            classes: number of classes (columns) to plot
            f1_metric: callable ``(predictions, labels) -> score`` used by ``cm``
        """
        self.plots_folder = plots_folder
        self.classes = classes
        self.f1_metric = f1_metric

    def roc_auc(
        self,
        y_true: np.ndarray,
        y_pred: np.ndarray,
    ):
        """
        plot ROC Curve

        One image ``roc_curve_<i>.png`` is saved per class, using column ``i``
        of ``y_true`` and ``y_pred``.
        https://stackoverflow.com/questions/45332410/roc-for-multiclass-classification
        """
        for i in range(self.classes):
            fpr, tpr, _ = roc_curve(y_true[:, i], y_pred[:, i])
            area = auc(fpr, tpr)

            plt.figure()
            plt.plot([0, 1], [0, 1], linestyle="dashed")
            plt.plot(fpr, tpr, marker="o")
            plt.plot([0, 0, 1], [0, 1, 1], linestyle="dashed", c="red")
            plt.legend([
                "No Skill",
                "ROC curve (area = {:.2f})".format(area),
                "Perfect"
            ])
            plt.xlabel("False Positive Rate")
            plt.ylabel("True Positive Rate")
            plt.title(f"Class-{i} ROC Curve")
            plt.savefig(os.path.join(self.plots_folder, f"roc_curve_{i}.png"))
            # Close every figure, not only the last one, so none stay open.
            plt.close()

    def pr_curve(
        self,
        y_true: np.ndarray,
        y_pred: np.ndarray,
    ) -> None:
        """
        plot PR Curve

        One image ``pc_curve_<i>.png`` is saved per class, using column ``i``
        of ``y_true`` and ``y_pred``.
        """
        for i in range(self.classes):
            precision, recall, _ = precision_recall_curve(
                y_true[:, i], y_pred[:, i])

            plt.figure()
            plt.plot([0, 1], [0, 0], linestyle="dashed")
            plt.plot(recall, precision, marker="o")
            plt.legend([
                "No Skill",
                "Model"
            ])
            plt.xlabel("Recall")
            plt.ylabel("Precision")
            plt.title(f"Class-{i} Precision-recall Curve")
            plt.savefig(os.path.join(self.plots_folder, f"pc_curve_{i}.png"))
            # Close every figure, not only the last one, so none stay open.
            plt.close()

    def cm(
        self,
        y_true: torch.tensor,
        y_pred: torch.tensor,
    ) -> None:
        """Log the F1 score and save the confusion matrix as ``confusion_matrix.png``."""
        f1_value = self.f1_metric(y_pred, y_true)
        logging.info("f1: {:.4f}".format(f1_value))

        # plot Confusion Matrix
        # https://towardsdatascience.com/understanding-the-confusion-matrix-from-scikit-learn-c51d88929c79
        cm = confusion_matrix(
            y_true.cpu().numpy(),
            y_pred.cpu().numpy(),
        )
        # Fresh figure per call (a fixed figure number would be drawn over on
        # repeated calls), closed once it has been saved.
        fig = plt.figure()
        ax = fig.add_subplot()
        sns.heatmap(cm, annot=True, fmt='g', ax=ax)
        ax.set_xlabel('Predicted')
        ax.set_ylabel('Actual')
        ax.set_title("Multiclass F1 Harmonization: {:.4f}".format(f1_value))
        fig.savefig(os.path.join(self.plots_folder, "confusion_matrix.png"))
        plt.close(fig)

    @ staticmethod
    def plot_callback(
        train_metric: np.ndarray,
        val_metric: np.ndarray,
        name: str, num=0
    ) -> None:
        """
        Plot a training metric against its validation counterpart.

        The image is written to ``plots/<name>.png`` (the folder is fixed here
        and independent of ``plots_folder``).
        """
        plt.figure(num=num, figsize=(16, 4), dpi=200)
        plt.plot(val_metric)
        plt.plot(train_metric)
        plt.legend(["val_{}".format(name), "{}".format(name), ])
        plt.xlabel("# Epochs")
        plt.ylabel("{}".format(name))
        plt.title("{} LSTM, Chunked, Oversampling".format(name))
        plt.savefig("{}.png".format(
            os.path.join("plots/", name)))
        plt.close()

    @ staticmethod
    def report_to_df(report) -> pd.DataFrame:  # FIX ME
        """
        Parse a text classification report into a DataFrame and save it to
        ``plots/report.csv``.

        Runs of spaces are collapsed and the text is read as a space-separated
        CSV; lines that do not fit the header are skipped.
        """
        report = re.sub(r" +", " ", report).replace("avg / total",
                                                    "avg/total").replace("\n ", "\n")
        report_df = pd.read_csv(StringIO("Classes" + report),
                                sep=' ', index_col=0, on_bad_lines='skip')
        report_df.to_csv("plots/report.csv")
        return report_df

    def report(
        self,
        y_true: np.ndarray,
        y_classes: np.ndarray,
    ) -> pd.DataFrame:
        """Build the sklearn classification report (4 digits) and convert it with ``report_to_df``."""
        return self.report_to_df(
            classification_report(y_true, y_classes, digits=4)
        )

    @staticmethod
    def miss_classified(
        X_test: torch.Tensor,
        y_classes: torch.Tensor,
        y_true: torch.Tensor,
        data_src: str = 'data/vgg16_emb',
        missed_out: str = 'data/missed_labels.json',
        test_set: str = None,
    ) -> torch.Tensor:
        """
        Compare the misclassified samples against stored embeddings and dump a
        per-file count to ``missed_out``.

        use numpy mesh grid for all combinations

        Args:
            X_test: samples; the first two dimensions are flattened together
            y_classes: predicted class per sample
            y_true: true class per sample
            data_src: directory holding the embedding files
            missed_out: JSON file the per-file result is written to
            test_set: iterable of embedding file names inside ``data_src``;
                None is treated as empty

        Returns:
            the flattened rows indexed by the mismatch indices
        """
        midx = (y_classes != y_true).nonzero().flatten()
        chunk_size = X_test.shape[1]
        X_test = X_test[midx].flatten(start_dim=0, end_dim=1)
        # logging.debug(f"{len(midx)} - {X_test.shape}")

        if len(midx) > 0 and os.path.isdir(data_src) and len(os.listdir(data_src)) != 0:
            missed_labels = {}
            for emb in test_set or ():
                embedding = torch.from_numpy(
                    np.load(f"{os.path.join(data_src,emb)}"))
                # Pad the shorter of the two with rows of ones so they can be
                # subtracted row by row.
                if embedding.shape[0] < X_test.shape[0]:
                    embedding = torch.cat((
                        embedding,
                        torch.ones((X_test.shape[0]-embedding.shape[0], X_test.shape[-1])
                                   )), dim=0)
                else:
                    X_test = torch.cat((
                        X_test,
                        torch.ones((embedding.shape[0]-X_test.shape[0], embedding.shape[-1])
                                   )), dim=0)

                # Number of rows whose element-wise difference sums to zero.
                idx = torch.logical_not(
                    torch.sum((X_test - embedding), dim=1)).sum().item()
                # logging.debug(f"{emb} - {idx//chunk_size} - {type(idx)}")
                missed_labels[emb] = idx//chunk_size
            with open(f"{missed_out}", "w") as out_file:
                json.dump(missed_labels, out_file)
        # NOTE(review): ``midx`` holds sample indices while ``X_test`` is now
        # flattened (and possibly padded) to rows — confirm this is the
        # intended return value.
        return X_test[midx]

# Main Functions
- Extract VGG16 embeddings
- Train/test split on embedding file names
- PyTorch testing

In [36]:
import os
import cv2
import tqdm
import logging
import random

import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import seaborn as sns

from tensorflow.keras.applications import DenseNet121, mobilenet, vgg16, InceptionResNetV2, InceptionV3

# Local base directory for data; created up front if it does not exist yet.
data_base_dir = "data"
os.makedirs(data_base_dir, exist_ok=True)


def np_embed(
    video_data_dir: str,
    output_dir: str
) -> None:
    """
    Embed every frame of every video in ``video_data_dir`` and save one
    ``.npy`` file per video in ``output_dir``.

    Each frame is resized to 200x200, passed through a VGG16-based
    ``BaseCNN`` extractor and flattened; the per-frame vectors are stacked
    and saved under the video name with the ``.mp4`` part removed. Videos
    whose output file already exists are skipped.

    Args:
        video_data_dir: directory containing the video files
        output_dir: directory the ``.npy`` files are written to (created if missing)
    """
    os.makedirs(output_dir, exist_ok=True)
    feature_extractor = BaseCNN()
    feature_extractor.extractor(vgg16.VGG16)  # mobilenet.MobileNet
    for path in tqdm.tqdm(os.listdir(video_data_dir)):
        real_name = path.split(".mp4")[0]  # mapping[path.split(".mp4")[0]]
        # Check for the name np.save actually writes (<real_name>.npy), so
        # already-processed videos are really skipped.
        if os.path.exists(os.path.join(output_dir, "{}.npy".format(real_name))):
            continue

        vidcap = cv2.VideoCapture(os.path.join(video_data_dir, path))
        embeddings = []
        try:
            success, image = vidcap.read()
            while success:
                embeddings.append(
                    feature_extractor.get_embed_cpu(
                        cv2.resize(image, (200, 200)), batched=False
                    ).flatten())
                success, image = vidcap.read()
        finally:
            # Always give the capture handle back, even if embedding fails.
            vidcap.release()

        embeddings = np.array(embeddings)
        logging.info(f"{path} / {embeddings.shape}")

        np.save(os.path.join(output_dir, real_name), embeddings)

def preprocessing(
    label_path: str,
    data_dir: str,
    cache_path: str,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Split the labelled embedding files into train / validation / test lists.

    The split is cached in ``<cache_path>.npz`` (and written to
    ``<cache_path>.csv`` for inspection); when the cache file exists it is
    returned unchanged. Otherwise 10% of the labelled files are sampled for
    the test list, the hard-coded file names below are added to it when they
    exist in ``data_dir``, the validation list is the same list as the test
    list, and everything else becomes the training list.

    Args:
        label_path: JSON file whose keys are embedding file names without ``.npy``
        data_dir: directory holding the ``.npy`` embedding files
        cache_path: path prefix for the ``.npz`` / ``.csv`` cache files

    Returns:
        ``(train, val, test)`` arrays of embedding file names
    """
    cache_file = "{}.npz".format(cache_path)
    if os.path.exists(cache_file):
        with np.load(cache_file, allow_pickle=True) as cached:
            return tuple(cached[k] for k in cached)

    with open(label_path, "r") as label_file:
        raw_labels = json.load(label_file)

    available = set(os.listdir(data_dir))
    embedding_path_list = sorted(
        x for x in available
        if x.replace(".npy", "") in raw_labels
    )
    # video_0B061FQCB00136_barbet_07-07-2022_00-12-11-280.npy <- file not found, not enough space using my google drive
    false_positives_vid = [
        '17271FQCB00002_video_6.npy',
        'video_0B061FQCB00136_barbet_07-07-2022_00-05-51-678.npy',
        'video_0B061FQCB00136_barbet_07-07-2022_00-12-11-280.npy',
        'video_0B061FQCB00136_barbet_07-21-2022_15-37-32-891.npy',
        'video_0B061FQCB00136_barbet_07-21-2022_14-17-42-501.npy',
        'video_03121JEC200057_sunfish_07-06-2022_23-18-35-286.npy'
    ]
    # Only keep the names that are actually on disk; a missing file would
    # otherwise fail later when the embeddings are loaded.
    missing = [x for x in false_positives_vid if x not in available]
    if missing:
        logging.warning("skipping missing embedding files: %s", missing)
    present_false_positives = [
        x for x in false_positives_vid if x in available]

    # Sorted so the sample is reproducible for a given random seed
    # (set iteration order is not stable between runs).
    candidates = sorted(set(embedding_path_list) - set(false_positives_vid))
    test_size = min(int(len(embedding_path_list)*0.1), len(candidates))
    embedding_list_test = random.sample(candidates, test_size)
    embedding_list_test += present_false_positives
    embedding_list_val = embedding_list_test

    embedding_list_train = sorted(
        set(embedding_path_list) - set(embedding_list_test))

    length = max([len(embedding_list_test), len(
        embedding_list_val), len(embedding_list_train)])
    pd.DataFrame({
        "train": tuple(embedding_list_train) + ("",) * (length - len(embedding_list_train)),
        "val": tuple(embedding_list_val) + ("",) * (length - len(embedding_list_val)),
        "test": tuple(embedding_list_test) + ("",) * (length - len(embedding_list_test))
    }).to_csv("{}.csv".format(cache_path))

    np.savez(cache_file, embedding_list_train,
             embedding_list_val, embedding_list_test)

    # Return the fresh split too, in the same form the cached branch returns.
    return (
        np.array(embedding_list_train),
        np.array(embedding_list_val),
        np.array(embedding_list_test),
    )


def load_metrics(load_path):
    """
    Load the training/validation loss and F1 histories from a saved state dict.

    Args:
        load_path: path of the saved file, or None

    Returns:
        None when ``load_path`` is None, otherwise the tuple
        ``(loss, f1, val_loss, val_f1)`` of numpy arrays read from the keys
        'loss_callback', 'f1_callback', 'val_loss_callback' and
        'val_f1_callback'.
    """
    if load_path is None:
        return

    # The values are converted to numpy below, so always load onto the CPU;
    # this also removes the dependency on a module-level ``device``.
    state_dict = torch.load(load_path, map_location="cpu")
    logging.info(f'Model loaded from <== {load_path}')

    return (
        torch.Tensor(state_dict['loss_callback']).numpy(),
        torch.Tensor(state_dict['f1_callback']).numpy(),
        torch.Tensor(state_dict['val_loss_callback']).numpy(),
        torch.Tensor(state_dict['val_f1_callback']).numpy(),
    )

def torch_testing(
    ds_test: Streamer,
    model: torch.nn.Module,
    objective: Callable = torch.nn.Softmax(dim=1),
    classes: int = 2,
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    save_path: str = 'model0',
) -> None:
    """
    Evaluate ``model`` on ``ds_test`` and write the evaluation plots/report.

    Produces the confusion matrix, per-class ROC and PR curves, the
    loss / F1 history plots (read from ``<save_path>/metrics.pth``) and the
    classification report.

    Args:
        ds_test: iterable yielding ``(x, y)`` tensor batches
        model: network to evaluate; it is moved to ``device``
        objective: applied to the model output to obtain per-class scores
        classes: number of classes
        device: device the batches and the model are moved to
        save_path: directory containing ``metrics.pth``
    """
    logging.getLogger('matplotlib').setLevel(logging.WARNING)
    # The evaluation helpers write into "plots/"; make sure it exists.
    os.makedirs("plots/", exist_ok=True)
    metrics = Evaluation(plots_folder="plots/", classes=classes)

    # The batches are moved to ``device`` below, so the model must live there too.
    model = model.to(device)
    model.eval()

    y_pred, y_true = [], []
    with torch.no_grad():
        for (x0, y0) in ds_test:
            x0, y0 = x0.to(device), y0.to(device)
            output = model(x0)
            y_pred.append(objective(output))
            y_true.append(y0)

    y_pred, y_true = torch.cat(y_pred, dim=0), torch.cat(y_true, dim=0)
    y_classes = torch.topk(y_pred, k=1, dim=1).indices.flatten()
    metrics.cm(y_true.detach(), y_classes.detach())

    y_pred, y_true = (y_pred).cpu().numpy(), y_true.cpu().numpy()
    # One-hot encode the true labels for the per-class curves.
    y_bin = np.zeros((y_true.shape[0], classes))
    idx = np.array([[i] for i in y_true])
    np.put_along_axis(y_bin, idx, 1, axis=1)

    metrics.roc_auc(y_bin, y_pred)
    metrics.pr_curve(y_bin, y_pred)

    loss, f1, val_loss, val_f1 = load_metrics(
        os.path.join(save_path, "metrics.pth"))
    metrics.plot_callback(loss, val_loss, "loss", num=43)
    metrics.plot_callback(f1, val_f1, "f1", num=42)
    metrics.report(y_true, y_classes.cpu().numpy())

# Script


In [37]:
# Locations (under /content/drive) of the label file, the stored embeddings,
# the raw videos, the split cache prefix and the trained-model folder.
label_path = "/content/drive/MyDrive/google_cv/flicker_detection/plots/new_label.json"
data_path = "/content/drive/MyDrive/google_cv/flicker_detection/plots/vgg16_emb"
videos_path = "/content/drive/MyDrive/google_cv/Flicker_Videos/0824"
cache_path = "/content/drive/MyDrive/google_cv/flicker_detection/plots/.cache"
model_path = "/content/drive/MyDrive/google_cv/flicker_detection/plots/model0/"

# Embedding extraction is disabled here; uncomment to (re)generate the .npy files.
# np_embed(
#     videos_path,
#     data_path,
# )

# Build the train/val/test split of embedding file names, or reuse the cached one.
preprocessing(
    label_path,
    data_path,
    cache_path,
)

    

(array(['04011FDD4000FC_video_0_7b1ffc26-10f0-4678-a72b-d6f88692e900.npy',
        '07141FDD40022L_video_0_b563013b-832f-4d3b-85a9-95b221c88454.npy',
        '95281FFBA0006N_video_0_f589f9bd-9f38-4c83-8917-9081ae176447_2.npy',
        '06051FDD4000K6_video_0_b563013b-832f-4d3b-85a9-95b221c88454.npy',
        '04021FDD40000Q_video_0_0122ff53-396c-415d-8ef7-c07234f21572.npy',
        '04021FDD40000Q_video_0_17e3ff08-9da2-4b2e-af05-3723238a870f.npy',
        '95281FFBA0006N_video_0_56140057-ae22-4822-8e72-f20a9862a98c.npy',
        '00_flicker_issue_00_00_32 - 00_00_33_606ae6bf-cd77-4792-9cf6-82277c3b1416.npy',
        '0A271FQCB00132_video_0_a01b478b-fb37-4283-8339-0ab71b2f321c.npy',
        '92SBA06321_video_0_b9b480c1-7c0d-41c2-a16b-7a00ecae70cb.npy',
        '00_flicker_issues_00_00_28 - 00_00_29_f589f9bd-9f38-4c83-8917-9081ae176447.npy',
        '06041FQCB00116_video_0_ec94c3fc-240e-4b7b-b2b4-3e6b70e859d4.npy',
        '07141FDD40022L_video_2_a79d380c-9352-42cd-9f7e-ca08dda19e87.npy'

In [38]:
# Reload the cached split written by ``preprocessing``.
__cache__ = np.load("{}.npz".format(cache_path), allow_pickle=True)

embedding_list_train, embedding_list_val, embedding_list_test = tuple(
    __cache__[lst] for lst in __cache__)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Data / model hyper-parameters; they must match the saved checkpoint.
chunk_size = 30
batch_size = 1024
input_dim = 18432
output_dim = 2
hidden_dim = 64 # pretrained model hidden -> 128
layer_dim = 1
bidirectional = True
multiclass = False
overlap_chunking = True

ds_test = Streamer(
        embedding_list_test,
        label_path,
        data_path,
        mem_split=1,
        chunk_size=chunk_size,
        batch_size=batch_size,
        sampler=None,
        overlap_chunking=overlap_chunking,
    )

model = LSTM(
        input_dim=input_dim,
        output_dim=output_dim,
        hidden_dim=hidden_dim,
        layer_dim=layer_dim,
        bidirectional=bidirectional,
    )
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only host.
dic = torch.load(os.path.join(model_path, 'model.pth'),
                 map_location=device)['model_state_dict']
# Remove the "module." part from the parameter names so they match this model
# (presumably left by a DataParallel-style wrapper — confirm against the training code).
for key in dict(dic).keys():
    dic[key.replace("module.","")] = dic.pop(key)

print(dic.keys())
print(model.eval())

model.load_state_dict(dic)
torch_testing(ds_test, model,
                    device=device, classes=2, save_path=model_path)

odict_keys(['lstm.weight_ih_l0', 'lstm.weight_hh_l0', 'lstm.bias_ih_l0', 'lstm.bias_hh_l0', 'lstm.weight_ih_l0_reverse', 'lstm.weight_hh_l0_reverse', 'lstm.bias_ih_l0_reverse', 'lstm.bias_hh_l0_reverse', 'fc1.weight', 'fc1.bias', 'fc2.weight', 'fc2.bias'])
LSTM(
  (lstm): LSTM(18432, 64, batch_first=True, bidirectional=True)
  (fc1): Linear(in_features=128, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=2, bias=True)
)


FileNotFoundError: ignored