## ST456 Deep Learning Project

In [19]:
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub
import os
import gc
import random
import matplotlib.pyplot as plt
import seaborn as sns
import zipfile
import keras
from sklearn.model_selection import train_test_split
from keras import layers

#from keras import ops
from keras.models import Sequential
from keras.layers import Activation, Dropout, Flatten, Dense, BatchNormalization, GlobalMaxPooling2D
from keras.preprocessing import image
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from keras.optimizers import Adam
from keras.metrics import MeanSquaredError, RootMeanSquaredError
from keras.utils import to_categorical
from keras import backend as K
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from tqdm import tqdm

# Show the GPU devices TensorFlow can see from this kernel
print(tf.config.list_physical_devices('GPU'))

[]


In [2]:
# Path of the .npz archive that holds the pre-processed images and targets
# NOTE(review): absolute Kaggle input path — adjust when running elsewhere
data_path = '/kaggle/input/processed-galaxy-data/compressed_img_target.npz'

In [3]:
# Open the archive of cropped, compressed images and pull out both arrays
loaded_arrays = np.load(data_path)
data, target = loaded_arrays['images'], loaded_arrays['target']

# Global batch size: 32 samples for each of the 2 GPUs
batch_size = 2 * 32

print(data.shape)

(61578, 64, 64, 3)


In [15]:
# Average of the targets along axis 0 (one mean per target column)
prob_means = target.mean(axis=0)
prob_means.shape

(37,)

In [5]:
# Define RMSE loss function
def rmse_custom(y_true, y_pred):
    """Return the square root of the mean squared difference between inputs.

    The mean is taken over every element of ``y_pred - y_true`` (no axis is
    given), so the result is a single value.
    """
    squared_error = K.square(y_pred - y_true)
    return K.sqrt(K.mean(squared_error))

In [None]:
# Hold out the test set first, then carve the validation set out of the rest.
# Resulting proportions: 72% train, 8% validation, 20% test
X_train, X_test, y_train, y_test = train_test_split(
    data, target, test_size=0.2, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.1, random_state=42
)

# Normalize pixel values to range [0, 1] (stored as float32)
X_train, X_test, X_val = (
    split.astype('float32') / 255 for split in (X_train, X_test, X_val)
)

# Targets are cast to float32 as well
y_train, y_test, y_val = (
    split.astype('float32') for split in (y_train, y_test, y_val)
)

In [None]:
# Random geometric transforms applied to the training images only
data_augmentation = ImageDataGenerator(
    fill_mode='nearest',
    horizontal_flip=True,
    zoom_range=0.2,
    shear_range=0.2,
    height_shift_range=0.2,
    width_shift_range=0.2,
    rotation_range=360,
)

# Batch iterators: augmented for training, untouched images for validation
train_datagen = data_augmentation.flow(x=X_train, y=y_train, batch_size=batch_size)
validation_datagen = ImageDataGenerator().flow(x=X_val, y=y_val, batch_size=batch_size)


#### Recording the effect of the constraint coefficient on the performance of a simple CNN

As we increase the weight ($\lambda$) given to the decision-tree constraints in the customised loss function, how does model performance change?

In [1]:
# these regularization parameters correspond to the galaxy-zoo decision tree structure
def custom_loss(y_true, y_pred, l=0.001):
    """Mean squared error plus a weighted penalty for breaking the decision tree.

    Every penalty term is the absolute gap between the summed answers of one
    question and the answer(s) that lead to that question (or 1 for questions
    whose answers must sum to one). A term is zero when the predictions are
    consistent with the tree.

    Column layout used by the slices below (question -> columns):
    Class 1 -> 0:3, Class 2 -> 3:5, Class 3 -> 5:7, Class 4 -> 7:9,
    Class 5 -> 9:13, Class 6 -> 13:15, Class 7 -> 15:18, Class 8 -> 18:25,
    Class 9 -> 25:28, Class 10 -> 28:31, Class 11 -> 31:37.

    Args:
        y_true: ground-truth targets, same layout as ``y_pred``.
        y_pred: predictions, indexed as ``[row, column]`` with 37 columns
            (assumes a rank-2 tensor, as produced by the final Dense(37) layer).
        l: lambda, the coefficient applied to the summed constraint penalties.

    Returns:
        ``mse + l * penalty``. ``mse`` is the mean over every element and
        ``penalty`` has one entry per row, so the sum has one value per row.
    """
    c1 = tf.abs(tf.reduce_sum(y_pred[:, 0:3], axis=1) - 1) # Class 1 answers sum to 1
    c2 = tf.abs(tf.reduce_sum(y_pred[:, 13:15], axis=1) - 1) # Class 6 answers sum to 1
    c3 = tf.abs((y_pred[:, 3] + y_pred[:, 4]) - y_pred[:, 1]) # Class 2 follows answer 1.2
    # Class 7 follows answer 1.1, which is column 0 (column 10 is answer 5.2)
    c4 = tf.abs((y_pred[:, 15] + y_pred[:, 16] + y_pred[:, 17]) - y_pred[:, 0]) # Class 7 constraint
    c5 = tf.abs(tf.reduce_sum(y_pred[:, 18:25], axis=1) - y_pred[:, 13]) # Class 8 follows answer 6.1
    c6 = tf.abs(tf.reduce_sum(y_pred[:, 25:28], axis=1) - y_pred[:, 3]) # Class 9 follows answer 2.1
    c7 = tf.abs(tf.reduce_sum(y_pred[:, 5:7], axis=1) - y_pred[:, 4]) # Class 3 follows answer 2.2
    c8 = tf.abs(tf.reduce_sum(y_pred[:, 7:9], axis=1) - y_pred[:, 4]) # Class 4 follows answer 2.2
    c9 = tf.abs(tf.reduce_sum(y_pred[:, 28:31], axis=1) - y_pred[:, 7]) # Class 10 follows answer 4.1
    c10 = tf.abs(tf.reduce_sum(y_pred[:, 9:13], axis=1) - y_pred[:, 7] - y_pred[:, 8]) # Class 5 follows 4.1 + 4.2
    c11 = tf.abs(tf.reduce_sum(y_pred[:, 31:37], axis=1) - y_pred[:, 7]) # Class 11 follows answer 4.1

    # l (lambda) coefficient controls the force of penalties
    c_loss = l*(c1+c2+c3+c4+c5+c6+c7+c8+c9+c10+c11)

    # base loss (mean squared error over every element)
    mse_loss = tf.reduce_mean(tf.square(y_true - y_pred))

    # combine base loss with constraints
    return mse_loss + c_loss

