# Deeptune Hackathon

##### Setup

In [None]:
import ast
import glob
import json
import os
import subprocess

import librosa
import numpy as np
import openai
import pandas as pd
import phonemizer
import pyannote.audio
import scipy
import soundfile as sf
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import yaml

from IPython.display import Audio, Video
from attrdict import AttrDict
from denoiser import pretrained
from denoiser.dsp import convert_audio
from moviepy.editor import VideoFileClip, AudioFileClip
from munch import Munch
from pyannote.audio import Inference, Model, Pipeline
from pyannote.core import Segment
from pydub import AudioSegment
from pydub.silence import detect_nonsilent, split_on_silence
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler, LabelEncoder
from tqdm import tqdm
from vocoder import Generator

from models import *
from utils import *
from whisper import *
from yaml import *

In [None]:
# NOTE(review): API_KEY is not defined in any visible cell — presumably it is
# supplied by one of the star imports or must be assigned before this cell
# runs; confirm. Avoid committing the literal key to the notebook.
openai.api_key = API_KEY

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

##### Clip Video

In [None]:
def clip_video(video_file_path, video_clip_file_path, clip_length):
    """Write the first ``clip_length`` seconds of a video to a new file.

    Args:
        video_file_path: Path of the source video.
        video_clip_file_path: Path the trimmed clip is written to.
        clip_length: End time of the clip, measured from the start of the video.
    """
    # The context manager releases the underlying reader once the clip is written.
    with VideoFileClip(video_file_path) as video:
        video.subclip(0, clip_length).write_videofile(video_clip_file_path)


current_directory = os.getcwd()
video_file_path = os.path.join(current_directory, "Media/video.mp4")
video_clip_file_path = os.path.join(current_directory, "Media/video_clip.mp4")
clip_length = 60

clip_video(video_file_path, video_clip_file_path, clip_length)

In [None]:
Video(video_clip_file_path)

##### Extract Audio From Video

In [None]:
def extract_audio_from_video(video_file_path, audio_file_path):
    """Write the audio track of a video to ``audio_file_path``.

    Args:
        video_file_path: Path of the video to read.
        audio_file_path: Destination path of the extracted audio.

    Raises:
        ValueError: If the video has no audio track.
    """
    # The context manager releases the underlying reader after the export.
    with VideoFileClip(video_file_path) as video:
        if video.audio is None:
            raise ValueError(f"No audio track found in {video_file_path}")
        video.audio.write_audiofile(audio_file_path, codec="pcm_s32le")


video_file_path = os.path.join(current_directory, "Media/video_clip.mp4")
audio_file_path = os.path.join(current_directory, "Media/audio_clip.wav")

extract_audio_from_video(video_file_path, audio_file_path)

In [None]:
Audio(audio_file_path)

##### Separate Vocals And Background Audio
Reference Repo: https://github.com/tsurumeso/vocal-remover

In [None]:
def separate_vocals_and_background_audio(audio_file_path, output_directory):
    """Run the vocal-remover ``inference.py`` script on an audio file.

    The script is executed from the ``vocal-remover`` directory that sits next
    to ``current_directory``.

    Args:
        audio_file_path: Audio file passed to the script as ``--input``.
        output_directory: Directory passed to the script as ``--output_dir``.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero status.
    """
    # An argument list (no shell) keeps paths containing spaces or shell
    # metacharacters intact.
    command = [
        "python3",
        "inference.py",
        "--input",
        audio_file_path,
        "--tta",
        "--output_dir",
        output_directory,
    ]
    vocal_remover_directory = os.path.join(
        os.path.dirname(current_directory), "vocal-remover"
    )
    # check=True surfaces a failed separation here instead of as a missing
    # file in a later cell.
    subprocess.run(command, cwd=vocal_remover_directory, check=True)


output_directory = os.path.join(current_directory, "Media")
separate_vocals_and_background_audio(audio_file_path, output_directory)

In [None]:
vocals_file_path = os.path.join(current_directory, "Media/audio_clip_Vocals.wav")
background_audio_file_path = os.path.join(
    current_directory, "Media/audio_clip_Instruments.wav"
)

