In [1]:
import cv2 as cv
import numpy as np
from matplotlib import pyplot as plt
from skimage import io
import scipy.io as sio
import random
import torch
from torch import nn
import torch.utils.data
from torchvision.transforms import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.autograd import Variable
from PIL import Image
cv.__version__

'3.4.2'

### Shot Boundary

In [None]:
# prepare data: load frames 022.jpg .. 200.jpg of clip 1, stored in RGB channel order
all_frame_1 = []
for i in range(22, 201):
    # file names are zero-padded to three digits
    name = str(i).zfill(3) + '.jpg'
    pic = cv.imread("./project1/clip_1/" + name)
    # cv.imread reports a missing/unreadable file by returning None instead of raising
    if pic is None:
        raise FileNotFoundError("cannot read frame ./project1/clip_1/" + name)
    # OpenCV decodes to BGR; convert to RGB before storing
    RGB_img = cv.cvtColor(pic, cv.COLOR_BGR2RGB)
    all_frame_1.append(RGB_img)

In [2]:
def create_hist_diff(all_f):
    """Return the colour-histogram difference between consecutive frames.

    Parameters
    ----------
    all_f : list of images (numpy arrays) holding all frames of a video in
        order. Channels 0, 1 and 2 of every frame are histogrammed, so the
        frames must have at least three channels. The normalisation uses the
        size of the first frame only — assumes all frames share that size.

    Returns
    -------
    list
        ``result[k]`` compares frame ``k`` with frame ``k + 1``: the summed
        absolute difference of the three 256-bin channel histograms divided
        by ``3 * width * height``. Identical histograms give 0; since each
        histogram sums to the pixel count, the value can reach 2 (not 1) when
        the histograms do not overlap at all. Fewer than two frames give ``[]``.
    """
    # nothing to compare (also avoids indexing all_f[0] on an empty list)
    if len(all_f) < 2:
        return []

    height, wide = all_f[0].shape[0], all_f[0].shape[1]
    # channel_hists[c][k] is the 256-bin histogram of channel c in frame k
    channel_hists = [
        [cv.calcHist([img], [channel], None, [256], [0, 256]) for img in all_f]
        for channel in range(3)
    ]

    result = []
    for i in range(1, len(all_f)):
        # per channel: how many pixels moved to a different histogram bin
        diffs = [np.sum(np.abs(hists[i] - hists[i - 1])) for hists in channel_hists]
        result.append((diffs[0] + diffs[1] + diffs[2]) / (3 * wide * height))
    return result

In [3]:
def find_boundary(hist_diff, shot_len = 8, shot_diff = 4, pace = 3):
    """Locate shot boundaries in a sequence of frame-to-frame differences.

    A window of ``shot_len`` values slides over ``hist_diff`` in steps of
    ``pace``. The largest value of each window is a boundary candidate and is
    accepted only if it passes all three checks below.

    Parameters
    ----------
    hist_diff : sequence of numbers, one difference value per frame pair.
        Windows are only generated while ``len(hist_diff) > shot_len``.
    shot_len : window length, also the minimum distance between two accepted
        boundaries (and between index 0 and the first one).
    shot_diff : factor by which a candidate must exceed the average
        difference since the previous boundary.
    pace : window step, also the number of neighbours on each side that the
        candidate must strictly exceed.

    Returns
    -------
    list of indices into ``hist_diff`` (numpy integers, as produced by
    ``np.argmax``) in increasing order.
    """
    boundaries = []
    last_boundary = 0
    total = len(hist_diff)
    for start in range(0, total - shot_len, pace):
        # strongest difference inside the current window
        candidate = np.argmax(hist_diff[start:start + shot_len]) + start

        # check 1: far enough from the previously accepted boundary
        if candidate - last_boundary < shot_len:
            continue

        # check 2: strictly larger than its neighbours; neighbours that fall
        # outside the sequence end the comparison without rejecting
        dominated = False
        for offset in range(1, pace + 1):
            before, after = candidate - offset, candidate + offset
            if before < 0 or after >= total:
                break
            if hist_diff[candidate] <= hist_diff[before] or hist_diff[candidate] <= hist_diff[after]:
                dominated = True
                break
        if dominated:
            continue

        # check 3: at least shot_diff times the average since the last boundary
        # NOTE(review): the slice starts at last_boundary itself, so the previous
        # boundary's own peak is part of this average — confirm that is intended
        baseline = np.sum(hist_diff[last_boundary:candidate]) / (candidate - last_boundary)
        if baseline * shot_diff > hist_diff[candidate]:
            continue

        last_boundary = candidate
        boundaries.append(candidate)
    return boundaries

