<a href="https://colab.research.google.com/github/MS-H2020/Open/blob/main/Anomaly_Sound_Detection_MobileNet_one_hold.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# [機械稼働音の異常検知 （EDA）](https://signate.jp/competitions/358)

## References

1 [音声ファイル特徴量変換（その5）メルスペクトログラム(TensorFlow)](https://work-in-progress.hatenablog.com/entry/2020/03/08/095914)  
2 [Tensorflow, 簡単な音声認識: キーワードの認識](https://www.tensorflow.org/tutorials/audio/simple_audio?hl=ja)  
3 [「ToyADMOS:異常音検知」：AutoEncoder](https://note.com/toshi_sugi/n/nc4a5b9c4d6cf)

In [40]:
## Import modules
import gc
import glob

import matplotlib.pyplot as plt
import numpy as np

import tensorflow as tf
flag = False  # guard read by show_wav_file: while False the viewer only prints a hint and returns

## Define Functions

In [3]:
# Helpers that turn a raw WAV audio file into an audio tensor.
# Decoded samples are normalized to the [-1.0, 1.0] range.
def decode_audio(audio_binary):
  """Decode WAV bytes into a waveform tensor without the channel axis.

  Args:
    audio_binary: WAV file contents, as accepted by tf.audio.decode_wav.

  Returns:
    The decoded audio with its last (channel) axis squeezed away.
    Assumes a single-channel recording; tf.squeeze(axis=-1) rejects
    inputs whose last axis is not of size 1.
  """
  decoded = tf.audio.decode_wav(contents=audio_binary)
  samples = decoded[0]  # (audio, sample_rate) -> keep the audio only
  # Mono signal, so the channel axis carries no information.
  return tf.squeeze(samples, axis=-1)
def get_waveform(file_path):
  """Read the WAV file at `file_path` and return its decoded waveform.

  Args:
    file_path: Path of the WAV file to load.

  Returns:
    Whatever decode_audio produces for the file's bytes.
  """
  return decode_audio(tf.io.read_file(file_path))

def get_fft(waveform):
  """Return the log10-scaled FFT magnitude of the first half of the spectrum.

  The magnitude is expressed relative to float64 machine epsilon,
  log10((|X| + eps) / eps), so an all-zero bin maps to 0.

  NOTE(review): this is a plain log10 ratio, not 20*log10; callers that
  label the result "[dB]" may want to confirm the intended unit.

  Args:
    waveform: 1-D real-valued waveform tensor.

  Returns:
    Tensor holding the first len(waveform) // 2 log-magnitude bins.
  """
  eps = 2.2204460492503131e-16
  # tf.signal.fft only accepts complex input: attach a zero imaginary part.
  magnitude = tf.abs(tf.signal.fft(tf.complex(waveform, 0.0)))
  n_half = len(magnitude) // 2
  log_magnitude = tf.experimental.numpy.log10((magnitude + eps) / eps)
  # Keep only the first half of the bins.
  return log_magnitude[0:n_half]

def get_stft_spectrogram(waveform):
  """Compute the power spectrogram of `waveform` with a short-time FFT.

  Uses a frame length of 255 samples and a hop of 128 samples.

  Args:
    waveform: Float tensor of PCM samples in the range [-1, 1]; the STFT
      runs along the last axis.

  Returns:
    The squared STFT magnitude. No channel axis is appended.
  """
  magnitude = tf.abs(
      tf.signal.stft(waveform, frame_length=255, frame_step=128))
  # Power = |STFT|^2
  return tf.square(magnitude)

def get_mel_spectrogram(stft_spectrogram,
                        num_mel_bins=128,
                        sample_rate=16000,
                        lower_edge_hertz=0.0,
                        upper_edge_hertz=None):
    """Project a linear-frequency spectrogram onto a mel filter bank.

    Args:
        stft_spectrogram: Tensor whose last axis holds the STFT frequency
            bins (e.g. the output of get_stft_spectrogram).
        num_mel_bins: Number of mel bands to produce.
        sample_rate: Sampling rate in Hz of the audio the spectrogram was
            computed from.
        lower_edge_hertz: Lowest frequency covered by the filter bank.
        upper_edge_hertz: Highest frequency covered by the filter bank.
            Defaults to sample_rate / 2 (8000.0 Hz for the default rate).

    Returns:
        Tensor shaped like the input with the last axis replaced by
        `num_mel_bins`. Values stay on a linear scale; no log is applied.
    """
    if upper_edge_hertz is None:
        upper_edge_hertz = sample_rate / 2.0

    # Number of STFT bins = fft_length / 2 + 1 (129 for the frame_length=255
    # used by get_stft_spectrogram, assuming the default fft_length of 256).
    n_stft_bin = stft_spectrogram.shape[-1]

    # Weight matrix of shape (n_stft_bin, num_mel_bins).
    linear_to_mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(
        num_mel_bins=num_mel_bins,
        num_spectrogram_bins=n_stft_bin,
        sample_rate=sample_rate,
        lower_edge_hertz=lower_edge_hertz,
        upper_edge_hertz=upper_edge_hertz,
    )

    # Contract the frequency axis of the spectrogram with the weight matrix.
    return tf.tensordot(stft_spectrogram, linear_to_mel_weight_matrix, 1)


In [4]:
def plot_spectrogram(spectrogram, ax, sr=16000, frame_step=128):
  """Draw a log10-scaled spectrogram on `ax` with time on the x-axis.

  Args:
    spectrogram: Array-like of shape (frames, bins), or (frames, bins, 1)
      with a trailing channel axis. Eager tensors are accepted.
    ax: Matplotlib axes (anything exposing `pcolormesh`).
    sr: Sampling rate in Hz used to scale both axes.
    frame_step: Hop size in samples between consecutive frames.

  The y-axis is spaced linearly from 0 to sr / 2 regardless of the input,
  so it is only an approximate guide for mel-scaled spectrograms.
  """
  # Convert locally instead of switching on TensorFlow's global NumPy
  # behaviour, which would also change dtype promotion for later code.
  spectrogram = np.asarray(spectrogram)
  if spectrogram.ndim > 2:
    assert spectrogram.ndim == 3
    spectrogram = np.squeeze(spectrogram, axis=-1)
  # Transpose so that time runs along the columns, and add an epsilon to
  # avoid taking the log of zero.
  log_spec = np.log10(spectrogram.T + np.finfo(float).eps)
  height, width = log_spec.shape
  time_axis = np.arange(width) * (frame_step / sr)  # start time of each frame [sec]
  freq_axis = np.linspace(0, sr / 2, height)        # 0 .. Nyquist [Hz]

  ax.pcolormesh(time_axis, freq_axis, log_spec)

## Train Normal

In [45]:
# Collect the train_normal audio clips into the `filenames` list (sorted)
filenames = sorted(glob.glob('../01_input/wav/train_normal/dummy/*.wav'))
num_samples = len(filenames)
print('Number of total examples:', num_samples)
print('Example file tensor:', filenames[0])

# `filenames` is populated, so allow show_wav_file to run
flag = True

Number of total examples: 300
Example file tensor: ../01_input/wav/train_normal/dummy/000.wav


In [48]:
def show_wav_file(i):
    """Play and plot the i-th clip of the currently loaded `filenames` list.

    Prints basic metadata, renders an audio player and draws four panels:
    waveform, log FFT magnitude, STFT spectrogram and mel spectrogram.
    Does nothing except print a hint while the module-level `flag` is False.

    Args:
        i: Index into the module-level `filenames` list.
    """
    if not flag:
        print("Please run the above cell.")
        return

    from IPython import display

    # Decode the file once and reuse it for both the metadata and the waveform.
    wav_bytes = tf.io.read_file(filenames[i])
    audio, sampling_rate = tf.audio.decode_wav(contents=wav_bytes)
    waveform = tf.squeeze(audio, axis=-1)  # drop the channel axis (mono clips)

    # Audio settings
    n_samples = audio.shape[0]
    sample_rate_hz = sampling_rate.numpy()
    duration_sec = n_samples / sample_rate_hz

    print("File: ", filenames[i])
    print("Data数: ", n_samples)
    print("サンプリング周波数[Hz]: ", sample_rate_hz)
    print("時間窓長[sec]: ", duration_sec)
    print("分析周波数レンジ[Hz]: ", sample_rate_hz / 2,'\n')

    fft = get_fft(waveform)
    stft_spectrogram = get_stft_spectrogram(waveform)
    mel_spectrogram = get_mel_spectrogram(stft_spectrogram)

    # Axes for the time-domain and frequency-domain line plots
    time_sec = np.arange(n_samples) / sample_rate_hz
    freq_hz = np.arange(n_samples // 2) * sample_rate_hz / n_samples

    display.display(display.Audio(waveform, rate=sample_rate_hz))

    fig, axes = plt.subplots(4, figsize=(12, 25))

    axes[0].plot(time_sec, waveform.numpy())
    axes[0].set_title('Waveform')
    axes[0].set_xlim([0, n_samples / sample_rate_hz])
    axes[0].set_xlabel("time[sec]")
    axes[0].grid()

    axes[1].plot(freq_hz / 1000, fft)
    axes[1].set_title('FFT Specto')
    axes[1].set_xlabel("frequency[kHz]")
    axes[1].set_ylabel("[dB]")
    axes[1].grid()

    # Both spectrogram panels share the same layout.
    panels = (
        (axes[2], 'STFT Spectrogram', stft_spectrogram),
        (axes[3], 'Mel Spectrogram', mel_spectrogram),
    )
    for ax, title, spec in panels:
        ax.set_title(title)
        plot_spectrogram(spec, ax)
        ax.set_xlabel("time[sec]")
        ax.set_ylabel("frequency")
        ax.grid()

    plt.show()
    gc.collect()

In [49]:
from  ipywidgets import interact
import ipywidgets as widgets

# Slider that selects which entry of `filenames` to inspect
slider = widgets.IntSlider(
    value=0,                    # initial index
    min=0,                      # lowest index
    max=len(filenames) - 1,     # highest valid index
    step=1,                     # advance one file at a time
    description='wav file:',    # label shown next to the slider
    orientation='horizontal',   # use 'vertical' for a vertical slider
)

# Call show_wav_file with the slider's current value
interact(show_wav_file, i=slider)

interactive(children=(IntSlider(value=0, description='wav file:', max=300), Output()), _dom_classes=('widget-i…

<function __main__.show_wav_file(i)>

In [44]:
# Disarm the viewer: show_wav_file prints a hint and returns while flag is
# False, until a dataset load cell sets flag = True again.
flag = False

## Valid Normal

In [None]:
# Collect the valid_normal audio clips into the `filenames` list (sorted)
filenames = sorted(glob.glob('../01_input/wav/valid_normal/dummy/*.wav'))
num_samples = len(filenames)
print('Number of total examples:', num_samples)
print('Example file tensor:', filenames[0])

# `filenames` is populated, so allow show_wav_file to run
flag = True

Number of total examples: 150
Example file tensor: ../01_input/wav/valid_normal/dummy/000.wav


In [27]:
def show_wav_file(i):
    """Play and plot the i-th clip of the currently loaded `filenames` list.

    Prints basic metadata, renders an audio player and draws four panels:
    waveform, log FFT magnitude, STFT spectrogram and mel spectrogram.
    Does nothing except print a hint while the module-level `flag` is False.

    Args:
        i: Index into the module-level `filenames` list.
    """
    if not flag:
        print("Please run the above cell.")
        return

    from IPython import display

    # Decode the file once and reuse it for both the metadata and the waveform.
    wav_bytes = tf.io.read_file(filenames[i])
    audio, sampling_rate = tf.audio.decode_wav(contents=wav_bytes)
    waveform = tf.squeeze(audio, axis=-1)  # drop the channel axis (mono clips)

    # Audio settings
    n_samples = audio.shape[0]
    sample_rate_hz = sampling_rate.numpy()
    duration_sec = n_samples / sample_rate_hz

    print("File: ", filenames[i])
    print("Data数: ", n_samples)
    print("サンプリング周波数[Hz]: ", sample_rate_hz)
    print("時間窓長[sec]: ", duration_sec)
    print("分析周波数レンジ[Hz]: ", sample_rate_hz / 2,'\n')

    fft = get_fft(waveform)
    stft_spectrogram = get_stft_spectrogram(waveform)
    mel_spectrogram = get_mel_spectrogram(stft_spectrogram)

    # Axes for the time-domain and frequency-domain line plots
    time_sec = np.arange(n_samples) / sample_rate_hz
    freq_hz = np.arange(n_samples // 2) * sample_rate_hz / n_samples

    display.display(display.Audio(waveform, rate=sample_rate_hz))

    fig, axes = plt.subplots(4, figsize=(12, 25))

    axes[0].plot(time_sec, waveform.numpy())
    axes[0].set_title('Waveform')
    axes[0].set_xlim([0, n_samples / sample_rate_hz])
    axes[0].set_xlabel("time[sec]")
    axes[0].grid()

    axes[1].plot(freq_hz / 1000, fft)
    axes[1].set_title('FFT Specto')
    axes[1].set_xlabel("frequency[kHz]")
    axes[1].set_ylabel("[dB]")
    axes[1].grid()

    # Both spectrogram panels share the same layout.
    panels = (
        (axes[2], 'STFT Spectrogram', stft_spectrogram),
        (axes[3], 'Mel Spectrogram', mel_spectrogram),
    )
    for ax, title, spec in panels:
        ax.set_title(title)
        plot_spectrogram(spec, ax)
        ax.set_xlabel("time[sec]")
        ax.set_ylabel("frequency")
        ax.grid()

    plt.show()
    gc.collect()

In [28]:
# Slider that selects which entry of `filenames` to inspect
slider = widgets.IntSlider(
    value=0,                    # initial index
    min=0,                      # lowest index
    max=len(filenames) - 1,     # highest valid index
    step=1,                     # advance one file at a time
    description='wav file:',    # label shown next to the slider
    orientation='horizontal',   # use 'vertical' for a vertical slider
)

# Call show_wav_file with the slider's current value
interact(show_wav_file, i=slider)

interactive(children=(IntSlider(value=0, description='wav file:', max=150), Output()), _dom_classes=('widget-i…

<function __main__.show_wav_file(i)>

In [None]:
# Disarm the viewer: show_wav_file prints a hint and returns while flag is
# False, until a dataset load cell sets flag = True again.
flag = False

## Valid Anomaly

In [50]:
# Collect the valid_anomaly audio clips into the `filenames` list (sorted)
filenames = glob.glob('../01_input/wav/valid_anomaly/dummy/*.wav')
num_samples = len(filenames)
filenames.sort()
print('Number of total examples:', num_samples)
print('Example file tensor:', filenames[0])

# Re-arm the viewer. Without this, the preceding `flag = False` cell leaves
# show_wav_file printing "Please run the above cell." on a top-to-bottom run.
flag = True

Number of total examples: 50
Example file tensor: ../01_input/wav/valid_anomaly/dummy/000.wav


In [51]:
def show_wav_file(i):
    """Play and plot the i-th clip of the currently loaded `filenames` list.

    Prints basic metadata, renders an audio player and draws four panels:
    waveform, log FFT magnitude, STFT spectrogram and mel spectrogram.
    Does nothing except print a hint while the module-level `flag` is False.

    Args:
        i: Index into the module-level `filenames` list.
    """
    if not flag:
        print("Please run the above cell.")
        return

    from IPython import display

    # Decode the file once and reuse it for both the metadata and the waveform.
    wav_bytes = tf.io.read_file(filenames[i])
    audio, sampling_rate = tf.audio.decode_wav(contents=wav_bytes)
    waveform = tf.squeeze(audio, axis=-1)  # drop the channel axis (mono clips)

    # Audio settings
    n_samples = audio.shape[0]
    sample_rate_hz = sampling_rate.numpy()
    duration_sec = n_samples / sample_rate_hz

    print("File: ", filenames[i])
    print("Data数: ", n_samples)
    print("サンプリング周波数[Hz]: ", sample_rate_hz)
    print("時間窓長[sec]: ", duration_sec)
    print("分析周波数レンジ[Hz]: ", sample_rate_hz / 2,'\n')

    fft = get_fft(waveform)
    stft_spectrogram = get_stft_spectrogram(waveform)
    mel_spectrogram = get_mel_spectrogram(stft_spectrogram)

    # Axes for the time-domain and frequency-domain line plots
    time_sec = np.arange(n_samples) / sample_rate_hz
    freq_hz = np.arange(n_samples // 2) * sample_rate_hz / n_samples

    display.display(display.Audio(waveform, rate=sample_rate_hz))

    fig, axes = plt.subplots(4, figsize=(12, 25))

    axes[0].plot(time_sec, waveform.numpy())
    axes[0].set_title('Waveform')
    axes[0].set_xlim([0, n_samples / sample_rate_hz])
    axes[0].set_xlabel("time[sec]")
    axes[0].grid()

    axes[1].plot(freq_hz / 1000, fft)
    axes[1].set_title('FFT Specto')
    axes[1].set_xlabel("frequency[kHz]")
    axes[1].set_ylabel("[dB]")
    axes[1].grid()

    # Both spectrogram panels share the same layout.
    panels = (
        (axes[2], 'STFT Spectrogram', stft_spectrogram),
        (axes[3], 'Mel Spectrogram', mel_spectrogram),
    )
    for ax, title, spec in panels:
        ax.set_title(title)
        plot_spectrogram(spec, ax)
        ax.set_xlabel("time[sec]")
        ax.set_ylabel("frequency")
        ax.grid()

    plt.show()
    gc.collect()

In [53]:
# Slider that selects which entry of `filenames` to inspect
slider = widgets.IntSlider(
    value=0,                    # initial index
    min=0,                      # lowest index
    max=len(filenames) - 1,     # highest valid index
    step=1,                     # advance one file at a time
    description='wav file:',    # label shown next to the slider
    orientation='horizontal',   # use 'vertical' for a vertical slider
)

# Call show_wav_file with the slider's current value
interact(show_wav_file, i=slider)

interactive(children=(IntSlider(value=0, description='wav file:', max=49), Output()), _dom_classes=('widget-in…

<function __main__.show_wav_file(i)>

In [55]:
# Disarm the viewer: show_wav_file prints a hint and returns while flag is
# False, until a dataset load cell sets flag = True again.
flag = False

## Test

In [58]:
# Collect the test audio clips into the `filenames` list (sorted)
filenames = sorted(glob.glob('../01_input/wav/test/dummy/*.wav'))
num_samples = len(filenames)
print('Number of total examples:', num_samples)
print('Example file tensor:', filenames[0])

# `filenames` is populated, so allow show_wav_file to run
flag = True

Number of total examples: 200
Example file tensor: ../01_input/wav/test/dummy/000.wav


In [59]:
def show_wav_file(i):
    """Play and plot the i-th clip of the currently loaded `filenames` list.

    Prints basic metadata, renders an audio player and draws four panels:
    waveform, log FFT magnitude, STFT spectrogram and mel spectrogram.
    Does nothing except print a hint while the module-level `flag` is False.

    Args:
        i: Index into the module-level `filenames` list.
    """
    if not flag:
        print("Please run the above cell.")
        return

    from IPython import display

    # Decode the file once and reuse it for both the metadata and the waveform.
    wav_bytes = tf.io.read_file(filenames[i])
    audio, sampling_rate = tf.audio.decode_wav(contents=wav_bytes)
    waveform = tf.squeeze(audio, axis=-1)  # drop the channel axis (mono clips)

    # Audio settings
    n_samples = audio.shape[0]
    sample_rate_hz = sampling_rate.numpy()
    duration_sec = n_samples / sample_rate_hz

    print("File: ", filenames[i])
    print("Data数: ", n_samples)
    print("サンプリング周波数[Hz]: ", sample_rate_hz)
    print("時間窓長[sec]: ", duration_sec)
    print("分析周波数レンジ[Hz]: ", sample_rate_hz / 2,'\n')

    fft = get_fft(waveform)
    stft_spectrogram = get_stft_spectrogram(waveform)
    mel_spectrogram = get_mel_spectrogram(stft_spectrogram)

    # Axes for the time-domain and frequency-domain line plots
    time_sec = np.arange(n_samples) / sample_rate_hz
    freq_hz = np.arange(n_samples // 2) * sample_rate_hz / n_samples

    display.display(display.Audio(waveform, rate=sample_rate_hz))

    fig, axes = plt.subplots(4, figsize=(12, 25))

    axes[0].plot(time_sec, waveform.numpy())
    axes[0].set_title('Waveform')
    axes[0].set_xlim([0, n_samples / sample_rate_hz])
    axes[0].set_xlabel("time[sec]")
    axes[0].grid()

    axes[1].plot(freq_hz / 1000, fft)
    axes[1].set_title('FFT Specto')
    axes[1].set_xlabel("frequency[kHz]")
    axes[1].set_ylabel("[dB]")
    axes[1].grid()

    # Both spectrogram panels share the same layout.
    panels = (
        (axes[2], 'STFT Spectrogram', stft_spectrogram),
        (axes[3], 'Mel Spectrogram', mel_spectrogram),
    )
    for ax, title, spec in panels:
        ax.set_title(title)
        plot_spectrogram(spec, ax)
        ax.set_xlabel("time[sec]")
        ax.set_ylabel("frequency")
        ax.grid()

    plt.show()
    gc.collect()

In [60]:
# Slider that selects which entry of `filenames` to inspect
slider = widgets.IntSlider(
    value=0,                    # initial index
    min=0,                      # lowest index
    max=len(filenames) - 1,     # highest valid index
    step=1,                     # advance one file at a time
    description='wav file:',    # label shown next to the slider
    orientation='horizontal',   # use 'vertical' for a vertical slider
)

# Call show_wav_file with the slider's current value
interact(show_wav_file, i=slider)

interactive(children=(IntSlider(value=0, description='wav file:', max=199), Output()), _dom_classes=('widget-i…

<function __main__.show_wav_file(i)>

In [None]:
# Disarm the viewer: show_wav_file prints a hint and returns while flag is False.
flag = False