# パッケージインポート

In [1]:
from IPython.display import clear_output
import tensorflow as tf
import itertools
import pandas
import numpy as np
import scipy
import pickle
import librosa
import matplotlib.pyplot as plt
import matplotlib.style as ms
# Matplotlib 3.6 renamed the bundled seaborn styles to 'seaborn-v0_8-*' and later
# releases dropped the old names, so use the new name whenever it is available.
ms.use('seaborn-v0_8-muted' if 'seaborn-v0_8-muted' in ms.available else 'seaborn-muted')
%matplotlib inline

# データのダウンロード

## ESC-50

- 環境音データセットである[ESC-50](https://github.com/karolpiczak/ESC-50)のzipファイルをダウンロードして展開する

In [None]:
!wget https://github.com/karoldvl/ESC-50/archive/master.zip

In [None]:
!unzip /content/master.zip

## GTZAN

- GTZANのダウンロード

In [None]:
!wget http://opihi.cs.uvic.ca/sound/genres.tar.gz

In [None]:
!tar -zxvf /content/genres.tar.gz

# データの前処理


- テストデータの情報、SoundNetの事前学習済み重みをgithub上から持ってくる

In [None]:
!git clone https://github.com/KMASAHIRO/music2vec

In [7]:
# Run inference on the clips that were used as test data when training DANet and music2vec.
# NOTE: pickle.load can execute arbitrary code; only open a test_data.txt from a trusted source.
testdata_file = "test_data.txt"
with open(testdata_file, "rb") as f:
  testdata_file_dict = pickle.load(f)

lacia_esc_filenames, lacia_gtzan_filenames, gtzan_labels = (
  testdata_file_dict[key] for key in ("esc_filenames", "gtzan_filenames", "gtzan_labels")
)

In [8]:
# The data is stored in a different location than during DANet training,
# so drop the first two components of every recorded path.
esc_filenames = np.asarray(
  ["/".join(path.split('/')[2:]) for path in lacia_esc_filenames]
)
gtzan_filenames = np.asarray(
  ["/".join(path.split('/')[2:]) for path in lacia_gtzan_filenames]
)

- 窓関数(ハニング窓の平方根)を定義

In [9]:
def square_root_of_hann(M, sym=False):
  """Return the element-wise square root of an M-point Hann window.

  Args:
    M: Number of points in the window.
    sym: Forwarded to scipy.signal.windows.hann as its ``sym`` argument
      (False by default here).

  Returns:
    A NumPy array of length M.
  """
  return np.sqrt(scipy.signal.windows.hann(M, sym))

In [10]:
# Load every music clip at 8 kHz and compute its short-time Fourier transform.
music_list = list()
music_len_list = list()  # number of complete 100-frame segments per clip
count = 0                # running total of 100-frame segments
for name in gtzan_filenames:
  y,sr = librosa.load(name,sr=8000)
  D = librosa.stft(y, n_fft=256, hop_length=64, win_length=256, window=square_root_of_hann)
  segments = D.shape[-1]//100
  music_list.append(D)
  music_len_list.append(segments)
  count += segments

In [11]:
# Load every environmental-sound clip at 8 kHz and compute its short-time Fourier transform.
noise_list = [
  librosa.stft(librosa.load(name,sr=8000)[0], n_fft=256, hop_length=64, win_length=256,
               window=square_root_of_hann)
  for name in esc_filenames
]

# DANetモデルの構築

## レイヤ

- 推論時に位相を持った(複素数の)スペクトログラムが入力されたとき、対数振幅に変換する前処理レイヤ

In [12]:
class Preparation(tf.keras.layers.Layer):
  """Input pre-processing layer.

  In training mode the input is returned unchanged. Otherwise the input is
  mapped to log(|input| + log_eps).

  Args:
    log_eps: Small constant added to the magnitude before taking the log.
  """
  def __init__(self, log_eps,  *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.log_eps = log_eps

  def call(self, input, training):
    if not training:
      # Inference path: magnitude, then log with an offset so log(0) never occurs.
      return tf.math.log(tf.math.abs(input) + self.log_eps)
    return input

- アトラクターを生成するレイヤ(基本的には訓練時はideal mask、推論時はkmeansを使う)

In [13]:
class Attractor(tf.keras.layers.Layer):
  """Layer that produces the attractor points.

  ``call`` receives ``input = [mask, embedding]``. In training mode (and while
  ``is_kmeans`` is False) each attractor is the mask-weighted mean of the
  embeddings. In every other case the attractors come from ``kmeans_func``.

  Args:
    kmeans_func: Callable invoked as ``kmeans_func(input[0])``; its result is
      converted to a tensor and returned as the attractors.
  """
  def __init__(self, kmeans_func, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.kmeans_func = kmeans_func
    # When True, training also takes the attractors from kmeans_func.
    self.is_kmeans = False

  def call(self, input, training):
    if training and not self.is_kmeans:
      mask, embedding = input[0], input[1]
      # Sum of embeddings selected by each source's mask: (N,c,f,t) x (N,f,t,k) -> (N,c,k).
      att_num = tf.einsum('Ncft,Nftk->Nck', mask, embedding)
      # Number of selected bins per source: (N,c).
      att_denom = tf.math.reduce_sum(mask, axis=[2,3])
      # Add a trailing axis for broadcasting; unlike a reshape to [-1,2,1]
      # this works for any number of sources, not only two.
      att_denom = tf.expand_dims(att_denom, axis=-1)
      # NOTE(review): a source whose mask is entirely zero divides by zero here.
      return att_num / att_denom

    return tf.convert_to_tensor(self.kmeans_func(input[0]))

- maskを混合音声に掛けて音声を分離するレイヤ(推論時には混合音声が位相のある複素数のデータになる)

In [14]:
class Make_clean_reference(tf.keras.layers.Layer):
  """Applies the per-source masks to the mixture.

  ``call`` receives ``input = [mixture, mask]`` and multiplies the mixture
  (N,f,t) into every source channel of the mask (N,f,t,c). Outside training
  both operands are first cast to complex64.
  """
  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)

  def call(self, input, training):
    mixture, mask = input[0], input[1]
    if not training:
      mixture = tf.cast(mixture, dtype=tf.complex64)
      mask = tf.cast(mask, dtype=tf.complex64)
    return tf.einsum('Nft,Nftc->Nftc', mixture, mask)

## モデル

In [15]:
class DANet(tf.keras.Model):
  """Source-separation model built from BiLSTM embeddings and attractors.

  ``call`` takes ``inputs = [mixture, ideal_mask]``. The mixture is embedded
  per time-frequency bin, attractors are obtained from the ``Attractor``
  layer, a softmax over sources gives the masks, and the masks are applied
  to the mixture.

  The layers are hard-wired to 129 frequency bins and 100 time frames.

  Args:
    source_num: Number of sources (attractors / output channels).
    embed_ndim: Embedding dimension per time-frequency bin.
    batch_size: Batch size used for the placeholder attractors and the
      dummy forward pass in ``loading``.
    log_eps: Offset forwarded to the ``Preparation`` layer.
  """
  def __init__(self, source_num, embed_ndim, batch_size, log_eps=0.0001, *args, **kwargs):
    super().__init__(*args, **kwargs)

    self.source_num = source_num
    self.embed_ndim = embed_ndim
    self.log_eps = log_eps
    self.batch_size = batch_size
    # Placeholder attractors (all ones) until kmeans_fit() stores real ones.
    self.cluster_centers_list = np.ones(shape=(self.batch_size,self.source_num,self.embed_ndim))

    self.preparation = Preparation(self.log_eps)
    # (N, f, t) -> (N, t, f) so the LSTMs run along the time axis.
    self.reshape = tf.keras.layers.Lambda(lambda x: tf.transpose(x, perm=[0,2,1]))
    self.lstm1 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(300,return_sequences=True),merge_mode='concat')
    self.lstm2 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(300,return_sequences=True),merge_mode='concat')
    self.lstm3 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(300,return_sequences=True),merge_mode='concat')
    self.lstm4 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(300,return_sequences=True),merge_mode='concat')
    self.embedding1 = tf.keras.layers.Dense(129*self.embed_ndim)
    self.embedding2 = tf.keras.layers.Reshape((100,129,self.embed_ndim))
    # (N, t, f, k) -> (N, f, t, k)
    self.embedding3 = tf.keras.layers.Lambda(lambda x: tf.transpose(x, perm=[0,2,1,3]))
    self.make_attractor = Attractor(self.kmeans_predict)
    # Similarity between every bin embedding and every attractor.
    self.make_mask = tf.keras.layers.Lambda(lambda x: tf.einsum('Nftk,Nck->Nftc',x[0],x[1]))
    self.make_clean_reference = Make_clean_reference()

  def call(self,inputs,training):
    """Separate ``inputs[0]`` (mixture); ``inputs[1]`` is the ideal mask."""
    x1 = self.get_embedded_data(inputs[0], training)
    attractor = self.make_attractor([inputs[1],x1], training)
    mask = tf.keras.activations.softmax(self.make_mask([x1, attractor]))
    clean_reference = self.make_clean_reference([inputs[0],mask], training)

    return clean_reference

  def train_with_kmeans(self, generator, steps, epochs, ideal_epochs):
    """Train with ideal-mask attractors first, then with k-means attractors.

    Args:
      generator: Iterator yielding ``(train_x, train_y)`` batches.
      steps: Batches per epoch.
      epochs: Total number of epochs.
      ideal_epochs: Epoch index from which k-means attractors are used.

    Returns:
      List with the mean loss of each epoch.
    """
    loss_result = list()
    for epoch in range(epochs):
      loss_epoch = list()
      if epoch==ideal_epochs:
        self.to_kmeans_train()
      for step in range(steps):
        train_x, train_y = next(generator)
        if ideal_epochs<=epoch:
          # Refresh the stored attractors for this batch before the update.
          self.kmeans_fit(train_x[0])
        loss = self.train_on_batch(x=train_x,y=train_y)
        loss_epoch.append(loss)
        # Redraw the progress display: finished epochs, then the current one.
        clear_output(wait=True)
        for epo in range(epoch):
          print("Epoch {}/{}".format(epo+1,epochs),"loss: {:.2f}".format(loss_result[epo]), sep=" ", flush=True)
        print("steps: {}/{}".format(step,steps), "{:.2f}".format(np.mean(loss_epoch)), flush=True)
      loss_result.append(np.mean(loss_epoch))
    return loss_result

  def to_kmeans_train(self):
    """Make training use the k-means attractors."""
    self.make_attractor.is_kmeans = True

  def to_idealmask_train(self):
    """Make training use the ideal-mask attractors."""
    self.make_attractor.is_kmeans = False

  def get_embedded_data(self,inputs,training):
    """Return the per-bin embeddings of a mixture, arranged as (N, f, t, k)."""
    x1 = self.preparation(inputs,training)
    x1 = self.reshape(x1)
    x1 = self.lstm1(x1)
    x1 = self.lstm2(x1)
    x1 = self.lstm3(x1)
    x1 = self.lstm4(x1)
    x1 = self.embedding1(x1)
    x1 = self.embedding2(x1)
    output = self.embedding3(x1)

    return output

  def kmeans_fit(self, inputs, max_iter=1000, random_seed=0):
    """Run k-means on the embeddings of every sample and store the centers.

    Each sample is clustered independently into ``source_num`` clusters; the
    result is stored in ``self.cluster_centers_list``.

    Args:
      inputs: Batch of mixtures, embedded in inference mode.
      max_iter: Maximum number of assignment/update iterations per sample.
      random_seed: Seed for the initial labelling and for re-seeding empty
        clusters, so the result is reproducible.
    """
    embedded_data = np.asarray(self.get_embedded_data(inputs, training=False))

    # Flatten the frequency and time axes: one row per time-frequency bin.
    shape = embedded_data.shape
    embedded_data = embedded_data.reshape(shape[0], shape[1]*shape[2], shape[3])

    cluster_centers_list = list()

    for n in range(len(inputs)):
      X = embedded_data[n]
      random_state = np.random.RandomState(random_seed)

      # Balanced random initial labelling: 0,1,...,k-1,0,1,... then shuffled.
      cycle = itertools.cycle(range(self.source_num))
      labels = np.fromiter(itertools.islice(cycle, X.shape[0]), dtype=int)
      random_state.shuffle(labels)
      cluster_centers = np.zeros((self.source_num, X.shape[1]))

      for i in range(max_iter):
        for k in range(self.source_num):
          cluster_centers[k,:] = X[labels == k,:].mean(axis=0)

        # Squared distance from every point to every center: (points, clusters).
        dist = ((X[:,:,np.newaxis] - cluster_centers.T[np.newaxis,:,:])**2).sum(axis=1)
        labels_prev = labels
        labels = dist.argmin(axis=1)

        # Keep every cluster non-empty so the next mean is well defined.
        # Drawn from the seeded generator so that random_seed is honoured.
        for k in range(self.source_num):
          if not np.any(labels == k):
            labels[random_state.choice(len(labels),1)] = k

        if (labels == labels_prev).all():
          break

      # Final centers for the converged (or last) labelling.
      for k in range(self.source_num):
        cluster_centers[k,:] = X[labels == k,:].mean(axis=0)

      cluster_centers_list.append(cluster_centers)

    self.cluster_centers_list = np.asarray(cluster_centers_list)


  def kmeans_predict(self, input):
    """Return the attractors stored by the last ``kmeans_fit`` call.

    ``input`` is ignored.

    NOTE(review): this returns a NumPy array; if the model is executed as a
    traced graph the value may be captured at trace time - confirm that
    ``predict`` sees fresh attractors after each ``kmeans_fit``.
    """
    return self.cluster_centers_list

  def get_batch_size(self):
    """Return the configured batch size."""
    return self.batch_size

  def set_batch_size(self, batch_size):
    """Set the batch size and reset the stored attractors to all-ones placeholders."""
    self.batch_size = batch_size
    self.cluster_centers_list = np.ones(shape=(self.batch_size, self.source_num, self.embed_ndim))

  def prediction(self, input):
    """Fit k-means on ``input`` and return the separated sources for the whole batch."""
    self.kmeans_fit(input)
    # The ideal mask is not used at inference; zeros only satisfy the input signature.
    fake_ideal_mask = np.zeros(shape=(input.shape[0],self.source_num,129,100))
    result = self.predict([input, fake_ideal_mask],batch_size=len(input))
    return result

  def loading(self, path):
    """Load weights from ``path`` after one dummy forward pass."""
    input1 = np.zeros(shape=(self.batch_size,129,100))
    input2 = np.zeros(shape=(self.batch_size,self.source_num,129,100))
    # The dummy prediction runs before load_weights, presumably so that the
    # variables exist when the weights are loaded.
    self.predict(x=[input1,input2],batch_size=self.batch_size)
    self.load_weights(path)

