In [1]:
import numpy as np
from matplotlib import pyplot as plt
 
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
model = keras.models.load_model('./models/1000')

from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier

from numba import njit

import random
import os
import cv2
import glob
import shutil
import imutils
from pathlib import Path
from distutils.dir_util import copy_tree

In [3]:
img_height = 88
img_width = 128

def get_id(subject):
    sub = str(subject)
    if(len(sub)==1): sub = '00' + sub
    elif(len(sub)==2): sub = '0' + sub
    return sub

subjects = 124
subject_ids = [get_id(i) for i in range(1, subjects+1)] 
conditions = ['bg-01', 'bg-02', 'cl-01', 'cl-02', 'nm-01', 'nm-02', 'nm-03', 'nm-04', 'nm-05', 'nm-06']
views = ['000', '018', '036', '054', '072', '090', '108', '126', '144', '162', '180']

paths = []
for subject in subject_ids:
    for condition in conditions:
        for view in views:
            path = subject + '/' + condition + '/' + view + '/'
            paths.append(path)


file = open('./Results/gait_cycles.txt', "r")
cycles = file.read().split('\n')
file.close()

In [4]:
# ------------------------------------- PreProcessing ------------------------------------ #
# function to find vertical boundary of the subject in image
@njit
def find_boundary_y(image):
    y1 = -1
    y2 = -1
    
    Y = image.shape[0]
    X = image.shape[1]

    for y in range(0, Y):
        last = -1
        for x in range(0, X):
            if(image[y, x]!=0):
                if(y1==-1):
                    y1 = y
                last = y
        if(last!=-1):
            y2 = y

    return y1, y2

# function to find horizontal boundary of the subject in image
@njit
def find_boundary_x(image):
    x1 = -1
    x2 = -1

    Y = image.shape[0]
    X = image.shape[1]

    for x in range(0, X):
        last = -1
        for y in range(0, Y):
            if(image[y, x]!=0):
                if(x1==-1):
                    x1 = x
                last = x
        if(last!=-1):
            x2 = last

    return x1, x2

# function to find horizontal meadian
def find_median_x(image):
    x1, x2 = find_boundary_x(image)

    if ((x1+x2)&1):
        return (x1+x2)//2
    else :
        return (x1+x2)/2

# function to preprocess given image and bring the subject in the center.
def preprocess_image(image):
    y1, y2 = find_boundary_y(image)

    if(y1==-1): return [[]]
    
    image = image[y1:y2+1, :]
    image = imutils.resize(image, height=img_height)


    meadian = find_median_x(image)
    center = image.shape[1] // 2
    image = imutils.translate(image, x=(center-meadian), y=0)

    image = image[:, center-64 : center+64]
    return image

# function to get sequence of silhouette for given subject.
def preprocess_seq(path):
    images = Path('../dataset/' + path).glob('*.png')
    for img in images:
        silhouette = cv2.imread('../dataset/' + path + img.name, cv2.IMREAD_UNCHANGED)
        silhouette = silhouette // 255;
        silhouette = preprocess_image(silhouette)
        if(len(silhouette[0])==0): continue
        cv2.imwrite(os.path.join('./data/' + path, img.name), silhouette*255)
    return

def preprocess():
    for path in paths:
        preprocess_seq(path)
                
# preprocess()

In [5]:
def get_silhouette_seq(path):
    folder = './data/' + path
    seq = []
    for img in os.listdir(folder):
        silhouette = cv2.imread(os.path.join(folder + img), cv2.IMREAD_UNCHANGED)
        silhouette = silhouette // 255;
        if(silhouette.shape == (img_height, img_width)): seq.append(silhouette)
    np_seq = np.array(seq)
    return np_seq

