### Pre-processing Stage

#### Import Libraries

In [None]:
%pip install keras_tuner

In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import RandomOverSampler
from tensorflow import keras, feature_column
import matplotlib.pyplot as plt
import keras_tuner as kt

#### Access Dataset

In [2]:
# Load the SkyServer spectral measurements: one `subClass` label column plus
# numeric flux columns per row (see the preview rendered below).
spectral_data = pd.read_csv('data/Skyserver_spectral.csv')

# Preview the first rows.
spectral_data.head()

Unnamed: 0,subClass,spectroFlux_u,spectroFlux_g,spectroFlux_r,spectroFlux_i,spectroFlux_z,spectroSynFlux_u,spectroSynFlux_g,spectroSynFlux_r,spectroSynFlux_i,...,spectroSynFluxIvar_u,spectroSynFluxIvar_g,spectroSynFluxIvar_r,spectroSynFluxIvar_i,spectroSynFluxIvar_z,spectroSkyFlux_u,spectroSkyFlux_g,spectroSkyFlux_r,spectroSkyFlux_i,spectroSkyFlux_z
0,F,283.1147,487.4325,623.9703,661.565,653.7544,266.6792,489.4288,620.6246,663.5571,...,6.239177,7.270068,5.600547,4.923303,2.058532,11.209,14.54359,25.90486,36.41381,94.26073
1,B,1.62727,4.844555,5.206846,5.112135,5.294659,1.91309,4.815378,5.095849,5.075979,...,6.239177,7.270068,5.600547,4.923303,2.058532,10.84869,13.64701,24.44349,34.59489,90.46743
2,F,3.750942,5.264215,6.468467,6.767785,7.150636,3.008075,5.254549,6.316006,6.686767,...,6.239177,7.270068,5.600547,4.923303,2.058532,11.44013,13.69386,24.14538,34.3828,91.69515
3,F,23.94584,44.70097,61.68239,67.05418,69.31022,23.19226,44.96296,61.06528,67.08057,...,6.239177,7.270068,5.600547,4.923303,2.058532,11.22245,13.70153,24.25106,34.65611,89.66241
4,O,67.44971,42.15791,26.75512,20.34038,15.1516,59.19731,42.50629,26.38128,20.62716,...,6.239177,7.270068,5.600547,4.923303,2.058532,9.613045,11.76241,21.65069,32.35199,90.13759


In [None]:
# Discard every row that contains at least one missing value.
spectral_data = spectral_data.dropna()

In [None]:
# Replace the string class labels in `subClass` with integer codes.
# The fitted encoder stays in `label_encoder`, so the original class names
# remain available (e.g. through `label_encoder.classes_`).
label_encoder = LabelEncoder()
spectral_data['subClass'] = label_encoder.fit_transform(spectral_data['subClass'])

#### Split the dataframe into train, validation, and test

In [None]:
# Target: the integer-encoded class label.
y = spectral_data['subClass']
# Features: every remaining column.
X = spectral_data.drop(columns=['subClass'])

In [None]:
# Correlation of each feature column with the encoded label (one value per
# column, so this is a Series rather than a matrix).
# NOTE(review): `y` holds LabelEncoder codes of a nominal class, so the sign and
# magnitude depend on the code order -- confirm this is a meaningful criterion.
correlation_matrix = X.corrwith(y)

print(correlation_matrix)

In [None]:
# Names of the features whose correlation with the label is negative.
# NOTE(review): this selection uses the full dataset, i.e. it runs before the
# train/validation/test split below.
negative_correlation_columns = correlation_matrix[correlation_matrix < 0].index

# `X` is overwritten, so re-running this cell alone operates on the reduced frame.
X = X.drop(columns=negative_correlation_columns)

# Show the correlations of the features that were kept.
X.corrwith(y)

In [None]:
# Hold out 10% of the rows as the test set, then 20% of the remainder as the
# validation set (72% / 18% / 10% of the data overall).
# NOTE(review): neither call passes `stratify=`, so class proportions may
# differ between the splits.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)
X_train.shape

#### Resample

In [None]:
# Class distribution over the whole dataset (not only the training split).
spectral_data['subClass'].value_counts()

