このプログラムは「機械学習でギターアンプをモデリングする」

（https://qiita.com/coz-a/items/aeab3c52e3f12ba52a8b）

を基に作成しています．

データ解像度（16bit/48kHz または 24bit/192kHz）を切り替える場合は，コード中のサンプリングレート，サブタイプ（ビット深度），およびファイルパスを適宜変更してください．


データの読み書き，教師データの前処理，trainメソッド


In [0]:
import os
import datetime
import wave
import yaml
import numpy as np
import soundfile as sf
from numpy.lib.stride_tricks import as_strided
from keras.models import Sequential
from keras.layers import CuDNNLSTM, BatchNormalization
from keras.callbacks import TensorBoard, ModelCheckpoint, EarlyStopping
from keras.losses import mean_squared_error

def load_wave(wave_file):
    """Read an audio file and return its samples as one flat array.

    Args:
        wave_file: Value passed straight to ``sf.read`` as the file to open.

    Returns:
        The samples read as ``float32``, reshaped to a single axis. The
        sample rate returned by ``sf.read`` is discarded.
    """
    samples, _sample_rate = sf.read(wave_file, dtype=np.float32)
    # Collapse whatever shape was returned into one dimension.
    return samples.reshape(-1)

def save_wave(buf, wave_file, samplerate=192000, subtype="PCM_24", channels=2):
    """Write the contents of a sample buffer to an audio file.

    Args:
        buf: Array-like of samples. Its total size must be divisible by
            ``channels`` because it is reshaped to ``(-1, channels)``.
        wave_file: Destination passed to ``sf.write``.
        samplerate: Sample rate written to the file. Defaults to 192000,
            the value that used to be hard-coded.
        subtype: ``soundfile`` subtype string. Defaults to ``"PCM_24"``,
            the value that used to be hard-coded.
        channels: Number of columns the buffer is reshaped to before
            writing. Defaults to 2, matching the previous behaviour.

    Raises:
        ValueError: If the buffer size is not divisible by ``channels``
            (raised by the reshape).
    """
    frames = np.asarray(buf).reshape(-1, channels)
    sf.write(wave_file, frames, samplerate=samplerate, subtype=subtype)

def flow(dataset, timesteps, batch_size):
    """Yield random batches from ``dataset`` forever.

    Args:
        dataset: Indexable collection of ``(x, y)`` pairs.
        timesteps: Window length forwarded to ``random_clop``.
        batch_size: Number of windows per batch, forwarded to
            ``random_clop``.

    Yields:
        Whatever ``random_clop`` returns for a randomly chosen pair.
        The generator never terminates on its own.
    """
    pair_count = len(dataset)
    while True:
        # Pick one pair at random, then cut a batch of windows from it.
        source, target = dataset[np.random.randint(pair_count)]
        yield random_clop(source, target, timesteps, batch_size)

def random_clop(x, y, timesteps, batch_size):
    """Cut ``batch_size`` random, aligned windows out of ``x`` and ``y``.

    The same start offsets are used for both sequences, so window ``i`` of
    the first result lines up with window ``i`` of the second.

    Args:
        x: Sequence sliced along its first axis.
        y: Sequence sliced at the same offsets as ``x``.
        timesteps: Length of every window.
        batch_size: Number of windows to draw.

    Returns:
        A ``(batch_x, batch_y)`` tuple; each element stacks the windows
        along a new leading axis.

    Raises:
        ValueError: If ``x`` is shorter than ``timesteps``.
    """
    max_offset = len(x) - timesteps
    if max_offset < 0:
        raise ValueError(
            "sequence of length {} is shorter than timesteps={}".format(
                len(x), timesteps))
    # randint's upper bound is exclusive: +1 keeps the final full window
    # reachable and makes len(x) == timesteps a valid input.
    offsets = np.random.randint(max_offset + 1, size=batch_size)
    # np.stack needs a real sequence; passing a generator is rejected by
    # current NumPy releases.
    batch_x = np.stack([x[start:start + timesteps] for start in offsets])
    batch_y = np.stack([y[start:start + timesteps] for start in offsets])
    return batch_x, batch_y

def build_model(timesteps):
    """Assemble the three-layer CuDNNLSTM network.

    Args:
        timesteps: Sequence length used in the first layer's
            ``input_shape=(timesteps, 1)``.

    Returns:
        An uncompiled ``Sequential`` model made of two 64-unit layers
        followed by a 1-unit layer, all with ``return_sequences=True``.
    """
    stack = (
        CuDNNLSTM(64, input_shape=(timesteps, 1), return_sequences=True, name="lstm_1"),
        CuDNNLSTM(64, return_sequences=True, name="lstm_2"),
        CuDNNLSTM(1, return_sequences=True, name="lstm_out"),
    )
    model = Sequential()
    for layer in stack:
        model.add(layer)
    return model

class LossFunc:
    """Callable loss: mean squared error over the last ``timesteps`` steps.

    Only the trailing ``timesteps`` entries along axis 1 of both tensors are
    compared; earlier steps do not contribute to the loss.
    """

    def __init__(self, timesteps):
        """Store the number of trailing steps that are scored.

        Args:
            timesteps: How many steps at the end of axis 1 to keep.
        """
        self.timesteps = timesteps
        # Instances have no __name__ by default; one is set explicitly
        # (presumably so Keras can refer to the loss by name - confirm).
        self.__name__ = "LossFunc"

    def __call__(self, y_true, y_pred):
        """Return ``mean_squared_error`` of the trailing slices.

        Args:
            y_true: Target tensor, indexed with three axes.
            y_pred: Predicted tensor, indexed with three axes.
        """
        tail = slice(-self.timesteps, None)
        target = y_true[:, tail, :]
        estimate = y_pred[:, tail, :]
        return mean_squared_error(target, estimate)

