In [1]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import recall_score, precision_score, roc_auc_score, f1_score, accuracy_score, confusion_matrix
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.svm import LinearSVC, SVC, NuSVC
import tensorflow.keras as keras
import csv
import time

In [2]:
X = []
Y = []

with open('./data/data.csv', newline='') as csfile:
    spamreader = csv.reader(csfile, delimiter=' ', quotechar='|')
    for row in spamreader:
        X.append(row[0])
        Y.append(int(row[1]))

In [3]:
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=0)

In [5]:
def compute_descriptors(X):
    list_des = []
    list_kp = []
    kaze = cv2.KAZE_create()
    for x in X:
        img = cv2.imread(x, 0)
        img = cv2.resize(img, (128, 128))
        kp, des = kaze.detectAndCompute(img, None)
        list_des.append(des)
        list_kp.append(kp)
    return list_kp, list_des

In [9]:
def preparing(descriptors):

    res = []
    for item in descriptors:
        n_rows = item.shape[0]
        if n_rows < 128:
            arr = np.append(item[:128], 100*np.ones((128 - n_rows, 64)), axis=0)
        else:
            arr = item[:128]
        res.append(arr)

    return res

In [10]:
list_kp, list_des = compute_descriptors(X_train)
for i in range(len(list_des)):
    t = np.sum(list_des[i] == None)
    if t == 1:
        list_des[i] = np.zeros((64, 64))
print(np.mean([i.shape[0] for i in list_des]))

70.12865497076024


In [11]:
train = preparing(list_des)


In [12]:
def model():
    key_points = keras.layers.Input(shape=(128,64), dtype='float32')
    X = keras.layers.BatchNormalization()(key_points)
    X = keras.layers.LSTM(128, return_sequences=False)(X)
    X = keras.layers.Dense(units=2)(X)
    X = keras.layers.Activation(activation='softmax')(X)

    model = keras.Model(inputs=key_points, outputs=X)

    return model

In [13]:
model = model()
model.summary()

