<a href="https://colab.research.google.com/github/10udCryp7/TV-command-synthesis/blob/main/src_prototype/Phase3_ForceAlighment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Download a Google Drive file by ID with gdown, then quietly unpack 100-samples.zip.
# NOTE(review): presumably the downloaded file is 100-samples.zip — confirm.
!gdown 1MrkteyPMIhLgki82_qJ_A9JDOIme3za3
!unzip -q 100-samples.zip

In [None]:
# Download a second Google Drive file by ID.
# NOTE(review): presumably this is 100-samples.csv, which is read in a later cell — confirm.
!gdown 121dcbCvDVzB22uelCeYkg4Fs8TVbQ1tp

In [None]:
import pandas as pd

# Load 100-samples.csv from the current working directory into a DataFrame.
data = pd.read_csv("100-samples.csv")

In [None]:
# Install (or upgrade to) the latest openai-whisper package, with pip output suppressed.
!pip install -q -U openai-whisper

In [None]:
import json
import os
import itertools
import whisper
from pydub import AudioSegment
import numpy as np
import re
from difflib import SequenceMatcher
import ast
import pandas as pd


class SpeechCleaning:
    """Trim speech clips down to the span that matches their reference text.

    Each clip is transcribed with Whisper (word-level timestamps), the
    recognized words are locally aligned against the reference text with a
    fuzzy Smith-Waterman algorithm, and the clip is cut to the time span of
    the aligned words.  Clips that cannot be processed are recorded in
    ``cant_clean_list`` as ``(file_name, file_type, reason)`` tuples and are
    exported untrimmed.
    """

    def __init__(self, cant_clean_list=None, model_name="medium"):
        """Load the Whisper model and set up the failure log.

        Args:
            cant_clean_list: Optional existing list to append failure tuples
                to.  A new list is created when ``None``.
            model_name: Whisper model name passed to ``whisper.load_model``.
        """
        self.model = whisper.load_model(model_name)
        # `is None` (not truthiness) so a caller-supplied empty list is kept
        # and receives the failure entries.
        self.cant_clean_list = [] if cant_clean_list is None else cant_clean_list

    def clean_pipeline(self, export_dir: str, data: pd.DataFrame, speech_folder: str, padding=None):
        """Trim and export the audio of every row in ``data``.

        For each row the ``full`` clip is processed; rows whose ``type`` is
        ``single_mix`` or ``chain_mix`` additionally get their ``seg_<i>``
        clips processed for ``i`` in ``range(num_segments)``.

        Args:
            export_dir: Root directory for the trimmed files (created if
                missing).  Output goes to
                ``<export_dir>/<file_name>/<file_name>_<file_type>_trimmed.wav``.
            data: Table with at least ``type`` and ``id`` columns, plus
                ``num_segments`` for the mix types.
            speech_folder: Root directory holding the source clips.
            padding: Optional amount added around the aligned span, in the
                same unit as the Whisper word timestamps.
        """
        os.makedirs(export_dir, exist_ok=True)
        for _, row in data.iterrows():
            file_name = f"{row['type']}_{row['id']}"
            self._trim_and_export(export_dir, file_name, "full", speech_folder, padding)

            if row['type'] in ('single_mix', 'chain_mix'):
                # int() guards against the count arriving as a float-typed value.
                for i in range(int(row['num_segments'])):
                    self._trim_and_export(export_dir, file_name, f"seg_{i}", speech_folder, padding)

    def _trim_and_export(self, export_dir, file_name, file_type, speech_folder, padding):
        """Locate the clean range of one clip, trim it, and write it as WAV."""
        start, end = self.get_clean_range(file_name=file_name, file_type=file_type,
                                          padding=padding, speech_folder=speech_folder)
        trimmed_audio = self.trim_audio(file_name=file_name, file_type=file_type,
                                        start=start, end=end, speech_folder=speech_folder)

        out_dir = os.path.join(export_dir, file_name)
        os.makedirs(out_dir, exist_ok=True)
        out_path = os.path.join(out_dir, f"{file_name}_{file_type}_trimmed.wav")
        trimmed_audio.export(out_path, format="wav")

    def get_clean_range(self, file_name: str, file_type: str, speech_folder: str, padding=None):
        """Return the ``(start, end)`` time span of the reference text in a clip.

        The clip ``<speech_folder>/<file_name>/<file_name>_<file_type>.wav`` is
        transcribed, and the reference text is read from
        ``<speech_folder>/<file_name>/<file_name>.json`` (``command`` for the
        ``full`` clip, otherwise the matching ``text_segments`` entry).

        Returns:
            ``(start, end)`` taken from the Whisper word timestamps, widened
            by ``padding`` when given (start is clamped at 0), or
            ``(None, None)`` when the words cannot be extracted or aligned.
            Failures are printed and appended to ``cant_clean_list``.
        """
        # prepare whisper words
        file_path = os.path.join(speech_folder, file_name, f"{file_name}_{file_type}.wav")

        transcribe = self.model.transcribe(file_path, word_timestamps=True)
        try:
            whisper_words = list(itertools.chain.from_iterable(
                segment['words'] for segment in transcribe['segments']))
        except Exception:
            print(file_name + " " + file_type + " " + 'cant transcribe')
            self.cant_clean_list.append((file_name, file_type, "cant transcribe"))
            return None, None

        # prepare reference text
        meta_path = os.path.join(speech_folder, file_name, f"{file_name}.json")

        with open(meta_path, "r", encoding="utf-8") as f:
            meta = json.load(f)

        if file_type == "full":
            ref_text = meta['command']
        else:
            seg_key = file_type.split("_")[1]
            segments = meta['text_segments']
            # A JSON array loads as a list and needs an int index; a JSON
            # object keeps its string keys.
            if isinstance(segments, dict):
                ref_text = segments[seg_key]
            else:
                ref_text = segments[int(seg_key)]

        try:
            start, end = self.get_start_end_from_alignment(ref_text=ref_text,
                                                           whisper_words=whisper_words)
            if padding:
                start = max(0, start - padding)
                end = end + padding
        except Exception as e:
            # Problems with long synthesis, should use only with segment
            print(file_name + " " + file_type + " " + 'cant get start end')
            print(e)
            print(f'ref_text: {ref_text}')
            print(f'whisper_words: {whisper_words}')
            self.cant_clean_list.append((file_name, file_type, "cant get start end"))
            return None, None
        return start, end

    def trim_audio(self, file_name, file_type, start, end, speech_folder):
        """Load a clip and cut it to ``[start, end]``.

        ``start`` and ``end`` are multiplied by 1000 before slicing the
        ``AudioSegment``.  When either bound is ``None`` the whole clip is
        returned unchanged.
        """
        file_path = os.path.join(speech_folder, file_name, f"{file_name}_{file_type}.wav")

        audio = AudioSegment.from_wav(file_path)
        # Compare against None explicitly: a start of 0 is a valid bound and
        # must not disable trimming.
        if start is None or end is None:
            return audio
        return audio[start * 1000:end * 1000]

    def get_start_end_from_alignment(self, ref_text, whisper_words):
        """Align ``ref_text`` to the recognized words and return its time span.

        Args:
            ref_text: Reference sentence.
            whisper_words: Sequence of dicts with ``word``, ``start`` and
                ``end`` keys.

        Returns:
            ``(start_time, end_time)``: ``start`` of the first aligned word
            and ``end`` of the last aligned word.

        Raises:
            ValueError: If no alignment is found.
        """
        ref_words = self.normalize_text(ref_text).split()
        hypo_words = [self.normalize_text(w['word']) for w in whisper_words]

        start_idx, end_idx = self.smith_waterman_fuzzy(ref_words, hypo_words)
        start_time = whisper_words[start_idx]['start']
        end_time = whisper_words[end_idx]['end']

        return start_time, end_time

    def smith_waterman_fuzzy(self, ref_words, hypo_words, match_score=2, fuzzy_score=1, mismatch=-1, gap=-2):
        """Find the best local alignment of ``ref_words`` inside ``hypo_words``.

        Word pairs score ``match_score`` when identical (similarity 1),
        ``fuzzy_score`` when their similarity is at least 0.8, and
        ``mismatch`` otherwise; skipping a word on either side costs ``gap``.

        Returns:
            ``(start_j, end_j)``: inclusive 0-based indices into
            ``hypo_words`` of the first and last aligned hypothesis words.

        Raises:
            ValueError: If no cell scores above zero (including when either
                word list is empty), i.e. there is nothing to align.
        """
        m, n = len(ref_words), len(hypo_words)
        score = np.zeros((m + 1, n + 1))
        # Substitution score per cell, kept so the traceback can check which
        # predecessor actually produced each value.
        substitution = np.zeros((m + 1, n + 1))
        max_score = 0
        max_pos = None

        for i in range(1, m + 1):
            for j in range(1, n + 1):
                sim = self.word_similarity(ref_words[i - 1], hypo_words[j - 1])
                if sim == 1:
                    s = match_score
                elif sim >= 0.8:
                    s = fuzzy_score
                else:
                    s = mismatch
                substitution[i, j] = s

                diag = score[i - 1, j - 1] + s
                delete = score[i - 1, j] + gap
                insert = score[i, j - 1] + gap
                score[i, j] = max(0, diag, delete, insert)

                if score[i, j] > max_score:
                    max_score = score[i, j]
                    max_pos = (i, j)

        if max_pos is None:
            raise ValueError("no local alignment found between reference and hypothesis words")

        # Traceback: follow the predecessor that produced each cell's score.
        # A mismatched pair inside the alignment is still a diagonal move.
        i, j = max_pos
        end_j = j - 1
        while i > 0 and j > 0 and score[i, j] > 0:
            if score[i, j] == score[i - 1, j - 1] + substitution[i, j]:
                i -= 1
                j -= 1
            elif score[i, j] == score[i - 1, j] + gap:
                i -= 1
            else:
                j -= 1
        # Column j was not consumed, so the first aligned hypothesis word is
        # at 0-based index j.
        start_j = j

        return start_j, end_j

    def str2list(self, list_str: str):
        """Parse the string form of a Python literal (e.g. ``"['a', 'b']"``)."""
        return ast.literal_eval(list_str)

    def word_similarity(self, w1, w2):
        """Return the ``difflib.SequenceMatcher`` ratio of two words (0 to 1)."""
        return SequenceMatcher(None, w1, w2).ratio()

    def normalize_text(self, text):
        """Lowercase ``text``, drop non-word, non-space characters, and strip."""
        return re.sub(r'[^\w\s]', '', text.lower()).strip()



