In [None]:
import numpy as np
import matplotlib.pyplot as plt
import os, time
from skimage import io
from skimage.feature import hog
#from sklearn import svm
from sklearn import linear_model
from sklearn import metrics
import pickle

In [None]:
plt.rcParams['figure.figsize'] = (15,15)
plt.rcParams.update({'font.size': 12})

In [None]:
DIR = 'Individual_Component'
SEED = 42

# [HOG parameters]
# img (w,h): (64,80)
# NFEATURES = nblocks * norientations/cell * ncells/block
BLOCK_NORM_LIST = ['L1', 'L1-sqrt', 'L2', 'L2-Hys']
BLOCK_SIZE_LIST = [1, 2, 3, 4, 5, 6]
CELL_PIXEL_LIST = [4, 6, 7, 8, 9, 10, 12, 14, 16]
ORIENTATION_LIST = [6, 8, 9, 10, 12, 16, 32]

NPOS_TRAINING_IMGS = 10#3000
NNEG_TRAINING_IMGS = 10#3000

In [None]:
def list_mul(l, num):
    return tuple(map(lambda x: int(num * x), l))

# Histogram of Oriented Gradients
def run_hog(img, visualize=False):
    global NORIENTATIONS, CELL_PIXELSHAPE, CELLS_PER_BLOCK, BLOCK_NORM
    result = hog(img, orientations=NORIENTATIONS, 
                 pixels_per_cell=CELL_PIXELSHAPE, cells_per_block=CELLS_PER_BLOCK, 
                 block_norm=BLOCK_NORM, visualize=visualize, transform_sqrt=True,
                 multichannel=True)
    return result

In [None]:
# Reference: https://www.kaggle.com/manikg/training-svm-classifier-with-hog-features

RESULTS_FH = None # global

def log_result(s):
    global RESULTS_FH
    print(s)
    print(file=RESULTS_FH)
    

# Code to load the dataset
def get_dataset_fp(is_train):
    if is_train:
        subroot = 'train'
    else:
        subroot = 'test'
    base_fp = os.path.join(DIR, subroot)
    pos_dirs = []
    neg_dirs = []
    for dir_name in os.listdir(base_fp): # all files & dirs
        subfp = os.path.join(base_fp, dir_name)
        if not os.path.isdir(subfp):
            continue    
        for subdir_name in os.listdir(subfp):
            subsubfp = os.path.join(subfp, subdir_name)
            if not os.path.isdir(subsubfp):
                continue
            if 'positive' in dir_name:
                pos_dirs.append(subsubfp)
            elif 'negative' in dir_name:
                neg_dirs.append(subsubfp)
    return pos_dirs, neg_dirs

def get_subset_images(fp, nimgs=None, preproc=False):
    results = []
    results_fp = []
    count = 0
    for img_fn in os.listdir(fp):
        img_fp = os.path.join(fp, img_fn)
        if os.path.isdir(img_fp) or img_fn[-4:].lower() != '.pnm':
            continue
        img = io.imread(img_fp)
        if preproc: # proprocess as HOG
            hog_fd = run_hog(img)
            results.append(hog_fd)
        else:
            results.append(img)
        results_fp.append(img_fp)
        count += 1 # restrict num imgs loaded
        if nimgs != None and count >= nimgs:
            break
    return results, results_fp

NO_PERSON = 0
IS_PERSON = 1

