<a href="https://colab.research.google.com/github/crazylazylife/galaxy_zoo_classification_project/blob/master/galaxy_zoo_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
#Colab file-transfer helper; files.upload() is called in later cells
from google.colab import files
#Installing the Kaggle command-line client that a later cell uses to download the dataset
!pip install -q kaggle

In [0]:
#Moving to /root (relative to the starting directory), where the .kaggle folder is created next
%cd ../root

/root


In [0]:
!mkdir .kaggle
%cd .kaggle
uploading = files.upload()

In [0]:
#Going back up two levels and into the content directory, where the data is downloaded and unpacked
%cd ../../content

In [0]:
#Downloading the competition files with the Kaggle CLI installed above
#(presumably relies on the API token uploaded into .kaggle earlier - confirm)
!kaggle competitions download -c galaxy-zoo-the-galaxy-challenge

In [0]:
!unzip training_solutions_rev1.zip
!unzip images_training_rev1.zip

-----------------------------------------------------------------------------
Preprocessing the images, then building and training the ResNet model for the classification task.

In [0]:
#Importing the libraries
import numpy as np
import pandas as pd
import cv2
import tensorflow as tf
from keras import layers
from keras.layers import Input, Add, ZeroPadding2D, Flatten, AveragePooling2D, MaxPooling2D, Conv2D, Activation, BatchNormalization, GlobalAveragePooling2D, GlobalMaxPooling2D, Dense, Dropout
from keras.activations import relu, softmax
from keras.models import Model, load_model
from keras import regularizers
from keras.preprocessing import image
from keras.utils import layer_utils
from keras.utils.data_utils import get_file
from keras.initializers import glorot_uniform
from keras.callbacks import ModelCheckpoint, TensorBoard

import keras.backend as K
import os
import glob
from sklearn.model_selection import train_test_split
import gc

from matplotlib.pyplot import imshow
%matplotlib inline

Using TensorFlow backend.


In [0]:
#Reading the training instance: a 1-based index selecting which set_size-row slice
#of the label file is used in this run (instance 1 builds a new model, later
#instances continue from the saved one).
check = int(input("Enter training instance: "))
if check < 1:
  #A value below 1 would select an empty slice of the label file further down
  raise ValueError("Training instance must be a positive integer, got " + str(check))

Enter training instance: 4


In [0]:
#Hyperparameters used by the preprocessing and training cells

#Image side lengths (in pixels) that every cropped galaxy image is resized to
final_width = final_height = 224

#Rows of the label file consumed per training instance
set_size = 5000
#Fraction of those rows kept for training; the remainder is used for validation/testing
split_ratio = 0.8

#Optimisation settings
batch_size = 32
num_epochs = 20   #200 is too much. 20 is fine.

In [0]:
'''
This part processes the images by first cropping them to 312x312 size and then resizing them
to final_width*final_height images.
First we read in some of the values in training_solutions_rev1.csv as defined by the set_size.
'''
def load_galaxy_images(galaxy_ids, width, height, image_dir='images_training_rev1'):
  '''
  Load, crop and resize one image per galaxy id.

  galaxy_ids -> Iterable of ids; each id maps to the file <image_dir>/<id>.jpg
  width, height -> Integers, size every cropped image is resized to
  image_dir -> Directory holding the .jpg files

  Returns a float32 array of shape (number of ids, height, width, 3).
  Raises IOError when an image cannot be read.
  '''
  galaxy_ids = list(galaxy_ids)
  #cv2.resize takes (width, height) but returns rows x cols, i.e. (height, width).
  #float32 halves the memory of numpy's default float64.
  images = np.zeros((len(galaxy_ids), height, width, 3), dtype=np.float32)
  for row, galaxy_id in enumerate(galaxy_ids):
    path = os.path.join(image_dir, str(galaxy_id)+'.jpg')
    img = cv2.imread(path)
    if img is None:
      #cv2.imread signals failure by returning None rather than raising
      raise IOError("Could not read image: "+path)
    #Central 312x312 crop, then resize to the network input size
    images[row] = cv2.resize(img[56:368, 56:368], (width, height))
  return images


Y_full = pd.read_csv('training_solutions_rev1.csv')
#Keeping only the rows that belong to the current training instance
Y_full = Y_full[set_size*(check-1):set_size*(check)]
if Y_full.empty:
  raise ValueError("Training instance "+str(check)+" selects no rows from training_solutions_rev1.csv")
print(Y_full.head())
#Fixed random_state so that re-running the cell reproduces the same split
Y_train, Y_test = train_test_split(Y_full, test_size=(1 - split_ratio), random_state=0)
del Y_full
gc.collect()

