# Speech 2 Text

## Install and Imports

In [None]:
!pip install SpeechRecognition  gTTS pydub azure-cognitiveservices-speech google-cloud-speech python-dotenv

In [None]:
import contextlib
import glob
import io
import json
import os
import textwrap
import time
import wave
from datetime import datetime
from pathlib import Path
from urllib.request import urlopen

import boto3
import pandas as pd
import requests
import speech_recognition as sr
from dotenv import load_dotenv
from IPython.display import display

import azure.cognitiveservices.speech as speechsdk
from google.cloud import speech_v1 as speech
from google.cloud import storage
from gtts import gTTS
from pydub import AudioSegment

In [None]:
# Google Drive locations: the benchmark audio lives under TCC_data and the
# API credentials are kept in a .env file next to the notebooks.
data_path = Path('/content/drive/My Drive') / 'TCC_data'
env_path = Path('/content/drive/My Drive/Colab Notebooks/.env')

# Load the variables of the .env file into the process environment, where the
# helpers in this notebook read them with os.getenv.
load_dotenv(dotenv_path=env_path)

In [None]:
# Shared wrapper: output lines are limited to 80 columns.
wrapper = textwrap.TextWrapper(width=80)


def wrap_print(text):
    """Print ``text`` wrapped to the width of the module-level ``wrapper``.

    Each wrapped line is printed separately; text that wraps to no lines
    (such as an empty string) prints nothing.
    """
    wrapped_lines = wrapper.wrap(text=text)
    for line in wrapped_lines:
        print(line)

In [None]:
def get_audio_length(file_path):
    """Return the duration, in seconds, of the WAV file at ``file_path``.

    The duration is the frame count divided by the frame rate, both read from
    the WAV header. ``file_path`` may be a string or a path object; it is
    converted with ``str`` before being opened.
    """
    with wave.open(str(file_path), 'r') as wav_file:
        frame_count = wav_file.getnframes()
        frame_rate = wav_file.getframerate()
        return frame_count / float(frame_rate)

In [None]:
def play_audio_file(file_path):
    """Load the audio file at ``file_path`` and return it as an AudioSegment.

    Nothing is played by this function itself: the file is opened in binary
    mode, decoded by ``AudioSegment.from_file`` and the resulting segment is
    returned to the caller.
    """
    with open(file_path, 'rb') as stream:
        return AudioSegment.from_file(stream)

In [None]:
def translate_text_to_audio(text, lang='pt-br'):
    """Synthesize ``text`` to speech with gTTS, save it as MP3 and load it.

    The MP3 is saved under a relative file name built from the current local
    date and time (``tts-<dd-mm-YYYY HH:MM:SS>.mp3``). The saved file is then
    loaded with ``play_audio_file`` and the result is returned.

    Args:
        text: Text to synthesize.
        lang: Language code handed to gTTS.
    """
    synthesizer = gTTS(text, lang=lang)

    # Save the audio file, named after the current timestamp.
    timestamp = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
    mp3_path = 'tts-' + timestamp + '.mp3'
    synthesizer.save(mp3_path)
    print("Estou aprendendo o que você disse...")

    # Load the saved audio so that the caller can play it.
    return play_audio_file(mp3_path)

## [Speech Recognizer](https://github.com/Uberi/speech_recognition)

In [None]:
# Reference links for each speech-to-text service, keyed by short service name.
# 'website' points at the product page; 'pricing' holds either a pricing URL
# or a short free-form note.
api_info = dict(
    api=dict(
        website='https://cloud.google.com/dialogflow',
        pricing='https://cloud.google.com/dialogflow/pricing',
    ),
    bing=dict(
        website='https://azure.microsoft.com/pt-br/services/cognitive-services/speech-services/',
        pricing='https://azure.microsoft.com/pt-br/pricing/details/cognitive-services/speech-services/',
    ),
    google=dict(
        website='https://developers.google.com/web/updates/2013/01/Voice-Driven-Web-Apps-Introduction-to-the-Web-Speech-API',
        pricing='free, very limited quota',
    ),
    gcloud=dict(
        website='https://cloud.google.com/speech-to-text/',
        pricing='https://cloud.google.com/speech-to-text/pricing',
    ),
    houndify=dict(
        website='https://www.houndify.com/',
        pricing='https://www.houndify.com/pricing, apenas ingles?',
    ),
    ibm=dict(
        website='https://www.ibm.com/br-pt/cloud/watson-speech-to-text',
        pricing='https://www.ibm.com/br-pt/cloud/watson-speech-to-text/pricing',
    ),
    sphinx=dict(
        website='https://cmusphinx.github.io/wiki/',
        pricing='offline, apenas International French, Mandarin Chinese, Italian',
    ),
    wit=dict(
        website='https://wit.ai/',
        pricing='free, https://wit.ai/docs/http/20200513',
    ),
)

