<a href="https://colab.research.google.com/github/harishk30/RingGalaxiesCNNAnalysis/blob/main/Transfer%20Learn/GAN_Convw_LastLayer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os
import pathlib
import PIL
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.python.keras.layers import Dense, Flatten
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import ResNet50

In [None]:
# Extract the TRUE_TEST archive from Google Drive into /content/final_test.
# NOTE(review): later cells read test images from /content/drive/MyDrive/test/,
# not from /content/final_test - confirm which location is intended.
!unzip /content/drive/MyDrive/TRUE_TEST.zip -d /content/final_test

In [None]:
# Extract the sorted ring-galaxy archive from Google Drive into /content/final_sort.
# NOTE(review): the archive is named RING_GAL_DATA_SORTED_5 but the split cell reads
# /content/final_sort/RING_GAL_DATA_SORTED_4 - confirm the folder name inside the zip.
!unzip /content/drive/MyDrive/RING_GAL_DATA_SORTED_5.zip -d /content/final_sort

In [None]:
# Load the previously saved network that serves as the starting point for transfer learning.
# NOTE(review): no cell in this notebook mounts Google Drive; presumably it is mounted
# manually before running - confirm.
model = tf.keras.models.load_model('/content/drive/MyDrive/RingNetSimInception')

In [None]:
# Remove the last two layers of the loaded model so a new head can be attached.
# Not idempotent: re-running this cell removes two more layers each time.
model.pop()
model.pop()

In [None]:
# Show the architecture after the two layers were removed.
model.summary()

In [None]:
# Attach a fresh classification head: a 512-unit ReLU layer followed by a single
# sigmoid unit (binary output).
# The layers come from `tensorflow.keras.layers` (imported as `layers` in the import
# cell) so they belong to the same Keras implementation as the loaded `tf.keras` model;
# the `Dense` name in scope is imported from the private `tensorflow.python.keras` package.
model.add(layers.Dense(512, activation = 'relu'))
model.add(layers.Dense(1, activation = 'sigmoid'))

In [None]:
# Freeze the first layer of the model so its weights are not updated during the
# first training phase.
model.layers[0].trainable = False

In [None]:
# Show the architecture again, now with the new head attached and layer 0 frozen.
model.summary()

In [None]:
pip install split-folders tqdm

In [None]:
import splitfolders

# Copy the sorted images into ./output with a 90 % / 10 % split;
# the fixed seed keeps the split reproducible between runs.
splitfolders.ratio(
    '/content/final_sort/RING_GAL_DATA_SORTED_4',
    output="output",
    seed=1337,
    ratio=(0.9, 0.1),
)

In [None]:
# Root folders of the three image sets, kept both as plain strings and as Path objects.
basetrain_dir, baseval_dir, basetest_dir = (
    '/content/output/train/',
    '/content/output/val/',
    '/content/drive/MyDrive/test/',
)
training_dir, validation_dir, test_dir = (
    pathlib.Path(root) for root in (basetrain_dir, baseval_dir, basetest_dir)
)

In [None]:
# Number of images per batch and the square size every image is resized to.
batch_size = 32
img_height = img_width = 256

In [None]:
import os
from PIL import Image

# Walk every class sub-folder of the training split and delete files that PIL
# cannot open, printing a running count of removed files.
folder_path = r'/content/output/train'
removed_count = 0
for fldr in os.listdir(folder_path):
    sub_folder_path = os.path.join(folder_path, fldr)
    if not os.path.isdir(sub_folder_path):
        # Stray files next to the class folders are not image folders; skip them.
        continue
    for filee in os.listdir(sub_folder_path):
        file_path = os.path.join(sub_folder_path, filee)
        if not os.path.isfile(file_path):
            # Nested directories cannot be opened as images nor removed with os.remove.
            continue
        print('** Path: {}  **'.format(file_path), end="\r", flush=True)
        try:
            # The context manager closes the file handle again, so scanning a
            # large folder does not accumulate open files.
            with Image.open(file_path):
                pass
        except Exception:
            # `Exception` rather than a bare `except:` so that interrupting the
            # kernel (KeyboardInterrupt) never deletes the file being inspected.
            os.remove(file_path)
            removed_count += 1
            print(removed_count)

