In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import os
from PIL import Image

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

!pip install albumentations
import albumentations as A
from albumentations.pytorch import ToTensorV2

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from torchvision import transforms
from torchvision.models import resnet50

In [None]:
# Unpack the dataset archive into /content/data (the CSVs and image folders used below live there).
# NOTE(review): the archive path is under /content/drive, but no Drive-mount cell is visible
# in this notebook — presumably Google Drive has to be mounted before this cell runs.
!unzip "/content/drive/MyDrive/archive (1).zip" -d "/content/data"

# 1. Исследую данные

In [None]:
# Locations of the keypoint annotation tables for the two splits.
train_path, test_path = (
    '/content/data/training_frames_keypoints.csv',
    '/content/data/test_frames_keypoints.csv',
)

# Read both tables into DataFrames.
# NOTE: the names `train` and `test` are rebound later in this notebook by the
# `def train(...)` / `def test(...)` cells, so cells that use these DataFrames
# must run before those definitions.
train, test = (pd.read_csv(csv_file) for csv_file in (train_path, test_path))

In [None]:
# Preview the first rows of the training annotation table.
train.head()

In [None]:
# Print the column/dtype/non-null summary of the training annotation table.
train.info()

In [None]:
# Preview the first rows of the test annotation table.
test.head()

In [None]:
# Print the column/dtype/non-null summary of the test annotation table.
test.info()

# 2. Config

In [None]:
# Central hyper-parameters and paths read by the cells below.
config = dict(
    # --- data loading (passed to both DataLoaders) ---
    batch_size=64,
    num_workers=0,
    # --- training schedule ---
    # The loop iterates over range(total_epochs) and writes a checkpoint
    # whenever epoch % save_epoch == 0, so 201 makes epoch 200 the last one saved.
    total_epochs=201,
    save_epoch=10,
    learning_rate=0.001,
    # --- image folders and checkpoint directory ---
    train_path='/content/data/training',
    test_path='/content/data/test',
    save_path='./weights',
    # NOTE(review): this key is not read anywhere in the visible notebook; the
    # albumentations pipelines resize to a hard-coded 220x220 instead.
    input_size=100,
)

# 3. Dataset