def load_images(is_train, shuffle, npos_imgs=None, nneg_imgs=None, save_fn=None):
    pos_dirs, neg_dirs = get_dataset_fp(is_train=is_train)    
    x_train = []
    y_train = []
    fp_train = []
    # get preprocessed training/testing data
    for fp in pos_dirs:
        start_time = time.time()
        cur_subset, cur_subset_fp = get_subset_images(fp, npos_imgs, True)
        x_train += cur_subset
        fp_train += cur_subset_fp
        log_result('  * {:.3f}s runtime (images loaded): {}'.format(time.time() - start_time,fp))
        if npos_imgs != None:
            npos_imgs -= len(cur_subset)
            if npos_imgs <= 0:
                break
    pos_length = len(x_train)
    y_train += [IS_PERSON] * pos_length
    
    for fp in neg_dirs:
        start_time = time.time()
        cur_subset, cur_subset_fp = get_subset_images(fp, nneg_imgs, True)
        x_train += cur_subset
        fp_train += cur_subset_fp
        log_result('  * {:.3f}s runtime (images loaded): {}'.format(time.time() - start_time,fp))
        if nneg_imgs != None:
            nneg_imgs -= len(cur_subset)
            if nneg_imgs <= 0:
                break
    y_train += [NO_PERSON] * (len(x_train) - pos_length)
    
    # get hog descriptor size
    hog_shape = x_train[0].shape
    
    # shuffle training data
    #print('Reformatting data...')
    x_train = np.array(x_train)
    y_train = np.array(y_train)
    fp_train_index = np.arange(len(fp_train)) # create unique IDs (from 0)
    
    if shuffle:
        y_train = y_train.reshape(len(y_train),1)
        fp_train_index = fp_train_index.reshape(len(fp_train_index),1)
        
        data_frame = np.hstack((x_train,y_train, fp_train_index))
        #print('Reshuffling data...')
        start_time = time.time()
        np.random.seed(SEED)
        np.random.shuffle(data_frame)
        x_train = data_frame[:,:-2]
        y_train = data_frame[:,-2].ravel()
        fp_train_index = data_frame[:,-1].ravel()
        log_result('  * {:.3f}s runtime (shuffling)'.format(time.time() - start_time))
    
    log_result('HOG descriptor size: ' + str(hog_shape))
    result = (x_train, y_train, fp_train_index, fp_train)
    # save loaded images
    if save_fn != None:
        with open(save_fn, 'wb') as fh:
            pickle.dump(result, fh)
    return result
    
# Code to generate the SVM model
def gen_model(model_fn, x_train, y_train):
    # generate SVM model
    start_time = time.time()
    #clf = svm.SVC(probability=True)
    clf = linear_model.SGDClassifier()
    clf.fit(x_train, y_train)
    log_result('{:.3f}s runtime (SGD training)'.format(time.time() - start_time))
    
    # save SVM model
    with open(model_fn, 'wb') as fh:
        pickle.dump(clf, fh)
    return clf

In [None]:
# Computes statistics for the classifier's performance
def run_test(clf, x_test, y_test):
    log_result('\n[Classifier statistics]:')
    start_time = time.time()
    y_pred = clf.predict(x_test)
    y_prob = clf.predict_proba(x_test)
    # y_prob: https://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html
    #  - shape: (nsamples, nclasses), by order of `clf.classes_`
    elapsed_time = time.time() - start_time
    log_result('  * {:.3f}s prediction time ({:.6f} s/image)'.format(elapsed_time, elapsed_time/len(y_test)))
    log_result('  * AUC (weighted): {:.3f}'.format(metrics.roc_auc_score(y_test, y_prob[:,IS_PERSON], average='weighted')))
    log_result('  * Accuracy: {:.3f}'.format(metrics.accuracy_score(y_test, y_pred)))
    log_result('  * Precision (weighted): {:.3f}'.format(metrics.precision_score(y_test, y_pred, average='weighted')))
    log_result('  * Recall (weighted): {:.3f}'.format(metrics.recall_score(y_test, y_pred, average='weighted')))
    log_result('  * Confusion Matrix:')
    log_result(str(metrics.confusion_matrix(y_test, y_pred)))
    return y_pred, y_prob

# Returns all image file paths detected falsely
def filter_failed_fp(y_pred, y_test, fp_test_index, fp_test):
    failed_fp = []
    for i in range(len(y_pred)):
        if y_pred[i] != y_test[i]:
            if y_pred[i] == NO_PERSON:
                label = 'FN'
            else:
                label = 'FP'
            failed_fp.append( (label, fp_test[int(fp_test_index[i])]) )
    return failed_fp