In [None]:
Audio(vocals_file_path)

In [None]:
Audio(background_audio_file_path)

##### Denoise Vocals
Reference Repo: https://github.com/facebookresearch/denoiser/tree/main

In [None]:
def denoise_vocals(vocals_file_path, denoised_vocals_file_path):
    """Denoise an audio file with the pretrained ``dns64`` model and save the result.

    Args:
        vocals_file_path: Audio file to denoise.
        denoised_vocals_file_path: Destination of the denoised audio.
    """
    model_dns = pretrained.dns64()
    target_rate = model_dns.sample_rate

    waveform, source_rate = torchaudio.load(vocals_file_path)
    # Match the sample rate and channel count the model reports
    waveform = convert_audio(waveform, source_rate, target_rate, model_dns.chin)

    with torch.no_grad():
        # waveform[None] adds a leading axis; [0] removes it again
        batch_output = model_dns(waveform[None])
    cleaned = batch_output[0]

    torchaudio.save(denoised_vocals_file_path, cleaned.cpu(), target_rate)


denoised_vocals_file_path = os.path.join(current_directory, "Media/denoised_vocals.wav")
denoise_vocals(vocals_file_path, denoised_vocals_file_path)

In [None]:
Audio(denoised_vocals_file_path)

##### Split Audio On Silence

In [None]:
def split_audio_on_silence(
    audio_file_path, output_directory, min_silence_len=1000, silence_thresh=-40
):
    """Split a WAV file into speech segments and describe its speech/silence layout.

    Every non-silent range is exported to
    ``<output_directory>/speech_audio_segment_<i>.wav``.

    Args:
        audio_file_path: WAV file to analyse.
        output_directory: Directory the segment files are written to (created
            if missing).
        min_silence_len: Forwarded to ``detect_nonsilent``.
        silence_thresh: Forwarded to ``detect_nonsilent``.

    Returns:
        A tuple ``(segment_timestamps, speech_count, speech_start_times)``:
        ``segment_timestamps`` is a list of ``(start_s, end_s, label)`` with
        label ``"speech"`` or ``"silence"``, ``speech_count`` is the number of
        speech segments, and ``speech_start_times`` holds their start times
        in seconds.
    """
    audio = AudioSegment.from_wav(audio_file_path)
    os.makedirs(output_directory, exist_ok=True)

    non_silent_ranges = detect_nonsilent(audio, min_silence_len, silence_thresh)
    segment_timestamps = []
    speech_start_times = []
    last_end_ms = 0

    for i, (start_ms, end_ms) in enumerate(non_silent_ranges):
        audio[start_ms:end_ms].export(
            os.path.join(output_directory, f"speech_audio_segment_{i}.wav"),
            format="wav",
        )

        start_s = start_ms / 1000
        end_s = end_ms / 1000

        # Record the silent gap (if any) between the previous segment and this one
        if last_end_ms < start_ms:
            segment_timestamps.append((last_end_ms / 1000, start_s, "silence"))

        segment_timestamps.append((start_s, end_s, "speech"))
        speech_start_times.append(start_s)
        last_end_ms = end_ms

    # Compare in one unit (milliseconds): the previous check mixed milliseconds
    # with seconds, so trailing silence was almost never recorded.
    total_length_ms = len(audio)
    if last_end_ms < total_length_ms:
        segment_timestamps.append(
            (last_end_ms / 1000, total_length_ms / 1000, "silence")
        )

    return segment_timestamps, len(speech_start_times), speech_start_times


output_directory = os.path.join(current_directory, "Media")
segment_timestamps, speech_count, speech_start_times = split_audio_on_silence(
    denoised_vocals_file_path, output_directory
)
segment_timestamps, speech_count, speech_start_times

In [None]:
audio_segment_file_path = os.path.join(output_directory, "speech_audio_segment_0.wav")
Audio(audio_segment_file_path)

##### Speech-To-Text
Reference Repo: https://github.com/openai/whisper

