# 제10장 한국어 Tacotron에 기반한 음성 합성 시스템의 구현

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/r9y9/ttslearn/blob/master/notebooks/ch10_Recipe-Tacotron.ipynb)

Google colab에서 실행하는 예상 소요 시간: 5시간

이 노트북의 레시피 설정은 Google Colab에서 실행할 때 시간 초과를 피하기 위해 학습 조건을 책에 나열된 설정에서 일부 수정한 것입니다. (배치 크기 줄이기 등).
참고로 책에 기재된 조건으로 저자(야마모토)가 레시피를 실행한 결과를 아래에서 공개하고 있습니다.

- Tensorboard logs: https://tensorboard.dev/experiment/gHKogn7wRxa4B3NIVw27xw/
- exp 디렉토리 (학습 모델, 합성 음성 포함) : https://drive.google.com/file/d/1LoIGkwTLUZmkJkxbTR1S7yyaWexn-Wfp/view?usp=sharing (226.9 MB)

## 준비

### Google Colab을 이용하는 경우

Google Colab에서 이 노트북을 실행하려면 메뉴의 '런타임 -> 런타임 시간 변경'에서 '하드웨어 가속기'를 **GPU**로 변경하세요.

### Python version

In [None]:
!python -VV

### ttslearn 설치

In [None]:
%%capture
try:
    import ttslearn
except ImportError:
    !pip install ttslearn

In [None]:
import ttslearn
ttslearn.__version__

## 10.1 이 장의 일본어 음성 합성 시스템 구현

### 학습된 모델을 이용한 음성 합성

In [None]:
from ttslearn.tacotron import Tacotron2TTS
from tqdm.notebook import tqdm
from IPython.display import Audio

engine = Tacotron2TTS()
wav, sr = engine.tts("一貫学習にチャレンジしましょう！", tqdm=tqdm)
Audio(wav, rate=sr)

In [None]:
import librosa.display
import matplotlib.pyplot as plt
import numpy as np

fig, ax = plt.subplots(figsize=(8,2))
librosa.display.waveshow(wav.astype(np.float32), sr, ax=ax)
ax.set_xlabel("Time [sec]")
ax.set_ylabel("Amplitude")
plt.tight_layout()

### 레시피 실행 전 준비

In [None]:
%%capture
from ttslearn.env import is_colab
from os.path import exists

# pip install ttslearn은 레시피를 설치하지 않으므로 수동으로 다운로드
if is_colab() and not exists("recipes.zip"):
    !curl -LO https://github.com/r9y9/ttslearn/releases/download/v{ttslearn.__version__}/recipes.zip
    !unzip -o recipes.zip

In [None]:
import os
# recipe 디렉토리로 이동
cwd = os.getcwd()
if cwd.endswith("notebooks"):
    os.chdir("../recipes/tacotron/")
elif is_colab():
    os.chdir("recipes/tacotron/")   

In [None]:
import time
start_time = time.time()

### 패키지 임포트

In [None]:
%pylab inline
%load_ext autoreload
%load_ext tensorboard
%autoreload
import IPython
from IPython.display import Audio
import tensorboard as tb
import os

In [None]:
# 수치 연산
import numpy as np
import torch
from torch import nn
# 음성 파형 불러오기
from scipy.io import wavfile
# 풀 컨텍스트 라벨, 질문 파일 로드
from nnmnkwii.io import hts
# 음성 분석
import pyworld
# 음성 분석, 시각화
import librosa
import librosa.display
import pandas as pd
# 파이썬에서 배우는 음성 합성
import ttslearn

In [None]:
# 시드 고정
from ttslearn.util import init_seed
init_seed(773)

In [None]:
torch.__version__

### 그래프 그리기 설정 (描画周りの設定) // 번역 수정 필요

In [None]:
from ttslearn.notebook import get_cmap, init_plot_style, savefig
cmap = get_cmap()
init_plot_style()

### 레시피 설정

In [None]:
# run.sh를 사용하여 학습 스크립트를 노트북에서 실행하려면 True
# google colab의 경우 True라고 가정합니다.
# 로컬 환경의 경우 run.sh를 터미널에서 실행하는 것이 좋습니다.
# 이 경우이 노트북은 시각화 및 학습 된 모델을 테스트하는 데 사용됩니다.
run_sh = is_colab()

# 참고 : WaveNet을 사용한 평가 데이터에 대한 음성 생성은 시간이 오래 걸립니다.
run_stage6 = True

# run.sh를 통해 실행되는 스크립트의 tqdm
run_sh_tqdm = "none"

# CUDA
# NOTE: run.sh의 인수로 전달하므로 bool이 아닌 문자열로 정의됩니다.
cudnn_benchmark = "true"
cudnn_deterministic = "false"

# 특징량 추출시 병렬 처리 작업 수
n_jobs = os.cpu_count()//2

# 음향 모델 (Tacotron)의 설정 파일 이름
acoustic_config_name="tacotron2_rf2"
# WaveNet 보코더 설정 파일 이름
wavenet_config_name="wavenet_sr16k_mulaw256_30layers"

# Tacotron 학습의 배치 크기
tacotron_batch_size = 16
# Tacotron 학습의 반복 수
# 주의: 충분한 품질을 얻기 위해 필요한 값: 50k ~ 100k steps
tacotron_max_train_steps = 5000

