In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from optparse import OptionParser
import numpy as np
import matplotlib.pyplot as plt
from keras.utils import to_categorical
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, confusion_matrix
import torchvision.transforms as transforms
import torchvision.transforms.functional as TF
from torch.autograd import Variable
import torch.optim as optim
from pytorch_metric_learning import losses
import os
import shutil
from PIL import Image, ImageOps
from torchvision import transforms as T
import cv2
import random
from scipy.special import softmax
import torch.utils.data as data

In [2]:

class DataSource(data.Dataset):
    def __init__(self, root, resize=224, crop_size=224, train=True):
        self.root = os.path.expanduser(root)
        self.resize = resize
        self.crop_size = crop_size
        self.train = train
        
        self.images_path = []
        self.labels = []
        if train:
            train_path = root + '\\train'
            #print(train_path)
            for roots, files, dir in os.walk(train_path, topdown=False):
                    for name in dir:
                        full = os.path.join(roots, name)
                        if full.find("jpg") != -1:
                            self.images_path.append(full)
                        if full.find("angry")  !=-1:
                            self.labels.append(0)
                        if full.find("disgust")  !=-1:
                            self.labels.append(1)
                        if full.find("fear")  !=-1:
                            self.labels.append(2)
                        if full.find("happy")  !=-1:
                            self.labels.append(3)
                        if full.find("sad")  !=-1:
                            self.labels.append(4)
                        if full.find("surprise")  !=-1:
                            self.labels.append(5)
                        if full.find("neutral")  !=-1:
                            self.labels.append(6)

        
        else:
            test_path = root + '\\test'
            for roots, files, dir in os.walk(test_path, topdown=False):
                for name in dir:
                    full = os.path.join(roots, name)
                    if full.find("jpg") != -1:
                        self.images_path.append(full)
                        if full.find("angry")  !=-1:
                            self.labels.append(0)
                        if full.find("disgust")  !=-1:
                            self.labels.append(1)
                        if full.find("fear")  !=-1:
                            self.labels.append(2)
                        if full.find("happy")  !=-1:
                            self.labels.append(3)
                        if full.find("sad")  !=-1:
                            self.labels.append(4)
                        if full.find("surprise")  !=-1:
                            self.labels.append(5)
                        if full.find("neutral")  !=-1:
                            self.labels.append(6)


        # TODO: Define preprocessing

    def resized(self, data):
        # TODO: Perform preprocessing
        comp = T.ToTensor()
        data = comp(data)
        if self.train:
            compose = T.Compose([T.Resize((224, 224)), T.RandomHorizontalFlip(),T.Normalize(mean=0.5, std=0.5)])
            data = compose(data)
        else:
            compose = T.Compose([T.Resize((224, 224))])
            data = compose(data)
        return data

    def __getitem__(self, index):
        """
        return the data of one image
        """
        img_path = self.images_path[index]
        data = Image.open(img_path)
        # TODO: Perform preprocessing
        data = self.resized(data)
        print(data.shape)
        cv2.imwrite("data_check.jpg", np.asarray(torch.permute(data, (1, 2, 0)) * 255))
        #print("Data saved!")
        #print(data.shape)
        return data, self.labels[index]

    def __len__(self):
        return len(self.images_path)

In [3]:
class depthwise_separable_conv(nn.Module):
    def __init__(self, nin, nout, kernel_size, padding, bias=False):
        super(depthwise_separable_conv, self).__init__()
        self.depthwise = nn.Conv2d(nin, nin, kernel_size=kernel_size, padding=padding, groups=nin, bias=bias)
        self.pointwise = nn.Conv2d(nin, nout, kernel_size=1, bias=bias)

    def forward(self, x):
        out = self.depthwise(x)
        out = self.pointwise(out)
        return out
