In [None]:
!git clone https://github.com/HSETEAMSPB/BotDeterminant/

In [None]:
!apt-get update && apt-get install -y libsndfile1 ffmpeg
!pip install Cython
!pip install nemo_toolkit['all']

In [3]:
import os
import subprocess
import nemo
import torch
import nltk
import nemo.collections.asr as nemo_asr


# NLTK data: WordNet and the punkt tokenizer models.
nltk.download('wordnet')
nltk.download('punkt')

# Pretrained English speech-to-text model, fetched by name through NeMo.
asr_model = nemo_asr.models.EncDecRNNTBPEModel.from_pretrained(model_name="stt_en_contextnet_512")

# Silero voice activity detector plus its helper functions from torch.hub.
# force_reload=True re-downloads the hub repository on every run.
vad_model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
                                  model='silero_vad',
                                  force_reload=True,
                                  onnx=False)
get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks = utils

# One-time download of the YOLOv3 weights. The "checkpoints" directory doubles
# as the "already set up" marker, so it must only be created after the download
# succeeded; otherwise a failed download would never be retried.
# NOTE(review): the weights are saved into the working directory, not into
# "checkpoints" -- confirm which location the detector actually reads.
if not os.path.exists("checkpoints"):
    subprocess.run(["wget", "https://pjreddie.com/media/files/yolov3.weights"], check=True)
    os.makedirs("checkpoints", exist_ok=True)

In [None]:
!apt-get install opus-tools
!pip install aiogram
!pip install soundfile
!pip install nest-asyncio
!pip install -q torchaudio
!pip install pydub
!pip install webrtcvad
!pip install -U denoiser
!python -m spacy download en_core_web_sm

In [5]:
from BotDeterminant.prsr.config import *
from BotDeterminant.prsr.parsing import prs
from BotDeterminant.cv.predict import cv_recognize
from nemo.core.classes.modelPT import path

from aiogram import types, executor, Dispatcher, Bot
from aiogram.types import ContentType, File, Message, ParseMode, ReplyKeyboardRemove, \
    ReplyKeyboardMarkup, KeyboardButton, \
    InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.utils.markdown import text, bold, italic, code, pre
from aiogram.dispatcher.filters import Text

import os
import random
import subprocess
from pathlib import Path
import soundfile as sf
import nest_asyncio
import pydub
import torch
import torchaudio
from nltk.stem import WordNetLemmatizer

from denoiser import pretrained
from denoiser.dsp import convert_audio

from IPython.display import Audio
from pprint import pprint

# Limit PyTorch to a single intra-op CPU thread.
torch.set_num_threads(1)
# Patch asyncio so an event loop can be run while another one is already
# running (presumably required because the notebook kernel owns a loop -- confirm).
nest_asyncio.apply()

In [None]:
lemmatizer = WordNetLemmatizer()

# The bot token is a credential and must not live in the notebook. It is read
# from the environment instead. Any token that was ever committed to this file
# should be treated as leaked and revoked through @BotFather.
TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
if not TOKEN:
    raise RuntimeError(
        "Set the TELEGRAM_BOT_TOKEN environment variable to the bot token before running this cell"
    )
bot = Bot(token=TOKEN)
dp = Dispatcher(bot)

# User-adjustable recognition settings (changed through the menu handlers).
use_vad = False          # run voice activity detection before transcription
use_denoising = False    # run noise reduction before transcription
vad_hardness = 0.5       # VAD threshold passed to get_speech_timestamps

# Text of the last recognition run, shown by the "log info" button.
log = ""