# WaveNet 보코더 학습의 배치 크기
# 권장 배치 크기: 8 이상
# 동작 확인을 위해 작은 값으로 설정합니다.
wavenet_batch_size = 4
# WavaNet의 학습 반복 수
# 주의: 충분한 품질을 얻기 위해 필요한 값: 300k ~ 500k steps
wavenet_max_train_steps = 20000

# 음성을 생성하는 발화 수
# WaveNet의 추론은 시간이 걸리므로 노트북에서 표시하는 5개만 생성
num_eval_utts = 5

# 노트북에서 사용하는 테스트용 발화(학습 데이터, 평가 데이터)
train_utt = "BASIC5000_0001"
test_utt = "BASIC5000_5000"

### Tensorboard로 로그 시각화

In [None]:
# 노트북에서 tensorboard 로깅을 확인하려면 다음 줄을 사용하도록 설정하십시오.
if is_colab():
    %tensorboard --logdir tensorboard/

## 10.2 Tacotron 2를 일본어에 적용하기위한 변경

### 음소열과 운율 기호가 있는 음소열 비교

In [None]:
import pyopenjtalk
# 이 구현은 나중에 설명합니다.
from ttslearn.tacotron.frontend.openjtalk import pp_symbols

In [None]:
print("음소열:", pyopenjtalk.g2p("端が"))
print("음소열:", pyopenjtalk.g2p("箸が"))
print("음소열:", pyopenjtalk.g2p("橋が"))

In [None]:
print("운율 기호가있는 음소열:", " ".join(pp_symbols(pyopenjtalk.extract_fullcontext("端が"))))
print("운율 기호가있는 음소열:", " ".join(pp_symbols(pyopenjtalk.extract_fullcontext("箸が"))))
print("운율 기호가있는 음소열:", " ".join(pp_symbols(pyopenjtalk.extract_fullcontext("橋が"))))

### 풀 컨텍스트 라벨에서 음소 열과 운율 기호 추출

In [None]:
import re

def numeric_feature_by_regex(regex, s):
    match = re.search(regex, s)
    # 정의되지 않은 (xx)의 경우, 컨텍스트의 가능한 값 이외의 적절한 값
    if match is None:
        return -50
    return int(match.group(1))

In [None]:
labels = hts.load(ttslearn.util.example_label_file())
labels.contexts[1]

In [None]:
numeric_feature_by_regex(r"/A:([0-9\-]+)\+", labels.contexts[1])

In [None]:
def pp_symbols(labels, drop_unvoiced_vowels=True):
    PP = []
    N = len(labels)

    # 각 음소마다 순서대로 처리
    for n in range(N):
        lab_curr = labels[n]

        # 해당 음소
        p3 = re.search(r"\-(.*?)\+", lab_curr).group(1)

        # 무성화 모음을 일반 모음으로 취급
        if drop_unvoiced_vowels and p3 in "AEIOU":
            p3 = p3.lower()

        # 선두와 후행의 sil만 예외 대응
        if p3 == "sil":
            assert n == 0 or n == N - 1
            if n == 0:
                PP.append("^")
            elif n == N - 1:
                # 질문 시스템인지 여부
                e3 = numeric_feature_by_regex(r"!(\d+)_", lab_curr)
                if e3 == 0:
                    PP.append("$")
                elif e3 == 1:
                    PP.append("?")
            continue
        elif p3 == "pau":
            PP.append("_")
            continue
        else:
            PP.append(p3)

        # 악센트 유형 및 위치 정보(전방 또는 후방)
        a1 = numeric_feature_by_regex(r"/A:([0-9\-]+)\+", lab_curr)
        a2 = numeric_feature_by_regex(r"\+(\d+)\+", lab_curr)
        a3 = numeric_feature_by_regex(r"\+(\d+)/", lab_curr)
        # 악센트 절의 모라 수
        f1 = numeric_feature_by_regex(r"/F:(\d+)_", lab_curr)

        a2_next = numeric_feature_by_regex(r"\+(\d+)\+", labels[n + 1])

        # 악센트 구 경계
        if a3 == 1 and a2_next == 1:
            PP.append("#")
        # 피치 하강(악센트 핵)
        elif a1 == 0 and a2_next == a2 + 1 and a2 != f1:
            PP.append("]")
        # 피치의 상승
        elif a2 == 1 and a2_next == 2:
            PP.append("[")

    return PP

In [None]:
import pyopenjtalk

text = "今日の天気は？"

# 텍스트에서 풀 컨텍스트 추출
labels = pyopenjtalk.extract_fullcontext(text)
# 풀 컨텍스트에서 운율 기호가있는 음소 열로 변환
PP = pp_symbols(labels)

print("입력 문자열:", text)
print("음소열:", pyopenjtalk.g2p(text))
print("운율 기호가있는 음소열:", " ".join(PP))

## 프로그램 구현 전 준비

### stage -1: 코퍼스 다운로드

In [None]:
if is_colab():
    ! ./run.sh --stage -1 --stop-stage -1

### Stage 0: 학습/검증/평가 데이터 분할

In [None]:
if run_sh:
    ! ./run.sh --stage 0 --stop-stage 0

