In [None]:
import logging
import os
from datetime import datetime

# Name of this notebook; it is used as the sub-folder that holds its log files.
current_file_name = "7_Elaborations_Transcripts"

# One log file per run, named after the moment the cell was executed.
dt_string = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = f"logs/{current_file_name}/{dt_string}.log"

# The file handler installed by basicConfig opens the log file right away and
# fails with FileNotFoundError when the target folder is missing, so make sure
# the folder exists first (a no-op when it is already there).
os.makedirs(os.path.dirname(log_file), exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    filename=log_file,
    filemode="w",
    format="%(asctime)s %(levelname)s %(message)s",
)

# https://blog.sentry.io/logging-in-python-a-developers-guide/

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import sys

import argparse
import io

from google.cloud import speech

import grpc

from openai import OpenAI

In [None]:
from helpers.pages import *
from helpers.constants import *
from helpers.utils import *

In [None]:
# Show up to 500 columns when a DataFrame is displayed.
pd.options.display.max_columns = 500

In [None]:
# Load the OpenAI API key from the local token file; trailing whitespace
# (e.g. a final newline) is dropped.
with open("tokens/openai_key.txt") as file:
    OPENAI_API_KEY = file.read().rstrip()

# Publish the key through the process environment before the client is built.
# NOTE(review): OpenAI() is created without an explicit key, so it presumably
# picks the key up from this variable - confirm against the client docs.
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

client = OpenAI()

In [None]:
# Transcription backend used for this run. The cells below branch on the two
# values "google" and "openai"; switch by swapping which line is commented out.
GLOBAL_MODE = "openai"
# GLOBAL_MODE = "google"

In [None]:
# Audio file extension that is collected for the selected backend.
if GLOBAL_MODE == "google":
    GLOBAL_FORMAT = ".aac"
elif GLOBAL_MODE == "openai":
    GLOBAL_FORMAT = ".wav"
else:
    # Fail here instead of leaving GLOBAL_FORMAT undefined (or stale from an
    # earlier run of this cell) and breaking later in an unrelated cell.
    raise ValueError(
        f"Unsupported GLOBAL_MODE {GLOBAL_MODE!r}; expected 'google' or 'openai'"
    )

In [None]:
def get_dict_of_paths(root_path, audio_format=None):
    """Map each folder under ``root_path`` to the audio files it contains.

    The tree below ``root_path`` is walked recursively. Every folder that
    holds at least one file gets an entry keyed by the folder's own name
    (not its full path); the value is the list of full paths of the files in
    that folder whose name ends with ``audio_format``. A folder that only
    holds other kinds of files therefore maps to an empty list, and two
    folders sharing the same name overwrite each other.

    Args:
        root_path: Directory whose sub-folders are named after the respondents.
        audio_format: File-name suffix to keep, e.g. ``".wav"``. Defaults to
            the notebook-wide ``GLOBAL_FORMAT``.

    Returns:
        dict mapping folder name -> list of audio file paths.
    """
    if audio_format is None:
        audio_format = GLOBAL_FORMAT

    dict_of_paths = {}
    for root, _dirs, files in os.walk(root_path):
        if not files:
            continue

        # Keep only the audio files, expressed as full paths.
        audio_files = [
            os.path.join(root, f) for f in files if f.endswith(audio_format)
        ]

        # basename/normpath understand the separator of the running platform
        # (and a trailing separator), unlike splitting on a literal backslash.
        folder_name = os.path.basename(os.path.normpath(root))
        dict_of_paths[folder_name] = audio_files
    return dict_of_paths

In [None]:
# Folders holding the extracted recordings, one per variant (FG and H).
# Raw strings keep the backslash-separated paths readable.
extracted_recordings_fg_path = r"data\6_Elaborations_Extraction\FG"
extracted_recordings_h_path = r"data\6_Elaborations_Extraction\H"

In [None]:
# Folder name -> audio file paths, collected for the FG folder first and then
# for the H folder.
fg_paths, h_paths = map(
    get_dict_of_paths,
    (extracted_recordings_fg_path, extracted_recordings_h_path),
)

