In [None]:
import pandas as pd
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

import torch
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision import transforms, utils

from torch import nn, optim
import torch.nn.functional as F

import warnings
warnings.filterwarnings('ignore')

IMG_SIZE = 96
data_dir = Path('./input')
train_data = pd.read_csv(data_dir/'training.csv')
train_data.T

In [None]:


def show_keypoints(image, keypoints):
    
      
    plt.imshow(image, cmap='gray')
    if len(keypoints):
        plt.scatter(keypoints[:, 0], keypoints[:, 1], s=24, marker ='.', c='r')
        
def show_images(df, indxs, ncols=5, figsize=(15,10), with_keypoints=True):
    plt.figure(figsize=figsize)
    nrows = len(indxs) // ncols + 1
    for i, idx in enumerate(indxs):
        image = np.fromstring(df.loc[idx, 'Image'], sep=' ').astype(np.float32)\
                .reshape(-1, IMG_SIZE)
        if with_keypoints:
            keypoints = df.loc[idx].drop('Image').values.astype(np.float32)\
                        .reshape(-1, 2)
        else:
            keypoints = []
        plt.subplot(nrows, ncols, i + 1)
        plt.title(f'Sample #{idx}')
        plt.axis('off')
        plt.tight_layout()
        show_keypoints(image, keypoints)
    plt.show()

In [None]:
show_images(train_data, range(4))

In [None]:
train_df = train_data.dropna()
train_df.info()

In [None]:
test_data = pd.read_csv(data_dir / 'test.csv')
test_data.info()

In [None]:
test_data.head()

In [None]:
class FaceKeypointsDataset(Dataset):

    
    def __init__(self, dataframe, train=True, transform=None):
        self.dataframe = dataframe
        self.train = train
        self.transform = transform
        
    def __len__(self):
        return len(self.dataframe)
    
    def __getitem__(self, idx):
        image = np.fromstring(self.dataframe.iloc[idx, -1], sep=' ')\
                .astype(np.float32).reshape(-1, IMG_SIZE)
        
        if self.train:
            keypoints = self.dataframe.iloc[idx, :-1].values.astype(np.float32)
        else:
            keypoints = None

        sample = {'image': image, 'keypoints': keypoints}
        
        
        if self.transform:
            sample = self.transform(sample)
            
        return sample

In [None]:
class Normalize(object):

    
    def __call__(self, sample):
        image, keypoints = sample['image'], sample['keypoints']
        
        return {'image': image / 255., # scale to [0, 1]
                'keypoints': keypoints}
        
class ToTensor(object):


    def __call__(self, sample):
        image, keypoints = sample['image'], sample['keypoints']

        # swap color axis because
        # numpy image: H x W x C
        # torch image: C X H X W
        image = image.reshape(1, IMG_SIZE, IMG_SIZE)
        image = torch.from_numpy(image)
        
        if keypoints is not None:
            keypoints = torch.from_numpy(keypoints)
            return {'image': image, 'keypoints': keypoints}
        else:
            return {'image': image}


In [None]:
from torch.utils.data.sampler import SubsetRandomSampler

def prepare_train_valid_loaders(trainset, valid_size=0.2, 
                                batch_size=128):
 
    
    
    num_train = len(trainset)
    indices = list(range(num_train))
    np.random.shuffle(indices)
    split = int(np.floor(valid_size * num_train))
    train_idx, valid_idx = indices[split:], indices[:split]
    
    
    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)
    
    
    train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                               sampler=train_sampler)
    valid_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                               sampler=valid_sampler)
    
    return train_loader, valid_loader

In [None]:

train_df = train_data.dropna()
test_df = test_data


batch_size = 128

valid_size = 0.2


tsfm = transforms.Compose([Normalize(), ToTensor()])


trainset = FaceKeypointsDataset(train_df, transform=tsfm)
testset = FaceKeypointsDataset(test_df, train=False, transform=tsfm)


train_loader, valid_loader = prepare_train_valid_loaders(trainset, 
                                                         valid_size,
                                                         batch_size)

test_loader = torch.utils.data.DataLoader(testset, batch_size=batch_size)

In [None]:
from torch import nn, optim
import torch.nn.functional as F

