In [None]:
import os
import random
import datetime
import argparse
import time
import argparse
import numpy as np

from torchvision import models
import torch.nn as nn
import torch
from facenet_pytorch import InceptionResnetV1, MTCNN
import random

import dlib
import cv2
import imutils
from imutils.video import VideoStream
from imutils import face_utils
from moviepy.editor import *
from moviepy.editor import VideoFileClip, concatenate_videoclips

## 방법1: Random distance

In [None]:
class RandomDistance:
    """Baseline scorer that ignores frame content and answers at random."""

    def distance(self, reference_clip, compare_clip):
        """Return a random ``(score, cut_offset)`` pair.

        Args:
            reference_clip: object exposing a ``duration`` attribute.
            compare_clip: object exposing a ``duration`` attribute.

        Returns:
            tuple: ``random.randrange(1, 100)`` as the score, and
            ``random.randrange(3, 7)`` capped at the shorter clip's duration
            as the offset.
        """
        shortest = min(reference_clip.duration, compare_clip.duration)
        # Draw the score first, then the offset, so the random stream is
        # consumed in a fixed order.
        score = random.randrange(1, 100)
        offset = random.randrange(3, 7)
        return score, min(shortest, offset)

## 방법2: Feature distance

In [None]:
class FeatureExtractor(nn.Module):
    """Wrap a model so that every child module except the last one is applied."""

    def __init__(self, model):
        """Build ``self.features`` from all but the final child of ``model``."""
        super().__init__()
        kept_layers = list(model.children())[:-1]
        self.features = nn.Sequential(*kept_layers)

    def forward(self, x):
        """Run ``x`` through the retained layers and return their output."""
        return self.features(x)

In [None]:
class FeatureDistance:
    """Score two clips by the mean absolute difference of their R3D-18 features."""

    NUM_FRAMES = 10      # one frame is sampled per second at t = 0 .. NUM_FRAMES-1
    FRAME_HEIGHT = 108
    FRAME_WIDTH = 192

    def __init__(self):
        r3d_model = models.video.r3d_18(pretrained=True)
        self.model = FeatureExtractor(r3d_model)
        # Inference only: freeze batch-norm statistics so that a score does not
        # depend on the statistics of the single clip being evaluated.
        self.model.eval()

    def _clip_to_tensor(self, clip):
        """Sample NUM_FRAMES frames from ``clip`` as a (1, C, T, H, W) float tensor."""
        sampled = []
        for t in range(self.NUM_FRAMES):
            # Clamp so that clips shorter than NUM_FRAMES seconds are not read
            # past their end.
            frame = clip.get_frame(min(t, clip.duration)) / 255.0
            # cv2.resize takes dsize as (width, height); the original passed
            # (108, 192) and therefore produced 192x108 (HxW) frames.
            sampled.append(cv2.resize(frame, (self.FRAME_WIDTH, self.FRAME_HEIGHT)))

        stacked = np.stack(sampled)  # (T, H, W, C)
        # Move the axes to (C, T, H, W) with permute. The original used reshape,
        # which reinterprets the memory and mixes channel/time/space values.
        tensor = torch.from_numpy(stacked).permute(3, 0, 1, 2).unsqueeze(0)
        return tensor.float().contiguous()

    def distance(self, reference_clip, compare_clip):
        """Return ``(feature_distance, reference_clip.duration)``.

        Args:
            reference_clip: clip exposing ``get_frame(t)`` and ``duration``.
            compare_clip: clip exposing ``get_frame(t)`` and ``duration``.

        Returns:
            tuple: mean absolute difference between the two feature tensors,
            and the duration of ``reference_clip`` as the cut offset.
        """
        ref_frames = self._clip_to_tensor(reference_clip)
        frames = self._clip_to_tensor(compare_clip)

        # No gradients are needed; skipping autograd saves memory and time.
        with torch.no_grad():
            ref_feature = self.model(ref_frames)
            feature = self.model(frames)

        ret = ref_feature - feature
        return np.mean(np.abs(ret.detach().numpy())), reference_clip.duration

## 방법3: Face distance 