In [None]:
@timer
def transcribe_onprem_google(local_file_path: str):
    """Transcribe one local audio file with the Google Cloud Speech client.

    The whole file is read into memory and sent in a single ``recognize``
    request.

    Args:
        local_file_path: Path of the audio file to transcribe.

    Returns:
        A ``(response, transcript)`` pair: the response object returned by
        ``recognize`` and the first alternative of every result joined with
        single spaces.
        NOTE(review): ``timer`` is defined outside this cell; confirm it
        passes the return value through unchanged.
    """
    logging.info(f"Transcribing {local_file_path}")

    # NOTE(review): in "google" mode the notebook collects ".aac" files while
    # the encoding declared here is MP3 - confirm this combination is intended.
    recognition_config = {
        # Encoding and sample rate are only needed when not using FLAC or WAV.
        "encoding": speech.RecognitionConfig.AudioEncoding.MP3,
        "sample_rate_hertz": 48000,
        # Language of the supplied audio.
        "language_code": "en-GB",
        "profanity_filter": False,
        "enable_word_time_offsets": True,
        "enable_word_confidence": True,
        "enable_automatic_punctuation": False,
        "model": "latest_long",
    }

    with open(local_file_path, "rb") as audio_stream:
        audio_bytes = audio_stream.read()

    speech_client = speech.SpeechClient()
    response = speech_client.recognize(
        request={"config": recognition_config, "audio": {"content": audio_bytes}}
    )

    # The first alternative of each result is the most probable one.
    top_transcripts = []
    for result in response.results:
        top_transcripts.append(result.alternatives[0].transcript)
    concatenated_transcript = " ".join(top_transcripts)

    logging.info(f"Transcription of {local_file_path} complete")

    return response, concatenated_transcript


In [None]:
@timer
def transcribe_onprem_openai(local_file_path: str):
    """Transcribe one local audio file with the OpenAI transcription endpoint.

    Uses the notebook-level ``client`` with the ``whisper-1`` model, English
    language, temperature 0 and a ``verbose_json`` response carrying word- and
    segment-level timestamps.

    Args:
        local_file_path: Path of the audio file to transcribe.

    Returns:
        A ``(transcript, text)`` pair: the response object returned by the
        API and its ``text`` attribute.
        NOTE(review): ``timer`` is defined outside this cell; confirm it
        passes the return value through unchanged.
    """
    logging.info(f"Transcribing {local_file_path}")

    # The context manager guarantees the file handle is released even when the
    # request fails; previously the handle was opened and never closed.
    with open(local_file_path, "rb") as audio_file:
        transcript = client.audio.transcriptions.create(
            file=audio_file,
            model="whisper-1",
            language="en",
            response_format="verbose_json",
            temperature=0.0,
            timestamp_granularities=["word", "segment"],
            # NOTE(review): the prompt consists of filler words, presumably so
            # that hesitations are kept in the transcript - confirm.
            prompt="Umm, let me think like, hmm... Okay, here's what I'm, like, thinking. Uh. Um. Well. Er. Ah. You know, like. Erm."
        )

    logging.info(f"Transcription of {local_file_path} complete")

    return transcript, transcript.text

In [None]:
def get_transcript(path_dict, variant, fixing=None):
    """Transcribe the recordings in ``path_dict`` and store the results on disk.

    For every respondent the output goes to
    ``data/7_Elaborations_Transcripts/<variant>/<respondent>``. Each processed
    recording produces ``<name>.txt`` (the transcript, with U+FFFD characters
    removed) and ``<name>_response.json`` (the backend response).

    Respondents without an entry in ``fixing`` are skipped entirely when their
    output folder already exists; otherwise all of their recordings are
    transcribed. Note that this check is per folder, so a folder left behind
    by an interrupted run is also treated as done. Respondents with entries in
    ``fixing`` are always visited, but only the recordings whose path contains
    one of the listed identifiers are transcribed again.

    Args:
        path_dict: Mapping of respondent folder name -> list of audio paths.
        variant: Name of the output sub-folder (e.g. ``"FG"``).
        fixing: Optional list of ``[respondent, identifier]`` pairs selecting
            recordings to redo. ``None`` (the default) means no fixes.

    Raises:
        ValueError: If ``GLOBAL_MODE`` is neither ``"google"`` nor ``"openai"``.
    """
    # Resolve the backend once, up front, so an unknown mode fails before any
    # folder is created instead of surfacing later as an undefined variable.
    if GLOBAL_MODE == "google":
        transcribe = transcribe_onprem_google
    elif GLOBAL_MODE == "openai":
        transcribe = transcribe_onprem_openai
    else:
        raise ValueError(
            f"Unsupported GLOBAL_MODE {GLOBAL_MODE!r}; expected 'google' or 'openai'"
        )

    # None replaces the former mutable default argument.
    fixing = [] if fixing is None else fixing

    for respondent, paths in path_dict.items():
        logging.info(f"Transcribing {respondent}")
        respondent_path = os.path.join(
            "data", "7_Elaborations_Transcripts", variant, respondent
        )

        # Fix entries whose first element names this respondent.
        fixes = [fix for fix in fixing if fix[0] == respondent]

        if not fixes and os.path.exists(respondent_path):
            logging.info(f"Folder {respondent_path} already exists")
            continue

        # Also needed in fix mode: the folder may not exist yet.
        os.makedirs(respondent_path, exist_ok=True)

        for path in paths:
            if fixes:
                # In fix mode only the listed recordings are redone.
                if not any(fix[1] in path for fix in fixes):
                    continue
                print(f"Fixing {path}")

            logging.info(f"Transcribing {path} using {GLOBAL_MODE}")
            response, transcript = transcribe(path)

            # Derive the output names from the file stem rather than replacing
            # the extension text wherever it occurs in the name.
            stem = os.path.splitext(os.path.basename(path))[0]
            transcript_path = os.path.join(respondent_path, stem + ".txt")
            response_path = os.path.join(respondent_path, stem + "_response.json")

            with open(transcript_path, "w") as f:
                # Sanitize the transcript by removing U+FFFD replacement characters.
                f.write(transcript.replace("\ufffd", ""))

            with open(response_path, "w") as f:
                if GLOBAL_MODE == "google":
                    f.write(str(response))
                else:
                    dump = response.model_dump_json()
                    try:
                        f.write(dump)
                    except UnicodeEncodeError:
                        # The file's encoding cannot represent some character;
                        # retry without the U+FFFD replacement characters.
                        f.write(dump.replace("\ufffd", ""))


