Only the phone/word-level mic (production) TextGrids need to be force-aligned. The rest can be mostly automated using a match filter and fuzzy string matching. That is what this notebook does; it creates:

* Sentence-level production TextGrids
* Phone/word/sentence-level perception TextGrids

**Please manually check your force-aligned phone/word mic TextGrids for accuracy before creating these.** If you skip this step, your results will look terrible. Also, add a third tier named "task" to the phone/word mic TextGrids that denotes when audio is part of the task and when it's off-task banter between the participant and researchers/medical staff. Intervals labeled `task` are treated as on-task audio; words outside those intervals are ignored. (The TextGrids provided in this repo are all accurate.)

In [1]:
import os
import numpy as np
import csv
import re
import sys
sys.path.append("./")
import textgrid
from fuzzywuzzy import fuzz
from praatio import tgio
import warnings
import scipy

In [None]:
# Local paths, please update accordingly
# git_path: root of the code repository. The cells below read and write event files
#   under <git_path>/preprocessing/events/ (logfiles, transcripts, textgrids, csv).
# data_path: root of the BIDS dataset. The perception cell reads audio from
#   <data_path>/sub-<subj>/<subj>_<block>/audio/.
git_path = '/path/to/git/kurteff2024_code/'
data_path = '/path/to/bids/dataset/'

In [None]:
# Change these values accordingly
# subj and block are joined as "<subj>_<block>" to form the block ID used in every
# file name and folder the cells below touch.
subj = "TCH14"
block = "B12"

### Production (mic) sentence-level grids
You will need:

1. Accurate phone/word-level mic TextGrids

In [None]:
# Read sentences from log file.
# Builds `sentences`: one row per 'readRepeat' trial in the log, holding the trial's
# 'MOCHARead' text followed by two '{NS}' placeholder columns.
blockid = "_".join([subj,block])
transcript_folder = os.path.join(git_path,"preprocessing","events","transcripts",subj,blockid)
log_path = os.path.join(git_path,"preprocessing","events","logfiles",f"{blockid}.txt")
transcript_path = os.path.join(transcript_folder,f"{blockid}_mic.txt")
# Fail loudly when a prerequisite is missing. Silently skipping would leave `sentences`
# undefined (or, in a live kernel, stale from a previously processed subject/block).
required = [(log_path, os.path.isfile),
            (transcript_folder, os.path.isdir),
            (transcript_path, os.path.isfile)]
missing = [path for path, exists in required if not exists(path)]
if missing:
    raise FileNotFoundError(
        f"Cannot read sentences for {blockid}; missing: {', '.join(missing)}")
with open(log_path,'r') as f:
    for _ in range(3): # Skip the header
        next(f)
    d = csv.DictReader(f,delimiter="\t")
    sentences = np.array(
        [[r['MOCHARead'], '{NS}', '{NS}'] for r in d if r['TrialPart']=='readRepeat'])

In [None]:
# Build the sentence-level mic (production) TextGrid by fuzzy-matching the stimulus
# sentences from the log file against the word tier of the phone/word mic TextGrid.
def _word_context(word_list, idx):
    """Return (previous word, next word) around word_list[idx]; '' past either end."""
    prev_word = word_list[idx-1] if idx > 0 else ''
    next_word = word_list[idx+1] if idx < len(word_list)-1 else ''
    return prev_word, next_word

fuzzy_thresh = 75 # % match for the fuzzy text matching package. Adjust if you're having issues
# Stimulus sentences with punctuation removed and upper-cased for matching
match_sentences = np.array([re.sub(r'[^\w\s]','',s).upper() for s in sentences[:,0]])
n_sentences = match_sentences.shape[0]
phone_tg_fpath = os.path.join(
    git_path,"preprocessing","events","textgrids",subj,blockid,f"{blockid}_mic.TextGrid")
sentence_tg_fpath = os.path.join(
    git_path,"preprocessing","events","textgrids",subj,blockid,f"{blockid}_mic_sentence.TextGrid")
if os.path.exists(sentence_tg_fpath):
    print("Mic sentence TextGrid already exists")
