In [1]:
import torch
import tensorflow as tf
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from PIL import Image
import pandas as pd
from torchvision import transforms
from sklearn.model_selection import train_test_split
import preprocessing
import torch.optim as optim

In [41]:
def split_data(df):
    train_ratio = 0.7
    val_ratio = 0.2
    test_ratio = 0.1

    train_val, test = train_test_split(df, test_size=test_ratio, random_state=42)
    train, val = train_test_split(train_val, test_size=val_ratio/(train_ratio + val_ratio), random_state=42)

    return train, val, test

In [3]:
# class CustomDataset(Dataset):
#     def __init__(self, dataframe, data_path, transform=None):
#         self.dataframe = dataframe
#         self.data_path = data_path
#         self.transform = transform
# 
#     def __len__(self):
#         return len(self.dataframe)
# 
#     def __getitem__(self, idx):
#         img_name = self.dataframe.iloc[idx, 0]  # Assuming the image column is 'Image_ID'
#         img_path = f"{self.data_path}/{img_name}"
#         image = Image.open(img_path)
#         age = torch.tensor(self.dataframe.iloc[idx,1])
#         gender = torch.tensor(self.dataframe.iloc[idx,2])
# 
#         if self.transform:
#             image = self.transform(image)
# 
#         return image, [age, gender]

In [42]:
# Define transformations for data augmentation
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.RandomHorizontalFlip(),
    # transforms.RandomRotation(20),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

In [59]:
class CustomDataset2(Dataset):
    def __init__(self, image_folder, dataframe, transform=None):
        self.image_folder = image_folder
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_name = self.dataframe.iloc[idx, 0] + '.jpg'  # Assuming the image column is 'Image_ID'
        age = torch.tensor(self.dataframe.iloc[idx,1])
        gender = torch.tensor(self.dataframe.iloc[idx,2])

        # Lazy loading: return the image path and label instead of loading the image
        return img_name, [age,gender]

    def load_image(self, img_name):
        img_path = self.image_folder + '/' + img_name
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image

In [45]:
def preprocess_data(data_path,):
    train_data, val_data, test_data = split_data(df)  # Assuming you have a function that splits your data

    # train_dataset = CustomDataset(dataframe=train_data, data_path=data_path, transform=transform)
    # val_dataset = CustomDataset(dataframe=val_data, data_path=data_path, transform=transform)
    # test_dataset = CustomDataset(dataframe=test_data, data_path=data_path)
    

    return train_loader, val_loader, test_loader

In [66]:
data_path = '../data/UTKFace'

In [90]:
batch_size=32

In [63]:
df = pd.read_csv('../data/UTKFace_labels.csv', dtype={'Age':'float32', 'Gender':'float32'})

In [64]:
train_data, val_data, test_data = split_data(df) 

In [91]:
train_dataset = CustomDataset2(dataframe=train_data, image_folder=data_path, transform=transform)
val_dataset = CustomDataset2(dataframe=val_data, image_folder=data_path, transform=transform)
test_dataset = CustomDataset2(dataframe=test_data, image_folder=data_path)

In [92]:
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [54]:
# data_path = data_path = "../data/UTKFace"
# train_loader, val_loader, test_loader = preprocess_data(data_path)

In [93]:
# Iterate through the train_loader to get a batch of data
for batch_idx, (img_names, targets) in enumerate(train_loader):
    data = [train_dataset.load_image(img_name) for img_name in img_names]
    example_data = data  # This will contain a batch of images
    example_targets = targets  # This will contain the corresponding labels/targets
    break

In [94]:
tf.print(example_data)

