<div align="center">
    
<br>

# FINE-GRAINED EMOTIONAL CONTROL OF TEXT-TO-SPEECH 

### LEARNING TO RANK INTER- AND INTRA-CLASS EMOTION INTENSITIES

Shijun Wang, Jón Guðnason, Damian Borth

**ICASSP 2023**

<br>

---

<br>

</div>

In [110]:
##############################################
# 1. Paths
##############################################
# Root of the raw EmoV-DB data: holds `cmuarctic.data` (transcripts) and one
# `<speaker>/<emotion>/*.wav` folder per recording condition.
DATA_PATH           = '/workspace/data/EmoV-DB'
# Resampled `.wav` files and their `.lab` transcripts, grouped per speaker;
# this folder is the input of the forced aligner.
CORPUS_PATH         = '/workspace/montreal_forced_aligner/corpus'
# Aligner output; read back as `<speaker>/<emotion>_<audio_id>.TextGrid`.
TEXTGRID_PATH       = '/workspace/montreal_forced_aligner/aligned'
# Destination of the per-utterance `.npz` feature files and of `stats.json`.
PREPROCESSED_PATH   = '/workspace/preprocessed'


##############################################
# 2. Preprocessing
##############################################
# Marker concatenated before and after every cleaned transcript.
NOISE_SYMBOL        = ' [noise] '
# Speaker and emotion folder names looked up under DATA_PATH; combinations
# whose folder does not exist are skipped by the processing loops.
SPEAKERS            = ['bea', 'jenie', 'josh', 'sam']
EMOTIONS            = ['neutral', 'amused', 'angry', 'disgusted', 'sleepy']
# TextGrid phone labels treated as silence (the empty string covers empty intervals).
SIL_PHONES          = ['sil', 'spn', 'sp', '']
# When True, frame-level pitch / energy are averaged over each phone's duration,
# giving one value per phone instead of one value per frame.
PITCH_AVERAGING     = True
ENERGY_AVERAGING    = True


##############################################
# 3. Audio (optimized for vocoder)
##############################################
# These values are the ones passed to `mel_spectogram`; the vocoder check at the
# end of the notebook uses the same numbers.
SAMPLING_RATE       = 16000
# Hop between analysis frames, in samples; also converts aligned times to frame counts.
HOP_LENGTH          = 256
WIN_LENGTH          = 1024
N_FFT               = 1024
N_MELS              = 80
F_MIN               = 0.0
# 8000 Hz is half of SAMPLING_RATE.
F_MAX               = 8000.0

In [107]:
import os
import tgt
import glob
import tqdm
import json
import torch
import scipy
import librosa
import torchaudio
import numpy as np
import pyworld as pw

from text import _clean_text
from sklearn.preprocessing import StandardScaler
from speechbrain.lobes.models.FastSpeech2 import mel_spectogram

**1. Preprocessing**



In [None]:
# Maps the last four characters of an utterance id (e.g. '0001') to its cleaned
# transcript, wrapped in noise markers.
audio_id_to_transcript = {}

with open(os.path.join(DATA_PATH, 'cmuarctic.data')) as f:
    # Iterate the file lazily instead of materialising every line with readlines().
    for line in f:
        # The slicing drops two characters at each end of the line; the record is
        # then split on double quotes into the id and the text.
        # NOTE(review): presumably records look like `( <id> "<text>" )` - confirm.
        fields = line[2:-2].split('"')
        if len(fields) < 2:
            # Blank or malformed line: there is no quoted transcript to read.
            continue

        audio_id, transcript = (field.strip() for field in fields[:2])

        # Only ids that do not start with 'arctic_b' are kept; since ids are cut to
        # their last four characters below, the two sets would otherwise collide.
        if audio_id.startswith('arctic_b'):
            continue

        audio_id = audio_id[-4:]
        transcript = NOISE_SYMBOL + _clean_text(transcript, ['english_cleaners']) + NOISE_SYMBOL

        # The outer spaces of NOISE_SYMBOL are stripped here, so the stored text
        # starts and ends with '[noise]'.
        audio_id_to_transcript[audio_id] = transcript.strip()

  return s in _symbol_to_id and s is not '_' and s is not '~'
  return s in _symbol_to_id and s is not '_' and s is not '~'


