In [1]:
#Imports 
import cv2
import numpy as np
import math
import glob
import time
import os
from os import listdir
from os.path import isfile, join
import matplotlib.pyplot as plt
from tqdm import tqdm
from feature_extraction import *
from PIL import Image, ImageEnhance

%matplotlib inline

In [10]:
# extract features from images
is_load = True
contours = []
labels   = []  # 1 for males  , 0 for females

if ( not is_load):
    # read male images
    male_path   = 'dataset_resized/flooded'
    male_files   = [ f for f in listdir(male_path) if isfile(join(male_path,f)) ]
    for i in tqdm(range(0, len(male_files))):
        img  = Image.open(join(male_path,male_files[i]))
        img, _  = preprocess_image( img )
        contour = get_contour_pixels(img)
        contours.append(contour)
        labels.append(1)
        
    ## read female images
    female_path = 'dataset_resized/non-flooded'
    female_files = [ f for f in listdir(female_path) if isfile(join(female_path,f)) ]
    for i in tqdm(range(0, len(female_files))):
        img  = Image.open( join(female_path,female_files[i]))
        img, _  = preprocess_image( img )
        contour = get_contour_pixels(img)
        contours.append(contour)
        labels.append(0)

    contours = np.asarray(contours , dtype= object)
    labels   = np.asarray(labels   , dtype= int )
    #save lables to dataset
    np.save('labels.npy', labels)

In [11]:
hinge_features = []
cold_features  = []
if(not is_load):
    for i in tqdm(range( len(contours)) ):
        feature  = get_hinge_features( contours[i] )
        hinge_features.append(feature)  
    hinge_features = np.asarray(hinge_features , dtype=object)
    np.save('features/hinge_features.npy', hinge_features)

    for i in tqdm(range( len(contours)) ):
        feature  = get_cold_features( contours[i] )
        cold_features.append(feature)  
    cold_features = np.asarray(cold_features , dtype=object)
    np.save('features/cold_features.npy', cold_features)

In [13]:
if(is_load):
    hinge_features = np.load('features/hinge_features.npy' , allow_pickle= True)
    cold_features = np.load('features/cold_features.npy' ,  allow_pickle= True)
    labels = np.load('features/labels.npy' , allow_pickle= True)

## concate features

In [14]:
# concatenate features in one flattened array
features = np.concatenate( (hinge_features, cold_features) , axis=1)
cold_features.shape

(922, 420)

In [15]:
# Split dataset into training set and test set
from sklearn.model_selection import train_test_split
# X_train, X_test, y_train, y_test = train_test_split( hinge_features , labels , test_size=0.2 ,  random_state=109) # 80% training and 20% test
# X_train, X_test, y_train, y_test = train_test_split( hinge_features , labels , test_size=0.2 ,  random_state=50)
X_train, X_test, y_train, y_test = train_test_split( features , labels , test_size=0.2 ,  random_state=175)

In [7]:
# # Applying PCA function on training
# # and testing set of X component
# from sklearn.decomposition import PCA
 
# pca = PCA(n_components = 0.95)
 
# # X_train = pca.fit_transform(X_train)
# # X_test = pca.transform(X_test)

## SVM

In [16]:
#Import svm model
from sklearn import svm
from sklearn import metrics
from sklearn.metrics import classification_report

#Create a svm Classifier
t0 = time.time()
SVM_clf = svm.SVC(kernel= "linear" ) # Linear Kernel

#Train the model using the training sets
SVM_clf.fit(X_train, y_train)

#Predict the response for test dataset
y_pred = SVM_clf.predict(X_test)
t1 = time.time()
# clf.score(X_test, y_test)


# accuracy & time taken to train the model
print("Time taken to train the model: in milliseconds: ", (t1-t0)*1000)
print("Accuracy:",metrics.accuracy_score(y_test, y_pred))
# print(classification_report(y_test, y_pred))

Time taken to train the model: in milliseconds:  165.3447151184082
Accuracy: 0.6864864864864865


## XGBOOST


In [9]:
## make and xgboost model   (pip install xgboost)
import xgboost as xgb

# create classifier 
t0 = time.time()
XGB_clf = xgb.XGBClassifier(max_depth=3, n_estimators=1000, learning_rate=0.05)
# XGB_clf = xgb.XGBClassifier(max_depth=5, n_estimators=5000, learning_rate=0.05)

# fit the classifier on the training data
XGB_clf.fit(X_train, y_train)

# make predictions for the test data
y_pred = XGB_clf.predict(X_test)
t1 = time.time()

# accuracy & time taken to train the model
print("Time taken to train the model: in milliseconds: ", (t1-t0)*1000, "ms")
print("Accuracy:",metrics.accuracy_score(y_test, y_pred))
# print(classification_report(y_test, y_pred))

ModuleNotFoundError: No module named 'xgboost'

## Random forest tree


In [17]:
#Import Random Forest Model
from sklearn.ensemble import RandomForestClassifier

from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

