In [1]:
# -*- coding: utf-8 -*-
import os, sys, glob, argparse
import pandas as pd
import numpy as np
from tqdm import tqdm

import time, datetime
import pdb, traceback

import cv2
from PIL import Image
from sklearn.model_selection import train_test_split, StratifiedKFold, KFold

import torch
import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data.dataset import Dataset
import timm



In [2]:
from joblib import dump, load

train_jpg = glob.glob('./Datawhale_人脸情绪识别_数据集/train/*/*')
np.random.shuffle(train_jpg)
train_jpg = np.array(train_jpg)

if os.path.exists('train_jpg.pkl'):
    train_jpg_dict = load('train_jpg.pkl')
else:
    train_jpg_dict = {}
    for path in tqdm(train_jpg):
        # 加入resize
        train_jpg_dict[path] = Image.open(path).convert('RGB')
        
    dump(train_jpg_dict, 'train_jpg.pkl')

In [3]:
class QRDataset(Dataset):
    def __init__(self, img_path, transform=None):
        self.img_path = img_path
        if transform is not None:
            self.transform = transform
        else:
            self.transform = None
    
    def __getitem__(self, index):
        start_time = time.time()
        # img = Image.open(self.img_path[index]).convert('RGB')
        
        if self.img_path[index] in train_jpg_dict:
            img = train_jpg_dict[self.img_path[index]]
        else:
            img = Image.open(self.img_path[index]).convert('RGB')
        
        lbl_dict = {'angry': 0,
             'disgusted': 1,
             'fearful': 2,
             'happy': 3,
             'neutral': 4,
             'sad': 5,
             'surprised': 6}
        if self.transform is not None:
            img = self.transform(img)
        
        if 'test' in self.img_path[index]:
            return img, torch.from_numpy(np.array(0))
        else:
            lbl_int = lbl_dict[self.img_path[index].split('/')[-2]]
            return img, torch.from_numpy(np.array(lbl_int))
    
    def __len__(self):
        return len(self.img_path)

In [4]:
class XunFeiNet(nn.Module):
    def __init__(self):
        super(XunFeiNet, self).__init__()
                
#         model = models.resnet18(True)
#         model.avgpool = nn.AdaptiveAvgPool2d(1)
#         model.fc = nn.Linear(512, 7)
        
        model = timm.create_model('efficientnet_b1', pretrained='imagenet')
        model.classifier = nn.Linear(1280, 7)
        self.model = model
         
    def forward(self, img):        
        out = self.model(img)
        return out

In [5]:
def validate(val_loader, model, criterion):
    model.eval()
    acc1 = []
    with torch.no_grad():
        end = time.time()
        for i, (input, target) in enumerate(val_loader):
            input = input.cuda()
            target = target.cuda()
            
            output = model(input)
            loss = criterion(output, target)
            acc1.append((output.argmax(1) == target).float().mean().item())

        print(' * Val Acc@1 {0}'.format(np.mean(acc1)))
        return np.mean(acc1)

def predict(test_loader, model, tta=10):
    model.eval()
    
    test_pred_tta = None
    for _ in range(tta):
        test_pred = []
        with torch.no_grad():
            end = time.time()
            for i, (input, target) in enumerate(test_loader):
                input = input.cuda()
                target = target.cuda()

                output = model(input)
                output = output.data.cpu().numpy()

                test_pred.append(output)
        test_pred = np.vstack(test_pred)
    
        if test_pred_tta is None:
            test_pred_tta = test_pred
        else:
            test_pred_tta += test_pred
    
    return test_pred_tta

def train(train_loader, model, criterion, optimizer, epoch):
    model.train()

    end = time.time()
    acc1 = []
    for i, (input, target) in enumerate(train_loader):
        input = input.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        output = model(input)
        loss = criterion(output, target)

        acc1.append((output.argmax(1) == target).float().mean().item())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if i % 100 == 0:
            print('Train: {0}'.format(np.mean(acc1)))

In [None]:
skf = KFold(n_splits=10, random_state=233, shuffle=True)
for flod_idx, (train_idx, val_idx) in enumerate(skf.split(train_jpg, train_jpg)):
    
    train_loader = torch.utils.data.DataLoader(
        QRDataset(train_jpg[train_idx][:],
                transforms.Compose([
                            transforms.RandomAffine(10),
                            transforms.ColorJitter(hue=.05, saturation=.05),
                            transforms.RandomHorizontalFlip(),
                            transforms.RandomVerticalFlip(),
                            transforms.Resize((196, 196)),
                            transforms.ToTensor(),
                            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ])
        ), batch_size=20, shuffle=True, num_workers=5, pin_memory=True
    )
    
    val_loader = torch.utils.data.DataLoader(
        QRDataset(train_jpg[val_idx][:],
                transforms.Compose([
                            transforms.Resize((196, 196)),
#                             transforms.Resize((256, 256)),
                            # transforms.Resize((124, 124)),
                            # transforms.RandomCrop((88, 88)),
                            transforms.ToTensor(),
                            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ])
        ), batch_size=10, shuffle=False, num_workers=5, pin_memory=True
    )
        
    model = XunFeiNet().cuda()
    criterion = nn.CrossEntropyLoss().cuda()
    optimizer = torch.optim.SGD(model.parameters(), 0.01)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.75)
    best_acc = 0.0
    for epoch in range(40):
        print('\nEpoch: ', epoch)

        train(train_loader, model, criterion, optimizer, epoch)
        val_acc = validate(val_loader, model, criterion)
        scheduler.step()
        
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), './resnet18_fold{0}.pt'.format(flod_idx))
            
    break


Epoch:  0
Train: 0.20000000298023224
Train: 0.26435644050488377
Train: 0.29129353697546084
Train: 0.3127907025333853
Train: 0.3283042449439107
Train: 0.3439121814054286
Train: 0.35582363327187233
Train: 0.3669044285661909
Train: 0.3784020042997993
Train: 0.38662597811165844
Train: 0.394255751611797
Train: 0.40113533923333067
Train: 0.4089925141600298
 * Val Acc@1 0.5065972295124084

Epoch:  1
Train: 0.45000001788139343


In [20]:
test_jpg = glob.glob('./人脸情绪识别_数据集/test/*')
test_jpg = np.array(test_jpg)
test_jpg.sort()

test_loader = torch.utils.data.DataLoader(
        QRDataset(test_jpg,
                transforms.Compose([
                            transforms.RandomHorizontalFlip(),
                            transforms.RandomVerticalFlip(),
                            transforms.Resize((196, 196)),
                            transforms.ToTensor(),
                            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ])
        ), batch_size=10, shuffle=False, num_workers=5, pin_memory=True
)
        
model = XunFeiNet().cuda()
model.load_state_dict(torch.load('resnet18_fold0.pt'))
test_pred = predict(test_loader, model, 5)


# test_csv = pd.DataFrame()
# test_csv['ID'] = list(range(0, 3082))
# test_csv['Label'] = np.argmax(test_pred, 1)
# test_csv['Label'] = test_csv['Label'].map({1:'pos', 0:'neg'})
# test_csv.to_csv('tmp.csv', index=None)

In [21]:
cls_name = np.array(['angry', 'disgusted', 'fearful', 'happy','neutral', 'sad', 'surprised'])
submit_df = pd.DataFrame({'name': test_jpg, 'label': cls_name[test_pred.argmax(1)]})
submit_df['name'] = submit_df['name'].apply(lambda x: x.split('/')[-1])

In [22]:
submit_df = submit_df.sort_values(by='name')
submit_df.to_csv('pytorch_submit2.csv', index=None)