In [None]:
! ls data/

In [None]:
! head data/dev.list

## 10.3 데이터 전처리

### Tacotron 2를 위한 전처리

#### 1 발화에 대한 전처리

In [None]:
from ttslearn.tacotron.frontend.openjtalk import text_to_sequence, pp_symbols
from ttslearn.dsp import mulaw_quantize, logmelspectrogram

# 운율 기호가있는 음소 열 추출
labels = hts.load(ttslearn.util.example_label_file())
PP = pp_symbols(labels.contexts)
in_feats = np.array(text_to_sequence(PP), dtype=np.int64)

# 멜 스펙트로그램 계산
sr = 16000
_sr, x = wavfile.read(ttslearn.util.example_audio_file())
x = (x / 32768).astype(np.float64)
x = librosa.resample(x, _sr, sr)

out_feats = logmelspectrogram(x, sr)

# 시작과 끝의 비음성 구간 길이 조정
assert "sil" in labels.contexts[0] and "sil" in labels.contexts[-1]
start_frame = int(labels.start_times[1] / 125000)
end_frame = int(labels.end_times[-2] / 125000)

# 처음: 50ms, 마지막: 100ms
start_frame = max(0, start_frame - int(0.050 / 0.0125))
end_frame = min(len(out_feats), end_frame + int(0.100 / 0.0125))

out_feats = out_feats[start_frame:end_frame]

# 시간 영역에서 오디오의 길이 조정
x = x[int(start_frame * 0.0125 * sr) :]
length = int(sr * 0.0125) * out_feats.shape[0]
x = pad_1d(x, length) if len(x) < length else x[:length]

# 특징량의 업샘플링을 하기 때문에, 음성 파형의 길이는 프레임 시프트로 나눌 필요가 있습니다
assert len(x) % int(sr * 0.0125) == 0

# mu-law 양자화
x = mulaw_quantize(x)

In [None]:
print("Tacotron의 입력 특징의 크기:", in_feats.shape)
print("Tacotron 출력 특징의 크기:", out_feats.shape)
print("WaveNet 보코더 출력의 오디오 파형 크기:", x.shape)

In [None]:
from ttslearn.tacotron.frontend.openjtalk import num_vocab
from ttslearn.dsp import inv_mulaw_quantize
from torch.nn import functional as F

inp = F.one_hot(torch.from_numpy(in_feats), num_vocab()).numpy()

fig, ax = plt.subplots(3, 1, figsize=(8,8))
ax[0].set_title("Phoneme sequence + prosody symbols (one-hot)")
ax[1].set_title("Mel-spectrogram")
ax[2].set_title("Mu-law quantized waveform")

ax[0].imshow(inp.T, aspect="auto", interpolation="nearest", origin="lower", cmap=cmap)
ax[1].imshow(out_feats.T, aspect="auto", interpolation="nearest", origin="lower", cmap=cmap)
librosa.display.waveshow(x.astype(np.float32), ax=ax[2], sr=sr)

ax[0].set_xlabel("Phoneme")
ax[0].set_ylabel("Binary value")
ax[1].set_xlabel("Time [frame]")
ax[1].set_ylabel("Mel filter channel")
ax[2].set_xlabel("Time [sec]")
ax[2].set_ylabel("Amplitude")

plt.tight_layout()
savefig("fig/e2etts_impl_taco2_inout")

#### 레시피의 stage 1 실행

배치 처리를 실시하는 커멘드 라인 프로그램은, `preprocess.py`를 참조해 주세요.

In [None]:
if run_sh:
    ! ./run.sh --stage 1 --stop-stage 1

### 특징량 정규화

정규화를 위한 통계량을 계산하는 명령행 프로그램은 `recipes/common/fit_scaler.py`를 참조하십시오. 또, 정규화를 실시하는 커멘드 라인 프로그램은, `recipes/common/preprocess_normalize.py` 를 참조해 주세요.

#### 레시피의 stage 2 실행

In [None]:
if run_sh:
    ! ./run.sh --stage 2 --stop-stage 2 --n-jobs $n_jobs

#### 정규화 처리 결과 확인

In [None]:
in_feats = np.load(f"dump/jsut_sr16000/org/train/out_tacotron/{train_utt}-feats.npy")
in_feats_norm = np.load(f"dump/jsut_sr16000/norm/train/out_tacotron/{train_utt}-feats.npy")
fig, ax = plt.subplots(2, 1, figsize=(8,6), sharex=True)
ax[0].set_title("Mel-spectrogram (before normalization)")
ax[1].set_title("Mel-spectrogram (after normalization)")

hop_length = int(sr * 0.0125)
mesh = librosa.display.specshow(
    in_feats.T, sr=sr, hop_length=hop_length, x_axis="time", y_axis="frames", ax=ax[0], cmap=cmap)
fig.colorbar(mesh, ax=ax[0])
mesh = librosa.display.specshow(
    in_feats_norm.T, sr=sr, hop_length=hop_length, x_axis="time", y_axis="frames",ax=ax[1], cmap=cmap)
mesh.set_clim(-4, 4)
fig.colorbar(mesh, ax=ax[1])

for a in ax:
    a.set_xlabel("Time [sec]")
    a.set_ylabel("Mel filter channel")