def asr_module(file: str, vad: bool, denoise: bool, id) -> str:
    """
    Automatic Speech Recognition module.

    file: path to the .ogg (Opus) voice message file
    vad: allows the voice activity detector to run; it is applied only when
        the module-level `use_vad` setting is enabled as well
    denoise: allows the noise reduction tool to run; it is applied only when
        the module-level `use_denoising` setting is enabled as well
    id: suffix used in the name of the intermediate decoded .wav file

    The VAD threshold is taken from the module-level `vad_hardness` setting.

    Returns: transcription of audio
    Raises: subprocess.CalledProcessError if opusdec fails to decode `file`
    """

    def _convert() -> str:
        """
        Decodes the opus file to a 16 kHz .wav
        Returns: name of the resampled .wav file
        """
        decoded_name = f"decoded_voice{id}.wav"
        # check=True: without it a failed decode could silently pick up a stale
        # decoded file left over from an earlier message.
        subprocess.run(["opusdec", file, decoded_name], check=True)
        sound = pydub.AudioSegment.from_file(decoded_name)
        sound = sound.set_frame_rate(16000)
        export_name = f"{os.path.splitext(file)[0]}.wav"
        sound.export(out_f=export_name, format="wav")
        return export_name

    def _apply_vad(wav_path: str) -> None:
        """
        Voice Activity Detection: rewrites wav_path keeping only speech chunks
        wav_path: path to input .wav file
        """
        wav = read_audio(wav_path, sampling_rate=16000)
        # The module-level name `vad_hardness` can end up bound to something
        # that is not a number (a handler function of the same name exists in
        # this file); fall back to the default threshold in that case.
        threshold = vad_hardness if isinstance(vad_hardness, (int, float)) else 0.5
        # get speech timestamps from full audio file
        speech_timestamps = get_speech_timestamps(wav, vad_model, sampling_rate=16000, threshold=threshold)
        if not speech_timestamps:
            # Nothing detected: keep the audio untouched instead of asking
            # collect_chunks to concatenate an empty list.
            return
        save_audio(wav_path, collect_chunks(speech_timestamps, wav), sampling_rate=16000)

    def _apply_denoise(wav_path: str) -> None:
        """
        audio denoiser (facebook research): rewrites wav_path in place
        wav_path: path to input .wav file
        """
        # Use the GPU when one is present, otherwise run on the CPU.
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model = pretrained.dns64().to(device)
        wav, sr = torchaudio.load(wav_path)
        wav = convert_audio(wav.to(device), sr, model.sample_rate, model.chin)
        with torch.no_grad():
            denoised = model(wav[None])[0]
        # The audio was converted to the model's rate above, so save it with that rate.
        torchaudio.save(wav_path, denoised.data.cpu(), model.sample_rate)

    wav_path = _convert()

    # The helpers take a single path; only transcribe() wants a list.
    if denoise and use_denoising:
        _apply_denoise(wav_path)
    if vad and use_vad:
        _apply_vad(wav_path)

    return asr_model.transcribe(paths2audio_files=[wav_path])[0][0]



# Main menu: one button per row, in display order.
kb = [
    [types.KeyboardButton(text=label)]
    for label in ("vad hardness", "noise reduction", "log info")
]
keyboard = types.ReplyKeyboardMarkup(keyboard=kb, resize_keyboard=True)

@dp.message_handler(commands=['start'])
async def begin(message: types.Message):
    """Answer /start with the welcome text (HTML) and show the main menu keyboard."""
    greeting = text(
        "Wassup! Welcome to picture and voice recognition bot.",
        "To take advantage of me send an image first, then a voice message",
        "\n\nYou have the following options:\n",
        "[<b>noise reduction</b>] - you can connect the removal of background noise when recognizing audio\n",
        "[<b>vad hardness</b>] - you can set a threshold for voice activity detector",
    )
    await bot.send_message(
        message.chat.id,
        greeting,
        reply_markup=keyboard,
        parse_mode=types.ParseMode.HTML,
    )

@dp.message_handler(Text("log info"))
async def log_info(message: types.Message):
    """Send the log of the last recognition run together with the main menu.

    Before the first recognition the log is an empty string, and Telegram
    rejects messages with empty text, so a placeholder is sent instead.
    """
    await message.answer(log or "log is empty", reply_markup=keyboard)

@dp.message_handler(Text("noise reduction"))
async def noise_reduction_menu(message: types.Message):
    """Show the enable/disable keyboard for noise reduction.

    Deliberately not named `vad_hardness`: that name belongs to the
    module-level VAD threshold setting that asr_module reads, and a function
    definition with the same name would rebind it.
    """
    mode_buttons = [
        [types.KeyboardButton(text="enable")],
        [types.KeyboardButton(text="disable")],
    ]
    mode_keyboard = types.ReplyKeyboardMarkup(resize_keyboard=True, keyboard=mode_buttons)
    await message.answer("select mode:",  reply_markup=mode_keyboard)

@dp.message_handler(Text("vad hardness"))
async def vad_hardness_menu(message: types.Message):
    """Show the low/medium/high keyboard for the VAD threshold.

    Deliberately not named `vad_hardness`: defining a function under that name
    rebinds the module-level threshold setting (a float) that asr_module
    passes to the voice activity detector.
    """
    level_buttons = [
        [types.KeyboardButton(text="low")],
        [types.KeyboardButton(text="medium")],
        [types.KeyboardButton(text="high")]
    ]
    level_keyboard = types.ReplyKeyboardMarkup(resize_keyboard=True, keyboard=level_buttons)
    await message.answer("choose the degree of hardness", reply_markup=level_keyboard)

