In [3]:
!pip install unidecode

Collecting unidecode
  Obtaining dependency information for unidecode from https://files.pythonhosted.org/packages/e4/63/7685ef40c65aba621ccd2524a24181bf11f0535ab1fdba47e40738eacff6/Unidecode-1.3.7-py3-none-any.whl.metadata
  Downloading Unidecode-1.3.7-py3-none-any.whl.metadata (13 kB)
Downloading Unidecode-1.3.7-py3-none-any.whl (235 kB)
   ---------------------------------------- 0.0/235.5 kB ? eta -:--:--
   - -------------------------------------- 10.2/235.5 kB ? eta -:--:--
   - -------------------------------------- 10.2/235.5 kB ? eta -:--:--
   ------ -------------------------------- 41.0/235.5 kB 393.8 kB/s eta 0:00:01
   ------------- ------------------------- 81.9/235.5 kB 657.6 kB/s eta 0:00:01
   ----------------------- -------------- 143.4/235.5 kB 944.1 kB/s eta 0:00:01
   -------------------------------------- - 225.3/235.5 kB 1.1 MB/s eta 0:00:01
   ---------------------------------------- 235.5/235.5 kB 1.2 MB/s eta 0:00:00
Installing collected packages: unidecode
Su

In [2]:
import json
import logging
import os
import re

import torch
from unidecode import unidecode

# Fixed hyper-parameters used by this notebook.
#   "model": keyword arguments unpacked into the synthesizer constructor.
#   "data":  text-cleaning and audio front-end settings.
HARDCODED_MODEL_CONFIG = {
    "model": {
        #"vocab_file": "lug/vocab.txt",
        #"g_checkpoint_path": "/path/to/checkpoint",#
        #"d_checkpoint_path": "/path/to/checkpoint",
        "inter_channels": 192,
        "hidden_channels": 192,
        "filter_channels": 768,
        "n_heads": 2,
        "n_layers": 6,
        "kernel_size": 3,
        "p_dropout": 0.1,
        "resblock": "1",
        "resblock_kernel_sizes": [3, 7, 11],
        "resblock_dilation_sizes": [[1, 3, 5], [1, 3, 5], [1, 3, 5]],
        "upsample_rates": [8, 8, 2, 2],
        "upsample_initial_channel": 512,
        "upsample_kernel_sizes": [16, 16, 4, 4],
        "n_layers_q": 3,
        "use_spectral_norm": False,
        "gin_channels": 256,
    },
    "data": {
        #"text_cleaners":["custom_cleaners"],
        # Body of a regex character class: every listed character is replaced
        # by a space during cleaning. Raw string so that "\?" and "\." are not
        # treated as (invalid) Python string escapes.
        "custom_cleaner_regex": r":!\?>\.;,",  # LUG + ENG
        "max_wav_value": 32768.0,
        "sampling_rate": 16000,
        "filter_length": 1024,
        "hop_length": 256,
        "win_length": 1024,
        "n_mel_channels": 80,
        "mel_fmin": 0.0,
        "mel_fmax": None,
        "add_blank": True,
        "n_speakers": 109,
        "cleaned_text": True,
    },
}

ModuleNotFoundError: No module named 'unidecode'

In [None]:
# Matches one or more consecutive whitespace characters.
_whitespace_re = re.compile(r'\s+')


def collapse_whitespace(text):
  """Return ``text`` with each run of whitespace squeezed into one space."""
  squeezed = _whitespace_re.sub(' ', text)
  return squeezed

def custom_add(text, regex):
  """Replace every character listed in ``regex`` with a single space.

  Args:
    text: string to clean.
    regex: body of a regular-expression character class, i.e. the part that
      goes between the square brackets (for example ``":;,"``).

  Returns:
    ``text`` with each matching character replaced by ``' '``. When ``regex``
    is empty there is nothing to replace and ``text`` is returned unchanged.
  """
  if not regex:
    # "[]" is not a valid pattern and would raise re.error.
    return text
  char_class = "[" + regex + "]"
  return re.sub(char_class, ' ', text)

#Luganda and English
def custom_cleaners(text):
  """Normalize ``text`` for the Luganda/English vocabulary.

  Steps, in order: lower-case, replace the characters configured under
  ``custom_cleaner_regex`` with spaces, transliterate with ``unidecode``,
  then collapse runs of whitespace into single spaces.
  """
  lowered = text.lower()
  strip_chars = HARDCODED_MODEL_CONFIG["data"]["custom_cleaner_regex"]
  without_punct = custom_add(lowered, regex=strip_chars)
  return collapse_whitespace(unidecode(without_punct))


