In [1]:
import data_loader

import torch
import torch.nn as nn

import random
import normalizer
from PIL import Image, ImageDraw, ImageFont

In [2]:
def show_tensor(tensor, landmarks, nolandmarks=False):
    # Make sure tensors are on CPU and detached from grad
    image = tensor.cpu().detach()
    landmarks = landmarks.cpu().detach()
    image /= image.mean()
    # Scale to 0-255 range
    image = (image * 255).clamp(0, 255)
    # Convert to numpy and correct data type
    image = image.numpy().astype('uint8')
    # If tensor is [C,H,W], convert to [H,W,C]
    if len(image.shape) == 3:
        image = image.transpose(1, 2, 0)
    
    # Convert to PIL Image
    pil_image = Image.fromarray(image)
    draw = ImageDraw.Draw(pil_image)
    
    # Get image dimensions
    width, height = pil_image.size
    
    if(nolandmarks == True):
        return pil_image
    # Draw each landmark
    for i in range(68):
        # Get coordinates (scale from 0-1 to image dimensions)
        x = int(landmarks[i, 0].item() * width)
        y = int(landmarks[i, 1].item() * height)
        z = landmarks[i, 2].item()
        
        # Draw point (red circle)
        radius = 2
        draw.ellipse([x-radius, y-radius, x+radius, y+radius], fill='white')
        
        # Draw value next to point
        draw.text((x+5, y-5), f'{i:.0f}', fill='white')
    
    return pil_image

In [3]:



class RMSELoss(nn.Module):
    def __init__(self):
        super().__init__()
        self.mse = nn.MSELoss()
        
    def forward(self, yhat, y):
        return torch.sqrt(self.mse(yhat, y))




train = data_loader.load("I:/NSU/CV/tests/torch/data/train/coords",
                         "I:/NSU/CV/tests/torch/data/train/images", 
                        firstn = 7000, batchSize = 16, shuffle = True)



scaler = normalizer.MinMaxNormalizer()
scaler.fit([y for _, y in train])

print("Number of batches:", len(train))
for x, y in train:
    print(x.shape, y.shape)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("devise is: ", device)



condcords02865_3d.txt dataimg02865.jpeg batch 1 from 437.5
condcords25610_3d.txt dataimg25610.jpeg batch 1 from 437.5
condcords21341_3d.txt dataimg21341.jpeg batch 1 from 437.5
condcords03477_3d.txt dataimg03477.jpeg batch 1 from 437.5
condcords06273_3d.txt dataimg06273.jpeg batch 1 from 437.5
condcords22014_3d.txt dataimg22014.jpeg batch 1 from 437.5
condcords20657_3d.txt dataimg20657.jpeg batch 1 from 437.5
condcords01266_3d.txt dataimg01266.jpeg batch 1 from 437.5
condcords10465_3d.txt dataimg10465.jpeg batch 1 from 437.5
condcords05453_3d.txt dataimg05453.jpeg batch 1 from 437.5
condcords09902_3d.txt dataimg09902.jpeg batch 1 from 437.5
condcords12453_3d.txt dataimg12453.jpeg batch 1 from 437.5
condcords01782_3d.txt dataimg01782.jpeg batch 1 from 437.5
condcords22371_3d.txt dataimg22371.jpeg batch 1 from 437.5
condcords09344_3d.txt dataimg09344.jpeg batch 1 from 437.5
condcords12149_3d.txt dataimg12149.jpeg batch 1 from 437.5
condcords18444_3d.txt dataimg18444.jpeg batch 2 from 437

