In [None]:
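# Optional one-time setup: uncomment the block below to install the dependencies.
# NOTE: `flag` is reset to 0 every time this cell runs, so the guard never
# prevents a re-install; prefer a dedicated `%pip install` cell with pinned versions.
# flag = 0

# if flag == 0:
#   !pip install pandas numpy scikit-learn seaborn matplotlib scipy nltk tensorflow keras transformers
#   flag = 1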

import tensorflow as tf
import pandas as pd
import numpy as np
import math
import seaborn as sns
import matplotlib.pyplot as plt
from tensorflow import keras
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Dense, Embedding
from tensorflow.keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.layers import concatenate, Concatenate
from keras.layers import Input, Embedding, Conv1D, Conv2D, GlobalMaxPooling1D, GlobalAveragePooling1D, Flatten, MaxPooling1D, MaxPooling2D, Dense, Dropout, Reshape
from keras.models import Model
from sklearn.metrics import accuracy_score
import transformers
import requests
import zipfile
import io
import os
import glob
import re
from keras.models import load_model
from sklearn.preprocessing import LabelEncoder, StandardScaler
from tensorflow.keras.callbacks import EarlyStopping
from keras.optimizers.legacy import Adam
from sklearn.metrics import mean_squared_error
import time
from datetime import datetime

#### Read the dataset

In [None]:
# Read the CSV (path is relative to the notebook's working directory) and
# preview the first rows as the cell output.
DATA_FILE = 'SDSS_DR18.csv'
df = pd.read_csv(DATA_FILE)
df.head()

#### Check column types

In [None]:
# Print a per-column summary (dtype and non-null count) of the loaded frame.
df.info()

#### Check duplicates

In [None]:
# Boolean flag per row: True when the row repeats an earlier row in every column.
duplicate_flags = df.duplicated()
num_duplicate_rows = duplicate_flags.sum()

print("Number of duplicate rows:", num_duplicate_rows)

#### Check for missing values

In [None]:
column_names = df.columns.values.tolist()

# Short column names get one extra tab so the counts line up in the printed table.
short_names = ('ra', 'dec', 'u', 'g', 'r', 'i', 'z')
total_rows = len(df)

print("Column name \t Count of missing values \t Percentage of missing value to total rows")
for col in column_names:
    count_nan = df[col].isnull().sum()
    pct_nan = count_nan / total_rows * 100
    gap = " - \t\t\t" if col in short_names else " - \t\t"
    # `!s` forces str() on the numbers, matching plain string concatenation.
    print(f"{col}{gap}{count_nan!s} \t\t\t\t{round(pct_nan, 2)!s}%")

#### Data split for model training

In [None]:
# Features are every column except the target ('class') and the two id columns
# (presumably catalogue identifiers — confirm); the target is the 'class' column.
non_feature_columns = ['class', 'objid', 'specobjid']
X = df.drop(columns=non_feature_columns)
y = df['class']

In [None]:
# Feature-matrix shape on the first line, target shape on the second.
print(X.shape, y.shape, sep="\n")

#### Scaling all numerical features to a standard scale

In [None]:
# Standardise every feature column. With scikit-learn's default output settings
# fit_transform returns a NumPy array, so X stops being a DataFrame here.
# NOTE(review): the scaler is fitted on the full dataset before the
# train/validation/test split that follows, so held-out rows contribute to the
# scaling statistics — consider fitting on the training split only.
scaler = StandardScaler()
X = scaler.fit_transform(X)

#### Splitting the dataset into train, validation and test sets

In [None]:
# Two-stage split with a shared seed: 70% of the rows go to training, then the
# held-out 30% is divided 75/25, i.e. roughly 22.5% validation and 7.5% test
# of the full dataset.
split_seed = 42
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.3, random_state=split_seed
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.25, random_state=split_seed
)

In [None]:
# Encode the full label column as integers.
# NOTE(review): y_train / y_val / y_test were already split off above and are
# encoded separately further down with a fresh LabelEncoder, so this encoded `y`
# does not appear to be used by the cells visible here — confirm before removing.
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(y)

#### Label encoding target feature

In [None]:
# Number of target classes, derived from the label column instead of a
# hard-coded 3 so the one-hot width and the softmax size always follow the data.
num_classes = df['class'].nunique()

In [None]:
# Learn the label -> integer mapping from the training labels only, then apply
# that same mapping to all three splits.
label_encoder = LabelEncoder()
label_encoder.fit(y_train)
y_train, y_val, y_test = (
    label_encoder.transform(labels) for labels in (y_train, y_val, y_test)
)

In [None]:
# Map each class name to the integer the fitted encoder assigned to it.
class_mapping = {label: index for index, label in enumerate(label_encoder.classes_)}
galaxy_index, qso_index, star_index = (
    class_mapping[label] for label in ('GALAXY', 'QSO', 'STAR')
)
print("Galaxy class index:", galaxy_index)
print("QSO class index:", qso_index)
print("Star class index:", star_index)

#### One-hot encoding the target variable

In [None]:
# Expand the integer labels of each split into one-hot rows of width num_classes.
y_train_one_hot, y_val_one_hot, y_test_one_hot = (
    to_categorical(labels, num_classes) for labels in (y_train, y_val, y_test)
)

#### Checking the number of instances in the train, test and validation sets

In [None]:
# Row counts per split; inputs and targets of the same split should agree.
X_train_rows, X_test_rows, X_val_rows = (
    part.shape[0] for part in (X_train, X_test, X_val)
)
y_train_rows, y_test_rows, y_val_rows = (
    part.shape[0] for part in (y_train, y_test, y_val)
)

row_report = (
    ("Input for train:", X_train_rows),
    ("Input for test:", X_test_rows),
    ("Input for validation:", X_val_rows),
    ("Target for train:", y_train_rows),
    ("Target for test:", y_test_rows),
    ("Target for validation:", y_val_rows),
)
for caption, n_rows in row_report:
    print(caption, n_rows)

#### Checking shape of input features

In [None]:
# Print the full shape of each feature split.
for split_name, split_features in (("X_train", X_train), ("X_val", X_val), ("X_test", X_test)):
    print(f"{split_name} shape:", split_features.shape)

In [None]:
from keras import backend as K

def precision(y_true, y_pred):
    """Keras metric: share of rounded positive predictions that match a true label.

    Both the element-wise product ``y_true * y_pred`` and ``y_pred`` itself are
    clipped to [0, 1] and rounded before being summed over every element of the
    tensors passed in. ``K.epsilon()`` in the denominator avoids a division by
    zero when nothing is predicted positive.
    """
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    flagged = K.round(K.clip(y_pred, 0, 1))
    return K.sum(hits) / (K.sum(flagged) + K.epsilon())

def recall(y_true, y_pred):
    """Keras metric: share of true labels that the rounded predictions recover.

    The numerator sums the clipped-and-rounded product ``y_true * y_pred``; the
    denominator sums the clipped-and-rounded ``y_true`` plus ``K.epsilon()`` so
    an input without positives does not divide by zero.
    """
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    positives = K.round(K.clip(y_true, 0, 1))
    return K.sum(hits) / (K.sum(positives) + K.epsilon())

def f1_score(y_true, y_pred):
    """Keras metric: harmonic mean of this notebook's ``precision`` and ``recall``.

    ``K.epsilon()`` is added to the denominator so the result is 0 rather than
    NaN when both precision and recall are zero.
    """
    p = precision(y_true, y_pred)
    r = recall(y_true, y_pred)
    half_f1 = (p * r) / (p + r + K.epsilon())
    return 2 * half_f1

def fnr(y_true, y_pred):
    """Keras metric: false-negative rate, FN / (TP + FN).

    True positives come from the clipped-and-rounded ``y_true * y_pred``; false
    negatives from the clipped-and-rounded ``y_true * (1 - y_pred)``. Both are
    summed over every element, and ``K.epsilon()`` guards the division.
    """
    caught = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    missed = K.sum(K.round(K.clip(y_true * (1 - y_pred), 0, 1)))
    return missed / (caught + missed + K.epsilon())


## Base CNN Xb training

In [None]:
def train_and_evaluate_model(X_train, y_train, X_val, y_val, num_classes,
                             epochs=10, batch_size=32, learning_rate=0.001, patience=3):
    """Build, compile and fit the baseline 1-D CNN classifier.

    Despite the name, no test-set evaluation happens here: the network is only
    fitted, with validation metrics recorded in the returned history.

    Args:
        X_train: Training features; ``X_train.shape[1]`` is taken as the number
            of features per sample (assumes an array-like with ``.shape`` —
            confirm for other input types).
        y_train: One-hot training targets with ``num_classes`` columns.
        X_val: Validation features, same layout as ``X_train``.
        y_val: One-hot validation targets.
        num_classes: Width of the softmax output layer.
        epochs: Maximum number of training epochs (default 10).
        batch_size: Mini-batch size passed to ``model.fit`` (default 32).
        learning_rate: Adam learning rate (default 0.001).
        patience: Epochs without ``val_loss`` improvement before early stopping
            (default 3).

    Returns:
        Tuple ``(model, history)``: the fitted model and the object returned by
        ``model.fit``.

    Raises:
        ValueError: If there are fewer than 10 features, which is too short for
            the two unpadded kernel-3 convolutions and two size-2 poolings.
    """
    # Take the sequence length from the data instead of hard-coding 33, so the
    # network still builds when columns are added to or dropped from the features.
    n_features = X_train.shape[1]

    # conv(3) -> pool(2) -> conv(3) -> pool(2) without padding needs >= 10 steps
    # to leave at least one step for the global pooling layer.
    min_features = 10
    if n_features < min_features:
        raise ValueError(
            f"Expected at least {min_features} features per sample, got {n_features}"
        )

    model = Sequential()
    # The trailing 1 is the single channel axis that Conv1D expects.
    model.add(Conv1D(64, 3, activation='relu', input_shape=(n_features, 1)))
    model.add(MaxPooling1D(2))
    model.add(Conv1D(128, 3, activation='relu'))
    model.add(MaxPooling1D(2))
    model.add(GlobalAveragePooling1D())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))  # Add dropout to prevent overfitting
    model.add(Dense(num_classes, activation='softmax'))

    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy', precision, recall, f1_score, fnr],
    )

    # Stop once validation loss stalls and roll back to the best weights seen.
    early_stopping = EarlyStopping(monitor='val_loss', patience=patience, restore_best_weights=True)

    history = model.fit(
        X_train, y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping],
    )

    return model, history

In [None]:
# Fit the baseline CNN on the training features with one-hot targets, using the
# validation split for early stopping; keep the fitted model and fit history.
model, history = train_and_evaluate_model(X_train, y_train_one_hot, X_val, y_val_one_hot, num_classes)

In [None]:
def evaluate_model(model, X_test, y_test):
    """Evaluate a compiled model on a test split and build its confusion matrix.

    Args:
        model: Compiled Keras model whose ``evaluate`` returns exactly six
            values (loss followed by accuracy, precision, recall, F1 and FNR);
            any other metric set makes the unpacking below raise ``ValueError``.
        X_test: Test features accepted by ``model.evaluate`` / ``model.predict``.
        y_test: Test targets with one row per sample and one column per class
            (the true label is recovered with ``argmax`` over axis 1).

    Returns:
        Tuple ``(test_loss, test_accuracy, test_precision, test_recall,
        test_f1_score, test_fnr, cm)`` where ``cm`` is the confusion matrix with
        true classes on the rows and predicted classes on the columns.
    """
    # Local import: `confusion_matrix` is not part of the notebook's import cell.
    from sklearn.metrics import confusion_matrix

    # Evaluate the model on the test set
    test_loss, test_accuracy, test_precision, test_recall, test_f1_score, test_fnr = model.evaluate(X_test, y_test)

    print(f"Test Loss: {test_loss}")
    print(f"Test Accuracy: {test_accuracy}")
    print(f"Test Precision: {test_precision}")
    print(f"Test Recall: {test_recall}")
    print(f"Test F1 Score: {test_f1_score}")
    print(f"Test FNR: {test_fnr}")

    # Get predictions
    predictions = model.predict(X_test)
    predicted_classes = np.argmax(predictions, axis=1)
    true_classes = np.argmax(y_test, axis=1)

    # Confusion Matrix
    # Pin the label set to every output index so the matrix is always
    # n_classes x n_classes, even if a class is absent from this split and never
    # predicted; otherwise rows/columns would silently shift.
    class_indices = np.arange(predictions.shape[1])
    cm = confusion_matrix(true_classes, predicted_classes, labels=class_indices)
    print("Confusion Matrix:\n", cm)

    return test_loss, test_accuracy, test_precision, test_recall, test_f1_score, test_fnr, cm

# Evaluate the trained model on the test split. The metrics are printed inside
# the function and, as the cell's last expression, the returned tuple is echoed.
evaluate_model(model, X_test, y_test_one_hot)


In [None]:
def plot_test_metrics(test_accuracy, test_precision, test_recall, test_f1_score, test_fnr, confusion_matrix):
    """Draw a bar chart of the test metrics and, optionally, a confusion-matrix heatmap.

    Args:
        test_accuracy, test_precision, test_recall, test_f1_score, test_fnr:
            Scalar metric values, shown as annotated bars in the left panel.
        confusion_matrix: Integer matrix drawn as a heatmap in the right panel
            (cells are formatted with ``'d'``); pass ``None`` to skip that panel.

    The figure is displayed with ``plt.show()``; nothing is returned.
    """
    metric_values = {
        'Accuracy': test_accuracy,
        'Precision': test_precision,
        'Recall': test_recall,
        'F1 Score': test_f1_score,
        'FNR': test_fnr,
    }

    fig = plt.figure(figsize=(12, 6))

    # Left panel: one bar per metric.
    metrics_ax = fig.add_subplot(1, 2, 1)
    sns.barplot(x=list(metric_values.keys()), y=list(metric_values.values()), ax=metrics_ax)
    metrics_ax.set_title('Test Metrics')
    metrics_ax.set_ylabel('Value')

    # Write each bar's height, to two decimals, just above the bar.
    for patch in metrics_ax.patches:
        height = patch.get_height()
        metrics_ax.annotate(
            f"{height:.2f}",
            (patch.get_x() + patch.get_width() / 2, height),
            ha='center', va='center',
            size=10, xytext=(0, 8),
            textcoords='offset points',
        )

    # Still acts on the metrics panel: it is the current axes at this point.
    plt.xticks(rotation=45)

    # Right panel: confusion matrix, only when one was supplied.
    if confusion_matrix is not None:
        matrix_ax = fig.add_subplot(1, 2, 2)
        sns.heatmap(confusion_matrix, annot=True, fmt='d', cmap='Blues', ax=matrix_ax)
        matrix_ax.set_title('Confusion Matrix')
        matrix_ax.set_xlabel('Predicted Label')
        matrix_ax.set_ylabel('True Label')

    fig.tight_layout()
    plt.show()

# Run the evaluation, keep each result under its own name, then pass everything
# except the loss (the first element) to the plotting helper.
evaluation = evaluate_model(model, X_test, y_test_one_hot)
test_loss, test_accuracy, test_precision, test_recall, test_f1_score, test_fnr, cm = evaluation

plot_test_metrics(*evaluation[1:])
