# 搭一个简单的CNN
仅包含：训练、验证和预测

## 导入库并配置变量

In [3]:
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm_notebook

def rle_encode(im):
    """Run-length encode a binary mask.

    The mask is flattened in column-major (Fortran) order and every run of
    foreground pixels is written as a ``start length`` pair, where ``start``
    is a 1-based position in the flattened mask.

    Args:
        im: numpy array, 1 - mask, 0 - background.

    Returns:
        The run lengths as a space-separated string
        (``"start length start length ..."``); empty when there is no
        foreground pixel.
    """
    flat = im.flatten(order='F')
    # A zero on each side guarantees that runs touching either end of the
    # flattened mask still produce a rising and a falling transition.
    padded = np.concatenate([[0], flat, [0]])
    # 1-based positions where the value changes: run starts at even slots,
    # one-past-the-end positions at odd slots.
    boundaries = np.where(padded[1:] != padded[:-1])[0] + 1
    # Turn each end position into a run length.
    boundaries[1::2] -= boundaries[::2]
    return ' '.join(map(str, boundaries))

def rle_decode(mask_rle, shape=(512, 512)):
    """Decode a run-length string into a binary mask.

    Args:
        mask_rle: run-length string formatted as ``start length`` pairs,
            with 1-based starts counted in column-major order.
        shape: (height, width) of the array to return.

    Returns:
        numpy uint8 array of the given shape, 1 - mask, 0 - background.
        An empty string decodes to an all-background mask.
    """
    tokens = mask_rle.split()
    # Even positions hold the starts, odd positions the lengths.
    first = np.asarray(tokens[0::2], dtype=int) - 1  # convert to 0-based
    count = np.asarray(tokens[1::2], dtype=int)
    last = first + count

    flat = np.zeros(shape[0] * shape[1], dtype=np.uint8)
    for begin, stop in zip(first, last):
        flat[begin:stop] = 1
    # Runs were measured over a column-major flattening, so fold back the
    # same way.
    return flat.reshape(shape, order='F')

In [29]:
import torch
import torch.nn as nn
import albumentations as A

In [50]:
# Directory prefix used when building the CSV and image paths.
PATH = './data/'

# Training configuration.
EPOCHES = 20
BATCH_SIZE = 32
IMAGE_SIZE = 256  # target side length handed to the resize transforms
DEVICE = 'cpu'

# Augmentation pipeline, called with image= and mask= keyword arguments:
# resize to IMAGE_SIZE x IMAGE_SIZE, then random flips and 90-degree rotations.
trfm = A.Compose(
    transforms=[
        A.Resize(height=IMAGE_SIZE, width=IMAGE_SIZE),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.RandomRotate90(),
    ]
)

## 定义Dataset类

In [32]:
import torch.utils.data as D
import cv2
from torchvision import transforms as T

class MyDataset(D.Dataset):
    """Segmentation dataset pairing image files with run-length encoded masks.

    Each item is read from disk with ``cv2.imread``, its mask is decoded with
    ``rle_decode`` (default 512x512 shape), both are passed through the
    augmentation callable, and the image is then converted to a normalised
    tensor.

    Args:
        paths: indexable collection of image file paths.
        masks: indexable collection of run-length mask strings, aligned with
            ``paths``.
        transform: callable invoked as ``transform(image=..., mask=...)`` that
            returns a mapping with ``'image'`` and ``'mask'`` entries.
    """

    def __init__(self, paths, masks, transform):
        self.paths = paths
        self.masks = masks
        self.transform = transform

        self.len = len(paths)
        # Image-only post-processing applied after augmentation.
        # NOTE(review): cv2.imread yields channels in BGR order - confirm the
        # normalisation statistics below were computed in the same order.
        self.as_tensor = T.Compose([
            T.ToPILImage(),
            T.Resize(IMAGE_SIZE),
            T.ToTensor(),
            T.Normalize([0.625, 0.448, 0.688],
                        [0.131, 0.177, 0.101]),
        ])

    def __getitem__(self, index):
        """Return ``(image_tensor, mask)`` for the sample at ``index``.

        The mask gains a leading axis of size one via ``[None]``.

        Raises:
            OSError: if the image file cannot be read.
        """
        path = self.paths[index]
        img = cv2.imread(path)
        # cv2.imread signals failure by returning None rather than raising;
        # fail here with the offending path instead of deep inside the
        # augmentation pipeline.
        if img is None:
            raise OSError(
                'cv2.imread could not read image '
                '(missing or undecodable file): {}'.format(path)
            )
        mask = rle_decode(self.masks[index])
        augments = self.transform(image=img, mask=mask)
        return self.as_tensor(augments['image']), augments['mask'][None]

    def __len__(self):
        """Return the number of samples."""
        return self.len

