In [None]:
# 本 p.82 `3-2-1` から作るrecord関数のColab版

from IPython.display import display, Javascript
from google.colab.output import eval_js
from base64 import b64decode
from pydub import AudioSegment
import soundfile as sf
import io
import numpy as np

def record(duration):
  """PCのマイクで録音する関数"""

  display(Javascript('''
    const message = (text) => {
      const domId = 'message';
      const output = document.querySelector('#output-area');
      let target = document.querySelector(`#${domId}`);
      if (!target) {
        target = document.createElement('div');
        target.id = domId;
        output.insertBefore(target, output.firstChild);
      }
      target.textContent = text;
    };

    const sleep = async (sec) => {
      return new Promise(resolve => setTimeout(resolve, sec * 1000));
    };

    async function recordAudio(duration) {
      const chunks = [];
      const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
      const recorder = new MediaRecorder(stream);
      const fr = new FileReader();
      recorder.ondataavailable = e => chunks.push(e.data);
      recorder.onstop = e => fr.readAsDataURL(new Blob(chunks));
      await recorder.start();
      message('録音中･･･');
      await sleep(duration);
      await recorder.stop();
      message('録音終了');
      return await new Promise(resolve => {
        fr.onloadend = () => resolve(fr.result);
      });
    }
  '''))

  data = eval_js(f'recordAudio({duration})')

  # WAV形式に統一して読み込み
  buffer = io.BytesIO()
  AudioSegment.from_file(io.BytesIO(b64decode(data.split(',')[1]))).export(buffer, format="wav")
  buffer.seek(0)
  waveform, sampling_rate = sf.read(
    buffer,
    dtype='int16' # 無指定なら自動で正規化されるが、あえて本と同じ正規化処理をするため指定
  )

  # バイトデータを数値データに変換
  byte_to_num = np.frombuffer(waveform, dtype='int16')

  # 最大値を計算
  max_value = float((2 ** 16 / 2) - 1)

  # 波形を正規化
  normalized_waveform = byte_to_num / max_value

  return normalized_waveform, sampling_rate

print('録音準備完了')


In [None]:
# 本 p.90 `3-2` をColab用に改変
# 先にrecord関数のあるセルを実行（エラーが出ないことを確認）
# ランタイム再起動したら関数セルを再実行する (面倒. Colab特有)

duration = 5
waveform, sampling_rate = record(duration)
print(len(waveform), waveform)


In [None]:
# 本 p.91-94 `3-3` をColab用に改変
# 実際のグラフはp.93のような滑らかなものでなくp.94にあるもの
# 先にrecord関数のあるセルを実行（エラーが出ないことを確認）
# ランタイム再起動したら関数セルを再実行する (面倒. Colab特有)

import numpy as np
from matplotlib import pyplot as plt

def graph_plot(x, y):
  """波形をグラフにする関数"""

  # グラフの設定
  fig, ax = plt.subplots()
  ax.set_xlabel('Time[s]')
  ax.set_ylabel('Amplitude')

  # データのプロット
  ax.plot(x, y)
  plt.show()
  plt.close()

# 計測条件を設定して録音関数を実行
duration = 5
waveform, sampling_rate = record(duration)
print(len(waveform), waveform, sampling_rate)

# グラフをプロットする
dt = 1 / sampling_rate
t = np.arange(0, len(waveform) * dt, dt) # arrangeではない. array rangeの略
graph_plot(t, waveform)


In [None]:
# 本 p.95-96 `3-4` をColab用に改変
# 事前にrecord関数のあるセルを実行（エラーが出ないことを確認）
# ランタイム再起動したら関数セルを再実行する (面倒. Colab特有)

import numpy as np
from matplotlib import pyplot as plt
import soundfile as sf

def graph_plot(x, y):
  """波形をグラフにする関数"""

  # グラフの設定
  fig, ax = plt.subplots()
  ax.set_xlabel('Time[s]')
  ax.set_ylabel('Amplitude')

  # データのプロット
  ax.plot(x, y)
  plt.show()
  plt.close()

# 計測条件を設定して録音関数を実行
duration = 5
waveform, sampling_rate = record(duration)
print(len(waveform), waveform, sampling_rate)

