In [None]:
#!kaggle competitions download -c facial-keypoints-detection

In [None]:
# run the commands below only if the keypoint dataset archive has not been unzipped yet
#!unzip facial-keypoints-detection.zip
#!ls

In [None]:
# run the commands below only if training.zip has not been unzipped yet
#!unzip training.zip
#!ls

In [None]:
# I took this kaggle problem
# https://www.kaggle.com/karanjakhar/facial-keypoint-detection

import pandas as pd
# from google.colab import drive
import numpy as np
import matplotlib.pyplot as plt
import random
import torch

In [None]:
# Show the installed PyTorch version so a run can be tied to a specific release.
print(torch.__version__)

In [None]:
# Load the training table. Each row holds the keypoint coordinates plus an
# 'Image' column (a pixel string that is parsed in a later cell).
data = pd.read_csv('./datasets/facial_keypoints/training.csv')
print(data.columns)
print(data.head())

# Number of columns that contain at least one missing value (True) vs. none (False).
print(data.isnull().any().value_counts())

# Forward-fill gaps with the value from the previous row.
# `DataFrame.ffill()` is used instead of `fillna(method='ffill', inplace=True)`:
# the `method` keyword of `fillna` is deprecated in recent pandas releases, and
# rebinding the name (rather than mutating in place) keeps this cell idempotent.
data = data.ffill()

# Same check again; only gaps in the very first row could survive a forward fill.
print(data.isnull().any().value_counts())

In [None]:
# Number of rows in the training table; reused by later cells.
num_train_data = len(data)

# Split every 'Image' string on single spaces. An empty token (produced by
# missing pixels / doubled spaces) is replaced by '0' so that the later numeric
# conversion does not fail.
pixel_list = [
    [token if token != '' else '0' for token in data['Image'][row_idx].split(' ')]
    for row_idx in range(num_train_data)
]

In [None]:
#images = data['Image'].replace(r'^\s*$', '0')

In [None]:
# PyTorch expects the channel axis right after the batch axis, i.e. (N, C, H, W).
# The flat pixel rows are first viewed as (N, 96, 96, 1) and the trailing channel
# axis is then moved forward with two axis swaps.
flat_pixels = np.array(pixel_list, dtype=float)
print(flat_pixels.shape)

channels_last = flat_pixels.reshape(-1, 96, 96, 1)
image_tensor = channels_last.swapaxes(2, 3).swapaxes(1, 2)
print(image_tensor.shape)

In [None]:
# Regression targets: every column of the table except the raw 'Image' string.
labels = data.drop(columns='Image')

# Gather the target rows one by one, then stack them into a float array with
# one row per training sample.
label_list = [labels.iloc[row_idx, :] for row_idx in range(num_train_data)]
label_tensor = np.array(label_list, dtype='float')

In [None]:
# Circle is a patch class; import it from its public home, matplotlib.patches,
# rather than relying on pyplot happening to re-export the name.
from matplotlib.patches import Circle

# Pick a random sample among the first 1001 images, capped at the dataset size
# so the index is always valid.
index = random.randint(0, min(1000, len(image_tensor) - 1))

fig, ax = plt.subplots(1)
ax.set_aspect('equal')

# Drop the channel axis: imshow wants a plain (height, width) array.
ax.imshow(image_tensor[index].reshape(96, 96), cmap='gray')

# The 30 label values are viewed as 15 (x, y) pairs; one red dot is drawn per pair.
for xx, yy in label_tensor[index].reshape((15, 2)):
    circ = Circle((xx, yy), 2, color='red')
    ax.add_patch(circ)

In [None]:
# The first `train_len` samples are used for training, the remainder is held out.
train_len = 6000

# Every dataset entry is an [image, keypoints] pair.
img_and_label = [[image_tensor[i], label_tensor[i]] for i in range(train_len)]

# DataLoader objects let the training loop iterate over shuffled mini-batches.
train_loader = torch.utils.data.DataLoader(img_and_label, batch_size=500, shuffle=True)
img1, lbl1 = next(iter(train_loader))
print("".join([
    "first training batch: \n",
    "input shape: ", str(img1.shape), "\n",
    "label shape: ", str(lbl1.shape),
]))

# The real competition test set comes without labels, so the tail of the
# labelled training data is used as the evaluation set instead.
test_data = [[image_tensor[i], label_tensor[i]] for i in range(train_len, num_train_data)]

test_loader = torch.utils.data.DataLoader(test_data, batch_size=500, shuffle=True)
test1, tlbl1 = next(iter(test_loader))
print("".join([
    "test batch: \n",
    "input shape: ", str(test1.shape), "\n",
    "label shape: ", str(tlbl1.shape),
]))

In [None]:
import torch.nn as nn
import torch.nn.functional as F

