In [None]:
#Importing the python libraries
import os
import random

import albumentations as A
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sklearn as sk
import tensorflow as tf
from PIL import Image
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from tensorflow import keras

# Root folders of the competition's training and testing images.
training_path = "/kaggle/input/sp-society-camera-model-identification/train/train/"
testing_path = "/kaggle/input/sp-society-camera-model-identification/test/test/"

# The second element of the first os.walk() entry lists the immediate
# sub-directories of the training folder; the cells below use each
# sub-directory name as a class label.
training_classes = next(os.walk(training_path))[1]


In [None]:
# Hyperparameters shared by the data pipeline and the training cells.
learning_rate = 0.0001  # step size handed to the optimizer
epochs = 40             # overall epoch budget
batch_size = 8          # images per training mini-batch

In [None]:
# File names found directly inside the testing folder (third element of the
# first os.walk() entry).
testing_files = next(os.walk(testing_path))[2]


In [None]:
# Build the training set by walking each class directory: every file path is
# appended to `training_paths` and its directory name to `labels`.
# Directory listings come back in arbitrary order, so both the class folders
# and the files inside them are sorted; that keeps the row order (and hence the
# seeded train/validation split) identical from run to run.
training_paths = []
labels = []
for class_name in sorted(training_classes):
    _, _, filenames = next(os.walk(training_path + class_name))
    for filename in sorted(filenames):
        training_paths.append(training_path + class_name + '/' + filename)
        labels.append(class_name)

In [None]:
# Sanity check: every collected image path must have exactly one class label.
assert len(labels) == len(training_paths)

In [None]:
# Random seed (33) used for reproducible results.
seed = 33
# Defining the value alone seeds nothing, so also seed the random number
# generators of Python's `random` module, NumPy and TensorFlow.
random.seed(seed)
np.random.seed(seed)
tf.random.set_seed(seed)

In [None]:
# Display the distinct class labels found in the training set (np.unique returns them sorted)
np.unique(labels)

In [None]:
# Keep the image paths in a pandas DataFrame for easy indexing.
training_data = pd.DataFrame({'Training Image Path': training_paths})

# One-hot encode the class labels, then name each indicator column after its
# class using the sorted unique labels.
classes = pd.get_dummies(pd.DataFrame(labels))
classes.columns = np.unique(labels)

In [None]:
# Display the training DataFrame (one image path per row)
training_data

In [None]:
# Display the first rows of the one-hot encoded class labels
classes.head()

In [None]:
# Lookup table from a class index (as a string key) to the camera model name,
# used when turning predicted class indices back into labels.
classes_dict = {
    str(position): camera_model
    for position, camera_model in enumerate((
        'HTC-1-M7',
        'LG-Nexus-5x',
        'Motorola-Droid-Maxx',
        'Motorola-Nexus-6',
        'Motorola-X',
        'Samsung-Galaxy-Note3',
        'Samsung-Galaxy-S4',
        'Sony-NEX-7',
        'iPhone-4s',
        'iPhone-6',
    ))
}

In [None]:
# Keep the testing file names in a DataFrame for easy indexing, then display it.
testing_data = pd.DataFrame({'Testing Image Path': testing_files})
testing_data

In [None]:
def read_img(path):
    """Read an image from disk and return it as an RGB array.

    Args:
        path: Location of the image file, passed to ``cv2.imread``.

    Returns:
        The decoded image as a NumPy array with its channels converted from
        OpenCV's BGR order to RGB.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file (``cv2.imread``
            signals this by returning ``None`` instead of raising).
    """
    image = cv2.imread(path)
    if image is None:
        # Fail here, naming the offending path, rather than with an opaque
        # cvtColor error on an empty image.
        raise FileNotFoundError(f"Could not read image: {path}")
    # cvtColor already returns an ndarray, so no extra np.array copy is needed.
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

In [None]:
# Augmentation pipelines for training and testing.
length = 512  # side length, in pixels, of the square crop taken from each image

# Training: random crop, then random gamma, JPEG re-compression and grid distortion.
training_steps = [
    A.RandomCrop(height=length, width=length),
    A.RandomGamma(gamma_limit=(80, 120), p=0.9),
    A.JpegCompression(quality_lower=70, quality_upper=90, p=0.9),
    A.GridDistortion(interpolation=cv2.INTER_CUBIC),
]
training_transforms = A.Compose(training_steps)

# Testing: a centre crop of the same size only.
testing_augmentation = A.Compose([A.CenterCrop(height=length, width=length)])

