In [None]:
import tensorflow as tf
import pandas as pd
import os
import matplotlib.pyplot as plt
import cv2
import numpy as np
from PIL import Image as PILImage
import random
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import KFold
from sklearn.utils import shuffle
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tensorflow.keras.models import load_model
from sklearn.metrics import classification_report


from tensorflow.python.client import device_lib
# Report the TensorFlow build and every device TensorFlow can see.
print(tf.__version__)
print(device_lib.list_local_devices())

# Divisor applied to the length of each per-class image list when it is
# truncated after loading (1 keeps every image, 2 keeps half, ...).
DATA_FRACTION = 1
# Catalogue rows whose 'agreement' value is below this threshold are dropped.
MIN_AGREEMENT = 0

# Image side length in pixels. Kept as a string because it is concatenated
# into the image folder names; it is converted with int() wherever a numeric
# size is required (plot reshape, network input shape).
resolution='69'

In [None]:
# List the GPUs visible to TensorFlow. The list is empty on CPU-only machines,
# so guard the lookup instead of indexing element 0 unconditionally.
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    print(physical_devices[0])
else:
    print("No GPU detected; TensorFlow will run on the CPU.")

In [None]:
# Load the classification catalogue and discard the rows whose vote agreement
# is strictly below MIN_AGREEMENT.
df = pd.read_csv('3class_map_a(p).csv')
below_threshold = df['agreement'] < MIN_AGREEMENT
rows_to_drop = df.loc[below_threshold].index
df = df.drop(index=rows_to_drop)
display(df)

In [None]:
# Build one boolean mask per morphology class from the 'gz2class' string:
#   E  -> starts with 'E'
#   S  -> 'S' followed by any character other than 'B'
#   SB -> starts with 'SB'
gz2_strings = df['gz2class'].str
mask_E = gz2_strings.startswith('E')
mask_S = gz2_strings.match('^S[^B]')
mask_SB = gz2_strings.startswith('SB')

# Summing a boolean mask counts the rows that belong to the class.
num_E, num_S, num_SB = mask_E.sum(), mask_S.sum(), mask_SB.sum()

print("Type E galaxies: " + str(num_E))
print("Type S galaxies: " + str(num_S))
print("Type SB galaxies: " + str(num_SB))

In [None]:
# Balance the catalogue: every class is capped at the size of the smallest one.
min_class = min(num_E, num_S, num_SB)
print(min_class)
min_class = min_class * 1  # cap multiplier, currently 1 (no effect)

# Rows of each class before capping.
rows_E = df[df["gz2class"].str.startswith('E')]
rows_S = df[df["gz2class"].str.match('^S[^B]')]
rows_SB = df[df["gz2class"].str.startswith('SB')]

# Randomly down-sample only the classes that exceed the cap; smaller classes
# are kept whole. Sampling is unseeded, so the selection changes between runs.
df_E = rows_E.sample(n=min_class) if num_E > min_class else rows_E
df_S = rows_S.sample(n=min_class) if num_S > min_class else rows_S
df_SB = rows_SB.sample(n=min_class) if num_SB > min_class else rows_SB

df = pd.concat([df_E, df_S, df_SB])
display(df)

In [None]:
# Recount the classes on the balanced catalogue, using the same rules as
# before: 'E…' elliptical, 'S' + non-'B' spiral, 'SB…' barred spiral.
gz2_strings = df['gz2class'].str
mask_E = gz2_strings.startswith('E')
mask_S = gz2_strings.match('^S[^B]')
mask_SB = gz2_strings.startswith('SB')

num_E, num_S, num_SB = mask_E.sum(), mask_S.sum(), mask_SB.sum()

print("Type E galaxies: " + str(num_E))
print("Type S galaxies: " + str(num_S))
print("Type SB galaxies: " + str(num_SB))

In [None]:
print("Train set:")
print("-------------------------")