def custom_loss_wrapper(l=0.001):
    """Build a two-argument loss that calls ``custom_loss`` with a fixed lambda.

    Args:
        l: lambda value forwarded to ``custom_loss`` on every call.

    Returns:
        A function ``loss(y_true, y_pred)``.
    """
    def loss(y_true, y_pred):
        # `l` is captured from the enclosing call
        return custom_loss(y_true, y_pred, l=l)
    return loss

In [None]:
# Distribution strategy under which the simple CNN is built,
# to use two GPUs in parallel
strategy = tf.distribute.MirroredStrategy()
print(f'Number of devices: {strategy.num_replicas_in_sync}')

In [None]:
l = [0.001, 0.01, 0.1, 0.2, 0.5, 1]
num_epochs = 30
results = {'lambda':[], 'rmse':[]}


def _build_simple_cnn(lamda):
    """Return a newly initialised, compiled simple CNN whose loss uses ``lamda``.

    Call this inside ``strategy.scope()`` so the model is created under the
    distribution strategy.
    """
    model = Sequential([
        Conv2D(32, (3, 3), activation='relu', input_shape=(64, 64, 3)),
        MaxPooling2D((2, 2)),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPooling2D((2, 2)),
        Conv2D(128, (3, 3), activation='relu'),
        MaxPooling2D((2, 2)),
        Conv2D(128, (3, 3), activation='relu'),
        MaxPooling2D((2, 2)),
        Flatten(),
        Dense(512, activation='relu'),
        Dense(37, activation='sigmoid'),
    ])
    model.compile(optimizer='adam', loss=custom_loss_wrapper(l=lamda), metrics=[RootMeanSquaredError()])
    return model


# One panel per lambda value (3 x 2 grid for the six values above)
fig, axes = plt.subplots(3, 2, figsize=(12, 8), sharex='col', sharey='row')
axes = axes.flatten()

# Loop through lambda values
for i, lamda in enumerate(l):
    print(f'Training Lambda = {lamda}')
    print('-'*200)

    # Release the previous iteration's model state so memory does not
    # accumulate across the sweep
    K.clear_session()
    gc.collect()

    # A new model is built for every lambda so each run starts from fresh weights
    with strategy.scope():
        model = _build_simple_cnn(lamda)

        # Train the model. Both generators yield `batch_size` samples per step,
        # so the step counts are derived from `batch_size`; with a hard-coded 32
        # each epoch would run through the data twice.
        history = model.fit(
            train_datagen,
            steps_per_epoch=len(X_train) // batch_size,
            epochs=num_epochs,
            validation_data=validation_datagen,
            validation_steps=len(X_val) // batch_size
        )

    # Visualize loss and metric by training epoch
    train_metric = history.history['root_mean_squared_error']
    val_metric = history.history['val_root_mean_squared_error']
    epochs = range(1, len(train_metric) + 1)

    # Plot training and validation metric
    axes[i].plot(epochs, train_metric, marker='o', color='b', label='Training RMSE')
    axes[i].plot(epochs, val_metric, marker='o', color='r', label='Validation RMSE')
    # raw f-string: '\l' is not a valid escape sequence in a normal string
    axes[i].set_title(rf'$\lambda={lamda}$')
    axes[i].set_xlabel('Epochs')
    axes[i].set_ylabel('RMSE')
    # the line labels above are only shown once a legend is drawn
    axes[i].legend()
    fig.tight_layout()

    # Save plot (the whole figure as filled in so far)
    fig.savefig(f'plot_l_{lamda}.png')

    # Save model
    model_path = f'base_model_lambda_{lamda}.keras'
    model.save(model_path)

    # Evaluate model on test data
    test_loss, test_rmse = model.evaluate(X_test, y_test)

    # Store results
    results['rmse'].append(test_rmse)
    results['lambda'].append(lamda)


In [None]:
# Tabulate the test RMSE recorded for each lambda and write it to disk
results_df = pd.DataFrame.from_dict(results)
results_df.to_csv('reg_effect.csv', index=False)
results_df.head()