In [4]:
class mouthBoundDetector(nn.Module):
    def __init__(self, device):
        super(mouthBoundDetector, self).__init__()
        self.last_detector_size = 128 
        self.adpool = nn.AdaptiveAvgPool2d((128, 128)).to(device) 
        self.conv1 = nn.Conv2d(3, 6, 3, padding = 1).to(device)
        self.pool = nn.MaxPool2d(2, 2).to(device)
        self.conv2 = nn.Conv2d(6, 9, 3, padding = 1).to(device)
        self.conv3 = nn.Conv2d(9, 20, 3, padding = 1).to(device)
        self.conv4 = nn.Conv2d(20, self.last_detector_size, 3, padding = 1).to(device)
        fcsize = 256
        self.fc1 = nn.Linear(self.last_detector_size*16*16, fcsize).to(device)
        self.fc_list = nn.ModuleList([nn.Linear(fcsize, fcsize).to(device) for _ in range(3)])
        self.prelast = nn.Linear(fcsize, fcsize).to(device)
        self.fc_last = nn.Linear(fcsize, 3 * 68).to(device)
        self.act = nn.ReLU().to(device)
        self.sigm = nn.Sigmoid().to(device)
    
    def forward(self, x):
        # Input: [batch, 3, H, W]
        x = self.adpool(x)
        x = self.pool(self.act(self.conv1(x)))
        x = self.pool(self.act(self.conv2(x)))
        x = self.act(self.conv3(x))  
        x = self.pool(self.act(self.conv4(x)))
        x = x.view(-1, self.last_detector_size*16*16)       
        x = self.act(self.fc1(x))
        for i in range(len(self.fc_list)):
            x = self.act(self.fc_list[i](x))
        x = self.act(self.prelast(x))
        x = self.fc_last(x)
        x = x.view(-1, 68, 3)
        return x


In [61]:
def crop_mouth(image_tensor_array, landmarks_tensor_array, mouth_landmarks):
    # Get mouth landmarks

    for image_tensor, landmarks_tensor in zip(image_tensor_array, landmarks_tensor_array):
        x1, y1, x2, y2 = 1, 1, 0, 0
        for i in mouth_landmarks:
            x = landmarks_tensor[i, 0].item()
            y = landmarks_tensor[i, 1].item()
            if x < x1:
                x1 = x
            if x > x2:
                x2 = x
            if y < y1:
                y1 = y
            if y > y2:
                y2 = y
        size = image_tensor.shape[2]
        # make rectangle 1 + b times bigger
        b = 2.0
        x = (x2 + x1) / b
        y = (y2 + y1) / b

        x1 = int(x * size - 100)
        x2 = int(x * size + 100)
        y1 = int(y * size - 64)
        y2 = int(y * size + 64)

        if x1 < 0:
            x1 = 0
        if y1 < 0:
            y1 = 0
        if x1 > size:
            x1 = size
        if y1 > size:
            y1 = size
        print(x1, y1, x2, y2)
        # Crop image
        image_tensor = image_tensor[:, y1:y2, x1:x2]
        # Get bounding box of mouth
    return image_tensor


class mouthPointsDetector(nn.Module):
    def __init__(self, device, mouth_bound_detector, mouth_landmarks):
        super(mouthPointsDetector, self).__init__()
        self.mb_detector = mouth_bound_detector
        self.last_detector_size = 8
        self.mouth_landmarks = mouth_landmarks
        
        self.conv1 = nn.Conv2d(3, 5, 3, padding = 1).to(device)
        self.pool = nn.MaxPool2d(2, 2).to(device)
        self.conv2 = nn.Conv2d(5, 5, 3, padding = 1).to(device)
        self.conv3 = nn.Conv2d(5, 5, 3, padding = 1).to(device)
        self.conv4 = nn.Conv2d(5, 5, 3, padding = 1).to(device)
        self.conv5 = nn.Conv2d(5, 6, 3, padding = 1).to(device)
        self.conv6 = nn.Conv2d(6, self.last_detector_size, 3, padding = 1).to(device)
        fcsize = 612
        
        self.adpool_size = 54, 30
        self.adpool = nn.AdaptiveAvgPool2d(self.adpool_size).to(device) 

        self.fc1 = nn.Linear(self.last_detector_size*self.adpool_size[0]*self.adpool_size[1],
                             fcsize).to(device)
        self.fc_list = nn.ModuleList([nn.Linear(fcsize, fcsize).to(device) for _ in range(12)])
        self.prelast = nn.Linear(fcsize, fcsize).to(device)
        self.fc_last = nn.Linear(fcsize, 3 * 68).to(device)
        self.act = nn.ReLU().to(device)
        self.sigm = nn.Tanh().to(device)
    
    def forward(self, x, needshow = False):
        # Input: [batch, 3, H, W]
        ans_with_mouth_bounds = self.mb_detector(x)
        x = crop_mouth(x, ans_with_mouth_bounds, self.mouth_landmarks)
        
        if(needshow):
            xshow = show_tensor(x[0:2], ans_with_mouth_bounds[0], nolandmarks=True)
            xshow.show()
            print(x.shape)

        x = self.act(self.conv1(x))
        
        x = self.act(self.conv2(x))
        x = self.act(self.conv3(x))
        x = self.act(self.conv4(x))
        x = self.pool(self.act(self.conv5(x)))
        x = self.act(self.conv6(x))  
        #print(x.shape)
        
        #x = self.act(self.conv6(x))
        x = self.adpool(x)
        
        x = x.view(-1, self.last_detector_size*self.adpool_size[0]*self.adpool_size[1])       
        x = self.act(self.fc1(x))
        for i in range(len(self.fc_list)):
            x = self.sigm(self.fc_list[i](x))
        x = self.sigm(self.prelast(x))
        x = self.fc_last(x)
        x = x.view(-1, 68, 3)
        return x