### Evaluate performance

In [5]:
# Parameter grid for the boundary-finding algorithm (find_boundary).
# window length / minimum distance between two boundaries, in frames
shot_length_list = [1, 2, 3, 4, 6, 8]
# factor by which a boundary must exceed the average difference before it
shot_diff_list = [1.5, 1.8, 2.5, 3, 4, 4.5]
# step of the sliding window
pace_list = [1, 2, 3, 4]

In [4]:
def evaluate_shot_detection(hist_diff, truth):
    """Score find_boundary against ground truth for every parameter set.

    All combinations of ``shot_length_list`` x ``shot_diff_list`` x
    ``pace_list`` with ``pace <= shot_len`` are evaluated (a larger pace
    would leave some frames unchecked).

    Parameters
    ----------
    hist_diff : frame-to-frame differences, passed on to ``find_boundary``.
    truth : list of true boundary indices.

    Returns
    -------
    numpy array with one ``[recall, precision]`` row per evaluated parameter
    set, in iteration order. A ratio whose denominator is zero (no true
    boundaries, or no predictions) is reported as 0.0.
    """
    measure_indices = []
    for sl in shot_length_list:
        for sd in shot_diff_list:
            for p in pace_list:
                if p > sl:
                    continue
                predicts = find_boundary(hist_diff, shot_len=sl, shot_diff=sd, pace=p)
                # true boundaries not yet claimed by a prediction; a fresh copy
                # per parameter set keeps the caller's list untouched
                unmatched = list(truth)
                correct = 0
                for predict in predicts:
                    # a prediction is correct when it is at most one frame off;
                    # matching against the *remaining* truth makes sure each
                    # true boundary is counted (and removed) only once
                    for candidate in (predict, predict + 1, predict - 1):
                        if candidate in unmatched:
                            unmatched.remove(candidate)
                            correct += 1
                            break
                # true boundaries nobody predicted / predictions matching nothing
                miss = len(unmatched)
                false_positive = len(predicts) - correct
                recall = correct / (correct + miss) if correct + miss else 0.0
                precision = correct / (correct + false_positive) if correct + false_positive else 0.0
                measure_indices.append([recall, precision])
    return np.array(measure_indices)

### logo detection

In [21]:
def picture_comp_gray(all_frame, iteration = 20, threshold = 2):
    """Find pixels whose grey value stays (almost) constant across frames.

    ``iteration`` times, two distinct frames are drawn at random from
    ``all_frame`` and compared in greyscale; a pixel survives only if its
    absolute difference is below ``threshold`` in every comparison.

    Parameters
    ----------
    all_frame : list of at least two colour images (numpy arrays) of equal size.
    iteration : number of random frame pairs to compare.
    threshold : exclusive upper bound on the grey-level difference.

    Returns
    -------
    numpy array of shape (k, 2) with the (row, column) of every surviving pixel.
    """
    static = np.ones(all_frame[0].shape[0:2], dtype=bool)
    for _ in range(iteration):
        first, second = random.sample(range(len(all_frame)), 2)
        # NOTE(review): frames are treated as BGR here, while all_frame_1 was
        # stored as RGB — confirm the channel order callers pass
        a = cv.cvtColor(all_frame[first], cv.COLOR_BGR2GRAY)
        b = cv.cvtColor(all_frame[second], cv.COLOR_BGR2GRAY)
        # max - min equals |a - b| but cannot wrap around on unsigned (uint8)
        # images the way a plain a - b does
        diff = (np.maximum(a, b) - np.minimum(a, b)) < threshold
        static = np.logical_and(static, diff)
    return np.transpose(static.nonzero())
def picture_comp(all_frame, iteration = 20, threshold = 2):
    """Find pixels whose colour stays (almost) constant across frames.

    ``iteration`` times, two distinct frames are drawn at random from
    ``all_frame``; a pixel survives only if, in every comparison, the absolute
    difference of each of its first three channels is below ``threshold``.

    Parameters
    ----------
    all_frame : list of at least two 3-channel images (numpy arrays) of equal size.
    iteration : number of random frame pairs to compare.
    threshold : exclusive upper bound on the per-channel difference.

    Returns
    -------
    numpy array of shape (k, 2) with the (row, column) of every surviving pixel.
    """
    static = np.ones(all_frame[0].shape[0:2], dtype=bool)
    for _ in range(iteration):
        first, second = random.sample(range(len(all_frame)), 2)
        a = all_frame[first]
        b = all_frame[second]
        # max - min equals |a - b| but cannot wrap around on unsigned (uint8)
        # images the way a plain a - b does
        diff = (np.maximum(a, b) - np.minimum(a, b)) < threshold
        # all three colour channels have to agree
        static = np.logical_and(static, diff[:, :, :3].all(axis=2))
    return np.transpose(static.nonzero())

