In [1]:
import numpy as np
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import boto3
import requests
from botocore import UNSIGNED
from botocore.config import Config
from IPython.display import Audio
from torchaudio.utils import download_asset
import torchaudio

In [8]:
audio_path = r"/public1/cjh/workspace/DepressionPrediction/dataset/EATD-Corpus/train/2/positive.wav"

In [10]:
waveform, sample_rate = torchaudio.load(audio_path)
if waveform.shape[0] > 1:
    waveform = waveform[0]
waveform.shape

torch.Size([65376])

In [11]:
sample_rate

16000

In [12]:
waveform = torch.unsqueeze(waveform,dim = 0)
waveform.shape

torch.Size([1, 65376])

### try to pad audio to the same length

In [11]:
def pad_audio(signal, target_length):
    B,L = signal.shape
    # 计算需要填充的长度
    padding_length = target_length - L
    if padding_length > 0:
        # 使用零填充
        signal = torch.cat([signal, torch.zeros(B,padding_length)], dim=-1)
    return signal

target_length = 300000  # 例如，填充到 1 秒的长度，假设采样率为 16 kHz

padded_signal = pad_audio(waveform, target_length)
padded_signal.shape

torch.Size([1, 300000])

### try to use native pytorch api to do STFT

In [14]:
window_length = 2048  # 窗口长度
hop_length = waveform.shape[-1]  // 1024  # 窗口滑动步长
result = torch.stft(waveform,n_fft=window_length,hop_length=hop_length, return_complex=True)[:,:,0:1025]
result.shape    # (N,T) N-#freuqencies,T-#frames

torch.Size([1, 1025, 1025])

In [15]:
result.dtype

torch.complex64

In [16]:
result.real

tensor([[[-0.8640, -1.1376, -0.7755,  ...,  0.5400,  0.7155,  0.3819],
         [-0.3644, -0.6454, -0.3040,  ...,  0.4600,  0.6292,  0.2743],
         [-0.7611, -1.0355, -0.6692,  ...,  0.5474,  0.6951,  0.2645],
         ...,
         [ 0.0059, -0.0452,  0.1658,  ...,  0.0248, -0.0226, -0.0128],
         [ 0.0047, -0.0440,  0.1647,  ...,  0.0251, -0.0231, -0.0119],
         [ 0.0051, -0.0444,  0.1651,  ...,  0.0249, -0.0230, -0.0117]]])

In [17]:
result.imag

tensor([[[ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ...,  0.0000e+00,
           0.0000e+00,  0.0000e+00],
         [-6.2585e-07,  9.8414e-02,  1.7925e-01,  ...,  1.8681e-03,
           8.8565e-02,  1.0981e-01],
         [-8.0466e-07,  4.5920e-02,  6.1775e-02,  ..., -5.8796e-03,
           1.9922e-01,  2.5276e-01],
         ...,
         [ 8.9407e-08,  3.2350e-04, -8.1351e-04,  ..., -3.8803e-04,
           1.4542e-03, -2.2108e-03],
         [-4.9174e-07, -8.0407e-05,  6.0692e-05,  ..., -3.7193e-04,
           9.7570e-04, -1.4924e-03],
         [ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ...,  0.0000e+00,
           0.0000e+00,  0.0000e+00]]])

### SE-attention mechanism
最后将要压缩频谱特征，因此在时间特征上加注意力机制

In [21]:
input_tensor = torch.stack([result.real,result.imag],dim = 1)
input_tensor.shape

torch.Size([1, 2, 1025, 1025])

### input shape = [2, 1025, 586], which means [C, N, T]
N : frequencies 
T : #frames

try to attach a se-attentive module

In [19]:
input = input_tensor
input.shape

torch.Size([1, 2, 1025, 1025])

In [20]:
sequeeze = torch.nn.AdaptiveAvgPool1d(output_size=1)
input_se = sequeeze(input)
input_se.shape

RuntimeError: Expected 2 to 3 dimensions, but got 4-dimensional tensor for argument #1 'self' (while checking arguments for adaptive_avg_pool1d)

In [24]:
input_se_real = input_se[0]
input_se_imag = input_se[1]
input_se_real = torch.squeeze(input_se_real,dim=-1)
input_se_real.shape

torch.Size([1025])

In [28]:
extract = nn.Sequential(
            nn.Linear(1025,2048),
            nn.ReLU(),
            nn.Linear(2048,1025),
            nn.Sigmoid(),
        )
s = extract(input_se_real)
s.shape

torch.Size([1025])

In [27]:
input_real = input[0]
input_real.shape

torch.Size([1025, 586])

In [37]:
s = torch.unsqueeze(s,dim = -1)
s.shape

torch.Size([1025, 1])

In [38]:
input_real_enhance = input_real * s
input_real_enhance.shape

torch.Size([1025, 586])

In [22]:
class SE_module(nn.Module):
    def __init__(self, in_channel:int = 1025, k:int = 2048):
        super(SE_module,self).__init__()
        self.in_channel = in_channel
        self.k = k
        self.sequeeze = torch.nn.AdaptiveAvgPool1d(output_size=1)
        self.extract = nn.Sequential(
            nn.Linear(self.in_channel,self.k),
            nn.ReLU(),
            nn.Linear(self.k,self.in_channel),
            nn.Sigmoid(),
        )

    def forward(self,x):
        '''
        x:shape: (batch_size,C, N, T)
        '''
        
        u_r = self.sequeeze(x[:,0,:,:])    # b,1025,1
        u_i = self.sequeeze(x[:,1,:,:])
        u_r = torch.squeeze(input=u_r, dim=-1)  # b,1025
        u_i = torch.squeeze(input=u_i, dim=-1)
        a_r = torch.unsqueeze(self.extract(u_r),dim = -1) # b,1025,1
        a_i = torch.unsqueeze(self.extract(u_i),dim = -1)

        x_r_enhance = x[:,0,:,:] * a_r
        x_i_enhance = x[:,1,:,:] * a_i

        output = torch.stack([x_r_enhance,x_i_enhance],dim=1)
        return output

In [23]:
se = SE_module()
dummy_y = se.forward(torch.randn(1, 2, 1025, 586))
dummy_y.shape

torch.Size([1, 2, 1025, 586])

### Shrink module
input： [B,2,N,T]
output: [B,V]

In [50]:
shrink_axis = 586
a = nn.Conv2d(in_channels=2,out_channels=1,kernel_size=(1,shrink_axis//3))(dummy_y)
b = nn.Conv2d(in_channels=1,out_channels=1,kernel_size=(1,shrink_axis//2))(a)
c = nn.Conv2d(in_channels=1,out_channels=1,kernel_size=(1,b.shape[-1]))(b)
c.shape

torch.Size([1, 1, 1025, 1])

In [53]:
class Shrink(nn.Module):
    def __init__(self,shrink_size:int = 586):
        super(Shrink, self).__init__()
        self.shrink_size = shrink_size
        self.inner_net = nn.Sequential(
            nn.Conv2d(in_channels=2,out_channels=1,kernel_size=(1,shrink_axis//3)),
            nn.ReLU(),
            nn.Conv2d(in_channels=1,out_channels=1,kernel_size=(1,shrink_axis//2)),
            nn.ReLU(),
            nn.Conv2d(in_channels=1,out_channels=1,kernel_size=(1,b.shape[-1]))
        )

    def forward(self,x):
        return torch.squeeze(self.inner_net(x))


In [54]:
shrink = Shrink(shrink_size=586)
y = shrink(dummy_y)
y.shape

torch.Size([1025])