In [None]:
# Balance the training split by random oversampling.
# Only the oversampler is run: an undersampling pass would be discarded anyway,
# because the oversampler is fitted on the original (X_train, y_train) and its
# output is what the rest of the notebook consumes.
over_sampler = RandomOverSampler(random_state=42)
X_train_resampled, y_train_resampled = over_sampler.fit_resample(X_train, y_train)

In [None]:
# Per-class sample counts of the training labels after resampling.
print(pd.Series(y_train_resampled).value_counts())

#### Normalize

In [None]:
# Fit the scaler on the resampled training features only, then apply the same
# transform to the validation features.
# From here on `X_train` is the scaled, resampled array (not the original frame).
# NOTE: `X_test` is not transformed in this cell; apply `scaler.transform` to it
# before feeding it to any model trained on `X_train`.
# Re-running this cell alone would transform the already-scaled `X_val` again.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_resampled)
X_val = scaler.transform(X_val)

In [None]:
# Number of distinct classes in the training labels (`y_train` still holds the
# integer labels of the original split at this point).
num_classes = len(np.unique(y_train))
# One-hot encode the resampled training labels and the validation labels.
# `y_train` / `y_val` are tensors afterwards; `y_train_resampled` keeps the
# integer labels, and `y_test` is left as integer labels.
y_train = tf.one_hot(y_train_resampled, num_classes)
y_val = tf.one_hot(y_val, num_classes)

In [None]:
# Sanity check: training features and one-hot labels should have the same row count.
print(X_train.shape)
print(y_train.shape)

In [None]:
# Sanity check: validation features and one-hot labels should have the same row count.
print(X_val.shape)
print(y_val.shape)

#### Common Functions

In [None]:
# create an optimizer based on the optimizer name and learning rate
def get_optimizer(optimizer_name, learning_rate):
    """Create a Keras optimizer from its short name.

    Args:
        optimizer_name: One of 'sgd', 'rmsprop', 'adam' or 'adagrad'
            (matched case-insensitively).
        learning_rate: Learning rate handed to the optimizer constructor.

    Returns:
        A new ``tf.keras.optimizers`` instance of the requested kind.

    Raises:
        ValueError: If ``optimizer_name`` is not one of the supported names.
    """
    # Dispatch on the name itself. A bare string such as `elif 'rmsprop':` is
    # always truthy and would hand back RMSprop for every name but 'adagrad'.
    optimizer_classes = {
        'adagrad': tf.keras.optimizers.Adagrad,
        'rmsprop': tf.keras.optimizers.RMSprop,
        'adam': tf.keras.optimizers.Adam,
        'sgd': tf.keras.optimizers.SGD,
    }

    key = str(optimizer_name).lower()
    if key not in optimizer_classes:
        raise ValueError(
            f"Unsupported optimizer {optimizer_name!r}; "
            f"expected one of {sorted(optimizer_classes)}"
        )

    return optimizer_classes[key](learning_rate=learning_rate)

In [None]:
# plot a graph based on the results
def plot_graph(accuracy_measures, title):
    """Plot one accuracy curve per experiment on a shared figure.

    Args:
        accuracy_measures: Mapping of experiment name to its per-epoch
            accuracy values.
        title: Title shown above the plot.
    """
    plt.figure(figsize=(15, 8))

    # One line per experiment, labelled with the experiment name.
    for name, values in accuracy_measures.items():
        plt.plot(values, label=name, linewidth=3)

    plt.title(title)
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.show()

### Benchmark

#### CNN

In [None]:
# Benchmark 1-D CNN: three Conv1D/MaxPooling1D stages followed by a small
# dense classifier head. Each sample is fed as a (n_features, 1) sequence.
model = tf.keras.Sequential([
  tf.keras.layers.Conv1D(256, kernel_size=3, activation='relu', padding='same', input_shape=(X_train.shape[1],1)),
  tf.keras.layers.MaxPooling1D(pool_size=2),
  tf.keras.layers.Conv1D(256, kernel_size=3, activation='relu', padding='same'),
  tf.keras.layers.MaxPooling1D(pool_size=2),
  tf.keras.layers.Conv1D(32, kernel_size=3, activation='relu', padding='same'),
  tf.keras.layers.MaxPooling1D(pool_size=2),
  tf.keras.layers.Flatten(),
  tf.keras.layers.Dropout(0.5),
  tf.keras.layers.Dense(128, activation='relu'),
  # The output width must match the one-hot label width, which is `num_classes`.
  tf.keras.layers.Dense(num_classes, activation='softmax')
])

