# Galaxy and Non-Galaxy Classifiers 

## 1. Base on Zernike moments:

In [None]:
#Import packages

from sklearn.svm import SVC
from sklearn import metrics
import pickle
import pandas as pd
import numpy as np
import pickle
import matplotlib.pyplot as plt
%matplotlib inline
import warnings
warnings.filterwarnings("ignore")

#Tensorflow
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense
from tensorflow.keras.layers import (Dense, Dropout,BatchNormalization, Input, Conv1D, Flatten,
                             MaxPooling1D)
from keras.utils import to_categorical
from keras.callbacks import EarlyStopping


#Scikit_learn
from sklearn.model_selection import train_test_split
from sklearn.metrics import (accuracy_score, classification_report,
                             ConfusionMatrixDisplay, confusion_matrix)


In [None]:
# If using Google Colab.

from google.colab import drive
drive.mount('/content/drive')

### Compute the ZMs:

##### The ZEMO python package [https://pypi.org/project/ZEMO/] [https://github.com/hmddev1/ZEMO] can be used to compute Zernike moments (Zms) for your images. This library is described in a research paper you can find online [[arxiv:2308.13562](https://arxiv.org/abs/2308.13562v2)].

In [None]:
from ZEMO import zemo
import os
import cv2

image_size = 200
zernike_order = 45

ZBFSTR = zemo.zernike_bf(image_size, zernike_order, 1)    

- To use our image data use these directories from the repo.:
        
        - galaxy: 
        - non-galaxy:

In [None]:
path = r'/path/to/your/directory/galaxy' 

image_files = [os.path.join(path, filename) for filename in os.listdir(path) if filename.endswith('.jpg')]
len(image_files)

In [None]:
ZZ = []

for img_path in image_files:
    image = cv2.imread(img_path)
    resized_image = cv2.resize(image, (image_size, image_size))
    im = resized_image[:,:,0]
    Z = np.abs(zemo.zernike_mom(np.array(im), ZBFSTR))
    ZZ.append(Z)


# Save the ZMs as data frame
df = pd.DataFrame(ZZ)
df.to_csv('/path/to/your/directory/galaxy_zms.csv')

#### Read the Zernike moments data:

- To use our Zernike moments data, use these directories from the repo.:

        - galaxy: 
        - non-galaxy: 

In [None]:
galaxy_zm = pd.read_csv('/path/to/your/direcotry/galaxy_zms.csv')
nongalaxy_zm = pd.read_csv('/path/to/your/direcotry/non_galaxy_zms.csv')

galaxy_zm.drop("Unnamed: 0", axis = 1, inplace = True)
nongalaxy_zm.drop("Unnamed: 0", axis = 1, inplace = True)

zmg = np.array(galaxy_zm)
zmng = np.array(nongalaxy_zm)

all_zm_data = np.concatenate([zmg,zmng])
len(zmg), len(zmng), len(all_zm_data)

#### Defining the labels due to the length of zernike moment features 


In [None]:
galaxies_labels = np.zeros(len(zmg))
nongalaxy_labels = np.ones(len(zmng))
all_labels = np.concatenate([galaxies_labels, nongalaxy_labels])
len(all_labels)

#### Fit the SVM model for 10 iterations and calculate the standard deviation.
##### (You can save the performance materials and the models outputs.) 

In [None]:
y_test_list=[]
models=[]
Y_pred=[]
accs=[]
cons=[]
test_indx = []

for i in range (10):
    X_train, X_test, y_train, y_test, train_indices, test_indices = train_test_split(all_zm_data, all_labels, np.arange(len(all_labels)), test_size=0.25, shuffle=True, random_state=None)
    y_test_list.append(y_test)
    test_indx.append(test_indices)

    class_weights = {0: len(all_zm_data) / (2*len(zmg)), 1: len(all_zm_data) / (2*len(zmng))}

    model = SVC(kernel='rbf', probability=True, C=1.5, gamma='scale',class_weight=class_weights)
    gz2_training_model = model.fit(X_train, y_train)
    models.append(gz2_training_model)

    y_pred = model.predict(X_test)
    Y_pred.append(y_pred)

    con0 = metrics.confusion_matrix(y_test, y_pred)
    cons.append(con0)

    acc = metrics.accuracy_score(y_test, y_pred)
    accs.append(acc)