In [None]:
# A single Recognizer instance, shared through this module-level name.
recognizer = sr.Recognizer()

# Enabled back ends. Each entry stores the bound ``recognize_*`` method and the
# keyword arguments that are passed to it together with the audio data.
#
# Back ends currently disabled (none has kwargs configured yet):
#   'api'    -> recognizer.recognize_api
#   'bing'   -> recognizer.recognize_bing
#   'gcloud' -> recognizer.recognize_google_cloud
#   'ibm'    -> recognizer.recognize_ibm
#   'sphinx' -> recognizer.recognize_sphinx
#   'wit'    -> recognizer.recognize_wit
sr_apis = dict(
    google=dict(
        recognize=recognizer.recognize_google,
        kwargs=dict(language='pt-br'),
    ),
    houndify=dict(
        recognize=recognizer.recognize_houndify,
        # Credentials are read from the environment when this cell runs.
        kwargs=dict(
            client_id=os.getenv('HOUNDIFY_CLIENT_ID'),
            client_key=os.getenv('HOUNDIFY_CLIENT_KEY'),
        ),
    ),
)

### Audio file to text

In [None]:
def from_audio_file(file_path, api=None, show_all=False):
    """Transcribe a whole audio file with one of the ``sr_apis`` back ends.

    Args:
        file_path: Location of the audio file (string or path object); it is
            converted with ``str`` and opened through ``sr.AudioFile``.
        api: Key of the back end to use in ``sr_apis`` (for example
            ``'google'``).
        show_all: Forwarded unchanged to the back end's recognize method.

    Returns:
        Whatever the selected recognize method returns.

    Raises:
        KeyError: If ``api`` is not a key of ``sr_apis``.
    """
    # Resolve the back end first so an unknown ``api`` fails before any audio
    # is read.
    backend = sr_apis[api]

    with sr.AudioFile(str(file_path)) as source:
        # ``adjust_for_ambient_noise`` is deliberately not called here: it
        # reads the beginning of the stream to calibrate the energy threshold
        # used by ``listen``. ``record`` does not use that threshold, so the
        # call would only remove the start of the file from the transcription.
        audio = recognizer.record(source)

    return backend['recognize'](
        audio_data=audio, show_all=show_all, **backend['kwargs']
    )

In [None]:
# Quick check: transcribe the sample recording with the 'google' back end and
# print the result wrapped to the console width.
text = from_audio_file(file_path=data_path / 'teste.wav', api='google')
wrap_print(text)

### Microphone capture to text

In [None]:
def from_microphone(api=None, show_all=False):
    """Capture one phrase from the microphone and transcribe it.

    Args:
        api: Key of the back end to use in ``sr_apis`` (for example
            ``'google'``).
        show_all: Forwarded unchanged to the back end's recognize method.

    Returns:
        Whatever the selected recognize method returns.

    Raises:
        KeyError: If ``api`` is not a key of ``sr_apis``.
    """
    # Resolve the back end first so an unknown ``api`` fails before the user
    # is prompted to speak.
    backend = sr_apis[api]

    # Open the microphone so that the user can be heard.
    with sr.Microphone() as source:
        # Calibrate the recognizer against the ambient noise.
        recognizer.adjust_for_ambient_noise(source)
        # Tell the user that we are ready to listen.
        print("Diga alguma coisa: ")
        # Capture the audio. ``listen`` belongs to the Recognizer, not to the
        # Microphone object; pass ``phrase_time_limit`` here to cap the length
        # of the captured phrase.
        audio = recognizer.listen(source)
        print("gravado")

    # Hand the captured audio to the selected speech_recognition back end.
    return backend['recognize'](
        audio_data=audio, show_all=show_all, **backend['kwargs']
    )

In [None]:
# Doesn't work on Colab
# from_microphone(api='google')

### Translate audio

