# Assignment Two
# Sign-Based Adaptive Learning Rate for Neural Network Galaxy Classification
## Name: Wadalisa O. Molokwe
### Student Number: u19258349

In [1]:
#limit CPU
import os
os.environ["OMP_NUM_THREADS"] = "2"
os.environ["OPENBLAS_NUM_THREADS"] = "2"
os.environ["MKL_NUM_THREADS"] = "2"
os.environ["VECLIB_MAXIMUM_THREADS"] = "2"
os.environ["NUMEXPR_NUM_THREADS"] = "2"

#imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.metrics import BinaryAccuracy

from sklearn import model_selection
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors
from sklearn.ensemble import IsolationForest
from sklearn.inspection import DecisionBoundaryDisplay
from sklearn.feature_selection import SelectKBest, chi2
from imblearn.over_sampling import SMOTE

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, callbacks
from scikeras.wrappers import KerasClassifier
import warnings
warnings.filterwarnings('ignore')

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

## Data Preparation

### Data Statistics

In [2]:
# Getting Data Stats

print("----------------------Reading in Data----------------------------------------------------------------------")

#load in data
data = pd.read_csv("sdss_100k_galaxy_form_burst.csv")

print("\n------------------------------------- First Two Instances---------------------------------------------------")
print(data.head(2))

# Get a summary of the DataFrame, including data types and non-null values
print("\n----------------------------------Dataset Information ---------------------------------------------------------------")
data.info()

# Get descriptive statistics for numerical columns
print("\n----------------------------------------------- Descriptive Statistics ----------------------------------------------")
print(data.describe())

# the dataset is enough to just drop values and duplicates
data = data.dropna()
data = data.drop_duplicates()

----------------------Reading in Data----------------------------------------------------------------------

------------------------------------- First Two Instances---------------------------------------------------
                 objid            specobjid         ra       dec         u  \
0  1237646587710669400  8175185722644649984  82.038679  0.847177  21.73818   
1  1237646588247540577  8175186822156277760  82.138894  1.063072  20.66761   

          g         r         i         z  modelFlux_u  ...  psfMag_z  \
0  20.26633  19.32409  18.64037  18.23833     2.007378  ...  19.43575   
1  19.32016  18.67888  18.24693  18.04122     5.403369  ...  18.85012   

    expAB_u   expAB_g   expAB_r   expAB_i   expAB_z   class     subclass  \
0  0.099951  0.311864  0.289370  0.270588  0.187182  GALAXY  STARFORMING   
1  0.366549  0.516876  0.517447  0.552297  0.636966  GALAXY  STARFORMING   

   redshift  redshift_err  
0  0.067749      0.000015  
1  0.105118      0.000010  

