---
### The objectives of this notebook are:

1. To convert an Amazon Transcribe JSON response into a pandas DataFrame.
2. To extract and report the confidence scores of the transcript.

---

In [1]:
from pathlib import Path
from time import perf_counter
from scipy.interpolate import make_interp_spline
import urllib.request
import json
import datetime
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import statistics
import os
import argparse
from io import BytesIO

In [2]:
import pandas as pd

In [3]:
# Additional constants
# Gap between two consecutive turns of the same speaker (compared against
# start/end times that the rest of this notebook treats as seconds) at or above
# which a new segment is started instead of extending the previous one.
START_NEW_SEGMENT_DELAY = 2.0  

In [4]:

class SpeechSegment:
    """
    Plain container describing one turn of speech in the transcript.

    Every attribute starts out with a neutral default; the code that builds the
    turn-by-turn list fills them in afterwards.
    """

    def __init__(self):
        # Timing of the turn
        self.segmentStartTime = self.segmentEndTime = 0.0

        # Who spoke, and what was said
        self.segmentSpeaker = self.segmentText = ""

        # Per-instance lists (each segment gets its own fresh list objects)
        self.segmentConfidence, self.segmentLoudnessScores, self.segmentIssuesDetected = [], [], []

        # Interruption flag, off by default
        self.segmentInterruption = False
        
def convert_timestamp(time_in_seconds):
    """
    Function to help convert timestamps from seconds to H:MM:SS.hh, where "hh"
    is hundredths of a second.

    The hundredths are always written with two digits, so 1.05 seconds becomes
    "0:00:01.05" (not "0:00:01.5", which would read as 1.5 seconds).

    :param time_in_seconds: Time in seconds to be displayed (anything float() accepts)
    :return: Formatted string for this timestamp value
    """
    timeDelta = datetime.timedelta(seconds=float(time_in_seconds))

    # Split into the whole-second part (rendered by timedelta itself) and the
    # sub-second remainder, which is truncated to hundredths of a second
    wholeSeconds = timeDelta - datetime.timedelta(microseconds=timeDelta.microseconds)
    hundredths = timeDelta.microseconds // 10000
    return "{0}.{1:02d}".format(wholeSeconds, hundredths)

def merge_speaker_segments(input_segment_list, max_gap=None):
    """
    Merges together consecutive speaker segments unless:
    (a) There is a speaker change, or
    (b) The gap between segments is greater than our acceptable level of delay

    Merging is done in place: the first segment of each run is extended with the
    end time, text and word confidences of the segments that follow it, and the
    first word of every absorbed segment gets a leading space added to its text.

    :param input_segment_list: Full time-sorted list of speaker segments
    :param max_gap: Gap at or above which a new output segment is started;
                    defaults to START_NEW_SEGMENT_DELAY when omitted
    :return: An updated segment list
    """
    if max_gap is None:
        max_gap = START_NEW_SEGMENT_DELAY

    outputSegmentList = []
    lastSegment = None

    # Step through each of our defined speaker segments
    for segment in input_segment_list:
        # The explicit None check covers the very first segment even when its
        # speaker label happens to be an empty string
        startsNewSegment = (
            lastSegment is None
            or segment.segmentSpeaker != lastSegment.segmentSpeaker
            or (segment.segmentStartTime - lastSegment.segmentEndTime) >= max_gap
        )

        if startsNewSegment:
            # Simple case - speaker change or > n-second gap means new output segment,
            # which is also our base segment moving forward
            outputSegmentList.append(segment)
            lastSegment = segment
            continue

        # Same speaker, short time, need to copy this info to the last one
        lastSegment.segmentEndTime = segment.segmentEndTime
        lastSegment.segmentText += " " + segment.segmentText

        # A segment without any word confidences has nothing further to carry over
        if segment.segmentConfidence:
            segment.segmentConfidence[0]["text"] = " " + segment.segmentConfidence[0]["text"]
            lastSegment.segmentConfidence.extend(segment.segmentConfidence)

    return outputSegmentList