In [None]:
class MyDataset(Dataset):
    """Image dataset that pairs each picture with its landmark (keypoint) coordinates.

    Args:
        images_path: Directory that contains the image files.
        image_names: Sequence of file names (a pandas Series or any indexable
            sequence); entry ``i`` belongs to row ``i`` of ``landmarks``.
        landmarks: pandas DataFrame whose rows hold the coordinates in
            ``x0, y0, x1, y1, ...`` order (each row is reshaped to ``(-1, 2)``).
        transform: Optional callable invoked as
            ``transform(image=..., keypoints=...)`` that returns a mapping with
            ``"image"`` and ``"keypoints"`` entries.

    Each item is ``(image, landmarks)`` where ``landmarks`` is always a flat
    1-D numpy array ``[x0, y0, x1, y1, ...]``, with or without a transform.
    """

    def __init__(self, images_path, image_names, landmarks, transform=None):
        super().__init__()
        self.images_path = images_path
        self.image_names = image_names
        self.landmarks = landmarks
        self.transform = transform

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, index):
        """Load sample ``index`` (positional) and return ``(image, flat_landmarks)``.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        # Look the file name up by POSITION, exactly like the landmarks row below.
        # Label-based `series[index]` breaks (or silently misaligns names and
        # landmarks) as soon as the Series index is not 0..n-1, e.g. after a
        # filtered or shuffled split.
        names = self.image_names
        img_name = names.iloc[index] if hasattr(names, 'iloc') else names[index]

        # Path to the image
        img_path = os.path.join(self.images_path, img_name)

        # NOTE(review): OpenCV loads images in BGR channel order and no conversion
        # to RGB is done here — confirm this is what the pretrained backbone expects.
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals a missing/unreadable file by returning None;
            # fail here with the path instead of deep inside the transform.
            raise FileNotFoundError('Cannot read image: {}'.format(img_path))

        landmarks_img = np.array(self.landmarks.iloc[index]).reshape(-1, 2)

        if self.transform:
            transformed = self.transform(image=img, keypoints=landmarks_img)
            img = transformed["image"]
            landmarks_img = transformed["keypoints"]

        # Flatten in every case so the target layout does not depend on whether
        # a transform was supplied.
        landmarks_img = np.array(landmarks_img).flatten()

        return img, landmarks_img


In [None]:
# Column 'Unnamed: 0' is used as the image file name; every remaining column
# is kept as a landmark coordinate.
# NOTE: `train` / `test` must still be the DataFrames here — later cells rebind
# both names to functions, so this cell cannot be re-run after them.
train_image_names, train_landmarks = train['Unnamed: 0'], train.drop(columns='Unnamed: 0')
test_image_names, test_landmarks = test['Unnamed: 0'], test.drop(columns='Unnamed: 0')

In [None]:
# Both pipelines are identical: resize the image to 220x220 and convert it to a
# torch tensor. Keypoints are given in 'xy' format and are transformed together
# with the image; remove_invisible=False is passed so keypoints are not dropped.
# NOTE(review): no normalisation step is applied, so the network receives raw
# pixel values — confirm this is intended.
train_transform = A.Compose(
    transforms=[A.Resize(height=220, width=220), ToTensorV2()],
    keypoint_params=A.KeypointParams(format='xy', remove_invisible=False),
)

test_transform = A.Compose(
    transforms=[A.Resize(height=220, width=220), ToTensorV2()],
    keypoint_params=A.KeypointParams(format='xy', remove_invisible=False),
)

In [None]:
# Wrap each split (image folder + file names + landmark table) in a MyDataset
# with the matching albumentations pipeline.
train_data = MyDataset(
    images_path=config['train_path'],
    image_names=train_image_names,
    landmarks=train_landmarks,
    transform=train_transform,
)

test_data = MyDataset(
    images_path=config['test_path'],
    image_names=test_image_names,
    landmarks=test_landmarks,
    transform=test_transform,
)

In [None]:
# Take the flattened landmark vector of the first training sample and split the
# interleaved [x0, y0, x1, y1, ...] layout into separate x and y arrays
# (they are plotted in a later cell).
first_sample = train_data[0]
l = np.array(first_sample[1])

x, y = l[::2], l[1::2]

In [None]:
# Shape of the transformed image returned for the first training sample.
train_data[0][0].shape

In [None]:
# Draw the x/y landmark arrays computed above, then show slice [0] of the first
# training image in grayscale underneath them.
# NOTE(review): with ToTensorV2 the image is presumably (C, H, W), so [0] would be
# the first colour channel only — confirm.
plt.scatter(x, y, s=5)
plt.imshow(train_data[0][0][0], cmap='gray')

# 4. DataLoader

In [None]:
# Options shared by both loaders.
loader_options = dict(
    batch_size=config['batch_size'],
    num_workers=config['num_workers'],
    pin_memory=True,
)

# Training batches are shuffled and an incomplete final batch is dropped.
train_loader = DataLoader(train_data, shuffle=True, drop_last=True, **loader_options)

# Evaluation keeps dataset order and keeps the (possibly smaller) final batch.
test_loader = DataLoader(test_data, shuffle=False, **loader_options)

# 5. Функции обучения и валидации: train, test

In [None]:
def train(model, device, train_loader, criterion, optimizer):
    """Run one optimisation pass over ``train_loader`` and return the mean batch loss.

    NOTE: defining this function rebinds the notebook-level name ``train``, which
    earlier held the training DataFrame.

    Args:
        model: Network to optimise; it is switched to training mode.
        device: Device the batches are moved to before the forward pass.
        train_loader: Iterable yielding ``(images, landmarks)`` batches.
        criterion: Loss callable applied as ``criterion(output, landmarks)``.
        optimizer: Optimizer that is stepped once per batch.

    Returns:
        The arithmetic mean of the per-batch loss values (numpy float).
    """
    model.train()
    batch_losses = []

    for batch_images, batch_targets in train_loader:
        # Move to the target device and cast both tensors to float32.
        batch_images = batch_images.to(device).float()
        batch_targets = batch_targets.to(device).float()

        batch_loss = criterion(model(batch_images), batch_targets)
        batch_losses.append(batch_loss.item())

        # Standard update: clear old gradients, back-propagate, apply the step.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

    return np.array(batch_losses).mean()


In [None]:
def test(model, device, test_loader, criterion):
    losses = np.array([])

    model.eval()

    with torch.no_grad():
        for img, landmarks in test_loader:
            img, landmarks = img.to(device), landmarks.to(device)
            img = img.float()
            landmarks = landmarks.float()

            output = model(img)
            loss = criterion(output, landmarks)

            losses = np.append(losses, loss.item())

    return losses.mean()

# 6. Модель, оптимизатор и функция потерь

In [175]:
class SimpleCNNModel(nn.Module):
    """ResNet-50 whose final fully-connected layer is replaced by a landmark regressor.

    Args:
        num_landmarks: Number of (x, y) points to predict; the network outputs
            ``num_landmarks * 2`` values per image. Defaults to 68.
        pretrained: Whether to load torchvision's pretrained ResNet-50 weights.
            Defaults to True.
    """

    def __init__(self, num_landmarks=68, pretrained=True):
        super().__init__()
        self.model = self._build_backbone(pretrained)
        # Read the head's input width from the backbone instead of hard-coding it.
        self.model.fc = nn.Linear(self.model.fc.in_features, num_landmarks * 2)

    @staticmethod
    def _build_backbone(pretrained):
        """Create the ResNet-50 backbone, preferring the non-deprecated weights API."""
        try:
            # Newer torchvision replaces `pretrained=` with `weights=`;
            # IMAGENET1K_V1 is the weight set `pretrained=True` selects.
            from torchvision.models import ResNet50_Weights
        except ImportError:
            # Older torchvision: only the legacy flag exists.
            return resnet50(pretrained=pretrained)

        weights = ResNet50_Weights.IMAGENET1K_V1 if pretrained else None
        return resnet50(weights=weights)

    def forward(self, x):
        """Return the backbone's output for the image batch ``x``."""
        return self.model(x)

