In [68]:
import data_loader

import torch
import torch.nn as nn

import numpy as np

import random
import normalizer
from PIL import Image, ImageDraw, ImageFont

import cv2

import models
import modelutils

In [None]:
class RMSELoss(nn.Module):
    def __init__(self):
        super().__init__()
        self.mse = nn.MSELoss()
        
    def forward(self, yhat, y):
        return torch.sqrt(self.mse(yhat, y))




train = data_loader.load("I:/NSU/CV/tests/torch/data/train/coords",
                         "I:/NSU/CV/tests/torch/data/train/images", 
                        firstn = 6000, batchSize = 16, shuffle = True, 
                        displace = True, size = 400, show = False)



scaler = normalizer.MinMaxNormalizer()
scaler.fit([y for _, y in train])

print("Number of batches:", len(train))
for x, y in train:
   print(x.shape, y.shape)
 
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("devise is: ", device)



In [None]:
train[0][0].shape

In [5]:
mouth_pointlist = [44, 7, 33, 14, 2, 31, 49, 15, 42, 32, 9, 51, 38, 61,
    18, 23, 12, 47, 67, 1, 2]
mouth_boundaries = [7, 14, 15, 67]
eye_L_pointlist = [62, 65, 0, 13, 34, 64]
eye_R_pointlist = [16, 36, 54, 55, 53, 63]

In [194]:

def coords_to_img(coordsbatch, idlist_list, imgsize, blobsize, bglevel):
    imglist = []
    for coords in coordsbatch:
        true_cord_list = []
        for idlist in idlist_list:
            midx = 0.0
            midy = 0.0
            for id in idlist:
                midx += coords[id][0]
                midy += coords[id][1]
            midx /= len(idlist)
            midy /= len(idlist)
            img = np.zeros((imgsize, imgsize), dtype=np.float32)
            x = int(midx * imgsize)
            y = int(midy * imgsize)
            true_cord_list.append(np.array([x, y]))

        for cord in true_cord_list:
            img[cord[1], cord[0]] = 15 * blobsize
        
        img = cv2.GaussianBlur(img, (151, 151), sigmaX=blobsize, borderType=cv2.BORDER_REPLICATE)
        
        for cord in true_cord_list:
            cv2.circle(img, (cord[0], cord[1]), 1, 6, -1)
        
    # img[y, x] = 15000
    # img = cv2.GaussianBlur(img, (351, 351), sigmaX=50, borderType=cv2.BORDER_REPLICATE)
    # cv2.circle(img, (x, y), 0, 6, -1)
    # img = cv2.GaussianBlur(img, (11, 11), sigmaX=1)

        img = cv2.GaussianBlur(img, (11, 11), sigmaX=2)
        img += bglevel
        imglist.append(torch.from_numpy(img).unsqueeze(0))
    return torch.stack(imglist)

In [7]:
def get_mean_coords(landmarks_list, tensor):
    x = torch.zeros(tensor.shape[0]).to(device)
    y = torch.zeros(tensor.shape[0]).to(device)
    for id in landmarks_list:
        x += tensor[:,id,0]
        y += tensor[:,id,1]
    x /= len(landmarks_list)
    y /= len(landmarks_list)
    return x, y

In [196]:
class WMSELoss(nn.Module):
    def __init__(self,  device, l2_lambda = 0.0):
        super().__init__()
        self.device = device
        self.l2_lambda = l2_lambda
        
    def forward(self, x, y,weightmap, parameters = None):
        ls = ((x-y) * weightmap)**2
        sm = ls.sum() / (x.shape[0] * x.shape[1])
        if parameters is None:
            return sm
        
        pk = 0.0
        smp = 0.0
        if parameters is not None:
            for param in parameters:
                smp += (param**2).mean()
                pk += 1

        return sm + (smp / float(pk)) * self.l2_lambda

