# Method - 1 Supervised Model Development

In [24]:
# Libraries
from utils import read_data, rRMSE_per_case #get_batch, fit_biExponential_model_signal
import numpy as np
import matplotlib.pyplot as plt
import os
from torch import nn

In [22]:
# Addresses
# Directory holding the training cases and directory holding saved results.
file_dir = '../public_training_data/'
file_Resultdir = '../Result/'

# File-name suffixes handed to read_data() together with file_dir and a
# case number (presumably one .npy file per case and per quantity --
# TODO confirm against utils.read_data).
fname_gt = '_IVIMParam.npy'
fname_gtDWI = '_gtDWIs.npy'
fname_tissue = '_TissueType.npy'
fname_noisyDWIk = '_NoisyDWIk.npy'

# b-values used by the signal model below; they are divided by 1000
# wherever the signal is evaluated.
b_values = [0, 5, 50, 100, 200, 500, 800, 1000]

In [23]:
# Save NLLS performance
# Average the two values returned by rRMSE_per_case over cases 801..1000,
# comparing the stored NLLS result of each case with its ground truth.
tumor = non_tumor = 0
ctr = 0
for image_number in range(800, 1000):
    case_id = image_number + 1  # files are numbered from 1, the loop from 0

    nlls = np.load(os.path.join(file_Resultdir, f'{(image_number + 1):04d}.npy'))
    params = read_data(file_dir, fname_gt, case_id)
    tissue = read_data(file_dir, fname_tissue, case_id)

    # Channels 0, 1, 2 of the last axis are passed in that order, first for
    # the NLLS estimate and then for the ground truth.
    estimated = [nlls[:, :, channel] for channel in range(3)]
    reference = [params[:, :, channel] for channel in range(3)]
    t, nt = rRMSE_per_case(*estimated, *reference, tissue)

    tumor = tumor + t
    non_tumor = non_tumor + nt
    ctr = ctr + 1

# Mean over all processed cases.
print(tumor/ctr, non_tumor/ctr)

0.7233413566918498 0.5567931568118685


In [None]:
class PIA(nn.Module):
    """Fully connected network that predicts one scalar (``D``) per input row.

    The model consists of two parts, both moved to ``self.device``:

    * ``encoder``: one ``Linear`` + ``LeakyReLU`` stage per entry of
      ``hidden_dims``; the first stage takes ``len(b_values)`` features.
    * ``D_predictor``: ``predictor_depth`` ``Linear`` + ``LeakyReLU`` stages
      of width ``hidden_dims[-1]`` followed by a ``Linear`` layer with a
      single output.

    ``self.device`` is ``'cuda'`` when CUDA is available, otherwise ``'cpu'``.
    """

    def __init__(self, b_values=(0, 5, 50, 100, 200, 500, 800, 1000),
                 hidden_dims=(32, 64, 128, 256, 512),
                 predictor_depth=2):
        """Build the encoder and the D predictor head.

        :param b_values: sequence of b-values; only its length is used here,
            as the number of input features of the first encoder layer.
        :param hidden_dims: non-empty sequence with the output width of each
            encoder stage; the last entry is also the width of the head.
        :param predictor_depth: number of hidden stages in the predictor head.
        """
        super().__init__()

        # Local import: only ``nn`` is guaranteed to be in scope where this
        # class is defined, so a bare ``torch.cuda`` lookup could raise
        # NameError.
        import torch

        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.number_of_signals = len(b_values)
        # Kept as an attribute; encode() does not use it.
        self.sigmoid = nn.Sigmoid()

        # Encoder: chain the stages so that each one consumes the width
        # produced by the previous one.
        modules = []
        in_channels = self.number_of_signals
        for h_dim in hidden_dims:
            modules.append(nn.Sequential(
                nn.Linear(in_features=in_channels, out_features=h_dim),
                nn.LeakyReLU()))
            in_channels = h_dim
        self.encoder = nn.Sequential(*modules).to(self.device)

        # Predictor head: constant-width hidden stages, then one output unit.
        width = hidden_dims[-1]
        D_predictor = [
            nn.Sequential(
                nn.Linear(in_features=width, out_features=width),
                nn.LeakyReLU())
            for _ in range(predictor_depth)
        ]
        D_predictor.append(nn.Linear(width, 1))
        self.D_predictor = nn.Sequential(*D_predictor).to(self.device)

    def encode(self, x):
        """Run ``x`` through the encoder and the D predictor head.

        :param x: (Tensor) input whose last dimension has
            ``self.number_of_signals`` features; it is expected to already be
            on ``self.device``.
        :return: (Tensor) output of ``D_predictor``, with a last dimension of
            size 1.
        """
        result = self.encoder(x)
        D = self.D_predictor(result)
        # Heads for D_star and f are not built yet; the intended form was:
        # D_star = 3 + self.relu(self.D_star_predictor(result))
        # f = self.sigmoid(self.f_predictor(result))
        return D  # eventually [f, D, D_star]

