In [1]:
import io
import IPython.display as ipd
import grpc
import os, sys, contextlib
import riva.client
from txtai.embeddings import Embeddings
from txtai.pipeline import Extractor
from ast import literal_eval
from nemollm.api import NemoLLM
from riva.client import RecognitionConfig, StreamingRecognitionConfig, AudioEncoding
import riva.client.audio_io
from riva.client.audio_io import MicrophoneStream
from typing import Iterable
import riva.client.proto.riva_asr_pb2 as rasr
from dotenv import load_dotenv

In [2]:
# Load settings from a local .env file into the process environment.
load_dotenv()

# Service endpoints and credentials come from the environment so nothing
# secret is hard-coded in the notebook. os.getenv returns None for a variable
# that is not set.
URI_TTS = os.getenv('URI_TTS')
URI_ASR = os.getenv('URI_ASR')
AVATAR_INSTANCE_PATH = os.getenv('AVATAR_INSTANCE_PATH')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')

In [3]:
# Print the available audio input devices; one of the listed indices is used
# as ASR_INPUT_DEVICE.
riva.client.audio_io.list_input_devices()

Input audio devices:
0: Microsoft Sound Mapper - Input
1: Remote Audio
4: Primary Sound Capture Driver
5: Remote Audio
9: Remote Audio


In [4]:
# Index of the microphone used for speech recognition. Change it to one of the
# indices printed by riva.client.audio_io.list_input_devices().
ASR_INPUT_DEVICE = 1

In [5]:
# Speech-to-text (ASR) streaming configuration for the Riva SDK.
# interim_results=False: only finished results are requested from the service.
config = StreamingRecognitionConfig(
    interim_results=False,
    config=RecognitionConfig(
        language_code='en-US',
        encoding=AudioEncoding.LINEAR_PCM,
        sample_rate_hertz=16000,
        max_alternatives=1,
        enable_automatic_punctuation=True,
        profanity_filter=False,
        verbatim_transcripts=True,
    ),
)


class ASRService:
    """Microphone speech recognition through a Riva ASR endpoint.

    After ``run()`` returns, the recognized text is available in
    ``self.transcript``.
    """

    def __init__(self):
        """Connect to the ASR endpoint at ``URI_ASR`` and set the capture defaults."""
        # Capture settings handed to the microphone stream in run().
        self.sample_rate_hz = 16000
        self.file_streaming_chunk = 1600
        # Text of the most recent final recognition result.
        self.transcript = ""

        self.auth = riva.client.Auth(uri=URI_ASR)
        self.service = riva.client.ASRService(self.auth)

        # Default input device as reported by the audio backend (may be None).
        device_info = riva.client.audio_io.get_default_input_device_info()
        self.default_device_info = device_info
        if device_info is None:
            self.default_device_index = None
        else:
            self.default_device_index = device_info['index']

    def run(self) -> None:
        """Record from the microphone and store the first final transcript.

        The input device is taken from the module-level ``ASR_INPUT_DEVICE``
        and the recognition settings from the module-level ``config``.

        :return: None
        """
        print("ASR service running")
        microphone = riva.client.audio_io.MicrophoneStream(
            rate=self.sample_rate_hz,
            chunk=self.file_streaming_chunk,
            device=ASR_INPUT_DEVICE,
        )
        with microphone as audio_chunk_iterator:
            responses = self.service.streaming_response_generator(
                audio_chunks=audio_chunk_iterator,
                streaming_config=config,
            )
            self.print_response(responses=responses)

    def print_response(self, responses: Iterable[rasr.StreamingRecognizeResponse]) -> None:
        """Consume streaming responses until the first final result arrives.

        Despite its name nothing is printed: ``self.transcript`` is reset to an
        empty string and then receives the top alternative of the first result
        flagged as final, after which the method returns.

        :param responses: Streaming Response
        :return: None
        """
        self.transcript = ""
        # Lazily walk every result that has alternatives and is marked final.
        final_texts = (
            result.alternatives[0].transcript
            for response in responses
            if response.results
            for result in response.results
            if result.alternatives and result.is_final
        )
        for final_text in final_texts:
            self.transcript += final_text
            # One finished utterance per call is enough.
            return

In [6]:
# Text-to-speech client for the Riva endpoint configured in URI_TTS.
uri = URI_TTS
auth = riva.client.Auth(uri=uri)
tts_service = riva.client.SpeechSynthesisService(auth)

# Audio layout constants.
nchannels, sampwidth = 1, 2
sample_rate_hz = 44100

# Default synthesis request parameters.
req = dict(
    language_code="en-US",
    encoding=AudioEncoding.LINEAR_PCM,    # Currently only LINEAR_PCM is supported
    sample_rate_hz=sample_rate_hz,        # Generate 44.1KHz audio
    voice_name="English-US.Female-1",     # The name of the voice to generate
)

In [7]:
# Speech to Audio2Face module utilizing the gRPC protocol from audio2face_streaming_utils
from audio2face_streaming_utils import push_audio_track
import riva.client
import io
from pydub import AudioSegment
from scipy.io.wavfile import read
import numpy as np