[2 rows x 43 

KeyboardInterrupt: 

### Getting Unique Classes and Splitting Datasets

In [None]:
print("\n----------------------------------------Unique Classes---------------------------------------------------------------")
num_classes_nunique = data['subclass'].nunique()
print(f"Number of classes : {num_classes_nunique}")

# Absolute counts
print("\nValue counts :")
print(data['subclass'].value_counts())

print("\nValue counts :")
print((data['subclass'].value_counts(normalize=True) * 100).round(2))


print("\n----------------------------------------Splitting Data-------------------------------------------------------------")
y = data['subclass']
X = data.drop(['subclass','class'], axis=1) #they are all galaxies hence we drop class

print("\n----------------------------------------X Dataset-------------------------------------------------------------------")
print(X.head(2))

print("\n----------------------------------------Y Dataset-------------------------------------------------------------------")
print(y.head(2))

#shuffling dataset for the split
data = data.sample(frac=1, random_state=42)

X_train , X_test , y_train , y_test  = train_test_split(X,y, test_size=0.2 , random_state=42)

### Outliers

In [None]:
print("-----------------------------------------Outliers----------------------------------------------------------------")
Q1 = X_train.quantile(0.25) 
Q3 = X_train.quantile(0.75)
IQR = Q3 - Q1

outliers_iqr = ((X_train < (Q1 - 1.5 * IQR)) | (X_train > (Q3 + 1.5 * IQR)))
print("Number of outliers per column:")
print(outliers_iqr.sum())

#using IsolationForest to handle outliers and replace the outliers with the mean of the nearest neighbour
#IsolationForest is used to detect outliers in the training dataset
iso = IsolationForest(contamination=0.01, random_state=42)
outliers = iso.fit_predict(X_train)
outlier_preds = iso.predict(X_train)

#Replacing outliers with the mean of the nearest inlier
inlier_mask = outlier_preds ==1
outlier_mask = outlier_preds == -1

X_inliers = X_train[inlier_mask]
X_outliers = X_train[outlier_mask]

#Fit the nearest neighbor on inliers
num_neighbors = NearestNeighbors(n_neighbors=5)
num_neighbors.fit(X_inliers)

# Find nearest neighbors for each outlier
distances, indices = num_neighbors.kneighbors(X_outliers)

#Locate the nearest neighbor for every outlier
mean_neighbors = []
for neighbors_idx in indices:
    mean_point = X_inliers.iloc[neighbors_idx].mean(axis=0)
    mean_neighbors.append(mean_point)

mean_neighbors = pd.DataFrame(mean_neighbors, columns=X_train.columns, index=X_outliers.index)
print("--------------------Outliers before Replacement-------------------------------------------")
print(mean_neighbors)
print("------------------------Shape-----------------------------")
print(mean_neighbors.shape)
#replacement of the outlier rows
X_train.loc[outlier_mask] = mean_neighbors

#Verify the replaced rows
print("-------------------------Replaced Outlier Rows With Mean---------------------------------------")
print(X_train.loc[outlier_mask].head())


### Normalizing Data

In [None]:
print("-----------------------------------------Normalizing Data--------------------------------------")
#normalizing the data with MinMaxScaler
scaler = MinMaxScaler(feature_range=(-1,1))
X_train = scaler.fit_transform(X_train)
print(X_train)

### Handling Data Imbalance

In [None]:
print("--------------------------------------Data Imbalance Handling-----------------------------------------")
from collections import Counter

print("Before SMOTE:", Counter(y_train))
smote = SMOTE(random_state=42)
X_train , y_train = smote.fit_resample(X_train, y_train) 
print("After SMOTE :", Counter(y_train))

### Encoding

In [None]:
#encoding subclasses
print("-----------------------------------Encoding----------------------------------------------------------------------")
#encoding subclasses to integers
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
y_test_encoded = label_encoder.transform(y_test)

# Verify encoded classes
print("Classes:", label_encoder.classes_)
print("Encoded values:", np.unique(y_train_encoded))

## Testing Data

In [None]:
outlier_preds_test = iso.predict(X_test) #not refitting the dataset to avoid data leakage
print(outlier_preds_test)

#test Outliers replaced with pre-fitted nearest neighbors
test_outliermask = outlier_preds_test == -1
X_test_outliers = X_test[test_outliermask]
distances, indices = num_neighbors.kneighbors(X_test_outliers)

#replace with nearest neighbor of the training dataset
#Locate the nearest neighbor for every outlier
mean_neighbors_test = []
for neighbors_idx in indices:
    mean_point = X_inliers.iloc[neighbors_idx].mean(axis=0)
    mean_neighbors_test.append(mean_point)

neighbors_test = pd.DataFrame(mean_neighbors_test, columns=X_test.columns, index=X_test_outliers.index)
#replacement of the outlier rows
X_test.loc[test_outliermask] = neighbors_test
#Normalize Data
X_test = scaler.transform(X_test)
#Feature selection used on training data

## Sign-Based Adaptive Learning Rate

In [None]:
class SignBasedAdaptiveLR(tf.keras.optimizers.Adam): 
    def __init__(self, 
                 learning_rate=0.001,
                 increase_factor=1.05,  # Factor to increase LR when sign is consistent
                 decrease_factor=0.95,  # Factor to decrease LR when sign changes
                 min_lr=1e-6,
                 max_lr=0.1,
                 **kwargs):
        super().__init__(learning_rate=learning_rate, **kwargs)
        self.increase_factor = increase_factor
        self.decrease_factor = decrease_factor
        self.min_lr = min_lr
        self.max_lr = max_lr
        self.previous_gradients = {}
        self.weight_learning_rates = {}
    
    def _resource_apply_dense(self, grad, var, apply_state=None):
        
        var_key = var.ref()
        
        # Initialize previous gradient and learning rate for this variable
        if var_key not in self.previous_gradients:
            self.previous_gradients[var_key] = tf.zeros_like(grad)
            self.weight_learning_rates[var_key] = self.learning_rate
        
        # Get previous gradient and current learning rate for this weight
        prev_grad = self.previous_gradients[var_key]
        current_lr = self.weight_learning_rates[var_key]
        
        # Check if gradient sign has changed
        # Sign is consistent if prev_grad * grad > 0 (same sign)
        sign_consistent = tf.reduce_mean(tf.cast(prev_grad * grad > 0, tf.float32))
        
        # Adapt learning rate based on sign consistency
        # If more than 50% of gradients have consistent signs, increase LR
        # Otherwise, decrease LR
        if sign_consistent > 0.5:
            new_lr = tf.minimum(current_lr * self.increase_factor, self.max_lr)
        else:
            new_lr = tf.maximum(current_lr * self.decrease_factor, self.min_lr)
        
        # Update stored values
        self.previous_gradients[var_key] = grad
        self.weight_learning_rates[var_key] = new_lr
        
        # Apply the gradient update with adapted learning rate
        return var.assign_sub(new_lr * grad)
    
    def _resource_apply_sparse(self, grad, var, indices, apply_state=None):
        """Sparse gradient updates (for embeddings, etc.)"""
        return self._resource_apply_dense(tf.convert_to_tensor(grad), var, apply_state)

## Model Creation using Custom Optimizer

In [None]:
def create_model_opti(input_dim, hidden_units,learning_rate):
    print("---------------------Training with Sign-Based Adaptive Learning Rate---------------------------------")
    model_adaptive = models.Sequential([
        layers.Input(shape=(input_dim,)),
        layers.Dense(hidden_units, activation='relu'),
        layers.Dropout(0.2),
        layers.Dense(32, activation='relu'),
        layers.Dropout(0.2),
        layers.Dense(1, activation='sigmoid')
    ])
        
    optimizer_adaptive = SignBasedAdaptiveLR(
            learning_rate=learning_rate,
            increase_factor=1.05,
            decrease_factor=0.05
        )
        
    model_adaptive.compile(
            optimizer=optimizer_adaptive,
            loss=BinaryCrossentropy(),
            metrics=[BinaryAccuracy()]
        )
        
    return model_adaptive


def train_and_evaluate_model_adaptive(X_train, y_train, X_test, y_test, 
                          epochs=10, batch_size=32, learning_rate=0.001, hidden_units=64):

    print("\n-----------------------Creating Model---------------------------------------------------\n")
    
    # Create model
    model = create_model_opti(learning_rate=learning_rate, hidden_units=hidden_units,input_dim=X_train.shape[1])

    print("\n--------------------------------Training Model-------------------------------------------\n")
    # Train model
    history_adaptive = model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_split=0.2,
        verbose=1
    )
    print("\n-----------------------------Model Architecture-----------------------------------------------\n")
    model.summary()
    
    print("-------------------------Evaluate Model--------------------------------------------------")
    
    test_loss, test_acc = model.evaluate(
        X_test, y_test,
        batch_size=batch_size, 
        verbose=1
    )
    print(f'\nTest accuracy: {test_acc}')
    print(f'Test loss: {test_loss}')
    return model , history_adaptive