In [None]:
# Build the forced-aligner corpus: for every available speaker/emotion folder,
# resample each recording to SAMPLING_RATE and write it next to a `.lab` file
# holding its transcript.
for speaker in tqdm.tqdm(SPEAKERS):
    for emotion in EMOTIONS:

        # check the path existence: josh has only three emotions
        spk_emo_path = os.path.join(DATA_PATH, speaker, emotion)
        if not os.path.exists(spk_emo_path):
            continue

        # The output folder only depends on the speaker, so create it once here
        # rather than once per audio file.
        corpus_dir = os.path.join(CORPUS_PATH, speaker)
        os.makedirs(corpus_dir, exist_ok=True)

        # resample and create .lab file
        for wav_path in glob.glob(os.path.join(spk_emo_path, '*.wav')):

            # File name without its extension is the key into the transcript table.
            audio_id = os.path.splitext(os.path.basename(wav_path))[0]

            # Look the transcript up before decoding the audio, so an unknown id
            # fails (KeyError) before any expensive work is done.
            transcript = audio_id_to_transcript[audio_id]

            y, sr = librosa.load(wav_path, sr=SAMPLING_RATE)

            tgt_path = os.path.join(corpus_dir, f'{emotion}_{audio_id}')
            scipy.io.wavfile.write(tgt_path + '.wav', sr, y)

            with open(tgt_path + '.lab', 'w') as f:
                f.write(transcript + '\n')


100%|██████████| 4/4 [02:39<00:00, 39.76s/it]


In [None]:
# NOTE: the lines below are shell commands, not Python - run them in a terminal.

# download speech dictionary (pronunciation lexicon passed to `mfa validate` / `mfa align` below)
wget -O /workspace/montreal_forced_aligner/librispeech-lexicon.txt https://openslr.trmal.net/resources/11/librispeech-lexicon.txt 

# prepare environment for montreal forced aligner (creates a conda env named `aligner`)
conda create -n aligner -c conda-forge montreal-forced-aligner -y

# **important** please make sure to select `aligner` environment before running the `mfa` commands
# 1) fetch the `english_us_arpa` acoustic model
# 2) validate the corpus against the lexicon and the acoustic model
# 3) align: arguments are <corpus dir> <lexicon> <acoustic model> <output dir>;
#    the output dir is the same folder as TEXTGRID_PATH in the config cell
mfa model download acoustic english_us_arpa
mfa validate /workspace/montreal_forced_aligner/corpus /workspace/montreal_forced_aligner/librispeech-lexicon.txt english_us_arpa
mfa align /workspace/montreal_forced_aligner/corpus /workspace/montreal_forced_aligner/librispeech-lexicon.txt english_us_arpa /workspace/montreal_forced_aligner/aligned

In [116]:
def process_textgrid(textgrid_file):
    """Read the 'phones' tier of a TextGrid and turn it into phones and frame durations.

    Silence intervals before the first spoken phone are skipped, and everything
    after the last spoken phone is cut off. Silence intervals in between are kept
    and relabelled as 'spn'.

    Args:
        textgrid_file: Path of the TextGrid file to read.

    Returns:
        A tuple `(phones, durations, speech_start_time, speech_end_time)`:
        the phone labels, their lengths in frames of HOP_LENGTH samples, and the
        start / end time of the retained span as read from the intervals.
        Both times stay at 0.0 when the tier contains no spoken phone.
    """
    grid = tgt.io.read_textgrid(textgrid_file, include_empty_intervals=True)
    phone_tier = grid.get_tier_by_name('phones')

    phones, durations = [], []
    speech_start_time = speech_end_time = 0.0
    # Number of entries to keep, i.e. index just past the last spoken phone.
    last_speech_idx = 0

    for interval in phone_tier._objects:
        begin, finish, label = interval.start_time, interval.end_time, interval.text
        is_silence = label in SIL_PHONES

        if not phones:
            # Nothing collected yet: drop leading silence, otherwise mark the start.
            if is_silence:
                continue
            speech_start_time = begin

        if is_silence:
            phones.append('spn')
        else:
            phones.append(label)
            speech_end_time = finish
            last_speech_idx = len(phones)

        # Round both boundaries to frame indices first, so consecutive intervals
        # share the same rounded boundary.
        first_frame = np.round(begin * SAMPLING_RATE / HOP_LENGTH)
        last_frame = np.round(finish * SAMPLING_RATE / HOP_LENGTH)
        durations.append(int(last_frame - first_frame))

    # Discard the trailing silence collected after the last spoken phone.
    return (
        phones[:last_speech_idx],
        durations[:last_speech_idx],
        speech_start_time,
        speech_end_time,
    )


