In [20]:
import os
import re
import csv
import json
import time
import random
import Levenshtein
from phonecodes import phonecodes
import azure.cognitiveservices.speech as speechsdk

In [21]:
def recognize_from_audio(filename="split-recordings/phrase_van_recast_306.wav"):
    """Run one-shot speech recognition on an audio file and print the outcome.

    Requires the environment variables "SPEECH_KEY" and "SPEECH_REGION".

    Args:
        filename: Path of the audio file to recognize. Defaults to the sample
            recording this notebook was developed against.

    Returns:
        None. The recognized text, the no-match details, or the cancellation
        details are printed to stdout.
    """
    speech_config = speechsdk.SpeechConfig(
        subscription=os.environ.get('SPEECH_KEY'),
        region=os.environ.get('SPEECH_REGION'),
    )
    speech_config.speech_recognition_language = "en-US"
    # Set the segmentation silence timeout through the PropertyId enum.
    # Passing the enum's dotted name as a *string* to set_service_property only
    # appends an unknown query parameter to the request URI.
    speech_config.set_property(
        speechsdk.PropertyId.Speech_SegmentationSilenceTimeoutMs,
        str(3 * 1000),
    )

    # For live input use: speechsdk.audio.AudioConfig(use_default_microphone=True)
    audio_config = speechsdk.audio.AudioConfig(filename=filename)
    speech_recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_config)

    # Input comes from a file, so do not prompt the user to speak.
    print("Recognizing speech from file: {}".format(filename))
    speech_recognition_result = speech_recognizer.recognize_once_async().get()

    if speech_recognition_result.reason == speechsdk.ResultReason.RecognizedSpeech:
        print("Recognized: {}".format(speech_recognition_result.text))
    elif speech_recognition_result.reason == speechsdk.ResultReason.NoMatch:
        print("No speech could be recognized: {}".format(speech_recognition_result.no_match_details))
    elif speech_recognition_result.reason == speechsdk.ResultReason.Canceled:
        cancellation_details = speech_recognition_result.cancellation_details
        print("Speech Recognition canceled: {}".format(cancellation_details.reason))
        if cancellation_details.reason == speechsdk.CancellationReason.Error:
            print("Error details: {}".format(cancellation_details.error_details))
            print("Did you set the speech resource key and region values?")

In [22]:
def speech_recognize_once(filename):
    """Recognize one utterance from an audio file with pronunciation assessment.

    Requires the environment variables "SPEECH_KEY" and "SPEECH_REGION".
    The assessment is configured with an empty reference text, the
    HundredMark grading system, phoneme granularity, the IPA phoneme
    alphabet, and prosody assessment enabled.

    Args:
        filename: Path of the audio file to recognize.

    Returns:
        The service response stored under
        `PropertyId.SpeechServiceResponse_JsonResult` (a JSON string). Callers
        in this notebook treat a falsy value as "no result".
    """
    speech_config = speechsdk.SpeechConfig(
        subscription=os.environ.get('SPEECH_KEY'),
        region=os.environ.get('SPEECH_REGION'),
    )
    # The language is configured once here, the same way the other recognizer
    # helpers in this notebook do it (no extra `language=` kwarg below).
    speech_config.speech_recognition_language = "en-US"

    audio_config = speechsdk.audio.AudioConfig(filename=filename)
    speech_recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_config)

    pronunciation_assessment_config = speechsdk.PronunciationAssessmentConfig(
        json_string='{"referenceText":"","gradingSystem":"HundredMark","granularity":"Phoneme","phonemeAlphabet":"IPA"}'
    )
    pronunciation_assessment_config.enable_prosody_assessment()
    pronunciation_assessment_config.apply_to(speech_recognizer)

    speech_recognition_result = speech_recognizer.recognize_once()

    # Surface cancellations (bad key/region, unreadable file, ...) instead of
    # silently handing an empty result back to the caller.
    if speech_recognition_result.reason == speechsdk.ResultReason.Canceled:
        cancellation_details = speech_recognition_result.cancellation_details
        print("Speech Recognition canceled: {}".format(cancellation_details.reason))
        if cancellation_details.reason == speechsdk.CancellationReason.Error:
            print("Error details: {}".format(cancellation_details.error_details))

    # The pronunciation assessment result as a JSON string
    return speech_recognition_result.properties.get(speechsdk.PropertyId.SpeechServiceResponse_JsonResult)

