In [1]:
"""
Created on Wednesday Oct 14 08:51 2020
Used to pre-process dataset for Phase-based video magnification
@author: Keira - github.com/Keira. Bai
a. function to divide the data set into 5 groups for cross-validation; returns a 5*N dimension matrix
b. function to extract training data into fixed-size sequences; each sequence consists of frames taken at a constant interval
c. function to extract validation data; sequences consist of the raw images, with frames added to reach the same size
d. function to read images
e. function to magnify frames in a sliding window
"""
import os
import cv2
import PIL
import sys
import torch
import glob as gb
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import torchvision.transforms as transforms
from sklearn.model_selection import train_test_split
from PIL import Image
from torch.utils import data
from perceptual.filterbank import *
from pyr2arr import Pyramid2arr
from torchvision import transforms
from temporal_filters import IdealFilterWindowed, ButterBandpassFilter

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")
plt.ion()   # interactive mode

In [2]:
#Allocate dataset into k groups for training and validation
def CrossAllocation(datapaths,k):
    """Split all sequence folders into k groups for cross-validation.

    Folders are collected by globbing ``datapaths`` joined with one pattern per
    expression class. The class label is the position of the pattern:
    0 = negative, 1 = positive, 2 = surprise, 3 = non_micro.

    Args:
        datapaths: glob prefix (string) that is prepended to every class pattern.
        k: number of groups to produce.

    Returns:
        A list of k ``[folder_list, label_list]`` pairs. Group ``i`` is carved
        out of the data still unassigned with ``test_size=1/(k-i)``; the last
        group receives everything that is left. For ``k < 1`` the list is empty.
    """
    class_patterns = ["*/negative/*", "*/positive/*", "*/surprise/*", "non_micro/*"]
    sequ_list = []
    label_list = []
    #Load in all folders, labelling each one with the index of its expression class
    for label, vid_dir in enumerate(class_patterns):
        # glob order depends on the filesystem; sort so the seeded split below
        # yields the same groups on every machine
        sequ = sorted(gb.glob(datapaths+vid_dir))
        sequ_list += sequ
        label_list += [label] * len(sequ)

    #Divide all data into k groups
    Val_tra_tes_list = []
    for i in range(k):
        if k-i > 1:
            # peel one group off and keep the remainder for the next rounds
            sequ_list, valid_list, label_list, valid_label = train_test_split(
                sequ_list, label_list, test_size=1/(k-i), random_state=42)
            Val_tra_tes_list.append([valid_list, valid_label])
        else:
            # last group: whatever has not been assigned yet
            # (for k == 1 this is the complete data set)
            Val_tra_tes_list.append([sequ_list, label_list])

    return Val_tra_tes_list

In [3]:
#Data augmentation on sequence level by fixed size
def InputImagewithSlide(tra_tes_list, seleframe = 11):
    """Cut every video folder into sequences of exactly ``seleframe`` frame paths.

    Args:
        tra_tes_list: iterable of groups, each group being
            ``[folder_list, label_list]`` with integer labels 0..3.
        seleframe: number of frames in every produced sequence.

    Returns:
        ``(sequ_list, exp_list, expCount)`` where ``sequ_list`` holds lists of
        ``seleframe`` image paths, ``exp_list`` holds the label of each
        sequence, and ``expCount`` is a list of 8 counters: entries 0-3 are the
        raw ``*.bmp`` frame counts per label, entries 4-7 the number of frames
        contained in the produced sequences per label.

    Folders shorter than ``seleframe`` are padded by repeating their frames
    cyclically; folders without any ``*.bmp`` frame produce no sequence.
    """
    imgpath = "/*.bmp"
    step_list = [1,2,3,4,5,6]  # inner strides used when sampling a window
    sequ_list = []
    exp_list = []
    expCount = [0,0,0,0,0,0,0,0]

    def _emit(sequence, exp):
        # register one finished sequence together with its label and counter
        sequ_list.append(sequence)
        exp_list.append(exp)
        expCount[exp+4] += seleframe

    for group in tra_tes_list:
        for folder, exp in zip(group[0], group[1]):
            img_list = sorted(gb.glob(folder + imgpath))#frames of one folder, in order
            img_len = len(img_list)
            expCount[exp] += img_len#raw frame count of this expression

            if img_len == 0:
                # nothing to pad from; an empty "sequence" would break later stages
                continue

            if img_len < seleframe:
                # repeat the clip cyclically so the result always has seleframe
                # frames, even when the clip is shorter than seleframe/2
                _emit([img_list[n % img_len] for n in range(seleframe)], exp)
                continue

            #control the window step to keep the amount balanced between expressions
            step = 3 if exp == 3 else 1 #non-micro-expression grows more slowly
            for i in range(0, img_len-seleframe+step, step):
                remaining = img_len - i
                if remaining >= seleframe+step-1:
                    for s in step_list:
                        if remaining >= seleframe*s: #enough frames for this inner stride
                            _emit(img_list[i:i+seleframe*s:s], exp)
                else:
                    if step != 1:
                        # cover the end of the video with one last full window
                        _emit(img_list[img_len-seleframe:], exp)
                    # every later start index is even closer to the end, so it
                    # could only reproduce this same tail window again
                    break

    return sequ_list, exp_list, expCount

