# Benford for 3D MRI - Training data generation

This script generates 3D magnetic resonance images (MRIs) corrupted with Rician noise from a set of noiseless MRIs. The noisy images are then transformed with several fast 3D transforms. Finally, the divergences of the observed probability distributions of the first and second digits of the transformed data with respect to the theoretical Benford probability distributions are measured using several divergence metrics for discrete probability distributions.




In [1]:
%reset

In [1]:
# Import the relevant libraries
import nibabel as nib
import numpy as np
import pickle
import os
from glob import glob 

# Import the scipy package
import scipy
from scipy import stats
from scipy import fft

In [2]:
# Add Rician noise to a 3D MRI
# Inputs:
#     img3D=The noiseless 3D MRI as a numpy 3D array (any array-like is accepted)
#     StandardDeviation=The standard deviation of the Rician noise
#     NoiseScale=Scaling parameter for the noise
#     fixedSeed=Pseudorandom seed to be employed in the noise generation
# Output:
#     module=The noisy module 3D MRI as a numpy 3D array. When StandardDeviation
#            is 0 the input object itself is returned (no copy, no noise)
def ricianDistribution(img3D, StandardDeviation, NoiseScale, fixedSeed):
    if StandardDeviation==0:
        return img3D
    img3D = np.asarray(img3D)
    # Use a private legacy generator rather than np.random.seed(): it yields
    # the same pseudorandom stream for a given seed, but leaves the global
    # numpy random state of the caller untouched.
    generator = np.random.RandomState(fixedSeed)
    sigma = StandardDeviation * NoiseScale
    # Two independent Gaussian fields: one perturbs the signal (real channel)
    # and the other one is pure noise (imaginary channel)
    realChannel = sigma * generator.normal(size = img3D.shape) + img3D
    imaginaryChannel = sigma * generator.normal(size = img3D.shape)
    # The noisy image is the modulus of the complex signal
    module = np.sqrt(realChannel**2 + imaginaryChannel**2)
    return module

# Bhattacharyya distance between two discrete distributions: the negative
# natural logarithm of their Bhattacharyya coefficient
def bhattacharyya_distance(distribution1, distribution2):
    coefficient = bhattacharyya_coefficient(distribution1, distribution2)
    return -np.log(coefficient)

# Bhattacharyya coefficient between two discrete distributions: the sum over
# all outcomes of the square root of the product of both probabilities
def bhattacharyya_coefficient(distribution_1, distribution_2):
    elementwise_product = distribution_1 * distribution_2
    geometric_means = np.sqrt(elementwise_product)
    return np.sum(geometric_means)

# Compute the Kullback-Leibler divergence KL(distribution_1 || distribution_2)
# between two discrete distributions, using the natural logarithm
# Inputs:
#     distribution_1=Probabilities of the first distribution (array-like)
#     distribution_2=Probabilities of the second distribution (same shape as
#                    distribution_1, or broadcastable to it)
# Output:
#     The divergence as a scalar. Outcomes with zero probability in
#     distribution_1 contribute 0; the result is infinite if distribution_2
#     is zero for an outcome where distribution_1 is not
def kullbackLeibler_divergence(distribution_1, distribution_2):
    p, q = np.broadcast_arrays(np.asarray(distribution_1, dtype=float),
                               np.asarray(distribution_2, dtype=float))
    support = p != 0
    # Evaluate p/q only where p is nonzero. Elsewhere the ratio stays at 1, so
    # the term becomes p*log(1)=0 without computing log(0) or 0/0, which would
    # trigger spurious RuntimeWarnings.
    ratio = np.ones(p.shape)
    np.divide(p, q, out=ratio, where=support)
    return np.sum(p * np.log(ratio))

