In [None]:
!pip install boto3

In [None]:
import os
import time

import boto3
import pandas as pd

In [None]:
import sagemaker

# Open a SageMaker session and use its default bucket as the upload target.
sess = sagemaker.Session()
bucket = sess.default_bucket()
# `prefix` doubles as the local sub-directory and the S3 key prefix.
prefix = "videos"
filename = "STF.mp4"
# Upload ./videos/STF.mp4; the value returned by upload_data is kept in
# `documents_path`, which amazon_transcribe() later passes as the MediaFileUri.
documents_path = sess.upload_data(f'./{prefix}/{filename}', bucket = bucket,key_prefix = prefix)
# Show the upload location as the cell output.
documents_path

In [None]:
# Module-level Amazon Transcribe client shared by the helper functions below.
# No region or credentials are passed here, so boto3's defaults apply.
transcribe = boto3.client('transcribe')

In [None]:
def _transcription_job_exists(job_name):
    """Return True when a transcription job named exactly `job_name` exists.

    Walks every page of `list_transcription_jobs` (following `NextToken`) so
    that jobs beyond the first page of results are not missed.
    """
    # Narrow the listing server-side when possible; the filter is a substring
    # match, so an exact comparison is still done below.
    request = {'JobNameContains': job_name} if job_name else {}
    while True:
        page = transcribe.list_transcription_jobs(**request)
        for job in page.get('TranscriptionJobSummaries', []):
            if job['TranscriptionJobName'] == job_name:
                return True
        next_token = page.get('NextToken')
        if not next_token:
            return False
        request['NextToken'] = next_token


def check_job_name(job_name):
    """Interactively resolve a transcription job name that is free to use.

    While a job called `job_name` already exists the user is asked whether to
    override it:

    * "y"/"yes" deletes the existing job and keeps the name;
    * "n"/"no" asks for a different name, which is checked again;
    * anything else prints a hint and repeats the question.

    Parameters
    ----------
    job_name : str
        Desired transcription job name.

    Returns
    -------
    str
        The name to use: the original one (unused, or freed by deleting the
        old job) or the replacement name entered by the user.
    """
    while _transcription_job_exists(job_name):
        command = input(job_name + " has existed. \nDo you want to override the existed job (Y/N): ")
        command = command.strip().lower()
        if command in ("y", "yes"):
            transcribe.delete_transcription_job(TranscriptionJobName=job_name)
            break
        if command in ("n", "no"):
            # The replacement is validated by the next loop iteration, so the
            # value finally returned is always the one that was checked.
            job_name = input("Insert new job name? ")
        else:
            print("Input can only be (Y/N)")
    return job_name

In [None]:
def amazon_transcribe(audio_file_name, max_speakers = -1, media_uri=None,
                      language_code='pt-BR', poll_interval=15):
    """Start an Amazon Transcribe job with speaker labels and wait for it.

    Parameters
    ----------
    audio_file_name : str
        Name of the media file. Its base name without the extension (spaces
        removed) becomes the job name; its extension becomes the media format.
    max_speakers : int, optional
        Maximum number of speakers to label, between 2 and 10. The default -1
        sends no ``MaxSpeakerLabels`` setting.
    media_uri : str, optional
        Location of the media to transcribe. Defaults to the notebook-level
        ``documents_path`` (the upload performed earlier).
    language_code : str, optional
        Language code passed to Transcribe. Defaults to ``'pt-BR'``.
    poll_interval : int or float, optional
        Seconds to sleep between status checks. Defaults to 15.

    Returns
    -------
    dict
        The last ``get_transcription_job`` response, whose status is either
        ``COMPLETED`` or ``FAILED``. Callers must check the status before
        reading the transcript.

    Raises
    ------
    ValueError
        If ``max_speakers`` is above 10, or is set but below 2.
    """
    if max_speakers > 10:
        raise ValueError("Maximum detected speakers is 10.")
    if max_speakers != -1 and max_speakers < 2:
        raise ValueError("max_speakers must be -1 (unset) or between 2 and 10.")

    job_uri = documents_path if media_uri is None else media_uri

    # splitext keeps dotted names intact ("talk.v2.mp4" -> "talk.v2", ".mp4")
    # and basename drops directories, which are not valid in a job name.
    stem, extension = os.path.splitext(os.path.basename(audio_file_name))
    job_name = stem.replace(" ", "")

    # check if name is taken or not
    job_name = check_job_name(job_name)

    # NOTE(review): the Transcribe API reference says MaxSpeakerLabels must
    # accompany ShowSpeakerLabels; the max_speakers == -1 path keeps the
    # original request shape - confirm the service accepts it.
    settings = {'ShowSpeakerLabels': True}
    if max_speakers != -1:
        settings['MaxSpeakerLabels'] = max_speakers

    request = {
        'TranscriptionJobName': job_name,
        'Media': {'MediaFileUri': job_uri},
        'LanguageCode': language_code,
        'Settings': settings,
    }
    # Only send a media format when the file name actually has an extension.
    media_format = extension.lstrip('.').lower()
    if media_format:
        request['MediaFormat'] = media_format

    transcribe.start_transcription_job(**request)

    # Poll until the job reaches a terminal state.
    while True:
        result = transcribe.get_transcription_job(TranscriptionJobName=job_name)
        status = result['TranscriptionJob']['TranscriptionJobStatus']
        if status in ('COMPLETED', 'FAILED'):
            break
        time.sleep(poll_interval)

    if status == 'FAILED':
        # Surface the failure instead of returning it silently.
        reason = result['TranscriptionJob'].get('FailureReason', 'unknown reason')
        print(f"Transcription job {job_name} failed: {reason}")
    return result

