In [14]:
import numpy as np
import matplotlib.pyplot as plt

from keras.models import Sequential, Model #inear stack of neural network layers
from keras.layers import Dense, Dropout, Activation, Flatten, Input, UpSampling2D #core layers
from keras.layers import Convolution2D, MaxPooling2D, Conv2D, Reshape, Conv2DTranspose #convolutional layers to help efficiently train on image data
from keras.preprocessing.image import ImageDataGenerator
from numpy import clip

from PIL import Image

from sklearn.model_selection import train_test_split

from astropy.io import fits
from skimage.transform import resize

mywidth=myheight=256 

In [15]:
#setting up multi-gpu support, here using only gpu 0 and 1
from tensorflow import distribute
#strategy = distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

## Reading image and changing it to be between 0-255 for noise application

In [16]:
#FITS input

image_file = ' .fits'
fits.info(image_file)
image_data = fits.getdata(image_file, ext=0, view = 'ndarray')
image_data = resize(image_data, (256, 256))

image_zero = np.nan_to_num(image_data, nan=0.0)
image_zero_max = np.max(image_zero)
image_zero_min = np.min(image_zero)

image_nan_edited = np.nan_to_num(image_data, nan=image_zero_max)
image_nan_edited_max = np.max(image_nan_edited)
image_nan_edited_min = np.min(image_nan_edited)

image_minus_minimum = image_nan_edited - image_nan_edited_min
image_minus_max = np.max(image_minus_minimum)
image_minus_min = np.min(image_minus_minimum)

image_divide_maximum = image_minus_minimum/image_minus_max
image_divide_max = np.max(image_divide_maximum)
image_divide_min = np.min(image_divide_maximum)

scaled_ref = image_divide_maximum*255

plt.figure()
plt.imshow(scaled_ref)

print(np.max(scaled_ref))
print(np.min(scaled_ref))

FileNotFoundError: ignored

# Add Read Noise

In [None]:
#https://mwcraig.github.io/ccd-as-book/01-03-Construction-of-an-artificial-but-realistic-image.html
def read_noise(image, amount, gain=2):                 
    """
    Generate simulated read noise.
    
    Parameters
    ----------
    
    image: numpy array
        Image whose shape the noise array should match.
    amount : float
        Amount of read noise, in electrons.
    gain : float, optional
        Gain of the camera, in units of electrons/ADU.
    """
    shape = 256,256
    
    noise = np.random.normal(scale=amount/gain, size=shape)
    return noise



# ADD RANDOM GAUSSIAN NOISE


In [None]:
#Image transformation FUNCTIONS (images pixel values [0..255])

#add noise to picture
def noise(image,level):
    # adding a gaussian random nois with mu=0 and sigma=128
    temp=image+level*np.random.normal(0, 128, size=(image.shape[0],image.shape[1]))
    # #removing negative pixels (set to zero) and normalising back to 255
    regular= temp*(temp>0)                                                     
    mynorm=255./np.max(temp)                                                   
    # #converting the image back to integer numbers      
    temp = np.around(mynorm*regular).astype('int')                             
    return(temp)



In [None]:
#create a noisy ref of gaussian noise
Guussian_noisy=noise(scaled_ref,0.1)
Guussian_noisy=Guussian_noisy.astype('uint8')    

# ADD POISSON NOISE

In [None]:
#https://theailearner.com/2019/05/07/add-different-noise-to-an-image/
import cv2
import numpy as np
from skimage.util import random_noise

#Load the image
image_array = Guussian_noisy

  # Add noise to the image.
Poisson_noisy = random_noise(image_array, mode='poisson')

  # The above function returns a floating-point image
  # on the range [0, 1], thus we changed it to 'uint8'
  # and from [0,255]
Poisson_noisy = np.array(255*Poisson_noisy, dtype = 'uint8')                   


In [None]:
#create a noisy ref from the gaussian noise image with poisson and read (possion done above using gaussian noise image)                  
noisy_ref = Poisson_noisy + read_noise(Poisson_noisy, 5)

#normalise the pixel values between 0 and 1, since read noise application distorts the image
noisy_ref = (noisy_ref - np.min(noisy_ref))/(np.max(noisy_ref)-(np.min(noisy_ref)))