# Labels are one-hot encoded, hence categorical cross-entropy.
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# All three callbacks track validation accuracy (higher is better).
# NOTE(review): with epochs=10 and an early-stopping patience of 10, early
# stopping cannot trigger in this run -- confirm the intended values.
reduceLR = tf.keras.callbacks.ReduceLROnPlateau(
    monitor="val_accuracy",
    mode='max',
    factor=0.2,
    patience=6,
    min_lr=1e-6,
    verbose=1,
)
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    mode='max',
    patience=10,
    restore_best_weights=True,
    verbose=1,
)
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    'CustomModel.weights.hdf5',
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    verbose=1,
)
callbacks = [reduceLR, early_stopping, checkpoint]

model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=10,
    callbacks=callbacks,
)

# Per-epoch accuracy curves of the run above, keyed by split name.
accuracy_measures = {
    "training": model.history.history["accuracy"],
    "validation": model.history.history["val_accuracy"],
}

In [None]:
# Training vs. validation accuracy per epoch for the benchmark CNN.
plot_graph(accuracy_measures, "Custom CNN Model")

In [None]:
# Evaluate on the held-out test split. The test features are scaled with the
# scaler fitted on the training data, and the integer test labels are one-hot
# encoded so they match the categorical cross-entropy loss.
X_test_scaled = scaler.transform(X_test)
y_test_onehot = tf.one_hot(y_test, num_classes)
model.evaluate(X_test_scaled, y_test_onehot)

### Automated Hyperparameter Tuning

In [None]:
def build_cnn_model(hp):
    """Build and compile a tunable 1-D CNN classifier for Keras Tuner.

    Reads the notebook globals ``X_train`` (for the input length) and
    ``num_classes`` (for the output width), and uses ``get_optimizer`` to
    create the optimizer.

    Args:
        hp: Keras Tuner ``HyperParameters`` object used to sample the values.

    Returns:
        A compiled ``tf.keras.Sequential`` model.
    """
    model = tf.keras.Sequential()

    # First Conv1D stage: tunable filter count and kernel size (3 or 5).
    model.add(tf.keras.layers.Conv1D(hp.Int('conv1_filters', min_value=64, max_value=256, step=32),
                             hp.Int('conv1_kernel', min_value=3, max_value=5, step=2),
                             activation='relu', input_shape=(X_train.shape[1],1)))
    model.add(tf.keras.layers.MaxPooling1D())

    # Second Conv1D stage with its own tunable filter count and kernel size.
    model.add(tf.keras.layers.Conv1D(hp.Int('conv2_filters', min_value=32, max_value=256, step=32),
                             hp.Int('conv2_kernel', min_value=3, max_value=5, step=2),
                             activation='relu'))
    model.add(tf.keras.layers.MaxPooling1D())

    model.add(tf.keras.layers.Flatten())

    # Tunable dropout before the dense head.
    model.add(tf.keras.layers.Dropout(hp.Float('dropout_rate', min_value=0.001, max_value=0.5, step=0.1)))

    # Tunable width of the dense layer.
    model.add(tf.keras.layers.Dense(hp.Int('dense_units', min_value=64, max_value=256, step=32), activation='relu'))

    model.add(tf.keras.layers.Dense(num_classes, activation='softmax'))

    # Tunable optimizer choice.
    optimizer_value = hp.Choice('optimizer', ['adam', 'rmsprop', 'sgd'])

    # Sample the learning rate on a log scale. A linear grid with step 0.1
    # starting at 1e-5 would only offer 1e-5 and values of roughly 0.1 or more.
    # NOTE(review): tuner results cached on disk from earlier runs were produced
    # with the previous search space; consider clearing them before re-running.
    lr = hp.Float('learning_rate', min_value=1e-5, max_value=0.5, sampling='log')

    optimizer = get_optimizer(optimizer_value, learning_rate=lr)

    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    return model

