# Agent 0 - Whisper + Llama3 (Ollama)

In [106]:
# imports 
import re
import io
import json
import time
import whisper
import requests
import webrtcvad
import numpy as np
import sounddevice as sd
from IPython.display import Audio
import scipy.io.wavfile as wavfile

## Audio

In [9]:
# audio configuration (defaults shared by the recording/playback helpers)
# default recording length, in seconds
_DURATION = 5 
_SAMPLE_RATE = 16000 # desired sample rate for models such as Whisper
# number of input channels requested from the microphone
_CHANNELS = 1

### Getting audio from the microphone

In [63]:
def take_audio(duration:int=_DURATION, samprate:int=_SAMPLE_RATE, chns:int=_CHANNELS, _verbose:bool=True) -> np.ndarray:
    """Record a fixed-length clip from the default input device.

    Args:
        duration: Recording length in seconds.
        samprate: Sampling rate in Hz.
        chns: Number of input channels to capture.
        _verbose: When True, print a message before and after recording.

    Returns:
        The recorded samples as an ``int16`` NumPy array (``dtype='int16'``
        is requested from ``sd.rec``). Presumably shaped
        ``(frames, channels)`` -- confirm against the sounddevice docs.
    """
    if _verbose:
        print(f'Recording the microphone audio for {duration} seconds')
    # sd.rec needs an integer frame count; duration * samprate may be a float.
    n_frames = int(duration * samprate)
    audio = sd.rec(n_frames, samplerate=samprate, channels=chns, dtype='int16')
    # sd.rec returns immediately; block until the buffer has been filled.
    sd.wait()
    if _verbose:
        print('Recording finished')
    return audio

In [11]:
def play_audio(_audio:np.int16, samprate:int=_SAMPLE_RATE) -> None:
    """Play a recorded clip on the default output device and wait for it.

    Args:
        _audio: Samples to play (e.g. the array returned by ``take_audio``).
        samprate: Sampling rate in Hz used for playback.
    """
    print('Playing the audio')
    sd.play(_audio, samplerate=samprate)
    # Playback is asynchronous; hold here until it has finished.
    sd.wait()
    print('Audio finished')

The function below does not work yet (it detects voice all the time).

In [88]:
def automatic_audio_cap(samprate:int=_SAMPLE_RATE, aggress:int=3, silence_timeout:float=1.0, frame_duration_ms:int=30, min_speech_chunks:int=1) -> np.ndarray:
    """Record from the microphone between voice onset and a trailing pause.

    Frames are read from the input stream and classified with WebRTC VAD.
    Recording starts once ``min_speech_chunks`` consecutive frames are flagged
    as speech and stops after more than ``silence_timeout`` seconds of
    consecutive non-speech frames.

    Args:
        samprate: Sampling rate in Hz. webrtcvad accepts 8000, 16000, 32000
            or 48000 only.
        aggress: VAD aggressiveness mode, 0 (least) to 3 (most aggressive
            about filtering out non-speech).
        silence_timeout: Seconds of continuous silence that end the recording.
        frame_duration_ms: Frame length in milliseconds; webrtcvad accepts
            10, 20 or 30 only.
        min_speech_chunks: Number of consecutive speech frames required before
            recording starts. The default of 1 starts on the first positive
            frame; larger values debounce isolated false positives.

    Returns:
        1-D (after ``np.squeeze``) float32 array of the captured samples,
        scaled by the int16 maximum.

    Raises:
        ValueError: If an argument is outside what webrtcvad supports.
    """
    # Fail early with a clear message instead of an opaque webrtcvad error
    # raised from inside the capture loop.
    if samprate not in (8000, 16000, 32000, 48000):
        raise ValueError(f'Unsupported sample rate for VAD: {samprate}')
    if frame_duration_ms not in (10, 20, 30):
        raise ValueError(f'Unsupported VAD frame duration (ms): {frame_duration_ms}')
    if aggress not in (0, 1, 2, 3):
        raise ValueError(f'VAD aggressiveness must be 0-3, got {aggress}')
    if min_speech_chunks < 1:
        raise ValueError('min_speech_chunks must be >= 1')

    vad = webrtcvad.Vad()
    vad.set_mode(aggress)

    frame_size = samprate * frame_duration_ms // 1000
    max_silent_chunks = int(silence_timeout * 1000 / frame_duration_ms)

    _buffer = []        # frames kept for the final recording
    _pending = []       # consecutive speech frames seen before recording starts
    silent_chunks = 0
    _recording = False

    with sd.InputStream(samplerate=samprate, channels=1, dtype='int16') as stream:
        while True:
            # The second return value (overflow flag) is intentionally ignored.
            audio_chunk, _ = stream.read(frame_size)
            is_speech = vad.is_speech(audio_chunk.tobytes(), samprate)

            if not _recording:
                if not is_speech:
                    # A gap resets the onset detector.
                    _pending.clear()
                    continue
                _pending.append(audio_chunk)
                if len(_pending) >= min_speech_chunks:
                    print('Voice detected. Starting recording')
                    _recording = True
                    # Keep the onset frames so the first syllable is not lost.
                    _buffer.extend(_pending)
                continue

            _buffer.append(audio_chunk)
            silent_chunks = 0 if is_speech else silent_chunks + 1
            if silent_chunks > max_silent_chunks:
                print('Stopping recording ...')
                break

    _audio = np.concatenate(_buffer, axis=0)
    _audio = _audio.astype(np.float32) / np.iinfo(np.int16).max
    return np.squeeze(_audio)