In [None]:
# Custom dataset that reads the images in a mini-batch format
class Dataset(keras.utils.Sequence):
    """Keras ``Sequence`` that loads and augments images one mini-batch at a time.

    Args:
        x: pandas Series of image paths, or a DataFrame whose first column
            holds the image paths.
        y: pandas DataFrame of labels aligned row-for-row with ``x``. It is
            not read when ``test`` is true.
        batch_size: Number of rows per mini-batch.
        augmentations: Callable invoked as ``augmentations(image=array)`` that
            returns a mapping with the transformed array under ``'image'``.
        test: When true, batches contain images only; otherwise each batch is
            an ``(images, labels)`` tuple.
    """

    def __init__(self, x, y, batch_size, augmentations, test):
        # Let the Keras base class set up its own state.
        super().__init__()
        self.x = x
        self.y = y
        self.batch_size = batch_size
        self.augmentations = augmentations
        self.test = test

    def __len__(self):
        """Return the number of mini-batches, counting a final partial batch."""
        # Ceiling division: flooring would silently drop the last rows whenever
        # the row count is not a multiple of batch_size.
        return (len(self.x) + self.batch_size - 1) // self.batch_size

    def __getitem__(self, index):
        """Load, augment and return mini-batch number ``index``.

        Returns:
            ``np.ndarray`` of images when ``test`` is true, otherwise a tuple
            ``(images, labels)`` of two ``np.ndarray`` objects.
        """
        start = index * self.batch_size
        stop = start + self.batch_size

        rows = self.x.iloc[start:stop]
        # A DataFrame slice is 2-D (paths in the first column); a Series slice
        # is already a flat sequence of paths.
        paths = rows.iloc[:, 0].to_numpy() if rows.ndim == 2 else rows.to_numpy()

        images = np.array(
            [self.augmentations(image=read_img(path))['image'] for path in paths]
        )
        if self.test:
            return images

        batched_y = self.y.iloc[start:stop, :]
        return images, np.array(batched_y.values)

In [None]:
# Build a throwaway dataset over the full training set to test the batching.
dummy_dataset = Dataset(
    x=training_data,
    y=classes,
    batch_size=batch_size,
    augmentations=training_transforms,
    test=False,
)

In [None]:
# Pull one mini-batch (index 5) from the dummy dataset and inspect it:
# first image, image batch shape, labels, label shape and label container type.
a, b = dummy_dataset[5]
for inspected in (a[0], a.shape, b, b.shape, type(b)):
    print(inspected)
plt.imshow(a[0])



In [None]:
def keras_model(length, input_size, output_classes):
    """Build and compile a DenseNet201-based image classifier.

    The DenseNet201 backbone is instantiated with ImageNet weights and left
    fully trainable; its features are average-pooled and passed through two
    small dense layers before the softmax output.

    Args:
        length: Height and width, in pixels, of the square RGB input images.
        input_size: Unused; kept so existing callers keep working.
        output_classes: Number of units in the final softmax layer.

    Returns:
        A ``tf.keras.Model`` compiled with the Adam optimizer (using the
        module-level ``learning_rate``), categorical cross-entropy loss and
        the accuracy metric.
    """
    base_model = tf.keras.applications.DenseNet201(
        weights='imagenet',
        include_top=False,
        input_shape=(length, length, 3),
    )
    base_model.trainable = True  # fine-tune the whole backbone

    inputs = tf.keras.layers.Input(shape=(length, length, 3))
    x = tf.keras.applications.densenet.preprocess_input(inputs)
    # The `training` flag is deliberately not forced to True here: hard-coding
    # it would keep the backbone's BatchNormalization layers in training mode
    # during predict()/evaluate() as well. Left unset, Keras runs them in
    # training mode inside fit() and in inference mode otherwise.
    x = base_model(x)
    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    x = tf.keras.layers.Dense(64, activation='relu')(x)
    x = tf.keras.layers.Dense(32, activation='relu')(x)
    # Size the output layer from the argument instead of a hard-coded 10.
    outputs = tf.keras.layers.Dense(output_classes, activation='softmax')(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    model.compile(
        # `learning_rate` is the supported keyword; `lr` is a deprecated alias.
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

In [None]:
# Instantiate the Keras model for square `length` x `length` RGB inputs and 10 classes.
training_model = keras_model(
    length,
    input_size=length * length * 3,
    output_classes=10,
)

In [None]:
# Print a summary of the training model
training_model.summary()

In [None]:
# Hold out 10% of the training rows for validation, shuffling before the split
# with the fixed seed.
xtrain, xval, ytrain, yval = train_test_split(
    training_data,
    classes,
    test_size=0.1,
    shuffle=True,
    random_state=seed,
)

In [None]:
# Print the train/validation sizes of the inputs and of the labels to check
# that they match.
for train_part, val_part in ((xtrain, xval), (ytrain, yval)):
    print(len(train_part), len(val_part))

In [None]:
# Wrap the splits in the custom Keras dataset that feeds the neural network.
training_dataset = Dataset(xtrain, ytrain, batch_size=batch_size, augmentations=training_transforms, test=False)

# Validation uses the centre-crop-only pipeline rather than the random training
# augmentations, so val_accuracy (which drives the checkpoint, learning-rate
# and early-stopping callbacks) and the validation predictions are measured on
# undistorted images and are repeatable from one pass to the next.
validation_dataset = Dataset(xval, yval, batch_size=1, augmentations=testing_augmentation, test=False)

# Number of mini-batches in each dataset.
print(len(training_dataset))
print(len(validation_dataset))

In [None]:
# Callbacks attached to every training run.

# Output path the model is saved to.
model_file = "output/base_model_weights.hd5"

# Save the model only when the validation accuracy reaches a new maximum.
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    model_file,
    monitor="val_accuracy",
    save_best_only=True,
    mode='max',
)

# Scale the learning rate by 0.9 (never below 1e-6) after 2 epochs without a
# validation-accuracy improvement.
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor="val_accuracy",
    factor=0.9,
    patience=2,
    min_lr=1e-6,
    mode="max",
    verbose=True,
)

