# CAMEL Multifield Dataset

Implementing model suggested in 2021 paper titled "The CAMEL Multifield Dataset: Learning the Universe's Fundamental Parameters with Artificial Intelligence".

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib.colors import LogNorm  # plotting on log scale

# import tensorflow
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import RandomFlip, RandomRotation
from keras import layers, models

# import sklean
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

In [None]:
# mount google drive
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
class Map:
    '''Takes label and image file names, opens and processes the data ready to
    implement into an model. Provides frequency plotting methods.'''

    path = "/content/drive/MyDrive/Colab_Notebooks/Camel_Project/Camels/"

    def __init__(self, labelfile: str, imagefile: str) -> None:
        self.labelfile = labelfile
        self.imagefile = imagefile

        # load files
        try:
            self.params = np.loadtxt(Map.path+self.labelfile)
            self.img_map = np.load(Map.path+self.imagefile)
        except Exception as e:
            print(f'Error loading files: {e}')
            self.params = None
            self.img_map = None

        # Useful Variables
        # images per parameter line
        if self.img_map is not None:
            self.img_per_param_line = self.img_map.shape[0] // self.params.shape[0]
        # range of values varied in the dataset
        if self.params is not None:
            self.p_range = self.params.shape[0] // self.params.shape[1]

    def info(self) -> None:
        '''Provides some basic information about the 2DMap provided'''
        print("params.shape",self.params.shape)
        print("img_map.shape",self.img_map.shape)
        print("Maps/Images per paramater line",self.img_map.shape[0]//self.params.shape[0])
        print("Maximum Value",np.max(self.img_map[0]))
        print("Minimum Value",np.min(self.img_map[0]))
        fig, ax = plt.subplots()
        ax.imshow(np.log10(self.img_map[0]),cmap="binary")
        ax.set_title("First image")

    def log_scale(self) -> None:
        '''Log scales and then noramlizes values ~{0,1} on per image basis to deal with
        large intensity distribution.
        NOTE: After experimenting this produced highest contrast and best utilization of range'''
        # loop over every image
        for i, image in enumerate(self.img_map):
            self.img_map[i] = np.log10(self.img_map[i]) # log scale
            self.img_map[i] = self.img_map[i] / np.min(self.img_map[i]) # normalize

        print("Total max",np.max(self.img_map))
        print("Total mean",np.mean(self.img_map))
        print("Total std",np.std(self.img_map))
        print("Total min",np.min(self.img_map),"\n")

    def resize(self, verbose=True) -> np.ndarray:
        '''Resized the label array to fit images per parameter line'''
        # resize labels to match images
        resized_labels = np.zeros((self.img_map.shape[0], self.params.shape[1]))
        count = 0
        for row in self.params:
            for i in range(self.img_per_param_line):
                resized_labels[count] = row
                count += 1
        # check
        for index, row in enumerate(self.params):
            if np.array_equal(resized_labels[index*self.img_per_param_line], row):
                continue
            else:
                print("ERROR : rows not equal")
                print(resized_labels[index*self.img_per_param_line], row)

        if verbose:
            print(f"resized_labels.shape : {resized_labels.shape} \n")

        return resized_labels

    def extract_param(self, p: int, labels: np.ndarray, verbose=True) -> np.ndarray:
        '''Returns a modified array that selects a specific parameter from the label file.
        Also appropriately modifies the images array to match.
        NOTE: Parameters must be indexed from zero.'''

        # extract labels
        new_labels = labels[(p*self.p_range*self.img_per_param_line) : ((p+1)*self.p_range*self.img_per_param_line) : 1 , p:p+1]

        # modified images array
        images = self.img_map[(p*self.p_range*self.img_per_param_line) : ((p+1)*self.p_range*self.img_per_param_line) : 1]

        if verbose:
            print(f"labels.shape : {new_labels.shape}")
            print(f"images.shape : {images.shape}")
            print(f"p_range : {self.p_range} \n")

        return new_labels, images

    def split_data(self, labels: np.ndarray, images: np.ndarray) -> np.ndarray:
        '''Uses sklearn shuffle to mix up the labels and images in the same way - then performs an 80-10-10 split'''

        labels_shuffled, images_shuffled = shuffle(labels, images) # shuffle before we split

        # split into test and train
        X_train, X_test, y_train, y_test = train_test_split(images_shuffled, labels_shuffled, test_size=0.10, random_state=42)
        # split train further into validation and train
        X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.11, random_state=42)

        # print shapes of model input data
        print(f"X_train.shape :  {X_train.shape}")
        print(f"X_test.shape :  {X_test.shape}")
        print(f"X_val.shape: {X_val.shape}")
        print(f"y_train.shape : {y_train.shape}")
        print(f"y_test.shape : {y_test.shape}")
        print(f"y_val.shape : {y_val.shape} \n")

        return X_train, X_test, X_val, y_train, y_test, y_val

    def plot_hist(self, labels: np.ndarray) -> None:
        '''Plots a histogram to see if the distribution of parameter values is uniform
        (important to prevent overfitting).'''

        parameter = labels[:, 0]
        # plot histogram
        fig, ax = plt.subplots()
        ax.hist(parameter[0:self.p_range*self.img_per_param_line:1], bins=self.p_range)
        # formatting
        ax.set_title('Frequency plot of select parameter')
        ax.set_xlabel('Parameter')
        ax.set_ylabel('Frequency')

    @staticmethod
    def augment_images(images: np.ndarray, labels: np.ndarray, factor: int = 2, example: bool = False) -> np.ndarray:
        '''Implements image augmentation increasing the dataset by input variable "factor". Also resizes the labels
        array to match.'''

        # Implement image augmentation using albumentations
        augmentation_pipeline = A.Compose([
            A.HorizontalFlip(p=0.5),  # horizontal flip
            A.VerticalFlip(p=0.5),  # vertical flip
            A.Rotate(limit=20, p=0.5),  # rotate by up to 20 degrees
            A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),  # random brightness and contrast
            A.RandomGamma(gamma_limit=(80, 120), p=0.5),  # gamma correction
            A.RandomSizedCrop(min_max_height=(50, 100), height=images.shape[1], width=images.shape[2], p=0.5),  # crop with random size
            A.GaussianBlur(p=0.3),  # add Gaussian blur
            A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=10, p=0.5)  # shift, scale, and rotate
        ], p=1)

        # error checking
        try:
            assert factor > 1
        except Exception as e:
            print(f"Error: {e}")
            print("WARNING: factor cannot be < 2")

        # preallocate memory for new arrays
        new_images = np.zeros(shape=(images.shape[0] * factor, images.shape[1], images.shape[2]))
        new_labels = np.zeros(shape=(labels.shape[0] * factor, labels.shape[1]))

        # augment images
        for f in range(factor):
            offset = f * images.shape[0]
            for i in range(images.shape[0]):
                image = images[i]  # retrieve image
                label = labels[i]  # retrieve label

                # apply augmentations
                augmented = augmentation_pipeline(image=image)
                augmented_image = augmented["image"]

                # add to new arrays
                new_images[i+offset] = augmented_image
                new_labels[i+offset] = label

        # output shapes
        print(f"new_images.shape : {new_images.shape}")
        print(f"new_labels.shape : {new_labels.shape}")

        # 3x3 grid of augmentation examples
        if example:
            test_image = images[0]  # test image to augment
            # Plot augmented images
            plt.figure(figsize=(10, 10))
            plt.title("Examples of image augmentation")
            plt.axis("off")
            for i in range(9):
                augmented_image = augmentation_pipeline(image=test_image)["image"]
                ax = plt.subplot(3, 3, i + 1)
                ax.imshow(np.log10(augmented_image), cmap="binary")
                ax.axis("off")

        return new_images, new_labels

