In [3]:
from sklearn.metrics import f1_score, accuracy_score
import pickle
import numpy as np
from skimage import io
from tqdm import tqdm, tqdm_notebook
from PIL import Image
from pathlib import Path
from collections import OrderedDict
from torchvision import transforms
from torchvision.models import resnet18
from torchvision.models import mobilenet_v2
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader
from torch.nn import Dropout
from torch.nn import Conv1d
from torch.nn import BatchNorm1d
from torch.nn import BatchNorm2d
import torch.nn as nn
import torch
import torchvision
import os, shutil, glob
from shutil import copyfile
import pandas as pd
import matplotlib.patches as patches
from matplotlib.font_manager import FontProperties

# в sklearn не все гладко, чтобы в colab удобно выводить картинки 
# мы будем игнорировать warnings
import warnings
warnings.filterwarnings(action='ignore', category=DeprecationWarning)

from matplotlib import colors, pyplot as plt
%matplotlib inline

In [4]:
# разные режимы датасета 
DATA_MODES = ['train', 'val', 'test']
# все изображения будут масштабированы к размеру 224x224 px
RESCALE_SIZE = 224
# работаем на видеокарте
DEVICE = torch.device("cpu")

In [5]:
def save_encoder(file_name, encoder):
    with open(file_name, 'wb') as le_dump_file:
        pickle.dump(encoder, le_dump_file)
    return


