# Introduction

This notebook is an easy guide to training and testing anomaly detection models, based on the MVTec-Anomaly-Detection project: https://github.com/AdneneBoumessouer/MVTec-Anomaly-Detection.

In [None]:
!pip3 install ktrain

### Link Google Drive

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [6]:
# Switch to the project folder on the mounted Drive; the relative paths used by
# later cells (train.py, data/..., saved_models/...) are resolved from here.
%cd /content/drive/MyDrive/MVTec-Anomaly-Detection-For-Industry

/content/drive/MyDrive/MVTec-Anomaly-Detection-For-Industry


# Dataset

### Directory Structure using your own dataset

To train with your own dataset, you need to have a comparable directory structure. For example:

In [7]:
#data
  #├── class1
  # │   ├── test
  # │   │   ├── good
  # │   │   ├── defect
  # │   └── train
  # │       └── good
  # ├── class2
  # │   ├── test
  # │   │   ├── good
  # │   │   ├── defect
  # │   └── train
  # │       └── good
  # ...


# Training

In [None]:
# During training, the CAE trains exclusively on defect-free images and learns to reconstruct (predict) defect-free training samples.

# usage: train.py [-h] -d [-a] [-c] [-l] [-b] [-i]

# optional arguments:

# -h, --help show this help message and exit

# -d , --input-dir directory containing training images

# -a , --architecture architecture of the model to use for training: 'mvtecCAE', 'baselineCAE', 'inceptionCAE' or 'resnetCAE'

# -c , --color color mode for preprocessing images before training: 'rgb' or 'grayscale'

# -l , --loss loss function to use for training: 'mssim', 'ssim' or 'l2'

# -b , --batch batch size to use for training

# -i, --inspect generate inspection plots after training

# Example usage:

# python3 train.py -d mvtec/capsule -a mvtecCAE -b 8 -l ssim -c grayscale


In [None]:
# Train the baselineCAE architecture on the images under data/can with batch
# size 8, the SSIM loss and grayscale preprocessing (flags are described in the
# usage notes of the previous cell).
!python3 train.py -d data/can -a baselineCAE -b 8 -l ssim -c grayscale

# Finetune

In [None]:
# This script uses a subset of defect-free training images and a subset of both defective and defect-free test images to determine good values for the minimum defect area and the threshold, the pair of parameters that will be used during testing for classification and segmentation.

# usage: finetune.py [-h] -p [-m] [-t]

# optional arguments: -h, --help show this help message and exit

# -p , --path path to saved model

# -m , --method method for generating resmaps: 'ssim' or 'l2'

# -t , --dtype datatype for processing resmaps: 'float64' or 'uint8'

In [None]:
# `path/to/saved/model` is a placeholder: replace it with the path of the saved
# model before running. Residual maps are generated with the 'ssim' method and
# processed as float64.
!python3 finetune.py -p path/to/saved/model -m ssim -t float64

# Testing

In [None]:
# This script classifies test images using the minimum defect area and threshold previously approximated at the finetuning step.

# usage: test.py [-h] -p [-s]

# optional arguments: -h, --help show this help message and exit

# -p , --path path to saved model

# -s, --save save segmented images

In [None]:
# `path/to/model` is a placeholder: replace it with the path of the saved model.
# Add -s to also save the segmented images.
!python3 test.py -p path/to/model

# Inference

In [7]:
import argparse
import json
import logging
import os
import shlex
import sys
import time
from pathlib import Path

import numpy as np
import pandas as pd
import tensorflow as tf
from matplotlib import pyplot as plt
from skimage.util import img_as_ubyte
from sklearn.metrics import confusion_matrix

try:
    # scikit-image >= 0.16 provides SSIM under skimage.metrics; the old
    # skimage.measure.compare_ssim alias was removed in 0.18.
    from skimage.metrics import structural_similarity as compare_ssim
except ImportError:
    from skimage.measure import compare_ssim

from processing import postprocessing
from processing import utils
from processing.postprocessing import label_images
from processing.preprocessing import Preprocessor
from processing.preprocessing import get_preprocessing_function
from processing.utils import printProgressBar

In [8]:
# Logger for this notebook. basicConfig sets the root logger to INFO; per the
# logging docs it has no effect when the root logger already has handlers.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

In [9]:

def get_true_classes(filenames):
    """Derive ground-truth labels from image paths.

    A path is labelled 0 (defect-free) when one of its "/"-separated
    components is exactly "good"; every other path is labelled 1 (defective).

    Args:
        filenames: iterable of path strings.

    Returns:
        List of 0/1 labels, in the same order as ``filenames``.
    """
    labels = []
    for path in filenames:
        components = path.split("/")
        labels.append(0 if "good" in components else 1)
    return labels


def is_defective(areas, min_area):
    """Return 1 when at least one connected-component area is >= ``min_area``.

    Args:
        areas: areas of the connected components of one image.
        min_area: smallest area that counts as a defect.

    Returns:
        1 if any area reaches ``min_area``, otherwise 0 (also for no areas).
    """
    reaches_minimum = np.asarray(areas) >= min_area
    return 1 if reaches_minimum.any() else 0


def predict_classes(resmaps, min_area, threshold):
    """Classify each residual map as defective (1) or defect-free (0).

    The maps are binarised with ``resmaps > threshold`` and handed to
    ``label_images``; its second return value is used as the per-image
    collection of connected-component areas. An image is defective when
    ``is_defective`` finds an area of at least ``min_area``.

    Returns:
        List with one 0/1 prediction per entry returned by ``label_images``.
    """
    binary_maps = resmaps > threshold
    _, areas_per_image = label_images(binary_maps)
    predictions = []
    for component_areas in areas_per_image:
        predictions.append(is_defective(component_areas, min_area))
    return predictions


