In [2]:
import os
import json
import glob

from server.models.power_model import PowerCall
from nlp.nltk_api import call_nltk_api, remove_punctuation

def jsonLoad(scores_json):
    """Read a JSON file and return the decoded object.

    Args:
        scores_json: Path of the JSON file to read.

    Returns:
        The object ``json.load`` decodes from the file.
    """
    # Decode explicitly as UTF-8 instead of the platform default so files
    # containing raw non-ASCII text load the same way on every machine.
    with open(scores_json, encoding="utf-8") as json_file:
        return json.load(json_file)

def jsonSave(save_json, file_path):
    """Serialize ``save_json`` to ``file_path`` as indented JSON.

    Args:
        save_json: Object to serialize; values the standard encoder cannot
            handle are passed to ``NpEncoder``.
        file_path: Destination path; an existing file is overwritten.
    """
    # ensure_ascii=False emits non-ASCII characters verbatim, so the file is
    # opened as UTF-8 explicitly rather than relying on the locale encoding.
    with open(file_path, "w", encoding="utf-8") as output_file:
        json.dump(save_json, output_file, indent=4, ensure_ascii=False, cls=NpEncoder)

def get_index_to_number_key_dict(input_dict, key):
    """Return the annotations stored under ``key``, re-keyed by integer index.

    ``input_dict[key]`` is expected to be a mapping whose keys are ``key``
    followed by a number. Each key has ``key`` removed and is converted with
    ``int``; falsy values are replaced by ``'|'``. The result is ordered by
    ascending index.
    """
    raw_scores = input_dict.get(key)
    indexed = {}
    for name, score in raw_scores.items():
        # Falsy annotations are stored as the '|' placeholder.
        indexed[int(name.replace(key, ''))] = score or '|'
    return {idx: indexed[idx] for idx in sorted(indexed)}

def get_index_to_number_key_dict_for_phone(input_dict, phones, key):
    """Return phone annotations keyed by integer index, masked by ``phones``.

    ``input_dict[key]`` is expected to be a mapping whose keys are ``key``
    followed by a number. Each key has ``key`` removed and is converted with
    ``int``. Wherever ``phones[index]`` is ``'|'`` the annotation is replaced
    by that ``'|'`` entry; otherwise the annotation is kept. The result is
    ordered by ascending index. The sizes of both inputs are printed.
    """
    raw_scores = input_dict.get(key)
    by_index = {}
    for name, score in raw_scores.items():
        by_index[int(name.replace(key, ''))] = score

    print('second_filter: ', len(by_index))
    print('phones: ', len(phones))

    masked = {}
    for idx in sorted(by_index):
        phone = phones[idx]
        # A '|' in the phone sequence overrides whatever was annotated there.
        masked[idx] = phone if phone == '|' else by_index[idx]
    return masked


class NpEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to built-in types."""

    def default(self, obj):
        """Return a JSON-serializable equivalent of ``obj``.

        NumPy booleans, integers, floating-point scalars and arrays are
        converted to ``bool``, ``int``, ``float`` and ``list`` respectively.
        Any other object is deferred to ``json.JSONEncoder.default``, which
        raises ``TypeError``.
        """
        # Imported locally: no numpy import is visible among this file's
        # imports, so a bare ``np`` reference would raise NameError here.
        import numpy as np

        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)


In [8]:
# Input and output locations. Both input folders sit under the same
# audio-annotator checkout, so they are derived from one shared root.
_annotator_root = '/home/jtlee/projects/data_process/EZAI_Championship2023/audio-annotator'
json_files_dir = os.path.join(_annotator_root, 'save_json')
audio_dir = os.path.join(_annotator_root, 'audio_data')
parsed_json_dir = './parsed_json'

# Every *.json file at any depth below json_files_dir.
_json_pattern = os.path.join(json_files_dir, '**', '*.json')
json_files = glob.glob(_json_pattern, recursive=True)

def phone_accuracy_score_adjust(phone_score):
    """Snap a phone score to 0.0, 5.0 or 10.0.

    Scores in [0, 5) become 0.0, exactly 5 stays 5.0, and scores in (5, 10]
    become 10.0. Falsy inputs and values outside [0, 10] are returned
    unchanged.
    """
    if not phone_score:
        return phone_score

    value = float(phone_score)
    if 0.0 <= value < 5.0:
        return 0.0
    if 5.0 < value <= 10.0:
        return 10.0
    if value == 5.0:
        return 5.0
    return phone_score

# Convert every annotation file into a "<uttid>.parsed.json" file holding
# utterance-, word- and phone-level scores.

# Make sure the output folder exists before the first jsonSave call.
os.makedirs(parsed_json_dir, exist_ok=True)

for json_file in json_files:

    # The utterance id is the file name up to its first dot.
    uttid = os.path.basename(json_file).strip().split('.')[0]

    print(':::::::::::::uttid: ', uttid)

    parsed_json_file_path = os.path.join(parsed_json_dir, f'{uttid}.parsed.json')
    # Re-enable the two lines below to skip utterances that were already parsed.
    # if os.path.exists(parsed_json_file_path):
    #     continue

    json_data = jsonLoad(json_file)

    # utterance level labels
    utterance_level_data = json_data.get("utt_score_annotations")
    # NOTE: the misspelled key is reproduced as-is; it has to match the
    # key stored in the annotation files.
    word_level_data = json_data.get("word_score_annotatoins")
    phone_level_data = json_data.get("phone_score_annotations")
    # Alignment info is read from <audio_dir>/<uttid>/<uttid>.json, nested
    # under two levels of the utterance id.
    alignment_information_file_path = os.path.join(audio_dir, uttid, f'{uttid}.json')
    alignment_information_data = jsonLoad(alignment_information_file_path)[uttid][uttid]

    prompt = alignment_information_data.get('prompt')
    stt = call_nltk_api(alignment_information_data.get('stt'))
    if stt is None:
        # Fall back to local normalisation when the NLTK API returns nothing.
        tmp_stt = alignment_information_data.get('stt')
        stt = remove_punctuation(tmp_stt).upper()

    power_alignment = PowerCall()
    aligner_collect = power_alignment.power_alignment_with_phone_sequence(
        prompt,
        stt,
        None,
        addback_stress=False
    )
    segment_ref_hyp_word_count_align = aligner_collect.get('segment_ref_hyp_word_count_align')

    word_ctm = alignment_information_data.get('word_ctm')
    phone_ctm = alignment_information_data.get('ctm')

    # scores
    ref_phones = segment_ref_hyp_word_count_align.get('ref_phones')
    word_ref = segment_ref_hyp_word_count_align.get('word_ref')
    word_stress_scores = get_index_to_number_key_dict(word_level_data, 's_w_str_anno_')
    word_accuracy_scores = get_index_to_number_key_dict(word_level_data, 's_w_acc_anno_')
    word_total_scores = get_index_to_number_key_dict(word_level_data, 's_w_tol_anno_')
    phone_accuracy_scores = get_index_to_number_key_dict_for_phone(phone_level_data, ref_phones, 's_p_acc_anno_')

    # groups
    # NOTE(review): the [1:] presumably drops an empty group produced before
    # the leading '|' -- confirm against split_items_between_pipes.
    phone_acc_scores_group_by_word = power_alignment.split_items_between_pipes(list(phone_accuracy_scores.values()))[1:]
    phone_group_by_word = power_alignment.split_items_between_pipes(ref_phones)[1:]
    max_word_count = power_alignment.count_items_between_pipes(ref_phones)

    assert len(phone_accuracy_scores) == len(ref_phones)

    word_data = []
    for word_count_id, (word, word_stress_score, word_accuracy_score, word_total_score) in enumerate(zip(word_ref, word_stress_scores.values(), word_accuracy_scores.values(), word_total_scores.values())):

        if word == power_alignment.filler:
            continue

        if word_count_id <= len(max_word_count) - 1:
            phone_acc_scores = phone_acc_scores_group_by_word[word_count_id]
            print('phone_acc_scores: ', phone_acc_scores)
            # Snap each annotation to 0/5/10 first, then rescale by 1/5.
            # The rescale must read the adjusted list: previously it re-read
            # the raw group, so the adjustment above was silently discarded.
            phone_acc_scores = [phone_accuracy_score_adjust(ps) for ps in phone_acc_scores]
            phone_acc_scores = [float(ps)/5 if ps else ps for ps in phone_acc_scores]

            phones = phone_group_by_word[word_count_id]
            new_phones, new_phone_acc_scores = [], []
            for p, p_score in zip(phones, phone_acc_scores):
                # Drop empty entries and alignment blanks together with
                # their scores.
                if not p:
                    continue
                if p == power_alignment.blank:
                    continue
                new_phones.append(p)
                new_phone_acc_scores.append(p_score)

            word_data.append(
                {
                    "accuracy": float(word_accuracy_score),
                    "stress": float(word_stress_score),
                    "total": float(word_total_score),
                    "text": word,
                    "phones-accuracy": new_phone_acc_scores,
                    "phones": new_phones,
                }
            )

    data = {
        "accuracy": float(utterance_level_data.get('utt_accuracy')),
        "completeness": float(utterance_level_data.get('utt_completeness')),
        "fluency": float(utterance_level_data.get('utt_fluency')),
        "prosodic": float(utterance_level_data.get('utt_prosodic')),
        "total": float(utterance_level_data.get('utt_total')),
        "text": alignment_information_data.get('prompt'),
        "words": word_data,
    }

    jsonSave(data, parsed_json_file_path)


:::::::::::::uttid:  3962535
word_ref:  ['I', 'OFTEN', 'GO', 'THERE', 'WITH', 'MY', 'FRIENDS']
word_hyp:  ['I', 'OFTEN', 'COME', 'THERE', 'WITH', 'MY', 'FRIENDS']
if info[phone_align] is None:
B
['|', 'ay', '|', 'ao', 'f', 'ah', 'n', '|', 'g', 'ow', '|', 'dh', 'eh', 'r', '|', 'w', 'ih', 'dh', '|', 'm', 'ay', '|', 'f', 'r', 'eh', 'n', 'd', 'z', '|']
['|', 'ay', '|', 'ao', 'f', 'ah', 'n', '|', 'k', 'ah', 'm', '|', 'dh', 'eh', 'r', '|', 'w', 'ih', 'dh', '|', 'm', 'ay', '|', 'f', 'r', 'eh', 'n', 'd', 'z', '|']
[]
[]
[]
deque([(9, 6)])
(9, 6)
(8, 5)
(8, 6)
(7, 5)
(7, 6)
(6, 4)
(6, 5)
(6, 6)
(5, 3)
(5, 4)
(5, 5)
(5, 5)
(4, 2)
(4, 3)
(4, 4)
(4, 4)
(3, 2)
(3, 3)
(3, 3)
(2, 2)
(2, 2)
(1, 1)
(0, 0)
if len(new_aligner_collect) == 1:
new_aligner_collect['segment_ref_hyp_word_count_align']['ref_phones']: ['|', 'ay', '|', 'ao', 'f', 'ah', 'n', '|', '<blank>', 'g', 'ow', '|', 'dh', 'eh', 'r', '|', 'w', 'ih', 'dh', '|', 'm', 'ay', '|', 'f', 'r', 'eh', 'n', 'd', 'z', '|']
new_aligner_collect['segment_r