In [1]:
import os
import numpy as np
import torch
import glob
import torch.nn as nn
import torchvision
from torchvision.transforms import transforms
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.autograd import Variable
from torchvision import datasets

import pathlib
import matplotlib.pyplot as plt
import torch.nn.functional as F

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
def activation_layer(activation:str):

    if activation == 'leaky':
        return nn.LeakyReLU(negative_slope=0.1)


In [4]:
def normalization_layer(normalization: str,
                      num_channels: int, dim:int):
    if dim == 2:
        if normalization == 'BN':
            return nn.BatchNorm2d(num_channels)
    elif dim == 3:
        if normalization == 'BN':
            return nn.BatchNorm3d(num_channels)


In [5]:
def pooling_layer(pooling:str, dim:int):
    if dim == 2:
        if pooling == "max":
            return nn.MaxPool2d(kernel_size=2,stride=2,padding=0)
        #if pooling == 'stride':
           # return nn.Conv2d()
    if dim == 3:
        if pooling == "max":
            return nn.MaxPool3d(kernel_size=2,stride=2,padding=0)


In [6]:
def conv_layer(in_chs, out_chs, kernel_size, stride, padding, dim):
    if dim == 2:
        return nn.Conv2d(in_chs, out_chs, kernel_size, stride, padding)
    elif dim == 3:
        return nn.Conv3d(in_chs, out_chs, kernel_size, stride, padding)

In [7]:
def up_sample_layer(up_sample,in_chs = None, out_chs = None, kernel_size = 2, stride = 2, dim = 3):
    if up_sample == 'transposed':
        if dim == 2:
            return nn.ConvTranspose2d(in_chs, out_chs, kernel_size,stride)
        elif dim == 3:
            return nn.ConvTranspose3d(in_chs, out_chs, kernel_size,stride)
    else:
        return nn.Upsample(scale_factor=2, mode=up_sample)

In [8]:
def Cat(tensor1, tensor2):
    
    x = torch.cat((tensor1, tensor2), 1)

    return x

In [9]:
def Add (tensor1, tensor2):
    
    x = torch.add(tensor1, tensor2)
    
    return x

In [10]:
class DownBlock1(nn.Module):
    """
    left part of the U shape.
    the repeated application of two 3 × 3 × 3 3D convolution layers, 
    each followed by a batch normalization (BN) and a leaky rectified
    """

    def __init__(self,
                 chs=[1,16,32,64,128],               
                 pooling: str = "max",
                 kernel_size: int = 3,
                 stride:int = 1,
                 padding: int = 1,
                 activation: str = 'leaky',
                 normalization: str = 'BN',
                 dim: int = 2):
        super().__init__()

        self.chs = chs
        #self.out_channels = chs[::-1]
        self.pooling = pooling
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.activation = activation
        self.normalization = normalization
        self.dim = dim
        
        self.activation_layer = activation_layer(self.activation)
        #self.normalization_layer = normalization_layer(normalization=self.normalization, num_channels=out_chs,
                                           #dim=self.dim)       
        self.pool = pooling_layer(pooling = self.pooling, dim=self.dim)
        
        #self.tensor_to_cat = nn.ModuleList()
        
        self.num_level = len(self.chs)-1



    def forward(self, x):
        #print(x.shape)
        
        for i in range(self.num_level-1):
            
            in_chs = self.chs[i]
            out_chs = self.chs[i+1]
            norm_layer = normalization_layer(normalization=self.normalization, num_channels=out_chs,
                                           dim=self.dim)
            conv_layer1 = conv_layer(in_chs, out_chs, kernel_size = self.kernel_size, stride = self.stride, padding = self.padding, 
                                          dim = self.dim)
            conv_layer2 = conv_layer(out_chs, out_chs, kernel_size = self.kernel_size, stride = self.stride, padding = self.padding, 
                                          dim = self.dim)
        
            x = conv_layer1(x)
            x = norm_layer(x)
            x = self.activation_layer(x)

            x = conv_layer2(x)
            x = norm_layer(x)
            x = self.activation_layer(x)

            
            #x = self.pool(y)
            #self.tensor_to_cat.append(x)
            stride_layer = conv_layer(self.chs[i+1], self.chs[i+1], kernel_size = self.kernel_size, stride = 2, padding = self.padding, 
                                          dim = self.dim)
            x = stride_layer(x)

            x = norm_layer(x)
            x = self.activation_layer(x)
            #print(x.shape)


        #last level
        conv_layer_end1 = conv_layer(self.chs[-2], self.chs[-1], kernel_size = self.kernel_size, stride = self.stride, padding = self.padding, 
                                          dim = self.dim)
        conv_layer_end2 = conv_layer(self.chs[-1], self.chs[-1], kernel_size = self.kernel_size, stride = self.stride, padding = self.padding, 
                                          dim = self.dim)
        norm_layer_end = normalization_layer(normalization=self.normalization, num_channels=self.chs[-1],
                                           dim=self.dim)
        x = conv_layer_end1(x)
        x = norm_layer_end(x)
        x = self.activation_layer(x)
 
        x = conv_layer_end2(x)
        x = norm_layer_end(x)
        x = self.activation_layer(x)
            
        

        return x