In [None]:
# Import from `tensorflow.keras`, matching the rest of the notebook, instead of the
# standalone `keras` package.
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Training-time augmentation: pixel values are scaled by 1/255 and each image is
# randomly flipped, shifted, sheared, zoomed and rotated.
data_aug = ImageDataGenerator(
    rescale=1./255,
    horizontal_flip=True,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=1.2,
    zoom_range=0.25,
    rotation_range=360,
)

In [None]:
# Generator for validation/test images: same 1/255 rescaling as training, no augmentation.
test_gen = ImageDataGenerator(rescale=1./255)

In [None]:
# Augmented training batches read from the class sub-folders of the training split,
# with binary labels.
train_ds = data_aug.flow_from_directory(
    training_dir,
    target_size=(img_height, img_width),
    batch_size=batch_size,
    class_mode='binary',
)

In [None]:
# Validation batches (rescaled only, no augmentation) with binary labels.
validation_ds = test_gen.flow_from_directory(
    validation_dir,
    target_size=(img_height, img_width),
    batch_size=batch_size,
    class_mode='binary',
)

In [None]:
# Held-out test batches for prediction: no labels are requested (class_mode=None)
# and shuffling is disabled so the batch order is fixed.
confirmation_generator = test_gen.flow_from_directory(
    test_dir,
    target_size=(img_height, img_width),
    batch_size=batch_size,
    class_mode=None,
    shuffle=False,
)

In [None]:
# Differential learning rates: Adam at 1e-5 for the first layer of the model and
# Adam at 1e-3 for every remaining layer (the new head included).
# NOTE(review): model.layers[0] was set to trainable = False in an earlier cell, so
# confirm whether the first optimizer actually has any variables to update.
optimizers_and_layers = [(Adam(learning_rate = 0.00001), model.layers[0]), (Adam(learning_rate = 0.001), model.layers[1:])]

In [None]:
pip install tensorflow_addons

In [None]:
import tensorflow_addons as tfa
# Compile for binary classification, using the per-layer optimizers defined above
# through tfa's MultiOptimizer wrapper.
model.compile(optimizer = tfa.optimizers.MultiOptimizer(optimizers_and_layers), loss = 'binary_crossentropy', metrics = ['accuracy'])

In [None]:
# Phase one: train with a generous upper bound of 1000 epochs and rely on early
# stopping to end the run once val_loss has not improved for 5 consecutive epochs.
epochs = 1000
# NOTE(review): restore_best_weights is not set, so presumably the model keeps the
# weights of the last epoch rather than the best one - confirm this is intended.
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)
history = model.fit(
    train_ds,
    validation_data = validation_ds,
    epochs = epochs,
    callbacks = [callback]
)

In [None]:
# Recompile for the fine-tuning phase with a single Adam optimizer at a very small
# learning rate (1e-7), replacing the per-layer MultiOptimizer setup.
# NOTE(review): model.trainable is changed in the following cell, i.e. after this
# compile() call - Keras expects compile() to run after trainable flags change.
model.compile(optimizer = Adam(learning_rate = 0.0000001), loss = 'binary_crossentropy', metrics = ['accuracy'])

In [None]:
# Unfreeze every layer for the fine-tuning phase.
model.trainable = True
# Keras captures the trainable state of the layers when compile() is called, so the
# model has to be compiled again after unfreezing; otherwise fit() keeps training
# with the previously frozen layers.
model.compile(optimizer = Adam(learning_rate = 0.0000001), loss = 'binary_crossentropy', metrics = ['accuracy'])

In [None]:
# Phase two: continue training for `fine_tune_epochs` additional epochs.
fine_tune_epochs = 10
# Number of epochs phase one actually completed. Early stopping normally ends that
# phase long before the configured `epochs` upper bound, so the end epoch must be
# derived from the recorded history, not from `epochs`.
initial_epoch = len(history.epoch)
total_epochs = initial_epoch + fine_tune_epochs
history_fine = model.fit(train_ds,
                         epochs=total_epochs,
                         initial_epoch=initial_epoch,
                         validation_data=validation_ds)

In [None]:
# Concatenate the metrics of both training phases into NEW lists. Using `+` instead
# of `+=` leaves `history.history` untouched, so re-running this cell does not keep
# appending the fine-tuning values again.
acc = history.history['accuracy'] + history_fine.history['accuracy']
val_acc = history.history['val_accuracy'] + history_fine.history['val_accuracy']