## 读取数据

In [25]:
# Load the tab-separated annotation table; column names are supplied
# explicitly as 'name' (image file name) and 'mask' (run-length string).
train_mask = pd.read_csv(
    PATH + 'train_mask.csv',
    sep='\t',
    names=['name', 'mask'],
)
# Derive each image's path under the train/ sub-directory.
train_mask['pathname'] = train_mask['name'].map(
    lambda file_name: PATH + 'train/' + file_name
)

In [33]:
# Missing mask entries are replaced by '' so that rle_decode yields an
# all-background mask for them.
dataset = MyDataset(
    paths=train_mask['pathname'].values,
    masks=train_mask['mask'].fillna('').values,
    transform=trfm,
)

In [41]:
# Subsample the data to shorten training time: out of every 30 consecutive
# samples the first goes to validation, the second to training, and the
# remaining 28 are left unused.
valid_idx = list(range(0, len(dataset), 30))
train_idx = list(range(1, len(dataset), 30))

In [42]:
# Views over the full dataset restricted to the sampled indices.
train_ds = D.Subset(dataset=dataset, indices=train_idx)
valid_ds = D.Subset(dataset=dataset, indices=valid_idx)

In [44]:
# Training batches are shuffled; validation batches keep dataset order.
loader = D.DataLoader(
    dataset=train_ds,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=0,
)
vloader = D.DataLoader(
    dataset=valid_ds,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
)

## 构建模型

In [48]:
import torchvision
def get_model():
    """Build the segmentation network used for training.

    Creates torchvision's ``fcn_resnet50`` and replaces element 4 of its
    ``classifier`` with a 1x1 convolution that has a single output channel.

    Returns:
        The modified model.
    """
    # The positional True is kept as-is.
    # NOTE(review): presumably this is the `pretrained` flag - confirm
    # against the installed torchvision version.
    net = torchvision.models.segmentation.fcn_resnet50(True)
    # Assumes the layer being replaced takes 512 input channels - TODO confirm.
    single_channel_head = nn.Conv2d(512, 1, kernel_size=(1, 1), stride=(1, 1))
    net.classifier[4] = single_channel_head
    return net

@torch.no_grad()
def validation(model, loader, loss_fn, device=None):
    """Compute the mean per-batch loss of ``model`` over ``loader``.

    Runs with gradients disabled and switches the model to eval mode. The
    model is left in eval mode on return, so callers must call
    ``model.train()`` before resuming training.

    Args:
        model: module whose forward pass returns a mapping with an ``'out'``
            entry.
        loader: iterable of ``(image, target)`` tensor batches.
        loss_fn: callable ``loss_fn(output, target)`` returning a scalar
            tensor.
        device: device the batches are moved to. Defaults to the device of
            the model's first parameter, falling back to the module-level
            ``DEVICE`` for a model without parameters.

    Returns:
        Unweighted mean of the per-batch losses; NaN when ``loader`` yields
        no batches.
    """
    if device is None:
        # Follow the model rather than a global so inputs can never end up
        # on a different device than the weights.
        first_param = next(model.parameters(), None)
        device = DEVICE if first_param is None else first_param.device

    model.eval()
    batch_losses = []
    for image, target in loader:
        image = image.to(device)
        target = target.float().to(device)
        prediction = model(image)['out']
        batch_losses.append(loss_fn(prediction, target).item())

    if not batch_losses:
        # Avoid NumPy's empty-mean warning; NaN never compares as "better".
        return float('nan')
    return np.array(batch_losses).mean()

