In [1]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
cuda = torch.cuda.is_available()
import numpy as np
import collections
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
from skimage import io
import centerloss

In [2]:
import torchvision
from PIL import Image

#981
train_size= 589
val_size= 196
test_size = 196

In [3]:
def parse_data(datadir, label_map):
    img_list = []
    file_list = []
    
    for root, directories, filenames in os.walk(datadir):      
        for filename in filenames:
            file_list.append(filename)
            if filename.endswith('.png'):
                
                filei = os.path.join(root, filename)
                file_ids = filename.split('_')
                file_id = file_ids[0] + '_' + file_ids[1]
                if file_id in label_map:
                    img_list.append(filei)
    return img_list[:train_size], img_list[train_size:train_size+val_size], img_list[train_size+val_size: train_size+val_size+test_size]
#     return img_list



def parse_emotion_data(datadir):
    em_map = {}
    file_list = []
    for root, directories, filenames in os.walk(datadir):
        for filename in filenames:
            file_list.append(filename)
            if filename.endswith('.txt'):
                   
                f = open(root +  "/" + filename, 'r')
                lines = []
                for line in f.readlines():
                    lines.append(line)
                value = lines[0]
                f.close()
                
                keys = filename.split('_')
                key = keys[0] + '_' + keys[1]
                em_map[key] = int(float(value.strip())) - 1
                
    return em_map


def split_folds_data(data, fold_id, num_folds):
    fold_size = len(data)//num_folds
    print(fold_size)
    data = data[:fold_size * num_folds]

    
    val_fold_ids = [(fold_id) % num_folds, (fold_id + 1) % num_folds]
    test_fold_ids = [(fold_id + 2) % num_folds, (fold_id + 3) % num_folds]
    
    data_div = [data[i*fold_size : (i+1)*fold_size] for i in range(num_folds)]
    
    t_data = np.concatenate([data_div[i] for i in range(num_folds) if i not in val_fold_ids and i not in test_fold_ids], axis=0)
    
    v_data = np.concatenate([data_div[fid] for fid in val_fold_ids], axis = 0)
    
    test_data = np.concatenate([data_div[fid] for fid in test_fold_ids], axis = 0)
    return t_data, v_data, test_data


In [4]:
label_map = parse_emotion_data("Emotion")
# img_list = parse_data("cohn-kanade-images", label_map)


# for i in range(10):
#     t_data, v_data, test_data = split_folds_data(img_list, i, 10)
#     print("img_list", len(img_list))
#     print("t_data len", len(t_data))
#     print("v_data len", len(v_data))
#     print("test_data len", len(test_data))
#     print("\n");
    

#     assert 0.6 * (len(img_list) - 1) == len(t_data)
#     assert 0.2 * (len(img_list) - 1) == len(v_data)
#     assert 0.2 * (len(img_list) - 1) == len(test_data)


In [5]:
class Config:
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

config = Config(
    num_classes = 7,
    width = 224,
    height = 224,
    num_epochs = 10,
    batch_size = 32,
    feat_dim = 7,
    lr_cent = 0.5,
    closs_weight = 0.5
)

In [6]:
class ImageDataset(Dataset):
    def __init__(self, file_list, label_map, train = False):
        self.file_list = file_list
        self.label_map = label_map
        self.train = train
        self.data_len = len(self.file_list)

    def __len__(self):
        if self.train:
            return self.data_len * 5
        else:
            return self.data_len

    def __getitem__(self, index):
        img = None
        img_pil = None
        img_h = config.width
        img_w = config.height
        if index < self.data_len:
            img = Image.open(self.file_list[index])
            img_pil = torchvision.transforms.Resize((img_h,img_w))(img)
            img = torchvision.transforms.ToTensor()(img_pil)
        elif index < 2 * self.data_len:
            index = index - self.data_len
            img = Image.open(self.file_list[index])
            img = torchvision.transforms.RandomHorizontalFlip(p = 1.0)(img)
            img_pil = torchvision.transforms.Resize((img_h,img_w))(img)
            img = torchvision.transforms.ToTensor()(img_pil)
        elif index < 3 * self.data_len:
            index = index - 2 * self.data_len
            img = Image.open(self.file_list[index])
            img = torchvision.transforms.RandomRotation(30)(img)
            img_pil = torchvision.transforms.Resize((img_h,img_w))(img)
            img = torchvision.transforms.ToTensor()(img_pil)
        elif index < 4 * self.data_len:
            index = index - 3 * self.data_len
            img = Image.open(self.file_list[index])
            img = torchvision.transforms.RandomAffine(5, translate=(0.1,0.1), scale=(1.1,1.2), shear=0, resample=False, fillcolor=0)(img)
            img_pil = torchvision.transforms.Resize((img_h,img_w))(img)
            img = torchvision.transforms.ToTensor()(img_pil)
        else:
            index = index - 4 * self.data_len
            img = Image.open(self.file_list[index])
            img = torchvision.transforms.RandomAffine(5, translate=(0,0), scale=(1.1,1.2), shear=5, resample=False, fillcolor=0)(img)
            img_pil = torchvision.transforms.Resize((img_h,img_w))(img)
            img = torchvision.transforms.ToTensor()(img_pil)
            
            
        
        if img.shape[0] == 3:
            img = torchvision.transforms.Grayscale(num_output_channels=1)(img_pil)
            img = torchvision.transforms.ToTensor()(img)
        img = torchvision.transforms.Normalize(mean=[0.485], std=[0.229])(img)
        keys = self.file_list[index].split('/')[-1].split('.')[0].split('_')
        label = self.label_map[keys[0] + '_' + keys[1]]
        return img, label