class MLP(nn.Module):
    def __init__(self, input_size, output_size, hidden_layers, drop_p =0.5):
        
       
        
        super(MLP, self).__init__()
        
        
        layer_sizes = [(input_size, hidden_layers[0])] \
                      + list(zip(hidden_layers[:-1], hidden_layers[1:]))
        self.hidden_layers = nn.ModuleList([nn.Linear(h1, h2) 
                                            for h1, h2 in layer_sizes])
        
        self.output = nn.Linear(hidden_layers[-1], output_size)
        
        self.dropout = nn.Dropout(drop_p)
        
    def forward(self, x):
    

        x = x.view(x.shape[0], -1)
        
        for layer in self.hidden_layers:
            x = F.relu(layer(x))
            x = self.dropout(x)
        x = self.output(x)    
        return x


In [None]:
model = MLP(input_size=IMG_SIZE*IMG_SIZE, output_size=30, 
            hidden_layers=[128, 64], drop_p=0.1)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

In [None]:
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.003)

In [None]:
def train(train_loader, valid_loader, model, criterion, optimizer, 
          n_epochs=50, saved_model='model.pt'):
    

    valid_loss_min = np.Inf

    train_losses = []
    valid_losses = []

    for epoch in range(n_epochs):
    
        train_loss = 0.0
        valid_loss = 0.0

        
        model.train() 
        for batch in train_loader:
            
            optimizer.zero_grad()
            
            output = model(batch['image'].to(device))
            
            loss = criterion(output, batch['keypoints'].to(device))
        
            loss.backward()
            
            optimizer.step()
    
            train_loss += loss.item()*batch['image'].size(0)

        
        model.eval()
        for batch in valid_loader:
        
            output = model(batch['image'].to(device))

            loss = criterion(output, batch['keypoints'].to(device))
            
            valid_loss += loss.item()*batch['image'].size(0)


        train_loss = np.sqrt(train_loss/len(train_loader.sampler.indices))
        valid_loss = np.sqrt(valid_loss/len(valid_loader.sampler.indices))

        train_losses.append(train_loss)
        valid_losses.append(valid_loss)

        print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'
              .format(epoch+1, train_loss, valid_loss))


        if valid_loss <= valid_loss_min:
            print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'
                  .format(valid_loss_min, valid_loss))
            torch.save(model.state_dict(), saved_model)
            valid_loss_min = valid_loss
            
    return train_losses, valid_losses


In [None]:
train_losses, valid_losses = train(train_loader, valid_loader, model,
                                   criterion, optimizer, n_epochs=50, 
                                   saved_model='model.pt')

In [None]:
def plot_RMSE(train_losses, valid_losses, y_max=50):
    plt.plot(train_losses, "--", linewidth=3, label="train")
    plt.plot(valid_losses, linewidth=3, label="val")
    plt.legend()
    plt.grid()
    plt.xlabel('Epoch')
    plt.ylabel('RMSE')
    plt.ylim((0, y_max))
    plt.show()

In [None]:
plot_RMSE(train_losses, valid_losses, y_max=40)

In [None]:
def predict(data_loader, model):
    
    
    model.eval()

    with torch.no_grad():
        for i, batch in enumerate(data_loader):
            output = model(batch['image'].to(device)).cpu().numpy()
            if i == 0:
                predictions = output
            else:
                predictions = np.vstack((predictions, output))
    
    return predictions

In [None]:
def view_pred_df(columns, test_df, predictions, image_ids=range(1,6)):
    
    pred_df = pd.DataFrame(predictions, columns=columns)
    pred_df = pd.concat([pred_df, test_df], axis=1)
    pred_df = pred_df.set_index('ImageId')
    show_images(pred_df, image_ids) 

In [None]:
model.load_state_dict(torch.load('model.pt'))
predictions = predict(test_loader, model)
columns = train_df.drop('Image', axis=1).columns
view_pred_df(columns, test_df, predictions)

In [None]:
class RandomHorizontalFlip(object):
    
    
    def __init__(self, p=0.5):
        self.p = p

    def __call__(self, sample):
        
        flip_indices = [(0, 2), (1, 3),
                        (4, 8), (5, 9), (6, 10), (7, 11),
                        (12, 16), (13, 17), (14, 18), (15, 19),
                        (22, 24), (23, 25)]
        
        image, keypoints = sample['image'], sample['keypoints']
        
        if np.random.random() < self.p:
            image = image[:, ::-1]
            if keypoints is not None:
                for a, b in flip_indices:
                    keypoints[a], keypoints[b]= keypoints[b], keypoints[a]
                keypoints[::2] = 96. - keypoints[::2]
        
        return {'image': image, 
                'keypoints': keypoints}