# Root of the pre-split training images; it holds one sub-folder per class.
train_root = "images_E_S_SB_"+resolution+"x"+resolution+"_a_03/images_E_S_SB_"+resolution+"x"+resolution+"_a_03_train"

# Load every class with the same procedure instead of three copy-pasted loops.
train_photos_by_class = {}
for class_label in ("E", "S", "SB"):
    class_folder = train_root + "/" + class_label
    # Fail fast on a missing folder; otherwise the class would silently end up empty.
    if not os.path.isdir(class_folder):
        raise FileNotFoundError("Image folder not found: " + class_folder)

    class_photos = []
    for asset_id in df['asset_id']:
        picture_path = os.path.join(class_folder, str(asset_id) + '.jpg')
        if os.path.exists(picture_path):
            # The context manager closes the file handle deterministically;
            # the image is converted to 8-bit grayscale ("L") before leaving it.
            with PILImage.open(picture_path) as picture:
                picture_array = np.array(picture.convert("L"))
            class_photos.append((str(asset_id), picture_array))

    # Shuffle, then keep 1/DATA_FRACTION of the class.
    random.shuffle(class_photos)
    train_photos_by_class[class_label] = class_photos[:int(len(class_photos) / DATA_FRACTION)]

photos_train_E = train_photos_by_class["E"]
photos_train_S = train_photos_by_class["S"]
photos_train_SB = train_photos_by_class["SB"]

print("Elliptical galaxies: " + str(len(photos_train_E)))
print("Spiral galaxies: " + str(len(photos_train_S)))
print("Barred Spiral galaxies: " + str(len(photos_train_SB)))

# List of (asset_id as str, 2-D grayscale array) tuples for all training images.
photos_train = photos_train_E + photos_train_S + photos_train_SB

print("\n")
print("Total galaxies: " + str(len(photos_train)))

In [None]:
print("Test set:")
print("-------------------------")

# Root of the pre-split test images; it holds one sub-folder per class.
test_root = "images_E_S_SB_"+resolution+"x"+resolution+"_a_03/images_E_S_SB_"+resolution+"x"+resolution+"_a_03_test"

# Load every class with the same procedure instead of three copy-pasted loops.
test_photos_by_class = {}
for class_label in ("E", "S", "SB"):
    class_folder = test_root + "/" + class_label
    # Fail fast on a missing folder; otherwise the class would silently end up empty.
    if not os.path.isdir(class_folder):
        raise FileNotFoundError("Image folder not found: " + class_folder)

    class_photos = []
    for asset_id in df['asset_id']:
        picture_path = os.path.join(class_folder, str(asset_id) + '.jpg')
        if os.path.exists(picture_path):
            # The context manager closes the file handle deterministically;
            # the image is converted to 8-bit grayscale ("L") before leaving it.
            with PILImage.open(picture_path) as picture:
                picture_array = np.array(picture.convert("L"))
            class_photos.append((str(asset_id), picture_array))

    # Shuffle, then keep 1/DATA_FRACTION of the class.
    random.shuffle(class_photos)
    test_photos_by_class[class_label] = class_photos[:int(len(class_photos) / DATA_FRACTION)]

photos_test_E = test_photos_by_class["E"]
photos_test_S = test_photos_by_class["S"]
photos_test_SB = test_photos_by_class["SB"]

print("Elliptical galaxies: " + str(len(photos_test_E)))
print("Spiral galaxies: " + str(len(photos_test_S)))
print("Barred Spiral galaxies: " + str(len(photos_test_SB)))

# List of (asset_id as str, 2-D grayscale array) tuples for all test images.
photos_test = photos_test_E + photos_test_S + photos_test_SB

print("\n")
print("Total galaxies: " + str(len(photos_test)))

In [None]:
# Shuffle the combined training list in place, then show its first four
# images on a 2x2 grid as a visual sanity check.
random.shuffle(photos_train)
for position, (asset_name, image) in enumerate(photos_train[:4], start=1):
    plt.subplot(2, 2, position)
    plt.imshow(image, cmap='gray')

