#Implements encoder/decoder for weak lensing outputs

The major idea is to see if I can compress the data in the snapshot files.
The result is that compression with many different CNN-based algorithms (of different depths) is not much different from averaging neighboring cells.  In retrospect this is not so surprising, as the maps have differences on the cell scale that make compression challenging.

Set configurations for COLAB if running there

In [None]:
import os
# Colab-only setup: mounts Google Drive, moves into the project folder and
# reports the attached GPU.  The block is skipped unless use_COLAB is exactly 1.
use_COLAB = 1

if use_COLAB == 1:
  #mount drive
  from google.colab import drive
  drive.mount('/content/gdrive')

  # All relative paths used later in the notebook are resolved from this folder.
  WORK_AREA = '/content/gdrive/My Drive/weaklensing_ML/' #columbialensing/
  os.chdir(WORK_AREA)

  #get GPU info
  # IPython shell escape: captures the output lines of `nvidia-smi`.
  gpu_info = !nvidia-smi
  gpu_info = '\n'.join(gpu_info)
  # The word 'failed' in the output is treated as "no GPU attached".
  if gpu_info.find('failed') >= 0:
    print('Not connected to a GPU')
  else:
    print(gpu_info)


## extract tarfiles if necessary and set specs for run



In [None]:

import tarfile
import os
import shutil
from astropy.io import fits
import numpy as np
from scipy.ndimage import zoom
import re
import tensorflow as tf
import random

# Run configuration shared by every later cell.

#whether we are training (intended gate for the fit/save cells)
TRAIN = False


# Specify the directory containing the .tar files
directory_path = './columbialensing/'


max_cosmologies = -1 #for testing.  Set to -1 to use everything.

# NOTE(review): number_batches is not read by any code visible in this notebook section.
number_batches = 10
#validation_split = 0.4  # use this fraction of the data for validation
# NOTE(review): normalize_by_RMS is not read by any code visible in this notebook section.
normalize_by_RMS = False #set to one if you want to renormalize by RMS



# image_size: side length (pixels) of the maps; also selects the "_<image_size>" suffix below
image_size = 1024 #really makes sense to use the 1024s
sub_image_size = 64 #needs to divide image size



# Number of sub-image tiles along one side of a full map.
number_subimages_across =image_size//sub_image_size


# Number of .fits files per cosmology directory that the data array is sized for.
number_fits_files = 512
suffix = f"_{image_size}"
extract_tarfiles = False  #if I need to extract tarfiles

# NOTE(review): run_suffix is not read by any code visible in this notebook section.
run_suffix = rf"im{image_size}"

#extracts only if indicated (could make this more elegant by checking to see if they exist)
if extract_tarfiles:
    # Match archive names that end in "<suffix>.tar".  The suffix is escaped and
    # the dot is matched literally, so names such as "..._1024xtar" are not
    # picked up by accident.
    pattern = re.compile(rf"{re.escape(suffix)}\.tar$")

    # List all matching .tar files in the directory
    all_tar_files = [f for f in os.listdir(directory_path) if pattern.search(f)]

    # Extract every archive next to the tar files themselves.
    # NOTE: extractall() trusts the member paths stored in the archive; only
    # run this on archives from a trusted source.
    for tar_file in all_tar_files:
        tar_file_path = os.path.join(directory_path, tar_file)
        with tarfile.open(tar_file_path, 'r') as archive:
            archive.extractall(path=directory_path)


# Read into memory the data

In [None]:
def get_labels_for_file(dir_name):
    """
    Extracts the two cosmology labels encoded in a directory or tar file name.

    The name is expected to look like "Om<value>_si<value>[_<size>][.tar]".
    For "Om0.183_si0.958_256.tar" the labels are [0.183, 0.958]; the same
    result is returned for "Om0.183_si0.958" and for a full path ending in
    either form.

    Args:
    - dir_name (str): Directory or tar file name (a leading path is ignored).

    Returns:
    - list: [Om, si] as floats.

    Raises:
    - ValueError: if a label field cannot be parsed as a float.
    - IndexError: if the name has no underscore-separated second field.
    """
    # Work on the final path component only, so callers may pass full paths.
    name = os.path.basename(os.path.normpath(dir_name))

    # Drop the archive extension; otherwise "Om.._si0.801.tar" (no size
    # suffix) would leave ".tar" attached to the second label.
    if name.endswith('.tar'):
        name = name[:-len('.tar')]

    # Split the name on underscores
    parts = name.split('_')

    # Strip the two-character 'Om' / 'si' prefixes and parse the values
    om_label = float(parts[0][2:])
    si_label = float(parts[1][2:])

    return [om_label, si_label]


# Read every .fits map of every selected cosmology directory, cut each map into
# sub_image_size x sub_image_size tiles, and stack all tiles into WL_tensor with
# shape (num_tiles, sub_image_size, sub_image_size, 1).
pattern = re.compile(rf"{suffix}$")
#all_directories = [f for f in os.listdir(directory_path) if pattern.search(f)]
all_directories = ["Om0.268_si0.801"] # "Om0.283_si0.805_256"

random.shuffle(all_directories) #this makes it so that there is no particular order for the directories
#print(all_directories)

# Apply the testing cap *before* allocating, so the array holds no rows for
# cosmologies that are never read (np.empty leaves such rows uninitialised).
if max_cosmologies > 0:
    all_directories = all_directories[:max_cosmologies]
num_cosmologies = len(all_directories)

#number of tiles per map and per cosmology
numsubimages = number_subimages_across**2
number_images = number_fits_files*numsubimages
#cosmology_labels = np.empty((len(all_directories), number_images, 2), dtype=np.float16)

# Side length of the region of each map that is covered by whole tiles.
tiled_extent = number_subimages_across*sub_image_size