In [6]:
# --------------------------- Gait Cycle detection --------------------------- #
@njit
def detect_gait_cycle(seq=np.array([[[]]]), start=20, end=40):
    total_frames = len(seq)
    # to do: manage the shift for subjects with less images
    
    gait_cycle_frames = 0
    max_similarity = -1

    # correlations = np.array()

    for N in range(start, end+1):
        if(total_frames <= N): break

        # finding auto-correlation of the sequence, for given shift N.
        
        numerator =0
        denominator1 = 0
        denominator2 = 0
        for n in range(0, total_frames - N):
            numerator = numerator + np.sum(np.multiply(seq[n], seq[n+N]))
            denominator1 = denominator1 + np.sum(np.multiply(seq[n], seq[n]))
            denominator2 = denominator2 + np.sum(np.multiply(seq[n+N], seq[n+N]))

        denominator1 = np.sqrt(denominator1)
        denominator2 = np.sqrt(denominator2)

        # calculating similarity
        similarity = numerator / (denominator1 * denominator2)

        # correlations.append(similarity)

        # will consider the shift which gives maximum similarity.
        if(max_similarity < similarity):
            gait_cycle_frames = N
            max_similarity = similarity

    # generate_video(seq, gait_cycle_frames)
    # return gait_cycle_frames, correlations
    return gait_cycle_frames

def compute_get_cycles():
    file = open('./gait_cycles.txt', 'w')
    for path in paths:
        seq = get_silhouette_seq(path)
        cycle =  str(detect_gait_cycle(seq)) if len(seq) else '0'
        file.write(cycle+'\n')
    file.close()

# compute_get_cycles()

# def generate_video(seq, gait_cycle):
#     out = cv2.VideoWriter('output_video.mp4', cv2.VideoWriter_fourcc(*'DIVX'), 1, (img_width, img_height), False)

#     for i in range(0,gait_cycle):
#         out.write(seq[i] * 255)
    
#     out.release()
#     cv2.destroyAllWindows()
#     return

# cycle, correlations = detect_gait_cycle(1)
# shift = [y for y in range(20, 41)]
# plt.xlim(15, 45)
# plt.plot(shift, correlations)
# plt.show()

In [14]:
# ------------------------------- Generating GEIs ------------------------------ #

# function to get GEI for given sequence of the frames.
@njit
def get_GEI(seq, cycle):
    geis = []
    start = 0
    n = len(seq)
    while start <= (n-cycle):
        gei = np.zeros((img_height, img_width))
        for k in range(0, cycle):
            gei = np.add(gei, seq[start+k])
        gei = gei / cycle
        start = start + cycle
        geis.append(gei)
    return geis

# driver function to generate GEI for given subject.
def generate_GEIs():
    for i in range(0, len(paths)):
        seq = get_silhouette_seq(paths[i])
        cycle = int(cycles[i])
        if(cycle==0): continue
        geis = get_GEI(seq, cycle)
        id = 1
        for gei in geis:
            name = paths[i].split('/')
            cv2.imwrite('./GEIs/%s/%s.png'%(name[0], name[1]+'-'+name[2]+'-'+str(id)), gei * 255)
            id+=1

generate_GEIs()

In [17]:
# ------------------------ Generate Gait Entropy Image ----------------------- #
# will find shannon entropy matrix for given sequence of silhouettes.
@njit
def shannon_entropy(seq, start, cycle):
    seq = seq[start:start+cycle]
    entropy = np.zeros((img_height, img_width))
    for img in seq:
        entropy = np.add(entropy, img)
    
    entropy = entropy / cycle
    max_h = 0
    min_h = 1
    for i in range(entropy.shape[0]):
        for j in range(entropy.shape[1]):
            p = entropy[i][j]
            if(p==0 or p==1):
                entropy[i][j] = 0
                continue
            entropy[i][j] = -(p*np.log2(p) + (1-p)*np.log2(1-p))
            max_h = max(entropy[i][j], max_h)
            min_h = min(entropy[i][j], min_h) 
    
    return min_h, max_h, entropy

def generate_GEnIs():
    for i in range(0, len(paths)):
        path = paths[i]
        cycle = int(cycles[i])

        if(cycle==0): continue

        subject = path.split('/')[0]
        condition = path.split('/')[1]
        view = path.split('/')[2]

        seq = get_silhouette_seq(path)
        id=0; start=0; n = len(seq)
        while start<=(n-cycle):
            min_h, max_h, GEnI = shannon_entropy(seq, start, cycle)
            GEnI = (GEnI - min_h) / (max_h - min_h)
            cv2.imwrite('./GEnIs/%s/%s.png'%(subject, condition+'-'+view+'-'+str(id)), GEnI*255)
            id+=1
            start+=cycle
    return