X_train = load_galaxy_images(Y_train["GalaxyID"], final_width, final_height)
X_test = load_galaxy_images(Y_test["GalaxyID"], final_width, final_height)

#NOTE(review): the previous version of this cell called
#Y_train.drop(columns=["GalaxyID"]) and Y_test.drop(columns=["GalaxyID"]) without
#assigning the result, so GalaxyID was never removed and is still a column of the
#label frames handed to model.fit. Actually dropping it changes the label width and
#therefore has to be done together with the `classes` argument of the model (and any
#already-saved trained_resnet18.h5), so it is deliberately left unchanged here.

#At the end of this cell we have four important variables, namely:
#X_train and X_test --> the training and testing images as float32 numpy arrays.
#Y_train and Y_test --> the training and testing outputs for the images as pandas DataFrames
#                       (still including the GalaxyID column, see the note above).

In [0]:
#Building the ResNet Architecture

class ResNet:
  '''
  Builder for the residual network used in this notebook.

  The instance holds no state; the methods only assemble Keras layers.
  '''

  def __init__(self):
    pass

  def _conv_bn(self, tensor, filters, kernel, stride, padding, tag, relu=True):
    '''
    Apply Conv2D -> BatchNormalization (-> ReLU) to a tensor.

    tensor -> Input tensor
    filters -> Integer, number of convolution filters
    kernel -> Integer, side length of the square kernel
    stride -> Integer, stride used in both spatial directions
    padding -> "same" or "valid"
    tag -> String appended to "resnet" / "batchnorm" to name the two layers
    relu -> Boolean, whether a ReLU activation follows the batch norm
    '''
    tensor = Conv2D(
      filters=filters,
      kernel_size=(kernel, kernel),
      strides=(stride, stride),
      padding=padding,
      name="resnet"+tag,
      kernel_initializer=glorot_uniform(seed=0),
    )(tensor)
    tensor = BatchNormalization(axis=3, name="batchnorm"+tag)(tensor)
    if relu:
      tensor = Activation("relu")(tensor)
    return tensor

  def identity_block(self, X, f_shape, n_filters, stage, block_name):
    '''
    Residual block whose shortcut is the unmodified input.

    X -> Input tensor; it is added to the block output as-is, so it must already
         have the shape produced by the second convolution
    f_shape -> Integer, kernel side length of both convolutions
    n_filters -> Pair (f1, f2), filters of the first and second convolution
    stage -> Integer, stage number used in the layer names
    block_name -> String, block label used in the layer names
    '''
    first_filters, second_filters = n_filters
    tag = str(stage)+block_name+"_branch_"
    shortcut = X

    #Main path: two stride-1, same-padded conv + batchnorm + relu units
    main = self._conv_bn(X, first_filters, f_shape, 1, "same", tag+"2a")
    main = self._conv_bn(main, second_filters, f_shape, 1, "same", tag+"2b")

    #Skip connection followed by the final activation
    merged = Add()([main, shortcut])
    return Activation("relu")(merged)

  def convolutional_block(self, X, f_shape, n_filters, stage, block_name, s=2):
    '''
    Residual block whose shortcut is a strided 1x1 convolution.

    X -> Input tensor
    f_shape -> Integer, kernel side length of the first convolution
    n_filters -> Pair (f1, f2), filters of the first and second convolution;
                 the shortcut convolution also uses f2
    stage -> Integer, stage number used in the layer names
    block_name -> String, block label used in the layer names
    s -> Integer, stride of the first convolution and of the shortcut convolution
    '''
    first_filters, second_filters = n_filters
    tag = str(stage)+block_name+"_branch_"

    #Main path: strided f_shape x f_shape convolution, then a 1x1 convolution
    main = self._conv_bn(X, first_filters, f_shape, s, "same", tag+"2a")
    main = self._conv_bn(main, second_filters, 1, 1, "valid", tag+"2b")

    #Shortcut path: 1x1 convolution with the same stride, batchnorm, no activation
    shortcut = self._conv_bn(X, second_filters, 1, s, "valid", tag+"1", relu=False)

    #Skip connection followed by the final activation
    merged = Add()([main, shortcut])
    return Activation("relu")(merged)

  def ResNet18(self, input_shape=(224, 224, 3), classes=38):
    '''
    Assemble the network and return it as a Keras Model named "ResNet18".

    input_shape -> Shape of one input image
    classes -> Integer, number of units of the final softmax Dense layer

    Layout:
    Conv1 -> zero padding 3, 7 X 7 conv, 64 filters, stride 2, batchnorm, relu
             (112 X 112 for a 224 X 224 input)
    Pool  -> 3 X 3 MaxPool, stride 2
    Stages 2 to 5 -> one convolutional_block (s=2) followed by one identity_block,
                     with filters (64, 128), (128, 256), (256, 512), (512, 1024)
    Head  -> 2 X 2 AveragePooling, Flatten, Dense(classes, softmax)
    '''
    X_input = Input(input_shape)

    #Stem
    X = ZeroPadding2D((3, 3))(X_input)
    X = Conv2D(64, kernel_size=(7, 7), strides=(2, 2), name="conv1", kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=3, name="batchnorm_conv1")(X)
    X = Activation("relu")(X)
    X = MaxPooling2D((3, 3), strides=(2, 2))(X)

    #Residual stages 2..5, each made of block "a" (convolutional) and block "b" (identity)
    stage_filters = [[64, 128], [128, 256], [256, 512], [512, 1024]]
    for stage, filters in enumerate(stage_filters, start=2):
      X = self.convolutional_block(X, f_shape=3, n_filters=filters, stage=stage, block_name="a", s=2)
      X = self.identity_block(X, f_shape=3, n_filters=filters, stage=stage, block_name="b")

    #Classification head
    X = AveragePooling2D((2, 2))(X)
    X = Flatten()(X)
    X = Dense(classes, activation="softmax", name="fc"+str(classes), kernel_initializer=glorot_uniform(seed=0))(X)

    return Model(inputs=X_input, outputs=X, name="ResNet18")

