## Video without Audio Synchronisation (Using Original Audio)

This code cell processes video data for face swapping. It utilises the insightface library for face detection and employs computer vision techniques to replace faces in video frames. Multithreaded processing ensures efficient handling of video data. The output is a video with swapped faces, maintaining the original audio.

#### Functionality:
- Deep learning-based face detection and swapping.
- Parallel frame processing for improved performance.
- Original audio is retained in the swapped video; frames in which no face is detected are dropped, so audio and video may drift out of sync.

#### This code cell is suitable for videos where faces are clearly visible and require swapping.

In [None]:
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from tqdm.notebook import tqdm
import os
import cv2
import matplotlib.pyplot as plt
import subprocess
import imageio
import multiprocessing
import insightface
import logging

# Suppress imageio/ffmpeg warnings.
# NOTE: getLogger() without a name returns the root logger, so this raises the
# threshold to ERROR for every logger that inherits its level, not only imageio's.
logging.getLogger().setLevel(logging.ERROR)

class FaceSwapper:
    """Replace the face in each video frame with a face taken from a still image.

    Frames in which no face is detected are dropped from the output. The audio
    track of the input video is extracted in full and muxed back in, so dropped
    frames can make audio and video drift apart.
    """

    def __init__(self, app_name='buffalo_l', model_path=None, ffmpeg_path=None):
        """Load the face-analysis app and the swapping model.

        Args:
            app_name: Name passed to ``insightface.app.FaceAnalysis``.
            model_path: Path of the swapping model handed to
                ``insightface.model_zoo.get_model`` (no download is attempted).
            ffmpeg_path: Path of the ffmpeg executable used for audio handling.
        """
        self.app = insightface.app.FaceAnalysis(name=app_name)
        self.app.prepare(ctx_id=0, det_size=(640, 640))
        self.swapper = insightface.model_zoo.get_model(model_path, download=False, download_zip=False)
        self.ffmpeg_path = ffmpeg_path

    def _process_frame(self, frame, face1):
        """Swap ``face1`` into one RGB frame.

        Only the first face returned by the detector is replaced.

        Returns:
            ``(swapped_rgb_frame, True)`` on success, or ``(None, None)`` when
            no face is detected in the frame.
        """
        # The models are fed BGR data, while the video reader yields RGB frames.
        frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        face2_result = self.app.get(frame_bgr)
        if not face2_result:
            return None, None
        face2 = face2_result[0]
        frame_swapped = self.swapper.get(frame_bgr, face2, face1, paste_back=True)
        frame_swapped_rgb = cv2.cvtColor(frame_swapped, cv2.COLOR_BGR2RGB)
        return frame_swapped_rgb, True

    def swap_n_show(self, img1_fn, video_fn):
        """Swap the face from ``img1_fn`` into every frame of ``video_fn``.

        The result is written next to the current working directory by
        ``_handle_video_processing``; nothing is written when no frame
        contained a face.

        Raises:
            ValueError: If the source image cannot be read, contains no
                detectable face, or the video format is unsupported.
        """
        img1 = cv2.imread(img1_fn)
        if img1 is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise ValueError(f"Could not read source image: {img1_fn}")
        source_faces = self.app.get(img1)
        if not source_faces:
            raise ValueError(f"No face detected in source image: {img1_fn}")
        face1 = source_faces[0]

        # The context manager releases the reader (and its ffmpeg process)
        # even when frame processing fails.
        with imageio.get_reader(video_fn) as video:
            fps = video.get_meta_data()['fps']
            with ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
                futures = [executor.submit(self._process_frame, frame, face1) for frame in video]
                results = [
                    future.result()
                    for future in tqdm(futures, total=len(futures), desc='Processing frames')
                ]

        # Iterating the pairs directly is safe for an empty result list,
        # unlike unpacking zip(*results).
        swapped_video = [frame for frame, status in results if status]

        if swapped_video:
            self._handle_video_processing(video_fn, swapped_video, fps)

    @staticmethod
    def _codec_settings(extension):
        """Map a lower-case file suffix to codec and container choices.

        Returns:
            ``(video_codec, audio_codec, audio_extension, output_extension)``.

        Raises:
            ValueError: If the suffix is not ``.mp4``, ``.mov`` or ``.webm``.
        """
        if extension in ('.mp4', '.mov'):
            return 'libx264', 'aac', '.m4a', '.mp4'
        if extension == '.webm':
            return 'libvpx-vp9', 'libvorbis', '.ogg', '.mp4'
        raise ValueError("Unsupported video format")

    def _run_ffmpeg(self, args):
        """Run ffmpeg quietly with ``args``; return True when it exits with 0."""
        # '-y' is a global option, so it goes before any input/output file.
        command = [self.ffmpeg_path, '-y', *(str(arg) for arg in args)]
        completed = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return completed.returncode == 0

    def _handle_video_processing(self, video_fn, swapped_video, fps):
        """Encode the swapped frames, add the original audio, and show a preview.

        The final file is ``<stem>_swapped.mp4`` in the current working
        directory. When the input has no usable audio track (or muxing fails)
        the silent video is kept as the final output instead of being lost.

        Raises:
            ValueError: If the input suffix is not a supported video format.
        """
        input_path = Path(video_fn)
        video_codec, audio_codec, audio_extension, output_extension = self._codec_settings(
            input_path.suffix.lower()
        )

        work_dir = Path.cwd()
        output_path = work_dir / f"{input_path.stem}_output{output_extension}"
        audio_path = work_dir / f"{input_path.stem}_audio{audio_extension}"
        final_output_path = work_dir / f"{input_path.stem}_swapped{output_extension}"

        try:
            # The scale filter rounds both dimensions down to even numbers.
            with imageio.get_writer(output_path, fps=fps, codec=video_codec, quality=9,
                                    ffmpeg_params=['-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2']) as writer:
                for frame in swapped_video:
                    if frame is not None:
                        writer.append_data(frame)

            has_audio = (
                self._run_ffmpeg(['-i', video_fn, '-vn', '-acodec', audio_codec, audio_path])
                and audio_path.exists()
            )
            muxed = has_audio and self._run_ffmpeg([
                '-i', output_path, '-i', audio_path,
                '-c:v', 'copy', '-c:a', 'copy', '-shortest', final_output_path,
            ])
            if not muxed:
                # Keep the rendered frames rather than deleting the only result.
                os.replace(output_path, final_output_path)
                print("Audio could not be added; saved the video without audio.")
        finally:
            # Intermediate files are removed on success and on failure alike.
            for temp_path in (output_path, audio_path):
                if temp_path.exists():
                    os.remove(temp_path)

        if swapped_video:
            plt.figure(figsize=(5, 5))
            plt.imshow(swapped_video[0])
            plt.axis('off')
            plt.show()

        print(f"Final output saved to {final_output_path}")