## Model Creation - Standard

In [None]:
def create_model(input_dim, hidden_units,learning_rate):
    
    model = models.Sequential([
        layers.Input(input_dim), 
        layers.Dense(hidden_units, activation='relu'),
        layers.Dropout(0.2),
        layers.Dense(32, activation='relu'),
        layers.Dropout(0.2),
        layers.Dense(1, activation='sigmoid')  # Binary classification
    ])
    
    # Use Adam optimizer with specified learning rate
    optimizer = Adam(learning_rate=learning_rate)
    
    # Use BinaryCrossentropy for binary classification
    model.compile(
        optimizer=optimizer,
        loss=BinaryCrossentropy(),
        metrics=[BinaryAccuracy()])
    
    return model


def train_and_evaluate_model(X_train, y_train, X_test, y_test, 
                          epochs=10, batch_size=32, learning_rate=0.001, hidden_units=64):

    print("\n-----------------------Creating Model---------------------------------------------------\n")
    
    # Create model
    model = create_model(learning_rate=learning_rate, hidden_units=hidden_units,input_dim=X_train.shape[1])

    print("\n--------------------------------Training Model-------------------------------------------\n")
    # Train model
    training = model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_split=0.2,
        verbose=1
    )
    print("\n-----------------------------Model Architecture-----------------------------------------------\n")
    model.summary()

    print("\n------------------------------------Evaluate Model-----------------------------------------\n")
    
    test_loss, test_acc = model.evaluate(
        X_test, y_test,
        batch_size=batch_size, 
        verbose=1
    )
    print(f'\nTest accuracy: {test_acc}')
    print(f'Test loss: {test_loss}')
    
    return model, training

## Adaptive Model and using Grid Search for hyperparameter optimization

In [None]:

print("\n---------------------------------Training Model with Hyperparameter Optimization----------------------------------\n")

# Create KerasClassifier wrapper for GridSearchCV
keras_classifier = KerasClassifier(
    model=create_model_opti,
    model__input_dim=X_train.shape[1],
    epochs=10,
    batch_size=32,
    verbose=0
)

#the parameter grid for which the search will occur
param_grid = {
    'model__hidden_units': [32, 64, 128],
    'model__learning_rate': [0.0001, 0.001, 0.01],
    'batch_size': [16, 32, 64]
}

print("Starting Grid Search...")
grid_search = GridSearchCV(
    estimator=keras_classifier, 
    param_grid=param_grid, 
    cv=3,  # Reduced CV folds for faster training
    scoring='accuracy',
    verbose=2,
    n_jobs=1
)

grid_search.fit(X_train, y_train_encoded)

print("\n-----------------------------------Grid Search Results---------------------------------------------------\n")
print("Best parameters found:", grid_search.best_params_)
print("Best cross-validation score:", grid_search.best_score_)

print("\n---------------------------------------Heatmap for Grid Search-------------------------------------------\n")


# Extract results from grid search
results_df = pd.DataFrame(grid_search.cv_results_)

# Get unique values for the two hyperparameters we're focusing on
learning_rates = param_grid['model__learning_rate']
hidden_units = param_grid['model__hidden_units']

# Create pivot table for heatmap (averaging across batch sizes)
heatmap_data = results_df.groupby(['param_model__learning_rate', 'param_model__hidden_units'])['mean_test_score'].mean().reset_index()
heatmap_pivot = heatmap_data.pivot(index='param_model__hidden_units', columns='param_model__learning_rate', values='mean_test_score')

# Create heatmap
plt.figure(figsize=(10, 6))
sns.heatmap(heatmap_pivot, annot=True, fmt='.4f', cmap='YlOrRd', cbar_kws={'label': 'Mean CV Accuracy'})
plt.title('Grid Search Results: Learning Rate vs Hidden Units')
plt.xlabel('Learning Rate')
plt.ylabel('Number of Hidden Units')
plt.tight_layout()
plt.show()

# Print detailed results 
print("\nDetailed Grid Search Results (Mean ± Std Dev):")
print("="*80)
for i in range(len(results_df)):
    params = results_df.loc[i, 'params']
    mean_score = results_df.loc[i, 'mean_test_score']
    std_score = results_df.loc[i, 'std_test_score']
    print(f"Params: {params}")
    print(f"  Score: {mean_score:.4f} ± {std_score:.4f}")
    print()

# Train final model with best parameters
print("\n------------------------Training Final Model with Best Parameters------------------------------------------")

best_params = grid_search.best_params_
        
final_model, training_history = train_and_evaluate_model_adaptive(
    X_train, y_train_encoded, 
    X_test, y_test_encoded,
    epochs=15,
    batch_size=best_params['batch_size'],
    learning_rate=best_params['model__learning_rate'],
    hidden_units=best_params['model__hidden_units']
    )



## Evaluation

In [None]:
print("\n---------------------------Final Model Evaluation--------------------------------------------------")

# Make predictions
y_pred_proba = final_model.predict(X_test)
y_pred = (y_pred_proba > 0.5).astype(int).flatten()

# Calculate metrics
accuracy = accuracy_score(y_test_encoded, y_pred)
precision = precision_score(y_test_encoded, y_pred)
recall = recall_score(y_test_encoded, y_pred)
f1 = f1_score(y_test_encoded, y_pred)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test Precision: {precision:.4f}")
print(f"Test Recall: {recall:.4f}")
print(f"Test F1-Score: {f1:.4f}")

print("\nConfusion Matrix:")
print(confusion_matrix(y_test_encoded, y_pred))

print("\nClassification Report:")
print(classification_report(y_test_encoded, y_pred, target_names=label_encoder.classes_))

## Visualization Adaptive Model

In [None]:

print("--------------------------------Generating Visualizations Adaptive Model--------------------------------------------------------------")