In [6]:
class CustomLoss(nn.Module):
    def __init__(self, vertlist, reverse):
        super().__init__()
        self.weight = weight
    
    def forward(self, predictions, targets):
        # You can implement any custom loss calculation here
        element_wise_loss = torch.abs(predictions - targets)
        # You can add weights, combine losses, or add regularization terms
        weighted_loss = element_wise_loss * self.weight
        return torch.mean(weighted_loss)

In [44]:
class SelectiveRMSELoss(nn.Module):
    def __init__(self, pointlist, reverse, device, l2_lambda = 0.0):
        super().__init__()
        self.reverse = reverse
        self.pointlist = pointlist
        self.device = device
        self.l2_lambda = l2_lambda
        
    def forward(self, x, y, parameters = None):
        ls = (x-y)**2
        losslist = []
        if(self.reverse):
            self.pointlist = list(set(range(68)) - set(self.pointlist))
        sm = torch.tensor(0.0).to(device)
        k = torch.tensor(1.0).to(device)
        for i in range(x.shape[0]):
            for j in self.pointlist:
                sm += (ls[i][j].mean())
                k += 1
        pk = 0.0
        smp = 0.0
        if parameters is not None:
            for param in parameters:
                smp += (param**2).mean()
                pk += 1

        return torch.sqrt(sm / float(k)) + (smp / float(pk)) * self.l2_lambda

In [8]:
mouth_pointlist = [44, 7, 33, 14, 2, 31, 49, 15, 42, 32, 9, 51, 38, 61,
    18, 23, 12, 47, 67, 1, 2]
mouth_boundaries = [7, 14, 15, 67]
eye_L_pointlist = [62, 65, 0, 13, 34, 64]
eye_R_pointlist = [16, 36, 54, 55, 53, 63]

In [28]:
modelBoundDetector = mouthBoundDetector(device)
state = torch.load('mouthBoundDetector.pth')
modelBoundDetector.load_state_dict(state['model_state_dict'])
modelBoundDetector.to(device)
modelBoundDetector.eval()

<generator object Module.parameters at 0x000002CE68640E40>

In [62]:

model = mouthPointsDetector(device, modelBoundDetector, mouth_boundaries)
criterion = SelectiveRMSELoss(mouth_pointlist, False, device, l2_lambda = 0.25)
#criterion = nn.MSELoss()
learning_rate = 0.0001
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# learning loop
epoch_loss = 0
for epoch in range(1):
    epoch_loss = 0
    step_loss = 0
    random.shuffle(train)
    for batch_idx, (inputs, answers) in enumerate(train):
        inputs = inputs.to(device)
        answers = answers.to(device)
        answers = scaler.transform(answers)
        needshow = torch.tensor(False).to(device)
        if(batch_idx % 30 == 0):
            needshow = True
        outputs = model(inputs, needshow)
        outputs = scaler.inverse_transform(outputs)
        loss = criterion(outputs, answers,  modelBoundDetector.parameters())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item() 
        
        if batch_idx % 5 == 1:
            print(f'Batch {batch_idx}, Loss: {loss.item():.5f}')

        if batch_idx == 1600:
            break
    
    learning_rate /= 10.0
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    print(f'shape {inputs.shape}, Epoch {epoch + 1}, Loss: {epoch_loss/len(train):.5f}')