else:
    with open(phone_tg_fpath,'r') as f:
        phone_tg = textgrid.TextGrid(f.read())
    # Tier index 1 is read as the word tier and tier index 2 as the "task" tier
    task_rows = [r for r in phone_tg.tiers[2].simple_transcript if r[2] == 'task']
    word_transcript = np.array(phone_tg.tiers[1].simple_transcript)
    word_onsets = word_transcript[:,0].astype(float); word_offsets = word_transcript[:,1].astype(float)
    words = np.array([re.sub(r'[^\w\s]','',w) for w in word_transcript[:,2]])
    # Drop non-word labels. Punctuation is already stripped, so e.g. '{NS}' appears as 'NS'.
    # dtype=int keeps the index array usable even when nothing needs dropping.
    nonword_inds = np.array(
        [i for i,w in enumerate(words) if w in ['NS','sp','CG','LG','BR','SL','LS']], dtype=int)
    word_onsets = np.delete(word_onsets,nonword_inds); word_offsets = np.delete(word_offsets,nonword_inds)
    words = np.delete(words,nonword_inds)
    task_onsets = np.array([float(r[0]) for r in task_rows])
    task_offsets = np.array([float(r[1]) for r in task_rows])
    task_times = np.array([[on, off] for on, off in zip(task_onsets, task_offsets)])
    # Keep only words whose onset falls inside a task interval. A flat integer array is
    # built directly so that a task interval containing no words cannot turn the index
    # array into floats.
    task_inds = np.array(
        [i for interval in task_times for i,o in enumerate(word_onsets)
         if o >= interval[0] and o < interval[1]], dtype=int)
    if task_inds.size == 0:
        raise ValueError(f"No words fall inside a 'task' interval in {phone_tg_fpath}")
    words = words[task_inds]
    word_onsets = word_onsets[task_inds]; word_offsets = word_offsets[task_inds]; first_word = word_onsets[0]
    with open(log_path,'r') as f:
        for _ in range(3): # Skip the header
            next(f)
        d = csv.DictReader(f,delimiter='\t')
        # 1-D from the start, so a log with a single sentence still indexes correctly
        sentence_onsets = np.array([float(r['Time']) for r in d if r['TrialPart']=='readRepeat'])
    # Re-reference the log times so the first sentence lines up with the first task word
    sentence_onsets = sentence_onsets - sentence_onsets[0] + first_word
    file_time = 0.; all_match_onsets, all_match_offsets, all_match_transcriptions = [], [], []
    for i,sen in enumerate(match_sentences):
        sentence_complete = False; first_two_words = sen.split()[:2]; last_two_words = sen.split()[-2:]
        # Reset per sentence so a sentence end can never be paired with an onset that was
        # found while matching an earlier sentence (or with no onset at all).
        sen_onset_idx = None
        for ii, word in enumerate(words):
            # Only consider words in the 50 s following the previously matched sentence
            if not (word_onsets[ii] >= file_time and word_offsets[ii] <= file_time + 50):
                continue
            prev_word, next_word = _word_context(words, ii)
            if fuzz.ratio(' '.join([word,next_word]), ' '.join(first_two_words)) > fuzzy_thresh:
                sen_onset_idx = ii
            if (ii > 0 and sen_onset_idx is not None
                    and fuzz.ratio(' '.join([prev_word,word]), ' '.join(last_two_words)) > fuzzy_thresh):
                sen_offset_idx = ii
                matched_sen = words[sen_onset_idx:sen_offset_idx+1]
                matched_sen_onset = word_onsets[sen_onset_idx]
                matched_sen_offset = word_offsets[sen_offset_idx]
                sentence_complete = True; file_time = matched_sen_offset
                break # first complete match wins
        if not sentence_complete:
            # Try to get the sentence end another way. This might trigger when what the participant said
            # deviates heavily from the stimulus (i.e., they made a lot of errors while reading).
            approx_sentence_onset = sentence_onsets[i]-10; approx_sentence_offset = sentence_onsets[i]+40
            approx_word_matches = np.intersect1d(np.where(word_onsets >= approx_sentence_onset)[0],
                np.where(word_offsets <= approx_sentence_offset)[0])
            if approx_word_matches.size == 0:
                raise ValueError(
                    f"No words found between {approx_sentence_onset} s and {approx_sentence_offset} s "
                    f"while looking for sentence '{sen}'")
            fuzz_onset, fuzz_offset = [], []
            for ii in approx_word_matches:
                prev_word, next_word = _word_context(words, ii)
                fuzz_onset.append(fuzz.ratio(' '.join([words[ii], next_word]), ' '.join(first_two_words)))
                fuzz_offset.append(fuzz.ratio(' '.join([prev_word, words[ii]]), ' '.join(last_two_words)))
            # Best-scoring candidates for the sentence start and end inside the window
            sen_onset_idx = approx_word_matches[np.array(fuzz_onset).argmax()]
            sen_offset_idx = approx_word_matches[np.array(fuzz_offset).argmax()]
            matched_sen = words[sen_onset_idx:sen_offset_idx+1]
            matched_sen_onset = word_onsets[sen_onset_idx]; matched_sen_offset = word_offsets[sen_offset_idx]
            file_time = matched_sen_offset
            warnings.warn(f"""
                Couldn't find a match for sentence {sen} using standard procedure.
                The likely cause is the participant made too many errors for fuzzy matching to work.

                After trying an alternate matching method, the best matched sentence transcription is:
                {' '.join(matched_sen)}
                Inserting sentence '{' '.join(matched_sen)}' in sentence TextGrid between
                {matched_sen_onset} s and {matched_sen_offset} s.

                Please manually confirm this is accurate!
            """)
        all_match_onsets.append(matched_sen_onset); all_match_offsets.append(matched_sen_offset)
        all_match_transcriptions.append(' '.join(matched_sen))
    # End time of the last word-tier interval, used as the end of the final silence.
    # Parsed as float: the times are decimal strings, which int() cannot parse.
    last_word_end = float(word_transcript[-1][1])
    sentence_textgrid = []
    n_matched = len(all_match_transcriptions)
    for i, s in enumerate(all_match_transcriptions):
        if i == 0:
            sentence_textgrid.append([0., all_match_onsets[i], 'sp'])
        sentence_textgrid.append([all_match_onsets[i], all_match_offsets[i], s])
        # Silence runs until the next sentence, or to the end of the word tier after the last one
        silence_end = last_word_end if i == n_matched-1 else all_match_onsets[i+1]
        sentence_textgrid.append([all_match_offsets[i], silence_end, 'sp'])
    output_tg = tgio.Textgrid(); sentence_tier = tgio.IntervalTier('sentences', sentence_textgrid)
    output_tg.addTier(sentence_tier); output_tg.save(sentence_tg_fpath)