plt.tight_layout()

## 10.4 Tacotron 학습 스크립트 작성

### DataLoader 구현

#### collate_fn 구현

In [None]:
def ensure_divisible_by(feats, N):
    if N == 1:
        return feats
    mod = len(feats) % N
    if mod != 0:
        feats = feats[: len(feats) - mod]
    return feats

In [None]:
from ttslearn.util import pad_1d, pad_2d

def collate_fn_tacotron(batch, reduction_factor=1):
    xs = [x[0] for x in batch]
    ys = [ensure_divisible_by(x[1], reduction_factor) for x in batch]
    in_lens = [len(x) for x in xs]
    out_lens = [len(y) for y in ys]
    in_max_len = max(in_lens)
    out_max_len = max(out_lens)
    x_batch = torch.stack([torch.from_numpy(pad_1d(x, in_max_len)) for x in xs])
    y_batch = torch.stack([torch.from_numpy(pad_2d(y, out_max_len)) for y in ys])
    in_lens = torch.tensor(in_lens, dtype=torch.long)
    out_lens = torch.tensor(out_lens, dtype=torch.long)
    stop_flags = torch.zeros(y_batch.shape[0], y_batch.shape[1])
    for idx, out_len in enumerate(out_lens):
        stop_flags[idx, out_len - 1 :] = 1.0
    return x_batch, in_lens, y_batch, out_lens, stop_flags

#### DataLoader 사용 예

In [None]:
from pathlib import Path
from ttslearn.train_util import Dataset, collate_fn_tacotron
from functools import partial

in_paths = sorted(Path("./dump/jsut_sr16000/norm/dev/in_tacotron/").glob("*.npy"))
out_paths = sorted(Path("./dump/jsut_sr16000/norm/dev/out_tacotron/").glob("*.npy"))

dataset = Dataset(in_paths, out_paths)
collate_fn = partial(collate_fn_tacotron, reduction_factor=1)
data_loader = torch.utils.data.DataLoader(dataset, batch_size=8, collate_fn=collate_fn, num_workers=0)

in_feats, in_lens, out_feats, out_lens, stop_flags = next(iter(data_loader))
print("입력 특징량의 크기:", tuple(in_feats.shape))
print("출력 특징량의 크기:", tuple(out_feats.shape))
print("stop flags 크기:", tuple(stop_flags.shape))

#### ミニバッチの可視化

In [None]:
fig, ax = plt.subplots(len(out_feats), 1, figsize=(8,10), sharex=True, sharey=True)
for n in range(len(in_feats)):
    x = out_feats[n].data.numpy()
    hop_length = int(sr * 0.0125)
    mesh = librosa.display.specshow(x.T, sr=sr, x_axis="time", y_axis="frames", hop_length=hop_length, cmap=cmap, ax=ax[n])
    fig.colorbar(mesh, ax=ax[n])
    mesh.set_clim(-4, 4)
    # 나중에 다시 붙이기 때문에 여기에서 라벨을 삭제합니다.
    ax[n].set_xlabel("")
    
ax[-1].set_xlabel("Time [sec]")
for a in ax:
    a.set_ylabel("Mel channel")

plt.tight_layout()
savefig("fig/e2etts_impl_minibatch")

### 간단한 학습 스크립트 구현

#### 학습 전 준비

In [None]:
from ttslearn.tacotron import Tacotron2 as Tacotron
from torch import optim

# 동작 확인용: 레이어 수를 줄인 작은 Tacotron
model = Tacotron(
    embed_dim=32, encoder_conv_layers=1, encoder_conv_channels=32, encoder_hidden_dim=32,
    decoder_hidden_dim=32, postnet_channels=32, postnet_layers=1)

# lr은 학습률을 나타냅니다.
optimizer = optim.Adam(model.parameters(), lr=0.001)

# gamma는 학습률의 감쇠 계수를 나타냅니다.
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, gamma=0.5, step_size=100000)

#### 학습 루프 구현

In [None]:
from ttslearn.util import make_non_pad_mask

# DataLoader를 사용하여 미니 배치 생성 : 미니 배치 당 처리
for in_feats, in_lens, out_feats, out_lens, stop_flags in tqdm(data_loader):
    in_lens, indices = torch.sort(in_lens, dim=0, descending=True)
    in_feats, out_feats, out_lens = in_feats[indices], out_feats[indices], out_lens[indices]
    
    # 순전파 계산
    outs, outs_fine, logits, _ = model(in_feats, in_lens, out_feats)
    
    # 제로 패디그 부분을 손실 함수의 계산에서 제외하기 위해 마스크를 적용합니다.
    # Mask (B x T x 1)
    mask = make_non_pad_mask(out_lens).unsqueeze(-1)
    out_feats = out_feats.masked_select(mask)
    outs = outs.masked_select(mask)
    outs_fine = outs_fine.masked_select(mask)
    stop_flags = stop_flags.masked_select(mask.squeeze(-1))
    logits = logits.masked_select(mask.squeeze(-1))

    # 손실 계산
    decoder_out_loss = nn.MSELoss()(outs, out_feats)
    postnet_out_loss = nn.MSELoss()(outs_fine, out_feats) 
    stop_token_loss = nn.BCEWithLogitsLoss()(logits, stop_flags)
    
    # 손실의 합계
    loss = decoder_out_loss + postnet_out_loss + stop_token_loss

    # 손실 값 출력
    print(f"decoder_out_loss: {decoder_out_loss:.2f}, postnet_out_loss: {postnet_out_loss:.2f}, stop_token_loss: {stop_token_loss:.2f}")
    # optimizer에 축적 된 그라디언트 재설정
    optimizer.zero_grad()
    # 오차의 역전파
    loss.backward()
    # 매개변수 업데이트
    optimizer.step()
    # 학습률 스케줄러 업데이트
    lr_scheduler.step()