In [None]:
# Tabulate the training images; the flag test=0 marks the rows as training data.
df_train = pd.DataFrame(photos_train, columns=['name','photo']).assign(test=0)
display(df_train)

In [None]:
# Tabulate the test images; the flag test=1 marks the rows as held-out data.
df_test = pd.DataFrame(photos_test, columns=['name','photo']).assign(test=1)
display(df_test)

In [None]:
# Stack the train and test image tables and cast the file-derived name to int
# so that it can be matched against the catalogue's 'asset_id'.
df_concat = pd.concat([df_train, df_test], ignore_index=True).astype({'name': int})

# Inner join: only catalogue rows with a loaded image survive.
# Note that this rebinds `df`, so the cell is not meant to be run twice.
df = pd.merge(df, df_concat, how='inner', left_on='asset_id', right_on='name')
display(df)

In [None]:
# Split images and class strings by the 'test' flag (0 = train, otherwise
# test) using boolean masks rather than a row-by-row loop.
is_train_row = df['test'] == 0

X_train = df.loc[is_train_row, 'photo'].tolist()
y_train = df.loc[is_train_row, 'gz2class'].tolist()

X_test = df.loc[~is_train_row, 'photo'].tolist()
y_test = df.loc[~is_train_row, 'gz2class'].tolist()

In [None]:
# Number of training images, followed by a single sample. Printing the whole
# list would dump every image array into the cell output.
print(len(X_train))
print(X_train[:1])

In [None]:
# Number of training labels. X_train and y_train are filled from the same
# rows, so this is expected to equal the image count printed above.
print(len(y_train))
# Uncomment to inspect the raw class strings:
#print(y_train)

In [None]:
# Number of test images.
print(len(X_test))
# Uncomment to inspect the raw image arrays (large output):
#print(X_test)

In [None]:
# Number of test labels, followed by a short preview instead of the full list.
print(len(y_test))
print(y_test[:10])

In [None]:
# Stack the per-image arrays into one float array per split and rescale the
# 8-bit pixel values from [0, 255] to [0, 1].
X_train = np.asarray(X_train, dtype=np.float64) / 255.0
X_test = np.asarray(X_test, dtype=np.float64) / 255.0
print(X_train)

In [None]:
def convert_element(element):
    """Map a class string to its integer label.

    Returns 2 when the string starts with 'SB', 1 when it starts with 'S',
    and 0 for every other string.
    """
    # 'SB' has to be tested before 'S' because every 'SB…' string also starts with 'S'.
    for prefix, label in (('SB', 2), ('S', 1)):
        if element.startswith(prefix):
            return label
    return 0

In [None]:
# Encode the class strings as integer labels (0 = E, 1 = S, 2 = SB) and store
# them as NumPy arrays so they can be indexed with masks and fold indices.
y_train = np.array(list(map(convert_element, y_train)))
y_test = np.array(list(map(convert_element, y_test)))
print(y_train)
print(y_test)

In [None]:
# Shuffle images and labels together so each image keeps its own label.
# No random_state is passed, so the resulting order differs between runs.
X_train, y_train = shuffle(X_train, y_train)
X_test, y_test = shuffle(X_test, y_test)

In [None]:
# Sanity check after shuffling: image and label counts should match per split.
print(len(X_train))
print(len(y_train))
print(len(X_test))
print(len(y_test))

In [None]:
# Show the shuffled integer labels (0 = E, 1 = S, 2 = SB) for both splits.
print(y_train)
print(y_test)

In [None]:
# Augmentation pipeline: random shifts (up to 20 %), zoom (0.8x-1.2x) and
# horizontal/vertical flips. Rotation and shear are disabled.
datagen = ImageDataGenerator(
    rotation_range=0,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0,
    zoom_range=[0.8,1.2],
    horizontal_flip=True,
    vertical_flip=True
)