# Viewing noise application results

In [None]:
#testing image transformations
plt.imshow(scaled_ref)
plt.show()
print(np.min(scaled_ref))
print(np.max(scaled_ref))
print(scaled_ref.dtype)

plt.imshow(Guussian_noisy)
plt.show()
print(np.min(Guussian_noisy))
print(np.max(Guussian_noisy))
print(Guussian_noisy.dtype)

plt.imshow(Poisson_noisy)
plt.show()
print(np.min(Poisson_noisy))
print(np.max(Poisson_noisy))
print(Poisson_noisy.dtype)

plt.imshow(noisy_ref)
plt.show()
print(np.min(noisy_ref))
print(np.max(noisy_ref))
print(noisy_ref.dtype)

## Use data augmentation to create a dataset from a single image

In [None]:
#First we need to add a channel index at the end of the np.array
channel1 = np.expand_dims(scaled_ref, 2)                                   
#Then we can use image_data_generator from Keras to create a dataset
#define our "samples" as being just one image and add a dimension for this (at the start)
samples= np.expand_dims(channel1, 0)
# create image data augmentation generator
datagen = ImageDataGenerator(rotation_range=360,height_shift_range=0.25,width_shift_range=0.25,fill_mode="constant",)
datagen.fit(samples)
#create the new dataset (both clean, noisy and very noisy at the same time)
maxsize=3000
#storage arrrays
augmented=[]
noisy=[]
#instanciating the generator
it = datagen.flow(samples, batch_size=1)
#filling the arrays
for i in range(maxsize):
  new_image=it.next()[0]
# normalising the image
  augmented.append(new_image.astype('float32')/255.)
# noise addition to image and normalising
  noisy.append(noise(new_image[:,:,0],0.1).astype('float32')/255.)

#Re-shaping the sets so that they are in the sample, x,y, channel order
created_set=np.reshape(augmented, [-1, mywidth,myheight, 1])
noisycreated_set=np.reshape(noisy, [-1, mywidth,myheight, 1])

In [None]:
noisycreated_set=np.reshape(noisycreated_set, [-1, mywidth,myheight])
noisycreated_set = random_noise(noisycreated_set, mode='poisson')
noisycreated_set = np.array(255*noisycreated_set, dtype = 'uint8')
noisycreated_set = noisycreated_set + read_noise(noisycreated_set, 5)

#normalise the pixel values between 0 and 1, since read noise application distorts the image

noisycreated_set = noisycreated_set - np.min(noisycreated_set)
noisycreated_set = noisycreated_set / np.max(noisycreated_set)
noisycreated_set=np.reshape(noisycreated_set, [-1, mywidth,myheight, 1])

In [None]:
#Quick check of the outputs
plt.imshow(created_set[99,:,:,0])
plt.show()
print(np.min(created_set[0]))
print(np.max(created_set[0]))
print(created_set[0].dtype)
plt.imshow(noisycreated_set[99,:,:,0])
plt.show()
print(np.min(noisycreated_set[0]))
print(np.max(noisycreated_set[0]))
print(noisycreated_set[0].dtype)

In [None]:
#splitting the dataset into training and test (70/30)
X_train, X_test = train_test_split(created_set, test_size=0.3,random_state=2345173645)
X_train_noisy, X_test_noisy = train_test_split(noisycreated_set, test_size=0.3,random_state=2345173645)

## Auto-encoder definition and training & cross-check

In [None]:
#adapted from https://keras.io/examples/vision/autoencoder/
#bigger model (GET ALL gpuS)
strategy = distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"])
with strategy.scope():
    input = Input(shape=channel1.shape)

# Encoder
    x = Conv2D(32, (3, 3), activation="relu", padding="same")(input)
    x = MaxPooling2D((2, 2), padding="same")(x)
    x = Conv2D(16, (3, 3), activation="relu", padding="same")(x)
    x = MaxPooling2D((2, 2), padding="same")(x)

# Decoder
    x = Conv2DTranspose(16, (3, 3), strides=2, activation="relu", padding="same")(x)
    x = Conv2DTranspose(32, (3, 3), strides=2, activation="relu", padding="same")(x)
    x = Conv2D(1, (3, 3), activation="relu", padding="same")(x)