#Create a Gaussian Classifier  , random_state -> 39 , 60 , 67
# randomFores_clf=RandomForestClassifier(n_estimators=1000,  random_state = 67)

#Train the model using the training sets 
# randomFores_clf.fit(X_train,y_train)

# y_pred=randomFores_clf.predict(X_test)
# # Create the parameter grid based on the results of random search
param_grid = {
    'max_depth': [10, 50, 100],
    'min_samples_leaf': [1, 2,5],
    'min_samples_split': [1, 2, 5],
    'n_estimators': [10,20, 100, 200]
}

# Create a based model
pipe= Pipeline([('scaler', StandardScaler()), ('rf', RandomForestClassifier())])

# Instantiate the grid search model
grid_search = GridSearchCV(estimator = rf, param_grid = param_grid, cv = 3, n_jobs = -1, verbose = 2)

# accuracy & time taken to train the model
print("Time taken to train the model: in milliseconds: ", (t1-t0)*1000, "ms")
print("Accuracy:",metrics.accuracy_score(y_test, y_pred))
# print(classification_report(y_test, y_pred))

Time taken to train the model: in milliseconds:  165.3447151184082 ms
Accuracy: 0.8864864864864865


In [None]:
def train(classifier , random=None) :
    if random is None :
        X_train, X_test, y_train, y_test = train_test_split( hinge_features , labels , test_size=0.2)
    else:
        X_train, X_test, y_train, y_test = train_test_split( hinge_features , labels , test_size=0.2, random_state=random ) # 80% training and 20% test
    #Train the model using the training sets
    classifier.fit(X_train, y_train)

    #Predict the response for test dataset
    y_pred = classifier.predict(X_test)

    return metrics.accuracy_score(y_test, y_pred)

In [None]:
# Get avg accuraccy for models ( SVM , XGB , Random Forest)
is_get_avg = False 
if is_get_avg: 
    svm_list = []
    xgb_list = []
    rf_list  = []
    time_list = []

    # train svm model for 100 times
    t0 = time.time()
    for i in tqdm(range(100, 200)):
        svm_list.append( train(SVM_clf ) )
    t1 = time.time()
    time_list.append( (t1-t0) )

    # train xgboost model for 100 times
    t0 = time.time()
    for i in tqdm(range(100, 200)):
        xgb_list.append( train(XGB_clf) )
    t1 = time.time()
    time_list.append( (t1-t0) )
    
    # train random forest model for 100 times
    t0 = time.time()
    for i in tqdm(range(100, 200)):
        rf_list.append ( train(randomFores_clf ))
    t1 = time.time()
    time_list.append( (t1-t0) )

print("Time taken to train SVM: in miutes", time_list[0]/60, "minutes")
print("Time taken to train XGBOOST: in miutes", time_list[1]/60, "minutes")
print("Time taken to train Random forest: in miutes", time_list[2]/60, "minutes")
avg_svm = sum(svm_list) / len(svm_list)
avg_xgb = sum(xgb_list) / len(xgb_list)
avg_rf  = sum(rf_list)  / len(rf_list)

Time taken to train SVM: in miutes 0.008482722441355388 minutes
Time taken to train XGBOOST: in miutes 1.6525152762730917 minutes
Time taken to train Random forest: in miutes 2.960528488953908 minutes


In [None]:
if is_get_avg:
    print(f" avg SVM {avg_svm} || max SVM {max(svm_list)} || min SVM {min(svm_list)} || argmax {np.array(svm_list).argmax()}")
    print(f" avg XGB {avg_xgb} || max XGB {max(xgb_list)} || min XGB {min(xgb_list)} || argmax {np.array(xgb_list).argmax()}")
    print(f" avg RF  {avg_rf}  || max RF  {max(rf_list)}  || min RF  {min(rf_list)}  || argmax {np.array(rf_list).argmax() }")

 avg SVM 0.641232876712329 || max SVM 0.7534246575342466 || min SVM 0.4931506849315068 || argmax 69
 avg XGB 0.8000000000000002 || max XGB 0.8767123287671232 || min XGB 0.6712328767123288 || argmax 2
 avg RF  0.7875342465753427  || max RF  0.8904109589041096  || min RF  0.6712328767123288  || argmax 78


## Test images

In [None]:
def test_img_hinge( classifier , path):
    img  = Image.open(path)
    img, _  = preprocess_image(  img)
    contour = get_contour_pixels(img)
    hinge = get_hinge_features(contour)
    tt    = classifier.predict(np.array([hinge]))
    if tt == 1 : print(f"{path} \t: is Male") 
    else       : print(f"{path} \t: is Female") 

# def test_img_cold( classifier , path):
#     img  = Image.open(path)
#     img, _  = preprocess_image(  img)
#     contour = get_contour_pixels(img)
#     cold  = get_cold_features(contour)
#     tt    = classifier.predict(np.array([cold]))
#     if tt == 1 : print(f"{path} \t: is Male") 
#     else       : print(f"{path} \t: is Female") 

