![image.png](attachment:image.png)

### This integrated process ensures that the video output from Wav2Lip undergoes high-quality enhancement for both the overall resolution (via RealESRGAN) and facial details (via GFPGAN). Finally, it reassembles the video with the enhanced frames and the original synced audio.

In [24]:
#this code for recording audio
from IPython.display import HTML, Audio
from base64 import b64decode
import numpy as np
from scipy.io.wavfile import read as wav_read
import io
import ffmpeg
from IPython.display import clear_output 

In [25]:
import os
# Create the temp directory if it doesn't exist
temp_dir = "temp"
os.makedirs(temp_dir, exist_ok=True)

##### Have a good GPU Skip this

In [26]:
import subprocess
import os
import cv2

def get_video_duration(video_path):
    """Returns the duration of the video in seconds."""
    video = cv2.VideoCapture(video_path)
    fps = video.get(cv2.CAP_PROP_FPS)
    frame_count = video.get(cv2.CAP_PROP_FRAME_COUNT)
    duration = frame_count / fps
    video.release()
    return duration

def trim_video_to_5_seconds(input_video_path, output_video_path):
    """Trims the video to the first 5 seconds if it is longer than 5 seconds."""
    duration = get_video_duration(input_video_path)
    if duration <= 5:
        print(f"Video is already {duration:.2f} seconds long. No trimming needed.")
        return
    
    command = [
        "ffmpeg", 
        "-i", input_video_path,   # Input file
        "-t", "5",                # Duration to keep (5 seconds)
        "-c", "copy",             # Copy codec (no re-encoding)
        output_video_path         # Output file
    ]
    
    try:
        subprocess.run(command, check=True)
        print(f"Video trimmed to 5 seconds and saved as: {output_video_path}")
    except subprocess.CalledProcessError as e:
        print(f"Error trimming video: {e}")

In [27]:
# Example usage
input_video = "MonnaLisa.mp4"
output_video = "trimmed_video_5s.mp4"
trim_video_to_5_seconds(input_video, output_video)

ZeroDivisionError: float division by zero

## Start

## Upload video

In [None]:
import os
import cv2
import shutil
import ipywidgets as widgets
from IPython.display import clear_output
import moviepy.editor as mp
import time
import tempfile

def get_video_resolution(video_path):
    """Function to get the resolution of a video"""
    video = cv2.VideoCapture(video_path)
    width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    video.release()
    return (width, height)

def resize_video(video_path, new_resolution):
    """Function to resize a video and handle file replacement"""
    temp_dir = tempfile.mkdtemp()  # Create a temporary directory
    temp_path = os.path.join(temp_dir, "temp_resized_video.mp4")
    backup_path = video_path + ".bak"  # Backup path for original file
    
    try:
        # Open the original video
        video = cv2.VideoCapture(video_path)
        if not video.isOpened():
            print(f"Error opening video file: {video_path}")
            return None
        
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for mp4 files
        fps = video.get(cv2.CAP_PROP_FPS)
        width, height = new_resolution
        writer = cv2.VideoWriter(temp_path, fourcc, fps, (width, height))
        
        while True:
            success, frame = video.read()
            if not success:
                break
            resized_frame = cv2.resize(frame, (width, height))
            writer.write(resized_frame)
        
        video.release()
        writer.release()
        
        # Try to rename the original file
        for attempt in range(5):  # Retry up to 5 times
            try:
                if os.path.exists(backup_path):
                    os.remove(backup_path)  # Ensure backup file is not present
                os.rename(video_path, backup_path)
                break
            except PermissionError as e:
                print(f"Attempt {attempt+1}: Unable to rename the original file '{video_path}'. Error: {e}")
                time.sleep(1)  # Wait a moment before retrying
        
        # Replace the original file with the resized video
        try:
            shutil.move(temp_path, video_path)
        except PermissionError as e:
            print(f"Unable to replace the original file '{video_path}' with the resized video. Error: {e}")
            return None
        
        # Clean up temporary directory
        shutil.rmtree(temp_dir)
        
        return video_path
    except Exception as e:
        print(f"Error during video processing: {e}")
        return None

# File upload widget
upload_widget = widgets.FileUpload(accept='.mp4', multiple=False)