In [None]:
def transcribe(vocals_file_path):
    """Transcribe an audio file with the OpenAI ``whisper-1`` model.

    Args:
        vocals_file_path: Path of the audio file to transcribe.

    Returns:
        The ``verbose_json`` response returned by ``openai.Audio.transcribe``.
    """
    # Close the file handle as soon as the request has completed
    with open(vocals_file_path, "rb") as audio_file:
        return openai.Audio.transcribe(
            "whisper-1", audio_file, response_format="verbose_json", temperature=0
        )

In [None]:
# Transcribe every exported speech segment, in segment order
results = []

for segment_index in tqdm(range(speech_count)):
    segment_path = os.path.join(
        current_directory, f"Media/speech_audio_segment_{segment_index}.wav"
    )
    segment_result = transcribe(segment_path)
    results.append(segment_result)

In [None]:
# Flatten the per-file transcription segments into one row per segment.
# Segment times are relative to their own file, so the start time of that
# file within the full recording is added back.
texts = []
starts = []
ends = []

for i, result in enumerate(results):
    if result["text"] == "":
        continue
    offset = speech_start_times[i]
    for segment in result["segments"]:
        texts.append(segment["text"].strip())
        starts.append(round(offset + segment["start"], 2))
        ends.append(round(offset + segment["end"], 2))

# Building from a dict of columns keeps starts/ends numeric instead of the
# object dtype produced by transposing a tuple of lists.
df = pd.DataFrame({"texts": texts, "starts": starts, "ends": ends})
# Keep only rows whose text contains at least one ASCII letter; reset_index
# returns a new frame, so the column assignment below does not target a slice.
df = df[df["texts"].str.contains(r"[a-zA-Z]", regex=True)].reset_index(drop=True)
df["durations"] = df["ends"] - df["starts"]
df

##### Audio Embeddings
Reference Repo: https://github.com/pyannote/pyannote-audio

In [None]:
USE_AUTH_TOKEN = AUTH_TOKEN
pyannote_embedding_model = Model.from_pretrained(
    "pyannote/embedding", use_auth_token=USE_AUTH_TOKEN
)
inference = Inference(pyannote_embedding_model, window="whole")

In [None]:
def get_audio_embeddings(audio_file_path):
    """Embed the audio excerpt of every row in ``df``.

    For each row, the span from ``starts`` to ``ends`` of ``audio_file_path``
    is passed to ``inference.crop``.

    Args:
        audio_file_path: Audio file the excerpts are taken from.

    Returns:
        A NumPy array built from the per-row embeddings, in row order.
    """
    return np.array(
        [
            inference.crop(audio_file_path, Segment(row["starts"], row["ends"]))
            for _, row in df.iterrows()
        ]
    )


audio_embeddings = get_audio_embeddings(denoised_vocals_file_path)
audio_embeddings.shape

In [None]:
def cluster(embeddings, n_clusters):
    """Standardise the embeddings and cluster them with K-means.

    Args:
        embeddings: Feature matrix passed to ``StandardScaler``.
        n_clusters: Number of clusters for ``KMeans``.

    Returns:
        The ``labels_`` of the fitted ``KMeans`` model (one label per row).
    """
    standardized = StandardScaler().fit_transform(embeddings)
    # random_state is fixed so repeated runs give the same assignment
    fitted = KMeans(n_clusters=n_clusters, random_state=0).fit(standardized)
    return fitted.labels_


n_clusters = 5
labels = cluster(audio_embeddings, n_clusters)
df["audio_clusters"] = labels
df

##### Text Embeddings
Reference: https://platform.openai.com/docs/api-reference/embeddings

In [None]:
# Request one embedding per transcript row, then stack them into an array
text_embeddings = []

for _, row in tqdm(df.iterrows(), total=len(df)):
    embedding_response = openai.Embedding.create(
        model="text-embedding-ada-002", input=row["texts"]
    )
    text_embeddings.append(embedding_response.data[0].embedding)

text_embeddings = np.array(text_embeddings)
print(text_embeddings.shape)

In [None]:
labels = cluster(text_embeddings, n_clusters)
df["text_clusters"] = labels
df