In [None]:
# Each list holds [respondent, elaboration] pairs, the shape consumed by the
# `fixing` argument of get_transcript. The pairs are written grouped by
# respondent and expanded in order.
# NOTE(review): neither *_fix_1 list is referenced by the visible run cells.
fg_fix_1 = [
    [respondent, elaboration]
    for respondent, elaborations in {
        'respondent_104': ['elaboration_3_2'],
        'respondent_106': ['elaboration_1_2', 'elaboration_4_1'],
        'respondent_12': ['elaboration_2_2', 'elaboration_3_2'],
        'respondent_21': ['elaboration_3_1'],
        'respondent_25': ['elaboration_5_2'],
        'respondent_31': ['elaboration_4_1'],
        'respondent_35': ['elaboration_3_2', 'elaboration_4_1', 'elaboration_4_2'],
        'respondent_37': ['elaboration_2_2'],
        'respondent_38': ['elaboration_4_2'],
        'respondent_45': ['elaboration_1_1', 'elaboration_2_1',
                          'elaboration_2_2', 'elaboration_3_1'],
        'respondent_54': ['elaboration_3_2'],
    }.items()
    for elaboration in elaborations
]

h_fix_1 = [
    [respondent, elaboration]
    for respondent, elaborations in {
        'respondent_107': ['elaboration_1_2'],
        'respondent_110': ['elaboration_3_1', 'elaboration_3_2'],
        'respondent_22': ['elaboration_2_2', 'elaboration_4_2'],
        'respondent_29': ['elaboration_5_1'],
        'respondent_35': ['elaboration_5_1'],
        'respondent_42': ['elaboration_4_2'],
        'respondent_47': ['elaboration_1_1', 'elaboration_1_2',
                          'elaboration_3_2', 'elaboration_4_1'],
        'respondent_48': ['elaboration_1_2', 'elaboration_3_2'],
        'respondent_50': ['elaboration_2_2'],
        'respondent_55': ['elaboration_2_1'],
        'respondent_57': ['elaboration_5_2'],
        'respondent_58': ['elaboration_4_2'],
        'respondent_8': ['elaboration_2_2', 'elaboration_3_1',
                         'elaboration_3_2', 'elaboration_5_2'],
    }.items()
    for elaboration in elaborations
]

In [None]:
# [respondent, elaboration] pairs passed as the `fixing` argument of
# get_transcript by the "openai" run cells. The pairs are written grouped by
# respondent and expanded in order.
fg_fix_2 = [
    [respondent, elaboration]
    for respondent, elaborations in {
        'respondent_35': ['elaboration_4_2'],
        'respondent_45': ['elaboration_2_1'],
    }.items()
    for elaboration in elaborations
]

h_fix_2 = [
    [respondent, elaboration]
    for respondent, elaborations in {
        'respondent_29': ['elaboration_5_1'],
        'respondent_47': ['elaboration_1_1', 'elaboration_1_2',
                          'elaboration_3_2', 'elaboration_4_1'],
        'respondent_55': ['elaboration_2_1'],
        'respondent_8': ['elaboration_3_2'],
    }.items()
    for elaboration in elaborations
]

In [None]:
# Transcribe the FG recordings with the selected backend. The Google run
# writes to its own "FG_Google" variant; the OpenAI run writes to "FG" and
# additionally redoes the recordings listed in fg_fix_2.
if GLOBAL_MODE == "google":
    get_transcript(fg_paths, "FG_Google")
elif GLOBAL_MODE == "openai":
    get_transcript(fg_paths, "FG", fg_fix_2)


In [None]:
# Transcribe the H recordings with the selected backend. The Google run
# writes to its own "H_Google" variant; the OpenAI run writes to "H" and
# additionally redoes the recordings listed in h_fix_2.
if GLOBAL_MODE == "google":
    get_transcript(h_paths, "H_Google")
elif GLOBAL_MODE == "openai":
    get_transcript(h_paths, "H", h_fix_2)