In [51]:
# Build the network and place it on the configured device.
model = get_model().to(DEVICE)

# AdamW over all model parameters.
optimizer = torch.optim.AdamW(
    params=model.parameters(),
    lr=1e-4,
    weight_decay=1e-3,
)

In [52]:
class SoftDiceLoss(nn.Module):
    """Soft Dice loss, ``1 - mean(dice)``.

    The Dice coefficient is computed from soft true-positive, false-positive
    and false-negative sums taken over ``dims``, then averaged over the
    remaining axes.

    Args:
        smooth: constant added to both numerator and denominator of the
            Dice ratio.
        dims: axes reduced when summing (the last two by default).
    """

    def __init__(self, smooth=1., dims=(-2,-1)):
        super().__init__()
        self.smooth = smooth
        self.dims = dims

    def forward(self, x, y):
        """Return the loss for predictions ``x`` against targets ``y``."""
        reduce_over = self.dims
        true_pos = (x * y).sum(reduce_over)
        false_pos = (x * (1 - y)).sum(reduce_over)
        false_neg = ((1 - x) * y).sum(reduce_over)

        numerator = 2 * true_pos + self.smooth
        denominator = 2 * true_pos + false_pos + false_neg + self.smooth
        dice = (numerator / denominator).mean()
        return 1 - dice

In [53]:
# Loss components: binary cross-entropy on raw logits and soft Dice on
# probabilities.
bce_fn = nn.BCEWithLogitsLoss()
dice_fn = SoftDiceLoss()


def loss_fn(y_pred, y_true):
    """Return the weighted sum ``0.8 * BCE + 0.2 * Dice``.

    Args:
        y_pred: raw model outputs (logits); BCE consumes them directly and
            Dice consumes their sigmoid.
        y_true: target tensor passed to both loss terms.
    """
    probabilities = y_pred.sigmoid()
    bce_term = bce_fn(y_pred, y_true)
    dice_term = dice_fn(probabilities, y_true)
    return 0.8 * bce_term + 0.2 * dice_term

In [55]:
import time
header = r'''
        Train | Valid
Epoch |  Loss |  Loss | Time, m
'''
# Row layout: epoch number, two loss columns (train, valid), elapsed minutes.
raw_line = '{:6d}' + '\u2502{:7.3f}'*2 + '\u2502{:6.2f}'
print(header)

EPOCHES = 5  # overrides the value set in the configuration cell
# Start from +inf so the first finite validation loss always produces a
# checkpoint, whatever the scale of the loss.
best_loss = float('inf')
for epoch in range(1, EPOCHES+1):
    losses = []
    start_time = time.time()
    model.train()  # validation() leaves the model in eval mode
    for image, target in tqdm_notebook(loader):
        image, target = image.to(DEVICE), target.float().to(DEVICE)
        optimizer.zero_grad()
        output = model(image)['out']
        loss = loss_fn(output, target)
        loss.backward()
        optimizer.step()
        losses.append(loss.item())

    vloss = validation(model, vloader, loss_fn)
    elapsed_minutes = (time.time() - start_time) / 60
    print(raw_line.format(epoch, np.array(losses).mean(), vloss, elapsed_minutes))

    # Keep only the weights with the lowest validation loss seen so far.
    if vloss < best_loss:
        best_loss = vloss
        torch.save(model.state_dict(), 'small_model_best.pth')


        Train | Valid
Epoch |  Loss |  Loss | Time, m



  0%|          | 0/32 [00:00<?, ?it/s]

     1│  0.484│  0.383│ 27.39


  0%|          | 0/32 [00:00<?, ?it/s]

     2│  0.357│  0.313│ 26.91


  0%|          | 0/32 [00:00<?, ?it/s]

     3│  0.319│  0.299│ 26.53


  0%|          | 0/32 [00:00<?, ?it/s]

     4│  0.292│  0.278│ 24.78


  0%|          | 0/32 [00:00<?, ?it/s]

     5│  0.265│  0.263│ 24.66
