In [8]:
# Colab-only setup: mount Google Drive at /content/drive so the project
# directory used by the following cells is reachable.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [9]:
cd /content/drive/MyDrive/ASR/StyleTokens/

/content/drive/MyDrive/ASR/StyleTokens


In [10]:
pip install pypinyin

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!pip3 install -r requirements.txt

In [None]:
import torch
import torch.nn as nn
from GST import GST
from Hyperparameters import Hyperparameters as hp


class Tacotron(nn.Module):
    '''
    Text-to-spectrogram model: embedding -> Encoder -> add GST style embedding -> Decoder.

    input:
        texts: [N, T_x]
        mels: [N, T_y/r, n_mels*r]
        ref_mels: reference mels passed straight to GST (shape is defined by GST, not visible here)
    output:
        mels --- [N, T_y/r, n_mels*r]
        mags --- [N, T_y, F]  where F is the width of Decoder.fc2
        attn_weights --- [N, T_y/r, T_x]
    '''

    def __init__(self):
        super().__init__()
        # Attribute names are also the state_dict prefixes used by saved checkpoints.
        self.embedding = nn.Embedding(len(hp.vocab), hp.E)
        self.encoder = Encoder()
        self.decoder = Decoder()

        self.gst = GST()

    def forward(self, texts, mels, ref_mels):
        # Encode the embedded text; the encoder's final hidden state is not needed.
        memory, _ = self.encoder(self.embedding(texts))  # [N, T_x, E]

        # Broadcast the style embedding over the text axis and add it to every
        # encoder step.
        # NOTE(review): the original comment gave the GST output as [N, 256], but
        # expand_as(memory) only broadcasts over T_x for something like [N, 1, E]
        # -- confirm against GST.
        style = self.gst(ref_mels).expand_as(memory)
        conditioned = memory + style

        mels_hat, mags_hat, attn_weights = self.decoder(mels, conditioned)
        return mels_hat, mags_hat, attn_weights