## モデル構築

- 損失関数を定義

In [None]:
def loss_function(y_true, y_pred):
  """Sum of squared errors divided by (size of axis 1 * size of axis 2) of y_true.

  Args:
    y_true: Reference tensor; axes 1 and 2 supply the normalisation factor.
    y_pred: Predicted tensor, subtracted element-wise from y_true.

  Returns:
    Scalar tensor with the normalised squared error.
  """
  dims = tf.shape(y_true)
  frequency = tf.cast(dims[1], tf.float32)
  time = tf.cast(dims[2], tf.float32)
  squared_error = tf.reduce_sum((y_true - y_pred)**2)
  return squared_error / (frequency*time)

- モデル構築

In [None]:
def create_model(source_num=2, embed_ndim=20, optimizer=None, loss=loss_function):
    """Create and compile a DANet, then run one dummy forward pass.

    Args:
        source_num: Number of sources passed to DANet.
        embed_ndim: Embedding dimension passed to DANet.
        optimizer: Optimizer for compile(). When None, RMSprop with an
            exponentially decaying learning rate is used.
        loss: Loss passed to compile().

    Returns:
        The compiled DANet instance.
    """
    batch_size = 30
    model = DANet(source_num=source_num, embed_ndim=embed_ndim, batch_size=batch_size, log_eps=0.0001)

    if optimizer is None:
        lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
            initial_learning_rate=0.0001, decay_steps=51450, decay_rate=0.03)
        optimizer = tf.keras.optimizers.RMSprop(learning_rate=lr_schedule)
    model.compile(optimizer=optimizer, loss=loss)

    # One prediction on all-zero inputs of the expected shapes.
    dummy_mixture = np.zeros(shape=(batch_size, 129, 100))
    dummy_mask = np.zeros(shape=(batch_size, source_num, 129, 100))
    model.predict(x=[dummy_mixture, dummy_mask], batch_size=batch_size)

    return model

