In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install pyrsgis



In [3]:
######################### Import des libraries #######################

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from torch.utils.data import Dataset, DataLoader, sampler
from pyrsgis import raster
from pyrsgis.convert import changeDimension
from copy import deepcopy
import torch
import torch.nn as nn
import pandas as pd
import scipy.signal
import scipy.ndimage
import os
import numpy as np



In [0]:
############################## Parameters ############################

stop = 1000
epochs = 7

In [0]:
######################### Import des données #########################

path = "/content/drive/My Drive/D4G/2_Seredou_2017/"

Test = "2_Image_Seredou_32bits_20170205/S2A_20170205_seredou_32bits.tif"
MSI = "2_MSI_Seredou_20170205/S2A_20170205_seredou_ZE_MSI_89.tif"
Truth = "GroundTruth_Seredou_20170205/GroundTruth_20170205_seredou_ZE_89.tif"

In [0]:
######################### Fonctions auxiliaires #########################

def divide_test_in_squares(size, img):
    return [img[:,int(i % img.shape[0])*size:((int(i % img.shape[0])+1)*size), int(i // img.shape[0])*size:(int(i // img.shape[0])+1)*size] for i in range(int(img.shape[0] * img.shape[1] // size**2))]

def divide_image_in_squares(size, img):
    return [img[int(i % img.shape[0])*size:((int(i % img.shape[0])+1)*size), int(i // img.shape[0])*size:(int(i // img.shape[0])+1)*size] for i in range(int(img.shape[0] * img.shape[1] // size**2))]

def get_window(img, center, size):
    
    window = torch.tensor([[0. for i in range(size)] for j in range(size)])
    if center[0] + size // 2 < img.shape[0]:
        i_begin = max(0,int(center[0]-size//2))
        i_end = i_begin + size
    else:
        i_end = img.shape[0]
        i_begin = i_end - size
    if center[1] + size // 2 < img.shape[1]:
        j_begin = max(0,int(center[1]-size//2))
        j_end = j_begin + size
    else:
        j_end = img.shape[1]
        j_begin = j_end - size

    for i in range(i_begin, i_end):
        for j in range(j_begin, j_end):
            window[i - i_begin,j - j_begin] = float(img[i,j])
    return window

In [0]:
######################### Base de données #########################

class OurDataset(Dataset):
    """This dataset includes .... """
    
    def __init__(self, path, Test, MSI, Truth, size, percentage, transform=None):
        """
        Args:
            path: (str) path to the images directory.
        """
        #get data
        ds1, data_MSI = raster.read(path + MSI, bands='all')
        ds2, data_test = raster.read(path + Test, bands='all')
        ds3, data_truth = raster.read(path + Truth, bands='all')

        data_MSI[data_MSI < 0] = 0
        data_test[data_test < 0] = 0
        data_truth[data_truth < 0] = 0

        self.MSI = divide_image_in_squares(size, data_MSI)
        self.Test = divide_test_in_squares(size, data_test)
        self.Truth = divide_image_in_squares(size, data_truth)
  
        self.size = size
        self.mode = "train"
        self.id_train, self.id_test = train_test_split([i for i in range(len(self.Test))], test_size = percentage, random_state=42, shuffle=True)
        
    def __len__(self):
        if self.mode == "train":
            return len(self.id_train)
        else:
            return len(self.id_test)

    def __getitem__(self, idx):
        """
        Args:
            idx: (int) the index of the subject/session whom data is loaded.
        Returns:
            sample: (dict) corresponding data described by the following keys:
                scan: 11 channels value
                mask: true value
        """

        feature_data = torch.tensor([[[0. for i in range(11*5)] for j in range(self.Test[idx].shape[0])] for k in range(self.Test[idx].shape[1])])
        feature_truth = np.array([[0 for j in range(self.Test[idx].shape[0])] for k in range(self.Test[idx].shape[1])])
        
        #On importe les données
        for i in range(self.Test[idx].shape[0]):
            for j in range(self.Test[idx].shape[1]):
                
                feature_truth[i,j] = self.Truth[idx][i,j]

                msi = get_window(self.MSI[idx], (i,j), self.size // 2)
                msi = msi/msi.max()
                feature_data[i,j,50] = float(self.MSI[idx][i,j])
                feature_data[i,j,51] = msi.max()
                feature_data[i,j,52] = msi.min()
                feature_data[i,j,53] = msi.mean()
                feature_data[i,j,54] = msi.std()
                
                for k in range (10):
                    test = get_window(self.Test[idx][k,:,:], (i,j), self.size // 2)
                    test = test/test.max()
                    
                    feature_data[i,j,k*5] = float(self.Test[idx][k,i,j])
                    feature_data[i,j,k*5 + 1] = test.max()
                    feature_data[i,j,k*5 + 2] = test.min()
                    feature_data[i,j,k*5 + 3] = test.mean()
                    feature_data[i,j,k*5 + 4] = test.std()
        
        sample = {'data': feature_data, 'mask': feature_truth}

        return sample

    def train(self):
        """Put all the transforms of the dataset in training mode"""
        self.transform.train()

    def set_mode(self, mode):
        """Change mode of the database"""
        self.mode = mode

    def eval(self):
        """Put all the transforms of the dataset in evaluation mode"""
        self.transform.eval()

In [0]:
def train(model, train_loader, criterion, optimizer, n_epochs):
    """
    Method used to train a nn
    
    Args:
        model: (nn.Module) the neural network
        train_loader: (DataLoader) a DataLoader wrapping the dataset
        criterion: (nn.Module) a method to compute the loss of a mini-batch of images
        optimizer: (torch.optim) an optimization algorithm
        n_epochs: (int) number of epochs performed during training

    Returns:
        best_model: (nn.Module) the trained neural network
    """
    best_model = deepcopy(model)
    train_best_loss = np.inf

    batch_size = train_loader.batch_size
    n = 10

    n_batches = n//batch_size

    for epoch in range(n_epochs):
        model.train()
        total_loss = 0
        for i, data in enumerate(train_loader):
            images, mask = data['data'], data['mask']
            scans = torch.flatten(mask)
            outputs = torch.reshape(model(images), (scans.shape[0],4))
            loss = criterion(outputs, scans)
            loss.backward()
            total_loss += loss.item()
            optimizer.step()
            optimizer.zero_grad()

        mean_loss = total_loss / len(train_loader.dataset)
        print('Epoch %i: loss = %f & accuracy = %f' % (epoch, mean_loss,torch.sum(torch.argmax(outputs,1) == scans)/scans.shape[0]))

        if mean_loss < train_best_loss:
            best_model = deepcopy(model)
            train_best_loss = mean_loss
    
    return best_model

def test(model, data_loader, criterion):
    """
    Method used to test a CNN
    
    Args:
        model: (nn.Module) the neural network
        data_loader: (DataLoader) a DataLoader wrapping a MRIDataset
        criterion: (nn.Module) a method to compute the loss of a mini-batch of images
    
    Returns:
        results_df: (DataFrame) the label predicted for on the slice level.
        results_metrics: (dict) a set of metrics
    """
    model.eval()
    columns = ["participant_id", "slice_id", "proba0", "proba1",
               "true_label", "predicted_label"]
    results_df = pd.DataFrame(columns=columns)
    total_loss = 0

    batch_size = data_loader.batch_size
    n = 10000

    n_batches = n//batch_size
    
    with torch.no_grad():
        for i, data in enumerate(data_loader, 0):
             images, mask = data['data'], data['mask']
             scans = torch.flatten(mask)
             outputs = torch.reshape(model(images), (scans.shape[0],4))
             loss = criterion(outputs, scans)
             total_loss += loss.item()
             print("Batch {}/{} loss : {}".format(i+1, n_batches, str(total_loss/((i+1)*batch_size))))
       #     probs = nn.Softmax(dim=1)(outputs)
        #    _, predicted = torch.max(outputs.data, 1)
             print("Accuracy : {}".format(str(int(torch.sum(torch.argmax(outputs,1) == scans))/scans.shape[0])))

    print("Final loss : {}".format(str(total_loss/ len(data_loader.dataset))))

In [0]:
########################## Modèle ############################

class Model(nn.Module):

    def __init__(self):
        super(Model, self).__init__()
        self.lin1 = nn.Linear(55, 200)
        self.lin2 = nn.Linear(200, 200)
        self.lin3 = nn.Linear(200, 200)
        self.lin4 = nn.Linear(200, 4)
        self.tanh = nn.Tanh()
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.lin1(x)
        x = self.tanh(x)
        x = self.lin2(x)
        x = self.tanh(x)
        x = self.lin3(x)
        x = self.tanh(x)
        x = self.lin4(x)
        return self.relu(x)

In [0]:
########################## Prétraitement ############################

batch_size = 100

database = OurDataset(path, Test, MSI, Truth, 10, 0.2)
dataloader_train = DataLoader(database, batch_size=batch_size, drop_last=True)

model = Model()
criterion = nn.MultiMarginLoss(p=2)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
n_epochs = 5

In [0]:
#################### Entrainement du modèle ##########################

train(model, dataloader_train, criterion, optimizer, n_epochs)
torch.save(model.state_dict(), "model")

In [0]:
############################### Statistiques ################################

#Predict for test data 

database.set_mode("test")
dataloader_val = DataLoader(database, batch_size=batch_size, drop_last=True)

results_metrics = test(model, dataloader_val, criterion)