RMS =0 #first time set to zero
data_array = np.empty((num_cosmologies, number_images, sub_image_size, sub_image_size), dtype=np.float16)
# Number of tiles actually written per cosmology; used below to drop unfilled rows.
images_read = [0]*num_cosmologies
for idy, dir_name in enumerate(all_directories):
    print("reading in", dir_name)
    dir_path = os.path.join(directory_path, dir_name)

    fits_files = [f for f in os.listdir(dir_path) if f.endswith('.fits')]
    # data_array only has room for number_fits_files maps per cosmology.
    if len(fits_files) > number_fits_files:
        print(f"{dir_name}: using the first {number_fits_files} of {len(fits_files)} .fits files")
        fits_files = fits_files[:number_fits_files]

    for idx, file in enumerate(fits_files):
        with fits.open(os.path.join(dir_path, file)) as hdul:

            original_data = hdul[0].data

            if RMS == 0: #get RMS of the first file read
                RMS = np.sqrt(np.var(original_data))
                print(f"RMS={RMS}")

            # Cut the map into tiles in one step.  After the reshape, axes are
            # (tile_row, y, tile_col, x); swapping the middle axes and flattening
            # orders the tiles as tile_row*number_subimages_across + tile_col,
            # the same order as looping over i (rows) then j (columns).
            tiles = original_data[:tiled_extent, :tiled_extent].reshape(
                number_subimages_across, sub_image_size,
                number_subimages_across, sub_image_size,
            ).swapaxes(1, 2).reshape(numsubimages, sub_image_size, sub_image_size)
            data_array[idy, numsubimages*idx:numsubimages*(idx + 1)] = tiles

    images_read[idy] = numsubimages*len(fits_files)

    #since all fits files in one directory have the same label
    cosmology = get_labels_for_file(dir_name)
    #cosmology_labels[idy] = np.array([cosmology for i in range(number_fits_files)])


# Flatten (cosmology, tile) into a single leading axis, keeping only rows that
# were actually filled so no uninitialised memory reaches the network.
if all(count == number_images for count in images_read):
    filled_tiles = data_array.reshape(-1, sub_image_size, sub_image_size)
else:
    print(f"Warning: expected {number_images} sub-images per cosmology, read {images_read}; unfilled slots are dropped")
    filled_tiles = np.concatenate(
        [data_array[idy, :count] for idy, count in enumerate(images_read)], axis=0)

WL_tensor = tf.convert_to_tensor(filled_tiles)

WL_tensor = WL_tensor[..., np.newaxis]  # Add channel dimension

# create decoder-encoder CNN with minimal number of layers, but final dense layer.  

  Here n sets the compression with the size compressed by the factor 4^n.  The result is n+1 layers, and most experiments I've run to test are n=2, or a compression by a factor of 16.

  The n CNN layers have number_channels channels, where I've experimented with 64 and 256

If load_saved= 1, it loads a trained version of this decoder-encoder




In [None]:
from tensorflow.keras import layers, models, regularizers
from keras.layers import LeakyReLU, BatchNormalization, Dropout

#Parameters for network
n=2 #number of 2x2 pooling stages; the encoder builder only warns when n < 2
number_channels = 64
dropout_rate = 0 #Dropout layers are only added when this is > 0

L1weight = 0 #1e-8
act_string = LeakyReLU(alpha=0.1) #okay, not a string

#string with parameters for saving
# Only the exponent of L1weight goes into the name: 1e-8 gives "-08", 0 gives "+00".
sci_notation = "{:.0e}".format(L1weight)
exponent = sci_notation.split('e')[-1]
save_string = f'n{n}_nc{number_channels}_d{dropout_rate }_logL1w{exponent}'

# Conditionally add L1 regularizer if L1weight is greater than 0
if L1weight > 0:
    regularizer = regularizers.l1(L1weight)
else:
    regularizer = None

def create_simple_encoder(input_shape, n,  number_channels=number_channels, dropout_rate=dropout_rate):
    """Build the Sequential encoder: n conv/pool stages followed by a Dense code.

    Args:
        input_shape: (height, width, channels) of one input image.
        n: number of conv + 2x2 max-pool stages.  A message is printed when
            n < 2, but construction continues.
        number_channels: filters of stages 2..n; the first stage uses half.
        dropout_rate: when > 0, a Dropout layer follows every stage and the
            final Dense layer.

    Returns:
        A Sequential model whose output is a flat vector of length
        height * width // 4**n.

    The activation and kernel regularizer come from the module-level
    ``act_string`` and ``regularizer``.
    """
    if n<2:
        print("n is too small.  n >=2")

    with_dropout = dropout_rate > 0

    # First stage: half-width convolution, no batch normalisation.
    stack = [
        layers.InputLayer(input_shape),
        layers.Conv2D(number_channels//2, (3, 3), activation=act_string, padding='same',
                      kernel_regularizer=regularizer),
        layers.MaxPooling2D((2, 2)),
    ]
    if with_dropout:
        stack.append(Dropout(dropout_rate))

    # Remaining n-1 stages: full-width convolution, batch norm, pooling.
    for _ in range(1, n):
        stack.append(layers.Conv2D(number_channels, (3, 3), activation=act_string, padding='same',
                                   kernel_regularizer=regularizer))
        stack.append(BatchNormalization())
        stack.append(layers.MaxPooling2D((2, 2)))
        if with_dropout:
            stack.append(Dropout(dropout_rate))

    # Dense bottleneck: one unit per 4**n input pixels.
    code_length = (input_shape[0] * input_shape[1]) // (4 ** n)
    stack.append(layers.Flatten())
    stack.append(layers.Dense(units=code_length, activation=act_string,
                              kernel_regularizer=regularizer))
    if with_dropout:
        stack.append(Dropout(dropout_rate))

    encoder = models.Sequential()
    for layer in stack:
        encoder.add(layer)
    return encoder