# Add the trailing channel axis that the generator and Conv2D layers expect.
# Guarded so that re-running this cell does not append a second channel axis.
if X_train.ndim == 3:
    X_train = np.expand_dims(X_train, axis=-1)

iterator = datagen.flow(X_train, y_train, batch_size=25)

# The built-in next() uses the iterator protocol, so it also works on Keras
# versions whose iterators do not expose a .next() method.
X_batch, y_batch = next(iterator)

# Preview one augmented batch on a 5x5 grid. Iterate over the actual batch
# length so that a dataset with fewer than 25 images does not raise IndexError.
plt.figure(figsize=(10,10))
for i in range(len(X_batch)):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.imshow(X_batch[i].reshape(int(resolution),int(resolution)), cmap='gray')
    plt.title('Label: {}'.format(y_batch[i]))
plt.show()

# Label legend:
#     Label 0 --> E
#     Label 1 --> S
#     Label 2 --> SB

In [None]:
EPOCHS = 200
BATCH_SIZE = 32
INITIAL_LEARNING_RATE = 0.001
KFOLD_NSPLITS = 5

BEST_MODEL_PATH = 'best_model_min_agreement_'+str(int(MIN_AGREEMENT*100))+'.h5'

# Integer label -> class name, used for the per-class evaluation below.
CLASS_LABELS = ((0, 'E'), (1, 'S'), (2, 'SB'))


