In [None]:
!nvidia-smi

In [None]:
!git clone https://github.com/KoryakovDmitry/3D-printing-defect-cls.git

In [None]:
# !pip3 install -r /content/3D-printing-defect-cls/requirements_cpu.txt > /dev/null
!pip3 install -r /content/3D-printing-defect-cls/requirements_gpu.txt > /dev/null

In [None]:
# Download datasets

# DS_test_bin_flipped_replaced.zip
!gdown --id 1JGWubiqQ0XTvTMX9kjXkCcmQbdkMm6LV

# DS_synth_bin.zip
!gdown --id 1HCczXj1-EDU2jm81H0m-TAoVwjWAGlxf

# defects_not_augmented_dataset.zip
!gdown --id 18_NKHORM69052--NmS-RSlHo6FJJHiTm

In [None]:
# unpack images

!unzip /content/DS_synth_bin.zip -d /content/3D-printing-defect-cls/data > /dev/null
!unzip /content/DS_test_bin_flipped_replaced.zip -d /content/3D-printing-defect-cls/data  > /dev/null
!unzip /content/defects_not_augmented_dataset.zip -d /content/3D-printing-defect-cls/data > /dev/null

In [None]:
import os
import cv2
import random
import os.path as osp

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader, ConcatDataset

import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import seaborn as sns

import copy
import shutil
from torch.utils.tensorboard import SummaryWriter
from sklearn.metrics import f1_score, precision_score, recall_score
from tqdm import tqdm_notebook
from sklearn.exceptions import DataConversionWarning
from sklearn.model_selection import train_test_split
from glob import glob
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
from tqdm import tqdm_notebook
from copy import deepcopy

import warnings
warnings.filterwarnings(action='ignore', category=DataConversionWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

def seed_everything(seed: int):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices),
    exports ``PYTHONHASHSEED`` and switches cuDNN to deterministic mode.

    Args:
        seed: Value used for every generator.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Safe to call without a GPU; covers multi-GPU runtimes as well.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN auto-tuner may pick a different kernel on every run, so it has
    # to be off for runs to be repeatable.
    torch.backends.cudnn.benchmark = False
    
seed_everything(42)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
def list_files(startpath):
    """Print the directory tree under ``startpath`` with per-directory file counts.

    The name of ``startpath`` is printed first, then every nested directory,
    indented four spaces per level, followed by ``count: N`` when it directly
    contains files. Files lying directly in ``startpath`` are not counted.

    Args:
        startpath: Root directory to describe; a trailing separator is allowed.
    """
    # Normalising first keeps basename() and the depth computation correct
    # when the caller passes "some/dir/".
    startpath = osp.normpath(startpath)
    print(osp.basename(startpath))
    for root, dirs, files in os.walk(startpath):
        dirs.sort()  # deterministic traversal order
        if root == startpath:
            continue
        # Depth relative to the root: direct children are level 1.
        level = osp.relpath(root, startpath).count(os.sep) + 1
        indent = ' ' * 4 * level
        print(f'{indent}{osp.basename(root)}/')
        if files:
            subindent = ' ' * 4 * (level + 1)
            print(f"{subindent}count: {len(files)}")


In [None]:
list_files("/content/3D-printing-defect-cls/data/defects_not_augmented_dataset")

In [None]:
from torchvision.datasets import DatasetFolder
from torchvision.datasets.folder import default_loader
IMG_EXTENSIONS = (".jpg", ".jpeg", ".png", ".ppm", ".bmp", ".pgm", ".tif", ".tiff", ".webp")

class ImageFolderPath(DatasetFolder):
    """Folder-per-class image dataset whose items also carry the file path.

    Each item is ``(sample, target, path)``, so callers can trace a
    prediction back to the image file it came from.
    """

    def __init__(self, root, transform=None, target_transform=None, loader=default_loader, is_valid_file=None):
        # The extension whitelist is only handed over when no custom file
        # predicate was supplied.
        extensions = None if is_valid_file is not None else IMG_EXTENSIONS
        super().__init__(
            root,
            loader,
            extensions,
            transform=transform,
            target_transform=target_transform,
            is_valid_file=is_valid_file,
        )
        self.imgs = self.samples

    def __getitem__(self, index: int):
        """Load one item.

        Args:
            index (int): Position in ``self.samples``.

        Returns:
            tuple: ``(sample, target, path)`` where ``sample`` is the loaded
            (and optionally transformed) image, ``target`` is the class index
            (optionally transformed) and ``path`` is the source file path.
        """
        image_path, label = self.samples[index]
        image = self.loader(image_path)
        image = image if self.transform is None else self.transform(image)
        label = label if self.target_transform is None else self.target_transform(label)
        return image, label, image_path

