In [None]:
!pip install gradio

## 加载模型

In [None]:
#|export
from fastai.vision.all import *
# import gradio as gr
import numpy as np
import pandas as pd
import librosa

In [None]:
#|export
def _resnet_stem(*sizes):
    """Build the stem of the network as a plain list of layers.

    Consecutive entries of ``sizes`` are paired up as (in_channels, out_channels)
    for a chain of unpadded ``nn.Conv1d`` layers with kernel size 3. Only the
    first convolution uses stride 2; every later one uses stride 1. A
    ``nn.MaxPool1d(kernel_size=3, stride=2, padding=1)`` is appended at the end.

    Returns:
        A Python list of modules (not an ``nn.Sequential``), so the caller can
        unpack it next to other layers.
    """
    stem = []
    for position, (c_in, c_out) in enumerate(zip(sizes[:-1], sizes[1:])):
        is_first = position == 0
        stem.append(nn.Conv1d(c_in, c_out, kernel_size=3, stride=2 if is_first else 1))
    stem.append(nn.MaxPool1d(kernel_size=3, stride=2, padding=1))
    return stem


def _conv_block(ni, nf, stride):
    """Create the convolutional branch of a residual block.

    Layout: Conv1d(ni -> nf, stride) -> BatchNorm1d -> ReLU -> Conv1d(nf -> nf) -> BatchNorm1d.
    Both convolutions use kernel size 3 with padding 1; only the first one
    applies ``stride``.
    """
    entry_conv = nn.Conv1d(ni, nf, kernel_size=3, stride=stride, padding=1)
    entry_norm = nn.BatchNorm1d(nf)
    exit_conv = nn.Conv1d(nf, nf, kernel_size=3, stride=1, padding=1)
    exit_norm = nn.BatchNorm1d(nf)
    # Deliberately no ReLU after the last BatchNorm: the branch ends un-activated.
    return nn.Sequential(entry_conv, entry_norm, nn.ReLU(), exit_conv, exit_norm)

class ResBlock(Module):
    """Residual block: a convolutional branch plus a shortcut, summed and passed through ReLU.

    NOTE(review): ``Module`` is not defined in this notebook; presumably it is
    fastai's ``Module`` from the star import, which is expected to take care of
    ``nn.Module`` initialisation since ``__init__`` never calls ``super().__init__()``.
    """

    def __init__(self, ni, nf, stride=1):
        self.convs = _conv_block(ni, nf, stride)
        # The shortcut can only be a plain identity when the branch keeps both the
        # channel count and the sequence length; otherwise project it with a
        # strided 1x1 convolution followed by batch normalisation.
        keeps_shape = ni == nf and stride == 1
        if keeps_shape:
            self.idconv = nn.Identity()
        else:
            self.idconv = nn.Sequential(
                nn.Conv1d(ni, nf, kernel_size=1, stride=stride),
                nn.BatchNorm1d(nf),
            )

    def forward(self, x):
        shortcut = self.idconv(x)
        branch_out = self.convs(x)
        return F.relu(branch_out + shortcut)



class ResNet(nn.Sequential):
    """1-D ResNet assembled as a flat ``nn.Sequential``: stem, residual stages, pooled linear head.

    Args:
        n_out: Number of output units of the final linear layer.
        layers: Number of ``ResBlock``s in each stage, one entry per stage. At most
            ``len(block_szs) - 1`` (four) stages are supported.
        expansion: Unused; kept so existing call sites keep working.

    Raises:
        ValueError: If ``layers`` asks for more stages than ``block_szs`` can describe.
    """

    def __init__(self, n_out, layers, expansion=1):
        layers = list(layers)
        # Assigned before super().__init__() because _make_layer reads it while the
        # stages are being built, i.e. before nn.Sequential is initialised.
        self.block_szs = [64, 64, 128, 256, 512]
        max_stages = len(self.block_szs) - 1
        if len(layers) > max_stages:
            raise ValueError(
                f"ResNet supports at most {max_stages} stages, got {len(layers)}"
            )

        # Author's note: avoid adding too many parameters at once (original: 1 --> 512).
        # Alternative stem that was tried: _resnet_stem(1, 32, 64, 64, 128, 256)
        stem = _resnet_stem(1, 512, 256, 128, 64)
        blocks = [self._make_layer(idx, n_blocks) for idx, n_blocks in enumerate(layers)]

        # The head must match the channel count produced by the last stage that was
        # actually built. Using block_szs[-1] unconditionally only works when all
        # four stages are present.
        head_in = self.block_szs[len(layers)]
        super().__init__(*stem, *blocks,
                         nn.AdaptiveAvgPool1d(1), Flatten(),
                         nn.Linear(head_in, n_out))

    def _make_layer(self, idx, n_layers):
        """Build stage ``idx`` as a sequence of ``n_layers`` residual blocks.

        The first block of the stage changes the channel count and, for every
        stage except the first, halves the length with stride 2. The remaining
        blocks keep shape.
        """
        ch_in, ch_out = self.block_szs[idx:idx + 2]
        stride = 1 if idx == 0 else 2
        return nn.Sequential(*[
            ResBlock(ch_in if i == 0 else ch_out, ch_out, stride if i == 0 else 1)
            for i in range(n_layers)
        ])