# Autoencoder
    autoencoder = Model(input, x)
    autoencoder.compile(optimizer="adam", loss="binary_crossentropy")
    autoencoder.summary()


In [None]:
batch_size = 4

#history=autoencoder.fit(X_train_noisymore, X_train_noisy,
history=autoencoder.fit(X_train_noisy, X_train,
                validation_data=(X_test_noisy, X_test),
                epochs=20,
                shuffle=True,
                batch_size=batch_size)

In [None]:
f = plt.figure()
# Plot training & validation loss values
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])

plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper right')
plt.show()

In [None]:
# Predict the Autoencoder output from noisy test image
noisy_decoded = autoencoder.predict(samples)

# noisy_decoded = noisy_decoded*(np.max(image_data))

plt.imshow(noisy_decoded[0].reshape(256,256))
plt.show()

## Comparison of results using MSE and SSIM metrics

In [None]:
#Comparison metrics
from skimage.metrics import structural_similarity as ssim

def mse(imageA, imageB):
	# the 'Mean Squared Error' between the two images is the
	# sum of the squared difference between the two images;
	# NOTE: the two images must have the same dimension
	err = np.sum((imageA.astype("float") - imageB.astype("float")) ** 2)
	err /= float(imageA.shape[0] * imageA.shape[1])
	
	# return the MSE, the lower the error, the more "similar"
	# the two images are
	return err

def compare_images(imageA, imageB, title):
	# compute the mean squared error and structural similarity
	# index for the images
  m = mse(imageA, imageB)
  s = ssim(imageA, imageB)
	# setup the figure
  fig = plt.figure(title)
  print("MSE: %.6f, SSIM: %.2f" % (m, s))
  plt.suptitle("MSE: %.6f, SSIM: %.2f" % (m, s))
	# show first image
  ax = fig.add_subplot(1, 2, 1)
  plt.imshow(imageA)#, cmap = plt.cm.gray)
  plt.axis("off")
	# show the second image
  ax = fig.add_subplot(1, 2, 2)
  plt.imshow(imageB)#, cmap = plt.cm.gray)
  plt.axis("off")
	# show the images
  plt.show()

In [None]:
#FITS input

image_file2 = ' .fits'
fits.info(image_file2)
image_data_stack = fits.getdata(image_file2, ext=0, view = 'ndarray')
image_data_stack = resize(image_data_stack, (256, 256))

image_zero_stack = np.nan_to_num(image_data_stack, nan=0.0)
image_zero_max_stack = np.max(image_zero_stack)
image_zero_min_stack = np.min(image_zero_stack)

image_nan_edited_stack = np.nan_to_num(image_data_stack, nan=image_zero_max_stack)
image_nan_edited_max_stack = np.max(image_nan_edited_stack)
image_nan_edited_min_stack = np.min(image_nan_edited_stack)

image_minus_minimum_stack = image_nan_edited_stack - image_nan_edited_min_stack
image_minus_max_stack = np.max(image_minus_minimum_stack)
image_minus_min_stack = np.min(image_minus_minimum_stack)

image_divide_maximum_stack = image_minus_minimum_stack/image_minus_max_stack
image_divide_max_stack = np.max(image_divide_maximum_stack)
image_divide_min_stack = np.min(image_divide_maximum_stack)

Alternate_ref = image_divide_maximum_stack*255
print(np.max(Alternate_ref))
print(np.min(Alternate_ref))
print(Alternate_ref.dtype)


plt.figure()
plt.imshow(Alternate_ref)

In [None]:
# #Normalising images for comparison
Alternate_reference=Alternate_ref.astype('float32')/255. #Stacked image

Normalised_reference=scaled_ref.astype('float32')/255. #Original image

Normalised_decoded=noisy_decoded[0].reshape(256,256)/np.max(noisy_decoded[0]) #Decoded

Normalised_noisy_original=noisy_ref.astype('float32') #Noisy image