In [23]:
def speech_recognize_continuous_from_file(filename):
    """Run continuous recognition over an audio file and collect the final texts.

    Credentials are read from the "SPEECH_KEY" and "SPEECH_REGION" environment
    variables. Every recognizer event is echoed to stdout. The function polls
    every half second until a session-stopped or canceled event has fired,
    then returns the list of `evt.result.text` values gathered from the
    `recognized` events.
    """
    config = speechsdk.SpeechConfig(
        subscription=os.environ.get('SPEECH_KEY'),
        region=os.environ.get('SPEECH_REGION'),
    )
    config.speech_recognition_language = "en-US"
    source = speechsdk.audio.AudioConfig(filename=filename)
    recognizer = speechsdk.SpeechRecognizer(speech_config=config, audio_config=source)

    assessment = speechsdk.PronunciationAssessmentConfig(
        json_string='{"referenceText":"","gradingSystem":"HundredMark","granularity":"Phoneme","phonemeAlphabet":"IPA"}'
    )
    assessment.enable_prosody_assessment()
    assessment.apply_to(recognizer)

    # Mutable holders shared with the callbacks below.
    finished = {"value": False}
    transcripts = []

    def collect_text(evt):
        """Store the text of a final recognition result."""
        transcripts.append(evt.result.text)

    def echo(template):
        """Build a callback that prints the event using `template`."""
        def _printer(evt):
            print(template.format(evt))
        return _printer

    def mark_finished(evt: speechsdk.SessionEventArgs):
        """Signal the polling loop to stop once the session ends or is canceled."""
        print('CLOSING on {}'.format(evt))
        finished["value"] = True

    # Result collection is registered before the logging callback on `recognized`.
    recognizer.recognized.connect(collect_text)

    # Log every event fired by the recognizer.
    recognizer.recognizing.connect(echo('RECOGNIZING: {}'))
    recognizer.recognized.connect(echo('RECOGNIZED: {}'))
    recognizer.session_started.connect(echo('SESSION STARTED: {}'))
    recognizer.session_stopped.connect(echo('SESSION STOPPED {}'))
    recognizer.canceled.connect(echo('CANCELED {}'))

    # Either of these events ends the polling loop.
    recognizer.session_stopped.connect(mark_finished)
    recognizer.canceled.connect(mark_finished)

    recognizer.start_continuous_recognition()
    while not finished["value"]:
        time.sleep(0.5)
    recognizer.stop_continuous_recognition()

    return transcripts

In [24]:
def transcribe_from_audiofile(filename):
    """Return the phoneme transcription of one audio file as a string.

    Calls `speech_recognize_once(filename)`, parses its JSON response and
    concatenates the `Phoneme` value of every phoneme of every word in the
    first `NBest` entry. A single space is appended after each word, so a
    non-empty transcription ends with a trailing space.

    Args:
        filename: Path of the audio file to transcribe.

    Returns:
        The concatenated phoneme string, or "" when the recognizer returned
        nothing or the response carries no word/phoneme information.
    """
    raw_result = speech_recognize_once(filename)
    if not raw_result:
        return ""

    result = json.loads(raw_result)
    # ensure_ascii=False keeps phoneme symbols readable instead of \uXXXX escapes.
    print(json.dumps(result, indent=2, ensure_ascii=False))

    # A response may have no NBest list, an empty one, or entries without
    # Words/Phonemes; treat all of those as "nothing to transcribe" rather
    # than raising.
    nbest = result.get('NBest') or []
    words = nbest[0].get('Words', []) if nbest else []

    phoneme_list = []
    for word in words:
        for phoneme in word.get('Phonemes', []):
            phoneme_list.append(phoneme['Phoneme'])
        phoneme_list.append(" ")  # word boundary

    ipa_transcription = "".join(phoneme_list)
    print(ipa_transcription)
    return ipa_transcription