In [None]:
#|export
def get_x(r):
    """Return entry ``r`` of the module-level ``X``.

    NOTE(review): ``X`` is not defined anywhere in this notebook; presumably this
    function only has to exist by name so the exported learner can be loaded.
    """
    return X[r]


def get_y(r):
    """Return entry ``r`` of the module-level ``y_cat``.

    NOTE(review): ``y_cat`` is not defined anywhere in this notebook; see ``get_x``.
    """
    return y_cat[r]
# Restore the exported learner from a hard-coded absolute path.
# NOTE(review): load_learner is pickle-based, so only load exports from a trusted source.
# NOTE(review): the '/content/drive' prefix suggests a mounted Google Drive, but no mount
# step is visible in this notebook; confirm before running on a fresh kernel.
learn_inf = load_learner('/content/drive/MyDrive/RES_896171.pkl')

## 加载提取音频特征的函数

In [None]:
#|export
def extract_features(data, sr=22050, frame_length=2048, hop_length=512):
    """Turn an audio signal into a single flat feature vector.

    The vector is the concatenation of, in this order:
      1. zero-crossing rate per frame,
      2. RMS energy per frame,
      3. MFCCs, flattened frame by frame.

    ``frame_length`` and ``hop_length`` are applied to the zero-crossing rate and
    RMS only; the MFCC call receives just ``sr`` and uses librosa's own defaults
    for everything else.
    """
    framing = dict(frame_length=frame_length, hop_length=hop_length)

    zero_crossings = np.squeeze(librosa.feature.zero_crossing_rate(y=data, **framing))
    energy = np.squeeze(librosa.feature.rms(y=data, **framing))

    # Transposing first makes the flattened layout "all coefficients of frame 0,
    # then all coefficients of frame 1, ...".
    mfcc_by_frame = librosa.feature.mfcc(y=data, sr=sr).T
    cepstral = np.ravel(mfcc_by_frame)

    return np.hstack([zero_crossings, energy, cepstral])

In [None]:
#|export
def add_noise(data, noise_factor=0.005):
    """Return ``data`` plus standard-normal noise scaled by ``noise_factor``.

    One noise value is drawn per element of ``data`` (``len(data)`` draws from
    NumPy's global random state). The input is not modified.
    """
    n_samples = len(data)
    return data + noise_factor * np.random.randn(n_samples)

def shift(data, sr, shift_max, shift_direction='both'):
    """Circularly roll ``data`` by a random number of samples.

    The offset is drawn with ``np.random.randint(sr * shift_max)``. It is negated
    before being handed to ``np.roll`` when ``shift_direction`` is ``'right'``, or
    when it is ``'both'`` and a random pick between ``'left'`` and ``'right'``
    lands on ``'right'``. Any other value leaves the offset as drawn. Samples
    that fall off one end re-enter at the other (no zero fill).
    """
    offset = np.random.randint(sr * shift_max)
    negate = shift_direction == 'right'
    if shift_direction == 'both':
        # The direction is only drawn in 'both' mode, after the offset.
        negate = np.random.choice(['left', 'right']) == 'right'
    return np.roll(data, -offset if negate else offset)

def pitch_shift(data, sr, n_steps):
    """Thin wrapper around ``librosa.effects.pitch_shift``.

    ``data`` is passed positionally, ``sr`` and ``n_steps`` by keyword; the
    library's result is returned unchanged.
    """
    shifted = librosa.effects.pitch_shift(data, sr=sr, n_steps=n_steps)
    return shifted