def on_upload_change(change):
    clear_output(wait=True)
    
    if upload_widget.value:
        # Handle the uploaded file
        file_info = upload_widget.value[0]
        file_content = file_info['content']
        with open('input_video.mp4', 'wb') as f:
            f.write(file_content)
        print('Uploaded file successfully')
        
        # Define the video path
        PATH_TO_YOUR_VIDEO = 'input_video.mp4'
        print(f"Video path: {PATH_TO_YOUR_VIDEO}")  # Debug print
        
        # Check video duration
        video_duration = mp.VideoFileClip(PATH_TO_YOUR_VIDEO).duration
        if video_duration > 60:
            print("WARNING: Video duration exceeds 60 seconds. Please upload a shorter video.")
            return
        
        # Check video resolution and resize if necessary
        video_resolution = get_video_resolution(PATH_TO_YOUR_VIDEO)
        print(f"Video resolution: {video_resolution}")  # Debug print
        if video_resolution[0] >= 1920 or video_resolution[1] >= 1080:
            print("Resizing video to 720p...")
            PATH_TO_YOUR_VIDEO = resize_video(PATH_TO_YOUR_VIDEO, (1280, 720))
            if PATH_TO_YOUR_VIDEO:
                print("Video resized to 720p")
            else:
                print("Error resizing video")
        else:
            print("No resizing needed")

upload_widget.observe(on_upload_change, names='value')

# Display the file upload widget
display(upload_widget)


Uploaded file successfully
Video path: input_video.mp4
Video resolution: (360, 288)
No resizing needed


## Upload Audio or Wana write your self

In [None]:
import ipywidgets as widgets
from IPython.display import display, clear_output, Audio
from gtts import gTTS

# Create the widgets
audio_source = widgets.RadioButtons(
    options=['Upload Audio', 'Use gTTS'],
    description='Audio Source:'
)

upload_widget = widgets.FileUpload(accept='.mp3,.wav', multiple=False)
gtts_text = widgets.Text(placeholder='Enter text for Text to Speech', description='TTS:')
submit_button = widgets.Button(description='Submit')

output = widgets.Output()

# Function to handle the display of widgets based on selection
def on_audio_source_change(change):
    if change['new'] == 'Upload Audio':
        gtts_text.layout.display = 'none'
        upload_widget.layout.display = 'block'
    else:
        upload_widget.layout.display = 'none'
        gtts_text.layout.display = 'block'

audio_source.observe(on_audio_source_change, names='value')

# Function to handle submit button click
def on_submit_click(b):
    clear_output(wait=True)
    with output:
        if audio_source.value == 'Upload Audio':
            if upload_widget.value:
                try:
                    # Print type and content of the upload_widget value
                    print("Type of upload_widget.value:", type(upload_widget.value))
                    print("Content of upload_widget.value:", upload_widget.value)

                    # Handle tuple data
                    if isinstance(upload_widget.value, tuple):
                        uploaded_file = upload_widget.value[0]
                    else:
                        uploaded_file = list(upload_widget.value.values())[0]
                    
                    file_content = uploaded_file['content']
                    
                    with open('input_audio.wav', 'wb') as f:
                        f.write(file_content)
                    
                    print("Uploaded file saved as 'input_audio.wav'")
                    display(Audio('input_audio.wav'))
                except Exception as e:
                    print(f"Error saving uploaded file: {e}")
            else:
                print("No file uploaded.")
        else:
            if gtts_text.value:
                try:
                    tts = gTTS(gtts_text.value)
                    tts.save("input_audio.wav")
                    print("gTTS audio saved as 'input_audio.wav'")
                    display(Audio('input_audio.wav'))
                except Exception as e:
                    print(f"Error generating gTTS audio: {e}")
            else:
                print("No text entered for gTTS.")

submit_button.on_click(on_submit_click)

# Set initial state of widgets
on_audio_source_change({'new': audio_source.value})

# Display all widgets
display(audio_source, upload_widget, gtts_text, submit_button, output)


## Start the Processing

In [30]:
import os
import subprocess

# Define file paths (assuming they are uploaded to the working directory)
video_path = "input_video.mp4"  # Change this if your video file has a different name
audio_path = "input_audio.wav"  # Change this if your audio file has a different name
result_dir = "Wav2Lip\\results"
result_path = os.path.join(result_dir, "lip_synced_video.mp4")

# Ensure the results directory exists
os.makedirs(result_dir, exist_ok=True)

# Run the Wav2Lip script
try:
    process = subprocess.run(
        [
            "python", "Wav2Lip\\inference.py",
            "--checkpoint_path", "Wav2Lip\\checkpoints\\wav2lip_gan.pth",
            "--face", video_path,
            "--audio", audio_path,
            "--outfile", result_path,
            "--resize_factor", "2"  # Change this to 0 if you have a high GPU
        ],
        capture_output=True, text=True, check=True
    )
    print("Subprocess Output:\n", process.stdout)
    print("Subprocess Error (if any):\n", process.stderr)