158 321 358 449
157 319 357 447
158 322 358 450
156 318 356 446
158 321 358 449
158 321 358 449
157 319 357 447
159 323 359 451
158 321 358 449
157 319 357 447
156 318 356 446
158 321 358 449
158 321 358 449
157 320 357 448
157 319 357 447
158 321 358 449
torch.Size([3, 128, 200])
156 318 356 446
155 317 355 445
155 316 355 444
155 316 355 444
154 315 354 443
154 315 354 443
154 315 354 443
156 318 356 446
155 317 355 445
154 315 354 443
156 318 356 446
157 320 357 448
154 315 354 443
157 319 357 447
158 321 358 449
156 318 356 446
Batch 1, Loss: 0.58272
154 314 354 442
153 312 353 440
154 315 354 443
155 317 355 445
153 314 353 442
155 316 355 444
154 315 354 443
154 315 354 443
154 315 354 443
156 317 356 445
157 319 357 447
154 315 354 443
153 313 353 441
155 316 355 444
155 317 355 445
154 315 354 443
152 311 352 439
152 312 352 440
153 313 353 441
152 312 352 440
153 313 353 441
152 311 352 439
152 312 352 440
153 314 353 442
154 315 354 443
152 312 352 440
153 312 353 440
151 309

In [10]:
class mouthBoundDetector(nn.Module):
    def __init__(self, device):
        super(mouthBoundDetector, self).__init__()
        self.last_detector_size = 128 
        self.adpool = nn.AdaptiveAvgPool2d((128, 128)).to(device) 
        self.conv1 = nn.Conv2d(3, 6, 3, padding = 1).to(device)
        self.pool = nn.MaxPool2d(2, 2).to(device)
        self.conv2 = nn.Conv2d(6, 9, 3, padding = 1).to(device)
        self.conv3 = nn.Conv2d(9, 20, 3, padding = 1).to(device)
        self.conv4 = nn.Conv2d(20, self.last_detector_size, 3, padding = 1).to(device)
        fcsize = 256
        self.fc1 = nn.Linear(self.last_detector_size*16*16, fcsize).to(device)
        self.fc_list = nn.ModuleList([nn.Linear(fcsize, fcsize).to(device) for _ in range(3)])
        self.prelast = nn.Linear(fcsize, fcsize).to(device)
        self.fc_last = nn.Linear(fcsize, 3 * 68).to(device)
        self.act = nn.ReLU().to(device)
        self.sigm = nn.Sigmoid().to(device)
    
    def forward(self, x):
        # Input: [batch, 3, H, W]
        x = self.adpool(x)
        x = self.pool(self.act(self.conv1(x)))
        x = self.pool(self.act(self.conv2(x)))
        x = self.act(self.conv3(x))  
        x = self.pool(self.act(self.conv4(x)))
        x = x.view(-1, self.last_detector_size*16*16)       
        x = self.act(self.fc1(x))
        for i in range(len(self.fc_list)):
            x = self.act(self.fc_list[i](x))
        x = self.act(self.prelast(x))
        x = self.fc_last(x)
        x = x.view(-1, 68, 3)
        return x


In [188]:
# testloop
criterion = SelectiveRMSELoss(mouth_pointlist, False, device)
epoch_loss = 0
test = data_loader.load("I:/NSU/CV/tests/torch/data/test/coords",
                        "I:/NSU/CV/tests/torch/data/test/images",  
                        firstn = 2000, batchSize = 16, shuffle = True)
random.shuffle(test)
                        


The history saving thread hit an unexpected error (OperationalError('database or disk is full')).History will not be written to the database.
condcords26954_3d.txt dataimg26954.jpeg batch 1 from 125.0
condcords27975_3d.txt dataimg27975.jpeg batch 1 from 125.0
condcords27060_3d.txt dataimg27060.jpeg batch 1 from 125.0
condcords29228_3d.txt dataimg29228.jpeg batch 1 from 125.0
condcords27230_3d.txt dataimg27230.jpeg batch 1 from 125.0
condcords28991_3d.txt dataimg28991.jpeg batch 1 from 125.0
condcords28398_3d.txt dataimg28398.jpeg batch 1 from 125.0
condcords28407_3d.txt dataimg28407.jpeg batch 1 from 125.0
condcords27780_3d.txt dataimg27780.jpeg batch 1 from 125.0
condcords27914_3d.txt dataimg27914.jpeg batch 1 from 125.0
condcords28511_3d.txt dataimg28511.jpeg batch 1 from 125.0
condcords29020_3d.txt dataimg29020.jpeg batch 1 from 125.0
condcords27650_3d.txt dataimg27650.jpeg batch 1 from 125.0
condcords28268_3d.txt dataimg28268.jpeg batch 1 from 125.0
condcords29100_3d.txt dataimg291