# グラフをプロットする
dt = 1 / sampling_rate
t = np.arange(0, len(waveform) * dt, dt) # arrangeではない. array rangeの略
graph_plot(t, waveform)

# wavファイルに保存する
filename = 'recorded.wav'
sf.write(filename, waveform, sampling_rate)


In [None]:
# 本 p.97-101 `3-5` をColab用に改変
# 事前にrecord関数のあるセルを実行（エラーが出ないことを確認）
# ランタイム再起動したら関数セルを再実行する (面倒. Colab特有)

import numpy as np
from matplotlib import pyplot as plt
import soundfile as sf
import librosa

def graph_plot(x, y):
  """波形をグラフにする関数"""

  # グラフの設定
  fig, ax = plt.subplots()
  ax.set_xlabel('Time[s]')
  ax.set_ylabel('Amplitude')

  # データのプロット
  for x_axis, y_axis in zip(x, y):
    ax.plot(x_axis, y_axis)
  plt.show()
  plt.close()

# 計測条件を設定して録音関数を実行
duration = 5
waveform, sampling_rate = record(duration)
print(len(waveform), waveform, sampling_rate)

# グラフをプロットする
dt = 1 / sampling_rate
t = np.arange(0, len(waveform) * dt, dt) # arrangeではない. array rangeの略
graph_plot([t], [waveform])

# wavファイルに保存する
filename = 'recorded.wav'
sf.write(filename, waveform, sampling_rate)

# ボイスチェンジする
n_steps = 8
waveform_shifted = librosa.effects.pitch_shift(waveform, sr=sampling_rate, n_steps=n_steps)

# ピッチシフトされた音声を保存する
sf.write('pitch_shifted.wav', waveform_shifted, sampling_rate)

# 音声をグラフで比較する
graph_plot([t, t], [waveform, waveform_shifted])

In [None]:
# 前のセルに、ボイスチェンジ音声の再生を追加
# 事前にrecord関数のあるセルを実行（エラーが出ないことを確認）
# ランタイム再起動したら関数セルを再実行する (面倒. Colab特有)

import numpy as np
from matplotlib import pyplot as plt
import soundfile as sf
import librosa
from IPython.display import Audio, display # 追加

def graph_plot(x, y):
  """波形をグラフにする関数"""

  # グラフの設定
  fig, ax = plt.subplots()
  ax.set_xlabel('Time[s]')
  ax.set_ylabel('Amplitude')

  # データのプロット
  for x_axis, y_axis in zip(x, y):
    ax.plot(x_axis, y_axis)
  plt.show()
  plt.close()

# 計測条件を設定して録音関数を実行
duration = 5
waveform, sampling_rate = record(duration)

# グラフをプロットする
dt = 1 / sampling_rate
t = np.arange(0, len(waveform) * dt, dt) # arrangeではない. array rangeの略
graph_plot([t], [waveform])

# wavファイルに保存する
filename = 'recorded.wav'
sf.write(filename, waveform, sampling_rate)

# ボイスチェンジする
n_steps = 8
waveform_shifted = librosa.effects.pitch_shift(waveform, sr=sampling_rate, n_steps=n_steps)

# ピッチシフトされた音声を保存する
sf.write('pitch_shifted.wav', waveform_shifted, sampling_rate)

# 音声をグラフで比較する
graph_plot([t, t], [waveform, waveform_shifted])

# ボイスチェンジ音声を再生
display(Audio(f'/content/pitch_shifted.wav', autoplay=True))


In [None]:
# 本 p.102 周波数分析 著者ブログのコードを利用しColab用に改変
# 事前にrecord関数のあるセルを実行（エラーが出ないことを確認）
# ランタイム再起動したら関数セルを再実行する (面倒. Colab特有)

import numpy as np
from matplotlib import pyplot as plt
import soundfile as sf
import librosa
from scipy import fftpack

def freq_graph_plot(x, y, labels):
  """周波数分析をグラフにする関数"""

  fig, ax = plt.subplots()
  ax.set_xlabel('Frequency [Hz]')
  ax.set_ylabel('Amplitude')
  ax.set_yscale('log')
  ax.set_xlim([0, 500])
  for x_axis, y_axis, label in zip(x, y, labels):
      ax.plot(x_axis, y_axis, label=label)
  ax.legend()
  plt.show()
  plt.close()

