In [1]:
import dlib
import cv2
import os

In [4]:
RESULT_PATH = './result/'       # The path that the result images will be saved
VIDEO_PATH = './dataset/'       # Dataset path
LOG_PATH = 'log.txt'            # The path for the working log file
LIP_MARGIN = 0.3                # Marginal rate for lip-only image.
RESIZE = (96,96)                # Final image size
logfile = open(LOG_PATH,'w')
# Face detector and landmark detector
face_detector = dlib.get_frontal_face_detector()   
landmark_detector = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")	# Landmark detector path

In [5]:
def shape_to_list(shape):
	coords = []
	for i in range(0, 68):
		coords.append((shape.part(i).x, shape.part(i).y))
	return coords

In [6]:
video_list = os.listdir(VIDEO_PATH)     # Read video list

In [7]:
for vid_name in video_list:                 # Iterate on video files
    vid_path = VIDEO_PATH + vid_name
    vid = cv2.VideoCapture(vid_path)       # Read video

    # Parse into frames 
    frame_buffer = []               # A list to hold frame images
    frame_buffer_color = []         # A list to hold original frame images
    while(True):
        success, frame = vid.read()                # Read frame
        if not success:
            break                           # Break if no frame to read left
        gray = cv2.cvtColor(frame,cv2.COLOR_BGR2GRAY)   # Convert image into grayscale
        frame_buffer.append(gray)                  # Add image to the frame buffer
        frame_buffer_color.append(frame)
    vid.release()

In [8]:
# Obtain face landmark information
landmark_buffer = []        # A list to hold face landmark information
for (i, image) in enumerate(frame_buffer):          # Iterate on frame buffer
    face_rects = face_detector(image,1)             # Detect face
    if len(face_rects) < 1:                 # No face detected
        print("No face detected: ",vid_path)
        logfile.write(vid_path + " : No face detected \r\n")
        break
    if len(face_rects) > 1:                  # Too many face detected
        print("Too many face: ",vid_path)
        logfile.write(vid_path + " : Too many face detected \r\n")
        break
    rect = face_rects[0]                    # Proper number of face
    landmark = landmark_detector(image, rect)   # Detect face landmarks
    landmark = shape_to_list(landmark)
    landmark_buffer.append(landmark)

In [9]:
def detect_landmark(image):
    face_rects = face_detector(image,1)             # Detect face
    if len(face_rects) < 1:                 # No face detected
        print("No face detected: ",vid_path)
        logfile.write(vid_path + " : No face detected \r\n")
        return
    if len(face_rects) > 1:                  # Too many face detected
        print("Too many face: ",vid_path)
        logfile.write(vid_path + " : Too many face detected \r\n")
        return
    rect = face_rects[0]                    # Proper number of face
    landmark = landmark_detector(image, rect)   # Detect face landmarks
    landmark = shape_to_list(landmark)
    # landmark_buffer.append(landmark)
    return landmark

In [13]:
from multiprocessing import Pool, TimeoutError
# from multiprocessing.dummy import Pool
import time
import os

with Pool(processes = 10) as pool:
    landmark_buffer2 = pool.map(detect_landmark, frame_buffer)

print("done")

done


In [14]:
if len(landmark_buffer) != len(landmark_buffer2):
    print("ERROR: lengths not equal")
else:
    for i in range(len(landmark_buffer)):
        if landmark_buffer[i] != landmark_buffer2[i]:
            print("ERROR: buffers don't match")
            break
print("done")

done


In [12]:
# Crop images
cropped_buffer = []
for (i,landmark) in enumerate(landmark_buffer):
    lip_landmark = landmark[48:68]                                          # Landmark corresponding to lip
    lip_x = sorted(lip_landmark,key = lambda pointx: pointx[0])             # Lip landmark sorted for determining lip region
    lip_y = sorted(lip_landmark, key = lambda pointy: pointy[1])
    x_add = int((-lip_x[0][0]+lip_x[-1][0])*LIP_MARGIN)                     # Determine Margins for lip-only image
    y_add = int((-lip_y[0][1]+lip_y[-1][1])*LIP_MARGIN)
    crop_pos = (lip_x[0][0]-x_add, lip_x[-1][0]+x_add, lip_y[0][1]-y_add, lip_y[-1][1]+y_add)   # Crop image
    cropped = frame_buffer_color[i][crop_pos[2]:crop_pos[3],crop_pos[0]:crop_pos[1]]
    cropped = cv2.resize(cropped,(RESIZE[0],RESIZE[1]),interpolation=cv2.INTER_CUBIC)        # Resize
    cropped_buffer.append(cropped)

