In [7]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from torchvision.models import resnet50
import torch as t
from torch.utils import data
from torchvision import transforms as tsf
import scipy.misc
import torch.nn as nn
from pathlib import Path
from keras.layers import GaussianNoise
from PIL import Image
import skimage
from skimage import io
from torchvision import models
import torch
from sklearn.model_selection import train_test_split
from torchvision.models import resnet50
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils, datasets
from torch.nn import CrossEntropyLoss, Sequential, Linear, Sigmoid, Tanh, BCELoss, Softmax, BatchNorm1d
from torch.utils.data.sampler import Sampler, SubsetRandomSampler, WeightedRandomSampler
from PIL import Image # Replace by accimage when ready
from PIL.Image import FLIP_LEFT_RIGHT, FLIP_TOP_BOTTOM, ROTATE_90, ROTATE_180, ROTATE_270
from PIL.ImageEnhance import Color, Contrast, Brightness, Sharpness
from sklearn.preprocessing import MultiLabelBinarizer
from tqdm import tqdm
from sklearn.metrics import f1_score
import matplotlib.pyplot as plt
import scipy
import imageio
import os 
from torch import nn
import torch.nn.functional as F
import tensorflow as tf 
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
from skimage import img_as_ubyte
img_size=1024
TRAIN_PATH = './train.pth'
TEST_PATH = './test.tph'
%matplotlib inline

In [2]:
def process(dataset_path, mask_path=None):
    data = []
    if mask_path:
        mask_path = Path(mask_path)

    for image in sorted(Path(dataset_path).iterdir()):
        item = {}
        img = io.imread(image)
        
        if img.shape[2]>3:
            assert(img[:,:,3]!=255).sum()==0
        
        img = img[:,:,:3]
        item['name'] = image.name.split("_")[0]
        item['img'] = t.from_numpy(img)
        if mask_path:
            mask = io.imread(mask_path/(item['name'] + "_manual1.gif"))
            item['mask'] = t.from_numpy(mask)
        data.append(item)
    
    return data
test = process('test/images/')
t.save(test, TEST_PATH)
train_data = process('train/images', "train/1st_manual/")
control_data = process('train/control/images', "train/control/1st_manual/")



In [3]:
import PIL
class Dataset():
    def __init__(self,data,source_transform,target_transform):
        self.datas = data
        self.s_transform = source_transform
        self.t_transform = target_transform
    def __getitem__(self, index):
        data = self.datas[index]
        img = data['img'].numpy()
        mask = data['mask'][:,:,None].byte().numpy()
        img = self.s_transform(img)
        mask = self.t_transform(mask)
        return img, mask
    def __len__(self):
        return len(self.datas)
s_trans = tsf.Compose([
    tsf.ToPILImage(),
                tsf.Resize((img_size,img_size)),
                 tsf.ToTensor(),
                 tsf.Normalize(mean = [0.485, 0.456, 0.406], 
                             std = [0.229, 0.224, 0.225])
])
t_trans = tsf.Compose([
    tsf.ToPILImage(),
    tsf.Resize((img_size,img_size),interpolation=PIL.Image.NEAREST),
    tsf.ToTensor(),]
)
dataset = Dataset(train_data,s_trans,t_trans)
train_dataloader = t.utils.data.DataLoader(dataset,batch_size=4, num_workers=48)
control_dataloader = t.utils.data.DataLoader(Dataset(control_data,s_trans,t_trans), batch_size=3,shuffle=True)

In [None]:
# sub-parts of the U-Net mode


