In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
import skimage
import time
import pickle

In [None]:
def preProcessFromPath(image, listImg, listAns, category):
    assert type(image) != None
    assert image.shape == (1080, 1920, 3)
    image = cv2.resize(image, (28, 28))
    assert type(image) == np.ndarray
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    image = image.astype('float32')
    image = image/255.0
    listImg.append(image)
    listAns.append(category)
    return listImg, listAns

def captureImgWithPath(path, category):
    fileList = os.listdir(path)
    imgList = []
    ansList = []
    for name in fileList:
        image_full_path = path + '/' + name
        img = cv2.imread(image_full_path)
        imgList, ansList = preProcessFromPath(img, imgList, ansList, category)
    imgArr = np.array(imgList)
    return imgArr, ansList

def captureImgWithCam(img):
    if img is not None:
        imgList, _ = preProcessFromPath(img, [], [], 0)
        imgArr = np.array(imgList)
        return imgArr
    else:
        assert(0)
    return None

def unison_shuffled_copies(a, b):
    assert len(a) == len(b)
    p = np.random.permutation(len(a))
    return a[p], b[p]

def computeNextState(listMsd, previousState, threshold):
    if len(listMsd) > 4:
        listMsd = listMsd[1 : ]
    avg = sum(listMsd)/len(listMsd)
    if previousState == 'empty' and avg > threshold:
        return 'inMotionForward', listMsd
    elif previousState == 'inMotionForward' and avg < threshold:
        return 'filled', listMsd
    elif previousState == 'filled' and avg > threshold:
        return 'inMotionBackward', listMsd
    elif previousState == 'inMotionBackward' and avg < threshold:
        return 'empty', listMsd
    else:
        return previousState, listMsd
    
def read_dataset(dataset_path):
    pathList = os.listdir(dataset_path)
    Images = None
    for index in range(len(pathList)):
        path = dataset_path + '/' + pathList[index]
        print(path)
        category = int(pathList[index])
        newImgs, newAns = captureImgWithPath(path, category)
        if Images is None:
            Images = newImgs
            Answers = newAns
        else:
            Images = np.vstack((Images, newImgs))
            Answers = Answers + newAns
    Answers = np.array(Answers)
    print('ImagesTrain shape is ', Images.shape)
    print('AnswersTrain shape is ', Answers.shape)
    x, y = unison_shuffled_copies(Images, Answers)
    return x, y, len(pathList)

In [None]:
x, y, num_categories = read_dataset('/Users/23amrutad/Work/FinalDatasetSynopsys2020/blacklight')

numSamples = x.shape[0]
x = x.reshape(numSamples, 28, 28, 1)
numTrain = int(numSamples*6/10)
x_train = x[0 : numTrain]
x_test = x[numTrain : ]
y_train = y[0 : numTrain]
y_test = y[numTrain : ]
print(x_train.shape)
print(x_test.shape)
f = open('dataset.pkl','wb')
pickle.dump([x_train, x_test, y_train, y_test], f)
f.close()

In [None]:
f = open('dataset.pkl', 'rb')
x_train, x_test, y_train, y_test = pickle.load(f)
print(x_train.shape)
print(x_test.shape)

In [None]:
from keras.models import Sequential
from keras.layers import Dense, Conv2D, Dropout, Flatten, MaxPooling2D
 
input_shape = (28, 28, 1)

 
model = Sequential()
model.add(Conv2D(28, kernel_size=(3,3), input_shape=input_shape))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Flatten())
model.add(Dense(32, activation=tf.nn.relu))
model.add(Dropout(0.25))
model.add(Dense(num_categories, activation=tf.nn.softmax))
 
 
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.fit(x=x_train,y=y_train, epochs=100)
model.evaluate(x_test, y_test)

In [None]:
previousState = 'empty'
previousImg = None
currentImg = None
cap = cv2.VideoCapture(0)
msdList = []
# open serial port to communicate with arduino
import serial
serialport = serial.Serial('/dev/tty.usbserial-AL065BXG')

while(True):
    time.sleep(0.1)
    ret, currentImg = cap.read()
    #currentImg = cv2.cvtColor(currentImg, cv2.COLOR_BGR2GRAY)
    if (previousImg is not None):
        msd = skimage.measure.compare_mse(currentImg, previousImg)
        msdList.append(msd)
        state, msdList = computeNextState(msdList, previousState, 300)
        #print('the state is ', state)
        if previousState != 'filled' and state == 'filled':
            #print(currentImg.shape, ' is the image shape')
            cv2.imwrite('test_image.jpg', currentImg)
            imgList, _ = preProcessFromPath(currentImg, [], [], 0)
            imgArr = np.array(imgList)
            imgArr = imgArr.reshape(imgArr.shape[0], 28, 28, 1)
            prediction = model.predict(imgArr)
            predictCat = np.argmax(prediction)
            print(predictCat, prediction)
            if predictCat == 1:
                print('CAT is 1')
                serialport.write(b'A')
            elif predictCat == 0:
                print('CAT is 0')
                serialport.write(b'B')
        elif previousState != state and state == 'empty':
            print('Reset')
        previousState = state
    previousImg = currentImg