<a href="https://colab.research.google.com/github/0x0checo/NLP/blob/main/SpeechBrain_Voice_Assistant.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""
清洁指令语音识别系统
使用 SpeechBrain 实现语音识别，专门用于解析清洁机器人指令
"""

In [None]:
!pip install torch torchaudio speechbrain

In [None]:
import torch
# 导入pytorch音频库
import torchaudio
import speechbrain as sb
# 导入用于加载预训练的自动语音识别（ASR）模型的库
from speechbrain.pretrained import EncoderDecoderASR
import re
from typing import Dict, List, Tuple, Optional
# 导入日志库，用于记录系统运行信息和错误
import logging
import os

In [None]:
# Configure logging at INFO level, so records at INFO and above
# (WARNING, ERROR, ...) are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger. __name__ is the current module's name
# ("__main__" when run as the main script) and identifies the log source.
logger = logging.getLogger(__name__)

In [None]:
class CleaningCommandParser:
    """Keyword tables used to interpret cleaning-robot commands.

    Each table maps a canonical (Chinese) label to the list of English and
    Chinese keywords that select it. Every instance gets its own fresh
    dictionaries and lists.

    Attributes:
        cleaning_actions: canonical action label -> trigger keywords.
        room_locations: canonical room label -> trigger keywords.
        priority_levels: canonical priority label -> trigger keywords.
    """

    def __init__(self):
        # NOTE(review): "清洁" is listed under both "清洁" and "擦拭";
        # confirm whether the second occurrence is intentional.
        actions = {
            "清洁": ["clean", "cleaning", "清洁", "打扫", "清理"],
            "扫地": ["sweep", "sweeping", "扫地", "扫", "清扫"],
            "拖地": ["mop", "mopping", "拖地", "拖", "擦地"],
            "吸尘": ["vacuum", "vacuuming", "吸尘", "吸", "除尘"],
            "擦拭": ["wipe", "wiping", "擦拭", "擦", "清洁"],
            "消毒": ["disinfect", "sanitize", "消毒", "杀菌", "清毒"],
        }

        rooms = {
            "客厅": ["living room", "lounge", "客厅", "大厅", "起居室"],
            "卧室": ["bedroom", "bed room", "卧室", "睡房", "寝室"],
            "厨房": ["kitchen", "厨房", "灶间"],
            "浴室": ["bathroom", "washroom", "浴室", "洗手间", "卫生间"],
            "餐厅": ["dining room", "餐厅", "饭厅"],
            "走廊": ["hallway", "corridor", "走廊", "过道"],
            "阳台": ["balcony", "阳台", "露台"],
            "书房": ["study", "office", "书房", "办公室"],
        }

        # Insertion order matters to consumers that stop at the first
        # matching priority, so the order below is kept as declared.
        priorities = {
            "立即": ["immediately", "now", "urgent", "立即", "马上", "现在"],
            "高优先级": ["high priority", "important", "高优先级", "重要", "优先"],
            "普通": ["normal", "regular", "普通", "一般", "常规"],
            "低优先级": ["low priority", "later", "低优先级", "稍后", "延后"],
        }

        self.cleaning_actions = actions
        self.room_locations = rooms
        self.priority_levels = priorities

In [None]:
class CleaningSpeechRecognizer:
    """Speech front end that turns audio into structured cleaning commands.

    Combines a pretrained SpeechBrain ASR model (loaded in ``load_model``)
    with the keyword tables of ``CleaningCommandParser`` to produce a command
    dictionary containing action, location, priority, confidence and extra
    parameters.
    """

    # Sample rate audio is resampled to before recognition.
    # NOTE(review): 16 kHz is assumed to be what the default model expects.
    TARGET_SAMPLE_RATE = 16000

    # A command must score strictly above this confidence to be valid.
    MIN_CONFIDENCE = 0.3

    # (regex, parameter name) pairs for numeric parameters in the text.
    _PARAMETER_PATTERNS = (
        (r'(\d+)\s*分钟', 'duration_minutes'),
        (r'(\d+)\s*小时', 'duration_hours'),
        (r'(\d+)\s*次', 'repeat_count'),
    )

    # Keywords that request a deep clean.
    _DEEP_CLEAN_KEYWORDS = ('深度', '彻底', 'deep', 'thorough')

    def __init__(self, model_name="speechbrain/asr-wav2vec2-commonvoice-14-en"):
        """Create the recognizer and try to load the ASR model.

        Args:
            model_name: model source passed to
                ``EncoderDecoderASR.from_hparams``.
        """
        self.model_name = model_name
        # Stays None until load_model succeeds.
        self.asr_model = None
        self.parser = CleaningCommandParser()
        self.load_model()

    def load_model(self):
        """Load the pretrained ASR model into ``self.asr_model``.

        Loading is best-effort: any failure is logged and swallowed so that
        text-only parsing stays usable. ``recognize_speech`` reports an
        unloaded model explicitly.
        """
        try:
            logger.info(f'正在加载语音识别模型：{self.model_name}')
            self.asr_model = EncoderDecoderASR.from_hparams(
                source=self.model_name,
                # Cache directory named after the last path component of
                # the model name.
                savedir=f"pretrained_models/{self.model_name.split('/')[-1]}"
            )
            logger.info('模型加载成功')
        except Exception as e:
            logger.error(f'模型加载失败：{e}')

    def prepocess_audio(self, audio_path):
        """Load an audio file and prepare it for recognition.

        The audio is down-mixed to mono, resampled to
        ``TARGET_SAMPLE_RATE`` and peak-normalised to ``[-1, 1]``.

        Args:
            audio_path: path handed to ``torchaudio.load``.

        Returns:
            The waveform with the channel axis removed.

        Raises:
            ValueError: if the file contains no samples.
            Exception: any loading/processing error is logged and re-raised.
        """
        try:
            waveform, sample_rate = torchaudio.load(audio_path)

            if waveform.numel() == 0:
                raise ValueError(f"音频文件为空: {audio_path}")

            # Down-mix multi-channel audio by averaging the channels.
            if waveform.shape[0] > 1:
                waveform = torch.mean(waveform, dim=0, keepdim=True)

            target_sr = self.TARGET_SAMPLE_RATE
            if sample_rate != target_sr:
                resampler = torchaudio.transforms.Resample(sample_rate, target_sr)
                waveform = resampler(waveform)

            # Peak-normalise. Silent audio (peak == 0) is left untouched,
            # because dividing by zero would fill the waveform with NaNs.
            peak = torch.max(torch.abs(waveform))
            if peak > 0:
                waveform = waveform / peak

            # Drop only the channel axis; a bare squeeze() would also
            # collapse a one-sample signal into a 0-d tensor.
            return waveform.squeeze(0)

        except Exception as e:
            logger.error(f"音频预处理失败: {e}")
            raise

    # Correctly spelled alias; the original name is kept for existing callers.
    preprocess_audio = prepocess_audio

    def recognize_speech(self, audio_input):
        """Transcribe audio to text.

        Args:
            audio_input: a file path (``str`` or ``os.PathLike``), which is
                preprocessed first, or a waveform tensor that is either
                1-D (samples) or already batched 2-D.

        Returns:
            The transcription of the first batch item, or ``""`` when
            recognition fails for any reason (the error is logged).
        """
        try:
            if self.asr_model is None:
                raise RuntimeError("语音识别模型未加载")

            if isinstance(audio_input, (str, os.PathLike)):
                waveform = self.prepocess_audio(audio_input)
            else:
                waveform = audio_input

            # Add the batch axis only when the input is a single 1-D signal.
            if waveform.dim() == 1:
                waveform = waveform.unsqueeze(0)

            # transcribe_batch also needs the relative length of every item;
            # 1.0 marks each item as full length (no padding).
            wav_lens = torch.ones(waveform.shape[0])
            predicted_words, _ = self.asr_model.transcribe_batch(waveform, wav_lens)

            recognized_text = predicted_words[0] if predicted_words else ""
            logger.info(f'语音识别结果: {recognized_text}')
            return recognized_text

        except Exception as e:
            logger.error(f"语音识别失败: {e}")
            return ""

    @staticmethod
    def _best_match(text, keyword_table):
        """Return ``(label, score)`` of the label with the most keyword hits.

        The score is the number of the label's keywords that occur as
        substrings of ``text``. Ties go to the label declared first. When
        nothing matches the result is ``(None, 0)``.
        """
        best_label, best_score = None, 0
        for label, keywords in keyword_table.items():
            score = sum(1 for keyword in keywords if keyword.lower() in text)
            if score > best_score:
                best_label, best_score = label, score
        return best_label, best_score

    def parse_cleaning_command(self, text):
        """Parse recognised text into a command dictionary.

        Args:
            text: the command text; ``None`` is treated as an empty string.

        Returns:
            A dict with keys ``original_text`` (the text as given),
            ``action``, ``location`` (``None`` when not found), ``priority``
            (defaults to ``"普通"``), ``confidence`` (0.0 to 1.0) and
            ``parameters``.
        """
        original_text = text or ""
        # Matching is case-insensitive; the untouched text is reported back.
        text = original_text.lower()

        action, action_score = self._best_match(text, self.parser.cleaning_actions)
        location, location_score = self._best_match(text, self.parser.room_locations)

        # The first priority level (in declaration order) with any hit wins.
        priority = next(
            (
                level
                for level, keywords in self.parser.priority_levels.items()
                if any(keyword.lower() in text for keyword in keywords)
            ),
            "普通",
        )

        # Confidence is keyword hits per whitespace-separated word, capped
        # at 1.0. Text without spaces counts as a single word.
        total_matches = action_score + location_score
        word_count = len(text.split())
        confidence = min(total_matches / word_count, 1.0) if word_count > 0 else 0.0

        return {
            "original_text": original_text,
            "action": action,
            "location": location,
            "priority": priority,
            "confidence": confidence,
            "parameters": self._extract_parameters(text),
        }

    def _extract_parameters(self, text):
        """Extract optional parameters (durations, repeat count, deep clean).

        Returns:
            A dict that may contain ``duration_minutes``, ``duration_hours``
            and ``repeat_count`` (ints) and ``deep_clean`` (``True``).
        """
        parameters = {}

        for pattern, param_name in self._PARAMETER_PATTERNS:
            match = re.search(pattern, text)
            if match:
                parameters[param_name] = int(match.group(1))

        if any(keyword in text for keyword in self._DEEP_CLEAN_KEYWORDS):
            parameters['deep_clean'] = True

        return parameters

    def process_voice_command(self, audio_input):
        """Run the full pipeline: recognise, parse and validate.

        Args:
            audio_input: anything accepted by ``recognize_speech``.

        Returns:
            On success a dict with ``success=True``, ``recognized_text``,
            ``command``, ``is_valid`` and ``validation_message``. On failure
            a dict with ``success=False``, ``error`` and ``command=None``.
        """
        try:
            recognized_text = self.recognize_speech(audio_input)

            if not recognized_text:
                return {
                    'success': False,
                    'error': '语音识别失败',
                    'command': None
                }

            command = self.parse_cleaning_command(recognized_text)
            is_valid = self._validate_command(command)

            return {
                "success": True,
                "recognized_text": recognized_text,
                "command": command,
                "is_valid": is_valid,
                "validation_message": self._get_validation_message(command, is_valid)
            }
        except Exception as e:
            logger.error(f"处理语音指令失败: {e}")
            return {
                "success": False,
                "error": str(e),
                "command": None
            }

    def _validate_command(self, command):
        """Return True when the command has an action, a location and a
        confidence strictly above ``MIN_CONFIDENCE``."""
        return (
            command["action"] is not None and
            command["location"] is not None and
            command["confidence"] > self.MIN_CONFIDENCE
        )

    def _get_validation_message(self, command, is_valid):
        """Build the human-readable validation message for a command."""
        if is_valid:
            return "指令有效"

        missing = []
        if not command["action"]:
            missing.append("清洁动作")
        if not command["location"]:
            missing.append("清洁位置")
        if command["confidence"] <= self.MIN_CONFIDENCE:
            missing.append("指令不够清晰")

        return f"指令不完整，缺少：{', '.join(missing)}"

In [None]:
def create_sample_audio(sample_rate=16000, duration=3, frequency=440, amplitude=0.5):
    """Create a sine-wave test signal as a 1-D float32 tensor.

    Args:
        sample_rate: samples per second.
        duration: signal length in seconds.
        frequency: sine frequency in Hz.
        amplitude: peak amplitude of the sine wave.

    Returns:
        A ``torch.float32`` tensor holding ``int(sample_rate * duration)``
        samples. Nothing is written to disk.
    """
    import numpy as np

    num_samples = int(sample_rate * duration)

    # Sample k sits at k / sample_rate. np.linspace(0, duration, num_samples)
    # would include the end point and stretch the spacing to
    # duration / (num_samples - 1), detuning the tone slightly.
    t = np.arange(num_samples) / sample_rate
    audio_signal = amplitude * np.sin(2 * np.pi * frequency * t)

    return torch.tensor(audio_signal, dtype=torch.float32)

In [None]:
def main():
    """Demonstrate the system by parsing a few sample text commands.

    Builds a ``CleaningSpeechRecognizer``, runs its text parser over the
    sample commands, prints every parsed field, and finally prints a short
    feature summary and usage hint. Any exception is reported on stdout
    instead of propagating.
    """
    # Text stand-ins for what speech recognition would produce.
    demo_texts = (
        "请在客厅进行吸尘清洁",
        "立即清洁浴室",
        "在卧室拖地，需要深度清洁",
        "扫地客厅和厨房",
        "消毒洗手间，重复3次",
    )

    summary_lines = (
        "\n=== 语音识别系统已就绪 ===",
        "系统功能：",
        "1. 支持多种清洁动作：清洁、扫地、拖地、吸尘、擦拭、消毒",
        "2. 识别房间位置：客厅、卧室、厨房、浴室等",
        "3. 解析优先级：立即、高优先级、普通、低优先级",
        "4. 提取参数：时间、次数、深度清洁等",
        # Testing real speech needs an actual audio file.
        "\n要测试真实语音文件，请调用：",
        "result = recognizer.process_voice_command('path/to/audio.wav')",
    )

    try:
        print("初始化清洁指令语音识别系统...")
        recognizer = CleaningSpeechRecognizer()

        print("\n=== 文本指令解析测试 ===")
        for index, sentence in enumerate(demo_texts, start=1):
            print(f"\n测试 {index}: {sentence}")
            parsed = recognizer.parse_cleaning_command(sentence)
            print(f"动作: {parsed['action']}")
            print(f"位置: {parsed['location']}")
            print(f"优先级: {parsed['priority']}")
            print(f"置信度: {parsed['confidence']:.2f}")
            print(f"参数: {parsed['parameters']}")
            if recognizer._validate_command(parsed):
                verdict = '有效'
            else:
                verdict = '无效'
            print(f"有效性: {verdict}")

        for line in summary_lines:
            print(line)

    except Exception as e:
        print(f"系统初始化失败: {e}")
        print("请确保已安装 speechbrain 和相关依赖")


if __name__ == "__main__":
    main()