测试音频的 tokenize 流程：将 wav 音频转换成向量，后续与 LLaVA 的向量进行拼接。

In [2]:
import os
import sys
sys.path.append('/home/wly/szl_all_code/triper-project')

In [2]:
import torch
from transformers import WhisperFeatureExtractor
from speech_tokenizer.modeling_whisper import WhisperVQEncoder
from speech_tokenizer.utils import extract_speech_token

# Step 1: choose the device and the directory holding the tokenizer checkpoint
# (replace the path with your own local copy if it lives elsewhere).
device = "cuda:0"
tokenizer_path = "/sda1/glm-4-voice-tokenizer"

# Step 2: load the encoder and the feature extractor from that directory.
# The encoder is put into eval mode and moved onto `device`.
whisper_model = WhisperVQEncoder.from_pretrained(tokenizer_path)
whisper_model = whisper_model.eval().to(device)
feature_extractor = WhisperFeatureExtractor.from_pretrained(tokenizer_path)


  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Step 3: the audio file(s) to tokenize.
audio_paths = ["/home/wly/szl_all_code/triper-project/tests/介绍大语言模型.wav"]

# Step 4: extract the speech tokens.
# Per the original author's note, extract_speech_token takes care of loading,
# resampling, feature extraction and encoding (not verifiable from this cell).
audio_tokens_list = extract_speech_token(whisper_model, feature_extractor, audio_paths)

# Step 5: inspect the result. The returned list is indexed per input file, so
# element 0 belongs to the single path above.
audio_tokens = audio_tokens_list[0]
print(f"音频文件 '{audio_paths[0]}' 被编码为了 {len(audio_tokens)} 个 token。")
print("部分 Token 示例:", audio_tokens[:10])

# Render the token ids as one string of <|audio_N|> markers wrapped in
# begin/end-of-audio markers, the textual form intended as LLM input.
audio_tokens_str = (
    "<|begin_of_audio|>"
    + "".join(f"<|audio_{token_id}|>" for token_id in audio_tokens)
    + "<|end_of_audio|>"
)
print("\n可用于LLM输入的字符串格式:")
print(audio_tokens_str)

音频文件 '/home/wly/szl_all_code/triper-project/tests/介绍大语言模型.wav' 被编码为了 58 个 token。
部分 Token 示例: [10815, 5966, 7767, 11760, 14770, 11760, 13229, 11760, 11760, 11760]

可用于LLM输入的字符串格式:
<|begin_of_audio|><|audio_10815|><|audio_5966|><|audio_7767|><|audio_11760|><|audio_14770|><|audio_11760|><|audio_13229|><|audio_11760|><|audio_11760|><|audio_11760|><|audio_11760|><|audio_11760|><|audio_11760|><|audio_11760|><|audio_11760|><|audio_11760|><|audio_11760|><|audio_15643|><|audio_14725|><|audio_2671|><|audio_14164|><|audio_8431|><|audio_13786|><|audio_12459|><|audio_10426|><|audio_4811|><|audio_5242|><|audio_14023|><|audio_1878|><|audio_5024|><|audio_7393|><|audio_16240|><|audio_12515|><|audio_9761|><|audio_8572|><|audio_5736|><|audio_15485|><|audio_12607|><|audio_14023|><|audio_3192|><|audio_14066|><|audio_3207|><|audio_5460|><|audio_4278|><|audio_13305|><|audio_10977|><|audio_12037|><|audio_13472|><|audio_5539|><|audio_1656|><|audio_4898|><|audio_9374|><|audio_15513|><|audio_15513|><|audio_3616

In [None]:
import torch
from transformers import WhisperFeatureExtractor
from speech_tokenizer.modeling_whisper import WhisperVQEncoder
import torchaudio