### Perception (spkr) sentence-level TextGrids
You will need:

1. Accurate sentence-level production TextGrids
2. Click event files (`{blockid}_click_eve.txt`)

In [None]:
# Build the sentence-level spkr (perception) TextGrid by locating each produced sentence
# (a clip of the mic audio) inside the speaker audio with a match filter.
# NOTE(review): `playback_condition` and `match_filter` are not defined in any cell shown
# in this notebook; they must already exist in the kernel before this cell runs.
# Explicit submodule imports: `import scipy` alone does not guarantee these are loaded.
import scipy.io.wavfile
import scipy.signal

corr_thresh = 0.5 # adjust
spkr_wav_fpath = os.path.join(data_path,f"sub-{subj}",blockid,"audio",f"{blockid}_spkr.wav")
mic_wav_fpath = os.path.join(data_path,f"sub-{subj}",blockid,"audio",f"{blockid}_mic.wav")
mic_sentence_tg_fpath = os.path.join(git_path,"preprocessing","events","textgrids",subj,blockid,
                                     f"{blockid}_mic_sentence.TextGrid")
# Built explicitly rather than with str.replace('mic','spkr') on the full path, which
# would also rewrite any "mic" occurring in a directory name.
spkr_sentence_tg_fpath = os.path.join(git_path,"preprocessing","events","textgrids",subj,blockid,
                                      f"{blockid}_spkr_sentence.TextGrid")
spkr_fs, spkr_audio = scipy.io.wavfile.read(spkr_wav_fpath)
if spkr_fs != 11025:
    # Keep only the first channel; guarded so mono (1-D) recordings also work
    if spkr_audio.ndim > 1:
        spkr_audio = spkr_audio[:,0]
    spkr_audio = scipy.signal.resample(spkr_audio, int((spkr_audio.shape[0]/spkr_fs)*11025))
    # NOTE: this overwrites the speaker wav in place with the resampled version
    scipy.io.wavfile.write(spkr_wav_fpath, 11025, spkr_audio); spkr_fs = 11025
else:
    print("Spkr audio already at 11025 Hz.")
# ndmin=2 keeps one row per click even when the file holds a single click
click_events = np.loadtxt(os.path.join(git_path,"preprocessing","events","csv",subj,blockid,
                                       f"{blockid}_click_eve.txt"), ndmin=2)
# Zero out the click intervals (columns 0 and 1 are read as onset/offset in seconds)
for click in click_events:
    click_onset = int(click[0]*spkr_fs); click_offset = int(click[1]*spkr_fs)
    spkr_audio[click_onset:click_offset] = 0