[tensor([[[-0.2549, -0.2471, -0.2471,  ..., -0.9686, -0.9765, -0.9922],
         [-0.2549, -0.2471, -0.2471,  ..., -0.9686, -0.9765, -0.9922],
         [-0.2549, -0.2471, -0.2471,  ..., -0.9451, -0.9529, -0.9765],
         ...,
         [-0.2235, -0.2863, -0.2157,  ..., -0.4196, -0.3961, -0.2784],
         [-0.3725, -0.4431, -0.4431,  ..., -0.3333, -0.1843, -0.0980],
         [-0.4980, -0.6471, -0.6941,  ..., -0.1765,  0.0667,  0.0745]],

        [[-0.2157, -0.2157, -0.2157,  ..., -0.9686, -0.9686, -0.9843],
         [-0.2157, -0.2157, -0.2157,  ..., -0.9686, -0.9765, -0.9922],
         [-0.2157, -0.2157, -0.2157,  ..., -0.9529, -0.9608, -0.9765],
         ...,
         [-0.2157, -0.2784, -0.2078,  ..., -0.3647, -0.3412, -0.2235],
         [-0.3647, -0.4353, -0.4353,  ..., -0.2863, -0.1294, -0.0431],
         [-0.4902, -0.6392, -0.6863,  ..., -0.1294,  0.1216,  0.1294]],

        [[-0.1843, -0.1922, -0.1922,  ..., -0.9686, -0.9686, -0.9686],
         [-0.1843, -0.1922, -0.1922,  ..., -

In [82]:
tf.print(example_targets)

[tensor([26., 49.,  9., 26., 76., 45., 26., 17.]),
 tensor([1., 0., 0., 0., 0., 1., 1., 0.])]


In [99]:
import torch
import torch.nn as nn

class CustomModel(nn.Module):
    def __init__(self):
        super(CustomModel, self).__init__()

        # Convolutional layers
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.max_pool = nn.MaxPool2d(kernel_size=2)

        # Fully connected layers
        self.flatten = nn.Flatten()
        self.dense_shared = nn.Linear(64 * 64 * 64, 128)  # Calculate the input size based on your input_shape

        # Output layers
        self.classification_output = nn.Linear(128, 1)
        self.regression_output = nn.Linear(128, 1)

        # Activation functions
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
        self.linear = nn.Identity()  # No activation for linear output

    def forward(self, x):
        # Forward pass through convolutional layers
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = self.max_pool(x)

        # Flatten and pass through fully connected layers
        x = self.flatten(x)
        x = self.relu(self.dense_shared(x))

        # Classification branch
        classification_out = self.sigmoid(self.classification_output(x)).squeeze()

        # Regression branch
        regression_out = self.linear(self.regression_output(x)).squeeze()

        return regression_out, classification_out

In [100]:
class MultiTaskLossWrapper(nn.Module):
    def __init__(self, task_num):
        super(MultiTaskLossWrapper, self).__init__()
        self.task_num = task_num
        self.log_vars = nn.Parameter(torch.zeros((task_num)))

    def forward(self, age_pred, gen_pred, age_true, gen_true):
        mse, binCrossEntropy = nn.MSELoss(), nn.BCELoss()
        
        loss0 = mse(age_pred, age_true)
        loss1 = binCrossEntropy(gen_pred, gen_true)

        precision0 = torch.exp(-self.log_vars[0])
        loss0 = precision0*loss0 + self.log_vars[0]

        precision1 = torch.exp(-self.log_vars[1])
        loss1 = precision1*loss1 + self.log_vars[1]

        return loss0+loss1

In [101]:
model = CustomModel()
loss_func = MultiTaskLossWrapper(2)

In [104]:
# Define your optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)  # You can adjust the learning rate as needed

# Training loop
epochs = 50  # Define the number of epochs for training
for epoch in range(epochs):
    print("epoch:", epoch)
    model.train()  # Set the model to training mode
    total_loss = 0.0

    for batch_idx, (img_names, targets) in enumerate(train_loader):
        print(batch_idx)
        data = torch.stack([train_dataset.load_image(img_name) for img_name in img_names])
        # print(type(data))
        optimizer.zero_grad()  # Zero the gradients to prevent accumulation
        age_pred, gen_pred = model(data)  # Forward pass
        age_true = targets[0]
        gen_true = targets[1]
        # print("age:",age_pred)
        # print("gen:",gen_pred)
        # print(targets[0])
        # print(age_pred)
        # age = targets[:, 0]  # Assuming age is the first element in targets
        # gender = targets[:, 1]  # Assuming gender is the second element in targets
        # print(age)
        loss = loss_func(age_pred, gen_pred, age_true, gen_true)
        total_loss += loss.item()
        # Backpropagation
        loss.backward()
        optimizer.step()

    # Calculate average loss for the epoch
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch [{epoch + 1}/{epochs}], Loss: {avg_loss:.4f}")

epoch: 0
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33


KeyboardInterrupt: 

In [None]:
# Validation loop
## NOT CHECKED
model.eval()  # Set the model to evaluation mode
total_val_loss = 0.0
with torch.no_grad():
    for batch_idx, (val_img_names, val_targets) in enumerate(val_loader):
        val_data = torch.stack([val_dataset.load_image(img_name) for img_name in val_img_names])
        val_age_pred, val_gen_pred = model(val_data)  # Forward pass
        val_age_true = val_targets[0]
        val_gen_true = val_targets[1]
        val_loss = loss_func(val_age_pred, val_gen_pred, val_age_true, val_gen_true)
        total_val_loss += val_loss.item()

# Calculate average validation loss
avg_val_loss = total_val_loss / len(val_loader)
print(f"Validation Loss: {avg_val_loss:.4f}")

In [105]:
# Prediction loop 
## NOT WORKING YET
model.eval()  # Set the model to evaluation mode
predictions = []
with torch.no_grad():
    for test_img_names in test_loader:
        test_data = torch.stack([test_dataset.load_image(img_name) for img_name in test_img_names])
        test_age_pred, test_gen_pred = model(test_data)  # Forward pass
        # You can do something with the predictions here, like storing them for further analysis
        predictions.append((test_age_pred, test_gen_pred))

TypeError: can only concatenate str (not "tuple") to str