In [None]:
# Callbacks keyed on validation loss (lower is better): learning-rate
# reduction on plateau, early stopping that restores the best weights, and a
# best-only checkpoint written to 'CustomModel.weights.hdf5'.
reduceLR = tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", patience= 3, verbose= 1, mode='min', factor=  0.2, min_lr = 1e-6)
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience = 5 , verbose=1, mode='min', restore_best_weights= True)
checkpoint = tf.keras.callbacks.ModelCheckpoint('CustomModel.weights.hdf5', monitor='val_loss', verbose=1,save_best_only=True, mode= 'min')
callbacks = [reduceLR, early_stopping,checkpoint]

In [None]:
# Hyperband search over the CNN search space, ranked by validation accuracy.
cnn_tuner = kt.Hyperband(
    build_cnn_model,
    objective='val_accuracy',
    max_epochs=100,
    factor=3,
    directory='cnn_tuning',
    project_name='cnn'
)

# Pass the early-stopping callback created in the previous cell so trials that
# stop improving on the validation loss are cut short. The checkpoint callback
# is left out on purpose: every trial would overwrite the same weights file.
cnn_tuner.search(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=100,
    callbacks=[early_stopping],
    verbose=2,
)


In [None]:
def build_dnn_model(hp):
    """Build and compile a tunable fully connected classifier for Keras Tuner.

    Reads the notebook globals ``X_train`` (for the input width) and
    ``num_classes`` (for the output width), and uses ``get_optimizer`` to
    create the optimizer.

    Args:
        hp: Keras Tuner ``HyperParameters`` object used to sample the values.

    Returns:
        A compiled ``tf.keras.Sequential`` model.
    """
    model = tf.keras.Sequential()

    # Input block: Dense -> BatchNorm -> Dropout.
    model.add(tf.keras.layers.Dense(hp.Int('dense_units', min_value=64, max_value=256, step=32), activation='relu', input_shape=(X_train.shape[1],)))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.Dropout(hp.Float('dropout_rate', min_value=0.0001, max_value=0.5, step=0.1)))

    # Between one and four further Dense -> BatchNorm -> Dropout blocks, each
    # with its own tunable width and dropout rate.
    for i in range(hp.Int('num_dense_layers', 1, 4)):
        model.add(tf.keras.layers.Dense(hp.Int(f'dense_units_{i}', min_value=32, max_value=256, step=32), activation='relu'))
        model.add(tf.keras.layers.BatchNormalization())
        model.add(tf.keras.layers.Dropout(hp.Float(f'dropout_rate_{i}', min_value=0.2, max_value=0.5, step=0.1)))

    # Output layer: one unit per class, matching the one-hot label width.
    model.add(tf.keras.layers.Dense(num_classes, activation='softmax'))

    # Tunable optimizer choice.
    optimizer_value = hp.Choice('optimizer', ['adam', 'rmsprop', 'sgd', 'adagrad'])

    # Sample the learning rate on a log scale. A linear grid with step 0.1
    # starting at 1e-5 would only offer 1e-5 and values of roughly 0.1 or more.
    # NOTE(review): tuner results cached on disk from earlier runs were produced
    # with the previous search space; consider clearing them before re-running.
    lr = hp.Float('learning_rate', min_value=1e-5, max_value=0.5, sampling='log')

    optimizer = get_optimizer(optimizer_value, learning_rate=lr)

    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    return model


In [None]:
# Re-create the validation-loss callbacks for the DNN search: learning-rate
# reduction on plateau, early stopping that restores the best weights, and a
# best-only checkpoint written to 'CustomModel.weights.hdf5'.
reduceLR = tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", patience= 3, verbose= 1, mode='min', factor=  0.2, min_lr = 1e-6)
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience = 5 , verbose=1, mode='min', restore_best_weights= True)
checkpoint = tf.keras.callbacks.ModelCheckpoint('CustomModel.weights.hdf5', monitor='val_loss', verbose=1,save_best_only=True, mode= 'min')
callbacks = [reduceLR, early_stopping,checkpoint]