@dp.message_handler(lambda message: message.text in ["low", "medium", "high", "enable", "disable"])
async def vad_hardness_applying(message: types.Message):
    """Apply the option picked on one of the settings keyboards.

    "low" / "medium" / "high" enable VAD and set its threshold;
    "enable" / "disable" switch noise reduction on or off.
    The main menu keyboard is restored with the confirmation message.
    """
    # These are module-level settings; without the declaration the assignments
    # below would only create locals and the choice would be silently lost.
    global use_vad, vad_hardness, use_denoising

    hardness_by_label = {"low": 0.3, "medium": 0.5, "high": 0.7}

    if message.text in hardness_by_label:
        use_vad = True
        vad_hardness = hardness_by_label[message.text]
        await message.answer("perfect! you set a " + message.text + " vad mode", reply_markup=keyboard)
    else:
        # The handler filter only lets "enable" / "disable" reach this branch.
        if message.text == "enable":
            use_denoising = True
        elif message.text == "disable":
            use_denoising = False
        await message.answer(f"denoising is {message.text}d", reply_markup=keyboard)

@dp.message_handler(content_types=ContentType.PHOTO)
async def process_photo(message: types.Message):
    """Save the incoming photo as pic<user id>.jpg and ask for a voice message."""
    # Last entry of message.photo (presumably the largest available size -- confirm).
    picked = message.photo[-1]
    target_name = f'pic{message.from_user.id}.jpg'
    await picked.download(destination=target_name)
    await message.answer("now send a voice")

@dp.message_handler(content_types=[
    types.ContentType.VOICE,
    types.ContentType.AUDIO,
])
async def voice_message_handler(message: types.Message):
    """Transcribe a voice/audio message and run detection on the user's photo.

    Flow: download the audio as voice<user id>.ogg; if the user has already
    sent a photo (pic<user id>.jpg exists), transcribe the audio, parse the
    requested classes out of the transcription, run the detector on the photo
    and send back detected_pic<user id>.jpg. The per-user temporary files are
    removed afterwards. The module-level `log` is reset and refilled on each run.
    """
    global log

    if message.content_type == types.ContentType.VOICE:
        file_id = message.voice.file_id
    elif message.content_type == types.ContentType.AUDIO:
        file_id = message.audio.file_id
    else:
        await message.answer("Document format not supported :(")
        return

    user_id = message.from_user.id
    picture_name = f"pic{user_id}.jpg"
    voice_name = f"voice{user_id}.ogg"
    detected_name = f"detected_pic{user_id}.jpg"
    temp_files = [picture_name, voice_name, f"voice{user_id}.wav", detected_name, f"decoded_voice{user_id}.wav"]

    def _cleanup() -> None:
        """Remove the per-user temporary files, ignoring ones that were never created."""
        for name in temp_files:
            try:
                os.remove(name)
            except FileNotFoundError:
                pass

    file = await bot.get_file(file_id)
    file_on_disk = Path("", voice_name)
    await bot.download_file(file.file_path, destination=file_on_disk)

    if not os.path.exists(picture_name):
        await message.answer("send a photo to get started")
        return

    log = ""
    # NOTE(review): asr_module is a synchronous call and keeps the event loop
    # busy while it runs.
    transcription = asr_module(voice_name, True, True, user_id)
    log += f"[asr]: {transcription}\n"
    # class_names is not defined in this cell; presumably it comes from the
    # star-import of BotDeterminant.prsr.config -- confirm.
    classes = prs(transcription, class_names)

    if not len(classes):
        # Nothing to detect: report it and stop, instead of falling through to
        # the detector and trying to send a result picture.
        log += f"[parser]: no recognitions\n"
        await message.answer("no classes are recognized, sorry :(", parse_mode=ParseMode.MARKDOWN)
        _cleanup()
        return

    classes_to_str = ", ".join(classes)
    log += f"[parser]: {classes_to_str}\n"

    cv_classes = "[" + classes_to_str + "]"
    cv_recognize(cv_classes, picture_name)

    try:
        # Send while the file is still open and on disk; the context manager
        # closes the handle, and the files are deleted only afterwards.
        with open(detected_name, "rb") as photo:
            await bot.send_photo(user_id, photo)
    finally:
        _cleanup()

# Start long polling with the dispatcher and its registered handlers; this
# call is expected to keep the cell running until the bot is stopped.
executor.start_polling(dp)