# Głębokie Sieci Neuronowe

### 1. Wymagane biblioteki

In [24]:
import os # Określanie lokacji plików
import numpy as np # Operacje na macierzach
import cv2 # Wczytywanie obrazów
import torch # PyTorch
import torch.nn as nn # Sieci neuronowe
import torch.nn.functional as F # Do specyfikacji funkcji
import random # Losowanie liczb
import time # Uzyskiwanie obecnego czasu
from albumentations import HorizontalFlip, VerticalFlip, Rotate # Morfologia obrazów
from torch.utils.data import Dataset # Zbiór danych do uczenia i trenowania
from torch.utils.data import DataLoader # Uzyskiwanie obrazu w danej epoce
from sklearn.metrics import accuracy_score, f1_score, jaccard_score, precision_score, recall_score # Wskaźniki jakości modelu
from operator import add # Łączenie zdjęć

# All locations are resolved against the current working directory.

# Training images and their expert masks
train_img_path = os.path.abspath(os.path.join('train', 'images'))
train_mask_path = os.path.abspath(os.path.join('train', 'ground_truth'))

# Validation images and their expert masks
validation_img_path = os.path.abspath(os.path.join('test', 'images'))
validation_mask_path = os.path.abspath(os.path.join('test', 'ground_truth'))

# Test images and their expert masks.
# They currently point at the same folders as the validation set, but this can be changed.
# The test images then have to be passed through augmentation too - only to resize them.
test_img_path = os.path.abspath(os.path.join('test', 'images'))
test_mask_path = os.path.abspath(os.path.join('test', 'ground_truth'))

# Where the processed (resized, flipped, rotated, ...) pictures are stored
train_augumented_path = os.path.abspath(os.path.join('train', 'augumented_images'))
validation_augumented_path = os.path.abspath(os.path.join('test', 'augumented_images'))
test_augumented_path = os.path.abspath(os.path.join('test', 'augumented_images'))

# Where the network's detection results are written
results_path = os.path.abspath('results')

# Where the model is saved during training
model_output = os.path.abspath('model')

### 2. Sieć UNET

In [2]:
class Block(nn.Module):
    """
    Double convolution unit: two 3x3 convolutions (padding 1), each followed by
    batch normalisation and ReLU.
    :param input_size: - number of channels of the incoming tensor
    :param output_size: - number of channels produced by both convolutions
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        # Attribute names are part of the state_dict keys, so they are kept as they are.
        self.conv_layer1 = nn.Conv2d(input_size, output_size, kernel_size=3, padding=1)
        self.norm_layer1 = nn.BatchNorm2d(output_size)

        self.conv_layer2 = nn.Conv2d(output_size, output_size, kernel_size=3, padding=1)
        self.norm_layer2 = nn.BatchNorm2d(output_size)

        # One stateless ReLU instance is shared by both stages
        self.relu = nn.ReLU()

    def forward(self, inputs):
        """Run conv -> batch norm -> ReLU twice and return the result."""
        first_stage = self.relu(self.norm_layer1(self.conv_layer1(inputs)))
        second_stage = self.relu(self.norm_layer2(self.conv_layer2(first_stage)))
        return second_stage

In [3]:
class PullingBlock(nn.Module):
    """
    Encoder stage: a convolutional Block followed by 2x2 max pooling.
    :param input_size: - number of channels of the incoming tensor
    :param output_size: - number of channels produced by the convolutional block
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        self.conv_block = Block(input_size, output_size)
        self.pool = nn.MaxPool2d((2, 2))

    def forward(self, inputs):
        """Return the pair (features before pooling, pooled features)."""
        features = self.conv_block(inputs)
        return features, self.pool(features)