def trim_audio(y, start_time, end_time):
    """Return the part of `y` between `start_time` and `end_time` as float32.

    Both times are given in seconds and converted to sample indices with
    SAMPLING_RATE (rounded to the nearest sample).
    """
    first_sample, last_sample = (
        int(np.round(seconds * SAMPLING_RATE)) for seconds in (start_time, end_time)
    )
    return y[first_sample:last_sample].astype(np.float32)


def get_pitch(y):
    """Estimate the F0 contour of `y` with pyworld (`dio`, refined by `stonemask`).

    The frame period handed to `dio` is HOP_LENGTH / SAMPLING_RATE expressed in
    milliseconds. The input is cast to float64 before being passed to pyworld.
    """
    frame_period_ms = HOP_LENGTH / SAMPLING_RATE * 1000
    signal = y.astype(np.float64)

    coarse_f0, time_axis = pw.dio(signal, SAMPLING_RATE, frame_period=frame_period_ms)
    return pw.stonemask(signal, coarse_f0, time_axis, SAMPLING_RATE)


def get_mel(y):
    """Compute the mel-spectrogram and energy of `y` with speechbrain's `mel_spectogram`.

    All analysis settings come from the audio constants of the config cell.

    Returns:
        The `(mel, energy)` pair returned by `mel_spectogram`.
        NOTE(review): callers index `mel` as [:, frame] and `energy` as [frame];
        confirm against the speechbrain implementation.
    """
    mel_options = dict(
        sample_rate=SAMPLING_RATE,
        hop_length=HOP_LENGTH,
        win_length=WIN_LENGTH,
        n_mels=N_MELS,
        n_fft=N_FFT,
        f_min=F_MIN,
        f_max=F_MAX,
        power=1,
        normalized=False,
        min_max_energy_norm=True,
        norm="slaney",
        mel_scale="slaney",
        compression=True,
    )

    waveform = torch.FloatTensor(y)
    mel, energy = mel_spectogram(audio=waveform, **mel_options)
    return mel, energy


def expand(values, durations):
    """Repeat each value as many times as its duration and return a numpy array.

    Durations are truncated to int; zero or negative durations contribute nothing.
    Pairs are formed with `zip`, so the longer of the two inputs is cut short.
    """
    repeated = [
        value
        for value, count in zip(values, durations)
        for _ in range(max(0, int(count)))
    ]
    return np.array(repeated)

