# 01. リアルタイム音響可視化

リアルタイムで音声を処理し、動的に可視化する技術を学びます。

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from scipy.fft import fft, fftfreq
from scipy import signal
import warnings
warnings.filterwarnings('ignore')

plt.rcParams['font.family'] = 'DejaVu Sans'

## 1. シミュレートされたリアルタイム可視化

In [None]:
class RealTimeVisualizer:
    def __init__(self, sample_rate=8000, buffer_size=1024):
        self.sample_rate = sample_rate
        self.buffer_size = buffer_size
        self.time_buffer = np.zeros(buffer_size)
        self.freq_buffer = np.zeros(buffer_size // 2 + 1)
        
        # 周波数軸
        self.frequencies = fftfreq(buffer_size, 1/sample_rate)[:buffer_size//2+1]
        
        # 時間軸（相対時間）
        self.time_axis = np.arange(buffer_size) / sample_rate
        
        # 可視化用データの履歴
        self.history_length = 100
        self.spectrum_history = np.zeros((self.history_length, len(self.frequencies)))
        self.time_index = 0
        
    def update_buffer(self, new_data):
        """新しいデータでバッファを更新"""
        if len(new_data) != self.buffer_size:
            raise ValueError(f"Data size must be {self.buffer_size}")
        
        self.time_buffer = new_data
        
        # FFT計算
        windowed_data = new_data * np.hanning(self.buffer_size)
        fft_result = fft(windowed_data)
        self.freq_buffer = np.abs(fft_result[:self.buffer_size//2+1])
        
        # スペクトルの履歴を更新
        self.spectrum_history[self.time_index % self.history_length] = self.freq_buffer
        self.time_index += 1
        
    def get_current_spectrum(self):
        """現在のスペクトルを取得"""
        return self.frequencies, self.freq_buffer
        
    def get_current_waveform(self):
        """現在の波形を取得"""
        return self.time_axis, self.time_buffer
        
    def get_spectrogram_data(self):
        """スペクトログラム用データを取得"""
        # 履歴の有効な部分を取得
        valid_history = min(self.time_index, self.history_length)
        if valid_history == 0:
            return np.array([]), np.array([]), np.array([])
        
        # 時間軸（過去から現在へ）
        time_steps = np.arange(valid_history) * (self.buffer_size / self.sample_rate)
        
        # スペクトルデータ（dB変換）
        spectrum_db = 20 * np.log10(self.spectrum_history[:valid_history].T + 1e-10)
        
        return time_steps, self.frequencies, spectrum_db

# テスト用信号生成器
class SignalGenerator:
    def __init__(self, sample_rate=8000):
        self.sample_rate = sample_rate
        self.time = 0
        
    def generate_sweep(self, duration, f_start=100, f_end=1000):
        """周波数スイープ信号"""
        t = np.linspace(self.time, self.time + duration, 
                       int(self.sample_rate * duration), False)
        self.time += duration
        return signal.chirp(t, f_start, duration, f_end, method='linear')
        
    def generate_multi_tone(self, duration, frequencies, amplitudes=None):
        """マルチトーン信号"""
        if amplitudes is None:
            amplitudes = [1.0] * len(frequencies)
            
        t = np.linspace(self.time, self.time + duration,
                       int(self.sample_rate * duration), False)
        
        signal_sum = np.zeros_like(t)
        for freq, amp in zip(frequencies, amplitudes):
            signal_sum += amp * np.sin(2 * np.pi * freq * t)
            
        self.time += duration
        return signal_sum
        
    def generate_noise(self, duration, noise_type='white'):
        """ノイズ信号"""
        samples = int(self.sample_rate * duration)
        
        if noise_type == 'white':
            noise = np.random.normal(0, 1, samples)
        elif noise_type == 'pink':
            # 簡易ピンクノイズ（1/f特性の近似）
            white = np.random.normal(0, 1, samples)
            freqs = fftfreq(samples, 1/self.sample_rate)
            fft_white = fft(white)
            # 周波数に反比例する重み
            weights = 1 / np.sqrt(np.abs(freqs) + 1)
            fft_pink = fft_white * weights
            noise = np.real(np.fft.ifft(fft_pink))
        
        self.time += duration
        return noise

# リアルタイム可視化のシミュレーション
sample_rate = 8000
buffer_size = 512
buffer_duration = buffer_size / sample_rate  # 約0.064秒

visualizer = RealTimeVisualizer(sample_rate, buffer_size)
generator = SignalGenerator(sample_rate)

# シミュレーション用の信号シーケンス
signal_sequence = [
    ('スイープ', lambda: generator.generate_sweep(2.0, 200, 800)),
    ('マルチトーン', lambda: generator.generate_multi_tone(2.0, [300, 500, 700], [1.0, 0.7, 0.5])),
    ('ホワイトノイズ', lambda: generator.generate_noise(1.5, 'white')),
    ('ピンクノイズ', lambda: generator.generate_noise(1.5, 'pink')),
]

# 各信号タイプのスナップショットを可視化
plt.figure(figsize=(16, 12))

for signal_idx, (signal_name, signal_func) in enumerate(signal_sequence):
    # 信号を生成
    test_signal = signal_func()
    
    # バッファサイズに分割して処理
    num_buffers = len(test_signal) // buffer_size
    
    # 中間地点のバッファを表示用に選択
    mid_buffer_idx = num_buffers // 2
    start_sample = mid_buffer_idx * buffer_size
    end_sample = start_sample + buffer_size
    
    buffer_data = test_signal[start_sample:end_sample]
    visualizer.update_buffer(buffer_data)
    
    # 現在の波形
    plt.subplot(4, 3, signal_idx * 3 + 1)
    time_axis, waveform = visualizer.get_current_waveform()
    plt.plot(time_axis * 1000, waveform, 'b-', linewidth=2)  # ms単位
    plt.title(f'{signal_name}（波形）')
    plt.xlabel('時間 (ms)')
    plt.ylabel('振幅')
    plt.grid(True)
    
    # 現在のスペクトル
    plt.subplot(4, 3, signal_idx * 3 + 2)
    frequencies, spectrum = visualizer.get_current_spectrum()
    plt.plot(frequencies, 20 * np.log10(spectrum + 1e-10), 'r-', linewidth=2)
    plt.title(f'{signal_name}（スペクトル）')
    plt.xlabel('周波数 (Hz)')
    plt.ylabel('振幅 (dB)')
    plt.grid(True)
    plt.xlim(0, 1500)
    
    # 複数バッファを処理してスペクトログラムを作成
    plt.subplot(4, 3, signal_idx * 3 + 3)
    
    # 全信号を段階的に処理
    visualizer_temp = RealTimeVisualizer(sample_rate, buffer_size)
    for i in range(0, len(test_signal) - buffer_size, buffer_size):
        buffer_chunk = test_signal[i:i + buffer_size]
        visualizer_temp.update_buffer(buffer_chunk)
    
    # スペクトログラムデータを取得
    time_steps, freq_axis, spectrogram_data = visualizer_temp.get_spectrogram_data()
    
    if len(time_steps) > 0:
        plt.pcolormesh(time_steps, freq_axis, spectrogram_data, 
                      shading='gouraud', cmap='viridis')
        plt.title(f'{signal_name}（スペクトログラム）')
        plt.xlabel('時間 (秒)')
        plt.ylabel('周波数 (Hz)')
        plt.ylim(0, 1500)
        plt.colorbar(label='振幅 (dB)')

plt.tight_layout()
plt.show()

print("リアルタイム可視化の特徴:")
print(f"- バッファサイズ: {buffer_size} サンプル")
print(f"- 更新間隔: {buffer_duration*1000:.1f} ms")
print(f"- 周波数分解能: {sample_rate/buffer_size:.1f} Hz")
print(f"- 時間分解能: {buffer_duration*1000:.1f} ms")

## 2. インタラクティブなパラメータ調整

In [None]:
# インタラクティブなパラメータ調整のシミュレーション
class InteractiveAudioProcessor:
    def __init__(self, sample_rate=8000):
        self.sample_rate = sample_rate
        self.parameters = {
            'frequency': 440.0,
            'amplitude': 1.0,
            'filter_cutoff': 1000.0,
            'noise_level': 0.0,
            'harmonic_strength': 0.5
        }
        
    def update_parameter(self, param_name, value):
        """パラメータを更新"""
        if param_name in self.parameters:
            self.parameters[param_name] = value
            
    def generate_signal(self, duration=1.0):
        """現在のパラメータで信号を生成"""
        t = np.linspace(0, duration, int(self.sample_rate * duration), False)
        
        # 基本波形（基音 + 倍音）
        freq = self.parameters['frequency']
        amp = self.parameters['amplitude']
        harmonic_str = self.parameters['harmonic_strength']
        
        signal = (amp * np.sin(2 * np.pi * freq * t) +
                 harmonic_str * amp * 0.5 * np.sin(2 * np.pi * freq * 2 * t) +
                 harmonic_str * amp * 0.3 * np.sin(2 * np.pi * freq * 3 * t))
        
        # ノイズを追加
        noise_level = self.parameters['noise_level']
        if noise_level > 0:
            noise = noise_level * np.random.normal(0, 1, len(signal))
            signal += noise
            
        return t, signal
        
    def apply_filter(self, signal):
        """ローパスフィルタを適用"""
        cutoff = self.parameters['filter_cutoff']
        nyquist = self.sample_rate / 2
        
        if cutoff >= nyquist:
            return signal  # フィルタなし
            
        # バターワースフィルタ
        normalized_cutoff = cutoff / nyquist
        b, a = signal.butter(4, normalized_cutoff, btype='low')
        filtered_signal = signal.filtfilt(b, a, signal)
        
        return filtered_signal

# パラメータセットの比較
processor = InteractiveAudioProcessor()

parameter_sets = [
    {'name': 'デフォルト', 'frequency': 440, 'amplitude': 1.0, 'filter_cutoff': 2000, 'noise_level': 0.0, 'harmonic_strength': 0.5},
    {'name': '低音', 'frequency': 220, 'amplitude': 1.0, 'filter_cutoff': 2000, 'noise_level': 0.0, 'harmonic_strength': 0.5},
    {'name': '高音', 'frequency': 880, 'amplitude': 1.0, 'filter_cutoff': 2000, 'noise_level': 0.0, 'harmonic_strength': 0.5},
    {'name': '倍音豊富', 'frequency': 440, 'amplitude': 1.0, 'filter_cutoff': 2000, 'noise_level': 0.0, 'harmonic_strength': 1.0},
    {'name': 'ローパス', 'frequency': 440, 'amplitude': 1.0, 'filter_cutoff': 800, 'noise_level': 0.0, 'harmonic_strength': 0.5},
    {'name': 'ノイズあり', 'frequency': 440, 'amplitude': 1.0, 'filter_cutoff': 2000, 'noise_level': 0.3, 'harmonic_strength': 0.5}
]

plt.figure(figsize=(18, 15))

for i, param_set in enumerate(parameter_sets):
    # パラメータを設定
    for param_name, value in param_set.items():
        if param_name != 'name':
            processor.update_parameter(param_name, value)
    
    # 信号を生成
    t, original_signal = processor.generate_signal(0.5)  # 0.5秒
    filtered_signal = processor.apply_filter(original_signal)
    
    # FFT解析
    fft_original = fft(original_signal)
    fft_filtered = fft(filtered_signal)
    frequencies = fftfreq(len(original_signal), 1/processor.sample_rate)
    positive_mask = frequencies >= 0
    
    # 時間波形（元信号 vs フィルタ後）
    plt.subplot(6, 3, i * 3 + 1)
    plt.plot(t[:400], original_signal[:400], 'b-', alpha=0.7, label='元信号', linewidth=2)
    plt.plot(t[:400], filtered_signal[:400], 'r-', label='フィルタ後', linewidth=2)
    plt.title(f'{param_set["name"]}（波形）')
    plt.ylabel('振幅')
    plt.legend()
    plt.grid(True)
    
    # スペクトル比較
    plt.subplot(6, 3, i * 3 + 2)
    plt.plot(frequencies[positive_mask], 20 * np.log10(np.abs(fft_original)[positive_mask] + 1e-10), 
            'b-', alpha=0.7, label='元スペクトル', linewidth=2)
    plt.plot(frequencies[positive_mask], 20 * np.log10(np.abs(fft_filtered)[positive_mask] + 1e-10), 
            'r-', label='フィルタ後', linewidth=2)
    
    # フィルタのカットオフ周波数を表示
    cutoff = processor.parameters['filter_cutoff']
    plt.axvline(x=cutoff, color='orange', linestyle='--', alpha=0.7, label=f'カットオフ: {cutoff} Hz')
    
    plt.title(f'{param_set["name"]}（スペクトル）')
    plt.ylabel('振幅 (dB)')
    plt.legend()
    plt.grid(True)
    plt.xlim(0, 2000)
    
    # パラメータ表示
    plt.subplot(6, 3, i * 3 + 3)
    param_text = f"""周波数: {processor.parameters['frequency']:.0f} Hz
振幅: {processor.parameters['amplitude']:.1f}
フィルタ: {processor.parameters['filter_cutoff']:.0f} Hz
ノイズ: {processor.parameters['noise_level']:.1f}
倍音: {processor.parameters['harmonic_strength']:.1f}"""
    
    plt.text(0.1, 0.5, param_text, transform=plt.gca().transAxes, 
            fontsize=12, verticalalignment='center',
            bbox=dict(boxstyle="round,pad=0.3", facecolor="lightblue", alpha=0.8))
    plt.title(f'{param_set["name"]}（パラメータ）')
    plt.axis('off')
    
    if i == len(parameter_sets) - 1:
        plt.subplot(6, 3, i * 3 + 1)
        plt.xlabel('時間 (秒)')
        plt.subplot(6, 3, i * 3 + 2)
        plt.xlabel('周波数 (Hz)')

plt.tight_layout()
plt.show()

print("インタラクティブ処理のメリット:")
print("1. リアルタイムでの効果確認")
print("2. パラメータの即座の調整")
print("3. 直感的な音響効果の理解")
print("4. 教育・学習への応用")

## 3. 動的スペクトログラム

In [None]:
# 動的スペクトログラムのシミュレーション
class DynamicSpectrogram:
    def __init__(self, sample_rate=8000, window_size=512, max_time_frames=200):
        self.sample_rate = sample_rate
        self.window_size = window_size
        self.max_time_frames = max_time_frames
        
        # スペクトログラムデータ（周波数 x 時間）
        self.spectrogram_data = np.zeros((window_size // 2 + 1, max_time_frames))
        self.current_frame = 0
        
        # 周波数軸
        self.frequencies = fftfreq(window_size, 1/sample_rate)[:window_size//2+1]
        
        # 時間軸（相対時間）
        self.time_per_frame = window_size / sample_rate
        
    def add_frame(self, audio_frame):
        """新しい音声フレームを追加"""
        if len(audio_frame) != self.window_size:
            raise ValueError(f"Frame size must be {self.window_size}")
        
        # FFT計算
        windowed_frame = audio_frame * np.hanning(self.window_size)
        fft_result = fft(windowed_frame)
        magnitude = np.abs(fft_result[:self.window_size//2+1])
        
        # スペクトログラムを左にシフト（古いデータを削除）
        self.spectrogram_data[:, :-1] = self.spectrogram_data[:, 1:]
        
        # 新しいフレームを右端に追加
        self.spectrogram_data[:, -1] = magnitude
        
        self.current_frame += 1
        
    def get_spectrogram(self):
        """現在のスペクトログラムを取得"""
        # 有効なフレーム数
        valid_frames = min(self.current_frame, self.max_time_frames)
        
        # 時間軸（現在時刻を基準とした相対時間）
        time_axis = np.arange(-valid_frames + 1, 1) * self.time_per_frame
        
        # dB変換
        spectrogram_db = 20 * np.log10(self.spectrogram_data[:, -valid_frames:] + 1e-10)
        
        return time_axis, self.frequencies, spectrogram_db
        
    def reset(self):
        """スペクトログラムをリセット"""
        self.spectrogram_data.fill(0)
        self.current_frame = 0

# 時間変化する複雑な信号の生成
def create_dynamic_signal(duration=10.0, sample_rate=8000):
    """時間とともに変化する複雑な信号"""
    t = np.linspace(0, duration, int(sample_rate * duration), False)
    signal = np.zeros_like(t)
    
    # セクション1: 上昇スイープ (0-2秒)
    mask1 = (t >= 0) & (t < 2)
    signal[mask1] = signal.chirp(t[mask1], 200, 2, 800, method='linear')
    
    # セクション2: 複数トーン (2-4秒)
    mask2 = (t >= 2) & (t < 4)
    for freq in [300, 500, 700]:
        signal[mask2] += np.sin(2 * np.pi * freq * t[mask2]) / 3
    
    # セクション3: 下降スイープ (4-6秒)
    mask3 = (t >= 4) & (t < 6)
    signal[mask3] = signal.chirp(t[mask3] - 4, 800, 2, 200, method='linear')
    
    # セクション4: AM変調信号 (6-8秒)
    mask4 = (t >= 6) & (t < 8)
    carrier_freq = 600
    mod_freq = 5  # 5Hz変調
    carrier = np.sin(2 * np.pi * carrier_freq * t[mask4])
    modulation = 0.5 * (1 + np.sin(2 * np.pi * mod_freq * t[mask4]))
    signal[mask4] = carrier * modulation
    
    # セクション5: ノイズバースト (8-10秒)
    mask5 = (t >= 8) & (t < 10)
    np.random.seed(42)
    noise = np.random.normal(0, 0.5, np.sum(mask5))
    # 時間的にゲートされたノイズ
    gate = np.sin(np.pi * (t[mask5] - 8) / 2)**2  # 0から1に変化
    signal[mask5] = noise * gate
    
    return t, signal

# 動的スペクトログラムのデモ
sample_rate = 8000
window_size = 256
hop_size = window_size // 4  # 75%オーバーラップ

# 信号生成
t, dynamic_signal = create_dynamic_signal(8.0, sample_rate)

# スペクトログラム生成
spectrogram = DynamicSpectrogram(sample_rate, window_size, max_time_frames=150)

# フレームごとに処理
snapshots = []  # 特定時刻でのスナップショット
snapshot_times = [1.0, 3.0, 5.0, 7.0]  # スナップショットを取る時刻

for i in range(0, len(dynamic_signal) - window_size, hop_size):
    frame = dynamic_signal[i:i + window_size]
    spectrogram.add_frame(frame)
    
    # 現在時刻
    current_time = i / sample_rate
    
    # スナップショット時刻の場合、データを保存
    for snapshot_time in snapshot_times:
        if abs(current_time - snapshot_time) < hop_size / sample_rate:
            time_axis, freq_axis, spec_data = spectrogram.get_spectrogram()
            snapshots.append({
                'time': snapshot_time,
                'time_axis': time_axis.copy(),
                'freq_axis': freq_axis.copy(),
                'data': spec_data.copy()
            })

# 最終スペクトログラム
final_time_axis, final_freq_axis, final_spec_data = spectrogram.get_spectrogram()

plt.figure(figsize=(18, 12))

# 元の信号
plt.subplot(3, 3, 1)
plt.plot(t, dynamic_signal, 'b-', linewidth=1)
plt.title('動的信号（全体）')
plt.xlabel('時間 (秒)')
plt.ylabel('振幅')
plt.grid(True)

# スナップショット時刻をマーク
for snap_time in snapshot_times:
    plt.axvline(x=snap_time, color='r', linestyle='--', alpha=0.7)

# 最終スペクトログラム
plt.subplot(3, 3, 2)
plt.pcolormesh(final_time_axis, final_freq_axis, final_spec_data,
              shading='gouraud', cmap='viridis')
plt.title('動的スペクトログラム（最終状態）')
plt.xlabel('相対時間 (秒)')
plt.ylabel('周波数 (Hz)')
plt.colorbar(label='振幅 (dB)')
plt.ylim(0, 1500)

# 各スナップショットでのスペクトログラム
for i, snapshot in enumerate(snapshots[:4]):
    plt.subplot(3, 3, i + 4)
    plt.pcolormesh(snapshot['time_axis'], snapshot['freq_axis'], snapshot['data'],
                  shading='gouraud', cmap='plasma')
    plt.title(f'スナップショット t={snapshot["time"]:.1f}s')
    plt.xlabel('相対時間 (秒)')
    plt.ylabel('周波数 (Hz)')
    plt.ylim(0, 1500)
    
    if i < 2:
        plt.colorbar(label='振幅 (dB)')

# 特定周波数での時間変化
plt.subplot(3, 3, 8)
# 500Hz付近の強度変化
freq_idx = np.argmin(np.abs(final_freq_axis - 500))
intensity_500hz = final_spec_data[freq_idx, :]
plt.plot(final_time_axis, intensity_500hz, 'g-', linewidth=2, label='500Hz')

# 700Hz付近の強度変化
freq_idx = np.argmin(np.abs(final_freq_axis - 700))
intensity_700hz = final_spec_data[freq_idx, :]
plt.plot(final_time_axis, intensity_700hz, 'r-', linewidth=2, label='700Hz')

plt.title('特定周波数の時間変化')
plt.xlabel('相対時間 (秒)')
plt.ylabel('振幅 (dB)')
plt.legend()
plt.grid(True)

# 平均スペクトル
plt.subplot(3, 3, 9)
mean_spectrum = np.mean(final_spec_data, axis=1)
plt.plot(final_freq_axis, mean_spectrum, 'purple', linewidth=2)
plt.title('平均スペクトル')
plt.xlabel('周波数 (Hz)')
plt.ylabel('平均振幅 (dB)')
plt.grid(True)
plt.xlim(0, 1500)

plt.tight_layout()
plt.show()

print("動的スペクトログラムの応用:")
print("1. リアルタイム音響監視")
print("2. 音楽の可視化")
print("3. 音響異常検出")
print("4. 音声認識の前処理")

## 練習問題

1. マイクからのリアルタイム音声入力を使ったスペクトラムアナライザーを作成してみましょう
2. 音楽に合わせて色や形が変化するビジュアライザーを実装してみましょう
3. 声の特徴（ピッチ、フォルマント）をリアルタイムで追跡するシステムを作ってみましょう