# Compute the total variation distance between two discrete distributions
# Inputs:
#     distribution_1=Probabilities of the first distribution (array-like)
#     distribution_2=Probabilities of the second distribution (array-like)
# Output:
#     Half of the sum of the absolute differences of the probabilities. For
#     probability distributions this equals the largest difference between the
#     probabilities that both assign to the same event (set of outcomes), and
#     it lies in [0,1]. Note that this is NOT the largest difference on a
#     single outcome, which would underestimate the distance.
def totalvariation_distance(distribution_1, distribution_2):
    difference = np.asarray(distribution_1, dtype=float) - np.asarray(distribution_2, dtype=float)
    return 0.5 * np.sum(np.absolute(difference))

# Hellinger distance between two discrete distributions: the Euclidean norm of
# the difference of the square roots of the probabilities, divided by sqrt(2)
def hellinger_distance(distribution_1, distribution_2):
    root_difference = np.sqrt(distribution_1) - np.sqrt(distribution_2)
    euclidean_norm = np.sqrt(np.sum(np.square(root_difference)))
    return (1/np.sqrt(2))*euclidean_norm

# Jensen-Shannon divergence between two discrete distributions: the average of
# the Kullback-Leibler divergences from each distribution to their midpoint
def jensenshannon_divergence(distribution_1, distribution_2):
    midpoint=0.5*(distribution_1+distribution_2)
    divergence_from_first=kullbackLeibler_divergence(distribution_1, midpoint)
    divergence_from_second=kullbackLeibler_divergence(distribution_2, midpoint)
    return 0.5*divergence_from_first+0.5*divergence_from_second

# Compute the first and second significant digits of the voxel values of a 3D image
# The digits are taken from the magnitude of each value, so the sign is ignored.
# Inputs:
#     ImageData=The 3D image as a 3D numpy array
# Outputs:
#     FirstDigits=The first digits (1..9) as a 3D numpy array
#     SecondDigits=The second digits (0..9) as a 3D numpy array
#     FirstSecondDigits=The first and second digits together (10..99) as a 3D numpy array
#     Values without significant digits (zeros, NaNs, infinities) yield NaN in
#     the three outputs, so they match no digit when the digits are counted.
def ComputeFirstSecondDigits(ImageData):
    Magnitude=np.abs(np.asarray(ImageData)).astype(float)
    HasDigits=np.isfinite(Magnitude)&(Magnitude>0)
    # Placeholder 1.0 keeps log10 and the divisions free of warnings; those
    # positions are overwritten with NaN at the end
    SafeMagnitude=np.where(HasDigits,Magnitude,1.0)

    # Number of decimal places to shift so that the value lies in [10,100)
    Shift=1-np.floor(np.log10(SafeMagnitude))
    # Always scale with a non-negative power of ten (one of the two factors is
    # exactly 1): powers such as 10 or 100 are exact in floating point, whereas
    # 0.1 or 0.01 are not and would bias values like 0.3 towards a wrong digit
    ScaleUp=np.power(10.0,np.maximum(Shift,0))
    ScaleDown=np.power(10.0,np.maximum(-Shift,0))
    Scaled=SafeMagnitude*ScaleUp/ScaleDown
    # Guard against log10 being off by one next to exact powers of ten
    Scaled=np.where(Scaled<10,Scaled*10,Scaled)
    Scaled=np.where(Scaled>=100,Scaled/10,Scaled)

    # Derive every output from the same two-digit integer so that
    # FirstSecondDigits==10*FirstDigits+SecondDigits always holds
    TwoDigits=np.floor(Scaled)
    LeadingDigit=np.floor(TwoDigits/10)
    FirstDigits=np.where(HasDigits,LeadingDigit,np.nan)
    SecondDigits=np.where(HasDigits,TwoDigits-10*LeadingDigit,np.nan)
    FirstSecondDigits=np.where(HasDigits,TwoDigits,np.nan)
    return (FirstDigits,SecondDigits,FirstSecondDigits)