loss = history.history['loss'] + history_fine.history['loss']
val_loss = history.history['val_loss'] + history_fine.history['val_loss']

# x position of the last phase-one epoch, taken from the recorded history rather
# than a number copied from one particular run.
fine_tune_start = len(history.epoch) - 1

plt.figure(figsize=(8, 8))

# Top panel: accuracy.
plt.subplot(2, 1, 1)
plt.plot(acc, label='Training Accuracy')
plt.plot(val_acc, label='Validation Accuracy')
plt.ylim([0.8, 1])
plt.plot([fine_tune_start, fine_tune_start],
         plt.ylim(), label='Start Fine Tuning')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

# Bottom panel: loss.
plt.subplot(2, 1, 2)
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.ylim([0, 1.0])
plt.plot([fine_tune_start, fine_tune_start],
         plt.ylim(), label='Start Fine Tuning')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.xlabel('epoch')
plt.show()

In [None]:
def get_predictions(generator,model):
	"""Run ``model`` over one full pass of ``generator`` and return the scores as a 1-D array.

	Parameters
	----------
	generator : object exposing ``samples`` and ``batch_size``
		Batch generator that is handed to ``model.predict``.
	model : object exposing ``predict(generator, steps=...)``
		Model used for inference.

	Returns
	-------
	numpy.ndarray
		The first output column of the prediction matrix, one value per row.
	"""
	# Function-scope import: the notebook's import cell does not import ``math``.
	# (``np.math`` was an undocumented alias that newer NumPy releases removed.)
	import math

	# Round up so the final, partially filled batch is predicted as well.
	test_steps_per_epoch = math.ceil(generator.samples / generator.batch_size)
	# ``predict`` accepts generators directly; ``predict_generator`` is deprecated.
	raw_predictions = model.predict(generator, steps=test_steps_per_epoch)
	# Keep only the first output column.
	return np.asarray(raw_predictions)[:, 0]


def prediction_report(predictions,generator,cutoff,chatter=False):
	"""Threshold the scores at ``cutoff`` and count the four confusion-matrix cells.

	A sample is predicted as Ring when its score is strictly greater than ``cutoff``;
	a score exactly equal to the cutoff is predicted as Normal. A true label above
	0.5 counts as Ring, anything else as Normal.

	Parameters
	----------
	predictions : array-like
		One score per sample.
	generator : object exposing ``classes``
		Source of the true labels, in the same order as ``predictions``.
	cutoff : float
		Decision threshold applied to the scores.
	chatter : bool, optional
		When True, print the four counts with their percentages.

	Returns
	-------
	tuple
		``(tn, fp, fn, tp)`` as NumPy integers. All four values are always present,
		even when only one class occurs in the labels or in the predictions.
	"""
	def _percent(count, total):
		# Percentage of ``total``; NaN when the class is absent (avoids dividing by zero).
		return (count * 100) / total if total else float('nan')

	# Boolean masks are used so every sample lands in exactly one cell; the input
	# array is never modified.
	predicted_ring = np.asarray(predictions) > cutoff
	actual_ring = np.asarray(generator.classes) > 0.5
	N_ring = int(np.count_nonzero(actual_ring))
	N_normal = int(np.count_nonzero(~actual_ring))
	tn = np.sum(~actual_ring & ~predicted_ring)
	fp = np.sum(~actual_ring & predicted_ring)
	fn = np.sum(actual_ring & ~predicted_ring)
	tp = np.sum(actual_ring & predicted_ring)
	if chatter:
		print('True Normal predictions: {} | ({} percent)'.format(tn,_percent(tn,N_normal)))
		print('True Ring predictions: {} | ({} percent)'.format(tp,_percent(tp,N_ring)))
		print('False Normal predictions (actually Ring): {} | ({} percent)'.format(fn,_percent(fn,N_ring)))
		print('False Ring predictions (actually Normal): {} | ({} percent)'.format(fp,_percent(fp,N_normal)))
	return(tn, fp, fn, tp)