# Process a single audio file.
def process_file(file_path, target_duration=2.0, target_sr=22050, offset=0.3):
    """Load a clip, force it to a fixed length and featurise it plus three augmented copies.

    The file is read with ``librosa.load`` (resampled to ``target_sr``, starting
    ``offset`` seconds in). Clips shorter than ``target_duration`` seconds are
    zero-padded at the end; longer ones are cut off.

    Returns:
        A list of four feature vectors, in this order: original clip, clip with
        added noise, time-shifted clip, pitch-shifted clip.
    """
    data, sr = librosa.load(file_path, sr=target_sr, offset=offset)

    wanted = target_duration * sr
    if len(data) < wanted:
        data = np.pad(data, (0, int(wanted - len(data))), 'constant')
    else:
        data = data[:int(wanted)]

    # The augmentations are generated in the same order as the returned features.
    variants = (
        data,
        add_noise(data),
        shift(data, sr, shift_max=2),
        pitch_shift(data, sr, n_steps=2),
    )
    return [extract_features(clip, sr) for clip in variants]

## 进行预测

In [None]:
#|export
# Class labels taken from the learner's DataLoaders vocab; classify_audio pairs them
# positionally with the model's averaged output probabilities.
label_categories = learn_inf.dls.vocab

def classify_audio(audio):
    """Classify one audio input and return ``{label: probability}``.

    ``audio`` is handed to ``process_file``, which yields one feature vector per
    variant of the clip. The vectors are batched as ``(n_variants, 1, n_features)``
    in float32, run through the model in eval mode without gradients, soft-maxed
    per variant and averaged across variants.
    """
    feature_sets = process_file(audio)

    # One row per variant, with a singleton channel axis in the middle.
    batch = np.stack([vec.astype(np.float32) for vec in feature_sets])
    batch = batch.reshape(len(feature_sets), 1, -1)
    batch_tensor = torch.tensor(batch)

    model = learn_inf.model
    model.eval()
    with torch.no_grad():
        logits = model(batch_tensor)
        mean_probs = torch.nn.functional.softmax(logits, dim=1).mean(dim=0)

    return {name: float(p) for name, p in zip(label_categories, mean_probs)}


## UI界面

In [None]:
#|export
# `gr` is used below, but the notebook's import cells only contain a commented-out
# gradio import, so import it here to keep this cell runnable on a fresh kernel.
import gradio as gr

# The audio component hands classify_audio a file path.
audio_input = gr.Audio(type='filepath')
label = gr.Label()
examples = ['Maryl.wav', 'MyAudio_1.wav']
intf = gr.Interface(fn=classify_audio, inputs=audio_input, outputs=label, examples=examples)
# share=True publishes a temporary public URL; debug=True keeps this cell running
# so that errors and logs stay visible (set debug=False to return immediately).
intf.launch(inline=False, share=True, debug=True)

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
Running on public URL: https://dfdf63b45b1a60c1d8.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


## 云函数 | 加载模型

In [None]:
#|export
from fastai.vision.all import *
# import gradio as gr
import numpy as np
import pandas as pd
import librosa

In [None]:
#|export
def _resnet_stem(*sizes):
    """Build the stem of the network as a plain list of layers.

    Consecutive entries of ``sizes`` are paired up as (in_channels, out_channels)
    for a chain of unpadded ``nn.Conv1d`` layers with kernel size 3. Only the
    first convolution uses stride 2; every later one uses stride 1. A
    ``nn.MaxPool1d(kernel_size=3, stride=2, padding=1)`` is appended at the end.

    Returns:
        A Python list of modules (not an ``nn.Sequential``), so the caller can
        unpack it next to other layers.
    """
    stem = []
    for position, (c_in, c_out) in enumerate(zip(sizes[:-1], sizes[1:])):
        is_first = position == 0
        stem.append(nn.Conv1d(c_in, c_out, kernel_size=3, stride=2 if is_first else 1))
    stem.append(nn.MaxPool1d(kernel_size=3, stride=2, padding=1))
    return stem