#### Testing microphone

In [35]:
# Smoke test: record with the default settings, then play the capture back.
_audio_test = take_audio()
play_audio(_audio_test)

Recording the microphone audio for 5 seconds
Recording finished
Playing the audio
Audio finished


### Store audio in memory

In [12]:
def audio_buffer(_audio:np.int16, samprate:int=_SAMPLE_RATE):
    """Encode samples as WAV into an in-memory buffer.

    Args:
        _audio: Samples to encode (e.g. the array returned by ``take_audio``).
        samprate: Sampling rate in Hz written to the WAV header.

    Returns:
        An ``io.BytesIO`` holding the WAV data, rewound to position 0.
    """
    wav_stream = io.BytesIO()
    wavfile.write(wav_stream, rate=samprate, data=_audio)
    # Rewind so the caller can read the buffer from the start.
    wav_stream.seek(0)
    return wav_stream

#### Testing audio and buffer

In [None]:
# Wrap the test recording as an in-memory WAV and render a player in the notebook.
_buf = audio_buffer(_audio_test)
_buf.seek(0)  # audio_buffer already rewinds; kept as a harmless safeguard
Audio(_buf.read(), rate=_SAMPLE_RATE)

## Models

In [13]:
def transcription_audio(audio, model) -> str:
    """Transcribe audio with the given model and return the text.

    Args:
        audio: Audio input forwarded unchanged to ``model.transcribe``.
        model: Object exposing ``transcribe(audio, fp16=...)`` that returns a
            mapping with a ``'text'`` entry (e.g. a loaded Whisper model).

    Returns:
        The ``'text'`` entry of the transcription result.
    """
    print('Getting the audio text ...')
    # fp16 is disabled explicitly on every call.
    return model.transcribe(audio, fp16=False)['text']

#### [Whisper (OpenAI)](https://github.com/openai/whisper/blob/main/README.md)

Used to get the audio transcription.

In [94]:
# Name of the Whisper checkpoint passed to whisper.load_model.
_model_size = 'small'

In [95]:
# Load the Whisper model once; the cells below reuse this instance.
_model = whisper.load_model(_model_size)

100%|███████████████████████████████████████| 461M/461M [00:42<00:00, 11.4MiB/s]


In [16]:
# whisper expects float32
def _audio_to_model(_audio):
    _model_audio_in = _audio.astype(np.float32) / np.iinfo(_audio.dtype).max
    return np.squeeze(_model_audio_in)

In [56]:
# Convert the test recording into the float32 form fed to the model.
_model_audio = _audio_to_model(_audio_test)

In [85]:
# Try the voice-activated capture and transcribe whatever it recorded.
lol = automatic_audio_cap()
res = _model.transcribe(lol, fp16=False)
print(res['text'])

Voice detected. Starting recording
Stopping recording ...



In [58]:
# transcribe the fixed-length test recording (fp16 disabled)
_result = _model.transcribe(_model_audio, fp16=False)

In [None]:
# Show the transcribed text of the test recording.
print(_result['text'])

