# Few-shot Anomaly Detection

## Some short descriptions:


- The test dataset should be prepared as follows (similar to the dataset used in training):
 
 - test_dataset (a folder that contains the test videos in the form of frame images)
   - 0 (folder 0 represents a test scenario, one video under this folder would be enough; however, if you want to have more test videos under this folder, **these videos must be captured from the same camera view and it should not be a mixture of different scenarios**)
     - video_frames (a folder that contains the frame images)
   
   ...
   
- The finetuning process: 

 - a 3-frame video sequence is passed into the pre-trained model for finetuning,
 
 - the finetuned model is saved into the `model` folder,
 
 - and after that the rest frames are passed into the finetuned model for frame prediction and anomaly scoring.

- Each input is a 4-frame video sequence in the form of frame images, and the first 3 frames are used for the prediction of the 4-th frame.

- The predicted frame is compared with the actual frame, and if the difference between the predicted frame and the actual frame is greater than a threshold (currently we use 0.6 at this stage), show this frame.


## Suggestions for improvements

The ways to improve the performance of our model are:

- The training dataset is better to have more scenarios;

- the current dataset is suffering from the lower resolution.

### -- Load necessary packages

In [1]:
from __future__ import print_function
import matplotlib.pyplot as plt
import argparse
import torch
import torch.utils.data
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import Dataset,DataLoader
from torchvision import datasets, transforms, models
from tqdm import tqdm
from PIL import Image
import numpy as np
import ast
from torch.nn import functional as F
import os
import random
import torch.utils.data
import torchvision.utils as vutils
import torch.backends.cudnn as cudn
from torch.nn import functional as F
from unet_parts import *
# from scipy.misc import imsave
import torch.nn as nn
import ast
import sys
import imageio
# from skimage import img_as_ubyte
from sklearn.metrics import roc_curve, auc

import cv2

from rGAN import Generator, Discriminator
from dataset import TrainingDataset
from utils import createEpochData, roll_axis, loss_function, create_folder, prep_data, createEpochDataTest

# load functions from the training script for the finetuning of the model
from train import Load_Dataloader, overall_generator_pass, overall_discriminator_pass, meta_update_model
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"

In [2]:
def overall_generator_pass_test(generator, discriminator, img, gt, valid):
    # print(len(img), gt.shape)
    recon_batch = generator(img)
    recon_batch = (recon_batch-recon_batch.min()) / (recon_batch.max() - recon_batch.min())
    gt = (gt-gt.min()) / (gt.max()-gt.min())
    msssim, f1, psnr = loss_function(recon_batch, gt)
    # print(msssim, f1, psnr)
    
    imgs = recon_batch.data.cpu().numpy()[0, :]
    imgs = roll_axis(imgs)
    
    return imgs, psnr

### -- Visualization functions for ground truth and predicted frame images

We first visualize the first 3 frame video sequence (for the videos used in finetuning and testing/validation stage).

In [3]:
%matplotlib inline
import matplotlib.pyplot as plt

def frame_visualization(img):
    for frame in range(len(img)):
        one_img = np.squeeze(img[frame])
        one_img = one_img.cpu()
        one_img = np.transpose(one_img, (1, 2, 0))
        plt.imshow(one_img)
        plt.axis('off')
        plt.show()
        
def pred_frame_visualization(img):
    plt.imshow(img)
    plt.axis('off')
    plt.show()

### -- Main test functions (including finetuning and validation/testing)
- **Before running the following codes, you should update the `frame_path` to the test dataset you prepared as I mentioned in the beginning of this jupyter notebook.**
- the threshold value for defining the anomaly is set as 0.85 at this stage for testing purposes, this value can be set to 0.9, 1.0, etc.
- we use K-shot = 10 to test 10 videos, to test 100 videos just change K-shot = 100 in main function.

In [13]:
"""TEST SCRIPT"""


num_tasks = 1
adam_betas = (0.5, 0.999)
gen_lr = 2e-4
dis_lr = 1e-5
model_folder_path = "model"
torch.manual_seed(1)
batch_size = 1
    