def main():
    """Swap the configured source face into each configured video.

    The paths below are placeholders that must be edited before use. Videos
    are expected to be named ``videos_1.mp4`` ... ``videos_<num_videos>.mp4``
    inside ``video_path``. A failure on one video is reported and the
    remaining videos are still processed.
    """
    model_path = 'YOUR_MODEL_PATH_HERE/inswapper_128.onnx'
    ffmpeg_path = 'YOUR_FFMPEG_PATH_HERE/ffmpeg'
    image_path = 'YOUR_IMAGE_PATH_HERE/hello_world.jpg'
    video_path = 'YOUR_VIDEO_DIRECTORY_PATH_HERE/'

    # Placeholder: how many numbered videos to process. Zero selects nothing,
    # which matches the empty selection this template ships with.
    num_videos = 0
    video_files = [f'videos_{i}.mp4' for i in range(1, num_videos + 1)]

    if not video_files:
        # Report the empty selection instead of loading the models for nothing.
        print("No video files configured; set num_videos in main() to process videos.")
        return

    swapper = FaceSwapper(model_path=model_path, ffmpeg_path=ffmpeg_path)

    for video_file in video_files:
        print(f"Processing {video_file}...")
        full_video_path = os.path.join(video_path, video_file)
        try:
            swapper.swap_n_show(image_path, full_video_path)
            print(f"Finished processing {video_file}")
        except Exception as e:
            # Top-level boundary: report and continue with the next video.
            print(f"Failed to process {video_file}: {e}")