def build_model():
    """Return a freshly initialised, compiled CNN for 3-class classification.

    Architecture: two Conv2D blocks (8+16 and 32+64 filters, 3x3 kernels),
    each followed by 2x2 max pooling, then a 64-unit dense layer and a 3-way
    softmax. Compiled with Adam and sparse categorical cross-entropy, tracking
    sparse categorical accuracy.
    """
    model = tf.keras.models.Sequential([
        tf.keras.layers.Conv2D(8, (3,3), activation='relu', input_shape=(int(resolution), int(resolution), 1)),
        tf.keras.layers.Conv2D(16, (3,3), activation='relu'),
        tf.keras.layers.MaxPooling2D(2,2),
        tf.keras.layers.Conv2D(32, (3,3), activation='relu'),
        tf.keras.layers.Conv2D(64, (3,3), activation='relu'),
        tf.keras.layers.MaxPooling2D(2,2),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(3, activation='softmax')
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=INITIAL_LEARNING_RATE),
                  loss='sparse_categorical_crossentropy',
                  metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
    return model


kf = KFold(n_splits=KFOLD_NSPLITS, shuffle=True)

# Accuracy (in %) per fold: overall and restricted to each true class.
results = []
results_by_class = {class_name: [] for _, class_name in CLASS_LABELS}

for train_index, test_index in kf.split(X_train):
    # Drop the Keras global state left by the previous fold's model so memory
    # does not grow with every model built in this loop.
    tf.keras.backend.clear_session()

    X_train_kf, X_test_kf = X_train[train_index], X_train[test_index]
    y_train_kf, y_test_kf = y_train[train_index], y_train[test_index]

    model = build_model()

    #reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=15, min_lr=0.00001, verbose=1)
    # NOTE(review): a new checkpoint callback is created for every fold with the
    # same target path, so its "best so far" restarts each fold and the file
    # ends up holding the best epoch of the LAST fold - confirm this is intended.
    checkpoint = ModelCheckpoint(BEST_MODEL_PATH, monitor='val_sparse_categorical_accuracy',
                                 verbose=1, save_best_only=True, mode='max')

    # The generator already yields batches of BATCH_SIZE, so batch_size is not
    # passed to fit() again (Keras asks not to specify it for generator input).
    model.fit(datagen.flow(X_train_kf, y_train_kf, batch_size=BATCH_SIZE),
        epochs=EPOCHS,
        validation_data=(X_test_kf,y_test_kf),
        callbacks=[checkpoint]
    )

    # Overall accuracy of the final-epoch model on the held-out fold.
    scores = model.evaluate(X_test_kf, y_test_kf)
    print("Accuracy: %.2f%%" % (scores[1] * 100))
    results.append(scores[1] * 100)

    # Accuracy restricted to the samples of each true class.
    for class_id, class_name in CLASS_LABELS:
        in_class = (y_test_kf == class_id)
        class_scores = model.evaluate(X_test_kf[in_class], y_test_kf[in_class])
        print("Accuracy %s: %.2f%%" % (class_name, class_scores[1] * 100))
        results_by_class[class_name].append(class_scores[1] * 100)

results_E = results_by_class['E']
results_S = results_by_class['S']
results_SB = results_by_class['SB']

mean = np.mean(results)
std = np.std(results)
mean_E = np.mean(results_E)
std_E = np.std(results_E)
mean_S = np.mean(results_S)
std_S = np.std(results_S)
mean_SB = np.mean(results_SB)
std_SB = np.std(results_SB)

# The values summarised here are accuracies, so they are labelled as such.
print('\n')
print("-----------------------------")
print(f"Mean accuracy: {mean:.2f}")
print(f"Standard deviation: {std:.2f}")
print("-----------------------------")
print(f"Mean accuracy for E class: {mean_E:.2f}")
print(f"Standard deviation for E class: {std_E:.2f}")
print("-----------------------------")
print(f"Mean accuracy for S class: {mean_S:.2f}")
print(f"Standard deviation for S class: {std_S:.2f}")
print("-----------------------------")
print(f"Mean accuracy for SB class: {mean_SB:.2f}")
print(f"Standard deviation for SB class: {std_SB:.2f}")

In [None]:
# Reload the checkpoint written during cross-validation and score it on the
# held-out test set.
best_model = load_model(BEST_MODEL_PATH)

# Predicted class = index of the highest softmax probability.
y_pred_probs = best_model.predict(X_test)
y_pred = np.argmax(y_pred_probs, axis=1)
print(y_pred)
print(y_test)

# Explicit labels/target_names give the report readable class names and keep
# all three rows present even when a class is absent from y_test and y_pred.
report = classification_report(
    y_test,
    y_pred,
    labels=[0, 1, 2],
    target_names=['E', 'S', 'SB'],
    zero_division=1,
)
print(report)

In [None]:
# Evaluate the reloaded model on the whole test set and report the second
# entry of the result (the compiled metric) as a percentage.
scores = best_model.evaluate(x=X_test, y=y_test)
metric_name = best_model.metrics_names[1]
metric_percent = scores[1] * 100
print("General \n%s: %.2f%%" % (metric_name, metric_percent))

In [None]:
# Evaluate the reloaded model only on test samples whose true label is 0 (E).
indexs = (y_test == 0)
X_test_E, y_test_E = X_test[indexs], y_test[indexs]
scores_E = best_model.evaluate(x=X_test_E, y=y_test_E)
metric_name = best_model.metrics_names[1]
print("Type E galaxy \n%s: %.2f%%" % (metric_name, scores_E[1] * 100))

In [None]:
# Evaluate the reloaded model only on test samples whose true label is 1 (S).
indexs = (y_test == 1)
X_test_S, y_test_S = X_test[indexs], y_test[indexs]
scores_S = best_model.evaluate(x=X_test_S, y=y_test_S)
metric_name = best_model.metrics_names[1]
print("Type S galaxy \n%s: %.2f%%" % (metric_name, scores_S[1] * 100))

In [None]:
# Evaluate the reloaded model only on test samples whose true label is 2 (SB).
indexs = (y_test == 2)
X_test_SB, y_test_SB = X_test[indexs], y_test[indexs]
scores_SB = best_model.evaluate(x=X_test_SB, y=y_test_SB)
metric_name = best_model.metrics_names[1]
print("Type SB galaxy \n%s: %.2f%%" % (metric_name, scores_SB[1] * 100))