In [None]:
# Augmenting only train data
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize(600),
        transforms.ColorJitter(brightness=.3, hue=.5, contrast=.1, saturation=.1),
        transforms.RandomRotation(degrees=30),
        transforms.RandomPerspective(distortion_scale=.3, p=.7),
        transforms.RandomHorizontalFlip(p=.5),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        transforms.RandomErasing(p=.3, scale=(0.01, 0.02))
    ]),
    'valid': transforms.Compose([
        transforms.Resize(600),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

In [None]:
NUM_WORKERS = 2 
BATCH_SIZE = 4

In [None]:
# Folder with data
data_dir_stand = '/content/3D-printing-defect-cls/data/defects_not_augmented_dataset'
image_datasets_stand = {folder: ImageFolderPath(os.path.join(data_dir_stand, folder),
                                          data_transforms[folder])
                  for folder in ['train', 'valid']}

# Class for loading data as batches
dataloaders_stand = {folder: DataLoader(image_datasets_stand[folder], batch_size=BATCH_SIZE,
                                              shuffle=True, num_workers=NUM_WORKERS)
               for folder in ['train', 'valid']}

dataset_stand_sizes = {folder: len(image_datasets_stand[folder]) for folder in ['train', 'valid']}
class_names = image_datasets_stand['train'].classes
dataset_stand_sizes

In [None]:
import cv2
from matplotlib import pyplot as plt

fig = plt.figure(figsize=(12, 9))

rows = 2
columns = 2
fontsize = 40

base_dir = "/content/3D-printing-defect-cls/data/defects_not_augmented_dataset/"

imgs_plot = ['train/YesDefects/0_4cd4ef8768114c1098fe9d3740de9f3c.png',
             'train/YesDefects/0_9b46cf26021f4824ab498c863849501c.png',
             'train/YesDefects/0_2ae9545a1f6446aca044941d27a446c4.png',
             'valid/NoDefects/NoDefects.152.jpg',
             ]

images = [cv2.imread(osp.join(base_dir, i))[:, :, ::-1] for i in imgs_plot]

for i in range(1, 5):
  plt.figure(figsize=(20, 12))
  # fig.add_subplot(rows, columns, i)

  lbl = osp.basename(osp.dirname(imgs_plot[i-1]))

  plt.imshow(images[i-1])
  plt.axis('off')
  plt.title(lbl, fontsize=fontsize)
  plt.show()


In [None]:
import cv2
from matplotlib import pyplot as plt

fig = plt.figure(figsize=(12, 9))

rows = 2
columns = 2
fontsize = 40

def to_np(inp):
  """Undo the channel normalisation of a tensor and return an array for plotting.

  The tensor is moved from channel-first to channel-last layout, multiplied by
  the per-channel std, shifted by the per-channel mean and clipped to [0, 1].
  """
  channel_mean = np.array([0.485, 0.456, 0.406])
  channel_std = np.array([0.229, 0.224, 0.225])
  channel_last = np.transpose(inp.numpy(), (1, 2, 0))
  return np.clip(channel_std * channel_last + channel_mean, 0, 1)

aug_plot_dir = "/content/3D-printing-defect-cls/data/test_aug"
os.makedirs(aug_plot_dir, exist_ok=True)
shutil.rmtree(aug_plot_dir)
os.makedirs(aug_plot_dir, exist_ok=True)

for i in imgs_plot:
  i = osp.join(base_dir, i)
  cls = osp.basename(osp.dirname(i))
  cls_l_dir_split = osp.join(aug_plot_dir, cls)
  os.makedirs(cls_l_dir_split, exist_ok=True)
  shutil.copy(i, cls_l_dir_split)

ifp = ImageFolderPath(aug_plot_dir, data_transforms["train"])
dl_ifp = DataLoader(ifp, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)
inputs, classes, paths = next(iter(dl_ifp))

for i in range(1, 5):
  fig.add_subplot(rows, columns, i)

  lbl = osp.basename(osp.dirname(paths[i-1]))

  plt.imshow(to_np(inputs[i-1]))
  plt.axis('off')
  plt.title(lbl, fontsize=fontsize)


In [None]:
fontsize = 40
import cv2
from PIL import Image

def imshow(inp, title=None):
    """Display an image file inline with ``title`` stamped on a white box.

    The image is read with OpenCV, resized to 1000x600, a white rectangle is
    painted over rows 180-239 / columns 40-219 and ``title`` is drawn at
    (50, 200): green for "NoDefects", red for anything else.

    Args:
        inp: Path of the image file.
        title: Text to draw on the image.
    """
    frame = cv2.resize(cv2.imread(inp), (1000, 600))
    frame[180:240, 40:220, :] = 255
    # Colours are in OpenCV channel order (B, G, R).
    text_color = (0, 255, 0) if "NoDefects" == title else (0, 0, 255)
    labelled = cv2.putText(frame, title, (50, 200), 1,
                           1, text_color, 1, cv2.LINE_AA)
    # Reverse the channel axis before handing the array to PIL.
    display(Image.fromarray(labelled[:, :, ::-1]))


In [None]:
def _plot_curves(curves, title, xlabel, ylabel):
  """Draw every ``(label, values)`` pair in ``curves`` on one labelled figure."""
  sns.set(style="whitegrid", font_scale=1.4)

  plt.figure(figsize=(12, 8))
  for label, values in curves:
    plt.plot(values, label=label)

  plt.title(title)
  plt.xlabel(xlabel)
  plt.ylabel(ylabel)
  plt.legend()
  plt.show()

def plot_graph(metrics, title="Функция потерь", xlabel='epoch', ylabel='loss'):
  """Plot one curve per entry of ``metrics``.

  Args:
      metrics: Mapping from curve label to a sequence of per-epoch values.
      title: Figure title.
      xlabel: X axis label.
      ylabel: Y axis label.
  """
  _plot_curves([(lbl, metrics[lbl]) for lbl in metrics], title, xlabel, ylabel)

def plot_graph_with_fine_tuning(metrics, metrics_fn, title="Функция потерь", xlabel='epoch', ylabel='loss'):
  """Plot the curves of two runs on the same axes.

  Curves from ``metrics_fn`` get the suffix ``_fine_tuning`` in the legend so
  they can be told apart from those in ``metrics``.

  Args:
      metrics: Mapping from curve label to per-epoch values of the first run.
      metrics_fn: Same structure for the second (fine-tuned) run.
      title: Figure title.
      xlabel: X axis label.
      ylabel: Y axis label.
  """
  curves = [(lbl, metrics[lbl]) for lbl in metrics]
  curves += [(f"{lbl}_fine_tuning", metrics_fn[lbl]) for lbl in metrics_fn]
  _plot_curves(curves, title, xlabel, ylabel)

In [None]:

def getTPFPTNFN_X(y_true, y_pred, inputs):
    """Sort ``inputs`` into TP / FP / TN / FN buckets, with label 1 as positive.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels, aligned with ``y_true``.
        inputs: Items (image paths in this notebook) aligned with the labels.

    Returns:
        Tuple ``(TP, FP, TN, FN)`` of lists holding the matching items.
    """
    true_pos, false_pos, true_neg, false_neg = [], [], [], []
    for position, (actual, predicted) in enumerate(zip(y_true, y_pred)):
        if actual == 1:
            bucket = true_pos if predicted == 1 else false_neg
        else:
            bucket = true_neg if predicted == 0 else false_pos
        bucket.append(inputs[position])

    return true_pos, false_pos, true_neg, false_neg

def get_pers(cm):
  """Build heatmap annotations of the form ``"<count>\\n(<row percent>%)"``.

  Percentages are taken relative to each row total and rounded to one decimal
  place. Works for any number of classes; a row without samples is reported
  as 0.0% instead of dividing by zero.

  Args:
      cm: Square array of counts with true classes on the rows.

  Returns:
      Nested list of strings with the same shape as ``cm``.
  """
  counts = np.asarray(cm).astype(int)
  row_totals = counts.sum(axis=1, keepdims=True).astype(float)
  share = np.divide(
      counts,
      row_totals,
      out=np.zeros(counts.shape, dtype=float),
      where=row_totals > 0,  # leave empty rows at 0 rather than nan
  )
  percent = np.round(share * 100, 1).astype('<U21')
  percent = np.char.add(np.char.add("\n(", percent), "%)")
  return np.char.add(counts.astype('<U21'), percent).tolist()

def plot_confusion_matrix(labels, pred_labels, ph, classes=["NoDefects", "YesDefects"]):
    """Show an annotated confusion-matrix heatmap.

    Args:
        labels: Ground-truth class indices.
        pred_labels: Predicted class indices.
        ph: Title of the plot (the phase / loader name in this notebook).
        classes: Display names; position ``i`` names class index ``i``.
    """
    # Pinning the label set keeps the matrix len(classes) x len(classes) even
    # when a batch of results contains a single class; otherwise the matrix
    # collapses to 1x1 and the annotation step indexes a missing row.
    cm = confusion_matrix(labels, pred_labels, labels=list(range(len(classes))))
    pers = get_pers(cm)

    sns.set(font_scale=2.0)
    fig, ax = plt.subplots()
    sns.heatmap(cm, annot=pers, ax=ax, cmap="Blues", fmt='')

    # Labels, title and ticks
    label_font = {'size': '18'}
    ax.set_xlabel('Predicted Label', fontdict=label_font)
    ax.set_ylabel('True Label', fontdict=label_font)

    title_font = {'size': '21'}
    ax.set_title(ph, fontdict=title_font)

    ax.tick_params(axis='both', which='major', labelsize=10)
    ax.xaxis.set_ticklabels(classes)
    ax.yaxis.set_ticklabels(classes)
    plt.show()
    # This function runs twice per epoch; release the figure explicitly.
    plt.close(fig)

def train_model(model, dataloaders, ds_sizes, criterion, optimizer, scheduler, num_epochs=40, snapshot_epoch=15):
    """Train ``model`` and track metrics for the 'train' and 'valid' phases.

    Every epoch runs a training pass and a validation pass, plots a confusion
    matrix for each, and records loss, accuracy, F1, precision and recall.
    F1 / precision / recall are computed with class 0 as the positive label,
    while the TP/FP/TN/FN example lists come from ``getTPFPTNFN_X``, which
    treats class 1 as positive.

    Args:
        model: Network to train; called as ``model(inputs)``.
        dataloaders: Mapping ``{'train': loader, 'valid': loader}`` whose
            batches are ``(inputs, labels, paths)``.
        ds_sizes: Mapping from phase name to number of samples.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped after every training batch.
        scheduler: LR scheduler stepped once per epoch after the train phase.
        num_epochs: Number of epochs; epochs are numbered from 0.
        snapshot_epoch: Epoch index whose weights are loaded into the returned
            model. When that epoch is never reached, the weights of the best
            validation-F1 epoch are used instead.

    Returns:
        Tuple ``(model, losses, accuracy, f1, precision, recall,
        f1_exmp_best, f1_exmp_snapshot)``. The five metric values are dicts
        of per-epoch lists keyed by phase; the last two are
        ``[TP, FP, TN, FN]`` lists of validation image paths for the best
        epoch and for ``snapshot_epoch`` respectively.
    """
    since = time.time()
    phases = ('train', 'valid')

    # Defaults that keep the return value well-formed even when no epoch
    # improves on them or the snapshot epoch is never reached.
    best_model_w = copy.deepcopy(model.state_dict())
    n_best_eph = 0
    f1_exmp_best = [[], [], [], []]
    best_f1 = 0.0
    best_preds_all = {'train': [], "valid": []}
    best_lbls_all = {'train': [], "valid": []}
    snapshot_model_w = None
    f1_exmp_snapshot = None

    losses = {'train': [], "valid": []}
    accuracy = {'train': [], "valid": []}
    f1 = {'train': [], "valid": []}
    precision = {'train': [], "valid": []}
    recall = {'train': [], "valid": []}

    for epoch in tqdm_notebook(range(0, num_epochs)):
        preds_all = {'train': [], "valid": []}
        lbls_all = {'train': [], "valid": []}
        imgs_all = {'train': [], "valid": []}

        print('Epoch {}/{}'.format(epoch, num_epochs))
        print('-' * 10)

        # Train and valid phase for each epoch
        for phase in phases:
            if phase == 'train':
                model.train()  # setting model to train mode
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels, paths in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # forward; gradients are tracked only in the train phase
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # The sum is divided by the number of samples below, so each
                # batch loss is weighted by its size (assumes the criterion
                # averages over the batch, as the losses used in this
                # notebook do).
                running_loss += loss.item() * inputs.size(0)

                preds = preds.cpu().detach().numpy()
                labels = labels.cpu().detach().numpy()
                running_corrects += np.sum(preds == labels)
                # Any prediction index above 1 is replaced by the opposite of
                # the true label; this never triggers for a two-output model.
                preds_ = [1-l if p > 1 else p for p, l in zip(preds, labels)]
                preds_all[phase].extend(preds_)
                lbls_all[phase].extend(list(labels))

                if phase == 'valid':
                    imgs_all[phase].extend(paths)

            if phase == 'valid':
                TP_exmp, FP_exmp, TN_exmp, FN_exmp = getTPFPTNFN_X(lbls_all[phase], preds_all[phase], imgs_all[phase])
                if epoch == snapshot_epoch:
                    f1_exmp_snapshot = [TP_exmp, FP_exmp, TN_exmp, FN_exmp]
                    snapshot_model_w = copy.deepcopy(model.state_dict())

            plot_confusion_matrix(lbls_all[phase], preds_all[phase], phase)

            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / ds_sizes[phase]
            epoch_acc = running_corrects / ds_sizes[phase]
            epoch_f1 = f1_score(lbls_all[phase], preds_all[phase], pos_label=0)
            epoch_precision = precision_score(lbls_all[phase], preds_all[phase], pos_label=0)
            epoch_recall = recall_score(lbls_all[phase], preds_all[phase], pos_label=0)

            losses[phase].append(epoch_loss)
            accuracy[phase].append(epoch_acc)
            f1[phase].append(epoch_f1)
            precision[phase].append(epoch_precision)
            recall[phase].append(epoch_recall)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} f1: {epoch_f1:.4f} recall: {epoch_recall:.4f} precision: {epoch_precision:.4f}')

            # ">=" means that on a tie the later epoch wins.
            if phase == 'valid' and epoch_f1 >= best_f1:
                best_f1 = epoch_f1
                best_model_w = copy.deepcopy(model.state_dict())
                best_preds_all = preds_all
                best_lbls_all = lbls_all
                n_best_eph = epoch
                f1_exmp_best = [TP_exmp, FP_exmp, TN_exmp, FN_exmp]

    time_elapsed = time.time() - since
    te_ = time_elapsed // 60
    te__ = time_elapsed % 60
    print(f'Training completed in {te_:.0f}m {te__:.0f}s')

    print(f'Best val f1: {best_f1:.4f}, epoch: {n_best_eph}')

    for ph in phases:
        if best_lbls_all[ph]:  # nothing to plot when no epoch ran
            plot_confusion_matrix(best_lbls_all[ph], best_preds_all[ph], ph)

    if snapshot_model_w is None:
        # The requested epoch never ran (num_epochs <= snapshot_epoch).
        snapshot_model_w = best_model_w
        f1_exmp_snapshot = f1_exmp_best

    model.load_state_dict(snapshot_model_w)
    return model, losses, accuracy, f1, precision, recall, f1_exmp_best, f1_exmp_snapshot

In [None]:
from PIL import Image
import torchvision.transforms as transforms
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt

class Predictor:
    """AlexNet-based two-class defect classifier loaded from saved weights.

    Class index 0 is "NoDefects" and index 1 is "YesDefects".
    """

    def __init__(self, w_path, resize=(600, 600)):
        """Build the network and load its weights.

        Args:
            w_path: Path of a ``state_dict`` saved with ``torch.save``.
            resize: Default ``(width, height)`` images are resized to before
                inference.
        """
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.resize = resize
        num_features = 9216
        self.model = models.alexnet(pretrained=False)

        self.model.classifier = nn.Sequential(
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(num_features, 2, bias=True),
            nn.Softmax(dim=1),  # explicit class axis; implicit dim is deprecated
        )

        self.model = self.model.to(self.device)
        self.model.load_state_dict(torch.load(w_path, map_location=self.device))
        self.model.eval()
        self.to_tensor = transforms.ToTensor()
        self.to_norm = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        self.classes = ["NoDefects", "YesDefects"]

    def inference(self, img: np.array, resize_to=None):
        """Classify one image.

        Args:
            img: Image array in OpenCV channel order (BGR), e.g. the result of
                ``cv2.imread``.
            resize_to: ``(width, height)`` passed to ``cv2.resize``; defaults
                to the ``resize`` value given to the constructor.

        Returns:
            Tuple ``(prediction, probs)``: the predicted class index and the
            per-class probabilities for this image.

        Raises:
            ValueError: If ``img`` is ``None`` (``cv2.imread`` returns ``None``
                for a file it cannot read).
        """
        if img is None:
            raise ValueError("img is None - the image file could not be read")
        if resize_to is None:
            resize_to = self.resize
        img = cv2.resize(img, resize_to)
        # The training pipeline in this notebook loads images as RGB, so the
        # OpenCV BGR array has to be converted before normalisation.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        tensor = self.to_tensor(img)
        tensor = self.to_norm(tensor)
        tensor = tensor.unsqueeze(0)
        tensor = tensor.to(self.device)
        with torch.no_grad():  # inference only
            probs = self.model(tensor)
        probs = probs.cpu().detach()
        _, prediction = torch.max(probs, 1)
        prediction = prediction.numpy()
        probs = probs.numpy()
        return prediction[0], probs[0]

    def plot_res(self, img: np.array, pred, probs, resize_to_scale=2):
        """Show the image with a legend listing both class probabilities.

        Args:
            img: Image array in OpenCV channel order (BGR).
            pred: Predicted class index (0 or 1).
            probs: Per-class probabilities indexed by class index.
            resize_to_scale: Integer factor the image is shrunk by for display.
        """
        plt.figure(figsize=(26, 14))
        img = cv2.resize(img, (img.shape[1] // resize_to_scale, img.shape[0] // resize_to_scale))
        plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        # "NoDefects" is always drawn green and "YesDefects" red.
        p = np.round(probs[pred], decimals=2)
        pred_prob = f"{self.classes[pred]} with prob: {str(p)}"
        pred_patch = mpatches.Patch(color="green" if pred == 0 else "red", label=pred_prob)
        p = np.round(probs[1-pred], decimals=2)
        pred_prob_another = f"{self.classes[1-pred]} with prob: {str(p)}"
        pred_patch_another = mpatches.Patch(color="green" if pred == 1 else "red", label=pred_prob_another)
        plt.legend(handles=[pred_patch, pred_patch_another], prop={'size': 30})
        plt.axis('off')
        plt.show()


# Monitoring the 3D-printing process

## WITH FINE TUNING

In [None]:
model_fn = models.alexnet(pretrained=True)

In [None]:
# Number of trailing modules of `model_fn.features` that stay trainable
# (this counts modules of every kind, not only convolutions).
layers_to_unfreeze = 5

# turn off gradients on non-trainable layers
for param in model_fn.features[:-layers_to_unfreeze].parameters():
    param.requires_grad = False

# num_features - feature vector size passed to FC layer
num_features = 9216

# change fully-connected layer to our linear classifier
model_fn.classifier = nn.Sequential(
    nn.Dropout(p=0.5, inplace=False),
    nn.Linear(num_features, 2, bias=True),
    nn.Softmax(dim=1),  # explicit class axis; implicit dim is deprecated
)
# send model to device
model_fn = model_fn.to(device)


def loss_fn(probs, target):
    """Cross-entropy for a model whose output is already class probabilities.

    The classifier ends in Softmax, and nn.CrossEntropyLoss expects raw
    logits: applying it here would run softmax twice and flatten the
    gradients. The clamp guards against log(0).
    """
    return nn.functional.nll_loss(torch.log(probs.clamp_min(1e-12)), target)


# Optimise exactly the parameters left trainable above: the unfrozen tail of
# the feature extractor (small LR) and the new classifier (larger LR).
optimizer = torch.optim.Adam(params=[
        {"params": list(model_fn.features[-layers_to_unfreeze:].parameters()), "lr": 1e-5, "weight_decay": 1e-5},
        {"params": list(model_fn.classifier.parameters()), "lr": 1e-4, "weight_decay": 1e-5}
])

# multiplies learning_rate by 0.1 every 10 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

In [None]:
from torchinfo import summary

summary(model_fn, input_size=(4, 3, 600, 600)), model_fn

In [None]:
model_fn, losses_fn, accuracy_fn, f1_fn, precision_fn, recall_fn, f1_exmp_best_fn, f1_exmp_best_15_fn = train_model(model_fn, dataloaders_stand, dataset_stand_sizes, loss_fn, optimizer, exp_lr_scheduler, num_epochs=40)

In [None]:
torch.save(model_fn.state_dict(), "/content/3D-printing-defect-cls/weights/w_model_stand_fine_tuning_15.pth")

In [None]:
def evaluate(model, dataloader, name_loader, ds_size, predictor=None):
    """Run ``model`` over ``dataloader`` and report classification metrics.

    Samples with true class 0 that were predicted as class 1 are printed with
    their model output and, when a predictor is available, plotted.

    Args:
        model: Network to evaluate; put into eval mode here.
        dataloader: Loader yielding ``(inputs, labels, paths)`` batches.
        name_loader: Name used in the printed summary and as the plot title.
        ds_size: Number of samples, used as the accuracy denominator.
        predictor: Object with a ``plot_res`` method used to draw the
            misclassified samples. Defaults to the notebook-level ``predtr``
            when that already exists; otherwise plotting is skipped.

    Returns:
        Tuple ``(accuracy, f1, precision, recall, matr)`` where ``matr`` is
        the ``(TP, FP, TN, FN)`` tuple of paths from ``getTPFPTNFN_X``.
        F1 / precision / recall use class 0 as the positive label.
    """
    if predictor is None:
        # Keeps the old reliance on a global working without raising a
        # NameError when this runs before `predtr` has been created.
        predictor = globals().get("predtr")

    preds_all = []
    lbls_all = []
    imgs_all = []
    running_corrects = 0

    model.eval()

    for inputs, labels, paths in dataloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # forward pass (no gradients needed for evaluation)
        with torch.no_grad():
            output = model(inputs)
        _, preds = torch.max(output, 1)

        output = output.cpu().numpy()
        preds = preds.cpu().numpy()
        labels = labels.cpu().numpy()
        running_corrects += np.sum(preds == labels)
        # Never triggers for a two-output model; kept from the original.
        preds_ = [1-p if p > 1 else p for p, l in zip(preds, labels)]
        preds_all.extend(preds_)
        lbls_all.extend(list(labels))
        imgs_all.extend(paths)

        for i, (pr, lb) in enumerate(zip(preds_, labels)):
            if (pr == 1) and (lb == 0):
                # Index the batch by sample position, not by predicted class.
                print(paths[i], output[i])
                if predictor is not None:
                    predictor.plot_res(cv2.imread(paths[i]), pred=pr, probs=output[i], resize_to_scale=2)

    plot_confusion_matrix(lbls_all, preds_all, name_loader)
    epoch_acc = running_corrects / ds_size
    epoch_f1 = f1_score(lbls_all, preds_all, pos_label=0)
    epoch_precision = precision_score(lbls_all, preds_all, pos_label=0)
    epoch_recall = recall_score(lbls_all, preds_all, pos_label=0)

    print(f'{name_loader} Acc: {epoch_acc:.4f} f1: {epoch_f1:.4f} recall: {epoch_recall:.4f} precision: {epoch_precision:.4f}')
    matr = getTPFPTNFN_X(lbls_all, preds_all, imgs_all)
    return epoch_acc, epoch_f1, epoch_precision, epoch_recall, matr

In [None]:
out = evaluate(model_fn, dataloaders_stand['valid'], 'valid', dataset_stand_sizes['valid'])

In [None]:
plot_graph(losses_fn, title="Функция потерь", xlabel='epoch', ylabel='loss')

In [None]:
plot_graph(accuracy_fn, title="Аккуратность, точность", xlabel='epoch', ylabel='accuracy')

In [None]:
plot_graph(f1_fn, title="F1-score (среднее гармоническое Precision и Recall)", xlabel='epoch', ylabel='F1-score')

In [None]:
plot_graph(precision_fn, title="Точность", xlabel='epoch', ylabel='precision')

In [None]:
plot_graph(recall_fn, title="Полнота", xlabel='epoch', ylabel='recall')

In [None]:
TN_exmp, FN_exmp, TP_exmp, FP_exmp = f1_exmp_best_15_fn

In [None]:
predtr = Predictor(w_path="/content/3D-printing-defect-cls/weights/w_model_stand_fine_tuning_15.pth")

In [None]:
for i in TP_exmp:
  pred, probs = predtr.inference(cv2.imread(i), resize_to=(600, 600))
  predtr.plot_res(cv2.imread(i), pred=pred, probs=probs, resize_to_scale=2)


In [None]:
for i in TN_exmp:
  pred, probs = predtr.inference(cv2.imread(i), resize_to=(600, 600))
  print(pred)
  predtr.plot_res(cv2.imread(i), pred=pred, probs=probs, resize_to_scale=2)


In [None]:
for i in FP_exmp:
  pred, probs = predtr.inference(cv2.imread(i), resize_to=(600, 600))
  predtr.plot_res(cv2.imread(i), pred=pred, probs=probs, resize_to_scale=2)


In [None]:
# Re-run the in-memory fine-tuned model on every image of the FN_exmp group
# and plot the result.
model_fn.eval()
for i in FN_exmp:
  img = cv2.imread(i)
  # cv2 loads BGR, whereas the training pipeline fed the network RGB images.
  rgb = cv2.cvtColor(cv2.resize(img, (600, 600)), cv2.COLOR_BGR2RGB)

  tensor = transforms.ToTensor()(rgb)
  tensor = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])(tensor)
  tensor = tensor.unsqueeze(0)
  tensor = tensor.to(device)
  with torch.no_grad():  # inference only
    probs = model_fn(tensor)
  probs = probs.cpu().detach()
  _, prediction = torch.max(probs, 1)
  prediction = prediction.numpy()
  probs = probs.numpy()
  print(prediction)
  # plot_res expects the original BGR array and converts it for display.
  predtr.plot_res(img, pred=prediction[0], probs=probs[0], resize_to_scale=2)

## WITHOUT FINE TUNING

In [None]:
model = models.alexnet(pretrained=False)

In [None]:
# No layer is frozen here: the whole network is trained.

# num_features - feature vector size passed to FC layer
num_features = 9216

# change fully-connected layer to our linear classifier
model.classifier = nn.Sequential(
    nn.Dropout(p=0.5, inplace=False),
    nn.Linear(num_features, 2, bias=True),
    nn.Softmax(dim=1),  # explicit class axis; implicit dim is deprecated
)
# send model to device
model = model.to(device)


def loss_fn(probs, target):
    """Cross-entropy for a model whose output is already class probabilities.

    The classifier ends in Softmax, and nn.CrossEntropyLoss expects raw
    logits: applying it here would run softmax twice and flatten the
    gradients. The clamp guards against log(0).
    """
    return nn.functional.nll_loss(torch.log(probs.clamp_min(1e-12)), target)


# Feature extractor gets a small LR, the new classifier a larger one.
optimizer = torch.optim.Adam(params=[
        {"params": list(model.features.parameters()), "lr": 1e-5, "weight_decay": 1e-5},
        {"params": list(model.classifier.parameters()), "lr": 1e-4, "weight_decay": 1e-5}
])

# multiplies learning_rate by 0.1 every 10 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

In [None]:
from torchinfo import summary

summary(model, input_size=(4, 3, 600, 600))

In [None]:
model

In [None]:
model, losses, accuracy, f1, precision, recall, f1_exmp_best, f1_exmp_best_15 = train_model(model, dataloaders_stand, dataset_stand_sizes, loss_fn, optimizer, exp_lr_scheduler, num_epochs=40)

In [None]:
torch.save(model.state_dict(), "/content/3D-printing-defect-cls/weights/w_model_stand.pth")

## PLOT GRAPHS

In [None]:
plot_graph_with_fine_tuning(losses, losses_fn, title="Функция потерь", xlabel='epoch', ylabel='loss')

In [None]:
plot_graph_with_fine_tuning(accuracy, accuracy_fn, title="Аккуратность, точность", xlabel='epoch', ylabel='accuracy')

In [None]:
plot_graph_with_fine_tuning(f1, f1_fn, title="F1-score (среднее гармоническое Precision и Recall)", xlabel='epoch', ylabel='F1-score')

In [None]:
plot_graph_with_fine_tuning(precision, precision_fn, title="Точность", xlabel='epoch', ylabel='precision')

In [None]:
plot_graph_with_fine_tuning(recall, recall_fn, title="Полнота", xlabel='epoch', ylabel='recall')

In [None]:
predtr = Predictor(w_path="/content/3D-printing-defect-cls/weights/w_model_stand.pth")

In [None]:
TN_exmp, FN_exmp, TP_exmp, FP_exmp = f1_exmp_best

In [None]:
for i in FP_exmp:
  pred, probs = predtr.inference(cv2.imread(i), resize_to=(600, 600))
  predtr.plot_res(cv2.imread(i), pred=pred, probs=probs, resize_to_scale=2)


In [None]:
for i in FN_exmp:
  pred, probs = predtr.inference(cv2.imread(i), resize_to=(600, 600))
  predtr.plot_res(cv2.imread(i), pred=pred, probs=probs, resize_to_scale=2)

# SYNTH

In [None]:
list_files("/content/3D-printing-defect-cls/data/DS_synth_bin")

In [None]:
list_files("/content/3D-printing-defect-cls/data/DS_test_bin_flipped_replaced")

In [None]:
# Rebuild the train/valid copy of the synthetic dataset from scratch.
dir_split = "/content/3D-printing-defect-cls/data/DS_synth_bin_split"
os.makedirs(dir_split, exist_ok=True)
shutil.rmtree(dir_split)
os.makedirs(dir_split, exist_ok=True)

# 80/20 split over all files found one level below the class folders.
train_imgs, valid_imgs = train_test_split(glob("/content/3D-printing-defect-cls/data/DS_synth_bin/*/*"), test_size=0.2, random_state=42)

for split_name, split_paths in (("train", train_imgs), ("valid", valid_imgs)):
  for cls in ("YesDefects", "NoDefects"):
    # <dir_split>/<split>/<class>/ is created even when it ends up empty.
    target_dir = osp.join(dir_split, split_name, cls)
    os.makedirs(target_dir, exist_ok=True)
    # A file belongs to the class named by its parent directory.
    for src in [p for p in split_paths if osp.basename(osp.dirname(p)) == cls]:
      shutil.copy(src, target_dir)


In [None]:
list_files("/content/3D-printing-defect-cls/data/DS_synth_bin_split")

In [None]:
# Augmenting only train data
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((600, 800)),
        transforms.ColorJitter(brightness=.3, hue=.5, contrast=.1, saturation=.1),
        transforms.RandomRotation(degrees=30),
        transforms.RandomPerspective(distortion_scale=.3, p=.7),
        transforms.RandomHorizontalFlip(p=.5),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        transforms.RandomErasing(p=.3, scale=(0.01, 0.02))
    ]),
    'valid': transforms.Compose([
        transforms.Resize((600, 800)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

In [None]:
# Folder with data
data_dir_synth = '/content/3D-printing-defect-cls/data/DS_synth_bin_split'
image_datasets_synth = {folder: ImageFolderPath(os.path.join(data_dir_synth, folder),
                                          data_transforms[folder])
                  for folder in ['train', 'valid']}

# Class for loading data as batches
dataloaders_synth = {folder: DataLoader(image_datasets_synth[folder], batch_size=BATCH_SIZE,
                                              shuffle=True, num_workers=NUM_WORKERS)
               for folder in ['train', 'valid']}

dataset_synth_sizes = {folder: len(image_datasets_synth[folder]) for folder in ['train', 'valid']}
class_names = image_datasets_synth['train'].classes
dataset_synth_sizes

In [None]:
# Folder with data
data_dir_synth = '/content/3D-printing-defect-cls/data/DS_test_bin_flipped_replaced'
image_datasets_synth["test"] = ImageFolderPath(data_dir_synth, data_transforms["valid"])

# Class for loading data as batches
dataloaders_synth["test"] = DataLoader(image_datasets_synth["test"], batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

dataset_synth_sizes["test"] = len(image_datasets_synth["test"])
dataset_synth_sizes

In [None]:
import cv2
from matplotlib import pyplot as plt

fig = plt.figure(figsize=(20, 14))

rows = 2
columns = 2
fontsize = 40
base_dir = "/content/3D-printing-defect-cls/data/DS_synth_bin_split/train/"


imgs_plot = ['NoDefects/0_95502ea5-c82e-473d-b846-6f04d92bfedb_33_25_190_0.9.png',
             'YesDefects/aa1f733e-b78a-4c5e-ae5b-fa92b650806d_ls_x_0.16667_10.0_33_20_210_0.7.png',
             'YesDefects/image_2022-05-17_02-00-21.png',
             'YesDefects/image_2022-05-17_02-00-27.png',
             ]

images = [cv2.imread(osp.join(base_dir, i))[:, :, ::-1] for i in imgs_plot]

for i in range(1, 5):
  fig.add_subplot(rows, columns, i)

  lbl = osp.basename(osp.dirname(imgs_plot[i-1]))

  plt.imshow(images[i-1])
  plt.axis('off')
  plt.title(lbl, fontsize=fontsize)


In [None]:
import cv2
from matplotlib import pyplot as plt

fig = plt.figure(figsize=(20, 14))

rows = 2
columns = 2
fontsize = 40

def to_np(inp):
  inp = inp.numpy().transpose((1, 2, 0))
  mean = np.array([0.485, 0.456, 0.406])
  std = np.array([0.229, 0.224, 0.225])
  inp = std * inp + mean
  inp = np.clip(inp, 0, 1)
  return inp

aug_plot_dir = "/content/3D-printing-defect-cls/test_aug_synth"
os.makedirs(aug_plot_dir, exist_ok=True)
shutil.rmtree(aug_plot_dir)
os.makedirs(aug_plot_dir, exist_ok=True)

for i in imgs_plot:
  i = osp.join(base_dir, i)
  cls = osp.basename(osp.dirname(i))
  cls_l_dir_split = osp.join(aug_plot_dir, cls)
  os.makedirs(cls_l_dir_split, exist_ok=True)
  shutil.copy(i, cls_l_dir_split)

ifp = ImageFolderPath(aug_plot_dir, data_transforms["train"])
dl_ifp = DataLoader(ifp, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)
inputs, classes, paths = next(iter(dl_ifp))

for i in range(1, 5):
  fig.add_subplot(rows, columns, i)

  lbl = osp.basename(osp.dirname(paths[i-1]))

  plt.imshow(to_np(inputs[i-1]))
  plt.axis('off')
  plt.title(lbl, fontsize=fontsize)


In [None]:
import os.path as osp
import cv2
from matplotlib import pyplot as plt

fig = plt.figure(figsize=(20, 14))

rows = 3
columns = 3
fontsize = 40
base_dir = "/content/3D-printing-defect-cls/data/DS_test_bin_flipped_replaced/"

imgs_plot = ['NoDefects/IMG_20220512_000058.jpg',
             'NoDefects/IMG_20220512_000456.jpg',
             'YesDefects/IMG_20220512_000015.jpg',
             'YesDefects/IMG_20220512_001409.jpg',
             'YesDefects/IMG_20220511_235910.jpg',
             'YesDefects/IMG_20220511_235656.jpg',
             'YesDefects/IMG_20220511_235654.jpg',
             'YesDefects/IMG_20220512_000346.jpg',
             'YesDefects/IMG_20220511_235758.jpg',
             ]

images = [cv2.imread(osp.join(base_dir, i))[:, :, ::-1] for i in imgs_plot]

for i in range(1, 10):
  fig.add_subplot(rows, columns, i)

  lbl = osp.basename(osp.dirname(imgs_plot[i-1]))

  plt.imshow(images[i-1])
  plt.axis('off')
  plt.title(lbl, fontsize=fontsize)


In [None]:
model_synth = models.alexnet(pretrained=True)

In [None]:
# Number of trailing modules of `model_synth.features` that stay trainable
# (this counts modules of every kind, not only convolutions).
layers_to_unfreeze = 5

# turn off gradients on non-trainable layers
for param in model_synth.features[:-layers_to_unfreeze].parameters():
    param.requires_grad = False

# num_features - feature vector size passed to FC layer
num_features = 9216

# change fully-connected layer to our linear classifier
model_synth.classifier = nn.Sequential(
    nn.Dropout(p=0.5, inplace=False),
    nn.Linear(num_features, 2, bias=True),
    nn.Softmax(dim=1),  # explicit class axis; implicit dim is deprecated
)
# send model to device
model_synth = model_synth.to(device)


def loss_fn(probs, target):
    """Cross-entropy for a model whose output is already class probabilities.

    The classifier ends in Softmax, and nn.CrossEntropyLoss expects raw
    logits: applying it here would run softmax twice and flatten the
    gradients. The clamp guards against log(0).
    """
    return nn.functional.nll_loss(torch.log(probs.clamp_min(1e-12)), target)


# Optimise exactly the parameters left trainable above: the unfrozen tail of
# the feature extractor (small LR) and the new classifier (larger LR).
optimizer = torch.optim.Adam(params=[
        {"params": list(model_synth.features[-layers_to_unfreeze:].parameters()), "lr": 1e-5, "weight_decay": 1e-5},
        {"params": list(model_synth.classifier.parameters()), "lr": 1e-4, "weight_decay": 1e-5}
])

# multiplies learning_rate by 0.1 every 10 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

In [None]:
from torchinfo import summary

summary(model_synth, input_size=(4, 3, 600, 600)), model_synth

In [None]:
def train_model(model, dataloaders, ds_sizes, criterion, optimizer, scheduler, num_epochs=40):
    """Train ``model`` and restore the weights of the epoch with the best test F1.

    Every epoch runs three phases in order: 'train' (weights are updated),
    then 'valid' and 'test' (evaluation only). F1, precision and recall are
    computed with class 0 as the positive label. A confusion matrix is plotted
    for every phase of every epoch and once more for the best epoch.

    Args:
        model: network to train; it is updated in place.
        dataloaders: mapping phase name -> iterable yielding
            ``(inputs, labels, paths)`` batches.
        ds_sizes: mapping phase name -> number of samples, used to average
            loss and accuracy over the phase.
        criterion: loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: optimizer stepped after every training batch.
        scheduler: LR scheduler stepped once per epoch, after the train phase.
        num_epochs: number of epochs to run.

    Returns:
        Tuple ``(model, losses, accuracy, f1, precision, recall, f1_exmp_best)``.
        The five metric objects are dicts keyed by phase holding one value per
        epoch. ``f1_exmp_best`` is ``[TP_exmp, FP_exmp, TN_exmp, FN_exmp]`` as
        returned by ``getTPFPTNFN_X`` for the test phase of the best epoch
        (four empty lists if no epoch was run).
    """
    since = time.time()
    phases = ('train', 'valid', 'test')

    # Deep-copy the starting weights so there is always a valid snapshot to
    # restore, and so later training steps cannot modify it.
    best_model_w = copy.deepcopy(model.state_dict())
    n_best_eph = 0
    # Same [TP, FP, TN, FN] layout as the value stored for the best epoch.
    f1_exmp_best = [[], [], [], []]
    best_f1 = 0.0
    best_preds_all = {ph: [] for ph in phases}
    best_lbls_all = {ph: [] for ph in phases}

    losses = {ph: [] for ph in phases}
    accuracy = {ph: [] for ph in phases}
    f1 = {ph: [] for ph in phases}
    precision = {ph: [] for ph in phases}
    recall = {ph: [] for ph in phases}

    # A mean-reduced criterion returns a per-batch average. It has to be
    # weighted by the batch size before dividing by the dataset size below,
    # otherwise the reported loss is too small by about the batch size.
    loss_is_batch_mean = getattr(criterion, 'reduction', 'mean') == 'mean'

    for epoch in tqdm_notebook(range(1, num_epochs+1)):
        preds_all = {ph: [] for ph in phases}
        lbls_all = {ph: [] for ph in phases}
        imgs_all = {ph: [] for ph in phases}

        print('Epoch {}/{}'.format(epoch, num_epochs))
        print('-' * 10)

        # Train, valid and test phase for each epoch
        for phase in phases:
            if phase == 'train':
                model.train()  # enables dropout / batch-norm updates
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels, paths in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                batch_loss = loss.item()
                if loss_is_batch_mean:
                    batch_loss *= inputs.size(0)
                running_loss += batch_loss

                preds = preds.cpu().detach().numpy()
                labels = labels.cpu().detach().numpy()
                running_corrects += np.sum(preds == labels)
                # Fold any predicted class index above 1 into `1 - label`,
                # i.e. count it as a miss (assumes labels are 0/1 — TODO confirm).
                preds_ = [1-l if p > 1 else p for p, l in zip(preds, labels)]
                preds_all[phase].extend(preds_)
                lbls_all[phase].extend(list(labels))

                # Image paths are only needed for the per-example test breakdown.
                if phase == 'test':
                    imgs_all[phase].extend(paths)

            if phase == 'test':
                TP_exmp, FP_exmp, TN_exmp, FN_exmp = getTPFPTNFN_X(lbls_all[phase], preds_all[phase], imgs_all[phase])

            plot_confusion_matrix(lbls_all[phase], preds_all[phase], phase)

            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / ds_sizes[phase]
            epoch_acc = running_corrects / ds_sizes[phase]
            epoch_f1 = f1_score(lbls_all[phase], preds_all[phase], pos_label=0)
            epoch_precision = precision_score(lbls_all[phase], preds_all[phase], pos_label=0)
            epoch_recall = recall_score(lbls_all[phase], preds_all[phase], pos_label=0)

            losses[phase].append(epoch_loss)
            accuracy[phase].append(epoch_acc)
            f1[phase].append(epoch_f1)
            precision[phase].append(epoch_precision)
            recall[phase].append(epoch_recall)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} f1: {epoch_f1:.4f} recall: {epoch_recall:.4f} precision: {epoch_precision:.4f}')

            # NOTE(review): the best epoch is selected on the test split, so
            # the reported test F1 is optimistically biased. `>=` keeps the
            # latest epoch on ties.
            if phase == 'test' and epoch_f1 >= best_f1:
                best_f1 = epoch_f1
                best_model_w = copy.deepcopy(model.state_dict())
                best_preds_all = preds_all
                best_lbls_all = lbls_all
                n_best_eph = epoch
                f1_exmp_best = [TP_exmp, FP_exmp, TN_exmp, FN_exmp]

    time_elapsed = time.time() - since
    te_ = time_elapsed // 60
    te__ = time_elapsed % 60
    print(f'Training completed in {te_:.0f}m {te__:.0f}s')

    print(f'Best test f1: {best_f1:.4f}, epoch: {n_best_eph}')

    # Nothing to plot when no epoch was run.
    if n_best_eph > 0:
        for ph in phases:
            plot_confusion_matrix(best_lbls_all[ph], best_preds_all[ph], ph)

    model.load_state_dict(best_model_w)
    return model, losses, accuracy, f1, precision, recall, f1_exmp_best

In [None]:
# Run train_model on the `dataloaders_synth` splits for 10 epochs and keep the
# returned model together with its per-phase metric histories.
(
    model_synth,
    losses_synth,
    accuracy_synth,
    f1_synth,
    precision_synth,
    recall_synth,
    f1_exmp_best_synth,
) = train_model(
    model_synth,
    dataloaders_synth,
    dataset_synth_sizes,
    loss_fn,
    optimizer,
    exp_lr_scheduler,
    num_epochs=10,
)

In [None]:
# Persist only the state_dict (the weights), not the whole module object.
# torch.save does not create missing directories, so the target folder must exist.
torch.save(model_synth.state_dict(), "/content/3D-printing-defect-cls/weights/w_model_synth_fine_tuning.pth")

In [None]:
def evaluate(model, dataloader, name_loader, ds_size):
    """Run ``model`` once over ``dataloader`` and report classification metrics.

    F1, precision and recall are computed with class 0 as the positive label.
    A confusion matrix is plotted and a one-line summary is printed.

    Args:
        model: network to evaluate; it is switched to eval mode.
        dataloader: iterable yielding ``(inputs, labels, paths)`` batches.
        name_loader: label used in the confusion-matrix plot and the printout.
        ds_size: number of samples in ``dataloader``, used to compute accuracy.

    Returns:
        Tuple ``(accuracy, f1, precision, recall, matr)`` where ``matr`` is
        whatever ``getTPFPTNFN_X`` returns for the collected labels,
        predictions and image paths.
    """
    preds_all = []
    lbls_all = []
    imgs_all = []
    running_corrects = 0

    model.eval()

    # Inference only: disabling autograd avoids building a graph for every
    # batch, which saves memory and time.
    with torch.no_grad():
        for inputs, labels, paths in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # forward pass
            output = model(inputs)
            _, preds = torch.max(output, 1)

            preds = preds.cpu().numpy()
            labels = labels.cpu().numpy()
            running_corrects += np.sum(preds == labels)
            # Same folding rule as train_model: a predicted class index above 1
            # becomes `1 - label`, i.e. it is counted as a miss (assumes labels
            # are 0/1 — TODO confirm). Using `1 - p` here would produce
            # negative labels.
            preds_ = [1-l if p > 1 else p for p, l in zip(preds, labels)]
            preds_all.extend(preds_)
            lbls_all.extend(list(labels))
            imgs_all.extend(paths)

    plot_confusion_matrix(lbls_all, preds_all, name_loader)
    epoch_acc = running_corrects / ds_size
    epoch_f1 = f1_score(lbls_all, preds_all, pos_label=0)
    epoch_precision = precision_score(lbls_all, preds_all, pos_label=0)
    epoch_recall = recall_score(lbls_all, preds_all, pos_label=0)

    print(f'{name_loader} Acc: {epoch_acc:.4f} f1: {epoch_f1:.4f} recall: {epoch_recall:.4f} precision: {epoch_precision:.4f}')
    matr = getTPFPTNFN_X(lbls_all, preds_all, imgs_all)
    return epoch_acc, epoch_f1, epoch_precision, epoch_recall, matr

In [None]:
# Score the trained model on the 'test' loader; `out` holds
# (accuracy, f1, precision, recall, per-example breakdown from getTPFPTNFN_X).
out = evaluate(model_synth, dataloaders_synth['test'], 'test', dataset_synth_sizes['test'])

In [None]:
# Per-epoch loss history for each phase, as returned by train_model
# (the title is Russian for "Loss function").
plot_graph(losses_synth, title="Функция потерь", xlabel='epoch', ylabel='loss')

In [None]:
# Per-epoch accuracy history for each phase, as returned by train_model
# (the title is Russian, roughly "Accuracy, precision").
plot_graph(accuracy_synth, title="Аккуратность, точность", xlabel='epoch', ylabel='accuracy')

In [None]:
# Per-epoch F1 history for each phase, as returned by train_model
# (the title is Russian for "F1-score (harmonic mean of Precision and Recall)").
plot_graph(f1_synth, title="F1-score (среднее гармоническое Precision и Recall)", xlabel='epoch', ylabel='F1-score')

In [None]:
# Per-epoch precision history for each phase, as returned by train_model
# (the title is Russian for "Precision").
plot_graph(precision_synth, title="Точность", xlabel='epoch', ylabel='precision')

In [None]:
# Per-epoch recall history for each phase, as returned by train_model
# (the title is Russian for "Recall").
plot_graph(recall_synth, title="Полнота", xlabel='epoch', ylabel='recall')

In [None]:
# train_model packs these lists as [TP_exmp, FP_exmp, TN_exmp, FN_exmp]; here
# they are unpacked with the positive/negative roles swapped.
# NOTE(review): presumably deliberate, because the metrics use pos_label=0 —
# confirm against the class convention used inside getTPFPTNFN_X.
TN_exmp, FN_exmp, TP_exmp, FP_exmp = f1_exmp_best_synth

In [None]:
# Build a Predictor (defined elsewhere) from the weights file written by the
# torch.save cell above.
predtr = Predictor(w_path="/content/3D-printing-defect-cls/weights/w_model_synth_fine_tuning.pth")

In [None]:
# Run inference on the first entry of TP_exmp and plot the result.
# The image is read from disk twice: once for inference, once for plotting.
for i in TP_exmp[:1]:
  pred, probs = predtr.inference(cv2.imread(i), resize_to=(600, 800))
  predtr.plot_res(cv2.imread(i), pred=pred, probs=probs, resize_to_scale=2)


In [None]:
# Run inference on the first entry of TN_exmp and plot the result.
# The image is read from disk twice: once for inference, once for plotting.
for i in TN_exmp[:1]:
  pred, probs = predtr.inference(cv2.imread(i), resize_to=(600, 800))
  predtr.plot_res(cv2.imread(i), pred=pred, probs=probs, resize_to_scale=2)


In [None]:
# Run inference on every entry of FP_exmp and plot each result.
# The image is read from disk twice: once for inference, once for plotting.
for i in FP_exmp:
  pred, probs = predtr.inference(cv2.imread(i), resize_to=(600, 800))
  predtr.plot_res(cv2.imread(i), pred=pred, probs=probs, resize_to_scale=2)


In [None]:
# Run inference on every entry of FN_exmp and plot each result.
# The image is read from disk twice: once for inference, once for plotting.
for i in FN_exmp:
  pred, probs = predtr.inference(cv2.imread(i), resize_to=(600, 800))
  predtr.plot_res(cv2.imread(i), pred=pred, probs=probs, resize_to_scale=2)