generate_GEnIs()


In [3]:
num_classes = 74
data_dir = './GEnIs/'
train_data_dir = './training_data/'


In [11]:
# get training dataset for 74 subjects
classes = []
for i in range(num_classes):
    label = random.choice(subject_ids)
    while label in classes:
        label = random.choice(subject_ids)
    classes.append(label)


files = glob.glob('./training_data/*')
for f in files:
    shutil.rmtree(f)

for sub in classes:
    shutil.copytree('./GEnIs/%s'%sub, './training_data/%s'%sub)

In [80]:
epochs = 1000
batch_size = 64
val_split = 0.2

f1 = 40; k1 = 3; s1 = 2
f2 = 32; k2 = 5; s2 = 3
drop = 0.4


In [79]:
# building data 
train_data = tf.keras.utils.image_dataset_from_directory(
  train_data_dir,
  seed=114,
  subset='training',
  batch_size=batch_size,
  validation_split=val_split,
  image_size=(img_height, img_width),
)

val_data = tf.keras.utils.image_dataset_from_directory(
  train_data_dir,
  seed=114,
  # shuffle=False,
  subset="validation",
  batch_size=batch_size,
  validation_split=val_split,
  image_size=(img_height, img_width),
)

# cache method will help to cache the data during the first epoch
# prefetch method basically loads the next batch of data while the current batch is being processed.
AUTOTUNE = tf.data.AUTOTUNE

train_data = train_data.cache().prefetch(buffer_size=AUTOTUNE)
val_data = val_data.cache().prefetch(buffer_size=AUTOTUNE)


# create the model
model = Sequential([
    layers.Rescaling(1./255, input_shape=(img_height, img_width, 3)),
    
    layers.Conv2D(filters=f1, kernel_size=k1, strides=s1, padding='same', activation='relu'),
    layers.BatchNormalization(),
    layers.MaxPooling2D(pool_size=2, strides=2),
    layers.Dropout(drop),

    layers.Conv2D(filters=f2, kernel_size=k2, strides=s2, padding='same', activation='relu'),
    layers.BatchNormalization(),
    layers.MaxPooling2D(pool_size=3, strides=2),
    layers.Dropout(drop),


    layers.Flatten(),

    layers.Dense(1024, use_bias=False),
    layers.BatchNormalization(),
    layers.Activation('relu'),
    layers.Dropout(drop),

    layers.Dense(num_classes, activation='softmax')
])

model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy']
)

# train the model
history = model.fit(
    train_data,
    validation_data=val_data,
    epochs=epochs
)

In [81]:
model.summary()

In [91]:
temp = os.listdir('./training_data/')
img = cv2.imread('./training_data/031/cl-01-090.png')
t = np.argmax(model.predict(np.array([img])))
temp[t]

In [10]:
def empty(path):
    shutil.rmtree(path)
    os.mkdir(path)


def KNN(k):
    gallery, y_train = get_data('./gallery/*')
    probe, y_test = get_data('./probe/*')
    knn = KNeighborsClassifier(n_neighbors=k)
    knn.fit(gallery, y_train)
    score = knn.score(probe, y_test) * 100
    print("for k=%s, "%k, end=" ")
    print(score)
    return knn
    

def get_layer_data(path, layerIndex=9):
    func = keras.backend.function([model.get_layer(index=0).input], model.get_layer(index=layerIndex).output)
    img = cv2.imread(path)
    layerOutput = func(np.array([img]))  # input_data is a numpy array
    return layerOutput[0]


def get_data(path):
    data = []
    labels = []
    for img in glob.glob(path):
        data.append(get_layer_data(img))
        labels.append(img.split('\\')[1].split('-')[0])
    return np.array(data), labels

n = 1
temp = os.listdir('./training_data/')
classes = []
for i in subject_ids:
    if i not in temp:
        classes.append(i);

