In [1]:
import PIL
import os
import cv2
import torch
import torchvision
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import torchvision.transforms as tt
from torchvision.utils import make_grid
import matplotlib.pyplot as plt
%matplotlib inline

## Подготовка датасета

In [2]:
data_dir = './data'
print(os.listdir(data_dir))
classes_train = os.listdir(data_dir + "/train")
classes_valid = os.listdir(data_dir + "/validation")
print(f'Train Classes - {classes_train}')
print(f'Validation Classes - {classes_valid}')

['train', 'validation']
Train Classes - ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']
Validation Classes - ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']


In [3]:
train_tfms = tt.Compose([tt.Grayscale(num_output_channels=1),
                         tt.RandomHorizontalFlip(),
                         tt.RandomRotation(30),
                         tt.ToTensor()])

valid_tfms = tt.Compose([tt.Grayscale(num_output_channels=1), tt.ToTensor()])

In [4]:
batch_size = 200
best_model = 0
result_dir = './photos'
print(os.listdir(result_dir))
result_tfms = tt.Compose([tt.Grayscale(num_output_channels=1), tt.ToTensor()])

[]


In [5]:
result_ds = [result_tfms(PIL.Image.open('./photos/'+path).resize((48, 48)))for path in os.listdir(result_dir)]

In [6]:
result_dl = DataLoader(result_ds, batch_size, num_workers=3, pin_memory=True)

In [7]:
train_ds = ImageFolder(data_dir + '/train', train_tfms)
valid_ds = ImageFolder(data_dir + '/validation', valid_tfms)

In [8]:
print(train_ds[0])

(tensor([[[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]]]), 0)


In [9]:
train_dl = DataLoader(train_ds, batch_size, shuffle=True, num_workers=3, pin_memory=True)
valid_dl = DataLoader(valid_ds, batch_size*2, num_workers=3, pin_memory=True)

In [10]:
def show_batch(dl):
    for images, labels in dl:
        fig, ax = plt.subplots(figsize=(12, 12))
        ax.set_xticks([]); ax.set_yticks([])
        print(images[0].shape)
        ax.imshow(make_grid(images[:64], nrow=8).permute(1, 2, 0))
        break

In [11]:
def show_res_batch(dl):
    for images in dl:
        fig, ax = plt.subplots(figsize=(12, 12))
        ax.set_xticks([]); ax.set_yticks([])
        print(images[0].shape)
        ax.imshow(make_grid(images[:3], nrow=3).permute(1, 2, 0))
        break

## Cuda

In [12]:
def get_default_device():
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')
    
def to_device(data, device):
    if isinstance(data, (list,tuple)):
        return [to_device(x, device) for x in data]
    return data.to(device, non_blocking=True)

class DeviceDataLoader():
    def __init__(self, dl, device):
        self.dl = dl
        self.device = device
        
    def __iter__(self):
        for b in self.dl: 
            yield to_device(b, self.device)

    def __len__(self):
        return len(self.dl)

In [13]:
device = get_default_device()
device
print('GPU: ' + str(torch.cuda.is_available()))

GPU: True


In [14]:
device = 'cpu'

In [15]:
train_dl = DeviceDataLoader(train_dl, device)
valid_dl = DeviceDataLoader(valid_dl, device)

In [16]:
result_dl = DeviceDataLoader(result_dl, device)

## Шаги обучения


In [17]:
def accuracy(outputs, labels):
    _, preds = torch.max(outputs, dim=1)
    return torch.tensor(torch.sum(preds == labels).item() / len(preds))

class ImageClassificationBase(nn.Module):
    def training_step(self, batch):
        images, labels = batch 
        out = self(images)
        loss = F.cross_entropy(out, labels)
        return loss
    
    def validation_step(self, batch):
        images, labels = batch 
        out = self(images)
        loss = F.cross_entropy(out, labels)
        acc = accuracy(out, labels)
        return {'val_loss': loss.detach(), 'val_acc': acc}
    def pred_step(self, batch):
        images = batch 
        out = self(images)
        return out
    def validation_epoch_end(self, outputs):
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()
        batch_accs = [x['val_acc'] for x in outputs]
        epoch_acc = torch.stack(batch_accs).mean()
        return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}
    
    def epoch_end(self, epoch, result):
        global best_model, new_model
        print("Epoch [{}], last_lr: {:.5f}, train_loss: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}".format(
            epoch, result['lrs'][-1], result['train_loss'], result['val_loss'], result['val_acc']))
        new_model = result['val_acc']
        if new_model > best_model:
            best_model = new_model
            torch.save(model.state_dict(), './models/emotion_detection_acc'+str(best_model)+'.pth')
            print('save ', './models/emotion_detection_acc'+str(best_model)+'.pth')

## Архитектура модели