In [155]:
def feature_extraction(speaker, emotion):
    """Extract, store and normalise the acoustic features of one speaker/emotion pair.

    For every `<emotion>_<audio_id>.wav` in the speaker's corpus folder that has a
    matching TextGrid, the audio is trimmed to the aligned speech span and its
    mel-spectrogram, pitch, energy and phone durations are saved to
    `PREPROCESSED_PATH/<speaker>/<emotion>_<audio_id>.npz`.

    Pitch and energy of the files written by this call are then standardised in
    place with the mean / std estimated over this speaker/emotion pair (outliers
    are excluded from the estimate), and `[min, max, mean, std]` for both features
    is merged into `PREPROCESSED_PATH/stats.json` under `stats[speaker][emotion]`.

    Args:
        speaker: Speaker folder name inside CORPUS_PATH / TEXTGRID_PATH.
        emotion: Emotion prefix of the corpus file names.

    Returns:
        None. All results are written to disk. If no utterance could be
        processed, `stats.json` is left untouched.
    """

    def _remove_outliers(x):
        """Keep only the values within 1.5 inter-quartile ranges of the quartiles."""
        p25 = np.percentile(x, 25)
        p75 = np.percentile(x, 75)
        lower = p25 - 1.5 * (p75 - p25)
        upper = p75 + 1.5 * (p75 - p25)
        normal_indices = np.logical_and(x >= lower, x <= upper)
        return x[normal_indices]

    def _average_per_phone(frames, durations):
        """Average frame-level values over each phone; zero-length phones get 0.

        The result is written to a fresh array. Averaging in place would overwrite
        frames that later phones still need whenever a zero-length phone lets the
        read position fall behind the write position.
        """
        averaged = np.zeros(len(durations), dtype=frames.dtype)
        pos = 0
        for i, d in enumerate(durations):
            segment = frames[pos:pos + d]
            # Guard against an empty slice (d == 0, or fewer frames than the
            # alignment promises) so no NaN leaks into the statistics.
            if d > 0 and segment.size > 0:
                averaged[i] = np.mean(segment)
            pos += d
        return averaged

    def _normalize(name, mean, std, paths):
        """Standardise feature `name` in every file of `paths`; return (min, max) after scaling."""
        min_value, max_value = np.inf, -np.inf

        for preprocessed_path in paths:
            # Close the archive before overwriting the same file.
            with np.load(preprocessed_path) as npz:
                data = dict(npz)

            data[name] = (data[name] - mean) / std
            np.savez(preprocessed_path, **data)

            min_value = min(min_value, data[name].min())
            max_value = max(max_value, data[name].max())

        return min_value, max_value

    pitch_scaler = StandardScaler()
    energy_scaler = StandardScaler()

    output_dir = os.path.join(PREPROCESSED_PATH, speaker)
    os.makedirs(output_dir, exist_ok=True)

    # Files written by THIS call. Only these are normalised afterwards, so stale
    # (already normalised) files left by an earlier run are never scaled twice.
    saved_paths = []

    for audio_path in glob.glob(os.path.join(CORPUS_PATH, speaker, f'{emotion}_*.wav')):

        audio_id = os.path.basename(audio_path)[:-4].split('_')[-1]

        textgrid_path = os.path.join(TEXTGRID_PATH, speaker, f'{emotion}_{audio_id}.TextGrid')
        transcript_path = os.path.join(CORPUS_PATH, speaker, f'{emotion}_{audio_id}.lab')

        # check the path existence (no TextGrid means no alignment is available)
        if not os.path.exists(textgrid_path):
            continue

        phones, durations, speech_start_time, speech_end_time = process_textgrid(textgrid_path)
        n_frames = sum(durations)

        # An empty speech span would yield empty audio; skip instead of going on.
        if speech_start_time >= speech_end_time:
            print(f"Invalid start/end: {audio_path}")
            continue

        # trim audio
        y, sr = librosa.load(audio_path, sr=SAMPLING_RATE)
        y = trim_audio(y, speech_start_time, speech_end_time)

        # transcript: the .lab text was stripped when written, so it starts and
        # ends with the bare marker; remove the marker without its outer spaces.
        with open(transcript_path, 'r') as f:
            transcript = f.read().replace(NOISE_SYMBOL.strip(), '').strip()

        # pitch: truncate to the aligned length BEFORE validating, because the
        # interpolation below needs at least two voiced frames in the kept part.
        pitch = get_pitch(y)[:n_frames]
        if np.sum(pitch != 0) <= 1:
            print(f"Invalid pitch: {audio_path}")
            continue

        # melspectrogram, energy
        mel, energy = get_mel(y)
        mel = mel.numpy()[:, :n_frames]
        energy = energy.numpy()[:n_frames]

        if PITCH_AVERAGING:
            # Fill unvoiced (zero) frames by linear interpolation between voiced
            # ones, holding the first / last voiced value at the edges.
            nonzero_ids = np.where(pitch != 0)[0]
            interp_fn = scipy.interpolate.interp1d(
                nonzero_ids,
                pitch[nonzero_ids],
                kind='linear',
                fill_value=(pitch[nonzero_ids[0]], pitch[nonzero_ids[-1]]),
                bounds_error=False
            )
            pitch = interp_fn(np.arange(len(pitch)))
            pitch = _average_per_phone(pitch, durations)

        if ENERGY_AVERAGING:
            energy = _average_per_phone(energy, durations)

        # remove outliers (only for the mean / std estimate; saved values keep them)
        outlier_removed_pitch = _remove_outliers(pitch)
        outlier_removed_energy = _remove_outliers(energy)

        pitch_scaler.partial_fit(outlier_removed_pitch.reshape((-1, 1)))
        energy_scaler.partial_fit(outlier_removed_energy.reshape((-1, 1)))

        # save artifacts
        preprocessed_path = os.path.join(output_dir, f'{emotion}_{audio_id}.npz')
        np.savez(
            preprocessed_path,

            # metadata
            phones=phones,
            emotion=emotion,
            speaker=speaker,
            audio_id=audio_id,
            audio_path=audio_path,
            transcript=transcript,
            textgrid_path=textgrid_path,

            # inputs
            mel=mel,
            pitch=pitch,
            energy=energy,
            durations=durations,
        )
        saved_paths.append(preprocessed_path)

    # Nothing was processed: the scalers were never fitted, so there are no
    # statistics to compute or store.
    if not saved_paths:
        print(f"No utterances processed for {speaker} {emotion}")
        return

    pitch_mean, pitch_std = pitch_scaler.mean_[0], pitch_scaler.scale_[0]
    energy_mean, energy_std = energy_scaler.mean_[0], energy_scaler.scale_[0]

    print("* Calculating statistics for pitch and energy")
    pitch_min, pitch_max = _normalize('pitch', pitch_mean, pitch_std, saved_paths)
    energy_min, energy_max = _normalize('energy', energy_mean, energy_std, saved_paths)

    stats_path = os.path.join(PREPROCESSED_PATH, 'stats.json')
    if os.path.exists(stats_path):
        with open(stats_path, 'r') as f:
            stats = json.load(f)
    else:
        stats = {}

    # Merge into the speaker's existing entry; replacing it with a fresh dict
    # would drop the statistics of every emotion stored before this call.
    stats.setdefault(speaker, {})[emotion] = {
        'pitch': [float(pitch_min), float(pitch_max), float(pitch_mean), float(pitch_std)],
        'energy': [float(energy_min), float(energy_max), float(energy_mean), float(energy_std)],
    }

    with open(stats_path, 'w') as f:
        json.dump(stats, f, indent=4)