### 어텐션 가중치 시각화

여기에서는 학습이 성공적으로 진행되지 않는 경우의 예로 의도적으로 학습된 모델의 일부 매개 변수를 난수로 초기화합니다. 자세한 내용은 `randomize_tts_engine_`을 참조하십시오.

In [None]:
from ttslearn.tacotron import Tacotron2TTS
from ttslearn.tacotron.tts import randomize_tts_engine_

tacotron_engine = Tacotron2TTS()

tacotron_engine_bad = Tacotron2TTS()
randomize_tts_engine_(tacotron_engine_bad)
print("randomized some of network weights")

In [None]:
text = "水をマレーシアから買わなくてはならないのです。"

import pyopenjtalk
from ttslearn.tacotron.frontend.openjtalk import text_to_sequence, pp_symbols

labels = pyopenjtalk.extract_fullcontext(text)
# 운율 기호가있는 음소열
in_feats = text_to_sequence(pp_symbols(labels))
in_feats = torch.tensor(in_feats, dtype=torch.long)

with torch.no_grad():
    outs, outs_fine, logits, att_ws = tacotron_engine.acoustic_model.inference(in_feats)
    
with torch.no_grad():
    outs2, outs_fine2, logits2, att_ws2 = tacotron_engine_bad.acoustic_model.inference(in_feats)

In [None]:
fig, ax = plt.subplots(1, 2, figsize=(12, 5))
ax[0].set_title("Failure")
ax[1].set_title("Normal")

mesh = ax[0].imshow(att_ws2.cpu().data.numpy().T, aspect="auto", origin="lower", interpolation="nearest")
fig.colorbar(mesh, ax=ax[0])
ax[0].set_xlabel("Decoder time step [frame]")
ax[0].set_ylabel("Encoder time step [phoneme]")

mesh = ax[1].imshow(att_ws.cpu().data.numpy().T, aspect="auto", origin="lower", interpolation="nearest")
fig.colorbar(mesh, ax=ax[1])
ax[1].set_xlabel("Decoder time step [frame]")
ax[1].set_ylabel("Encoder time step [phoneme]")

plt.tight_layout()

# 그림 10-5
savefig("./fig/e2etts_impl_attention_failure")

### 실용적인 학습 스크립트 구현

`train_tacotron.py`를 참조하십시오.

## 10.5 Tacotron 학습

### Tacotron 구성 파일

In [None]:
! cat conf/train_tacotron/model/{acoustic_config_name}.yaml

### Tacotron 인스턴스화

In [None]:
import hydra
from omegaconf import OmegaConf
hydra.utils.instantiate(OmegaConf.load(f"./conf/train_tacotron/model/{acoustic_config_name}.yaml")["netG"])

### 레시피의 stage 3 실행

In [None]:
if run_sh:
    ! ./run.sh --stage 3 --stop-stage 3 --acoustic-model $acoustic_config_name \
        --tqdm $run_sh_tqdm --tacotron-train-max-train-steps $tacotron_max_train_steps \
        --tacotron-data-batch-size $tacotron_batch_size \
        --cudnn-benchmark $cudnn_benchmark --cudnn-deterministic $cudnn_deterministic

### 손실 함수의 값 추이

著者による実験結果です。Tensorboardのログは https://tensorboard.dev/ にアップロードされています。
ログデータを`tensorboard` パッケージを利用してダウンロードします。

https://tensorboard.dev/experiment/yXyg9qgfQRSGxvil5FA4xw/

In [None]:
if exists("tensorboard/all_log.csv"):
    df = pd.read_csv("tensorboard/all_log.csv")
else:
    experiment_id = "gHKogn7wRxa4B3NIVw27xw"
    experiment = tb.data.experimental.ExperimentFromDev(experiment_id)
    df = experiment.get_scalars() 
    df.to_csv("tensorboard/all_log.csv", index=False)
df["run"].unique()

In [None]:
tacotron_loss = df[df.run.str.contains("tacotron2_rf2")]

tacotron_train_loss = tacotron_loss[tacotron_loss.tag.str.startswith("Loss/train")]
tacotron_dev_loss = tacotron_loss[tacotron_loss.tag.str.startswith("Loss/dev")]

fig, ax = plt.subplots(figsize=(6,4))
ax.plot(tacotron_train_loss["step"], tacotron_train_loss["value"], label="Train")
ax.plot(tacotron_dev_loss["step"], tacotron_dev_loss["value"], "--", label="Dev")
ax.set_xlabel("Epoch")
ax.set_ylabel("Epoch loss")
plt.legend()