In [None]:
aug_train_df = train_data.dropna()
aug_transform = transforms.Compose([RandomHorizontalFlip(p=1.0), 
                                    Normalize(),
                                    ToTensor()])
aug_trainset = FaceKeypointsDataset(aug_train_df, transform=aug_transform)

In [None]:
plt.subplot(1,2,1)
plt.title(f'Original')
plt.axis('off')
plt.tight_layout()
image = trainset[0]['image'].numpy().squeeze()
keypoints = trainset[0]['keypoints'].numpy().reshape(-1,2)
show_keypoints(image, keypoints)
plt.subplot(1,2,2)
plt.title(f'Horizontal Flip')
plt.axis('off')
plt.tight_layout()
image = aug_trainset[0]['image'].numpy().squeeze()
keypoints = aug_trainset[0]['keypoints'].numpy().reshape(-1,2)
show_keypoints(image, keypoints)

In [None]:
def show_dataset_images(dataset, n_images=10, n_cols=5, figsize=(15,10)):
   
        
    plt.figure(figsize=figsize)
    n_rows = n_images // n_cols + 1
    for idx in range(n_images):
        image = dataset[idx]['image'].numpy().squeeze()
        keypoints = dataset[idx]['keypoints'].numpy().reshape(-1,2)
        plt.subplot(n_rows, n_cols, idx+1)
        plt.grid(False)
        plt.tight_layout()
        show_keypoints(image, keypoints)

In [None]:
show_dataset_images(trainset, 4, 2, (6,8))

In [None]:
show_dataset_images(aug_trainset, 4, 2, (6,8))

In [None]:

batch_size = 128

valid_size = 0.2



aug_train_df = train_data.dropna()


aug_tfms = transforms.Compose([RandomHorizontalFlip(p=0.5),
                                    Normalize(),
                                    ToTensor()])

aug_trainset = FaceKeypointsDataset(aug_train_df, transform=aug_tfms)


aug_train_loader, aug_valid_loader = prepare_train_valid_loaders(aug_trainset, 
                                                                 valid_size,
                                                                 batch_size)


In [None]:
model = MLP(input_size=IMG_SIZE*IMG_SIZE, output_size=30, hidden_layers=[128, 64], drop_p=0.1)
model = model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.003)

In [None]:
aug_train_losses, aug_valid_losses = train(aug_train_loader, aug_valid_loader, model, criterion, 
                                           optimizer, n_epochs=50, saved_model='aug_model.pt')

In [None]:
plot_RMSE(aug_train_losses, aug_valid_losses, y_max=40)

In [None]:
model.load_state_dict(torch.load('aug_model.pt'))
predictions = predict(test_loader, model)
columns = train_df.drop('Image', axis=1).columns
view_pred_df(columns, test_df, predictions)

In [None]:
class CNN(nn.Module):
    
    
    def __init__(self, outputs=30):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64*12*12, 1024)
        self.fc2 = nn.Linear(1024, outputs)
        self.dropout = nn.Dropout(0.3)
        
    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = x.view(-1, 64*12*12)
        x = F.relu(self.fc1(self.dropout(x)))
        x = self.fc2(self.dropout(x))
        
        return x


In [None]:
model = CNN(outputs=30)
model = model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.003)

In [None]:
aug_cnn_train_losses, aug_cnn_valid_losses = train(aug_train_loader, aug_valid_loader, model, criterion, 
                                                   optimizer, n_epochs=50, saved_model='aug_cnn.pt')


In [None]:
plot_RMSE(aug_cnn_train_losses, aug_cnn_valid_losses, y_max=40)

In [None]:
model.load_state_dict(torch.load('aug_cnn.pt'))

predictions = predict(test_loader, model)
view_pred_df(columns, test_df, predictions)

In [None]:
train_data.info()