In [None]:
# brute force on compare random frames for one picture
# works not well
def comp_1(all_frame, iteration=30,threshold=10, frame_to_comp = 0, min_occur = 20):
    """Find SIFT keypoints of one frame that keep matching other frames.

    The reference frame ``all_frame[frame_to_comp]`` is matched ``iteration``
    times against a randomly chosen frame (which may be the reference frame
    itself). Every keypoint of the reference frame that passes the ratio test
    is counted at its integer pixel position.

    Parameters
    ----------
    all_frame : list of images (numpy arrays).
    iteration : number of random frames to match against.
    threshold : unused; kept so existing calls keep working.
    frame_to_comp : index of the reference frame.
    min_occur : a position is reported when it was matched more than this
        many times (default 20, the value previously hard-coded).

    Returns
    -------
    ``[]`` when no position qualifies, otherwise a numpy array of unique
    (x, y) integer positions, one per row.
    """
    sift = cv.xfeatures2d.SIFT_create()
    bf = cv.BFMatcher()
    kp_1, des_1 = sift.detectAndCompute(all_frame[frame_to_comp], None)
    # no descriptors in the reference frame: nothing can match
    if des_1 is None:
        return []

    # integer keypoint position -> number of good matches seen
    occurrences = {}
    for _ in range(iteration):
        # randomly select one picture
        picked = random.sample(range(len(all_frame)), 1)[0]
        kp_2, des_2 = sift.detectAndCompute(all_frame[picked], None)
        if des_2 is None:
            continue
        for pair in bf.knnMatch(des_1, des_2, k=2):
            # fewer than two neighbours returned: the ratio test is undefined
            if len(pair) < 2:
                continue
            best, second = pair
            # ratio test: keep matches clearly better than the runner-up
            if best.distance < 0.75 * second.distance:
                pt = kp_1[best.queryIdx].pt
                key = (int(pt[0]), int(pt[1]))
                occurrences[key] = occurrences.get(key, 0) + 1

    high_occur = [pt for pt, count in occurrences.items() if count > min_occur]
    # sorted, duplicate-free array (same shape of result as before)
    if len(high_occur) > 0:
        high_occur = np.unique(np.array(high_occur), axis=0)
    return high_occur

In [None]:
# find feature points that can be found in most pictures with fixed position
def feature_comp(all_frame, iteration=10,threshold=10):
    """Find SIFT feature positions that stay put across random frame pairs.

    Each round draws two distinct frames, matches their SIFT descriptors and
    keeps the matches whose two keypoints lie within ``threshold`` pixels of
    each other. From the second round on, such a position is kept only if it
    is also within ``threshold`` of a position kept in the previous round.
    A round that keeps nothing resets the result, so the next round starts
    over.

    Parameters
    ----------
    all_frame : list of at least two images (numpy arrays).
    iteration : number of rounds.
    threshold : absolute pixel tolerance used for both position comparisons.

    Returns
    -------
    ``[]`` when the last round kept nothing, otherwise a numpy array of unique
    integer (x, y) positions, one per row.
    """
    sift = cv.xfeatures2d.SIFT_create()
    bf = cv.BFMatcher()
    result = []
    for _ in range(iteration):
        # randomly pick 2 images and extract their feature points
        idx_a, idx_b = random.sample(range(len(all_frame)), 2)
        kp_1, des_1 = sift.detectAndCompute(all_frame[idx_a], None)
        kp_2, des_2 = sift.detectAndCompute(all_frame[idx_b], None)
        # a frame without descriptors simply contributes no matches
        if des_1 is None or des_2 is None:
            matches = []
        else:
            matches = bf.knnMatch(des_1, des_2, k=2)

        # ratio test; entries with fewer than two neighbours cannot be tested
        good = [pair[0] for pair in matches
                if len(pair) == 2 and pair[0].distance < 0.75 * pair[1].distance]

        same_position = []
        for match in good:
            pos_1 = np.array(kp_1[match.queryIdx].pt, dtype=np.int32)
            pos_2 = np.array(kp_2[match.trainIdx].pt, dtype=np.int32)
            # the feature has to sit at (nearly) the same place in both frames
            if not np.allclose(pos_1, pos_2, atol=threshold):
                continue
            # result is a list or an ndarray depending on the previous round,
            # so test its length instead of comparing it with []
            if len(result) == 0:
                same_position.append(pos_1)
                continue
            for prev_find in result:
                if np.allclose(prev_find, pos_1, atol=threshold):
                    same_position.append(prev_find)
                    same_position.append(pos_1)
                    break
        # remove duplicate points
        result = same_position
        if len(result) > 0:
            result = np.unique(np.array(same_position), axis=0)

    return result


