In [3]:
import torch
import torch.nn as nn
from collections import OrderedDict
from pathlib import Path
import numpy as np
from pydicom import dcmread
import matplotlib.pyplot as plt
from pathlib import Path
from pydicom import dcmread
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from scipy.ndimage import gaussian_filter
from scipy.interpolate import splrep, splev
from scipy.interpolate import CubicSpline
from scipy.interpolate import interp1d, RegularGridInterpolator
import scipy.optimize as optimize

In [4]:
def conv(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1, bias=True, mode='CBR', negative_slope=0.2):
    L = []
    for t in mode:
        if t == 'C':
            L.append(nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding, bias=bias))
        elif t == 'T':
            L.append(nn.ConvTranspose2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding, bias=bias))
        elif t == 'B':
            L.append(nn.BatchNorm2d(out_channels, momentum=0.9, eps=1e-04, affine=True))
        elif t == 'I':
            L.append(nn.InstanceNorm2d(out_channels, affine=True))
        elif t == 'R':
            L.append(nn.ReLU(inplace=True))
        elif t == 'r':
            L.append(nn.ReLU(inplace=False))
        elif t == 'L':
            L.append(nn.LeakyReLU(negative_slope=negative_slope, inplace=True))
        elif t == 'l':
            L.append(nn.LeakyReLU(negative_slope=negative_slope, inplace=False))
        elif t == '2':
            L.append(nn.PixelShuffle(upscale_factor=2))
        elif t == '3':
            L.append(nn.PixelShuffle(upscale_factor=3))
        elif t == '4':
            L.append(nn.PixelShuffle(upscale_factor=4))
        elif t == 'U':
            L.append(nn.Upsample(scale_factor=2, mode='nearest'))
        elif t == 'u':
            L.append(nn.Upsample(scale_factor=3, mode='nearest'))
        elif t == 'v':
            L.append(nn.Upsample(scale_factor=4, mode='nearest'))
        elif t == 'M':
            L.append(nn.MaxPool2d(kernel_size=kernel_size, stride=stride, padding=0))
        elif t == 'A':
            L.append(nn.AvgPool2d(kernel_size=kernel_size, stride=stride, padding=0))
        else:
            raise NotImplementedError('Undefined type: '.format(t))
    return sequential(*L)



In [5]:
def sequential(*args):
    """Advanced nn.Sequential.

    Args:
        nn.Sequential, nn.Module

    Returns:
        nn.Sequential
    """
    if len(args) == 1:
        if isinstance(args[0], OrderedDict):
            raise NotImplementedError('sequential does not support OrderedDict input.')
        return args[0]  # No sequential is needed.
    modules = []
    for module in args:
        if isinstance(module, nn.Sequential):
            for submodule in module.children():
                modules.append(submodule)
        elif isinstance(module, nn.Module):
            modules.append(module)
    return nn.Sequential(*modules)


In [6]:
def describe_model(model):
    if isinstance(model, torch.nn.DataParallel):
        model = model.module
    msg = '\n'
    msg += 'models name: {}'.format(model.__class__.__name__) + '\n'
    msg += 'Params number: {}'.format(sum(map(lambda x: x.numel(), model.parameters()))) + '\n'
    msg += 'Net structure:\n{}'.format(str(model)) + '\n'
    return msg


In [7]:
class DnCNN(nn.Module):
    def __init__(self, in_nc=1, out_nc=1, nc=64, nb=17, act_mode='R'):
        """
        # ------------------------------------
        in_nc: channel number of input
        out_nc: channel number of output
        nc: channel number
        nb: total number of conv layers
        act_mode: batch norm + activation function; 'BR' means BN+ReLU.
        # ------------------------------------
        Batch normalization and residual learning are
        beneficial to Gaussian denoising (especially
        for a single noise level).
        The residual of a noisy image corrupted by additive white
        Gaussian noise (AWGN) follows a constant
        Gaussian distribution which stablizes batch
        normalization during training.
        # ------------------------------------
        """
        super(DnCNN, self).__init__()
        assert 'R' in act_mode or 'L' in act_mode, 'Examples of activation function: R, L, BR, BL, IR, IL'
        bias = True
        m_head = conv(in_nc, nc, mode='C'+act_mode[-1], bias=bias)
        m_body = [conv(nc, nc, mode='C'+act_mode, bias=bias) for _ in range(nb-2)]
        m_tail = conv(nc, out_nc, mode='C', bias=bias)

        self.model = sequential(m_head, *m_body, m_tail)

    def forward(self, x):
        n = self.model(x)
        return x-n