In [25]:
def phonetic_transcription(folderpath, fnames=None, write_to_csv=False, limit=300):
    """Transcribe the .wav files of a folder with `transcribe_from_audiofile`.

    Args:
        folderpath: Directory containing the recordings.
        fnames: Optional list of file names (relative to `folderpath`) to
            transcribe. When empty or None, the folder is listed instead, in
            sorted order so that repeated runs select the same files.
        write_to_csv: When True, write the rows to
            `<folderpath>/out/azure_transcriptions.csv`.
        limit: Maximum number of .wav files to process when the folder is
            listed. Ignored when `fnames` is given (all of them are processed).

    Returns:
        A list of `{"File": name, "Transcription": text}` dicts, one per file
        whose transcription is not blank.
    """
    if not fnames:
        fnames = sorted(os.listdir(folderpath))
    else:
        limit = len(fnames)

    rows = []
    cnt = 0
    for filename in fnames:
        if cnt >= limit:
            break
        if not filename.endswith(".wav"):
            continue

        print(filename)
        transcription = transcribe_from_audiofile(os.path.join(folderpath, filename))
        if transcription.strip():
            rows.append({
                "File": filename,
                "Transcription": transcription
            })

        cnt += 1
        print(f"--> {cnt}/{limit}")

    if write_to_csv:
        out_dir = os.path.join(folderpath, "out")
        os.makedirs(out_dir, exist_ok=True)
        # Explicit UTF-8 so phoneme symbols can be written on any platform;
        # newline="" as required by the csv module. Fixed field names keep the
        # export working even when no file produced a transcription.
        with open(os.path.join(out_dir, "azure_transcriptions.csv"), "w", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=["File", "Transcription"])
            writer.writeheader()
            writer.writerows(rows)

    return rows

In [26]:
# path = "./split-recordings"
# phonetic_transcription(folderpath=path, write_to_csv=True)

In [27]:
# recognize_from_audio()

In [28]:
# speech_recognize_once("split-recordings/phrase_game_307.wav")

In [29]:
# transcribe_from_audiofile("split-recordings/phrase_apple_310.wav")

In [30]:
# speech_recognize_continuous_from_file("split-recordings/word_wagon_305.wav")

In [31]:
def clean_csv_transcriptions(folderpath):
    """Write a cleaned copy of every transcription CSV found in `folderpath`.

    Each `*.csv` file is read (UTF-8, optional BOM) with the columns `File`
    and `Transcription`. Combining diacritics (U+0300-U+036F) and a handful of
    modifier letters are removed from the transcription, followed by the
    characters `,`, `:` and `|`. The result is written to
    `<folderpath>/cleaned/<original name>_cleaned.csv`, which contains only the
    rows of that one input file.

    Args:
        folderpath: Directory holding the raw transcription CSV files.

    Returns:
        None.
    """
    # NOTE(review): the pattern is kept exactly as it was. Inside the character
    # class the "," separators and the space before \uA71B are literal members,
    # so this pass also deletes commas and *spaces* from the transcription --
    # confirm that dropping spaces is intended.
    diacritics = re.compile(u'[\u0300-\u036f,\u02B0,\u1D4A,\u02B7,\u02B2, \uA71B]', flags=re.UNICODE)
    separators = re.compile(r'[,:|]')
    out_dir = os.path.join(folderpath, "cleaned")

    for file in sorted(os.listdir(folderpath)):
        if not file.endswith(".csv"):
            continue

        # Start from an empty list for every input file; otherwise each output
        # would also contain the rows of all previously processed files.
        rows = []
        with open(os.path.join(folderpath, file), "r", encoding="utf-8-sig", newline="") as f:
            for row in csv.DictReader(f):
                transc = diacritics.sub('', row['Transcription'])
                transc = separators.sub('', transc)

                if '|' in row['Transcription']:
                    print("before:", row['Transcription'])
                    print("after:", transc)

                rows.append({
                    "File": row['File'],
                    "Transcription": transc
                })

        os.makedirs(out_dir, exist_ok=True)
        # Explicit UTF-8 (IPA text) and newline="" (csv module requirement);
        # fixed field names so an input without data rows still yields a file.
        with open(os.path.join(out_dir, f"{file}_cleaned.csv"), "w", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=["File", "Transcription"])
            writer.writeheader()
            writer.writerows(rows)

In [32]:
# Clean every rater CSV in ./human_transcriptions; the cleaned copies are
# written to the "cleaned" subfolder that compare_transcriptions() reads from.
clean_csv_transcriptions("./human_transcriptions")

before: |sɑk
after: sɑk
before: kɪt|sɪn
after: kɪtsɪn
before: kɪt|sɪn
after: kɪtsɪn