# 그림 10-6
savefig("fig/tacotron_impl_tacotron_loss")

## 10.6 WaveNet 보코더 학습

### WaveNet 보코더 설정 파일

In [None]:
! cat conf/train_wavenet/model/{wavenet_config_name}.yaml

### WaveNet 보코더 인스턴스화

In [None]:
import hydra
from omegaconf import OmegaConf
# WaveNet의 30 층 모두를 표시하면 길어지므로 여기서는 생략합니다.
# hydra.utils.instantiate(OmegaConf.load(f"./conf/train_wavenet/model/{wavenet_config_name}.yaml")["netG"])

### 레시피 stage 4 실행

In [None]:
if run_sh:
    ! ./run.sh --stage 4 --stop-stage 4 --wavenet-model $wavenet_config_name \
        --tqdm $run_sh_tqdm --wavenet-train-max-train-steps $wavenet_max_train_steps \
        --wavenet-data-batch-size $wavenet_batch_size \
        --cudnn-benchmark $cudnn_benchmark --cudnn-deterministic $cudnn_deterministic

### 손실 함수의 값 추이

In [None]:
wavenet_loss = df[df.run.str.contains("wavenet")]

wavenet_train_loss = wavenet_loss[wavenet_loss.tag.str.contains("Loss/train")]
wavenet_dev_loss = wavenet_loss[wavenet_loss.tag.str.contains("Loss/dev")]

fig, ax = plt.subplots(figsize=(6,4))
ax.plot(wavenet_train_loss["step"], wavenet_train_loss["value"], label="Train")
ax.plot(wavenet_dev_loss["step"], wavenet_dev_loss["value"], "--", label="Dev")
ax.set_xlabel("Epoch")
ax.set_ylabel("Epoch loss")
ax.set_ylim(1.6, 2.3)
plt.legend()

# 그림 10-7
savefig("fig/tacotron_impl_wavenet_loss")

## 10.7 학습된 모델을 사용하여 텍스트에서 음성 합성

### 학습된 모델 로드

In [None]:
import joblib
device = torch.device("cpu")

#### Tacotron 로드

In [None]:
acoustic_config = OmegaConf.load(f"exp/jsut_sr16000/{acoustic_config_name}/model.yaml")
acoustic_model = hydra.utils.instantiate(acoustic_config.netG)
checkpoint = torch.load(f"exp/jsut_sr16000/{acoustic_config_name}/latest.pth", map_location=device)
acoustic_model.load_state_dict(checkpoint["state_dict"])
acoustic_model.eval();

#### WaveNet 보코더 로드

In [None]:
wavenet_config = OmegaConf.load(f"exp/jsut_sr16000/{wavenet_config_name}/model.yaml")
wavenet_model = hydra.utils.instantiate(wavenet_config.netG)
checkpoint = torch.load(f"exp/jsut_sr16000/{wavenet_config_name}/latest_ema.pth", map_location=device)
wavenet_model.load_state_dict(checkpoint["state_dict"])
# weight normalization 은 추론시에는 불필요하므로 제외
wavenet_model.remove_weight_norm_()
wavenet_model.eval();

#### 통계량 로드

통계는 Griffin-Lim 알고리즘을 사용하는 경우에만 필요합니다.

In [None]:
acoustic_out_scaler = joblib.load("./dump/jsut_sr16000/norm/out_tacotron_scaler.joblib")

### 멜 스펙트로그램 예측

In [None]:
from ttslearn.util import find_lab, find_feats

labels = hts.load(find_lab("downloads/jsut-label/", test_utt))

in_feats = text_to_sequence(pp_symbols(labels.contexts))
in_feats = torch.tensor(in_feats, dtype=torch.long).to(device)

with torch.no_grad():
    out_feats, out_feats_fine, stop_flags, alignment = acoustic_model.inference(in_feats)
    
# 비교를 위해 자연음성에서 추출한 음향 특징량을 읽기
feats = np.load(find_feats("dump/jsut_sr16000/norm/", test_utt, typ="out_tacotron"))

#### 멜 스펙트로그램 시각화

In [None]:
fig, ax = plt.subplots(2, 1, figsize=(8,6))
ax[0].set_title("Mel-spectrogram of natural speech")
ax[1].set_title("Mel-spectrogram of Tacotron output")

mindb = min(feats.min(), out_feats_fine.min())
maxdb = max(feats.max(), out_feats_fine.max())

hop_length = int(sr * 0.0125)
mesh = librosa.display.specshow(
    feats.T, sr=sr, x_axis="time", y_axis="frames", hop_length=hop_length, cmap=cmap, ax=ax[0])
mesh.set_clim(mindb, maxdb)
fig.colorbar(mesh, ax=ax[0])
mesh = librosa.display.specshow(
    out_feats_fine.data.numpy().T, sr=sr, x_axis="time", y_axis="frames", hop_length=hop_length, cmap=cmap, ax=ax[1])
mesh.set_clim(mindb, maxdb)
fig.colorbar(mesh, ax=ax[1])

for a in ax:
    a.set_xlabel("Time [sec]")
    a.set_ylabel("Mel filter channel")
fig.tight_layout()

# 그림 10-8
savefig("./fig/e2etts_impl_logmel_comp")

#### 어텐션 가중치 시각화