In [None]:
class FaceDistance:
    """Score two clips by how closely two facial landmarks line up over time.

    Landmarks are extracted with dlib on every ``skip_frame_rate``-th frame.
    The per-sample distance is the sum of the Euclidean distances of two
    landmarks (the ones the code labels left eye and right eye). The best cut
    point is the sample whose surrounding window has the smallest maximum
    distance. Optionally a face-embedding cosine distance, scaled by
    ``face_embedding_penalty``, is added to the winning score.
    """

    LEFT_EYE_IDX = 36
    RIGHT_EYE_IDX = 45

    def __init__(self, shape_predictor_path, face_embedding_penalty=None):
        """
        Args:
            shape_predictor_path: path handed to ``dlib.shape_predictor``.
            face_embedding_penalty: weight of the embedding cosine distance,
                or ``None`` to disable that term.
        """
        self.skip_frame_rate = 4
        self.minimax_frames = 5
        self.shape_predictor = shape_predictor_path
        self.face_embedding_penalty = face_embedding_penalty
        # Heavy models are created on first use and then reused. The original
        # code reloaded them on every call.
        self._detector = None
        self._predictor = None
        self._mtcnn = None
        self._embed_model = None

    @staticmethod
    def _largest_rect(rects):
        """Return the widest rectangle among ``rects`` (must be non-empty)."""
        return max(rects, key=lambda rect: rect.width())

    def extract_landmark(self, reference_clip, compare_clip):
        """Extract landmarks for every ``skip_frame_rate``-th frame of both clips.

        Returns:
            list: two lists (reference, compare). Each entry is the landmark
            array of the widest detected face, or ``[]`` when no face was found.
        """
        self.clips = [reference_clip, compare_clip]

        if self._detector is None:
            self._detector = dlib.get_frontal_face_detector()
        if self._predictor is None:
            self._predictor = dlib.shape_predictor(self.shape_predictor)

        clips_frame_info = []
        for clip in self.clips:
            every_frame_info = []
            frame_idx = 0
            # Same sampling as before: frame k*skip is used only while the
            # next sample position still lies inside the clip.
            while (frame_idx + self.skip_frame_rate) * 1.0 / clip.fps <= clip.duration:
                frame = clip.get_frame(frame_idx * 1.0 / clip.fps)
                frame_idx += self.skip_frame_rate

                frame = imutils.resize(frame, width=800)
                # NOTE(review): moviepy frames are expected to be RGB, hence
                # RGB2GRAY instead of the original BGR2GRAY - confirm.
                gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
                rects = self._detector(gray, 0)

                if len(rects) > 0:
                    # The original loop compared rects[0] on every iteration and
                    # never updated max_width, so it always kept the last face.
                    shape = self._predictor(gray, self._largest_rect(rects))
                    every_frame_info.append(face_utils.shape_to_np(shape))
                else:
                    every_frame_info.append([])

            # Keep a plain list: the entries are ragged (landmark arrays mixed
            # with empty lists), which np.array() rejects on NumPy >= 1.24.
            clips_frame_info.append(every_frame_info)

        return clips_frame_info

    def embedding_cosine_distance(self, reference_frame, compare_frame):
        """Return the cosine distance between the face embeddings of two frames.

        Returns 1 when face detection fails or finds no face in either frame.
        """
        if self._mtcnn is None:
            self._mtcnn = MTCNN(select_largest=True)
        if self._embed_model is None:
            self._embed_model = InceptionResnetV1(pretrained='vggface2').eval()

        reference_frame = np.array(reference_frame)
        compare_frame = np.array(compare_frame)
        try:
            reference_face = self._mtcnn(reference_frame)
            compare_face = self._mtcnn(compare_frame)
        except Exception:
            # Best effort: a detector failure counts as "faces do not match".
            return 1
        # The detector may hand back None instead of raising when it finds no
        # face; the original then crashed on `.unsqueeze`.
        if reference_face is None or compare_face is None:
            return 1

        with torch.no_grad():
            reference_embed = self._embed_model(reference_face.unsqueeze(0)).detach().numpy()
            compare_embed = self._embed_model(compare_face.unsqueeze(0)).detach().numpy()
        reference_embed = np.squeeze(reference_embed)
        compare_embed = np.squeeze(compare_embed)

        norms = np.linalg.norm(reference_embed) * np.linalg.norm(compare_embed)
        return 1 - np.dot(reference_embed, compare_embed) / norms

    def get_all_frame_distance(self, clips_frame_info, min_size):
        """Compute the landmark distance between reference sample i and compare sample i+1.

        Returns:
            list: ``min_size - 1`` entries; ``None`` where either sample has no
            landmarks.
        """
        ref_info, cmp_info = clips_frame_info[0], clips_frame_info[1]
        dist_arr = []
        for i in range(min_size - 1):
            ref_pts, cmp_pts = ref_info[i], cmp_info[i + 1]
            if len(ref_pts) > 0 and len(cmp_pts) > 0:
                total_diff = 0.0
                for idx in (self.LEFT_EYE_IDX, self.RIGHT_EYE_IDX):
                    dx = ref_pts[idx][0] - cmp_pts[idx][0]
                    dy = ref_pts[idx][1] - cmp_pts[idx][1]
                    total_diff += (dx ** 2 + dy ** 2) ** 0.5
                dist_arr.append(total_diff)
            else:
                dist_arr.append(None)
        return dist_arr

    def distance(self, reference_clip, compare_clip):
        """Return ``(score, cut_offset_seconds)`` for switching between the clips.

        The score is ``inf`` (and the offset 0) when no window of samples has
        landmarks in both clips.
        """
        clips_frame_info = self.extract_landmark(reference_clip, compare_clip)
        min_size = min(len(clips_frame_info[0]), len(clips_frame_info[1]))
        dist_arr = self.get_all_frame_distance(clips_frame_info, min_size)

        minimax_frames = self.minimax_frames
        # np.float was removed in NumPy 1.24; the builtin float is equivalent.
        min_diff = float('inf')
        min_idx = 0
        for i in range(min_size - (minimax_frames - 1)):
            window = dist_arr[max(0, i - minimax_frames):i + minimax_frames]
            # Skip empty windows and windows containing a sample without a face.
            if not window or None in window:
                continue
            tmp_max = max(window)
            if min_diff > tmp_max:
                min_diff = tmp_max
                min_idx = i

        # min_idx counts samples, and samples are skip_frame_rate frames apart.
        cut_frame = min_idx * self.skip_frame_rate

        if self.face_embedding_penalty is not None and min_diff < float('inf'):
            # The original looked up frame `min_idx`, ignoring the sampling
            # stride, so it compared faces at the wrong moment.
            ref_frame = reference_clip.get_frame(cut_frame * 1.0 / reference_clip.fps)
            frame = compare_clip.get_frame(cut_frame * 1.0 / compare_clip.fps)

            cosine_dist = self.embedding_cosine_distance(ref_frame, frame)
            min_diff += cosine_dist * self.face_embedding_penalty

        return min_diff, cut_frame / reference_clip.fps
    