In [96]:
def transcribe_audio(model, _audio, language:str='pt') -> str:
    """Decode a single audio window with Whisper and return upper-cased text.

    Args:
        model: Loaded Whisper model; its ``device`` attribute is used to place
            the spectrogram.
        _audio: Audio samples accepted by ``whisper.pad_or_trim``
            (presumably a float32 array such as ``_audio_to_model`` output).
        language: Language code passed to the decoder. Previously this
            argument was ignored and ``'pt'`` was always used.

    Returns:
        The decoded text, stripped of surrounding whitespace and upper-cased.
    """
    print(' Transcribing  audio ...')
    # Whisper decodes fixed-size windows, so pad or cut the clip first.
    _audio = whisper.pad_or_trim(_audio)
    _mel_spec = whisper.log_mel_spectrogram(_audio).to(model.device)
    # The language is given explicitly, so no detection pass is run.
    _options = whisper.DecodingOptions(language=language, fp16=False)
    _decoded = whisper.decode(model, _mel_spec, _options)
    return _decoded.text.strip().upper()

In [117]:
def searching_for_keyword(model, _keyword:str='JORGE', _timeout:int=3) -> bool:
    """Listen briefly and report whether the wake word was spoken.

    Args:
        model: Whisper model handed to ``transcribe_audio``.
        _keyword: Word to look for; compared in upper case.
        _timeout: Length of the listening window, passed to ``take_audio`` as
            the recording duration.

    Returns:
        True if any whitespace-separated word of the cleaned transcription
        equals the upper-cased keyword, otherwise False.
    """
    samples = _audio_to_model(take_audio(_timeout, _verbose=True))
    heard = transcribe_audio(model, samples)
    # Drop everything except letters (including accented ones) and spaces.
    cleaned = re.sub(r'[^A-Za-zÀ-ÖØ-öø-ÿ ]+', '', heard).strip()
    return any(word == _keyword.upper() for word in cleaned.split())

#### Ollama

In [55]:
# Ollama settings and conversation state shared by the helpers below.
# model name sent in every request payload
_OLLAMA_MODEL = 'llama3'
# local Ollama generate endpoint that the requests are posted to
_OLLAMA_URL = 'http://localhost:11434/api/generate'
# conversation history: list of {'role': ..., 'content': ...} dicts
_LLM_MEM = []
# number of turns kept; mem_lim keeps twice this many history items
_MAX_MEM_TURNS = 5

In [56]:
def mem_lim():
    """Trim the global history to the most recent ``_MAX_MEM_TURNS`` turns.

    One turn is two entries (a user message and an assistant message), so the
    last ``2 * _MAX_MEM_TURNS`` items are kept. ``_LLM_MEM`` is rebound to a
    new list rather than modified in place.
    """
    global _LLM_MEM
    keep = 2 * _MAX_MEM_TURNS
    _LLM_MEM = _LLM_MEM[-keep:]

In [57]:
def json2text(_json) -> str:
    """Render a message history as plain text, one message per line.

    Args:
        _json: Iterable of mappings with ``'role'`` and ``'content'`` keys.

    Returns:
        Lines of the form ``"<Role> : <content>"`` (role capitalised) joined
        with newlines; an empty string for an empty history.
    """
    return '\n'.join(
        f"{entry['role'].capitalize()} : {entry['content']}"
        for entry in _json
    )