In [None]:
def translate_audio_to_text(source=None, api=None, **kwargs):
    """Transcribe speech from an audio file or, without one, the microphone.

    Args:
        source: Path of the audio file to transcribe. When ``None`` the audio
            is captured with ``from_microphone`` instead.
        api: Key of the back end to use; must be present in ``sr_apis``.
        **kwargs: Extra keyword arguments forwarded to ``from_microphone`` or
            ``from_audio_file`` (for example ``show_all``).

    Returns:
        The value returned by the selected helper.

    Raises:
        AssertionError: If ``api`` is not a key of ``sr_apis``.
    """
    # Raised explicitly instead of using an ``assert`` statement so the check
    # still runs under ``python -O``; the exception type is unchanged.
    if api not in sr_apis:
        raise AssertionError('please specify an api')
    if source is None:
        return from_microphone(api, **kwargs)
    return from_audio_file(source, api, **kwargs)

## Use APIs

### Google 

In [None]:
def transcribe_audio_google(file_path):
    """Transcribe ``file_path`` through the ``'google'`` entry of ``sr_apis``."""
    transcription = from_audio_file(file_path, api='google')
    return transcription

### Wit.ai

In [None]:
def split_into_chunks(segment, length=20000/1001, split_on_silence=False, noise_threshold=-36):
    """Split ``segment`` into consecutive pieces of at most ``length`` seconds.

    Args:
        segment: Sliceable audio whose ``len()`` and slice indices are 1/1000
            of the unit of ``length`` (milliseconds for a pydub AudioSegment).
        length: Maximum duration of a chunk, in seconds. The default is just
            under 20 seconds.
        split_on_silence: When false, cut at fixed ``length`` intervals. When
            true, cut at silent stretches found by pydub; pieces that are
            still too long are split again with the threshold raised by 4.
        noise_threshold: Silence threshold (dBFS) for the silence-based mode.

    Returns:
        A list with the chunks in their original order.

    Raises:
        ValueError: If ``length`` is shorter than one millisecond.
    """
    max_ms = int(length * 1000)
    if max_ms <= 0:
        raise ValueError('length must be at least one millisecond')

    def fixed_length(piece):
        # Plain slicing every ``max_ms`` units; the last piece may be shorter.
        return [piece[start:start + max_ms] for start in range(0, len(piece), max_ms)]

    if not split_on_silence:
        return fixed_length(segment)

    # From 0 dBFS upwards every sample counts as silence, so silence detection
    # cannot find a split point any more; this also bounds the recursion.
    if noise_threshold >= 0:
        return fixed_length(segment)

    # Imported locally and aliased: ``pydub.silence`` is not imported at file
    # level and the function name clashes with the ``split_on_silence`` flag.
    from pydub.silence import split_on_silence as _split_at_silences

    # The threshold must be passed by keyword; the second positional
    # parameter of pydub's function is ``min_silence_len``.
    pieces = _split_at_silences(segment, silence_thresh=noise_threshold)
    if not pieces:
        # Everything is at or below the threshold. Keep the audio instead of
        # dropping it (raising the threshold could never produce a chunk).
        return fixed_length(segment)

    chunks = []
    for piece in pieces:
        if len(piece) > max_ms:
            # Still too long: look for more split points with a higher threshold.
            chunks.extend(
                split_into_chunks(piece, length, split_on_silence, noise_threshold + 4)
            )
        else:
            chunks.append(piece)
    return chunks

def preprocess_audio(audio):
    """Return ``audio`` converted to sample width 2, one channel and 8000 Hz.

    The three conversions are applied in that order, each on the result of the
    previous one.
    """
    two_byte_samples = audio.set_sample_width(2)
    mono = two_byte_samples.set_channels(1)
    return mono.set_frame_rate(8000)

def read_audio_into_chunks(file_path):
    """Load an audio file, normalise it and split it into chunks.

    The file is decoded with ``AudioSegment.from_file``, converted by
    ``preprocess_audio`` and split by ``split_into_chunks`` using that
    function's default arguments.
    """
    normalised = preprocess_audio(AudioSegment.from_file(file_path))
    return split_into_chunks(normalised)

def transcribe_audio_wit(file_path):
    """Transcribe an audio file through the Wit.ai ``/speech`` HTTP endpoint.

    The file is turned into chunks by ``read_audio_into_chunks``; the raw data
    of every chunk is POSTed separately, and the ``text`` fields of the JSON
    replies are joined with single spaces. Replies without a ``text`` field
    are skipped. The bearer token is read from the ``WIT_KEY`` environment
    variable.
    """
    endpoint = 'https://api.wit.ai/speech'
    token = os.getenv('WIT_KEY')

    # HTTP headers: bearer authentication plus the description of the raw
    # audio sent in the request body.
    request_headers = {
        'authorization': 'Bearer ' + token,
        'content-type': 'audio/raw;encoding=signed-integer;bits=16;rate=8000;endian=little',
    }

    transcripts = []
    for chunk in read_audio_into_chunks(file_path):
        payload = io.BufferedReader(io.BytesIO(chunk.raw_data))
        reply = requests.post(endpoint, headers=request_headers, data=payload)

        # Keep the transcription when the reply carries one.
        parsed = json.loads(reply.content)
        if 'text' in parsed:
            transcripts.append(parsed['text'])

    return ' '.join(transcripts)

