In [1]:
import numpy as np
import torch
from torchvision.transforms import ToTensor, Normalize, Compose, Resize, ToPILImage

def coord2Heatmap(landmarks, device, heatmap_width=512, heatmap_height=512, sigma=1):
    """Render landmark coordinates as normalized Gaussian heatmaps.

    The whole computation uses torch operations, so the result stays attached
    to the autograd graph of ``landmarks``: a loss computed on the heatmaps can
    back-propagate into whatever produced the coordinates.

    Args:
        landmarks: Tensor of shape (batch, 2 * n_landmarks). The first half of
            each row holds the x coordinates (heatmap columns) and the second
            half the y coordinates (heatmap rows), in pixels.
        device: Device on which the heatmaps are computed and returned.
        heatmap_width: Number of columns of every heatmap.
        heatmap_height: Number of rows of every heatmap.
        sigma: Standard deviation of the Gaussian blob, in pixels.

    Returns:
        Tensor of shape (batch, n_landmarks, heatmap_height, heatmap_width) on
        ``device``. The Gaussian response in [0, 1] is rescaled with
        ``(value - 0.5) / 0.5``, so the output lies in [-1, 1] and equals 1
        exactly at a landmark position.

    Raises:
        ValueError: If ``landmarks`` is not 2-D with an even number of columns,
            or if ``sigma`` is not strictly positive.
    """
    if landmarks.dim() != 2 or landmarks.shape[1] % 2 != 0:
        raise ValueError(
            "landmarks must have shape (batch, 2 * n_landmarks), got "
            f"{tuple(landmarks.shape)}"
        )
    if sigma <= 0:
        raise ValueError(f"sigma must be positive, got {sigma}")

    landmarks = landmarks.to(device)
    if not landmarks.is_floating_point():
        landmarks = landmarks.float()

    # Derive the landmark count from the input instead of hard-coding it.
    n_landmarks = landmarks.shape[1] // 2
    x_coords = landmarks[:, :n_landmarks, None, None]
    y_coords = landmarks[:, n_landmarks:, None, None]

    # Pixel grids laid out so they broadcast to (batch, n_landmarks, H, W).
    rows = torch.arange(heatmap_height, device=device, dtype=landmarks.dtype).view(1, 1, -1, 1)
    cols = torch.arange(heatmap_width, device=device, dtype=landmarks.dtype).view(1, 1, 1, -1)

    squared_distances = ((rows - y_coords) ** 2 + (cols - x_coords) ** 2) / (2 * sigma ** 2)
    heatmaps = torch.exp(-squared_distances)

    # Same scaling as Normalize(mean=[0.5], std=[0.5]).
    return (heatmaps - 0.5) / 0.5


Couldn't import dot_parser, loading of dot files will not be possible.


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from customDataset import *
#from heatmaps import *

# Run on the first CUDA GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [3]:
class LandmarkRegression(nn.Module):
    """CNN that regresses landmark coordinates from a single-channel image.

    Four conv + ReLU + 2x2 max-pool stages reduce each spatial dimension by a
    factor of 16; three fully connected layers then map the flattened features
    to ``num_landmarks * 2`` values.

    Args:
        num_landmarks: Number of landmarks to predict.
        input_size: Spatial size of the input images, either a single int for
            square images or a ``(height, width)`` pair. Defaults to 512,
            which gives the original ``256 * 32 * 32`` flattened feature size.

    Raises:
        ValueError: If either input dimension is smaller than 16 pixels, in
            which case the pooled feature map would be empty.
    """

    def __init__(self, num_landmarks, input_size=512):
        super().__init__()
        self.num_landmarks = num_landmarks

        if isinstance(input_size, int):
            input_height = input_width = input_size
        else:
            input_height, input_width = input_size

        # Each of the four MaxPool2d(2, 2) layers halves (and floors) the size.
        feature_height = input_height // 16
        feature_width = input_width // 16
        if feature_height < 1 or feature_width < 1:
            raise ValueError(
                f"input_size must be at least 16 in each dimension, got {input_size}"
            )

        # Layer order and indices are kept stable so existing state dicts load.
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=3, padding=1),  # Grayscale input, so 1 channel
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.fc = nn.Sequential(
            nn.Linear(256 * feature_height * feature_width, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, num_landmarks * 2)  # Each landmark has (x, y) coordinates
        )

    def forward(self, x):
        """Return a (batch, num_landmarks * 2) tensor for a (batch, 1, H, W) input."""
        x = self.cnn(x)
        # flatten(1) also handles non-contiguous feature maps, unlike view().
        x = x.flatten(1)
        return self.fc(x)


