<a href="https://colab.research.google.com/github/KrishPatel5611/Surface_Defect_Detection/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
import kagglehub
riffatsiddiqui_ceramic_tiles_defects_crackspotspinhole_path = kagglehub.dataset_download('riffatsiddiqui/ceramic-tiles-defects-crackspotspinhole')

print('Data source import complete.')


In [None]:
import os
import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Dense, Activation,Dropout, MaxPooling2D,BatchNormalization
from tensorflow.keras.optimizers import Adam, Adamax
from tensorflow.keras.metrics import categorical_crossentropy
from tensorflow.keras import regularizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model, load_model
import numpy as np
import time
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import logging
logging.getLogger("tensorflow").setLevel(logging.ERROR)
print ('modules loaded')

In [None]:
sdir=r'../input/ceramic-tiles-defects-crackspotspinhole/dataset'
flist=os.listdir(sdir)
labels=[]
filepaths=[]
for f in flist:
    fpath=os.path.join(sdir,f)
    filepaths.append(fpath)
    klass =f.split('.')[0]
    labels.append(klass)
Fseries=pd.Series(filepaths, name='filepaths')
Lseries = pd.Series(labels, name='labels')
df=pd.concat([Fseries, Lseries], axis=1)
print (df['labels'].value_counts())
print(df.isna().sum())
classes=list(df['labels'].unique())
class_count=len(classes)

# split df into a train_df, a test_df and a valid df

In [None]:
train_df, dummy_df =train_test_split(df, train_size=.9, shuffle=True, random_state=123, stratify= df['labels'])
valid_df, test_df = train_test_split(dummy_df, train_size=.5, shuffle=True, random_state=123, stratify =dummy_df['labels'])
print('train_df length: ', len(train_df), '  test_df length: ',len(test_df), '  valid_df length: ', len(valid_df))
print (train_df['labels'].value_counts())

## train_df is not balanced but should be OK

## input an image and get the shape

In [None]:
img_path=r'../input/ceramic-tiles-defects-crackspotspinhole/dataset/crack.1.jpg'
img=plt.imread(img_path)
print ('Image shape is: ', img.shape)
plt.axis('off')
plt.imshow(img)

## make train, test and valid generators

In [None]:
img_size=(150,150)
batch_size= 40
# calculate test_batch_size and test_step so we go through test files exactly once
length=len(test_df)
test_batch_size=sorted([int(length/n) for n in range(1,length+1) if length % n ==0 and length/n<=80],reverse=True)[0]
test_steps=int(length/test_batch_size)
print ('test batch size= ', test_batch_size, '  test steps= ', test_steps)
trgen=ImageDataGenerator(horizontal_flip=True, vertical_flip=True)
tvgen=ImageDataGenerator()
train_gen=trgen.flow_from_dataframe(train_df, x_col='filepaths', y_col='labels', target_size=img_size, class_mode='categorical',
                                    color_mode='rgb', shuffle=True, batch_size=batch_size)
valid_gen=tvgen.flow_from_dataframe(valid_df, x_col='filepaths', y_col='labels', target_size=img_size, class_mode='categorical',
                                    color_mode='rgb', shuffle=True, batch_size=batch_size)
test_gen=tvgen.flow_from_dataframe(test_df, x_col='filepaths', y_col='labels', target_size=img_size, class_mode='categorical',
                                    color_mode='rgb', shuffle=False, batch_size=test_batch_size)

## Use transfer learning with EfficientNetB3 model

In [None]:
img_shape=(img_size[0], img_size[1], 3)
model_name='EfficientNetB3'
base_model=tf.keras.applications.efficientnet.EfficientNetB3(include_top=False, weights="imagenet",input_shape=img_shape, pooling='max')
x=base_model.output
x=keras.layers.BatchNormalization(axis=-1, momentum=0.99, epsilon=0.001 )(x)
x = Dense(256, kernel_regularizer = regularizers.l2(l = 0.016),activity_regularizer=regularizers.l1(0.006),
                bias_regularizer=regularizers.l1(0.006) ,activation='relu')(x)
x=Dropout(rate=.45, seed=123)(x)
output=Dense(class_count, activation='softmax')(x)
model=Model(inputs=base_model.input, outputs=output)
model.compile(Adamax(learning_rate=.001), loss='categorical_crossentropy', metrics=['accuracy'])

## define a custom callback that after ask_epoch asks if you want to continue training or halt.
## You can enter an integer for how many more
## epochs to run then be asked again, or enter H to halt training

