In [1]:
import pickle
import numpy as np
from sklearn.feature_selection import SelectPercentile, f_classif
import math
import time
import os
import cv2
import csv
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix

class ViolaJones:
    def __init__(self, T = 10):
        self.clfs = []
        self.T = T  #T: The number of weak classifiers which should be used
        self.alphas = []
    
    @staticmethod
    def load(filename):
        with open(filename+".pkl", 'rb') as f:
            return pickle.load(f)    

    def train(self, training, pos_num, neg_num):
        training_data = []
        weights = np.zeros(len(training))
 
        for i in range(len(training)):
            label = training[i][1]
            img = training[i][0]
            integral_img = (integral_image(img), label)
            training_data.append(integral_img)
            
            if label == 1:
                temp = (2 * pos_num)
                weights[i] = 1.0 / temp
            else:
                temp = (2 * neg_num)
                weights[i] = 1.0 / temp

       
        img_0 = training_data[0][0]
        i = 0
        n = 0
        if len(img_0):
            n = 1
        features = self.get_features(img_0.shape)

        
        X = np.zeros((len(features), len(training_data)))
        y = np.array(list(map(lambda data: data[1], training_data)))
        
        for f in features:
            if len(f):
                n = n+1
            
            feature = lambda inte_img: sum([pos.compute_feature(inte_img) for pos in f[0]]) - sum([neg.compute_feature(inte_img) for neg in f[1]])
            if(n):
                n = n+1
            X[i] = list(map(lambda data: feature(data[0]), training_data))
            i += 1

        # Selecting best features
        top = SelectPercentile(f_classif, percentile=10)
        cl = top.fit(X.T, y)
        ind = cl.get_support(indices=True)
        features = features[ind]
        X = X[ind]
        
      
        for t in range(self.T):
            norm = np.linalg.norm(weights)
            weights = weights / norm
            weak_classifiers = self.train_weak(X, y, features, weights)
            clf, error, accuracy = self.top(weak_classifiers, weights, training_data)
            er = (1.0 - error)
            beta = error / er
            for i in range(len(accuracy)):
                temp = (beta ** (1 - accuracy[i]))
                weights[i] = weights[i] * temp
            al = 1.0/beta
            alpha = math.log(al)
            self.alphas.append(alpha)
            self.clfs.append(clf)
            # print("Chose classifier: %s with accuracy: %f and alpha: %f" % (str(clf), len(accuracy) - sum(accuracy), alpha))

    def train_weak(self, X, y, features, weights):
        """
        Finds the optimal threshs for each weak classifier given the current weights
        """
        classifiers = []
        total_pos = 0
        total_neg = 0
        for i in range(len(y)):
            if y[i] == 1:
                total_pos = total_pos + weights[i]
            else:
                total_neg = total_neg + weights[i]

        
        total_features = X.shape[0]
        for i in range(len(X)):
            applied_feature = sorted(zip(weights, X[i], y), key=lambda x: x[1])

            pos_visited = 0
            neg_visited = 0
            pos_weights = 0
            neg_weights = 0

            min_error = float('inf')
            best_feature = None
            best_thresh = None
            best_polarity = None

            for j in range(len(applied_feature)):
            
                er1 = neg_weights + total_pos - pos_weights
                er2 = pos_weights + total_neg - neg_weights
                error = min(er1, er2)

                if error < min_error:
                    min_error = error
                    best_feature = features[index]
                    best_thresh = applied_feature[j][1]
                    if pos_visited > neg_visited:
                        best_polarity = 1
                    else:
                        best_polarity = -1

                if applied_feature[j][2] == 1:
                    pos_weights = pos_weights + applied_feature[j][0]
                    pos_visited = pos_visited + 1
                    
                else:
                    neg_weights = neg_weights + applied_feature[j][0]
                    neg_visited = neg_visited + 1
                    
            classifiers.append(Classifier_weak(best_feature[0], best_feature[1], best_thresh, best_polarity))
        return classifiers
      
        
    def classify(self, image):
        total = 0
        for i in range(len(self.alphas)):
            total = total + self.alphas[i] * self.clfs[i].classify(integral_image(image))
        p = 0
        temp =  0.5 * sum(self.alphas)
        if(temp>0):
            p += 1
            
        if total <= temp:
            return 0
        return 1
   

    def top(self, classifiers, weights, training_data):
        best_clf = None
        best_error = float('inf')
        best_accuracy = None

        for i in range(len(classifiers)):
            
            error = 0
            accuracy = []
            for i in range(len(training_data)):
                correctness = abs(classifier[i].classify(training_data[i][0]) - training_data[i][1])
                accuracy.append(correctness)
                w_correctness = weights[i] * correctness
                error =  error + w_correctness
                
            error = error / len(training_data)
            if error < best_error:
                best_accuracy = accuracy
                best_clf = classifier[i]

                best_error = error
                

        return best_clf, best_error, best_accuracy
     
     
    def get_features(self, image_shape):
        height = image_shape[0]
        width = image_shape[1]
        features = []
        fi = []
        flag = 0
        for w in range(1, width+1):
            for h in range(1, height+1):
                i = 0
                if(len(features)==0):
                    fi.append([w,h])
                while i + w < width:
                    j = 0
                    if(len(fi)):
                        flag = 1
                    while j + h < height:
                   
                        imme = FeaturesComp(i, j, w, h)
                        right = FeaturesComp(i+w, j, w, h)
                        cur_w = i + 2 * w
                        if  cur_w < width: #Horizontally Adjacent
                            t = ([right], [imme])
                            features.append(t)

                        cur_h = j + 2 * h
                        bottom = FeaturesComp(i, j+h, w, h)
                        if cur_h < height: #Vertically Adjacent
                            t = ([imme], [bottom])
                            features.append(t)
                        
                        right_2 = FeaturesComp(i+2*w, j, w, h)
                        cur_w = i + 3 * w
                        if cur_w < width: #Horizontally Adjacent
                            ha = [right_2, imme]
                            t = ([right], ha)
                            features.append(t)

                        bottom_2 = FeaturesComp(i, j+2*h, w, h)
                        cur_h = j + 3 * h
                        if cur_h < height: #Vertically Adjacent
                            va = [bottom_2, imme]
                            t = ([bottom], va)
                            features.append(t)

                       
                        bottom_right = FeaturesComp(i+w, j+h, w, h)
                        cur_w = i + 2 * w
                        if cur_w < width and j + 2 * h < height:
                            ibr = [imme, bottom_right]
                            t = ([right, bottom], ibr)
                            features.append(t)

                        j += 1
                    i += 1
        features = np.array(features)
        return features

       