In [4]:
class Decoder(nn.Module):
    """
    Decoder stage: transposed convolution (kernel 2, stride 2), concatenation with the
    skip connection along the channel dimension, then a convolutional Block.
    :param input_size: - number of channels of the incoming tensor
    :param output_size: - number of channels after the transposed convolution and of the result
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        self.d = nn.ConvTranspose2d(input_size, output_size, kernel_size=2, stride=2, padding=0)
        # The block receives the upsampled tensor and the skip tensor stacked together,
        # hence twice output_size input channels.
        self.conv_block = Block(2 * output_size, output_size)

    def forward(self, inputs, skip):
        """Upsample `inputs`, join it with `skip` and refine the result."""
        upsampled = self.d(inputs)
        merged = torch.cat((upsampled, skip), dim=1)
        return self.conv_block(merged)

In [5]:
class UNET(nn.Module):
    """
    U-Net built from four encoder stages, a bottleneck, four decoder stages and a final
    1x1 convolution. It takes a 3-channel input and returns a 1-channel map of raw
    scores (no sigmoid is applied here).
    """

    def __init__(self):
        super().__init__()
        # Encoder path
        self.e1 = PullingBlock(3, 64)
        self.e2 = PullingBlock(64, 128)
        self.e3 = PullingBlock(128, 256)
        self.e4 = PullingBlock(256, 512)

        # Bottleneck
        self.b = Block(512, 1024)

        # Decoder path
        self.d1 = Decoder(1024, 512)
        self.d2 = Decoder(512, 256)
        self.d3 = Decoder(256, 128)
        self.d4 = Decoder(128, 64)

        # 1x1 convolution that maps 64 channels to the single output channel
        self.output = nn.Conv2d(64, 1, kernel_size=1, padding=0)

    def forward(self, inputs):
        """Run the encoder, the bottleneck and the decoder; return the 1-channel output."""
        skips = []
        x = inputs
        for encoder in (self.e1, self.e2, self.e3, self.e4):
            features, x = encoder(x)
            skips.append(features)

        x = self.b(x)

        # Decoders consume the skip tensors in reverse order: f4, f3, f2, f1
        for decoder in (self.d1, self.d2, self.d3, self.d4):
            x = decoder(x, skips.pop())

        return self.output(x)

### 3. Augmentacja obrazów

In [6]:
def augument_data(img_path, mask_path, output_path, img_size = (512, 512), augument = True):
    """
    Resize every image/mask pair and optionally write augmented variants of it.

    For each file in `img_path` the mask `<stem>.tif` is read from `mask_path`, where
    `<stem>` is the part of the file name before its first dot. The resized image goes to
    `<output_path>/images` and the resized mask to `<output_path>/ground_truth`.
    :param img_path: - directory with the images to process
    :param mask_path: - directory with the expert masks
    :param output_path: - directory receiving the `images` and `ground_truth` sub-folders
                          (both are created when missing)
    :param img_size: - target size handed to cv2.resize
    :param augument: - when True, also write horizontally flipped, vertically flipped and
                       rotated variants; when False only resize
    :return: - the mask (as read, not resized) of the last processed image,
               or None when `img_path` contains no files
    :raises FileNotFoundError: - when an image or its mask cannot be read
    :raises OSError: - when a result file cannot be written
    """
    images_dir = output_path + '/images'
    masks_dir = output_path + '/ground_truth'
    # Without existing target folders cv2.imwrite only returns False and nothing gets written
    os.makedirs(images_dir, exist_ok=True)
    os.makedirs(masks_dir, exist_ok=True)

    def save_resized(path, array):
        # Resize and write one picture; a failed write is reported instead of being ignored
        if not cv2.imwrite(path, cv2.resize(array, img_size)):
            raise OSError(f"Could not write {path}")
        # NOTE(review): masks are resized with cv2.resize's default interpolation, which can
        # introduce intermediate gray levels - consider cv2.INTER_NEAREST for masks.

    # The transforms do not depend on the picture, so build them once (and only when needed)
    transforms = {}
    if augument:
        transforms = {
            'hflip': HorizontalFlip(p=1.0),
            'vflip': VerticalFlip(p=1.0),
            'rotated': Rotate(limit=45, p=1.0),
        }

    # Sorted so that the processing order does not depend on the file system
    file_names = sorted(os.listdir(img_path))
    num_images = len(file_names)
    mask = None
    for idx, image in enumerate(file_names):
        # Progress report
        print(f"Augumenting: {idx} / {num_images}...")
        stem = image.split('.')[0]
        img_file = img_path + '/' + image
        mask_file = mask_path + '/' + stem + '.tif'

        img = cv2.imread(img_file, cv2.IMREAD_COLOR)
        if img is None:
            raise FileNotFoundError(f"Could not read image: {img_file}")
        mask = cv2.imread(mask_file, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask: {mask_file}")

        # The untransformed pair keeps the original image file name
        save_resized(images_dir + '/' + image, img)
        save_resized(masks_dir + '/' + stem + '.tif', mask)

        # Image and mask go through the same transform call so that they stay aligned
        for suffix, transform in transforms.items():
            result = transform(image=img, mask=mask)
            save_resized(f"{images_dir}/{stem}_{suffix}.jpg", result["image"])
            save_resized(f"{masks_dir}/{stem}_{suffix}.tif", result["mask"])

    print(f"Result images saved to {output_path + '/images'} and masks to {output_path + '/ground_truth'}!")
    return mask

### 4. Funkcja straty Dice

In [7]:
class DiceLoss(nn.Module):
    """
    Soft Dice loss computed on raw scores (a sigmoid is applied inside `forward`).
    :param weight: - accepted for signature compatibility, not used
    :param size_average: - accepted for signature compatibility, not used
    """

    def __init__(self, weight=None, size_average=True):
        super().__init__()

    def forward(self, inputs, targets, smooth=1):
        """
        :param inputs: - raw model outputs
        :param targets: - ground-truth tensor with the same number of elements as `inputs`
        :param smooth: - constant added to numerator and denominator to avoid division by zero
        :return: - scalar tensor equal to 1 - Dice coefficient
        """
        # reshape (unlike view) also accepts non-contiguous tensors
        probabilities = torch.sigmoid(inputs).reshape(-1)
        targets = targets.reshape(-1)

        intersection = (probabilities * targets).sum()
        dice = (2. * intersection + smooth) / (probabilities.sum() + targets.sum() + smooth)

        return 1 - dice

class DiceBCELoss(nn.Module):
    """
    Sum of binary cross-entropy and soft Dice loss, both computed from raw scores.
    :param weight: - accepted for signature compatibility, not used
    :param size_average: - accepted for signature compatibility, not used
    """

    def __init__(self, weight=None, size_average=True):
        super().__init__()

    def forward(self, inputs, targets, smooth=1):
        """
        :param inputs: - raw model outputs (logits)
        :param targets: - ground-truth tensor with the same number of elements as `inputs`
        :param smooth: - constant added to the Dice numerator and denominator
        :return: - scalar tensor: mean BCE + (1 - Dice coefficient)
        """
        # reshape (unlike view) also accepts non-contiguous tensors
        logits = inputs.reshape(-1)
        targets = targets.reshape(-1)

        # BCE is taken straight from the logits: applying a sigmoid first and then
        # binary_cross_entropy saturates for large |logit| and loses precision.
        bce = F.binary_cross_entropy_with_logits(logits, targets, reduction='mean')

        probabilities = torch.sigmoid(logits)
        intersection = (probabilities * targets).sum()
        dice_loss = 1 - (2. * intersection + smooth) / (probabilities.sum() + targets.sum() + smooth)

        return bce + dice_loss

In [8]:
def srand(seed):
    '''
    Seed the random number generators used in this notebook.
    :seed: - value from which all generators are initialised
    '''
    os.environ["PYTHONHASHSEED"] = str(seed)
    # Python, NumPy and PyTorch (CPU and CUDA) all receive the same seed
    for seeder in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
        seeder(seed)
    torch.backends.cudnn.deterministic = True
    
def get_epoch(start_time, end_time):
    '''
    Split the time that passed between two timestamps into whole minutes and seconds.
    Used to report how long an epoch took.
    :param start_time: - timestamp in seconds taken at the start
    :param end_time: - timestamp in seconds taken at the end
    :return: - tuple (minutes, seconds), both truncated to integers
    '''
    span = end_time - start_time
    whole_minutes = int(span / 60)
    leftover_seconds = int(span - (whole_minutes * 60))
    return whole_minutes, leftover_seconds

### 5. Trenowanie sieci

##### 1. Wczytywanie danych

In [47]:
# Resize all pictures; only the training set additionally gets flipped/rotated variants
augument_data(train_img_path, train_mask_path, train_augumented_path)
augument_data(validation_img_path, validation_mask_path, validation_augumented_path, augument=False)
srand(42)


def list_sorted_files(directory):
    """Return the paths of all entries of `directory`, sorted by file name."""
    # os.listdir gives no ordering guarantee; images and masks are paired by position,
    # so both listings have to be sorted the same way.
    return [f"{directory}/{name}" for name in sorted(os.listdir(directory))]


train_images = list_sorted_files(train_augumented_path + '/images')
train_masks = list_sorted_files(train_augumented_path + '/ground_truth')
validation_images = list_sorted_files(validation_augumented_path + '/images')
validation_masks = list_sorted_files(validation_augumented_path + '/ground_truth')
test_images = list_sorted_files(test_augumented_path + '/images')
test_masks = list_sorted_files(test_augumented_path + '/ground_truth')

# Every image needs exactly one mask
assert len(train_images) == len(train_masks), "training images and masks do not pair up"
assert len(validation_images) == len(validation_masks), "validation images and masks do not pair up"
assert len(test_images) == len(test_masks), "test images and masks do not pair up"

# The second number is the size of the validation set (it used to be taken from the test list)
print(f"Dataset contains:\n{len(train_images)} training images\n{len(validation_images)} validation images")

Dataset contains:
120 training images
15 validation images


##### 2. Dataset

In [10]:
class NetworkDataset(Dataset):
    """
    Dataset of image/mask pairs read from disk with OpenCV.
    :param train_img_paths: - list of image file paths
    :param train_mask_paths: - list of mask file paths; entry i belongs to image i
    :raises ValueError: - when the two lists differ in length
    """

    def __init__(self, train_img_paths, train_mask_paths):
        super().__init__()
        if len(train_img_paths) != len(train_mask_paths):
            raise ValueError(
                f"Got {len(train_img_paths)} images but {len(train_mask_paths)} masks"
            )
        self.train_img_paths = train_img_paths
        self.train_mask_paths = train_mask_paths
        self.num_samples = len(self.train_img_paths)

    @staticmethod
    def _read_scaled(path, flag):
        """Read `path` with cv2.imread and return it divided by 255 as a float32 array."""
        data = cv2.imread(path, flag)
        # cv2.imread signals failure by returning None instead of raising
        if data is None:
            raise FileNotFoundError(f"Could not read image file: {path}")
        return (data / 255.0).astype(np.float32)

    def __getitem__(self, index):
        """
        :param index: - position of the pair in the path lists
        :return: - tuple (image tensor with the channel axis first,
                   mask tensor with a leading axis of size 1), both float32
        :raises FileNotFoundError: - when the image or the mask cannot be read
        """
        image = self._read_scaled(self.train_img_paths[index], cv2.IMREAD_COLOR)
        # Move the channel axis to the front (last axis -> first)
        image = torch.from_numpy(np.ascontiguousarray(np.transpose(image, (2, 0, 1))))

        mask = self._read_scaled(self.train_mask_paths[index], cv2.IMREAD_GRAYSCALE)
        mask = torch.from_numpy(np.expand_dims(mask, axis=0))

        return image, mask

    def __len__(self):
        """Return the number of image/mask pairs."""
        return self.num_samples

##### 3. Tworzenie modelu

In [11]:
# Target size of the images fed to the network
img_size = (512, 512)
# How many images the network processes at once
batch_size = 2
# How many epochs the training lasts
num_epochs = 50
# Learning rate
learning_rate = 1e-4
# Where the model is stored whenever its quality improves
checkpoint_path = model_output + '/model.pth'

# Datasets
train_dataset = NetworkDataset(train_images, train_masks)
validation_dataset = NetworkDataset(validation_images, validation_masks)

train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=0
)

validation_loader = DataLoader(
    dataset=validation_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=0
)

# Model creation.
# Use the GPU when there is one and fall back to the CPU otherwise, so that the cell
# also runs on machines without CUDA (same rule as in the model loading cell).
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = UNET()
model = model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# The scheduler watches a quantity that should decrease ('min')
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, verbose=True)
loss_function = DiceBCELoss()

##### 4. Uczenie modelu

In [12]:
def train(model, loader, optimizer, loss_function, device):
    '''
    Run one training pass over all batches supplied by the loader.
    :param model: - network model
    :param loader: - object providing the training batches
    :param optimizer: - optimizer that updates the model weights
    :param loss_function: - function computing the error from which the weights are derived
    :param device: - device the training runs on (gpu or cpu)
    :return: - sum of the batch losses divided by the number of batches
    '''
    model.train()
    batch_losses = []
    for images, masks in loader:
        images = images.to(device, dtype=torch.float32)
        masks = masks.to(device, dtype=torch.float32)

        optimizer.zero_grad()
        batch_loss = loss_function(model(images), masks)
        # Back-propagation followed by the weight update
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())

    # Mean error of the epoch
    return sum(batch_losses, 0.0) / len(loader)

In [13]:
def evaluate(model, loader, loss_function, device):
    '''
    Measure the quality of the model on validation data, without updating any weights.
    :param model: - model to evaluate
    :param loader: - object providing the validation batches
    :param loss_function: - function measuring the error
    :param device: - device the validation runs on (gpu or cpu)
    :return: - sum of the batch losses divided by the number of batches
    '''
    model.eval()
    batch_losses = []
    # Gradients are not needed for validation
    with torch.no_grad():
        for images, masks in loader:
            images = images.to(device, dtype=torch.float32)
            masks = masks.to(device, dtype=torch.float32)

            batch_loss = loss_function(model(images), masks)
            batch_losses.append(batch_loss.item())

    return sum(batch_losses, 0.0) / len(loader)

In [14]:
min_loss = float("inf")
# Release cached memory on the training device, helps against CUDA Out of Memory errors
torch.cuda.empty_cache()
# torch.save needs the target folder to exist
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

for epoch in range(num_epochs):
    start_time = time.time()

    training_loss = train(model, train_loader, optimizer, loss_function, device)
    validation_loss = evaluate(model, validation_loader, loss_function, device)

    # ReduceLROnPlateau only lowers the learning rate when it is fed the monitored value;
    # without this call the scheduler created above has no effect.
    scheduler.step(validation_loss)

    # Save the model whenever its quality improves
    if validation_loss < min_loss:
        print(f"Validation loss reduced from {min_loss} to {validation_loss}, saving model...")
        min_loss = validation_loss
        torch.save(model.state_dict(), checkpoint_path)

    end_time = time.time()

    epoch_mins, epoch_secs = get_epoch(start_time, end_time)

    # Print diagnostics
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {training_loss:.3f}')
    print(f'\t Val. Loss: {validation_loss:.3f}\n')

Validation loss reduced from inf to 1.0774696543812752, saving model...
Epoch: 01 | Epoch Time: 2m 0s
	Train Loss: 1.156
	 Val. Loss: 1.077



KeyboardInterrupt: 

Ponieważ uczenie na komputerze lokalnym jest wolne, powyższe uruchomienie zostało przerwane (stąd `KeyboardInterrupt`); model został wyuczony w notatniku **Google Colab** i zapisany w folderze `model`.

### 6. Testowanie sieci

##### 1. Wczytywanie modelu

In [14]:
srand(42)
# Use the GPU when available, otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = UNET().to(device)
# NOTE(review): torch.load unpickles the file - only load checkpoints from a trusted source.
model.load_state_dict(torch.load(checkpoint_path, map_location=device))
# Switch to inference mode; being the last expression, the call also displays the architecture
model.eval()

UNET(
  (e1): PullingBlock(
    (conv_block): Block(
      (conv_layer1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (norm_layer1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv_layer2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (norm_layer2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (pool): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (e2): PullingBlock(
    (conv_block): Block(
      (conv_layer1): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (norm_layer1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv_layer2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (norm_layer2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): Re

##### 2. Ewaluacja efektywności

In [15]:
def get_score(ground_truth, prediction):
    '''
    Compare a predicted mask with the expert mask.
    Both tensors are binarised with a 0.5 threshold and flattened before scoring.
    :param ground_truth: - tensor with the expert mask
    :param prediction: - tensor with the model prediction
    :return: - list [jaccard, f1, recall, precision, accuracy]
    '''
    def to_flat_labels(tensor):
        # tensor -> numpy -> {0, 1} labels -> 1-D vector
        return (tensor.cpu().numpy() > 0.5).astype(np.uint8).reshape(-1)

    ground_truth = to_flat_labels(ground_truth)
    prediction = to_flat_labels(prediction)

    # The order of the metrics is part of the contract: callers sum the lists position by position
    metrics = (jaccard_score, f1_score, recall_score, precision_score, accuracy_score)
    return [metric(ground_truth, prediction) for metric in metrics]

def mask_parse(mask):
    '''
    Turn a single-channel mask into a three-channel one by repeating it along a new last axis.
    :param mask: - array with the mask
    :return: - array with one more dimension of size 3 at the end
    '''
    return np.stack([mask, mask, mask], axis=-1)

In [36]:
metrics_score = [0.0, 0.0, 0.0, 0.0, 0.0]
prediction_times = []

# Make sure the output folder exists before anything is written to it
os.makedirs(results_path, exist_ok=True)

for img, mask in zip(test_images, test_masks):
    # test_images / test_masks may hold bare file names or complete paths; in both cases
    # only the file name is combined with the test folders. Prefixing a complete path
    # with the folder again would produce a path that does not exist.
    img = os.path.join(test_img_path, os.path.basename(img))
    mask = os.path.join(test_mask_path, os.path.basename(mask))
    result_name = os.path.basename(img).split(".")[0]

    image = cv2.imread(img, cv2.IMREAD_COLOR)
    if image is None:
        raise FileNotFoundError(f"Could not read image: {img}")
    image = cv2.resize(image, img_size)
    # Channel axis first, values scaled to [0, 1], plus a leading batch axis
    img = np.transpose(image, (2, 0, 1))
    img = img / 255.0
    img = np.expand_dims(img, axis=0)
    img = img.astype(np.float32)
    img = torch.from_numpy(img)
    img = img.to(device)

    expert_mask = cv2.imread(mask, cv2.IMREAD_GRAYSCALE)
    if expert_mask is None:
        raise FileNotFoundError(f"Could not read mask: {mask}")
    expert_mask = cv2.resize(expert_mask, img_size)
    # Scale to [0, 1] BEFORE adding the axes, so that the 0.5 threshold in get_score is
    # applied to normalised values (the scaled array used to be overwritten by the raw one).
    mask = expert_mask / 255.0
    mask = np.expand_dims(mask, axis=0)
    mask = np.expand_dims(mask, axis=0)
    mask = mask.astype(np.float32)
    mask = torch.from_numpy(mask)
    mask = mask.to(device)

    with torch.no_grad():
        start_time = time.time()
        prediction = model(img)
        prediction = torch.sigmoid(prediction)
        # CUDA kernels run asynchronously; wait for them so the measured time is the real one
        if device.type == 'cuda':
            torch.cuda.synchronize()
        elapsed = time.time() - start_time
        prediction_times.append(elapsed)

        score = get_score(mask, prediction)
        metrics_score = list(map(add, metrics_score, score))
        prediction = prediction[0].cpu().numpy()
        prediction = np.squeeze(prediction, axis=0)
        prediction = prediction > 0.5
        prediction = np.array(prediction, dtype=np.uint8)

    expert_mask = mask_parse(expert_mask)
    prediction = mask_parse(prediction)
    # Gray vertical separator; uint8 keeps the whole collage an 8-bit image
    line = np.full((img_size[1], 10, 3), 128, dtype=np.uint8)

    # Side by side: input image | expert mask | predicted mask
    result = np.concatenate(
        [image, line, expert_mask, line, prediction * 255], axis=1
    )
    print(f"Saving {result_name}.png to {results_path}...")
    cv2.imwrite(f"{results_path}/{result_name}.png", result)

# Average every metric over the test images
mean_jaccard = metrics_score[0]/len(test_images)
mean_f1 = metrics_score[1]/len(test_images)
mean_recall = metrics_score[2]/len(test_images)
mean_precision = metrics_score[3]/len(test_images)
mean_acc = metrics_score[4]/len(test_images)
print(f"Jaccard: {mean_jaccard:1.4f} - F1: {mean_f1:1.4f} - Recall: {mean_recall:1.4f} - Precision: {mean_precision:1.4f} - Acc: {mean_acc:1.4f}")

# Images per second, from the mean time of a single prediction
fps = 1/np.mean(prediction_times)
print("FPS: ", fps)

Saving 11_dr.png to C:\Users\dawnw\Desktop\PP\IwM\Projekt 2\Deep NN\results...
Saving 11_g.png to C:\Users\dawnw\Desktop\PP\IwM\Projekt 2\Deep NN\results...
Saving 11_h.png to C:\Users\dawnw\Desktop\PP\IwM\Projekt 2\Deep NN\results...
Saving 12_dr.png to C:\Users\dawnw\Desktop\PP\IwM\Projekt 2\Deep NN\results...
Saving 12_g.png to C:\Users\dawnw\Desktop\PP\IwM\Projekt 2\Deep NN\results...
Saving 12_h.png to C:\Users\dawnw\Desktop\PP\IwM\Projekt 2\Deep NN\results...
Saving 13_dr.png to C:\Users\dawnw\Desktop\PP\IwM\Projekt 2\Deep NN\results...
Saving 13_g.png to C:\Users\dawnw\Desktop\PP\IwM\Projekt 2\Deep NN\results...
Saving 13_h.png to C:\Users\dawnw\Desktop\PP\IwM\Projekt 2\Deep NN\results...
Saving 14_dr.png to C:\Users\dawnw\Desktop\PP\IwM\Projekt 2\Deep NN\results...
Saving 14_g.png to C:\Users\dawnw\Desktop\PP\IwM\Projekt 2\Deep NN\results...
Saving 14_h.png to C:\Users\dawnw\Desktop\PP\IwM\Projekt 2\Deep NN\results...
Saving 15_dr.png to C:\Users\dawnw\Desktop\PP\IwM\Projekt 2\