## 방법4: Pose distance

In [None]:
class PoseDistance:
    """Score two clips by how closely their detected object centers line up.

    A Faster R-CNN detector runs on every ``SKIP_FRAME_RATE``-th frame. Boxes
    with label ``TARGET_LABEL`` and score >= ``MIN_SCORE`` are reduced to their
    center points, sorted by x, and matched in order between the two clips.
    """

    TARGET_LABEL = 1   # presumably the COCO "person" class - TODO confirm
    MIN_SCORE = 0.95

    def __init__(self):
        self.SKIP_FRAME_RATE = 10
        self.MINIMAX_FRAME = 4
        self.model = models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
        self.model.eval()
        os.environ['KMP_DUPLICATE_LIB_OK']='True'

    def extract_boxes(self, reference_clip, compare_clip):
        """Detect box centers on every ``SKIP_FRAME_RATE``-th frame of both clips.

        Returns:
            list: two lists (reference, compare); each entry is the list of
            ``[cx, cy]`` centers for one sampled frame, sorted by ``cx``.
        """
        self.clips = [reference_clip, compare_clip]
        clips_frame_info = []
        for clip in self.clips:
            every_frame_info = []
            # Sample k is taken at frame k*SKIP_FRAME_RATE so that `distance`
            # can convert an index back to a time. The original sampled at
            # (k+1)*SKIP_FRAME_RATE but reported k*SKIP_FRAME_RATE, one step
            # early. The number of samples is unchanged.
            frame_idx = 0
            while (frame_idx + self.SKIP_FRAME_RATE) * 1.0 / clip.fps <= clip.duration:
                frame = clip.get_frame(frame_idx * 1.0 / clip.fps)
                frame_idx += self.SKIP_FRAME_RATE

                frame = imutils.resize(frame, width=640)
                frame = np.transpose(frame / 255, (2, 0, 1))
                # Inference only: avoid building an autograd graph per frame.
                with torch.no_grad():
                    prediction = self.model([torch.from_numpy(frame).float()])[0]

                detections = zip(prediction['boxes'].tolist(),
                                 prediction['labels'].tolist(),
                                 prediction['scores'].tolist())
                centers = [
                    [(box[0] + box[2]) / 2, (box[1] + box[3]) / 2]
                    for box, label, score in detections
                    if label == self.TARGET_LABEL and score >= self.MIN_SCORE
                ]
                every_frame_info.append(sorted(centers, key=lambda dot: dot[0]))

            # Keep a plain list: frames hold different numbers of centers,
            # which np.array() rejects on NumPy >= 1.24.
            clips_frame_info.append(every_frame_info)

        return clips_frame_info

    def get_all_frame_distance(self, clips_frame_info, min_size):
        """Compute the per-sample distance between matched centers.

        Returns:
            list: ``min_size`` entries; ``None`` where either sample has no
            detections. Otherwise the sum of the Euclidean distances of the
            pairwise-matched centers plus a penalty of
            ``frame_diagonal * count_difference ** 2``.
        """
        dist_arr = list()
        for i in range(min_size):
            ref_frame_dots = clips_frame_info[0][i]
            compare_frame_dots = clips_frame_info[1][i]
            if len(ref_frame_dots) > 0 and len(compare_frame_dots) > 0:
                min_dot_num = min(len(ref_frame_dots), len(compare_frame_dots))
                dot_num_diff = abs(len(ref_frame_dots) - len(compare_frame_dots))
                diagonal = (self.clips[0].w ** 2 + self.clips[0].h ** 2) ** 0.5
                # The count difference enters twice, exactly as in the original.
                total_diff = diagonal * dot_num_diff * dot_num_diff
                for dot_idx in range(min_dot_num):
                    dx = ref_frame_dots[dot_idx][0] - compare_frame_dots[dot_idx][0]
                    dy = ref_frame_dots[dot_idx][1] - compare_frame_dots[dot_idx][1]
                    total_diff += (dx ** 2 + dy ** 2) ** 0.5
                dist_arr.append(total_diff)
            else:
                dist_arr.append(None)
        return dist_arr

    def distance(self, reference_clip, compare_clip):
        """Return ``(score, cut_offset_seconds)`` for switching between the clips.

        The score is ``inf`` (and the offset 0) when no window of samples has
        detections in both clips.
        """
        clips_frame_info = self.extract_boxes(reference_clip, compare_clip)
        min_size = min(len(clips_frame_info[0]), len(clips_frame_info[1]))
        dist_arr = self.get_all_frame_distance(clips_frame_info, min_size)

        # np.float was removed in NumPy 1.24; the builtin float is equivalent.
        min_diff = float('inf')
        min_idx = 0

        for i in range(min_size - (self.MINIMAX_FRAME - 1)):
            start_minmax_idx = max(0, i - self.MINIMAX_FRAME)
            if None in dist_arr[start_minmax_idx:i + self.MINIMAX_FRAME]:
                continue
            # NOTE(review): validity is checked on [i-M, i+M) but the maximum is
            # taken on [i, i+M) only. Kept as in the original - confirm which
            # window is intended.
            forward_window = dist_arr[i:i + self.MINIMAX_FRAME]
            if not forward_window:
                continue
            tmp_max = max(forward_window)
            if min_diff > tmp_max:
                min_diff = tmp_max
                min_idx = i

        return min_diff, (min_idx * self.SKIP_FRAME_RATE) / reference_clip.fps