In [5]:
def write_transcribe_text(speech_segments):
    """
    Flattens the turn-by-turn speech segments into a DataFrame with one row per segment.

    :param speech_segments: Turn-by-turn speech list
    :return: DataFrame with the columns start_time, end_time, speaker, text and confidence
    """
    # NOTE(review): the second column is labelled 'end_time' but is filled with the
    # segment duration (end - start). The label is kept so that anything reading the
    # existing column layout keeps working; confirm whether it should be renamed.
    list_tuples_segment = [
        (convert_timestamp(segment.segmentStartTime),             # formatted start time
         segment.segmentEndTime - segment.segmentStartTime,       # duration of the turn
         segment.segmentSpeaker,
         segment.segmentText,
         segment.segmentConfidence)                               # per-word confidence dicts
        for segment in speech_segments
    ]
    return pd.DataFrame.from_records(list_tuples_segment, columns=['start_time',
                                                                   'end_time',
                                                                   'speaker',
                                                                   'text',
                                                                   'confidence'])




In [6]:
def generate_confidence_stats(speech_segments):
    """
    Collects per-word timestamps and confidence scores, and counts how many words
    fall into each confidence bucket, so the results can be summarised and graphed.

    :param speech_segments: List of call speech segments
    :return: Dictionary holding the "timestamps" and "accuracy" lists, one counter
             per confidence bucket ("9.8", "9" ... "0") and the "parsedWords" total
    """
    # Lower bound of every bucket together with its key, highest bucket first;
    # anything below the last bound lands in the "0" bucket
    bucket_floors = ((0.98, "9.8"), (0.9, "9"), (0.8, "8"), (0.7, "7"), (0.6, "6"),
                     (0.5, "5"), (0.4, "4"), (0.3, "3"), (0.2, "2"), (0.1, "1"))

    # Stats dictionary
    stats = {"timestamps": [], "accuracy": []}
    stats.update((bucket_key, 0) for _, bucket_key in bucket_floors)
    stats["0"] = 0
    stats["parsedWords"] = 0

    # Walk every word of every segment, recording it and bumping its bucket
    for segment in speech_segments:
        for entry in segment.segmentConfidence:
            stats["timestamps"].append(entry["start_time"])
            score = entry["confidence"]
            stats["accuracy"].append(int(score * 100))

            bucket = next((bucket_key for floor, bucket_key in bucket_floors if score >= floor), "0")
            stats[bucket] += 1
            stats["parsedWords"] += 1

    return stats



In [7]:
# Pause between two words on the same channel above which a new text segment is
# started (same time unit as the transcript's start_time/end_time values)
_CHANNEL_WORD_GAP = 0.1


def _index_pronunciations(items):
    """Map each pronunciation's (start_time, end_time) pair to its positions within `items`."""
    positions_by_time = {}
    for position, item in enumerate(items):
        if item["type"] == "pronunciation":
            positions_by_time.setdefault((item["start_time"], item["end_time"]), []).append(position)
    return positions_by_time


def _best_alternative(item):
    """Return (alternative, confidence) for the highest-confidence alternative of an item."""
    alternatives = item["alternatives"]
    try:
        # Compare numerically: confidences may be stored as strings in the JSON.
        # A stable sort followed by [-1] picks the last of any equally-scored alternatives.
        best = sorted(alternatives, key=lambda alternative: float(alternative["confidence"]))[-1]
        return best, float(best["confidence"])
    except (KeyError, TypeError, ValueError, IndexError):
        # No usable top-level confidence - fall back to the redaction confidence
        best = alternatives[0]
        return best, float(best["redactions"][0]["confidence"])