class Xception(nn.Module):
    def __init__(self, input_channel, num_classes=7):
        super(Xception, self).__init__()
        
        # Entry Flow
        self.entry_flow_1 = nn.Sequential(
            nn.Conv2d(input_channel, 32, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU(True),
            
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(True)
        )
        
        self.entry_flow_2 = nn.Sequential(
            depthwise_separable_conv(64, 128, 3, 1),
            nn.BatchNorm2d(128),
            nn.ReLU(True),
            
            depthwise_separable_conv(128, 128, 3, 1),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        
        self.entry_flow_2_residual = nn.Conv2d(64, 128, kernel_size=1, stride=2, padding=0)
        
        self.entry_flow_3 = nn.Sequential(
            nn.ReLU(True),
            depthwise_separable_conv(128, 256, 3, 1),
            nn.BatchNorm2d(256),
            
            nn.ReLU(True),
            depthwise_separable_conv(256, 256, 3, 1),
            nn.BatchNorm2d(256),
            
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        
        self.entry_flow_3_residual = nn.Conv2d(128, 256, kernel_size=1, stride=2, padding=0)
        
        self.entry_flow_4 = nn.Sequential(
            nn.ReLU(True),
            depthwise_separable_conv(256, 728, 3, 1),
            nn.BatchNorm2d(728),
            
            nn.ReLU(True),
            depthwise_separable_conv(728, 728, 3, 1),
            nn.BatchNorm2d(728),
            
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        
        self.entry_flow_4_residual = nn.Conv2d(256, 728, kernel_size=1, stride=2, padding=0)
        
        # Middle Flow
        self.middle_flow = nn.Sequential(
            nn.ReLU(True),
            depthwise_separable_conv(728, 728, 3, 1),
            nn.BatchNorm2d(728),
            
            nn.ReLU(True),
            depthwise_separable_conv(728, 728, 3, 1),
            nn.BatchNorm2d(728),
            
            nn.ReLU(True),
            depthwise_separable_conv(728, 728, 3, 1),
            nn.BatchNorm2d(728)
        )
        
        # Exit Flow
        self.exit_flow_1 = nn.Sequential(
            nn.ReLU(True),
            depthwise_separable_conv(728, 728, 3, 1),
            nn.BatchNorm2d(728),
            
            nn.ReLU(True),
            depthwise_separable_conv(728, 1024, 3, 1),
            nn.BatchNorm2d(1024),
            
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        self.exit_flow_1_residual = nn.Conv2d(728, 1024, kernel_size=1, stride=2, padding=0)
        self.exit_flow_2 = nn.Sequential(
            depthwise_separable_conv(1024, 1536, 3, 1),
            nn.BatchNorm2d(1536),
            nn.ReLU(True),
            
            depthwise_separable_conv(1536, 2048, 3, 1),
            nn.BatchNorm2d(2048),
            nn.ReLU(True)
        )
        
        self.linear = nn.Linear(2048, num_classes)
        
    def forward(self, x):
        entry_out1 = self.entry_flow_1(x)
        entry_out2 = self.entry_flow_2(entry_out1) + self.entry_flow_2_residual(entry_out1)
        entry_out3 = self.entry_flow_3(entry_out2) + self.entry_flow_3_residual(entry_out2)
        entry_out = self.entry_flow_4(entry_out3) + self.entry_flow_4_residual(entry_out3)
        
        middle_out = self.middle_flow(entry_out) + entry_out
        
        for i in range(7):
          middle_out = self.middle_flow(middle_out) + middle_out

        exit_out1 = self.exit_flow_1(middle_out) + self.exit_flow_1_residual(middle_out)
        exit_out2 = self.exit_flow_2(exit_out1)

        exit_avg_pool = F.adaptive_avg_pool2d(exit_out2, (1, 1))                
        exit_avg_pool_flat = exit_avg_pool.view(exit_avg_pool.size(0), -1)

        output = self.linear(exit_avg_pool_flat)
        
        return output

In [4]:
clf = Xception(1)
print(clf)

Xception(
  (entry_flow_1): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
  )
  (entry_flow_2): Sequential(
    (0): depthwise_separable_conv(
      (depthwise): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=64, bias=False)
      (pointwise): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
    )
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): depthwise_separable_conv(
      (depthwise): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=128, bias=False)
      (pointwise): Conv2d(128, 128, 

In [5]:
# class convblk(nn.Module):
#     def __init__(self, in_channels, out_channels, kernel_size, padding, stride, groups, bias = False):
#         super(convblk, self).__init__()
#         self.blk = nn.Sequential(nn.Conv2d(in_channels=in_channels, out_channels=in_channels, kernel_size=kernel_size, padding=padding, groups=groups, bias=bias), 
#                                  nn.Conv2d(in_channels=in_channels, out_channels=out_channels,kernel_size= 1, stride = 1, padding = 0, bias=bias))
#     def forward(self, x):
#         x = self.blk(x)
#         return x


# class depthwise(nn.Module):
#     def __init__(self, in_channels, out_channels, kernel_size, padding, stride, middle=False):
#         super(depthwise, self).__init__()
#         self.bn = nn.BatchNorm2d(out_channels)
#         if middle==False:
#             self.blk = nn.Sequential(nn.ReLU(), convblk(in_channels, out_channels, kernel_size, padding, stride, in_channels), 
#                                         self.bn, nn.ReLU(), convblk(out_channels, out_channels, kernel_size, padding, stride, out_channels), 
#                                         self.bn, nn.MaxPool2d(kernel_size=3, stride=2, padding=1))        
#         if middle==True:
#             self.blk = nn.Sequential(nn.ReLU(), convblk(in_channels, out_channels, kernel_size, padding, stride, in_channels), 
#                                             self.bn, nn.ReLU(), convblk(out_channels, out_channels, kernel_size, padding, stride, out_channels), 
#                                             self.bn, nn.ReLU(), convblk(out_channels, out_channels, kernel_size, padding, stride, out_channels))
#         if middle == 'Exit':
#             self.blk = nn.Sequential(convblk(in_channels, out_channels, kernel_size, padding, stride, in_channels), 
#                                         self.bn, nn.ReLU(), convblk(out_channels, 2048, kernel_size, padding, stride, out_channels), 
#                                         nn.BatchNorm2d(2048), nn.ReLU())



#     def forward(self, x):
#         x = self.blk(x)
#         return x

# class xception(nn.Module):
#     def __init__(self, num_classes = 7):
#         super(xception, self).__init__()

#         self.nc = num_classes
#         self.conv1 = nn.Conv2d(1, 32, kernel_size=3,stride=2, padding=1)
#         self.bn1 = nn.BatchNorm2d(32)
#         self.relu = nn.ReLU(inplace=True)

#         self.conv2 = nn.Conv2d(32,64,3,stride=1, padding=1)
#         self.bn2 = nn.BatchNorm2d(64)

#         #channels = [64, 128, 256, 728, 1024, 1536, 2048]

#         self.blk1 = depthwise(64,128, 3, 1, 1)
#         self.d1 = nn.Conv2d(64, 128, kernel_size=1, stride=2)

#         self.blk2 = depthwise(128, 256, 3, 1, 1)
#         self.d2 = nn.Conv2d(128, 256, kernel_size=1, stride=2)

#         self.blk3 = depthwise(256, 728, 3, 1, 1)
#         self.d3 = nn.Conv2d(256, 728, kernel_size=1, stride=2)

#         self.mid = depthwise(728, 728, 3, 1, 1, middle=True)

#         self.blk4 = depthwise(728, 1024, 3, 1, 1)
#         self.d4 = nn.Conv2d(728, 1024, kernel_size=1, stride=2)

#         self.exit = depthwise(1024, 1536, 3, 1, 1, middle='Exit')

#         self.gap = nn.AdaptiveAvgPool2d((1,1))

#         self.fc = nn.Linear(2048, self.nc)
#         self.soft = nn.Softmax(dim = 1)

#     def forward(self, x):
#         x = self.relu(self.bn1(self.conv1(x)))
#         x1 = self.bn2(self.conv2(x))
#         x = self.blk1(x1)
#         x1 = self.d1(x1)
#         x1 = x1 + x
#         x = self.blk2(x1)
#         x2 = self.d2(x1)
#         x2 = x2 + x

#         x = self.blk3(x2)
#         x3 = self.d3(x2)
#         x3 = x3 + x

#         for i in range(9):
#             x  = self.mid(x3)
#             x3 = x + x3

#         x = self.blk4(x3)
#         x4 = self.d4(x3)
#         x4 = x + x4

#         x = self.exit(x4)

#         x = self.gap(x)

#         x = torch.reshape(x, (x.shape[0], -1))

#         x = self.fc(x)

#         # x = self.soft(x)
#         return x


In [None]:
import torch.utils.data as data
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)
def main(epochs, batch_size, learning_rate, save_freq, data_dir):
    # train dataset and train loader
    datasource = DataSource(data_dir, train=True)
    train_loader = torch.utils.data.DataLoader(dataset=datasource, batch_size=batch_size, shuffle=True)
    # load model
    load = True
    model = Xception(1).to(device)
    # posenet = model().to(device)
    if load:
        print('loading model')
        model.load_state_dict(torch.load('checkpoints\epoch47.onnx'))
    #loss function
    criterion = nn.CrossEntropyLoss()

    # train the network
    optimizer = torch.optim.SGD(nn.ParameterList(model.parameters()),
                     lr=learning_rate, momentum=0.9)

    batches_per_epoch = len(train_loader.batch_sampler)

    for epoch in range(epochs):
        model.train()
        for step, batches in enumerate(train_loader):
            #print(batches[1].shape)
            images, labels = batches
            images = images.to(device)
            labels = labels.to(device)
            #images, labels = Variable(batches, requires_grad=True).to(device)
            predict = model(images)
            loss = criterion(predict, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print("{}/{}: loss = {}".format(step+1, batches_per_epoch, loss))
            if (epoch + 1) % 13 == 0:

                save_filename = "epoch60.onnx"
                save_path = os.path.join('checkpoints', save_filename)
                torch.save(model.state_dict(), save_path)
if __name__ == '__main__':

    main(epochs=13, batch_size=16, learning_rate=5e-3, save_freq=20, data_dir="archive")


In [21]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
testset = DataSource("archive", train = False)
testloader = DataLoader(dataset=testset, batch_size = 64, shuffle=False)
output_labels = []
true_labels = []
load = True
model = Xception(1).to(device)
if load:
        print('loading model')
        model.load_state_dict(torch.load('checkpoints\epoch47.onnx'))
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for i, data in enumerate(testloader, 0):
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        true_labels.append(labels)
        _, predicted = torch.max(outputs.data, 1)
        output_labels.append(predicted)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
      
    print('[%d epoch] Accuracy of the network on the validation images: %d %%' % 
        (1, 100 * correct / total)
        )

loading model
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224])
torch.Size([1, 224, 224]

KeyboardInterrupt: 

In [18]:
print(output_labels[:1])

[tensor([0, 0, 6, 3, 3, 6, 5, 3, 0, 0, 1, 5, 6, 6, 6, 5, 1, 5, 6, 0, 3, 6, 0, 6,
        6, 3, 0, 6, 5, 5, 6, 4, 6, 6, 6, 6, 6, 3, 0, 1, 6, 6, 6, 6, 1, 1, 6, 0,
        5, 6, 5, 2, 0, 5, 0, 5, 0, 3, 3, 0, 6, 0, 6, 3], device='cuda:0')]


In [19]:
print(true_labels[:1])

[tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], device='cuda:0')]


In [23]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)
emote = {0 : "Angry", 1 : "Disgusted", 2 : "Afraid", 3 : "Happy", 4 : "Sad", 5 : "Surprised", 6 : "Neutral"}
load = True
model = Xception(1).to(device)
if load:
    print('loading model')
    model.load_state_dict(torch.load('checkpoints\epoch47.onnx'))
cap = cv2.VideoCapture(0)
while True:
    ret, frame = cap.read()
    if not ret:
        break
    detector = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    face = detector.detectMultiScale(gray, scaleFactor=1.05, minNeighbors=5)
    for (fx, fy, fw, fh) in face:
        faces = gray[fx : fy + fh, fx : fx + fw]
        cv2.rectangle(frame, (fx, fy), (fx + fw, fy + fh), (0, 255, 255), 2)
        crop = T.Compose([T.ToTensor(), T.Resize((224, 224))])
        cropped_image = crop(faces).to(device)
        cropped_image = torch.unsqueeze(cropped_image, dim=0)
        pred = model(cropped_image)
        class_label = int(torch.argmax(pred))
        cv2.putText(frame, emote[class_label],  (fx+20, fy-60), cv2.FONT_HERSHEY_COMPLEX, 1, (0, 0, 0), 2, cv2.LINE_4)
    
    cv2.imshow('Frames', cv2.resize(frame,(1920,1080),interpolation = cv2.INTER_CUBIC))
    key = cv2.waitKey(1)
    if key == ord('w'):
        break
cap.release()
cv2.destroyAllWindows()

cuda:0
loading model