def MCC(theta):
	"""Matthews correlation coefficient computed from confusion-matrix counts.

	Parameters
	----------
	theta : sequence of four numbers
		``(tn, fp, fn, tp)``.

	Returns
	-------
	float
		Value in [-1, 1]. Returns 0.0 when any marginal sum is zero, i.e. when the
		denominator vanishes and the coefficient is otherwise undefined.
	"""
	# Work in floating point: the product of four marginal sums exceeds the int64
	# range already for counts around 1e5 and would silently wrap with NumPy integers.
	tn, fp, fn, tp = (float(count) for count in theta)
	numerator = (tp*tn) - (fp*fn)
	denominator = np.sqrt((tp+fp)*(tp+fn)*(tn+fp)*(tn+fn))
	if denominator == 0:
		return 0.0
	return numerator / denominator

def AUC(predictions,generator,n_thresholds=20):
	"""Plot a ROC curve from a threshold sweep and return the area under it.

	Parameters
	----------
	predictions : array-like
		One score per sample.
	generator : object
		Passed through to ``prediction_report`` as the source of the true labels.
	n_thresholds : int, optional
		Number of evenly spaced cutoffs between 0 and 1 (default 20).

	Returns
	-------
	float
		Trapezoidal area under the (FPR, TPR) curve.
	"""
	thresholds = np.linspace(0.0,1,n_thresholds)
	tprs = []
	fprs = []
	for cutoff in thresholds:
		tn, fp, fn, tp = prediction_report(predictions,generator,cutoff=cutoff,chatter=False)
		tprs.append(tp / (fn+tp))
		fprs.append(fp / (fp+tn))
	# Cutoffs ascend, so the rates descend; reverse both so FPR increases along x
	# and the integral comes out positive.
	tprs = np.array(tprs)[::-1]
	fprs = np.array(fprs)[::-1]
	plt.plot(fprs,tprs)
	# Diagonal reference line.
	plt.plot([0,1],[0,1],'k')
	plt.xlabel('False positive rate')
	plt.ylabel('True positive rate')
	# NumPy 2 renamed ``trapz`` to ``trapezoid``; use whichever this install provides.
	integrate = getattr(np, 'trapezoid', None) or np.trapz
	area = integrate(tprs,x=fprs)
	print("Area under the curve: {}".format(area))
	return area


In [None]:
from sklearn.metrics import confusion_matrix
# Score every test image once and reuse the scores for all metrics below.
predictions = get_predictions(confirmation_generator,model)
# Confusion-matrix counts at a 0.5 cutoff, printed with percentages.
theta = prediction_report(predictions,confirmation_generator,cutoff=0.5,chatter=True)
mcc = MCC(theta)
print('MCC: ',mcc)
# Draws the ROC curve and prints the area under it.
area_under_curve = AUC(predictions,confirmation_generator)
#print(area_under_curve)

In [None]:
import tensorflow as tf
import tensorflow_hub as hub

# Wrap a saved model from Google Drive in a trainable hub.KerasLayer, build it for
# 256x256 RGB input and print its summary.
# NOTE(review): this reads /content/drive/MyDrive/ConvBlockLastLayer, while the save
# cell further down writes to the relative path "ConvBlockLastLayer" - confirm that
# the Drive copy is the intended artefact and that it exists before this cell runs.
m = tf.keras.Sequential([hub.KerasLayer("/content/drive/MyDrive/ConvBlockLastLayer", trainable=True)])
m.build([None, 256, 256, 3])
m.summary()

In [None]:
# Save the fine-tuned model under the relative path "ConvBlockLastLayer"
# (relative to the kernel's current working directory).
model.save("ConvBlockLastLayer")

In [None]:
# `confirmation_generator` was created with class_mode=None and therefore yields
# images without labels, so evaluating on it cannot produce a meaningful loss or
# accuracy. Build a labeled, unshuffled generator over the same test folder instead.
labeled_test_generator = test_gen.flow_from_directory(
    test_dir,
    target_size=(img_height, img_width),
    batch_size=batch_size,
    class_mode='binary',
    shuffle=False,
)
model.evaluate(labeled_test_generator)

In [None]:
# Use the public tf.keras location of plot_model; `keras.utils.vis_utils` is a
# private module path of the standalone keras package, whereas the rest of the
# notebook works with tensorflow.keras.
from tensorflow.keras.utils import plot_model

# Write an architecture diagram (with tensor shapes, layer names and nested models
# expanded) to model_plot.png.
plot_model(model, to_file='model_plot.png', show_shapes=True, show_layer_names=True, expand_nested = True)