def train(model, train_dataflow, val_dataflow, max_epochs, patience,
          cp_dir="./checkpoint/model_24_192",
          tb_log_dir="./tensorboard/model_24_192"):
    """Fit ``model`` from generators with checkpointing and early stopping.

    Args:
        model: Compiled Keras model exposing ``fit_generator``.
        train_dataflow: Generator of training batches.
        val_dataflow: Generator of validation batches.
        max_epochs: Upper bound on the number of epochs.
        patience: ``patience`` passed to ``EarlyStopping`` on ``val_loss``.
        cp_dir: Directory for checkpoint files; created if missing.
            Defaults to the previously hard-coded location.
        tb_log_dir: TensorBoard log directory. Defaults to the previously
            hard-coded location.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(cp_dir, exist_ok=True)
    cp_filepath = os.path.join(cp_dir, "model_{epoch:06d}.h5")

    # Checkpoints are written only when val_loss improves.
    cb_mc = ModelCheckpoint(filepath=cp_filepath, monitor="val_loss", period=1, save_best_only=True)
    cb_es = EarlyStopping(monitor="val_loss", patience=patience)
    cb_tb = TensorBoard(log_dir=tb_log_dir)

    # To resume from an earlier state, load its weights before fitting, e.g.:
    # model.load_weights(os.path.join(cp_dir, "model_000007.h5"))

    model.fit_generator(
        generator=train_dataflow,
        steps_per_epoch=100,
        validation_data=val_dataflow,
        validation_steps=10,
        epochs=max_epochs,
        callbacks=[cb_mc, cb_es, cb_tb])

def sliding_window(x, window, slide):
    """Return overlapping windows of ``x`` as rows of a strided view.

    Row ``i`` covers ``x[i * slide : i * slide + window]``. Trailing samples
    that do not fill a complete window are ignored.

    Args:
        x: 1-D array (anything accepted by ``np.asarray``).
        window: Number of samples per row.
        slide: Step, in samples, between the starts of consecutive rows.

    Returns:
        An array of shape ``(n_windows, window)`` that shares memory with
        ``x``; it has zero rows when ``x`` is shorter than ``window``.
    """
    x = np.asarray(x)
    # Count only windows that fit entirely inside x, so no trimming slice is
    # needed (x[:-0] would be empty when the length divides evenly).
    n_windows = max((len(x) - window) // slide + 1, 0)
    # Use the array's own element stride rather than assuming 4-byte items.
    step = x.strides[0]
    return as_strided(x, shape=(n_windows, window), strides=(slide * step, step))


training


In [0]:
import yaml

def main():
    """Train the model with the settings read from ``./config_24_192.yml``.

    The config supplies the window lengths, batch size, epoch limit, early
    stopping patience, and the ``train_data`` / ``val_data`` file lists.
    """
    with open("./config_24_192.yml") as fp:
        config = yaml.safe_load(fp)

    def load_pairs(key):
        # Each entry is indexed at [0] and [1]; both files are loaded and
        # reshaped into column vectors.
        return [
            (load_wave(entry[0]).reshape(-1, 1), load_wave(entry[1]).reshape(-1, 1))
            for entry in config[key]
        ]

    input_timesteps = config["input_timesteps"]
    output_timesteps = config["output_timesteps"]
    batch_size = config["batch_size"]
    max_epochs = config["max_epochs"]
    patience = config["patience"]

    train_dataflow = flow(load_pairs("train_data"), input_timesteps, batch_size)
    val_dataflow = flow(load_pairs("val_data"), input_timesteps, batch_size)

    model = build_model(input_timesteps)
    # The loss scores only the last output_timesteps steps of each window.
    model.compile(loss=LossFunc(output_timesteps), optimizer="adam")

    train(model, train_dataflow, val_dataflow, max_epochs, patience)


if __name__ == '__main__':
    main()



predict

In [0]:
import numpy as np
import yaml
import soundfile as sf
from keras.models import load_model

def main():
    """Run a saved model over a recording and write the predicted audio.

    Reads window sizes and batch size from ``./config_24_192.yml``, windows
    the zero-padded input, predicts each window, keeps the last
    ``output_timesteps`` steps of every prediction, and saves the result.
    """
    with open("./config_24_192.yml") as fp:
        config = yaml.safe_load(fp)

    input_timesteps = config["input_timesteps"]
    output_timesteps = config["output_timesteps"]
    batch_size = config["batch_size"]

    data = load_wave("./data/recording/recording_input1_192.wav")

    # Leading zeros give the first window its missing history.
    head = np.zeros(input_timesteps - output_timesteps, np.float32)
    # NOTE(review): the trailing pad is len(data) % block_size, which is not
    # in general the distance to the next multiple of block_size. Confirm
    # the intended rounding together with how sliding_window trims its
    # input before changing it.
    block_size = output_timesteps * batch_size
    tail = np.zeros(len(data) % block_size, np.float32)
    padded = np.concatenate((head, data, tail))

    windows = sliding_window(padded, input_timesteps, output_timesteps)
    # Add a trailing feature axis.
    x = windows[..., np.newaxis]

    model = load_model(
        "./checkpoint/model_24_192/model_000043.h5",
        custom_objects={"LossFunc": LossFunc(output_timesteps)})

    predicted = model.predict(x, batch_size=batch_size)
    # Keep only the last output_timesteps steps of each window, flatten,
    # and cut back to at most the original number of samples.
    kept = predicted[:, -output_timesteps:, :]
    save_wave(kept.reshape(-1)[:len(data)], "./data/recording/recording_predicted1_192.wav")


if __name__ == '__main__':
    main()