generator = Generator(batch_size=batch_size) 
discriminator = Discriminator()
generator.cuda()
discriminator.cuda()


k_shots = 1000

# define dataloader
tf = transforms.Compose([transforms.Resize((256,256)),transforms.ToTensor()])

create_folder(model_folder_path)
generator_path = os.path.join(model_folder_path, str.format("Generator_finetuned_liyun1500_shtech.pt"))
discriminator_path = os.path.join(model_folder_path, str.format("Discriminator_finetuned_liyun1500_shtech.pt"))

# load the pre-trained model
print('- start loading pre-trained model')

generator.load_state_dict(torch.load(generator_path))
discriminator.load_state_dict(torch.load(discriminator_path))
# if you use CPU
#     generator.load_state_dict(torch.load(generator_path, map_location=torch.device('cpu')))
#     discriminator.load_state_dict(torch.load(discriminator_path, map_location=torch.device('cpu')))
    
print('- loading pretrained model done')
frame_path = r'C:\Users\liyun\Desktop\testing\frames'
dirs = os.listdir(frame_path)
dirs = [d for d in dirs if d != '.DS_Store']  # Exclude '.DS_Store'
dirs.sort(key=int)
print(dirs)
auc_list = []
    
# the test dataloader
train_path_list = createEpochDataTest(frame_path, num_tasks, 10)
print(train_path_list)
train_dataloader = Load_Dataloader(train_path_list, tf, batch_size)

    
# Meta-Validation
print ('\n Meta Validation/Test \n')

# forward pass
    
for i, epoch_of_tasks in enumerate(train_dataloader):
    epoch_results = 'results'# .format(epoch+1)
    create_folder(epoch_results)
#     print(epoch_of_tasks)
        
    for tidx, task in enumerate(epoch_of_tasks):
        
        print(os.path.join(frame_path, dirs[tidx]))
        
        frame_mask_path = r'C:\Users\liyun\Desktop\testing\test_frame_mask'
        npy_path = os.path.join(frame_mask_path, dirs[tidx])
        loaded_array = np.load(npy_path + '.npy')
        
        s_list = []

        with torch.no_grad():
            generator.eval()
            discriminator.eval()
                
            # Meta-Validation
            print ('\n Meta Validation/Test \n')

            for vidx, val_frame_sequence in tqdm(enumerate(task[-k_shots:]), total = len(task[-k_shots:])):
                # print(vidx)
#                 print(val_frame_sequence)
                if vidx == 0:
                    dummy_frame_sequence = val_frame_sequence
                        
                if 1: # vidx % 2 == 0:
                    img = val_frame_sequence[0]

                    # frame_visualization(img)
                    gt = val_frame_sequence[1]
                        
                    img, gt, valid, fake = prep_data(img, gt)

                    # k-Validation Generator
                    imgs, psnr = overall_generator_pass_test(generator, discriminator, img, gt, valid)
                    img_path = os.path.join(epoch_results,'{}-fig-val{}.png'.format(tidx+1, vidx+1))
                    # imsave(img_path , imgs)

                    # imgs = imgs.astype(np.uint8)
                    # imgs = (imgs-np.min(imgs))/(np.max(imgs) - np.min(imgs))
                    imgs = cv2.normalize(imgs, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U)
                    # imageio.imwrite(img_path , img_as_ubyte(imgs))
                        
                    s_list.append(psnr)
#                     print('Frame No.: ', vidx, ' ...')



        normalized_psnr_list = []
        y_score = np.array(s_list)
        y_gt = loaded_array[3:]
        print(y_gt)

        for psnr in y_score:
            normalized_psnr = 1 - (psnr - np.min(y_score)) / (np.max(y_score) - np.min(y_score))
            normalized_psnr_list.append(normalized_psnr)
        y_psnr_scores = np.array(normalized_psnr_list)
#         print(normalized_psnr_list)
        fpr, tpr, thresholds = roc_curve(y_gt, y_psnr_scores)
