In [1]:
# Mount Google Drive into the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Install packages

In [None]:
!pip install pydub SpeechRecognition
!pip install requests
!pip install deepgram-sdk

!pip install pydub moviepy librosa
!pip install SpeechRecognition
!pip install openai==0.28

In [None]:
import librosa
import numpy as np
import speech_recognition as sr
from moviepy.editor import VideoFileClip, AudioFileClip, ImageClip, concatenate_videoclips
from pydub import AudioSegment, effects
from pydub import silence
from pydub.silence import split_on_silence
from pydub.silence import detect_silence

import nltk
nltk.download('punkt')
from nltk.tokenize import sent_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

import os
import re
import cv2

import json
import requests
from io import BytesIO

import textwrap
from deepgram import DeepgramClient, PrerecordedOptions, FileSource

import openai
from skimage.metrics import structural_similarity as ssim


## Functions

### 1. Video Frame Recognition

In [19]:
def extract_slide_changes(video_path, slides_detect_threshold):
    """Scan a video for abrupt frame changes and keep a frame at each one.

    Consecutive frames are converted to grayscale and compared through the
    mean absolute pixel difference; a change is recorded whenever that mean
    exceeds ``slides_detect_threshold``.

    Args:
        video_path: Path of the video file to analyse.
        slides_detect_threshold: Mean absolute grayscale difference above
            which a frame counts as a change.

    Returns:
        A tuple ``(original_timestamps, slide_pictures)``.
        ``original_timestamps`` starts with 0, then holds
        ``frame_index / fps`` for every detected change, and ends with the
        clip duration. ``slide_pictures`` holds the first frame followed by
        the frame at each detected change, converted to RGB.
    """
    # moviepy is only consulted for fps and duration; close it straight away
    # so the reader it opens is not leaked for the rest of the session.
    clip = VideoFileClip(video_path)
    try:
        fps = clip.fps
        total_time = clip.duration
    finally:
        clip.close()

    original_timestamps = [0]
    slide_pictures = []

    cap = cv2.VideoCapture(video_path)
    try:
        video_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        video_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        previous_frame = None
        frame_count = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            if previous_frame is None:
                # The very first frame is always kept; it pairs with timestamp 0.
                keep_frame = True
            else:
                difference = np.mean(cv2.absdiff(previous_frame, gray_frame))
                keep_frame = difference > slides_detect_threshold
                if keep_frame:
                    original_timestamps.append(frame_count / fps)

            if keep_frame:
                resized_frame = cv2.resize(frame, (video_width, video_height))
                slide_pictures.append(cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB))

            previous_frame = gray_frame
            frame_count += 1
    finally:
        # Release the capture even when decoding or conversion fails part-way.
        cap.release()

    original_timestamps.append(total_time)

    return original_timestamps, slide_pictures

In [20]:
def delete_repeat_pictures(detect_slide_pictures, original_timestamps):
    """Drop frames that are near-identical to the frame just before them.

    Each picture is compared with its successor through ``compute_ssim``;
    when the score is above 0.99 the successor and the timestamp at the same
    position are removed. Both lists are edited in place and returned.

    Args:
        detect_slide_pictures: List of frames, modified in place.
        original_timestamps: List of timestamps, modified in place.

    Returns:
        ``(detect_slide_pictures, original_timestamps)`` after the removals.
    """
    duplicate_positions = {
        pos + 1
        for pos in range(len(detect_slide_pictures) - 1)
        if compute_ssim(detect_slide_pictures[pos], detect_slide_pictures[pos + 1]) > 0.99
    }

    # Remove from the back so the positions still to be deleted stay valid.
    for pos in sorted(duplicate_positions, reverse=True):
        del detect_slide_pictures[pos]
        del original_timestamps[pos]

    return detect_slide_pictures, original_timestamps