def create_simple_decoder(encoded_length, original_shape, n, number_channels=number_channels, dropout_rate=dropout_rate):
    """Build the Sequential decoder that mirrors ``create_simple_encoder``.

    Args:
        encoded_length: length of the flat code vector fed to the decoder.
        original_shape: (height, width, channels) of the image to reconstruct.
        n: number of 2x upsampling stages (same n as the encoder).
        number_channels: filters of the first n-1 stages; the last stage uses
            half (integer division).
        dropout_rate: when > 0, a Dropout layer follows every upsampling stage.

    Returns:
        A Sequential model mapping (encoded_length,) to original_shape.

    The Dense layer has encoded_length * number_channels units, which must
    equal the size of the reshape target
    (height // 2**n, width // 2**n, number_channels); this holds when
    encoded_length == height * width // 4**n.
    """
    model = models.Sequential()

    # The input is a flat array
    model.add(layers.InputLayer((encoded_length,)))

    # Expanding the flat array to a 3D tensor
    dense_units = int(encoded_length) * int(number_channels)
    model.add(layers.Dense(units=dense_units, activation=act_string,
              kernel_regularizer=regularizer))

    # Calculate the dimensions for the first reshape
    # It should match the output size of the last MaxPooling layer in the encoder
    reshape_dims = (original_shape[0] // (2 ** n), original_shape[1] // (2 ** n), number_channels)

    model.add(layers.Reshape(reshape_dims))

    # Upsampling to original size, looping over number of layers
    for nlayer in range(1, n):
        model.add(layers.Conv2DTranspose(number_channels, (3, 3), activation=act_string, padding='same',
                  kernel_regularizer=regularizer))
        model.add(BatchNormalization())
        model.add(layers.UpSampling2D((2, 2)))
        if dropout_rate >0:
          model.add(Dropout(dropout_rate))

    # The filter count must be an integer: use // (as the encoder does), not /.
    model.add(layers.Conv2DTranspose(number_channels // 2, (3, 3), activation=act_string, padding='same',
                                     kernel_regularizer=regularizer))
    model.add(layers.UpSampling2D((2, 2)))
    if dropout_rate >0:
        model.add(Dropout(dropout_rate))

    # Final layer to reconstruct the image
    model.add(layers.Conv2D(original_shape[2], (1, 1), activation='linear', padding='same',
      kernel_regularizer=regularizer))

    return model



# Load the saved simple encoder/decoder pair if both files exist, otherwise
# build fresh (untrained) models; then chain them into one autoencoder.
original_shape = [sub_image_size, sub_image_size, 1]


# Length of the flat code produced by the encoder's final Dense layer.
encoded_length = sub_image_size*sub_image_size//int(4**n)

load_saved = 1

model_name = f'simple_encoder_{save_string}.keras'
decoder_model_name = f'simple_decoder_{save_string}.keras'
# Both halves are required: loading only one would pair trained weights with
# an untrained partner, and a missing decoder file would make load_model fail.
missing_models = [path for path in (model_name, decoder_model_name) if not os.path.exists(path)]
if load_saved == 1 and not missing_models:
    from tensorflow.keras.models import load_model
    simple_encoder = load_model(model_name)
    simple_decoder = load_model(decoder_model_name)
else:
    if load_saved == 1:
        for missing_path in missing_models:
            print(f"Path does not exist to {missing_path}.  Creating model")
    simple_encoder = create_simple_encoder(original_shape , n)
    simple_decoder = create_simple_decoder(encoded_length,original_shape, n)


# Combine the encoder and decoder to create the autoencoder
simple_autoencoder = models.Sequential([simple_encoder, simple_decoder])



In [None]:


# Print the layer-by-layer structure of the two halves of the simple autoencoder.
simple_encoder.summary() #summary of encoder
simple_decoder.summary() #summary of decoder

##Sets a learning rate scheduler, compiles the model, and trains

I've experimented with learning rates larger than 0.001, finding that five times this is too high, as the loss becomes very non-monotonic.

In [None]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import Callback, ReduceLROnPlateau
from keras import backend as K


#I was curious about how much of the loss owed to regularization, so this allows me to check at the end of each epoch (but was generating a warning in first batch)
class RegularizationLossMonitor(Callback):
    """Keras callback that prints the regularization part of the loss.

    At the end of every epoch it sums ``self.model.losses`` (the penalty
    terms registered by the layers) and, when the epoch logs contain a
    'loss' entry, also prints the remaining data loss and the total.
    """

    def on_epoch_end(self, epoch, logs=None):
        # sum() of an empty list is 0, so a model without regularizers reports 0.
        regularization_loss = float(sum(self.model.losses))
        print(f'\n Regularization loss: {regularization_loss:.4f}',)

        # logs defaults to None and may lack 'loss'; in that case only the
        # regularization term can be reported.
        total_loss = (logs or {}).get('loss')
        if total_loss is None:
            return

        data_loss = total_loss - regularization_loss
        print(f'Data loss: {data_loss:.4f}',)
        print(f'Total loss: {total_loss:.4f}')


# Learning-rate schedule shared by the training cells: watches the training
# loss ('loss'), multiplies the rate by 0.5 with patience=5, and never goes
# below 1e-4.
reduce_lr = ReduceLROnPlateau(monitor='loss', factor=0.5,
                              patience=5, min_lr=0.0001)


# Train the simple autoencoder to reproduce its input and save both halves.
# Gated on the TRAIN flag from the configuration cell (the name is upper-case;
# a lower-case `train` is never defined).
if TRAIN:
    # Set the learning rate (I find that .005 is too large)
    learning_rate = 0.001

    simple_autoencoder.compile(optimizer=Adam(learning_rate=learning_rate), loss="mae") #loss=fractional_difference_loss) #, run_eagerly=True)

    # Input and target are the same tensor: the network learns an identity
    # map through the compressed code.
    simple_autoencoder.fit(WL_tensor, WL_tensor, epochs=20, batch_size=256,
                           shuffle=True, callbacks=[reduce_lr]) #, RegularizationLossMonitor()  If RegularizationLossMonitor() is included as a callback, the regularization loss is printed separately
    simple_encoder.save(f'simple_encoder_{save_string}.keras')
    simple_decoder.save(f'simple_decoder_{save_string}.keras')

# Encoder-decoder 2:  

This takes out the dense layers and replaces them with a 1x1 convolution layer.  This makes a pure CNN encoder-decoder, and the output is now in terms of pixels and channels rather than one vector.  This felt more physically motivated.  Now n=4 with 16 channels gives the same compression as n=2 previously.

In [None]:
from tensorflow.keras import layers, models, regularizers
from keras.layers import LeakyReLU, BatchNormalization, Dropout

#Parameters for network
n=4 #number of 2x2 pooling stages; the encoder builder only warns when n < 2
number_channels = 256 #these are intermediate channels
number_final_channels = 16 #channels of the 1x1 convolution that produces the code
dropout_rate = 0 #Dropout layers are only added when this is > 0

L1weight = 1e-8
act_string = LeakyReLU(alpha=0.1) #okay, not a string

#string with parameters for saving
# Only the exponent of L1weight goes into the name: 1e-8 gives "-08".
sci_notation = "{:.0e}".format(L1weight)
exponent = sci_notation.split('e')[-1]
save_string = f'n{n}_nc{number_channels}_nfc{number_final_channels}_d{dropout_rate }_logL1w{exponent}'

# Conditionally add L1 regularizer if L1weight is greater than 0
if L1weight > 0:
    regularizer = regularizers.l1(L1weight)
else:
    regularizer = None

def create_CNN_encoder(input_shape, n,  number_channels=number_channels, dropout_rate=dropout_rate):
    """Build the fully convolutional encoder (no Dense layer).

    Args:
        input_shape: (height, width, channels) of one input image.
        n: number of conv + 2x2 max-pool stages.  A message is printed when
            n < 2, but construction continues.
        number_channels: filters of stages 2..n; the first stage uses half.
        dropout_rate: when > 0, a Dropout layer follows every stage and the
            final 1x1 convolution.

    Returns:
        A Sequential model whose output has spatial size reduced by 2**n per
        axis and ``number_final_channels`` channels (read from module scope).

    Every convolution is followed by batch normalisation and then the
    module-level ``act_string`` activation.
    """
    if n<2:
        print("n is too small.  n >=2")

    def conv_unit(filters, kernel):
        # convolution -> batch norm -> activation
        return [
            layers.Conv2D(filters, kernel, activation=None, padding='same',
                          kernel_regularizer=regularizer),
            BatchNormalization(),
            layers.Activation(act_string),
        ]

    def optional_dropout():
        return [Dropout(dropout_rate)] if dropout_rate >0 else []

    stack = [layers.InputLayer(input_shape)]

    # Stage 1 uses half the filters; stages 2..n use the full count.
    stage_filters = [number_channels//2]
    for _ in range(1, n):
        stage_filters.append(number_channels)

    for filters in stage_filters:
        stack.extend(conv_unit(filters, (3, 3)))
        stack.append(layers.MaxPooling2D((2, 2)))
        stack.extend(optional_dropout())

    # 1x1 convolution that reduces the channel count to form the code.
    stack.extend(conv_unit(number_final_channels, (1, 1)))
    stack.extend(optional_dropout())

    encoder = models.Sequential()
    for layer in stack:
        encoder.add(layer)
    return encoder


def create_CNN_decoder(original_shape, n, number_channels=number_channels, dropout_rate=dropout_rate):
    """Build the fully convolutional decoder that mirrors ``create_CNN_encoder``.

    Args:
        original_shape: (height, width, channels) of the image to reconstruct.
        n: number of 2x upsampling stages (same n as the encoder).
        number_channels: filters of the 1x1 expansion and of the first n-1
            stages; the last stage uses half (integer division).
        dropout_rate: when > 0, a Dropout layer follows every upsampling stage.

    Returns:
        A Sequential model mapping
        (height // 2**n, width // 2**n, number_final_channels) to
        original_shape.  ``number_final_channels`` is read from module scope.
    """
    model = models.Sequential()

    # The input is the encoder's code: a small image with number_final_channels channels
    model.add(layers.InputLayer( (original_shape[0] // (2 ** n), original_shape[1] // (2 ** n), number_final_channels)))

    # 1x1 expansion back to the working channel count
    model.add(layers.Conv2DTranspose(number_channels, (1, 1), activation=None, padding='same',
                  kernel_regularizer=regularizer))
    model.add(BatchNormalization())
    model.add(layers.Activation(act_string))

    # Upsampling to original size, looping over number of layers
    for nlayer in range(1, n):
        model.add(layers.Conv2DTranspose(number_channels, (3, 3), activation=None, padding='same',
                  kernel_regularizer=regularizer))
        model.add(BatchNormalization())
        model.add(layers.Activation(act_string))
        model.add(layers.UpSampling2D((2, 2)))
        if dropout_rate >0:
          model.add(Dropout(dropout_rate))

    # The filter count must be an integer: use // (as the encoder does), not /.
    model.add(layers.Conv2DTranspose(number_channels // 2, (3, 3), activation=None, padding='same',
                                     kernel_regularizer=regularizer))
    model.add(BatchNormalization())
    model.add(layers.Activation(act_string))

    model.add(layers.UpSampling2D((2, 2)))
    if dropout_rate >0:
        model.add(Dropout(dropout_rate))

    # Final layer to reconstruct the image
    model.add(layers.Conv2D(original_shape[2], (1, 1), activation='linear', padding='same',
      kernel_regularizer=regularizer))

    return model



# Load the saved CNN encoder/decoder pair if both files exist, otherwise build
# fresh (untrained) models; then chain them into one autoencoder.
original_shape = [sub_image_size, sub_image_size, 1]


load_saved = 1

model_name = f'CNN_encoder_{save_string}.keras'
decoder_model_name = f'CNN_decoder_{save_string}.keras'
# Both halves are required: loading only one would pair trained weights with
# an untrained partner, and a missing decoder file would make load_model fail.
missing_models = [path for path in (model_name, decoder_model_name) if not os.path.exists(path)]
if load_saved == 1 and not missing_models:
    from tensorflow.keras.models import load_model
    CNN_encoder = load_model(model_name)
    CNN_decoder = load_model(decoder_model_name)
else:
    if load_saved == 1:
        for missing_path in missing_models:
            print(f"Path does not exist to {missing_path}.  Creating model")
    CNN_encoder = create_CNN_encoder(original_shape, n)
    CNN_decoder = create_CNN_decoder(original_shape, n)


# Combine the encoder and decoder to create the autoencoder
CNN_autoencoder = models.Sequential([CNN_encoder, CNN_decoder])



In [None]:
# Print the layer-by-layer structure of the two halves of the CNN autoencoder.
CNN_encoder.summary()
CNN_decoder.summary()


In [None]:
# Train the CNN autoencoder to reproduce its input and save both halves.
# Gated on the TRAIN flag from the configuration cell (the name is upper-case;
# a lower-case `train` is never defined).
if TRAIN:

    learning_rate = .001

    CNN_autoencoder.compile(optimizer=Adam(learning_rate=learning_rate), loss="mae")
    # Input and target are the same tensor.
    CNN_autoencoder.fit(WL_tensor, WL_tensor, epochs=20, batch_size=256,
                        shuffle=True, callbacks=[reduce_lr]) #, RegularizationLossMonitor()  If RegularizationLossMonitor() is included as a callback, the regularization loss is printed separately
    CNN_encoder.save(f'CNN_encoder_{save_string}.keras')
    CNN_decoder.save(f'CNN_decoder_{save_string}.keras')

#Encoder Decoder 3 (residual connections)

This now is the same CNN as our first but with residual connections

In [None]:
from tensorflow.keras import layers, models, regularizers
from keras.layers import LeakyReLU, BatchNormalization, Dropout

#Parameters for network
# n is set here, before save_string is built, so the file name describes the
# network constructed further down in this cell (which uses n=2) instead of
# inheriting whatever value an earlier cell left in n.
n = 2 #number of 2x2 pooling stages
number_channels = 64
dropout_rate = 0

L1weight = 1e-8
act_string = LeakyReLU(alpha=0.1) #okay, not a string



# Conditionally add L1 regularizer if L1weight is greater than 0
if L1weight > 0:
    regularizer = regularizers.l1(L1weight)
else:
    regularizer = None


#string with parameters for saving
# Only the exponent of L1weight goes into the name: 1e-8 gives "-08".
sci_notation = "{:.0e}".format(L1weight)
exponent = sci_notation.split('e')[-1]
save_string = f'n{n}_nc{number_channels}_d{dropout_rate }_logL1w{exponent}'


def create_encoder_residconnect(input_shape, n, number_channels=number_channels, act_string=act_string, dropout_rate=0, regularizer=regularizer):
    """Build the residual encoder with the Keras functional API.

    Args:
        input_shape: (height, width, channels) of one input image.
        n: total number of 2x2 max-pool stages; the first is a plain
            conv stage, the remaining n-1 are residual blocks.
        number_channels: filters of every 3x3 convolution.
        act_string: activation applied after batch norm / after each Add.
        dropout_rate: when > 0, Dropout follows every pooling layer and the
            final Dense layer.
        regularizer: kernel regularizer for all conv and Dense layers.

    Returns:
        A Model mapping input_shape to a flat code of length
        height * width // 4**n.
    """
    def conv_bn(tensor):
        # 3x3 convolution followed by batch normalisation (no activation)
        tensor = layers.Conv2D(number_channels, (3, 3), activation=None, padding='same', kernel_regularizer=regularizer)(tensor)
        return layers.BatchNormalization()(tensor)

    def activate(tensor):
        return layers.Activation(act_string)(tensor)

    def pool_then_dropout(tensor):
        tensor = layers.MaxPooling2D((2, 2))(tensor)
        if dropout_rate > 0:
            tensor = layers.Dropout(dropout_rate)(tensor)
        return tensor

    inputs = layers.Input(shape=input_shape)

    # Plain first stage
    features = pool_then_dropout(activate(conv_bn(inputs)))

    # Residual blocks: two conv+BN units, shortcut added before the activation
    for _ in range(n - 1):
        shortcut = features
        branch = conv_bn(activate(conv_bn(features)))
        merged = layers.Add()([branch, shortcut])
        features = pool_then_dropout(activate(merged))

    # Dense bottleneck
    code = layers.Flatten()(features)
    code_length = (input_shape[0] * input_shape[1]) // (4 ** n)
    code = layers.Dense(units=code_length, activation=act_string, kernel_regularizer=regularizer)(code)
    if dropout_rate > 0:
        code = layers.Dropout(dropout_rate)(code)

    return models.Model(inputs, code)

def create_decoder_residconnect(encoded_length, original_shape, n, number_channels=number_channels, act_string=act_string, dropout_rate=0, regularizer=regularizer):
    """Build the residual decoder with the Keras functional API.

    Args:
        encoded_length: length of the flat code vector.
        original_shape: (height, width, channels) of the image to reconstruct.
        n: total number of 2x upsampling stages; n-1 residual blocks plus one
            plain final stage.
        number_channels: filters of every transposed convolution.
        act_string: activation used throughout.
        dropout_rate: when > 0, Dropout follows every upsampling layer.
        regularizer: kernel regularizer for all Dense/conv layers.

    Returns:
        A Model mapping (encoded_length,) to original_shape.
    """
    def deconv(tensor, activation=None):
        return layers.Conv2DTranspose(number_channels, (3, 3), activation=activation, padding='same', kernel_regularizer=regularizer)(tensor)

    def batch_norm(tensor):
        return layers.BatchNormalization()(tensor)

    def activate(tensor):
        return layers.Activation(act_string)(tensor)

    def upsample_then_dropout(tensor):
        tensor = layers.UpSampling2D((2, 2))(tensor)
        if dropout_rate > 0:
            tensor = layers.Dropout(dropout_rate)(tensor)
        return tensor

    inputs = layers.Input(shape=(encoded_length,))

    # Expand the code and fold it into a small multi-channel image
    dense_units = np.prod(encoded_length*number_channels)
    small_map_shape = (original_shape[0] // (2 ** n), original_shape[1] // (2 ** n), number_channels)
    features = layers.Dense(units=dense_units, activation=act_string, kernel_regularizer=regularizer)(inputs)
    features = layers.Reshape(small_map_shape)(features)
    features = activate(batch_norm(features))

    # Residual blocks: two deconv+BN units, shortcut added before the activation
    for _ in range(n - 1):
        shortcut = features
        branch = batch_norm(deconv(activate(batch_norm(deconv(features)))))
        merged = layers.Add()([branch, shortcut])
        features = upsample_then_dropout(activate(merged))

    # Plain final stage back to the original spatial size
    features = upsample_then_dropout(deconv(features, activation=act_string))

    # 1x1 linear convolution down to the image's channel count
    reconstruction = layers.Conv2D(original_shape[2], (1, 1), activation='linear', padding='same', kernel_regularizer=regularizer)(features)

    return models.Model(inputs, reconstruction)




# Load the saved residual encoder/decoder pair if both files exist, otherwise
# build fresh (untrained) models; then chain them and print their summaries.
n=2 #number of 2x2 pooling stages
original_shape = [sub_image_size, sub_image_size, 1]
# Length of the flat code produced by the encoder's final Dense layer.
encoded_length = sub_image_size*sub_image_size//int(4**n)

load_saved = 1
model_name = f'encoder_residconnect_{save_string}.keras'
decoder_model_name = f'decoder_residconnect_{save_string}.keras'
# Both halves are required: loading only one would pair trained weights with
# an untrained partner, and a missing decoder file would make load_model fail.
missing_models = [path for path in (model_name, decoder_model_name) if not os.path.exists(path)]
if load_saved == 1 and not missing_models:
    from tensorflow.keras.models import load_model
    encoder_residconnect = load_model(model_name)
    decoder_residconnect = load_model(decoder_model_name)

else:
    if load_saved == 1:
        for missing_path in missing_models:
            print(f"Path does not exist to {missing_path}.  Creating model")
    encoder_residconnect = create_encoder_residconnect(original_shape , n)
    decoder_residconnect = create_decoder_residconnect(encoded_length,original_shape, n)

# Combine the encoder and decoder to create the autoencoder
autoencoder_residconnect = models.Sequential([encoder_residconnect, decoder_residconnect])

encoder_residconnect.summary()
decoder_residconnect.summary()

In [None]:



# Combine the encoder and decoder to create the autoencoder
# (same construction as at the end of the previous cell; running it again
# rebinds the name to a new Sequential wrapper around the same two models)
autoencoder_residconnect = models.Sequential([encoder_residconnect, decoder_residconnect])
#autoencoder_residconnect.summary()



In [None]:
# Train the residual autoencoder to reproduce its input and save both halves.
# Gated on the TRAIN flag from the configuration cell (the name is upper-case;
# a lower-case `train` is never defined).
if TRAIN:
    # Set the learning rate
    learning_rate = 0.001

    autoencoder_residconnect.compile(optimizer=Adam(learning_rate=learning_rate), loss="mae")

    # Input and target are the same tensor.
    autoencoder_residconnect.fit(WL_tensor, WL_tensor, epochs=40,batch_size=256,
                                 shuffle=True, callbacks=[reduce_lr]) #, RegularizationLossMonitor()
    encoder_residconnect.save(f'encoder_residconnect_{save_string}.keras')
    decoder_residconnect.save(f'decoder_residconnect_{save_string}.keras')

## Encoder-decoder 4: most complex model

I've added 2*(n-1) additional layers, half 1x1 convolutions and half 3x3.

In [None]:
from tensorflow.keras import layers, models


# Parameters for the most complex encoder/decoder pair.
number_channels = 64

# Activation used by create_encoder / create_decoder in this cell.
# LeakyReLU is not imported in this cell; it relies on an earlier cell's import.
act_func = LeakyReLU(alpha=0.1)

L1weight = 0 #1e-8

# Conditionally add L1 regularizer if L1weight is greater than 0
if L1weight > 0:
    regularizer = regularizers.l1(L1weight)
else:
    regularizer = None


#string with parameters for saving
# Only the exponent of L1weight goes into the name: 0 gives "+00".
sci_notation = "{:.0e}".format(L1weight)
exponent = sci_notation.split('e')[-1]
# NOTE(review): n and dropout_rate are not assigned in this cell before this
# line; the name uses whatever values earlier cells left in them.
save_string = f'n{n}_nc{number_channels}_d{dropout_rate }_logL1w{exponent}'


def create_encoder(input_shape, n, number_channels=64, act_string=act_string, regularizer=regularizer):
    """Build the deeper Sequential encoder with interleaved 1x1 convolutions.

    Args:
        input_shape: (height, width, channels) of one input image.
        n: number of 2x2 max-pool stages.  A message is printed when n < 2,
            but construction continues.
        number_channels: filters of all convolutions except the very first,
            which uses half.
        act_string: accepted but not used; every layer takes its activation
            from the module-level ``act_func``.
        regularizer: kernel regularizer for all conv and Dense layers.

    Returns:
        A Sequential model whose output is a flat vector of length
        height * width // 4**n.
    """
    if n<2:
        print("n is too small.  n >=2")

    def conv(filters, kernel):
        return layers.Conv2D(filters, kernel, activation=act_func, padding='same', kernel_regularizer=regularizer)

    # Stage 1: 3x3 (half width) -> 1x1 -> pool, no batch normalisation.
    stack = [
        layers.InputLayer(input_shape),
        conv(number_channels//2, (3, 3)),
        conv(number_channels, (1, 1)),
        layers.MaxPooling2D((2, 2)),
    ]

    # Stages 2..n: 3x3 -> BN -> 1x1 -> 3x3 -> BN -> pool.
    for _ in range(1, n):
        stack.append(conv(number_channels, (3, 3)))
        stack.append(BatchNormalization())
        stack.append(conv(number_channels, (1, 1)))
        stack.append(conv(number_channels, (3, 3)))
        stack.append(BatchNormalization())
        stack.append(layers.MaxPooling2D((2, 2)))

    # Dense bottleneck: one unit per 4**n input pixels.
    code_length = (input_shape[0] * input_shape[1]) // (4 ** n)
    stack.append(layers.Flatten())
    stack.append(layers.Dense(units=code_length, activation=act_func, kernel_regularizer=regularizer))

    encoder = models.Sequential()
    for layer in stack:
        encoder.add(layer)
    return encoder


def create_decoder(encoded_length, original_shape, n, number_channels=64, act_string=act_string, regularizer=regularizer):
    """Build the deeper Sequential decoder that mirrors ``create_encoder``.

    Args:
        encoded_length: length of the flat code vector fed to the decoder.
        original_shape: (height, width, channels) of the image to reconstruct.
        n: number of 2x upsampling stages (same n as the encoder).
        number_channels: channels of the reshaped code and filters of all
            transposed convolutions except the last 3x3, which uses half
            (integer division).  The default of 64 reproduces the previously
            hard-coded width.
        act_string: accepted but not used; every layer takes its activation
            from the module-level ``act_func``.
        regularizer: kernel regularizer applied to every Dense/conv layer.

    Returns:
        A Sequential model mapping (encoded_length,) to original_shape.

    The Dense layer has encoded_length * number_channels units, which must
    equal the size of the reshape target
    (height // 2**n, width // 2**n, number_channels); this holds when
    encoded_length == height * width // 4**n.
    """
    model = models.Sequential()

    # The input is a flat array
    model.add(layers.InputLayer((encoded_length,)))

    # Expanding the flat array to a 3D tensor
    dense_units = int(encoded_length) * int(number_channels)
    model.add(layers.Dense(units=dense_units, activation=act_func, kernel_regularizer=regularizer))

    # Calculate the dimensions for the first reshape
    # It should match the output size of the last MaxPooling layer in the encoder
    reshape_dims = (original_shape[0] // (2 ** n), original_shape[1] // (2 ** n), number_channels)

    model.add(layers.Reshape(reshape_dims))

    # Upsampling to original size, looping over number of layers
    for nlayer in range(1, n):
        model.add(layers.Conv2DTranspose(number_channels, (3, 3), activation=act_func, padding='same', kernel_regularizer=regularizer))
        model.add(BatchNormalization())
        model.add(layers.Conv2DTranspose(number_channels, (1, 1), activation=act_func, padding='same', kernel_regularizer=regularizer))
        # Regularized like every other layer in the stack.
        model.add(layers.Conv2DTranspose(number_channels, (3, 3), activation=act_func, padding='same', kernel_regularizer=regularizer))
        model.add(BatchNormalization())
        model.add(layers.UpSampling2D((2, 2)))

    model.add(layers.Conv2DTranspose(number_channels, (1, 1), activation=act_func, padding='same', kernel_regularizer=regularizer))
    # The filter count must be an integer: use // (as the encoder does), not /.
    model.add(layers.Conv2DTranspose(number_channels // 2, (3, 3), activation=act_func, padding='same', kernel_regularizer=regularizer))
    model.add(layers.UpSampling2D((2, 2)))

    # Final layer to reconstruct the image
    model.add(layers.Conv2D(original_shape[2], (1, 1), activation='linear', padding='same', kernel_regularizer=regularizer))

    return model





# Load the saved complex encoder/decoder pair if both files exist, otherwise
# build fresh (untrained) models; then chain them into one autoencoder.
n=2 #number of 2x2 pooling stages
original_shape = [sub_image_size, sub_image_size, 1]


# Length of the flat code produced by the encoder's final Dense layer.
encoded_length = sub_image_size*sub_image_size//int(4**n)

load_saved = 1
model_name = f'encoder_{save_string}.keras'
decoder_model_name = f'decoder_{save_string}.keras'
# Both halves are required: loading only one would pair trained weights with
# an untrained partner, and a missing decoder file would make load_model fail.
missing_models = [path for path in (model_name, decoder_model_name) if not os.path.exists(path)]
if load_saved == 1 and not missing_models:
    # Imported here because this cell cannot rely on an earlier cell having
    # taken its own load branch (the only other place load_model is imported).
    from tensorflow.keras.models import load_model
    encoder = load_model(model_name)
    decoder = load_model(decoder_model_name)
else:
    if load_saved == 1:
        for missing_path in missing_models:
            print(f"Path does not exist to {missing_path}.  Creating model...")
    encoder = create_encoder(original_shape , n)
    decoder = create_decoder(encoded_length,original_shape, n)

# Combine the encoder and decoder to create the autoencoder
autoencoder = models.Sequential([encoder, decoder])



In [None]:
from tensorflow.keras.optimizers import Adam
from keras import backend as K


# Set the learning rate
learning_rate = 0.001

# Create an Adam optimizer with the desired learning rate
# NOTE(review): adam_optimizer is not referenced by any other code visible in
# this notebook section.
adam_optimizer = Adam(learning_rate=learning_rate)

def fractional_difference_loss(y_true, y_pred):
    """Mean absolute fractional error, averaged over the last axis.

    Each element contributes |y_pred - y_true| / (|y_true| + 0.01); the
    constant keeps the ratio finite where y_true is zero.
    """
    epsilon = .01
    denominator = K.abs(y_true) + epsilon
    fractional_error = (y_pred - y_true) / denominator
    return K.mean(K.abs(fractional_error), axis=-1)

def fractional_square_loss(y_true, y_pred):
    """Element-wise squared fractional error (no reduction is applied here).

    Each element is ((y_pred - y_true) / (|y_true| + 1e-2)) ** 2; the
    constant keeps the ratio finite where y_true is zero.
    """
    epsilon = 1e-2
    denominator = K.abs(y_true) + epsilon
    fractional_error = (y_pred - y_true) / denominator
    return K.square(fractional_error)

# Print the layer-by-layer structure of the two halves of the complex autoencoder.
encoder.summary()
decoder.summary()



# compile and train complex CNN

In [None]:
# Train the complex autoencoder to reproduce its input and save both halves.
# Gated on the TRAIN flag from the configuration cell (the name is upper-case;
# a lower-case `train` is never defined).
if TRAIN:
    # Set the learning rate
    learning_rate = 0.001
    # `optimizer=` must be a keyword argument; `optimizer==...` would be a
    # comparison against an undefined name.
    autoencoder.compile(optimizer=Adam(learning_rate=learning_rate), loss="mae") #loss=fractional_difference_loss) #, run_eagerly=True)
    # Input and target are the same tensor.
    autoencoder.fit(WL_tensor, WL_tensor,
                    epochs=40,
                    batch_size=256,
                    shuffle=True, callbacks=[reduce_lr])
    encoder.save(f'encoder_{save_string}.keras')
    decoder.save(f'decoder_{save_string}.keras')

#This shows images from the various encoders/decoders

Change number_decoders to compare more decoders, if they are loaded.


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# WL_tensor is indexed as (num_images, height, width, channels); the first
# num_images tiles are pushed through each selected encoder/decoder pair.
num_images = 5
sub_image_size = WL_tensor.shape[1]  # Assuming height and width are the same

# How many of the four encoder/decoder pairs to compare (pair k is used when
# number_decoders >= k).
number_decoders = 1

# The same leading slice of tiles is fed to every pair.
_preview_batch = WL_tensor[:num_images,...]

# Round-trip the tiles and convert the results to NumPy arrays.
decoded_images1 = simple_decoder(simple_encoder(_preview_batch)).numpy()

if number_decoders >= 2:
    decoded_images2 = CNN_decoder(CNN_encoder(_preview_batch)).numpy()
if number_decoders >= 3:
    decoded_images3 = decoder_residconnect(encoder_residconnect(_preview_batch)).numpy()
if number_decoders >= 4:
    decoded_images4 = decoder(encoder(_preview_batch)).numpy()

# Function to display images
# Function to display images
def display_side_by_side(index):
    """Show input tile ``index`` next to its reconstruction from each decoder.

    One row of number_decoders + 1 panels is drawn: the input first, then the
    output of decoder 1, 2, ... up to ``number_decoders`` (at most 4).  All
    panels share the colour scale vmin=-0.05, vmax=0.1.

    Args:
        index: position of the tile within the first ``num_images`` tiles.

    Raises:
        ValueError: if index is outside [0, num_images).

    Reads the module-level WL_tensor, decoded_images1..4, num_images and
    number_decoders.
    """
    if index < 0 or index >= num_images:
        raise ValueError("Index out of bounds")

    # (title, image) for every panel, in display order.  Channel 0 is taken
    # because the tiles are single-channel.
    panels = [
        (f'Input Image {index}', WL_tensor[index, :, :, 0]),
        (f'Decoded Image {index}', decoded_images1[index, :, :, 0]),
    ]
    if number_decoders >= 2:
        panels.append((f'Decoded 2 Image {index}', decoded_images2[index, :, :, 0]))
    if number_decoders >= 3:
        panels.append((f'Decoded 3 Image {index}', decoded_images3[index, :, :, 0]))
    if number_decoders >= 4:
        panels.append((f'Decoded 4 Image {index}', decoded_images4[index, :, :, 0]))

    fig, axes = plt.subplots(1, number_decoders+1, figsize=(4*number_decoders, 10))

    # Each panel gets its own axis, so decoder 4 lands in the fifth panel
    # rather than overwriting decoder 3.
    for axis, (title, image) in zip(axes, panels):
        axis.imshow(image, cmap='viridis', vmin=-0.05, vmax=.1)
        axis.set_title(title)
        axis.axis('off')  # Hide the axis

    # Display the images
    plt.show()


# Example usage: show tiles 0 and 2.  Any index in [0, num_images) is valid;
# the function can also be called in a loop.
display_side_by_side(0)  # You can loop or call this function for other indices as well
display_side_by_side(2)