if __name__ == "__main__":
    main()

## Video with Audio Extraction and Synchronisation (Work in Progress)

This code cell is a work in progress. It handles video processing with audio extraction and synchronisation, and is intended for videos where faces may be obscured or not clearly visible in some frames, so that the audio belonging to dropped frames has to be removed as well. It is not fully functional yet and is still under development.

#### Functionality:
- Face detection using deep learning models.
- Face image replacement in video frames.
- Audio extraction and synchronisation with swapped video.

In [None]:
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from tqdm.notebook import tqdm
import os
import cv2
import matplotlib.pyplot as plt
import subprocess
import imageio
import multiprocessing
import insightface
import logging

# Suppress imageio/ffmpeg warnings.
# NOTE: getLogger() without a name returns the root logger, so this raises the
# threshold to ERROR for every logger that inherits its level, not only imageio's.
logging.getLogger().setLevel(logging.ERROR)

class FaceSwapper:
    """Swap a still-image face into video frames and keep only the matching audio.

    Frames in which no face is detected are dropped. The audio belonging to
    the kept frames is cut out of the original track and concatenated so that
    it lines up with the shortened video.
    """

    def __init__(self, app_name='buffalo_l', model_path=None, ffmpeg_path=None):
        """Load the face-analysis app and the swapping model.

        Args:
            app_name: Name passed to ``insightface.app.FaceAnalysis``.
            model_path: Path of the swapping model handed to
                ``insightface.model_zoo.get_model`` (no download is attempted).
            ffmpeg_path: Path of the ffmpeg executable used for audio handling.
        """
        self.app = insightface.app.FaceAnalysis(name=app_name)
        self.app.prepare(ctx_id=0, det_size=(640, 640))
        self.swapper = insightface.model_zoo.get_model(model_path, download=False, download_zip=False)
        self.ffmpeg_path = ffmpeg_path

    def _process_frame(self, frame, face1):
        """Swap ``face1`` into one RGB frame.

        Only the first face returned by the detector is replaced.

        Returns:
            ``(swapped_rgb_frame, True)`` on success, or ``(None, None)`` when
            no face is detected in the frame.
        """
        # The models are fed BGR data, while the video reader yields RGB frames.
        frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        face2_result = self.app.get(frame_bgr)
        if not face2_result:
            return None, None
        face2 = face2_result[0]
        frame_swapped = self.swapper.get(frame_bgr, face2, face1, paste_back=True)
        frame_swapped_rgb = cv2.cvtColor(frame_swapped, cv2.COLOR_BGR2RGB)
        return frame_swapped_rgb, True

    def _run_ffmpeg(self, args):
        """Run ffmpeg quietly with ``args``; return True when it exits with 0."""
        # '-y' is a global option, so it goes before any input/output file.
        command = [self.ffmpeg_path, '-y', *(str(arg) for arg in args)]
        completed = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return completed.returncode == 0

    @staticmethod
    def _contiguous_runs(kept_frame_timestamps, frame_duration):
        """Group ascending frame timestamps into spans of back-to-back frames.

        Returns:
            A list of ``(start, end)`` tuples in seconds, where ``end`` is the
            end of the last frame of the run.
        """
        runs = []
        run_start = None
        previous = None
        # Adjacent frames are one duration apart and a single dropped frame
        # makes the gap two durations; 1.5 separates the two despite rounding.
        max_gap = frame_duration * 1.5
        for timestamp in kept_frame_timestamps:
            if previous is None:
                run_start = timestamp
            elif timestamp - previous > max_gap:
                runs.append((run_start, previous + frame_duration))
                run_start = timestamp
            previous = timestamp
        if previous is not None:
            runs.append((run_start, previous + frame_duration))
        return runs

    def _extract_audio_segments(self, video_fn, kept_frame_timestamps, frame_duration, audio_extension):
        """Cut one audio file per contiguous run of kept frames.

        Args:
            video_fn: Input video whose audio track is cut.
            kept_frame_timestamps: Ascending start times (seconds) of the
                frames that were kept.
            frame_duration: Duration of one frame in seconds.
            audio_extension: Suffix (with dot) for the segment files.

        Returns:
            Names of the segment files that were actually written, in order.
            The files are created in the current working directory.
        """
        audio_segments = []
        for start, end in self._contiguous_runs(kept_frame_timestamps, frame_duration):
            segment_fn = f"temp_audio_segment_{len(audio_segments)}{audio_extension}"
            extracted = self._run_ffmpeg([
                '-i', video_fn, '-ss', str(start), '-to', str(end),
                '-vn', '-acodec', 'copy', segment_fn,
            ])
            # Only report segments that exist so later steps never touch
            # files ffmpeg failed to produce (e.g. input without audio).
            if extracted and os.path.exists(segment_fn):
                audio_segments.append(segment_fn)
        return audio_segments

    def _concatenate_audio_segments(self, audio_segments, final_audio_fn):
        """Join the segment files into ``final_audio_fn`` and delete the segments.

        Returns:
            True when ffmpeg produced the concatenated file, False when there
            were no segments or ffmpeg failed.
        """
        # The list file stays in the working directory because the concat
        # demuxer resolves relative entries against the list file's location.
        list_fn = "temp_audio_list.txt"
        try:
            if not audio_segments:
                return False
            with open(list_fn, "w") as file:
                file.writelines(f"file '{segment}'\n" for segment in audio_segments)
            return self._run_ffmpeg([
                '-f', 'concat', '-safe', '0', '-i', list_fn, '-c', 'copy', final_audio_fn,
            ])
        finally:
            # Temporary files are removed whether or not concatenation worked.
            for temp_fn in [*audio_segments, list_fn]:
                if os.path.exists(temp_fn):
                    os.remove(temp_fn)

    def swap_n_show(self, img1_fn, video_fn):
        """Swap the face from ``img1_fn`` into every frame of ``video_fn``.

        Nothing is written when no frame contained a face.

        Raises:
            ValueError: If the source image cannot be read, contains no
                detectable face, or the video format is unsupported.
        """
        img1 = cv2.imread(img1_fn)
        if img1 is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise ValueError(f"Could not read source image: {img1_fn}")
        source_faces = self.app.get(img1)
        if not source_faces:
            raise ValueError(f"No face detected in source image: {img1_fn}")
        face1 = source_faces[0]

        # The context manager releases the reader even when processing fails.
        with imageio.get_reader(video_fn) as video:
            fps = video.get_meta_data()['fps']
            with ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
                futures = [executor.submit(self._process_frame, frame, face1) for frame in video]
                outcomes = [
                    future.result()
                    for future in tqdm(futures, total=len(futures), desc='Processing frames')
                ]

        frame_duration = 1.0 / fps
        swapped_video = []
        kept_frame_timestamps = []
        for index, (result, status) in enumerate(outcomes):
            if status:
                # index * duration avoids the drift of a running float sum.
                kept_frame_timestamps.append(index * frame_duration)
                swapped_video.append(result)

        if swapped_video:
            self._handle_video_processing(video_fn, swapped_video, fps, kept_frame_timestamps)

    @staticmethod
    def _codec_settings(extension):
        """Map a lower-case file suffix to codec and container choices.

        Returns:
            ``(video_codec, audio_extension, output_extension)``.

        Raises:
            ValueError: If the suffix is not ``.mp4``, ``.mov`` or ``.webm``.
        """
        if extension in ('.mp4', '.mov'):
            return 'libx264', '.m4a', '.mp4'
        if extension == '.webm':
            return 'libvpx-vp9', '.ogg', '.mp4'
        raise ValueError("Unsupported video format")

    def _handle_video_processing(self, video_fn, swapped_video, fps, kept_frame_timestamps):
        """Encode the swapped frames, add the matching audio, and show a preview.

        The final file is ``<stem>_swapped.mp4`` in the current working
        directory. When no audio can be extracted or muxing fails, the silent
        video is kept as the final output instead of being lost.

        Raises:
            ValueError: If the input suffix is not a supported video format.
        """
        input_path = Path(video_fn)
        video_codec, audio_extension, output_extension = self._codec_settings(
            input_path.suffix.lower()
        )

        work_dir = Path.cwd()
        output_path = work_dir / f"{input_path.stem}_output{output_extension}"
        final_output_path = work_dir / f"{input_path.stem}_swapped{output_extension}"
        concatenated_audio_fn = f"{input_path.stem}_concatenated{audio_extension}"

        try:
            # The scale filter rounds both dimensions down to even numbers.
            with imageio.get_writer(output_path, fps=fps, codec=video_codec, quality=9,
                                    ffmpeg_params=['-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2']) as writer:
                for frame in swapped_video:
                    if frame is not None:
                        writer.append_data(frame)

            # Extract and concatenate the audio that belongs to the kept frames.
            audio_segments = self._extract_audio_segments(
                video_fn, kept_frame_timestamps, 1.0 / fps, audio_extension
            )
            has_audio = (
                self._concatenate_audio_segments(audio_segments, concatenated_audio_fn)
                and os.path.exists(concatenated_audio_fn)
            )

            # Combine video and audio.
            muxed = has_audio and self._run_ffmpeg([
                '-i', output_path, '-i', concatenated_audio_fn,
                '-c:v', 'copy', '-c:a', 'copy', '-shortest', final_output_path,
            ])
            if not muxed:
                # Keep the rendered frames rather than deleting the only result.
                os.replace(output_path, final_output_path)
                print("Audio could not be added; saved the video without audio.")
        finally:
            # Intermediate files are removed on success and on failure alike.
            for temp_fn in (str(output_path), concatenated_audio_fn):
                if os.path.exists(temp_fn):
                    os.remove(temp_fn)

        if swapped_video:
            plt.figure(figsize=(5, 5))
            plt.imshow(swapped_video[0])
            plt.axis('off')
            plt.show()

        print(f"Final output saved to {final_output_path}")