except subprocess.CalledProcessError as e:
    # Capture the error output
    error_message = f"Subprocess failed with exit code {e.returncode}\n"
    error_message += f"Output: {e.stdout}\n"
    error_message += f"Error: {e.stderr}\n"
    print(error_message)

# Check if the output file exists
if os.path.exists(result_path):
    print(f"Lip-synced video generated successfully: {result_path}")

    # Get FPS of the generated video using ffprobe
    try:
        ffprobe_command = [
            "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=r_frame_rate",
            "-of", "default=noprint_wrappers=1:nokey=1", result_path
        ]
        fps_process = subprocess.run(
            ffprobe_command, capture_output=True, text=True, check=True
        )
        fps_fraction = fps_process.stdout.strip()

        # Convert FPS to integer if it's in fractional form
        if '/' in fps_fraction:
            numerator, denominator = map(int, fps_fraction.split('/'))
            fps = numerator // denominator  # Use integer division for FPS
        else:
            fps = int(fps_fraction)

        print(f"Frames per second (FPS) of the generated video: {fps}")
    except subprocess.CalledProcessError as e:
        print(f"Failed to get FPS of the video: {e}")
else:
    print("Error: The lip-synced video was not generated.")


Subprocess Output:
 Using cuda for inference.
Reading video frames...
Number of frames available for inference: 75
(80, 493)
Length of mel chunks: 151
Load checkpoint from: Wav2Lip\checkpoints\wav2lip_gan.pth
Model loaded