In [None]:
# Hyperband search over the DNN search space, ranked by validation accuracy.
tuner = kt.Hyperband(
    build_dnn_model,
    objective='val_accuracy',
    max_epochs=100,
    factor=3,
    directory='tuning',
    project_name='dnn'
)

# Pass the early-stopping callback created in the previous cell so trials that
# stop improving on the validation loss are cut short. The checkpoint callback
# is left out on purpose: every trial would overwrite the same weights file.
tuner.search(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=100,
    callbacks=[early_stopping],
    verbose=2,
)


In [None]:
# Best hyperparameters found by the DNN tuner (`tuner` searches
# `build_dnn_model`, so only the names registered there can be read back).
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

print(f"Optimal number of units in the first Dense layer: {best_hps.get('dense_units')}")
print(f"Optimal dropout rate after the first Dense layer: {best_hps.get('dropout_rate')}")

# One extra Dense/Dropout block per tuned layer.
num_dense_layers = best_hps.get('num_dense_layers')
print(f"Optimal number of additional Dense layers: {num_dense_layers}")
for i in range(num_dense_layers):
    print(f"Optimal number of units in additional Dense layer {i+1}: {best_hps.get(f'dense_units_{i}')}")
    print(f"Optimal dropout rate for Dropout layer {i+1}: {best_hps.get(f'dropout_rate_{i}')}")

print(f"Optimal optimizer: {best_hps.get('optimizer')}")
print(f"Optimal learning rate: {best_hps.get('learning_rate')}")

### Final Model

In [None]:
# Number of rows in the (resampled, scaled) training matrix.
X_train.shape[0]

In [None]:
def build_final_model():
    """Build and compile the final fully connected classifier.

    The network is a stack of Dense -> BatchNormalization -> Dropout blocks
    followed by a softmax output. It reads the notebook globals ``X_train``
    (input width) and ``num_classes`` (output width).

    Returns:
        A compiled ``tf.keras.Sequential`` model.
    """
    # (units, dropout rate) of each hidden block, in order.
    hidden_blocks = [
        (224, 0.4),
        (192, 0.2),
        (224, 0.3),
        (64, 0.3),
        (160, 0.2),
        (64, 0.3),
    ]

    model = tf.keras.Sequential()

    for position, (units, dropout_rate) in enumerate(hidden_blocks):
        # Only the first layer declares the input shape.
        extra = {'input_shape': (X_train.shape[1],)} if position == 0 else {}
        model.add(tf.keras.layers.Dense(units, activation='relu', **extra))
        model.add(tf.keras.layers.BatchNormalization())
        model.add(tf.keras.layers.Dropout(dropout_rate))

    # Output layer: one unit per class, matching the one-hot label width.
    model.add(tf.keras.layers.Dense(num_classes, activation='softmax'))

    # NOTE(review): a learning rate of 0.101 is unusually large for Adam;
    # confirm it is the value that was actually selected.
    optimizer_value = 'adam'
    optimizer = get_optimizer(optimizer_value, learning_rate=0.101)

    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    return model


In [None]:
accuracy_measures = {}
model = build_final_model()

# Callbacks keyed on validation loss (lower is better).
reduceLR = tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", patience= 3, verbose= 1, mode='min', factor=  0.2, min_lr = 1e-6)
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience = 5 , verbose=1, mode='min', restore_best_weights= True)
checkpoint = tf.keras.callbacks.ModelCheckpoint('CustomModel.weights.hdf5', monitor='val_loss', verbose=1, save_best_only=True, mode= 'min')
callbacks = [reduceLR, early_stopping,checkpoint]

# Validate on the validation split: it is scaled and one-hot encoded like the
# training data, whereas the test split is neither and must stay unseen until
# the final evaluation. The callbacks above are passed so they take effect.
model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=5, callbacks=callbacks, verbose=1)

accuracy_measures["training"] = model.history.history["accuracy"]
accuracy_measures["validation"] = model.history.history["val_accuracy"]

In [None]:
# The final model is built from Dense layers only, so label the plot accordingly.
plot_graph(accuracy_measures, "Final DNN Model")