### Gender Classifier

In [6]:
# prepare training data
def detect_face_helper(image, cascade_path="/home/yige/anaconda3/share/OpenCV/haarcascades/haarcascade_frontalface_alt.xml"):
    """Detect frontal faces in a BGR image with an OpenCV Haar cascade.

    Parameters
    ----------
    image : BGR image as returned by ``cv.imread``.
    cascade_path : location of the Haar cascade XML file. The default is the
        path this notebook was written with; pass your own on another machine.

    Returns
    -------
    The result of ``detectMultiScale`` (scale factor 1.3, 5 min neighbours):
    one ``(x, y, width, height)`` entry per detected face.

    Raises
    ------
    IOError if the cascade file cannot be loaded.
    """
    face_cascade = cv.CascadeClassifier(cascade_path)
    # a wrong path does not raise on construction; it yields an empty
    # classifier that only fails later, inside detectMultiScale
    if face_cascade.empty():
        raise IOError("cannot load Haar cascade: " + cascade_path)
    gray = cv.cvtColor(image, cv.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, 1.3, 5)
    return faces

In [7]:
# crop the image using OpenCV face detector
male_dir = "./project1/train_data/male/"
female_dir = "./project1/train_data/female/"
male_out = "./train/male/"
female_out = "./train/female/"
# Writing the crops is switched off by default (the save calls were commented
# out before); set to True to (re)create the files in male_out / female_out.
SAVE_CROPS = False


def _crop_single_face(path):
    """Return the square RGB face crop of the image at `path`.

    Returns None when the image cannot be read or when the detector does not
    find exactly one face.
    """
    image = cv.imread(path)
    # cv.imread returns None instead of raising for a missing/unreadable file
    if image is None:
        print("skip (cannot read): " + path)
        return None
    faces = detect_face_helper(image)
    if len(faces) != 1:
        return None
    x, y, width, height = faces[0]
    # square crop with the longer side of the detection box; slicing clips
    # silently at the image border
    bound = max(width, height)
    RGB_img = cv.cvtColor(image, cv.COLOR_BGR2RGB)
    return RGB_img[y:y+bound, x:x+bound, :]


for i in range(1, 261):  # image_001 to image_260
    file_name = "image_" + str(i).zfill(3) + ".jpg"
    for src_dir, out_dir in ((male_dir, male_out), (female_dir, female_out)):
        face = _crop_single_face(src_dir + file_name)
        if SAVE_CROPS and face is not None:
            io.imsave(out_dir + str(i) + ".jpg", face)

