In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import nn
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np
import pathlib
from PIL import Image
import matplotlib.pyplot as plt
from scipy.fftpack import dct
import time
from datetime import datetime


# Colab-only step: mount Google Drive at /content/drive so the dataset and
# checkpoint paths used further down (all under /content/drive/MyDrive) resolve.
# force_remount=True asks for a fresh mount even if one is already present.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

'''
Usage:
as image preprocessor by calling DCT() as a transform

input: 128*128*3 PIL image
output: 64*256 torch array (histogram)

[128*128] => (crop into 8*8 blocks) => [256 * [8*8]] => (DCT_2d) => [256 * [8 * 8]] => reshape => [256 * 64] => transpose => [64 * 256]
'''
class DCT(object):
    """Block-wise 2-D DCT transform, usable as a step in ``transforms.Compose``.

    The image is cut into non-overlapping 8x8 blocks (row-major order), the
    three channels of every block are averaged into a single plane, and an
    orthonormal 2-D DCT-II is applied to each block.

    Input:  an image object exposing ``height``, ``width`` and ``crop(box)``
            (e.g. a 3-channel PIL image) whose sides are multiples of 8.
    Output: ``torch.float32`` tensor of shape ``(64, n_blocks)``; row ``k``
            holds DCT coefficient ``k`` of every block. A 128x128 image
            yields ``(64, 256)``.
    """

    def __init__(self):
        self.BLOCK_HEIGHT = 8
        self.BLOCK_WIDTH = 8
        self.BLOCK_SIZE = (self.BLOCK_HEIGHT, self.BLOCK_WIDTH)

    def div_block(self, img, block_size):
        """Split ``img`` into ``block_size`` tiles, scanning rows then columns.

        Args:
            img: image exposing ``height``, ``width`` and ``crop(box)``.
            block_size: ``(block_height, block_width)``.

        Returns:
            ``np.ndarray`` stacking the tiles along a new first axis.

        Raises:
            AssertionError: if a side of the image is not a multiple of the
                corresponding block side.
        """
        img_height = img.height
        img_width = img.width
        block_height = block_size[0]
        block_width = block_size[1]
        assert img_height % block_height == 0, "image height must be a multiple of the block height"
        assert img_width % block_width == 0, "image width must be a multiple of the block width"

        blocks = [
            # PIL-style box: (left, upper, right, lower)
            np.array(img.crop((j, i, j + block_width, i + block_height)))
            for i in range(0, img_height, block_height)
            for j in range(0, img_width, block_width)
        ]
        return np.array(blocks)

    def dct2(self, array_2d):
        """Orthonormal 2-D DCT: transform along axis 0 first, then along axis 1."""
        return dct(dct(array_2d.T, norm = 'ortho').T, norm = 'ortho')

    def _dct2(self, array_2d):
        """Orthonormal 2-D DCT: transform along axis 1 first, then along axis 0."""
        return dct(dct(array_2d, norm = 'ortho').T, norm = 'ortho').T

    def __call__(self, img):
        blocks = self.div_block(img, self.BLOCK_SIZE)
        # Cast BEFORE summing: the crops are uint8, so adding three channels in
        # that dtype wraps around modulo 256 and corrupts bright pixels.
        blocks = blocks.astype(np.float64)
        ch0, ch1, ch2 = blocks[:, :, :, 0], blocks[:, :, :, 1], blocks[:, :, :, 2]
        mean_blocks = (ch0 + ch1 + ch2) / 3 # naive channel average
        result = np.array([self._dct2(block) for block in mean_blocks])
        # One row per block, then transpose so coefficients index the first axis.
        # Deriving the row length from the block size (instead of a fixed
        # 256 x 64) keeps this valid for any image size accepted by div_block.
        coeffs_per_block = self.BLOCK_HEIGHT * self.BLOCK_WIDTH
        return torch.from_numpy(result.reshape(-1, coeffs_per_block).T).float()

    def __repr__(self):
        return "DCT"

'''
Usage: Same as DCT()

input: 64*256 torch array (histogram)
output: 64*256 torch array (frequency histogram)
'''
class DFT(object):
    """Transform returning the real part of a 1-D DFT along the last axis.

    Input:  real-valued array/tensor, e.g. the ``(64, 256)`` output of ``DCT``.
    Output: ``torch.float64`` tensor of the same shape holding the real part
            of the unnormalised FFT taken over the last dimension.
    """

    def __init__(self):
        pass

    def __call__(self, freq):
        # The legacy function ``torch.fft(input, signal_ndim)`` was removed in
        # PyTorch 1.8, where ``torch.fft`` is a module; calling it raises
        # TypeError. ``torch.fft.fft`` over the last dim is the equivalent of the
        # old ``signal_ndim=1`` call on a (rows, cols, 2) real/imag tensor.
        # float64 is kept because the original built its buffer from np.zeros.
        signal = torch.as_tensor(freq, dtype=torch.float64)
        spectrum = torch.fft.fft(signal, dim=-1)
        return spectrum.real

    def __repr__(self):
        return "DFT"