In [4]:
#Raw data for validation
def RawforVal(val):
    """Collect the untouched frame lists of one validation group.

    Args:
        val: ``[folder_list, label_list]`` with integer labels 0..3.

    Returns:
        ``(sequ_list, exp_list, expCount)``: the sorted ``*.bmp`` paths of every
        folder, the label of every folder, and the number of frames per label
        (list of 4 counters).
    """
    pattern = "/*.bmp"
    folders, folder_labels = val[0], val[1]

    frames_per_label = [0] * 4
    clips = []
    clip_labels = []
    for directory, label in zip(folders, folder_labels):
        frames = gb.glob(directory + pattern)
        frames.sort()
        frames_per_label[label] += len(frames)
        clips.append(frames)
        clip_labels.append(label)

    return clips, clip_labels, frames_per_label

In [5]:
def loadImg(sequ_list,Gwidth,Gheight):
    """Read the image files of one sequence into a single float array.

    Args:
        sequ_list: list of image file paths.
        Gwidth: size of the first spatial axis of every output frame
            (handed to ``cv2.resize`` as the target height).
        Gheight: size of the second spatial axis of every output frame
            (handed to ``cv2.resize`` as the target width).

    Returns:
        Float array of shape ``(len(sequ_list), Gwidth, Gheight, 3)`` holding
        the resized frames in the channel order delivered by ``cv2.imread``.

    Raises:
        OSError: if ``cv2.imread`` cannot read one of the files.
    """
    video_tensor = np.zeros((len(sequ_list), Gwidth, Gheight, 3),dtype='float')
    for idx, path in enumerate(sequ_list):
        image = cv2.imread(path)
        if image is None:
            # imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than inside cv2.resize
            raise OSError("cv2.imread could not read image: %s" % path)
        video_tensor[idx] = cv2.resize(image, (Gheight, Gwidth), interpolation=cv2.INTER_CUBIC)

    return video_tensor