def extract_audio_features_continuous(model, feature_extractor, audio_path, return_attention_mask=True):
    """Encode one audio file into continuous encoder hidden states.

    The waveform is loaded with ``torchaudio`` (imported at module level),
    resampled to 16 kHz when its sample rate differs, averaged down to a
    single channel, converted to input features by ``feature_extractor`` and
    passed through ``model`` with ``quantized_token_ids=None``.

    Tensors are placed on the device that holds the model's parameters (CPU
    if the model has none) rather than on a hard-coded CUDA device.

    Args:
        model: Encoder to run. This function reads ``model.conv1.stride``,
            ``model.conv2.stride`` and ``model.config`` and calls the model
            with the keyword arguments ``input_features``, ``attention_mask``
            and ``quantized_token_ids``.
        feature_extractor: Callable taking a 1-D waveform plus
            ``sampling_rate``, ``return_attention_mask`` and
            ``return_tensors``; its result must expose ``"input_features"``
            and ``"attention_mask"`` entries and support ``.to(device=...)``.
        audio_path: Path of the audio file to load.
        return_attention_mask: When True (default) the subsampled attention
            mask is returned alongside the features.

    Returns:
        ``(continuous_features, attention_mask)`` if ``return_attention_mask``
        is True, otherwise only ``continuous_features``.
        ``continuous_features`` is ``outputs.last_hidden_state``
        ([batch, seq_len, hidden_dim]).
    """
    target_sample_rate = 16000

    # Follow the model's own placement instead of assuming the default GPU.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    # 1. Load the audio file.
    waveform, sample_rate = torchaudio.load(audio_path)
    waveform = waveform.to(target_device)

    # 2. Resample to 16 kHz when necessary.
    if sample_rate != target_sample_rate:
        resampler = torchaudio.transforms.Resample(
            orig_freq=sample_rate, new_freq=target_sample_rate
        ).to(target_device)
        waveform = resampler(waveform)

    # 3. Down-mix multi-channel audio to mono; the extractor gets a 1-D numpy array.
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)
    samples = waveform[0].cpu().numpy()

    # 4. Compute the input features.
    # The mask is ALWAYS requested here: the encoder call below is always
    # given an attention mask, so honouring return_attention_mask=False at
    # this point would leave no mask to pass on and the lookup would fail.
    # The flag only controls what this function returns.
    features = feature_extractor(
        samples,
        sampling_rate=target_sample_rate,
        return_attention_mask=True,
        return_tensors="pt",
    )
    features = features.to(device=target_device)
    input_features = features["input_features"]
    frame_mask = features["attention_mask"]

    # 5. Run the encoder without quantized token ids to obtain continuous features.
    with torch.no_grad():
        outputs = model(
            input_features=input_features,
            attention_mask=frame_mask,
            quantized_token_ids=None,
        )
    continuous_features = outputs.last_hidden_state  # [batch, seq_len, hidden_dim]

    if not return_attention_mask:
        return continuous_features

    # Subsample the frame-level mask by the combined stride of the two conv
    # layers so it lines up with the encoder output (input may be padded).
    conv_stride = model.conv1.stride[0] * model.conv2.stride[0]
    attention_mask = frame_mask[:, ::conv_stride]

    # If the config declares a (truthy) pooling kernel size, subsample again by it.
    pooling = getattr(model.config, "pooling_kernel_size", None)
    if pooling:
        attention_mask = attention_mask[:, ::pooling]

    return continuous_features, attention_mask

# Usage example.
# `whisper_model`, `feature_extractor`, `tokenizer_path` and `device` are
# already created by the setup cell near the top of this notebook, so they are
# reused here instead of loading the same checkpoint a second time.
audio_path = "/home/wly/szl_all_code/triper-project/tests/audio.wav"

# Get the continuous feature vectors together with the matching attention mask.
audio_features, attention_mask = extract_audio_features_continuous(
    whisper_model, feature_extractor, audio_path
)

print(f"音频连续特征形状: {audio_features.shape}")  # [1, seq_len, hidden_dim]
print(f"注意力掩码形状: {attention_mask.shape}")
print(f"特征维度: {audio_features.shape[-1]}")

音频连续特征形状: torch.Size([1, 375, 1280])
注意力掩码形状: torch.Size([1, 375])
特征维度: 1280


In [None]:
import torch.nn as nn