In [34]:
# fix angle, take 4 normal in gallery for 50 subjects...
def make_gallery(view):
    empty('./gallery/')
    for sub in classes:
        for env in ['nm-01', 'nm-02', 'nm-03', 'nm-04']:
            files = glob.glob('./GEnIs/%s/%s-%s.png'%(sub, env, view))
            for file in files:
                temp = file.split('/')
                name = temp[2] + '-' + temp[3]
                shutil.copy(file, './gallery/%s'%name)

# take 2 sequences in probe...
def make_probe(view):
    empty('./probe/')
    for sub in classes:
        for env in ['cl-01', 'cl-02']:
            files = glob.glob('./GEnIs/%s/%s-%s.png'%(sub, env, view))
            for file in files:
                temp = file.split('/')
                name = temp[2] + '-' + temp[3]
                shutil.copy(file, './probe/%s'%name)

# running for each pair
vs = views
for i in vs:
    for j in vs:
        make_gallery(i)
        make_probe(j)
        print(i, j)
        KNN(1)

In [85]:
# Random Experiment - take 50 x 4 x 10 sequences / images in gallery...
empty('./gallery/')
for v in ['036', '054', '072', '090']:
    for sub in classes:
        files = glob.glob('./GEnIs/%s/*%s*'%(sub, v))
        for file in files:
            temp = file.split('/')
            name = temp[2].replace('\\', '-')
            shutil.copy(file, './gallery/%s'%name)

train_data = glob.glob('./gallery/*')
test_data = random.sample(train_data, (40 * len(train_data))//100)

empty('./probe/')
for file in test_data:
    shutil.move(file, './probe/')

KNN(1)

# Average accuracy of 10 iterations of this block = 78.81%

In [88]:
knn = np.array([])
for i in range(0, n):
    for j in range(0, n):
        gallery_view = views[i]
        probe_view = views[j]
        
        print("probe=%s"%probe_view)
        print("gallery=%s"%gallery_view)
        for cl in ["nm", "bg", "cl"]:
            empty('./gallery/')
            empty('./probe/')
            y_train = []; y_test = []
            for cls in classes:
                    for g in glob.glob('./GEnIs/%s/%s*%s*.png'%(cls, cl, gallery_view)):
                        y_train.append(cls)
                        print(g)
                        shutil.copy(g, './gallery/%s'%(g.split('/')[2].replace('\\', '-')))
                    for p in glob.glob('./GEnIs/%s/%s*%s*.png'%(cls, cl, probe_view)):
                        y_test.append(cls)
                        shutil.copy(p, './probe/%s.png'%(p.split('/')[2].replace('\\', '-')))
            K = [1, 3, 5, 7, 9]
            # 11, 13, 15, 17, 19, 21, 23, 25
            curr = np.array([])
            print("for", cl)
            for k in K:
                np.append(curr, KNN(k, y_train, y_test))
            np.append(knn, curr);

In [86]:
# same angle in gallery and probe
# 4 normal in gallery, 2 variable in probe
knn = np.array([])
for i in range(0, n):
    view = views[i]
    print("view angle:", view)
    empty('./gallery/')
    empty('./probe/')
    y_train = []; y_test = []
    for cls in classes:
        for c1 in ['01', '02', '03', '04']:
            for g in glob.glob('./GEnIs/%s/nm-%s*%s*.png'%(cls, c1, view)):
                y_train.append(cls)
                shutil.copy(g, './gallery/%s'%(g.split('/')[2].replace('\\', '-')))
        
        for c2 in ['bg-01', 'bg-02']:
            for p in glob.glob('./GEnIs/%s/%s*%s*.png'%(cls, c2, view)):
                y_test.append(cls)
                shutil.copy(p, './probe/%s.png'%(p.split('/')[2].replace('\\', '-')))
    K = [1, 3, 5, 7, 9]
    # 11, 13, 15, 17, 19, 21, 23, 25
    curr = np.array([])
    for k in K:
        np.append(curr, KNN(k, y_train, y_test))
    np.append(knn, curr);