In [11]:
# Save result
directory = RESULT_PATH + vid_name + "/"
for (i,image) in enumerate(cropped_buffer):
    if not os.path.exists(directory):           # If the directory not exists, make it.
        os.makedirs(directory)
    cv2.imwrite(directory + "%d"%(i+1) + ".jpg", image)     # Write lip image

In [12]:
import subprocess

command = f'ffmpeg -i {directory}%d.jpg output.mp4'
subprocess.call(command, shell=True)

ffmpeg version 4.2.7-0ubuntu0.1 Copyright (c) 2000-2022 the FFmpeg developers
  built with gcc 9 (Ubuntu 9.4.0-1ubuntu1~20.04.1)
  configuration: --prefix=/usr --extra-version=0ubuntu0.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-avresample --disable-filter=resample --enable-avisynth --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librsvg --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --e

0

In [4]:
input_video_path = "dataset/angel_clip_mp4_compressed.mp4"
output_video_path = "angel_output.mp4"
face_predictor_path = "shape_predictor_68_face_landmarks.dat"

In [1]:
import dlib
import cv2
import os
import tempfile
import subprocess
from functools import partial
from itertools import repeat
from multiprocessing import Pool

face_detector = dlib.get_frontal_face_detector()

def shape_to_list(shape):
	coords = []
	for i in range(0, 68):
		coords.append((shape.part(i).x, shape.part(i).y))
	return coords

# Obtain face landmark information
def detect_landmark(image, landmark_detector):
    
    face_rects = face_detector(image,1)             # Detect face
    if len(face_rects) < 1:                 # No face detected
        print("No face detected")
        return
    if len(face_rects) > 1:                  # Too many face detected
        print("Too many faces")
        return
    rect = face_rects[0]                    # Proper number of face
    landmark = landmark_detector(image, rect)   # Detect face landmarks
    landmark = shape_to_list(landmark)
    return landmark

In [2]:
def detect_landmark_unpack(args):
    return detect_landmark(args[0], args[1])

In [5]:
# def process_video(input_video_path, output_video_path, face_predictor_path):

LIP_MARGIN = 0.3                # Marginal rate for lip-only image.
RESIZE = (96,96)                # Final image size

vid = cv2.VideoCapture(input_video_path)       # Read video

# Parse into frames
frame_buffer = []               # A list to hold frame images
frame_buffer_color = []         # A list to hold original frame images
while(True):
    success, frame = vid.read()                # Read frame
    if not success:
        break                           # Break if no frame to read left
    gray = cv2.cvtColor(frame,cv2.COLOR_BGR2GRAY)   # Convert image into grayscale
    frame_buffer.append(gray)                  # Add image to the frame buffer
    frame_buffer_color.append(frame)
vid.release()

In [6]:
landmark_detector = dlib.shape_predictor(face_predictor_path)

with Pool(processes = 10) as pool:
    landmark_buffer = pool.starmap(detect_landmark, zip(frame_buffer, repeat(landmark_detector)))

print("done")

done


In [None]:
# Crop images
cropped_buffer = []
for (i,landmark) in enumerate(landmark_buffer):
    lip_landmark = landmark[48:68]                                          # Landmark corresponding to lip
    lip_x = sorted(lip_landmark,key = lambda pointx: pointx[0])             # Lip landmark sorted for determining lip region
    lip_y = sorted(lip_landmark, key = lambda pointy: pointy[1])
    x_add = int((-lip_x[0][0]+lip_x[-1][0])*LIP_MARGIN)                     # Determine Margins for lip-only image
    y_add = int((-lip_y[0][1]+lip_y[-1][1])*LIP_MARGIN)
    crop_pos = (lip_x[0][0]-x_add, lip_x[-1][0]+x_add, lip_y[0][1]-y_add, lip_y[-1][1]+y_add)   # Crop image
    cropped = frame_buffer_color[i][crop_pos[2]:crop_pos[3],crop_pos[0]:crop_pos[1]]
    cropped = cv2.resize(cropped,(RESIZE[0],RESIZE[1]),interpolation=cv2.INTER_CUBIC)        # Resize
    cropped_buffer.append(cropped)