# Plot training loss over epochs
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(training_history.history['loss'], label='Training Loss')
plt.plot(training_history.history['val_loss'], label='Validation Loss')
plt.title('Model Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

plt.subplot(1, 2, 2)
plt.plot(training_history.history['binary_accuracy'], label='Training Accuracy')
plt.plot(training_history.history['val_binary_accuracy'], label='Validation Accuracy')
plt.title('Model Training and Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()


## Standard Model and using Grid Search for hyperparameter optimization

In [None]:

print("\n---------------------------------Training Model with Hyperparameter Optimization----------------------------------\n")

# Create KerasClassifier wrapper for GridSearchCV
keras_classifier = KerasClassifier(
    model=create_model,
    model__input_dim=X_train.shape[1],
    epochs=10,
    batch_size=32,
    verbose=0
)

#the parameter grid for which the search will occur
# Note: model parameters need 'model__' prefix for KerasClassifier
param_grid = {
    'model__hidden_units': [32, 64, 128],
    'model__learning_rate': [0.0001, 0.001, 0.01],
    'batch_size': [16, 32, 64]
}

print("Starting Grid Search...")
grid_search = GridSearchCV(
    estimator=keras_classifier, 
    param_grid=param_grid, 
    cv=3,  # Reduced CV folds for faster training
    scoring='accuracy',
    verbose=2,
    n_jobs=1
)

grid_search.fit(X_train, y_train_encoded)

print("\n-----------------------------------Grid Search Results---------------------------------------------------\n")
print("Best parameters found:", grid_search.best_params_)
print("Best cross-validation score:", grid_search.best_score_)

print("\n---------------------------------------Heatmap for Grid Search-------------------------------------------\n")


# Extract results from grid search
results_df = pd.DataFrame(grid_search.cv_results_)

# Get unique values for the two hyperparameters we're focusing on
learning_rates = param_grid['model__learning_rate']
hidden_units = param_grid['model__hidden_units']

# Create pivot table for heatmap (averaging across batch sizes)
heatmap_data = results_df.groupby(['param_model__learning_rate', 'param_model__hidden_units'])['mean_test_score'].mean().reset_index()
heatmap_pivot = heatmap_data.pivot(index='param_model__hidden_units', columns='param_model__learning_rate', values='mean_test_score')

# Create heatmap
plt.figure(figsize=(10, 6))
sns.heatmap(heatmap_pivot, annot=True, fmt='.4f', cmap='YlOrRd', cbar_kws={'label': 'Mean CV Accuracy'})
plt.title('Grid Search Results: Learning Rate vs Hidden Units')
plt.xlabel('Learning Rate')
plt.ylabel('Number of Hidden Units')
plt.tight_layout()
plt.show()

# Print detailed results with standard deviations
print("\nDetailed Grid Search Results (Mean ± Std Dev):")
print("="*80)
for i in range(len(results_df)):
    params = results_df.loc[i, 'params']
    mean_score = results_df.loc[i, 'mean_test_score']
    std_score = results_df.loc[i, 'std_test_score']
    print(f"Params: {params}")
    print(f"  Score: {mean_score:.4f} ± {std_score:.4f}")
    print()

# Train final model with best parameters
print("\n------------------------Training Final Model with Best Parameters------------------------------------------")

best_params = grid_search.best_params_

final_model, training_history = train_and_evaluate_model(
    X_train, y_train_encoded, 
    X_test, y_test_encoded,
    epochs=15,
    batch_size=best_params['batch_size'],
    learning_rate=best_params['model__learning_rate'],
    hidden_units=best_params['model__hidden_units']
)


## Visualization Standard Model

In [None]:

print("--------------------------------Generating Visualizations Standard Model--------------------------------------------------------------")

# Plot training loss over epochs
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(training_history.history['loss'], label='Training Loss')
plt.plot(training_history.history['val_loss'], label='Validation Loss')
plt.title('Model Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

plt.subplot(1, 2, 2)
plt.plot(training_history.history['binary_accuracy'], label='Training Accuracy')
plt.plot(training_history.history['val_binary_accuracy'], label='Validation Accuracy')
plt.title('Model Training and Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()


## Evaluation

In [None]:
print("\n---------------------------Final Model Evaluation--------------------------------------------------")

# Make predictions
y_pred_proba = final_model.predict(X_test)
y_pred = (y_pred_proba > 0.5).astype(int).flatten()

# Calculate metrics
accuracy = accuracy_score(y_test_encoded, y_pred)
precision = precision_score(y_test_encoded, y_pred)
recall = recall_score(y_test_encoded, y_pred)
f1 = f1_score(y_test_encoded, y_pred)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test Precision: {precision:.4f}")
print(f"Test Recall: {recall:.4f}")
print(f"Test F1-Score: {f1:.4f}")

print("\nConfusion Matrix:")
print(confusion_matrix(y_test_encoded, y_pred))

print("\nClassification Report:")
print(classification_report(y_test_encoded, y_pred, target_names=label_encoder.classes_))