In [None]:
# Evaluate on the held-out test split. The test features are scaled with the
# scaler fitted on the training data, and the integer test labels are one-hot
# encoded so they match the categorical cross-entropy loss.
# `predictions` keeps its name because the next cell reads it; it holds the
# [loss, accuracy] pair returned by `evaluate`.
X_test_scaled = scaler.transform(X_test)
y_test_onehot = tf.one_hot(y_test, num_classes)
predictions = model.evaluate(X_test_scaled, y_test_onehot)

In [None]:
# `predictions` is the list returned by `model.evaluate`: loss first, then the
# compiled `accuracy` metric.
print(f"Accuracy: {predictions[1]}, Loss: {predictions[0]}")

In [None]:
# Ground-truth and predicted class ids for the test split.
# The test features are scaled with the scaler fitted on the training data;
# `y_test` already holds integer class ids.
X_test_scaled = scaler.transform(X_test)
y_true = np.asarray(y_test)

# The model outputs one probability per class; the largest one is the prediction.
class_probabilities = model.predict(X_test_scaled)
y_pred = np.argmax(class_probabilities, axis=1)


In [None]:
# Per-class precision / recall / F1 on the test split.
# Class names come from the fitted label encoder; `labels` lists every encoded
# class id so the names line up even if a class is absent from the test split.
class_ids = np.arange(len(label_encoder.classes_))
class_names = [str(name) for name in label_encoder.classes_]
classification_metrics = classification_report(y_true, y_pred, labels=class_ids, target_names=class_names)
print(classification_metrics)

In [None]:
# Confusion matrix of the labels collected above (`y_true` vs. `y_pred`).
conf_matrix = confusion_matrix(y_true, y_pred)

# Print the confusion matrix
print("Confusion Matrix:")
print(conf_matrix)

In [None]:
# Save the trained model to an `.h5` file.
# NOTE(review): the file name says "CNN" although `build_final_model` stacks
# Dense layers only -- confirm the intended name.
model.save('New_CNN_model.h5')

### Benchmark

In [None]:
from sklearn.ensemble import RandomForestClassifier
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials

# Search space: only the number of trees is tuned.
space = {
    'n_estimators': hp.quniform('n_estimators', 40, 100, 1),
}

def objective(params):
    """Train a random forest with the sampled settings and score it on validation data.

    Args:
        params: Dict sampled from `space`; only 'n_estimators' is read.

    Returns:
        Dict with the negated validation accuracy as 'loss' (hyperopt minimises
        the loss) and the hyperopt status flag.
    """
    # `quniform` yields floats, while scikit-learn needs an integer tree count.
    n_estimators = int(params['n_estimators'])

    clf = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    clf.fit(X_train, y_train)

    # Evaluate the classifier on the validation set
    y_pred = clf.predict(X_val)
    accuracy = accuracy_score(y_val, y_pred)

    return {'loss': -accuracy, 'status': STATUS_OK}

max_evals = 60  # Maximum number of evaluations

# NOTE(review): depending on the hyperopt version, a reproducible search may
# require passing `rstate=` to `fmin` rather than seeding NumPy -- confirm.
np.random.seed(42)

# Tree-structured Parzen Estimator (TPE) search over `space`.
trials = Trials()
best = fmin(fn=objective,
            space=space,
            algo=tpe.suggest,
            max_evals=max_evals,
            trials=trials,
            verbose=1)

print("Best hyperparameters:")
print(best)

# Retrain once with the best tree count on the training set.
best_clf = RandomForestClassifier(n_estimators=int(best['n_estimators']), random_state=42)
best_clf.fit(X_train, y_train)

In [None]:
# `best_clf` was trained on scaled features and one-hot labels, so the test
# split has to be brought into the same form before scoring it: scale the
# features with the fitted scaler and one-hot encode the integer labels.
# Accuracy is then an exact row match, as for the validation score.
X_test_scaled = scaler.transform(X_test)
y_test_onehot = tf.one_hot(y_test, num_classes).numpy()

y_pred_test = best_clf.predict(X_test_scaled)
accuracy_test = accuracy_score(y_test_onehot, y_pred_test)
print("Test accuracy with best hyperparameters:", accuracy_test)