# MAIN

In [None]:
class Crosscut:
    """Cut between several videos of one directory to build a single mixed video.

    Starting from the first video, the mixer repeatedly asks ``dist_obj`` which
    other video matches the current one best inside the next ``window_time``
    seconds, plays the current video up to the proposed cut point, then plays
    ``padded_time`` seconds of the chosen video before searching again.
    """

    def __init__(self, dist_obj, video_path, output_path):
        """
        Args:
            dist_obj: object with ``distance(reference_clip, compare_clip)``
                returning ``(score, cut_offset_seconds)``.
            video_path: directory whose entries are all opened as videos.
            output_path: file the mixed video is written to.
        """
        self.videos_path = video_path
        self.output_path = output_path
        # Duration of the shortest video, filled in by video_alignment().
        # Starts at infinity so the first clip always initialises it. The old
        # 1000.0 sentinel silently capped longer videos and left audioclip unset.
        self.min_time = float('inf')
        video_num = len(os.listdir(self.videos_path))
        self.start_times = [0] * video_num
        self.window_time = 10
        self.padded_time = 4
        self.dist_obj = dist_obj
        self.audioclip = None
        self.extracted_clips_array = []
        self.con_clips = []

    def video_alignment(self):
        """Open every video, trim it by its start time and track the shortest one."""
        # List and sort the directory once, not on every iteration.
        video_names = sorted(os.listdir(self.videos_path))
        for i, video_name in enumerate(video_names):
            clip = VideoFileClip(os.path.join(self.videos_path, video_name))
            clip = clip.subclip(self.start_times[i], clip.duration)
            if self.min_time > clip.duration:
                # The shortest video supplies the audio track of the result.
                self.audioclip = clip.audio
                self.min_time = clip.duration
            self.extracted_clips_array.append(clip)
        print('LOGGER-- {} Video Will Be Mixed'.format(len(self.extracted_clips_array)))

    def select_next_clip(self, t, current_idx):
        """Pick the video to switch to and append the current video up to the cut.

        Args:
            t: current position in seconds.
            current_idx: index of the video currently playing.

        Returns:
            tuple: ``(new_t, next_idx)``. ``new_t`` is the cut time, or the end
            of the search window when no candidate produced a finite score.
        """
        cur_t = t
        window_end = min(t + self.window_time, self.min_time)

        reference_clip = self.extracted_clips_array[current_idx].subclip(cur_t, window_end)
        best_d = float("Inf")
        best_offset = None
        min_idx = (current_idx + 1) % len(self.extracted_clips_array)
        for video_idx, video in enumerate(self.extracted_clips_array):
            if video_idx == current_idx:
                continue
            # Every candidate is scored on the full window. The original reused
            # `next_t`, which shrank as soon as one candidate won, so later
            # candidates were compared on a truncated clip.
            clip = video.subclip(cur_t, window_end)

            cur_d, plus_frame = self.dist_obj.distance(reference_clip, clip)
            print(current_idx, video_idx, cur_d, cur_t + plus_frame)
            if best_d > cur_d:
                best_d = cur_d
                min_idx = video_idx
                best_offset = plus_frame

        if best_offset is None:
            # No candidate beat infinity: keep playing the whole window.
            self.con_clips.append(reference_clip)
            return window_end, min_idx

        # A cut at offset 0 would append a zero-length clip; skip it.
        if best_offset > 0:
            self.con_clips.append(reference_clip.subclip(0, best_offset))
        return cur_t + best_offset, min_idx

    def add_padding(self, t, next_idx):
        """Append up to ``padded_time`` seconds of video ``next_idx`` starting at ``t``.

        Returns:
            tuple: ``(new_t, next_idx)`` with ``new_t`` capped at ``min_time``.
        """
        print("idx : {}".format(next_idx))
        pad_end = min(self.min_time, t + self.padded_time)
        # Nothing left to pad once the end of the shortest video is reached.
        if pad_end > t:
            self.con_clips.append(self.extracted_clips_array[next_idx].subclip(t, pad_end))
        return pad_end, next_idx

    def write_video(self):
        """Concatenate the collected clips, attach the audio and write the file."""
        final_clip = concatenate_videoclips(self.con_clips)
        if self.audioclip is not None:
            print("Not None")
            final_clip.audio = self.audioclip
        final_clip.write_videofile(self.output_path)
        return final_clip

    def generate_video(self):
        """Run the whole pipeline and return the written clip.

        Raises:
            ValueError: if the video directory contains no entries.
        """
        self.video_alignment()
        if not self.extracted_clips_array:
            raise ValueError("no videos found in {!r}".format(self.videos_path))

        t = 3  # the first video always plays for up to this many seconds
        current_idx = 0

        self.con_clips.append(self.extracted_clips_array[current_idx].subclip(0, min(t, int(self.min_time))))
        while t < int(self.min_time):
            t, min_idx = self.select_next_clip(t, current_idx)
            t, current_idx = self.add_padding(t, min_idx)

        final_clip = self.write_video()
        return final_clip

- 실행

In [None]:
# Run configuration: which distance strategy to use and where to read/write.
method = 'face'
video_path = 'fifth_season'
output_path = 'my_stagemix.mp4'
shape_predictor_path = 'shape_predictor_68_face_landmarks.dat'
face_embedding_penalty = 100 # or None

print(output_path)
if method == 'random':
    dist_obj = RandomDistance()
elif method == 'face':
    dist_obj = FaceDistance(shape_predictor_path, face_embedding_penalty)
elif method == 'pose':
    dist_obj = PoseDistance()
elif method == 'feature':
    dist_obj = FeatureDistance()
else:
    # Fail loudly: otherwise `cross_cut` would be undefined, or a stale object
    # from an earlier run of this cell would be reused silently.
    raise ValueError(
        "unknown method {!r}; expected 'random', 'face', 'pose' or 'feature'".format(method)
    )

cross_cut = Crosscut(dist_obj, video_path, output_path)
cross_cut.generate_video()