In [1]:
import os
import sys
from collections import Counter, defaultdict
import tgt
from copy import deepcopy
from re import match, sub, findall, finditer
import glob
import spacy


In [20]:
root_dir = './'
word_level_timing = root_dir + 'word_level_timing'
motion_label = root_dir + 'motion_labels' 
original_annotation = root_dir + 'transcriptions_annotations'
lang = 'de'
target_dir = "./DUEL/{}".format(lang)
german_tagger = spacy.load("de_core_news_sm")

In [21]:
def get_all_textgrid_files(path):
    filenames = []
    for root, dirs, files in os.walk(path):
        for file in files:
            if file.endswith(".TextGrid"):
                filenames.append(os.path.join(root, file))
    return filenames
word_level_timing_annotation = get_all_textgrid_files(word_level_timing)
print(word_level_timing_annotation)


['./word_level_timing/r1A_wordlevel.TextGrid', './word_level_timing/r13A_wordlevel.TextGrid', './word_level_timing/r3A_wordlevel.TextGrid', './word_level_timing/r19B_wordlevel.TextGrid', './word_level_timing/r15B_wordlevel.TextGrid', './word_level_timing/r11B_wordlevel.TextGrid', './word_level_timing/r19A_wordlevel.TextGrid', './word_level_timing/r2B_wordlevel.TextGrid', './word_level_timing/r3B_wordlevel.TextGrid', './word_level_timing/r12B_wordlevel.TextGrid', './word_level_timing/r9A_wordlevel.TextGrid', './word_level_timing/r10A_wordlevel.TextGrid', './word_level_timing/r9B_wordlevel.TextGrid', './word_level_timing/r18A_wordlevel.TextGrid', './word_level_timing/r17B_wordlevel.TextGrid', './word_level_timing/r8B_wordlevel.TextGrid', './word_level_timing/r15A_wordlevel.TextGrid', './word_level_timing/r5A_wordlevel.TextGrid', './word_level_timing/r12A_wordlevel.TextGrid', './word_level_timing/r7B_wodlevel.TextGrid', './word_level_timing/r6B_wordleve.TextGrid', './word_level_timing/r5B

In [22]:
task_index = {
    1 : "dream_apartment",
    2: "film_script",
    3: "border_control"
             }

legal_tiers = {"A-utts" : [u"A", u"A-utts;"], 
               "B-utts" : [u"B", u"B-utts;", u"B_utts"], 
               "A-turns" : [u"A-turns;","A_turns"], 
               "B-turns" : [ u"B-turns;",u"B_turns", u"B-turns    "],
               "A-laughter" : [], 
               "B-laughter" : [u"B−laughter"],
               "A-en" : [u"A-eng", u"A-english",
                         u"A-fr_en", u"A-fr-en",
                         u"A-fr_en;",u"Translation A",
                         u"translation A", u"A translation", u"A Translation"], 
               "B-en" : [u"B-eng", u"B-english",
                         u"B-fr_en", u"B-fr_en;",
                         u"B_fr-en", u"Translation B", 
                         u"translation B", u"B translation",
                         u"B Translation", u"B-fr-en"],
               "Comments" : [u"Comments & questions",
                             u"comments", u"Problems"], 
               "Part" : [u"part"], 
               "O" : [u"E"]
              }

c = Counter()
missing_c = defaultdict(list)
global_tag_count = Counter()
log_file = open("{}_errors.log".format(lang), "w")

In [23]:
"""
Read textgrid function
"""
# simply : tg = tgt.read_textgrid(tg_path)

'\nRead textgrid function\n'

In [24]:
def clean_utt(utt, literal=False):
    if not literal:
        #replace variants, partial and misspoken words with standard spelling
        utt = sub("""<[vpm]="(.+?)">.+?</[vpm]>""", lambda m:m.group(1), utt)
        #remove fillers like "{F aehm}" entirely
        utt = sub("""{.*?}""", "", utt)
        
        #TO DO: resolve complex replacements like "(der + der) + die) Katze"
        
    else:
        #remove brackets from fillers, i.e. "{F aehm}" becomes "aehm"
        utt = sub("""{(.*?)}""",lambda m:m.group(1),utt)
    #remove all remaining xml-style tags    
    utt = sub("""<.*?>""","",utt)
    #remove open tags at the end of an utterance (can be removed once problems with the TextGrids are fixed)
    utt = sub("""<.*$""","",utt)
    #remove all remaining punctuation and brackets
    utt = sub("""[\.:;,\(\)\+\$]""","",utt)
    #remove whitespace at the beginning and end of an utterance
    utt = utt.strip()
    #replace any amount of whitespace with a single space
    utt = sub("""\s+"""," ",utt)
    return utt

In [25]:
"""
Methods to consume textgrids and convert to the disfluency
corpus style for consistency across different possible raw formats.

This file is distributed as part of DUEL corpus.
"""

# corpus, start_time deleted as parameters
# how to do the basic version? e rps and f
def disfluency_tags(utt):
    """returns the list of tags for each word (simply defined by split)
    and also the list of tags for boundaries (one more than the utt length) 
    for repair points and laughter bouts. NB problem is: the laughter bout itself is a word
    may in fact instead need to do this after we establish which words are proper words"""
    utt = utt.split()
    labels = ["",] * len(utt)
    boundaries = ["",] * (len(utt)+1) # where do we use this?
    inRepair = 0
    inFP = False # why does this start with True, changed to False
    inLS = False
    for i in range(0,len(utt)):
        word = utt[i]
        word_clean = clean_utt(word) # this is added
        if word_clean == "-": # this was "-"
            continue
        
        '''if "<laughter>" in word or "<laughter/>" in word:
            inLS = True'''
    
        if "<p" in word:
            labels[i] = "<f/>"
        for j in range(0,len(word)):
            filled_pause_begin = False
            c = word[j]
            # if c=="(":
                
            if c == "{":
                if j == len(word)-1:
                    pass #edit term (non-fp)
                elif word[j+1] == "F":
                    inFP = True
                    filled_pause_begin = True
                else:
                    pass
        
        # choose where to put these conditions
        
        if inFP or filled_pause_begin: # using and instead of or removed all edit tags in {F Ahm
            labels[i] += "<e/>"
            
            
        elif inRepair>0 and inFP==False:
            labels[i] += "<rps/>" # = instead of += for only one tag. however, open and close </rm> </rm> should be +=

        for j in range(0,len(word)):
            c = word[j]
            if c == "+": 
                inRepair += 1 # inRepair boolean but 
            if c == ")": inRepair-=1 # for now counting interegnum within the repairs

            if c =="}": #out of the filled pause
                inFP=False
            if c =="{":
                inFP=True
                

        # fluent terms
        if labels[i] == "":
            labels[i] = "<f/>"               
    #if inLS == True:
    #    print "WARNING NO LS END", corpus, start_time
        #raw_input()
        
        # labels[i-1] + utt[i] + labels[i]
       # sandwiched_labels = labels[0] + utt + labels[1] 
       # zip(word, label) two lists of tuples
        
    return (zip(utt, labels))

In [26]:
def textgrid_to_dict(tgfile):
    """Returns a dict with the tier names as keys and a list of
    intervals of (start_time, end_time, text) as values.

    :param tgfile: path to textgrid file"""

    textgrid = tgt.read_textgrid(textgrid_file_name)
    
    tgdict = dict()
    for tiername in textgrid.get_tier_names():
        tgdict[tiername] = []
        for textinterval in textgrid.get_tier_by_name(tiername):
            if textinterval.text != '<sil>':
                tgdict[tiername].append((float(textinterval.start_time),
                                         float(textinterval.end_time),
                                         str(textinterval.text
                                             .encode("utf-8").decode("utf-8"))))
    return tgdict

In [28]:
transcription_dir = original_annotation
tgsdict = dict()
for experiment_name in sorted(os.listdir(transcription_dir)):
    if ".DS_Store" in experiment_name:
        continue
    tgsdict[experiment_name] = []
    session_no = experiment_name[1: len(experiment_name)]
    print(experiment_name[-1]) # r1, r2, r3...
        
    textgrid_file_name = transcription_dir + os.sep + experiment_name + os.sep + experiment_name + ".TextGrid" # original transcription for that particular session    
    # read textgrids
    textgrid_dict = textgrid_to_dict(textgrid_file_name)

    for index, utts in enumerate(textgrid_dict['A-utts']): # it return a list containing tuple of three elememt like starttime, endtime, utterance
        isVisited = [False]*len(textgrid_dict['A-utts'])
        participant = session_no + 'a'
        word_level_textgrid_file_name = './word_level_timing/' + experiment_name + 'A_wordlevel.TextGrid' # reading corresponding word level timimg file
        word_level_textgrid= tgt.read_textgrid(word_level_textgrid_file_name)
        tier_names = word_level_textgrid.get_tier_names()
        
        utterance_start_time = utts[0] 
        utterance_end_time = utts[1]
        utterance = utts[2]
        
        tagged_utt = list(disfluency_tags(utterance))
              
        for names in tier_names:
            if names == 'ORT-MAU':
                text_tier =  word_level_textgrid.get_tier_by_name(names)
                for annotation in text_tier.annotations:
                    
                    word_start_time = annotation.start_time
                    
                    word_end_time = annotation.end_time
                    
                    word_annotation = annotation.text

                    for i in range(0,len(tagged_utt)):
                        word, label = tagged_utt[i]
                        clean_word = clean_utt(word)
                        pos_tag = german_tagger(clean_word)
                        if clean_word == word_annotation and utterance_start_time <= word_start_time and word_end_time <= utterance_end_time and isVisited[index]==False:
                            isVisited[index] = True
                            for token in pos_tag:
                                print('label:', label, 'word', word_annotation, 'word_start_time', word_start_time, 'pos tags', token.pos_)
                            
                            
                    #doc = german_tagger(annotation.text)
                    #for token in doc:
                        #text_tier.add_annotations(token.pos_)
                
               # for annotation in text_tier.annotations:
                    
                # for i in range(0,len(utt)):
                       # if utts[i] == annotation.text:
                        #    doc = german_tagger(annotation.text)
                        #    annotation.text, doc
    ,
       # print(utts.start_time)
       # utts = list(disfluency_tags(utts[2]))
       # print(participant, utts)
        
       # utts[i] == words 
        
       #     start_time, end_time, pos_tag, word, participant_id
    for index, utts in enumerate(textgrid_dict['A-utts']): # it return a list containing tuple of three elememt like starttime, endtime, utterance
        isVisited = [False]*len(textgrid_dict['A-utts'])
        participant = session_no + 'b'
        word_level_textgrid_file_name = './word_level_timing/' + experiment_name + 'B_wordlevel.TextGrid' # reading corresponding word level timimg file
        word_level_textgrid= tgt.read_textgrid(word_level_textgrid_file_name)
        tier_names = word_level_textgrid.get_tier_names()
        
        utterance_start_time = utts[0] 
        utterance_end_time = utts[1]
        utterance = utts[2]
        
        tagged_utt = list(disfluency_tags(utterance))
        for names in tier_names:
            if names == 'ORT-MAU':
                text_tier =  word_level_textgrid.get_tier_by_name(names)
                for annotation in text_tier.annotations:
                    
                    word_start_time = annotation.start_time
                    
                    word_end_time = annotation.end_time
                    
                    word_annotation = annotation.text

                    for i in range(0,len(tagged_utt)):
                        word, label = tagged_utt[i]
                        clean_word = clean_utt(word)
                        pos_tag = german_tagger(clean_word)
                        if clean_word == word_annotation and utterance_start_time <= word_start_time and word_end_time <= utterance_end_time and isVisited[index]==False:
                            isVisited[index] = True
                            for token in pos_tag:
                                print('label:', label, 'word', word_annotation, 'word_start_time', word_start_time, 'pos tags', token.pos_)
        
        
        
    # for clean_utts in texgrids
    #    if start_time <= time and end_time 
    
    # tgt.io.write_to_file(textgrid, './disf_tags/'+str(f.split('/')[3].split('.')[0])+".textgrid")

    # for uttsB in textgrid_dict['B-utts']:
         
    tgsdict[experiment_name].append(textgrid_dict)
    # print(textgrid_file_name) # ./transcriptions_annotations/r1/r1.TextGrid


1
label: <f/> word Mhm word_start_time 555.791958 pos tags PROPN
label: <f/> word ja word_start_time 557.064417 pos tags ADV
label: <f/> word sehr word_start_time 562.649333 pos tags ADV
label: <f/> word ja word_start_time 564.572188 pos tags ADV
label: <f/> word also word_start_time 565.862271 pos tags ADV
label: <f/> word mehr word_start_time 567.680271 pos tags ADV


KeyboardInterrupt: 

In [26]:
# transcription_dir = original_annotation

# tgsdict = dict()

# for experiment_name in sorted(os.listdir(transcription_dir)):
    
#     if ".DS_Store" in experiment_name:
#         continue
        
#     tgsdict[experiment_name] = []
#     session_no = experiment_name[1: len(experiment_name)]
#     print(experiment_name[-1]) # r1, r2, r3...
        
#     textgrid_file_name = transcription_dir + os.sep + experiment_name + os.sep + experiment_name + ".TextGrid"
#     textgrid_file_name_target = target_dir + os.sep + experiment_name + os.sep + experiment_name + ".TextGrid"
    
    
#     # read textgrids
#     textgrid_dict = textgrid_to_dict(textgrid_file_name)
        
#     for i, interval in enumerate(textgrid_dict['A-utts']):
        
#         participant = session_no + 'a'
        
#         word_level_textgrid_file_name = './word_level_timings/' + experiment_name + 'A_wordlevel.TextGrid'
    
#         word_level_textgrid= tgt.read_textgrid(word_level_textgrid_file_name)
        
#         tier_names = word_level_textgrid.get_tier_names()
        
#         # utts = list(disfluency_tags(utts[2]))
#         print(interval)
#         utterance_start_time = utts[1] 
#         utterance = utts[2]
        
#         tagged_utt = list(disfluency_tags(utterance))
        
#         for i in range(0,len(tagged_utt)):
#             word, label = tagged_utt[i]
#             # clean_word = 
#             print(label)
    
#         # print(utts[2])
#         # utt = list(disfluency_tags(utts[2]))
#         # print(participant, utt)
#         # print(len(utt))

#     for utts in textgrid_dict['B-utts']:
#         participant = session_no + 'b'
#         word_level_textgrid_file_name = experiment_name + 'B_wordlevel.TextGrid'
        
#         print(utts[2])
#         utts = list(disfluency_tags(utts[2]))
#         print(participant, utts)
        
        
        
#     # for clean_utts in texgrids
#     #    if start_time <= time and end_time 
    
#     # tgt.io.write_to_file(textgrid, './disf_tags/'+str(f.split('/')[3].split('.')[0])+".textgrid")

#     # for uttsB in textgrid_dict['B-utts']:
         
#     tgsdict[experiment_name].append(textgrid_dict)
#     # print(textgrid_file_name) # ./transcriptions_annotations/r1/r1.TextGrid


v


NotADirectoryError: [Errno 20] Not a directory: './transcriptions_annotations/annotation.csv/annotation.csv.TextGrid'

In [None]:
# tg = tgsdict['r2']
# tg[0]['B-utts'][0][2]

KeyError: 'r2'