class Audio2FaceService:
    """Turns raw TTS audio into float32 samples and pushes them to Audio2Face."""

    def __init__(self, sample_rate=44100, a2f_url='localhost:50051', avatar_instance=None):
        """
        :param sample_rate: sample rate passed to ``push_audio_track`` with the audio
        :param a2f_url: ``host:port`` of the Audio2Face streaming endpoint
        :param avatar_instance: name of the Audio2Face streaming instance;
            ``None`` (default) uses the module-level ``AVATAR_INSTANCE_PATH``
        """
        self.a2f_url = a2f_url
        # Honour the argument (it used to be ignored in favour of a literal 44100).
        self.sample_rate = sample_rate
        self.avatar_instance = AVATAR_INSTANCE_PATH if avatar_instance is None else avatar_instance

    def tts_to_wav(self, tts_byte, framerate=22050) -> np.ndarray:
        """
        Wrap raw 16-bit mono PCM bytes in a WAV container and read the samples back.

        :param tts_byte: tts data in byte
        :param framerate: frame rate written to the intermediate WAV; the rate
            read back is discarded, only the samples are returned
        :return: sample array as returned by ``scipy.io.wavfile.read``
        """
        seg = AudioSegment.from_raw(io.BytesIO(tts_byte), sample_width=2, frame_rate=framerate, channels=1)
        wav_io = io.BytesIO()
        seg.export(wav_io, format="wav")
        _rate, wav = read(io.BytesIO(wav_io.getvalue()))
        return wav

    def wav_to_numpy_float32(self, wav_byte) -> np.ndarray:
        """
        :param wav_byte: integer sample array (as produced by ``tts_to_wav``)
        :return: float32 array, the samples divided by 32768.0
        """
        return wav_byte.astype(np.float32, order='C') / 32768.0

    def get_tts_numpy_audio(self, audio) -> np.ndarray:
        """
        :param audio: raw tts audio bytes
        :return: float32 samples of the audio
        """
        wav_byte = self.tts_to_wav(audio)
        return self.wav_to_numpy_float32(wav_byte)

    def make_avatar_speaks(self, audio) -> None:
        """
        Convert the TTS audio and push it to the configured Audio2Face instance.

        :param audio: tts audio
        :return: None
        """
        push_audio_track(self.a2f_url, self.get_tts_numpy_audio(audio), self.sample_rate, self.avatar_instance)

In [9]:
import requests
from openai import OpenAI

# OpenAI API client, authenticated with OPENAI_API_KEY.
client = OpenAI(api_key=OPENAI_API_KEY)

def api(input):
    """Start a streamed chat completion for a single user message.

    :param input: user message text sent to the model
    :return: one-element list holding the streaming completion object
    """
    conversation = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": input},
    ]
    # Alternative model: "gpt-4-1106-preview"
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo-1106",
        messages=conversation,
        stream=True,
    )
    # Callers index the result with [0], so the stream stays wrapped in a list.
    return [stream]


# The rest of the notebook reaches the LLM through the name `extractor`.
extractor = api

# Ad-hoc question
question = "Who are the leaders of the arena?"

print("----", question, "----")
completion = extractor(question)[0]
for chunk in completion:
    output0 = chunk.choices[0].delta.content
    # The closing chunk of the stream carries no text (content is None);
    # skip it instead of printing the literal "None".
    if output0 is None:
        continue
    print(output0)

---- Who are the leaders of the arena? ----

The
 leaders
 of
 the
 arena
 may
 vary
 depending
 on
 the
 context
.
 Could
 you
 please
 specify
 which
 arena
 you
 are
 referring
 to
?
None


In [10]:
# function to filter out half sentences of the response of the LLM
def remove_half_seq(text) -> str:
    """
    Drop a trailing, unfinished fragment from ``text``.

    :param text: a textual string
    :return: ``text`` unchanged when it is blank, already ends with a
        punctuation mark, or contains no punctuation mark at all; otherwise the
        whitespace-stripped text cut right after its last punctuation mark
    """
    marks = (".", "?", "!", ":", ",", ";")
    stripped = text.strip()
    # Blank input has no last character to inspect; return it as is.
    if not stripped or stripped.endswith(marks):
        return text
    # Position of the right-most punctuation mark, -1 when there is none.
    last_mark = max(stripped.rfind(mark) for mark in marks)
    if last_mark == -1:
        return text
    # Slice the stripped text by an index computed on the stripped text, so
    # surrounding whitespace cannot shift the cut.
    return stripped[:last_mark + 1]
        
    