## test results in notebook

In [None]:
# test_img_hinge( XGB_clf, '/001.jpg')
# test_img_hinge( XGB_clf, 'dataSet/Females/F100.jpg')


OUTPUT_DIRECTORY   = 'Project Submission/test'
test_files   = [ f for f in listdir(OUTPUT_DIRECTORY) if isfile(join(OUTPUT_DIRECTORY,f)) ]
for i in range(0, len(test_files)):
    img  = Image.open(join(OUTPUT_DIRECTORY,test_files[i]))
    test_img_hinge(SVM_clf , join(OUTPUT_DIRECTORY,test_files[i]))
print('------------------------------------------------')

for i in range(0, len(test_files)):
    img  = Image.open(join(OUTPUT_DIRECTORY,test_files[i]))
    test_img_hinge(XGB_clf , join(OUTPUT_DIRECTORY,test_files[i]))

print('------------------------------------------------')
for i in range(0, len(test_files)):
    img  = Image.open(join(OUTPUT_DIRECTORY,test_files[i]))
    test_img_hinge(randomFores_clf , join(OUTPUT_DIRECTORY,test_files[i]))

Project Submission/test\001.jpg 	: is Male
Project Submission/test\002.jpg 	: is Male
Project Submission/test\003.jpg 	: is Male
Project Submission/test\004.jpg 	: is Male
Project Submission/test\005.jpg 	: is Male
Project Submission/test\006.jpg 	: is Male
------------------------------------------------
Project Submission/test\001.jpg 	: is Male
Project Submission/test\002.jpg 	: is Male
Project Submission/test\003.jpg 	: is Female
Project Submission/test\004.jpg 	: is Female
Project Submission/test\005.jpg 	: is Female
Project Submission/test\006.jpg 	: is Male
------------------------------------------------
Project Submission/test\001.jpg 	: is Male
Project Submission/test\002.jpg 	: is Male
Project Submission/test\003.jpg 	: is Female
Project Submission/test\004.jpg 	: is Female
Project Submission/test\005.jpg 	: is Female
Project Submission/test\006.jpg 	: is Male


## Test Script

In [None]:
OUTPUT_DIRECTORY = "Project Submission/out" ## * Appended Path of outputs
TEST_DIRECTORY   = "Project Submission/test" ## * Appended Path of test images
answers   = []
time_list =[]


test_files   = [ f for f in listdir(TEST_DIRECTORY) if isfile(join(TEST_DIRECTORY,f)) ]
for i in tqdm(range(0, len(test_files))):
    img  = Image.open(join(TEST_DIRECTORY,test_files[i]))
    
    t0 = time.time()
    try: 
        img, _  = preprocess_image( img )
        contour = get_contour_pixels(img)
        hinge = get_hinge_features(contour)
        ans    = randomFores_clf.predict(np.array([hinge]))
        t1 = time.time()

        time_list.append( (t1-t0) )
        if   ans == 1 :   answers.append(1)
        elif ans == 0 : answers.append(0)
        else : answers.append(-1)
    except:
        print(f"{test_files[i]} \t: is not a valid image")
        answers.append(-1)
        time_list.append( time.time() - t0 )

# save the output in txt files 
with open(join(OUTPUT_DIRECTORY,'results.txt'), 'w') as f:
    for i in range(0, len(answers)):
        f.write(f"{answers[i]}\n")

#write time in txt files 
with open(join(OUTPUT_DIRECTORY,'times.txt'), 'w') as f:
    for i in range(0, len(time_list)):
        f.write(f"{time_list[i]}\n")



100%|██████████| 6/6 [00:16<00:00,  2.73s/it]


## Evaluate Script

In [None]:
OUTPUT_DIRECTORY = "Project Submission/out" ## * Appended Path of outputs
TEST_DIRECTORY   = "Project Submission/test" ## * Appended Path of test images
truth = None
with open('./ground_truth.txt', 'rb') as gt_file:
    truth = [ int(line)  for line in gt_file.readlines()]
    truth = np.array(truth)

hypothesis = None
with open( join(OUTPUT_DIRECTORY,'results.txt'), 'rb') as hypo_file:
    hypothesis = [ int(line)  for line in hypo_file.readlines()]
    hypothesis = np.array(hypothesis)

## ! Account for length mismatch:
if len(truth) != len(hypothesis):
    truncation_len = min(len(truth), len(hypothesis))
    hypothesis = hypothesis[:truncation_len]
    truth = truth[:truncation_len]

ACCUARICY = np.sum(truth == hypothesis) / len(truth)    

## * Time Evaluation:
times = None
with open(join(OUTPUT_DIRECTORY,'times.txt'), 'rb') as times_file:

    times = [float(line) for line in times_file.readlines()]
    times = np.array(times)

TIME_MEAN = round(np.mean(times), 3)


## * Report:
print(ACCUARICY*100  , TIME_MEAN , sep = "\t || \t")

100.0	 || 	2.726
