In [13]:
import random
import numpy as np
from matplotlib import pyplot as plt
from sklearn.model_selection import KFold, train_test_split
import tensorflow as tf
import cv2
import os
from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Flatten
from tensorflow.keras.optimizers import SGD

In [82]:

# scale pixels
def prep_pixels(tensor_arr):
  # convert from integers to floats
  train_norm = tensor_arr.astype('float32')
  # normalize to range 0-1
  train_norm = train_norm / 255.0
  # return normalized images
  return train_norm

# load train and test dataset
def appendImages(pt1,pt2):
  e1 = cv2.imread(pt1,cv2.IMREAD_GRAYSCALE)
  e2 = cv2.imread(pt2,cv2.IMREAD_GRAYSCALE)
  return np.concatenate((e1,e2),axis=1)

def loadData(db_root,filepath, intra=True):
  trainX = []
  trainY = []
  file = open(filepath,'r')
  count=0
  for line in file:
    l = line.split(',')
    sad = float(l[-1])
    if(sad<7 and intra == True):
      trainX.append(appendImages(f'{db_root}{l[0]}', f'{db_root}{l[1]}'))
      trainY.append([1,0])
      count+=1
    if(sad>7 and intra==False):
      trainX.append(appendImages(f'{db_root}{l[0]}', f'{db_root}{l[1]}'))
      trainY.append([0,1])
      count+=1
    if(count>=1000):
      break
  print(intra, count)
  file.close()
  return trainX, trainY

def load_dataset(db_root):
  # load dataset
  trainX,trainY = loadData(db_root,'IntraClassSad1.txt', True)
  print("Positives ",len(trainX))
  X,Y = loadData(db_root,'InterClassSad1.txt',False)
  print("Negatives ",len(X))
  trainX.extend(X)
  trainY.extend(Y)
  trainX = tf.convert_to_tensor(trainX)
  trainY = tf.convert_to_tensor(trainY)
  trainX = prep_pixels(trainX)
  trainX, testX, trainY, testY = train_test_split(trainX, trainY, test_size=0.2)
  # reshape dataset to have a single channel
  trainX = trainX.reshape((trainX.shape[0], 160, 280, 1))
  testX = testX.reshape((testX.shape[0], 160, 280, 1))
  print('ALL ', trainX.shape)
  # one hot encode target values
  assert(trainY.shape[1] == testY.shape[1])
  return trainX, trainY, testX, testY


In [83]:
trainX, trainY, testX, testY = load_dataset('features/m1/')

True 1000
Positives  1000
False 1000
Negatives  1000
ALL  (1600, 160, 280, 1)


In [84]:
print(trainY.shape, testY.shape)

(1600, 2) (400, 2)


In [85]:



# define cnn model
def define_model():
  model = Sequential()
  model.add(Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', input_shape=(160, 280, 1)))
  model.add(MaxPooling2D((3, 3)))
  model.add(Flatten())
  model.add(Dense(200, activation='relu', kernel_initializer='he_uniform'))
  model.add(Dense(100, activation='relu', kernel_initializer='he_uniform'))
  model.add(Dense(2, activation='softmax'))
  # compile model
  opt = SGD(learning_rate=0.01, momentum=0.9)
  model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
  return model




# run the test harness for evaluating a model
def run_test_harness(trainX, trainY, testX, testY):
  # load dataset
  
  # prepare pixel data
  trainX = prep_pixels(trainX)
  
  # evaluate model
  model = define_model()
  #trainX, testX, trainY, testY = train_test_split(trainX, trainY, test_size=0.2, random_state=0)
  model.fit(trainX, trainY, epochs=5, batch_size=256)
  
  print(trainX.shape, testX.shape)
  # learning curves
  #summarize_diagnostics(histories)
  # summarize estimated performance
  model.save_weights('./tf_models/matcher.pth')
  return model

In [86]:
model = run_test_harness(trainX, trainY, testX, testY)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
(1600, 160, 280, 1) (400, 160, 280, 1)


In [87]:
model.evaluate(testX, testY)



[0.4624626040458679, 0.7699999809265137]

In [90]:
model = define_model()
model.load_weights('./tf_models/matcher2.pth')

<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x199187fd840>

In [66]:
def createTestSet(db_root):
  random.seed(0)
  testX = []
  testY = []
  dirs = os.listdir(db_root)

  #negatives
  for d1 in range(len(dirs)-1):
    files1 = os.listdir(f'{db_root}/{dirs[d1]}')
    for d2 in range(d1+1, len(dirs)):
      files2 = os.listdir(f'{db_root}/{dirs[d2]}')
      samples = []
      for i in range(7):
        k1 = random.randint(0,len(files1)-1)
        k2 = random.randint(0, len(files2)-1)
        samples.append((k1,k2))
      for k in samples:
        im1 = cv2.imread(f'{db_root}/{dirs[d1]}/{files1[k[0]]}', 0)
        im2 = cv2.imread(f'{db_root}/{dirs[d2]}/{files2[k[1]]}', 0)
        testX.append(np.concatenate((im1,im2), axis=1))
        testY.append([0,1])
  
  
  count=0
  #positives
  for d in range(len(dirs)):
    files = os.listdir(f'{db_root}/{dirs[d]}')
    for k1 in range(len(files)-1):
      for k2 in range(k1+1,len(files)):
        im1 = cv2.imread(f'{db_root}/{dirs[d]}/{files[k1]}', 0)
        im2 = cv2.imread(f'{db_root}/{dirs[d]}/{files[k2]}', 0)
        testX.append(np.concatenate((im1,im2), axis=1))
        testY.append([1,0])
        count+=1
        if(count==5):
            count=0
            break
  testX1 = tf.convert_to_tensor(testX)
  testX1 = testX1.reshape((testX1.shape[0], 160, 280, 1))
  testX1 = prep_pixels(testX1)
  testY1 = tf.convert_to_tensor(testY)
  print(testX1.shape, testY1.shape)
  return testX1, testY1

In [88]:
testX1,testY1 = createTestSet('features/test/m1/')


(939, 160, 280, 1) (939, 2)


In [91]:
model.evaluate(testX1, testY1)



[0.6645694375038147, 0.771032989025116]

In [50]:
c1=0
c2=0
for k in testY1:
    if(k[0] == 0 and k[1] == 1):
        c1 +=1
    else:
        c2+=1
print(c1,c2)

393 546