: 

In [91]:
model

mouthBoundDetector(
  (adpool): AdaptiveAvgPool2d(output_size=(128, 128))
  (conv1): Conv2d(3, 6, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(6, 9, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(9, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv4): Conv2d(20, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (fc1): Linear(in_features=32768, out_features=256, bias=True)
  (fc_list): ModuleList(
    (0-2): 3 x Linear(in_features=256, out_features=256, bias=True)
  )
  (prelast): Linear(in_features=256, out_features=256, bias=True)
  (fc_last): Linear(in_features=256, out_features=204, bias=True)
  (act): ReLU()
  (sigm): Sigmoid()
)

In [181]:
with torch.no_grad():
    for batch_idx, (inputs, answers) in enumerate(test):
        inputs = inputs.to(device)
        answers = answers.to(device)
        answers = scaler.transform(answers)
        outputs = model(inputs)
        outputs = scaler.inverse_transform(outputs)
        loss = criterion(outputs, answers)
        epoch_loss += loss.item()

    print(f'Test Loss: {epoch_loss/len(test):.4f}')


Test Loss: 0.0266


In [185]:
random.shuffle(test)

In [186]:
with torch.no_grad():
    inputs, answers = test[1]
    inputs = inputs.to(device)
    answers = answers.to(device)
    outputs = model(inputs)
    outputs = scaler.inverse_transform(outputs)
    print(outputs.shape, answers.shape)
    img = show_tensor(inputs[0], outputs[0])
    img.show()
    imgdlib = show_tensor(inputs[0], answers[0])
    imgdlib.show()



torch.Size([1, 68, 3]) torch.Size([16, 68, 3])


In [89]:


#Save the model after training
torch.save({
    'model_state_dict': model.state_dict(),
}, 'mouthBoundDetector.pth')

In [None]:
class mouthBoundDetector(nn.Module):
    def __init__(self, device):
        super(mouthBoundDetector, self).__init__()
        self.last_detector_size = 128 
        self.adpool = nn.AdaptiveAvgPool2d((128, 128)).to(device) 
        self.conv1 = nn.Conv2d(3, 6, 3, padding = 1).to(device)
        self.pool = nn.MaxPool2d(2, 2).to(device)
        self.conv2 = nn.Conv2d(6, 9, 3, padding = 1).to(device)
        self.conv3 = nn.Conv2d(9, 20, 3, padding = 1).to(device)
        self.conv4 = nn.Conv2d(20, self.last_detector_size, 3, padding = 1).to(device)
        fcsize = 256
        self.fc1 = nn.Linear(self.last_detector_size*16*16, fcsize).to(device)
        self.fc_list = []
        for i in range(3):
            self.fc_list.append(nn.Linear(fcsize, fcsize).to(device))
        self.prelast = nn.Linear(fcsize, fcsize).to(device)
        self.fc_last = nn.Linear(fcsize, 3 * 68).to(device)
        self.act = nn.ReLU().to(device)
        self.sigm = nn.Sigmoid().to(device)
    
    def forward(self, x):
        # Input: [batch, 3, H, W]
        x = self.adpool(x)
        x = self.pool(self.act(self.conv1(x)))
        x = self.pool(self.act(self.conv2(x)))
        x = self.act(self.conv3(x))  
        x = self.pool(self.act(self.conv4(x)))
        x = x.view(-1, self.last_detector_size*16*16)       
        x = self.act(self.fc1(x))
        for i in range(len(self.fc_list)):
            x = self.act(self.fc_list[i](x))
        x = self.act(self.prelast(x))
        x = self.fc_last(x)
        x = x.view(-1, 68, 3)
        return x


In [10]:
with torch.no_grad():
    
    inputs, answers = test[1]
    inputs = inputs.to(device)
    answers = answers.to(device)
    outputs = model(inputs)
    outputs = scaler.inverse_transform(outputs)
    print(outputs.shape, answers.shape)
    img = show_tensor(inputs[0], outputs[0])
    img.show()
    imgdlib = show_tensor(inputs[0], answers[0])
    imgdlib.show()

NameError: name 'model' is not defined