In [10]:
# Preprocessing applied to every image of the training set.
train_trans = transforms.Compose([
    # the network below expects 227 x 227 inputs
    transforms.Resize(size=(227, 227)),
    # data augmentation: mirror the picture with probability 0.5
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    # (x - 0.5) / 0.5 per channel
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

In [12]:
# load data
# the pictures were manually separated into different folders:
#   ./train/train/{male,female}/1.jpg, 2.jpg, ...
#   ./train/test/{male,female}/400.jpg, 401.jpg, ...
train_data = datasets.ImageFolder(root='./train/train', transform=train_trans)
train_loader = DataLoader(train_data, batch_size=4, shuffle=True, num_workers=4)
# Evaluation has to be repeatable: same preprocessing as train_trans, but
# without the random horizontal flip.
test_trans = transforms.Compose([
    transforms.Resize((227,227)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))
])
test_data = datasets.ImageFolder(root='./train/test', transform=test_trans)
# the whole test set is evaluated as one batch
test_loader = DataLoader(test_data, batch_size=len(test_data), shuffle=False, num_workers=4)

In [13]:
# create model - with respect of AlexNet
class simpleAlex(nn.Module):
    """AlexNet-style classifier: five convolution stages, three linear layers.

    The first linear layer takes 9216 = 256 * 6 * 6 features, which is what
    the convolution stages produce for 227 x 227 inputs
    (227 -> 55 -> 27 -> 13 -> 6 after conv1/pool1, pool2 and pool5).

    Parameters
    ----------
    num_classes : number of output logits (default 2).
    """

    def __init__(self, num_classes = 2):
        super().__init__()

        # convolution layers
        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=96,kernel_size=11,stride=4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels=96, out_channels=256, kernel_size=5, padding=2, groups=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2)
        )
        self.layer3 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=384,kernel_size=3,padding=1),
            nn.ReLU(inplace=True)
        )
        self.layer4 = nn.Sequential(
            nn.Conv2d(in_channels=384, out_channels=384,kernel_size=3,padding=1),
            nn.ReLU(inplace=True)
        )
        self.layer5 = nn.Sequential(
            nn.Conv2d(in_channels=384, out_channels=256,kernel_size=3,padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2)
        )
        # fully connected layers
        self.layer6 = nn.Sequential(
            nn.Linear(in_features=9216, out_features=4096),
            nn.ReLU(inplace=True),
            nn.Dropout()
        )
        self.layer7 = nn.Sequential(
            nn.Linear(in_features=4096, out_features=4096),
            nn.ReLU(inplace=True),
            nn.Dropout()
        )
        self.layer8 = nn.Linear(in_features=4096, out_features=num_classes)

    def forward(self, train):
        """Return the class logits, shape (batch, num_classes), for a batch of images."""
        features = self.layer1(train)
        features = self.layer2(features)
        features = self.layer3(features)
        features = self.layer4(features)
        features = self.layer5(features)
        # Flatten per sample. Keeping the batch dimension explicit means a
        # wrongly sized input fails in layer6 instead of being silently
        # regrouped into a different number of samples by view(-1, 9216).
        flat = features.view(features.size(0), -1)
        return self.layer8(self.layer7(self.layer6(flat)))

In [14]:
# base learning rate; adjust_lr derives the per-epoch rate from it
learning = 0.0001

# two output classes
model = simpleAlex(num_classes=2)
# move the weights to the GPU before the optimizer collects the parameters
if torch.cuda.is_available():
    model.cuda()

loss_func = nn.CrossEntropyLoss()
optimizer = Adam(params=model.parameters(), lr=learning)

In [16]:
# helpers
def save_models(epoch, acc=None):
    """Save the weights of the global ``model`` as a checkpoint in ./models.

    Parameters
    ----------
    epoch : epoch number, becomes part of the file name.
    acc : optional accuracy in [0, 1]. When given, its rounded percentage is
        appended: ``gender_classifier_<epoch>_<percent>.model``. Without it
        the name is ``gender_classifier_<epoch>.model``.
        NOTE(review): this mirrors the checkpoint name loaded further down
        (gender_classifier_18_97.model) — presumably epoch 18 at 97 %; confirm.
    """
    import os

    name = "gender_classifier_{}".format(epoch)
    if acc is not None:
        name += "_{}".format(int(round(acc * 100)))
    # torch.save does not create missing directories
    os.makedirs("./models", exist_ok=True)
    torch.save(model.state_dict(), "./models/{}.model".format(name))
    print("checkpoint")