In [None]:
# Transcribe the uploaded file with at most 3 speaker labels. The call blocks
# (polling every 15 s) until the job status is COMPLETED or FAILED.
result = amazon_transcribe(filename, 3)

In [None]:
# Load the transcript JSON referenced by the job response into a DataFrame
# and preview its first rows.
data = pd.read_json(result['TranscriptionJob']['Transcript']['TranscriptFileUri'])
data.head()

In [None]:
# Inspect the 'transcripts' entry of the 'results' column.
data['results'].transcripts

In [None]:
# Inspect the 'speaker_labels' entry of the 'results' column.
data['results'].speaker_labels

In [None]:
import pandas as pd
import datetime

def read_output(filename, transcript_uri=None):
    """Write a speaker-labelled, time-stamped transcript to a text file.

    The transcript JSON is read with ``pd.read_json``; consecutive items spoken
    by the same speaker are merged into one line, and each line is written as
    ``[H:MM:SS] <speaker>: <text>`` followed by a blank line.

    Parameters
    ----------
    filename : str
        Media file name; the output is written to the same path with the
        extension replaced by ``.txt``.
    transcript_uri : str, optional
        Path or URL of the transcript JSON. Defaults to the transcript URI in
        the notebook-level ``result`` (the job response).

    Returns
    -------
    None
    """
    if transcript_uri is None:
        transcript_uri = result['TranscriptionJob']['Transcript']['TranscriptFileUri']

    # Fetch and parse before opening the output file, so a failed download
    # does not leave an empty or truncated .txt behind.
    data = pd.read_json(transcript_uri)
    results = data['results']

    # Map each spoken item's start time to the speaker label assigned to it.
    speaker_start_times = {}
    for segment in results['speaker_labels']['segments']:
        for item in segment['items']:
            speaker_start_times[item['start_time']] = item['speaker_label']

    lines = []
    line = ''
    start_time = 0
    speaker = None  # no speaker seen yet

    # loop through all elements
    for item in results['items']:
        content = item['alternatives'][0]['content']

        # Punctuation carries no start time: attach it to the current line.
        if item['type'] == 'punctuation' or not item.get('start_time'):
            line = line + content
            continue

        current_speaker = speaker_start_times[item['start_time']]

        if current_speaker != speaker:
            # Speaker changed: flush the previous speaker's line, if any.
            if speaker is not None:
                lines.append({'speaker': speaker, 'line': line, 'time': start_time})
            line = content
            speaker = current_speaker
            start_time = item['start_time']
        else:
            line = line + ' ' + content

    # Flush the last line; nothing is emitted for a transcript without speech.
    if speaker is not None:
        lines.append({'speaker': speaker, 'line': line, 'time': start_time})

    # sort the results by the time
    sorted_lines = sorted(lines, key=lambda k: float(k['time']))

    # splitext keeps directories and dotted names intact.
    output_path = os.path.splitext(filename)[0] + '.txt'

    # write into the .txt file (UTF-8 so accented characters are preserved)
    with open(output_path, 'w', encoding='utf-8') as w:
        for line_data in sorted_lines:
            stamp = str(datetime.timedelta(seconds=int(round(float(line_data['time'])))))
            w.write('[' + stamp + '] ' + line_data['speaker'] + ': ' + line_data['line'] + '\n\n')


In [None]:
# Write the speaker-labelled transcript to a .txt file named after the media
# file (STF.txt here). read_output takes the transcript from the global `result`.
read_output(filename)