# Stop training after 5 epochs without a validation-accuracy improvement.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_accuracy",
    patience=5,
    mode="max",
    verbose=True,
)

# All instantiated callbacks gathered in one list.
callbacks_list = [checkpoint, reduce_lr, early_stopping]


In [None]:
# Train in rounds of `inner_epochs` epochs. After each round the best model
# saved so far is read back from disk and training continues from it.
inner_epochs = 10
# Number of rounds that follow the first one. It is derived from the `epochs`
# budget rather than hard-coded, so changing `epochs` changes the amount of
# training.
loops = (epochs - inner_epochs) // inner_epochs

# `batch_size` is not passed to fit(): the Sequence datasets already yield
# batches, and Keras documents that it must not be specified for such inputs.
training_model.fit(
    training_dataset,
    validation_data=validation_dataset,
    epochs=inner_epochs,
    callbacks=callbacks_list,
    verbose=1,
)
for training_round in range(loops):
    training_model = keras.models.load_model(model_file)
    training_model.fit(
        training_dataset,
        validation_data=validation_dataset,
        epochs=inner_epochs,
        callbacks=callbacks_list,
        verbose=1,
    )



In [None]:
# Load the sample submission file from the competition input directory and
# show its first rows.
submission_template_path = '/kaggle/input/sp-society-camera-model-identification/sample_submission.csv'
sample_submission = pd.read_csv(submission_template_path)
sample_submission.head()

In [None]:
# Build the testing dataset from the file names listed in the submission file.
test_image_paths = testing_path + sample_submission['fname']
print(test_image_paths[0])

# The labels argument is not read in test mode, so the paths are passed for it too.
x_test = Dataset(
    x=test_image_paths,
    y=test_image_paths,
    batch_size=1,
    augmentations=testing_augmentation,
    test=True,
)

In [None]:
# Generate predictions for the validation dataset.
# The best checkpoint is loaded first: after the training loop `training_model`
# holds the weights of the last epoch that ran, whereas the test predictions
# are made with the checkpointed model. Using the same model for both keeps the
# validation results representative of the submitted predictions.
training_model = tf.keras.models.load_model(model_file)
val_predicted = training_model.predict(validation_dataset)

In [None]:
# Load the best saved model from disk and generate predictions for the
# testing dataset.
training_model = keras.models.load_model(model_file)
predicted = training_model.predict(x_test)
print(predicted)

In [None]:
def convert_predictions_to_labels(labels):
    """Convert rows of per-class scores into class-name labels.

    Args:
        labels: Array-like object supporting ``argmax(axis=1)``, e.g. model
            predictions or one-hot encoded rows.

    Returns:
        A list with one entry per row: the ``classes_dict`` value whose key is
        the string form of that row's argmax index.
    """
    return [classes_dict[str(class_index)] for class_index in labels.argmax(axis=1)]

In [None]:
# Turn the validation and test predictions into their class labels.
validation_labels = convert_predictions_to_labels(val_predicted)
test_labels = convert_predictions_to_labels(predicted)

In [None]:
# Fill the 'camera' column of the submission table with the test predictions
# and display its first rows
sample_submission['camera'] = test_labels
sample_submission.head()

In [None]:
# Write the test predictions to submission.csv (without the index column)
sample_submission.to_csv("submission.csv", index=False)

In [None]:
# Convert the one-hot encoded validation labels back to class names (the name
# of the column holding each row's maximum).
# Guarded so the cell can be re-run: once converted, `yval` is a 1-D Series and
# a second idxmax(axis=1) call on it would fail.
if getattr(yval, "ndim", 1) == 2:
    yval = yval.idxmax(axis=1)

In [None]:
# Compute and print the confusion matrix of true vs. predicted validation labels.
con_matrix = confusion_matrix(y_true=yval, y_pred=validation_labels)
print(con_matrix)