In [1]:
import torch
import torch.nn as nn
from resmasknet_test import *
import random
from tqdm import tqdm

# Model Definition

In [2]:
def resmasking_dropout1(in_channels=3, num_classes=7, weight_path=""):
    model = ResMasking(weight_path)
    model.fc = nn.Sequential(
        nn.Dropout(0.4),
        nn.Linear(512, 7)
        # nn.Linear(512, num_classes)
    )
    def get_resource_path():
        return ''

    # use_gpu = torch.cuda.is_available()
    # if use_gpu:
    #     model.load_state_dict(
    #         torch.load(
    #             os.path.join(
    #                 get_resource_path(), "ResMaskNet_Z_resmasking_dropout1_rot30.pth"
    #                 )
    #             )['net']
    #         )
    #     model.cuda()

    # else:
    model.load_state_dict(
        torch.load(
            os.path.join(
                get_resource_path(), "ResMaskNet_Z_resmasking_dropout1_rot30.pth"
            ),
        map_location={"cuda:0": "cpu"},
        )['net']
    )
    model.fc = nn.Sequential(
        nn.Dropout(0.4),
        nn.Linear(512, 1)
        # nn.Linear(512, num_classes)
    )
    return model

class SiameseRankNet(nn.Module):
    def __init__(self):
        super(SiameseRankNet, self).__init__()
        # Load ResMaskNet model
        self.model = resmasking_dropout1(in_channels=3, num_classes=7)

        
        # Define the fully connected layers on top of concatenated feature vectors
        # self.model.fc = nn.Sequential(
        #     nn.Dropout(0.4),
        #     nn.Linear(512, 1)
        # )
        
        self.sigmoid = nn.Sigmoid()
        # self.dropout = nn.Dropout(p=0.5)
        # self.relu = nn.ReLU()
        # self.activation = nn.Tanh()
    
    # _once
    def forward_once(self, x):
        # Forward pass through ResMaskNet
        x = self.model(x)
        # x = x.view(x.size()[0], -1)
        return x
    
    def forward(self, x1, x2):
        # Pass each input image through ResMaskNet to obtain feature vectors
        x1 = self.forward_once(x1)
        x2 = self.forward_once(x2)

        # Concatenate the feature vectors
        # x = torch.cat((x1, x2), dim=1)

        # Pass the concatenated feature vector through the fully connected layers
        # x = self.fc(x)

        # Pass the output through sigmoid to obtain the probability of the input images being similar
        # normalize x1 - x2 as a probability that x1 should rank higher than x2
        x = self.sigmoid(x1 - x2)
        return x
    


# Create Dataloader

In [3]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import cv2

aver_4_sorted_data = ['ha_212.png', 'ha_393.png', 'ha_428.png', 'ha_489.png', 'ha_412.png', 'ha_202.png', 'ha_348.png', 'ha_24.png', 'ha_407.png', 'ha_288.png', 'ha_367.png', 'ha_341.png', 'ha_235.png', 'ha_443.png', 'ha_450.png', 'ha_185.png', 'ha_50.png', 'ha_491.png', 'ha_301.png', 'ha_11.png', 'ha_422.png', 'ha_130.png', 'ha_243.png', 'ha_201.png', 'ha_32.png', 'ha_19.png', 'ha_384.png', 'ha_184.png', 'ha_311.png', 'ha_497.png', 'ha_256.png', 'ha_27.png', 'ha_107.png', 'ha_268.png', 'ha_329.png', 'ha_315.png', 'ha_2.png', 'ha_368.png', 'ha_241.png', 'ha_303.png', 'ha_221.png', 'ha_151.png', 'ha_342.png', 'ha_296.png', 'ha_152.png', 'ha_442.png', 'ha_186.png', 'ha_344.png', 'ha_215.png', 'ha_320.png', 'ha_149.png', 'ha_122.png', 'ha_54.png', 'ha_476.png', 'ha_106.png', 'ha_249.png', 'ha_132.png', 'ha_33.png', 'ha_207.png', 'ha_451.png', 'ha_172.png', 'ha_244.png', 'ha_454.png', 'ha_43.png', 'ha_131.png', 'ha_377.png', 'ha_396.png', 'ha_284.png', 'ha_59.png', 'ha_1.png', 'ha_252.png', 'ha_466.png', 'ha_110.png', 'ha_404.png', 'ha_292.png', 'ha_124.png', 'ha_482.png', 'ha_477.png', 'ha_5.png', 'ha_382.png', 'ha_9.png', 'ha_334.png', 'ha_381.png', 'ha_111.png', 'ha_380.png', 'ha_310.png', 'ha_475.png', 'ha_128.png', 'ha_314.png', 'ha_262.png', 'ha_174.png', 'ha_119.png', 'ha_139.png', 'ha_257.png', 'ha_233.png', 'ha_116.png', 'ha_399.png', 'ha_84.png', 'ha_145.png', 'ha_16.png']
base_folder = 'data/happiness_selected_imgonly100/'
data_path = [base_folder + i for i in aver_4_sorted_data]