# dynamic adjusting learning rate
def adjust_lr(epoch, lr=None):
    """Set the optimizer's learning rate according to a step schedule.

    The base rate is divided by 10 once ``epoch`` exceeds 20, by 100 once it
    exceeds 40, and so on in steps of 20 epochs up to a divisor of 10**6
    beyond epoch 120.

    Parameters
    ----------
    epoch : current epoch number.
    lr : base learning rate; defaults to the global ``learning``.
    """
    # resolved at call time (the previous default `0.learning` did not parse)
    if lr is None:
        lr = learning

    # number of 20-epoch thresholds already exceeded, between 0 and 6
    steps = min(max((epoch - 1) // 20, 0), 6)
    lr = lr / (10 ** steps)

    for params in optimizer.param_groups:
        print("learning rate: {}".format(lr))
        params["lr"] = lr

# return model accuarcy on test dataset
def test():
    model.eval()
    correct = 0
    total = 0
    for data in test_loader:
            t_img, t_label = data
            t_img = t_img.cuda()
            t_label = t_label.cuda()
            outputs = model(t_img)
            _,predicted = torch.max(outputs.data, 1)
            total += t_label.size(0)
            correct += (predicted == t_label).sum().item()
    return correct/total

# return model accuarcy on train dataset
def test_train():
    model.eval()
    correct = 0
    total = 0
    for data in train_loader:
            t_img, t_label = data
            t_img = t_img.cuda()
            t_label = t_label.cuda()
            outputs = model(t_img)
            _,predicted = torch.max(outputs.data, 1)
            total += t_label.size(0)
            correct += (predicted == t_label).sum().item()
    return correct/total 

In [17]:
def train(epoches):
    """Train the global ``model`` for ``epoches`` epochs.

    After every epoch the learning rate is adjusted, the accuracy on the
    training and test data is measured, and a checkpoint is saved whenever the
    test accuracy improves on the best one so far or exceeds 0.98.

    Parameters
    ----------
    epoches : number of passes over ``train_loader``.
    """
    best_acc = 0.0
    for epoch in range(epoches):
        # the accuracy helpers switch to eval mode, so re-enable training mode
        model.train()
        running_loss = 0.0
        print("===================")
        for images, labels in train_loader:
            if torch.cuda.is_available():
                images = images.cuda()
                labels = labels.cuda()
            # backward() accumulates into the existing gradients, so clear
            # them before every batch
            optimizer.zero_grad()
            # make prediction
            outputs = model(images)
            # evaluate loss
            loss = loss_func(outputs, labels)
            # update model
            loss.backward()
            optimizer.step()

            # loss is the mean over the batch; weight it by the batch size so
            # that the epoch figure below is a true per-sample average
            running_loss += loss.item() * labels.size(0)
        # average loss per training sample for this epoch
        running_loss /= len(train_data)

        # adjust learning rate if necessary
        adjust_lr(epoch)

        # use train and test dataset to evaluate model
        train_acc = test_train()
        test_acc = test()
        # if it is good enough or improved, save the model
        if test_acc > best_acc or test_acc > 0.98:
            save_models(epoch, test_acc)
            # never let the best accuracy drop
            best_acc = max(best_acc, test_acc)
        print("epoch {}, test accuarcy {}, training loss {}, train accuarcy {}".format(epoch, test_acc, running_loss, train_acc))

### load model and test

In [18]:
# Rebuild the network and restore the trained weights onto the CPU.
simpleA_net = simpleAlex(num_classes=2)
device = torch.device('cpu')
checkpoint = torch.load(
    "./models/gender_classifier_18_97.model",
    map_location=device,
)
simpleA_net.load_state_dict(checkpoint)
# switch to evaluation mode; as the cell's last expression this also shows the layers
simpleA_net.eval()

simpleAlex(
  (layer1): Sequential(
    (0): Conv2d(3, 96, kernel_size=(11, 11), stride=(4, 4))
    (1): ReLU(inplace)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (layer2): Sequential(
    (0): Conv2d(96, 256, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2), groups=2)
    (1): ReLU(inplace)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (layer3): Sequential(
    (0): Conv2d(256, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace)
  )
  (layer4): Sequential(
    (0): Conv2d(384, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace)
  )
  (layer5): Sequential(
    (0): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (layer6): Sequential(
    (0): Linear(in_features=9216, out_features=4096, bias=True)
    (1): ReLU(in

In [19]:
def predict_gender(pic):
    """Classify a face picture with ``simpleA_net``.

    Parameters
    ----------
    pic : image accepted by ``transforms.ToPILImage`` (e.g. an H x W x 3
        numpy array).

    Returns
    -------
    ``'female'`` when the network predicts class 0, otherwise ``'male'``.
    NOTE(review): assumes class 0 is female, as ImageFolder numbers the class
    folders in sorted order — confirm against ``train_data.class_to_idx``.
    """
    transformation = transforms.Compose([
        transforms.ToPILImage(),
        # fixed 227 x 227 like the training transform; Resize(227) would only
        # scale the shorter side and leave non-square pictures non-square
        transforms.Resize((227, 227)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    # add the batch dimension the network expects
    image_tensor = transformation(pic).float().unsqueeze(0)
    # Tensor.cuda() returns a copy, so the input has to be reassigned; send it
    # to the device the network actually lives on
    image_tensor = image_tensor.to(next(simpleA_net.parameters()).device)
    with torch.no_grad():
        output = simpleA_net(image_tensor)
    _, predicted = torch.max(output, 1)
    if int(predicted) == 0:
        return 'female'
    else:
        return 'male'