In [2]:
from pathlib import Path
import cv2, json, numpy as np
from PIL import Image
from tqdm import tqdm
import torch
torch.cuda.is_available()

True

### 路径说明
```
FS2K
├─data # FS2K数据集位置
│  └─FS2K
│      ├─photo
│      │  ├─photo1
│      │  ├─photo2
│      │  └─photo3
│      └─sketch
│          ├─sketch1
│          ├─sketch2
│          └─sketch3
├─save # 模型保存位置
└─FS2K.ipynb # 代码
```

In [10]:
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# 定义Dataset
class DS(Dataset):
    def __init__(s, dataD, mode='train'):
        super().__init__()
        s.dataD = dataD
        s.mode = mode
        s.xtf = transforms.Compose([
            transforms.Resize((250,250)),
            transforms.ToTensor(),
        ])
        s.ytf = transforms.Compose([
            torch.tensor,
        ])
        s.data = s.read()
    
    def read(s):
        D = s.dataD
        jp = D / f'anno_{s.mode}.json'
        with jp.open('r', encoding='utf-8')as f:
            annos = json.load(f)
        return annos

    def __getitem__(s, i):
        a = s.data[i]
        imgP = s.dataD/ f"photo/{a['image_name']}.jpg"
        img = s.xtf(Image.open(imgP.as_posix()))
        colors = a['lip_color']+a['eye_color']
        attrs = list(map(int,[a['hair'],a['hair_color'],a['gender'],a['earring'],a['smile'],a['frontal_face']]))
        return img, s.ytf(colors), torch.tensor(attrs, dtype=int)

    def __len__(s):
        return len(s.data)

rootdir = '/content/drive/MyDrive/DeepLearningHW/'
# 实例化Dataset
dataD = Path(rootdir + 'data/FS2K')
train_ds = DS(dataD)
val_ds = DS(dataD, 'test')

# 创建Dataloader
train_dl = DataLoader(train_ds, batch_size=16, shuffle=True, num_workers=0)
val_dl = DataLoader(val_ds, batch_size=16, shuffle=True, num_workers=0)

### Dataset说明
每一个样本包含三个变量img, colors, attrs  
img为tensor图片  
colors为一个6元素的float类型一维数组, 前三个表示嘴唇颜色lip_color, 后三个表示眼睛颜色eye_color  
attrs为6元素的整型一位数组, 分别为hair, hair_color, gender, earring, smile, frontal_face

In [None]:
x, colors, attrs = train_ds[0]
colors, attrs

(tensor([156.9775,  82.5112,  79.0000, 118.6518,  72.2589,  69.5982]),
 tensor([0, 2, 0, 1, 1, 1]))

In [None]:
{
    "image_name": "photo1/image0110",

    "skin_patch": [163, 139],
    # a point of face region.

    "lip_color": [156.97750511247443, 82.51124744376278, 79.0],
    # the mean RGB value of lip area.

    "eye_color": [118.65178571428571, 72.25892857142857, 69.59821428571429],
    # the mean RGB value of eye area.

    "hair": 0,
    # 0: with hair, 1: without hair.

    "hair_color": 2,
    # 0: brown, 1: black, 2: red, 3: no-hair, 4: golden.

    "gender": 0,
    # 0: male, 1: female.

    "earring": 1,
    # 0: with earring, 1: without earring.

    "smile": 1,
    # 0: with smile, 1: without smile.

    "frontal_face": 1,
    # 0: head rotates within 30 degrees, 1: > 30 degrees

    "style": 0
    # Style = one of {0, 1, 2}, please refer to the sketch samples.
}

{'image_name': 'photo1/image0110',
 'skin_patch': [163, 139],
 'lip_color': [156.97750511247443, 82.51124744376278, 79.0],
 'eye_color': [118.65178571428571, 72.25892857142857, 69.59821428571429],
 'hair': 0,
 'hair_color': 2,
 'gender': 0,
 'earring': 1,
 'smile': 1,
 'frontal_face': 1,
 'style': 0}

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from collections import OrderedDict
from torch.nn import init


