<a href="https://colab.research.google.com/github/akshhack/TTS/blob/dev/DatasetGenerator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup installs and requirements



In [41]:
# requirements 
!pip3 install pytube youtube_transcript_api pydub pathlib uuid soundfile librosa

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [42]:

# python imports
import os, math, IPython, json, requests, shutil, enum, time, uuid, librosa
from IPython.display import Audio
from pathlib import Path

# pip imports
from pytube import YouTube
import soundfile as sf
from youtube_transcript_api import YouTubeTranscriptApi
from pydub import AudioSegment

# Youtube downloading

In [43]:
# youtube video downloader and .srt extractor
def download_yt_video_and_srt_file(video_id):
  try:
    print('Downloading video')
    yt = YouTube(f'https://www.youtube.com/watch?v={video_id}')
    ys = yt.streams.get_highest_resolution()
    ys.download()
    download_video_name = f'tmp.mp4'
    os.rename(ys.default_filename, download_video_name)
    print(f'Successfully downloaded file {download_video_name} to current directory')

    srt_list = YouTubeTranscriptApi.list_transcripts(video_id)
    srt = srt_list.find_transcript(['en', 'en-GB', 'en-US', 'en-IN'])

    if srt.is_generated: raise Exception('Transcription not accurate')
    print(f'Transcription is not generated: {srt.is_generated}')

    srt_dialogues = []
    for dialogue in srt.fetch():
      print(f'the dialogue is {dialogue}')
      srt_dialogues.append({
          'text': dialogue['text'].replace("\n", " "),
          'start': int(float(dialogue['start']) * 1000),
          'duration': int(float(dialogue['duration']) * 1000)
      })
    return srt_dialogues, None
  except Exception as e:
    print(f'Something went wrong. Error: {e}')
    return None, e

In [44]:
'''
Dolby background noise cleanup 
'''
DOLBY_APP_KEY = '3bHmxPox_SgvmflWCLesjw==' 
DOLBY_APP_SECRET = 'U7XSpthgyYvu3r3EXjblG1GrqC28PkpwCi0R3r8fbt4='

DOLBY_AUTH_TOKEN_URL = "https://api.dolby.io/v1/auth/token"
DOLBY_UPLOAD_URL = "https://api.dolby.com/media/input"
DOLBY_DOWNLOAD_URL = "https://api.dolby.com/media/output"
DOLBY_ENHANCE_URL = "https://api.dolby.com/media/enhance"

class DolbyJobStatuses(enum.Enum):
    PENDING = 'Pending'
    RUNNING = 'Running'
    SUCCESS = 'Success'
    FAILED = 'Failed'
    CANCELLED = 'Cancelled'
    INTERNAL_ERROR = 'InternalError'

logger_name = "[DOLBY]"

# get access token from keys.
def _generate_dolby_access_token():
    payload = { 'grant_type': 'client_credentials', 'expires_in': 1800 }
    response = requests.post(DOLBY_AUTH_TOKEN_URL, data=payload, auth=requests.auth.HTTPBasicAuth(DOLBY_APP_KEY, DOLBY_APP_SECRET))
    body = json.loads(response.content)
    return body['access_token']