def save_segmented_images(resmaps, threshold, filenames, save_dir):
    """Threshold residual maps and save them as gray-scale images.

    Every map in ``resmaps`` is binarised with ``resmaps > threshold`` and
    written to ``<save_dir>/segmentation``. The file name of the i-th map is
    produced by ``utils.generate_new_name(filenames[i], suffix="seg")``.

    Args:
        resmaps: array of residual maps, iterated along its first axis.
        threshold: value above which a pixel is marked in the saved image.
        filenames: sequence indexed in parallel with ``resmaps``.
        save_dir: directory under which the "segmentation" folder is created.

    Returns:
        None.
    """
    # threshold residual maps with the given threshold
    resmaps_th = resmaps > threshold
    # exist_ok=True creates the directory in one step instead of the racy
    # isdir() check followed by makedirs()
    seg_dir = os.path.join(save_dir, "segmentation")
    os.makedirs(seg_dir, exist_ok=True)
    # save segmented resmaps
    for i, resmap_th in enumerate(resmaps_th):
        fname = utils.generate_new_name(filenames[i], suffix="seg")
        fpath = os.path.join(seg_dir, fname)
        plt.imsave(fpath, resmap_th, cmap="gray")



In [11]:

def _show_image(image, title):
    """Display ``image`` in gray-scale with ``title`` above it.

    Singleton axes are squeezed away first, so the helper is not tied to one
    fixed resolution.
    """
    fig, ax = plt.subplots()
    ax.imshow(np.squeeze(image), cmap="gray")
    ax.set_title(title)
    plt.show()


def main(args, residual_threshold=0.5):
    """Run single-image inference with a saved model and plot the results.

    Loads the model at ``args.path`` together with its stored preprocessing
    settings, reconstructs the image at ``args.img_pth`` and shows, in order:
    the preprocessed input, the reconstruction, the thresholded absolute
    residual and the SSIM map between reconstruction and input. The elapsed
    prediction time and the SSIM score are printed.

    Classification of a whole test set is done by test.py (see the Testing
    section of this notebook); this function only inspects one image.

    Args:
        args: namespace providing ``path`` (saved model file) and ``img_pth``
            (image to inspect).
        residual_threshold: absolute reconstruction error above which a pixel
            is marked in the binary residual plot. Defaults to 0.5, the value
            that used to be hard-coded.

    Returns:
        None.
    """
    # ============= LOAD MODEL AND PREPROCESSING CONFIGURATION ================

    model, info, _ = utils.load_model_HDF5(args.path)
    preprocessing_info = info["preprocessing"]

    # get the preprocessing function matching the stored architecture
    preprocessing_function = get_preprocessing_function(info["model"]["architecture"])

    preprocessor = Preprocessor(
        input_directory=info["data"]["input_directory"],
        rescale=preprocessing_info["rescale"],
        shape=preprocessing_info["shape"],
        color_mode=preprocessing_info["color_mode"],
        preprocessing_function=preprocessing_function,
    )

    # ========================= PREDICT ON TEST IMAGE =========================

    # presumably a single-image batch as expected by model.predict — TODO confirm
    img = preprocessor.get_test_image(args.img_pth)
    _show_image(img, "Preprocessed input")

    start_time = time.time()
    img_pred = model.predict(img)
    # absolute per-pixel reconstruction error
    residual = np.abs(img_pred - img)
    print("--- %s seconds ---" % (time.time() - start_time))

    _show_image(img_pred, "Reconstruction")

    residual_mask = np.where(residual > residual_threshold, 1, 0)
    _show_image(residual_mask, "Residual > {}".format(residual_threshold))

    # ================================ SSIM ===================================

    # Both images are scaled by 255 below, so the data range is stated
    # explicitly instead of relying on the dtype-based default for floats.
    # NOTE(review): assumes preprocessed pixel values lie in [0, 1] and that
    # the image is single-channel — confirm for the model in use.
    score, ssim_map = compare_ssim(
        np.squeeze(img_pred) * 255,
        np.squeeze(img) * 255,
        data_range=255,
        full=True,
    )
    print(score)
    _show_image(ssim_map * 255, "SSIM map")





In [None]:
if __name__ == "__main__":
    # Build the same command-line interface a script would expose.
    parser = argparse.ArgumentParser()

    # No required=True here: argparse ignores `default` on a required option,
    # so the two settings contradicted each other.
    parser.add_argument(
        "-p",
        "--path",
        type=str,
        default="saved_models/ad_data/can/baselineCAE/ssim/06-05-2021_02-02-52/baselineCAE_b2_e100.hdf5",
        metavar="",
        help="path to saved model",
    )
    parser.add_argument(
        "-i",
        "--img_pth",
        type=str,
        default="ad_data/can/test/defect/1665.jpg",
        help="path to the test image to run inference on",
    )

    # A notebook has no real command line, so the arguments are written as a
    # string and tokenised with shlex before being handed to argparse.
    notebook_args = """
    --path "saved_models/data/can/baselineCAE/ssim/03-07-2021_09-02-52/baselineCAE_b8_e12.hdf5"
    --img_pth "data/can/test/defect/1739.jpg"
    """
    args = parser.parse_args(shlex.split(notebook_args))

    main(args)