class BaseDataset(Dataset):
    """
    Датасет с картинками, который паралельно подгружает их из папок
    производит скалирование и превращение в торчевые тензоры
    """
    def __init__(self, files, mode, encoder_file='label_encoder.pkl'):
        super().__init__()
        # список файлов для загрузки
        self.files = sorted(files)
        # режим работы
        self.mode = mode
        if self.mode not in DATA_MODES:
            print(f"{self.mode} is not correct; correct modes: {DATA_MODES}")
            raise NameError
        self.len_ = len(self.files)
        self.label_encoder = LabelEncoder()
        if self.mode != 'test':
            self.labels = [path.parent.name for path in self.files]
            self.label_encoder.fit(self.labels)
            save_encoder(encoder_file, self.label_encoder)
    
    def __len__(self):
        return self.len_
    
    def load_sample(self, file):
        image = Image.open(file)
        return image
    
    def __getitem__(self, index):
        # для преобразования изображений в тензоры PyTorch и нормализации входа
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) 
        ])
        x = self.load_sample(self.files[index])
        x = self._prepare_sample(x)
        x = np.array(x , dtype='float32') / 255.
        x = transform(x)
        if self.mode == 'test':
            return x
        else:
            label = self.labels[index]
            label_id = self.label_encoder.transform([label])
            y = label_id.item()
            return x, y
        
    def _prepare_sample(self, image):
        w,h = image.size
        img0 = image
        if (w > h):
            img0 = transforms.Pad((0,(w-h)//2,0,w-h-((w-h)//2)))(image)
        elif (w < h):
            img0 = transforms.Pad(((h-w)//2,0,h-w-((h-w)//2), 0))(image)
        w,h = img0.size
        if(w == RESCALE_SIZE and h == RESCALE_SIZE):
            return img0
        return transforms.Resize((RESCALE_SIZE, RESCALE_SIZE))(img0)

In [6]:
class TransformedDataset(BaseDataset):
    """
    Датасет с картинками, который паралельно подгружает их из папок
    производит скалирование и делает рандом кроп с аугментацией
    """
    def __init__(self, files, mode, encoder_file='label_encoder.pkl'):
        super().__init__(files, mode, encoder_file)
        return


    def __len__(self):
        return self.len_


    def load_sample(self, file):
        image = Image.open(file)
        #image.load()
        return image
    
    def __getitem__(self, index):
        # для преобразования изображений в тензоры PyTorch и нормализации входа
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) 
        ])
        x = self.load_sample(self.files[index])
        hh = max(0, int(np.random.randn()*22.))
        hh = hh if(hh*10 < RESCALE_SIZE) else 22
        if(hh > 0):
            x = transforms.RandomCrop((RESCALE_SIZE-hh,RESCALE_SIZE-hh))(x)
            x = transforms.RandomApply((transforms.RandomHorizontalFlip(), 
                                        transforms.RandomAffine(degrees=75, scale=(0.7, 1.1), shear=0.2)), p=0.8)(x)
            x = transforms.Resize((RESCALE_SIZE, RESCALE_SIZE))(x)
        x = np.array(x, dtype='float32')/ 255.0
        x = transform(x)
        if self.mode == 'test':
            return x
        else:
            label = self.labels[index]
            label_id = self.label_encoder.transform([label])
            y = label_id.item()
            return x, y
        


In [7]:
def imshow(inp, title=None, plt_ax=plt, default=False):
    """Imshow для тензоров"""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt_ax.imshow(inp)
    if title is not None:
        plt_ax.set_title(title)
    plt_ax.grid(False)

In [8]:
%matplotlib inline
import numpy as np
import matplotlib
from matplotlib import pyplot as plt
from IPython.display import clear_output
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score, accuracy_score
from tqdm import tqdm, tqdm_notebook
from torch.optim import SGD
from torchvision.models import mobilenet_v2
matplotlib.style.use('ggplot')
# создаем класс TrainClassifier для тренировки моделей на pytorch


def __get_device__(): # получаем устройство
    return torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


def default_lr_schedule():
    """функция возвращает итератор с 320 значениями learning rate"""
    lrs = [0.002/(1.+0.9*n) for n in np.arange(20)] # range(100)
    lrs.extend([0.0002 - (0.0000005*k) for k in np.arange(300)])
    return lrs.__iter__()

    
def mobilenetv2(n_outputs = 2):
    simple_cnn = mobilenet_v2(pretrained=False)
    simple_cnn.classifier = nn.Sequential(
        nn.Dropout(0.2),
        nn.BatchNorm1d(1280),
        nn.Linear(1280, n_outputs, bias=True))
    return simple_cnn


def original_mobilenetv2(n_outputs = 12):
    simple_cnn = mobilenet_v2(pretrained=True)
    simple_cnn.classifier = nn.Sequential(
        nn.Dropout(0.2),
        nn.Linear(1280, n_outputs, bias=True))
    return simple_cnn


class TrainClassifier:
    
    def __init__(self, n_outputs, model:nn.Module = None, criterion = None, batch_size = 64):
        self.device = __get_device__()
        self.neuralnet = mobilenetv2(n_outputs).to(self.device) if model is None else model.to(self.device)
        self.criterion = nn.CrossEntropyLoss() if criterion is None else criterion
        self.optimizer = torch.optim.Adam(self.neuralnet.parameters())
        self.optimizer_generator = None
        self.batch_size = batch_size
        self.epochs = 100
        self.finalizer = None
        self.total_epochs = 0
        self.history = []
        return


    def __optimizer_gen__(self, optimizer_type, lr_schedule):
        self.lr = lr_schedule.__next__()
        while self.lr is not None:
            print('LR = ', self.lr)
            yield torch.optim.Adam(self.neuralnet.parameters(), lr = self.lr) if(optimizer_type == 'adam') else torch.optim.SGD(self.neuralnet.parameters(), lr = lr)
            self.lr = lr_schedule.__next__()
        return


    def __fit_once__(self, train_loader):
        self.neuralnet.train()
        running_loss = 0.0
        running_corrects = 0
        processed_data = 0
        if (self.optimizer_generator is not None):
            self.optimizer = self.optimizer_generator.__next__()
        k = 0
        for inputs, labels in train_loader:
            inputs = inputs.to(self.device)
            labels = labels.to(self.device)
            self.optimizer.zero_grad()
            outputs = self.neuralnet(inputs)
            loss = self.criterion(outputs, labels)
            loss.backward()
            self.optimizer.step()
            preds = torch.argmax(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
            processed_data += inputs.size(0)
            k += 1
            if (k%500 == 0):
                print(k)
        loss = running_loss / processed_data
        acc = running_corrects.cpu().numpy() / processed_data
        self.total_epochs += 1
        return loss, acc


    def __eval_once__(self, val_loader):
        running_loss = 0.0
        running_corrects = 0
        processed_size = 0
        for inputs, labels in val_loader:
            inputs = inputs.to(self.device)
            labels = labels.to(self.device)
            self.neuralnet.eval()
            with torch.set_grad_enabled(False):
                outputs = self.neuralnet(inputs)
                loss = self.criterion(outputs, labels)
                preds = torch.argmax(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
            processed_size += inputs.size(0)
        loss = running_loss / processed_size
        acc = running_corrects.double() / processed_size
        return loss, acc.item()


    def with_epochs(self, epochs, finalizer=None):
        self.epochs = epochs
        return self


    def with_lr(self, lr, optimizer_type = 'adam'):
        self.lr = lr
        self.optimizer = torch.optim.Adam(self.neuralnet.parameters(), lr=lr) if(optimizer_type == 'adam') else torch.optim.SGD(self.neuralnet.parameters(), lr=lr)
        return self


    def with_optimizer(self, optimizer_type, lr_schedule = None):
        if(lr_schedule is None):
            self.optimizer = torch.optim.Adam(self.neuralnet.parameters()) if(optimizer_type == 'adam') else torch.optim.SGD(self.neuralnet.parameters())
        else:
            self.lr_schedule = lr_schedule
            self.optimizer_generator = self.__optimizer_gen__(optimizer_type, lr_schedule)
        return self


    def with_metrics(self, metric = 'acc'):
        self.metric = 'acc'
        return self


    #supports only mathplotlib.pyplot
    def with_graph(self, graph_type = 'loss_results'):
        def draw_func(history):
            if (len(history) == 0):
                return
            loss, res, val_loss, val_res = zip(*history)
            plt.figure(figsize=(9, 7))
            if (graph_type == 'results'):
                plt.plot(res, label="train_acc")
                plt.plot(val_res, label="val_acc")
                plt.ylabel("acc")
            elif (graph_type == 'loss' or graph_type == 'loss_results'):
                plt.plot(loss, label="train_loss")
                plt.plot(val_loss, label="val_loss")
                plt.ylabel("loss")
                if (graph_type == 'loss_results'):
                    plt.legend(loc='best')
                    plt.xlabel("epochs")
                    plt.show()
                    plt.figure(figsize=(9, 7))
                    plt.plot(res, label="train_acc")
                    plt.plot(val_res, label="val_acc")
                    plt.ylabel("acc")
            else:
                return
            plt.legend(loc='best')
            plt.xlabel("epochs")
            plt.show()
            return

        self.draw_fn = draw_func
        return self


    #terminal operation
    def fit(self, train_dataset, val_dataset, enable_tqdm=False):
        end_results = []
        train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        val_loader =    DataLoader(val_dataset, batch_size=self.batch_size, shuffle=False)
        log_template = "\nEpoch {ep:03d} train loss: {t_loss:0.4f} valid loss {v_loss:0.4f} train acc {t_acc:0.4f} valid acc {v_acc:0.4f}"
        def display_results(train_loss, train_acc, val_loss, val_acc, epoch, pbar, log_template):
            if (enable_tqdm):
                pbar.update(1)
                tqdm.write(log_template.format(ep=epoch+1, t_loss=train_loss,v_loss=val_loss, t_acc=train_acc, v_acc=val_acc))
            else:
                print(log_template.format(ep=epoch+1, t_loss=train_loss, v_loss=val_loss, t_acc=train_acc, v_acc=val_acc))
        
        
        def cycle(pbar):
            for epoch in range(self.epochs):
                train_loss, train_acc = self.__fit_once__(train_loader)
                val_loss, val_acc = self.__eval_once__(val_loader)
                end_results.append((train_loss, train_acc, val_loss, val_acc))
                display_results(train_loss, train_acc, val_loss, val_acc, epoch, pbar, log_template)
        

        if (enable_tqdm):
            with tqdm(desc="epoch", total=self.epochs) as pbar:
                cycle(pbar)
        else:
            cycle(None)
        self.history.extend(end_results)
        return end_results


    def evaluate(self, val_dataset):    #terminal operation
        val_loader = DataLoader(val_dataset, batch_size=self.batch_size, shuffle=False)
        return self.__eval_once__(val_loader)


    def do_fit(self, train_dataset, val_dataset, enable_tqdm=False):
        hist = self.fit(train_dataset, val_dataset, enable_tqdm)
        return self


    def do_evaluate(self, val_dataset):
        val_loader = DataLoader(val_dataset, batch_size=self.batch_size, shuffle=False)
        loss, acc = self.__eval_once__(val_loader)
        print("evaluation loss/acc: ", loss, acc)
        return self


    def display_history(self):    #terminal operation
        if(self.draw_fn is not None):
            self.draw_fn(self.history)
        return
    

    def display_model(self):    #terminal operation
        print(self.neuralnet)


    def display_all(self, print_model = False):    #terminal operation
        self.display_history()
        if (print_model):
            self.display_model()
        return


    def save_dict(self, name):
        torch.save(self.neuralnet.state_dict(), name + '.dict')



In [20]:
from sklearn.model_selection import train_test_split
# Инициализируем датасеты 
# далее тренируем сеть на основе MobileNet v2
# в течении 80 эпох постепенно уменьшая LR
torch.manual_seed(17)
np.random.seed(17)

# скопировал картинки после работы preprocess_img.ipynb в каталог train
# из всех картинок перенес с удалением в источнике по 30 картинок из каталогов 0 и 1 в testset
TRAIN_DIR = Path('./train')
# каталог для test set
TEST_DIR = Path('./testset')

test_files = sorted(list(TEST_DIR.rglob('*.jpg')))
test_dataset = BaseDataset(test_files, mode="val")

train_val_files = sorted(list(TRAIN_DIR.rglob('*.jpg')))
labels = [path.parent.name for path in train_val_files]

#dataset
train_files, val_files = train_test_split(train_val_files, test_size=0.15, stratify=labels)
train_dataset = TransformedDataset(train_files, mode='train')
val_dataset = BaseDataset(val_files, mode='val')
val_dataset_a = TransformedDataset(val_files, mode='val')


In [None]:
SAVE_DIR = './mobilenetv2_80_'
schedule = default_lr_schedule()
# тренируем сеть на основе MobileNet v2 с количеством классов = 2 batch_size = 128
# в течении 80 эпох постепенно уменьшая LR
model = TrainClassifier(3, batch_size = 128)
model.with_epochs(80).with_optimizer('adam', lr_schedule = schedule).with_graph()
model.do_fit(train_dataset, val_dataset, enable_tqdm=True).display_all()
# сохраняем словарь нейросети на диске
model.save_dict(SAVE_DIR+'3_cl')

In [None]:
#делаем независимую проверку точности на тестовом датасете
model.do_evaluate(test_dataset);

In [9]:
# функции для использования в чат-боте
import numpy as np
import torch
from torch import nn
from torchvision.models import mobilenet_v2
MODEL = None
RESCALE_SIZE = 224


def get_model(n_outputs = 3):
    simple_cnn = mobilenet_v2(pretrained=False)
    simple_cnn.classifier = nn.Sequential(
        nn.Dropout(0.2),
        nn.BatchNorm1d(1280),
        nn.Linear(1280, n_outputs, bias=True))
    return simple_cnn


def load_state_dict(neuralnet, name):
    neuralnet.load_state_dict(torch.load(name))
    return neuralnet


def predict_one_sample(model, inputs, device=DEVICE):
    """Предсказание, для одной картинки"""
    with torch.no_grad():
        model.eval()
        logit = model(inputs).cpu()
        probs = torch.nn.functional.softmax(logit, dim=-1).numpy()
    return probs


def get_image(img_name):
    # для преобразования изображений в тензоры PyTorch и нормализации входа
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) 
    ])
    image = Image.open(img_name)
    image.load()
    x = np.array(image.resize((RESCALE_SIZE, RESCALE_SIZE)))
    x = np.array(x / 255, dtype='float32')
    x = transform(x)
    return x