In [156]:
# Run feature extraction for every speaker/emotion combination present in the
# raw dataset. The progress bar advances once per speaker and its label shows
# the pair currently being processed.
progress = tqdm.tqdm(SPEAKERS)
for speaker in progress:
    for emotion in EMOTIONS:
        progress.set_description(f'{speaker} {emotion}')

        # Only combinations with a raw data folder are processed.
        if os.path.exists(os.path.join(DATA_PATH, speaker, emotion)):
            # preprocessed path
            os.makedirs(os.path.join(PREPROCESSED_PATH, speaker), exist_ok=True)

            # mel, energy, pitch, durations
            feature_extraction(speaker, emotion)

bea neutral:   0%|          | 0/4 [00:00<?, ?it/s]

* Calculating statistics for pitch and energy


bea amused:   0%|          | 0/4 [00:18<?, ?it/s] 

* Calculating statistics for pitch and energy


bea angry:   0%|          | 0/4 [00:33<?, ?it/s] 

* Calculating statistics for pitch and energy


bea disgusted:   0%|          | 0/4 [00:49<?, ?it/s]

* Calculating statistics for pitch and energy


bea sleepy:   0%|          | 0/4 [01:08<?, ?it/s]   

* Calculating statistics for pitch and energy


jenie neutral:  25%|██▌       | 1/4 [01:37<04:51, 97.12s/it]

* Calculating statistics for pitch and energy


jenie amused:  25%|██▌       | 1/4 [01:58<04:51, 97.12s/it] 