class Encoder(nn.Module):
    '''
    PreNet followed by a CBHG stack: conv bank -> max-pool -> two conv projections
    (with a residual back to the PreNet output) -> highway layers -> 2-layer
    bidirectional GRU.

    input:
        inputs: [N, T_x, E]
        prev_hidden: optional initial GRU state (None lets the GRU start from zeros)
    output:
        outputs: [N, T_x, E]
        hidden: [4, N, E//2]  (2 layers x 2 directions)
    '''

    def __init__(self):
        super().__init__()
        half_width = hp.E // 2

        # Sub-modules are created in a fixed order so parameter initialisation and
        # state_dict layout stay the same.
        self.prenet = PreNet(in_features=hp.E)  # [N, T, E//2]

        self.conv1d_bank = Conv1dBank(K=hp.K, in_channels=half_width, out_channels=half_width)  # [N, T, E//2 * K]

        self.conv1d_1 = Conv1d(in_channels=hp.K * hp.E // 2, out_channels=half_width, kernel_size=3)  # [N, T, E//2]
        self.conv1d_2 = Conv1d(in_channels=half_width, out_channels=half_width, kernel_size=3)  # [N, T, E//2]
        self.bn1 = BatchNorm1d(num_features=half_width)
        self.bn2 = BatchNorm1d(num_features=half_width)

        self.highways = nn.ModuleList()
        for _ in range(hp.num_highways):
            self.highways.append(Highway(in_features=half_width, out_features=half_width))

        self.gru = nn.GRU(
            input_size=half_width,
            hidden_size=half_width,
            num_layers=2,
            bidirectional=True,
            batch_first=True,
        )

    def forward(self, inputs, prev_hidden=None):
        prenet_out = self.prenet(inputs)  # [N, T, E//2]

        # --- CBHG ---
        # convolution bank + pooling
        features = max_pool1d(self.conv1d_bank(prenet_out), kernel_size=2)  # [N, T, E//2 * K]

        # two projections: conv -> bn -> relu, then conv -> bn
        features = nn.functional.relu(self.bn1(self.conv1d_1(features)))  # [N, T, E//2]
        features = self.bn2(self.conv1d_2(features))  # [N, T, E//2]

        # residual connection back to the PreNet output
        features = features + prenet_out

        for highway in self.highways:
            features = highway(features)

        # GRU is batch_first, so no transpose is needed here.
        self.gru.flatten_parameters()
        outputs, hidden = self.gru(features, prev_hidden)  # outputs [N, T, E]
        return outputs, hidden


class Decoder(nn.Module):
    '''
    Attention decoder followed by a post-processing CBHG.

    In training mode the whole target sequence is decoded in one teacher-forced
    pass. In eval mode frames are generated one step at a time for hp.max_Ty
    steps; the last hp.n_mels values of each prediction are fed back as the
    next input.

    input:
        inputs --- training: teacher-forcing frames, one per decoder step;
                   eval: the GO frame used for the first step.
                   NOTE(review): the original docstring gave [N, T_y/r, n_mels*r],
                   but the PreNet is built with hp.n_mels and the eval loop feeds
                   back only the last hp.n_mels values -- confirm the expected width.
        memory --- [N, T_x, E]
    output:
        mels   --- [N, T_y/r, n_mels*r]
        mags --- [N, T_y, 1+n_fft]  (width of fc2)
        attn_weights --- [N, T_y/r, T_x]
    '''

    def __init__(self):
        super().__init__()
        self.prenet = PreNet(hp.n_mels)
        self.attn_rnn = AttentionRNN()
        self.attn_projection = nn.Linear(in_features=2 * hp.E, out_features=hp.E)
        self.gru1 = nn.GRU(input_size=hp.E, hidden_size=hp.E, batch_first=True, bidirectional=False)
        self.gru2 = nn.GRU(input_size=hp.E, hidden_size=hp.E, batch_first=True, bidirectional=False)
        self.fc1 = nn.Linear(in_features=hp.E, out_features=hp.n_mels * hp.r)
        self.cbhg = DecoderCBHG()  # Deng
        self.fc2 = nn.Linear(in_features=hp.E, out_features=1 + (hp.n_fft))  # Deng

    def forward(self, inputs, memory):
        if self.training:
            # Teacher forcing: every decoder step is processed in a single pass.
            attn_weights, rnn_out, _ = self.attn_rnn(self.prenet(inputs), memory)
            mels, _, _ = self._mels_from_attention(attn_weights, rnn_out, memory)  # [N, T_y/r, n_mels*r]
        else:
            mels, attn_weights = self._generate(inputs, memory)

        # CBHG post-net, then projection to the linear spectrogram
        out, _ = self.cbhg(mels)  # out -- [N, T_y, E]
        mags = self.fc2(out)

        return mels, mags, attn_weights

    def _mels_from_attention(self, attn_weights, rnn_out, memory, gru1_hidden=None, gru2_hidden=None):
        '''Apply attention to memory, run the two residual GRUs and project to mel frames.

        Returns (mels, gru1_hidden, gru2_hidden). Passing None for a hidden state is
        the same as omitting it, so the GRU starts from zeros.
        '''
        context = torch.bmm(attn_weights, memory)  # [N, T, E]
        projected = self.attn_projection(torch.cat([context, rnn_out], dim=-1))  # [N, T, E]

        # GRU1 with residual connection
        self.gru1.flatten_parameters()
        out1, gru1_hidden = self.gru1(projected, gru1_hidden)
        out1 = out1 + projected

        # GRU2 with residual connection
        self.gru2.flatten_parameters()
        out2, gru2_hidden = self.gru2(out1, gru2_hidden)
        out2 = out2 + out1

        # log mel-spectrogram frames
        return self.fc1(out2), gru1_hidden, gru2_hidden

    def _generate(self, go_frame, memory):
        '''Autoregressively decode hp.max_Ty steps starting from go_frame.

        Returns (mels, attn_weights), each concatenated along dim 1 over the steps.
        '''
        attn_hidden = None
        gru1_hidden = None
        gru2_hidden = None

        frame = go_frame
        mels = []
        attn_weights = []
        for _ in range(hp.max_Ty):
            attn_weight, rnn_out, attn_hidden = self.attn_rnn(self.prenet(frame), memory, attn_hidden)
            attn_weights.append(attn_weight)  # attn_weight: [1, 1, T_x]

            mel, gru1_hidden, gru2_hidden = self._mels_from_attention(
                attn_weight, rnn_out, memory, gru1_hidden, gru2_hidden)  # mel: [1, 1, n_mels*r]

            # Only the last predicted frame is fed back into the PreNet.
            frame = mel[:, :, -hp.n_mels:]
            mels.append(mel)

        return torch.cat(mels, dim=1), torch.cat(attn_weights, dim=1)


class DecoderCBHG(nn.Module):
    '''
    Post-processing CBHG applied to the predicted mel frames: conv bank -> max-pool
    -> two conv projections (with a residual back to the un-grouped frames) ->
    highway layers -> 2-layer bidirectional GRU.

    input:
        inputs: [N, T/r, n_mels * r]
        prev_hidden: optional initial GRU state
    output:
        outputs: [N, T, E]
        hidden: [4, N, E//2]  (2 layers x 2 directions)
    '''

    def __init__(self):
        super().__init__()
        mel_dim = hp.n_mels

        self.conv1d_bank = Conv1dBank(K=hp.decoder_K, in_channels=mel_dim, out_channels=hp.E // 2)

        self.conv1d_1 = Conv1d(in_channels=hp.decoder_K * hp.E // 2, out_channels=hp.E, kernel_size=3)
        self.bn1 = BatchNorm1d(hp.E)
        self.conv1d_2 = Conv1d(in_channels=hp.E, out_channels=mel_dim, kernel_size=3)
        self.bn2 = BatchNorm1d(mel_dim)

        self.highways = nn.ModuleList()
        for _ in range(hp.num_highways):
            self.highways.append(Highway(in_features=mel_dim, out_features=mel_dim))

        self.gru = nn.GRU(
            input_size=mel_dim,
            hidden_size=hp.E // 2,
            num_layers=2,
            bidirectional=True,
            batch_first=True,
        )

    def forward(self, inputs, prev_hidden=None):
        # Undo the r-frame grouping: one row per mel frame.
        frames = inputs.view(inputs.size(0), -1, hp.n_mels)  # [N, T, n_mels]

        # convolution bank + pooling
        features = max_pool1d(self.conv1d_bank(frames), kernel_size=2)  # [N, T, E//2 * K]

        # two projections: conv -> bn -> relu, then conv -> bn
        features = nn.functional.relu(self.bn1(self.conv1d_1(features)))  # [N, T, E]
        features = self.bn2(self.conv1d_2(features))  # [N, T, n_mels]

        # residual connection back to the mel frames
        features = features + frames  # [N, T, n_mels]

        for highway in self.highways:
            features = highway(features)  # [N, T, n_mels]

        # bidirectional GRU
        self.gru.flatten_parameters()
        outputs, hidden = self.gru(features, prev_hidden)  # outputs: [N, T, E]
        return outputs, hidden


In [7]:
from utils import *
from Data import get_eval_data
from Hyperparameters import Hyperparameters as hp
import torch
from scipy.io.wavfile import write
from Network import *
import librosa
from pypinyin import lazy_pinyin, Style
from __future__ import print_function, division
import numpy as np
# import tensorflow as tf
import librosa
import copy
import matplotlib
matplotlib.use('pdf')
import matplotlib.pyplot as plt
from scipy import signal
import os

device = torch.device('cpu')

def spectrogram2wav(mag):
    '''Convert a normalised magnitude spectrogram into a float32 waveform.

    mag is transposed before processing, so it is expected time-major
    (griffin_lim / invert_spectrogram work on [f, t]). Values are clipped to
    [0, 1], mapped back to decibels with hp.max_db / hp.ref_db, converted to
    linear amplitude, inverted with Griffin-Lim, de-emphasised with
    hp.preemphasis and finally trimmed with librosa.effects.trim.
    '''
    spec = mag.T

    # de-normalize: [0, 1] -> dB
    spec_db = (np.clip(spec, 0, 1) * hp.max_db) - hp.max_db + hp.ref_db

    # dB -> linear amplitude
    amplitude = np.power(10.0, spec_db * 0.05)

    # phase reconstruction
    emphasised = griffin_lim(amplitude)

    # undo the pre-emphasis filter
    restored = signal.lfilter([1], [1, -hp.preemphasis], emphasised)

    # trim; the returned index interval is not needed
    trimmed, _ = librosa.effects.trim(restored)
    return trimmed.astype(np.float32)


def griffin_lim(spectrogram):
    '''Reconstruct a waveform from a magnitude spectrogram with Griffin-Lim.

    spectrogram: [f, t] linear-amplitude magnitudes; it is not modified.

    Runs hp.n_iter rounds of invert -> STFT -> keep the phase, re-impose the
    given magnitudes, then returns the real part of the final inversion.
    '''
    X_best = copy.deepcopy(spectrogram)
    for _ in range(hp.n_iter):
        X_t = invert_spectrogram(X_best)
        # n_fft / hop_length are keyword-only in librosa >= 0.10; passing them
        # positionally raises TypeError there. Keywords work on older releases too.
        est = librosa.stft(X_t, n_fft=2 * hp.n_fft, hop_length=hp.hop_length, win_length=hp.win_length)
        # unit-magnitude phase; the floor avoids dividing by zero on silent bins
        phase = est / np.maximum(1e-8, np.abs(est))
        X_best = spectrogram * phase
    X_t = invert_spectrogram(X_best)
    y = np.real(X_t)

    return y

def invert_spectrogram(spectrogram):
    '''
    Inverse STFT of a complex spectrogram using a Hann window.

    spectrogram: [f, t]
    returns: the time-domain signal produced by librosa.istft
    '''
    # hop_length is keyword-only in librosa >= 0.10; passing it positionally
    # raises TypeError there. The keyword form works on older releases too.
    return librosa.istft(spectrogram, hop_length=hp.hop_length, win_length=hp.win_length, window="hann")

def synthesis(model, eval_text, references=None):
    '''Synthesize eval_text once per reference recording.

    model: network called as model(text, GO, ref_mels) and returning
        (mel, mag, attention); it is switched to eval mode.
    eval_text: text to speak; converted with _pinyin before being encoded.
    references: optional mapping {speaker name: reference wav path}. Defaults to
        the three bundled references (nannan, xiaofeng, donaldduck).

    Returns a dict {speaker name: float32 waveform}.
    '''
    if references is None:
        references = {
            'nannan': 'ref_wav/nannan.wav',
            'xiaofeng': 'ref_wav/xiaofeng.wav',
            'donaldduck': 'ref_wav/donaldduck.wav',
        }

    eval_text = _pinyin(eval_text)

    model.eval()

    wavs = {}

    # Pure inference: without no_grad the decoder keeps an autograd graph
    # for every generated step.
    with torch.no_grad():
        for speaker, ref_wav in references.items():
            text, GO, ref_mels = get_eval_data(eval_text, ref_wav)
            text = text.to(device)
            GO = GO.to(device)
            ref_mels = ref_mels.to(device)

            # Only the linear spectrogram is needed to build the waveform.
            _, mag_hat, _ = model(text, GO, ref_mels)
            mag_hat = mag_hat.squeeze().detach().cpu().numpy()
            wavs[speaker] = spectrogram2wav(mag_hat)

    return wavs


def load_model(checkpoint_path):
    '''Build a Tacotron on `device` and load its weights from checkpoint_path.

    checkpoint_path: file holding a state_dict readable by torch.load.
    Returns the model with the weights loaded. It is not switched to eval mode here.
    '''
    def _keep_storage(storage, location):
        # Returning the storage unchanged leaves tensors where they were
        # deserialized instead of moving them to the device they were saved from.
        return storage

    net = Tacotron().to(device)
    # NOTE(review): torch.load unpickles the file -- only load trusted checkpoints.
    weights = torch.load(checkpoint_path, map_location=_keep_storage)
    net.load_state_dict(weights)
    return net


def _pinyin(s):
    '''Convert text to a space-separated pinyin string (Style.TONE2).

    Each token returned by lazy_pinyin is reduced to digits, lowercase ASCII
    letters and spaces; tokens that are exactly one space are dropped. The
    remaining tokens are joined with single spaces and any run of spaces is
    collapsed to one. A token that ends up empty after filtering can therefore
    leave a single leading or trailing space.
    '''
    symbols = '0123456789abcdefghijklmnopqrstuvwxyz '

    syllables = [
        ''.join(ch for ch in token if ch in symbols)
        for token in lazy_pinyin(s, style=Style.TONE2)
        if token != ' '
    ]
    joined = ' '.join(syllables)

    # Drop a space whenever the next character is also a space, so only the
    # last space of each run survives.
    return ''.join(
        ch for pos, ch in enumerate(joined)
        if not (ch == ' ' and joined[pos + 1:pos + 2] == ' ')
    )



In [None]:
# Sentences previously tried in this cell (they were immediately overwritten,
# so only the last assignment was ever synthesised):
#   毛主席是中国的红太阳
#   刘易斯汉密尔顿赢得摩纳哥大奖赛
text = "太阳从东方升起"

model = load_model('checkpoint/epoch100.pt')
wavs = synthesis(model, text)

# scipy's write() does not create missing directories.
os.makedirs('samples', exist_ok=True)
for k, wav in wavs.items():
    # one file per reference speaker, sampled at hp.sr
    write('samples/{}.wav'.format(k), hp.sr, wav)