In [None]:
class ASK(keras.callbacks.Callback):
    def __init__ (self, model, epochs,  ask_epoch): # initialization of the callback
        super(ASK, self).__init__()
        self.model=model
        self.ask_epoch=ask_epoch
        self.epochs=epochs
        self.ask=True # if True query the user on a specified epoch

    def on_train_begin(self, logs=None): # this runs on the beginning of training
        if self.ask_epoch == 0:
            print('you set ask_epoch = 0, ask_epoch will be set to 1', flush=True)
            self.ask_epoch=1
        if self.ask_epoch >= self.epochs: # you are running for epochs but ask_epoch>epochs
            print('ask_epoch >= epochs, will train for ', epochs, ' epochs', flush=True)
            self.ask=False # do not query the user
        if self.epochs == 1:
            self.ask=False # running only for 1 epoch so do not query user
        else:
            print('Training will proceed until epoch', ask_epoch,' then you will be asked to')
            print(' enter H to halt training or enter an integer for how many more epochs to run then be asked again')
        self.start_time= time.time() # set the time at which training started

    def on_train_end(self, logs=None):   # runs at the end of training
        tr_duration=time.time() - self.start_time   # determine how long the training cycle lasted
        hours = tr_duration // 3600
        minutes = (tr_duration - (hours * 3600)) // 60
        seconds = tr_duration - ((hours * 3600) + (minutes * 60))
        msg = f'training elapsed time was {str(hours)} hours, {minutes:4.1f} minutes, {seconds:4.2f} seconds)'
        print (msg, flush=True) # print out training duration time

    def on_epoch_end(self, epoch, logs=None):  # method runs on the end of each epoch
        if self.ask: # are the conditions right to query the user?
            if epoch + 1 ==self.ask_epoch: # is this epoch the one for quering the user?
                print('\n Enter H to end training or  an integer for the number of additional epochs to run then ask again')
                ans=input()

                if ans == 'H' or ans =='h' or ans == '0': # quit training for these conditions
                    print ('you entered ', ans, ' Training halted on epoch ', epoch+1, ' due to user input\n', flush=True)
                    self.model.stop_training = True # halt training
                else: # user wants to continue training
                    self.ask_epoch += int(ans)
                    if self.ask_epoch > self.epochs:
                        print('\nYou specified maximum epochs of as ', self.epochs, ' cannot train for ', self.ask_epoch, flush=True)

                    else:
                        print ('you entered ', ans, ' Training will continue to epoch ', self.ask_epoch, flush=True)

## Define an early stop callback and a reduce learning rate callback and instantiate the ASK callback

In [None]:
rlronp=tf.keras.callbacks.ReduceLROnPlateau( monitor="val_loss", factor=0.5,   patience=1,  verbose=1)
estop=tf.keras.callbacks.EarlyStopping( monitor="val_loss",   patience=4,  verbose=1,   restore_best_weights=True)
epochs=40
ask_epoch=2 # at end of 5th epoch ask to enter H to halt training or an integer for how many more epochs to run then ask again
callbacks=[rlronp, estop, ASK( model,epochs, ask_epoch)]


## train the model

In [None]:
history=model.fit(x=train_gen,  epochs=epochs, verbose=1, callbacks=callbacks,  validation_data=valid_gen,
               validation_steps=None,  shuffle=False,  initial_epoch=0)

## evaluate model on the test set

In [None]:
 acc= model.evaluate(test_gen, verbose= 1,  steps=test_steps)
 print('Model accuracy on test set is ', acc[1]*100)

In [None]:
working_dir=r'./'
save_path=os.path.join(working_dir, 'EfficientNetB3.h5')
model.save(save_path, overwrite=True, include_optimizer=True, save_format='h5')

## do predictions on the test set and generate classification report

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
sns.set_style('darkgrid')
classes=list(train_gen.class_indices.keys())
class_count=len(classes)
labels=test_gen.labels
files=test_gen.filenames
error_file_list=[]
indexes=[]
errors=0
preds=model.predict(test_gen, steps=test_steps, verbose=1)
tests=len(preds)
for i, p in enumerate (preds):
    index=np.argmax(p)
    indexes.append(index)
    if index != labels[i]:
        errors +=1
        error_file_list.append(files[i])
acc=( tests-errors)/tests * 100
print(f'There were {errors}, errors in {tests} tests for an accuracy of {acc:6.2f}' )
if errors > 0:
    print ('A list of files that were incorrectly predicted is shown below')
    for i in range (len(error_file_list)):
        print (error_file_list[i])

clr = classification_report(labels, indexes, target_names=classes, digits= 4)
print("Classification Report:\n----------------------\n", clr)
cm = confusion_matrix(labels, indexes )
length=len(classes)
if length<8:
    fig_width=8
    fig_height=8
else:
    fig_width= int(length * .5)
    fig_height= int(length * .5)
plt.figure(figsize=(fig_width, fig_height))
sns.heatmap(cm, annot=True, vmin=0, fmt='g', cmap='Blues', cbar=False)
plt.xticks(np.arange(length)+.5, classes, rotation= 90)
plt.yticks(np.arange(length)+.5, classes, rotation=0)
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Confusion Matrix")
plt.show()


## How to do predictions on a single image file

In [None]:
import cv2
img=plt.imread(img_path) # read in the image shown earlier
print ('Input image shape is ', img.shape)
# resize the image so it is the same size as the images the model was trained on
img=cv2.resize(img, img_size) # in earlier code img_size=(150,150) was used for training the model
print ('the resized image has shape ', img.shape)
### show the resized image
plt.axis('off')
plt.imshow(img)
# Normally the next line of code rescales the images. However the EfficientNet model expects images in the range 0 to 255
# img= img/255
# plt.imread returns a numpy array so it is not necessary to convert the image to a numpy array
# since we have only one image we have to expand the dimensions of img so it is off the form (1,150,150,3)
# where the first dimension 1 is the batch size used by model.predict
img=np.expand_dims(img, axis=0)
print ('image shape after expanding dimensions is ',img.shape)
# now predict the image
pred=model.predict(img)
print ('the shape of prediction is ', pred.shape)
# this dataset has 15 classes so model.predict will return a list of 15 probability values
# we want to find the index of the column that has the highest probability
index=np.argmax(pred[0])
# to get the actual Name of the class earlier Imade a list of the class names called classes
klass=classes[index]
# lets get the value of the highest probability
probability=pred[0][index]*100
# print out the class, and the probability
print(f'the image is predicted as being {klass} with a probability of {probability:6.2f} %')