# Relative frequencies of the first and second digits found in the data
# Inputs:
#     FirstDigits=The first digits as a 3D numpy array
#     SecondDigits=The second digits as a 3D numpy array
# Outputs:
#     ObservedDistributionFirst=Relative frequency of the first digits 1..9 as a (9,) numpy array
#     ObservedDistributionSecond=Relative frequency of the second digits 0..9 as a (10,) numpy array
# Each frequency is normalized by the number of entries that match some valid
# digit, so entries matching no digit (e.g. NaN) are left out of the total.
def ObservedDistribution(FirstDigits,SecondDigits):
    FirstCounts=np.array([np.count_nonzero(FirstDigits==Digit) for Digit in range(1,10)],dtype=float)
    SecondCounts=np.array([np.count_nonzero(SecondDigits==Digit) for Digit in range(0,10)],dtype=float)
    ObservedDistributionFirst=FirstCounts/FirstCounts.sum()
    ObservedDistributionSecond=SecondCounts/SecondCounts.sum()
    return (ObservedDistributionFirst,ObservedDistributionSecond)

In [3]:
# Dataset to process. Choose one of the directories: 20OASIS_SynapseWeb_brain,
# OASIS-TRT-20_volumes, 12HLN, NKI-TRT-20_volumes, NKI-RS-22_volumes, MMRR-21_volumes
repository = '12HLN'
inputDir = '../input'
outputDir = '../output'
outputInputDir = outputDir + '/input/'

# Collect, in sorted order, every volume of the chosen type found at any depth
# below the dataset directory
typeImage = 't1weighted.nii.gz' # alternative: 't1weighted_brain.nii.gz'
dataMRI = os.path.join(inputDir, repository)
pathMRI = sorted(glob('/'.join([dataMRI, '**', typeImage]), recursive=True))
SIZE = len(pathMRI)

# Pickle file where the results for this dataset are stored
extension = '.pkl'
outputFile = ''.join([repository, extension])
dataset_path = os.path.join(outputInputDir, outputFile)

In [None]:
# Report whether a directory already exists; otherwise try to create it and
# report the outcome. A failure is only printed, it does not stop the script.
def _EnsureDirectory(DirectoryPath):
    if os.path.isdir(DirectoryPath):
        print("Directory '% s' was already created" % DirectoryPath)
        return
    try:
        os.mkdir(DirectoryPath)
    except OSError:
        print("Directory '% s' failed" % DirectoryPath)
    else:
        print("Directory '% s' created" % DirectoryPath)

# directory1 is the general output directory. directory2 holds the output of
# this file, which will be the input in another file. They are processed in
# this order.
directory1 = outputDir
directory2 = outputInputDir
for _Directory in (directory1, directory2):
    _EnsureDirectory(_Directory)


In [None]:
# Compute the distances from the observed probability distributions of first and second digits 
# to their theoretical ones, for several Rician noise levels

# Experiment parameters: number of noise levels tried per image, number of
# noiseless images, and upper bound of the noise level
NumNoiseLevels=20
NumNoiselessImages=SIZE
MaxNoiseLevel=0.4

# Transforms, digit positions and divergence measures under test. The
# divergence labels are the keys of the label-to-function table, in order.
TestedTransforms=["FFT","DCT","DST"]
TestedDigits=["First","Second"]
TestedDivergenceFunctions={
    "BD":bhattacharyya_distance,
    "KL":kullbackLeibler_divergence,
    "TV":totalvariation_distance,
    "H":hellinger_distance,
    "JS":jensenshannon_divergence,
}
TestedDivergences=list(TestedDivergenceFunctions)

# Intermediate storage for the transformed noisy image, indexed by transform
TransformedNoisyImage={}

# Output: one (images x noise levels) matrix of divergences for every
# (transform, divergence, digit) combination
Divergences={
    (MyTransform,MyDivergence,MyDigit):np.zeros((NumNoiselessImages,NumNoiseLevels))
    for MyTransform in TestedTransforms
    for MyDivergence in TestedDivergences
    for MyDigit in TestedDigits
}

