# Install Libraries

In [None]:
# Fetch the Zonos sources, move into the checkout and install it in editable mode.
# NOTE(review): re-running this cell in the same session will try to clone again
# and `%cd` one level deeper — run it once per fresh runtime.
!git clone https://github.com/Zyphra/Zonos
%cd Zonos
!pip install -e .
# Second pass installs the package's "compile" extra with build isolation disabled.
!pip install --no-build-isolation -e .[compile]

Cloning into 'Zonos'...
remote: Enumerating objects: 340, done.[K
remote: Counting objects: 100% (192/192), done.[K
remote: Compressing objects: 100% (77/77), done.[K
remote: Total 340 (delta 156), reused 115 (delta 115), pack-reused 148 (from 1)[K
Receiving objects: 100% (340/340), 3.03 MiB | 6.89 MiB/s, done.
Resolving deltas: 100% (211/211), done.
/content/Zonos
Obtaining file:///content/Zonos
  Installing build dependencies ... [?25l[?25hdone
  Checking if build backend supports build_editable ... [?25l[?25hdone
  Getting requirements to build editable ... [?25l[?25hdone
  Preparing editable metadata (pyproject.toml) ... [?25l[?25hdone
Collecting kanjize>=1.5.0 (from zonos==0.1.0)
  Downloading kanjize-1.6.0-py3-none-any.whl.metadata (2.5 kB)
Collecting numpy>=2.2.2 (from zonos==0.1.0)
  Downloading numpy-2.2.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (62 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.0/62.0 kB[0m [31

In [None]:
import subprocess
import sys

# Install the remaining pipeline dependencies without flooding the cell output.
# * `sys.executable -m pip` installs into the interpreter the kernel is running,
#   which a bare `pip` found on PATH does not guarantee.
# * moviepy is capped below 2.0 because later cells import `moviepy.editor` and
#   call `set_audio` / `subclip`, which belong to the 1.x API.
# check_call raises CalledProcessError on failure; its return value (0) is the cell output.
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'moviepy<2', 'demucs', 'pydub', 'dotenv', 'openai', 'noisereduce', 'pyannote.audio', 'openai-whisper', 'gradio'],
                      stdout=subprocess.DEVNULL,
                      stderr=subprocess.DEVNULL)

0

In [None]:
import os
import subprocess

# Extend the inherited environment instead of replacing it: passing a bare dict
# as `env` would strip PATH, proxy settings, etc. from the child processes.
apt_env = {**os.environ, 'LC_ALL': 'C.UTF-8'}

# Refreshing the package index is best-effort; a stale index is usually still usable.
subprocess.run(['apt-get', 'update'], env=apt_env)
# Fail loudly if espeak-ng cannot be installed (presumably required by the TTS
# stage later in the notebook — confirm), rather than discovering it much later.
subprocess.run(['apt-get', 'install', '-y', 'espeak-ng'], env=apt_env, check=True)

CompletedProcess(args=['apt-get', 'install', '-y', 'espeak-ng'], returncode=0)

# Load Libraries

In [None]:
# Video processing and editing libraries
import cv2
from moviepy.editor import VideoFileClip, ImageSequenceClip, AudioClip, concatenate_videoclips, ColorClip
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip

# Audio processing libraries
import torch
import torch.nn.functional as F
import torchaudio
import numpy as np
from pydub import AudioSegment
import soundfile as sf
import numpy as np
from typing import List, Tuple, Dict, Any

# Models and audio source separation / ASR
import whisper
from demucs.pretrained import get_model
from demucs.apply import apply_model

# Speech analysis and forced alignment
from pyannote.audio import Pipeline
from pyannote.audio.pipelines.utils.hook import ProgressHook

# OpenAI API and environment variable handling
from openai import OpenAI
from dotenv import load_dotenv
import json
import os

# Memory management
import gc

# Ignore Warning
import warnings
warnings.filterwarnings("ignore")

# Zonos and librosa
from zonos.model import Zonos
from zonos.conditioning import make_cond_dict
from zonos.utils import DEFAULT_DEVICE as device
import librosa

# SEED SETTING
import random

# One seed shared by every random number generator the pipeline touches.
seed_value = 42
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(seed_value)

# On GPU runtimes also seed every CUDA device and pin cuDNN to its
# deterministic code paths (autotuning disabled).
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed_value)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

  if event.key is 'enter':

DEBUG:speechbrain.utils.checkpoints:Registered checkpoint save hook for _speechbrain_save
DEBUG:speechbrain.utils.checkpoints:Registered checkpoint load hook for _speechbrain_load
DEBUG:speechbrain.utils.checkpoints:Registered checkpoint save hook for save
DEBUG:speechbrain.utils.checkpoints:Registered checkpoint load hook for load
DEBUG:speechbrain.utils.checkpoints:Registered checkpoint save hook for _save
DEBUG:speechbrain.utils.checkpoints:Registered checkpoint load hook for _recover


# Audio / Frame Split

In [None]:
########################
#  Audio / Frame Split #
########################
def get_normalized_audio(video_path):
    """Decode the audio track of an MP4 file into float32 samples in [-1, 1).

    Args:
        video_path: Path to the video file; it is decoded with ``format="mp4"``.

    Returns:
        np.ndarray of dtype float32. Multi-channel audio has shape
        ``(n_frames, n_channels)``; mono audio stays one-dimensional.

    NOTE(review): the source sample rate (``audio_segment.frame_rate``) is not
    returned, so callers have to know it by other means.
    """
    audio_segment = AudioSegment.from_file(video_path, format="mp4")
    audio_samples = np.array(audio_segment.get_array_of_samples())

    # The flat sample array interleaves channels frame by frame, so any
    # multi-channel layout (not only stereo) must be folded into columns.
    channels = audio_segment.channels
    if channels > 1:
        audio_samples = audio_samples.reshape((-1, channels))

    # Full-scale magnitude of a signed integer sample, e.g. 2^(16-1) = 32768.
    sample_width = audio_segment.sample_width
    max_val = float(2 ** (8 * sample_width - 1))
    audio_float = audio_samples.astype(np.float32) / max_val

    # Release the integer copies promptly; long tracks are large.
    del audio_samples, audio_segment
    gc.collect()

    return audio_float

def get_image_sequence_clip(video_path):
    """Load every frame of a video into memory as an ``ImageSequenceClip``.

    Args:
        video_path: Path to the source video.

    Returns:
        An ``ImageSequenceClip`` built from all decoded frames at the source's
        frame rate. The clip owns its frames, so the source reader is closed
        before returning.
    """
    video_clip = VideoFileClip(video_path)
    try:
        fps_video = video_clip.fps
        frames = list(video_clip.iter_frames())
    finally:
        # Release the underlying reader processes/file handles; the frames
        # have already been copied into `frames`.
        video_clip.close()

    return ImageSequenceClip(frames, fps=fps_video)

# Input video for the whole pipeline (absolute path on the notebook runtime).
video_path = '/content/homer_29.mp4'
# Silent frame-only clip; the dubbed audio is attached to it in the final cell.
video_clip = get_image_sequence_clip(video_path)
# Normalized float32 samples of the original audio track.
audio = get_normalized_audio(video_path)

# Split Speech & Background

In [None]:
########################
#  Background / Voice  # (Demuc를 사용)
########################
def separate_vocals_and_background(audio, device):
    """Split audio into a vocal stem and a background stem with Demucs.

    Args:
        audio: Float samples; transposed before being fed to the model, so it
            is expected as ``(n_samples, n_channels)``.
        device: Torch device the model and the input tensor are moved to.

    Returns:
        ``(vocals, background)`` as NumPy arrays shaped ``(n_samples, n_channels)``.
        ``background`` is the sum of every non-vocal stem.

    NOTE(review): the input is passed to the model at whatever sample rate it
    was decoded with; no resampling to the model's rate happens here — confirm
    the source rate matches what the model expects.
    """
    audio_tensor = torch.tensor(audio.T).float().unsqueeze(0).to(device)
    model = get_model('htdemucs').to(device)
    model.eval()
    with torch.no_grad():
        estimates = apply_model(model, audio_tensor, shifts=1, overlap=0.5)

    estimates_np = estimates.squeeze(0).cpu().numpy()

    # Look the stems up by name instead of relying on a fixed index order.
    source_names = list(model.sources)
    vocals_index = source_names.index('vocals')
    vocals = estimates_np[vocals_index].T

    background_sum = None
    for stem_index in range(len(source_names)):
        if stem_index == vocals_index:
            continue
        stem = estimates_np[stem_index]
        background_sum = stem if background_sum is None else background_sum + stem
    if background_sum is None:
        raise ValueError("Separation model exposes no non-vocal stems.")
    background = background_sum.T

    # Free GPU memory held by the model and intermediate tensors.
    del audio_tensor, estimates, estimates_np, model
    torch.cuda.empty_cache()
    gc.collect()

    return vocals, background

# Rebinds `device` (previously imported as DEFAULT_DEVICE from zonos.utils):
# prefer the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Separate the original audio track into vocal and background stems.
vocals, background = separate_vocals_and_background(audio, device)

Downloading: "https://dl.fbaipublicfiles.com/demucs/hybrid_transformer/955717e8-8726e21a.th" to /root/.cache/torch/hub/checkpoints/955717e8-8726e21a.th
100%|██████████| 80.2M/80.2M [00:00<00:00, 90.0MB/s]


# Save Audio for Whisper

In [None]:
########################
#       save audio     # (Speech to text 과정에서 필요함.)
########################
# Destination paths for the separated stems; the vocal file is what the
# speech-to-text stage reads.
vocals_file_root = "/content/vocals.wav"
background_file_root = "/content/background.wav"

# NOTE(review): 48000 is hard-coded here — this assumes the source audio was
# decoded at 48 kHz; the real rate is not carried over from decoding. Confirm.
sf.write(vocals_file_root, vocals, 48000)
sf.write(background_file_root, background, 48000)

gc.collect()

0

# Speech-To-Text (Whisper & VAD)

In [None]:
########################
#    Speech-To-Text    # Whisper의 결과로부터, Voice Activity Detection을 통하여 명확한 초 단위 구간으로 변환
########################
def speech_to_text_with_vad(audio_file, device='cuda', margin=0.0):
    """Transcribe ``audio_file`` with Whisper and tighten segments using VAD.

    Each Whisper segment is intersected with a voice-activity region that
    overlaps it (when one exists), then widened by ``margin`` seconds on both
    sides; the start is never allowed below 0.

    Args:
        audio_file: Path to the audio file to transcribe.
        device: Device used for both the Whisper model and the VAD pipeline.
        margin: Seconds added before and after every refined segment.

    Returns:
        dict with ``'segments'`` (list of ``{'start', 'end', 'text'}``) and
        ``'output_string'`` (one ``[start ~ end] text`` line per segment).

    Raises:
        RuntimeError: If the VAD pipeline could not be loaded.
    """
    with torch.no_grad():
        model = whisper.load_model("medium", device=device)
        vad_pipeline = Pipeline.from_pretrained("pyannote/voice-activity-detection") # , use_auth_token='')
        if vad_pipeline is None:
            # Guard against a failed download (e.g. missing access token for a
            # gated model) surfacing later as an opaque AttributeError.
            raise RuntimeError(
                "Could not load 'pyannote/voice-activity-detection'; "
                "check network access and authentication."
            )
        # Honour the requested device instead of unconditionally using CUDA.
        vad_pipeline.to(torch.device(device))
        result = model.transcribe(audio_file, word_timestamps=True)

    vad_output = vad_pipeline(audio_file)
    # Materialise the speech regions once; they are reused for every segment.
    speech_regions = [(speech.start, speech.end) for speech in vad_output.get_timeline()]

    refined_segments = []
    for segment in result['segments']:
        vad_matches = [
            (region_start, region_end)
            for region_start, region_end in speech_regions
            if (region_start <= segment['end'] and region_end >= segment['start'])
        ]

        if vad_matches:
            # NOTE(review): this picks the *shortest* overlapping speech region;
            # confirm that is intended rather than the largest overlap.
            best_match = min(vad_matches, key=lambda x: x[1] - x[0])

            extended_start = max(segment['start'], max(best_match[0], 0)) - margin
            extended_end = min(segment['end'], best_match[1]) + margin
            extended_start = max(extended_start, 0.0)
        else:
            # No speech region overlaps: keep Whisper's own boundaries.
            extended_start = max(segment['start'] - margin, 0.0)
            extended_end = segment['end'] + margin

        refined_segments.append({
            'start': extended_start,
            'end': extended_end,
            'text': segment['text'].strip()
        })

    output_string = "".join(
        f"[{segment['start']:05.2f} ~ {segment['end']:05.2f}] {segment['text']}\n"
        for segment in refined_segments
    )

    # Both models are large; drop them before the next stage loads its own.
    del model, vad_pipeline, result, vad_output
    torch.cuda.empty_cache()
    gc.collect()

    return {
        'segments': refined_segments,
        'output_string': output_string
    }

# Transcribe the separated vocal stem (no extra margin around segments).
result_stt = speech_to_text_with_vad(vocals_file_root, 'cuda', 0.00)

# Build the transcript text sent to the translator, echoing each line as we go.
# Zero-length segments are left out of the text.
whisper_lines = []
for segment in result_stt["segments"]:
    start, end = segment["start"], segment["end"]
    if start == end:
        continue
    text = segment["text"].strip()
    transcript_line = f"[{start:05.3f} ~ {end:05.3f}] {text}"
    whisper_lines.append(transcript_line + "\n")
    print(transcript_line)
output_string_whisper = "".join(whisper_lines)

# Snap stored timestamps to the same 3-decimal precision that was printed, so
# later exact (start, end) comparisons against the translated output can match.
for segment in result_stt["segments"]:
    if "start" not in segment or "end" not in segment:
        continue
    if not isinstance(segment["start"], (int, float)):
        continue
    if not isinstance(segment["end"], (int, float)):
        continue

    formatted_start = float(f"{segment['start']:05.3f}")
    formatted_end = float(f"{segment['end']:05.3f}")

    segment["start"] = formatted_start
    segment["end"] = formatted_end

gc.collect()

def apply_margin_to_segments(segments, margin=0.0):
    """Return copies of ``segments`` widened by ``margin`` seconds on each side.

    The start is clamped at 0.0; only the ``start``, ``end`` and ``text`` keys
    are carried over into the new dictionaries.
    """
    return [
        {
            'start': max(item['start'] - margin, 0.0),
            'end': item['end'] + margin,
            'text': item['text'],
        }
        for item in segments
    ]

100%|█████████████████████████████████████| 1.42G/1.42G [00:41<00:00, 37.1MiB/s]


config.yaml:   0%|          | 0.00/277 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/17.7M [00:00<?, ?B/s]

config.yaml:   0%|          | 0.00/1.98k [00:00<?, ?B/s]

INFO:pytorch_lightning.utilities.migration.utils:Lightning automatically upgraded your loaded checkpoint from v1.1.3 to v2.5.1.post0. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint ../../root/.cache/torch/pyannote/models--pyannote--segmentation/snapshots/059e96f964841d40f1a5e755bb7223f76666bba4/pytorch_model.bin`


Model was trained with pyannote.audio 0.0.1, yours is 3.3.2. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.7.1, yours is 2.6.0+cu124. Bad things might happen unless you revert torch to 1.x.
[0.875 ~ 3.640] Hey, open up! I've been standing here all morning!
[4.140 ~ 5.920] Hey, Homer! What are you doing at work?


# Translate to Korean

In [None]:
########################
#      translate       # GPT 4o-mini를 활용하여 번역함. 의성어 부분은 제외하도록 함. 추가적으로 번역에 반영하고 싶은 부분들 체크
########################
# JSON schema used as the `parameters` of the `get_transcript` tool: an object
# with a required "transcript" array whose items carry start/end times (numbers)
# and the translated line (string).
transcript_schema = {
    "type": "object",
    "properties": {
        "transcript": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "start": {
                        "type": "number",
                        "description": "문장이 시작하는 시간 (초 단위)"
                    },
                    "end": {
                        "type": "number",
                        "description": "문장이 끝나는 시간 (초 단위)"
                    },
                    "line": {
                        "type": "string",
                        "description": "Translated Text"
                    }
                },
                # Every transcript item must provide all three fields.
                "required": ["start", "end", "line"]
            }
        }
    },
    "required": ["transcript"]
}

system_prompt = """
                ### Persona ###
                You are a Translator who receives text formatted as a Whisper-style transcript. Your role is to translate this transcript to korean.

                ### Task ###
                Translate the provided transcript text into **Korean**. You must structure the result strictly according to the specified JSON schema.
                All keys and string values must use double quotes.
                The resulting JSON object must contain a "transcript" key, with its value being an array of objects, each containing timestamps and the translated dialogue line.
                The translation should capture the satirical, humorous, and cynical style characteristic of 'The Simpsons', taking into account the video title and the provided summary information.
                Include liberal translations (paraphrasing) to naturally convey the original context, characters, and humor to a Korean audience.

                Translation Guidelines:
                - Prioritize context and cultural nuance over literal translation (use paraphrasing).
                - Maintain the tone appropriate for the original characters and situations.
                - Choose expressions that effectively convey American humor and satire in Korean.

                ### Output Instructions ###
                You must call the `get_transcript` tool (function) to return the result in JSON format. Generate *only* the JSON object resulting from this tool call, with no other text or explanations.

                ### [Critically Important] Exclusion Rule: ###
                If a line of the transcript consists *only* of single interjections, onomatopoeia, short cheers, brief affirmative/negative responses, or laughter (such as 'Yeah', 'Yummy', 'Oh', 'Ah', 'Mmm', 'Huh', 'Wow', 'Haha', 'Woo hoo', 'Uh-huh', etc.),
                that entire line **must absolutely not be included** in the final JSON output. For example, if an original transcript line is just "0:04.000 --> 0:06.000\\nYeah! Woo hoo!", the corresponding object for that line must be **completely omitted** from the `transcript` array in the resulting JSON.
                **Failure to adhere strictly to this rule negates your primary function.**

                ### Output Example (Illustrating exclusion of single interjection lines): ###
                ```json
                {
                    "transcript": [
                        {"start": 0.03, "end": 4.00, "line": "Translated meaningful dialogue line 1"},
                        # Omitted lines containing only interjections like "Yeah!", "Oh!", etc.
                        {"start": 7.00, "end": 9.00, "line": "Translated meaningful dialogue line 2"}

                    ]
                }
                """

def get_text_user_prompt(text, video_title, request_text):
    return (
        f"Please translate to korean the following text, considering the video title and the user's requests, and structure it in the specified format.\n\n"
        f"Video Title: \"{video_title}\"\n"
        f"User's requests for the translation:\n"
        f"{request_text}\n\n"
        f"Text to translate and structure:\n"
        f"{text}\n"
    )

def translate_text_with_summary(text, video_title, summary_result):
    """Translate a transcript to Korean through a forced ``get_transcript`` tool call.

    Args:
        text: Transcript text to translate.
        video_title: Title inserted into the user prompt.
        summary_result: Extra guidance inserted as the user's requests.

    Returns:
        The parsed tool arguments (a dict following ``transcript_schema``) on
        success. On failure a dict with ``"error"`` and ``"raw_output"`` keys is
        returned instead of raising.

    Raises:
        RuntimeError: If no API key is available in the environment.
    """
    # Never embed the key in the notebook: read it from the environment
    # (optionally populated from a local .env file).
    load_dotenv()
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError(
            "OPENAI_API_KEY is not set; export it or put it in a .env file."
        )

    user_prompt = get_text_user_prompt(text, video_title, summary_result)
    client = OpenAI(api_key=api_key)
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        tools=[{
            "type": "function",
            "function": {
                "name": "get_transcript",
                "description":"Converts and returns the text in transcript JSON format, including timeline segments.",
                "parameters": transcript_schema
            }
        }],
        # Force the model to answer through the tool so the output is structured.
        tool_choice={"type": "function", "function": {"name": "get_transcript"}}
    )

    message = response.choices[0].message

    if message.tool_calls and message.tool_calls[0].function.name == "get_transcript":
        tool_call = message.tool_calls[0]
        function_args = tool_call.function.arguments
        try:
            return json.loads(function_args)
        except json.JSONDecodeError as e:
            print(f"JSON 파싱 실패: {e}")
            print(f"모델이 반환한 원본 인수 문자열: {function_args}")
            return {"error": "JSON 파싱 실패", "raw_output": function_args}
    else:
        print("오류: 모델이 예상된 get_transcript 툴을 호출하지 않았습니다.")
        print(f"모델 응답: {message}")
        raw_content = message.content if message.content else "내용 없음"
        return {"error": "툴 호출 누락", "raw_output": raw_content}



# Reference lines (timestamp, Korean, English) handed to the model as the
# "user's requests" part of the prompt.
request_text = '''
[0.875 ~ 3.640] 이봐, 문 열어! 나 여기 아침 내내 서 있었어!, Hey, open up! I've been standing here all morning!
[4.140 ~ 5.920] 이봐, 호머! 여기서 뭐 하고 있어?, Hey, Homer! What are you doing at work?
'''

# `request_text` is passed as the function's third argument (`summary_result`).
translated_result = translate_text_with_summary(output_string_whisper, 'Simpson', request_text)

# The translator returns an {"error", "raw_output"} payload on failure; surface
# that explicitly instead of failing below with a bare KeyError.
if "transcript" not in translated_result:
    raise RuntimeError(
        f"Translation failed: {translated_result.get('error')!r}; "
        f"raw output: {translated_result.get('raw_output')!r}"
    )

# Pair every translated line with the original STT text it overlaps the most.
merged_result = {"transcript": []}
stt_segments = result_stt.get("segments", [])

for segment in translated_result["transcript"]:
    start = segment["start"]
    end = segment["end"]
    text = segment["line"].strip()

    best_overlap = 0
    original_text = ""  # stays empty when no STT segment overlaps at all

    for trans_seg in stt_segments:
        # Length (seconds) of the intersection of the two time ranges.
        overlap_start = max(start, trans_seg["start"])
        overlap_end = min(end, trans_seg["end"])
        overlap = max(0, overlap_end - overlap_start)

        if overlap > best_overlap:
            best_overlap = overlap
            original_text = trans_seg["text"]

    merged_result["transcript"].append({
        "start": start,
        "end": end,
        "original": original_text,
        "translation": text
    })

for segment in merged_result["transcript"]:
    start = segment["start"]
    end = segment["end"]
    text = segment["translation"].strip()
    ori = segment["original"].strip()
    print(f"[{start:05.3f} ~ {end:05.3f}] {text}, {ori}")
gc.collect()

[0.875 ~ 3.640] 이봐, 문 열어! 나 여기 아침 내내 서 있었어!, Hey, open up! I've been standing here all morning!
[4.140 ~ 5.920] 이봐, 호머! 여기서 뭐 하고 있어?, Hey, Homer! What are you doing at work?


121

In [None]:
#################################
# Preprocess about onomatopoeia #
#################################
# (start, end) pairs that survived translation.
processed_times = {(seg['start'], seg['end']) for seg in merged_result['transcript']}

# STT segments whose exact (start, end) pair is absent from the translated
# transcript, i.e. lines the model dropped.
removed_segments_info = [
    {'start': seg['start'], 'end': seg['end']}
    for seg in result_stt['segments']
    if (seg['start'], seg['end']) not in processed_times
]

print("\nLLM 처리 후 빠진 대본 구간:")
if removed_segments_info:
    for removed in removed_segments_info:
        print(f"- 구간: [{removed['start']:.3f} ~ {removed['end']:.3f}]")
else:
    print("빠진 구간이 없습니다.")

removed_time_tuples = [(item['start'], item['end']) for item in removed_segments_info]
print("\n빠진 구간 (시작, 종료) 튜플 리스트:")
print(removed_time_tuples)


LLM 처리 후 빠진 대본 구간:
빠진 구간이 없습니다.

빠진 구간 (시작, 종료) 튜플 리스트:
[]


In [None]:
#####################################################
# Preprocess: Find Missing Segments & Silence Gaps #
#####################################################
# (start, end) pairs present in the translated transcript.
processed_times = set((seg['start'], seg['end']) for seg in translated_result.get('transcript', []))

# Walk the STT segments in chronological order, ignoring malformed entries.
original_segments = result_stt.get('segments', [])
valid_segments_for_sorting = [
    seg for seg in original_segments
    if isinstance(seg, dict) and 'start' in seg and 'end' in seg
]
sorted_segments = sorted(valid_segments_for_sorting, key=lambda x: x['start'])

missing_segments_info = []
silence_segments_info = []

# Gaps between segments that are not longer than this (seconds) are ignored.
silence_threshold = 0.1
# Latest end time seen so far; gaps are measured from here.
last_valid_end_time = 0.0

for seg in sorted_segments:
    start_time = seg['start']
    end_time = seg['end']
    original_line = seg.get('text', '')

    # A gap before this segment counts as a stretch without transcript.
    if start_time > last_valid_end_time + silence_threshold:
        silence_info = {
            'start': last_valid_end_time,
            'end': start_time,
            'duration': start_time - last_valid_end_time
        }
        silence_segments_info.append(silence_info)

    # The segment exists in the STT output but was dropped by the translator.
    if (start_time, end_time) not in processed_times:
        missing_info = {
            'start': start_time,
            'end': end_time,
            'text': original_line
        }
        missing_segments_info.append(missing_info)
    # max() keeps overlapping segments from moving the cursor backwards.
    last_valid_end_time = max(last_valid_end_time, end_time)

print("\nLLM 처리 후 빠진 대본 구간 (원래 음성 O):")
if not missing_segments_info:
    print("빠진 구간이 없습니다.")
else:
    for removed in missing_segments_info:
        print(f"- 구간: [{removed['start']:.3f} ~ {removed['end']:.3f}], 원본: '{removed.get('text', '')}'")

print("\n대본 사이의 음성 없는 구간 (묵음 구간):")
if not silence_segments_info:
    print("묵음 구간이 없습니다.")
else:
    for silence in silence_segments_info:
        print(f"- 구간: [{silence['start']:.3f} ~ {silence['end']:.3f}] (길이: {silence.get('duration', 0):.3f}s)")

removed_time_tuples = [(item['start'], item['end']) for item in missing_segments_info]
print("\n[기존] 빠진 구간 (시작, 종료) 튜플 리스트:")
print(removed_time_tuples)

silence_time_tuples = [(item['start'], item['end']) for item in silence_segments_info]
print("\n[추가] 묵음 구간 (시작, 종료) 튜플 리스트:")
print(silence_time_tuples)

combined_time_tuples = sorted(removed_time_tuples + silence_time_tuples)
print("\n[통합] 빠진 구간 및 묵음 구간 (시작, 종료) 튜플 리스트 (시간순 정렬):")
print(combined_time_tuples)

# Shrink every interval by `margin` seconds on both sides. Intervals shorter
# than 2 * margin end up with start >= end.
margin = 0.04
margin_combined_tuples = []

for start, end in combined_time_tuples:
    margin_start = max(0, start + margin)
    margin_end = end - margin
    margin_combined_tuples.append((margin_start, margin_end))

# Report the margin actually used (the label previously hard-coded 0.1).
print(f"\n[여유] {margin}초 마진이 추가된 구간 튜플 리스트:")
print(margin_combined_tuples)


LLM 처리 후 빠진 대본 구간 (원래 음성 O):
빠진 구간이 없습니다.

대본 사이의 음성 없는 구간 (묵음 구간):
- 구간: [0.000 ~ 0.875] (길이: 0.875s)
- 구간: [3.640 ~ 4.140] (길이: 0.500s)

[기존] 빠진 구간 (시작, 종료) 튜플 리스트:
[]

[추가] 묵음 구간 (시작, 종료) 튜플 리스트:
[(0.0, 0.875), (3.64, 4.14)]

[통합] 빠진 구간 및 묵음 구간 (시작, 종료) 튜플 리스트 (시간순 정렬):
[(0.0, 0.875), (3.64, 4.14)]

[여유] 0.1초 마진이 추가된 구간 튜플 리스트:
[(0.04, 0.835), (3.68, 4.1)]


In [None]:
##############################
#  Onomatopeia to Background #
##############################
def merge_segments(
    vocals_data: np.ndarray,
    background_data: np.ndarray,
    segments_to_add: List[Tuple[float, float]],
    sr: int
) -> np.ndarray:
    """Add selected time ranges of the vocal track onto a copy of the background.

    Args:
        vocals_data: Vocal samples, time along axis 0.
        background_data: Background samples, time along axis 0. Not modified.
        segments_to_add: ``(start_sec, end_sec)`` ranges to copy over.
        sr: Sample rate used to convert seconds into sample indices.

    Returns:
        A new array shaped like ``background_data`` with the requested vocal
        ranges mixed in. Ranges that are empty after clamping are skipped with
        a printed warning.
    """
    merged_background = background_data.copy()
    # Clamp to the shorter of the two tracks so the in-place add below always
    # sees equally long slices, even when the stems differ in length.
    audio_length_samples = min(len(vocals_data), len(merged_background))

    for start_sec, end_sec in segments_to_add:
        start_sample = max(0, int(start_sec * sr))
        end_sample = min(audio_length_samples, int(end_sec * sr))

        if start_sample >= end_sample:
             print(f"경고: 구간 ({start_sec}, {end_sec}) -> 샘플 [{start_sample}, {end_sample}) 변환 후 유효하지 않아 건너뜁니다.")
             continue

        merged_background[start_sample:end_sample] += vocals_data[start_sample:end_sample]

    return merged_background

# Mix the untranslated / gap ranges of the vocal stem back into the background.
# NOTE(review): 48000 is hard-coded as the sample rate — assumes both stems are
# at 48 kHz; confirm.
background_merged = merge_segments(
    vocals, background, margin_combined_tuples, 48000
)

print("보컬 구간이 배경에 성공적으로 추가되었습니다.")

보컬 구간이 배경에 성공적으로 추가되었습니다.


# TTS with ZONOS

In [None]:
######################################
# Time Stretching / Time Compression #
######################################
def process_waveform(wavs, start_sec, end_sec, sample_rate=44100):
    """Time-stretch or compress generated audio to fit ``[start_sec, end_sec]``.

    Args:
        wavs: CPU tensor holding one waveform, shaped ``(1, 1, T)`` or ``(1, T)``.
            Any other shape is flattened to 1-D with a printed warning.
        start_sec: Start of the target window in seconds.
        end_sec: End of the target window in seconds.
        sample_rate: Sample rate used to turn the window length into samples.

    Returns:
        Tensor shaped ``(1, 1, T')``. If stretching fails the error is printed
        and the waveform is returned unstretched (best effort).

    Raises:
        ValueError: If ``wavs`` cannot be converted to a 1-D NumPy array.
    """
    # Target length in samples; never below one so the rate stays finite
    # (also covers zero or negative durations).
    target_length = max(int((end_sec - start_sec) * sample_rate), 1)

    if wavs.ndim == 3 and wavs.shape[0] == 1:
        wavs_np = wavs[0, 0].numpy()
    elif wavs.ndim == 2 and wavs.shape[0] == 1:
        wavs_np = wavs[0].numpy()
    else:
        try:
            # reshape (unlike view) also handles non-contiguous tensors.
            wavs_np = wavs.reshape(-1).numpy()
            print(f"Warning: Unexpected wavs shape {wavs.shape}. Flattening to 1D.")
        except Exception as e:
            raise ValueError(f"Cannot convert wavs with shape {wavs.shape} to numpy for librosa: {e}") from e

    current_length = len(wavs_np)

    # rate > 1 shortens the audio, rate < 1 lengthens it.
    stretch_rate = current_length / target_length if current_length > 0 else 1.0

    try:
        stretched_wav = librosa.effects.time_stretch(wavs_np, rate=stretch_rate)
    except Exception as e:
        # Deliberate best effort: keep the unstretched audio rather than abort.
        print(f"Error during librosa.effects.time_stretch: {e}")
        print(f"Input shape: {wavs_np.shape}, Rate: {stretch_rate}")
        stretched_wav = wavs_np

    # Restore the (batch, channel, time) layout.
    return torch.from_numpy(stretched_wav).unsqueeze(0).unsqueeze(1)

def get_first_vector(x):
    """Reduce a tensor/array to 1-D by repeatedly taking index 0 of the first axis.

    Reduction stops early, returning the current value, when the first axis is empty.

    Raises:
        ValueError: If ``x`` is neither a ``torch.Tensor`` nor a ``np.ndarray``.
    """
    if not isinstance(x, (torch.Tensor, np.ndarray)):
        raise ValueError("Unsupported type.")
    while x.ndim > 1 and x.shape[0] != 0:
        x = x[0]
    return x


def _element_count(vector):
    """Number of elements in a tensor or array."""
    return vector.numel() if isinstance(vector, torch.Tensor) else vector.size


def _peak_amplitude(vector):
    """Largest absolute sample value of a non-empty tensor or array, as a float."""
    if isinstance(vector, torch.Tensor):
        return float(vector.abs().max())
    return float(np.abs(vector).max())


def scale_waveform(segment, voiced_audio):
    """Scale generated audio so its peak matches the peak of a reference segment.

    Args:
        segment: Reference audio (tensor or array); its first vector is used.
        voiced_audio: Generated audio (tensor or array); its first vector is scaled.

    Returns:
        The first vector of ``voiced_audio`` multiplied by
        ``peak(segment) / peak(voiced_audio)``. When either input is empty or
        the generated audio is completely silent, no meaningful factor exists
        and the vector is returned unscaled instead of producing NaN/inf.
    """
    seg_vector = get_first_vector(segment)
    wavs_vector = get_first_vector(voiced_audio)

    if _element_count(seg_vector) == 0 or _element_count(wavs_vector) == 0:
        return wavs_vector

    max_wavs_amp = _peak_amplitude(wavs_vector)
    if max_wavs_amp == 0.0:
        # Silent output: dividing by zero would poison the mix with NaN/inf.
        return wavs_vector

    scaling_factor = _peak_amplitude(seg_vector) / max_wavs_amp
    return wavs_vector * scaling_factor

######################################
#           Speaking Speed           #
######################################
def count_phonemes(text):
    """Count the jamo of all precomposed Hangul syllables in ``text``.

    A syllable in U+AC00..U+D7A3 counts as 3 when it has a final consonant and
    as 2 otherwise; every other character contributes nothing.
    """
    hangul_first, hangul_last = 0xAC00, 0xD7A3
    finals_per_block = 28  # final index 0 means "no final consonant"

    total_phonemes = 0
    for code in map(ord, text):
        if not (hangul_first <= code <= hangul_last):
            continue
        has_final = (code - hangul_first) % finals_per_block != 0
        total_phonemes += 3 if has_final else 2
    return total_phonemes

In [None]:
########################
#    text-to-speech    # ZONOS를 활용
########################
# Load the pretrained Zonos transformer checkpoint onto `device` and switch it
# to inference mode.
model = Zonos.from_pretrained("Zyphra/Zonos-v0.1-transformer", device=device)
model.eval()

class EmotionMLP(torch.nn.Module):
    """Three-layer MLP producing 8 independent sigmoid scores per input row.

    Layout: Linear -> BatchNorm -> ReLU -> Dropout(0.3), applied twice
    (``input_size -> hidden_size -> 128``), followed by a Linear layer to 8
    outputs passed through a sigmoid. Attribute names are part of the saved
    checkpoint's state-dict keys and must not be renamed.
    """

    def __init__(self, input_size=1024, hidden_size=256):
        super().__init__()
        nn = torch.nn

        # Block 1: input_size -> hidden_size
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.batch_norm1 = nn.BatchNorm1d(hidden_size)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(0.3)

        # Block 2: hidden_size -> 128
        self.linear2 = nn.Linear(hidden_size, 128)
        self.batch_norm2 = nn.BatchNorm1d(128)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(0.3)

        # Output head: 128 -> 8
        self.linear3 = nn.Linear(128, 8)

    def forward(self, x):
        """Return per-row scores in (0, 1), shape ``(batch, 8)``."""
        hidden = self.dropout1(self.relu1(self.batch_norm1(self.linear1(x))))
        hidden = self.dropout2(self.relu2(self.batch_norm2(self.linear2(hidden))))
        return torch.sigmoid(self.linear3(hidden))

# Emotion head: restore trained weights from a local checkpoint and put the
# module in inference mode on the selected device.
emotion_module = EmotionMLP(input_size=1024, hidden_size=256)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Absolute path on the notebook runtime; the file must be provided beforehand.
module_path = "/content/best_emotion_model.pth"
emotion_module.load_state_dict(torch.load(module_path, map_location=device))
emotion_module.to(device)
emotion_module.eval()

# CLAP model plus its audio feature extractor and text tokenizer, all loaded
# from the same pretrained checkpoint name.
from transformers import AutoTokenizer, ClapModel, AutoFeatureExtractor
clap_model = ClapModel.from_pretrained("laion/larger_clap_general").to(device)
feature_extractor = AutoFeatureExtractor.from_pretrained("laion/larger_clap_general")
tokenizer = AutoTokenizer.from_pretrained("laion/larger_clap_general")
clap_model.eval()

config.json:   0%|          | 0.00/643 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/776M [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/541 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/1.36k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/776M [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/798k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.11M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/280 [00:00<?, ?B/s]

ClapModel(
  (text_model): ClapTextModel(
    (embeddings): ClapTextEmbeddings(
      (word_embeddings): Embedding(50265, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): ClapTextEncoder(
      (layer): ModuleList(
        (0-11): 12 x ClapTextLayer(
          (attention): ClapTextAttention(
            (self): ClapTextSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): ClapTextSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm):

In [None]:
# Original vocal stem (speaker reference) and an empty mono buffer of the same
# length that receives the generated Korean speech.
wav, sampling_rate = torchaudio.load(vocals_file_root)
translated_vocal = np.zeros(wav.shape[1], dtype=np.float32)

for segment in merged_result["transcript"]:
    start_sec = segment["start"]
    end_sec = segment["end"]
    text = segment["translation"].strip()
    eng_text = segment["original"].strip()

    #######################################
    #     Speaker reference for ZONOS     #
    #######################################
    start_sample = int(start_sec * sampling_rate)
    end_sample = int(end_sec * sampling_rate)
    seg = wav[:, start_sample:end_sample]

    segment_duration = (end_sec - start_sec)
    # Zero/negative-length or out-of-range segments cannot be dubbed and would
    # otherwise cause a ZeroDivisionError or an empty speaker reference.
    if segment_duration <= 0 or seg.shape[1] == 0:
        print(f"Skipping segment with no audio: [{start_sec} ~ {end_sec}]")
        continue

    # Tile the segment up to a 30-second reference, then trim/pad to exactly 30 s.
    repeat_times = int(30 / segment_duration)
    if repeat_times < 1:
      repeat_times = 1

    repeated_seg = torch.cat([seg] * repeat_times, dim=1)
    max_samples = int(30 * sampling_rate)

    if repeated_seg.shape[1] > max_samples:
        repeated_seg = repeated_seg[:, :max_samples]

    if repeated_seg.shape[1] < max_samples:
        padding_samples = max_samples - repeated_seg.shape[1]
        repeated_seg = torch.nn.functional.pad(repeated_seg, (0, padding_samples))

    #######################################
    #   Emotion estimate (CLAP + MLP)     #
    #######################################
    # NOTE(review): the audio features are computed from the whole `wav`, not
    # from this segment (`seg`), so they are identical for every iteration —
    # confirm whether per-segment audio was intended.
    wav_numpy = wav.squeeze().cpu().numpy()
    resampled_numpy = librosa.resample(y=wav_numpy, orig_sr=sampling_rate, target_sr=48000)
    inputs_text = tokenizer([eng_text], padding=True, return_tensors="pt")
    inputs_audio = feature_extractor(resampled_numpy, sampling_rate=48000, return_tensors="pt")
    for key in inputs_text:
        inputs_text[key] = inputs_text[key].to(device)
    for key in inputs_audio:
        inputs_audio[key] = inputs_audio[key].to(device)

    with torch.no_grad():
        text_features = clap_model.get_text_features(**inputs_text)
        audio_features = clap_model.get_audio_features(**inputs_audio)[0].unsqueeze(0)
        concatenated_features = torch.cat((text_features, audio_features), dim=1).to(device)
        outputs = emotion_module(concatenated_features)

    #######################################
    #           TTS with ZONOS            #
    #######################################
    speaker = model.make_speaker_embedding(repeated_seg, sampling_rate)
    # Speaking rate: Hangul phonemes per second of the available time slot.
    speed = (count_phonemes(text)) / segment_duration

    with torch.no_grad():
      cond_dict = make_cond_dict(text=text, speaker=speaker, language='ko', fmax=sampling_rate/2, pitch_std=50, speaking_rate=speed, emotion=outputs)
      conditioning = model.prepare_conditioning(cond_dict)
      codes = model.generate(conditioning)
      wavs = model.autoencoder.decode(codes).cpu()

    # The decoder emits audio at the autoencoder's own sample rate; convert it
    # to the buffer's rate first, otherwise fitting it by sample count below
    # would shift the pitch whenever the two rates differ.
    model_sr = int(model.autoencoder.sampling_rate)
    if model_sr != sampling_rate:
        wavs = torchaudio.functional.resample(wavs, model_sr, sampling_rate)

    #####################################################
    # Time Stretching / Time Compression / Amp Scaling  #
    #####################################################
    wavs_stretched = process_waveform(wavs, start_sec, end_sec, sampling_rate)
    wavs_scaled = scale_waveform(seg, wavs_stretched)

    ##################
    # Combined Audio #
    ##################
    generated_audio = wavs_scaled.numpy()
    target_start = int(start_sec * sampling_rate)
    # Clip to the buffer so audio running past the end cannot raise a shape mismatch.
    target_end = min(target_start + len(generated_audio), len(translated_vocal))
    if target_end > target_start:
        translated_vocal[target_start:target_end] += generated_audio[:target_end - target_start]

    gc.collect()
    torch.cuda.empty_cache()

Generating:  12%|█▏        | 323/2588 [00:05<00:40, 55.61it/s]
Generating:   9%|▉         | 229/2588 [00:04<00:42, 55.76it/s]


# Build the Final Video (최종 비디오 형성)

In [None]:
############################
# combined frame and audio #
############################
def create_video_with_combined_audio_with_clip(vocals, background, video_clip, output_file="output_video.mp4", fps_video=30, sr_audio=44100):
    """Mix two audio tracks, attach the mix to ``video_clip`` and write an MP4.

    Args:
        vocals: 2-D array ``(n_samples, n_channels)`` with the voice track.
        background: 2-D array ``(n_samples, n_channels)`` with the background track.
        video_clip: Clip providing the frames.
        output_file: Destination path of the encoded video.
        fps_video: Accepted for interface compatibility; not used by the body.
        sr_audio: Sample rate of both audio tracks.

    The shorter track is zero-padded to the longer one before summing, and the
    longer of video/audio is cut to the shorter duration.
    """
    total_samples = max(vocals.shape[0], background.shape[0])

    def _pad_to_total(track):
        # Append trailing silence so both tracks span the same number of samples.
        missing = total_samples - track.shape[0]
        if missing > 0:
            return np.pad(track, ((0, missing), (0, 0)), mode='constant')
        return track

    combined = _pad_to_total(vocals) + _pad_to_total(background)
    last_index = combined.shape[0] - 1
    duration_audio = combined.shape[0] / sr_audio

    def make_audio_frame(t):
        # `t` may be a scalar or an array of times; map to clamped sample indices.
        sample_idx = np.clip((np.atleast_1d(t) * sr_audio).astype(int), 0, last_index)
        frame = combined[sample_idx]
        return frame[0] if frame.shape[0] == 1 else frame

    audio_clip = AudioClip(make_audio_frame, duration=duration_audio, fps=sr_audio)
    video_duration = video_clip.duration

    # Trim whichever stream is longer so both end together.
    if video_duration < duration_audio:
        audio_clip = audio_clip.set_duration(video_duration)
    elif video_duration > duration_audio:
        video_clip = video_clip.subclip(0, duration_audio)

    final_clip = video_clip.set_audio(audio_clip)
    final_clip.write_videofile(output_file, codec="libx264", audio_codec="aac")

# Duplicate the mono dubbed track into two identical channels to match the stereo background.
stereo_vocals = np.column_stack((translated_vocal, translated_vocal))
# Use `background_merged` (background plus the untranslated vocal ranges that
# were mixed back in earlier); passing the raw `background` discarded that step.
create_video_with_combined_audio_with_clip(stereo_vocals, background_merged, video_clip, output_file="10_no_emotions.mp4", fps_video=30, sr_audio=sampling_rate)

Moviepy - Building video 10_no_emotions.mp4.
MoviePy - Writing audio in 10_no_emotionsTEMP_MPY_wvf_snd.mp4




MoviePy - Done.
Moviepy - Writing video 10_no_emotions.mp4





Moviepy - Done !
Moviepy - video ready 10_no_emotions.mp4
