In [1]:
from io import StringIO
import json
import os
from pathlib import Path
import sys
import zipfile

import boto3
import numpy as np
import pandas as pd

from transcription.text import collapse_transcript_turns
from transcription.transcript_config import Transcript, TranscriptConfig, baseline, natural_turn

In [2]:
# Specify path to zip archive of CANDOR corpus AWS Transcribe JSON files
zip_file_path = 'data/aws-2024-candor-raw-api.zip'

# Assuming file naming convention is unchanged
# Example: 0020a0c5-1658-4747-99c1-2839e736b481--transcribe--Jan-2024.json
# Split on "--" and take the first split segment to get conversation ID

conversation_ids = []
with zipfile.ZipFile(zip_file_path, 'r') as stt_zip:
    for member in stt_zip.infolist():
        # Member names may include a folder prefix, so work on the last path component.
        basename = member.filename.split('/')[-1]
        # Skip directory entries (their basename is empty and would yield an empty ID),
        # non-JSON members, and hidden files (e.g. resource-fork copies like '._<name>.json').
        if member.is_dir() or not basename.endswith('.json') or basename.startswith('.'):
            continue
        conversation_ids.append(basename.split('--')[0])

In [9]:
class TranscriptFormatter(object):
    """Converts raw JSON from AWS speech-to-text API output into a formatted transcript.
        Steps:
            1. Initialize TranscriptFormatter() instance with a conversation ID.
            2. Create `baseline` transcript (as Pandas data frame) from AWS Transcribe raw API JSON output.
            3. Create `natural_turn` transcript and edit history data frame from `baseline`.
            4. (optional) Save transcript data frames (object attrs: baseline, edit_history, transcript) to CSV

        Notes:
            - Ensure `path` definitions in __init__ are updated to match local environment
            - Speaker labels: if `channel_speaker_map` ({'L': id, 'R': id}) is given, its values are
              used as speaker IDs directly and the metadata file is NOT read. Otherwise channels are
              labelled 'L'/'R' and mapped to user IDs via the metadata JSON; for custom (non-CANDOR)
              metadata see the format requirements in replace_speaker_channel_with_user_id().
    """

    def __init__(
        self,
        conversation_id,
        save_dir,
        channel_speaker_map=None,
        stt_model='aws', # Deepgram parser not yet integrated
    ):
        """
        Args:
            conversation_id: ID used to build the input file paths and the output file names.
            save_dir: directory save_transcripts_to_local() writes CSVs into (created if missing).
            channel_speaker_map: optional dict with 'L' and 'R' keys giving the speaker ID per channel.
            stt_model: speech-to-text source label; currently only stored, parsing assumes AWS output.
        """
        self.conversation_id = conversation_id
        self.channel_speaker_map = channel_speaker_map
        self.stt_model = stt_model
        # adjust paths to match local directory structure and filename convention
        self.stt_path = f"data/aws-2024-candor-raw-api/{conversation_id}--transcribe--Jan-2024.json"
        self.metadata_path = f'data/candor_metadata_files/{conversation_id}_metadata.json'
        self.save_dir = save_dir

    def construct_conversation_transcript(self):
        """Parses a two-channel AWS Transcribe response into a simple time-ordered data structure.

        Reads the JSON at `self.stt_path`, labels each channel's items with a speaker, merges
        both channels by start time, and groups consecutive same-speaker items into utterances.

        Returns: dict
            {"transcript": [ {"speaker", "start", "stop", "utterance", "confidence", "turn_id"}, ... ]}
            `start`/`stop` are floats; `confidence` is the list of per-item confidences.
        """
        with open(self.stt_path, 'r') as stt_file:
            response_data = json.load(stt_file)

        def _annotate_two_channel_chunks(channel_data, speaker_label):
            """Tags every item of one channel with `speaker` and float `start_time`/`end_time`.

            Items without their own timestamps (punctuation) inherit the most recent timed
            item's start/end (0.0 if none seen yet). Items are modified in place.

            Returns: list (the same `channel_data` list)
            """
            most_recent_start = 0.0
            most_recent_end = 0.0
            for chunk in channel_data:
                if "start_time" in chunk:
                    most_recent_start = float(chunk["start_time"])
                    most_recent_end = float(chunk["end_time"])  # if there is start, there is always an end
                chunk["speaker"] = speaker_label
                # Normalise every timestamp to float so sorting and downstream output are uniform.
                chunk["start_time"] = most_recent_start
                chunk["end_time"] = most_recent_end
            return channel_data

        def _finish_line(line):
            """Converts an in-progress utterance accumulator into an output record."""
            return {
                "speaker": line["speaker"],
                "start": line["start"],
                "stop": line["stop"],
                "utterance": " ".join(line["words"]),
                "confidence": line["confidences"],
            }

        def _parse_two_channel_transcript(combined_annotated_data):
            """Converts word-level data into utterance-level data.

            A new utterance starts whenever the speaker changes; punctuation is glued onto the
            preceding word of the same speaker.

            Returns: list
            """
            lines = []
            current = None  # accumulator for the utterance being built
            for chunk in combined_annotated_data:
                alternative = chunk["alternatives"][0]
                segment = alternative["content"]
                confidence = float(alternative["confidence"])
                if current is not None and chunk["speaker"] == current["speaker"]:
                    if chunk["type"] == "punctuation":
                        current["words"][-1] += segment
                    else:
                        current["words"].append(segment)
                else:
                    # Speaker changed (or first item): close the previous utterance, start a new one.
                    if current is not None:
                        lines.append(_finish_line(current))
                    current = {
                        "speaker": chunk["speaker"],
                        "start": chunk["start_time"],
                        "words": [segment],
                        "confidences": [],
                    }
                current["confidences"].append(confidence)
                current["stop"] = chunk["end_time"]

            # The final utterance has no following speaker change to close it, so flush it here.
            if current is not None:
                lines.append(_finish_line(current))

            # Add an id to each turn
            for turn_id, line in enumerate(lines):
                line["turn_id"] = turn_id

            return lines

        channels = response_data["results"]["channel_labels"]["channels"]
        channel_0 = channels[0]["items"]
        channel_1 = channels[1]["items"]

        # Channel 0 is left, channel 1 is right
        if self.channel_speaker_map:
            speaker_0_id = self.channel_speaker_map["L"]
            speaker_1_id = self.channel_speaker_map["R"]
        else:
            speaker_0_id = "L"
            speaker_1_id = "R"

        combined_annotated = (
            _annotate_two_channel_chunks(channel_0, speaker_0_id)
            + _annotate_two_channel_chunks(channel_1, speaker_1_id)
        )
        # sorted() is stable, so items sharing a timestamp keep their original relative order
        # (e.g. punctuation stays after the word whose timestamp it inherited).
        combined_annotated = sorted(combined_annotated, key=lambda chunk: chunk["start_time"])

        return {"transcript": _parse_two_channel_transcript(combined_annotated)}

    def json_to_baseline_df(self, json_transcript):
        """Builds the `baseline` transcript data frame from construct_conversation_transcript() output.

        Args:
            json_transcript: dict with a "transcript" list of turn records.

        Returns: pd.DataFrame indexed by `turn_id` with columns speaker, start, stop, utterance, confidence.

        Raises:
            ValueError: if the transcript contains no turns.
        """
        TRANSCRIPT_COLUMNS = ("speaker", "start", "stop", "utterance", "confidence")
        TRANSCRIPT_DTYPES = (str, np.float64, np.float64, str, object)

        records = json_transcript['transcript']
        # Check before building the frame: from_records(index="turn_id") fails on an empty list.
        if len(records) == 0:
            raise ValueError("Conversation is empty. Did you pass a dict or list?")

        df = pd.DataFrame.from_records(records, index="turn_id")
        # With an explicit channel_speaker_map the speaker column already holds IDs; re-mapping it
        # through the metadata (keyed by 'L'/'R') would turn every speaker into NaN.
        if not self.channel_speaker_map:
            df = self.replace_speaker_channel_with_user_id(df)

        assert set(df.columns) == set(TRANSCRIPT_COLUMNS)

        return df.astype(dict(zip(TRANSCRIPT_COLUMNS, TRANSCRIPT_DTYPES)))

    def baseline_to_natural_turn(self, baseline_df, config):
        """Collapses `baseline_df` into turns according to `config` (e.g. `natural_turn`)."""
        return collapse_transcript_turns(Transcript(baseline_df, config))

    def save_transcripts_to_local(self, transcript_obj, model_label):
        """Writes the baseline, transcript and edit-history frames of `transcript_obj` to CSV.

        Files are written into `self.save_dir` (created if missing) as
        {conversation_id}_baseline.csv, {conversation_id}_{model_label}.csv and
        {conversation_id}_{model_label}_edit-history.csv.
        """
        os.makedirs(self.save_dir, exist_ok=True)
        # update paths to reflect local save file directory
        baseline_filename = f'{self.save_dir}/{self.conversation_id}_baseline.csv'
        natural_turn_filename = f'{self.save_dir}/{self.conversation_id}_{model_label}.csv'
        edit_history_filename = f'{self.save_dir}/{self.conversation_id}_{model_label}_edit-history.csv'

        transcript_obj.baseline.to_csv(baseline_filename)
        transcript_obj.transcript.to_csv(natural_turn_filename)
        transcript_obj.edit_history.to_csv(edit_history_filename)

    def replace_speaker_channel_with_user_id(self, df):
        """Use any JSON file with the following structure to map stereo channels (L/R) to speaker IDs:
            metadata: full JSON object
            metadata['speakers']: 'speakers' key is a list, each entry is a dict with 'channel' and 'user_id' keys
            metadata['speakers'][0]['channel']: takes values 'L' or 'R'
            metadata['speakers'][0]['user_id']: unique identifier for a speaker

        Channels missing from the metadata become NaN. The passed `df` is modified and returned.
        """
        with open(self.metadata_path, 'r') as metadata_file:
            metadata = json.load(metadata_file)

        # Create mapping from channel to user_id
        speaker_mapping = {speaker['channel']: speaker['user_id'] for speaker in metadata['speakers']}

        # Replace R/L 'speaker' values with user_id values
        df['speaker'] = df['speaker'].map(speaker_mapping)

        return df