In [21]:
def Time_group(original_timestamps, slide_pictures):
    """Bucket (timestamp, picture) pairs that follow each other closely.

    A pair joins the current bucket when its timestamp is less than 2 after
    the previous timestamp; otherwise it opens a new bucket. The last entry
    of ``original_timestamps`` is never visited by the loop.

    Args:
        original_timestamps: Sequence of timestamps.
        slide_pictures: Sequence of pictures, read at the same positions.

    Returns:
        Dict mapping 0, 1, 2, ... to lists of ``(timestamp, picture)`` tuples.
    """
    buckets = {0: [(original_timestamps[0], slide_pictures[0])]}
    current = 0

    for pos in range(1, len(original_timestamps) - 1):
        gap = original_timestamps[pos] - original_timestamps[pos - 1]
        pair = (original_timestamps[pos], slide_pictures[pos])
        if not gap < 2:
            current += 1
        buckets.setdefault(current, []).append(pair)

    return buckets

In [22]:
def compute_ssim(img1, img2, threshold = 0.9):
    """Return the structural-similarity score of two RGB images.

    Both images are converted to grayscale before the comparison.
    ``threshold`` is accepted but not used by this function.
    """
    grayscale_pair = [cv2.cvtColor(picture, cv2.COLOR_RGB2GRAY) for picture in (img1, img2)]
    # With full=True ssim returns a pair; only its first item, the score, is kept.
    similarity = ssim(grayscale_pair[0], grayscale_pair[1], full=True)
    return similarity[0]

def SSIM_group(time_group, ssim_threshold):
    """Split every time group wherever two neighbouring pictures differ.

    Inside each group, neighbouring pictures are scored with
    ``compute_ssim``; a score below ``ssim_threshold`` starts a new group at
    the second picture of the pair.

    Args:
        time_group: Dict whose values are lists of ``(timestamp, picture)``.
        ssim_threshold: Score below which neighbours are separated.

    Returns:
        Dict keyed 0, 1, 2, ... holding the resulting lists in order.
    """
    regrouped = {}

    for members in time_group.values():
        frames = [picture for _, picture in members]
        cut_points = [
            pos + 1
            for pos in range(len(frames) - 1)
            if compute_ssim(frames[pos], frames[pos + 1]) < ssim_threshold
        ]

        if not cut_points:
            # Nothing to split: the group is carried over as the same list.
            regrouped[len(regrouped)] = members
            continue

        # Every cut point is below len(members), so the final slice is never empty.
        edges = [0] + cut_points + [len(members)]
        for left, right in zip(edges, edges[1:]):
            regrouped[len(regrouped)] = members[left:right]

    return regrouped

In [23]:
def silent_segment(ssim_group):
    """Separate short-lived groups from the groups that start a text segment.

    Args:
        ssim_group: Dict of lists of ``(timestamp, picture)`` tuples. The code
            indexes it with 0 .. len-1 and with ``last_key - 1``, so it
            assumes contiguous integer keys starting at 0.

    Returns:
        ``(silent_segments, original_text_timestamps, slide_pictures)``:
        a dict of ``[start, end]`` pairs for groups followed within 2 by the
        next group, the list of start timestamps of the remaining groups
        (plus the last value of ``original_timestamps``), and the last
        picture of each of those remaining groups.

    Raises:
        ValueError: from ``max()`` when ``ssim_group`` is empty.

    NOTE(review): ``original_timestamps`` is not a parameter; it is read from
    module scope near the end of the function, so the function only works
    after that global has been assigned.
    """
    silent_segments = {}
    original_text_timestamps = []
    slide_pictures = []
    index = 0

    # Compare each group with the one that follows it; the last group is
    # handled separately after the loop.
    for index in range(len(ssim_group) - 1):
        current_group = ssim_group[index]
        next_group = ssim_group[index + 1]

        # Gap between the first timestamps of the two groups.
        duration = next_group[0][0] - current_group[0][0]

        if duration <= 2:

            # The span is stored under ``index - 1`` (so -1 for the first group).
            # NOTE(review): every iteration uses a different ``index - 1`` and
            # earlier iterations only wrote smaller keys, so this membership
            # test appears never to succeed — confirm whether merging with the
            # previous span was intended.
            if index - 1 in silent_segments:
                silent_segments[index - 1][1] = next_group[0][0]
            else:
                silent_segments[index - 1] = [current_group[0][0], next_group[0][0]]
        else:
            # A longer-lived group: keep its start time and its last picture.
            original_text_timestamps.append(current_group[0][0])
            slide_pictures.append(current_group[-1][1])

    last_key = max(ssim_group.keys())
    last_group = ssim_group[last_key]

    # The last group is kept when it is the only group or starts more than 2
    # after the start of the group before it.
    if len(ssim_group) < 2 or (last_group[0][0] - ssim_group[last_key - 1][0][0]) > 2:
        original_text_timestamps.append(last_group[0][0])
        slide_pictures.append(last_group[-1][1])

    # Close the list with the final entry of the module-level timestamps
    # unless that value is already present.
    if original_timestamps[-1] not in original_text_timestamps:
        original_text_timestamps.append(original_timestamps[-1])

    return silent_segments, original_text_timestamps, slide_pictures