class Classifier_weak:
    def __init__(self, pos, neg, thresh, polarity):
        self.pos = pos
        self.neg = neg
        self.tot = self.pos + self.neg
        self.thresh = thresh
        self.polarity = polarity
    
    
    def classify(self, x):
        feature = lambda ii: sum([pos.compute_feature(ii) for pos in self.pos]) - sum([neg.compute_feature(ii) for neg in self.neg])
        pf = self.polarity * feature(x) 
        if pf < self.polarity * self.thresh:
            return 1
        return 0
    
def integral_image(image):
    h, w = image.shape
    inte_img = np.zeros((h, w))
    a = np.zeros((h, w))
    for i in range(len(image)):
        for j in range(len(image[i])):
            a[i][j] = image[i][j]
            if i >= 1:
                a_cur = a[i-1][j] + image[i][j]
                a[i][j] = a_cur
            
            inte_img[i][j] = a[i][j]
            
            if j >= 1:
                inte_cur = inte_img[i][j-1]+a[i][j]
                inte_img[i][j] = inte_cur
    
    return inte_img
    
class FeaturesComp:
    def __init__(self, x, y, width, height):
        self.width = width
        self.pos = [x,y]
        self.x = x
        self.y = y
        self.height = height
    
    def compute_feature(self, ii):
        f = ii[self.y+self.height][self.x+self.width]
        f += ii[self.y][self.x]
        f -= ii[self.y+self.height][self.x]
        f += ii[self.y][self.x+self.width]
        return  f

        