def _conv_block(ni, nf, stride):
    """Create the convolutional branch of a residual block.

    Layout: Conv1d(ni -> nf, stride) -> BatchNorm1d -> ReLU -> Conv1d(nf -> nf) -> BatchNorm1d.
    Both convolutions use kernel size 3 with padding 1; only the first one
    applies ``stride``.
    """
    entry_conv = nn.Conv1d(ni, nf, kernel_size=3, stride=stride, padding=1)
    entry_norm = nn.BatchNorm1d(nf)
    exit_conv = nn.Conv1d(nf, nf, kernel_size=3, stride=1, padding=1)
    exit_norm = nn.BatchNorm1d(nf)
    # Deliberately no ReLU after the last BatchNorm: the branch ends un-activated.
    return nn.Sequential(entry_conv, entry_norm, nn.ReLU(), exit_conv, exit_norm)

class ResBlock(Module):
    """Residual block: a convolutional branch plus a shortcut, summed and passed through ReLU.

    NOTE(review): ``Module`` is not defined in this notebook; presumably it is
    fastai's ``Module`` from the star import, which is expected to take care of
    ``nn.Module`` initialisation since ``__init__`` never calls ``super().__init__()``.
    """

    def __init__(self, ni, nf, stride=1):
        self.convs = _conv_block(ni, nf, stride)
        # The shortcut can only be a plain identity when the branch keeps both the
        # channel count and the sequence length; otherwise project it with a
        # strided 1x1 convolution followed by batch normalisation.
        keeps_shape = ni == nf and stride == 1
        if keeps_shape:
            self.idconv = nn.Identity()
        else:
            self.idconv = nn.Sequential(
                nn.Conv1d(ni, nf, kernel_size=1, stride=stride),
                nn.BatchNorm1d(nf),
            )

    def forward(self, x):
        shortcut = self.idconv(x)
        branch_out = self.convs(x)
        return F.relu(branch_out + shortcut)



class ResNet(nn.Sequential):
    """1-D ResNet assembled as a flat ``nn.Sequential``: stem, residual stages, pooled linear head.

    Args:
        n_out: Number of output units of the final linear layer.
        layers: Number of ``ResBlock``s in each stage, one entry per stage. At most
            ``len(block_szs) - 1`` (four) stages are supported.
        expansion: Unused; kept so existing call sites keep working.

    Raises:
        ValueError: If ``layers`` asks for more stages than ``block_szs`` can describe.
    """

    def __init__(self, n_out, layers, expansion=1):
        layers = list(layers)
        # Assigned before super().__init__() because _make_layer reads it while the
        # stages are being built, i.e. before nn.Sequential is initialised.
        self.block_szs = [64, 64, 128, 256, 512]
        max_stages = len(self.block_szs) - 1
        if len(layers) > max_stages:
            raise ValueError(
                f"ResNet supports at most {max_stages} stages, got {len(layers)}"
            )

        # Author's note: avoid adding too many parameters at once (original: 1 --> 512).
        # Alternative stem that was tried: _resnet_stem(1, 32, 64, 64, 128, 256)
        stem = _resnet_stem(1, 512, 256, 128, 64)
        blocks = [self._make_layer(idx, n_blocks) for idx, n_blocks in enumerate(layers)]

        # The head must match the channel count produced by the last stage that was
        # actually built. Using block_szs[-1] unconditionally only works when all
        # four stages are present.
        head_in = self.block_szs[len(layers)]
        super().__init__(*stem, *blocks,
                         nn.AdaptiveAvgPool1d(1), Flatten(),
                         nn.Linear(head_in, n_out))

    def _make_layer(self, idx, n_layers):
        """Build stage ``idx`` as a sequence of ``n_layers`` residual blocks.

        The first block of the stage changes the channel count and, for every
        stage except the first, halves the length with stride 2. The remaining
        blocks keep shape.
        """
        ch_in, ch_out = self.block_szs[idx:idx + 2]
        stride = 1 if idx == 0 else 2
        return nn.Sequential(*[
            ResBlock(ch_in if i == 0 else ch_out, ch_out, stride if i == 0 else 1)
            for i in range(n_layers)
        ])

In [None]:
#|export
def get_x(r):
    """Return entry ``r`` of the module-level ``X``.

    NOTE(review): ``X`` is not defined anywhere in this notebook; presumably this
    function only has to exist by name so the exported learner can be loaded.
    """
    return X[r]


def get_y(r):
    """Return entry ``r`` of the module-level ``y_cat``.

    NOTE(review): ``y_cat`` is not defined anywhere in this notebook; see ``get_x``.
    """
    return y_cat[r]