In [None]:
print(np.mean(accs), np.std(accs))
plt.plot(accs)

#### Save any output you need.


In [None]:
output_el_path = '/path/to/your/direcotry'
import os
pickle_el_filename = 'performances_of_galaxy_nonegalaxy.pickle'
pickle_el_filepath = os.path.join(output_el_path, pickle_el_filename)

with open(pickle_el_filepath, 'wb') as pickle_file:
    pickle.dump(accs, pickle_file)   

#### Fit the 1D-CNNmodel for 10 iterations and calculate the standard deviation.
#### (You can save the performance materials and the models outputs.)

In [None]:


y_test_list=[]
models=[]
Y_pred=[]
accs=[]
cons=[]
test_indx = []

batch_size = 64
NUM_EPOCH = 30

for i in range (10):
    X_train, X_test, y_train, y_test, train_indices, test_indices = train_test_split(all_zm_data, all_labels, np.arange(len(all_labels)), test_size=0.25, shuffle=True, random_state=None)
    y_test_list.append(y_test)
    test_indx.append(test_indices)

    class_weights = {0: len(all_zm_data) / (2*len(zmg1)), 1: len(all_zm_data) / (2*len(zms1))}
    y_train_encoded = to_categorical(y_train, num_classes=2)

    # input value due to the zernike order of 45 (1081 features)
    x = Input(shape=(1081,1))

    #hidden layers
    c0 = Conv1D(256, kernel_size=3, strides=2, padding="same")(x)
    b0 = BatchNormalization()(c0)
    m0 = MaxPooling1D(pool_size=2)(b0)
    d0 = Dropout(0.1)(m0)

    c1 = Conv1D(128, kernel_size=3, strides=2, padding="same")(d0)
    b1 = BatchNormalization()(c1)
    m1 = MaxPooling1D(pool_size=2)(b1)
    d1 = Dropout(0.1)(m1)

    c2 = Conv1D(64, kernel_size=3, strides=2, padding="same")(d1)
    b2 = BatchNormalization()(c2)
    m2 = MaxPooling1D(pool_size=2)(b2)
    d2 = Dropout(0.1)(m2)

    f = Flatten()(d2)

    # output
    de0 = Dense(64, activation='relu')(f)
    de1 = Dense(32, activation='relu')(de0)
    de2 = Dense(2, activation='softmax')(de1)

    model = Model(inputs=x, outputs=de2, name="cnn_zm_45_galaxy_nonegalaxy")
    loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
    optimizer = tf.keras.optimizers.Adam()
    model.compile(loss=loss, optimizer=optimizer, metrics=['accuracy'])

    # Callback Functions
    es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10)

    history = model.fit(
    X_train, y_train_encoded,
    batch_size=batch_size,
    epochs=NUM_EPOCH,
    class_weight=class_weights,
    verbose = 1,
    callbacks=es,
    validation_split=0.1
    )
    models.append(history)

    y_pred = model.predict(X_test)
    y_pred_labels = np.argmax(y_pred, axis=1)
    Y_pred.append(y_pred_labels)

    con0 = metrics.confusion_matrix(y_test, y_pred_labels)
    cons.append(con0)

    acc = metrics.accuracy_score(y_test, y_pred_labels)
    accs.append(acc)

In [None]:
print(np.mean(accs), np.std(accs))
plt.plot(accs)

#### Save any output you need.

In [None]:
output_el_path = '/path/to/your/direcotry'
import os
pickle_el_filename = 'performances_of_galaxy_nonegalaxy.pickle'
pickle_el_filepath = os.path.join(output_el_path, pickle_el_filename)