In [None]:
fig, ax = plt.subplots(figsize=(6,4))
im = ax.imshow(alignment.cpu().data.numpy().T, aspect="auto", origin="lower", interpolation="nearest")
fig.colorbar(im, ax=ax)
ax.set_xlabel("Decoder time step [frame]")
ax.set_ylabel("Encoder time step [phoneme]");

#### Stop token 시각화

In [None]:
fig, ax = plt.subplots(figsize=(6,4))
ax.plot(torch.sigmoid(stop_flags).cpu().numpy())
ax.set_xlabel("Time [frame]")
ax.set_ylabel("Stop probability");

### 음성 파형 생성

In [None]:
from ttslearn.dsp import inv_mulaw_quantize

@torch.no_grad()
def gen_waveform(wavenet_model, out_feats):
    # (B, T, C) -> (B, C, T)
    c = out_feats.view(1, -1, out_feats.size(-1)).transpose(1, 2)

    # 오디오 샘플 수 계산
    upsample_scale = np.prod(wavenet_model.upsample_scales)
    T = (
        c.shape[-1] - wavenet_model.aux_context_window * 2
    ) * upsample_scale

    # WaveNet으로 음성 파형 생성
    # 참고 : 계산에 시간이 오래 걸리므로 tqdm의 진행률 표시 줄을 수락합니다.
    gen_wav = wavenet_model.inference(c, T, tqdm)

    # One-hot 벡터를 1차원 신호로 변환
    gen_wav = gen_wav.max(1)[1].float().cpu().numpy().reshape(-1)

    # Mu-law 양자화의 역변환
    gen_wav = inv_mulaw_quantize(
        gen_wav, wavenet_model.out_channels - 1
    )
    
    return gen_wav

### 모든 모델을 결합하여 음성 파형 생성

In [None]:
from ttslearn.util import find_lab, find_feats
from ttslearn.dsp import logmelspectrogram_to_audio

# WaveNet 보코더 대신 Griffin-Lim 알고리즘을 사용하는 경우 다음을 True로 설정하십시오.
griffin_lim = False

labels = hts.load(find_lab("downloads/jsut-label/", test_utt))
in_feats = text_to_sequence(pp_symbols(labels.contexts))
in_feats = torch.tensor(in_feats, dtype=torch.long).to(device)

with torch.no_grad():
    _, out_feats, _, _ = acoustic_model.inference(in_feats)
    
if griffin_lim:
    # Griffin-Lim 알고리즘을 기반으로 음성 파형 생성
    out_feats = out_feats.cpu().data.numpy()
    # 정규화의 역변환
    logmel = acoustic_out_scaler.inverse_transform(out_feats)
    gen_wav = logmelspectrogram_to_audio(logmel, sr)
else:
    # WaveNet 보코더로 음성 파형 생성
    gen_wav = gen_waveform(wavenet_model, out_feats)

In [None]:
# 비교를 위해 원래 음성 로드
from scipy.io import wavfile
_sr, ref_wav = wavfile.read(f"./downloads/jsut_ver1.1/basic5000/wav/{test_utt}.wav")
ref_wav = (ref_wav / 32768.0).astype(np.float64)
ref_wav = librosa.resample(ref_wav, _sr, sr)

In [None]:
fig, ax = plt.subplots(2, 1, figsize=(8,6))

hop_length = int(sr * 0.005)
fft_size = pyworld.get_cheaptrick_fft_size(sr)

# Tacotron의 출력과 거칠게 정렬하기 위해 자연 음성의 시작과 끝의 무음 구간 삭제
ref_wav_trim = librosa.effects.trim(ref_wav, top_db=20)[0]

spec_ref = librosa.stft(ref_wav_trim, n_fft=fft_size, hop_length=hop_length, window="hann")
logspec_ref = np.log(np.abs(spec_ref))
spec_gen = librosa.stft(gen_wav, n_fft=fft_size, hop_length=hop_length, window="hann")
logspec_gen = np.log(np.abs(spec_gen))

mindb = min(logspec_ref.min(), logspec_gen.min())
maxdb = max(logspec_ref.max(), logspec_gen.max())

mesh = librosa.display.specshow(logspec_ref, hop_length=hop_length, sr=sr, cmap=cmap, x_axis="time", y_axis="hz", ax=ax[0])
mesh.set_clim(mindb, maxdb)
fig.colorbar(mesh, ax=ax[0], format="%+2.fdB")

mesh = librosa.display.specshow(logspec_gen, hop_length=hop_length, sr=sr, cmap=cmap, x_axis="time", y_axis="hz", ax=ax[1])
mesh.set_clim(mindb, maxdb)
fig.colorbar(mesh, ax=ax[1], format="%+2.fdB")

ax[0].set_title("Spectrogram of natural speech")
ax[1].set_title("Spectrogram of generated speech")

for a in ax:
    a.set_xlabel("Time [sec]")
    a.set_ylabel("Frequency [Hz]")

plt.tight_layout()

print("자연 음성")
IPython.display.display(Audio(ref_wav_trim, rate=sr))
print("Tacotron 2에 의한 합성 음성")
IPython.display.display(Audio(gen_wav, rate=sr))

# 그림 10-9
savefig("./fig/e2etts_impl_tts_spec_comp")