class AudioCompressor(nn.Module):
    """Learnable compressor that pools a feature sequence to a fixed length.

    ``output_seq_len`` learnable query vectors cross-attend over the input
    sequence through ``nn.MultiheadAttention``; the attention output (one
    vector per query) is the compressed sequence.

    Args:
        input_dim: Feature width of the input; also the attention embed dim
            and the width of the output. Must be divisible by ``num_heads``.
        output_seq_len: Number of vectors to produce per batch item.
        hidden_dim: Accepted for backward compatibility; currently unused.
        num_heads: Number of attention heads (default 8, the value that was
            previously hard-coded).
    """

    def __init__(self, input_dim, output_seq_len, hidden_dim=None, num_heads=8):
        super().__init__()
        self.output_seq_len = output_seq_len

        # Attention pooling: queries are learned, keys/values are the input.
        self.attention_pool = nn.MultiheadAttention(
            embed_dim=input_dim,
            num_heads=num_heads,
            batch_first=True
        )

        # Learnable query vectors, one per output position.
        self.queries = nn.Parameter(torch.randn(output_seq_len, input_dim))

    def forward(self, x, attention_mask=None):
        """Compress ``x`` to ``output_seq_len`` vectors per batch item.

        Args:
            x: [batch, seq_len, input_dim]
            attention_mask: Optional [batch, seq_len] mask where non-zero
                marks a valid frame and zero marks padding. Padded frames are
                excluded from attention. A row that is entirely zero leaves
                no frame to attend to and yields NaNs. When omitted, every
                frame is attended to (the previous behavior).

        Returns:
            compressed: [batch, output_seq_len, input_dim]
        """
        batch_size = x.shape[0]

        # Broadcast the shared queries across the batch dimension.
        queries = self.queries.unsqueeze(0).expand(batch_size, -1, -1)  # [batch, output_seq_len, input_dim]

        # nn.MultiheadAttention expects True at positions to IGNORE, which is
        # the inverse of the 1-means-valid convention of attention masks.
        key_padding_mask = None
        if attention_mask is not None:
            key_padding_mask = attention_mask.to(device=x.device) == 0

        compressed, _ = self.attention_pool(queries, x, x, key_padding_mask=key_padding_mask)

        return compressed

# Usage example: compress the encoder output down to 64 vectors.
# The compressor is placed on the device that holds `audio_features` (rather
# than an unconditional .cuda()), and its width is read from the features
# instead of being repeated as a literal.
compressor = AudioCompressor(
    input_dim=audio_features.shape[-1],
    output_seq_len=64  # compress to 64 tokens
).to(audio_features.device)

audio_features_compressed = compressor(audio_features)
print(f"压缩后音频特征: {audio_features_compressed.shape}")  # [1, 64, 1280]

压缩后音频特征: torch.Size([1, 64, 1280])


In [7]:
# MLP projection layer, modelled on the LLaVA design
class AudioMLP(nn.Module):
    """MLP projector for audio features.

    Three linear layers with GELU and dropout (p=0.1) between them map
    ``input_dim`` -> ``hidden_dim`` -> ``hidden_dim`` -> ``output_dim``; the
    result is passed through a LayerNorm over ``output_dim``.
    """

    def __init__(self, input_dim=1280, hidden_dim=2048, output_dim=5120):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim

        stages = [
            nn.Linear(input_dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_dim, output_dim),
        ]
        self.projector = nn.Sequential(*stages)
        self.layer_norm = nn.LayerNorm(output_dim)

        self.init_weights()

    def init_weights(self):
        """Re-initialise every Linear and LayerNorm submodule.

        Linear weights get Xavier-uniform values and zero biases; LayerNorm
        gets unit weight and zero bias.
        """
        for layer in self.modules():
            if isinstance(layer, nn.LayerNorm):
                nn.init.ones_(layer.weight)
                nn.init.zeros_(layer.bias)
                continue
            if not isinstance(layer, nn.Linear):
                continue
            nn.init.xavier_uniform_(layer.weight)
            if layer.bias is not None:
                nn.init.zeros_(layer.bias)

    def forward(self, x):
        """Project and normalise a batch of feature sequences.

        Args:
            x: [batch, seq_len, input_dim]
        Returns:
            projected: [batch, seq_len, output_dim]
        """
        return self.layer_norm(self.projector(x))

In [9]:
# Build the projector on `device`: 1280-d input (matches the Whisper encoder
# output width), 2048-d hidden layers, 5120-d final output.
mlp_projector = AudioMLP(input_dim=1280, hidden_dim=2048, output_dim=5120)
mlp_projector = mlp_projector.to(device)

# Project the compressed audio features and report the resulting shape.
projected_features = mlp_projector(audio_features_compressed)
print(f"MLP投影后的音频特征: {projected_features.shape}")

MLP投影后的音频特征: torch.Size([1, 64, 5120])