def _build_word(items, positions_by_time, word, is_first_word):
    """Return (text, confidence) for a word, with leading space and trailing punctuation applied."""
    positions = positions_by_time.get((word["start_time"], word["end_time"]))
    if not positions:
        raise IndexError("No pronunciation item matches the word at {0} - {1}".format(
            word["start_time"], word["end_time"]))

    # The last item sharing these timestamps supplies the word itself
    best, confidence = _best_alternative(items[positions[-1]])

    # Write the word, and a leading space if this isn't the start of the segment
    text = best["content"] if is_first_word else " " + best["content"]

    # If the item following the first match is punctuation, add it to the current word
    try:
        next_item = items[positions[0] + 1]
        if next_item["type"] == "punctuation":
            text += next_item["alternatives"][0]["content"]
    except IndexError:
        # The word is the final item (or the punctuation has no alternatives)
        pass

    return text, confidence


def _start_segment(segment_list, start_time, speaker):
    """Create a new SpeechSegment, append it to `segment_list` and return it."""
    new_segment = SpeechSegment()
    new_segment.segmentStartTime = start_time
    new_segment.segmentSpeaker = speaker
    new_segment.segmentConfidence = []
    segment_list.append(new_segment)
    return new_segment


def _append_word(speech_segment, items, positions_by_time, word):
    """Add one word (text plus confidence record) to the end of `speech_segment`."""
    # A segment that has no words yet must not start with a space
    text, confidence = _build_word(items, positions_by_time, word,
                                   is_first_word=not speech_segment.segmentConfidence)
    speech_segment.segmentText += text
    speech_segment.segmentConfidence.append({"text": text,
                                             "confidence": confidence,
                                             "start_time": float(word["start_time"]),
                                             "end_time": float(word["end_time"])})


def _segments_from_speaker_labels(results):
    """Build the segment list for a speaker-separated (non-analytics) transcript."""
    speechSegmentList = []
    currentSegment = None
    lastSpeaker = ""
    lastEndTime = 0.0
    positions_by_time = None

    # A segment is a blob of pronunciation and punctuation by an individual speaker
    for segment in results["speaker_labels"]["segments"]:
        # Only segments with content contribute a time and speaker
        if len(segment["items"]) == 0:
            continue

        nextStartTime = float(segment["start_time"])
        nextEndTime = float(segment["end_time"])
        nextSpeaker = str(segment["speaker_label"])

        # If we've changed speaker, or there's a gap, create a new row
        if (currentSegment is None
                or nextSpeaker != lastSpeaker
                or (nextStartTime - lastEndTime) >= START_NEW_SEGMENT_DELAY):
            currentSegment = _start_segment(speechSegmentList, nextStartTime, nextSpeaker)
        currentSegment.segmentEndTime = nextEndTime

        # Note the speaker and end time of this segment for the next iteration
        lastSpeaker = nextSpeaker
        lastEndTime = nextEndTime

        # Index the full item list once, on first use, instead of re-scanning it per word
        if positions_by_time is None:
            positions_by_time = _index_pronunciations(results["items"])

        for word in segment["items"]:
            _append_word(currentSegment, results["items"], positions_by_time, word)

    return speechSegmentList


def _segments_from_channels(results):
    """Build the (unsorted, unmerged) segment list for a channel-separated transcript."""
    speechSegmentList = []
    currentSegment = None
    lastSpeaker = ""
    lastEndTime = 0.0

    # A channel contains all pronunciation and punctuation from a single speaker
    for channel in results["channel_labels"]["channels"]:
        channelItems = channel["items"]
        if len(channelItems) == 0:
            continue

        # We have the same speaker all the way through this channel
        nextSpeaker = str(channel["channel_label"])
        positions_by_time = _index_pronunciations(channelItems)

        for word in channelItems:
            # Punctuation is attached to the preceding word, never processed on its own
            if word["type"] != "pronunciation":
                continue

            nextStartTime = float(word["start_time"])
            nextEndTime = float(word["end_time"])

            # If we've changed speaker, or the pause since the previous word is
            # longer than the allowed gap, then start a new text segment
            if (currentSegment is None
                    or nextSpeaker != lastSpeaker
                    or (nextStartTime - lastEndTime) > _CHANNEL_WORD_GAP):
                currentSegment = _start_segment(speechSegmentList, nextStartTime, nextSpeaker)
            currentSegment.segmentEndTime = nextEndTime

            # Note the speaker and end time of this word for the next iteration
            lastSpeaker = nextSpeaker
            lastEndTime = nextEndTime

            _append_word(currentSegment, channelItems, positions_by_time, word)

    return speechSegmentList