### 합성 음성의 보다 상세한 비교 (bonus)

In [None]:
# 비교를 위해 자연 음성에서 추출한 멜 스펙트로 그램에서 음성 파형을 생성합니다.
feats = np.load(find_feats("dump/jsut_sr16000/norm/", test_utt, typ="out_tacotron"))
feats = torch.from_numpy(feats)
gen_wav_wn_gt = gen_waveform(wavenet_model, feats)

In [None]:
ref_wav_inv = np.load(find_feats("./dump/jsut_sr16000/org/", test_utt, typ="out_wavenet"))
ref_wav_inv = inv_mulaw_quantize(ref_wav_inv, 255)

In [None]:
print("자연 음성")
IPython.display.display(Audio(ref_wav, rate=sr))
print("자연 음성 (8-bit mu-law)")
IPython.display.display(Audio(ref_wav_inv, rate=sr))
print("WaveNet 보코더 출력")
IPython.display.display(Audio(gen_wav_wn_gt, rate=sr))
print("Tacotron + WaveNet 보코더 출력")
IPython.display.display(Audio(gen_wav, rate=sr))

### 평가 데이터에 대한 음성 파형 생성

#### 레시피 stage 5 실행

In [None]:
if run_sh:
    ! ./run.sh --stage 5 --stop-stage 5 --acoustic-model $acoustic_config_name \
        --tqdm $run_sh_tqdm --wavenet-model $wavenet_config_name \
        --reverse true --num-eval-utts $num_eval_utts

#### 레시피 stage 6 실행

In [None]:
if run_sh and run_stage6:
    ! ./run.sh --stage 6 --stop-stage 6 --acoustic-model $acoustic_config_name \
        --tqdm $run_sh_tqdm --wavenet-model $wavenet_config_name \
        --reverse true --num-eval-utts $num_eval_utts

## 자연 음성과 합성 음성의 비교 (bonus)

In [None]:
from pathlib import Path
from ttslearn.util import load_utt_list

with open("./downloads/jsut_ver1.1/basic5000/transcript_utf8.txt") as f:
    transcripts = {}
    for l in f:
        utt_id, script = l.split(":")
        transcripts[utt_id] = script
        
eval_list = load_utt_list("data/eval.list")[::-1][:5]

for utt_id in eval_list:
    # ref file 
    ref_file = f"./downloads/jsut_ver1.1/basic5000/wav/{utt_id}.wav"
    _sr, ref_wav = wavfile.read(ref_file)
    ref_wav = (ref_wav / 32768.0).astype(np.float64)
    ref_wav = librosa.resample(ref_wav, _sr, sr)
  
    print(f"{utt_id}: {transcripts[utt_id]}")
    print("자연 음성")
    IPython.display.display(Audio(ref_wav, rate=sr))

    gen_file = f"exp/jsut_sr16000/synthesis_{acoustic_config_name}_griffin_lim/eval/{utt_id}.wav"
    if exists(gen_file):
        _sr, gen_wav = wavfile.read(gen_file)
        print("Tacotron + Griffin-Lim")
        IPython.display.display(Audio(gen_wav, rate=sr))
    else:
        print("Tacotron + Griffin-Lim: not found")

    gen_file_wn = f"exp/jsut_sr16000/synthesis_{acoustic_config_name}_{wavenet_config_name}/eval/{utt_id}.wav"
    if exists(gen_file_wn):
        _sr, gen_wav_wn = wavfile.read(gen_file_wn)
        print("Tacotron + WaveNet 보코더")
        IPython.display.display(Audio(gen_wav_wn, rate=sr))
    else:
        print("Tacotron + WaveNet 보코더: not found")

## 학습된 모델 패키징 (bonus)

학습된 모델을 사용하는 TTS에 필요한 모든 파일을 단일 디렉토리로 그룹화합니다.
`ttslearn.tacotron.Tacotron2TTS` 클래스에는, 정리한 디렉토리를 지정해, TTS를 실시하는 기능이 구현되고 있습니다.

### 레시피 stage 99 실행

In [None]:
if run_sh:
    ! ./run.sh --stage 99 --stop-stage 99 --acoustic-model $acoustic_config_name \
        --wavenet-model $wavenet_config_name

In [None]:
!ls tts_models/jsut_sr16000_{acoustic_config_name}_{wavenet_config_name}

### 패키징된 모델을 이용한 TTS

In [None]:
from ttslearn.tacotron import Tacotron2TTS

# 패키징된 모델의 경로를 지정합니다.
engine = Tacotron2TTS(
    model_dir=f"./tts_models/jsut_sr16000_{acoustic_config_name}_{wavenet_config_name}"
)
wav, sr = engine.tts("ここまでお読みいただき、ありがとうございました。", tqdm=tqdm)

fig, ax = plt.subplots(figsize=(8,2))
librosa.display.waveshow(wav.astype(np.float32), sr, ax=ax)
ax.set_xlabel("Time [sec]")
ax.set_ylabel("Amplitude")
plt.tight_layout()

Audio(wav, rate=sr)

In [None]:
if is_colab():
    from datetime import timedelta
    elapsed = (time.time() - start_time)
    print("소요시간:", str(timedelta(seconds=elapsed)))