# D_star_predictor = []
# for _ in range(predictor_depth):
    
#     D_star_predictor.append(
#         nn.Sequential(
#             nn.Linear(in_features=hidden_dims[-1], out_features=hidden_dims[-1]),
#             nn.LeakyReLU())
#     )
# D_star_predictor.append(nn.Linear(hidden_dims[-1], 1))
# D_star_predictor = nn.Sequential(*D_star_predictor).to(device)
# f_predictor = []
# for _ in range(predictor_depth):
    
#     f_predictor.append(
#         nn.Sequential(
#             nn.Linear(in_features=hidden_dims[-1], out_features=hidden_dims[-1]),
#             nn.LeakyReLU())
#     )
# f_predictor.append(nn.Linear(hidden_dims[-1], 1))
# f_predictor = nn.Sequential(*f_predictor).to(device)




In [None]:
# Check if what you build is correct
# For the first case, print the stored clean and noisy signals of one pixel
# (both divided by the clean value at index 0) next to the signal computed
# from that pixel's ground-truth parameters.
for image_number in range(1):
    case_id = image_number + 1

    params = read_data(file_dir, fname_gt, case_id)
    tissue = read_data(file_dir, fname_tissue, case_id)
    k = read_data(file_dir, fname_noisyDWIk, case_id)
    clean = np.abs(read_data(file_dir, fname_gtDWI, case_id))

    # Magnitude of the inverse 2-D FFT of the stored array, taken over the
    # first two axes.
    noisy = np.abs(np.fft.ifft2(k, axes=(0, 1), norm='ortho'))

    # Coordinates of every pixel whose tissue label is not 1
    # (presumably 1 marks background -- TODO confirm).
    coordBody = np.argwhere(tissue != 1)

    # Only the entry at position 250 of that list is inspected.
    if coordBody.shape[0] > 250:
        ix = coordBody[250, 0]
        jx = coordBody[250, 1]

        f, D, D_star = params[ix, jx, :]
        # Scale by 1000 to match the b/1000 factor used in the exponents.
        D = D * 1000
        D_star = D_star * 1000

        reference = clean[ix, jx, 0]
        print(clean[ix, jx]/reference)
        print(noisy[ix, jx]/reference)

        # Two-exponential signal evaluated at each b-value.
        signal = np.array(
            [(1 - f)*np.exp(-(b/1000)*D) + f*np.exp(-(b/1000)*D_star)
             for b in b_values],
            dtype=float)
        print(signal)





In [None]:

from tqdm import tqdm



from model import PIA
from torch import optim
import torch

In [None]:








# Draw one batch from get_batch and keep its five return values; the later
# cells train on test_tensor and compare against f_test, D_test and clean.
# NOTE(review): get_batch is not imported in the visible code -- it only
# appears in the commented-out tail of the `from utils import ...` line --
# so this call needs that import restored (or get_batch defined) first.
# NOTE(review): the keyword is spelled `noise_sdt`; confirm it matches the
# parameter name in get_batch's signature (it may be a typo for noise_std).
test_tensor, f_test, D_test, D_star_test, clean = get_batch(10, noise_sdt=0.01)
# Unused alternative: fit the batch with the bi-exponential NLLS routine.
#test = test_tensor.detach().cpu().numpy()
#f, D, D_star = fit_biExponential_model_signal(test, b_values)

In [None]:
# Create the network with its default arguments and an Adam optimiser over
# all of its parameters.
# NOTE(review): `PIA` is whichever definition was bound last -- the class
# written in this notebook or the one pulled in by `from model import PIA`;
# confirm which one is intended before running.
model = PIA()
# NOTE: this rebinds `params`, which other cells use for the ground-truth
# parameter array returned by read_data.
params = model.parameters()
lr = 3e-4
optimizer = optim.Adam(params, lr=lr)

In [None]:
# Use the GPU when CUDA is available, otherwise stay on the CPU, and move
# the input batch to that device.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
test_tensor = test_tensor.to(device)

In [None]:
# Supervised training of the D prediction: minimise the mean squared
# difference between model.encode(...) and D_test, one batch entry at a time.
clean, f_test, D_test = (tensor.to(device) for tensor in (clean, f_test, D_test))

# NOTE(review): loss_fcn is created but never called; the loop below computes
# the mean squared error by hand.
loss_fcn = nn.MSELoss()
model.train()

for ep in range(500):
    for i in range(10):
        optimizer.zero_grad()

        D = model.encode(test_tensor[i])
        # Planned extension (not active): predict D_star and f as well and
        # rebuild the signal from all three.
        #D_star = 5 + D_star_predictor(result)
        #f = sigmoid(f_predictor(result))
        #signal = torch.zeros((D.shape[0], number_of_signals)).to(device)
        # D_T, D_star_T, f_T = D.T, D_star.T, f.T
        # for inx, b in enumerate(b_values):
        #     signal[:, inx] = (1-f_T)*torch.exp(-(b/1000)*D_T) + f_T*torch.exp(-(b/1000)*D_star_T)

        # NOTE(review): if D and D_test[i] differ in shape the subtraction
        # broadcasts -- confirm the two shapes agree.
        residual = D - D_test[i]
        loss = torch.mean(residual ** 2)
        loss.backward()
        optimizer.step()

    # Every 10th epoch, report the loss of the last entry processed.
    if ep % 10 == 0:
        print(f'{loss.item():.4f}')