with open(pickle_el_filepath, 'wb') as pickle_file:
    pickle.dump(accs, pickle_file)   

## 2. Base on original images 
- (Vision Transformers used as data augmentation tools on the Galaxy and Non-Galaxy images.)

In [None]:
# import packages 

import cv2
import os
import numpy as np
import random
from PIL import Image
import pickle
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
import warnings
warnings.filterwarnings("ignore")

from sklearn.model_selection import train_test_split

#Tensorflow
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras import Sequential
from tensorflow.keras.layers import (Dense, Dropout, Input,Conv2D, Flatten,
                             MaxPooling2D,BatchNormalization)
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.applications import ResNet50, VGG16
from keras.callbacks import EarlyStopping

#Torch
import torch
import torch.nn as nn
from torchvision import transforms

### The load_galaxy_images functions uses for read and pre-process the input images

In [None]:
def load_galaxy_images(data_dir, target_size):
        
        """
        Loads, resizes, and processes all JPG images from the specified directory.

        Parameters:
        data_dir (str): The directory containing the JPG images to be processed.
        target_size (tuple): The target size for resizing the images, specified as (width, height).

        Returns:
        list: A list of PIL Image objects, each representing a resized and processed image.

        The function performs the following steps:
        1. Lists all JPG image files in the specified directory.
        2. Reads each image using OpenCV.
        3. Resizes each image to the specified target size.
        4. Scales the pixel values and converts the image to a format compatible with PIL.
        5. Converts each resized image to a PIL Image object.
        6. Appends each PIL Image object to a list.
        7. Returns the list of PIL Image objects.
        """

        all_images = []

        file_path = [os.path.join(data_dir, filename) for filename in os.listdir(data_dir) if filename.endswith('.jpg')]

        for img in file_path:
            image = cv2.imread(img)
            resized_images=cv2.resize(image, target_size)
            resized_images = (resized_images * 255).astype(np.uint8)
            pil_images = Image.fromarray(resized_images)
            all_images.append(pil_images)

        return all_images

- To use our image data use these directories from the repo.:
        
        - galaxy: 
        - non-galaxy:

In [None]:
g_im = '/path/to/your/direcotry/galaxy'
ng_im = '/path/to/your/direcotry/non_galaxy'

categories = ['g', 'ng']

image_size = 200

g_img = load_galaxy_images(g_im, target_size=(image_size,image_size))
ng_img = load_galaxy_images(ng_im, target_size=(image_size,image_size))

all_data = g_img + ng_img
np.shape(all_data)

### vision transformer

In [None]:
# transforms for training data
train_transform = transforms.Compose([transforms.CenterCrop(image_size),
                                      transforms.RandomRotation(90),
                                      transforms.RandomHorizontalFlip(),
                                      transforms.RandomVerticalFlip(),
                                      transforms.RandomResizedCrop(image_size, scale=(0.8, 1.0), ratio=(0.99, 1.01)),
                                      transforms.ToTensor(),
                                      transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                                      ])


# transforms for test data
test_transform = transforms.Compose([transforms.CenterCrop(image_size),
                                      transforms.ToTensor(),
                                      transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                                      ])

In [None]:
galaxies_labels = np.zeros(len(g_img))
nongalaxy_labels = np.ones(len(ng_img))

all_labels = np.concatenate([galaxies_labels, nongalaxy_labels])
len(all_labels)

#### Fit the classis CNN layers for 10 iterations and calculate the standard deviation.
#### (You can save the performance materials and the models outputs.)

In [None]:
y_test_list=[]
models=[]
Y_pred=[]
accs=[]
cons=[]
test_indx=[]

b_size = 64
e_num = 30