In [None]:

# Save result to temp folder
with tempfile.TemporaryDirectory() as tempdir:
    for (i,image) in enumerate(cropped_buffer):
        cv2.imwrite(str(tempdir) + "%d"%(i+1) + ".jpg", image)     # Write lip image

    # Save video
    command = f'ffmpeg -i {str(tempdir)}%d.jpg {output_video_path}'
    subprocess.call(command, shell=True)

In [3]:
process_video("dataset/angel_clip_mp4_compressed.mp4", "angel_output.mp4", "shape_predictor_68_face_landmarks.dat")

done


/bin/sh: 1: cannot open TemporaryDirectory: No such file


TypeError: rmdir: path should be string, bytes or os.PathLike, not TemporaryDirectory

In [1]:
import dlib
import cv2
import os
import tempfile
import subprocess
from functools import partial
from multiprocessing.dummy import Pool

class ROIExtractor:
    def __init__(self, face_predictor_path):
        self.landmark_detector = dlib.shape_predictor(face_predictor_path)
        self.face_detector = dlib.get_frontal_face_detector()

    def shape_to_list(self,  shape):
        coords = []
        for i in range(0, 68):
            coords.append((shape.part(i).x, shape.part(i).y))
        return coords

    # Obtain face landmark information
    def detect_landmark(self, image):
        face_rects = self.face_detector(image,1)             # Detect face
        if len(face_rects) < 1:                 # No face detected
            print("No face detected")
            return
        if len(face_rects) > 1:                  # Too many face detected
            print("Too many faces")
            return
        rect = face_rects[0]                    # Proper number of face
        landmark = self.landmark_detector(image, rect)   # Detect face landmarks
        landmark = self.shape_to_list(landmark)
        # landmark_buffer.append(landmark)
        return landmark

    def process_video(self, input_video_path, output_video_path):
        LIP_MARGIN = 0.3                # Marginal rate for lip-only image.
        RESIZE = (96,96)                # Final image size

        vid = cv2.VideoCapture(input_video_path)       # Read video

        # Parse into frames
        frame_buffer = []               # A list to hold frame images
        frame_buffer_color = []         # A list to hold original frame images
        while(True):
            success, frame = vid.read()                # Read frame
            if not success:
                break                           # Break if no frame to read left
            gray = cv2.cvtColor(frame,cv2.COLOR_BGR2GRAY)   # Convert image into grayscale
            frame_buffer.append(gray)                  # Add image to the frame buffer
            frame_buffer_color.append(frame)
        vid.release()

        with Pool(processes = 10) as pool:
            landmark_buffer = pool.map(self.detect_landmark, frame_buffer)
        
        print("done")

        # Crop images
        cropped_buffer = []
        for (i,landmark) in enumerate(landmark_buffer):
            lip_landmark = landmark[48:68]                                          # Landmark corresponding to lip
            lip_x = sorted(lip_landmark,key = lambda pointx: pointx[0])             # Lip landmark sorted for determining lip region
            lip_y = sorted(lip_landmark, key = lambda pointy: pointy[1])
            x_add = int((-lip_x[0][0]+lip_x[-1][0])*LIP_MARGIN)                     # Determine Margins for lip-only image
            y_add = int((-lip_y[0][1]+lip_y[-1][1])*LIP_MARGIN)
            crop_pos = (lip_x[0][0]-x_add, lip_x[-1][0]+x_add, lip_y[0][1]-y_add, lip_y[-1][1]+y_add)   # Crop image
            cropped = frame_buffer_color[i][crop_pos[2]:crop_pos[3],crop_pos[0]:crop_pos[1]]
            cropped = cv2.resize(cropped,(RESIZE[0],RESIZE[1]),interpolation=cv2.INTER_CUBIC)        # Resize
            cropped_buffer.append(cropped)

        # Save result to temp folder
        tempdir = tempfile.TemporaryDirectory()
        for (i,image) in enumerate(cropped_buffer):
            cv2.imwrite(str(tempdir) + "%d"%(i+1) + ".jpg", image)     # Write lip image

        # Save video
        command = f'ffmpeg -i {str(tempdir)}%d.jpg {output_video_path}'
        subprocess.call(command, shell=True)

        os.removedirs(str(tempdir))