# upload audio file to dolby temp store for processing 
# Set or replace these values
def _upload_to_dolby_temp_store(input_file_path: str, temp_dolby_path: str, access_token: str):
    # Declare dlb:// location
    headers = {
        "Authorization": "Bearer {0}".format(access_token),
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    body = {
        "url": temp_dolby_path,
    }

    response = requests.post(DOLBY_UPLOAD_URL, json=body, headers=headers)
    response.raise_for_status()
    data = response.json()
    presigned_url = data["url"]

    # Upload media
    print(f"{logger_name} Uploading {input_file_path} to {presigned_url}")
    with open(input_file_path, "rb") as input_file:
        requests.put(presigned_url, data=input_file)
    return None

def _download_file_from_dolby_temp_store(output_path: str, temp_dolby_path: str,  access_token: str):
    headers = {
        "Authorization": "Bearer {0}".format(access_token),
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    args = {
        "url": temp_dolby_path,
    }

    with requests.get(DOLBY_DOWNLOAD_URL, params=args, headers=headers, stream=True) as response:
        response.raise_for_status()
        response.raw.decode_content = True
        print(f"{logger_name} Downloading from {0} into {1}".format(response.url, output_path))
        # output_dir = get_file_directory(output_path)
        # create_directory_if_not_exist(output_dir)
        with open(output_path, "wb") as output_file:
            shutil.copyfileobj(response.raw, output_file)

def _check_job_status(job_URL: str, job_id: str, access_token: str) -> str:
    request_url = job_URL + f"?job_id={job_id}"
    print(f"{logger_name} Request URL for checking job status is {request_url}")

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    response = requests.get(request_url, headers=headers)
    print(f"{logger_name} Response is {str(response.text)}")
    return response.json()['status']

def enhance_voice_from_dolby(input_file_path:str, output_path:str, access_token) -> str:
    payload = {
      "input" : input_file_path,
      "output": output_path,      # IT IS ALSO REQUIRED
      "content" : {
        "type": "voice_recording"
      },
      "audio": {
        "loudness": {
            "enable": True,
            "target_level": -18,
            "dialog_intelligence": True,
            "speech_threshold": 15,
            "peak_limit": -1,
            "peak_reference": "true_peak"
        },
        "dynamics": {"range_control": {
                "enable": True,
                "amount": "medium"
            }},
        "noise": {"reduction": {
                "enable": True,
                "amount": "high"
            }},
        "filter": {
            "dynamic_eq": {"enable": True},
            "high_pass": {
                "enable": True,
                "frequency": 80
            },
            "hum": {"enable": True}
        },
        "speech": {
            "isolation": {
                "enable": True,
                "amount": 100
            },
            "sibilance": {"reduction": {
                    "enable": True,
                    "amount": "medium"
                }},
            "plosive": {"reduction": {
                    "enable": True,
                    "amount": "medium"
                }},
            "click": {"reduction": {
                    "enable": False,
                    "amount": "medium"
                }}
        },
        "music": {"detection": {"enable": False}}
    }
    }

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    response = requests.post(DOLBY_ENHANCE_URL, json=payload, headers=headers)
    print(f'Response is {response.json()}')
    return response.json()['job_id']

def get_enhanced_voice_from_dolby(input_file_path:str, output_path:str) -> str:

    access_token = _generate_dolby_access_token()

   # write input_path file to temporary storage in dolby (expires after 24 hours)
    input_file_name, input_file_name_extension = get_file_name_and_extension(input_file_path)
    temp_dolby_input_path = f"dlb://in/{input_file_name + input_file_name_extension}"
    print(f"{logger_name} Dolby path for input: {temp_dolby_input_path}")
    _upload_to_dolby_temp_store(input_file_path, temp_dolby_input_path, access_token)


    # process the uploaded input file using dolby analyze. 
    output_file_name, output_file_name_extension = get_file_name_and_extension(output_path)
    temp_dolby_output_path = f"dlb://out/{output_file_name + output_file_name_extension}"

    job_id = enhance_voice_from_dolby(temp_dolby_input_path, temp_dolby_output_path, access_token)

    # wait for analyze job to be finished (note that this is blocking)
    status = None 
    while (status == None or status == DolbyJobStatuses.PENDING.value or status == DolbyJobStatuses.RUNNING.value):
        time.sleep(1) # sleep for 1 second before checking again
        status =  _check_job_status(DOLBY_ENHANCE_URL, job_id, access_token)
        print(f"{logger_name} Status is {status}")
        if status == DolbyJobStatuses.FAILED.value or status == DolbyJobStatuses.CANCELLED.value or status == DolbyJobStatuses.INTERNAL_ERROR.value:
            return Exception(f"{logger_name} Something went wrong while processing the analyze request")
        elif status == DolbyJobStatuses.SUCCESS.value:
            break
        else:
            continue
    
    # now download the processed file and write 
    _download_file_from_dolby_temp_store(output_path, temp_dolby_output_path, access_token)
    return temp_dolby_output_path

def get_file_name_and_extension(file_path: str):
    """Returns the filename (without extension) and extension of file located at {file_path}

    Args:
        file_path (str): the file path of the file

    Returns:
        Tuple[str, str]: Returns a tuple of the file base name (without extension) and the file extension
    """
    path = Path(file_path)
    return path.with_suffix("").name, path.suffix

In [45]:
'''
Split clips 
'''
ROUND_SMOOTH_FACTOR = 200 # ms 
FADE_SMOOTH_FACTOR = 100 # ms
CLIP_LENGTH_FACTOR = 2

BASE_PATH='/content/drive/MyDrive/Camb.ai/Technology/Experiments/TTS/dataset/'

def round_down(x, base=ROUND_SMOOTH_FACTOR):
  return x - (x % base)
def round_up(x, base=ROUND_SMOOTH_FACTOR):
  return int(math.ceil(x / float(base))) * base

def split_audio_clips(video_id, srt_dialogues):
  audio = AudioSegment.from_file(f'tmp.mp4', "mp4")
  i = 0
  clip_num = 0
  dialogue_map = []
  while i < len(srt_dialogues) - CLIP_LENGTH_FACTOR:
    current_dialogue = srt_dialogues[i]
    next_dialogue = srt_dialogues[i+CLIP_LENGTH_FACTOR]

    dialogues = []
    j = i
    while j < i + CLIP_LENGTH_FACTOR: 
      dialogues.append(srt_dialogues[j]['text'])
      j += 1
    dialogue_text = ' '.join(dialogues).replace('\n', ' ')

    start = round_down(current_dialogue['start'], ROUND_SMOOTH_FACTOR)
    end = round_up(next_dialogue['start'], ROUND_SMOOTH_FACTOR)

    # save clipped
    uid = uuid.uuid4().hex
    audio_clip_output_file_name = f'{BASE_PATH}{uid}.wav'
    audio_clip = audio[start:end]
    audio_clip.export(audio_clip_output_file_name, format='wav')

    # trim silence using librosa
    y, sr = librosa.load(audio_clip_output_file_name)
    yt, index = librosa.effects.trim(y)
    sf.write(audio_clip_output_file_name, yt, 22050)

    # smooth out end and write back
    final_audio_clip = AudioSegment.from_file(audio_clip_output_file_name, "mp4")
    final_audio_clip = final_audio_clip.fade_in(FADE_SMOOTH_FACTOR).fade_out(FADE_SMOOTH_FACTOR)
    final_audio_clip.export(audio_clip_output_file_name, format='wav')

    dialogue_map.append({
        'uid': uid,
        'text': dialogue_text
    })

    i += CLIP_LENGTH_FACTOR
    clip_num += 1

  return dialogue_map

In [46]:
def dataset_generator(youtube_ids_file, output_metadata_file):
  dialogues = []
  with open(youtube_ids_file, 'r') as f:
    for id in f.readlines():
      try:
        # get youtube link downloaded
        srt_dialogues, err = download_yt_video_and_srt_file(id)
        if err is not None: raise err

        # remove background noise
        get_enhanced_voice_from_dolby("tmp.mp4", "tmp.mp4")
        
        # split and write the audio clips 
        dialogue_map = split_audio_clips(id, srt_dialogues)

        '''
        {
          'uid': uid,
          'text': dialogue_text
        }
        '''
        for entry in dialogue_map:
          print(entry)
          dialogues.append(f"{entry['uid']}|{entry['text']}|{entry['text'].lower()}\n")
      except Exception as e:
        print(f'Failed to process id {id} because {e}')
  
  with open(output_metadata_file, 'w') as f:
    f.writelines(dialogues)

In [47]:
youtube_ids_file = f'{BASE_PATH}/youtube_ids.txt'
output_metadata_file = f'{BASE_PATH}/metadata.csv'

In [None]:
dataset_generator(youtube_ids_file, output_metadata_file)

Downloading video
Successfully downloaded file tmp.mp4 to current directory
Transcription is not generated: False
the dialogue is {'text': 'Namaskar\nYou listen to a song early in the morning', 'start': 0.7, 'duration': 5.009}
the dialogue is {'text': 'and that plays in your head throughout the\nday. That is the power of morning. How you', 'start': 5.709, 'duration': 7.251}
the dialogue is {'text': 'are going to end the day totally depends upon\nhow you start your day, but most of us begin', 'start': 12.96, 'duration': 8.59}
the dialogue is {'text': 'the day totally in a wrong way. Here are a\nfew common morning mistakes that people do.', 'start': 21.55, 'duration': 7.49}
the dialogue is {'text': 'First, staying in dark. Do not stay in dark\nfor long once you wake up, slide open your', 'start': 29.04, 'duration': 9.07}
the dialogue is {'text': 'curtains and let the light fill your room.\nYour biological clock is light sensitive.', 'start': 38.11, 'duration': 7.59}
the dialogue is {'tex