def generate_dataset(data_path):
    dataset = []
    for i in range(len(data_path)):
        for j in range(i+1, len(data_path)):
            dataset.append([[data_path[i], data_path[j]], 1])
    return dataset

raw_data = generate_dataset(data_path)
# print(raw_data)

In [4]:

import torch
import random
import numpy as np
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import transforms

class PairwiseRatingDataset(Dataset):
    def __init__(self, data, transform=None, mode='train'):
        
        self.data = data
        self.transform = transform

        # compute pairs and labels
        self.pairs = [i[0] for i in self.data]
        self.labels = [i[1] for i in self.data]

        # self.pairs = self.load_image_data()
        
    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        img1 = Image.open(self.pairs[idx][0])
        img2 = Image.open(self.pairs[idx][1])

        # Apply transformations if any
        if self.transform:
            img1 = self.transform(img1)
            img2 = self.transform(img2)

        return img1, img2, self.labels[idx]


    # def __getitem__(self, idx):
    #     return self.pairs[idx][0], self.pairs[idx][1], self.labels[idx]

    # def load_image_data(self):
    #     print('loading image data...')
    #     # Load images and label for a given index
    #     image_pairs = []
        
    #     for i in range(self.__len__()):
    #         img1 = Image.open(self.pairs[i][0])
    #         img2 = Image.open(self.pairs[i][1])

    #         # Apply transformations if any
    #         if self.transform:
    #             img1 = self.transform(img1)
    #             img2 = self.transform(img2)
            
    #         image_pairs.append([img1, img2])

    #     return image_pairs
        

# Define transformations to be applied to images
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# train_features, train_labels = next(iter(dataloader))

In [5]:
# train_img1, train_img2, train_labels = next(iter(dataloader))
# print(train_img1.shape, train_img2.shape, train_labels.shape)

In [6]:
# transform_to_img((train_img1[0]*0.5 + 0.5) * 255, mode='RGB')

In [7]:
# transform_to_img = transforms.Compose([
#     transforms.ToPILImage(mode='RGB'),
#     ]
# )
# for i in range(len(train_img1)):
#     print(img1)
#     img1 = transform_to_img(train_img1[i])
#     img2 = transform_to_img(train_img2[i])
#     # img1.show()
#     # img2.show()
#     print(train_labels[i])
#     break
#     # input()
# # train_img1[0].shape
# # transform_to_img(train_img1)

In [8]:
# Display image and label.
# train_dataloader = DataLoader(mydataset['train'], batch_size=32, shuffle=True)
# train_features, train_labels = next(iter(train_dataloader))
# print(f"Feature batch shape: {train_features.size()}")
# print(f"Labels batch shape: {train_labels.size()}")
# img = train_features[0].squeeze()
# label = train_labels[0]
# plt.imshow(img, cmap="gray")
# plt.show()
# print(f"Label: {label}")

# prepare for training

In [9]:
model = SiameseRankNet()
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model.to(device)

# dataset = PairwiseRatingDataset(raw_data, transform=transform)

# Split data into train, val sets
num_data = len(raw_data)
num_train = int(0.8 * num_data)
num_val = num_data - num_train

# Create indices for train and val sets
indices = list(range(num_data))
random.shuffle(indices)
train_indices = indices[:num_train]
val_indices = indices[num_train:]