### 2. Audio Segmentation

In [24]:
def extract_audio_segments(video_path, timestamps, audioSegmentation):
    """Cut the video's audio track at the given timestamps and save each piece.

    Args:
        video_path: Path of the source video.
        timestamps: Ordered cut points; every neighbouring pair becomes one
            segment.
        audioSegmentation: Folder that receives
            ``original_audio_segment_<i>.wav``.

    Returns:
        ``(audio_files, durations)``: the written file paths and the duration
        reported by each audio sub-clip, in segment order.
    """
    audio_files = []
    durations = []

    video_clip = VideoFileClip(video_path)
    try:
        for i, (start_time, end_time) in enumerate(zip(timestamps[:-1], timestamps[1:])):
            audio_segment = video_clip.audio.subclip(start_time, end_time)
            output_file = f"{audioSegmentation}/original_audio_segment_{i}.wav"

            audio_segment.write_audiofile(output_file, codec='pcm_s16le')
            audio_files.append(output_file)

            durations.append(audio_segment.duration)
    finally:
        # Close the clip even when a segment fails to write, so the reader
        # opened for the video is not left behind.
        video_clip.close()

    return audio_files, durations


### 3. ASR

In [25]:
def videos_to_texts(audio_files):
    """Transcribe every audio file with Deepgram and return the texts.

    The API key is read from the module-level ``deepgrame_key``. For each
    file the sentences of all channels are joined with spaces; every channel
    contributes a trailing space, so a channel without sentences yields " ".

    Args:
        audio_files: Iterable of paths to audio files.

    Returns:
        List with one transcript string per input file, or ``None`` when any
        step raised (the exception text is printed).
    """
    try:
        deepgram = DeepgramClient(f"{deepgrame_key}")
        original_text_segments = []

        for audio_file in audio_files:
            with open(audio_file, "rb") as file:
                response = deepgram.listen.rest.v("1").transcribe_file({"buffer": file.read()}, {"model": "nova-2", "smart_format": True})

            text = ""
            for channel in json.loads(response.to_json())['results']['channels']:
                # A segment without recognised speech may come back without
                # paragraph data; treat that as "no sentences" so one quiet
                # segment does not abort the whole batch.
                paragraph_block = channel['alternatives'][0].get('paragraphs') or {}
                paragraphs = paragraph_block.get('paragraphs') or []
                text += " ".join([s['text'] for p in paragraphs for s in p['sentences']]) + " "

            original_text_segments.append(text)

        return original_text_segments

    except Exception as e:
        # Best-effort contract kept from the original: report and return None.
        print(f"Exception occurred: {str(e)}")
        return None

### 4. LLM