def calc_fft(data, samplerate):
  """高速フーリエ変換（周波数分析）する関数"""

  spectrum = fftpack.fft(data)
  amp = np.sqrt((spectrum.real ** 2) + (spectrum.imag ** 2))
  amp = amp / (len(data) / 2)
  phase = np.arctan2(spectrum.imag, spectrum.real)
  phase = np.degrees(phase)
  freq = np.linspace(0, samplerate, len(data))

  return amp, freq

# 計測条件を設定して録音関数を実行
duration = 5
waveform, sampling_rate = record(duration)

# ボイスチェンジする
n_steps = 8
waveform_shifted = librosa.effects.pitch_shift(waveform, sr=sampling_rate, n_steps=n_steps)

# FFT（高速フーリエ変換）を計算
original_amp, original_freq = calc_fft(waveform, sampling_rate)
shifted_amp, shifted_freq = calc_fft(waveform_shifted, sampling_rate)

# 周波数比較をグラフに描画
freq_graph_plot(
  [original_freq, shifted_freq],
  [original_amp, shifted_amp],
  ['recorded', 'pitch-shifted']
)


In [None]:
# 独自のrecord_auto_stop関数 (record関数の自動停止版、無音を検知するまで録音)
# 参考 https://pavi2410.com/blog/detect-silence-using-web-audio/

from IPython.display import display, Javascript
from google.colab.output import eval_js
from base64 import b64decode
from pydub import AudioSegment
import soundfile as sf
import io

error_count = 0

def record_auto_stop(
  SILENCE_RMS,      # 無音レベルの大きさの指標
  SILENCE_SEC,      # 秒. 無音がこの時間続いたら録音終了
):
  display(Javascript('''

    // メッセージ表示
    const message = (text) => {
      const domId = 'message';
      const output = document.querySelector('#output-area');
      let target = document.querySelector(`#${domId}`);
      if (!target) {
        target = document.createElement('div');
        target.id = domId;
        output.insertBefore(target, output.firstChild);
      }
      target.innerHTML += `${text}<br>`;
    };

    // 音量の指標を計算
    const calculateRMS = (data) => {
      let sum = 0;
      for (let i = 0; i < data.length; i++) {
        const normalized = data[i] / 128 - 1;
        sum += normalized * normalized;
      }
      return Math.sqrt(sum / data.length);
    };

    async function recordAndAutoStop(SILENCE_RMS, SILENCE_SEC) {
      // マイク使用可否チェック
      let stream = null;
      try {
        stream = await navigator.mediaDevices.getUserMedia({ audio: true });
      } catch(e) {
        return;
      }

      const audioContext = new AudioContext();
      const source = audioContext.createMediaStreamSource(stream);
      const analyser = audioContext.createAnalyser();
      source.connect(analyser);

      analyser.fftSize = 2048;
      const bufferLength = analyser.fftSize;
      const dataArray = new Uint8Array(bufferLength);
      let silenceStart = performance.now();

      const chunks = [];
      const recorder = new MediaRecorder(stream);
      recorder.ondataavailable = e => {
        message('録音中');
        chunks.push(e.data);
      };

      // 無音検知
      const detectSilence = () => {
        analyser.getByteTimeDomainData(dataArray);
        if (calculateRMS(dataArray) < SILENCE_RMS) {
          const now = performance.now();
          if (now - silenceStart > (SILENCE_SEC * 10**3)) {
            recorder.stop();
            return;
          }
        } else {
          silenceStart = performance.now();
          if (recorder.state === 'inactive') {
            recorder.start();
          }
        }
        requestAnimationFrame(detectSilence);
      }

      const fr = new FileReader();
      message('録音開始');
      recorder.onstop = e => {
        message('録音終了');
        fr.readAsDataURL(new Blob(chunks))
      };
      recorder.start(1000);
      detectSilence();

      return await new Promise(resolve => {
        fr.onloadend = () => resolve(fr.result);
      });
    };
  '''))

  data = eval_js(f'recordAndAutoStop({SILENCE_RMS}, {SILENCE_SEC})')

  # 簡易エラー処理
  global error_count
  if data == None:
    if error_count == 0:
      error_count += 1
      print('''
        ブラウザ画面の中央にマイク使用を求める表示があると思います。
        許可を選択し、もう一度セルを実行して下さい。
        また別のマイク使用を求めるポップアップが出たら、許可して下さい。
      '''.replace(' ', ''))
    else:
      print('マイクを使用できません')
    return [], 0

  # WAV形式に統一
  buffer = io.BytesIO()
  AudioSegment.from_file(
    io.BytesIO(b64decode(data.split(',')[1]))
  ).export(buffer, format="wav")
  buffer.seek(0)
  return sf.read(buffer)