for i in range (10):
    X_train, X_test, y_train, y_test, train_indices, test_indices = train_test_split(all_data, all_labels, np.arange(len(all_labels)), test_size=0.25, shuffle=True, random_state=None)
    y_test_list.append(y_test)
    test_indx.append(test_indices)

    y_train_encoded = to_categorical(y_train, num_classes=2)
    class_weights = {0: len(all_data) / (2*len(g_img)), 1: len(all_data) / (2*len(ng_img))}

    # Transformer for training data
    transformed_X_train=[]
    for i in range(len(X_train)):
      transformed_train_images = train_transform(X_train[i])
      new_image = np.transpose(transformed_train_images, (1, 2, 0))
      transformed_X_train.append(new_image)

    # Transformer for testing data
    transformed_X_test=[]
    for j in range(len(X_test)):
      transformed_test_images = test_transform(X_test[j])
      new_images = np.transpose(transformed_test_images, (1, 2, 0))
      transformed_X_test.append(new_images)

    # input
    x = Input(shape=(image_size,image_size,3))

    #hidden layers
    c0 = Conv2D(256, kernel_size=(3,3), strides=(1,1), padding="same")(x)
    b0 = BatchNormalization()(c0)
    m0 = MaxPooling2D(pool_size=(2, 2))(b0)
    d0 = Dropout(0.1)(m0)

    c1 = Conv2D(128, kernel_size=(3,3), strides=(1,1), padding="same")(m0)
    b1 = BatchNormalization()(c1)
    m1 = MaxPooling2D(pool_size=(2, 2))(b1)
    d1 = Dropout(0.1)(m1)

    c2 = Conv2D(64, kernel_size=(3,3), strides=(1,1), padding="same")(m1)
    b2 = BatchNormalization()(c2)
    m2 = MaxPooling2D(pool_size=(2, 2))(b2)
    d2 = Dropout(0.1)(m2)

    f = Flatten()(m2)

    # output layers
    de0 = Dense(64, activation='relu')(f)
    de1 = Dense(32, activation='relu')(de0)
    de2 = Dense(2, activation='softmax')(de1)

    model = Model(inputs=x, outputs=de2, name="cnn_transformer_galaxy_nonegalaxy")
    loss = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
    optimizer = tf.keras.optimizers.Adam()
    model.compile(loss=loss, optimizer=optimizer, metrics=['accuracy'])

    # Callback Functions
    es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10)

    history = model.fit(
    np.array(transformed_X_train), y_train_encoded,
    batch_size=b_size,
    epochs=e_num,
    verbose = 1,
    class_weight=class_weights,
    callbacks=es,
    validation_split=0.1
    )
    models.append(history)

    y_pred = model.predict(np.array(transformed_X_test))
    y_pred_labels = np.argmax(y_pred, axis=1)
    Y_pred.append(y_pred_labels)

    con0 = metrics.confusion_matrix(y_test, y_pred_labels)
    cons.append(con0)

    acc = metrics.accuracy_score(y_test, y_pred_labels)
    accs.append(acc)

In [None]:
print(np.mean(accs), np.std(accs))
plt.plot(accs)

#### Save any output you need.


In [None]:

output_el_path = '/path/to/your/direcotry'
import os
pickle_el_filename = 'performances_of_galaxy_nonegalaxy.pickle'
pickle_el_filepath = os.path.join(output_el_path, pickle_el_filename)

with open(pickle_el_filepath, 'wb') as pickle_file:
    pickle.dump(accs, pickle_file)

#### Fit the ResNet50 model for 10 iterations and calculate the standard deviation.
#### (You can save the performance materials and the models outputs.)

In [None]:
y_test_list=[]
models=[]
Y_pred=[]
accs=[]
cons=[]
test_indx=[]

b_size = 64
e_num = 30