In [None]:
print(np.min(Alternate_reference))
print(np.max(Alternate_reference))
print(Alternate_reference.dtype)
print()
print(np.min(Normalised_reference))
print(np.max(Normalised_reference))
print(Normalised_reference.dtype)
print()
print(np.min(Normalised_decoded))
print(np.max(Normalised_decoded))
print(Normalised_decoded.dtype)
print()
print(np.min(Normalised_noisy_original))
print(np.max(Normalised_noisy_original))
print(Normalised_noisy_original.dtype)

In [None]:
#Compare images original reference with de-noised
compare_images(Normalised_reference,Normalised_decoded,"boo")

In [None]:
#Compare images original reference with noisy original
compare_images(Normalised_reference,Normalised_noisy_original,"boo")

In [None]:
#Compare alternate with denoised
compare_images(Alternate_reference,Normalised_decoded,"boo")

In [None]:
#Compare alternate with noisy original
compare_images(Alternate_reference,Normalised_noisy_original,"boo")

In [None]:
#Compare alternate with original
compare_images(Alternate_reference,Normalised_reference,"boo")

# Gini

In [None]:
def gini(array):
    """Calculate the Gini coefficient of a numpy array."""
    # based on bottom eq: http://www.statsdirect.com/help/content/image/stat0206_wmf.gif
    # from: http://www.statsdirect.com/help/default.htm#nonparametric_methods/gini.htm
    array = array.flatten() #all values are treated equally, arrays must be 1d
    if np.amin(array) < 0:
        array -= np.amin(array) #values cannot be negative
    #array += 0.0000001 #values cannot be 0
    array = np.sort(array) #values must be sorted
    index = np.arange(1,array.shape[0]+1) #index per array element
    n = array.shape[0]#number of array elements
    return ((np.sum((2 * index - n  - 1) * array)) / (n * np.sum(array))) #Gini coefficient



print("Gini coefficient for original image is", gini(Normalised_reference))
print("Gini coefficient for decoded image is", gini(Normalised_decoded))
print("Gini coefficient for noisy image is", gini(Normalised_noisy_original))
print("Gini coefficient for alternate image is", gini(Alternate_reference))

# Export FITS

In [None]:
f = plt.figure()
plt.imshow(Normalised_reference)
plt.axis('off')
plt.show()

print(np.max(Normalised_reference))
print(np.min(Normalised_reference))
print()

reverse = Normalised_reference * image_minus_max
print(np.max(reverse))
print(np.min(reverse))
print()

reverse = reverse + image_nan_edited_min
print(np.max(reverse))
print(np.min(reverse))
print()

hdu = fits.PrimaryHDU(reverse)
hdul = fits.HDUList([hdu])
hdul.writeto(' .fits')

In [None]:
f = plt.figure()
plt.imshow(Normalised_noisy_original)
plt.axis('off')
plt.show()

print(np.max(Normalised_noisy_original))
print(np.min(Normalised_noisy_original))
print()

reverse = Normalised_noisy_original * image_minus_max
print(np.max(reverse))
print(np.min(reverse))
print()

reverse = reverse + image_nan_edited_min
print(np.max(reverse))
print(np.min(reverse))
print()

hdu = fits.PrimaryHDU(reverse)
hdul = fits.HDUList([hdu])
hdul.writeto(' .fits')

In [None]:
f = plt.figure()
plt.imshow(Normalised_decoded)
plt.axis('off')
plt.show()

print(np.max(Normalised_decoded))
print(np.min(Normalised_decoded))
print()

reverse = Normalised_decoded * image_minus_max
print(np.max(reverse))
print(np.min(reverse))
print()

reverse = reverse + image_nan_edited_min
print(np.max(reverse))
print(np.min(reverse))
print()

hdu = fits.PrimaryHDU(reverse)
hdul = fits.HDUList([hdu])
hdul.writeto(' .fits')

In [None]:
f = plt.figure()
plt.imshow(Alternate_reference)
plt.axis('off')
plt.show()

print(np.max(Alternate_reference))
print(np.min(Alternate_reference))
print()

reverse = Alternate_reference * image_minus_max_stack
print(np.max(reverse))
print(np.min(reverse))
print()

reverse = reverse + image_nan_edited_min_stack
print(np.max(reverse))
print(np.min(reverse))
print()

hdu = fits.PrimaryHDU(reverse)
hdul = fits.HDUList([hdu])
hdul.writeto(' .fits')