## Notebook 1 - data loading and pre-processing


**This is an interactive demo for my diploma thesis - a TTS system which includes the following capabilities:**

1) Voice cloning, based on audio recorded by the user within this notebook

2) Voice anonymization, where the textual information from the recording is kept, but the speaker's identity is not

3) Classical TTS



---

## Setup - run the following instructions

### Install necessary libraries

In [None]:
!pip install openai-whisper

Collecting openai-whisper
  Using cached openai_whisper-20231117-py3-none-any.whl
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch->openai-whisper)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cusolver-cu12==11.4.5.107 (from torch->openai-whisper)
  Using cached nvidia_cusolver_cu12-11.4.5.107-py3-none-manylinux1_x86_64.whl (124.2 MB)
Installing collected packages: nvidia-cudnn-cu12, nvidia-cusolver-cu12, openai-whisper
Successfully installed nvidia-cudnn-cu12-8.9.2.26 nvidia-cusolver-cu12-11.4.5.107 openai-whisper-20231117


In [None]:
!pip install denoiser

Collecting denoiser
  Using cached denoiser-0.1.5.tar.gz (49 kB)
  Preparing metadata (setup.py) ... done
Collecting julius (from denoiser)
  Using cached julius-0.2.7.tar.gz (59 kB)
  Preparing metadata (setup.py) ... done
Collecting hydra_core<1.0 (from denoiser)
  Using cached hydra_core-0.11.3-py3-none-any.whl (72 kB)
Collecting hydra_colorlog<1.0 (from denoiser)
  Using cached hydra_colorlog-0.1.4-py3-none-any.whl (4.0 kB)
Collecting pystoi>=0.3.3 (from denoiser)
  Using cached pystoi-0.4.1-py2.py3-none-any.whl (8.2 kB)
Collecting sounddevice>=0.4 (from denoiser)
  Using cached sounddevice-0.4.7-py3-none-any.whl (32 kB)
Collecting colorlog (from hydra_colorlog<1.0->denoiser)
  Using cached colorlog-6.8.2-py3-none-any.whl (11 kB)
Collecting omegaconf<1.5,>=1.4 (from hydra_core<1.0->denoiser)
  Using cached omegaconf-1.4.1-py3-none-any.whl (14 kB)
Building wheels for collected packages: denoiser, julius
  Building wheel for denoiser (setup.py) ...

In [None]:
!pip install nemo_toolkit[all]

Collecting nemo_toolkit[all]
  Using cached nemo_toolkit-1.23.0-py3-none-any.whl (3.2 MB)
Collecting onnx>=1.7.0 (from nemo_toolkit[all])
  Using cached onnx-1.16.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (15.9 MB)
Collecting ruamel.yaml (from nemo_toolkit[all])
  Using cached ruamel.yaml-0.18.6-py3-none-any.whl (117 kB)
Collecting wget (from nemo_toolkit[all])
  Using cached wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... done
Collecting black==19.10b0 (from nemo_toolkit[all])
  Using cached black-19.10b0-py36-none-any.whl (97 kB)
Collecting click==8.0.2 (from nemo_toolkit[all])
  Using cached click-8.0.2-py3-none-any.whl (97 kB)
Collecting isort<6.0.0,>5.1.0 (from nemo_toolkit[all])
  Using cached isort-5.13.2-py3-none-any.whl (92 kB)
Collecting parameterized (from nemo_toolkit[all])
  Using cached parameterized-0.9.0-py2.py3-none-any.whl (20 kB)
Collecting pytest-runner (from nemo_toolkit[all])
  Using cached pytest_runner-6.0.1-py3-none-any.w



---



### By opening the link to the Google Drive directory, you now have shared access to the 'demo_licenta' folder. Right-click it and, from the "Organize" option, add a shortcut to it in MyDrive. For every user, the final path to demo_licenta should be /content/drive/MyDrive/demo_licenta/.

*Otherwise, modify `root_path` in the second cell below:*

Mount Google Drive and import the pre-trained models' libraries

In [None]:
import os, sys
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
root_path = '/content/drive/MyDrive/demo_licenta/' # or any other path to the shortcut
home_path = os.path.join(root_path, 'licenta')
os.chdir(home_path)

In [None]:
import denoiser, whisper, nemo

### Fill in with your information:

In [None]:
spk_id = "RT_slow"
txt_tb_gen = os.path.join(home_path, "./New Text Document.txt")
if not os.path.exists(txt_tb_gen):
  print("text file cannot be detected")
else:
  print("Successful loading of the text file")

Successful loading of the text file


## Recording audio for voice cloning / voice anonymization

This section allows the user to record their own reference samples or to load them directly into the Drive directory. If you choose to record within Google Colab, at least 5 recordings are necessary, each around 6 seconds long.
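The minimum of 5 recordings can be checked before the later cells run. The sketch below is illustrative only: `count_recordings`, `has_enough_recordings`, and the `AUDIO_EXTENSIONS` set are hypothetical names that do not appear elsewhere in this notebook, and the list of extensions is an assumption.

```python
import os

# Assumed set of audio extensions; adjust it to the formats you actually use.
AUDIO_EXTENSIONS = {".wav", ".m4a", ".mp3", ".flac", ".ogg"}

def count_recordings(directory):
    """Return how many files in `directory` have a known audio extension."""
    return sum(
        1
        for name in os.listdir(directory)
        if os.path.splitext(name)[1].lower() in AUDIO_EXTENSIONS
    )

def has_enough_recordings(directory, minimum=5):
    """True when the folder holds at least `minimum` audio files."""
    return count_recordings(directory) >= minimum
```

Calling `has_enough_recordings(audio_dir)` once `audio_dir` is defined would flag a folder with too few samples early, before denoising starts.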

### Live recorder code

In [None]:
from io import BytesIO
from base64 import b64decode
from google.colab import output
from IPython.display import Javascript
import shutil

RECORD = """
const sleep  = time => new Promise(resolve => setTimeout(resolve, time))
const b2text = blob => new Promise(resolve => {
  const reader = new FileReader()
  reader.onloadend = e => resolve(e.srcElement.result)
  reader.readAsDataURL(blob)
})
var record = time => new Promise(async resolve => {
  stream = await navigator.mediaDevices.getUserMedia({ audio: true })
  recorder = new MediaRecorder(stream)
  chunks = []
  recorder.ondataavailable = e => chunks.push(e.data)
  recorder.start()
  await sleep(time)
  recorder.onstop = async ()=>{
    blob = new Blob(chunks)
    text = await b2text(blob)
    resolve(text)
  }
  recorder.stop()
})
"""

def record(sec, audio_path):
  print("Speak now...")
  display(Javascript(RECORD))
  s = output.eval_js('record(%d)' % (sec*1000))
  print("Done Recording !")
  b = b64decode(s.split(',')[1])

  with open(audio_path,'wb') as f:
    f.write(b)
  return b #byte stream


#########################################################################################################
##### Live recorder from: https://gist.github.com/korakot/c21c3476c024ad6d56d5f48b0bca92be ##############
#########################################################################################################

### **Two possible options:**

1. Use 5 pre-recorded audio files - fill in the path to those files

2. Record at least 5 audio clips within this notebook - the path to the audio files is managed internally, so there is no need to specify it

Choose your option in the next cell and make sure your speaker id (`spk_id`, set above) **does not contain '_'**.
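The underscore restriction matters because the embedding-averaging cell later in this notebook groups files by the text before the first `_` in each file name. A minimal sketch of that rule follows; `is_valid_spk_id` and `speaker_from_filename` are hypothetical helper names used only for illustration.

```python
def is_valid_spk_id(spk_id):
    """A speaker id is usable when it is non-empty and has no underscore."""
    return bool(spk_id) and "_" not in spk_id

def speaker_from_filename(filename):
    """Mimic the later grouping rule: the text before the first underscore."""
    return filename.split("_")[0]
```

With an id such as `RT_slow`, `speaker_from_filename("RT_slow_001_16.npy")` yields `RT` rather than `RT_slow`. That is harmless with a single speaker, but two ids sharing the same prefix would be merged into one.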

In [None]:
audiopath_option = 1
audio_dir = "/content/drive/MyDrive/demo_licenta/licenta/audios_pre-recorded/RT_slow"

#### For option 1 only:

If you chose option 1 (using pre-recorded files), do not expand the hidden cells under Option 2; run only the following cell.

In [None]:
# OPTION 1:
if audiopath_option == 1:
  if not os.path.exists(audio_dir):
    print("Unavailable location: the pre-recorded audios path does not exist")
  else:
    # rename the files to match the naming pattern used by option 2 and the rest of the code
    num = 1
    for file in sorted(os.listdir(audio_dir)):
      file_ext = os.path.splitext(file)[1]  # keeps the leading dot, e.g. ".m4a"
      if file_ext:
        os.rename(os.path.join(audio_dir, file), os.path.join(audio_dir, f"{spk_id}_{num:03}{file_ext}"))
        num += 1
    print("Available location: the following files will be used:")
    print(os.listdir(audio_dir))

Available location: the following files will be used:
['RT_slow_001.m4a', 'RT_slow_002.m4a', 'RT_slow_003.m4a', 'RT_slow_004.m4a', 'RT_slow_005.m4a']


#### For option 2 only:

In [None]:
# OPTION 2
if audiopath_option == 2:
  audio_dir = os.path.join(home_path, "audio_wavs")
  if os.path.exists(audio_dir) == False:
    os.mkdir(audio_dir)

**Once you run the following cell, your browser will ask for microphone access permission. Once it is granted, you will have to speak for about 6 seconds after running each of the recording blocks below.**

**I recommend reading the suggested phrase for each recording block (or only part of it if 6 seconds is not enough; do not hurry, and speak at your own pace):**


___

*Ro:*  Primele sale lucrări tratează un viitor apropiat sumbru

In [None]:
# record sample 1
for_seconds = 7
num = 1

audio_path = os.path.join(audio_dir, f"{spk_id}_{num:03}.wav")
audio = record(for_seconds, audio_path)

Speak now...


<IPython.core.display.Javascript object>

Done Recording !


Listen to your recording - you might want to give it another try (re-run the previous cell) in case of poor quality / low volume / filled pause.

In [None]:
from IPython.display import Audio
Audio(audio)

*If you are pleased with the first recording, repeat this process 4 more times. Around five recordings of 5-6 seconds each proved to be just enough to capture your voice and speaking style.*

*Ro:*  Povestirile i-au fost publicate în revistele populare de literatură

In [None]:
# record sample 2
num += 1
audio_path = os.path.join(audio_dir, f"{spk_id}_{num:03}.wav")  # same zero-padded pattern as sample 1
audio = record(for_seconds, audio_path)
print('\n')

from IPython.display import Audio
Audio(audio)

Speak now...


<IPython.core.display.Javascript object>

Done Recording !
____


*Ro:*  Pentru că familia lui s-a mutat foarte des în timpul copilăriei sale

In [None]:
# record sample 3
num += 1
audio_path = os.path.join(audio_dir, f"{spk_id}_{num:03}.wav")  # same zero-padded pattern as sample 1
audio = record(for_seconds, audio_path)
print('\n')

from IPython.display import Audio
Audio(audio)

Speak now...


<IPython.core.display.Javascript object>

Done Recording !
____


*Ro:*  După ce și-a petrecut adolescența într-un internat privat

In [None]:
# record sample 4
num += 1
audio_path = os.path.join(audio_dir, f"{spk_id}_{num:03}.wav")  # same zero-padded pattern as sample 1
audio = record(for_seconds, audio_path)
print('\n')

from IPython.display import Audio
Audio(audio)

Speak now...


<IPython.core.display.Javascript object>

Done Recording !
____


*Ro:* a devenit un adolescent timid și izolat, care prefera să citească

In [None]:
# record sample 5
num += 1
audio_path = os.path.join(audio_dir, f"{spk_id}_{num:03}.wav")  # same zero-padded pattern as sample 1
audio = record(for_seconds, audio_path)
print('\n')

from IPython.display import Audio
Audio(audio)

Speak now...


<IPython.core.display.Javascript object>

Done Recording !
____


## Denoise the recorded data with the Facebook denoiser

In [None]:
output_denoiser = os.path.join(home_path, f"denoised/denoised_{spk_id}")

# makedirs also creates the parent "denoised" folder if it does not exist yet
os.makedirs(output_denoiser, exist_ok=True)

print(f"Results can be found in: {output_denoiser}")

Results can be found in: /content/drive/MyDrive/demo_licenta/licenta/denoised/denoised_RT_slow


In [None]:
import torch
import torchaudio
from pydub import AudioSegment # used it instead of torchaudio because soundfile (used by torchaudio backend) does not support m4a format
from denoiser import pretrained
from denoiser.dsp import convert_audio
import soundfile

model_denoiser = pretrained.dns64().cuda()

Downloading: "https://dl.fbaipublicfiles.com/adiyoss/denoiser/dns64-a7761ff99a7d5bb6.th" to /root/.cache/torch/hub/checkpoints/dns64-a7761ff99a7d5bb6.th
100%|██████████| 128M/128M [00:01<00:00, 111MB/s]


In [None]:
#DENOISING + RESAMPLING to 16 kHz + RESAMPLING to 22050 Hz
import warnings
warnings.filterwarnings('ignore')

input_path = audio_dir
output_path = output_denoiser

if not os.path.exists(output_denoiser):
  os.mkdir(output_denoiser)

output_16_path = os.path.join(output_path, "wavs16") # will be used for embedding extraction
output_22_path = os.path.join(output_path, "wavs22") # will be used during the training process (mel and pitch extraction, also word transcript for WER)

os.makedirs(output_16_path, exist_ok=True)
os.makedirs(output_22_path, exist_ok=True)

for file in os.listdir(input_path):

    input_file = os.path.join(input_path, file)
    file_name, file_extension = os.path.splitext(file)

    output_16 = os.path.join(output_16_path, file_name+"_16.wav")
    output_22 = os.path.join(output_22_path, file_name+"_22.wav")


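    # soundfile.available_formats() is keyed by upper-case names without a dot (e.g. 'WAV'),
    # so a dotted extension such as '.wav' never matches and every file takes the pydub route below
    supported_formats = soundfile.available_formats()
    if len(file_extension):
        if not file_extension in supported_formats: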
            audio = AudioSegment.from_file(input_file)

            audio.export(os.path.join(output_path, file_name+"temp.wav"), format="wav")
            wav, sr = torchaudio.load(os.path.join(output_path, file_name+"temp.wav"))
        else:
            wav, sr = torchaudio.load(input_file)

        wav_16 = convert_audio(wav.cuda(), sr, model_denoiser.sample_rate, model_denoiser.chin) # model.sample_rate=16k, model.chin = nr. of input channels = 1
        wav_22 = convert_audio(wav.cuda(), sr, 22050, model_denoiser.chin)

        with torch.no_grad():
            denoised_16 = model_denoiser(wav_16[None])[0]
            denoised_22 = model_denoiser(wav_22[None])[0]

        # channels_first defaults to True, so the channel count is not passed to torchaudio.save
        torchaudio.save(output_16, denoised_16.cpu(), model_denoiser.sample_rate, format="wav")
        torchaudio.save(output_22, denoised_22.cpu(), 22050, format="wav")

for file in os.listdir(output_path):
    if "temp" in file:
        os.remove(os.path.join(output_path, file))

print("Denoising -- done")
if os.path.exists(output_path):
  print(f"Results can be found in: {output_path}")
else:
  print("Error - the output directory was not created. This can be a false error, consequence of delayed command-actions in Google Colab")

Denoising -- done
Results can be found in: /content/drive/MyDrive/demo_licenta/licenta/denoised/denoised_RT_slow
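The loading step above decides between a direct load and a pydub conversion by looking up the file extension in `soundfile.available_formats()`. That dictionary uses upper-case keys without a leading dot (for example `'WAV'`), so an extension needs normalizing before the lookup. The sketch below shows one way to do this in isolation. `needs_conversion` is a hypothetical helper, and `example_formats` is a small stand-in for the real dictionary so that the snippet runs without soundfile installed.

```python
import os

def needs_conversion(filename, supported_formats):
    """Return True when the file extension is not among the supported formats.

    `supported_formats` is expected to look like the dict returned by
    soundfile.available_formats(): upper-case keys without a leading dot.
    """
    extension = os.path.splitext(filename)[1]  # e.g. ".m4a"
    return extension.lstrip(".").upper() not in supported_formats

# Stand-in for soundfile.available_formats(); the real dict has more entries.
example_formats = {"WAV": "WAV (Microsoft)", "FLAC": "FLAC (Free Lossless Audio Codec)"}
```

Whether a direct load is desirable for every input is a separate question (a file's extension may not match its real encoding), so this is an illustration of the lookup rather than a drop-in replacement for the cell above.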


In [None]:
del model_denoiser

## Transcript

In [None]:
#load pre-downloaded Whisper model (large) from drive

import torch
whisper_model_path = os.path.join(root_path, "whisper_large")
transcript_model = whisper.load_model(os.path.join(whisper_model_path, "large-v3.pt")) ##

In [None]:
from_transcript_dir = output_22_path #or the original file: audio_dir
text_transcript = os.path.join(output_denoiser, f"whisper_transcript_{spk_id}.txt")
nl = '\n'

def get_transcript_file(txt_path, audios_dir):
  with open(txt_path, "w") as file_writer:
    for file in os.listdir(audios_dir):
      input_file = os.path.join(audios_dir, file)
      temp_result = transcript_model.transcribe(input_file)
      text = temp_result["text"]
      file_writer.write(f"{file}/{text}{nl}")

get_transcript_file(text_transcript, from_transcript_dir)
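Each line of the transcript file written above has the form `file/text`, and later cells recover the text by splitting on the first `/` only. A minimal sketch of reading one such line back (`parse_transcript_line` is a hypothetical helper, not part of the notebook):

```python
def parse_transcript_line(line):
    """Split a 'file/text' transcript line into (file_name, text)."""
    file_name, text = line.rstrip("\n").split("/", 1)
    return file_name, text.strip()
```

Splitting with a maximum of one split keeps any `/` that Whisper may have produced inside the transcribed text itself.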

In [None]:
del transcript_model

## Embedding extraction

If any hydra-related errors are returned, make sure that the

**!pip install nemo_toolkit[all]**

cell at the beginning of the notebook ran successfully.

In [None]:
from tqdm import tqdm
import nemo.collections.asr as nemo_asr
import numpy as np
model_nemo = nemo_asr.models.EncDecSpeakerLabelModel.from_pretrained(model_name='titanet_large')

[NeMo I 2024-07-16 23:10:05 cloud:68] Downloading from: https://api.ngc.nvidia.com/v2/models/nvidia/nemo/titanet_large/versions/v1/files/titanet-l.nemo to /root/.cache/torch/NeMo/NeMo_1.23.0/titanet-l/11ba0924fdf87c049e339adbf6899d48/titanet-l.nemo
[NeMo I 2024-07-16 23:10:06 common:924] Instantiating model from pre-trained checkpoint


[NeMo W 2024-07-16 23:10:06 modelPT:165] If you intend to do training or fine-tuning, please call the ModelPT.setup_training_data() method and provide a valid configuration file to setup the train data loader.
    Train config : 
    manifest_filepath: /manifests/combined_fisher_swbd_voxceleb12_librispeech/train.json
    sample_rate: 16000
    labels: null
    batch_size: 64
    shuffle: true
    is_tarred: false
    tarred_audio_filepaths: null
    tarred_shard_strategy: scatter
    augmentor:
      noise:
        manifest_path: /manifests/noise/rir_noise_manifest.json
        prob: 0.5
        min_snr_db: 0
        max_snr_db: 15
      speed:
        prob: 0.5
        sr: 16000
        resample_type: kaiser_fast
        min_speed_rate: 0.95
        max_speed_rate: 1.05
    num_workers: 15
    pin_memory: true
    
[NeMo W 2024-07-16 23:10:06 modelPT:172] If you intend to do validation, please call the ModelPT.setup_validation_data() or ModelPT.setup_multiple_validation_data() method 

[NeMo I 2024-07-16 23:10:06 features:289] PADDING: 16
[NeMo I 2024-07-16 23:10:08 save_restore_connector:249] Model EncDecSpeakerLabelModel was successfully restored from /root/.cache/torch/NeMo/NeMo_1.23.0/titanet-l/11ba0924fdf87c049e339adbf6899d48/titanet-l.nemo.


In [None]:
def extract_emb(wdir=output_16_path, odir=""):
  if odir == "":
    odir = os.path.join(wdir, "embs") # os.path.join(output_denoiser, "embs")

  if not os.path.exists(odir):
      os.makedirs(odir)

  for fi in tqdm(sorted(os.listdir(wdir))):
      if 'wav' in fi:
          embedding1 = model_nemo.get_embedding(os.path.join(wdir, fi))
          np.save(os.path.join(odir, fi.replace('wav', 'npy')), embedding1.detach().cpu().numpy())
  return odir

In [None]:
emb_dir = extract_emb(output_16_path)
print('\n',f"Embedding files can be found at: {emb_dir}")

100%|██████████| 6/6 [00:00<00:00,  7.36it/s]


 Embedding files can be found at: /content/drive/MyDrive/demo_licenta/licenta/denoised/denoised_RT_slow/wavs16/embs





### Adapting the extracted embeddings to match the size expected by the model:

In [None]:
import numpy as np

norm_emb_file = os.path.join(emb_dir, f"{spk_id}_18x384.npy")
orig_emb_file = os.path.join(emb_dir, f"{spk_id}_1x192.npy")

def norm_adapt_embs(emb_dir, norm_emb_file, orig_emb_file):
  speaker_files = {}
  speaker_avg = {}

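  # skip the aggregated outputs of a previous run so that only per-utterance embeddings are averaged
  aggregated = {os.path.basename(norm_emb_file), os.path.basename(orig_emb_file)}
  for emb_file in sorted(os.listdir(emb_dir)):
      if emb_file in aggregated or not emb_file.endswith('.npy'):
          continue
      current_spk = emb_file.split('_')[0]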
      if current_spk in speaker_files:
          speaker_files[current_spk].append(emb_file)
      else:
          speaker_files[current_spk] = [emb_file]

  print("Number of speakers: ", len(speaker_files.keys()))

  for speaker in list(speaker_files.keys()):
      all_emb = []

      for emb_npy_file in speaker_files[speaker]:
          emb = np.load(os.path.join(emb_dir, emb_npy_file))
          emb_list = sum(emb.tolist(), [])
          all_emb.append(emb_list)

      all_emb = np.array(all_emb)
      np_avg = np.average(all_emb, axis=0)

      # normalize to [-1, 1] each embedding element:
      npy_norm = [((x-min(np_avg)) / (max(np_avg)-min(np_avg)))*2 - 1 for x in np_avg]
      speaker_avg[speaker] = npy_norm

  output = []
  for speaker in list(speaker_files.keys()):
      output.append(speaker_avg[speaker])

  output_np = np.array(output)
  print(output_np.shape)
  np.save(orig_emb_file, output_np)

  new_emb = []
  for sub_array in output:
      temp = sub_array + sub_array
      new_emb.append(temp)

  new_emb_18 = np.tile(new_emb, (18, 1))
  print(new_emb_18.shape)
  np.save(norm_emb_file, new_emb_18) # shape 18 x 384

In [None]:
norm_adapt_embs(emb_dir, norm_emb_file, orig_emb_file)

Number of speakers:  1
(1, 192)
(18, 384)
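For a single speaker, the adaptation above (average the utterance embeddings, rescale to [-1, 1], duplicate the 192 values to 384, repeat over 18 rows) can be condensed into a few vectorized NumPy calls. The sketch below is an equivalent-in-spirit illustration, not the notebook's own function: `adapt_embeddings` is a hypothetical name, and random arrays stand in for the real TitaNet embeddings.

```python
import numpy as np

def adapt_embeddings(utterance_embs):
    """Average, rescale to [-1, 1], duplicate to 384 values, tile to 18 rows."""
    stacked = np.stack([np.ravel(e) for e in utterance_embs])   # (n_utts, 192)
    avg = stacked.mean(axis=0)                                   # (192,)
    norm = 2 * (avg - avg.min()) / (avg.max() - avg.min()) - 1   # min-max to [-1, 1]
    doubled = np.concatenate([norm, norm])                       # (384,)
    return np.tile(doubled, (18, 1))                             # (18, 384)

# Random stand-ins for five (1, 192) embeddings; real ones come from the .npy files.
rng = np.random.default_rng(0)
fake_embs = [rng.normal(size=(1, 192)) for _ in range(5)]
adapted = adapt_embeddings(fake_embs)
```

Computing the minimum and maximum once, instead of inside a list comprehension, avoids rescanning the averaged vector for every element.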


## Character-to-phoneme representation - text pre-processing
Using the publicly available text pre-processor from https://github.com/adrianastan/rolex

In [None]:
# contents of main.py:
os.chdir(os.path.join(home_path, "text_processor/"))

import torch
from tp.model import Seq2SeqTransformer, PositionalEncoding, TokenEmbedding
from tp.tp import TextProcessing
import sys, os

# DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
DEVICE = 'cpu'

def process_text(inText, inFile):
    tp = TextProcessing()
    if DEVICE == 'cpu':
        tsf = torch.load("./models/transformer.pt",  map_location=torch.device('cpu'))
    else:
        tsf = torch.load("./models/transformer.pt")

    print(f"Input text: {inText}")

    transcribed = tp.process(tsf, inText)

    print('-'*50)
    print("PROCESSED TEXT: \n")#, transcribed)

    wav_name = []
    with open(inFile, "r", encoding="utf-8") as f_wav:
      for line in f_wav:
        wav_name.append(line.split('/')[0])
    print(f"Number of WAV names: {len(wav_name)}")
    print(f"Number of transcribed phrases: {len(transcribed)}")

    with open(os.path.join(os.getcwd(), "processed_text.txt"), "w", encoding="utf-8") as file_w:
      for counter, phrase in enumerate(transcribed):
        # stop if there are more phrases than WAV names to pair them with
        if counter >= len(wav_name):
          print(f"Error: Counter {counter} exceeds WAV name list length")
          break
        file_w.write(f"{wav_name[counter]}|{phrase}\n")
        print(phrase)

In [None]:
def process_after_slash(input_file_path=text_transcript):

  if os.path.isfile(input_file_path):
        with open(input_file_path, 'r', encoding='utf-8') as fin:
            data = ' '.join([x.split('/',1)[1].strip() for x in fin.readlines() if '/' in x and x.strip()!=''])
            #data = ' '.join([x.strip() for x in fin.readlines() if x.strip()!=''])
  else:
        print(f"Error: {input_file_path} is not a valid file.")
        sys.exit(1)
  process_text(data, input_file_path)

In [None]:
process_after_slash()

Input text: Primele sale lucrări tratează un viitor apropiat umbru, povestirile lui analizând efectele ciberneticii și rețelelor de calculatoare asupra oamenilor. Povestirile ei au fost publicate în revistele populare de literatură științifico-fantastică, temele, decorurile și personajele. Pentru că familia lui s-a mutat foarte des în timpul copilăriei sale, a devenit un adolescent timid și izolat care prefera să citească literatură științifico-fantastică. După ce și-a petrecut adolescența într-un internat privat din Arizona, a evitat recrutarea în armată în timpul războiului din Vietnam. A devenit preocupat de contracultură și, după ce s-a stabilit în alt oraș, a devenit scriitor profesionist.
--------------------------------------------------
PROCESSED TEXT: 

Number of WAV names: 5
Number of transcribed phrases: 5
. prI-me-le sA-le lu-krĂrJ tra-tFA-z@ un vi-i-tOr a-pro-pi-At Um-bru po-ves-tI-ri-le luj a-na-li-zÂnd e-fEk-te-le Ci-ber-nE-ti-Cij Si re-țE-le-lor de kal-ku-la-tPA-re a-su

In [None]:
try:
  shutil.move(os.path.join(home_path, "text_processor/processed_text.txt"), os.path.join(output_denoiser, f"processed_text_{spk_id}.txt"))
except OSError:  # also covers shutil.Error, which is a subclass of OSError
  print("Error occurred while moving processed text file.")
os.chdir(home_path)

Error occurred while moving processed text file.


**Pre-process the text to be generated**

In [None]:
os.chdir(os.path.join(home_path, "text_processor/"))
input_file_path = txt_tb_gen

if os.path.isfile(input_file_path):
        with open(input_file_path, 'r', encoding='utf-8') as fin:
            data = ' '.join([x.strip() for x in fin.readlines() if x.strip()!=''])
else:
      print(f"Error: {input_file_path} is not a valid file.")
      sys.exit(1)

tp = TextProcessing()
if DEVICE == 'cpu':
    tsf = torch.load("./models/transformer.pt",  map_location=torch.device('cpu'))
else:
    tsf = torch.load("./models/transformer.pt")
transcribed = tp.process(tsf, data)

print('-'*50)
print("PROCESSED TEXT: \n", transcribed)
with open(os.path.join(output_denoiser, f"TTS_text_{spk_id}.txt"), "w", encoding="utf-8") as file_w:
    for phrase in transcribed:
        file_w.write(phrase)
        file_w.write('\n')
        print(phrase)

--------------------------------------------------
PROCESSED TEXT: 
 ['. a-CEs-ta Es-te un eX-Em-plu de pro-po-zI-ti-e pe kA-re o pot ros-tI .']
. a-CEs-ta Es-te un eX-Em-plu de pro-po-zI-ti-e pe kA-re o pot ros-tI .


## Create custom metadata for your speaker identity


1) One metadata file for prepare_dataset -- pitch and mel extraction
  - format: wav_file|. {text} .

2) Two metadata files for finetuning the model
  - format: mels/{file.pt}|pitch/{file.pt}|. {text} .|0
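The two formats above can be illustrated by converting a single `wav_file|text` line into both metadata lines. This is a sketch under assumptions: `build_meta_lines` is a hypothetical helper, it assumes the mel and pitch tensors share the wav file's base name, and the trailing `0` is simply copied from the format shown above.

```python
def build_meta_lines(processed_line):
    """Turn one 'wav_name|text' line into (prep_line, train_line)."""
    wav_name, text = processed_line.rstrip("\n").split("|", 1)
    stem = wav_name.rsplit(".", 1)[0]  # drop the '.wav' extension
    prep_line = f"{wav_name}|{text}"
    train_line = f"mels/{stem}.pt|pitch/{stem}.pt|{text}|0"
    return prep_line, train_line
```

For example, `build_meta_lines("RT_001_22.wav|. a-CEs-ta .")` returns the pair `"RT_001_22.wav|. a-CEs-ta ."` and `"mels/RT_001_22.pt|pitch/RT_001_22.pt|. a-CEs-ta .|0"`.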

In [None]:
def create_metas(meta_prep, meta_train, proc_txt):
  nl = '\n'
  # the with-statement closes all three files, so no explicit close() calls are needed
  with open(proc_txt, "r", encoding="utf-8") as tp_file, \
       open(meta_prep, "w", encoding="utf-8") as prep_file, \
       open(meta_train, "w", encoding="utf-8") as train_file:
    for line in tp_file:
      current_wav = line.split('|')[0]
      current_txt = line.split('|')[-1].rstrip()
      # mel and pitch tensors reuse the wav file's name with a .pt extension
      current_pt = current_wav.replace('wav', 'pt')
      prep_file.write(f"{current_wav}|{current_txt}{nl}")
      train_file.write(f"mels/{current_pt}|pitch/{current_pt}|{current_txt}|0{nl}")

meta_prep=os.path.join(output_denoiser, f"meta_4_pitch_mels_{spk_id}.txt")
meta_train=os.path.join(output_denoiser, f"{spk_id}_metadata.txt")
create_metas(meta_prep, meta_train, os.path.join(output_denoiser, f"processed_text_{spk_id}.txt"))

In [None]:
def split_meta(meta_file):
  all_lines = []
  with open(meta_file, "r") as meta_reader:
    for line in meta_reader:
      all_lines.append(line)
  with open(meta_file.replace(".txt", "_train.txt"), "w") as train_writer:
    #all lines but the last one
    for i in range(len(all_lines)-1):
      train_writer.write(all_lines[i])
  #last line:
  with open(meta_file.replace(".txt", "_eval.txt"), "w") as val_writer:
      val_writer.write(all_lines[-1])

if os.path.exists(meta_train):
  split_meta(meta_train)
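The split above keeps the last metadata line for evaluation and uses all the others for training. The same rule on an in-memory list looks like the sketch below (`split_lines` is a hypothetical helper that also guards against an empty input).

```python
def split_lines(lines):
    """Return (train, eval): every line but the last, and the last line alone."""
    if not lines:
        return [], []
    return lines[:-1], lines[-1:]
```

With a single metadata line the training part is empty, which is one more reason to provide at least 5 recordings.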

## Prepare all files for fine-tuning:

In [None]:
import shutil
output_denoiser = os.path.join(home_path, f"denoised/denoised_{spk_id}")
zip_name = f"{spk_id}_zip"
zip_address = os.path.join(output_denoiser, zip_name)
zip_address

'/content/drive/MyDrive/demo_licenta/licenta/denoised/denoised_RT_slow/RT_slow_zip'

In [None]:
if not os.path.exists(zip_address):
  os.mkdir(zip_address)

In [None]:
# shutil.move(meta_prep, os.path.join(zip_address, f"meta_4_pitch_mels_{spk_id}.txt"))
# shutil.move(meta_train, os.path.join(zip_address, f"{spk_id}_metadata.txt"))
# shutil.move(os.path.join(output_denoiser, f"{spk_id}_metadata_train.txt"), os.path.join(zip_address, f"{spk_id}_metadata_train.txt"))
# shutil.move(os.path.join(output_denoiser, f"{spk_id}_metadata_eval.txt"), os.path.join(zip_address, f"{spk_id}_metadata_eval.txt"))
shutil.copy(os.path.join(output_denoiser, f"TTS_text_{spk_id}.txt"), os.path.join(zip_address, f"TTS_text_{spk_id}.txt"))
# shutil.copytree(os.path.join(output_denoiser, "wavs22"), os.path.join(zip_address, "wavs22"))
# shutil.copy(norm_emb_file, os.path.join(zip_address, f"{spk_id}_18x384.npy"))

'/content/drive/MyDrive/demo_licenta/licenta/denoised/denoised_RT_slow/RT_slow_zip/TTS_text_RT_slow.txt'

In [None]:
print(f"ALL THE SPEAKER'S FILES CAN BE FOUND AT: {zip_address}")

ALL THE SPEAKER'S FILES CAN BE FOUND AT: /content/drive/MyDrive/demo_licenta/licenta/denoised/denoised_RT_slow/RT_slow_zip