def create_turn_by_turn_segments(data):
    """
    This creates a list of per-turn speech segments based upon the transcript data.  It works in two
    slightly different ways, as each operational mode from Transcribe outputs slightly different JSON structures.
    These modes are (a) Speaker-separated audio and (b) Channel-separated audio.  Channel mode is selected when
    data["results"] contains "channel_labels"; everything else is treated as speaker-separated.
    Call Analytics output is not handled by this function.

    :param data: JSON result data from Transcribe
    :return: List of transcription speech segments
    """
    results = data["results"]

    # Process a Speaker-separated non-analytics file
    if "channel_labels" not in results:
        return _segments_from_speaker_labels(results)

    # Process a Channel-separated non-analytics file.
    # Sort the segments, as they are in channel-order and not speaker-order, then
    # merge together turns from the same speaker that are very close together
    speechSegmentList = sorted(_segments_from_channels(results),
                               key=lambda segment: segment.segmentStartTime)
    return merge_speaker_segments(speechSegmentList)




In [8]:
def write_confidence_scores(stats):
    """
    Using the pre-built confidence stats, create a summary table of confidence score
    spreads, as well as a scatter-plot showing each word against the overall mean.

    An empty transcript (no parsed words) yields 0.0% for every range and a plot
    without the mean line, instead of raising.

    :param stats: Dictionary with the "timestamps" and "accuracy" lists, one counter per
                  confidence bucket ("9.8", "9" ... "0") and the "parsedWords" total
    :return: Tuple of (DataFrame with Confidence / Count / Percentage columns, matplotlib figure)
    """
    parsedWords = stats["parsedWords"]
    confidenceRanges = ["98% - 100%", "90% - 97%", "80% - 89%", "70% - 79%", "60% - 69%", "50% - 59%", "40% - 49%",
                        "30% - 39%", "20% - 29%", "10% - 19%", "0% - 9%"]
    confidenceRangeStats = ["9.8", "9", "8", "7", "6", "5", "4", "3", "2", "1", "0"]

    stats_list = []
    for confRange, rangeStats in zip(confidenceRanges, confidenceRangeStats):
        count = stats[rangeStats]
        # Avoid dividing by zero when no words were parsed
        percentage = round(count / parsedWords * 100, 2) if parsedWords else 0.0
        stats_list.append((confRange, str(count), str(percentage) + "%"))

    # Confidence of each word as scatter graph, and the mean as a line across
    fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(6, 4))
    ax.scatter(stats["timestamps"], stats["accuracy"], label="Individual words")
    if stats["accuracy"]:
        meanAccuracy = statistics.mean(stats["accuracy"])
        ax.plot([stats["timestamps"][0], stats["timestamps"][-1]], [meanAccuracy, meanAccuracy], "r",
                label="Word Confidence Mean")

    # Formatting
    ax.set_xlabel("Time (seconds)")
    ax.set_ylabel("Word Confidence (percent)")
    ax.set_yticks(range(0, 101, 10))
    fig.suptitle("Word Confidence During Transcription", fontsize=11, fontweight="bold")
    # Labels are attached to the artists above, so each legend entry is tied to the
    # right artist regardless of the order in which matplotlib collects handles
    ax.legend(loc="lower center")
    return pd.DataFrame.from_records(stats_list, columns=['Confidence', 'Count', 'Percentage']), fig

In [9]:
# Load in the JSON file for processing
json_filepath = Path('asrOutputRedacted (2).json')
if not json_filepath.is_file():
    # Raise rather than call exit(): the notebook stops at this cell with a clear
    # error instead of taking the whole kernel down
    raise FileNotFoundError("FAIL: Specified JSON file '{0}' does not exist.".format(json_filepath))

