This notebook creates AWS API clients with boto3, uploads speech audio files (.wav format) to an S3 bucket, and transcribes them with Amazon Transcribe, with speaker diarization enabled for recordings that contain multiple speakers.

In [1]:
# Mount Google Drive at /content/gdrive (relies on the google.colab package).
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
# Make the speech2text folder on the mounted Drive the working directory.
%cd /content/gdrive/My Drive/speech2text/

/content/gdrive/My Drive/speech2text


In [None]:
# Local folders: .wav inputs are listed/read from `filepath`; transcript .txt
# files are written under `output_filepath`. Both values are concatenated
# directly with file names elsewhere in this notebook, so keep the trailing "/".
# NOTE(review): os.listdir() and open() do not expand "~"; presumably these are
# placeholders to be replaced with real paths -- confirm before running.
filepath = "~/audio_wav/"
output_filepath = "~/Transcripts/"

In [None]:
from __future__ import print_function
import time
import boto3
import json
import os
import botocore

# S3 bucket that receives the uploaded audio and the Transcribe output JSON.
bucketName = "audiofiles"

In [None]:
def upload_file_to_s3(audio_file_name):
    """Upload one local audio file to the S3 bucket.

    The file is read from ``filepath + audio_file_name`` and stored in
    ``bucketName`` under the object key ``audio_file_name``.
    """
    local_path = filepath + audio_file_name

    client = boto3.client('s3')
    # Positional order expected by upload_file: local file, bucket, object key.
    client.upload_file(local_path, bucketName, audio_file_name)

In [None]:
def download_file_from_s3(audio_file_name):
    """Download the transcript JSON that belongs to an audio file.

    The object key is the part of ``audio_file_name`` before its first '.'
    followed by '.json'; the file is saved under that same name in the
    current working directory.

    A ClientError whose error code is "404" is reported with a printed
    message and otherwise ignored; any other ClientError is re-raised.
    """
    resource = boto3.resource('s3')

    stem = audio_file_name.split('.')[0]
    json_name = stem + '.json'

    try:
        bucket = resource.Bucket(bucketName)
        bucket.download_file(json_name, json_name)
    except botocore.exceptions.ClientError as err:
        if err.response['Error']['Code'] != "404":
            raise
        print("The object does not exist.")

In [None]:
def delete_file_from_s3(audio_file_name):
    """Delete an audio file and its transcript JSON from the S3 bucket.

    Two objects are deleted from ``bucketName``: the key ``audio_file_name``
    and the key made of the text before the first '.' plus '.json'.
    """
    resource = boto3.resource('s3')

    def _remove(key):
        # Issue the delete for a single object key in the bucket.
        resource.Object(bucketName, key).delete()

    _remove(audio_file_name)
    _remove(audio_file_name.split('.')[0] + '.json')

In [None]:
def transcribe(audio_file_name, region_name='us-east-2', max_speaker_labels=2,
               poll_interval=5):
    """Transcribe one .wav file with Amazon Transcribe and return its text.

    Steps: upload the audio to the S3 bucket, start a transcription job with
    speaker labels enabled, poll until the job is COMPLETED or FAILED,
    download the result JSON, delete the job and the S3 objects, then read
    the downloaded JSON and concatenate its transcripts.

    Parameters
    ----------
    audio_file_name : str
        Name of the .wav file inside ``filepath``. The text before its first
        '.' is used as the transcription job name and as the JSON file name.
    region_name : str, optional
        AWS region used for the Transcribe client and for the S3 media URI.
    max_speaker_labels : int, optional
        Value passed as ``MaxSpeakerLabels`` in the job settings.
    poll_interval : int or float, optional
        Seconds to sleep between job status checks.

    Returns
    -------
    str
        All entries of ``results.transcripts[*].transcript`` joined together.

    Raises
    ------
    RuntimeError
        If the job finishes with status FAILED. The job and the uploaded
        audio are still cleaned up before the error is raised.
    """
    job_name = audio_file_name.split('.')[0]
    result_file = job_name + '.json'

    upload_file_to_s3(audio_file_name)

    # Named `client` so the function's own name is not shadowed locally.
    client = boto3.client('transcribe', region_name=region_name)
    job_uri = ("https://s3." + region_name + ".amazonaws.com/"
               + bucketName + "/" + audio_file_name)
    client.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={'MediaFileUri': job_uri},
        MediaFormat='wav',
        LanguageCode='en-US',
        Settings={'MaxSpeakerLabels': max_speaker_labels,
                  'ShowSpeakerLabels': True},
        OutputBucketName=bucketName
    )

    # Poll until the job reaches a terminal state.
    while True:
        job = client.get_transcription_job(
            TranscriptionJobName=job_name)['TranscriptionJob']
        job_status = job['TranscriptionJobStatus']
        if job_status in ('COMPLETED', 'FAILED'):
            break
        time.sleep(poll_interval)

    # A failed job produces no result object, so only download on success.
    if job_status == 'COMPLETED':
        download_file_from_s3(audio_file_name)

    # Clean up remote state in both outcomes so a retry can reuse the job name.
    client.delete_transcription_job(TranscriptionJobName=job_name)
    delete_file_from_s3(audio_file_name)

    if job_status == 'FAILED':
        # Fail loudly instead of falling through to a missing (or stale)
        # local JSON file.
        raise RuntimeError(
            "Transcription job '%s' failed: %s"
            % (job_name, job.get('FailureReason', 'no failure reason reported'))
        )

    # The downloaded JSON is intentionally left on disk after it is read.
    with open(result_file) as f:
        result = json.load(f)

    return ''.join(part['transcript']
                   for part in result['results']['transcripts'])

In [None]:
def write_transcripts(transcript_filename,transcript):
    """Write a transcript string to a file under ``output_filepath``.

    Parameters
    ----------
    transcript_filename : str
        File name appended directly to ``output_filepath``.
    transcript : str
        Text to write. An existing file of the same name is overwritten.
    """
    # The context manager closes the handle even if write() raises.
    with open(output_filepath + transcript_filename, "w+") as f:
        f.write(transcript)

In [None]:
if __name__ == "__main__":
    # Batch driver: transcribe every .wav file found in `filepath`, skipping
    # any file whose .txt transcript is already present in `output_filepath`.
    files = [entry for entry in os.listdir(filepath) if entry.endswith(".wav")]
    for audio_file_name in files:
        exists = os.path.isfile(output_filepath + audio_file_name.split('.')[0] + '.txt')
        if exists:
            continue

        print(audio_file_name)
        transcript = transcribe(audio_file_name)
        transcript_filename = audio_file_name.split('.')[0] + '.txt'
        write_transcripts(transcript_filename, transcript)