In [183]:
class CnnDetector(nn.Module):
    def __init__(self, device):
        super(CnnDetector, self).__init__()
        self.adpool = nn.AdaptiveAvgPool2d((128, 128)).to(device)
        self.pool = nn.MaxPool2d(2, 2).to(device)
        self.conv1 = nn.Conv2d(3, 3, 7, padding = 3).to(device) 
        self.conv2 = nn.Conv2d(3, 3, 3, padding = 1).to(device)
        self.conv3 = nn.Conv2d(3, 3, 3, padding = 1).to(device)
        self.conv4 = nn.Conv2d(3, 3, 3, padding = 1).to(device)
        self.conv5 = nn.Conv2d(3, 3, 3, padding = 1).to(device)
        self.conv6 = nn.Conv2d(3, 3, 3, padding = 1).to(device)
        self.conv7 = nn.Conv2d(3, 3, 3, padding = 1).to(device)
        self.conv8 = nn.Conv2d(3, 1, 3, padding = 1).to(device)
        self.act = nn.ReLU().to(device)

    def forward(self, x):
        # Input: [batch, 3, H, W]
        x = self.adpool(x)
        x = self.act(self.conv1(x))
        x = self.pool(self.act(self.conv2(x)))
        # # x1 = x.clone()
        x = self.act(self.conv3(x))
        # # x1 = x.clone()
        x = self.act(self.conv4(x)) # add skip connections (+x.copy()) if needed
        x = self.act(self.conv5(x))
        x = self.act(self.conv6(x)) # x1 = x.clone()
        # x = self.act(self.conv7(x))
        # x = self.act(self.conv8(x))
        return x

In [220]:
model = CnnDetector(device).to(device)

criterion = WMSELoss(device)

learning_rate = 0.01
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [224]:
learning_rate = 0.001

# learning loop
epoch_loss = 0
for epoch in range(20):
    epoch_loss = 0
    step_loss = 0
    step_count = 0
    random.shuffle(train)
    for batch_idx, (inputs, answers) in enumerate(train):
        needshow = torch.tensor(False).to(device)
        if(batch_idx % 350 == 60):
           needshow = True

        inputs = inputs.to(device)
        answers = answers.to(device)
        outputs = model(inputs)
        imganswers = coords_to_img(answers, [eye_L_pointlist, eye_R_pointlist], 64, 5, bglevel = 0).to(device)
        weightmap = coords_to_img(answers, [eye_L_pointlist, eye_R_pointlist], 64, 10, bglevel = 0).to(device)
        if(needshow):
            #xshow = inputs[0].cpu().numpy()
            yshow = (outputs[0][0].cpu().detach().numpy() * 255.0).astype('float32')
            ansshow = (imganswers[0][0].cpu().detach().numpy() * 255.0).astype('float32')
            xshow = (inputs[0][0].cpu().detach().numpy() * 255.0).astype('float32')
            weightshow = (weightmap[0][0].cpu().detach().numpy() * 255.0).astype('float32')
            print(ansshow.max(), ansshow.min(), ansshow.mean(), ansshow.std())
            print(type(ansshow[0][0]))
            yshow = (yshow - yshow.min()) * 255 / (yshow.max() - yshow.min())
            ansshow = ansshow * 255 / ansshow.max()
            xshow = xshow * 255 / xshow.max()
            weightshow = weightshow * 255 / weightshow.max()
            pil_mod = Image.fromarray(yshow)
            pil_mod.show()
            # print(yshow.max(), yshow.min(), yshow.mean(), yshow.std())
            # pil_imagey = Image.fromarray(ansshow)
            # pil_imagey.show()
            # pil_weighshow = Image.fromarray(weightshow)
            # pil_weighshow.show()
            

        loss = criterion(outputs, imganswers, weightmap)
        step_loss += loss
        step_count += 1
        if(step_count == 20):
            optimizer.zero_grad()
            step_loss.backward()
            optimizer.step()
            print(f'Batch {batch_idx}, Loss: {step_loss.item() / step_count:.5f}, midweigth {model.conv7.weight.detach().cpu().numpy().mean()}')
            step_count = 0
            step_loss = 0

        epoch_loss += loss.item() 
        

        if batch_idx == 1600:
            break
    
    #learning_rate /= 10.0
    #optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    print(f'shape {inputs.shape}, Epoch {epoch + 1}, Loss: {epoch_loss/len(train):.5f}')


Batch 19, Loss: 30.70862, midweigth 0.006930931005626917
Batch 39, Loss: 30.76786, midweigth 0.006930931005626917
Batch 59, Loss: 30.42286, midweigth 0.006930931005626917
385.18933 6.1318e-40 8.86438 37.64441
<class 'numpy.float32'>
Batch 79, Loss: 31.19982, midweigth 0.006930931005626917
Batch 99, Loss: 31.12621, midweigth 0.006930931005626917
Batch 119, Loss: 30.36101, midweigth 0.006930931005626917
Batch 139, Loss: 30.27616, midweigth 0.006930931005626917
Batch 159, Loss: 30.33024, midweigth 0.006930931005626917
Batch 179, Loss: 30.36479, midweigth 0.006930931005626917
Batch 199, Loss: 30.40930, midweigth 0.006930931005626917
Batch 219, Loss: 30.45334, midweigth 0.006930931005626917
Batch 239, Loss: 30.19213, midweigth 0.006930931005626917
Batch 259, Loss: 30.23978, midweigth 0.006930931005626917
Batch 279, Loss: 30.69023, midweigth 0.006930931005626917
Batch 299, Loss: 30.46231, midweigth 0.006930931005626917
Batch 319, Loss: 30.74839, midweigth 0.006930931005626917
Batch 339, Loss