### Azure Cognitive Services

In [None]:
def transcribe_audio_azure(file_path):
    """Transcribe an audio file with Azure Speech continuous recognition.

    The subscription key and region are read from the ``AZURE_SPEECH_KEY`` and
    ``AZURE_REGION`` environment variables; the recognition language is
    ``pt-BR``.

    Args:
        file_path: Location of the audio file (string or path object).

    Returns:
        The recognized phrases, in the order received, joined by single spaces.
    """
    # Local import: the event is only needed by this helper.
    import threading

    speech_key = os.getenv('AZURE_SPEECH_KEY')
    service_region = os.getenv('AZURE_REGION')

    speech_config = speechsdk.SpeechConfig(subscription=speech_key, region=service_region)
    # Converted with str() so that path objects are accepted as well.
    audio_config = speechsdk.audio.AudioConfig(filename=str(file_path))
    # Creates a speech recognizer using a file as audio input, also specify the speech language
    speech_recognizer = speechsdk.SpeechRecognizer(
        speech_config=speech_config, language="pt-BR", audio_config=audio_config
    )

    finished = threading.Event()
    text = []

    def stop_cb(evt):
        # Wake up the waiting caller as soon as the recognition is over.
        finished.set()

    def recognized_cb(evt):
        # Keep only the events that carry recognized speech.
        if evt.result.reason == speechsdk.ResultReason.RecognizedSpeech:
            text.append(evt.result.text)

    speech_recognizer.recognized.connect(recognized_cb)
    speech_recognizer.session_stopped.connect(stop_cb)
    # A canceled recognition must end the wait too, otherwise the caller
    # depends on a session_stopped event following it.
    speech_recognizer.canceled.connect(stop_cb)

    # Start continuous speech recognition
    speech_recognizer.start_continuous_recognition()
    try:
        # Waiting on an event returns as soon as a callback fires, so no
        # polling delay is added to the measured transcription time.
        finished.wait()
    finally:
        # Stop continuous speech recognition, even if the wait was interrupted
        speech_recognizer.stop_continuous_recognition()

    return ' '.join(text)

### IBM Watson

In [None]:
def transcribe_audio_watson(file_path):
    """Transcribe a WAV file by POSTing it to the Watson Speech to Text URL.

    The API key and the service URL are read from the ``WATSON_KEY`` and
    ``WATSON_URL`` environment variables. The whole file is sent as the request
    body, and the ``transcript`` of every alternative of every result in the
    JSON reply is joined with single spaces.
    """
    request_headers = {'Content-Type': 'audio/wav'}
    query = (
        ('model', 'pt-BR_BroadbandModel'),
        ('max_alternatives', 1),
    )

    api_key = os.getenv('WATSON_KEY')
    endpoint = os.getenv('WATSON_URL')

    with open(file_path, 'rb') as audio_file:
        audio_bytes = audio_file.read()

    reply = requests.post(
        endpoint,
        headers=request_headers,
        params=query,
        data=audio_bytes,
        auth=('apikey', api_key),
    )
    payload = json.loads(reply.content)

    transcripts = [
        alternative['transcript']
        for result in payload['results']
        for alternative in result['alternatives']
    ]
    return ' '.join(transcripts)

### AWS Transcribe

In [None]:
def upload_file_to_s3(bucket_name, file_path, file_name, region):
    """Upload a local file to an S3 bucket.

    Args:
        bucket_name: Name of the destination bucket.
        file_path: Location of the local file to upload.
        file_name: Object key to store the file under.
        region: AWS region used to create the S3 client.
    """
    s3_client = boto3.client('s3', region_name=region)

    with open(file_path, "rb") as f:
        s3_client.upload_fileobj(f, bucket_name, file_name)
    

def delete_file_from_s3(bucket_name, file_name, region):
    """Delete the object stored under ``file_name`` in ``bucket_name``.

    ``region`` is the AWS region used to create the S3 resource.
    """
    s3_resource = boto3.resource('s3', region_name=region)
    s3_resource.Object(bucket_name, file_name).delete()