In [None]:
def conv_block(model, filter_size: int, H: int):
    '''Convolutional block used in paper_model. Contains repeated
    convolutional and batch normalization layers.'''

    # first layer
    model.add(layers.Conv2D(filters=(filter_size*H), kernel_size=(3,3), strides=(1,1), padding='same'))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    # second layer
    model.add(layers.Conv2D(filters=(filter_size*H), kernel_size=(3,3), strides=(1,1), padding='same'))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    # third layer
    model.add(layers.Conv2D(filters=(filter_size*H), kernel_size=(2,2), strides=(2,2), padding='valid'))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

def paper_model(input_shape, H, C=1, dropout_rate=0.5):
    '''CNN model based on reference paper

    Inputs:
    input_shape: tuple containing input shape
    H: Hyperparameter determining number of channels in convolutional layers
    C: int representing number of channels (>1 in case of multifield)
    dropout_rate: droupout rate'''

    model = models.Sequential()

    # first block - input
    model.add(layers.Conv2D(filters=2*H, kernel_size=(3, 3), strides=(1,1), padding='same', input_shape=input_shape))
    model.add(layers.LeakyReLU())
    model.add(layers.Conv2D(filters=2*H, kernel_size=(3, 3), strides=(1,1), padding='same'))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    model.add(layers.Conv2D(filters=2*H, kernel_size=(2, 2), strides=(2,2), padding='valid'))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    # second block
    conv_block(model, filter_size=4, H=H)
    # third block
    conv_block(model, filter_size=8, H=H)
    # fourth block
    conv_block(model, filter_size=16, H=H)
    # fifth block
    conv_block(model, filter_size=32, H=H)
    # sixth block
    conv_block(model, filter_size=64, H=H)

    # seventh block
    model.add(layers.Conv2D(filters=128*H, kernel_size=(4,4), strides=(1,1), padding='valid'))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    # eighth block - flatten and fully connected layers
    model.add(layers.Flatten())
    model.add(layers.Dropout(dropout_rate))
    model.add(layers.Dense(64*H))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(dropout_rate))
    model.add(layers.Dense(12*H))

    # ninth block - output layer
    model.add(layers.Dense(1))

    return model