In [7]:
def dataset_hist_data(dataset):
    dataiter = iter(dataset)
    labels = []
    for i in range(len(dataset)):
        _, label = dataiter.next()
        labels += [label]
    return labels


# labels_all = [dataset_hist_data(train_dataset), dataset_hist_data(dev_dataset), dataset_hist_data(test_dataset)]
# n_bins = 30
# colors = ['red', 'tan', 'lime']
# plt.hist(labels_all, n_bins, density=True, histtype='bar', color=colors, label=['train', 'dev', 'test'])
# plt.legend(prop={'size': 10})
# plt.title("Data distribution")
# plt.show()





In [8]:
# Given image filename, return it's corresponding label from label_map
def label_util(filename, label_map):
    keys = filename.split('/')[-1].split('.')[0].split('_')
    label = label_map[keys[0] + '_' + keys[1]]
    return label

# expressions = ['Anger','Contempt','Disgust','Fear','Happy','Sadness','Surprise']
# idxs = np.random.randint(100, size=8)
# f, a = plt.subplots(2, 4, figsize=(10, 5))

    
# for i in range(8):
#     image = io.imread(train_img_list[idxs[i]]) 
#     r, c = i // 4, i % 4
    
#     # Display an image
#     label_no = label_util(train_img_list[idxs[i]], label_map)
#     a[r][c].set_title(expressions[label_no])
#     a[r][c].imshow(image)
#     a[r][c].axis('off')

# plt.show()

In [9]:
import logging
logging.basicConfig(filename="training_fer_ckp.log" ,
                            filemode="a+")
logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

In [10]:
# train_dataset, dev_dataset, test_dataset = torch.utils.data.random_split(dataset, (train_size, val_size, test_size))

In [11]:
# for i in range(len(train_dataset)):
#     print(train_dataset[i][0].shape)

In [12]:
# Model Architecture

class AttentionLayer(nn.Module):
    def __init__(self, input1_size, input2_size):
        super(AttentionLayer, self).__init__()

        self.attention_fclayer = nn.Linear(input2_size, input1_size)
        
        
    def forward(self, input1, input2):
        
        self.input1 = input1
        self.input2 = input2
        
        self.batch, self.outchannel1, self.h1, self.w1 = input1.shape
        self.batch, self.outchannel2 = input2.shape
        
        input2_rescaled = self.attention_fclayer(self.input2)
        input2_rescaled = self.input2
        
        input2_bmm = input2_rescaled.view(self.batch,self.outchannel1,1)
        
        compat_scores = torch.zeros((self.batch, self.h1*self.w1))
        compat_scores = compat_scores.to(device)

        for h in range(self.h1):
            for w in range(self.w1):
                input1_bmm = self.input1[:,:,h,w].view(self.batch,1,self.outchannel1)
                compat_scores[:,h*self.w1+w] = torch.bmm(input1_bmm, input2_bmm).squeeze()
        
        normalized_compat_scores = F.softmax(compat_scores, dim=1)
        
        bmm_arg2 = self.input1.view(self.batch,self.outchannel1,self.h1*self.w1,1)
        bmm_argtemp = normalized_compat_scores.view(self.batch,1,self.h1*self.w1).repeat(1,self.outchannel1,1)
        bmm_arg1 = bmm_argtemp.view(self.batch,self.outchannel1,1,self.h1*self.w1)
        
        g_mod = torch.zeros((self.batch, self.outchannel1))
        g_mod = g_mod.to(device)
        
        for b in range(self.batch):
            g_mod[b,:] = torch.bmm(bmm_arg1[b,:,:,:], bmm_arg2[b,:,:,:]).squeeze()
        
        return g_mod

class ConvBlock(nn.Module):
    def __init__(self, C_in, C_out, kernel_size, stride):
        super(ConvBlock, self).__init__()
        self.block = nn.Sequential(
                          nn.Conv2d(in_channels=C_in, out_channels=C_out, kernel_size=kernel_size, stride=stride, padding=(1,1)),
                          nn.BatchNorm2d(C_out),
                          nn.ReLU(),
                          nn.MaxPool2d(2))
        
    def forward(self, x):
        return self.block(x)
    
