# **音频频谱实时分析（realtime spectrogram）**

## **Demo 视频**


播放器播放音乐，基于 **[madmom](https://github.com/CPJKU/madmom)** 库使用 **Python** 实时分析音频信号获得 **频谱（spectrogram）**，继而在 **[Scratch](https://create.codelab.club/projects/editor/)** 中以两种不同的视觉形式表现，Python 与 Scratch 的联系通过 **[CodeLab Adapter](https://adapter.codelab.club/)** 实现。

Demo 2.1 中用 12X10 的彩色圆点矩阵动态表现频谱，1 列 12 行圆点分别对应频谱中 12 个频段在某个时间点上的数值，10 列则是自定义的，对应 10 个时间点，可以在一帧画面中呈现更多或更少，画面随时间的流逝不断刷新；

Demo 2.2 中是 12 个 bar，对应 12 个频段，每一帧画面对应一个时间点，也是随时间流逝不断刷新。

关于这两个 Demo，特别是第一个，可以参看这篇[文档](http://localhost:3000/blog/2021/03/30/livespectrogram)，这里的 Demo 2.2 相比这篇文档写作时已做了改变。

In [38]:
import IPython.display as ipd

ipd.Video("video/demo2_realtimeSpectrogram1.mp4", width=800, height=600)

In [39]:
import IPython.display as ipd

ipd.Video("video/demo2_realtimeSpectrogram2.mp4", width=800, height=600)

&emsp;

## **Demo 代码**

**视频中的示例项目主要包括两部分：**

1. 在 JupyterLab 中使用 Python 实时分析音频信号，得到频谱（12 bands）

2. 在 Scratch 中以不同视觉形式实时映射频谱数值的变化，项目2.1[在此](https://create.codelab.club/projects/10202/)，项目2.2[在此](https://create.codelab.club/projects/12656/)


**需要注意的问题：**

+ 音频分析使用的音源
    
    当我们播放音乐时，电脑 **同时** 在对所播放的音乐做实时 **录音**，而我们分析音乐节拍的音源就是麦克风实时 **录下的音乐**；
    
    我们可能会直接使用电脑的内置音箱播放音乐，也可能会外接耳机或是音箱播放音乐，它们也都有自己内置的麦克风可以录音，所以这时候，要注意选择外接耳机或是音箱的麦克风作为音乐节拍分析的音源；
        
    Linux 系统（包括树莓派）可以安装使用 PulseAudio Volume Control 来查看选择音源
    
+ Pyhon 代码中 EIM NODE_ID 与配套 Scratch 项目内 EIM 积木中输入的 NODE_ID 名称必须一致
    
+ CodeLab Adapter 是否连接正常并已提前开启运行

&emsp;

## **关于 Colormap**

如下图所示，Demo 2.1 中 Scratch 项目内圆点色彩使用的是 [thermal colormap](https://matplotlib.org/cmocean/#thermal)，主要是为了保证对色彩变化的感知与数值变化等价（perceptually uniform），具体参看[这里](http://localhost:3000/blog/2021/03/30/livespectrogram#%E7%9F%A5%E8%A7%89%E7%BB%9F%E4%B8%80%E7%9A%84%EF%BC%88perceptually-uniform%EF%BC%89-colormap)的说明。

![thermal-colormap](img/thermal_colormap.png)

&emsp;

仓库内有一个文件夹叫作 **rgb_txt**，里面存放了 **[cmocean](https://github.com/matplotlib/cmocean/tree/master/cmocean/rgb)** 库中所有可下载使用的色彩 txt 文件，可以在下面 python 代码中替换当前使用的 ***thermal_rgb.txt***，从而得到不同的色彩效果。下面这 8 种是我个人觉得好看的，特别是 dense，截图供大家参考：

从左至右分别是：[balance](rgb_txt/balance_rgb.txt), [deep](rgb_txt/deep_rgb.txt), [dense](rgb_txt/dense_rgb.txt), [diff](rgb_txt/diff_rgb.txt)； 

![colormap示例](img/colormap1.png)

从左至右分别是：[ice](rgb_txt/ice_rgb.txt), [matter](rgb_txt/matter_rgb.txt), [rain](rgb_txt/rain_rgb.txt), [solar](rgb_txt/solar_rgb.txt)

![colormap示例](img/colormap2.png)


&emsp;

### **Demo 2.1**

In [None]:
import time
import numpy as np
from madmom.audio.signal import SignalProcessor, FramedSignalProcessor
from madmom.audio.filters import MelFilterbank
from madmom.audio.stft import ShortTimeFourierTransformProcessor
from madmom.audio.spectrogram import LogarithmicFilteredSpectrogramProcessor
from madmom.processors import IOProcessor, process_online
from codelab_adapter_client import AdapterNode


# EIM 初始化
class MyNode(AdapterNode):
    NODE_ID = "eim/spectrogram_matrix"          # 这个 ID 名（即 "eim/"后面的部分自己定义，配套 Scratch 项目中使用的接收和发送 EIM 消息的积木内要写与此一致的名字

    def __init__(self):
        super().__init__()
    
    def send_data(self, content):
        message = self.message_template()
        message["payload"]["content"] = content
        self.publish(message)


node = MyNode()
node.receive_loop_as_thread()
time.sleep(0.1)



kwargs = dict(
    sample_rate=44100,                        # 采样率
    num_channels=1,                           # 单声道
    norm=True,                                # signal 数据标准化
    frame_size=2048,                          # 以 2048 sample 为一个 frame 做 stft 分析，同时也是窗口大小
    hop_size=441,                             # 间隔 441 sample 有重合的取 frame 做 stft 分析
    fps=100,                                  # 每秒 frame 数（2048 与 441 的组合正好对应 100 fps）
    origin='stream',                          # 窗口相对参照 sample 的位置，参照 sample 在窗口正中央或左侧或右侧
    num_frames=1,                             # online realtime 模式下，num_frames 设为 1，origin 设为 'stream'
    filterbank=MelFilterbank,                 # 对频率做 MelFilter
    num_bands=12,                             # 分为 12 个频段
    infile=None,
    outfile=None
)


color_resolution = 2/256                                                       # spectrogram 最大值不超过 2，取值范围受不同输入设备影响，最好后面加一个开始前测试最大音量的部分
rgb_li = np.loadtxt('rgb_txt/thermal_rgb.txt', delimiter=' ', dtype=float)     # 从 thermal colormap 对应的 txt 文件中读取 RGB 值，以列表形式存储供后面代码使用；可以修改文件名，使用
                                                                               # rgb_txt 文件夹下其他 colormap 对应的 txt 文件，获得不同色彩效果
rgb_li = np.ndarray.tolist(rgb_li)                                             # 需要从 numpy array 转为 list 格式，否则 EIM 后面发送数据时会报格式错误





def blank_start():
    """
    这个函数用来向 Scratch 发送一个由色谱初始极值组成的列表，为了冲刷替换掉 EIM 消息通道里之前可能残留的列表
    使每一次运行此程序时， Scratch 的画面都有一个统一的初始状态
    
    """
    node.send_data([rgb_li[0], rgb_li[0], rgb_li[0], rgb_li[0], rgb_li[0], rgb_li[0], 
                    rgb_li[0], rgb_li[0], rgb_li[0], rgb_li[0], rgb_li[0], rgb_li[0]])
    
blank_start()




def rgb_spec(log_filt_spec):
    """
    将频谱 12 个频段的值映射到色谱对应的 RGB 值，返回一个列表 rgb_spec 后续发给 Scratch
    
    """
    rgb_spec = [i for i in range(12)]
    for i in range(len(log_filt_spec)):
        if np.isnan(log_filt_spec[i]):                                 # 音乐结束静止后会有 nan，这种情况下直接对应色谱中的起始颜色值
            cmap_index = 0

        else:
            cmap_index=int(np.round(log_filt_spec[i]/color_resolution)-1)    # 这部分是在做数值与颜色的映射
            if cmap_index < 0:                                               # 避免超出颜色取值区间，但是如果已提前获得设备的最大响度，这部分代码就不需要了
                cmap_index = 0                                               # 这里主要是先保证程序能持续运行
            if cmap_index > 255:
                cmap_index = 255
        rgb_spec[i] = rgb_li[cmap_index]
        #print(rgb_spec[i])
    
    return rgb_spec


def spec_callback(log_filt_spec, output=None):
    """
    将 process_online 函数返回的频谱对应到色谱RGB值，并以列表形式经 Adapter EIM 发送给 Scratch
    """
    if len(log_filt_spec)>0:
        log_filt_spec = np.ndarray.tolist(log_filt_spec[0])          # 转为列表格式，否则 EIM 报错
        #print(log_filt_spec)
        send_rgb_spec = rgb_spec(log_filt_spec)                      # 调用上方 rgb_spec() 函数
        node.send_data(send_rgb_spec)                                # 发送给 Scratch 的直接是映射过的 RGB 值，列表中有 12 个元素，每个元素内嵌套 3 个元素，即 rgb
       
        
        
try:         
    sig = SignalProcessor(**kwargs)
    frames = FramedSignalProcessor(**kwargs)
    stft = ShortTimeFourierTransformProcessor(**kwargs)
    filt = LogarithmicFilteredSpectrogramProcessor(**kwargs)
    in_processor = [sig, frames, stft, filt]
    out_processor = spec_callback
    processor = IOProcessor(in_processor, out_processor)
    process_online(processor,**kwargs)
    
except KeyboardInterrupt:
    node.send_data("stop") 
    print("interrupt by the user")

  return np.asanyarray(signal / scaling, dtype=signal.dtype)


&emsp;

### **Demo 2.2**

In [37]:
import time
import numpy as np
from madmom.audio.signal import SignalProcessor, FramedSignalProcessor
from madmom.audio.filters import MelFilterbank
from madmom.audio.stft import ShortTimeFourierTransformProcessor
from madmom.audio.spectrogram import LogarithmicFilteredSpectrogramProcessor
from madmom.processors import IOProcessor, process_online
from codelab_adapter_client import AdapterNode



# EIM 初始化
class MyNode(AdapterNode):
    NODE_ID = "eim/spectrogram"          # 这个 ID 名（即 "eim/"后面的部分自己定义，配套 Scratch 项目中使用的接收和发送 EIM 消息的积木内要写与此一致的名字

    def __init__(self):
        super().__init__()
    
    def send_data(self, content):
        message = self.message_template()
        message["payload"]["content"] = content
        self.publish(message)


        
node = MyNode()
node.receive_loop_as_thread()
time.sleep(0.1)



kwargs = dict(
    sample_rate=44100,                        # 采样率
    num_channels=1,                           # 单声道
    norm=True,                                # signal 数据标准化
    frame_size=2048,                          # 以 2048 sample 为一个 frame 做 stft 分析，同时也是窗口大小
    hop_size=441,                             # 间隔 441 sample 有重合的取 frame 做 stft 分析
    fps=100,                                  # 每秒 frame 数（2048 与 441 的组合正好对应 100 fps）
    origin='stream',                          # 窗口相对参照 sample 的位置，参照 sample 在窗口正中央或左侧或右侧
    num_frames=1,                             # online realtime 模式下，num_frames 设为 1，origin 设为 'stream'
    filterbank=MelFilterbank,                 # 对频率做 MelFilter
    num_bands=12,                             # 分为 12 个频段
    infile=None,
    outfile=None
)



def spec_callback(log_filt_spec, output=None):
    """
    将 process_online 函数返回的频谱对应到色谱RGB值，并以列表形式经 Adapter EIM 发送给 Scratch
    """
    if len(log_filt_spec)>0:
        log_filt_spec = np.ndarray.tolist(log_filt_spec[0])                 # 转为列表格式，否则 EIM 报错
        log_filt_spec = [0 if np.isnan(i) else i for i in log_filt_spec]    # 音乐静音的时候，数据为 nan，属于浮点型数值（float），为了 Scratch 画图，替换为 0
        #print(type(log_filt_spec[0]))
        node.send_data(log_filt_spec)


        
try:         
    sig = SignalProcessor(**kwargs)
    frames = FramedSignalProcessor(**kwargs)
    stft = ShortTimeFourierTransformProcessor(**kwargs)
    filt = LogarithmicFilteredSpectrogramProcessor(**kwargs)
    in_processor = [sig, frames, stft, filt]
    out_processor = spec_callback
    processor = IOProcessor(in_processor, out_processor)
    process_online(processor,**kwargs)
    
except KeyboardInterrupt:
    node.send_data("stop") 
    print("interrupt by the user")

interrupt by the user


&emsp;

## **音频实时频谱分析代码**

**频谱（spectrogram）的获得经历了以下 4 个步骤：**

1. 从音频流（stream）中抓取数据，对应 **SignalProcessor**，这一步需要考虑的参数有：

    + 采样率（sample_rate），关系到音质，这里是 44100Hz/s
    
    + 通道数（num_channels），这里是1，将原本的双声道立体声合并为单声道，降低分析的复杂度与计算量
    
    + 标准化（norm），按数据类型（dtype）的最大范围值标准化
 
 
2. 将原始样本数据（samples）按一定大小合并为帧（frame），对应 **FramedSignalProcessor**，相关参数有：

    + 音频帧大小（frame_size），每一帧包含的样本数，这里是 2048
    
    + hop_size，帧与帧之间间隔的样本数
    
    + fps（frame per second），每秒采集的帧数，类似采样率，但这里已变成以帧为单位且有重合的采集
    
    + origin，一帧对应的窗口（window）与参考样本（reference sample）的关系，实时分析时要设置为'stream'，即窗口相对参考样本居右边
    
    + num_frames, 返回的帧数，实时分析时要设为 1 
    
    
3. 对数据做短时傅立叶变换（STFT），对应 **ShortTimeFourierTransformProcessor**


4. 基于 STFT，生成频谱（spectrogram），再经梅尔滤波（MelFilterbank）生成 Mel 频谱，同时对数据取对数，对应 **LogarithmicFilteredSpectrogramProcessor**


In [None]:
from madmom.audio.signal import SignalProcessor, FramedSignalProcessor
from madmom.audio.filters import MelFilterbank
from madmom.audio.stft import ShortTimeFourierTransformProcessor
from madmom.audio.spectrogram import LogarithmicFilteredSpectrogramProcessor
from madmom.processors import IOProcessor, process_online


kwargs = dict(
    sample_rate=44100,                        # 采样率
    num_channels=1,                           # 单声道
    norm=True,                                # signal 数据标准化
    frame_size=2048,                          # 以 2048 sample 为一个 frame 做 stft 分析，同时也是窗口大小
    hop_size=441,                             # 间隔 441 sample 有重合的取 frame 做 stft 分析
    fps=100,                                  # 每秒 frame 数（2048 与 441 的组合正好对应 100 fps）
    origin='stream',                          # 窗口相对参照 sample 的位置，参照 sample 在窗口正中央或左侧或右侧
    num_frames=1,                             # online realtime 模式下，num_frames 设为 1，origin 设为 'stream'
    filterbank=MelFilterbank,                 # 对频率做 MelFilter
    num_bands=12,                             # 分为 12 个频段
    infile=None,                              # 与下面的 outfile 都是函数 process_online 的参数，此处都是无
    outfile=None
)



def spec_callback(log_filt_spec, output=None):
    if len(log_filt_spec)>0:
        print(log_filt_spec)
     
        
        
sig = SignalProcessor(**kwargs)
frames = FramedSignalProcessor(**kwargs)
stft = ShortTimeFourierTransformProcessor(**kwargs)
filt = LogarithmicFilteredSpectrogramProcessor(**kwargs)
in_processor = [sig, frames, stft, filt]
out_processor = spec_callback
processor = IOProcessor(in_processor, out_processor)
process_online(processor,**kwargs)