In [None]:
# Mount Google Drive at /content/drive (Colab only) so the notebook can read
# and write files under that path.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install pydub SpeechRecognition
!pip install requests openai
!pip install moviepy ffmpeg

In [None]:
# Install the Deepgram Python SDK
# https://github.com/deepgram/deepgram-python-sdk

!pip install deepgram-sdk

# Install python-dotenv to protect API key

!pip install python-dotenv

In [None]:
import speech_recognition as sr
from pydub import AudioSegment
from pydub.silence import split_on_silence
from moviepy.editor import VideoFileClip, AudioFileClip
from dotenv import load_dotenv
from deepgram import DeepgramClient, PrerecordedOptions, FileSource
from scipy import stats
import matplotlib.pyplot as plt
from datetime import date, datetime
import requests
import textwrap
import json
import os
import math
import numpy as np
import re
import openai
import cvxpy as cp

In [None]:
# Folder (with trailing slash) that file paths in this notebook are built from.
base_directory = '/content/drive/MyDrive/'
# Sample identifier; it is used as the file-name stem, e.g. "<video_sample>_original.mp4".
video_sample = 'S1'

In [None]:
# Credentials are read from environment variables so that real keys never have
# to be typed into (and saved with) the notebook. When a variable is not set,
# the original "?" placeholder is kept and can still be replaced by hand.
import os

playHT_API_USER = os.environ.get("PLAYHT_USER_ID", "?")
playHT_API_KEY = os.environ.get("PLAYHT_API_KEY", "?")

openai_api_key = os.environ.get("OPENAI_API_KEY", "?")

# NOTE: despite its name this value is passed to DeepgramClient as the API key.
Deepgram_user = os.environ.get("DEEPGRAM_API_KEY", "?")

---
================================================
# Pipeline
================================================


# 1. Extract audio from the video



In [None]:
# Extract the audio track of the source video and save it as a WAV file.
# The input path is built from `base_directory`, like the output path, so both
# follow the configured folder.
video_filename = f"{base_directory}{video_sample}_original.mp4"

# Load the video file
video_clip = VideoFileClip(video_filename)
audio_clip = video_clip.audio

try:
    if audio_clip is None:
        # A video without an audio track would otherwise fail with an
        # AttributeError on None.
        raise ValueError(f"No audio track found in {video_filename}")
    audio_clip.write_audiofile(f"{base_directory}{video_sample}.wav")
finally:
    # Close the clips to release resources, even when writing fails.
    if audio_clip is not None:
        audio_clip.close()
    video_clip.close()

# 2. Voice clone

* Upload the original audio to PlayHT to generate a voice ID for later voice cloning (text to speech)
* This does not need to be run every time; consider doing it once as a first step

  https://play.ht/studio/api-access

In [None]:
# Create an instant cloned voice on PlayHT from the extracted audio and keep
# the returned id in `voice_url` for the TTS step.

import requests

url = "https://api.play.ht/api/v2/cloned-voices/instant"

sample_path = f"{base_directory}{video_sample}.wav"
payload = { "voice_name": f'{video_sample}' }
headers = {
    "accept": "application/json",
    "AUTHORIZATION": f'{playHT_API_KEY}',
    "X-USER-ID": f'{playHT_API_USER}'
}

# `with` guarantees the sample file handle is closed, even if the upload fails.
with open(sample_path, "rb") as sample_file:
    files = { "sample_file": (sample_path, sample_file, "audio/wav") }
    response = requests.post(url, data=payload, files=files, headers=headers)

print(response.text)
# Stop with the HTTP status on failure instead of an opaque KeyError on 'id'.
response.raise_for_status()
voice_url = response.json()['id']
print(voice_url)

# 3. Cut the original audio if necessary

In [None]:
# # Load original audio
# audio = AudioSegment.from_file(f"{base_directory}{video_sample}.wav")

# # cutpoint
# cut_point1 = 60000  #60000 as 1 minute
# cut_point2 = 314000  #314000 as 5:14

# # cut audio
# part_temp1 = audio[:cut_point1]
# part_temp2 = audio[:cut_point2]

# # export
# part_temp1.export(f"{base_directory}{video_sample}_1_min.wav", format="wav")
# part_temp2.export(f"{base_directory}{video_sample}_5_min.wav", format="wav")

# 4. Audio to text

 ---

  * Deepgram Nova-2

  ---
  * Whisper via Deepgram
    - Model selection: https://developers.deepgram.com/docs/model
    - Speech2text: https://developers.deepgram.com/docs/getting-started-with-pre-recorded-audio

In [None]:
from dotenv import load_dotenv
from deepgram import DeepgramClient, PrerecordedOptions, FileSource

# Load variables from a .env file (if one is found) into the environment.
load_dotenv()
# Path of the WAV file that is sent for transcription.
AUDIO_FILE = f"{base_directory}{video_sample}.wav"


def main():
    """Transcribe ``AUDIO_FILE`` with Deepgram and save the JSON response.

    The audio file is read into memory and sent to Deepgram's pre-recorded
    endpoint with the ``nova-2`` model. The JSON response is printed and
    written to
    ``{base_directory}transcription_output_nova_2_{video_sample}.json``.

    Raises:
        Exception: any error raised while reading the audio, calling Deepgram
            or writing the output is printed and then re-raised, so that later
            cells do not silently continue with a missing or stale transcript.
    """
    try:
        deepgram = DeepgramClient(f"{Deepgram_user}")

        with open(AUDIO_FILE, "rb") as file:
            buffer_data = file.read()

        payload: FileSource = {
            "buffer": buffer_data,
        }

        options = PrerecordedOptions(
            model="nova-2", #nova-2 #whisper-large
            smart_format=True,
            punctuate=True,
            diarize=True,
            utt_split= 0.8

        )

        response = deepgram.listen.rest.v("1").transcribe_file(payload, options)

        # Convert response to JSON format
        response_json = response.to_json(indent=4)

        # Print the JSON output to the console
        print(response_json)

        # Save the JSON output to a file
        with open(f"{base_directory}transcription_output_nova_2_{video_sample}.json", 'w') as json_file:
            json_file.write(response_json)

    except Exception as e:
        print(f"Exception: {e}")
        # Re-raise: the following steps depend on the transcript file.
        raise

