In [1]:
import tensorflow as tf

### Input Data: .wav -> Pitch contour (f0s), Harmonic spectral envelope (sps), Aperiodic spectral envelope (aps)

In [2]:
import numpy as np
import os
import time
import argparse
import librosa

In [3]:
from utils import *
from ops import *

In [4]:
import librosa.display
from IPython.display import Audio
import matplotlib.pyplot as plt

In [5]:
%matplotlib inline

In [6]:
# Seed NumPy's global RNG once, up front, so the np.random.shuffle calls
# in F0.data_prepare produce the same ordering on every fresh run.
random_seed = 0
np.random.seed(seed=random_seed)

## Module: F0

In [7]:
class F0(object):
    """Statistics-matching converter between two recording domains, A and B.

    `test()` converts every file of the A validation folder towards B and
    every file of the B validation folder towards A. F0 is passed through
    `pitch_conversion` together with the log-F0 mean/std of both domains;
    the coded spectral envelope is normalised with the input domain's
    mean/std and de-normalised with the output domain's mean/std. No
    network is built by this class.

    NOTE(review): `self.saver` is never assigned in this class, so `load()`
    and `save()` raise AttributeError unless the caller sets `model.saver`
    first.
    """

    def __init__(self, sess, folder='S01/', source='ang', target='neu'):
        """Store the session and derive dataset/result paths.

        Args:
            sess: TensorFlow session; stored as `self.sess` and used by
                `load()` / `save()`.
            folder: Sub-folder under './../../../Database/Emotion/'. A
                trailing slash is optional.
            source: Name of domain A (also the name of its data folder).
            target: Name of domain B (also the name of its data folder).
        """
        pair_name = source + '_' + target

        # os.path.join inserts the separator only when it is missing, so both
        # folder='S01/' and folder='S01' resolve to the same directory.
        pair_root = os.path.join('./../../../Database/Emotion/' + folder, pair_name)
        self.train_A_dir = os.path.join(pair_root, source)
        self.train_B_dir = os.path.join(pair_root, target)
        self.validation_A_dir = os.path.join(pair_root, 'val_' + source)
        self.validation_B_dir = os.path.join(pair_root, 'val_' + target)

        self.audio_len = 128    # = n_frames, time_length
        self.audio_ch = 24      # = num_mcep, num_features

        self.dataset_name = pair_name
        self.model_name = 'C'
        self.gan_type = 'lsgan'
        self.log_dir = "logs/" # + datetime.now().strftime("%Y%m%d-%H%M%S")
        self.sample_dir = 'samples'
        self.checkpoint_dir = 'checkpoint'
        self.A2B_dir = 'F0_results/' + source + '2' + target
        self.B2A_dir = 'F0_results/' + target + '2' + source

        self.sess = sess

        # Analysis/synthesis settings handed to the vocoder helpers in test().
        self.sampling_rate = 16000
        self.frame_period = 5.0
        self.num_mcep = 24

    def data_prepare(self, f0s_A, f0s_B, coded_sps_norm_A, coded_sps_norm_B):
        """Sample, shuffle and length-match training data for both domains.

        Each domain is passed through `sample_train_data03` with
        `n_frames=self.audio_len`, shuffled with NumPy's global RNG, and
        truncated so that both outputs have the same number of items.

        Args:
            f0s_A, f0s_B: Per-domain F0 data (converted to lists).
            coded_sps_norm_A, coded_sps_norm_B: Per-domain coded spectral
                data (converted to lists); presumably already normalised,
                going by the name -- confirm against the caller.

        Returns:
            Tuple `(train_data_A, train_data_B)` of NumPy arrays with equal
            length along the first axis.
        """
        train_data_A = sample_train_data03(sps=list(coded_sps_norm_A), f0s=list(f0s_A), n_frames=self.audio_len)
        train_data_B = sample_train_data03(sps=list(coded_sps_norm_B), f0s=list(f0s_B), n_frames=self.audio_len)

        # Keep the same number of samples on both sides.
        minlen = min(len(train_data_A), len(train_data_B))
        np.random.shuffle(train_data_A)
        np.random.shuffle(train_data_B)
        train_data_A = np.array(train_data_A[0:minlen])
        train_data_B = np.array(train_data_B[0:minlen])

        return train_data_A, train_data_B

    def _convert_dir(self, in_dir, out_dir, src_stats, tgt_stats, label):
        """Convert every file found in `in_dir` and write it to `out_dir`.

        Args:
            in_dir: Directory whose entries are loaded with `librosa.load`.
            out_dir: Directory receiving one converted file per input, under
                the same base name.
            src_stats: `(log_f0_mean, log_f0_std, coded_sp_mean,
                coded_sp_std)` of the domain the inputs come from.
            tgt_stats: Same 4-tuple for the domain to convert towards.
            label: Direction tag shown in the progress line (e.g. 'A2B').
        """
        src_f0_mean, src_f0_std, src_sp_mean, src_sp_std = src_stats
        tgt_f0_mean, tgt_f0_std, tgt_sp_mean, tgt_sp_std = tgt_stats

        # os.listdir returns entries in arbitrary order; sort so the
        # processing order (and the progress counter) is reproducible.
        file_names = sorted(os.listdir(in_dir))
        total = len(file_names)

        for idx, file_name in enumerate(file_names, start=1):
            wav, _ = librosa.load(os.path.join(in_dir, file_name), sr=self.sampling_rate, mono=True)
            wav = wav_padding(wav=wav, sr=self.sampling_rate, frame_period=self.frame_period, multiple=4)
            f0, _, sp, ap = world_decompose(wav=wav, fs=self.sampling_rate, frame_period=self.frame_period)

            # f0 conversion
            f0_converted = pitch_conversion(
                f0=f0,
                mean_log_src=src_f0_mean, std_log_src=src_f0_std,
                mean_log_target=tgt_f0_mean, std_log_target=tgt_f0_std)

            # sp conversion: normalise with the input-domain statistics,
            # then de-normalise with the output-domain statistics.
            coded_sp = world_encode_spectral_envelop(sp=sp, fs=self.sampling_rate, dim=self.num_mcep)
            coded_sp_norm = (coded_sp.T - src_sp_mean) / src_sp_std
            coded_sp_converted = coded_sp_norm * tgt_sp_std + tgt_sp_mean
            # The transposed result is made contiguous before it is handed to the decoder.
            coded_sp_converted = np.ascontiguousarray(coded_sp_converted.T)
            decoded_sp_converted = world_decode_spectral_envelop(coded_sp=coded_sp_converted, fs=self.sampling_rate)

            # The aperiodicity `ap` of the input is reused unchanged.
            wav_transformed = world_speech_synthesis(
                f0=f0_converted, decoded_sp=decoded_sp_converted, ap=ap,
                fs=self.sampling_rate, frame_period=self.frame_period)

            # NOTE(review): librosa.output was removed in librosa 0.8; this
            # call needs an older librosa (or a port to another wav writer).
            librosa.output.write_wav(os.path.join(out_dir, os.path.basename(file_name)), wav_transformed, self.sampling_rate)

            print('converting test samples %s: [%d/%d]' % (label, idx, total), end='\r')

        # The progress line ends with '\r'; terminate it so the next print
        # starts on a fresh line instead of overwriting part of it.
        if total:
            print()

    def test(self):
        """Convert the validation sets in both directions (A2B, then B2A).

        Statistics come from `vocoder_extract` on the two training folders;
        converted audio is written to `self.A2B_dir` and `self.B2A_dir`.
        Must be called while a default TensorFlow session is active, because
        the initializer op is run via `.run()`.
        """
        # initialize all variables
        tf.global_variables_initializer().run()

        # check result_dir
        check_folder(self.A2B_dir)
        check_folder(self.B2A_dir)

        # Get statistic from train_A, train_B (first two return values are unused here)
        _, _, log_f0s_mean_A, log_f0s_std_A, coded_sps_A_mean, coded_sps_A_std = vocoder_extract(self.train_A_dir)
        _, _, log_f0s_mean_B, log_f0s_std_B, coded_sps_B_mean, coded_sps_B_std = vocoder_extract(self.train_B_dir)
        print('std_log_src:', log_f0s_std_A, 'std_log_target', log_f0s_std_B)

        stats_A = (log_f0s_mean_A, log_f0s_std_A, coded_sps_A_mean, coded_sps_A_std)
        stats_B = (log_f0s_mean_B, log_f0s_std_B, coded_sps_B_mean, coded_sps_B_std)

        # A2B
        self._convert_dir(self.validation_A_dir, self.A2B_dir, stats_A, stats_B, 'A2B')
        # B2A
        self._convert_dir(self.validation_B_dir, self.B2A_dir, stats_B, stats_A, 'B2A')

        print(" [*] Testing finished!")

    @property
    def model_dir(self):
        """Checkpoint sub-directory name: '<model_name>_<dataset_name>_<gan_type>'."""
        return "{}_{}_{}".format(self.model_name, self.dataset_name, self.gan_type)

    def load(self, checkpoint_dir):
        """Restore the latest checkpoint under `checkpoint_dir/<model_dir>`.

        Args:
            checkpoint_dir: Root checkpoint directory.

        Returns:
            `(True, counter)` on success, where `counter` is the last run of
            digits in the checkpoint file name; `(False, 0)` when no
            checkpoint state is found.

        Raises:
            AttributeError: if `self.saver` has not been set by the caller.
            StopIteration: if the checkpoint file name contains no digits.
        """
        import re
        print(" [*] Reading checkpoints...")
        checkpoint_dir = os.path.join(checkpoint_dir, self.model_dir)

        ckpt = tf.train.get_checkpoint_state(checkpoint_dir)
        if ckpt and ckpt.model_checkpoint_path:
            ckpt_name = os.path.basename(ckpt.model_checkpoint_path)
            self.saver.restore(self.sess, os.path.join(checkpoint_dir, ckpt_name))
            # Raw string: same pattern value, without the invalid-escape warning.
            # Matches a digit run that has no digit anywhere after it.
            counter = int(next(re.finditer(r"(\d+)(?!.*\d)", ckpt_name)).group(0))
            print(" [*] Success to read {}".format(ckpt_name))
            return True, counter
        else:
            print(" [*] Failed to find a checkpoint")
            return False, 0

    def save(self, checkpoint_dir, step):
        """Write a checkpoint to `checkpoint_dir/<model_dir>/<model_name>.model`.

        Args:
            checkpoint_dir: Root checkpoint directory; the model sub-directory
                is created if it does not exist.
            step: Passed to the saver as `global_step`.

        Raises:
            AttributeError: if `self.saver` has not been set by the caller.
        """
        checkpoint_dir = os.path.join(checkpoint_dir, self.model_dir)

        if not os.path.exists(checkpoint_dir):
            os.makedirs(checkpoint_dir)

        self.saver.save(self.sess, os.path.join(checkpoint_dir, self.model_name + '.model'), global_step=step)
        

## Test: convert the validation samples (A2B and B2A)

In [8]:
# Open a session (soft placement enabled) and run the validation-set
# conversion for the S05 'hap' <-> 'sad' pair. The `with` block makes `sess`
# the default session, which F0.test() relies on for its initializer `.run()`.
with tf.Session(
    config=tf.ConfigProto(allow_soft_placement=True)
) as sess:
    model = F0(sess, folder='S05/', source='hap', target='sad')
    model.test()
    

std_log_src: 0.3504122157244266 std_log_target 0.2549442639021922
 [*] Testing finished!s B2A: [26/26]