Subprocess Error (if any):
  return librosa.filters.mel(hp.sample_rate, hp.n_fft, n_mels=hp.num_mels,

  0%|          | 0/2 [00:00<?, ?it/s]

  0%|          | 0/5 [00:00<?, ?it/s][A

 20%|â–ˆâ–ˆ        | 1/5 [01:38<06:33, 98.31s/it][A

 40%|â–ˆâ–ˆâ–ˆâ–ˆ      | 2/5 [01:39<02:02, 40.91s/it][A

 60%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ    | 3/5 [01:39<00:45, 22.53s/it][A

 80%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ  | 4/5 [01:40<00:13, 13.90s/it][A

100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 5/5 [01:41<00:00,  9.31s/it][A
100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 5/5 [01:41<00:00, 20.31s/it]

 50%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆ     | 1/2 [01:53<01:53, 113.38s/it]
100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 2/2 [01:56<00:00, 48.28s/it] 
100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 2/2 [01:56<00:00, 58.05s/it]
ffmpeg version 7.0.2-essent

In [31]:
print(fps)

25


In [32]:
import torch

if torch.cuda.is_available():
    print("PyTorch has access to a GPU!")
    print(torch.cuda.get_device_name(0))
else:
    print("PyTorch is using CPU only")

PyTorch has access to a GPU!
Quadro M1200


In [33]:
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

In [34]:
import os
import requests
import base64
import cv2
import torch
from torchvision import models, transforms
from PIL import Image
from gfpgan.utils import GFPGANer
from realesrgan.utils import RealESRGANer
from basicsr.archs.srvgg_arch import SRVGGNetCompact
from IPython.display import display
import os
import requests
from diffusers import DiffusionPipeline, StableDiffusionXLImg2ImgPipeline 
from torchvision.transforms import ToTensor, Normalize, ConvertImageDtype

In [None]:
# URLs for the different model weights
model_urls = {
    'realesr-general-x4v3.pth': "https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.5.0/realesr-general-x4v3.pth",
    'GFPGANv1.4.pth': "https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth",
    'RestoreFormer.pth': "https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/RestoreFormer.pth",
    'CodeFormer.pth': "https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/CodeFormer.pth",
}

# Create a directory for the weights if it doesn't exist
os.makedirs('weights', exist_ok=True)

# This function downloads a file from a given URL and saves it with the specified filename.
def download_file(url, filename):
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024):
                f.write(chunk)
        print(f"Downloaded {filename}")
    else:
        print(f"Failed to download {filename}. Status code: {response.status_code}")


for filename, url in model_urls.items():
    file_path = os.path.join('weights', filename)
    if not os.path.exists(file_path):
        print(f"Downloading {filename}...")
        download_file(url, file_path)
    else:
        print(f"{filename} already exists. Skipping download.")

realesr-general-x4v3.pth already exists. Skipping download.
GFPGANv1.4.pth already exists. Skipping download.
RestoreFormer.pth already exists. Skipping download.
CodeFormer.pth already exists. Skipping download.


In [36]:
print(os.listdir('weights'))

['CodeFormer.pth', 'detection_Resnet50_Final.pth', 'GFPGANv1.4.pth', 'realesr-general-x4v3.pth', 'RestoreFormer.pth']


In [37]:
import torch

realesrgan_model_path = 'weights/realesr-general-x4v3.pth'


In [38]:
# Initialize RealESRGAN
sr_model = SRVGGNetCompact(num_in_ch=3, num_out_ch=3, num_feat=64, num_conv=32, upscale=4, act_type='prelu')
realesrganer = RealESRGANer(scale=4, model_path=realesrgan_model_path, model=sr_model, tile=0, tile_pad=10, pre_pad=0, half=True)

In [39]:
# Load GFPGAN model
gfpgan_model_path = 'weights/GFPGANv1.4.pth'

In [40]:
# Function to upscale image with RealESRGAN
def upscale_image(image_path, output_path):
    img = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
    # Upscale the image with RealESRGAN
    output, _ = realesrganer.enhance(img, outscale=4)
    cv2.imwrite(output_path, output)
    return output

In [41]:
# Initialize GFPGAN
face_enhancer = GFPGANer(model_path=gfpgan_model_path, upscale=2, arch='clean', channel_multiplier=2, bg_upsampler=realesrganer)
# change the upscale to 4 or 10 if you have High GPU

In [42]:
# Function to enhance image with GFPGAN
def enhance_faces(image_path, output_path):
    img = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
    # Enhance faces with GFPGAN
    _, _, img_enhanced = face_enhancer.enhance(img, has_aligned=False, only_center_face=False, paste_back=True)
    cv2.imwrite(output_path, img_enhanced)
    return img_enhanced

In [44]:
import cv2
import os

def extract_frames(video_path, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    for i in range(frame_count):
        ret, frame = cap.read()
        if not ret:
            break
        cv2.imwrite(os.path.join(output_dir, f"frame_{i:04d}.png"), frame)
    cap.release()

video_path = "D:\\studentinfo\\test1\\project\\Wav2Lip\\results\\lip_synced_video.mp4"
output_dir = "D:\\studentinfo\\test1\\project\\frames"
extract_frames(video_path, output_dir)


In [None]:
enhanced_frame_dir = "enhanced_frames"
os.makedirs(enhanced_frame_dir, exist_ok=True)

for frame in os.listdir(output_dir):
    frame_path = os.path.join(output_dir, frame)
    enhanced_frame_path = os.path.join(enhanced_frame_dir, frame)
    enhanced_image = enhance_faces(frame_path, enhanced_frame_path)


In [None]:
def reassemble_video(frame_dir, output_video_path, fps=fps):
    frame_list = sorted([os.path.join(frame_dir, img) for img in os.listdir(frame_dir)])
    frame = cv2.imread(frame_list[0])
    height, width, layers = frame.shape

    video = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

    for frame_path in frame_list:
        frame = cv2.imread(frame_path)
        video.write(frame)

    video.release()

output_video_path = "enhanced_lip_synced_video.mp4"
reassemble_video(enhanced_frame_dir, output_video_path)


IndexError: list index out of range

In [None]:
from moviepy.editor import VideoFileClip, AudioFileClip

def add_audio_to_video(video_path, audio_path, output_path):
    video = VideoFileClip(video_path)
    audio = AudioFileClip(audio_path)
    video = video.set_audio(audio)
    video.write_videofile(output_path, codec="libx264", audio_codec="aac")

audio_path = audio_path
final_video_path = "final_video.mp4"
add_audio_to_video(output_video_path, audio_path, final_video_path)


OSError: MoviePy error: the file enhanced_lip_synced_video.mp4 could not be found!
Please check that you entered the correct path.

### clean all the mess

In [None]:
import os
import glob

folder_path = 'enhanced_frames'  # Replace with your folder path
files = glob.glob(os.path.join(folder_path, '*.jpg')) + glob.glob(os.path.join(folder_path, '*.png'))
for f in files:
    os.remove(f)
folder_path = 'frames'  # Replace with your folder path
files = glob.glob(os.path.join(folder_path, '*.jpg')) + glob.glob(os.path.join(folder_path, '*.png'))
for f in files:
    os.remove(f)

print("Folder emptied successfully.")


Folder emptied successfully.