In [8]:
model = DnCNN()
print(describe_model(model))



models name: DnCNN
Params number: 555137
Net structure:
DnCNN(
  (model): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): ReLU(inplace=True)
    (6): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU(inplace=True)
    (8): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): ReLU(inplace=True)
    (10): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (17)

In [9]:
torch.load("dncnn_25.pth")

FileNotFoundError: [Errno 2] No such file or directory: 'dncnn_25.pth'

In [10]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.load_state_dict(torch.load("dncnn_25.pth"), strict=True)
model.eval()
for k, v in model.named_parameters():
    v.requires_grad = False
model = model.to(device)
print('Model path: {:s}'.format("dncnn_25.pth"))
number_parameters = sum(map(lambda x: x.numel(), model.parameters()))
print('Params number: {}'.format(number_parameters))


FileNotFoundError: [Errno 2] No such file or directory: 'dncnn_25.pth'

In [11]:
 model

DnCNN(
  (model): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): ReLU(inplace=True)
    (6): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU(inplace=True)
    (8): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): ReLU(inplace=True)
    (10): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (17): ReLU(inplace=True)
    (18): Conv2d(64, 64, kernel_size

In [12]:
cardio_path = Path().resolve().parents[2] / "dane" / "KARDIO ZAMKNIETE"
data_path = cardio_path / "A001" / "DICOM" / "P1" / "E1" / "S1" / "l199"

In [15]:
def convert_sequence_to_array(data_path: Path):
    return np.stack([np.flip(dcmread(file).pixel_array) for file in data_path.iterdir()], axis=0)

In [17]:
stacked = convert_sequence_to_array(Path()/"l199")

FileNotFoundError: [WinError 3] The system cannot find the path specified: 'l199'

In [None]:
image = stacked[200]

In [None]:
def calculate_NPS_2D(ROI_array, pixel_size_x, pixel_size_y):
    NPS_array = []
    for roi in ROI_array:
        rows, cols = roi.shape
        x = np.arange(cols)
        y = np.arange(rows)
        x, y = np.meshgrid(x, y)
        z = np.polyfit(x.ravel(), y.ravel(), 2)
        poly_fit = np.polyval(z, x)
        # Subtract the polynomial fit from the image
        roi = roi - poly_fit
        # calculation od DFT 2D
        dft = np.fft.fft2(roi - np.mean(roi))
        # shift of FT
        shifted_dft = np.fft.fftshift(dft)
        # calcation of absolute value
        NPS_array.append(np.abs(shifted_dft)**2)
    N = len(NPS_array)
    Ly = NPS_array[0].shape[0]
    Lx = NPS_array[0].shape[1]
    return (1/N)*(1/(Lx*Ly))*(np.sum(NPS_array,axis=0)*pixel_size_x*pixel_size_y)

In [None]:
pixel_size = 0.402

In [None]:
def select_ROI_array_ractangle(image, y0, x0, size, num, plot=False):
    r = np.sqrt((y0-256)**2+(x0-256)**2)
    ROI_array = []
    if plot:
        plt.pcolormesh(image)
        plt.colorbar();
        plt.title("Selected ROIs")
    for i in range(num):
        y = y0 + size*(i%3)
        if(i%3==0):
            x = x0 + size*(i//3)
        if plot:
            plt.gca().add_patch(Rectangle((x-size//2,y-size//2),size,size,
                        edgecolor='red',
                        facecolor='none',
                        lw=1))
        ROI_array.append(image[y-size//2:y+size//2,x-size//2:x+size//2])
    return ROI_array

In [None]:
def calculate_sigma(image, pixel_size):
    ROI_array = select_ROI_array_ractangle(image=image, y0=238, x0=432, size=16, num=9)
    NPS = calculate_NPS_2D(ROI_array, pixel_size, pixel_size)
    dx = 1
    dy = 1
    return np.sqrt(np.trapz(np.trapz(NPS, dx=dx, axis=1), dx=dy, axis=0))

In [None]:
plt.pcolormesh(image)
plt.title(f"Input image \n Standard noise {np.std(image):.2f} \n Noise calculated with NPS: {calculate_sigma(image, pixel_size):.2f}")
plt.colorbar();

In [None]:
def np_to_torch(np_array):
    """
    Convert numpy array to torch tensor
    """
    return torch.from_numpy(np_array).float()

In [None]:
 np_to_torch(img_noise).reshape(1, 1, 100, 100)

In [None]:
def single2tensor4(img):
    return torch.from_numpy(img).float().unsqueeze(0)

In [None]:
def uint2single(img):
    return np.float32(img/255.)

In [None]:
img_denoised = model( single2tensor4(image))

In [None]:
def torch_to_np(torch_array):
    """
     Convert torch tensor to numpy array
     """
    return np.squeeze(torch_array.detach().cpu().numpy())

In [None]:
 torch_to_np(img_denoised)

In [None]:
plt.pcolormesh( torch_to_np(img_denoised))
plt.title(f"Input image \n Standard noise {np.std(torch_to_np(img_denoised)):.2f} \n Noise calculated with NPS: {calculate_sigma(torch_to_np(img_denoised), pixel_size):.2f}")
plt.colorbar();

In [None]:
def calculate_NPS_1D_test(NPS_2D, size_of_pixel_in_spatial_domain):
    cen_x = NPS_2D.shape[1]//2
    cen_y = NPS_2D.shape[1]//2

    # Find radial distances 
    [X, Y] = np.meshgrid(np.arange(NPS_2D.shape[1])-cen_x, np.arange(NPS_2D.shape[1])-cen_y)
    R = np.sqrt(np.square(X)+np.square(Y))

    rad = np.arange(0, np.max(R), 1)
    intensity = np.zeros(len(rad))
    index = 0
    bin_size = 1

    for i in rad:
        mask = (np.greater(R, i - bin_size) & np.less(R, i + bin_size))
        rad_values = NPS_2D[mask]
        intensity[index] = np.mean(rad_values)
        index += 1
        # Plot data
    x = rad* 1.0 / (size_of_pixel_in_spatial_domain* NPS_2D.shape[1])
    y = intensity
    return x, y

In [None]:
def make_plot(x_points, y_points, title, legend, min_x=0, max_x =1.0, num=64):
    x_interpolate = np.linspace(min_x , max_x, num)
    cubic_spline = CubicSpline(x_points, y_points)
    plt.plot(x_points, y_points,'.')
    plt.plot(x_interpolate, cubic_spline(x_interpolate),color='blue',label=legend)
    plt.title(title)
    plt.legend()
    plt.xlabel("$f_{r} [mm^{-1}]$")
    plt.xlim(min_x, max_x);
    plt.grid(which="minor", alpha=0.3)
    plt.grid(which="major", alpha=0.7)

In [None]:
ROI_array_rectangle =select_ROI_array_ractangle(image=np.flipud(image[30:130, 250:350]), y0=60, x0=60, size=8, num=9, plot=True)
plt.show()


NPS_2D_rectangle = calculate_NPS_2D(ROI_array_rectangle, pixel_size, pixel_size)
rad_1, intensity_1 = calculate_NPS_1D_test(NPS_2D_rectangle,pixel_size)
make_plot(rad_1, intensity_1, title="CT Scan - rectangle ROI of orignal image", legend = " NPS 1D", min_x=0, max_x =1.0, num=64)

In [None]:
ROI_array_rectangle =select_ROI_array_ractangle(image=np.flipud(torch_to_np(img_denoised)[30:130, 250:350]), y0=60, x0=60, size=8, num=9, plot=True)
plt.show()


NPS_2D_rectangle = calculate_NPS_2D(ROI_array_rectangle, pixel_size, pixel_size)
rad_1, intensity_1 = calculate_NPS_1D_test(NPS_2D_rectangle,pixel_size)
make_plot(rad_1, intensity_1, title="CT Scan - rectangle ROI of orignal image", legend = " NPS 1D", min_x=0, max_x =1.0, num=64)

In [None]:
plt.pcolormesh(image-torch_to_np(img_denoised))

In [18]:
path = (
    Path().resolve().parents[2]
    / "dane"
    / "KARDIO ZAMKNIETE"
    / "A001"
    / "DICOM"
    / "P1"
    / "E1"
    / "S1"
)
images = get_data(path)
model = PDnCNN()
model.load_state_dict(torch.load(r"C:\Users\marci\Desktop\ICM\MGR\programy\Noise-CT-Scans\train\dncnn_25.pth"), strict=True)
model.eval()
for k, v in model.named_parameters():
    v.requires_grad = False

y = model(torch.from_numpy(images[200]).float().unsqueeze(0))

make_dot(y.mean(), params=dict(model.named_parameters()),  show_attrs=True, show_saved=True)
plt.show()

NameError: name 'get_data' is not defined