class double_conv(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(double_conv, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, 3, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_ch, out_ch, 3, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        x = self.conv(x)
        return x


class inconv(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(inconv, self).__init__()
        self.conv = double_conv(in_ch, out_ch)

    def forward(self, x):
        x = self.conv(x)
        return x


class down(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(down, self).__init__()
        self.mpconv = nn.Sequential(
            nn.MaxPool2d(2),
            double_conv(in_ch, out_ch)
        )

    def forward(self, x):
        x = self.mpconv(x)
        return x


class up(nn.Module):
    def __init__(self, in_ch, out_ch, bilinear=False):
        super(up, self).__init__()

        #  would be a nice idea if the upsampling could be learned too,
        #  but my machine do not have enough memory to handle all those weights
        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        else:
            self.up = nn.ConvTranspose2d(in_ch, in_ch//2, 2, stride=2)

        self.conv = double_conv(in_ch, out_ch)

    def forward(self, x1, x2):
        x1 = self.up(x1)
        
        # input is CHW
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        x1 = F.pad(x1, (diffX // 2, diffX - diffX//2,
                        diffY // 2, diffY - diffY//2))

        x = t.cat([x2, x1], dim=1)
        x = self.conv(x)
        return x


class outconv(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(outconv, self).__init__()
        self.conv = nn.Conv2d(in_ch, out_ch, 1)

    def forward(self, x):
        x = self.conv(x)
        return x


class UNet(nn.Module):
    def __init__(self, n_channels, n_classes):
        super(UNet, self).__init__()
        self.inc = inconv(n_channels, 64)
        self.down1 = down(64, 128)
        self.down2 = down(128, 256)
        self.down3 = down(256, 512)
        self.down4 = down(512, 1024)
        self.up1 = up(1024, 512)
        self.up2 = up(512, 256)
        self.up3 = up(256, 128)
        self.up4 = up(128, 64)
        self.outc = outconv(64, n_classes)

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        x = self.outc(x)
        x = t.nn.functional.sigmoid(x)
        return x

In [None]:
# Попробуйте использовать различные функции потерь.
def soft_dice_loss(inputs, targets):
        num = targets.size(0)
        m1  = inputs.view(num,-1)
        m2  = targets.view(num,-1)
        intersection = (m1 * m2)
        score = 2. * (intersection.sum(1)+1) / (m1.sum(1) + m2.sum(1)+1)
        score = 1 - score.sum()/num
        return score

In [None]:
class TestDataset():
    def __init__(self,path,source_transform):
        self.datas = t.load(path)
        self.s_transform = source_transform
    def __getitem__(self, index):
        data = self.datas[index]
        img = data['img'].numpy()
        img = self.s_transform(img)
        return img
    def __len__(self):
        return len(self.datas)
trans = tsf.Compose([
    tsf.ToPILImage(),
                tsf.Resize((img_size,img_size)),
                 tsf.ToTensor(), 
                 tsf.Normalize(mean = [0.485, 0.456, 0.406], 
                             std = [0.229, 0.224, 0.225])])
testset = TestDataset(TEST_PATH, trans)
testdataloader = t.utils.data.DataLoader(testset,batch_size=1)

In [None]:
model = UNet(n_channels=3,n_classes=1)
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)
criterion = soft_dice_loss
model.train()

In [None]:
device = torch.device("cpu")
dtype = torch.FloatTensor
model.train()

num_epochs = 400

for epoch in range(num_epochs):
    running_loss = []
    control_loss =[]
    for i, (X, y) in enumerate(train_dataloader):
        X = X.to(device)
        y = y.to(device)
        X = Variable(X)
        y = Variable(y)
        
        optimizer.zero_grad()
        output = model(X)
        loss = criterion(output, y)
        
        loss.backward()
        optimizer.step()
        running_loss.append(loss.item())

    print("loss for epoch " + str(epoch) + ":  " + str(np.mean(running_loss)))


In [None]:
os.makedirs("result")

In [None]:
model = model.eval()
with torch.no_grad():
    for ex_id, data in enumerate(testdataloader):
        data = t.autograd.Variable(data, volatile=True).cuda()
        o = model(data).cpu()
    
        source_image = io.imread("test/images/%s_test.tif" % str(ex_id + 1).zfill(2))
        tm = o[0][0].data.cpu().numpy()
        tm = skimage.transform.resize(tm, source_image.shape[:-1])
        tm = (tm > 0.5).astype(int)
        imageio.imwrite("result/%s.png" % (ex_id + 1), tm)

In [None]:
!zip -r result.zip result