def transcribe_audio_aws(file_path):
    """Transcribe an audio file with AWS Transcribe.

    The file is uploaded to the bucket named by the ``BUCKET_NAME`` environment
    variable, a ``pt-BR`` transcription job is started and polled once per
    second until it completes or fails, and the uploaded object is deleted
    afterwards. The region and credentials are read from ``AWS_REGION``,
    ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``.

    Args:
        file_path: Location of the audio file; its extension is used as the
            media format of the job.

    Returns:
        The transcript text, or ``None`` when the job ends as ``FAILED``.
    """
    bucket_name = os.getenv('BUCKET_NAME')
    region = os.getenv('AWS_REGION')
    access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
    secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')

    file_name = Path(file_path).name
    # Lower-cased so that an upper-case extension still names a media format.
    media_format = file_name.split('.')[-1].lower()

    upload_file_to_s3(bucket_name, file_path, file_name, region)

    # From here on the uploaded object must be removed whatever happens.
    try:
        job_name = 'speech2text-{}-{}'.format(file_name, time.time())
        job_uri = 'https://s3.amazonaws.com/{}/{}'.format(bucket_name, file_name)

        transcribe = boto3.client(
            'transcribe',
            aws_access_key_id=access_key_id,
            aws_secret_access_key=secret_access_key,
            region_name=region
        )

        transcribe.start_transcription_job(
            TranscriptionJobName=job_name,
            Media={'MediaFileUri': job_uri},
            MediaFormat=media_format,
            LanguageCode='pt-BR'
        )

        # Poll until the job reaches a final state.
        while True:
            status = transcribe.get_transcription_job(TranscriptionJobName=job_name)
            job_status = status['TranscriptionJob']['TranscriptionJobStatus']
            if job_status in ('COMPLETED', 'FAILED'):
                break
            time.sleep(1)

        if job_status != 'COMPLETED':
            return None

        # The transcript is published as a JSON document at the given URI.
        transcript_uri = status['TranscriptionJob']['Transcript']['TranscriptFileUri']
        with urlopen(transcript_uri) as response:
            data = json.loads(response.read())
        return data['results']['transcripts'][0]['transcript']
    finally:
        delete_file_from_s3(bucket_name, file_name, region)

### Google Cloud Speech-to-Text

In [None]:
# Module-level Cloud Storage client.
# NOTE(review): upload_blob and delete_blob each create their own client, so
# this instance looks unused by the code in this notebook - confirm before
# relying on it or removing it.
storage_client = storage.Client()