# function for tts modification for pronunciation:
def phonetic_modification(text) -> str:
    """
    Wrap selected words in ``<phoneme>`` tags so TTS pronounces them as intended.

    All words are replaced in a single pass and only when they stand alone as
    whole words. A short token can therefore no longer rewrite the inside of a
    longer one (previously "II" was replaced inside "VII", so "VII" itself
    never matched), and tokens are not replaced inside unrelated words.

    :param text: a textual string
    :return: a textual in which some words are tagged with new phonetics
    """
    import re

    # NOTE(review): the phoneme for "åre" is empty in the original table;
    # confirm the intended pronunciation.
    phonemes = {
        "Johanna": "ˈjohana",
        "Umeå": "ˈoomijo",
        "AI": "ˈeiˈai",
        "LLMs": "ˈellˈellˈemz",
        "II": "sekˈend",
        "VII": "ˈseven",
        "VI": "ˈsix",
        "åre": "",
    }
    # Longest tokens first so the alternation prefers "VII" over "VI"/"II".
    tokens = sorted(phonemes, key=len, reverse=True)
    pattern = re.compile(r"(?<!\w)(?:" + "|".join(re.escape(token) for token in tokens) + r")(?!\w)")

    def tag(match):
        token = match.group(0)
        return f'<phoneme ph="{phonemes[token]}">{token}</phoneme>'

    return pattern.sub(tag, text)

def splitting_text(text) -> list:
    """
    Split ``text`` into short pieces wrapped in ``<speak>`` tags.

    The text is cut at every "." and at every newline (the separators
    themselves are dropped); whitespace-only pieces are discarded silently.

    :param text: a textual string
    :return:  list of <speak> tag added substrings of the input text
    """
    pieces = []
    for sentence in text.split("."):
        pieces.extend(sentence.split("\n"))
    # Keep only pieces with visible content and add the <speak> tags.
    return [f'<speak>{piece}</speak>' for piece in pieces if piece.strip()]

In [14]:
# Avatar + ASR block
#
# Full pipeline: microphone -> ASR -> LLM -> TTS -> Audio2Face avatar.
# - saves the ASR transcript of every prompt and every avatar reply
# - applies the pronunciation tweaks before TTS
# - synthesizes the reply piece by piece so TTS never receives a long input
# Stop the loop with Kernel -> Interrupt; the with-block closes both log files.

audio2face_service = Audio2FaceService()
asr_service = ASRService()

# Log files (opened in append mode, so earlier sessions are kept).
file_asr_transcripts = "./docs/prompts_transcripts.txt"
file_avatar_replies = "./docs/full_model_avatar_replies.txt"
# open(..., "a") creates a missing file but not a missing parent directory.
os.makedirs(os.path.dirname(file_asr_transcripts), exist_ok=True)
os.makedirs(os.path.dirname(file_avatar_replies), exist_ok=True)

# A streamed token ending in one of these closes the current piece of speech.
SENTENCE_ENDINGS = ('\n', '.', '?', '!')


def _speak(sentence):
    """Synthesize one piece of the reply and play it on the avatar; blank text is skipped."""
    if not sentence.strip():
        return
    # modification for pronunciation, then SSML wrapping
    ssml = f'<speak>{phonetic_modification(sentence)}</speak>'
    audio = tts_service.synthesize(ssml, language_code="en-US", sample_rate_hz=sample_rate_hz,  voice_name="English-US.Male-1")
    audio2face_service.make_avatar_speaks(audio.audio)


with open(file_asr_transcripts, "a", encoding="utf-8") as f1, open(file_avatar_replies, "a", encoding="utf-8") as f2:
    while True:
        print("Ask avatar")

        # speech recognition
        asr_service.run()
        transcript = asr_service.transcript
        print(transcript)
        print("Done transcribing")
        if not transcript.strip():
            # Nothing was recognized; do not query the LLM with an empty prompt.
            continue
        f1.write(transcript.strip() + "\n")
        f1.flush()

        # inference block
        completion = extractor(transcript)[0]
        current_output = ''
        full_reply = ''
        for chunk in completion:
            output0 = chunk.choices[0].delta.content
            if not output0:
                # Chunks without text (None or '') carry nothing to speak.
                continue
            current_output += output0
            full_reply += output0
            if output0.endswith(SENTENCE_ENDINGS):
                # tts
                _speak(current_output)
                current_output = ''
        # Speak whatever remains when the reply did not end with punctuation.
        _speak(current_output)
        current_output = ''

        f2.write(full_reply.strip() + "\n")
        f2.flush()


Ask avatar
ASR service running
I just bought a computer mouse, but now I cannot connect it to my laptop. 
Done transcribing
Sending audio data...
SUCCESS
Closed channel
Sending audio data...
SUCCESS
Closed channel
Sending audio data...
SUCCESS
Closed channel
Sending audio data...
SUCCESS
Closed channel
Sending audio data...
SUCCESS
Closed channel
Sending audio data...
SUCCESS
Closed channel
Ask avatar
ASR service running
Have to know if my mouth is turned on or not. 
Done transcribing
Sending audio data...
SUCCESS
Closed channel
Ask avatar
ASR service running
Have to know if my computer mouse is turned on or not. 
Done transcribing
Sending audio data...
SUCCESS
Closed channel
Sending audio data...
SUCCESS
Closed channel
Sending audio data...
SUCCESS
Closed channel
Ask avatar
ASR service running


KeyboardInterrupt: 