class LinearBlock(nn.Module):
    def __init__(self, insize, outsize):
        super(LinearBlock, self).__init__()
        self.linblock = nn.Sequential(
                          nn.Linear(insize, outsize),
                          nn.BatchNorm1d(outsize),
                          nn.ReLU())
        
    def forward(self, x):
        return self.linblock(x)
    
class Flatten(nn.Module):
    def forward(self, input):
        return input.view(input.size(0), -1)
    
class BaselineModel(nn.Module):
    def __init__(self, num_blocks):
        super(BaselineModel, self).__init__()
        layers = []
        num_classes = 7
        channels = [1, 256, 128, 64] # this needs to be modified according to num_blocks
        linear_size = [64*28*28, 512, 256]
        
        self.convlayer1 = ConvBlock(C_in=channels[0], C_out=channels[1], kernel_size=3, stride=1)
        self.convlayer2 = ConvBlock(C_in=channels[1], C_out=channels[2], kernel_size=3, stride=1)
        self.convlayer3 = ConvBlock(C_in=channels[2], C_out=channels[3], kernel_size=3, stride=1)        
        
        self.flattenlayer = Flatten()
        
        self.fclayer1 = nn.Linear(linear_size[0], linear_size[1])
        self.fclayer2 = nn.Linear(linear_size[1], linear_size[2])
        
        self.attlayer1 = AttentionLayer(channels[1],linear_size[2])
        
        self.fclayer3 = nn.Linear(channels[1], config.num_classes)
        
    def forward(self, x):
        
        self.out1 = self.convlayer1(x)
        self.out2 = self.convlayer2(self.out1)
        self.out3 = self.convlayer3(self.out2)
        
        self.out4 = self.flattenlayer(self.out3)
        
        self.out5 = self.fclayer1(self.out4)
        self.out6 = self.fclayer2(self.out5)
        
        self.attout = self.attlayer1(self.out1, self.out6)
        
        self.out = self.fclayer3(self.attout)
        
        return self.out

In [13]:

criterion = nn.CrossEntropyLoss()

device = torch.device("cuda" if cuda else "cpu")

criterion_closs = centerloss.CenterLoss(config.num_classes, config.feat_dim, device)

In [14]:
def train_closs(model,n_epochs,train_dataloader, test_loader):
    model.train()
    model.to(device)
    train_losses = []
    eval_losses = []
    eval_accs = []
    for epoch in range(n_epochs):
        avg_loss = 0.0
        for batch_num, (feats, labels) in enumerate(train_dataloader):
            feats, labels = feats.to(device), labels.to(device)
            
            optimizer.zero_grad()
            optimizer_closs.zero_grad()
            
            outputs = model(feats)
            
            loss = criterion(outputs, labels.long())
            c_loss = criterion_closs(outputs, labels.long())
            loss = loss + config.closs_weight * c_loss
            loss.backward()
            
            optimizer.step()
            for param in criterion_closs.parameters():
                param.grad.data *= (1. / config.closs_weight)
            optimizer_closs.step()
            
            avg_loss += loss.item()
            if batch_num % 50 == 49:
                logger.info('Epoch: {}\tBatch: {}\tAvg-Loss: {:.4f}'.format(epoch+1, batch_num+1, avg_loss/50))
 
                avg_loss = 0.0    
        
            torch.cuda.empty_cache()
            del feats
            del labels
            del loss
        train_loss, train_accuracy = test_classify_closs(model,train_dataloader)
        test_loss, test_accuracy = test_classify_closs(model,test_loader)
        eval_losses.append(test_loss)
        train_losses.append(train_loss)
        eval_accs.append(test_accuracy)
        logger.info('Epoch: {}\tTrain Loss: {}\tTrain Acc: {}\tTest-Loss: {}\tTest-acc: {:.4f}'.format(epoch+1, train_loss,train_accuracy, test_loss, test_accuracy))
    return train_losses, eval_losses, eval_accs

def test_classify_closs(model, test_loader):
    with torch.no_grad():
        model.eval()
        test_loss = []
        accuracies = 0
        total = 0
        for batch_num, (feats, labels) in enumerate(test_loader):
            feats, labels = feats.to(device), labels.to(device)
            outputs = model(feats)
            _, pred_labels = torch.max(F.softmax(outputs, dim=1), 1)
            pred_labels = pred_labels.view(-1)
            loss = criterion(outputs, labels.long())
            c_loss = criterion_closs(outputs, labels.long())
            loss = loss + config.closs_weight * c_loss
            
            accuracies += float(torch.sum(torch.eq(pred_labels, labels)).item())
            total+=float(len(labels))
            test_loss.extend([loss.item()]*feats.size()[0])
            torch.cuda.empty_cache()
            del feats
            del labels
    model.train()
    return np.mean(test_loss), accuracies/total