# Run the transcription when this cell is executed.
if __name__ == "__main__":
    main()


---
# 5. Original audio timestamp

---
Script and timestamps from Deepgram

## $\color{purple}{\text {This is slicing based on sentence}}$


In [None]:
# Factors setting
# The notebook computes speech speed as words / (speed_factor * seconds).
# NOTE(review): 2.62 looks like an empirically chosen value — confirm its origin.

speed_factor = 2.62  # Scaling factor used in the speed calculation for playHT speed rate


In [None]:
def main():
    """Split the saved Deepgram transcript into sentence-level segments.

    Reads ``{base_directory}transcription_output_nova_2_{video_sample}.json``
    and walks the word list of every channel. A segment is closed whenever a
    word ends with sentence-final punctuation; words left over at the end of a
    channel form one last segment.

    Returns:
        tuple: ``(full_script, segment_dictionary, segment_durations,
        segment_speed_dictionary, pauses_between_segments,
        first_segment_start_time, last_segment_end_time, total_duration,
        word_count, estimate_speed)`` where

        * segment dictionaries are keyed ``S1..SN``;
        * ``pauses_between_segments`` is keyed ``P1..P(N-1)`` and ``Pk`` is
          always the gap between segment ``k`` and segment ``k + 1`` (0 when
          the two overlap), so there is exactly one pause per adjacent pair;
        * speeds are ``words / (speed_factor * duration)`` and are 0 when the
          duration is 0.
    """
    with open(f"{base_directory}transcription_output_nova_2_{video_sample}.json", 'r') as file:
        data = json.load(file)

    sentence_endings = ('!', '?', '.', '。', '！', '？')

    full_script = ""
    segment_dictionary = {}
    segment_durations = {}
    pauses_between_segments = {}
    segment_speed_dictionary = {}
    first_segment_start_time = None
    total_duration = 0
    last_end = None  # End time of the previously closed segment; None before the first one

    def close_segment(segment_words, start_time, end_time):
        """Record one finished segment and the pause that precedes it."""
        nonlocal full_script, first_segment_start_time, total_duration, last_end

        label = f"S{len(segment_dictionary) + 1}"
        text = " ".join(segment_words)
        duration = end_time - start_time

        if first_segment_start_time is None:
            first_segment_start_time = start_time

        full_script += text + " "
        segment_dictionary[label] = text
        segment_durations[label] = duration
        total_duration += duration

        # A single word whose start equals its end gives a zero duration;
        # report speed 0 instead of dividing by zero.
        scaled_duration = speed_factor * duration
        segment_speed_dictionary[label] = (
            len(text.split()) / scaled_duration if scaled_duration > 0 else 0
        )

        # Record a pause for every adjacent pair so that Pk always refers to
        # the gap after segment k; overlapping segments give a pause of 0.
        if last_end is not None:
            pause_label = f"P{len(pauses_between_segments) + 1}"
            pauses_between_segments[pause_label] = max(0.0, start_time - last_end)

        last_end = end_time

    for channel in data['results']['channels']:
        current_words = []
        segment_start_time = None
        word_end = None

        for word_info in channel['alternatives'][0]['words']:
            # Fall back to the raw word if no punctuated form is present.
            current_word = word_info.get('punctuated_word') or word_info['word']
            word_end = word_info['end']

            if segment_start_time is None:
                segment_start_time = word_info['start']  # Initialize segment start time

            current_words.append(current_word)

            if current_word.endswith(sentence_endings):
                close_segment(current_words, segment_start_time, word_end)
                current_words = []  # Reset for the next segment
                segment_start_time = None

        # Process any remaining words as the last segment of this channel
        if current_words:
            close_segment(current_words, segment_start_time, word_end)

    word_count = len(full_script.split())
    last_segment_end_time = last_end if last_end is not None else 0

    # Calculate estimated speech rate (0 when there is nothing to measure)
    if total_duration > 0:
        estimate_speed = word_count / (speed_factor * total_duration)
    else:
        estimate_speed = 0

    return full_script, segment_dictionary, segment_durations, segment_speed_dictionary, pauses_between_segments, first_segment_start_time, last_segment_end_time, total_duration, word_count, estimate_speed

In [None]:
if __name__ == "__main__":
    # Run the segmentation and unpack its result into notebook-level variables.
    results = main()
    (
        full_script,
        segment_dictionary,
        segment_durations,
        segment_speed_dictionary,
        pauses_between_segments,
        first_segment_start_time,
        last_segment_end_time,
        total_duration,
        word_count,
        estimate_speed,
    ) = results

    # Text of every segment.
    print("\nSegment Content:")
    for label in segment_dictionary:
        print(f"{label}: {segment_dictionary[label]}")

    # Duration of every segment, two decimals.
    print("\nSegment Durations (seconds):")
    for label in segment_durations:
        print(f"{label}: {segment_durations[label]:.2f}")

    # Gap between consecutive segments, two decimals.
    print("\nPauses Between Segments (seconds):")
    for label in pauses_between_segments:
        print(f"{label}: {pauses_between_segments[label]:.2f}")

    # Overall figures, printed in a fixed order.
    summary_lines = [
        f"\nStart time of the first segment:\n{first_segment_start_time} seconds",
        f"\nEnd time of the last segment:\n{last_segment_end_time} seconds",
        f"\nTotal duration of all segments:\n{total_duration:.2f} seconds",
        f"\nTotal word count of the script:\n{word_count} words",
        f"\nEstimated Speech Speed:\n{estimate_speed:.2f} words/({speed_factor} * second)",
    ]
    for summary_line in summary_lines:
        print(summary_line)