In [10]:
SAVE_DIR = "data/new_transcripts"
N_CONVERSATIONS = 10  # how many conversation IDs (in zip-archive order) to process

# adjust model_label as needed. here we use the natural_turn max_pause setting to disambiguate model variants.
max_pause = int(natural_turn.max_pause * 1000) # max_pause to milliseconds
model_label = f'natural_turn-{max_pause}'
os.makedirs(SAVE_DIR, exist_ok=True)

for conversation_id in sorted(conversation_ids[:N_CONVERSATIONS]):
    print(f'{conversation_id} processing...')
    formatter = TranscriptFormatter(
        conversation_id=conversation_id,
        save_dir=SAVE_DIR
    )
    # Best-effort batch: a failure at any stage (parse, baseline, natural turn, save) is
    # reported and the loop moves on to the next conversation.
    try:
        formatted_json = formatter.construct_conversation_transcript()
        baseline_df = formatter.json_to_baseline_df(formatted_json)
        transcript_obj = formatter.baseline_to_natural_turn(baseline_df, config=natural_turn)
        formatter.save_transcripts_to_local(transcript_obj, model_label)
    except Exception as e:
        print(f'Error for convo {conversation_id}: {str(e)}')

42d99fff-daee-41bc-ba4a-55b6a787e51d processing...
473435df-b97f-4c9f-a8e2-8e3fc66a8902 processing...
5d7dc5a5-8dbe-4fb0-bf59-2d53f0ce3b65 processing...
6cc56ecc-d29c-4bb3-bbd9-c67236c15b0c processing...
7fc59ebb-a980-4df1-9d39-f15ae25f3f27 processing...
a003874c-1d39-4924-857f-94a865951ae9 processing...
afa4d802-a9ed-4b0b-a607-b5f16d899275 processing...
cc7a5173-9c79-4773-83dc-0d9177bcc524 processing...
e12fed95-8f11-4255-b8c7-f5c0346b8e59 processing...
f5ced37a-1596-4e77-ab1d-dae5e084384d processing...