print('録音準備完了')


In [None]:
# record_auto_stop関数のテスト
# 事前にrecord_auto_stop関数のあるセルを実行（エラーが出ないことを確認）
# ランタイム再起動したら関数セルを再実行する (面倒. Colab特有)

from IPython.display import Audio

SILENCE_RMS = 0.01 # 無音レベルの指標（環境音が大きければ増やす）
SILENCE_SEC = 2    # 秒. 無音がこの時間続いたら録音終了

try:
  record_auto_stop
except NameError:
  print('先にrecord_auto_stop関数のセルの実行が必要です')
else:
  # 実行後、無音になるまで録音し再生
  waveform, sampling_rate = record_auto_stop(SILENCE_RMS, SILENCE_SEC)

  if len(waveform) > 0:
    display(Audio(data=waveform, rate=sampling_rate, autoplay=True))


In [None]:
# record_auto_stop関数を使い、ボイスチェンジ音声を保存・再生
# 事前にrecord_auto_stop関数のあるセルを実行（エラーが出ないことを確認）
# ランタイム再起動したら関数セルを再実行する (面倒. Colab特有)
# 初回実行時、録音完了から再生まで少し待つかも

from IPython.display import Audio
import librosa

SILENCE_RMS = 0.01 # 無音レベルの指標（環境音が大きければ増やす）
SILENCE_SEC = 2    # 秒. 無音がこの時間続いたら録音終了

try:
  record_auto_stop
except NameError:
  print('先にrecord_auto_stop関数のセルの実行が必要です')
else:
  # 実行後、無音になるまで録音
  waveform, sampling_rate = record_auto_stop(SILENCE_RMS, SILENCE_SEC)

  if len(waveform) > 0:
    # ボイスチェンジ
    n_steps = 8
    waveform_shifted = librosa.effects.pitch_shift(
      waveform, sr=sampling_rate, n_steps=n_steps
    )

    # ボイスチェンジ音声を再生
    display(Audio(data=waveform_shifted, rate=sampling_rate, autoplay=True))


In [None]:
# NG例 1 Google Colab ではpyAudio を使えない
# https://qiita.com/aikimasaki/items/6621c10c3d92a36b30ab
# https://qiita.com/Gaudi1116/questions/26f9ff01ff2b30caad7b

# インストールはできたが
!apt install -y libasound2-dev portaudio19-dev libportaudio2 libportaudiocpp0 ffmpeg
!pip install pyaudio

import pyaudio

# 音声デバイスを認識できない
pa = pyaudio.PyAudio()
print(pa.get_device_count())


In [None]:
# NG例 2
# https://qiita.com/aikimasaki/items/6621c10c3d92a36b30ab
!pip install ipywebrtc

from ipywebrtc import AudioRecorder
import IPython
import time

recorder = AudioRecorder(recording=True)
recorder.recording = True
print('Started')
time.sleep(3)
recorder.recording = False
print('Ended')
# recorder.save('test.wav')
# ValueError: No data, did you record anything?
# 録音されてない. おそらく音声デバイスが認識されてない
IPython.display.display(recorder.audio)


In [None]:
# NG例 3 マイク認識されない

import numpy as np
import gradio as gr

def reverse_audio(audio):
  sr, data = audio
  return (sr, np.flipud(data))

mic = gr.Audio("microphone", type="numpy")
gr.Interface(reverse_audio, mic, "audio").launch(debug=True)