In [None]:
# Clamp the estimated speed into [0.8, 1.0]; values inside the range are
# rounded to two decimals. Anything that matches no comparison leaves 0.
if estimate_speed >= 1.0:
    adjust_speed = 1.0
elif estimate_speed >= 0.8:
    adjust_speed = round(estimate_speed, 2)
elif estimate_speed < 0.8:
    adjust_speed = 0.8
else:
    adjust_speed = 0

adjust_speed

In [None]:
# Target word count per segment: the count implied by the segment duration and
# the adjusted speed, unless that is within 3 words of the original or the
# original segment has fewer than 6 words.
segment_adjust_word_count_dictionary = {}

for key, duration in segment_durations.items():
    original_length = len(segment_dictionary[key].split())
    polished_length = round(speed_factor * adjust_speed * duration)

    keep_original = original_length < 6 or abs(original_length - polished_length) < 3
    if keep_original:
        segment_adjust_word_count_dictionary[key] = original_length
        continue

    segment_adjust_word_count_dictionary[key] = polished_length
    print(key, " -- now:original ", polished_length, " ", original_length)


print("Adjusted Segment Word Counts:")
print(segment_adjust_word_count_dictionary)

In [None]:
# Display the per-segment target word counts.
segment_adjust_word_count_dictionary

In [None]:
# Display the original segment texts.
segment_dictionary

---
# 6. OpenAI prompt  


In [None]:
%%capture
# Force-reinstall pinned openai and httpx versions; %%capture hides the output.
# NOTE(review): presumably works around an openai/httpx incompatibility — confirm.
!pip install openai==1.55.3 httpx==0.27.2 --force-reinstall --quiet

In [None]:
# Send signal 9 to the notebook's own process, i.e. kill the running kernel.
# NOTE(review): presumably done so the reinstalled packages are picked up.
# Everything defined in earlier cells is lost with the process, so those
# cells have to be run again before continuing.
import os
os.kill(os.getpid(), 9)

In [None]:
from openai import OpenAI

# OpenAI client shared by chat().
client = OpenAI(api_key=f'{openai_api_key}')

def chat(prompt):
    """Send `prompt` as a single user message to gpt-4o and return the reply text."""
    user_message = {"role": "user", "content": prompt}
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[user_message],
    )
    first_choice = response.choices[0]
    return first_choice.message.content


# Instruction text sent to the model. The two dictionaries it refers to
# (`segment_dictionary` and `segment_adjust_word_count_dictionary`) are
# appended to it when the request is built.
prompt = '''Please update the `segment_dictionary` using the complete transcript, following these criteria, especially No. 2 criteria:

1. Correct grammatical errors, proper nouns that may have been mispronounced in each segment.
2. Adjust each segment legnth to meet the exact word count number specified in the `segment_adjust_word_count_dictionary` **(important)**.
3. Retain the original total number of segments.
4. Ensure each revised segment corresponds exactly to its original; do not mix content between segments.
5. Make the script make sense as a whole, segments should form a coherent script, while polishing each segment individually.
6. Minimize changes to the original text.
7. Exam again to ensure that each segment legnth to meet the exact word count number specified in the `segment_adjust_word_count_dictionary` **(important)**.
8. Format the output as a Python dictionary using double quotes (`"`) for keys and values.
9. Only output the updated dictionary.'''


# Ask the model for the revised segments and parse its reply into a dict.
import ast

revised_text = chat(f"{prompt}\n\nsegment_dictionary: {segment_dictionary}\n\nsegment_adjust_word_count_dictionary: {segment_adjust_word_count_dictionary}")

# Keep only the outermost {...} span so that any code fence (```python,
# ```json, ...) or text around the dictionary is ignored.
dict_start = revised_text.find("{")
dict_end = revised_text.rfind("}")
if 0 <= dict_start < dict_end:
    clean_string = revised_text[dict_start:dict_end + 1]
else:
    clean_string = revised_text.replace("```python\n", "").replace("```", "")
clean_string = clean_string.replace("\n    ", "").replace("\n", "").strip()

try:
    revised_segment_dictionary = json.loads(clean_string)
except json.JSONDecodeError as e:
    # The prompt asks for a "Python dictionary", so accept a Python literal
    # (e.g. single-quoted strings) when the reply is not strict JSON.
    try:
        revised_segment_dictionary = ast.literal_eval(clean_string)
    except (ValueError, SyntaxError):
        print("Error parsing JSON:", e)
        print("Received string:", clean_string)
        # Stop here: continuing would use an undefined or stale dictionary.
        raise ValueError("The model reply could not be parsed as a dictionary.") from e

if not isinstance(revised_segment_dictionary, dict):
    raise ValueError(f"Expected a dictionary from the model, got {type(revised_segment_dictionary).__name__}.")

print(revised_segment_dictionary)


#  !!! Compare the revised script with the original one !!!

---

In [None]:
# Word and sentence-mark totals, first for the original segments and then for
# the revised ones. A "sentence" is counted per '.', '?' or '!' character.
punctuation_marks = ".?!"
sentence_mark_pattern = re.compile(f"[{re.escape(punctuation_marks)}]")

original_word_count = 0
sentence_count = 0
for segment_text in segment_dictionary.values():
    original_word_count += len(segment_text.split())
    sentence_count += len(sentence_mark_pattern.findall(segment_text))
print("Original_word_count:", original_word_count)
print("Total original sentence counts:", sentence_count)

revised_word_count = 0
sentence_count = 0
for segment_text in revised_segment_dictionary.values():
    revised_word_count += len(segment_text.split())
    sentence_count += len(sentence_mark_pattern.findall(segment_text))
print("Revised_word_count:", revised_word_count)
print("Total revised sentence counts:", sentence_count)