def conv3x3(in_channels, out_channels, stride=1, 
            padding=1, bias=True, groups=1):    
    """3x3 convolution with padding
    """
    return nn.Conv2d(
        in_channels, 
        out_channels, 
        kernel_size=3, 
        stride=stride,
        padding=padding,
        bias=bias,
        groups=groups)


def conv1x1(in_channels, out_channels, groups=1):
    """1x1 convolution with padding
    - Normal pointwise convolution When groups == 1
    - Grouped pointwise convolution when groups > 1
    """
    return nn.Conv2d(
        in_channels, 
        out_channels, 
        kernel_size=1, 
        groups=groups,
        stride=1)


def channel_shuffle(x, groups):
    batchsize, num_channels, height, width = x.data.size()

    channels_per_group = num_channels // groups
    
    # reshape
    x = x.view(batchsize, groups, 
        channels_per_group, height, width)

    # transpose
    # - contiguous() required if transpose() is used before view().
    #   See https://github.com/pytorch/pytorch/issues/764
    x = torch.transpose(x, 1, 2).contiguous()

    # flatten
    x = x.view(batchsize, -1, height, width)

    return x


class ShuffleUnit(nn.Module):
    def __init__(self, in_channels, out_channels, groups=3,
                 grouped_conv=True, combine='add'):
        
        super(ShuffleUnit, self).__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.grouped_conv = grouped_conv
        self.combine = combine
        self.groups = groups
        self.bottleneck_channels = self.out_channels // 4

        # define the type of ShuffleUnit
        if self.combine == 'add':
            # ShuffleUnit Figure 2b
            self.depthwise_stride = 1
            self._combine_func = self._add
        elif self.combine == 'concat':
            # ShuffleUnit Figure 2c
            self.depthwise_stride = 2
            self._combine_func = self._concat
            
            # ensure output of concat has the same channels as 
            # original output channels.
            self.out_channels -= self.in_channels
        else:
            raise ValueError("Cannot combine tensors with \"{}\"" \
                             "Only \"add\" and \"concat\" are" \
                             "supported".format(self.combine))

        # Use a 1x1 grouped or non-grouped convolution to reduce input channels
        # to bottleneck channels, as in a ResNet bottleneck module.
        # NOTE: Do not use group convolution for the first conv1x1 in Stage 2.
        self.first_1x1_groups = self.groups if grouped_conv else 1

        self.g_conv_1x1_compress = self._make_grouped_conv1x1(
            self.in_channels,
            self.bottleneck_channels,
            self.first_1x1_groups,
            batch_norm=True,
            relu=True
            )

        # 3x3 depthwise convolution followed by batch normalization
        self.depthwise_conv3x3 = conv3x3(
            self.bottleneck_channels, self.bottleneck_channels,
            stride=self.depthwise_stride, groups=self.bottleneck_channels)
        self.bn_after_depthwise = nn.BatchNorm2d(self.bottleneck_channels)

        # Use 1x1 grouped convolution to expand from 
        # bottleneck_channels to out_channels
        self.g_conv_1x1_expand = self._make_grouped_conv1x1(
            self.bottleneck_channels,
            self.out_channels,
            self.groups,
            batch_norm=True,
            relu=False
            )


    @staticmethod
    def _add(x, out):
        # residual connection
        return x + out


    @staticmethod
    def _concat(x, out):
        # concatenate along channel axis
        return torch.cat((x, out), 1)


    def _make_grouped_conv1x1(self, in_channels, out_channels, groups,
        batch_norm=True, relu=False):

        modules = OrderedDict()

        conv = conv1x1(in_channels, out_channels, groups=groups)
        modules['conv1x1'] = conv

        if batch_norm:
            modules['batch_norm'] = nn.BatchNorm2d(out_channels)
        if relu:
            modules['relu'] = nn.ReLU()
        if len(modules) > 1:
            return nn.Sequential(modules)
        else:
            return conv


    def forward(self, x):
        # save for combining later with output
        residual = x

        if self.combine == 'concat':
            residual = F.avg_pool2d(residual, kernel_size=3, 
                stride=2, padding=1)

        out = self.g_conv_1x1_compress(x)
        out = channel_shuffle(out, self.groups)
        out = self.depthwise_conv3x3(out)
        out = self.bn_after_depthwise(out)
        out = self.g_conv_1x1_expand(out)
        
        out = self._combine_func(residual, out)
        return F.relu(out)