##### Speaker Diarization
Reference: https://platform.openai.com/docs/api-reference/chat

In [None]:
formatted_transcript = ""


def format_time(time_float):
    """Format a time in seconds as ``MM:SS.cc`` (``cc`` = hundredths of a second).

    Args:
        time_float: Non-negative time in seconds.

    Returns:
        The formatted string, e.g. ``"01:15.50"`` for ``75.5``.
    """
    # Round to whole centiseconds first: truncating ``time_float * 100``
    # turns values such as 1.15 (stored as 1.1499999...) into ".14".
    total_centiseconds = int(round(time_float * 100))
    minutes, remainder = divmod(total_centiseconds, 6000)
    seconds, fraction = divmod(remainder, 100)
    return f"{minutes:02d}:{seconds:02d}.{fraction:02d}"


# Render one line per transcript row and join them with newlines
transcript_lines = []

for index, row in df.iterrows():
    start_time_str = format_time(row["starts"])
    end_time_str = format_time(row["ends"])
    audio_cluster, text_cluster = row["audio_clusters"], row["text_clusters"]
    transcription = row["texts"]

    transcript_lines.append(
        f'Row {index} - {start_time_str}-{end_time_str} - Audio Cluster {audio_cluster} – Text Cluster {text_cluster} – "{transcription}"'
    )

formatted_transcript = "\n".join(transcript_lines)
print(formatted_transcript)

In [None]:
content = (
    "Return in a list one speaker tag for each row in the following video transcript:\n"
    + formatted_transcript
)
print(content)

In [None]:
completion = openai.ChatCompletion.create(
    model="gpt-4", messages=[{"role": "user", "content": content}], temperature=0
)

speaker_string = completion.choices[0].message.content
speaker_string

In [None]:
# The model may wrap the list in explanatory prose, so parse only the
# outermost [...] span when one is present.
list_start = speaker_string.find("[")
list_end = speaker_string.rfind("]")
if 0 <= list_start < list_end:
    speaker_literal = speaker_string[list_start : list_end + 1]
else:
    speaker_literal = speaker_string

speaker_list = ast.literal_eval(speaker_literal)
if len(speaker_list) != len(df):
    raise ValueError(
        f"Expected {len(df)} speaker tags (one per transcript row) "
        f"but received {len(speaker_list)}"
    )

# Encode the speaker tags as integer ids
encoder = LabelEncoder()

encoded_speakers = encoder.fit_transform(speaker_list)
df["speakers"] = encoded_speakers
df

In [None]:
# Collapse runs of consecutive rows with the same speaker id into single rows
new_rows = []
current_row = None
current_speaker = None

for _, row in df.iterrows():
    if row["speakers"] != current_speaker:
        # Speaker changed: store the finished run (if any) and open a new one
        if current_row is not None:
            new_rows.append(current_row)
        current_row = row.copy()
        current_speaker = row["speakers"]
        continue

    # Same speaker as the open run: append the text, move the end time and
    # add this row's duration to the run's total
    current_row["texts"] += " " + row["texts"]
    current_row["ends"] = row["ends"]
    current_row["durations"] += row["durations"]

# Store the last open run
if current_row is not None:
    new_rows.append(current_row)

diarized_df = pd.DataFrame(new_rows).reset_index(drop=True)
diarized_df

##### Translation
Reference: https://platform.openai.com/docs/api-reference/chat

In [None]:
content = (
    "Return an idiomatic English translation of the following video transcript:\n"
    + "\n".join(diarized_df.texts)
)
print(content)

In [None]:
completion = openai.ChatCompletion.create(
    model="gpt-4", messages=[{"role": "user", "content": content}], temperature=0
)

translation = completion.choices[0].message.content
print(translation)

In [None]:
# One translated line is expected per diarized row. Blank lines in the model
# output are dropped so they do not shift the alignment.
translated_lines = [
    line.strip() for line in translation.splitlines() if line.strip()
]
if len(translated_lines) != len(diarized_df):
    raise ValueError(
        f"Expected {len(diarized_df)} translated lines "
        f"but received {len(translated_lines)}"
    )