In [None]:
class TextMapper(object):
    """Converts text into sequences of vocabulary ids.

    The vocabulary is read from a text file holding one symbol per line; a
    symbol's id is its zero-based line index.
    """

    def __init__(self, vocab_file):
        """Load the vocabulary from ``vocab_file``.

        Args:
            vocab_file: path to a UTF-8 text file with one symbol per line.

        Raises:
            ValueError: if the vocabulary contains no single-space symbol.
        """
        # Context manager so the file handle is always closed.
        with open(vocab_file, encoding="utf-8") as vocab:
            self.symbols = [line.replace("\n", "") for line in vocab]
        self.SPACE_ID = self.symbols.index(" ")
        self._symbol_to_id = {s: i for i, s in enumerate(self.symbols)}
        self._id_to_symbol = {i: s for i, s in enumerate(self.symbols)}

    def text_to_sequence(self, text, cleaner):
        """Convert a string to the list of ids of its symbols.

        Args:
            text: string to convert to a sequence.
            cleaner: callable applied to ``text`` before the lookup; it takes
                and returns a string.

        Returns:
            List of integers, one per character of the cleaned text.

        Raises:
            KeyError: if the cleaned text contains a character that is not in
                the vocabulary.
        """
        clean_text = cleaner(text)
        return [self._symbol_to_id[symbol] for symbol in clean_text]

    @staticmethod
    def intersperse(lst, item):
        """Return a new list with ``item`` before, between and after the elements of ``lst``."""
        result = [item] * (len(lst) * 2 + 1)
        result[1::2] = lst
        return result

    def get_text(self, text):
        """Clean ``text`` and return its ids as a ``torch.LongTensor``.

        The text is cleaned with ``custom_cleaners``. When ``add_blank`` is set
        in ``HARDCODED_MODEL_CONFIG["data"]``, id 0 is interspersed around
        every symbol id.
        """
        text_norm = self.text_to_sequence(text, custom_cleaners)
        if HARDCODED_MODEL_CONFIG["data"]["add_blank"]:
            text_norm = self.intersperse(text_norm, 0)
        return torch.LongTensor(text_norm)

    def filter_oov(self, text):
        """Return ``text`` with every character missing from the vocabulary removed."""
        known = self._symbol_to_id
        return "".join(ch for ch in text if ch in known)

def preprocess_text(txt, text_mapper, lang=None):
    """Prepare raw text for ``text_mapper``.

    Runs ``preprocess_char`` (language-specific handling selected by ``lang``;
    its exact behaviour is defined outside this cell), lower-cases the result
    and removes every character that ``text_mapper`` does not know.

    Args:
        txt: raw input string.
        text_mapper: object exposing ``filter_oov(text)``.
        lang: optional language code forwarded to ``preprocess_char``.

    Returns:
        The lower-cased text restricted to in-vocabulary characters.
    """
    # The uroman romanization step of the pipeline this was adapted from is
    # disabled here; the text goes straight to lower-casing and OOV filtering.
    normalized = preprocess_char(txt, lang=lang).lower()
    return text_mapper.filter_oov(normalized)



In [None]:
def load_checkpoint(checkpoint_path, model, optimizer=None):
  """Load weights from a checkpoint file into ``model`` (and ``optimizer``).

  The checkpoint is either a dict with a ``'model'`` entry (optionally also
  ``'iteration'``, ``'learning_rate'`` and ``'optimizer'``) or a bare state
  dict. Parameters of ``model`` that are missing from the checkpoint keep
  their current values and are reported through the module logger.

  Args:
    checkpoint_path: path to the checkpoint file.
    model: object exposing ``state_dict()`` / ``load_state_dict()``, either
      directly or through a ``module`` attribute (wrapper case).
    optimizer: optional optimizer whose state is restored from the
      checkpoint's ``'optimizer'`` entry.

  Returns:
    Tuple ``(model, optimizer, learning_rate, iteration)``; the last two are
    ``None`` when the checkpoint does not store them.

  Raises:
    AssertionError: if ``checkpoint_path`` is not an existing file.
    KeyError: if ``optimizer`` is given but the checkpoint has no
      ``'optimizer'`` entry.
  """
  log = logging.getLogger(__name__)
  assert os.path.isfile(checkpoint_path), \
      "checkpoint file not found: %s" % checkpoint_path
  # NOTE(review): torch.load unpickles the file, which can execute arbitrary
  # code; only load checkpoints from a trusted source (consider
  # weights_only=True where the installed torch version supports it).
  checkpoint_dict = torch.load(checkpoint_path, map_location='cpu')

  # Optional metadata: absent keys simply yield None instead of being hidden
  # behind a catch-all except.
  iteration = checkpoint_dict.get('iteration')
  learning_rate = checkpoint_dict.get('learning_rate')
  if optimizer is not None:
    optimizer.load_state_dict(checkpoint_dict['optimizer'])
  # A checkpoint without a 'model' entry is treated as a bare state dict.
  saved_state_dict = checkpoint_dict.get('model', checkpoint_dict)

  # Wrapped models keep the real network under `.module`.
  target = model.module if hasattr(model, 'module') else model
  new_state_dict = {}
  for k, v in target.state_dict().items():
    if k in saved_state_dict:
      new_state_dict[k] = saved_state_dict[k]
    else:
      log.info("%s is not in the checkpoint", k)
      new_state_dict[k] = v
  target.load_state_dict(new_state_dict)
  log.info("Loaded checkpoint '{}' (iteration {})".format(
    checkpoint_path, iteration))
  return model, optimizer, learning_rate, iteration