class Ycbcr_convert():
    """Transform step that re-encodes an image into the YCbCr colour space."""

    def __init__(self):
        # Stateless: there is nothing to configure.
        pass

    def __call__(self, img):
        """Return the result of ``img.convert('YCbCr')``."""
        ycbcr_img = img.convert('YCbCr')
        return ycbcr_img

    def __repr__(self):
        description = "Convert a PIL Image from RGB to YCbCr"
        return description

# Dataset roots on the mounted Drive; ImageFolder reads one sub-folder per class.
train_path='/content/drive/MyDrive/colab_data/train/'
test_path='/content/drive/MyDrive/colab_data/test/'

# Every image is resized to this size before the block-wise DCT.
image_height = 128
image_width  = 128
mytransform = transforms.Compose([
    # Use the InterpolationMode enum: passing the PIL integer constant
    # (Image.BICUBIC) is deprecated in torchvision and emits a warning.
    transforms.Resize((image_height,image_width), interpolation=transforms.InterpolationMode.BICUBIC),
    Ycbcr_convert(),
    DCT(),
    #DFT()
])

trainset = datasets.ImageFolder(train_path, transform=mytransform)
testset = datasets.ImageFolder(test_path, transform=mytransform)

print('Total no. of train images: ', len(trainset))
print('Total no. of test images: ', len(testset))

Mounted at /content/drive


  "Argument interpolation should be of type InterpolationMode instead of int. "


Total no. of train images:  14459
Total no. of test images:  3662


In [None]:
#specify train, test images count here. Maintain 80-20 split
train_img_count = 400
test_img_count = 100

#specify epochs count
MAX_EPOCH = 10

# Seed the split so the same images are selected on every Restart & Run All;
# without a generator, random_split draws from the global RNG and the subset
# changes from run to run.
split_seed = 0
split_generator = torch.Generator().manual_seed(split_seed)

trainset = torch.utils.data.random_split(
    trainset, [train_img_count, len(trainset)-train_img_count], generator=split_generator)[0]
testset = torch.utils.data.random_split(
    testset, [test_img_count, len(testset)-test_img_count], generator=split_generator)[0]


print('Total no. of train set images: ', len(trainset))
print('Total no. of test set images: ', len(testset))

root=pathlib.Path(train_path)
# Only directories are classes. Counting stray files (e.g. hidden files in the
# Drive folder) would shift these names relative to ImageFolder's label indices.
classes=sorted(entry.name for entry in root.iterdir() if entry.is_dir())
print('classes:', classes)

batch_size = 32

trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=True)

Total no. of train set images:  400
Total no. of test set images:  100
classes: ['Boredom', 'Confusion', 'Engagement', 'Frustration']


In [None]:
#custom layer/more like activation function
def custom_layer1d(X, pool_size):
    """Sliding-window ``max - |min|`` over the first three axes of ``X``.

    For every output index ``(d, r, c)`` the window
    ``X[d:d+pool_size, r:r+pool_size, c:c+pool_size]`` is taken and
    ``window.max() - window.min().abs()`` is stored.

    Args:
        X: tensor with at least three dimensions.
        pool_size: side length of the cubic window.

    Returns:
        A new tensor created with ``torch.zeros`` whose size along each of the
        three axes is ``X.shape[axis] - pool_size + 2``.

    NOTE(review): because of the ``+ 2``, the last window along each axis is
    clipped by slicing to ``pool_size - 1`` elements; a stride-1 "valid" window
    would use ``+ 1``. Confirm this is intended before relying on it.
    """
    out_dims = [X.shape[axis] - pool_size + 2 for axis in range(3)]
    Y = torch.zeros(*out_dims)
    for d in range(out_dims[0]):
        for r in range(out_dims[1]):
            for c in range(out_dims[2]):
                window = X[d:d + pool_size, r:r + pool_size, c:c + pool_size]
                Y[d, r, c] = window.max() - window.min().abs()
    return Y