# The context manager closes the file handle as soon as parsing is done
with open(json_filepath.absolute(), "r", encoding="utf-8") as json_file:
    json_data = json.load(json_file)

# Generate the core transcript
# NOTE(review): `start` is not read by any cell visible here - confirm before removing
start = perf_counter()
speech_segments = create_turn_by_turn_segments(json_data)

In [10]:
# Collect per-word confidence values and the per-bucket word counts
stats = generate_confidence_stats(speech_segments)

In [11]:
# One DataFrame row per speech segment
df= write_transcribe_text(speech_segments)

In [12]:
# Preview the first rows of the transcript table
df.head()

Unnamed: 0,start_time,end_time,speaker,text,confidence
0,0:00:00.0,9.13,spk_0,And resources at the National University of Si...,"[{'text': 'And', 'confidence': 0.9411, 'start_..."
1,0:00:09.13,1.23,spk_1,[PII] nice to be here.,"[{'text': '[PII]', 'confidence': 0.9999, 'star..."
2,0:00:10.37,3.37,spk_0,Dr [PII] on general practitioner from Dr anywh...,"[{'text': 'Dr', 'confidence': 0.7912, 'start_t..."
3,0:00:13.75,2.07,spk_2,Thank you very much for inviting us here today.,"[{'text': 'Thank', 'confidence': 1.0, 'start_t..."
4,0:00:15.83,4.04,spk_0,And dr medical director at Felix Medical group.,"[{'text': 'And', 'confidence': 1.0, 'start_tim..."


In [13]:
# Preview the last rows of the transcript table
df.tail()

Unnamed: 0,start_time,end_time,speaker,text,confidence
61,0:30:42.77,40.63,spk_3,I will say vaccinations. You know the accessib...,"[{'text': 'I', 'confidence': 0.9977, 'start_ti..."
62,0:31:23.40,2.42,spk_0,also the flick of switch and everyone is tomorrow,"[{'text': 'also', 'confidence': 0.7743, 'start..."
63,0:31:25.83,1.15,spk_3,somehow.,"[{'text': 'somehow.', 'confidence': 0.999, 'st..."
64,0:31:29.0,27.58,spk_1,"Well, I wish we had done all of this five or s...","[{'text': 'Well,', 'confidence': 0.9869, 'star..."
65,0:31:56.59,6.16,spk_0,"Well, better late than never. And here we are....","[{'text': 'Well,', 'confidence': 1.0, 'start_t..."


In [14]:
# Build the confidence summary table and chart (plt2 is the matplotlib Figure)
stats_df, plt2 = write_confidence_scores(stats)
# Write out the chart
chart_file_name = "./chart.png"
plt2.savefig(chart_file_name, facecolor="aliceblue")
# Close the figure instead of clearing it: a cleared figure stays registered with
# pyplot and is rendered as an empty "0 Axes" figure in the cell output
plt.close(plt2)


<Figure size 432x288 with 0 Axes>

In [15]:
stats_df

Unnamed: 0,Confidence,Count,Percentage
0,98% - 100%,5266,85.89%
1,90% - 97%,376,6.13%
2,80% - 89%,158,2.58%
3,70% - 79%,117,1.91%
4,60% - 69%,71,1.16%
5,50% - 59%,71,1.16%
6,40% - 49%,39,0.64%
7,30% - 39%,21,0.34%
8,20% - 29%,10,0.16%
9,10% - 19%,1,0.02%


In [16]:
# Export the turn-by-turn transcript table to Excel
# (assumes the ../res directory already exists - TODO confirm)
df.to_excel('../res/processed_transcript_from_json_to_dataframe.xlsx')

In [17]:
# Export the confidence summary table to Excel
# (assumes the ../res directory already exists - TODO confirm)
stats_df.to_excel('../res/transcript_stats.xlsx')