mic_fs, mic_audio = scipy.io.wavfile.read(mic_wav_fpath)
spkr_audio = spkr_audio/spkr_audio.max(); mic_audio = mic_audio/mic_audio.max()
with open(mic_sentence_tg_fpath) as f:
    sen_tg = textgrid.TextGrid(f.read())
mic_sentence_grid = np.array([s for s in sen_tg.tiers[0].simple_transcript if s[2] != 'sp'])
spkr_match_onsets, spkr_match_offsets, spkr_match_transcriptions = [], [], []
for i,mic_sentence in enumerate(mic_sentence_grid):
    if playback_condition[i] != 'echolalia':
        continue
    # Sentences that (nearly) repeat the first sentence of the grid are not aligned
    if fuzz.ratio(mic_sentence[2], mic_sentence_grid[0][2]) > 90:
        print(f"Skipping alignment for sentence {mic_sentence[2]}")
        continue
    mic_sentence_onset = int(float(mic_sentence[0])*mic_fs)
    mic_sentence_offset = int(float(mic_sentence[1])*mic_fs)
    transcription = mic_sentence[2]
    mic_sentence_clip = mic_audio[mic_sentence_onset:mic_sentence_offset]
    matches = match_filter(mic_sentence_clip, spkr_audio, spkr_fs,
                           corr_thresh=corr_thresh, nreps=2, debug=True)
    # matches[0] is iterated as (onset, offset) pairs -- presumably in seconds; confirm
    # against match_filter
    for match in matches[0]:
        spkr_match_onsets.append(match[0]); spkr_match_offsets.append(match[1])
        spkr_match_transcriptions.append(transcription)
# Sort the matches chronologically by onset
event_order = np.argsort(np.array(spkr_match_onsets).astype(float))
spkr_matches = np.vstack((spkr_match_onsets,spkr_match_offsets,spkr_match_transcriptions)).T[event_order]
if os.path.isfile(spkr_sentence_tg_fpath):
    print("Spkr sentence TextGrid already exists.")
else:
    spkr_audio_end = spkr_audio.shape[0]/spkr_fs
    n_matches = len(spkr_matches)
    spkr_sentence_textgrid = []
    for i, s in enumerate(spkr_matches[:,2]):
        if i == 0:
            spkr_sentence_textgrid.append([0., spkr_matches[i,0], 'sp'])
        spkr_sentence_textgrid.append([spkr_matches[i,0], spkr_matches[i,1], s])
        # Silence runs until the next match, or to the end of the audio after the last one
        silence_end = spkr_audio_end if i == n_matches-1 else spkr_matches[i+1,0]
        spkr_sentence_textgrid.append([spkr_matches[i,1], silence_end, 'sp'])
    output_tg = tgio.Textgrid(); sentence_tier = tgio.IntervalTier('sentences', spkr_sentence_textgrid)
    output_tg.addTier(sentence_tier); output_tg.save(spkr_sentence_tg_fpath)

### Perception (spkr) word and phone-level TextGrids

You will need:

1. Accurate sentence-level perception TextGrids

In [None]:
# Build the phone/word-level spkr (perception) TextGrid by copying the mic phone and word
# events of each played-back sentence and shifting them to where that sentence was found
# in the speaker audio.
# Relies on names left in the kernel by the cells above: spkr_sentence_tg_fpath,
# mic_sentence_grid, playback_condition, spkr_audio and spkr_fs.
def _shifted_events(onsets, offsets, labels, win_start, win_stop, new_start):
    """Return [onset, offset, label] rows for the events lying fully inside
    [win_start, win_stop], shifted so that the first of them begins at new_start."""
    inds = np.intersect1d(np.where(onsets >= win_start)[0], np.where(offsets <= win_stop)[0])
    if inds.size == 0:
        raise ValueError(f"No events found between {win_start} s and {win_stop} s")
    first_onset = onsets[inds][0]
    return [[(onsets[k] - first_onset) + new_start, (offsets[k] - first_onset) + new_start, labels[k]]
            for k in inds]

def _label_blanks(rows):
    """Copy (onset, offset, label) rows, replacing empty labels with 'sp'."""
    return [[row[0], row[1], 'sp' if row[2] == '' else row[2]] for row in rows]

mic_ph_event_fpath = os.path.join(git_path,"preprocessing","events","csv",subj,blockid,
                                  f"{blockid}_mic_ph_all.txt")
mic_wr_event_fpath = os.path.join(git_path,"preprocessing","events","csv",subj,blockid,
                                  f"{blockid}_mic_wr_all.txt")