In [None]:
# Build the cleaner with default arguments; the constructor loads the Whisper model.
sc = SpeechCleaning()

In [None]:
# Read the sample table from the Kaggle working directory, using the first CSV column as the index.
df = pd.read_csv("/kaggle/working/100-samples.csv", index_col = 0)

In [None]:
# Trim every clip listed in df and write the results under trimmed_speech/,
# widening each aligned span by 0.3 on both sides
# (same unit as the Whisper word timestamps — presumably seconds; confirm).
sc.clean_pipeline(export_dir = 'trimmed_speech', speech_folder = '/kaggle/working/synthesis_command', data = df, padding = 0.3)

single_active_4ff7f40d full cant get start end
cannot unpack non-iterable NoneType object
ref_text: show me Cartoon Network
whisper_words: [{'word': ' What', 'start': 0.44000000000000006, 'end': 0.88, 'probability': 0.2911490797996521}, {'word': ' a', 'start': 0.88, 'end': 1.08, 'probability': 0.19421282410621643}, {'word': ' pleasant', 'start': 1.08, 'end': 1.3, 'probability': 0.23018227517604828}, {'word': ' tome,', 'start': 1.3, 'end': 1.7, 'probability': 0.5129103735089302}, {'word': " you're", 'start': 1.76, 'end': 1.94, 'probability': 0.4445706903934479}, {'word': ' a', 'start': 1.94, 'end': 2.0, 'probability': 0.19106259942054749}, {'word': ' first', 'start': 2.0, 'end': 2.26, 'probability': 0.5872543454170227}, {'word': '-gen', 'start': 2.26, 'end': 2.72, 'probability': 0.4452144652605057}, {'word': ' dab,', 'start': 2.72, 'end': 3.14, 'probability': 0.5577821135520935}, {'word': ' and', 'start': 3.46, 'end': 3.64, 'probability': 0.920242965221405}, {'word': ' here', 'start': 3

In [None]:
sc.cant