In [18]:
def conv_block(in_channels, out_channels, pool=False):
    layers = [nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1), 
              nn.BatchNorm2d(out_channels), 
              nn.ELU(inplace=True)]
    if pool: layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)

class ResNet(ImageClassificationBase):
    def __init__(self, in_channels, num_classes):
        super().__init__()
        
        self.conv1 = conv_block(in_channels, 128)
        self.conv2 = conv_block(128, 128, pool=True)
        self.res1 = nn.Sequential(conv_block(128, 128), conv_block(128, 128))
        self.drop1 = nn.Dropout(0.5)
        
        self.conv3 = conv_block(128, 256)
        self.conv4 = conv_block(256, 256, pool=True)
        self.res2 = nn.Sequential(conv_block(256, 256), conv_block(256, 256))
        self.drop2 = nn.Dropout(0.5)
        
        self.conv5 = conv_block(256, 512)
        self.conv6 = conv_block(512, 512, pool=True)
        self.res3 = nn.Sequential(conv_block(512, 512), conv_block(512, 512))
        self.drop3 = nn.Dropout(0.5)
        
        self.classifier = nn.Sequential(nn.MaxPool2d(6), 
                                        nn.Flatten(),
                                        nn.Linear(512, num_classes))
        
    def forward(self, xb):
        xb = to_device(xb,device)
        out = self.conv1(xb)
        out = self.conv2(out)
        out = self.res1(out) + out
        out = self.drop1(out)
        
        out = self.conv3(out)
        out = self.conv4(out)
        out = self.res2(out) + out
        out = self.drop2(out)
        
        out = self.conv5(out)
        out = self.conv6(out)
        out = self.res3(out) + out
        out = self.drop3(out)
        
        out = self.classifier(out)
        return out

In [19]:
print(len(classes_train))

7


In [20]:
model = to_device(ResNet(1, len(classes_train)), device)

In [21]:
model = ResNet(1, len(classes_train))
model.load_state_dict(torch.load('./models/emotion_detection_acc0.5452366471290588.pth'))
model = to_device(model,device)

In [22]:
first_parameter = next(model.parameters())
input_shape = first_parameter.size()
print(input_shape)

torch.Size([128, 1, 3, 3])


In [23]:
param_size = 0
for param in model.parameters():
    param_size += param.nelement() * param.element_size()
buffer_size = 0
for buffer in model.buffers():
    buffer_size += buffer.nelement() * buffer.element_size()

size_all_mb = (param_size + buffer_size) / 1024**2
print('model size: {:.3f}MB'.format(size_all_mb))

model size: 41.149MB


In [24]:
print(f"gpu used {torch.cuda.max_memory_allocated(device=None)} memory")

gpu used 43154432 memory


## Обучение



In [25]:
@torch.no_grad()
def evaluate(model, val_loader):
    model.eval()
    outputs = [model.validation_step(batch) for batch in val_loader]
    return model.validation_epoch_end(outputs)

def predict(model, pred_loader):
    model.eval()
    outputs = [model.pred_step(batch) for batch in pred_loader]
    return [torch.max(el, dim=1)[1] for el in outputs]

def get_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']

def fit_one_cycle(epochs, max_lr, model, train_loader, val_loader, 
                  weight_decay=0, grad_clip=None, opt_func=torch.optim.SGD):
    torch.cuda.empty_cache()
    history = []
    
    # Set up custom optimizer with weight decay
    optimizer = opt_func(model.parameters(), max_lr, weight_decay=weight_decay)
    # Set up one-cycle learning rate scheduler
    sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epochs, 
                                                steps_per_epoch=len(train_loader))
    
    for epoch in range(epochs):
        # Training Phase 
        model.train()
        train_losses = []
        lrs = []
        for batch in train_loader:
            loss = model.training_step(batch)
            train_losses.append(loss)
            loss.backward()
            
            # Gradient clipping
            if grad_clip: 
                nn.utils.clip_grad_value_(model.parameters(), grad_clip)
            
            optimizer.step()
            optimizer.zero_grad()
            
            # Record & update learning rate
            lrs.append(get_lr(optimizer))
            sched.step()
        
        # Validation phase
        result = evaluate(model, val_loader)
        
        result['train_loss'] = torch.stack(train_losses).mean().item()
        result['lrs'] = lrs
        model.epoch_end(epoch, result)
        history.append(result)
    return history