diarized_df["translations"] = translated_lines
diarized_df

##### Collect Reference Speech For StyleTTS

In [None]:
def extract_audio(audio_file_path, start_time, end_time):
    """Return the part of a WAV file between two timestamps.

    Args:
        audio_file_path: WAV file to read.
        start_time: Start of the excerpt; multiplied by 1000 before slicing.
        end_time: End of the excerpt; multiplied by 1000 before slicing.

    Returns:
        The sliced ``AudioSegment``.
    """
    start_ms, end_ms = start_time * 1000, end_time * 1000
    return AudioSegment.from_wav(audio_file_path)[start_ms:end_ms]

In [None]:
# Collect reference audio for voice cloning: one file per diarized row, plus
# one file per audio cluster holding that cluster's rows concatenated.
speaker_audio_dict = {}

for index, row in diarized_df.iterrows():
    speaker = row["audio_clusters"]
    extracted_audio_segment = extract_audio(
        vocals_file_path, row["starts"], row["ends"]
    )

    if speaker in speaker_audio_dict:
        speaker_audio_dict[speaker] += extracted_audio_segment
    else:
        speaker_audio_dict[speaker] = extracted_audio_segment

    reference_file_path = os.path.join(
        current_directory, f"Media/reference_{index}.wav"
    )
    # format="wav" is required: export() otherwise writes MP3 data regardless
    # of the ".wav" file extension.
    extracted_audio_segment.export(reference_file_path, format="wav")

for speaker, audio_segment in speaker_audio_dict.items():
    speaker_reference_file_path = os.path.join(
        current_directory, f"Media/speaker_reference_{speaker}.wav"
    )
    audio_segment.export(speaker_reference_file_path, format="wav")

In [None]:
Audio(reference_file_path)

In [None]:
Audio(speaker_reference_file_path)

##### Setup Text-To-Speech Models
Reference Repo: https://github.com/yl4579/StyleTTS

Pre-trained StyleTTS and Hifi-GAN on LibriTTS

In [None]:
# Character inventory of the text front-end: pad, punctuation, Latin letters
# and IPA symbols, in that order.
_pad = "$"
_punctuation = ';:,.!?¡¿—…"«»“” '
_letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
_letters_ipa = "ɑɐɒæɓʙβɔɕçɗɖðʤəɘɚɛɜɝɞɟʄɡɠɢʛɦɧħɥʜɨɪʝɭɬɫɮʟɱɯɰŋɳɲɴøɵɸθœɶʘɹɺɾɻʀʁɽʂʃʈʧʉʊʋⱱʌɣɤʍχʎʏʑʐʒʔʡʕʢǀǁǂǃˈˌːˑʼʴʰʱʲʷˠˤ˞↓↑→↗↘'̩'ᵻ"


symbols = [_pad, *_punctuation, *_letters, *_letters_ipa]

# Map every symbol to its position in ``symbols``; a symbol that occurs more
# than once keeps the position of its last occurrence.
dicts = {symbol: position for position, symbol in enumerate(symbols)}


class TextCleaner:
    """Callable that maps the characters of a string to their symbol indexes."""

    def __init__(self, dummy=None):
        # ``dummy`` is accepted but not used
        self.word_index_dictionary = dicts

    def __call__(self, text):
        """Return the index of every known character in ``text``, in order.

        Characters missing from the lookup table are printed and left out of
        the result.
        """
        lookup = self.word_index_dictionary
        indexes = []
        for char in text:
            if char in lookup:
                indexes.append(lookup[char])
            else:
                print(char)
        return indexes


textcleaner = TextCleaner()

In [None]:
# Mel-spectrogram transform applied to waveforms in preprocess()
to_mel = torchaudio.transforms.MelSpectrogram(
    n_mels=80, n_fft=2048, win_length=1200, hop_length=300
)
# Normalisation constants used by preprocess(): (log_mel - mean) / std
mean, std = -4, 4