In [15]:
def train(model,n_epochs,train_dataloader, test_loader):
    model.train()
    model.to(device)
    train_losses = []
    eval_losses = []
    eval_accs = []
    for epoch in range(n_epochs):
        avg_loss = 0.0
        for batch_num, (feats, labels) in enumerate(train_dataloader):
            feats, labels = feats.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(feats)
            loss = criterion(outputs, labels.long())
            loss.backward()
            
            optimizer.step()
            
            avg_loss += loss.item()
            if batch_num % 50 == 49:
                logger.info('Epoch: {}\tBatch: {}\tAvg-Loss: {:.4f}'.format(epoch+1, batch_num+1, avg_loss/50))
 
                avg_loss = 0.0    
        
            torch.cuda.empty_cache()
            del feats
            del labels
            del loss
        train_loss, train_accuracy = test_classify_loss(model,train_dataloader)
        test_loss, test_accuracy = test_classify_loss(model,test_loader)
        eval_losses.append(test_loss)
        train_losses.append(train_loss)
        eval_accs.append(test_accuracy)
        logger.info('Epoch: {}\tTrain Loss: {}\tTrain Acc: {}\tTest-Loss: {}\tTest-acc: {:.4f}'.format(epoch+1, train_loss,train_accuracy, test_loss, test_accuracy))
    return train_losses, eval_losses, eval_accs

def test_classify_loss(model, test_loader):
    with torch.no_grad():
        model.eval()
        test_loss = []
        accuracies = 0
        total = 0
        for batch_num, (feats, labels) in enumerate(test_loader):
            feats, labels = feats.to(device), labels.to(device)
            outputs = model(feats)
            _, pred_labels = torch.max(F.softmax(outputs, dim=1), 1)
            pred_labels = pred_labels.view(-1)
            loss = criterion(outputs, labels.long())
            accuracies += float(torch.sum(torch.eq(pred_labels, labels)).item())
            total+=float(len(labels))
            test_loss.extend([loss.item()]*feats.size()[0])
            torch.cuda.empty_cache()
            del feats
            del labels
    model.train()
    return np.mean(test_loss), accuracies/total

In [16]:
num_folds = 1
running_val_acc = 0.0
running_test_acc = 0.0
batch_size = config.batch_size
num_epochs = config.num_epochs
for fold_id in range(0, num_folds):
    
#     train_img_list, val_img_list, test_list = split_folds_data(img_list, fold_id, num_folds)
    model = BaselineModel(num_blocks=3)
    optimizer = optim.Adam(model.parameters())
    
    #optimizer_closs = optim.Adam(model.parameters())
    #optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
    train_img_list, val_img_list, test_img_list = parse_data("cohn-kanade-images", label_map)
    train_dataset = ImageDataset(train_img_list, label_map, train = True)
    dev_dataset = ImageDataset(val_img_list, label_map)
    test_dataset = ImageDataset(test_img_list, label_map)

    # dataset based on fold_id
    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, 
                                               shuffle=True, num_workers=8,drop_last=True)

    dev_dataloader = torch.utils.data.DataLoader(dev_dataset, batch_size=batch_size, 
                                               shuffle=True, num_workers=8, drop_last=True)
    
    test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, 
                                               shuffle=False, num_workers=8, drop_last=True)

    train_losses, eval_losses, eval_accs = train(model, num_epochs, train_dataloader,dev_dataloader)
    running_val_acc += eval_accs[-1]
    
    test_loss, test_acc = test_classify_loss(model, test_dataloader)
    running_test_acc += test_acc
    
    
final_val_acc = running_val_acc / num_folds
final_test_acc = running_test_acc / num_folds

logger.info("val acc | test acc {} {} ".format(final_val_acc, final_test_acc))

AttributeError: 'AttentionLayer' object has no attribute 'batch'

In [None]:
#torch.save(model.state_dict(), "models/fer_cnn_ckp_augmentation_adam_closs_200_epochs.pt")

In [None]:
test_dataset = ImageDataset(test_img_list, label_map)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=32, 
                                               shuffle=False, num_workers=8)

In [None]:
test_loss, test_acc = test_classify_loss(model, test_dataloader)

In [None]:
test_acc

In [None]:
plt.title('Training Loss')
plt.xlabel('Epoch Number')
plt.ylabel('Loss')
plt.plot(train_losses)
plt.savefig("training_loss.png")

In [None]:
plt.title('Validation Accuracy')
plt.xlabel('Epoch Number')
plt.ylabel('accuracy')
plt.plot(eval_accs)
plt.savefig("val_acc.png")