In [26]:
def LLM_modify_text(original_text_segments):
    """Ask the chat model to polish the transcript, segment by segment.

    The segments are sent as a dictionary keyed by position, and the reply is
    parsed as JSON. Text around the outermost ``{...}`` (for example a
    Markdown code fence) is ignored before parsing.

    Args:
        original_text_segments: List of transcript strings.

    Returns:
        List of the values of the returned dictionary, in the order given by
        the model.

    Raises:
        ValueError: when the reply holds no JSON object, cannot be decoded
            (``json.JSONDecodeError`` is a ``ValueError``), or does not
            contain exactly one entry per input segment.
    """
    original_text_dict = dict(enumerate(original_text_segments))

    prompt = (f'''
    Please modify the following transcript which comes from a transcript as a whole, with attention to the following requirements:
    1. Correct any grammatical mistakes and mispronunciations;
    2. Keep the total number of segments unchanged;
    3. Ensure that the word count per segment remains approximately the same;
    4. Make as few alterations as necessary;
    5. The content of the revised segments should exactly resemble that of the original;
    6. Format the output as a python dictionary, with double quotes (") instead of single quotes (') for keys and values.

    Transcript:
    {original_text_dict}
    ''')

    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt}
        ]
    )

    revised_transcript = response['choices'][0]['message']['content']

    # Chat replies are often wrapped in a code fence or a sentence of prose;
    # keep only the outermost braces before handing the text to the parser.
    first_brace = revised_transcript.find('{')
    last_brace = revised_transcript.rfind('}')
    if first_brace == -1 or last_brace < first_brace:
        raise ValueError("LLM reply does not contain a dictionary: " + repr(revised_transcript[:200]))

    parsed_data = json.loads(revised_transcript[first_brace:last_brace + 1])
    if not isinstance(parsed_data, dict):
        raise ValueError("LLM reply is not a dictionary")

    polished_text_segments = list(parsed_data.values())

    # Later steps index audio and video by segment position, so a reply with
    # a different number of segments cannot be aligned.
    if len(polished_text_segments) != len(original_text_dict):
        raise ValueError(
            f"LLM returned {len(polished_text_segments)} segments, expected {len(original_text_dict)}"
        )

    return polished_text_segments


### 5. Voice Clone

In [27]:
def voice_clone(input_video):
    """Upload a sample file to PlayHT's instant voice-clone endpoint.

    Credentials are read from the module-level ``playht_key`` and
    ``playht_id``.

    Args:
        input_video: Path of the file sent as the voice sample. It is
            uploaded with the MIME type ``audio/mpeg``.

    Returns:
        The ``id`` field of the JSON reply.

    Raises:
        requests.HTTPError: when the service answers with an error status.
        KeyError: when the reply carries no ``id`` field.
    """
    url = "https://api.play.ht/api/v2/cloned-voices/instant"

    # The audio file selected as the source for the voice clone should have a duration ranging from 2 seconds to 1 hour.
    # It can be in any audio format,
    # as long as it falls within the size range of 5kb to 50MB.
    payload = { "voice_name": "Cloned_Voice" }
    headers = {
        "accept": "application/json",
        "AUTHORIZATION": f"{playht_key}",
        "X-USER-ID": f"{playht_id}"
    }

    # Keep the handle inside a context manager so it is closed once the
    # upload has finished, whether or not the request succeeds.
    with open(f"{input_video}", "rb") as sample_file:
        files = { "sample_file": (f"{input_video}", sample_file, "audio/mpeg") }
        response = requests.post(url, data=payload, files=files, headers=headers)

    # Surface HTTP failures as such instead of a KeyError on the missing id.
    response.raise_for_status()

    response_dict = json.loads(response.text)
    voiceID = response_dict['id']

    return voiceID

### 6. TTS

In [28]:

def text_to_url(polish_sentence, speed, voiceID, UserID, UserKey):
    """Request speech for one piece of text and return the audio URL.

    The PlayHT TTS endpoint is called with an event-stream ``accept`` header;
    the reply is read line by line and the ``url`` of the last event whose
    ``stage`` is ``complete`` is returned.

    Args:
        polish_sentence: Text to synthesise.
        speed: Value sent as the request's ``speed``.
        voiceID: Value sent as the request's ``voice``.
        UserID: Sent in the ``X-USER-ID`` header.
        UserKey: Sent in the ``AUTHORIZATION`` header.

    Returns:
        The URL string, or ``None`` when the text is empty or whitespace, or
        when the reply contains no ``complete`` event.
    """
    if not polish_sentence.strip():
        return None

    url = "https://api.play.ht/api/v2/tts"
    headers = {
        "accept": "text/event-stream",
        "content-type": "application/json",
        "AUTHORIZATION": UserKey,
        "X-USER-ID": UserID
    }

    payload = {
        "text": polish_sentence,
        "voice": voiceID,
        "output_format": "wav",
        "voice_engine": "PlayHT2.0",
        "temperature": 0.1,
        "seed": 1,
        "speed": speed,
        "voice_guidance": 1,
        "style_guidance": 1,
        "sample_rate": 24000
    }

    response = requests.post(url, json=payload, headers=headers)

    # Start from None: a reply without a "complete" event (for example an
    # error body) must yield None, not an UnboundLocalError at the return.
    URL = None
    for line in response.text.splitlines():
        if not line.startswith('data: '):
            continue
        try:
            json_data = json.loads(line[len('data: '):])
        except json.JSONDecodeError:
            continue
        if isinstance(json_data, dict) and json_data.get('stage') == 'complete':
            URL = json_data.get('url')

    return URL

In [29]:
def blocks_to_urls(polish_blocks, voiceID, UserID, UserKey, speed):
    """Run ``text_to_url`` on every text block and collect the results.

    Args:
        polish_blocks: Iterable of text blocks.
        voiceID, UserID, UserKey, speed: Forwarded unchanged to
            ``text_to_url`` for each block.

    Returns:
        List with one ``text_to_url`` result per block, in input order.
    """
    return [
        text_to_url(text_block, speed, voiceID, UserID, UserKey)
        for text_block in polish_blocks
    ]

### 7. Align Video and Audio

In [30]:
def process_audio_segments(urls, original_audio_durations, silent_segments):
    """Build one audio track from the generated clips and stretches of silence.

    Args:
        urls: One entry per text segment; a URL of a WAV file to download, or
            ``None`` to insert silence instead.
        original_audio_durations: Durations used for the silence that
            replaces a ``None`` entry (multiplied by 1000 before being passed
            to pydub, i.e. presumably seconds).
        silent_segments: Dict mapping an integer key to a ``(start, end)``
            pair.

    Returns:
        ``(modified_audio, total_duration, audio_durations)``: the combined
        ``AudioSegment``, its ``duration_seconds``, and the per-segment
        durations (without the inserted padding).
    """
    final_audio_fixed = AudioSegment.empty()
    audio_durations = []

    # Leading silence: every span whose key is below 1 is inserted before
    # the first segment; spans of at most 2 are lengthened by 1.
    for key, (start_time, end_time) in silent_segments.items():
        if key < 1:
            time_diff = abs(end_time - start_time)
            if time_diff <= 2:
                time_diff = time_diff + 1

            silence_duration = time_diff * 1000
            silence_audio = AudioSegment.silent(duration=silence_duration)
            final_audio_fixed += silence_audio

    for index, url in enumerate(urls):

        # Every segment is preceded by a silence of 1000 (pydub milliseconds).
        final_audio_fixed += AudioSegment.silent(duration=1000)
        if url is None:

            # No generated audio: fill the slot with silence of the original length.
            silence_duration = original_audio_durations[index] * 1000
            silence_audio = AudioSegment.silent(duration=silence_duration)
            final_audio_fixed += silence_audio
            audio_durations.append(silence_duration / 1000)
        else:
            response = requests.get(url)
            audio = AudioSegment.from_file(BytesIO(response.content), format="wav")

            audio_duration = audio.duration_seconds
            audio_durations.append(audio_duration)

            final_audio_fixed += audio

            # NOTE(review): the trailing silence after the last segment is
            # only added in this branch, i.e. not when the last url is None
            # — confirm that is intended.
            if index == len(urls) - 1:
                final_audio_fixed += AudioSegment.silent(duration=1000)

        # A span stored under this segment's index is inserted after it.
        # NOTE(review): key 0 also satisfies ``key < 1`` above, so that span
        # is inserted both before and after segment 0 — confirm.
        i = index
        if i in silent_segments:
            start_time, end_time = silent_segments[i]
            time_diff = abs(end_time - start_time)

            if time_diff <= 2:
              time_diff = time_diff + 1

            silence_duration = time_diff * 1000
            silence_audio = AudioSegment.silent(duration=silence_duration)
            final_audio_fixed += silence_audio

    modified_audio = final_audio_fixed
    total_duration = modified_audio.duration_seconds

    # Report the per-segment and total durations.
    for idx, audio_length in enumerate(audio_durations):
        print(f"Duration of audio {idx + 1} is {audio_length} seconds")

    print(f"Total duration of the concatenated audio is {total_duration} seconds")

    return modified_audio, total_duration, audio_durations