### Concatenate all natural_turn transcripts into a single data frame and save it as CSV

In [6]:
def concatenate_transcripts_from_local(conversation_ids, max_pause, file_path_template):
    """Convenience function to concatenate all processed transcripts into a single Pandas data frame.

    Args:
        conversation_ids: iterable of conversation IDs to load.
        max_pause: value substituted for `{max_pause}` in the template.
        file_path_template: str.format template with `{conversation_id}` and `{max_pause}` fields.

    Returns: pd.DataFrame with all rows stacked and a `conversation_id` column added.
        Files that are missing or unreadable are reported and skipped.

    Raises:
        ValueError: if no file could be loaded at all.
    """
    all_dfs = []
    for conversation_id in conversation_ids:
        csv_path = file_path_template.format(conversation_id=conversation_id, max_pause=max_pause)
        try:
            df = pd.read_csv(csv_path)
        except FileNotFoundError as e:
            print(f"File not found: {csv_path}")
            print(str(e))
            continue
        except Exception as e:
            # Any other read problem (empty or malformed CSV, permissions, ...) is reported as such.
            print(f"Could not read {csv_path}: {e}")
            continue
        all_dfs.append(df.assign(conversation_id=conversation_id))

    if not all_dfs:
        raise ValueError(
            f"No transcripts could be loaded with template {file_path_template!r}; "
            "check that it points to the directory the CSVs were saved in."
        )
    return pd.concat(all_dfs, axis=0, ignore_index=True)

# Usage

# Note: Adjust file_path_template to concatenate baseline or edit_history instead of NT transcripts
# transcripts_dir must match the save_dir the processing loop wrote the CSVs into.
transcripts_dir = "data/new_transcripts"
conv_ids = sorted(conversation_ids[:10])
# Derive max_pause (ms) from the same config that produced the model_label in the saved file names.
max_pause = int(natural_turn.max_pause * 1000)
file_path_template = os.path.join(transcripts_dir, '{conversation_id}_natural_turn-{max_pause}.csv')
save_path = f'aws-2024-natural_turn-{max_pause}--candor.csv'
natural_turn_df = concatenate_transcripts_from_local(conv_ids, max_pause, file_path_template)
natural_turn_df.to_csv(save_path, index=False)