In [1]:
import torch
import numpy as np
import random
torch.manual_seed(89)
np.random.seed(37)
random.seed(1254)

In [2]:
import os
import numpy as np
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
import cv2
from PIL import Image
from random import shuffle

import pandas as pd

import pickle

from sklearn.svm import SVC,LinearSVC
from sklearn.ensemble import RandomForestClassifier, BaggingClassifier,ExtraTreesClassifier
from sklearn import svm,metrics,preprocessing
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn import preprocessing

#from scipy.misc import imread, imresize

%matplotlib inline

In [3]:
INPUT_SIZE = (224, 224)

# Training

In [4]:
import glob
from itertools import chain
import os
import random
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from tqdm.notebook import tqdm

print(f"Torch: {torch.__version__}")

Torch: 1.9.0+cu102


In [5]:
# Training settings
batch_size = 2# 32 #16 #8 #
epochs = 40
lr = 3e-5
gamma = 0.7
device = 'cuda'
use_cuda = torch.cuda.is_available()
print(use_cuda)

True


In [6]:
IMG_SIZE=224 # 300 # 80 #

train_transforms = transforms.Compose(
    [
        #transforms.Resize((IMG_SIZE_ORIG,IMG_SIZE_ORIG)),
        #transforms.RandomResizedCrop(IMG_SIZE),
        transforms.Resize((IMG_SIZE,IMG_SIZE)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    ]
)

test_transforms = transforms.Compose(
    [
        transforms.Resize((IMG_SIZE,IMG_SIZE)),
        #transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    ]
)

In [7]:
train_dir = r'C:\Users\HP\FER_SL\sl_data\train_files_cropped'
test_dir = r'C:\Users\HP\FER_SL\sl_data\test_files_cropped'

kwargs = {'num_workers': 4, 'pin_memory': True} if use_cuda else {}

train_dataset = datasets.ImageFolder(root=train_dir, transform=train_transforms)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, **kwargs)
test_dataset = datasets.ImageFolder(root=test_dir, transform=test_transforms)
test_loader  = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False, **kwargs) 

print(len(train_dataset), len(test_dataset))

2024 507


In [8]:
num_classes=len(train_dataset.classes)
print(num_classes)

7


In [9]:
# loss function
#weights = torch.FloatTensor(list(class_weights.values())).cuda()
#criterion = nn.CrossEntropyLoss(weight=weights)
criterion = nn.CrossEntropyLoss()

In [10]:
from sam import SAM
import copy
def train(model,n_epochs=epochs, learningrate=lr, use_sam=False):
    # optimizer
    if use_sam:
        optimizer = SAM(filter(lambda p: p.requires_grad, model.parameters()), optim.Adam, lr=learningrate)
    else:
        optimizer=optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=learningrate)
    # scheduler
    #scheduler = StepLR(optimizer, step_size=1, gamma=gamma)
    best_acc=0
    best_model=None
    for epoch in range(n_epochs):
        epoch_loss = 0
        epoch_accuracy = 0
        model.train()
        for data, label in tqdm(train_loader):
            data = data.to(device)
            label = label.to(device)

            output = model(data)
            loss = criterion(output, label)

            if use_sam:
                #optimizer.zero_grad()
                loss.backward()
                optimizer.first_step(zero_grad=True)
  
                # second forward-backward pass
                output = model(data)
                loss = criterion(output, label)
                loss.backward()
                optimizer.second_step(zero_grad=True)
            else:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            acc = (output.argmax(dim=1) == label).float().mean()
            epoch_accuracy += acc / len(train_loader)
            epoch_loss += loss / len(train_loader)

        model.eval()
        with torch.no_grad():
            epoch_val_accuracy = 0
            epoch_val_loss = 0
            for data, label in test_loader:
                data = data.to(device)
                label = label.to(device)

                val_output = model(data)
                val_loss = criterion(val_output, label)

                acc = (val_output.argmax(dim=1) == label).float().mean()
                epoch_val_accuracy += acc / len(test_loader)
                epoch_val_loss += val_loss / len(test_loader)

        print(
            f"Epoch : {epoch+1} - loss : {epoch_loss:.4f} - acc: {epoch_accuracy:.4f} - val_loss : {epoch_val_loss:.4f} - val_acc: {epoch_val_accuracy:.4f}\n"
        )
        if best_acc<epoch_val_accuracy:
            best_acc=epoch_val_accuracy
            best_model=copy.deepcopy(model.state_dict())
        #scheduler.step()
    
    if best_model is not None:
        model.load_state_dict(best_model)
        print(f"Best acc:{best_acc}")
        model.eval()
        with torch.no_grad():
            epoch_val_accuracy = 0
            epoch_val_loss = 0
            for data, label in test_loader:
                data = data.to(device)
                label = label.to(device)

                val_output = model(data)
                val_loss = criterion(val_output, label)

                acc = (val_output.argmax(dim=1) == label).float().mean()
                epoch_val_accuracy += acc / len(test_loader)
                epoch_val_loss += val_loss / len(test_loader)

        print(
            f"val_loss : {epoch_val_loss:.4f} - val_acc: {epoch_val_accuracy:.4f}\n"
        )
    else:
        print(f"No best model Best acc:{best_acc}")