In [None]:
# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

print('Device:', device)

In [None]:
# Instantiate the network and move it to the selected device.
model = SimpleCNNModel()
model = model.to(device)

# Adam over all model parameters, starting from the configured learning rate.
optimizer = torch.optim.Adam(params=model.parameters(), lr=config['learning_rate'])

# Smooth L1 loss between the predicted and target landmark vectors.
criterion = nn.SmoothL1Loss()
criterion = criterion.to(device)

# Cosine annealing of the learning rate down to eta_min; stepped once per epoch
# by the training loop.
# NOTE(review): T_max=5 is far smaller than config['total_epochs'] — confirm the
# resulting schedule is the intended one.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer=optimizer, T_max=5, eta_min=0.0001)

# 7. Обучение!!!

In [None]:
def save_checkpoint(save_path, state, epoch, tag=''):
    """Serialise ``state`` to ``<save_path>/<tag>checkpoint-<epoch:06>.pth.tar``.

    Args:
        save_path: Directory for checkpoints; created (with parents) if missing.
        state: Object handed to ``torch.save``.
        epoch: Epoch number embedded, zero-padded to six digits, in the file name.
        tag: Optional prefix for the file name.

    Returns:
        The path of the file that was written.
    """
    # exist_ok=True avoids the check-then-create race of testing os.path.exists first.
    os.makedirs(save_path, exist_ok=True)

    filename = os.path.join(save_path, "{}checkpoint-{:06}.pth.tar".format(tag, epoch))
    torch.save(state, filename)
    return filename

In [None]:
def get_lr(optimizer):
    """Return the learning rate of the optimizer's first parameter group.

    Returns ``None`` when the optimizer has no parameter groups.
    """
    first_group = next(iter(optimizer.param_groups), None)
    if first_group is None:
        return None
    return first_group['lr']

In [None]:
from IPython.display import clear_output
%matplotlib inline

In [None]:
print('Начало обучения!!!')
# Per-epoch history used to redraw the loss curves.
log = {"epoch": [], "train_loss": [],  "val_loss": []}