In [33]:
def read_csv_transcriptions(folderpath, verbose=False):
    """Load the human transcriptions from every CSV file in `folderpath`.

    Files are read in sorted name order (UTF-8, optional BOM) and must provide
    the columns `File` and `Transcription`. Rows whose `File` value is blank
    are skipped.

    Args:
        folderpath: Directory holding the transcription CSV files.
        verbose: When True, print each parsed row for debugging.

    Returns:
        A list of dicts with the keys `File` (the CSV value plus ".wav"),
        `Transcription`, and `rater` ("rater" followed by the first character
        of the CSV file name).
    """
    rows = []
    for file in sorted(os.listdir(folderpath)):
        if not file.endswith(".csv"):
            continue

        with open(os.path.join(folderpath, file), "r", encoding="utf-8-sig", newline="") as f:
            for row in csv.DictReader(f):
                name = row['File'].strip()
                # DictReader fills missing trailing fields with None.
                transcription = row['Transcription'] or ""

                if verbose:
                    print(f"file {file}: {name!r} -> {transcription!r} ({len(transcription)} chars)")

                if name:
                    rows.append({
                        "File": name + ".wav",
                        "Transcription": transcription,
                        "rater": f"rater{file[0]}"
                    })

    return rows

In [34]:
def compare_transcriptions(path_human_transcriptions='./human_transcriptions/cleaned',
                           path_recordings='./split-recordings',
                           sample_size=10,
                           output_path="comparison.csv"):
    """Compare Azure phoneme transcriptions with human ones on a random sample.

    Human transcriptions are loaded with `read_csv_transcriptions`, a random
    sample of the transcribed files is sent through `phonetic_transcription`,
    and for each automatically transcribed file one human rater is picked
    uniformly at random. Both IPA strings are converted with
    `phonecodes.ipa2arpabet`, upper-cased, split on spaces and compared with
    `Levenshtein.distance`. One row per file is written to `output_path`.

    Args:
        path_human_transcriptions: Folder with the cleaned human CSV files.
        path_recordings: Folder with the .wav recordings.
        sample_size: Number of files to sample; capped at the number of files
            that have a human transcription.
        output_path: Destination CSV file.

    Returns:
        None.
    """
    rows = read_csv_transcriptions(path_human_transcriptions)

    # Group every rater's row by file so that each rater is equally likely to
    # be drawn below.
    transcriptions_by_file = dict()
    for row in rows:
        transcriptions_by_file.setdefault(row['File'], []).append(row)

    # Sorted so the sample depends only on the random state, not on set order.
    fnames = sorted(transcriptions_by_file)
    print(len(rows))
    print(len(fnames))

    random_subset = random.sample(fnames, min(sample_size, len(fnames)))
    if not random_subset:
        # An empty list would make phonetic_transcription scan the whole folder.
        print("No human transcriptions to compare against.")
        return

    azure_transcriptions = phonetic_transcription(path_recordings, fnames=random_subset)
    print(azure_transcriptions)

    comparison_rows = []
    for row in azure_transcriptions:
        filename = row['File']
        human_row = random.choice(transcriptions_by_file[filename])

        ipa_azure_transcription = row['Transcription']
        ipa_human_transcription = human_row['Transcription']

        print("azure ipa:", ipa_azure_transcription)
        print("human ipa:", ipa_human_transcription)

        arpabet_azure_transcription = phonecodes.ipa2arpabet(ipa_azure_transcription).upper()
        arpabet_human_transcription = phonecodes.ipa2arpabet(ipa_human_transcription).upper()

        print("azure arpabet:", arpabet_azure_transcription)
        print("human arpabet:", arpabet_human_transcription)

        # Distance over space-separated symbols rather than single characters.
        distance = Levenshtein.distance(
            arpabet_azure_transcription.strip().split(" "),
            arpabet_human_transcription.strip().split(" "),
        )
        print(distance)

        comparison_rows.append({
            "file": filename,
            "randomRater": human_row['rater'],
            "transcription": ipa_human_transcription,
            "autoTranscription": ipa_azure_transcription,
            "transcription(arpabet)": arpabet_human_transcription,
            "autoTranscription(arpabet)": arpabet_azure_transcription,
            "MED": distance
        })

    print(len(comparison_rows))
    fieldnames = [
        "file",
        "randomRater",
        "transcription",
        "autoTranscription",
        "transcription(arpabet)",
        "autoTranscription(arpabet)",
        "MED",
    ]
    # Explicit UTF-8 (IPA text) and newline="" (csv module requirement); fixed
    # field names keep the export working when no file was transcribed.
    with open(output_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(comparison_rows)



In [None]:
# Sample files with human transcriptions, transcribe them through Azure and
# write the per-file edit distances to comparison.csv (calls the Speech service).
compare_transcriptions()