In [None]:
# 1. Generate SVM model (and training images) or load cached result
def get_svm_model(cur_dir_contents, SVM_MODEL, PRE_TRAINING_IMGS):
    global NPOS_TRAINING_IMGS,NNEG_TRAINING_IMGS
    if SVM_MODEL not in cur_dir_contents:
        log_result('\n[Training SVM model]:')
        x_train, y_train, fp_train_index, fp_train = load_images(True,True, 
                                                                 NPOS_TRAINING_IMGS,NNEG_TRAINING_IMGS, 
                                                                 PRE_TRAINING_IMGS)
        clf = gen_model(SVM_MODEL,x_train,y_train)
    else:
        log_result('\n[Loading cached SVM model & training images]:')
        with open(SVM_MODEL, 'rb') as fh:
            clf = pickle.load(fh)
        with open(PRE_TRAINING_IMGS, 'rb') as fh:
            x_train, y_train, fp_train_index, fp_train = pickle.load(fh)
            
    log_result('Number of training images loaded:', len(x_train))
    return x_train, y_train, fp_train_index, fp_train, clf

# 2. Generate test images or load cached result
def get_test_imgs(cur_dir_contents, PRE_TESTING_IMGS):
    # Generate test images or load cached result
    if PRE_TESTING_IMGS not in cur_dir_contents:
        log_result('\n[Generating test images]:')
        x_test, y_test, fp_test_index, fp_test = load_images(False,True, save_fn=PRE_TESTING_IMGS)
    else:
        log_result('\n[Loading cached test images]:')
        with open(PRE_TESTING_IMGS, 'rb') as fh:
            x_test, y_test, fp_test_index, fp_test = pickle.load(fh)
    
    log_result('Number of testing images loaded:', len(x_test))
    return x_test, y_test, fp_test_index, fp_test
            
# 3. Evaluate performance (for all permutations)
#    call: run_test()
#    manual post-analysis: filter_failed_fp()

# Wrapper for single iteration
def run_hogsvm():
    global NORIENTATIONS, CELL_PIXELSHAPE, CELLS_PER_BLOCK, BLOCK_NORM
    global RESULTS_FH
    base_fn = '_ori({})_cellpix({})_blksze({})_blknrm({})'.format(
        NORIENTATIONS, CELL_PIXELSHAPE[0], CELLS_PER_BLOCK[0], BLOCK_NORM)
    pickle_type = '.pickle'
    text_type = '.txt'
    SVM_MODEL = 'hogsvm_model' + base_fn + pickle_type
    PRE_TRAINING_IMGS = 'hogsvm_train' + base_fn + pickle_type
    PRE_TESTING_IMGS = 'hogsvm_test' + base_fn + pickle_type
    results_fn = 'hogsvm_result' + base_fn + text_type
    
    cur_dir_contents = os.listdir('.')
    RESULTS_FH = open(results_fn, 'w')
    log_result('[Current parameter sweep]:')
    log_result('Number of orientations: {}'.format(NORIENTATIONS))
    log_result('Cell pixel shape: {}'.format(CELL_PIXELSHAPE))
    log_result('Number of cells per block: {}'.format(CELLS_PER_BLOCK))
    log_result('Block normalisation method: {}'.format(BLOCK_NORM))
    
    # load svm model and test dataset
    x_train, y_train, fp_train_index, fp_train, clf = get_svm_model(cur_dir_contents, SVM_MODEL, PRE_TRAINING_IMGS)
    x_test, y_test, fp_test_index, fp_test          = get_test_imgs(cur_dir_contents, PRE_TESTING_IMGS)
    # evaluate on test and training datasets (as a crude check for overfitting)
    y_pred, y_prob = run_test(clf, x_test, y_test)
    y_pred_training, y_prob_training = run_test(clf, x_train, y_train)
    # identify all false results
    failed_fp = filter_failed_fp(y_pred, y_test, fp_test_index, fp_test)
    log_result('\n[Falsely detected images]:')
    for label, fp in failed_fp:
        log_result('  * {}: {}'.format(label,fp))

    RESULTS_FH.close()
    RESULTS_FH = None

In [None]:
# [PARAMETER SWEEP]
# block parameters
for BLOCK_NORM in BLOCK_NORM_LIST:
    for block_size in BLOCK_SIZE_LIST:
        CELLS_PER_BLOCK = (block_size, block_size)    
        # cell/orientation parameters
        for cp in CELL_PIXEL_LIST:
            CELL_PIXELSHAPE = (cp,cp)
            for NORIENTATIONS in ORIENTATION_LIST:
                run_hogsvm()