In [None]:
class VITSInfereceAdapterModel:
    """Inference wrapper that fetches a VITS generator and vocabulary from a Hugging Face repo.

    NOTE(review): relies on ``hf_hub_download`` and ``SynthesizerTrn`` being
    available in the notebook namespace; neither is imported in the cells
    visible here.
    """

    # Fallback when HARDCODED_MODEL_CONFIG has no ["train"]["segment_size"].
    # NOTE(review): 8192 matches the reference VITS configs -- confirm against
    # the training configuration of the checkpoint being loaded.
    DEFAULT_SEGMENT_SIZE = 8192

    def __init__(self, model_path, config_path, vocab_path, repo_name, device=None):
        """Download the artifacts and build the synthesizer.

        Args:
            model_path: filename of the generator checkpoint inside the repo.
            config_path: filename of a JSON config inside the repo, or ``None``
                to use ``HARDCODED_MODEL_CONFIG``.
            vocab_path: filename of the vocabulary file inside the repo.
            repo_name: Hugging Face repository id.
            device: optional device (string or ``torch.device``); defaults to
                CUDA when available, otherwise CPU.
        """
        self.repo_name = repo_name
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device)
        self.hps = self._download_and_load_config(config_path)
        self.text_mapper = self._download_and_load_vocab(vocab_path)

        data_cfg = HARDCODED_MODEL_CONFIG["data"]
        segment_size = HARDCODED_MODEL_CONFIG.get("train", {}).get(
            "segment_size", self.DEFAULT_SEGMENT_SIZE)
        self.model = SynthesizerTrn(
            len(self.text_mapper.symbols),
            data_cfg["filter_length"] // 2 + 1,
            segment_size // data_cfg["hop_length"],
            **HARDCODED_MODEL_CONFIG["model"])
        self.net_g = self._download_and_load_model(model_path)
        # Inputs are moved to self.device in encode_text, so the network must
        # live there too; eval() is called because this wrapper only infers
        # (assumes SynthesizerTrn is a torch.nn.Module -- TODO confirm).
        self.net_g.to(self.device)
        self.net_g.eval()

    def _download_and_load_model(self, model_path, model=None, optimizer=None):
        """Download ``model_path`` from the repo and load it into ``model``.

        ``model`` defaults to ``self.model``. Returns the model that received
        the weights.
        """
        if model is None:
            model = self.model
        # Use hf_hub_download to download the model
        model_file = hf_hub_download(repo_id=self.repo_name, filename=model_path)
        load_checkpoint(model_file, model, optimizer)
        return model

    def _download_and_load_config(self, config_path):
        """Return the configuration dict to expose as ``self.hps``.

        With ``config_path=None`` the notebook's ``HARDCODED_MODEL_CONFIG`` is
        returned; otherwise the file is downloaded from the repo and parsed as
        JSON.
        """
        if config_path is None:
            return HARDCODED_MODEL_CONFIG
        config_file = hf_hub_download(repo_id=self.repo_name, filename=config_path)
        with open(config_file, encoding="utf-8") as fh:
            return json.load(fh)

    def _download_and_load_vocab(self, vocab_path):
        """Download the vocabulary file from the repo and wrap it in a ``TextMapper``."""
        vocab_file = hf_hub_download(repo_id=self.repo_name, filename=vocab_path)
        return TextMapper(vocab_file)

    def encode_text(self, txt, lang=None):
        """Run the generator on ``txt`` and return its output on the CPU.

        Args:
            txt: raw input text.
            lang: optional language code forwarded to ``preprocess_text``.

        Returns:
            Element ``[0][0]`` of ``net_g.infer(...)``, detached and moved to
            the CPU (presumably the synthesized waveform -- TODO confirm).
        """
        txt = preprocess_text(txt, self.text_mapper, lang=lang)
        stn_tst = self.text_mapper.get_text(txt)
        with torch.no_grad():
            x_tst = stn_tst.unsqueeze(0).to(self.device)
            x_tst_lengths = torch.LongTensor([stn_tst.size(0)]).to(self.device)
            # Third positional argument is kept from the original call;
            # presumably the speaker id -- TODO confirm against SynthesizerTrn.infer.
            hyp = self.net_g.infer(
                x_tst, x_tst_lengths, 0, noise_scale=.667,
                noise_scale_w=0.8, length_scale=1.0
            )[0][0].detach().cpu()
        return hyp

    @classmethod
    def from_pretrained(cls, repo_name, G_net_path = "G_eng_lug.pth", vocab_path="vocab.txt", config_path = None  ):
        """Build an adapter from a Hugging Face repository id.

        Args:
            repo_name: Hugging Face repository id.
            G_net_path: filename of the generator checkpoint inside the repo.
            vocab_path: filename of the vocabulary file inside the repo.
            config_path: optional filename of a JSON config inside the repo.
        """
        return cls(G_net_path, config_path, vocab_path, repo_name)