In [2]:
def load_images(folder):
    files = os.listdir(folder)
    files = sorted(files)
    images = []
    for f in files:
        img = cv2.imread(folder+f,0)
        images.append(img)
    return images

def preprocess(face_images, nonface_images):
    training = []
    for i in range(len(face_images)):
        training.append(tuple((face_images[i], 1)))
    for i in range(len(nonface_images)):
        training.append(tuple((nonface_images[i], 0)))
    return training
    
def train_viola(t, train_data, positive_samples, negative_samples):
    clf = ViolaJones(T=t)
#     clf.train(train_data, positive_samples , negative_samples)
#     evaluate(clf, training)
#     clf.save(str(t))

face_images = load_images('q1_data/train/faces/')
positive_samples = len(face_images)
nonface_images = load_images('q1_data/train/non_faces/')
negative_samples = len(nonface_images)
train_data = preprocess(face_images, nonface_images)
train_viola(60, train_data, positive_samples, negative_samples)


In [3]:
def get_metrics(y_pred, y_test):
    cm = confusion_matrix(y_test, y_pred)
    accuracy = accuracy_score(y_test, y_pred)
    return cm, accuracy

def test_viola(checkpoint, test_images):
    clf = ViolaJones.load(checkpoint)
    y_preds = []
    for i in range(len(test_images)):
        y_pred = clf.classify(test_images[i])
        y_preds.append(y_pred)
    return y_preds

def load_testData():
    y_gt = []
    with open('q1_data/test/gt.csv', 'r') as file:
        reader = csv.reader(file)

        for row in reader:
            if(row != ['img', 'label']):
                y_gt.append(int(row[1]))
    test_images = load_images('q1_data/test/images/')
    return test_images, y_gt

test_images, y_gt = load_testData()
y_preds = test_viola("50", test_images)
cm, acc = get_metrics(y_preds,y_gt)
print("\n confusion matrix \n",cm)
print("\n accuracy", acc)


 confusion matrix 
 [[22146  1427]
 [  439    33]]

 accuracy 0.9223955084217093


## Bonus

In [10]:
class Cascade():
    def __init__(self, layers):
        self.clfs = []
        self.layers = layers
    
    def load(filename):
        with open(filename+".pkl", 'rb') as f:
            return pickle.load(f)

    def train(self, training):
        pos = []
        neg = []
        
        for data in training:
            img = data[0]
            lb = data[1]
            if lb == 1:
                pos.append(data)
            if lb == 0:
                neg.append(data)
        
        for num_features in self.layers:
            if len(neg) == 0:
                print("Stopping early.")
                break
            false_positives = []   
            clf = ViolaJones(T=num_features)
            tot = pos+neg
            pos_num = len(pos)
            neg_num = len(neg)
            clf.train(tot, pos_num, neg_num)
            self.clfs.append(clf)
            num_fakes = 0
            for data in neg:
                lb = data[1]
                if self.classify(lb) == 1:
                    false_positives.append(data)
                if lb == 0:
                    num_fakes += 1
                    
            neg = false_positives

    def classify(self, image):
        for clf in self.clfs:
            pred_lb = clf.classify(image)
            if pred_lb == 0:
                return 0
        return 1

    def save(self, filename):
        with open(filename+".pkl", 'wb') as f:
            pickle.dump(self, f)

    

In [11]:
def train_cascade(layers, train_data, filename="cascade"):
    clf = Cascade(layers)
    clf.train(train_data)
    clf.save(filename)

# train_cascade([1, 5, 10, 50], train_data, filename="cascade_50")
def test_cascade(test_images,filename="cascade"):
    clf = Cascade.load(filename)
    y_preds = []
    for i in range(len(test_images)):
        y_pred = clf.classify(test_images[i])
        y_preds.append(y_pred)
    return y_preds

test_images, y_gt = load_testData()
y_preds = test_cascade(test_images, "cascade", )
cm, acc = get_metrics(y_preds,y_gt)
print("\n confusion matrix \n",cm)
print("\n accuracy", acc)


 confusion matrix 
 [[23424   149]
 [  471     1]]

 accuracy 0.9742150135163236