In [31]:
def process_video_segments(video_path, original_timestamps, final_audio_durations, slide_pictures, silent_segments):
    """Trim or pad each video segment to the length of its new audio.

    Args:
        video_path: Path of the source video.
        original_timestamps: Cut points; neighbouring pairs delimit the
            segments.
        final_audio_durations: Duration of the new audio for each segment.
        slide_pictures: One still image per segment, used for padding.
        silent_segments: Dict mapping an integer key to a ``(start, end)``
            pair of positions in the source video.

    Returns:
        List of clips without the original audio, in playback order.

    NOTE(review): the ``VideoFileClip`` opened here is never closed in this
    function; presumably the returned sub-clips still read from it — confirm
    where it should be released.
    """

    video = VideoFileClip(video_path)
    # The end of every sub-clip is truncated to a whole number by int().
    original_video_segments = [video.subclip(original_timestamps[i], int(original_timestamps[i + 1])) for i in range(len(original_timestamps) - 1)]
    # Each target grows by 1; process_audio_segments inserts a 1000 ms
    # silence before every segment.
    final_audio_durations = [duration + 1 for duration in final_audio_durations]

    polished_video_segments = []
    # Leading spans: every entry whose key is below 1 is placed before the
    # first segment.
    # NOTE(review): spans of at most 2 are shortened by 1 here, whereas
    # process_audio_segments lengthens the matching silence by 1 — confirm.
    for key, (start_time, end_time) in silent_segments.items():
        if key < 1:
            time_diff = abs(end_time - start_time)
            if time_diff <= 2:
                end_time = end_time - 1
            silent_video_segment = video.subclip(start_time, end_time).without_audio()
            polished_video_segments.append(silent_video_segment)

    for i, video_segment in enumerate(original_video_segments):
        original_duration = video_segment.duration

        # The last segment gets one more unit on top of the +1 applied above.
        if i == len(original_video_segments) - 1:
            target_duration = final_audio_durations[i] + 1
        else:
            target_duration = final_audio_durations[i]

        if original_duration > target_duration:
            # Video longer than the audio: cut it down.
            trimmed_segment = video_segment.subclip(0, target_duration).without_audio()
            polished_segment = trimmed_segment
        else:
            # Video not longer than the audio: append a still of the slide
            # for the missing time.
            slide_image = slide_pictures[i]
            # NOTE(review): slide_image_bgr is computed but not used below.
            slide_image_bgr = cv2.cvtColor(slide_image, cv2.COLOR_RGB2BGR)
            slide_clip = ImageClip(slide_image).set_duration(target_duration - original_duration)
            slide_clip = slide_clip.resize(height=video_segment.h)
            extended_segment = concatenate_videoclips([video_segment, slide_clip])
            polished_segment = extended_segment

        # A span stored under this segment's index is appended after it.
        if i in silent_segments:
            start_time, end_time = silent_segments[i]
            time_diff = abs(end_time - start_time)

            if time_diff <= 2:
                start_time = max(0, start_time - 1)  # Ensure start_time doesn't go negative

            silent_video_segment = video.subclip(start_time, end_time).without_audio()
            extended_silent_segment = concatenate_videoclips([polished_segment, silent_video_segment])
        else:
            extended_silent_segment = polished_segment

        polished_video_segments.append(extended_silent_segment)

    return polished_video_segments