# Restore the exported learner from a hard-coded absolute path.
# NOTE(review): load_learner is pickle-based, so only load exports from a trusted source.
# NOTE(review): the '/content/drive' prefix suggests a mounted Google Drive, but no mount
# step is visible in this notebook; confirm before running on a fresh kernel.
learn_inf = load_learner('/content/drive/MyDrive/RES_896171.pkl')

## 云函数 | 读取特征

In [None]:
#|export
def extract_features(data, sr=22050, frame_length=2048, hop_length=512):
    """Turn an audio signal into a single flat feature vector.

    The vector is the concatenation of, in this order:
      1. zero-crossing rate per frame,
      2. RMS energy per frame,
      3. MFCCs, flattened frame by frame.

    ``frame_length`` and ``hop_length`` are applied to the zero-crossing rate and
    RMS only; the MFCC call receives just ``sr`` and uses librosa's own defaults
    for everything else.
    """
    framing = dict(frame_length=frame_length, hop_length=hop_length)

    zero_crossings = np.squeeze(librosa.feature.zero_crossing_rate(y=data, **framing))
    energy = np.squeeze(librosa.feature.rms(y=data, **framing))

    # Transposing first makes the flattened layout "all coefficients of frame 0,
    # then all coefficients of frame 1, ...".
    mfcc_by_frame = librosa.feature.mfcc(y=data, sr=sr).T
    cepstral = np.ravel(mfcc_by_frame)

    return np.hstack([zero_crossings, energy, cepstral])

## 云函数 | 特征增强

In [None]:
#|export
def add_noise(data, noise_factor=0.005):
    """Return ``data`` plus standard-normal noise scaled by ``noise_factor``.

    One noise value is drawn per element of ``data`` (``len(data)`` draws from
    NumPy's global random state). The input is not modified.
    """
    n_samples = len(data)
    return data + noise_factor * np.random.randn(n_samples)

def shift(data, sr, shift_max, shift_direction='both'):
    """Circularly roll ``data`` by a random number of samples.

    The offset is drawn with ``np.random.randint(sr * shift_max)``. It is negated
    before being handed to ``np.roll`` when ``shift_direction`` is ``'right'``, or
    when it is ``'both'`` and a random pick between ``'left'`` and ``'right'``
    lands on ``'right'``. Any other value leaves the offset as drawn. Samples
    that fall off one end re-enter at the other (no zero fill).
    """
    offset = np.random.randint(sr * shift_max)
    negate = shift_direction == 'right'
    if shift_direction == 'both':
        # The direction is only drawn in 'both' mode, after the offset.
        negate = np.random.choice(['left', 'right']) == 'right'
    return np.roll(data, -offset if negate else offset)

def pitch_shift(data, sr, n_steps):
    """Thin wrapper around ``librosa.effects.pitch_shift``.

    ``data`` is passed positionally, ``sr`` and ``n_steps`` by keyword; the
    library's result is returned unchanged.
    """
    shifted = librosa.effects.pitch_shift(data, sr=sr, n_steps=n_steps)
    return shifted

# Process a single clip. The file itself is no longer passed in: taking the already
# loaded samples means the audio only has to be read once, which speeds things up.
def process_file(data, target_duration=2.0, sr=22050):
    """Force a clip to a fixed length and featurise it plus three augmented copies.

    Clips shorter than ``target_duration`` seconds (at rate ``sr``) are
    zero-padded at the end; longer ones are cut off.

    Returns:
        A list of four feature vectors, in this order: original clip, clip with
        added noise, time-shifted clip, pitch-shifted clip.
    """
    wanted = target_duration * sr
    if len(data) < wanted:
        data = np.pad(data, (0, int(wanted - len(data))), 'constant')
    else:
        data = data[:int(wanted)]

    # The augmentations are generated in the same order as the returned features.
    variants = (
        data,
        add_noise(data),
        shift(data, sr, shift_max=2),
        pitch_shift(data, sr, n_steps=2),
    )
    return [extract_features(clip, sr) for clip in variants]

## 云函数 | 预测函数