In [None]:
datasets = {'L': ['left_eye_center_x', 'left_eye_center_y',
                  'right_eye_center_x','right_eye_center_y',
                  'nose_tip_x', 'nose_tip_y',
                  'mouth_center_bottom_lip_x', 'mouth_center_bottom_lip_y',
                  'Image'
                 ], 
            'S': ['left_eye_inner_corner_x','left_eye_inner_corner_y', 
                  'left_eye_outer_corner_x', 'left_eye_outer_corner_y', 
                  'right_eye_inner_corner_x', 'right_eye_inner_corner_y', 
                  'right_eye_outer_corner_x', 'right_eye_outer_corner_y', 
                  'left_eyebrow_inner_end_x', 'left_eyebrow_inner_end_y', 
                  'left_eyebrow_outer_end_x', 'left_eyebrow_outer_end_y', 
                  'right_eyebrow_inner_end_x', 'right_eyebrow_inner_end_y', 
                  'right_eyebrow_outer_end_x', 'right_eyebrow_outer_end_y',
                  'mouth_left_corner_x', 'mouth_left_corner_y', 
                  'mouth_right_corner_x', 'mouth_right_corner_y', 
                  'mouth_center_top_lip_x', 'mouth_center_top_lip_y',
                  'Image'
                 ]
           }

In [None]:
class RandomHorizontalFlip(object):
    
    
    def __init__(self, p=0.5, dataset='A'):
        
        
        self.p = p
        self.dataset = dataset

    def __call__(self, sample):
        
        if self.dataset == 'L':
            flip_indices = [(0, 2), (1, 3)]
        elif self.dataset == 'S':
            flip_indices = [(0, 4), (1, 5), (2, 6), (3, 7),
                            (8, 12), (9, 13), (10, 14), (11, 15),
                            (16, 18), (17, 19)]
        else:
            flip_indices = [(0, 2), (1, 3),
                            (4, 8), (5, 9), (6, 10), (7, 11),
                            (12, 16), (13, 17), (14, 18), (15, 19),
                            (22, 24), (23, 25)]
        
        image, keypoints = sample['image'], sample['keypoints']
        
        if np.random.random() < self.p:
            image = image[:, ::-1]
            if keypoints is not None:
                for a, b in flip_indices:
                    keypoints[a], keypoints[b]= keypoints[b], keypoints[a]
                keypoints[::2] = 96. - keypoints[::2]
        
        return {'image': image, 
                'keypoints': keypoints}

In [None]:
L_aug_df = train_data[datasets['L']].dropna()

L_aug_df.info()


In [None]:

L_aug_tfms = transforms.Compose([RandomHorizontalFlip(p=0.5, dataset='L'),
                                 Normalize(), ToTensor()])


L_aug_trainset = FaceKeypointsDataset(L_aug_df, transform=L_aug_tfms)



L_aug_train_loader, L_aug_valid_loader = prepare_train_valid_loaders(L_aug_trainset, 
                                                                     valid_size,
                                                                     batch_size)

In [None]:
outputs = len(datasets['L']) - 1
model = CNN(outputs)
model = model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.003)

In [None]:
L_train_losses, L_valid_losses = train(L_aug_train_loader, L_aug_valid_loader, 
                                   model, criterion, optimizer, 
                                   n_epochs=50, saved_model='L_aug_cnn.pt')

In [None]:
plot_RMSE(L_train_losses, L_valid_losses, y_max=20)

In [None]:
model.load_state_dict(torch.load('L_aug_cnn.pt'))

L_predictions = predict(test_loader, model)

L_columns = L_aug_df.drop('Image', axis=1).columns

view_pred_df(L_columns, test_df, L_predictions)

In [None]:
S_aug_df = train_data[datasets['S']].dropna()
S_aug_df.info()

In [None]:

S_aug_tfms = transforms.Compose([RandomHorizontalFlip(p=0.5, dataset='S'),
                                 Normalize(), ToTensor()])

S_aug_trainset = FaceKeypointsDataset(S_aug_df, transform=S_aug_tfms)



S_aug_train_loader, S_aug_valid_loader = prepare_train_valid_loaders(S_aug_trainset, 
                                                                     valid_size,
                                                                     batch_size)

In [None]:
outputs = len(datasets['S']) - 1
model = CNN(outputs)
model = model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.003)

In [None]:
S_train_losses, S_valid_losses = train(S_aug_train_loader, S_aug_valid_loader, 
                                   model, criterion, optimizer, 
                                   n_epochs=50, saved_model='S_aug_cnn.pt')

In [None]:
plot_RMSE(S_train_losses, S_valid_losses, y_max= 20)

In [None]:
model.load_state_dict(torch.load('S_aug_cnn.pt'))

S_predictions = predict(test_loader, model)

S_columns = S_aug_df.drop('Image', axis=1).columns

view_pred_df(S_columns, test_df, S_predictions)

In [None]:
predictions = np.hstack((L_predictions, S_predictions))
columns = list(L_columns) + list(S_columns)
view_pred_df(columns, test_df, predictions)