In [None]:
class FreNet(nn.Module):
    """Small 1-D CNN over DCT features.

    Expects input of shape ``(batch, 64, 256)``: 64 channels (one per DCT
    coefficient) and length 256. Two conv/pool stages halve the length twice
    (256 -> 128 -> 64), so the flattened feature size is 64 * 64 = 4096.

    Args:
        num_classes: width of the final linear layer. The default of 64 keeps
            the original architecture, so existing checkpoints still load.
            NOTE(review): the notebook's dataset lists 4 classes, so passing
            ``num_classes=4`` may be what is wanted - confirm before changing.
    """

    def __init__(self, num_classes=64):
        super().__init__()
        # Layer order (and therefore state_dict keys backbone.0 ... backbone.11)
        # is unchanged from the original definition.
        self.backbone = nn.Sequential(
            nn.Conv1d(64, 32, 3, padding=1),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(32, 64, 3, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.MaxPool1d(2),
            # A third Conv1d(64, 128)/BatchNorm/ReLU/MaxPool stage was disabled
            # here; re-enabling it changes the flattened size fed to Linear.
            nn.Flatten(),
            nn.Linear(4096, 64),
            nn.ReLU(),
            nn.Linear(64, num_classes),
        )

    def forward(self, x):
        # Call the module rather than .forward() so that hooks registered on
        # `backbone` are executed.
        return self.backbone(x)

In [None]:
# Hyper-parameters (declared before they are used)
# NOTE(review): the optimizer was previously hard-coded to lr=0.0002 while this
# variable said 0.0001 and was never read. The effective value (0.0002) is kept
# here so training behaviour is unchanged; lower it if 0.0001 was the intent.
learning_rate = 0.0002 # adopt a small lr to ensure convergence
batch_size = 32
resumetraining = False
print_every = 1
test_n_savemodel_every_epoch = 1
seed_no = 0
stop_at_loss = 0.1

device = 'cpu'
#device = 'cuda'

# Apply the seed that was previously declared but never used, so weight
# initialisation (and later shuffling) is reproducible from a fresh kernel.
torch.manual_seed(seed_no)

model = FreNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
#Train model
for epoch in range(MAX_EPOCH):
    # Select training mode explicitly: the model contains BatchNorm layers, and
    # without this the mode depends on whatever an earlier cell left behind.
    model.train()
    total_loss, total_acc = 0, 0
    cnt = 0
    for i, data in enumerate(trainloader):
        X, y = data[0].float().to(device), data[1].to(device)
        optimizer.zero_grad()

        # forward
        out = model(X)
        loss = criterion(out, y)

        # backward
        loss.backward()
        optimizer.step()

        # stats: per-batch accuracy and loss, accumulated until the next print
        y_pred = torch.argmax(out, dim=1)
        total_acc += (y_pred == y).sum().item() / len(y_pred)
        total_loss += loss.item()

        cnt += 1
        if i % print_every == 0:
            avg_loss = total_loss / cnt
            avg_acc = total_acc / cnt
            # reset the running window after reporting it
            total_acc, total_loss = 0, 0
            cnt = 0
            print('[Epoch %d Iter %d] Loss: %5f  Acc: %5f' % (epoch+1, i+1, avg_loss, avg_acc))
    # Checkpoint after every epoch; the same file is overwritten each time.
    torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            # stores the criterion object itself (not a loss value)
            'loss': criterion,
            },
            #specify save path along with model name
            '/content/drive/MyDrive/colab_data/All_models/model_name')

In [None]:
# Disabled checkpoint-*loading* (resume) snippet. It is wrapped in a string
# literal so none of it executes; as the cell's last expression the string is
# simply echoed as the cell output.
# NOTE(review): the snippet loads from '.../colab_data/models', while the
# training cell saves to '.../colab_data/All_models/model_name' - confirm the
# path before re-enabling it.
"""# load the model checkpoint
checkpoint = torch.load('/content/drive/MyDrive/colab_data/models')
#print(checkpoint)
# load model weights state_dict
model.load_state_dict(checkpoint['model_state_dict'])
print('Previously trained model weights state_dict loaded...')
# load trained optimizer state_dict
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
print('Previously trained optimizer state_dict loaded...')
epoch = checkpoint['epoch']
# load the criterion
criterion = checkpoint['loss']
print('Trained model loss function loaded...')
print(f"Previously trained for {epoch} number of epochs...")
# train for more epochs
epoch = 5
print(f"Train for {epoch} more epochs...")
"""

'# load the model checkpoint\ncheckpoint = torch.load(\'/content/drive/MyDrive/colab_data/models\')\n#print(checkpoint)\n# load model weights state_dict\nmodel.load_state_dict(checkpoint[\'model_state_dict\'])\nprint(\'Previously trained model weights state_dict loaded...\')\n# load trained optimizer state_dict\noptimizer.load_state_dict(checkpoint[\'optimizer_state_dict\'])\nprint(\'Previously trained optimizer state_dict loaded...\')\nepoch = checkpoint[\'epoch\']\n# load the criterion\ncriterion = checkpoint[\'loss\']\nprint(\'Trained model loss function loaded...\')\nprint(f"Previously trained for {epoch} number of epochs...")\n# train for more epochs\nepoch = 5\nprint(f"Train for {epoch} more epochs...")\n'