In [None]:
# Report every segment whose word count changed, and every original key that
# is missing from the revised dictionary.
for key, original_text in segment_dictionary.items():
    if key not in revised_segment_dictionary:
        print(f"Key '{key}' not found in revised_segment_dictionary.")
        continue

    revised_segment_text = revised_segment_dictionary[key]
    original_len = len(original_text.split())
    revised_len = len(revised_segment_text.split())
    if original_len == revised_len:
        continue

    print(f"Length of '{key}' in original: {original_len}")
    print(f"Length of '{key}' in revised: {revised_len}")
    print(f"\nThe original segment:{original_text}\nThe revised segment :{revised_segment_text}\n")


In [None]:
# Normalise every revised segment: trim surrounding whitespace, capitalise a
# lower-case first letter, turn a trailing comma into a period and add a period
# when there is no closing '.', '!' or '?'.
for key, value in list(revised_segment_dictionary.items()):
    if not isinstance(value, str):
        continue  # Leave unexpected non-text values untouched

    text = value.strip()
    if not text:
        # Nothing to capitalise or punctuate; avoids indexing an empty string.
        revised_segment_dictionary[key] = text
        continue

    if text[0].islower():
        text = text[0].upper() + text[1:]

    if text.endswith(','):
        text = text[:-1] + '.'
    elif not text.endswith(('.', '!', '?')):
        text += '.'

    # Always store the trimmed text so no segment keeps a stray leading space.
    revised_segment_dictionary[key] = text
revised_segment_dictionary

In [None]:
# Date suffix for the output file names. It is built from `datetime` rather
# than `date.today()` because the result is stored in a variable called `date`:
# that assignment replaces the imported `date` class, so a second run of
# `date.today()` in the same kernel would fail on a string.
date = datetime.now().strftime("_%Y%m%d")
filename = f'{base_directory}{video_sample}_revised_segment_dictionary{date}.json'

with open(filename, 'w') as file:
    json.dump(revised_segment_dictionary, file, indent=4)

# print(f'Dictionary saved as {filename}')

filename = f'{base_directory}{video_sample}_segment_dictionary{date}.json'

with open(filename, 'w') as file:
    json.dump(segment_dictionary, file, indent=4)

In [None]:
# Read both dictionaries back from the JSON files saved for this date.
revised_dictionary_path = f'{base_directory}{video_sample}_revised_segment_dictionary{date}.json'
with open(revised_dictionary_path, 'r') as file:
    revised_segment_dictionary = json.loads(file.read())
print(revised_segment_dictionary)

original_dictionary_path = f'{base_directory}{video_sample}_segment_dictionary{date}.json'
with open(original_dictionary_path, 'r') as file:
    segment_dictionary = json.loads(file.read())
print(segment_dictionary)

In [None]:
# Join the revised segments into one script; every segment is followed by a space.
revised_script = "".join(
    segment_text + " " for segment_text in revised_segment_dictionary.values()
)
print(revised_script)
len(revised_script), len(full_script)

# 7. TTS

In [None]:
# Display the speed value that is sent with the TTS requests.
adjust_speed

In [None]:
# TTS API endpoint
url = "https://api.play.ht/api/v2/tts"

# File that collects the raw event-stream response of every request.
tts_log_path = f"{base_directory}tts_responses_{video_sample}_adj_speed_{adjust_speed:.1f}.txt"

# The headers are the same for every request, so they are built once.
headers = {
    "accept": "text/event-stream",
    "content-type": "application/json",
    "AUTHORIZATION": f"{playHT_API_KEY}",
    "X-USER-ID": f"{playHT_API_USER}"
}

# Open a file to save the responses
with open(tts_log_path, 'w') as file:
    # Loop through each segment and send it to the TTS service
    for key, value in revised_segment_dictionary.items():
        content = value

        if content and not content.endswith(('.', ',', '?', '!', '...')):
            content += '.'  # Ensure each segment ends with a period if not already punctuated
        payload = {
            "text": content,
            "voice": f"{voice_url}",
            "output_format": "wav",
            "voice_engine": "PlayHT2.0",
            "voice_guidance": 1,
            "quality": "premium",
            # "sample_rate": 48000,
            "speed": adjust_speed
        }

        response = requests.post(url, json=payload, headers=headers)
        # Write the response text to the file (also for failures, to keep the error body)
        file.write(response.text + '\n')

        # The responses are later numbered VC1, VC2, ... by counting
        # 'event: completed' entries. A request that fails silently would shift
        # that numbering against the segments, so stop at the first failure.
        if not response.ok or 'event: completed' not in response.text:
            raise RuntimeError(
                f"TTS request for segment {key} did not complete "
                f"(HTTP {response.status_code}); see {tts_log_path}."
            )

print(f"All responses have been saved to {base_directory}tts_responses_{video_sample}_adj_speed_{adjust_speed:.1f}.txt.")


  ---
  * Parse JSON from playHt to get audio url and duration

**!!! Do not use the duration from the PlayHT response — the number is wrong !!!**

In [None]:
# Path to the response file
response_file = f"{base_directory}tts_responses_{video_sample}_adj_speed_{adjust_speed:.1f}.txt"

# Initialize dictionaries to store the URLs and durations with labels
audio_urls = {}
audio_durations_ht = {}

# Read and process the file
with open(response_file, 'r') as file:
    voice_clip_counter = 1  # Initialize counter for voice clips
    for line in file:
        if 'event: completed' not in line:
            continue

        # The next line contains the JSON data we need
        data_line = next(file, None)
        if data_line is None or 'data: ' not in data_line:
            raise ValueError(
                f"'event: completed' for clip {voice_clip_counter} is not followed by a 'data: ' line in {response_file}."
            )

        # Extract the part after the first 'data: ' only, so the same text
        # inside the JSON payload cannot truncate it.
        json_str = data_line.split('data: ', 1)[1].strip()
        data = json.loads(json_str)

        # Store URL and duration in dictionaries with labels
        audio_urls[f"VC{voice_clip_counter}"] = data['url']
        audio_durations_ht[f"VC{voice_clip_counter}"] = data['duration']
        voice_clip_counter += 1