def length_to_mask(lengths):
    """Build a boolean padding mask from a 1-D tensor of sequence lengths.

    Args:
        lengths: 1-D tensor holding the length of each sequence.

    Returns:
        A ``(len(lengths), lengths.max())`` boolean tensor that is ``True`` at
        every position at or beyond the corresponding sequence length.
    """
    positions = torch.arange(lengths.max()).unsqueeze(0)
    positions = positions.expand(lengths.shape[0], -1).type_as(lengths)
    # Position p (0-based) is padding when p + 1 > length
    return torch.gt(positions + 1, lengths.unsqueeze(1))


def preprocess(wave):
    """Convert a NumPy waveform into a normalised log-mel tensor.

    Args:
        wave: NumPy array holding the waveform.

    Returns:
        The log-mel spectrogram, with a leading axis of size 1 added and the
        values normalised as ``(log_mel - mean) / std``.
    """
    mel = to_mel(torch.from_numpy(wave).float()).unsqueeze(0)
    # The small offset keeps the logarithm finite for zero-energy bins
    log_mel = torch.log(1e-5 + mel)
    return (log_mel - mean) / std


def compute_style(ref_dicts):
    """Compute a style embedding for every reference audio file.

    Args:
        ref_dicts: Mapping of an arbitrary key to the path of a reference
            audio file.

    Returns:
        A dict mapping each key to ``(style_embedding, trimmed_audio)``. Keys
        whose audio could not be encoded are left out, so the result may be
        empty; callers use that to fall back to another reference.
    """
    reference_embeddings = {}
    for key, path in ref_dicts.items():
        # librosa resamples on load, so the waveform is already at 24 kHz and
        # no separate resampling step is needed.
        wave, _ = librosa.load(path, sr=24000)
        audio, _ = librosa.effects.trim(wave, top_db=30)
        mel_tensor = preprocess(audio).to(device)
        try:
            with torch.no_grad():
                ref = model.style_encoder(mel_tensor.unsqueeze(1))
            reference_embeddings[key] = (ref.squeeze(1), audio)
        except Exception as error:
            # Best effort: skip references the encoder rejects, but report
            # them and let KeyboardInterrupt / SystemExit propagate.
            print(f"Skipping style reference {key!r} ({path}): {error}")
            continue

    return reference_embeddings

In [None]:
global_phonemizer = phonemizer.backend.EspeakBackend(
    language="en-us", preserve_punctuation=True, with_stress=True
)

In [None]:
h = None


def load_checkpoint(filepath, device):
    """Load a checkpoint file with ``torch.load``.

    Args:
        filepath: Path of the checkpoint file.
        device: Passed to ``torch.load`` as ``map_location``.

    Returns:
        The object stored in the checkpoint.

    Raises:
        FileNotFoundError: If ``filepath`` is not an existing file.
    """
    # An explicit check instead of ``assert``, which is stripped under ``-O``
    if not os.path.isfile(filepath):
        raise FileNotFoundError(f"Checkpoint file not found: {filepath}")
    print("Loading '{}'".format(filepath))
    checkpoint_dict = torch.load(filepath, map_location=device)
    print("Complete.")
    return checkpoint_dict


def scan_checkpoint(cp_dir, prefix):
    """Return the lexicographically last path in ``cp_dir`` starting with ``prefix``.

    Args:
        cp_dir: Directory to search.
        prefix: File-name prefix to match.

    Returns:
        The matching path that sorts last, or ``""`` when nothing matches.
    """
    matches = sorted(glob.glob(os.path.join(cp_dir, prefix + "*")))
    return matches[-1] if matches else ""


# Locate the latest vocoder generator checkpoint and the config stored beside it
cp_g = scan_checkpoint("Vocoder/LibriTTS/", "g_")
if not cp_g:
    # Without this check an empty result would silently resolve to ./config.json
    raise FileNotFoundError(
        "No vocoder checkpoint matching 'g_*' found in Vocoder/LibriTTS/"
    )

config_file = os.path.join(os.path.split(cp_g)[0], "config.json")
with open(config_file) as f:
    json_config = json.load(f)
h = AttrDict(json_config)

device = torch.device(device)
generator = Generator(h).to(device)