In [32]:
def save_final_video(polished_video_segments, final_audio, final_video_path, audioSegmentation):
    """Join the video segments, attach the new audio and write the result.

    The audio is first exported to ``<audioSegmentation>/temporary/final_audio.wav``
    so it can be attached to the video; that file is removed afterwards.

    Args:
        polished_video_segments: Clips to concatenate, in playback order.
        final_audio: pydub ``AudioSegment`` holding the complete soundtrack.
        final_video_path: Destination of the rendered video.
        audioSegmentation: Working folder; the temporary WAV file is created
            in its ``temporary`` sub-folder, which is created if missing.
    """
    # Use the folder handed in by the caller rather than a module-level
    # variable, and make sure it exists before exporting into it.
    temp_audio_file = f"{audioSegmentation}/temporary/final_audio.wav"
    os.makedirs(os.path.dirname(temp_audio_file), exist_ok=True)

    final_audio_clip = None
    try:
        final_audio.export(temp_audio_file, format="wav")

        final_audio_clip = AudioFileClip(temp_audio_file)
        final_video = concatenate_videoclips(polished_video_segments)
        final_video = final_video.set_audio(final_audio_clip)
        final_video.write_videofile(final_video_path, codec="libx264", audio_codec="aac", fps=24)
    finally:
        # Release the reader before deleting the file it points at, and
        # clean up even when rendering fails.
        if final_audio_clip is not None:
            final_audio_clip.close()
        if os.path.exists(temp_audio_file):
            os.remove(temp_audio_file)

    print("Final video saved successfully!")

## Input information

In [232]:
# Credentials for the external services. An environment variable, when set,
# takes precedence so real keys never have to be saved inside the notebook;
# otherwise replace the placeholder text.
playht_key = os.environ.get('PLAYHT_API_KEY', 'put your api information here')
playht_id = os.environ.get('PLAYHT_USER_ID', 'put your api information here')

deepgrame_key = os.environ.get('DEEPGRAM_API_KEY', 'put your api information here')
openai.api_key = os.environ.get('OPENAI_API_KEY', 'put your api information here')

# Working folder for intermediate audio files, then the source and result videos.
directory_path = 'temporal file will be save here when running the code'
input_video = 'input video path'
output_video = 'output video path'

## Running code

In [None]:
# Tunable parameters for the run.
speed = 0.85                   # sent to the TTS request as its "speed" value
slides_detect_threshold = 0.2  # mean absolute grayscale difference that counts as a change
ssim_threshold = 0.9           # SSIM below this splits a time group

# The audio steps write into these folders, so make sure they exist first.
os.makedirs(f"{directory_path}/temporary", exist_ok=True)

# 1. Video Frame Recognition
original_timestamps, detect_slide_pictures = extract_slide_changes(input_video, slides_detect_threshold)
detect_slide_pictures, original_timestamps = delete_repeat_pictures(detect_slide_pictures, original_timestamps)
time_group = Time_group(original_timestamps, detect_slide_pictures)
ssim_group = SSIM_group(time_group, ssim_threshold)
silent_segments, original_text_timestamps, final_slide_pictures = silent_segment(ssim_group)

# 2. Audio Segmentation
original_audio_segments, original_audio_durations = extract_audio_segments(input_video, original_text_timestamps, directory_path)

# 3. ASR
original_text_segments = videos_to_texts(original_audio_segments)
# videos_to_texts signals failure by returning None; stop here with a clear
# message instead of failing inside the LLM step.
if original_text_segments is None:
    raise RuntimeError("Speech recognition failed; see the exception printed above.")

# 4. LLM
polished_text_segments = LLM_modify_text(original_text_segments)

# 5. Voice Clone
voiceID = voice_clone(input_video)

# 6. TTS
URLs = blocks_to_urls(polished_text_segments, voiceID, playht_id, playht_key, speed)

# 7. Align Video and Audio
final_audio, final_total_duration, final_audio_durations = process_audio_segments(URLs, original_audio_durations, silent_segments)
polished_video_segments = process_video_segments(input_video, original_text_timestamps, final_audio_durations, final_slide_pictures, silent_segments)
save_final_video(polished_video_segments, final_audio, output_video, directory_path)