class ShuffleNet(nn.Module):
    """ShuffleNet implementation.
    """

    def __init__(self, groups=3, in_channels=3, num_classes=1000):
        """ShuffleNet constructor.

        Arguments:
            groups (int, optional): number of groups to be used in grouped 
                1x1 convolutions in each ShuffleUnit. Default is 3 for best
                performance according to original paper.
            in_channels (int, optional): number of channels in the input tensor.
                Default is 3 for RGB image inputs.
            num_classes (int, optional): number of classes to predict. Default
                is 1000 for ImageNet.

        """
        super(ShuffleNet, self).__init__()

        self.groups = groups
        self.stage_repeats = [3, 7, 3]
        self.in_channels =  in_channels
        self.num_classes = num_classes

        # index 0 is invalid and should never be called.
        # only used for indexing convenience.
        if groups == 1:
            self.stage_out_channels = [-1, 24, 144, 288, 567]
        elif groups == 2:
            self.stage_out_channels = [-1, 24, 200, 400, 800]
        elif groups == 3:
            self.stage_out_channels = [-1, 24, 240, 480, 960]
        elif groups == 4:
            self.stage_out_channels = [-1, 24, 272, 544, 1088]
        elif groups == 8:
            self.stage_out_channels = [-1, 24, 384, 768, 1536]
        else:
            raise ValueError(
                """{} groups is not supported for
                   1x1 Grouped Convolutions""".format(num_groups))
        
        # Stage 1 always has 24 output channels
        self.conv1 = conv3x3(self.in_channels,
                             self.stage_out_channels[1], # stage 1
                             stride=2)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Stage 2
        self.stage2 = self._make_stage(2)
        # Stage 3
        self.stage3 = self._make_stage(3)
        # Stage 4
        self.stage4 = self._make_stage(4)

        # Global pooling:
        # Undefined as PyTorch's functional API can be used for on-the-fly
        # shape inference if input size is not ImageNet's 224x224

        # Fully-connected classification layer
        num_inputs = self.stage_out_channels[-1]

        self.lip_color = nn.Linear(num_inputs, 3)
        self.eye_color = nn.Linear(num_inputs, 3)
        self.hair = nn.Linear(num_inputs, 2)
        self.hair_color = nn.Linear(num_inputs, 5)
        self.gender = nn.Linear(num_inputs, 2)
        self.earring = nn.Linear(num_inputs, 2)
        self.smile = nn.Linear(num_inputs, 2)
        self.frontal_face = nn.Linear(num_inputs, 2)

        # self.fc = nn.Linear(num_inputs, self.num_classes)
        self.init_params()


    def init_params(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                init.kaiming_normal_(m.weight, mode='fan_out')
                if m.bias is not None:
                    init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                init.constant_(m.weight, 1)
                init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                init.normal_(m.weight, std=0.001)
                if m.bias is not None:
                    init.constant_(m.bias, 0)


    def _make_stage(self, stage):
        modules = OrderedDict()
        stage_name = "ShuffleUnit_Stage{}".format(stage)
        
        # First ShuffleUnit in the stage
        # 1. non-grouped 1x1 convolution (i.e. pointwise convolution)
        #   is used in Stage 2. Group convolutions used everywhere else.
        grouped_conv = stage > 2
        
        # 2. concatenation unit is always used.
        first_module = ShuffleUnit(
            self.stage_out_channels[stage-1],
            self.stage_out_channels[stage],
            groups=self.groups,
            grouped_conv=grouped_conv,
            combine='concat'
            )
        modules[stage_name+"_0"] = first_module

        # add more ShuffleUnits depending on pre-defined number of repeats
        for i in range(self.stage_repeats[stage-2]):
            name = stage_name + "_{}".format(i+1)
            module = ShuffleUnit(
                self.stage_out_channels[stage],
                self.stage_out_channels[stage],
                groups=self.groups,
                grouped_conv=True,
                combine='add'
                )
            modules[name] = module

        return nn.Sequential(modules)


    def forward(self, x):
        x = self.conv1(x)
        x = self.maxpool(x)

        x = self.stage2(x)
        x = self.stage3(x)
        x = self.stage4(x)

        # global average pooling layer
        x = F.avg_pool2d(x, x.data.size()[-2:])
        
        # flatten for input to fully-connected layer
        x = x.view(x.size(0), -1)
        # x = self.fc(x)

        lip_color = self.lip_color(x)
        eye_color = self.eye_color(x)
        hair = self.hair(x)
        hair_color = self.hair_color(x)
        gender = self.gender(x)
        earring = self.earring(x)
        smile = self.smile(x)
        frontal_face = self.frontal_face(x)

        hair = F.softmax(hair,dim = 1)
        hair_color = F.softmax(hair_color,dim = 1)
        gender = F.softmax(gender,dim = 1)
        earring = F.softmax(earring,dim = 1)
        smile = F.softmax(smile,dim = 1)
        frontal_face = F.softmax(frontal_face,dim = 1)


        return [lip_color, eye_color, hair, hair_color, gender, earring, smile, frontal_face]


class Loss(torch.nn.Module):
    def __init__(s):
        super().__init__()
        s.MSE = torch.nn.MSELoss()
        s.CE = torch.nn.CrossEntropyLoss()

    def forward(s, preds, colors_b, attrs_b):
#         y = a['lip_color']+a['eye_color']+[a['hair'],a['hair_color'],a['gender'],a['earring'],a['smile'],a['frontal_face']]
        lip_color, eye_color, hair, hair_color, gender, earring, smile, frontal_face = preds
        lpc = s.MSE(lip_color, colors_b[:,:3])
        lc = s.MSE(eye_color, colors_b[:,3:])
        h = s.CE(hair, attrs_b[:, 0])
        hc = s.CE(hair_color, attrs_b[:, 1])
        g = s.CE(gender, attrs_b[:, 2])
        e = s.CE(earring, attrs_b[:, 3])
        sm = s.CE(smile, attrs_b[:, 4])
        f = s.CE(frontal_face, attrs_b[:, 5])
        loss = lpc+lc+h+hc+g+e+sm+f
        
        return loss

### 模型和损失函数说明
模型使用Mobilenetv3的Small版本， 在将最后的输出层更改为8个并行的1x1卷积层，对2个颜色属性进行回归，对6个整型属性进行分类  
回归损失使用MSE，分类损失使用交叉熵损失

In [None]:
m = mobilenet_v3_small_1_0(nclass=6)
x = torch.rand((2, 3, 250, 250))
y = m(x)
for _ in y:
    print(_.shape)
lip_color, eye_color, hair, hair_color, gender, earring, smile, frontal_face = y
lip_color, eye_color, hair, hair_color, gender, earring, smile, frontal_face

torch.Size([2, 3])
torch.Size([2, 3])
torch.Size([2, 2])
torch.Size([2, 5])
torch.Size([2, 2])
torch.Size([2, 2])
torch.Size([2, 2])
torch.Size([2, 2])


(tensor([[-0.8925,  0.1118, -0.1723],
         [-0.1632,  0.2536, -1.3730]], grad_fn=<ViewBackward>),
 tensor([[-0.0142, -0.2719,  0.8314],
         [-2.5911, -0.9223, -0.0742]], grad_fn=<ViewBackward>),
 tensor([[-0.8087, -1.0785],
         [ 0.4037,  1.3462]], grad_fn=<ViewBackward>),
 tensor([[-0.9686, -1.5095,  0.3642, -0.6491, -0.6247],
         [-2.1093, -0.1983, -0.2731, -0.8785, -1.9321]], grad_fn=<ViewBackward>),
 tensor([[-2.1952,  0.4042],
         [-2.5527, -0.1194]], grad_fn=<ViewBackward>),
 tensor([[ 0.1786, -2.2074],
         [ 0.2929, -1.5629]], grad_fn=<ViewBackward>),
 tensor([[-0.4470,  0.1229],
         [-0.4061,  0.9815]], grad_fn=<ViewBackward>),
 tensor([[ 0.8123, -1.0342],
         [ 1.5224, -1.1291]], grad_fn=<ViewBackward>))

In [5]:
def save(savePath, m, epoch, acc):
    d = {
        'param': m.state_dict(),
        'epoch': epoch,
        'acc': acc,
    }
    if isinstance(savePath, Path):
        savePath = savePath.as_posix()
    torch.save(d, savePath)
    print('checkpoint saved as', savePath)

def load(loadPath):
    if isinstance(loadPath, Path):
        loadPath = loadPath.as_posix()
    d = torch.load(loadPath)
    m = Model()
    m.load_state_dict(d['param'])
    print('checkpoint loaded from', loadPath)
    e, acc = d['epoch'], d['acc']
    print('epoch:', e, 'acc:', acc)
    return m, d['epoch'], d['acc']

def toCpu(path):
    path = Path(path)
    m, e, acc = load(path)
    m.to(torch.device('cpu'))
    save(path.parents[0]/f'{path.stem}_cpu.ckpt')

In [6]:
def val(em, param, val_dl, d):
    l=Loss()
    vn = len(val_dl.dataset)
    em.load_state_dict(param)
    hair_cnt = 0
    hair_color_cnt = 0
    gender_cnt = 0
    earring_cnt = 0
    smile_cnt = 0
    frontal_face_cnt = 0
    L = 0
    with torch.no_grad():
        for i, (xs, colors_b, attrs_b) in enumerate(val_dl):
            xs, colors_b, attrs_b = xs.to(d), colors_b.to(d), attrs_b.to(d)
            outs = em(xs)
            lip_color, eye_color, hair, hair_color, gender, earring, smile, frontal_face = outs
            L += l(outs, colors_b, attrs_b.long()).item()
            hair_ = torch.max(hair, 1)[1]
            hair_cnt += torch.sum(hair_ == attrs_b[:,0])
            
            hair_color_ = torch.max(hair_color, 1)[1]
            hair_color_cnt += torch.sum(hair_color_ == attrs_b[:,1])
            
            gender_ = torch.max(gender, 1)[1]
            gender_cnt += torch.sum(gender_ == attrs_b[:,2])
            
            earring_ = torch.max(earring, 1)[1]
            earring_cnt += torch.sum(earring_ == attrs_b[:,3])
            
            smile_ = torch.max(smile, 1)[1]
            smile_cnt += torch.sum(smile_ == attrs_b[:,4])
            
            frontal_face_ = torch.max(frontal_face, 1)[1]
            frontal_face_cnt += torch.sum(frontal_face_ == attrs_b[:,5])
    acc = (hair_cnt+hair_color_cnt+gender_cnt+earring_cnt+smile_cnt+frontal_face_cnt)/6/vn
    print(f'validated on {vn} samples| mean acc:{acc*100:.4f}%')
    print(f'hair_cnt:{hair_cnt/vn}|hair_color_cnt:{hair_color_cnt/vn}|gender_cnt:{gender_cnt/vn}')
    print(f'earring_cnt:{earring_cnt/vn}|smile_cnt:{smile_cnt/vn}|frontal_face_cnt:{frontal_face_cnt/vn}')
    print(f'loss:{L/vn:.4f}')
    
    return acc

def train(m,
          d,
          train_dl,
          val_dl,
          saveDir=Path('save'),
          resumePath=None,
          lr=0.001,
          e=50,
          s=10
         ):
    saveDir.mkdir(exist_ok=1)
    startEp = -1
    b = 0
    try:
        m, startEp, b = load(saveDir/'best.ckpt')
    except Exception as err:
        print(err)

    if resumePath is not None:
        m, startEp, b = load(resumePath)

    m.to(d).train()
    em = Model().to(d).eval()
    
    l=Loss()
    o=torch.optim.SGD(m.parameters(), lr=lr, momentum=0.9)
    
    saveDir.mkdir(exist_ok=1)
    tn = len(train_dl.dataset)
    t = tqdm(range(startEp+1, e))
#     t = range(startEp+1, e)
    for ep in t:
        L = 0
        for i, (xs, colors_b, attrs_b) in enumerate(train_dl):
            xs, colors_b, attrs_b = xs.to(d), colors_b.to(d), attrs_b.to(d)
            o.zero_grad()
            outs = m(xs)
            loss = l(outs, colors_b, attrs_b.long())
            loss.backward()
            o.step()

            L += loss.item()
        t.set_description(f'ep:{ep}| L:{L/tn:.6f}')
        if (ep+1)%s != 0: continue

        acc = val(em, m.state_dict(), val_dl, d)
        save(saveDir/f'{ep:05d}_{acc:.4f}.ckpt', m, ep, acc)
        if b < acc:
            b = acc
            save(saveDir/'best.ckpt', m, ep, acc)
        print(f'E:{ep}| L:{L/tn:.6f}')
    t.close()

In [11]:
d = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(d)
# d = torch.device('cpu')
Model = ShuffleNet
m = Model().to(d)
train(m,
      d,
      train_dl,
      val_dl,
      saveDir=Path(rootdir + 'save2'),
#       resumePath=Path('save2/03999_0.8062.ckpt'),
      lr=0.0001,
      e=400,
      s=20)

cuda:0
checkpoint loaded from /content/drive/MyDrive/DeepLearningHW/save2/best.ckpt
epoch: 379 acc: tensor(0.7702, device='cuda:0')


ep:399| L:1.123851: 100%|██████████| 20/20 [13:13<00:00, 39.68s/it]

validated on 1046 samples| mean acc:77.1192%
hair_cnt:0.9502868056297302|hair_color_cnt:0.5124282836914062|gender_cnt:0.8470363020896912
earring_cnt:0.8212236762046814|smile_cnt:0.6625239253044128|frontal_face_cnt:0.8336520195007324
loss:27.1113
checkpoint saved as /content/drive/MyDrive/DeepLearningHW/save2/00399_0.7712.ckpt
checkpoint saved as /content/drive/MyDrive/DeepLearningHW/save2/best.ckpt
E:399| L:1.123851





In [13]:
em, epo, acc = load(rootdir + 'save2/best.ckpt')
val(Model().to(d).eval(), em.state_dict(), val_dl, d)

checkpoint loaded from /content/drive/MyDrive/DeepLearningHW/save2/best.ckpt
epoch: 399 acc: tensor(0.7712, device='cuda:0')
validated on 1046 samples| mean acc:77.1192%
hair_cnt:0.9502868056297302|hair_color_cnt:0.5124282836914062|gender_cnt:0.8470363020896912
earring_cnt:0.8212236762046814|smile_cnt:0.6625239253044128|frontal_face_cnt:0.8336520195007324
loss:27.0248


tensor(0.7712, device='cuda:0')

## result

epoch: 399 acc: tensor(0.7712, device='cuda:0')  
validated on 1046 samples  
mean acc:77.1192%    
hair_cnt:0.9502868056297302  
hair_color_cnt:0.5124282836914062  
gender_cnt:0.8470363020896912  
earring_cnt:0.8212236762046814  
smile_cnt:0.6625239253044128  
frontal_face_cnt:0.8336520195007324  
loss:27.0248  