for epoch in range(config['total_epochs']):
    # One optimisation pass, then an evaluation pass on the test loader.
    train_loss = train(model, device, train_loader, criterion, optimizer)
    test_loss = test(model, device, test_loader, criterion)

    # Write a checkpoint every `save_epoch` epochs (including epoch 0).
    if epoch % config['save_epoch'] == 0:
        state = {
            'epoch': epoch,
            'state_dict': model.state_dict(),
            # Store a plain Python float: a numpy scalar inside the checkpoint
            # can be rejected by torch.load when it runs with weights_only=True.
            'loss': float(test_loss),
            'optimizer': optimizer.state_dict(),
            'criterion': criterion.state_dict(),
            # Needed to resume the learning-rate schedule from this epoch.
            'scheduler': scheduler.state_dict(),
        }
        save_checkpoint(config['save_path'], state, epoch, '')

    log['epoch'].append(epoch)
    log['train_loss'].append(train_loss)
    log['val_loss'].append(test_loss)

    # Replace the previous cell output with an up-to-date loss plot.
    clear_output(wait=True)
    plt.plot(log['epoch'], log['train_loss'], label='train')
    plt.plot(log['epoch'], log['val_loss'], label='val')
    plt.legend()
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.title('Loss')
    plt.show()

    # The learning rate is read before scheduler.step(), i.e. it is the value
    # that was used for the epoch being reported.
    line = '[{}/{}]\t\tLR: {:.2}\t\tTrain loss: {:.3}\t\tVal loss: {:.3}'.format(
        epoch,
        config['total_epochs'] - 1,
        get_lr(optimizer),
        train_loss,
        test_loss
    )

    print(line)

    scheduler.step()

print('КОНЕЦ!!! НАКОНЕЦ!!!')

In [None]:
# Move the model back to the CPU; the inference cell further down feeds it a
# tensor that is never moved to another device.
model.cpu()

# 8. Проверка

In [192]:
def show_img_with_landmarks(img, landmarks, predict):
    """Plot reference and predicted landmarks on top of an image.

    Args:
        img: Indexable image container; ``img[0][0]`` is displayed in grayscale.
            With the ``(1, C, H, W)`` tensor built in the inference cell this is
            the first channel of the single image in the batch.
        landmarks: Flat sequence ``[x0, y0, x1, y1, ...]`` of reference points
            (drawn in green).
        predict: torch tensor with the model output, either one flat vector or a
            batch of them; only the first row is drawn (in red). It may require
            grad and may live on any device.
    """
    x = landmarks[::2]
    y = landmarks[1::2]

    # Detach and convert once. `.cpu()` makes this work for CUDA tensors, and
    # `atleast_2d` lets an un-batched 1-D prediction pass through.
    pred = np.atleast_2d(predict.detach().cpu().numpy())[0]
    x_pred = pred[::2]
    y_pred = pred[1::2]

    size = 10
    color = '#00ff5f'
    color_pred = '#ec5353'

    plt.scatter(x, y, s=size, c=color)
    plt.scatter(x_pred, y_pred, s=size, c=color_pred)
    plt.imshow(img[0][0], cmap='gray')


In [None]:
# Restore the weights saved at epoch 50 (tensors are mapped onto the CPU) and
# switch the model to inference mode.
# NOTE(review): the training loop writes checkpoints under config['save_path'],
# while this file is read from Google Drive — presumably it was copied there by hand.
# NOTE(review): checkpoints written by the training loop may hold a numpy scalar
# under 'loss'; confirm torch.load accepts that with the installed torch version.
checkpoint_file = '/content/drive/MyDrive/checkpoint-000050.pth.tar'
model.load_state_dict(torch.load(checkpoint_file, map_location='cpu')['state_dict'])
model.eval()

In [None]:
index = 1

# Read the sample once (each train_data[...] access decodes and transforms the
# image again) and add a leading batch dimension for the model.
# NOTE(review): the sample comes from train_data although the variables are
# named test_* — confirm whether test_data was intended.
sample_img, test_l = train_data[index]
test_img = torch.as_tensor(sample_img).unsqueeze(0).float()

# Inference only: skip building the autograd graph.
with torch.no_grad():
    pred_l = model(test_img)

show_img_with_landmarks(test_img, test_l, pred_l)

In [None]:
# Shape of the batched input tensor that was fed to the model above.
test_img.shape