# Benford probability of each first digit d=1..9: log10(1+1/d)
PossibleFirstDigits=np.arange(1,10)
BenfordDistributionFirst=np.log10(1+1/PossibleFirstDigits)

# Benford probability of each second digit d=0..9: the sum of
# log10(1+1/(10*k+d)) over every possible first digit k=1..9
BenfordDistributionSecond=np.array([
    sum(np.log10(1+1/(10*PossibleFirstDigits+NdxDigit)))
    for NdxDigit in range(0,10)
])

# Draw one noise level per (image, repetition) uniformly between 0 and
# MaxNoiseLevel, with a fixed seed so that the levels are reproducible
np.random.seed(seed=1)
NoiseLevels = np.random.uniform(low=0,high=MaxNoiseLevel,size=(NumNoiselessImages,NumNoiseLevels))

# Theoretical Benford distribution for each digit position
BenfordDistributions = {"First":BenfordDistributionFirst, "Second":BenfordDistributionSecond}

# Process every noiseless image
for NdxImage in range(NumNoiselessImages):

  # Read the noiseless 3D MRI as a floating point array
  NoiselessImage = nib.load(pathMRI[NdxImage]).get_fdata()

  # The noise is scaled by the interquartile range of the nonzero voxels, so
  # that the added noise level is robust against outliers in the voxel values
  NonZeroVoxels = NoiselessImage[NoiselessImage!=0]
  MyIQR = scipy.stats.iqr(NonZeroVoxels)

  # Try every noise level drawn for this image
  for NdxNoise in range(NumNoiseLevels):

    # Corrupt the image with Rician noise, using a seed which is unique for
    # this (image, repetition) pair
    NoiseLevel = NoiseLevels[NdxImage,NdxNoise]
    NoiseSeed = NdxImage*NumNoiseLevels+NdxNoise
    NoisyImage = ricianDistribution(img3D=NoiselessImage,StandardDeviation=NoiseLevel,NoiseScale=MyIQR,fixedSeed=NoiseSeed)

    # 3D Fast Fourier Transform
    TransformedNoisyImage["FFT"] = np.fft.fftn(NoisyImage)

    # 3D Discrete Cosine Transform, type 2
    TransformedNoisyImage["DCT"] = scipy.fft.dctn(NoisyImage,type=2)

    # 3D Discrete Sine Transform, type 3
    # NOTE(review): the previous comment on this line said it works "at least
    # for type 2", but the call uses type=3 - confirm which type is intended
    TransformedNoisyImage["DST"] = scipy.fft.dstn(NoisyImage,type=3)

    # A discrete wavelet transform (pywt.dwtn with wavelet 'db1', 'ada'
    # coefficients) was also tried but it is not working, so it is left out

    # Measure each transform in turn
    for MyTransform in TestedTransforms:

      # Only the real part of the transform coefficients is analyzed
      RealCoefficients = np.real(TransformedNoisyImage[MyTransform])
      FirstDigits,SecondDigits,FirstSecondDigits = ComputeFirstSecondDigits(RealCoefficients)

      # Observed digit distributions, indexed by digit position
      ObservedDistributionFirst,ObservedDistributionSecond = ObservedDistribution(FirstDigits,SecondDigits)
      ObservedDistributions = {"First":ObservedDistributionFirst, "Second":ObservedDistributionSecond}

      # Divergence from each observed distribution to its theoretical one
      for MyDivergence in TestedDivergences:
        DivergenceFunction = TestedDivergenceFunctions[MyDivergence]
        for MyDigit in ("First","Second"):
          Divergences[MyTransform,MyDivergence,MyDigit][NdxImage,NdxNoise]=DivergenceFunction(ObservedDistributions[MyDigit], BenfordDistributions[MyDigit])


In [None]:
# Store the noise levels and the divergences together, as a two-element list,
# in the pickle file of this dataset
SavedResults = [NoiseLevels,Divergences]
with open(dataset_path,mode='wb') as PickleFile:
  pickle.dump(SavedResults,PickleFile)