In [None]:
from sklearn.ensemble import RandomForestClassifier
# Train a random forest with a fixed tree count of 43.
# `y_train` is the one-hot label tensor here, so the forest is fitted on an
# indicator matrix rather than on integer class ids.
clf = RandomForestClassifier(n_estimators=43, random_state=42)
clf.fit(X_train, y_train)

# Score the predictions against the (one-hot) validation labels.
y_pred = clf.predict(X_val)
accuracy = accuracy_score(y_val, y_pred)
print(accuracy)

In [None]:
import os

import joblib

model_filename = 'models/model.joblib'
# Create the target directory first; `joblib.dump` does not create missing
# parent directories and would fail on a fresh checkout.
os.makedirs(os.path.dirname(model_filename), exist_ok=True)
# protocol=2 is kept as in the original export settings.
joblib.dump(clf, model_filename, protocol=2)

In [None]:
# Per-class precision, recall, F1-score and support for the most recent
# validation predictions in `y_pred`.
report = classification_report(y_val, y_pred)
print("Classification Report:\n", report)

In [None]:
def _as_class_ids(labels):
    """Collapse one-hot / indicator rows to class ids; pass 1-D labels through."""
    labels = np.asarray(labels)
    return labels.argmax(axis=1) if labels.ndim == 2 else labels

# `confusion_matrix` only accepts 1-D class ids, while `y_val` is one-hot
# encoded and classifiers fitted on one-hot labels predict indicator rows.
# Rows with no positive (or several positives) collapse to the first maximum.
conf_matrix = confusion_matrix(_as_class_ids(y_val), _as_class_ids(y_pred))

# Print the confusion matrix
print("Confusion Matrix:")
print(conf_matrix)

In [None]:
# Accuracy of the most recent validation predictions in `y_pred`.
accuracy = accuracy_score(y_val, y_pred)
print("Accuracy:", accuracy)

# Per-class precision, recall, F1-score and support for the same predictions.
report = classification_report(y_val, y_pred)
print("Classification Report:\n", report)

In [None]:
from sklearn.svm import SVC

# Build the SVM with an RBF kernel.
svm_model = SVC(kernel='rbf', C=1.0, gamma='scale', random_state=42)
# The RBF kernel is used here; a linear kernel can be slow on a large dataset.

# Train on the scaled features and the integer (not one-hot) resampled labels.
svm_model.fit(X_train, y_train_resampled)

# Predict integer class ids for the validation features.
y_pred = svm_model.predict(X_val)


In [None]:
from sklearn.metrics import f1_score

# The SVM predicts integer class ids, whereas `y_val` is one-hot encoded.
# Convert it to class ids once and use those for every metric below.
y_val_labels = np.argmax(np.asarray(y_val), axis=1)

# Overall accuracy on the validation split.
accuracy = accuracy_score(y_val_labels, y_pred)
print("Accuracy:", accuracy)

# Per-class precision, recall, F1-score and support.
report = classification_report(y_val_labels, y_pred)
print("Classification Report:\n", report)

# Support-weighted F1 across all classes.
f1 = f1_score(y_val_labels, y_pred, average='weighted')
print("F1-Score:", f1)

In [None]:
from sklearn.neighbors import KNeighborsClassifier

# Build the KNN classifier.
k = 7  # Number of neighbors to consider
knn_model = KNeighborsClassifier(n_neighbors=k)

# Train on the scaled features; `y_train` is the one-hot label tensor here.
knn_model.fit(X_train, y_train)

# Predict for the validation features.
y_pred = knn_model.predict(X_val)

In [None]:
# Accuracy of the KNN predictions against the (one-hot) validation labels.
accuracy = accuracy_score(y_val, y_pred)
print("Accuracy:", accuracy)

# Per-class precision, recall, F1-score and support.
report = classification_report(y_val, y_pred)
print("Classification Report:\n", report)

from sklearn.metrics import f1_score

# Support-weighted F1 across all classes.
f1 = f1_score(y_val, y_pred, average='weighted')  # 'weighted' averages per-class F1 by class support
print("F1-Score:", f1)