Starter EDA and ConvNet implementation using Keras. 

Inspiration for this notebook comes from this [Keras blog post](https://blog.keras.io/building-powerful-image-classification-models-using-very-little-data.html) and the [VGG ConvNet paper](https://arxiv.org/pdf/1409.1556.pdf). 


In [None]:
import os, cv2, random
import numpy as np
import numpy
import pandas as pd

import pyttsx3 
import matplotlib.pyplot as plt
from matplotlib import ticker
import seaborn as sns
%matplotlib inline 
from sklearn.metrics import accuracy_score, classification_report, roc_auc_score

from keras.models import Sequential
from keras.layers import Input, Dropout, Flatten, Conv2D, MaxPooling2D, Dense, Activation
from keras.optimizers import RMSprop
from keras.callbacks import ModelCheckpoint, Callback, EarlyStopping
from keras.utils import np_utils

from os.path import realpath, normpath
# Debug output: prints the repr of the `cv2.data` attribute.
# NOTE(review): presumably a sanity check that the OpenCV package imported
# correctly - confirm whether this is still needed.
print(cv2.data)

In [2]:
# from PIL import Image
# import matplotlib.pyplot as plt
# import matplotlib.image as mpimg

# lol = []
# path1 = '../test/' 

# listing = os.listdir(path1)    
# for file in listing:
#     if 'Comp' in file: 
#         lol = path1 + file
#         img=mpimg.imread(path1 + file)
#         imgplot = plt.imshow(img)
#         plt.show()
# print(len(lol))
    


In [None]:
import csv

# Geometry every image is resized to before it is fed to the network.
ROWS, COLS, CHANNELS = 64, 64, 3

# Path lists; populated further down once the folders have been scanned.
train_images, train_passed, train_failed = [], [], []
test_images, test_passed, test_failed = [], [], []

TRAIN_PASSED_DIR = 'C:/train-folder/passed/'
TRAIN_FAILED_DIR = 'C:/train-folder/failed/'
TEST_PASSED_DIR = 'C:/test-folder/passed/'
TEST_FAILED_DIR = 'C:/test-folder/failed/'

# Only the test folders are created when missing; the training folders must
# already exist, otherwise os.listdir() below raises.
if not os.path.exists(TEST_PASSED_DIR):
    os.makedirs(TEST_PASSED_DIR)
if not os.path.exists(TEST_FAILED_DIR):
    os.makedirs(TEST_FAILED_DIR)


def _list_images(folder):
    """Return folder-prefixed paths for every entry whose name lacks '.db'."""
    return [folder + name for name in os.listdir(folder) if '.db' not in name]


train_passed = _list_images(TRAIN_PASSED_DIR)
train_failed = _list_images(TRAIN_FAILED_DIR)
train_images = train_passed + train_failed  # full training set

test_passed = _list_images(TEST_PASSED_DIR)
test_failed = _list_images(TEST_FAILED_DIR)
test_images = test_passed + test_failed  # full test set

# To save memory (e.g. on Kaggle Kernels) the sets can be sliced instead:
# train_images = train_passed[:1000] + train_failed[:1000]
# test_images =  test_images[:25]

print('Train images: {}, passed: {}, failed: {}'.format(
    len(train_images), len(train_passed), len(train_failed)))
print('Test images: {}, passed: {}, failed: {}'.format(
    len(test_images), len(test_passed), len(test_failed)))

# Only the training paths are shuffled. The test paths keep their
# "passed first, then failed" order, which the evaluation code relies on.
random.shuffle(train_images)
# random.shuffle(test_images)


def read_image(file_path):
    """Load an image from disk and resize it to the network input size.

    Args:
        file_path: Location of the image file; converted with ``str()``
            before being handed to OpenCV.

    Returns:
        The colour image decoded by ``cv2.imread`` and resized by
        ``cv2.resize`` with ``dsize=(ROWS, COLS)`` and bicubic interpolation.

    Raises:
        IOError: If OpenCV cannot read or decode the file.
    """
    img = cv2.imread(str(file_path), cv2.IMREAD_COLOR) #cv2.IMREAD_GRAYSCALE
    # cv2.imread signals failure by returning None rather than raising, which
    # would otherwise surface as an opaque error inside cv2.resize.
    if img is None:
        raise IOError('Could not read image: {}'.format(file_path))
    return cv2.resize(img, (ROWS, COLS), interpolation=cv2.INTER_CUBIC)


def prep_data(images):
    """Read every image path into one uint8 array.

    Args:
        images: Sequence of image file paths.

    Returns:
        Array of shape ``(len(images), CHANNELS, ROWS, COLS)`` where entry
        ``k`` holds the transposed result of ``read_image(images[k])``.
    """
    total = len(images)
    batch = np.empty((total, CHANNELS, ROWS, COLS), dtype=np.uint8)
    for position, path in enumerate(images, start=1):
        batch[position - 1] = np.transpose(read_image(path))
        # Intermediate progress lines end with '\r' so they overwrite each
        # other; the last one ends with a newline and stays visible.
        terminator = '\n' if position == total else '\r'
        print('Processed {} of {}'.format(position, total), end=terminator)
    return batch

# Materialise both splits as image arrays (training set first, then test set).
train, test = prep_data(train_images), prep_data(test_images)

print("Train shape: %s" % (train.shape,))
print("Test shape: %s" % (test.shape,))

In [None]:
# Ground-truth label per training image: 1 = passed, 0 = failed.
# Labels come from membership in train_passed rather than a substring test
# on the path, so a file in the failed folder whose name happens to contain
# "passed" is not mislabelled.
_train_passed_paths = set(train_passed)
labels = [1 if path in _train_passed_paths else 0 for path in train_images]

# Class-balance bar chart. The values are passed by keyword because newer
# seaborn releases interpret the first positional argument as `data`.
sns.countplot(x=labels)
plt.title('Passed:(1) and failed:(0)')

In [None]:
def show_passed_and_failed(idx):
    """Display the ``idx``-th passed and failed training images side by side.

    Args:
        idx: Index into both ``train_passed`` and ``train_failed``.

    Raises:
        IndexError: If either list has no element at ``idx``.
    """
    passed = read_image(train_passed[idx])
    failed = read_image(train_failed[idx])
    pair = np.concatenate((passed, failed), axis=1)
    plt.figure(figsize=(10,5))
    # OpenCV decodes colour images as BGR while matplotlib expects RGB, so
    # the channel axis is reversed before drawing to show true colours.
    plt.imshow(pair[:, :, ::-1])
    plt.show()
    
# Preview up to five passed/failed pairs. The count is clamped to the shorter
# list so a small dataset does not raise IndexError.
for idx in range(min(5, len(train_passed), len(train_failed))):
    show_passed_and_failed(idx)

In [None]:
# Pixel-wise mean of the first colour plane (transposed back) over every
# training image labelled 1.
passed_planes = [sample[0].T for sample, label in zip(train, labels) if label == 1]
passed_avg = np.mean(np.array(passed_planes), axis=0)
plt.imshow(passed_avg)
plt.title('Your Average Passed')

In [None]:
# Pixel-wise mean of the first colour plane (transposed back) over every
# training image labelled 0.
failed_planes = [sample[0].T for sample, label in zip(train, labels) if label == 0]
failed_avg = np.mean(np.array(failed_planes), axis=0)
plt.imshow(failed_avg)
plt.title('Your Average Failed')

In [None]:
# Loss name and optimiser instance handed to model.compile() in passedorfailed().
objective = 'binary_crossentropy'
optimizer = RMSprop(lr=1e-4)


def passedorfailed():
    """Build and compile the VGG-style pass/fail classifier.

    The network stacks four blocks of two 3x3 ``same``-padded ReLU
    convolutions followed by 2x2 max pooling (32, 64, 128 and 256 filters),
    then two 256-unit dense layers with 50% dropout each and a single
    sigmoid output unit.

    Every convolution and pooling layer is created with
    ``data_format="channels_first"`` so that all of them agree with the
    ``(CHANNELS, ROWS, COLS)`` input layout. Declaring the layout on the
    pooling layers only would leave the convolutions on the backend's
    default layout, which may treat the channel axis as a spatial one.

    Returns:
        The compiled ``Sequential`` model. It uses the module-level
        ``objective`` as loss and ``optimizer`` as optimiser, and tracks
        accuracy.
    """
    layout = "channels_first"
    model = Sequential()

    for block_index, filters in enumerate((32, 64, 128, 256)):
        # Only the very first layer declares the input shape.
        extra = {'input_shape': (CHANNELS, ROWS, COLS)} if block_index == 0 else {}
        model.add(Conv2D(filters, 3, padding='same', activation='relu',
                         data_format=layout, **extra))
        model.add(Conv2D(filters, 3, padding='same', activation='relu',
                         data_format=layout))
        model.add(MaxPooling2D(pool_size=(2, 2), data_format=layout))

    # NOTE: a further 512-filter block and a third convolution in the
    # 256-filter block were tried earlier and are intentionally left out.

    model.add(Flatten())
    for _ in range(2):
        model.add(Dense(256, activation='relu'))
        model.add(Dropout(0.5))

    model.add(Dense(1))
    model.add(Activation('sigmoid'))

    model.compile(loss=objective, optimizer=optimizer, metrics=['accuracy'])
    return model


# Module-level model instance; run_pf() and the later prediction cell use it.
model = passedorfailed()

In [None]:
# Number of epochs requested from model.fit(); the EarlyStopping callback
# defined below may end training sooner.
epochs = 10
# Mini-batch size passed to model.fit().
batch_size = 16

## Callback for loss logging per epoch
class LossHistory(Callback):
    """Record the training and validation loss reported after each epoch.

    Attributes set by ``on_train_begin``:
        losses: ``logs['loss']`` per finished epoch.
        val_losses: ``logs['val_loss']`` per finished epoch (``None`` when
            the key is absent).
    """

    def on_train_begin(self, logs=None):
        """Reset both histories at the start of a training run."""
        self.losses = []
        self.val_losses = []

    def on_epoch_end(self, batch, logs=None):
        """Append the epoch's losses.

        Args:
            batch: Index supplied by Keras for this hook (unused here).
            logs: Metrics dict supplied by Keras; may be ``None``.
        """
        # A None default replaces the shared mutable `{}` default and also
        # keeps the hook safe if it is invoked without a metrics dict.
        logs = logs or {}
        self.losses.append(logs.get('loss'))
        self.val_losses.append(logs.get('val_loss'))

# Early-stopping callback configured to monitor 'val_loss' with patience=3;
# it is passed to model.fit() in run_pf().
early_stopping = EarlyStopping(monitor='val_loss', patience=3, verbose=2, mode='auto')
        
def run_pf():
    """Train the module-level ``model`` and predict on the test set.

    Training uses ``train``/``labels`` with the last 25% held out for
    validation, and runs with the ``LossHistory`` and ``early_stopping``
    callbacks attached.

    Returns:
        Tuple ``(predictions, history)``: the output of
        ``model.predict(test)`` and the ``LossHistory`` instance holding
        the per-epoch losses.
    """
    history = LossHistory()
    # labels is a plain Python list; some Keras versions reject that as the
    # target of fit(), so it is converted to an ndarray first.
    targets = np.asarray(labels)
    model.fit(train, targets, batch_size=batch_size, epochs=epochs,
              validation_split=0.25, verbose=1, shuffle=True, callbacks=[history, early_stopping])
    predictions = model.predict(test, verbose=2)
    return predictions, history

# Train the model; keep the test-set predictions and the recorded loss history.
predictions, history = run_pf()

# Spoken notification once training has finished.
# NOTE(review): 'sapi5' presumably selects the Windows speech driver and the
# second positional argument the debug flag - confirm against pyttsx3 docs.
engine = pyttsx3.init('sapi5', True)
engine.say('Model training completed.')
engine.runAndWait()

In [11]:
# Re-run inference on the test set with the current model; this overwrites
# the predictions returned by run_pf().
predictions = model.predict(test, verbose=2)

In [None]:
# Per-epoch losses collected by the LossHistory callback during training.
loss, val_loss = history.losses, history.val_losses

# Draw both curves on one chart, with an x tick on every second epoch.
plt.title('VGG-16 Loss Trend')
plt.ylabel('Loss')
plt.xlabel('Epochs')
plt.plot(loss, 'blue', label='Training Loss')
plt.plot(val_loss, 'green', label='Validation Loss')
plt.xticks(range(0, epochs, 2))
plt.legend()
plt.show()

In [None]:
# Confusion-matrix counters; "passed" is the positive class.
true_p = 0
false_p = 0
true_n = 0
false_n = 0

# Bookkeeping for the relocation of misclassified test files.
unidentified = 0
file_moved_passed = 0
file_exist_passed = 0
file_moved_failed = 0
file_exist_failed = 0


def _relocate_misclassified(src, dst, origin):
    """Move ``src`` to ``dst`` and report what happened.

    Returns 'moved' when the file was renamed, 'exists' when ``dst`` was
    already present (``src`` is then deleted), or 'missing' when ``src`` no
    longer exists in the ``origin`` folder.
    """
    print('ORI: ' + src)
    if not os.path.exists(src):
        print('NEW: File not exist in \'' + origin + '\' folder')
        return 'missing'
    if os.path.exists(dst):
        os.remove(src)
        print('NEW: File already exists.')
        return 'exists'
    os.rename(src, dst)
    print('NEW: ' + dst)
    return 'moved'


def _show_misclassified(index, caption):
    """Display test image ``index`` under ``caption``."""
    plt.title(caption)
    # Undo the transpose applied when the array was built, then flip
    # OpenCV's BGR channel order to the RGB order matplotlib expects.
    plt.imshow(test[index].T[:, :, ::-1])
    plt.show()


# Sets give O(1) membership tests instead of scanning the lists per image.
_test_passed_paths = set(test_passed)
_test_failed_paths = set(test_failed)

for i, (path, prediction) in enumerate(zip(test_images, predictions)):
    score = prediction[0]
    if path in _test_passed_paths:
        if score >= 0.5:
            true_p += 1
        else:
            false_n += 1
            _show_misclassified(i, '{:.2%} '.format(1 - score) + 'failed')
            rep = path.replace("passed", "failed", 1)
            rep = rep.replace("Measurement", "Open", 1)
            outcome = _relocate_misclassified(path, rep, 'passed')
            if outcome == 'exists':
                file_exist_failed += 1
            elif outcome == 'moved':
                file_moved_passed += 1
    elif path in _test_failed_paths:
        if score >= 0.5:
            false_p += 1
            _show_misclassified(i, '{:.2%} '.format(score) + 'passed')
            # The current image's own path is used. An index derived from
            # the list lengths would always evaluate to 1 and therefore
            # relocate the same (wrong) file for every false positive.
            rep = path.replace("failed", "passed", 1)
            rep = rep.replace("Open", "Measurement", 1)
            outcome = _relocate_misclassified(path, rep, 'failed')
            if outcome == 'exists':
                file_exist_passed += 1
            elif outcome == 'moved':
                file_moved_failed += 1
        else:
            true_n += 1
    else:
        unidentified += 1
    print('Processed {} of {}'.format(i + 1, len(test_images)), end='\r')


print('')
print('File unidentified: ' + str(unidentified))
print('File moved from \'passed\' folder: ' + str(file_moved_passed))
print('File exists in \'failed\' folder: ' + str(file_exist_failed))
print('File moved from \'failed\' folder: ' + str(file_moved_failed))
print('File exists in \'passed\' folder: ' + str(file_exist_passed))

# Spoken notification once the evaluation / relocation pass has finished.
# NOTE(review): 'sapi5' presumably selects the Windows speech driver and the
# second positional argument the debug flag - confirm against pyttsx3 docs.
engine = pyttsx3.init('sapi5', True)
engine.say('Image processing completed. Here are the results.')
# Optional spoken summary of the individual counters (currently disabled):
# engine.say(str(unidentified) + ' files unidentified.')
# engine.say(str(file_moved_passed) + ' files moved from \'passed\' folder.')
# engine.say(str(file_exist_failed) + ' files exists in \'failed\' folder.')
# engine.say(str(file_moved_failed) + ' files moved from \'failed\' folder.')
# engine.say(str(file_exist_passed) + ' files exists in \'passed\' folder.')
engine.runAndWait()

In [None]:
# display is imported explicitly so the cell does not depend on the notebook
# front-end injecting it as a builtin.
from IPython.display import HTML, display

print('Total: ' + str(len(predictions)))
print('Passed: ' + str(len(test_passed)))
print('Failed: ' + str(len(test_failed)))

# Confusion matrix rendered as an HTML table: columns are the actual class,
# rows the predicted class. The markup is kept well-formed (no stray
# </style>, and the <th> header cell is closed with </th>).
s = """
<table>
<tr>
<td></td>
<td></td>
<td></td>
<th colspan="2" style="text-align:center">Actual</th>
</tr>
<tr>
<td></td>
<td></td>
<td></td>
<td>Positive</td>
<td>Negative</td>
</tr>
<tr>

<th rowspan="2">Predicted </th>
<td>Positive </td>
<td></td>
<td><i style="color:green">True Positive:</i><i> ({tp})</i> </td>
<td><i style="color:red">False Positive:</i><i> ({fp})</i></td>
</tr>
<tr>

<td>Negative: </td>
<td></td>
<td><i style="color:red">False Negative:</i><i> ({fn})</i></td>
<td><i style="color:green">True Negative:</i><i> ({tn})</i> </td>
</tr>
</table>""".format(tp=true_p, fp=false_p, fn=false_n, tn=true_n)

h = HTML(s)
display(h)