with open(spkr_sentence_tg_fpath) as f:
    spkr_sen_tg = textgrid.TextGrid(f.read())
spkr_sentence_grid = np.array([s for s in spkr_sen_tg.tiers[0].simple_transcript if s[2] != 'sp'])
# Event files: columns 0/1 are read as onset/offset and column 3 as the label
mic_ph_events = np.loadtxt(mic_ph_event_fpath, dtype=str, delimiter="\t")
mic_ph_onsets = mic_ph_events[:,0].astype(float); mic_ph_offsets = mic_ph_events[:,1].astype(float)
mic_ph_transcripts = mic_ph_events[:,3]
mic_wr_events = np.loadtxt(mic_wr_event_fpath, dtype=str, delimiter="\t")
mic_wr_onsets = mic_wr_events[:,0].astype(float); mic_wr_offsets = mic_wr_events[:,1].astype(float)
mic_wr_transcripts = mic_wr_events[:,3]
# Row indices into the FULL mic_sentence_grid of the sentences that were played back
echolalia_inds = [mi for mi in range(len(mic_sentence_grid)) if playback_condition[mi] == 'echolalia']
spkr_phone_tier, spkr_word_tier = [], []
spkr_phone_tier.append([0., float(spkr_sentence_grid[0][0]), 'sp'])
spkr_word_tier.append([0., float(spkr_sentence_grid[0][0]), 'sp'])
for sen in spkr_sentence_grid:
    spkr_onset = float(sen[0]); spkr_offset = float(sen[1]); transcript = sen[2]
    # Find the mic sentence this playback came from. The index must refer to the full
    # grid: an index into the echolalia-only subset points at the wrong row whenever a
    # non-echolalia sentence comes earlier.
    candidates = [mi for mi in echolalia_inds if mic_sentence_grid[mi][2] == transcript]
    if not candidates:
        raise ValueError(f"No echolalia mic sentence matches spkr sentence '{transcript}'")
    mic_match_idx = candidates[0]
    mic_match_onset = float(mic_sentence_grid[mic_match_idx,0])
    mic_match_offset = float(mic_sentence_grid[mic_match_idx,1])
    spkr_phone_tier.extend(_shifted_events(
        mic_ph_onsets, mic_ph_offsets, mic_ph_transcripts, mic_match_onset, mic_match_offset, spkr_onset))
    spkr_word_tier.extend(_shifted_events(
        mic_wr_onsets, mic_wr_offsets, mic_wr_transcripts, mic_match_onset, mic_match_offset, spkr_onset))
# Trailing silence starts where the last copied event ends (not at the last sentence's
# onset, which would overlap that sentence's phones/words) and runs to the end of the audio.
spkr_audio_end = spkr_audio.shape[0]/spkr_fs
spkr_phone_tier.append([spkr_phone_tier[-1][1], spkr_audio_end, 'sp'])
spkr_word_tier.append([spkr_word_tier[-1][1], spkr_audio_end, 'sp'])
# Built explicitly rather than with str.replace('mic','spkr') on the mic path, which
# would also rewrite any "mic" occurring in a directory name.
spkr_phone_tg_fpath = os.path.join(
    git_path,"preprocessing","events","textgrids",subj,blockid,f"{blockid}_spkr.TextGrid")
if os.path.isfile(spkr_phone_tg_fpath):
    print("Spkr phone/word TextGrid already exists.")
else:
    output_tg = tgio.Textgrid()
    phone_tier = tgio.IntervalTier('phone', spkr_phone_tier); word_tier = tgio.IntervalTier('word', spkr_word_tier)
    output_tg.addTier(phone_tier); output_tg.addTier(word_tier)
    output_tg.save(spkr_phone_tg_fpath)
    # Second pass: re-read the file just written and relabel every empty interval as 'sp'
    # (presumably the gaps between sentences that the first save wrote out as blanks).
    with open(spkr_phone_tg_fpath) as f:
        spkr_tg = textgrid.TextGrid(f.read())
    spkr_phone_tg = spkr_tg.tiers[0].simple_transcript
    spkr_phones = _label_blanks(spkr_phone_tg)
    spkr_word_tg = spkr_tg.tiers[1].simple_transcript
    spkr_words = _label_blanks(spkr_word_tg)
    output_tg = tgio.Textgrid()
    phone_tier = tgio.IntervalTier('phone', spkr_phones); word_tier = tgio.IntervalTier('word', spkr_words)
    output_tg.addTier(phone_tier); output_tg.addTier(word_tier); output_tg.save(spkr_phone_tg_fpath)