In [4]:
def weighted_mse_loss(output, target, visibility_value):
    """Mean of the squared prediction error, scaled by the visibility weights.

    ``visibility_value`` receives singleton axes at positions 2, 3 and 4 so it
    broadcasts against ``(output - target) ** 2``; the weighted errors are
    then averaged over every element of the broadcast result.
    Assumes ``visibility_value`` is (batch, n_landmarks) and ``output`` /
    ``target`` are 5-D — TODO confirm against the data loader.
    """
    weights = visibility_value[:, :, None, None, None]
    residual = output - target
    return (weights * residual ** 2).mean()

In [5]:
# Example usage: dataset location and number of landmarks to regress.
dataset_path = '../Data_preparation/Dataset1_2.csv'
num_landmarks = 11

# Build the network directly on the selected device; the final expression
# marks every parameter as trainable and displays the module summary.
model = LandmarkRegression(num_landmarks).to(device)
model.requires_grad_(True)

LandmarkRegression(
  (cnn): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU(inplace=True)
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (9): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (10): ReLU(inplace=True)
    (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc): Sequential(
    (0): Linear(in_features=262144, out_features=1024, bias=True)
    (1): ReLU(inplace=True)
    (2): Linear(in_features=1024, out_features=512, bias=True)
    (3): ReLU(inplace=Tr

In [6]:
# Build the training and test data loaders from `dataset_path`.
# `load_data` is not defined in this notebook; presumably it comes from the
# star import of `customDataset` — TODO confirm. The call unpacks exactly two
# return values, so it fails if `load_data` returns anything else.
train_loader, test_loader = load_data(dataset_path)

In [12]:
optimizer = optim.Adam(model.parameters(), lr=0.001)  # Adjust learning rate as needed

# Training parameters
num_epochs = 10

# Training loop: predict coordinates, render them as heatmaps and compare the
# heatmaps with the targets using the visibility-weighted MSE.
for epoch_index in range(num_epochs):
    model.train()  # Set the model to training mode
    running_loss = 0.0
    print(f"Epoch {epoch_index+1}")
    for batch_index, data in enumerate(train_loader):
        # Named `images` rather than `input` so the builtin is not shadowed.
        # Gradients with respect to the images are not needed for training,
        # so requires_grad is left untouched on the batch.
        images, target, visibility = data[0].to(device), data[1].to(device), data[2].to(device)

        optimizer.zero_grad()  # Zero the gradient buffers

        output = model(images)  # Forward pass

        output_heat = coord2Heatmap(output, device).unsqueeze(2)
        # Forcing requires_grad on a detached heatmap would let backward() run
        # while leaving every model gradient empty, so the optimizer would
        # silently update nothing. Fail loudly instead.
        if not output_heat.requires_grad:
            raise RuntimeError(
                "coord2Heatmap returned a tensor that is detached from the "
                "autograd graph, so the loss cannot update the model; the "
                "heatmap conversion must be implemented with differentiable "
                "torch operations."
            )

        loss = weighted_mse_loss(output_heat, target, visibility)  # Compute the loss

        loss.backward()  # Backpropagation
        optimizer.step()  # Update the model's weights

        running_loss += loss.item()
        if batch_index % 10 == 0:
            print(f"--> Batch {batch_index+1}/{len(train_loader)} - Loss:{loss.item()}")

    # Average loss for this epoch; max() avoids dividing by zero on an empty loader.
    average_loss = running_loss / max(len(train_loader), 1)
    print(f"Epoch [{epoch_index+1}/{num_epochs}] - Loss: {average_loss:.4f}")
    torch.cuda.empty_cache()

In [12]:
# Restore a previously saved set of weights into a fresh model instance.
model1 = LandmarkRegression(num_landmarks)
# map_location="cpu" lets a checkpoint saved on a GPU load on a CPU-only
# machine; load_state_dict then copies the values into model1's parameters.
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a
# trusted source. An UnpicklingError here presumably means the file is not a
# valid checkpoint (e.g. truncated or a placeholder) — confirm.
model1.load_state_dict(torch.load('vers_0_32.pt', map_location="cpu"))

UnpicklingError: unpickling stack underflow

In [None]:
model.eval()  # Set the model to evaluation mode
total_loss = 0.0

with torch.no_grad():
    for data in test_loader:
        # Assumes test batches have the same (image, target, visibility)
        # layout as the training batches — TODO confirm.
        images, target, visibility = data[0].to(device), data[1].to(device), data[2].to(device)
        output = model(images)
        # The loss is defined on heatmaps, so render the predicted coordinates
        # exactly as the training loop does before comparing with the target.
        output_heat = coord2Heatmap(output, device).unsqueeze(2)
        loss = weighted_mse_loss(output_heat, target, visibility)
        total_loss += loss.item()

# max() avoids dividing by zero when the test loader is empty.
average_test_loss = total_loss / max(len(test_loader), 1)
print(f"Average Test Loss: {average_test_loss:.4f}")


ValueError: too many values to unpack (expected 2)