In [None]:
# Re-create the model and optimiser on the first GPU when one is present.
# Fall back to the CPU otherwise, so this cell agrees with the availability
# check used in the other cells instead of crashing on CPU-only machines.
dev = "cuda:0" if torch.cuda.is_available() else "cpu"
# NOTE(review): this variable is presumably only honoured if it is set before
# CUDA is first initialised in the process -- confirm it has any effect here.
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
device = torch.device(dev)
#models =  []
if device.type == "cuda":
    # Release cached GPU memory left over from earlier cells.
    torch.cuda.empty_cache()
model = PIA(predictor_depth=2).float().to(device).train()
# NOTE: this rebinds `params`, which other cells use for the ground-truth
# parameter array returned by read_data.
params = model.parameters()
lr = 3e-4
optimizer = optim.Adam(params, lr=lr)

# Move the batch and its clean reference to the same device as the model.
test_tensor = test_tensor.to(device)
clean = clean.to(device)

In [None]:
# Train for 50 steps on the whole batch: predict (f, D, D_star), decode them
# and apply model.loss_function to the result and `clean`.
# NOTE(review): this needs a PIA whose encode() returns three values and
# which provides decode() and loss_function() -- presumably the class
# imported from `model`, not the single-output class defined in this
# notebook; confirm.
for ep in tqdm(range(50)):
    optimizer.zero_grad()
    f, D, D_star = model.encode(test_tensor)
    # Put the reconstruction on the device of its target rather than
    # hard-coding CUDA, so the loop also runs on CPU-only machines.
    recon = model.decode(f, D, D_star).to(clean.device)
    loss = model.loss_function(recon, clean)
    loss.backward()
    optimizer.step()
    # Show the predicted f next to f_test after every step.
    print(f, f_test)


In [None]:


# One scatter plot per channel: `params` on the x axis ('true') against
# `nlls` on the y axis ('predicted').
# NOTE(review): `params` must be the 3-D ground-truth array here; other cells
# rebind the same name to model.parameters() -- confirm the execution order.
fig, ax = plt.subplots(3, 1, figsize=(15, 45))
title = ['f ', 'D ', 'D_star']  # only referenced by the disabled set_title below
ylims = [(0.0, 1.0), (0.0, 0.003), (0.003, 0.3)]  # currently not applied
for c, panel in enumerate(ax):
    x = params[:, :, c].flatten()
    y = nlls[:, :, c].flatten()

    # Disabled alternative: density plot instead of a scatter plot.
    #nbins=30
    #k = gaussian_kde([x,y])
    #xi, yi = np.mgrid[x.min():x.max():nbins*1j, y.min():y.max():nbins*1j]
    #zi = k(np.vstack([xi.flatten(), yi.flatten()]))
    #ax[c].pcolormesh(xi, yi, zi.reshape(xi.shape), cmap="hot", shading='auto')
    panel.scatter(x, y, color='b', s=4, alpha=0.5)

    # Mean absolute error and Pearson correlation; only the disabled title
    # line uses them.
    err = np.mean(np.abs(x - y))
    corr = np.corrcoef(x, y)[0, 1]

    for axis in (panel.xaxis, panel.yaxis):
        axis.set_tick_params(labelsize=20)
    #ax[r,c].set_title(fr'{title[c]}, MAE = {err:.3f}, $\rho$ = {corr:.3f}')
    panel.set_xlabel('true', fontsize=24)
    panel.set_ylabel('predicted', fontsize=24)



#     
#     


    #pure_noise_re[b_value].append(noisy_real[coordBody[:,0], coordBody[:,1]] - clean_real[coordBody[:,0], coordBody[:,1]])
    #f_measurement = params[:, :, 0]
    #D_measurement = params[:, :, 1]
    #D_star_measurement = params[:, :, 2]
    #
    #f.append(f_measurement[coordBody[:,0], coordBody[:,1]])
    #D.append(D_measurement[coordBody[:,0], coordBody[:,1]])
    #D_star.append(D_star_measurement[coordBody[:,0], coordBody[:,1]])


# f = np.concatenate(f).ravel()
# a = plt.hist(f, bins=100)
# plt.title('f')

# plt.figure()
# D = np.concatenate(D).ravel()
# a = plt.hist(D, bins=100)
# plt.title('D')

# plt.figure()
# D_star = np.concatenate(D_star).ravel()
# a = plt.hist(D_star, bins=100)
# plt.title('D_star')