* Calculating statistics for pitch and energy


jenie angry:  25%|██▌       | 1/4 [02:10<04:51, 97.12s/it] 

* Calculating statistics for pitch and energy


jenie disgusted:  25%|██▌       | 1/4 [02:38<04:51, 97.12s/it]

* Calculating statistics for pitch and energy


jenie sleepy:  25%|██▌       | 1/4 [02:48<04:51, 97.12s/it]   

* Calculating statistics for pitch and energy


josh neutral:  50%|█████     | 2/4 [03:14<03:14, 97.19s/it]

* Calculating statistics for pitch and energy


josh amused:  50%|█████     | 2/4 [03:29<03:14, 97.19s/it] 

* Calculating statistics for pitch and energy


josh sleepy:  50%|█████     | 2/4 [03:49<03:14, 97.19s/it]   

Invalid pitch: /workspace/montreal_forced_aligner/corpus/josh/sleepy_0054.wav
* Calculating statistics for pitch and energy


sam neutral:  75%|███████▌  | 3/4 [04:07<01:16, 76.88s/it]

* Calculating statistics for pitch and energy


sam amused:  75%|███████▌  | 3/4 [04:40<01:16, 76.88s/it] 

* Calculating statistics for pitch and energy


sam angry:  75%|███████▌  | 3/4 [05:17<01:16, 76.88s/it] 

* Calculating statistics for pitch and energy


sam disgusted:  75%|███████▌  | 3/4 [05:48<01:16, 76.88s/it]

* Calculating statistics for pitch and energy


sam sleepy:  75%|███████▌  | 3/4 [06:24<01:16, 76.88s/it]   

* Calculating statistics for pitch and energy


sam sleepy: 100%|██████████| 4/4 [07:00<00:00, 105.24s/it]


In [None]:
# Sanity check: analyse one corpus file with the preprocessing audio settings,
# resynthesise it with a pretrained vocoder and save both versions for listening.
import torchaudio
from speechbrain.inference.vocoders import HIFIGAN
from speechbrain.lobes.models.FastSpeech2 import mel_spectogram

# Load a pretrained HIFIGAN Vocoder
hifi_gan = HIFIGAN.from_hparams(source="speechbrain/tts-hifigan-libritts-16kHz", savedir="pretrained_models/tts-hifigan-libritts-16kHz")

# Load an audio file (an example file can be found in this repository)
# Ensure that the audio signal is sampled at 16000 Hz; refer to the provided link for a 22050 Hz Vocoder.
signal, rate = torchaudio.load('/workspace/montreal_forced_aligner/corpus/bea/amused_0001.wav')

# Ensure the audio is single channel
signal = signal[0].squeeze()

# Corpus files are written at SAMPLING_RATE already; resample defensively in case
# a different file is plugged in here.
if rate != SAMPLING_RATE:
    signal = torchaudio.functional.resample(signal, rate, SAMPLING_RATE)

torchaudio.save('/workspace/waveform.wav', signal.unsqueeze(0), SAMPLING_RATE)

# Compute the mel spectrogram.
# IMPORTANT: Use these specific parameters to match the Vocoder's training settings for optimal results.
# The values come from the config cell, so this check analyses audio exactly
# like the preprocessing does.
spectrogram, _ = mel_spectogram(
    audio=signal,
    sample_rate=SAMPLING_RATE,
    hop_length=HOP_LENGTH,
    win_length=WIN_LENGTH,
    n_mels=N_MELS,
    n_fft=N_FFT,
    f_min=F_MIN,
    f_max=F_MAX,
    power=1,
    normalized=False,
    min_max_energy_norm=True,
    norm="slaney",
    mel_scale="slaney",
    compression=True
)

# Convert the spectrogram to waveform
waveforms = hifi_gan.decode_batch(spectrogram)

# Save the reconstructed audio as a waveform
torchaudio.save('/workspace/waveform_reconstructed.wav', waveforms.squeeze(1), SAMPLING_RATE)

# If everything is set up correctly, the original and reconstructed audio should be nearly indistinguishable


  from .autonotebook import tqdm as notebook_tqdm