def main():
    """Swap the configured source face into each configured video.

    The paths below are placeholders that must be edited before use. Videos
    are expected to be named ``videos_1.mp4`` ... ``videos_<num_videos>.mp4``
    inside ``video_path``. A failure on one video is reported and the
    remaining videos are still processed.
    """
    model_path = 'YOUR_MODEL_PATH_HERE/inswapper_128.onnx'
    ffmpeg_path = 'YOUR_FFMPEG_PATH_HERE/ffmpeg'
    image_path = 'YOUR_IMAGE_PATH_HERE/hello_world.jpg'
    video_path = 'YOUR_VIDEO_DIRECTORY_PATH_HERE/'

    # Placeholder: how many numbered videos to process. Zero selects nothing,
    # which matches the empty selection this template ships with.
    num_videos = 0
    video_files = [f'videos_{i}.mp4' for i in range(1, num_videos + 1)]

    if not video_files:
        # Report the empty selection instead of loading the models for nothing.
        print("No video files configured; set num_videos in main() to process videos.")
        return

    swapper = FaceSwapper(model_path=model_path, ffmpeg_path=ffmpeg_path)

    for video_file in video_files:
        print(f"Processing {video_file}...")
        full_video_path = os.path.join(video_path, video_file)
        try:
            swapper.swap_n_show(image_path, full_video_path)
            print(f"Finished processing {video_file}")
        except Exception as e:
            # Top-level boundary: report and continue with the next video.
            print(f"Failed to process {video_file}: {e}")

if __name__ == "__main__":
    main()