# Restore the generator weights and switch the model to inference mode
state_dict_g = load_checkpoint(cp_g, device)
generator.load_state_dict(state_dict_g["generator"])
generator.eval()
generator.remove_weight_norm()

In [None]:
model_path = "./Models/LibriTTS/epoch_2nd_00050.pth"
model_config_path = "./Models/LibriTTS/config.yml"

# Read the model configuration and close the file afterwards
with open(model_config_path) as config_stream:
    config = yaml.safe_load(config_stream)

# Auxiliary models required to build the TTS model
ASR_config = config.get("ASR_config", False)
ASR_path = config.get("ASR_path", False)
text_aligner = load_ASR_models(ASR_path, ASR_config)

F0_path = config.get("F0_path", False)
pitch_extractor = load_F0_models(F0_path)

model = build_model(Munch(config["model_params"]), text_aligner, pitch_extractor)

# Load the weights of every component present in the checkpoint, skipping
# any component whose name contains "discriminator"
params = torch.load(model_path, map_location="cpu")
params = params["net"]
for key in model:
    if key in params and "discriminator" not in key:
        print("%s loaded" % key)
        model[key].load_state_dict(params[key])

# Switch every component to eval mode and move it to the target device
for key in model:
    model[key].eval()
for key in model:
    model[key].to(device)

##### Synthesize Speech
Reference Repo: https://github.com/yl4579/StyleTTS

In [None]:
def synthesize_speech(
    text,
    style_reference,
    speaker_reference,
    audio_reference,
    output_filename,
    target_duration,
):
    """Synthesize ``text`` in the style of a reference recording and save it.

    The predicted per-token durations are rescaled towards ``target_duration``
    before the waveform is generated.

    Args:
        text: Text to speak; it is phonemized with ``global_phonemizer``.
        style_reference: Path of the reference audio used when
            ``target_duration`` is at least 1.
        speaker_reference: Path of the reference audio used when
            ``target_duration`` is below 1.
        audio_reference: Path of the fallback reference, used when the chosen
            reference yields no style embedding.
        output_filename: Path the generated audio is written to (24000 Hz).
        target_duration: Desired length of the generated speech
            (presumably seconds — it is multiplied by 1000 below; confirm).

    Raises:
        KeyError: If neither the chosen reference nor the fallback produces a
            style embedding.
    """
    # Pick the reference recording based on the requested duration
    if target_duration < 1:
        ref_dicts = {"single_speaker": speaker_reference}
    else:
        ref_dicts = {"single_speaker": style_reference}

    reference_embeddings = compute_style(ref_dicts)

    # compute_style drops references it cannot encode; fall back if that happened
    if len(reference_embeddings) == 0:
        ref_dicts = {"single_speaker": audio_reference}
        reference_embeddings = compute_style(ref_dicts)

    ref, _ = reference_embeddings["single_speaker"]
    s = ref.squeeze(1)
    style = s

    # Phonemize the text, map the phonemes to symbol indexes and add token 0
    # at both ends
    ps = global_phonemizer.phonemize([text])
    tokens = textcleaner(ps[0])
    tokens.insert(0, 0)
    tokens.append(0)
    tokens = torch.LongTensor(tokens).to(device).unsqueeze(0)

    with torch.no_grad():
        input_lengths = torch.LongTensor([tokens.shape[-1]]).to(device)
        m = length_to_mask(input_lengths).to(device)
        t_en = model.text_encoder(tokens, input_lengths, m)
        d = model.predictor.text_encoder(t_en, style, input_lengths, m)

        # Predict a duration (in frames, at least 1) for every token
        x, _ = model.predictor.lstm(d)
        duration = model.predictor.duration_proj(x)
        pred_dur = torch.round(duration.squeeze()).clamp(min=1)

        # Rescale the predicted durations towards the target length.
        # NOTE(review): 12.5 looks like milliseconds per frame (hop 300 at
        # 24 kHz), and the division by 2 presumably compensates for a 2x
        # upsampling later in the model — confirm both.
        target_frames = int((target_duration * 1000) / 12.5)
        scaling_factor = target_frames / pred_dur.sum().item() / 2
        scaled_pred_dur = (pred_dur * scaling_factor).round().clamp(min=1)

        # Build the token-to-frame alignment matrix: row i is 1 over the
        # consecutive frames assigned to token i
        pred_aln_trg = torch.zeros(input_lengths, int(scaled_pred_dur.sum().data))
        c_frame = 0
        for i in range(pred_aln_trg.size(0)):
            pred_aln_trg[i, c_frame : c_frame + int(scaled_pred_dur[i].data)] = 1
            c_frame += int(scaled_pred_dur[i].data)

        # Expand the token-level features to frame level with the alignment
        en = d.transpose(-1, -2) @ pred_aln_trg.unsqueeze(0).to(device)
        F0_pred, N_pred = model.predictor.F0Ntrain(en, s)

        out = model.decoder(
            (t_en @ pred_aln_trg.unsqueeze(0).to(device)),
            F0_pred,
            N_pred,
            ref.squeeze().unsqueeze(0),
        )
        # Turn the decoder output into a waveform with the vocoder generator
        c = out.squeeze()
        y_g_hat = generator(c.unsqueeze(0))
        y_out = y_g_hat.squeeze().cpu().numpy()

        sf.write(output_filename, y_out, 24000)