In [62]:
def question_ollama_with_mem(message:str, model:str=_OLLAMA_MODEL, model_url:str=_OLLAMA_URL, _verbose:bool=True) -> str:
    """Send a message to Ollama together with the stored conversation history.

    The user message is appended to ``_LLM_MEM``, the whole history is
    rendered with ``json2text`` into one prompt ending in ``Assistant:``, and
    the answer is stored back in the history before it is trimmed by
    ``mem_lim``. If the request fails, the user message is removed again so
    the history never holds an unanswered turn.

    Args:
        message: The user's text.
        model: Ollama model name placed in the payload.
        model_url: Endpoint that the request is posted to.
        _verbose: When True, print progress and timing information.

    Returns:
        The model's answer with surrounding whitespace stripped.

    Raises:
        RuntimeError: On a request timeout or a non-200 HTTP status.
        requests.exceptions.RequestException: Other transport errors
            propagate unchanged.
    """
    global _LLM_MEM
    _LLM_MEM.append({'role' : 'user', 'content' : message})

    _full_message = f'{json2text(_LLM_MEM)}\nAssistant:'

    if _verbose:
        print(f'Sending message to Ollama model {model} ...')
        print(message)

    _payload = {
        'model' : model,
        'prompt' : _full_message,
        'stream' : False
    }

    _start = time.perf_counter()

    try:
        try:
            # Problem with timeout - by increasing the history we increase the model's response time
            # implement retry or adaptive timeout
            response = requests.post(model_url, json=_payload, timeout=60)
        except requests.exceptions.Timeout as err:
            raise RuntimeError("Request timed out : (") from err

        if _verbose:
            _elapsed = time.perf_counter() - _start
            print(f'Response received in {_elapsed:.2f} seconds')

        if response.status_code != 200:
            raise RuntimeError(f'Error - {response.status_code}')

        answer = response.json().get('response', '').strip()
    except Exception:
        # Roll back the unanswered user turn so a retry does not send it twice.
        _LLM_MEM.pop()
        raise

    _LLM_MEM.append({'role' : 'assistant', 'content' : answer})
    mem_lim()
    return answer

In [59]:
def question_ollama(prompt, model=_OLLAMA_MODEL, temp:float=0.7) -> None:
    """Send a single prompt to Ollama and print the answer as it streams in.

    No conversation history is used or stored.

    Args:
        prompt: Text sent as the prompt.
        model: Ollama model name placed in the payload.
        temp: Sampling temperature. Sent under ``options``, where the Ollama
            API expects model parameters; it used to be sent at the top level
            of the payload.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error
            status.
    """
    print(f'Sending message to Ollama model {model} ...')
    _payload = {
        'model' : model,
        'prompt' : prompt,
        'stream' : True,
        'options' : {'temperature' : temp},
    }

    # The context manager releases the streamed connection even if we stop
    # reading early or an error occurs.
    with requests.post(_OLLAMA_URL, json=_payload, stream=True) as response:
        # Surface HTTP errors instead of silently printing nothing.
        response.raise_for_status()

        print(f'{model.upper()} answer:')
        # Each non-empty line of the stream is parsed as one JSON object.
        for line in response.iter_lines():
            if not line:
                continue
            _data = json.loads(line.decode('utf-8'))
            if _data.get('done'):
                break
            print(_data.get('response', ''), end='', flush=True)

In [60]:
def sim_streaming_text(_text:str, _delay:float=0.01) -> None:
    """Print text one character at a time to imitate a streamed answer.

    Args:
        _text: Text to print.
        _delay: Pause in seconds after each character.
    """
    emit = print
    for symbol in _text:
        # Flush each character so it shows up immediately.
        emit(symbol, end='', flush=True)
        time.sleep(_delay)
    # Finish the line once the whole text has been written.
    emit()

In [61]:
def llama3_conversation(model) -> None:
    """Run one voice turn: record, transcribe, ask Ollama, print the answer.

    Args:
        model: Transcription model passed to ``transcription_audio``.
    """
    # Record with the default settings and convert to the model's input format.
    samples = _audio_to_model(take_audio())
    # Speech to text.
    spoken = transcription_audio(samples, model)
    # Ask the LLM (with conversation memory) and print the reply gradually.
    sim_streaming_text(question_ollama_with_mem(spoken))

In [None]:
def loop_test_conversation_with_command_voice(model):
    """Run voice conversations forever, each one triggered by the wake word.

    The shared conversation history is cleared first. The loop then listens
    with ``searching_for_keyword`` and, whenever the keyword is heard, runs
    one full turn via ``llama3_conversation``. There is no exit condition;
    interrupt the kernel to stop it.

    Args:
        model: Whisper model used for both keyword spotting and transcription.
    """
    # reset history -- the global must be named exactly, otherwise the
    # assignment only creates a local and the old history is kept.
    global _LLM_MEM
    _LLM_MEM = []

    while True:
        if searching_for_keyword(model):
            # record -> transcribe -> ask Ollama -> print the answer
            llama3_conversation(model)
    

In [None]:
# Run a single voice conversation turn with the loaded Whisper model.
llama3_conversation(_model)