In [None]:
# Prediction helper
def predict_emotion(data, sr=22050):
    """Return the class probabilities for one clip, averaged over its augmented variants.

    Args:
        data: Audio samples of a single window, as sliced by the caller.
        sr: Sampling rate of ``data``. It is forwarded to ``process_file`` so that
            padding/trimming and feature extraction use the caller's rate; it used
            to be accepted but ignored, so any rate other than 22050 was silently
            treated as 22050.

    Returns:
        A list of Python floats, one per model output unit.
    """
    augmented_features = process_file(data, sr=sr)

    # Batch the variants as (n_variants, 1, n_features) in float32.
    all_features = np.stack([features.astype(np.float32) for features in augmented_features])
    all_features = all_features.reshape(len(augmented_features), 1, -1)

    X_tensor = torch.tensor(all_features)

    learn_inf.model.eval()
    with torch.no_grad():
        output = learn_inf.model(X_tensor)
        probs = torch.nn.functional.softmax(output, dim=1)
        avg_probs = probs.mean(axis=0)
    return list(map(float, avg_probs.squeeze()))

In [None]:
# Main entry point
def process_and_classify_audio(file_path, step=0.1, target_duration=2.0, target_sr=22050):
    """Classify a recording with a sliding window and return one prediction per step.

    The file is loaded once at ``target_sr``. A window of ``target_duration``
    seconds is moved forward by ``step`` seconds for as long as it fits entirely
    inside the recording. After that, the final ``target_duration`` seconds of the
    recording are classified once for every remaining step position up to the end
    of the recording, so the result list has one entry per step across the whole
    timeline. Each of these tail entries is a separate ``predict_emotion`` call on
    the same samples.

    Args:
        file_path: Path of the audio file to load.
        step: Hop between consecutive windows, in seconds. Must be positive.
        target_duration: Window length in seconds.
        target_sr: Sampling rate passed to ``librosa.load``.

    Returns:
        A list with the output of ``predict_emotion`` for every window.

    Raises:
        ValueError: If ``step`` is not positive (the loops could never finish).
    """
    if step <= 0:
        raise ValueError(f"step must be positive, got {step!r}")

    data, sr = librosa.load(file_path, sr=target_sr)
    total_duration = len(data) / sr
    results = []

    # Offsets are computed as index * step rather than by repeated addition, so
    # floating-point error does not build up over thousands of windows.
    idx = 0
    while idx * step + target_duration <= total_duration:
        offset = idx * step
        start_sample = int(offset * sr)
        end_sample = int((offset + target_duration) * sr)
        results.append(predict_emotion(data[start_sample:end_sample], sr))
        idx += 1

    # Tail: the last window is the same for every remaining step, so slice it once.
    last_data = data[-int(target_duration * sr):]
    while idx * step <= total_duration:
        results.append(predict_emotion(last_data, sr))
        idx += 1

    return results

In [None]:
# Example usage: classify every sliding window of one recording.
# NOTE(review): the bare `output` expression below renders the complete result list
# as the cell output; consider showing only a slice.
output = process_and_classify_audio('/content/XP6a.wav')
output

[[0.685805082321167,
  7.120128486803878e-09,
  0.3141906261444092,
  4.270370027370518e-06,
  2.3486059319566266e-08,
  2.0365515496223452e-08,
  4.1602574718524465e-09],
 [0.23795385658740997,
  1.8054338468687092e-08,
  0.7620455622673035,
  4.869178269473196e-07,
  2.9089322950426322e-08,
  2.9440469617725284e-08,
  7.598616846848927e-09],
 [0.15064381062984467,
  2.9149298530484202e-09,
  0.8493555784225464,
  6.730027166668151e-07,
  1.3621130889873712e-08,
  2.134722087987484e-08,
  1.5891925553290775e-09],
 [0.12126141786575317,
  2.5706334838559997e-09,
  0.8787378668785095,
  7.27698932223575e-07,
  8.89624462985239e-09,
  6.502262728247388e-09,
  6.638436467198972e-10],
 [0.11013179272413254,
  9.014196500345406e-09,
  0.889866292476654,
  1.4484658095170744e-06,
  1.56212621504892e-07,
  3.221688587018434e-07,
  1.38034628172079e-09],
 [0.3702980875968933,
  7.505541077534872e-09,
  0.629700779914856,
  1.2251963426024304e-06,
  1.0455813104215395e-08,
  8.421038977246553e-

In [None]:
# Persist the per-window probabilities to XP6a.npy in the current working directory.
np.save('XP6a.npy', output)

In [None]:
# Reload the saved probabilities; `output` becomes a NumPy array from here on.
output = np.load('XP6a.npy')

In [None]:
# Number of per-window predictions held in `output`.
len(output)

2121

In [None]:
!pip install numpy