#         print(thresholds)

        roc_auc = auc(fpr, tpr)
        auc_list.append(roc_auc)

        print("AUC:", roc_auc)

        plt.figure(figsize=(8, 8))
        plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = {:.2f})'.format(roc_auc))
        plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
        plt.xlabel('False Positive Rate (FPR)')
        plt.ylabel('True Positive Rate (TPR)')
        plt.title('Receiver Operating Characteristic (ROC) Curve')
        plt.legend(loc='lower right')
        plt.show()
    
    
    

        x_values = range(len(y_psnr_scores))

        plt.plot(x_values, y_psnr_scores, marker='o', linestyle='-')

        plt.title('The line chart')
        plt.xlabel('i-th frame')
        plt.ylabel('anomaly score')
        # plt.axhline(y=0.98, color='r', linestyle='--', label='y=0.98')
        plt.show()
        print(auc_list)

print(auc_list)

- start loading pre-trained model
- loading pretrained model done
['01_0141', '01_0162', '01_0163', '01_0177', '02_0128', '02_0161', '02_0164', '03_0031', '03_0032', '03_0033', '03_0035', '03_0036', '03_0039', '03_0041', '03_0059', '03_0060', '03_0061', '04_0001', '04_0003', '04_0004', '04_0010', '04_0011', '04_0012', '04_0013', '04_0046', '04_0050', '05_0017', '05_0018', '05_0019', '05_0020', '05_0021', '05_0022', '05_0023', '05_0024', '06_0144', '06_0145', '06_0147', '06_0150', '06_0153', '06_0155', '07_0005', '07_0006', '07_0007', '07_0008', '07_0009', '07_0047', '07_0048', '07_0049', '08_0044', '08_0058', '08_0077', '08_0078', '08_0079', '08_0080', '08_0156', '08_0157', '08_0158', '08_0159', '08_0178', '08_0179', '09_0057', '10_0037', '10_0038', '10_0042', '10_0074', '10_0075', '11_0176', '12_0142', '12_0143', '12_0148', '12_0149', '12_0151', '12_0152', '12_0154', '12_0173', '12_0174', '12_0175']
----------- selected videos:  ['C:\\Users\\liyun\\Desktop\\testing\\frames\\01_0141']


100%|██████████████████████████████████████████████████████████████████████████████████| 20/20 [00:10<00:00,  1.98it/s]


[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0]


ValueError: Found input variables with inconsistent numbers of samples: [310, 20]

In [None]:
a = [0.8864601018675722, 0.7176496478873239, 0.8544747433636323, 0.6998158379373849, 0.3807155322862129, 0.7834581569521328, 0.8775240384615385, 0.9263285566686134, 0.24946601941747573]
round1 = [0.8392756083757782, 0.649452269170579, 0.8593587482476371, 0.6822702159718734, 0.36906631762652703, 0.837959947899707, 0.8686698717948719, 0.9246664570324784, 0.2219660194174757, 0.6522238163558105]
round2 = [0.80398991057097, 0.9644807150490624, 0.6899177729464492, 0.5870643642072213, 0.35214549587951116, 0.9266185347378388, 0.8172229725384269, 0.8355593124823893, 0.8765978367748279, 0.9936588921282798]
round3 = [0.9011524822695036, 0.8963072055855561, 0.933464696223317, 0.8169014084507042, 0.686618863761721, 0.9551117938214713, 0.7301129140806102, 0.5254385964912281, 0.9528343086732466, 0.9570776255707762]
round4 = [0.8089291680588039, 0.8445885509838998, 0.8303990610328639, 0.8926791677369638, //
          0.5881070249882132, 0.23456937799043062, 0.39896616541353386, 0.3704577968526466, 0.5126456876456876, 0.34292084726867333]

mean_a = sum(a) / len(a)
mean_b = sum(round3) / len(round3)

print("Mean of list a:", mean_a)
print("Mean of list b:", mean_b)