In [2]:
extractor = ROIExtractor("shape_predictor_68_face_landmarks.dat")

In [3]:
extractor.process_video("dataset/angel_clip_mp4_compressed.mp4", "angel_output.mp4")

Too many faces
Too many faces
Too many faces
Too many faces
Too many faces
done


TypeError: 'NoneType' object is not subscriptable

In [21]:
import dlib
import cv2
import os
import tempfile
import subprocess
from functools import partial
from multiprocessing import Pool

face_detector = dlib.get_frontal_face_detector()

def shape_to_list(shape):
	coords = []
	for i in range(0, 68):
		coords.append((shape.part(i).x, shape.part(i).y))
	return coords

def process_video(input_video_path, output_video_path, face_predictor_path):
    landmark_detector = dlib.shape_predictor(face_predictor_path)

    global detect_landmark

    # Obtain face landmark information
    def detect_landmark(image):
        face_rects = face_detector(image,1)             # Detect face
        if len(face_rects) < 1:                 # No face detected
            print("No face detected")
            return
        if len(face_rects) > 1:                  # Too many face detected
            print("Too many faces")
            return
        rect = face_rects[0]                    # Proper number of face
        landmark = landmark_detector(image, rect)   # Detect face landmarks
        landmark = shape_to_list(landmark)
        # landmark_buffer.append(landmark)
        return landmark
    
    TEMP_PATH = './process_video/'       # The path that the result images will be saved
    LIP_MARGIN = 0.3                # Marginal rate for lip-only image.
    RESIZE = (96,96)                # Final image size

    vid = cv2.VideoCapture(input_video_path)       # Read video

    # Parse into frames
    frame_buffer = []               # A list to hold frame images
    frame_buffer_color = []         # A list to hold original frame images
    while(True):
        success, frame = vid.read()                # Read frame
        if not success:
            break                           # Break if no frame to read left
        gray = cv2.cvtColor(frame,cv2.COLOR_BGR2GRAY)   # Convert image into grayscale
        frame_buffer.append(gray)                  # Add image to the frame buffer
        frame_buffer_color.append(frame)
    vid.release()

    with Pool(processes = 10) as pool:
        landmark_buffer = pool.map(detect_landmark, frame_buffer)
    
    print("done")

    # Crop images
    cropped_buffer = []
    for (i,landmark) in enumerate(landmark_buffer):
        lip_landmark = landmark[48:68]                                          # Landmark corresponding to lip
        lip_x = sorted(lip_landmark,key = lambda pointx: pointx[0])             # Lip landmark sorted for determining lip region
        lip_y = sorted(lip_landmark, key = lambda pointy: pointy[1])
        x_add = int((-lip_x[0][0]+lip_x[-1][0])*LIP_MARGIN)                     # Determine Margins for lip-only image
        y_add = int((-lip_y[0][1]+lip_y[-1][1])*LIP_MARGIN)
        crop_pos = (lip_x[0][0]-x_add, lip_x[-1][0]+x_add, lip_y[0][1]-y_add, lip_y[-1][1]+y_add)   # Crop image
        cropped = frame_buffer_color[i][crop_pos[2]:crop_pos[3],crop_pos[0]:crop_pos[1]]
        cropped = cv2.resize(cropped,(RESIZE[0],RESIZE[1]),interpolation=cv2.INTER_CUBIC)        # Resize
        cropped_buffer.append(cropped)

    # Save result to temp folder
    with tempfile.TemporaryDirectory() as tempdir:
        for (i,image) in enumerate(cropped_buffer):
            cv2.imwrite(str(tempdir) + "%d"%(i+1) + ".jpg", image)     # Write lip image

        # Save video
        command = f'ffmpeg -i {str(tempdir)}%d.jpg {output_video_path}'
        subprocess.call(command, shell=True)

In [22]:
process_video("dataset/angel_clip_mp4_compressed.mp4", "angel_output.mp4", "shape_predictor_68_face_landmarks.dat")

done


AttributeError: __enter__