# Networks are written as classes; they must inherit from nn.Module.
class Net(nn.Module):
    """Small CNN that regresses 30 values from a single-channel image.

    Layout: two stages of conv(5x5) -> batch-norm -> LeakyReLU(0.1) -> 2x2
    max-pool, followed by three fully connected layers (the first two with
    batch-norm and a sigmoid). The first linear layer is sized for
    64 x 21 x 21 feature maps, which is what a 96 x 96 input produces
    (96 -> 92 -> 46 -> 42 -> 21).
    """

    def __init__(self):
        """Create all layers; `forward` wires them together."""
        super().__init__()

        # Parameter-free modules shared by both convolutional stages.
        self.max_pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.leaky_relu = nn.LeakyReLU(0.1)

        self.conv1 = nn.Conv2d(1, 32, 5)
        self.conv1_bn = nn.BatchNorm2d(32)

        self.conv2 = nn.Conv2d(32, 64, 5)
        self.conv2_bn = nn.BatchNorm2d(64)

        self.fc1 = nn.Linear(64 * 21 * 21, 120)
        self.fc1_bn = nn.BatchNorm1d(120)
        self.fc2 = nn.Linear(120, 84)
        self.fc2_bn = nn.BatchNorm1d(84)
        self.fc3 = nn.Linear(84, 30)

    def forward(self, x):
        """Run the network on a batch.

        Args:
            x: float tensor of shape (N, 1, 96, 96).

        Returns:
            Tensor of shape (N, 30).

        Raises:
            RuntimeError: if the spatial size of `x` does not yield 64 x 21 x 21
                feature maps (raised by the first linear layer).
        """
        x = self.max_pool(self.leaky_relu(self.conv1_bn(self.conv1(x))))
        x = self.max_pool(self.leaky_relu(self.conv2_bn(self.conv2(x))))

        # Flatten each sample separately. Pinning the batch dimension (instead of
        # reshape(-1, 64 * 21 * 21)) means a wrongly sized input fails loudly in
        # fc1 rather than being silently regrouped into a different batch size.
        x = x.reshape(x.size(0), -1)

        x = torch.sigmoid(self.fc1_bn(self.fc1(x)))
        x = torch.sigmoid(self.fc2_bn(self.fc2(x)))
        # No activation on the output layer: the outputs are unbounded regressions.
        x = self.fc3(x)
        return x

inzvaNet = Net()

In [None]:
import torch.optim as optim

# Mean squared error between the network outputs and the target values.
criterion = nn.MSELoss()

# Stochastic gradient descent with momentum over all parameters of the network.
optimizer = optim.SGD(
    inzvaNet.parameters(),
    momentum=0.9,
    lr=0.0001,
)

In [None]:
# Cast the model's floating-point parameters and buffers to float32, matching the
# `.float()` conversion applied to the input batches in the training loop.
inzvaNet = inzvaNet.float()

In [None]:
# Choose the compute device: the first CUDA GPU if one is available, else the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# On a CUDA machine this prints a CUDA device.
print(device)

# Move the model onto the selected device. Left as the last expression of the
# cell so the notebook also displays the module summary.
inzvaNet.to(device)

In [None]:
# Training loop: several full passes over `train_loader`, printing the mean
# batch loss of every epoch.
num_epochs = 10

# Explicitly select training mode so the batch-norm layers use batch statistics,
# even if the model was switched to eval mode by another cell beforehand.
inzvaNet.train()

for epoch in range(num_epochs):
    running_loss = 0.0
    num_batches = 0

    # Each item yielded by the loader is a pair [inputs, targets]. The loop
    # variables are deliberately NOT called `data` / `labels`: those names hold
    # DataFrames created in earlier cells and would be overwritten otherwise.
    for batch in train_loader:
        inputs = batch[0].float().to(device)
        targets = batch[1].float().to(device)

        # Gradients accumulate by default, so clear them before every step.
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = inzvaNet(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # .item() extracts a Python float, so no autograd graph is kept alive.
        running_loss += loss.item()
        num_batches += 1

    # Average over the batches actually processed in this epoch (guarded against
    # an empty loader) instead of dividing by an unrelated constant.
    print('Epoch %d Loss: %.3f' %
          (epoch + 1, running_loss / max(num_batches, 1)))

print('Finished Training')

In [None]:
# Take one batch from the held-out loader and move it to the model's device.
test_batch = next(iter(test_loader))

test_batch_data = test_batch[0].float().to(device)
test_batch_label = test_batch[1].float().to(device)

# Index of the sample to display (0..49), capped so it is valid for short batches.
rand_test = random.randint(0, min(49, test_batch_data.shape[0] - 1))

# Inference: eval mode makes the batch-norm layers use their running statistics
# (and stops them from being updated), and no_grad skips building the autograd
# graph. The previous mode is restored afterwards so that training can continue.
was_training = inzvaNet.training
inzvaNet.eval()
try:
    with torch.no_grad():
        preds = inzvaNet(test_batch_data).cpu()
finally:
    inzvaNet.train(was_training)

fig, ax = plt.subplots(1)
ax.set_aspect('equal')

ax.imshow(test_batch_data[rand_test].cpu().view((96, 96)), cmap='gray')

# 30 outputs -> 15 (x, y) pairs, handed to matplotlib as plain Python floats.
for xx, yy in preds[rand_test].reshape((15, 2)).tolist():
    circ = Circle((xx, yy), 2, color='red')
    ax.add_patch(circ)

In [None]:
# Same sample as in the prediction plot, this time with the ground-truth keypoints.
fig, ax = plt.subplots(1)
ax.set_aspect('equal')
ax.imshow(test_batch_data[rand_test].cpu().view((96, 96)), cmap='gray')

# The labels were moved to `device` together with the images; bring them back to
# the CPU (as is done for the image) and pass plain Python floats to matplotlib.
for xx, yy in test_batch_label[rand_test].cpu().reshape((15, 2)).tolist():
    circ = Circle((xx, yy), 2, color='red')
    ax.add_patch(circ)