for i in range (10):
    X_train, X_test, y_train, y_test, train_indices, test_indices = train_test_split(all_data, all_labels, np.arange(len(all_labels)), test_size=0.25, shuffle=True, random_state=None)
    y_test_list.append(y_test)
    test_indx.append(test_indices)

    y_train_encoded = to_categorical(y_train, num_classes=2)
    class_weights = {0: len(all_data) / (2*len(g_img)), 1: len(all_data) / (2*len(ng_img))}

    # Training data
    transformed_X_train=[]
    for i in range(len(X_train)):
      transformed_train_images = train_transform(X_train[i])
      new_image = np.transpose(transformed_train_images, (1, 2, 0))
      transformed_X_train.append(new_image)

    # Testing data
    transformed_X_test=[]
    for j in range(len(X_test)):
      transformed_test_images = test_transform(X_test[j])
      new_images = np.transpose(transformed_test_images, (1, 2, 0))
      transformed_X_test.append(new_images)

    base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(image_size, image_size, 3))

    x = Flatten()(base_model.output)
    x = Dense(64, activation='relu')(x)  # Add your custom layers here
    output = Dense(2, activation='softmax')(x)  # 3 classes, so 3 output units with softmax activation

    model = Model(inputs=base_model.input, outputs=output)

    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10)

    history = model.fit(
    np.array(transformed_X_train), y_train_encoded,
    batch_size=b_size,
    epochs=e_num,
    verbose = 1,
    callbacks=es,
    class_weight=class_weights,
    validation_split=0.1
    )
    models.append(history)

    y_pred = model.predict(np.array(transformed_X_test))
    y_pred_labels = np.argmax(y_pred, axis=1)
    Y_pred.append(y_pred_labels)

    con0 = metrics.confusion_matrix(y_test, y_pred_labels)
    cons.append(con0)

    acc = metrics.accuracy_score(y_test, y_pred_labels)
    accs.append(acc)

#### Fit the VGG16 model for 10 iterations and calculate the standard deviation.
#### (You can save the performance materials and the models outputs.)

In [None]:
y_test_list=[]
models=[]
Y_pred=[]
accs=[]
cons=[]
test_indx=[]

b_size = 64
e_num = 30

for i in range (10):
    X_train, X_test, y_train, y_test, train_indices, test_indices = train_test_split(all_data, all_labels, np.arange(len(all_labels)), test_size=0.25, shuffle=True, random_state=None)
    y_test_list.append(y_test)
    test_indx.append(test_indices)

    y_train_encoded = to_categorical(y_train, num_classes=2)
    class_weights = {0: len(all_data) / (2*len(g_img)), 1: len(all_data) / (2*len(ng_img))}

    # Training data
    transformed_X_train=[]
    for i in range(len(X_train)):
      transformed_train_images = train_transform(X_train[i])
      new_image = np.transpose(transformed_train_images, (1, 2, 0))
      transformed_X_train.append(new_image)

    # Testing data
    transformed_X_test=[]
    for j in range(len(X_test)):
      transformed_test_images = test_transform(X_test[j])
      new_images = np.transpose(transformed_test_images, (1, 2, 0))
      transformed_X_test.append(new_images)

    base_model = VGG16(weights='imagenet', include_top=False, input_shape=(image_size, image_size, 3))

    x = Flatten()(base_model.output)
    x = Dense(64, activation='relu')(x)  # Add your custom layers here
    output = Dense(2, activation='softmax')(x)  # 3 classes, so 3 output units with softmax activation

    model = Model(inputs=base_model.input, outputs=output)

    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10)

    history = model.fit(
    np.array(transformed_X_train), y_train_encoded,
    batch_size=b_size,
    epochs=e_num,
    verbose = 1,
    callbacks=es,
    class_weight=class_weights,
    validation_split=0.1
    )
    models.append(history)

    y_pred = model.predict(np.array(transformed_X_test))
    y_pred_labels = np.argmax(y_pred, axis=1)
    Y_pred.append(y_pred_labels)

    con0 = metrics.confusion_matrix(y_test, y_pred_labels)
    cons.append(con0)

    acc = metrics.accuracy_score(y_test, y_pred_labels)
    accs.append(acc)