In [6]:
def phaseBasedMagnify(sequ_list,factor,lowFreq,highFreq,fps=100,Gwidth=224,Gheight=224):
    """Apply phase-based motion magnification to every sequence.

    Args:
        sequ_list: list of sequences, each a list of image file paths.
        factor: multiplier applied to the temporally filtered phases.
        lowFreq: lower cut-off passed to the temporal band-pass filter.
        highFreq: upper cut-off passed to the temporal band-pass filter.
        fps: frame rate passed to the temporal band-pass filter.
        Gwidth: first spatial size the frames are resized to by ``loadImg``.
        Gheight: second spatial size the frames are resized to by ``loadImg``.

    Returns:
        A list with one float array per input sequence, each of shape
        ``(len(sequence), Gwidth, Gheight, 3)``. Every frame holds the
        magnified gray image replicated over the three channels. Frames for
        which the temporal filter has no output yet (``StopIteration``) stay
        all-zero.
    """
    img_set = []
    steer = Steerable(5)
    pyArr = Pyramid2arr(steer)
    for sequ in sequ_list:
        video_tensor = loadImg(sequ,Gwidth,Gheight)
        final = np.zeros(video_tensor.shape)
        # setup temporal filter (one fresh filter per sequence).
        # An IdealFilterWindowed used to be built here and was immediately
        # replaced by this Butterworth filter, so only the latter is kept.
        temporal_filter = ButterBandpassFilter(1, lowFreq, highFreq, fps)

        for frameNr in range(len(sequ)):
            # read frame
            im = video_tensor[frameNr].astype('uint8')

            # convert to gray image; loadImg reads with cv2.imread, i.e. BGR order
            if len(im.shape) > 2:
                grayIm = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
            else:
                # already a grayscale image
                grayIm = im

            # get coeffs for pyramid
            coeff = steer.buildSCFpyr(grayIm)

            # flatten the pyramid into one array
            arr = pyArr.p2a(coeff)
            phases = np.angle(arr)

            # add to temporal filter
            temporal_filter.update([phases])

            # try to get filtered output to continue
            try:
                filteredPhases = temporal_filter.next()
            except StopIteration:
                continue

            # motion magnification
            magnifiedPhases = (phases - filteredPhases) + filteredPhases*factor

            # create new array: original amplitudes with the magnified phases
            newArr = np.abs(arr) * np.exp(magnifiedPhases * 1j)

            # create pyramid coeffs
            newCoeff = pyArr.a2p(newArr)

            # reconstruct pyramid
            out = steer.reconSCFpyr(newCoeff)

            # clip values out of range
            out[out>255] = 255
            out[out<0] = 0

            # make a 3-channel image from the gray result
            rgbIm = np.repeat(out[:, :, np.newaxis], 3, axis=2)

            # store this frame; must happen inside the frame loop so that
            # every frame, not just the last one, ends up in the result
            final[frameNr] = cv2.convertScaleAbs(rgbIm)

        # one magnified clip per input sequence
        img_set.append(final)
    return img_set

In [8]:
# ---- experiment configuration ----
datapaths = "../SMIC/SMIC_all_cropped/HS/*/" 
k=5              # number of cross-validation groups
v=0              # index of the group held out for validation
factor = 20      # magnification factor handed to phaseBasedMagnify
lowFreq = 15     # lower cut-off of the temporal filter
highFreq = 25    # upper cut-off of the temporal filter
seleframe = 22   # number of frames per training sequence
batch_size = 40

Val_tra_tes_list = CrossAllocation(datapaths,k)

# Detect devices
use_cuda = torch.cuda.is_available()                   # check if GPU exists
device = torch.device("cuda:0" if use_cuda else "cpu")  # use CPU or GPU

params = {'batch_size': batch_size, 'shuffle': True, 'num_workers': 4, 'pin_memory': True} if use_cuda else {}

#Set transformation
tran_t = transforms.Compose(
    [transforms.Resize([224,224]), 
     transforms.ToTensor(),     
    ])

# hold out group v for validation
val = Val_tra_tes_list[v] 
# Keep the remaining groups as plain Python lists. np.delete would first turn
# the ragged [folders, labels] pairs into an ndarray, which raises on recent
# NumPy versions and can coerce the integer labels to strings.
tra_tes_list = [group for idx, group in enumerate(Val_tra_tes_list) if idx != v]
train_test_list, train_test_label, expCount = InputImagewithSlide(tra_tes_list, seleframe)#get training and testing data

train_test_img = phaseBasedMagnify(train_test_list,factor,lowFreq,highFreq)#Phase-based magnification
# train_list, test_list, train_label, test_label = train_test_split(train_test_list, train_test_label, \
#                                       test_size=1/k, random_state=42)

KeyboardInterrupt: 