# PhoneLM

## Test `G2P` and `Encodec`

In [None]:
!pip install g2p_en encodec

### `G2P`

In [2]:
from g2p_en import G2p

[nltk_data] Downloading package cmudict to
[nltk_data]     C:\Users\win8t\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping corpora\cmudict.zip.


In [22]:
import torch
import random
import string
from functools import cache
from tqdm import tqdm

@cache
def _get_model():
    return G2p()

@cache
def _get_graphs(path):
    with open(path, "r") as f:
        graphs = f.read()
    return graphs

def encode(graphs: str) -> list[str]:
    g2p = _get_model()
    phones = g2p(graphs)
    ignored = {" ", *string.punctuation}
    return ["_" if p in ignored else p for p in phones]

@torch.no_grad()
def write_phones(folder, suffix=".normalized.txt"):
    print("ello?")
    paths = list(folder.rglob(f"*{suffix}"))
    random.shuffle(paths)

    print("paths:", paths)
    for path in tqdm(paths):
        phone_path = path.with_name(path.stem.split(".")[0] + ".phn.txt")
        if phone_path.exists():
            continue
        print("?")
        graphs = _get_graphs(path)
        phones = encode(graphs)
        with open(phone_path, "w") as f:
            f.write(" ".join(phones))

In [23]:
from pathlib import Path
write_phones(Path("./data/text"))

ello?
paths: [WindowsPath('data/text/test.normalized.txt')]


  0%|          | 0/1 [00:00<?, ?it/s]

?


100%|██████████| 1/1 [00:00<00:00,  1.45it/s]


### `Encodec`

In [39]:
import torchaudio
from encodec import EncodecModel
from torch import Tensor
from einops import rearrange
import soundfile
from encodec.utils import convert_audio

SAMPLE_RATE = 24_000

@cache
def _load_model(device="cuda"):
    # Instantiate a pretrained EnCodec model
    assert SAMPLE_RATE == 24_000
    model = EncodecModel.encodec_model_24khz()
    model.set_target_bandwidth(6.0)
    model.to(device)
    return model

def unload_model():
    return _load_model.cache_clear()

@torch.inference_mode()
def decode(codes: Tensor, device="cuda"):
    """
    Args:
        codes: (b q t)
    """
    assert codes.dim() == 3
    model = _load_model(device)
    return model.decode([(codes, None)]), model.sample_rate

def decode_to_file(resps: Tensor, path: Path):
    assert resps.dim() == 2, f"Require shape (t q), but got {resps.shape}."
    resps = rearrange(resps, "t q -> 1 q t")
    wavs, sr = decode(resps)
    soundfile.write(str(path), wavs.cpu()[0, 0], sr)

def _replace_file_extension(path, suffix):
    return (path.parent / path.name.split(".")[0]).with_suffix(suffix)

@torch.inference_mode()
def encode(wav: Tensor, sr: int, device="cuda"):
    """
    Args:
        wav: (t)
        sr: int
    """
    model = _load_model(device)
    wav = wav.unsqueeze(0)
    wav = convert_audio(wav, sr, model.sample_rate, model.channels)
    wav = wav.to(device)
    encoded_frames = model.encode(wav)
    qnt = torch.cat([encoded[0] for encoded in encoded_frames], dim=-1)  # (b q t)
    return qnt

def encode_from_file(path, device="cuda"):
    wav, sr = torchaudio.load(str(path))
    if wav.shape[0] == 2:
        wav = wav[:1]
    return encode(wav, sr, device)

def quantize_audio(folder, suffix=".wav"):
    paths = [*folder.rglob(f"*{suffix}")]
    random.shuffle(paths)

    for path in tqdm(paths):
        out_path = _replace_file_extension(path, ".qnt.pt")
        if out_path.exists():
            continue
        qnt = encode_from_file(path)
        torch.save(qnt.cpu(), out_path)

def decode_files(folder, suffix=".qnt.pt"):
    paths = [*folder.rglob(f"*{suffix}")]
    random.shuffle(paths)

    for path in tqdm(paths):
        out_path = _replace_file_extension(path, ".qt.wav")
        if out_path.exists():
            continue
        fi = rearrange(torch.load(path).squeeze(0).cuda(), "q t -> t q")
        decode_to_file(fi, out_path)

In [40]:
from pathlib import Path
quantize_audio(Path("./data/audio"))
decode_files(Path("./data/audio"))

100%|██████████| 1/1 [00:00<00:00, 929.59it/s]
100%|██████████| 1/1 [00:00<00:00,  2.62it/s]