KeyboardInterrupt: 

In [None]:
# testloop
criterion = SelectiveRMSELoss(eye_L_pointlist + eye_R_pointlist, False, device)
epoch_loss = 0
test = data_loader.load("I:/NSU/CV/tests/torch/data/test/coords",
                        "I:/NSU/CV/tests/torch/data/test/images",  
                        firstn = 2000, batchSize = 16, shuffle = True, displace = True, 
                        size = 400, show = False)
random.shuffle(test)
                        


In [None]:
model

In [None]:
with torch.no_grad():
    for batch_idx, (inputs, answers) in enumerate(test):
        inputs = inputs.to(device)
        answers = answers.to(device)
        answers = scaler.transform(answers)
        outputs = model(inputs)
        outputs = scaler.inverse_transform(outputs)
        loss = criterion(outputs, answers)
        epoch_loss += loss.item()

    print(f'Test Loss: {epoch_loss/len(test):.4f}')


In [None]:
random.shuffle(test)
with torch.no_grad():
    inputs, answers = test[1]
    inputs = inputs.to(device)
    answers = answers.to(device)
    outputs = model(inputs)
    outputs = scaler.inverse_transform(outputs)
    # x_l, y_l = get_mean_coords(eye_L_pointlist, outputs)
    # x_r, y_r = get_mean_coords(eye_R_pointlist, outputs)
    # print(x_l[0], y_l[0], x_r[0], y_r[0])
    # outputs[0][1][0] = x_l[0]
    # outputs[0][1][1] = y_l[0]
    # outputs[0][2][0] = x_r[0]
    # outputs[0][2][1] = y_r[0]
    print(outputs.shape, answers.shape)
    img = modelutils.show_tensor(inputs[0], outputs[0])
    img.show()
    imgdlib = modelutils.show_tensor(inputs[0], answers[0])
    imgdlib.show()



In [25]:


#Save the model after training
torch.save({
    'model_state_dict': model.state_dict(),
}, 'globalDetector.pth')



In [None]:
class mouthBoundDetector(nn.Module):
    def __init__(self, device):
        super(mouthBoundDetector, self).__init__()
        self.last_detector_size = 128 
        self.adpool = nn.AdaptiveAvgPool2d((128, 128)).to(device) 
        self.conv1 = nn.Conv2d(3, 6, 3, padding = 1).to(device)
        self.pool = nn.MaxPool2d(2, 2).to(device)
        self.conv2 = nn.Conv2d(6, 9, 3, padding = 1).to(device)
        self.conv3 = nn.Conv2d(9, 20, 3, padding = 1).to(device)
        self.conv4 = nn.Conv2d(20, self.last_detector_size, 3, padding = 1).to(device)
        fcsize = 256
        self.fc1 = nn.Linear(self.last_detector_size*16*16, fcsize).to(device)
        self.fc_list = []
        for i in range(3):
            self.fc_list.append(nn.Linear(fcsize, fcsize).to(device))
        self.prelast = nn.Linear(fcsize, fcsize).to(device)
        self.fc_last = nn.Linear(fcsize, 3 * 68).to(device)
        self.act = nn.ReLU().to(device)
        self.sigm = nn.Sigmoid().to(device)
    
    def forward(self, x):
        # Input: [batch, 3, H, W]
        x = self.adpool(x)
        x = self.pool(self.act(self.conv1(x)))
        x = self.pool(self.act(self.conv2(x)))
        x = self.act(self.conv3(x))  
        x = self.pool(self.act(self.conv4(x)))
        x = x.view(-1, self.last_detector_size*16*16)       
        x = self.act(self.fc1(x))
        for i in range(len(self.fc_list)):
            x = self.act(self.fc_list[i](x))
        x = self.act(self.prelast(x))
        x = self.fc_last(x)
        x = x.view(-1, 68, 3)
        return x


In [None]:
with torch.no_grad():
    inputs, answers = test[1]
    inputs = inputs.to(device)
    answers = answers.to(device)
    outputs = model(inputs)
    outputs = scaler.inverse_transform(outputs)
    print(outputs.shape, answers.shape)
    img = show_tensor(inputs[0], outputs[0])
    img.show()
    imgdlib = show_tensor(inputs[0], answers[0])
    imgdlib.show()