In [12]:
paras = list(model.parameters())
for num,para in enumerate(paras):
    print('number:',num)
    print(para)
    print('_____________________________')

In [14]:
paras = list(model.parameters())

In [15]:
paras

[]

In [13]:
x = torch.randn(1,1,128,128,32)
model = DownBlock1(chs=[1,16,32,64,128],
                 #concatenate = True,
                 pooling = "max",
                 kernel_size = 3,
                 stride = 1,
                 padding = 1,
                 activation = 'leaky',
                 normalization = 'BN',
                 dim= 3)

model(x)


tensor([[[[[-1.3660e-03,  1.3096e-01,  1.2187e+00, -2.1254e-02],
           [-2.7243e-02,  2.5002e-01,  6.8810e-01,  6.5612e-01],
           [-6.4995e-03,  9.8996e-01,  3.9638e-01,  3.0948e-01],
           ...,
           [ 3.5684e-01,  6.9186e-01,  6.5880e-01,  8.1883e-01],
           [ 4.8672e-01,  6.7564e-01,  3.2251e-02,  3.6761e-01],
           [ 5.3180e-01,  6.2143e-01,  7.1048e-01,  2.3860e-01]],

          [[-6.4301e-02,  8.1877e-01,  3.3855e-01,  3.6440e-01],
           [ 9.1067e-01, -3.9725e-02,  1.0892e-01, -2.9553e-03],
           [-9.6898e-02,  9.0274e-01, -3.4818e-02, -1.4273e-02],
           ...,
           [-4.4403e-02,  2.1027e+00, -3.3676e-03,  1.5247e-01],
           [-5.0209e-02,  2.3212e+00, -8.0926e-02, -2.1190e-02],
           [ 3.3808e-01, -4.7242e-02, -1.9040e-03, -4.4178e-03]],

          [[-4.4923e-02, -1.7795e-02, -1.3895e-01, -7.5760e-02],
           [-6.0071e-02,  7.1465e-01,  1.4050e+00,  8.9919e-01],
           [ 1.4053e-02, -1.3159e-01,  9.9878e-01, -5.

In [None]:
optim = Adam(model.parameters(), lr=1e-3)

another try

In [None]:
class Net(nn.Module):  # 继承torch的module
    def __init__(self, n_feature, n_hidden, n_output):
        super(Net, self).__init__()  # 继承__init__功能
 # 输出层线性输出
        self.n_feature = n_feature
        self.n_hidden = n_hidden
        self.n_output = n_output
    def forward(self, x):
        # 激励函数（隐藏层的线性值）
        for i in range(3):
                    # 定义每一层用什么样的样式
            self.hidden1 = torch.nn.Linear(self.n_feature, self.n_hidden)  # 隐藏层线性输出
            self.hidden2 = torch.nn.Linear(self.n_hidden, self.n_hidden)  # 隐藏层线性输出
            self.predict = torch.nn.Linear(self.n_hidden, self.n_output) 
            x = torch.relu(self.hidden1(x))
            x = torch.relu(self.hidden2(x))
        x = self.predict(x)  # 输出值
        return x


In [18]:
net = Net(2, 5, 3)
paras = list(net.parameters())
for num,para in enumerate(paras):
    print('number:',num)
    print(para)
    print('_____________________________')

In [19]:
net(x)

RuntimeError: mat1 and mat2 shapes cannot be multiplied (16384x32 and 2x5)

In [16]:
optim = Adam(model.parameters(), lr=1e-3)

ValueError: optimizer got an empty parameter list

In [None]:
model.parameters()

In [None]:
y.shape

In [None]:
def crop(down_level, up_level):
    """
    the input down_level is each level's last tensor on the left (down) side. e.g. [1,1,256,256,64]; up_level is the first
    tensor on the right (up) side.
    Center-crops the encoder_layer to the size of the decoder_layer,
    so can catanate encoder layer to decoder layer
    This is only necessary for input sizes != 2**n for 'same' padding and always required for 'valid' padding.
    """
    if down_level.shape[2:] != up_level.shape[2:]:
        down_shape = down_level.shape[2:]
        up_shape = up_level.shape[2:]
#down_shape should bigger than up_shape
        if down_level.dim() == 4:  # 2D
            down_level = encoder_layer[
                            :,
                            :,
                            ((down_shape[0] - up_shape[0]) // 2):((down_shape[0] + up_shape[0]) // 2),
                            ((down_shape[1] - up_shape[1]) // 2):((down_shape[1] + up_shape[1]) // 2)
                            ]
        elif down_level.dim() == 5:  # 3D
            down_level = down_level[
                            :,
                            :,
                            ((down_shape[0] - up_shape[0]) // 2):((down_shape[0] + up_shape[0]) // 2),
                            ((down_shape[1] - up_shape[1]) // 2):((down_shape[1] + up_shape[1]) // 2),
                            ((down_shape[2] - up_shape[2]) // 2):((down_shape[2] + up_shape[2]) // 2),
                            ]
    return down_level, up_level


catanate -> add

In [None]:
class UpBlock(nn.Module):
    """

    """

    def __init__(self,
                 chs = [1,16,32,64,128],
                 concatenate:bool = False,
                 add : bool = False,
                 Crop:bool=True,
                 kernel_size = 3,
                 stride = 1,
                 padding = 1,
                 activation: str = 'leaky',
                 normalization: str = "BN",
                 dim: int = 3,
                 up_sample: str = 'nearest'
                 ):
        super().__init__()

        self.chs = chs[::-1]
        self.concatenate = concatenate
        self.add = add
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.activation = activation
        self.normalization = normalization
        self.dim = dim
        self.up_sample = up_sample
        self.Crop = Crop
        

    
        self.activation_layer = activation_layer(self.activation)
        #self.normalization_layer = normalization_layer(normalization=self.normalization, num_channels=out_chs,
                                           #dim=self.dim)       
        self.up_sample_layer = up_sample_layer(up_sample = self.up_sample)
        
        
        self.num_level = len(self.chs)-1


            
    def forward(self, tensor_to_cat, x_from_down):
        """ Forward pass
        Arguments:
            encoder_layer: Tensor from the encoder pathway
            decoder_layer: Tensor from the decoder pathway (to be up'd)
        """
        x = x_from_down
        
        if self.up_sample != 'transposed':
            
            for i in range(self.num_level-1):
                in_chs = self.chs[i]
                out_chs = self.chs[i+1]    
                
                conv_layer0 = conv_layer(in_chs, in_chs//2, kernel_size = 3, stride = 1, padding = 1, #to half the channels when up sampling
                                          dim = self.dim)
                
                x = self.up_sample_layer(x)  # double the image size
                x = conv_layer0(x) #half the channels
                
                if self.Crop:
                    cropped_tensor, up_tensor = crop(tensor_to_cat[i], x)  # cropping
                    if self.cat:
                        x = Cat(cropped_tensor,up_tensor)
                    elif self.add:
                        x = Add(cropped_tensor,up_tensor)
                    
                else:
                    if self.cat:
                        x = Cat(tensor_to_cat[-i-1],x)
                    elif self.add:
                        x = Add(tensor_to_cat[-i-1],x)
                    
            #conv-BN-ACTIVATION
                conv_layer1 = conv_layer(in_chs, out_chs, kernel_size = self.kernel_size, stride = self.stride, padding = self.padding, 
                                          dim = self.dim)
                conv_layer2 = conv_layer(out_chs, out_chs, kernel_size = self.kernel_size, stride = self.stride, padding = self.padding, 
                                          dim = self.dim)
                norm_layer = normalization_layer(normalization=self.normalization, num_channels=out_chs,
                                           dim=self.dim)
                x = conv_layer1(x)
                x = norm_layer(x)
                x = self.activation_layer(x)
                x = conv_layer2(x)
                x = norm_layer(x)
                x = self.activation_layer(x)
                
            conv_layer_end = conv_layer(self.chs[-2], self.chs[-1], kernel_size = self.kernel_size, stride = self.stride, padding = self.padding, 
                                          dim = self.dim)
            x = conv_layer_end(x)

        return x


In [None]:
model1 = UpBlock(chs = [1,16,32,64,128],
                 concatenate= True,
                 Crop=False,
                 kernel_size = 3,
                 stride = 1,
                 padding = 1,
                 activation= 'leaky',
                 normalization = "BN",
                 dim = 3,
                 up_sample= 'nearest')

In [None]:
z=model1(ts,y)

In [None]:
z.shape

In [None]:
class UNet(nn.Module):
    def __init__(self,
                 chs = [1,16,32,64,128],
                 concatenate:bool = False,
                 add:bool = False,
                 Crop:bool=True,
                 pooling = "max",
                 kernel_size = 3,
                 stride = 1,
                 padding = 1,
                 activation: str = 'leaky',
                 normalization: str = "BN",
                 dim: int = 3,
                 up_sample: str = 'nearest'
                 ):
        super().__init__()

        self.chs = chs

        self.pooling = pooling
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.activation = activation
        self.normalization = normalization
        self.dim = dim
        #self.chs = chs[::-1]
        self.concatenate = concatenate
        self.up_sample = up_sample
        self.Crop = Crop
        
        
        self.UpBlock = UpBlock(chs = self.chs,
                 concatenate= self.concatenate,
                 Crop=self.Crop,
                 kernel_size = self.kernel_size,
                 stride = self.stride,
                 padding = self.padding,
                 activation= self.activation,
                 normalization = self.normalization,
                 dim = self.dim,
                 up_sample= self.up_sample)
        
        self.DownBlock = DownBlock(chs=self.chs,
                 pooling = self.pooling,
                 kernel_size = self.kernel_size,
                 stride = self.stride,
                 padding = self.padding,
                 activation = self.activation,
                 normalization = self.normalization,
                 dim= self.dim)    
        
    def forward(self,x):
        x_from_down, tensor_to_cat = self.DownBlock(x)
        x = self.UpBlock(tensor_to_cat,x_from_down)
            
        return x
                
            

In [None]:
x1 = torch.randn(1,1,128,128,32)


In [None]:
unet = UNet(chs = [1,16,32,64,128],
                 concatenate= True,
                 Crop=False,
                 pooling = "max",
                 kernel_size = 3,
                 stride = 1,
                 padding = 1,
                 activation= 'leaky',
                 normalization = "BN",
                 dim= 3,
                 up_sample = 'nearest').to(device)

In [None]:
unet

In [None]:
result = unet(x1)

In [None]:
result.shape

start to try real image data

In [None]:
from scipy.io import loadmat

In [None]:
P = loadmat('xcat.mat')

In [None]:
p = P['data']

In [None]:
p1 = p[:,:,:,3]

In [None]:
p1 = p[:,:,256:256+32,3]

In [None]:
p1.shape

In [None]:
type(p1)

In [None]:
P1= torch.from_numpy(p1)

In [None]:
P1.shape

In [None]:
P1 = P1.unsqueeze(0)
P1 = P1.unsqueeze(0)

In [None]:
P1.shape

In [None]:
z = unet(P1)

In [None]:
z.shape

In [None]:
output= z[0,0,:,:,0]
output = output.detach().numpy()
plt.imshow(output)

# add noise

The function torch.randn produces a tensor with elements drawn from a Gaussian distribution of zero mean and unit variance. Multiply by sqrt(0.1) to have the desired variance.

x = torch.zeros(5, 10, 20, dtype=torch.float64)
x = x + (0.1**0.5)*torch.randn(5, 10, 20)

In [None]:
P1 = P1[0,0,:,:,0]
poisson_noise = torch.poisson(P1)*0.1

In [None]:
input1 = P1+poisson_noise

In [None]:
input1

In [None]:
parameters1 = list(unet.parameters())

In [None]:

optimizer = Adam(model.parameters(), lr=1e-3)
# mean-squared error loss
criterion = nn.MSELoss()
#criterion = nn.CrossEntropyLoss()
#criterion = nn.BCELoss()
if torch.cuda.is_available():
    unet = unet.cuda()
    criterion = criterion.cuda()

In [None]:
def train(epoch):
    unet.train()
    train_accuracy = 0.0
    train_loss = 0.0
    

    if torch.cuda.is_available():
            #images = Variable(images.cuda())            
        input1 = input1.to(device)
            #labels = Variable(labels.cuda())
            
    optimizer.zero_grad() 
    
    output = unet(input1) #give us prediction
        #print(outputs.shape)
        #outputs = torch.argmax(outputs, dim=1)
    label = P1
    loss = criterion(label,output)
    loss.backward()
    optimizer.step()
        
    train_loss += loss.cpu().data*input1.size(0)
        
    print('Epoch: {} \tTraining Loss: {:.6f}'.format(epoch, train_loss))
    
        #_,predicted = torch.max(outputs.data,1)
        
        #train_accuracy += int(torch.sum(prediction == labels.data))
    
   # train_accuracy = train_accuracy / len(train)
   # train_loss = train_loss / len(train)
    
 

In [None]:
num_epochs = 1
train_loss = 0
for epoch in range(num_epochs):
    train(epoch)

training error and test error plot