In [11]:
#adapted from https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html
def set_parameter_requires_grad(model, requires_grad):
    for param in model.parameters():
        param.requires_grad = requires_grad

In [12]:
from torchvision.models import resnet101,mobilenet_v2
import sys
import timm
sys.path.insert(0, 'timm==0.4.5')
#import timm #gets version 2
print(f"Timm: {timm.__version__}")

Timm: 0.4.5


In [13]:
model=timm.create_model('tf_efficientnet_b0_ns', pretrained=True)
model.classifier=torch.nn.Identity()
model = torch.load(r'C:\Users\HP\FER_SL\models\affectnet_emotions/enet_b0_7.pt') #_new

In [14]:
model.classifier=nn.Sequential(nn.Linear(in_features=1280, out_features=num_classes)) #1792 #1280 #1536
#model.head.fc=nn.Linear(in_features=3072, out_features=num_classes)
#model.head=nn.Sequential(nn.Linear(in_features=768, out_features=num_classes))
model=model.to(device)
print(model)

EfficientNet(
  (conv_stem): Conv2dSame(3, 32, kernel_size=(3, 3), stride=(2, 2), bias=False)
  (bn1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  (act1): SiLU(inplace=True)
  (blocks): Sequential(
    (0): Sequential(
      (0): DepthwiseSeparableConv(
        (conv_dw): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
        (bn1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
        (act1): SiLU(inplace=True)
        (se): SqueezeExcite(
          (conv_reduce): Conv2d(32, 8, kernel_size=(1, 1), stride=(1, 1))
          (act1): SiLU(inplace=True)
          (conv_expand): Conv2d(8, 32, kernel_size=(1, 1), stride=(1, 1))
        )
        (conv_pw): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn2): BatchNorm2d(16, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
        (act2): Identity()
      )
    )
    (1): Sequential(
      (0

In [15]:
if False:
    set_parameter_requires_grad(model.features, requires_grad=False)
else:
    set_parameter_requires_grad(model, requires_grad=False)
    set_parameter_requires_grad(model.classifier, requires_grad=True)
    #set_parameter_requires_grad(model.head, requires_grad=True)
train(model,3,0.001,use_sam=True)
#Best acc:0.48875007033348083
#7: Best acc:0.558712363243103

  0%|          | 0/1012 [00:00<?, ?it/s]

Epoch : 1 - loss : 1.8878 - acc: 0.3127 - val_loss : 1.6519 - val_acc: 0.5846



  0%|          | 0/1012 [00:00<?, ?it/s]

Epoch : 2 - loss : 1.6783 - acc: 0.3898 - val_loss : 1.4494 - val_acc: 0.5945



  0%|          | 0/1012 [00:00<?, ?it/s]

Epoch : 3 - loss : 1.5980 - acc: 0.4254 - val_loss : 3.5971 - val_acc: 0.5413

Best acc:0.5944879651069641
val_loss : 1.4494 - val_acc: 0.5945



In [16]:
if False:
    set_parameter_requires_grad(model.features, requires_grad=True)
else:
    set_parameter_requires_grad(model, requires_grad=True)
train(model,6,1e-4,use_sam=True)

  0%|          | 0/1012 [00:00<?, ?it/s]

Epoch : 1 - loss : 1.4121 - acc: 0.4738 - val_loss : 1.5319 - val_acc: 0.6043



  0%|          | 0/1012 [00:00<?, ?it/s]

Epoch : 2 - loss : 1.1267 - acc: 0.5874 - val_loss : 0.9715 - val_acc: 0.6634



  0%|          | 0/1012 [00:00<?, ?it/s]

Epoch : 3 - loss : 0.9005 - acc: 0.6793 - val_loss : 1.1151 - val_acc: 0.6240



  0%|          | 0/1012 [00:00<?, ?it/s]

Epoch : 4 - loss : 0.6975 - acc: 0.7574 - val_loss : 1.7519 - val_acc: 0.6142



  0%|          | 0/1012 [00:00<?, ?it/s]

Epoch : 5 - loss : 0.5737 - acc: 0.8068 - val_loss : 1.1553 - val_acc: 0.6614



  0%|          | 0/1012 [00:00<?, ?it/s]

Epoch : 6 - loss : 0.4534 - acc: 0.8498 - val_loss : 1.0325 - val_acc: 0.6496

Best acc:0.6633855700492859
val_loss : 0.9715 - val_acc: 0.6634



# Trying SGD with ENet 

In [22]:
from sam import SAM
import copy
def train_with_sgd(model,n_epochs=epochs, learningrate=lr, use_sam=False):
    # optimizer
    if use_sam:
        optimizer = SAM(filter(lambda p: p.requires_grad, model.parameters()), optim.SGD, lr=learningrate)
    # scheduler
    #scheduler = StepLR(optimizer, step_size=1, gamma=gamma)
    best_acc=0
    best_model=None
    for epoch in range(n_epochs):
        epoch_loss = 0
        epoch_accuracy = 0
        model.train()
        for data, label in tqdm(train_loader):
            data = data.to(device)
            label = label.to(device)

            output = model(data)
            loss = criterion(output, label)

            if use_sam:
                #optimizer.zero_grad()
                loss.backward()
                optimizer.first_step(zero_grad=True)
  
                # second forward-backward pass
                output = model(data)
                loss = criterion(output, label)
                loss.backward()
                optimizer.second_step(zero_grad=True)
            else:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            acc = (output.argmax(dim=1) == label).float().mean()
            epoch_accuracy += acc / len(train_loader)
            epoch_loss += loss / len(train_loader)

        model.eval()
        with torch.no_grad():
            epoch_val_accuracy = 0
            epoch_val_loss = 0
            for data, label in test_loader:
                data = data.to(device)
                label = label.to(device)

                val_output = model(data)
                val_loss = criterion(val_output, label)

                acc = (val_output.argmax(dim=1) == label).float().mean()
                epoch_val_accuracy += acc / len(test_loader)
                epoch_val_loss += val_loss / len(test_loader)

        print(
            f"Epoch : {epoch+1} - loss : {epoch_loss:.4f} - acc: {epoch_accuracy:.4f} - val_loss : {epoch_val_loss:.4f} - val_acc: {epoch_val_accuracy:.4f}\n"
        )
        if best_acc<epoch_val_accuracy:
            best_acc=epoch_val_accuracy
            best_model=copy.deepcopy(model.state_dict())
        #scheduler.step()
    
    if best_model is not None:
        model.load_state_dict(best_model)
        print(f"Best acc:{best_acc}")
        model.eval()
        with torch.no_grad():
            epoch_val_accuracy = 0
            epoch_val_loss = 0
            for data, label in test_loader:
                data = data.to(device)
                label = label.to(device)

                val_output = model(data)
                val_loss = criterion(val_output, label)

                acc = (val_output.argmax(dim=1) == label).float().mean()
                epoch_val_accuracy += acc / len(test_loader)
                epoch_val_loss += val_loss / len(test_loader)

        print(
            f"val_loss : {epoch_val_loss:.4f} - val_acc: {epoch_val_accuracy:.4f}\n"
        )
    else:
        print(f"No best model Best acc:{best_acc}")

In [23]:
if False:
    set_parameter_requires_grad(model.features, requires_grad=True)
else:
    set_parameter_requires_grad(model, requires_grad=True)
train_with_sgd(model,3,1e-4,use_sam=True)

  0%|          | 0/1012 [00:00<?, ?it/s]

Epoch : 1 - loss : 0.9162 - acc: 0.6739 - val_loss : 1.2054 - val_acc: 0.6260



  0%|          | 0/1012 [00:00<?, ?it/s]

Epoch : 2 - loss : 0.9191 - acc: 0.6789 - val_loss : 1.0012 - val_acc: 0.6378



  0%|          | 0/1012 [00:00<?, ?it/s]

Epoch : 3 - loss : 0.8923 - acc: 0.6759 - val_loss : 1.5091 - val_acc: 0.5906

Best acc:0.6377950310707092
val_loss : 1.0012 - val_acc: 0.6378



# Saving the model

In [24]:
PATH='/enet_ftsgd_ip_hf_pt'
model_name='enet_ftsgd_ip_hf_pt'

In [25]:
# Save
torch.save(model, PATH)

In [26]:
# Load
print(PATH)
model = torch.load(PATH)
model.eval()

C:/Users/HP/SLR/2src/enet_ftsgd_ip_hf_pt


EfficientNet(
  (conv_stem): Conv2dSame(3, 32, kernel_size=(3, 3), stride=(2, 2), bias=False)
  (bn1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  (act1): SiLU(inplace=True)
  (blocks): Sequential(
    (0): Sequential(
      (0): DepthwiseSeparableConv(
        (conv_dw): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
        (bn1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
        (act1): SiLU(inplace=True)
        (se): SqueezeExcite(
          (conv_reduce): Conv2d(32, 8, kernel_size=(1, 1), stride=(1, 1))
          (act1): SiLU(inplace=True)
          (conv_expand): Conv2d(8, 32, kernel_size=(1, 1), stride=(1, 1))
        )
        (conv_pw): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn2): BatchNorm2d(16, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
        (act2): Identity()
      )
    )
    (1): Sequential(
      (0

In [27]:
class_to_idx=train_dataset.class_to_idx
print(class_to_idx)
idx_to_class={idx:cls for cls,idx in class_to_idx.items()}
print(idx_to_class)

{'anger': 0, 'disgust': 1, 'fear': 2, 'happy': 3, 'neutral': 4, 'sad': 5, 'surprise': 6}
{0: 'anger', 1: 'disgust', 2: 'fear', 3: 'happy', 4: 'neutral', 5: 'sad', 6: 'surprise'}


# Evaluation

In [28]:
y_val,y_scores_val=[],[]
model.eval()
for class_name in tqdm(os.listdir(test_dir)):
    if class_name in class_to_idx:
        class_dir=os.path.join(test_dir,class_name)
        y=class_to_idx[class_name]
        for img_name in os.listdir(class_dir):
            filepath=os.path.join(class_dir,img_name)
            img = Image.open(filepath)
            img_tensor = test_transforms(img)
            img_tensor.unsqueeze_(0)
            scores = model(img_tensor.to(device))
            scores=scores[0].data.cpu().numpy()
            #print(scores.shape)
            y_scores_val.append(scores)
            y_val.append(y)

y_scores_val=np.array(y_scores_val)
y_val=np.array(y_val)
print(y_scores_val.shape,y_val.shape)

  0%|          | 0/7 [00:00<?, ?it/s]

(507, 7) (507,)


In [29]:
y_pred=np.argmax(y_scores_val,axis=1)
acc=100.0*(y_val==y_pred).sum()/len(y_val)
print(acc)

y_train=np.array(train_dataset.targets)

for i in range(y_scores_val.shape[1]):
    _val_acc=(y_pred[y_val==i]==i).sum()/(y_val==i).sum()
    print('%s %d/%d acc: %f' %(idx_to_class[i],(y_train==i).sum(),(y_val==i).sum(),100*_val_acc))

63.905325443786985
anger 371/132 acc: 65.151515
disgust 156/31 acc: 70.967742
fear 251/57 acc: 33.333333
happy 160/19 acc: 89.473684
neutral 153/42 acc: 59.523810
sad 290/59 acc: 59.322034
surprise 643/167 acc: 71.856287


In [30]:
from sklearn.metrics import classification_report

target_names = ['Anger', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sadness', 'Surprise']
print(classification_report(y_val, y_pred, target_names=target_names))

              precision    recall  f1-score   support

       Anger       0.80      0.65      0.72       132
     Disgust       0.56      0.71      0.63        31
        Fear       0.35      0.33      0.34        57
       Happy       0.63      0.89      0.74        19
     Neutral       0.62      0.60      0.61        42
     Sadness       0.38      0.59      0.46        59
    Surprise       0.82      0.72      0.77       167

    accuracy                           0.64       507
   macro avg       0.59      0.64      0.61       507
weighted avg       0.67      0.64      0.65       507