# Create train and val datasets by indexing the PairwiseRatingDataset instance
train_dataset = [raw_data[i] for i in train_indices]
val_dataset = [raw_data[i] for i in val_indices]
train_dataset = PairwiseRatingDataset(train_dataset, transform=transform)
val_dataset = PairwiseRatingDataset(val_dataset, transform=transform)

BATCH_SIZE = 8
# Create DataLoader instances for train and val sets
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

loss_func = nn.BCELoss()
loss_func.to(device)

BCELoss()

In [10]:
for batch_idx, sample  in enumerate(train_dataloader):
    print(sample[0].shape)
    break

torch.Size([8, 3, 224, 224])


# Start training

In [11]:
from radam import *

def accuracy(output, target):
    with torch.no_grad():
        batch_size = target.size(0)
        pred = torch.argmax(output, dim=1)
        correct = pred.eq(target).float().sum(0)
        acc = correct * 100 / batch_size
    return [acc]

# start training
model.train()
lr = 0.0001
weight_decay = 0.001
momentum = 0.9

def GetLoss(model, batch):
    batch = {k:v.to(model.device) for k, v in batch.items()}
    print(batch)
#     out = model(x1 = batch[])

optimizer = RAdam(
            params=model.parameters(),
            lr=lr,
            weight_decay=weight_decay,
        )

In [12]:
torch.cuda.is_available()

True

In [17]:
# train
model.train()
train_loss = 0.0
train_acc = 0.0
for batch_idx, sample in tqdm(
    enumerate(train_dataloader), total=len(train_dataloader), leave=False
):
    print(len(sample))
    batch_img1, batch_img2, targets = sample[0].cuda(non_blocking=True), sample[1].cuda(non_blocking=True), sample[2].cuda(non_blocking=True) # or something similar
    torch.reshape(targets, [8,1])
    # compute output, measure accuracy and record loss
    outputs = model(batch_img1, batch_img2)
    print(outputs.shape)
    print(targets.shape)
    loss = loss_func(outputs, targets)
    print(loss.shape)
    acc = accuracy(outputs, targets)[0]
    # acc = eval_metrics(targets, outputs, 2)[0]

    train_loss += loss.item()
    train_acc += acc.item()

    # compute gradient and do SGD step
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

i += 1
train_loss_list.append(train_loss / i)
train_acc_list.append(train_acc / i)

  0%|          | 0/495 [00:00<?, ?it/s]

3


                                       

RuntimeError: CUDA out of memory. Tried to allocate 14.00 MiB (GPU 0; 7.80 GiB total capacity; 5.17 GiB already allocated; 28.06 MiB free; 5.22 GiB reserved in total by PyTorch)

In [None]:
len(train_dataloader)

In [None]:
# val



# model.eval()


# from torchsummary import summary

# print(summary(model, (32, 3, 224, 224)))
# # print(summary(model, [(3, 224, 224), (3, 224, 224)]))a

In [None]:
class FER2013(Dataset):
    def __init__(self, stage, configs, tta=False, tta_size=48):
        self._stage = stage
        self._configs = configs
        self._tta = tta
        self._tta_size = tta_size

        self._image_size = (configs["image_size"], configs["image_size"])

        self._data = pd.read_csv(os.path.join(configs["data_path"], "{}.csv".format(stage)))

        self._pixels = self._data["pixels"].tolist()
        self._emotions = pd.get_dummies(self._data["emotion"])

        self._transform = transforms.Compose(
            [
                transforms.ToPILImage(),
                transforms.ToTensor(),
            ]
        )

    def is_tta(self):
        return self._tta == True

    def __len__(self):
        return len(self._pixels)

    def __getitem__(self, idx):
        pixels = self._pixels[idx]
        pixels = list(map(int, pixels.split(" ")))
        image = np.asarray(pixels).reshape(48, 48)
        image = image.astype(np.uint8)

        image = cv2.resize(image, self._image_size)
        image = np.dstack([image] * 3)

        if self._stage == "train":
            image = seg(image=image)

        if self._stage == "test" and self._tta == True:
            images = [seg(image=image) for i in range(self._tta_size)]
            # images = [image for i in range(self._tta_size)]
            images = list(map(self._transform, images))
            target = self._emotions.iloc[idx].idxmax()
            return images, target

        image = self._transform(image)
        target = self._emotions.iloc[idx].idxmax()
        return image, target