In [0]:
#Initializing and Compiling the model
Resnet18 = ResNet()
if (check == 1):
  #First training instance: build a fresh network and compile it
  model = Resnet18.ResNet18(input_shape=(224, 224, 3), classes=38)
  model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
else:
  #Later instances continue from the model saved by the previous run; load_model
  #restores the whole model from the file, so no network is built here
  print("Loading pretrained model...")
  model = load_model("trained_resnet18.h5")

#The checkpoint file lives in tmp/; the directory has to exist before the first save
os.makedirs("tmp", exist_ok=True)

train_tensorboard = TensorBoard(log_dir="./logs/train_"+str(check), histogram_freq = 0, write_graph  =True, write_images = True)
checkpointer = ModelCheckpoint(filepath='tmp/weights_resnet18.hdf5', verbose=1, save_best_only = True)


In [0]:
#Training the model on the training data, using the test split as validation data
training_callbacks = [checkpointer, train_tensorboard]
model.fit(
  X_train,
  Y_train,
  epochs=num_epochs,
  batch_size=batch_size,
  verbose=1,
  callbacks=training_callbacks,
  validation_data=(X_test, Y_test),
)

In [0]:
#Saving the trained model for further training on the next batch
#(the model-initialization cell loads this same file with load_model when check != 1)
model.save("trained_resnet18.h5")

In [0]:
#Evaluating the model on the test set
#test_tensorboard not working with model.evaluate. Gonna change it later.
test_tensorboard = TensorBoard(
  log_dir="./logs/test_"+str(check),
  histogram_freq=0,
  write_graph=True,
  write_images=False,
)
#prediction holds the values returned by model.evaluate; the first two entries are
#reported below as loss and accuracy
prediction = model.evaluate(X_test, Y_test, batch_size=batch_size, verbose=1)
test_loss = prediction[0]
print("Test Loss: "+str(test_loss))
test_accuracy = prediction[1]
print("Test Accuracy: "+str(test_accuracy))

In [0]:
#Opening the Colab upload dialog
#NOTE(review): presumably used to upload a previously saved trained_resnet18.h5 before the next cell loads it - confirm
uploading = files.upload()

In [0]:
#Reloading the trained model for inference.
#load_model restores the complete model from the saved file, so building a fresh
#network first (only to overwrite it immediately) is unnecessary and has been dropped.
Resnet18 = ResNet()
model = load_model("trained_resnet18.h5")

In [0]:
#Asking the user to upload the image that the following cells run a prediction on
print("upload the image to check predictions")
uploading = files.upload()

In [0]:
#Image name without the extension: the following cells append '.jpg' to read it and '.txt' to save the prediction
name = input("enter the name of the image ")

In [0]:
#Reading the uploaded image and applying the same crop and resize used for the training images
image_path = name+'.jpg'
img = cv2.imread(image_path)
if img is None:
  #cv2.imread returns None instead of raising when the file is missing or unreadable
  raise IOError("Could not read image: "+image_path)
img = img[56:368, 56:368]
img = cv2.resize(img, (224, 224))

In [0]:
#Predicting on the single image: build an explicit batch of one as a numpy array
#instead of relying on Keras to coerce a nested Python list
batch = np.array([img])
pred = model.predict(batch)
#Writing the prediction vector next to the image as space-separated text
np.savetxt(name+'.txt', pred, fmt='%.18f', delimiter=' ')