In [None]:
# Synthesize the translated speech for every diarized row.
# The loop deliberately avoids binding a variable named ``translation``: that
# name holds the full translated transcript used to fill the "translations"
# column, and overwriting it here would break re-running that cell.
for i, row in tqdm(diarized_df.iterrows(), total=len(diarized_df)):
    segment_speaker = row["audio_clusters"]

    synthesize_speech(
        row["translations"],
        os.path.join(current_directory, f"Media/reference_{i}.wav"),
        os.path.join(
            current_directory, f"Media/speaker_reference_{segment_speaker}.wav"
        ),
        vocals_file_path,
        os.path.join(current_directory, f"Media/speech_{i}.wav"),
        row["durations"],
    )

In [None]:
Audio(f"Media/speech_{0}.wav")

##### Add Generated Speech To Background Audio

In [None]:
# Pair every synthesized clip with the position (in milliseconds) at which it
# starts. The paths are built from current_directory, matching where the
# clips were written.
voice_files = [
    {
        "file": os.path.join(current_directory, f"Media/speech_{i}.wav"),
        "start_time": int(round(start_s * 1000)),
    }
    for i, start_s in diarized_df.starts.items()
]

background = AudioSegment.from_wav(background_audio_file_path)

# Mix each clip onto the background track at its start position
for voice in voice_files:
    voice_audio = AudioSegment.from_wav(voice["file"])
    background = background.overlay(voice_audio, position=voice["start_time"])

final_audio_output_file_path = os.path.join(
    current_directory, "Media/final_audio_output.wav"
)
background.export(final_audio_output_file_path, format="wav")

In [None]:
Audio(final_audio_output_file_path)

##### Write Audio To Video

In [None]:
# Replace the audio track of the clipped video with the mixed dubbed audio
video_input_path = os.path.join(current_directory, "Media/video_clip.mp4")
audio_input_path = os.path.join(current_directory, "Media/final_audio_output.wav")
video_output_path = os.path.join(current_directory, "Media/final_video_output.mp4")

video = VideoFileClip(video_input_path)
new_audio = AudioFileClip(audio_input_path)

# set_audio returns the clip with the new audio attached; ``video`` is reused
# by the following cell
video = video.set_audio(new_audio)

video.write_videofile(video_output_path, codec="libx264", audio_codec="aac")

In [None]:
notebook_display_video_output_path = os.path.join(
    current_directory, "Media/notebook_display_video_output.mp4"
)
video.write_videofile(notebook_display_video_output_path)

In [None]:
Video(notebook_display_video_output_path)

##### Moving Forward

- use facial expressions
- facial recognition
- dig deeper into Wav2Lip
- cleaner snippets
- better timestamps
- explore multimodal approaches
- generate metadata (number of speakers)
- better manage short audio segments
- ensembling
- facebook multimodal translation