# One clip is expected per revised segment; a different count means the VC
# labels no longer line up with the segment labels.
expected_clip_count = len(revised_segment_dictionary)
if len(audio_urls) != expected_clip_count:
    raise ValueError(
        f"Found {len(audio_urls)} completed TTS clips but expected {expected_clip_count}; check {response_file}."
    )

# Output the results with labels
print("Audio URLs:")
for label, url in audio_urls.items():
    print(f"{label}: {url}")

# print("\nAudio Durations (seconds):")
# for label, duration in audio_durations_ht.items():
#     print(f"{label}: {duration} seconds")


---
# Download audio

In [None]:
# Function to download an audio file from a URL
def download_audio(url, save_path, timeout=60):
    """Download `url` and write the response body to `save_path`.

    Args:
        url: Address of the audio file.
        save_path: Destination file path; its folder is created if missing.
        timeout: Seconds to wait for the server before giving up, so a stalled
            connection cannot block the notebook indefinitely.

    Returns:
        bool: True when the file was written, False when the request or the
        file write failed (the error is printed).
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Raises an HTTPError for bad responses

        target_directory = os.path.dirname(save_path)
        if target_directory:
            os.makedirs(target_directory, exist_ok=True)

        with open(save_path, 'wb') as f:
            f.write(response.content)
        print(f"Audio downloaded successfully: {save_path}")
        return True
    except (requests.RequestException, OSError) as e:
        # OSError covers problems creating the folder or writing the file.
        print(f"Failed to download audio: {e}")
        return False

# Function to calculate duration of an audio file
def get_audio_duration(file_path):
    """Return the length of the audio at `file_path` in seconds, or None if loading fails."""
    try:
        length_ms = len(AudioSegment.from_file(file_path))
    except Exception as e:
        print(f"Failed to load audio for duration calculation: {e}")
        return None
    # len() of the loaded audio is in milliseconds
    return length_ms / 1000.0


# Dictionary to store the durations
audio_durations = {}
combined_audio = AudioSegment.silent(duration=0)  # Initialize an empty audio segment
failed_downloads = []

# Make sure the output folder exists before anything is written to it.
os.makedirs(os.path.join(base_directory, "draft"), exist_ok=True)

# Loop through each audio URL entry and download it
for label, url in audio_urls.items():
    save_path = os.path.join(base_directory, f"draft/{video_sample}_{label}.wav")
    if not download_audio(url, save_path):
        failed_downloads.append(label)
        continue

    audio_segment = AudioSegment.from_file(save_path)
    duration = len(audio_segment) / 1000.0  # Convert from ms to seconds
    audio_durations[label] = duration
    print(f"Duration of {label}: {duration} seconds")
    combined_audio += audio_segment

# Every clip is needed later on; report missing ones now instead of failing
# with a KeyError on `audio_durations` further down.
if failed_downloads:
    raise RuntimeError(f"Could not download: {', '.join(failed_downloads)}")

# Save the combined audio
combined_save_path = os.path.join(base_directory, f"draft/{video_sample}_adj_speed_{adjust_speed:.1f}_combined_audio_segments.wav")
combined_audio.export(combined_save_path, format="wav")

print(f"Combined audio saved to: {combined_save_path}")


---
# 8. Duration alignment

In [None]:
# Number of segments
N = len(segment_durations)
min_pause = 0.05  # Smallest gap kept between two consecutive synthesized clips

# Sort the segments and VCs to ensure consistent ordering
segment_keys = ['S{}'.format(i) for i in range(1, N+1)]
vc_keys = ['VC{}'.format(i) for i in range(1, N+1)]
pause_keys = ['P{}'.format(i) for i in range(1, N)]

# Check the inputs up front so a missing clip or pause is reported by name.
missing_keys = [key for key in segment_keys if key not in segment_durations]
missing_keys += [key for key in vc_keys if key not in audio_durations]
missing_keys += [key for key in pause_keys if key not in pauses_between_segments]
if N == 0 or missing_keys:
    raise ValueError(f"Cannot align {N} segments; missing entries: {missing_keys}")

# Extract durations in order
D_s = [segment_durations[key] for key in segment_keys]
D_vc = [audio_durations[key] for key in vc_keys]
D_p = [pauses_between_segments[key] for key in pause_keys]

# Compute start times of original segments
T_s = [0]  # Start time of first segment is 0
# T_s = [float(first_segment_start_time)]
for i in range(1, N):
    T_s.append(T_s[i-1] + D_s[i-1] + D_p[i-1])

# Compute midpoints of original segments
M_s = [T_s[i] + D_s[i]/2 for i in range(N)]

# Set up optimization variables
T_vc = cp.Variable(N)  # Start times of synthesized VCs

# Compute midpoints of synthesized VCs
M_vc = T_vc + np.array(D_vc)/2

# Compute pauses between synthesized VCs
P_vc = T_vc[1:] - (T_vc[:-1] + D_vc[:-1])

# Objective function: minimize sum of squared differences between midpoints and between pauses
# Weight factor for pauses term

w = 1

objective = cp.Minimize(cp.sum_squares(M_s - M_vc) + w * cp.sum_squares(P_vc - D_p))

# Constraints: start times are non-negative and every pause is at least
# `min_pause`. The pause bound already implies that clips do not overlap, so
# no separate non-overlap constraint is needed.
constraints = [T_vc >= 0, P_vc >= min_pause]

# Form and solve the problem
prob = cp.Problem(objective, constraints)
prob.solve()

# Without a solution `T_vc.value` is None and everything below would fail.
if prob.status not in (cp.OPTIMAL, cp.OPTIMAL_INACCURATE) or T_vc.value is None:
    raise RuntimeError(f"Alignment optimization did not find a solution (status: {prob.status}).")

# Print the results
print("Optimal start times for synthesized VCs:")
for i in range(N):
    print(f"{vc_keys[i]}: {T_vc.value[i]:.6f}")

# Differences between midpoints after alignment
print("\nDifferences between midpoints after alignment:")
for i in range(N):
    diff_mid = M_s[i] - (T_vc.value[i] + D_vc[i]/2)
    print(f"Segment {i+1} (S{i+1} vs {vc_keys[i]}): {diff_mid:.6f} seconds")

# Differences between pauses after alignment
print("\nDifferences between pauses after alignment:")
P_vc_values = T_vc.value[1:] - (T_vc.value[:-1] + D_vc[:-1])
for i in range(N-1):
    diff_pause = D_p[i] - P_vc_values[i]
    print(f"Pause {i+1} (P{i+1}): {diff_pause:.6f} seconds")

In [None]:
from scipy import stats
import matplotlib.pyplot as plt
from scipy.stats import gaussian_kde

def analyze_alignment_quality(M_s, T_vc, D_vc, D_p, P_vc_values):
    """Compare original and synthesized timing and return the full report.

    Midpoint and pause differences are computed as original minus synthesized,
    then passed on for statistics, quality rating and plotting.

    Returns:
        dict: the statistics plus a 'quality_assessment' entry.
    """
    synthesized_midpoints = T_vc + np.array(D_vc) / 2
    midpoint_diffs = np.array(M_s) - synthesized_midpoints
    pause_diffs = np.array(D_p) - P_vc_values

    stats_data = calculate_statistics(midpoint_diffs, pause_diffs, M_s, synthesized_midpoints)
    quality_assessment = assess_quality(stats_data)
    create_visualizations(midpoint_diffs, pause_diffs, M_s, synthesized_midpoints, quality_assessment)

    report = dict(stats_data)
    report['quality_assessment'] = quality_assessment
    return report

def calculate_statistics(midpoint_diffs, pause_diffs, M_s, M_vc):
    """Calculate statistical measures for alignment analysis.

    Args:
        midpoint_diffs: Midpoint differences, one per segment.
        pause_diffs: Pause differences, one per pause.
        M_s: Midpoints of the original segments.
        M_vc: Midpoints of the synthesized clips.

    Returns:
        dict: per-series statistics, the Pearson correlation of the midpoints,
        summed squared error, counts of differences above 0.5 and, under
        'comparison_counts', how many midpoints and pauses were compared.
    """
    def get_stats(diffs):
        """Summarise one series of differences."""
        return {
            'mean_abs_error': np.mean(np.abs(diffs)),
            'median_abs_error': np.median(np.abs(diffs)),
            'std_dev': np.std(diffs),
            'max_deviation': np.max(np.abs(diffs)),
            'rmse': np.sqrt(np.mean(np.square(diffs))),
            'percentiles': {
                '90th': np.percentile(np.abs(diffs), 90),
                '95th': np.percentile(np.abs(diffs), 95)
            }
        }

    return {
        'midpoint_statistics': get_stats(midpoint_diffs),
        'pause_statistics': get_stats(pause_diffs),
        'timing_correlation': stats.pearsonr(M_s, M_vc)[0],
        'total_alignment_error': np.sum(np.square(midpoint_diffs)) + np.sum(np.square(pause_diffs)),
        'average_timing_deviation': np.mean(np.abs(midpoint_diffs)),
        'significant_deviations': {
            'midpoints': np.sum(np.abs(midpoint_diffs) > 0.5),
            'pauses': np.sum(np.abs(pause_diffs) > 0.5)
        },
        # Needed to turn the deviation counts above into a ratio.
        'comparison_counts': {
            'midpoints': len(midpoint_diffs),
            'pauses': len(pause_diffs)
        }
    }

def assess_quality(stats_data):
    """Assess alignment quality based on thresholds.

    Each metric is rated 'good', 'acceptable' or 'poor'. The deviation rating
    uses the share of significant deviations among all compared midpoints and
    pauses. 'overall' is 'good' from 80% of the maximum score and 'acceptable'
    from 60%.

    Args:
        stats_data: Dictionary produced by calculate_statistics().

    Returns:
        dict: rating per metric plus an 'overall' rating.
    """
    thresholds = {
        'midpoint': {'good': 0.2, 'acceptable': 0.5},
        'pause': {'good': 0.3, 'acceptable': 0.7},
        'correlation': {'good': 0.95, 'acceptable': 0.9},
        'significant_deviations': {'good': 0.1, 'acceptable': 0.2}
    }

    quality_levels = {'good': 3, 'acceptable': 2, 'poor': 1}
    assessment = {}

    # Assess individual metrics
    assessment['midpoint_quality'] = get_quality_level(
        stats_data['midpoint_statistics']['mean_abs_error'],
        thresholds['midpoint'])

    assessment['pause_quality'] = get_quality_level(
        stats_data['pause_statistics']['mean_abs_error'],
        thresholds['pause'])

    assessment['correlation_quality'] = get_quality_level(
        stats_data['timing_correlation'],
        thresholds['correlation'],
        reverse=True)

    # The denominator is the number of comparisons made, not the number of
    # deviations found: dividing by the latter gave a ratio unrelated to the
    # data size and a negative denominator when there were no deviations.
    total_deviations = sum(stats_data['significant_deviations'].values())
    total_possible = sum(stats_data.get('comparison_counts', {}).values())
    if total_possible > 0:
        deviation_ratio = total_deviations / total_possible
    else:
        # Counts unavailable: only "no deviations at all" can be rated well.
        deviation_ratio = 1.0 if total_deviations > 0 else 0.0

    assessment['deviation_quality'] = get_quality_level(
        deviation_ratio,
        thresholds['significant_deviations'])

    # Calculate overall quality
    total_score = sum(quality_levels[v] for v in assessment.values())
    max_score = len(assessment) * 3

    assessment['overall'] = (
        'good' if total_score >= max_score * 0.8 else
        'acceptable' if total_score >= max_score * 0.6 else
        'poor'
    )

    return assessment

def get_quality_level(value, thresholds, reverse=False):
    """Helper function to determine quality level.

    Args:
        value: Metric value to rate.
        thresholds: Dict with 'good' and 'acceptable' limits.
        reverse: False when smaller values are better (value <= limit),
            True when larger values are better (value >= limit).

    Returns:
        str: 'good', 'acceptable' or 'poor'.
    """
    if reverse:
        return ('good' if value >= thresholds['good'] else
                'acceptable' if value >= thresholds['acceptable'] else
                'poor')
    return ('good' if value <= thresholds['good'] else
            'acceptable' if value <= thresholds['acceptable'] else
            'poor')

def create_visualizations(midpoint_diffs, pause_diffs, M_s, M_vc, quality_assessment):
    """Create visualization plots for alignment analysis.

    Draws a 2x2 figure: histograms of the midpoint and pause differences, a
    scatter plot of original against synthesized midpoints, and a per-segment
    timeline of both differences. Each panel is labelled with a rating taken
    from `quality_assessment`.
    """
    plt.rcParams.update({'figure.figsize': [15, 10], 'axes.grid': True, 'grid.alpha': 0.3})
    fig, axs = plt.subplots(2, 2)

    def plot_distribution(ax, data, title, quality_key):
        """Histogram of `data` with a density curve where one can be estimated."""
        ax.hist(data, bins=20, density=True, alpha=0.7, color='blue')
        # A kernel density estimate needs at least two values that are not all
        # identical; otherwise only the histogram is shown.
        if len(data) > 1 and np.ptp(data) > 0:
            kde = gaussian_kde(data)
            x_range = np.linspace(min(data), max(data), 100)
            ax.plot(x_range, kde(x_range), 'r-', lw=2)
        ax.set_title(title)
        ax.set_xlabel('Time Difference (seconds)')
        ax.set_ylabel('Density')
        ax.text(0.05, 0.95, f"Quality: {quality_assessment[quality_key]}",
                transform=ax.transAxes, bbox=dict(facecolor='white', alpha=0.8))

    # Plot distributions
    plot_distribution(axs[0, 0], midpoint_diffs, 'Distribution of Midpoint Differences', 'midpoint_quality')
    plot_distribution(axs[0, 1], pause_diffs, 'Distribution of Pause Differences', 'pause_quality')

    # Scatter plot
    axs[1, 0].scatter(M_s, M_vc, alpha=0.6, color='blue')
    min_val, max_val = min(min(M_s), min(M_vc)), max(max(M_s), max(M_vc))
    axs[1, 0].plot([min_val, max_val], [min_val, max_val], 'r--', label='Perfect alignment')
    axs[1, 0].set_title('Original vs. Synthesized Midpoints')
    axs[1, 0].set_xlabel('Original Midpoints (seconds)')
    axs[1, 0].set_ylabel('Synthesized Midpoints (seconds)')
    axs[1, 0].legend()
    axs[1, 0].text(0.05, 0.95, f"Quality: {quality_assessment['correlation_quality']}",
                   transform=axs[1, 0].transAxes, bbox=dict(facecolor='white', alpha=0.8))

    # Timeline plot (there is one pause fewer than there are segments)
    x_range = np.arange(len(midpoint_diffs))
    axs[1, 1].plot(x_range, midpoint_diffs, 'b-', label='Midpoint differences', alpha=0.7)
    axs[1, 1].plot(x_range[:-1], pause_diffs, 'g-', label='Pause differences', alpha=0.7)
    axs[1, 1].axhline(y=0, color='r', linestyle='--', alpha=0.5)
    axs[1, 1].set_title('Timeline of Timing Differences')
    axs[1, 1].set_xlabel('Segment Number')
    axs[1, 1].set_ylabel('Time Difference (seconds)')
    axs[1, 1].legend()
    axs[1, 1].text(0.05, 0.95, f"Overall Quality: {quality_assessment['overall']}",
                   transform=axs[1, 1].transAxes, bbox=dict(facecolor='white', alpha=0.8))

    plt.tight_layout()
    plt.show()

def print_results(results):
    """Print the alignment report: both statistic groups, overall metrics and ratings."""

    def print_statistics(heading, results_key):
        # Every entry except the nested 'percentiles' dict is printed with three decimals.
        print(heading)
        for name, number in results[results_key].items():
            if name == 'percentiles':
                continue
            print(f"{name.replace('_', ' ').title()}: {number:.3f} seconds")

    print("\nAlignment Quality Analysis:")

    print_statistics("\nMidpoint Statistics:", 'midpoint_statistics')
    print_statistics("\nPause Statistics:", 'pause_statistics')

    print("\nOverall Metrics:")
    print(f"Timing Correlation: {results['timing_correlation']:.3f}")
    print(f"Total Alignment Error: {results['total_alignment_error']:.3f}")
    print(f"Average Timing Deviation: {results['average_timing_deviation']:.3f} seconds")
    deviation_counts = results['significant_deviations']
    print(f"Number of Significant Midpoint Deviations (>0.5s): {deviation_counts['midpoints']}")
    print(f"Number of Significant Pause Deviations (>0.5s): {deviation_counts['pauses']}")

    print("\nQuality Assessment:")
    for aspect in results['quality_assessment']:
        rating = results['quality_assessment'][aspect]
        print(f"{aspect.replace('_', ' ').title()}: {rating}")

In [None]:
# Analyse the solved alignment and print the report. `T_vc.value` holds the
# solved start times; note that `results` is reassigned here.
results = analyze_alignment_quality(M_s, T_vc.value, D_vc, D_p, P_vc_values)
print_results(results)

In [None]:
# Display the pauses between consecutive synthesized clips after alignment.
P_vc_values

## Combine generated audio pieces together

In [None]:
# Ensure T_vc is a numpy array
T_vc_values = T_vc.value.flatten()

# Paths to the audio files
N = len(T_vc_values)
tag = "_deepgram"
# The f-string already inserts `i`; no extra .format() call is needed.
audio_files = [f"{base_directory}draft/{video_sample}_VC{i}.wav" for i in range(1, N+1)]
original_audio_path = f"{base_directory}{video_sample}.wav"
original_audio = AudioSegment.from_file(original_audio_path)


# Load all audio segments
vc_segments = []
for file in audio_files:
    try:
        # Load the audio file
        vc_segments.append(AudioSegment.from_wav(file))
    except FileNotFoundError:
        print(f"Audio file {file} not found.")
        # Re-raise so the cell stops with a traceback; exit() would end the
        # whole session instead.
        raise

In [None]:
# Initialize the combined audio with the first segment
combined_audio = vc_segments[0]

# Upper bound for every fade, in whole milliseconds
base_fade_ms = int(min_pause * 1000)

for i in range(1, N):
    prev_segment = vc_segments[i - 1]
    current_segment = vc_segments[i]

    # Gap between the end of the previous clip and the start of this one
    expected_start_time = T_vc_values[i]
    prev_end_time = T_vc_values[i - 1] + D_vc[i - 1]

    # Time difference in milliseconds (rounded, so 49.999 ms counts as 50 ms)
    time_diff_ms = int(round((expected_start_time - prev_end_time) * 1000))

    # Ensure the fade does not exceed half the duration of either segment
    fade_duration_ms = int(min(base_fade_ms,
                               prev_segment.duration_seconds * 1000 / 2,
                               current_segment.duration_seconds * 1000 / 2))

    if time_diff_ms > 0:
        if len(current_segment) > 1:
            current_segment = current_segment.fade_in(duration = fade_duration_ms)
        if len(prev_segment) > 1:
            combined_audio = combined_audio.fade_out(duration = fade_duration_ms)

        # Add silence for the gap, then the current segment
        silence = AudioSegment.silent(duration=time_diff_ms)
        combined_audio = combined_audio + silence + current_segment
    else:
        print("!!! Raise warning !!!")
        # Still append the clip; otherwise this segment would be missing from
        # the output and everything after it would shift.
        combined_audio = combined_audio + current_segment

# The start times in T_vc_values are on a timeline whose zero is the start of
# the first original segment (the alignment sets T_s[0] = 0). The first clip
# therefore begins `first_segment_start_time + T_vc_values[0]` seconds into
# the file, and that is the amount of leading silence required.
start_pause = max(0.0, float(first_segment_start_time + T_vc_values[0]))

combined_audio = AudioSegment.silent(duration=int(round(start_pause * 1000))) + combined_audio.fade_in(duration = base_fade_ms)

# Pad the end so the result is as long as the original audio
end_pause = 0
if len(original_audio) - len(combined_audio) > 0:
    end_pause = (len(original_audio) - len(combined_audio)) /1000
else:
    end_pause = 0

combined_audio = combined_audio.fade_out(duration = base_fade_ms) + AudioSegment.silent(duration=end_pause * 1000)
print(f"start_pause:{start_pause}, end_pause:{end_pause}, combined_audio:{len(combined_audio)/1000}")

# Export the combined audio
combined_audio.export(f"{base_directory}draft/{video_sample}_adj_speed_{adjust_speed:.1f}_combined_audio.wav", format='wav')

print("Combined audio saved as 'combined_audio.wav'")

In [None]:
# Diagnostic: print the original first-segment start time, the solved start
# time of the first clip, and the first value minus the second.
print(f'first_original_segment_start_time: {first_segment_start_time:.4f}\nfirst_vc_segment_start: {T_vc_values[0]:.4f}\ndifference: {first_segment_start_time-T_vc_values[0]:.4f}')

In [None]:
# Diagnostics: compare the length of the original audio with the span covered
# by the segments and with the length of the combined audio.
original_audio_path = f"{base_directory}{video_sample}.wav"
original_audio = AudioSegment.from_file(original_audio_path)

original_duration = last_segment_end_time - first_segment_start_time
original_audio_seconds = len(original_audio) / 1000.0

diagnostic_lines = [
    f"(len(original_audio) / 1000.0):{original_audio_seconds}, original_segment_duration:{original_duration}",
    f"first_segment_start_time:{first_segment_start_time}, last_segment_end_time:{last_segment_end_time}",
    f"combined_audio.duration_seconds:{combined_audio.duration_seconds}",
    len(combined_audio)/1000 + first_segment_start_time,
]
for diagnostic_line in diagnostic_lines:
    print(diagnostic_line)

# 9. Replace the original video's audio with the polished audio

In [None]:
# Build the path from `base_directory`, like the other file paths, so the
# video is read from the configured folder.
video_filename = f"{base_directory}{video_sample}_original.mp4"

# Load the video file
video_clip = VideoFileClip(video_filename)

In [None]:
# Load the combined (aligned) audio exported for this sample and speed.
new_audio_clip = AudioFileClip(f"{base_directory}draft/{video_sample}_adj_speed_{adjust_speed:.1f}_combined_audio.wav")

# Attach the new audio clip to the original video clip
final_video_clip = video_clip.set_audio(new_audio_clip)

# Save the final video with the new audio (video encoded with libx264)
final_video_clip.write_videofile(f"{base_directory}Polished_{video_sample}_speed_{adjust_speed}_global.mp4", codec="libx264")


In [None]:
# Close the clips to release resources: the new audio, the final video and
# the source video.
new_audio_clip.close()
final_video_clip.close()
video_clip.close()