In [None]:
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket.

    Args:
        bucket_name: Name of the destination bucket, e.g. "your-bucket-name".
        source_file_name: Local file to upload, e.g. "local/path/to/file".
        destination_blob_name: Name of the object to create, e.g.
            "storage-object-name".
    """
    target_bucket = storage.Client().bucket(bucket_name)
    target_bucket.blob(destination_blob_name).upload_from_filename(source_file_name)


def delete_blob(bucket_name, blob_name):
    """Delete an object from a Cloud Storage bucket.

    Args:
        bucket_name: Name of the bucket, e.g. "your-bucket-name".
        blob_name: Name of the object to delete, e.g. "your-object-name".
    """
    target_bucket = storage.Client().bucket(bucket_name)
    target_bucket.blob(blob_name).delete()


def transcribe_audio_gcloud(file_path):
    """Transcribe a WAV file with Google Cloud Speech-to-Text.

    The file is uploaded to the bucket named by the ``GOOGLE_BUCKET_NAME``
    environment variable, recognized as ``pt-BR`` LINEAR16 audio with a
    long-running operation, and the uploaded object is deleted afterwards. The
    service-account JSON file is taken from ``GOOGLE_APPLICATION_CREDENTIALS``.

    Args:
        file_path: Location of the audio file (string or path object).

    Returns:
        The transcripts of all alternatives of all results, joined by single
        spaces.
    """
    credentials_path = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
    bucket_name = os.getenv('GOOGLE_BUCKET_NAME')

    local_path = str(file_path)
    file_name = Path(local_path).name

    # Describe the audio from its own WAV header instead of assuming one
    # format; the previous fixed values remain the fallback when the header
    # cannot be parsed.
    try:
        with wave.open(local_path, 'rb') as wav_file:
            sample_rate = wav_file.getframerate()
            channel_count = wav_file.getnchannels()
    except (wave.Error, EOFError):
        sample_rate, channel_count = 44100, 2

    upload_blob(bucket_name, local_path, file_name)

    # From here on the uploaded object must be removed whatever happens.
    try:
        storage_uri = "gs://{}/{}".format(bucket_name, file_name)

        client = speech.SpeechClient.from_service_account_json(credentials_path)

        config = speech.types.RecognitionConfig(
            encoding='LINEAR16',
            language_code='pt-BR',
            sample_rate_hertz=sample_rate,
            audio_channel_count=channel_count,
        )
        audio = speech.types.RecognitionAudio(uri=storage_uri)

        # Blocks until the long-running recognition has finished.
        operation = client.long_running_recognize(config=config, audio=audio)
        op_result = operation.result()

        text = [
            alternative.transcript
            for result in op_result.results
            for alternative in result.alternatives
        ]
        return ' '.join(text)
    finally:
        delete_blob(bucket_name, file_name)

## Tests

In [None]:
# Benchmark inputs: every .wav file directly under data_path, sorted by path,
# and the duration in seconds of each one (same order).
wav_files = glob.glob(str(data_path.joinpath('*.wav')))
wav_files.sort()
wav_lengths = list(map(get_audio_length, wav_files))

In [None]:
def benchmark():
    """Run every transcription helper over every WAV file, timing each call.

    For each API the name is printed, then each file name as it is processed,
    then the total time spent on that API and a separator line.

    Returns:
        A dict keyed by API name. Each value holds the helper under ``'call'``
        and four parallel lists: ``'file'`` (file name), ``'length'`` (audio
        duration), ``'text'`` (transcription) and ``'time'`` (seconds taken by
        the call).
    """
    api_calls = (
        ('google_web_speech', transcribe_audio_google),
        ('wit_speech', transcribe_audio_wit),
        ('azure_cognitive_services', transcribe_audio_azure),
        ('watson_speech_to_text', transcribe_audio_watson),
        ('aws_transcribe', transcribe_audio_aws),
        ('gcloud_speech_to_text', transcribe_audio_gcloud),
    )

    # One record per API: the helper plus empty result columns.
    benchmark_apis = {
        name: {'call': call, 'file': [], 'length': [], 'text': [], 'time': []}
        for name, call in api_calls
    }

    for api, record in benchmark_apis.items():
        print(api)
        api_started = time.time()
        api_call = record['call']

        for audio_file, audio_length in zip(wav_files, wav_lengths):
            file_name = audio_file.split('/')[-1]
            print(file_name)

            # Time only the transcription call itself.
            call_started = time.time()
            text = api_call(audio_file)
            call_seconds = time.time() - call_started

            record['file'].append(file_name)
            record['length'].append(audio_length)
            record['text'].append(text)
            record['time'].append(call_seconds)

        api_seconds = time.time() - api_started
        print('took {} seconds'.format(api_seconds))
        print('#' * 80)

    return benchmark_apis

In [None]:
# Run the benchmark (every API over every WAV file) and keep the raw results,
# keyed by API name.
results = benchmark()

In [None]:
# One DataFrame per API, built from the benchmark columns; the 'call' column
# (the helper function itself) is dropped.
dataframes = {
    api: pd.DataFrame(columns).drop(columns='call')
    for api, columns in results.items()
}

In [None]:
# Show the results table of each API, preceded by a title line.
for api, df in dataframes.items():
    title = 'results %s api' % api
    print(title)
    display(df)
    print('\n')

In [None]:
# For every audio file: show a player, then each API's timing and transcription.
for fp, length in zip(wav_files, wav_lengths):
    file_name = fp.split('/')[-1]
    print('=' * 80)
    print('transcriptions for {} ({:.03f})'.format(file_name, length))
    display(play_audio_file(fp))
    for api, df in dataframes.items():
        filter_ = df['file'] == file_name
        text = df[filter_]['text'].values[0]
        # Named ``elapsed`` so that the builtin ``exec`` is not shadowed.
        elapsed = df[filter_]['time'].values[0]
        print('-' * 80)
        print('{}: {:.03f}s'.format(api, elapsed))
        # A helper may yield no text (transcribe_audio_aws returns None for a
        # failed job); wrapping a non-string would raise, so skip it.
        if isinstance(text, str):
            wrap_print(text)
    print('\n')

In [None]:
# Save one CSV per API. The output folder is created first because ``to_csv``
# does not create missing directories.
output_dir = data_path / 'stt_tests'
output_dir.mkdir(parents=True, exist_ok=True)
for api, df in dataframes.items():
    df.to_csv(output_dir / '{}.csv'.format(api), index=False)