[Reference](https://unhypedai.medium.com/control-charts-for-anomaly-detection-75b615ab8f9b)

# Data Preparation


In [None]:
from os import listdir
from os.path import join, isdir
import numpy as np
from PIL import Image

class Config:
  DATASET_PATH ="UCSD_Anomaly_Dataset.v1p2/UCSDped1/Train"
  SINGLE_TEST_PATH = "UCSD_Anomaly_Dataset.v1p2/UCSDped1/Test/Test001"
  SEQUENCE_SIZE = 10
  NO_OF_STRIDES = 3
  INPUT_SIZE = 256
  BATCH_SIZE = 4
  EPOCHS = 3
  MODEL_PATH = "model.hdf5"

def get_strided_frames(stride, frames_list):
    """ For data augmenting purposes.
    Parameters
    ----------
    stride : int
        The stride between two consecutive frames
    frames_list : list
        A list of sorted frames in a folder each of shape 256 X 256
    Returns
    -------
    list
        A list of clips , each of desired sequence size length
    """
    strided_frames = []
    clip = np.zeros(shape=(Config.SEQUENCE_SIZE, Config.INPUT_SIZE, Config.INPUT_SIZE, 1))
    count = 0
    for start in range(Config.NO_OF_STRIDES):
        for i in range(start, len(frames_list), Config.NO_OF_STRIDES):
            clip[count, :, :, 0] = frames_list[i]
            count = count + 1
            if count == Config.SEQUENCE_SIZE:
                strided_frames.append(np.copy(clip))
                count = 0
    return strided_frames


def get_training_data():
    """ For getting list of all training data
    Returns
    -------
    list
        A list of training sequences of shape (NUMBER_OF_SEQUENCES,SINGLE_SEQUENCE_SIZE,FRAME_WIDTH,FRAME_HEIGHT,1) 
        Used for training the model
    """
    training_data = []
    # loop over the training folders (Train000,Train001,..)
    for folder_name in sorted(listdir(Config.DATASET_PATH)):
        folder_path = join(Config.DATASET_PATH, folder_name)
        if isdir(folder_path):
            frames_list = []
            # loop over all the images in the folder (0.tif,1.tif,..,199.tif)
            for file_name in sorted(listdir(folder_path)):
                file_path = join(folder_path, file_name)
                if str(file_path)[-3:] == "tif":
                    # read image and resize it to desired input size
                    img = Image.open(file_path).resize((Config.INPUT_SIZE, Config.INPUT_SIZE))
                    # normalization
                    img = np.array(img, dtype=np.float32) / 256.0
                    frames_list.append(img)
            # get the sequences from the list of images after applying data augmentation
            for stride in range(1, Config.NO_OF_STRIDES):
                training_data.extend(get_strided_frames(stride=stride, frames_list=frames_list))
    return training_data

def get_training_data_subset():
    """ For getting list of all training data
    Returns
    -------
    list
        A list of training sequences of shape (NUMBER_OF_SEQUENCES,SINGLE_SEQUENCE_SIZE,FRAME_WIDTH,FRAME_HEIGHT,1) 
        Used for finding Center, Upper limit and Lower limit
    """
    training_data = []
    # loop over the training folders (Train000,Train001,..)
    for folder_name in sorted(listdir(Config.DATASET_PATH)):
        folder_path = join(Config.DATASET_PATH, folder_name)
        if isdir(folder_path):
            frames_list = []
            # loop over all the images in the folder (0.tif,1.tif,..,199.tif)
            for file_name in sorted(listdir(folder_path)):
                file_path = join(folder_path, file_name)
                if str(file_path)[-3:] == "tif":
                    # read image and resize it to desired input size
                    img = Image.open(file_path).resize((Config.INPUT_SIZE, Config.INPUT_SIZE))
                    # normalization
                    img = np.array(img, dtype=np.float32) / 256.0
                    frames_list.append(img)
            # get the sequences from the list of images after applying data augmentation
            for stride in range(1, Config.NO_OF_STRIDES):
                training_data.extend(get_strided_frames(stride=stride, frames_list=frames_list))
                if len(training_data)==200:
                  break
    return training_data

# Training the model


In [None]:
# import tensorflow.keras
# from tensorflow.keras.layers import Conv2DTranspose, ConvLSTM2D, BatchNormalization, TimeDistributed, Conv2D
# from tensorflow.keras.models import Sequential, load_model
# from keras_layer_normalization import LayerNormalization

# def get_model(reload_model=True):
#     """
#     Parameters
#     ----------
#     reload_model : bool
#         Load saved model or retrain it
#     """
#     if not reload_model:
#         return load_model(Config.MODEL_PATH,custom_objects={'LayerNormalization': LayerNormalization})
#     training_set = get_training_set()
#     training_set = np.array(training_set)
#     seq = Sequential()
#     # # # # # Encoder
#     seq.add(TimeDistributed(Conv2D(128, (11, 11), strides=4, padding="same"), batch_input_shape=(None, 10, 256, 256, 1)))
#     seq.add(LayerNormalization())
#     seq.add(TimeDistributed(Conv2D(64, (5, 5), strides=2, padding="same")))
#     seq.add(LayerNormalization())
#     # # # # # Bottleneck
#     seq.add(ConvLSTM2D(64, (3, 3), padding="same", return_sequences=True))
#     seq.add(LayerNormalization())
#     seq.add(ConvLSTM2D(32, (3, 3), padding="same", return_sequences=True))
#     seq.add(LayerNormalization())
#     seq.add(ConvLSTM2D(64, (3, 3), padding="same", return_sequences=True))
#     seq.add(LayerNormalization())
#     # # # # # Decoder
#     seq.add(TimeDistributed(Conv2DTranspose(64, (5, 5), strides=2, padding="same")))
#     seq.add(LayerNormalization())
#     seq.add(TimeDistributed(Conv2DTranspose(128, (11, 11), strides=4, padding="same")))
#     seq.add(LayerNormalization())
#     seq.add(TimeDistributed(Conv2D(1, (11, 11), activation="sigmoid", padding="same")))
#     print(seq.summary())
#     seq.compile(loss='mse', optimizer=tensorflow.keras.optimizers.Adam(lr=1e-4, decay=1e-5, epsilon=1e-6))
#     seq.fit(training_set, training_set,
#             batch_size=Config.BATCH_SIZE, epochs=Config.EPOCHS, shuffle=False)
#     seq.save(Config.MODEL_PATH)
    return seq

# Determining Control Line, Lower Limit Line, and Upper Limit Line


In [None]:
model=get_model(False)
training_set = get_training_data_subset()
training_set = np.array(training_set)
training_set = training_set.reshape(-1,10,256,256,1)

data_points = []
for train_data in tqdm(training_set):
    train_data = train_data.reshape(-1,10,256,256,1)
    reconstructed_data = model.predict(train_data)
    sequences_reconstruction_cost = np.array(np.average(np.subtract(train_data, reconstructed_data)))
    data_points.append(sequences_reconstruction_cost)

center_line = np.mean(data_points)
print('Center Line is: ' + str(center_line))
sd = np.std(data_points_line)
print('Standard deviation is: ' + str(sd))
lim_upper = center_line + sd
print('Upper control limit is: ' + str(lim_upper))
lim_lower = center_line - sd
print ('Lower control limit is: ' + str(lim_lower))

# Detecting Anomaly with Control charts


In [None]:
import matplotlib.pyplot as plt

def get_single_test():
    sz = 200
    test = np.zeros(shape=(sz, 256, 256, 1))
    cnt = 0
    for f in sorted(listdir(Config.SINGLE_TEST_PATH)):
        if str(join(Config.SINGLE_TEST_PATH, f))[-3:] == "tif":
            img = Image.open(join(Config.SINGLE_TEST_PATH, f)).resize((256, 256))
            img = np.array(img, dtype=np.float32) / 256.0
            test[cnt, :, :, 0] = img
            cnt = cnt + 1
    return test

test = get_single_test()
sz = test.shape[0] - 10 + 1
sequences = np.zeros((sz, 10, 256, 256, 1))
# apply the sliding window technique to get the sequences
for i in range(0, sz):
    clip = np.zeros((10, 256, 256, 1))
    for j in range(0, 10):
        clip[j] = test[i + j, :, :, :]
    sequences[i] = clip

reconstructed_sequences = model.predict(sequences,batch_size=4)
sequences_reconstruction_cost_res = np.array([np.average(np.subtract(sequences[i],reconstructed_sequences[i])) for i in range(0,sz)])


plt.plot(sequences_reconstruction_cost_res)
plt.hlines(y=center_avg, xmin=0, xmax=200, colors='red', linestyles='-', lw=2, label='Center Line')
plt.hlines(y=lim_lower_avg, xmin=0, xmax=200, colors='aqua', linestyles='-', lw=2, label='Lower Limit Line')
plt.hlines(y=lim_upper_avg, xmin=0, xmax=200, colors='aqua', linestyles='-', lw=2, label='Upper Limit Line')
plt.ylabel('Reconstruction normality')
plt.xlabel('frame t')
plt.title('Anomaly Detection with Control Chart')
plt.show()