Model: "functional_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 128, 64)]         0         
_________________________________________________________________
batch_normalization (BatchNo (None, 128, 64)           256       
_________________________________________________________________
lstm (LSTM)                  (None, 128)               98816     
_________________________________________________________________
dense (Dense)                (None, 2)                 258       
_________________________________________________________________
activation (Activation)      (None, 2)                 0         
Total params: 99,330
Trainable params: 99,202
Non-trainable params: 128
_________________________________________________________________


In [14]:
model.compile(loss='categorical_crossentropy', optimizer='rmsprop', metrics=['accuracy'])

In [15]:
X_train_ar = np.zeros((len(train), 128, 64))
Y_train_ar = np.zeros((len(train), 2))

for i in range(len(train)):
    X_train_ar[i] = train[i]
    Y_train_ar[i, Y[i]] = 1

In [16]:
model.fit(X_train_ar, Y_train_ar, epochs=52, batch_size=64, shuffle=True)

Epoch 1/52
Epoch 2/52
Epoch 3/52
Epoch 4/52
Epoch 5/52
Epoch 6/52
Epoch 7/52
Epoch 8/52
Epoch 9/52
Epoch 10/52
Epoch 11/52
Epoch 12/52
Epoch 13/52
Epoch 14/52
Epoch 15/52
Epoch 16/52
Epoch 17/52
Epoch 18/52
Epoch 19/52
Epoch 20/52
Epoch 21/52
Epoch 22/52
Epoch 23/52
Epoch 24/52
Epoch 25/52
Epoch 26/52
Epoch 27/52
Epoch 28/52
Epoch 29/52
Epoch 30/52
Epoch 31/52
Epoch 32/52
Epoch 33/52
Epoch 34/52
Epoch 35/52
Epoch 36/52
Epoch 37/52
Epoch 38/52
Epoch 39/52
Epoch 40/52
Epoch 41/52
Epoch 42/52
Epoch 43/52
Epoch 44/52
Epoch 45/52
Epoch 46/52
Epoch 47/52
Epoch 48/52
Epoch 49/52
Epoch 50/52
Epoch 51/52
Epoch 52/52


<tensorflow.python.keras.callbacks.History at 0x7f79006da970>

In [None]:
def create_test(key_points, max_number_key_points):
    n_sample = len(key_points)
    res = np.zeros((n_sample, max_number_key_points, 2))
    for i in range(n_sample):
        for j in range(len(key_points[i])):
            res[i,j] = np.array(key_points[i][j].pt)
    return res

In [17]:
list_kp_test, list_des_test = compute_descriptors(X_test)
for i in range(len(list_des_test)):
    t = np.sum(list_des_test[i] == None)
    if t == 1:
        list_des_test[i] = np.zeros((64, 64))

In [18]:
test= preparing(list_des_test)

In [19]:
X_test_ar = np.zeros((len(test), 128, 64))

for i in range(len(test)):
    X_test_ar[i] = test[i]

In [20]:
y_pred = model.predict(X_test_ar)
y_pred_target = np.argmax(y_pred, axis=1)
y_pred_probability = np.max(y_pred, axis=1)

In [21]:
y_target = np.array(Y_test)
recall = recall_score(y_target, y_pred_target)
precission = precision_score(y_target, y_pred_target)
f1 = f1_score(y_target, y_pred_target)

Y_test_ar = np.zeros((len(test), 2))
for i in range(len(test)):
    Y_test_ar[i, Y_test[i]] = 1
roc_auc = roc_auc_score(Y_test_ar, y_pred)

In [22]:
print(f'accuracy: {accuracy_score(y_target, y_pred_target)}')
print(f'recall: {recall}')
print(f'precission: {precission}')
print(f'f1: {f1}')
print(f'roc_auc: {roc_auc}')

accuracy: 0.3953488372093023
recall: 0.8888888888888888
precission: 0.4
f1: 0.5517241379310346
roc_auc: 0.5477777777777778


In [23]:
print(f'accuracy: {accuracy_score(y_target, y_pred_target)}')
print(f'confusion_matrix: \n{confusion_matrix(y_target, y_pred_target)}')
print(f'roc_auc: {roc_auc}')

accuracy: 0.3953488372093023
confusion_matrix: 
[[ 1 24]
 [ 2 16]]
roc_auc: 0.5477777777777778


In [None]:
def compute_hist(frame, n=8):
    kaze = cv2.KAZE_create()
    img = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    img = cv2.resize(img, (128, 128))
    kp, des = kaze.detectAndCompute(img, None)

    list_kp = np.array([i.pt for i in kp])
    kmeans = KMeans(n_clusters=n, random_state=0)
    X = np.zeros((1, n))
    if len(kp) < n:
        X[:] = np.ones((1,n)) if len(list_kp) != 0 else np.zeros((1,n))
    else:
        kmeans.fit(list_kp)
        histogram = np.histogram(kmeans.labels_, bins=n)
        X[:] = histogram[0] / np.sum(histogram[0])
    return X

In [None]:
cap = cv2.VideoCapture('test.mp4')

# Check if camera opened successfully
if (cap.isOpened() == False):
  print("Unable to read camera feed")

# Default resolutions of the frame are obtained.The default resolutions are system dependent.
# We convert the resolutions from float to integer.
frame_width = int(cap.get(3))
frame_height = int(cap.get(4))

# Define the codec and create VideoWriter object.The output is stored in 'outpy.avi' file.
out = cv2.VideoWriter('outpy.avi',cv2.VideoWriter_fourcc('M','J','P','G'), 10, (frame_width,frame_height))

while(True):
  ret, frame = cap.read()

  if ret == True:
    X = compute_hist(frame)[np.newaxis, :, :]
    class_object = np.argmax(model.predict(X))
    position = (25, 350)
    cv2.putText(frame, 'Class ' + str(class_object), position, cv2.FONT_HERSHEY_SIMPLEX, 8, (209, 80, 0, 255), 3)
    out.write(frame)

  else:
    break

# When everything done, release the video capture and video write objects
cap.release()
out.release()

# Closes all the frames
cv2.destroyAllWindows()

In [None]:
## X_train



In [None]:
y_pred = model.predict(X_train_ar)
y_pred_tar = np.argmax(y_pred, axis=1)

In [None]:
y_target = np.array(Y_train)
recall = recall_score(y_target, y_pred_tar)
precission = precision_score(y_target, y_pred_tar)
f1 = f1_score(y_target, y_pred_tar)
#roc_auc = roc_auc_score(y_target, y_pred)

In [None]:
print(f'recall: {recall}')
print(f'precission: {precission}')
print(f'f1: {f1}')
print(f'roc_auc: {roc_auc}')

In [None]:
print(f'con_mat: {confusion_matrix(y_target, y_pred_tar)}')

In [None]:
model_json = model.to_json()
with open('model.json', 'w') as json_file:
    json_file.write(model_json)

model.save_weights('model.h5')

## SVM Linear

In [None]:
linear_SVM = LinearSVC(penalty='l2', loss='squared_hinge', dual=False, C=0.1)
scaler = StandardScaler()
X_train_arr = scaler.fit_transform(train)
Y_train_arr = np.array(Y_train)
linear_SVM.fit(X_train_arr, Y_train_arr)

In [None]:
list_kp_test, list_des_test = compute_descriptors(X_test)
for i in range(len(list_des_test)):
    t = np.sum(list_des_test[i] == None)
    if t == 1:
        list_des_test[i] = np.zeros((64, 64))

list_points_test = get_points(list_kp_test)
test = scaler.transform(get_train_date(list_points_test))

In [None]:
y_pred = linear_SVM.predict(test)

In [None]:
y_target = np.array(Y_test)

In [None]:
accuracy = accuracy_score(y_target, y_pred)
conf_matrix = confusion_matrix(y_target, y_pred)

In [None]:
print(f'accuracy: {accuracy}')
print(f'confusion matrix: \n {conf_matrix}')

## SVC

In [None]:
svc = SVC(kernel='sigmoid', coef0=10**-6, C=10**6, gamma='auto')
X_train_arr = scaler.fit_transform(train)
Y_train_arr = np.array(Y_train)
svc.fit(X_train_arr, Y_train_arr)

In [None]:
list_kp_test, list_des_test = compute_descriptors(X_test)
for i in range(len(list_des_test)):
    t = np.sum(list_des_test[i] == None)
    if t == 1:
        list_des_test[i] = np.zeros((64, 64))

list_points_test = get_points(list_kp_test)
test = scaler.transform(get_train_date(list_points_test))

In [None]:
y_pred = svc.predict(test)
y_target = np.array(Y_test)

accuracy = accuracy_score(y_target, y_pred)
conf_matrix = confusion_matrix(y_target, y_pred)

In [None]:
print(f'accuracy: {accuracy}')
print(f'confusion matrix: \n {conf_matrix}')

In [None]:
y_pred = svc.predict(train)

y_target = np.array(Y_train)

accuracy = accuracy_score(y_target, y_pred)
conf_matrix = confusion_matrix(y_target, y_pred)

print(f'accuracy: {accuracy}')
print(f'confusion matrix: \n {conf_matrix}')

## NuSVC

In [None]:
nusvc = NuSVC(nu=0.5, kernel='sigmoid', coef0=10**-6)
X_train_arr = scaler.fit_transform(train)
Y_train_arr = np.array(Y_train)
nusvc.fit(X_train_arr, Y_train_arr)

In [None]:
list_kp_test, list_des_test = compute_descriptors(X_test)
for i in range(len(list_des_test)):
    t = np.sum(list_des_test[i] == None)
    if t == 1:
        list_des_test[i] = np.zeros((64, 64))

list_points_test = get_points(list_kp_test)
test = scaler.transform(get_train_date(list_points_test))

In [None]:
y_pred = nusvc.predict(test)
y_target = np.array(Y_test)

accuracy = accuracy_score(y_target, y_pred)
conf_matrix = confusion_matrix(y_target, y_pred)

In [None]:
print(f'accuracy: {accuracy}')
print(f'confusion matrix: \n {conf_matrix}')

In [None]:
y_pred = nusvc.predict(train)

y_target = np.array(Y_train)

accuracy = accuracy_score(y_target, y_pred)
conf_matrix = confusion_matrix(y_target, y_pred)

print(f'accuracy: {accuracy}')
print(f'confusion matrix: \n {conf_matrix}')