# music2vecモデルの構築

- 事前学習済みの重みを用いてSoundNetを構築する関数(https://github.com/pseeth/soundnet_keras/blob/master/soundnet.py より)

In [17]:
from tensorflow.keras.layers import BatchNormalization, Activation, Conv1D, MaxPooling1D, ZeroPadding1D, InputLayer
from tensorflow.keras.models import Sequential
import numpy as np
import librosa


def preprocess(audio):
    """Scale a waveform for SoundNet and add batch and channel axes.

    Args:
        audio: Array-like waveform.

    Returns:
        A new array of shape (1, num_samples, 1) holding ``audio * 256``
        (SoundNet needs the range to be between -256 and 256).
        The argument itself is not modified.
    """
    # Multiply out of place so the caller's array keeps its original values.
    scaled = np.asarray(audio) * 256.0
    # Reshape so it fits into the graph: (batch_size, num_samples, num_filter_channels).
    return np.reshape(scaled, (1, -1, 1))


def load_audio(audio_file):
    """Load an audio file as mono float32 at 22050 Hz and preprocess it.

    Args:
        audio_file: Path handed to librosa.load.

    Returns:
        The result of preprocess() on the loaded waveform.
    """
    # SoundNet works on mono audio files with a sample rate of 22050.
    sample_rate = 22050
    waveform, _ = librosa.load(audio_file, dtype='float32', sr=sample_rate, mono=True)
    return preprocess(waveform)


def build_model(weights_path='/content/music2vec/music2vec/sound8.npy'):
    """Build the 8-layer SoundNet and load its pre-trained weights.

    Args:
        weights_path: Path of the ``.npy`` file holding a pickled dict of
            per-layer parameters. Defaults to the copy inside the cloned
            music2vec repository.

    Returns:
        A Keras ``Sequential`` model whose convolution and
        batch-normalisation layers carry the loaded weights.
    """
    def _as_str(key):
        # The file is loaded with encoding='bytes', so keys may be bytes.
        return key.decode('utf-8') if isinstance(key, bytes) else key

    # NOTE: allow_pickle=True unpickles the file; only load trusted weight files.
    raw_weights = np.load(weights_path, allow_pickle=True, encoding='bytes').item()
    model_weights = {
        _as_str(layer_name): {_as_str(param): value for param, value in params.items()}
        for layer_name, params in raw_weights.items()
    }

    model = Sequential()
    model.add(InputLayer(batch_input_shape=(None, None, 1)))

    filter_parameters = [{'name': 'conv1', 'num_filters': 16, 'padding': 32,
                          'kernel_size': 64, 'conv_strides': 2,
                          'pool_size': 8, 'pool_strides': 8},

                         {'name': 'conv2', 'num_filters': 32, 'padding': 16,
                          'kernel_size': 32, 'conv_strides': 2,
                          'pool_size': 8, 'pool_strides': 8},

                         {'name': 'conv3', 'num_filters': 64, 'padding': 8,
                          'kernel_size': 16, 'conv_strides': 2},

                         {'name': 'conv4', 'num_filters': 128, 'padding': 4,
                          'kernel_size': 8, 'conv_strides': 2},

                         {'name': 'conv5', 'num_filters': 256, 'padding': 2,
                          'kernel_size': 4, 'conv_strides': 2,
                          'pool_size': 4, 'pool_strides': 4},

                         {'name': 'conv6', 'num_filters': 512, 'padding': 2,
                          'kernel_size': 4, 'conv_strides': 2},

                         {'name': 'conv7', 'num_filters': 1024, 'padding': 2,
                          'kernel_size': 4, 'conv_strides': 2},

                         {'name': 'conv8_2', 'num_filters': 401, 'padding': 0,
                          'kernel_size': 8, 'conv_strides': 2},
                         ]

    for x in filter_parameters:
        layer_weights = model_weights[x['name']]

        model.add(ZeroPadding1D(padding=x['padding']))
        model.add(Conv1D(x['num_filters'],
                         kernel_size=x['kernel_size'],
                         strides=x['conv_strides'],
                         padding='valid'))
        # Reshape the stored kernel to the shape of the Keras Conv1D kernel.
        weights = layer_weights['weights'].reshape(model.layers[-1].get_weights()[0].shape)
        biases = layer_weights['biases']

        model.layers[-1].set_weights([weights, biases])

        # Every layer except the final conv8 gets batch norm + ReLU.
        if 'conv8' not in x['name']:
            gamma = layer_weights['gamma']
            beta = layer_weights['beta']
            mean = layer_weights['mean']
            var = layer_weights['var']

            model.add(BatchNormalization())
            model.layers[-1].set_weights([gamma, beta, mean, var])
            model.add(Activation('relu'))
        if 'pool_size' in x:
            model.add(MaxPooling1D(pool_size=x['pool_size'],
                                   strides=x['pool_strides'],
                                   padding='valid'))

    return model


def predict_scene_from_audio_file(audio_file):
    """Build a fresh SoundNet and return its prediction for one audio file.

    Args:
        audio_file: Path handed to load_audio().

    Returns:
        The output of ``model.predict`` on the preprocessed audio.
    """
    return build_model().predict(load_audio(audio_file))


def predictions_to_scenes(prediction):
    """Map each step along axis 1 of the first batch item to a category name.

    Category names are read from 'categories/categories_places2.txt', one
    per line; the name at the arg-max index is chosen for every step.

    Args:
        prediction: Array indexed as ``prediction[0, step, :]``.

    Returns:
        List of category names, one per step.
    """
    with open('categories/categories_places2.txt', 'r') as f:
        categories = f.read().split('\n')
        return [categories[np.argmax(prediction[0, step, :])]
                for step in range(prediction.shape[1])]

- モデル構築

In [None]:
soundnet = build_model()

In [None]:
# music2vec: SoundNet features -> two LSTMs -> dense head with a 10-way softmax.
music2vec_input = tf.keras.Input(shape=(675808,1))
x = soundnet(music2vec_input)
for layer in (
    tf.keras.layers.LSTM(200,return_sequences=True),
    tf.keras.layers.LSTM(200,return_sequences=True),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(400),
    tf.keras.layers.Dense(100),
):
  x = layer(x)
music2vec_output = tf.keras.layers.Dense(10,activation='softmax')(x)
music2vec = tf.keras.Model(inputs=music2vec_input,outputs=music2vec_output)

# テストデータの生成

In [18]:
# 音楽に環境音を1つ混ぜるとき
def create_testdata1(noise_Fourier_list, music_Fourier_list, log_eps = 0.0001):
  """Build test data by mixing each music segment with one noise segment.

  Both lists are walked in 100-frame segments. For every pair, the mixture,
  the [music, noise] reference and a pair of binary masks are collected.
  The function returns once the music index has wrapped back to the first
  segment of the first clip.

  Args:
    noise_Fourier_list: Sequence of noise spectrograms, sliced along the
      last axis (presumably STFT matrices of shape (129, frames) - TODO confirm).
    music_Fourier_list: Sequence of music spectrograms, sliced the same way.
    log_eps: Not used in the body.

  Returns:
    ``[x_train1, x_train2], y_train`` where x_train1 stacks the mixtures,
    x_train2 stacks the [music_mask, noise_mask] pairs and y_train stacks
    the references with the source on the last axis.
  """
  noise_index1 = 0  # index of the current noise clip
  noise_index2 = 0  # index of the 100-frame segment inside that clip
  music_index1 = 0  # index of the current music clip
  music_index2 = 0  # index of the 100-frame segment inside that clip

  mixture = list()
  ideal_mask = list()
  correct = list()
  
  while True:
    # Warning only: the TypeError is caught right below and just printed,
    # so a clip shorter than 100 frames does not stop the loop.
    try:
      if noise_Fourier_list[noise_index1].shape[-1]<100:
        raise TypeError("The noise data is too short.")
      if music_Fourier_list[music_index1].shape[-1]<100:
        raise TypeError("The music data is too short.")
    except TypeError as message:
      print(message)
      
    # Current 100-frame segments, then advance both segment counters.
    noise_input = noise_Fourier_list[noise_index1][:,noise_index2*100:(noise_index2+1)*100]
    music_input = music_Fourier_list[music_index1][:,music_index2*100:(music_index2+1)*100]
    noise_index2 += 1
    music_index2 += 1
    
    # If the next noise segment would run past the clip, go to the next clip
    # (wrapping around to the first one at the end of the list).
    if (noise_index2+1)*100 > noise_Fourier_list[noise_index1].shape[-1]:
      noise_index1  += 1
      noise_index2 = 0
      if noise_index1 == len(noise_Fourier_list):
        noise_index1 = 0

    # Same for music; when the music list wraps, the noise position is reset too.
    if (music_index2+1)*100 > music_Fourier_list[music_index1].shape[-1]:
      music_index1  += 1
      music_index2 = 0
      if music_index1 == len(music_Fourier_list):
        music_index1 = 0
        noise_index1 = 0
        noise_index2 = 0

    
    # Turn floating-point warnings (overflow, invalid, ...) into exceptions.
    with np.errstate(all="raise"):
      mix = noise_input + music_input

      mixture.append(mix)
      # Stack as (2, f, t) and move the source axis last -> (f, t, 2).
      correct.append(np.transpose(np.asarray([music_input, noise_input]),axes=[1,2,0]))
      # Element-wise maximum of the two sources (the name shadows the builtin max inside this function).
      # NOTE(review): for complex inputs np.max orders by real part, then imaginary part,
      # not by magnitude - confirm this is intended.
      max = np.max(np.asarray([music_input, noise_input]), axis=0)
      # 1.0 where the source equals the maximum (difference is zero), else 0.0.
      music_ideal = np.logical_not(music_input - max).astype(np.float32)
      noise_ideal = np.logical_not(noise_input - max).astype(np.float32)
        
      # Music mask is all zero: discard this pair and move on.
      if not np.any(music_ideal):
        mixture.pop()
        correct.pop()
        continue
      # Noise mask is all zero: discard this pair and step the music position
      # back by one segment, so the same music segment is used with the next noise segment.
      if not np.any(noise_ideal):
        mixture.pop()
        correct.pop()
        if music_index2==0:
          if music_index1==0:
            music_index1 = len(music_Fourier_list) - 1
            music_index2 = music_Fourier_list[music_index1].shape[-1] // 100 - 1
          else:
            music_index1 -= 1
            music_index2 = music_Fourier_list[music_index1].shape[-1] // 100 - 1
        else:
          music_index2 -= 1
        continue
      ideal_mask.append([music_ideal, noise_ideal])

      # The music position is back at the very beginning: all music has been used.
      if music_index1==0 and music_index2==0:
        x_train1 = np.asarray(mixture)
        x_train2 = np.asarray(ideal_mask)
        y_train = np.asarray(correct)

        # Rebinding the lists has no visible effect because the function returns right after.
        mixture = list()
        ideal_mask = list()
        correct = list()

        return [x_train1, x_train2], y_train

In [19]:
# 音楽に環境音を2つ混ぜるとき
def create_testdata2(noise_Fourier_list, music_Fourier_list, log_eps = 0.0001):
  """Build test data by mixing each music segment with two noise segments.

  Both lists are walked in 100-frame segments. The noise for each pair is
  the sum of the same segment taken from the current noise clip and from
  the following one (wrapping around at the end of the list). For every
  pair, the mixture, the [music, noise] reference and a pair of binary
  masks are collected. The function returns once the music index has
  wrapped back to the first segment of the first clip.

  Args:
    noise_Fourier_list: Sequence of noise spectrograms, sliced along the
      last axis (presumably STFT matrices of shape (129, frames) - TODO confirm).
    music_Fourier_list: Sequence of music spectrograms, sliced the same way.
    log_eps: Not used in the body.

  Returns:
    ``[x_train1, x_train2], y_train`` where x_train1 stacks the mixtures,
    x_train2 stacks the [music_mask, noise_mask] pairs and y_train stacks
    the references with the source on the last axis.
  """
  noise_index1 = 0  # index of the current noise clip
  noise_index2 = 0  # index of the 100-frame segment inside that clip
  music_index1 = 0  # index of the current music clip
  music_index2 = 0  # index of the 100-frame segment inside that clip

  mixture = list()
  ideal_mask = list()
  correct = list()
  
  while True:
    # Warning only: the TypeError is caught right below and just printed,
    # so a clip shorter than 100 frames does not stop the loop.
    try:
      if noise_Fourier_list[noise_index1].shape[-1]<100:
        raise TypeError("The noise data is too short.")
      if music_Fourier_list[music_index1].shape[-1]<100:
        raise TypeError("The music data is too short.")
    except TypeError as message:
      print(message)
      
    # Sum of the same segment from the current and the following noise clip.
    # NOTE(review): the segment bound is only checked against the current clip;
    # if the following clip is shorter the two slices may differ in width - confirm.
    noise_input = noise_Fourier_list[noise_index1][:,noise_index2*100:(noise_index2+1)*100] + \
    noise_Fourier_list[(noise_index1+1)%len(noise_Fourier_list)][:,noise_index2*100:(noise_index2+1)*100]
    music_input = music_Fourier_list[music_index1][:,music_index2*100:(music_index2+1)*100]
    noise_index2 += 1
    music_index2 += 1
    
    # If the next noise segment would run past the clip, go to the next clip
    # (wrapping around to the first one at the end of the list).
    if (noise_index2+1)*100 > noise_Fourier_list[noise_index1].shape[-1]:
      noise_index1  += 1
      noise_index2 = 0
      if noise_index1 == len(noise_Fourier_list):
        noise_index1 = 0

    # Same for music; when the music list wraps, the noise position is reset too.
    if (music_index2+1)*100 > music_Fourier_list[music_index1].shape[-1]:
      music_index1  += 1
      music_index2 = 0
      if music_index1 == len(music_Fourier_list):
        music_index1 = 0
        noise_index1 = 0
        noise_index2 = 0

    
    # Turn floating-point warnings (overflow, invalid, ...) into exceptions.
    with np.errstate(all="raise"):
      mix = noise_input + music_input

      mixture.append(mix)
      # Stack as (2, f, t) and move the source axis last -> (f, t, 2).
      correct.append(np.transpose(np.asarray([music_input, noise_input]),axes=[1,2,0]))
      # Element-wise maximum of the two sources (the name shadows the builtin max inside this function).
      # NOTE(review): for complex inputs np.max orders by real part, then imaginary part,
      # not by magnitude - confirm this is intended.
      max = np.max(np.asarray([music_input, noise_input]), axis=0)
      # 1.0 where the source equals the maximum (difference is zero), else 0.0.
      music_ideal = np.logical_not(music_input - max).astype(np.float32)
      noise_ideal = np.logical_not(noise_input - max).astype(np.float32)
        
      # Music mask is all zero: discard this pair and move on.
      if not np.any(music_ideal):
        mixture.pop()
        correct.pop()
        continue
      # Noise mask is all zero: discard this pair and step the music position
      # back by one segment, so the same music segment is used with the next noise segment.
      if not np.any(noise_ideal):
        mixture.pop()
        correct.pop()
        if music_index2==0:
          if music_index1==0:
            music_index1 = len(music_Fourier_list) - 1
            music_index2 = music_Fourier_list[music_index1].shape[-1] // 100 - 1
          else:
            music_index1 -= 1
            music_index2 = music_Fourier_list[music_index1].shape[-1] // 100 - 1
        else:
          music_index2 -= 1
        continue
      ideal_mask.append([music_ideal, noise_ideal])

      # The music position is back at the very beginning: all music has been used.
      if music_index1==0 and music_index2==0:
        x_train1 = np.asarray(mixture)
        x_train2 = np.asarray(ideal_mask)
        y_train = np.asarray(correct)

        # Rebinding the lists has no visible effect because the function returns right after.
        mixture = list()
        ideal_mask = list()
        correct = list()

        return [x_train1, x_train2], y_train

# 推論

## 環境音1つ

- DANetモデルの作成、重みをロード

In [22]:
model = create_model()

In [23]:
filepath = "DANet_music_kmeans1_weights.h5"
model.loading(filepath)

- DANetの推論

In [24]:
test_data, before_data = create_testdata1(noise_Fourier_list=noise_list,music_Fourier_list=music_list)

In [25]:
DANet_result = model.prediction(test_data[0])

In [26]:
# Convert DANet's output spectrograms back to waveforms (one pair of channels per segment).
after = np.asarray([
  [librosa.istft(separated[:,:,channel].reshape((129,100)), hop_length=64, win_length=256,
                 window=square_root_of_hann, dtype=np.float32)
   for channel in (0, 1)]
  for separated in DANet_result
])

In [27]:
# Convert DANet's target data (music and noise references) back to waveforms,
# to compare against in the music2vec evaluation.
ideal_data = np.asarray([
  [librosa.istft(reference[:,:,channel].reshape((129,100)), hop_length=64, win_length=256,
                 window=square_root_of_hann, dtype=np.float32)
   for channel in (0, 1)]  # channel 0: music, channel 1: noise
  for reference in before_data
])

In [28]:
# Convert DANet's input data (music + environmental sound mixtures) back to waveforms,
# to compare against in the music2vec evaluation.
mix_data = np.asarray([
  librosa.istft(mixed.reshape((129,100)), hop_length=64, win_length=256,
                window=square_root_of_hann, dtype=np.float32)
  for mixed in test_data[0]
])

### DANetの出力

In [31]:
# music2vecモデルのロード
model_path = "music2vec_20epochs.h5"
music2vec = tf.keras.models.load_model(model_path)

In [32]:
# Preprocessing for the music2vec input
which = 0  # channel of the DANet output that is evaluated
total_num = 0
soundnet_input = list()

for i in range(len(music_len_list)):
  # Join this clip's segments back into one 8 kHz waveform.
  clip_waveform = np.concatenate(after[total_num:total_num+music_len_list[i],which],axis=0)
  # Sample rates are passed by keyword: librosa >= 0.10 rejects them positionally.
  resampled = librosa.resample(clip_waveform,orig_sr=8000,target_sr=22050)
  # Zero-pad to the fixed input length of music2vec.
  padded = np.concatenate([resampled, np.zeros(675808-len(resampled))],axis=0)
  padded *= 256.0  # SoundNet expects the range -256..256
  padded = np.reshape(padded,(-1,1))
  soundnet_input.append(padded)
  total_num += music_len_list[i]

soundnet_input = np.asarray(soundnet_input)

In [33]:
# 推論
music2vec_result = music2vec.predict(soundnet_input,batch_size=25)

In [37]:
# モデルの出力やラベルと、それが表すジャンルを対応させる
genre_list = ["blues","classical","country","disco","hiphop","jazz","metal","pop","reggae","rock"]

genre_pred = list()
genre_true = list()
for i in range(len(gtzan_labels)):
  genre_pred.append(genre_list[tf.math.argmax(music2vec_result[i],axis=0)])
  genre_true.append(genre_list[tf.math.argmax(gtzan_labels[i],axis=0)])

In [40]:
# 混同行列の作成
from sklearn.metrics import confusion_matrix
import pandas as pd
df = pd.DataFrame(confusion_matrix(genre_true, genre_pred, labels=genre_list))
df.columns = genre_list
df.index = genre_list

In [None]:
df

Unnamed: 0,blues,classical,country,disco,hiphop,jazz,metal,pop,reggae,rock
blues,0,0,0,0,0,2,2,0,0,0
classical,0,1,0,0,2,0,1,0,0,0
country,0,1,0,0,2,5,0,0,0,0
disco,0,0,0,0,6,1,0,0,0,0
hiphop,0,0,0,0,3,0,0,0,0,0
jazz,0,0,0,0,0,7,0,0,0,0
metal,0,0,0,0,2,0,0,0,0,0
pop,0,0,0,0,2,1,1,0,0,0
reggae,0,0,0,0,3,1,0,0,0,0
rock,0,0,0,0,2,5,0,0,0,0


In [44]:
# Accuracy・Precision・Recall・F1の計算
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
print("accuracy:",accuracy_score(genre_true,genre_pred))
print("precision:",precision_score(genre_true,genre_pred,average="weighted",zero_division=0))
print("recall:",recall_score(genre_true,genre_pred,average="weighted",zero_division=0))
print("f1:",f1_score(genre_true,genre_pred,average="weighted",zero_division=0))

accuracy: 0.22
precision: 0.09272727272727273
recall: 0.22
f1: 0.10865287356321839


### 音楽

In [50]:
# music2vecモデルのロード
model_path = "music2vec_20epochs.h5"
music2vec = tf.keras.models.load_model(model_path)

In [51]:
# Preprocessing for the music2vec input
total_num = 0
soundnet_input = list()

for i in range(len(music_len_list)):
  # Join this clip's reference music segments (channel 0) back into one 8 kHz waveform.
  clip_waveform = np.concatenate(ideal_data[total_num:total_num+music_len_list[i],0],axis=0)
  # Sample rates are passed by keyword: librosa >= 0.10 rejects them positionally.
  resampled = librosa.resample(clip_waveform,orig_sr=8000,target_sr=22050)
  # Zero-pad to the fixed input length of music2vec.
  padded = np.concatenate([resampled, np.zeros(675808-len(resampled))],axis=0)
  padded *= 256.0  # SoundNet expects the range -256..256
  padded = np.reshape(padded,(-1,1))
  soundnet_input.append(padded)
  total_num += music_len_list[i]

soundnet_input = np.asarray(soundnet_input)

In [52]:
# 推論
music2vec_result = music2vec.predict(soundnet_input,batch_size=25)

In [53]:
# モデルの出力やラベルと、それが表すジャンルを対応させる
genre_list = ["blues","classical","country","disco","hiphop","jazz","metal","pop","reggae","rock"]

genre_pred = list()
genre_true = list()
for i in range(len(gtzan_labels)):
  genre_pred.append(genre_list[tf.math.argmax(music2vec_result[i],axis=0)])
  genre_true.append(genre_list[tf.math.argmax(gtzan_labels[i],axis=0)])

In [56]:
# 混同行列の作成
from sklearn.metrics import confusion_matrix
import pandas as pd
df = pd.DataFrame(confusion_matrix(genre_true, genre_pred, labels=genre_list))
df.columns = genre_list
df.index = genre_list

In [None]:
df

Unnamed: 0,blues,classical,country,disco,hiphop,jazz,metal,pop,reggae,rock
blues,3,0,1,0,0,0,0,0,0,0
classical,0,4,0,0,0,0,0,0,0,0
country,1,1,5,0,0,0,0,1,0,0
disco,0,0,1,3,2,0,0,0,1,0
hiphop,0,0,0,0,2,0,1,0,0,0
jazz,0,1,0,0,0,6,0,0,0,0
metal,0,0,0,1,0,0,1,0,0,0
pop,2,0,0,0,0,0,0,2,0,0
reggae,1,0,0,0,0,1,0,0,2,0
rock,1,2,2,0,0,0,1,0,0,1


In [59]:
# Accuracy・Precision・Recall・F1の計算
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
print("accuracy:",accuracy_score(genre_true,genre_pred))
print("precision:",precision_score(genre_true,genre_pred,average="weighted",zero_division=0))
print("recall:",recall_score(genre_true,genre_pred,average="weighted",zero_division=0))
print("f1:",f1_score(genre_true,genre_pred,average="weighted",zero_division=0))

accuracy: 0.58
precision: 0.6738888888888889
recall: 0.58
f1: 0.5605289024700789


### 音楽+環境音

In [60]:
# music2vecモデルのロード
model_path = "music2vec_20epochs.h5"
music2vec = tf.keras.models.load_model(model_path)

In [61]:
# Preprocessing for the music2vec input
total_num = 0
soundnet_input = list()

for i in range(len(music_len_list)):
  # Join this clip's mixture segments back into one 8 kHz waveform.
  clip_waveform = np.concatenate(mix_data[total_num:total_num+music_len_list[i]],axis=0)
  # Sample rates are passed by keyword: librosa >= 0.10 rejects them positionally.
  resampled = librosa.resample(clip_waveform,orig_sr=8000,target_sr=22050)
  # Zero-pad to the fixed input length of music2vec.
  padded = np.concatenate([resampled, np.zeros(675808-len(resampled))],axis=0)
  padded *= 256.0  # SoundNet expects the range -256..256
  padded = np.reshape(padded,(-1,1))
  soundnet_input.append(padded)
  total_num += music_len_list[i]

soundnet_input = np.asarray(soundnet_input)

In [None]:
# 推論
music2vec_result = music2vec.predict(soundnet_input,batch_size=25)

In [63]:
# モデルの出力やラベルと、それが表すジャンルを対応させる
genre_list = ["blues","classical","country","disco","hiphop","jazz","metal","pop","reggae","rock"]

genre_pred = list()
genre_true = list()
for i in range(len(gtzan_labels)):
  genre_pred.append(genre_list[tf.math.argmax(music2vec_result[i],axis=0)])
  genre_true.append(genre_list[tf.math.argmax(gtzan_labels[i],axis=0)])

In [66]:
# 混同行列の作成
from sklearn.metrics import confusion_matrix
import pandas as pd
df = pd.DataFrame(confusion_matrix(genre_true, genre_pred, labels=genre_list))
df.columns = genre_list
df.index = genre_list

In [None]:
df

Unnamed: 0,blues,classical,country,disco,hiphop,jazz,metal,pop,reggae,rock
blues,1,0,1,0,0,2,0,0,0,0
classical,0,2,0,0,2,0,0,0,0,0
country,3,0,1,0,0,1,0,2,1,0
disco,0,0,0,2,3,1,0,0,1,0
hiphop,0,0,0,0,2,0,1,0,0,0
jazz,0,0,0,0,1,6,0,0,0,0
metal,0,0,0,0,1,0,1,0,0,0
pop,2,0,0,0,0,0,0,1,1,0
reggae,0,0,0,0,1,1,0,0,2,0
rock,1,2,0,0,1,1,1,0,0,1


In [69]:
# Accuracy・Precision・Recall・F1の計算
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
print("accuracy:",accuracy_score(genre_true,genre_pred))
print("precision:",precision_score(genre_true,genre_pred,average="weighted",zero_division=0))
print("recall:",recall_score(genre_true,genre_pred,average="weighted",zero_division=0))
print("f1:",f1_score(genre_true,genre_pred,average="weighted",zero_division=0))

accuracy: 0.38
precision: 0.5643376623376624
recall: 0.38
f1: 0.36374428495481126


## 環境音2つ

- DANetモデルの作成、重みをロード

In [None]:
model = create_model()

In [71]:
filepath = "DANet_music_kmeans2_weights.h5"
model.loading(filepath)

- DANetの推論

In [72]:
test_data, before_data = create_testdata2(noise_Fourier_list=noise_list,music_Fourier_list=music_list)

In [None]:
DANet_result = model.prediction(test_data[0])

In [74]:
# Convert DANet's output spectrograms back to waveforms (one pair of channels per segment).
after = np.asarray([
  [librosa.istft(separated[:,:,channel].reshape((129,100)), hop_length=64, win_length=256,
                 window=square_root_of_hann, dtype=np.float32)
   for channel in (0, 1)]
  for separated in DANet_result
])

In [75]:
# Convert DANet's target data (music and noise references) back to waveforms,
# to compare against in the music2vec evaluation.
ideal_data = np.asarray([
  [librosa.istft(reference[:,:,channel].reshape((129,100)), hop_length=64, win_length=256,
                 window=square_root_of_hann, dtype=np.float32)
   for channel in (0, 1)]  # channel 0: music, channel 1: noise
  for reference in before_data
])

In [76]:
# Convert DANet's input data (music + environmental sound mixtures) back to waveforms,
# to compare against in the music2vec evaluation.
mix_data = np.asarray([
  librosa.istft(mixed.reshape((129,100)), hop_length=64, win_length=256,
                window=square_root_of_hann, dtype=np.float32)
  for mixed in test_data[0]
])

### DANetの出力

In [77]:
# music2vecモデルのロード
model_path = "music2vec_20epochs.h5"
music2vec = tf.keras.models.load_model(model_path)

In [78]:
# Preprocessing for the music2vec input
which = 0  # channel of the DANet output that is evaluated
total_num = 0
soundnet_input = list()

for i in range(len(music_len_list)):
  # Join this clip's segments back into one 8 kHz waveform.
  clip_waveform = np.concatenate(after[total_num:total_num+music_len_list[i],which],axis=0)
  # Sample rates are passed by keyword: librosa >= 0.10 rejects them positionally.
  resampled = librosa.resample(clip_waveform,orig_sr=8000,target_sr=22050)
  # Zero-pad to the fixed input length of music2vec.
  padded = np.concatenate([resampled, np.zeros(675808-len(resampled))],axis=0)
  padded *= 256.0  # SoundNet expects the range -256..256
  padded = np.reshape(padded,(-1,1))
  soundnet_input.append(padded)
  total_num += music_len_list[i]

soundnet_input = np.asarray(soundnet_input)

In [None]:
# 推論
music2vec_result = music2vec.predict(soundnet_input,batch_size=25)

In [80]:
# モデルの出力やラベルと、それが表すジャンルを対応させる
genre_list = ["blues","classical","country","disco","hiphop","jazz","metal","pop","reggae","rock"]

genre_pred = list()
genre_true = list()
for i in range(len(gtzan_labels)):
  genre_pred.append(genre_list[tf.math.argmax(music2vec_result[i],axis=0)])
  genre_true.append(genre_list[tf.math.argmax(gtzan_labels[i],axis=0)])

In [83]:
# Build the confusion matrix as a DataFrame labelled with genre names on both axes.
from sklearn.metrics import confusion_matrix
import pandas as pd

genre_confusion = confusion_matrix(genre_true, genre_pred, labels=genre_list)
df = pd.DataFrame(genre_confusion, index=genre_list, columns=genre_list)

In [84]:
# Display the confusion matrix (rows: true genre, columns: predicted genre).
df

Unnamed: 0,blues,classical,country,disco,hiphop,jazz,metal,pop,reggae,rock
blues,0,0,0,0,1,2,1,0,0,0
classical,0,0,0,0,3,0,0,0,1,0
country,0,1,0,0,2,4,1,0,0,0
disco,0,0,0,0,5,2,0,0,0,0
hiphop,0,0,0,0,3,0,0,0,0,0
jazz,1,0,0,0,1,5,0,0,0,0
metal,0,0,0,0,1,1,0,0,0,0
pop,1,0,0,0,2,1,0,0,0,0
reggae,0,0,0,0,3,1,0,0,0,0
rock,1,1,0,0,3,2,0,0,0,0


In [86]:
# Report accuracy plus weighted-average precision, recall and F1.
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

print("accuracy:", accuracy_score(genre_true, genre_pred))

# zero_division=0 makes undefined scores (e.g. a genre that is never predicted) count as 0.
weighted_opts = dict(average="weighted", zero_division=0)
for metric_label, metric_fn in (("precision:", precision_score),
                                ("recall:", recall_score),
                                ("f1:", f1_score)):
  print(metric_label, metric_fn(genre_true, genre_pred, **weighted_opts))

accuracy: 0.16
precision: 0.046388888888888896
recall: 0.16
f1: 0.06933333333333333


### 音楽

In [87]:
# Load the saved music2vec model from its HDF5 file.
model_path = "music2vec_20epochs.h5"
music2vec = tf.keras.models.load_model(filepath=model_path)

In [88]:
# Preprocess the clean music waveforms for music2vec: rebuild each track from its
# chunks, resample it, zero-pad it to a fixed length and scale it.
total_num = 0  # index of the first chunk belonging to the current track
soundnet_input = list()

for track_chunks in music_len_list:
  # Join this track's consecutive chunks; column 0 of ideal_data holds the music waveform.
  waveform = np.concatenate(ideal_data[total_num:total_num+track_chunks,0],axis=0)
  # Sample rates are passed by keyword: librosa >= 0.10 rejects them as positional arguments.
  resampled = librosa.resample(waveform,orig_sr=8000,target_sr=22050)
  pad_len = 675808-len(resampled)
  if pad_len < 0:
    raise ValueError("resampled track has %d samples, more than the 675808 expected" % len(resampled))
  # Zero-pad to 675808 samples (presumably the input length music2vec expects - TODO confirm).
  padded = np.concatenate([resampled, np.zeros(pad_len)],axis=0)
  # NOTE(review): the x256 gain presumably matches the amplitude range used in training - confirm.
  padded *= 256.0
  # Add a trailing channel axis: (samples,) -> (samples, 1).
  padded = np.reshape(padded,(-1,1))
  soundnet_input.append(padded)
  total_num += track_chunks

soundnet_input = np.asarray(soundnet_input)

In [None]:
# Run inference on the preprocessed waveforms.
music2vec_result = music2vec.predict(
  x=soundnet_input,
  batch_size=25,
)

In [90]:
# Translate model outputs and label vectors into genre names via their argmax index.
# NOTE(review): the order of genre_list presumably matches the label encoding used in training - confirm.
genre_list = ["blues","classical","country","disco","hiphop",
              "jazz","metal","pop","reggae","rock"]

sample_indices = range(len(gtzan_labels))
genre_pred = [genre_list[tf.math.argmax(music2vec_result[k],axis=0)] for k in sample_indices]
genre_true = [genre_list[tf.math.argmax(gtzan_labels[k],axis=0)] for k in sample_indices]

In [93]:
# Build the confusion matrix as a DataFrame labelled with genre names on both axes.
from sklearn.metrics import confusion_matrix
import pandas as pd

genre_confusion = confusion_matrix(genre_true, genre_pred, labels=genre_list)
df = pd.DataFrame(genre_confusion, index=genre_list, columns=genre_list)

In [94]:
# Display the confusion matrix (rows: true genre, columns: predicted genre).
df

Unnamed: 0,blues,classical,country,disco,hiphop,jazz,metal,pop,reggae,rock
blues,3,0,1,0,0,0,0,0,0,0
classical,0,4,0,0,0,0,0,0,0,0
country,1,1,5,0,0,0,0,1,0,0
disco,0,0,1,3,2,0,0,0,1,0
hiphop,0,0,0,0,2,0,1,0,0,0
jazz,0,1,0,0,0,6,0,0,0,0
metal,0,0,0,1,0,0,1,0,0,0
pop,2,0,0,0,0,0,0,2,0,0
reggae,1,0,0,0,0,1,0,0,2,0
rock,1,2,2,0,0,0,1,0,0,1


In [96]:
# Report accuracy plus weighted-average precision, recall and F1.
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

print("accuracy:", accuracy_score(genre_true, genre_pred))

# zero_division=0 makes undefined scores (e.g. a genre that is never predicted) count as 0.
weighted_opts = dict(average="weighted", zero_division=0)
for metric_label, metric_fn in (("precision:", precision_score),
                                ("recall:", recall_score),
                                ("f1:", f1_score)):
  print(metric_label, metric_fn(genre_true, genre_pred, **weighted_opts))

accuracy: 0.58
precision: 0.6738888888888889
recall: 0.58
f1: 0.5605289024700789


### 音楽+環境音

In [97]:
# Load the saved music2vec model from its HDF5 file.
model_path = "music2vec_20epochs.h5"
music2vec = tf.keras.models.load_model(filepath=model_path)

In [98]:
# Preprocess the mixed (music + environmental sound) waveforms for music2vec:
# rebuild each track from its chunks, resample it, zero-pad it and scale it.
total_num = 0  # index of the first chunk belonging to the current track
soundnet_input = list()

for track_chunks in music_len_list:
  # Join this track's consecutive chunks into a single waveform.
  waveform = np.concatenate(mix_data[total_num:total_num+track_chunks],axis=0)
  # Sample rates are passed by keyword: librosa >= 0.10 rejects them as positional arguments.
  resampled = librosa.resample(waveform,orig_sr=8000,target_sr=22050)
  pad_len = 675808-len(resampled)
  if pad_len < 0:
    raise ValueError("resampled track has %d samples, more than the 675808 expected" % len(resampled))
  # Zero-pad to 675808 samples (presumably the input length music2vec expects - TODO confirm).
  padded = np.concatenate([resampled, np.zeros(pad_len)],axis=0)
  # NOTE(review): the x256 gain presumably matches the amplitude range used in training - confirm.
  padded *= 256.0
  # Add a trailing channel axis: (samples,) -> (samples, 1).
  padded = np.reshape(padded,(-1,1))
  soundnet_input.append(padded)
  total_num += track_chunks

soundnet_input = np.asarray(soundnet_input)

In [None]:
# Run inference on the preprocessed waveforms.
music2vec_result = music2vec.predict(
  x=soundnet_input,
  batch_size=30,
)

In [100]:
# Translate model outputs and label vectors into genre names via their argmax index.
# NOTE(review): the order of genre_list presumably matches the label encoding used in training - confirm.
genre_list = ["blues","classical","country","disco","hiphop",
              "jazz","metal","pop","reggae","rock"]

sample_indices = range(len(gtzan_labels))
genre_pred = [genre_list[tf.math.argmax(music2vec_result[k],axis=0)] for k in sample_indices]
genre_true = [genre_list[tf.math.argmax(gtzan_labels[k],axis=0)] for k in sample_indices]

In [103]:
# Build the confusion matrix as a DataFrame labelled with genre names on both axes.
from sklearn.metrics import confusion_matrix
import pandas as pd

genre_confusion = confusion_matrix(genre_true, genre_pred, labels=genre_list)
df = pd.DataFrame(genre_confusion, index=genre_list, columns=genre_list)

In [104]:
# Display the confusion matrix (rows: true genre, columns: predicted genre).
df

Unnamed: 0,blues,classical,country,disco,hiphop,jazz,metal,pop,reggae,rock
blues,1,1,0,1,0,1,0,0,0,0
classical,0,1,0,0,2,0,1,0,0,0
country,1,3,1,0,0,2,0,1,0,0
disco,0,0,0,3,2,0,2,0,0,0
hiphop,0,0,0,0,2,0,1,0,0,0
jazz,0,0,0,0,3,4,0,0,0,0
metal,0,0,0,0,1,0,1,0,0,0
pop,2,0,0,0,1,0,0,1,0,0
reggae,0,0,0,0,2,2,0,0,0,0
rock,0,2,0,0,2,1,2,0,0,0


In [106]:
# Report accuracy plus weighted-average precision, recall and F1.
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

print("accuracy:", accuracy_score(genre_true, genre_pred))

# zero_division=0 makes undefined scores (e.g. a genre that is never predicted) count as 0.
weighted_opts = dict(average="weighted", zero_division=0)
for metric_label, metric_fn in (("precision:", precision_score),
                                ("recall:", recall_score),
                                ("f1:", f1_score)):
  print(metric_label, metric_fn(genre_true, genre_pred, **weighted_opts))

accuracy: 0.28
precision: 0.40614285714285714
recall: 0.28
f1: 0.26123588829471184