In [13]:
def mobilenetv2(n_outputs = 2):
    simple_cnn = mobilenet_v2(pretrained=False)
    simple_cnn.classifier = nn.Sequential(
        nn.Dropout(0.2),
        nn.BatchNorm1d(1280),
        nn.Linear(1280, n_outputs, bias=True))
    return simple_cnn.cpu()

# код для использования в чат-боте

model = mobilenetv2(n_outputs = 3)
load_state_dict(model, '../model/mobilenetv2_80_3_cl.dict').cpu()
MODEL = model


label_encoder = pickle.load(open("../model/label_encoder.pkl", 'rb'))
img = get_image('./yama.jpg')
prob_pred = predict_one_sample(MODEL, img[None,...])
print(prob_pred)
y_pred = np.argmax(prob_pred)
predicted_label = label_encoder.classes_[y_pred]
print(predicted_label, 'asphalt' if predicted_label == '0' else 'defect' if predicted_label == '1' else 'latch')
img = get_image('./none.jpg')
prob_pred = predict_one_sample(MODEL, img[None,...])
y_pred = np.argmax(prob_pred)
predicted_label = label_encoder.classes_[y_pred]
print(predicted_label, 'asphalt' if predicted_label == '0' else 'defect' if predicted_label == '1' else 'latch')



RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.