In [26]:
def train_model(epochs, max_lr, model, train_loader, val_loader, device,
                  weight_decay=0, grad_clip=None, opt_func=torch.optim.SGD):

    # The training configurations were not carefully selected.


    model.to(device)

    torch.cuda.empty_cache()
    history = []
    
    # Set up custom optimizer with weight decay
    optimizer = opt_func(model.parameters(), max_lr, weight_decay=weight_decay)
    # Set up one-cycle learning rate scheduler
    sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epochs, 
                                                steps_per_epoch=len(train_loader))
    
    for epoch in range(epochs):
        # Training Phase 
        model.train()
        train_losses = []
        lrs = []
        for batch in train_loader:
            loss = model.training_step(batch)
            train_losses.append(loss)
            loss.backward()
            
            # Gradient clipping
            if grad_clip: 
                nn.utils.clip_grad_value_(model.parameters(), grad_clip)
            
            optimizer.step()
            optimizer.zero_grad()
            
            # Record & update learning rate
            lrs.append(get_lr(optimizer))
            sched.step()
        
        # Validation phase
        result = evaluate(model, val_loader)
        print(result)
        #model.epoch_end(epoch, result)
        

    return model

In [27]:
epochs = 130
max_lr = 0.0008
grad_clip = 0.1
weight_decay = 1e-4
opt_func = torch.optim.Adam

## Quantization



In [28]:
import torch.quantization
from torch.ao.quantization import get_default_qat_qconfig_mapping
from torch.ao.quantization.quantize_fx import prepare_qat_fx, convert_fx
device = 'cpu'

In [29]:
qconfig_mapping = get_default_qat_qconfig_mapping("fbgemm")
example_inputs = torch.randn(batch_size, 1, 48, 48, requires_grad=True).to(device)

In [30]:
model = ResNet(1, len(classes_train))
model.load_state_dict(torch.load('./models/emotion_detection_acc0.5452366471290588.pth'))
model = to_device(model,device)

In [31]:
prepared_model_static = prepare_qat_fx(model, qconfig_mapping, example_inputs)



In [32]:
for batch_idx, (data, target) in enumerate(train_dl):
    prepared_model_static(data)
    if batch_idx % 10 == 0:
        break
        print("Batch %d/%d complete, continue ..." %(batch_idx+1, len(valid_dl)))

In [33]:
quantized_model = convert_fx(prepared_model_static)

In [34]:
print('Original model:')
print(model)
print('')
print('Quantized model:')
print(quantized_model)

Original model:
ResNet(
  (conv1): Sequential(
    (0): Conv2d(1, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ELU(alpha=1.0, inplace=True)
  )
  (conv2): Sequential(
    (0): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ELU(alpha=1.0, inplace=True)
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (res1): Sequential(
    (0): Sequential(
      (0): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ELU(alpha=1.0, inplace=True)
    )
    (1): Sequential(
      (0): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, trac

In [35]:
import os

# save the model and check the model size
def print_size_of_model(model, label=""):
    torch.save(model.state_dict(), "temp.p")
    size=os.path.getsize("temp.p")
    print("model: ",label,' \t','Size (KB):', size/1e3)
    os.remove('temp.p')
    return size


In [36]:
f=print_size_of_model(model,"fp32")
q=print_size_of_model(quantized_model,"int8")
print("{0:.2f} times smaller".format(f/q))

model:  fp32  	 Size (KB): 43169.862
model:  int8  	 Size (KB): 10863.994
3.97 times smaller


In [37]:
def quant_validation_epoch_end(outputs):
    batch_losses = [x['val_loss'] for x in outputs]
    epoch_loss = torch.stack(batch_losses).mean()
    batch_accs = [x['val_acc'] for x in outputs]
    epoch_acc = torch.stack(batch_accs).mean()
    return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}
def quant_evaluate(model, val_loader):
    outputs = [quant_validation_step(model,batch) for batch in val_loader]
    return quant_validation_epoch_end(outputs)
def quant_validation_step(model, batch):
    print(1)
    images, labels = batch 
    out = model(images)
    loss = F.cross_entropy(out, labels)
    acc = accuracy(out, labels)
    return {'val_loss': loss.detach(), 'val_acc': acc}


In [71]:
device = 'cuda'
valid_dl = DeviceDataLoader(valid_dl, devi)

In [72]:
quantized_model = to_device(quantized_model,device)

In [38]:
eval_accuracy = quant_evaluate(quantized_model, valid_dl)



print(eval_accuracy)


1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
{'val_loss': 1.224511742591858, 'val_acc': 0.5483987331390381}


In [39]:
print(batch_size)
speedtest_inputs = torch.randn(100, 1, 48, 48, requires_grad=True).to(device)

print("Floating point FP32: ")
%timeit model.forward(speedtest_inputs)

print("Quantized INT8: ")
%timeit quantized_model.forward(speedtest_inputs)

200
Floating point FP32: 
7.52 s ± 247 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Quantized INT8: 
8.29 s ± 172 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [70]:
torch.save(quantized_model.state_dict(), "quant_model_0.556.pth")

In [75]:
model_scripted = torch.jit.script(quantized_model) # Export to TorchScript
model_scripted.save('quant_model_scripted.pt')