In [None]:
import json
import os
import time
from pathlib import Path

import pandas as pd
import requests
from tqdm.notebook import tqdm
import xml.etree.ElementTree as ET

from transformers import BertTokenizer

### Read data from the German FrameNet Constructicon for BERT processing:

In [None]:
def parse_masked_sentences(xml_file):
    """Collect the annotated example sentences of one construction XML file.

    Args:
        xml_file: File name or file object accepted by ``ET.parse``. The root
            element must carry an ``id`` attribute that parses as an integer.

    Returns:
        A list with one dict per ``<sentence>`` element that has a ``uid``
        attribute (sentences without one are skipped). Each dict holds:

        * ``uid``, ``constr_id``, ``text`` (the stripped sentence text),
        * ``text_pos``, ``text_xpos``, ``text_dep``, ``text_head``: the
          ``name`` attributes of the labels in the ``UPOS``, ``XPOS``,
          ``DEP_REL`` and ``DEP_HEAD`` layers,
        * ``kees`` / ``kees_idx``: substring and ``(start, end)`` offsets for
          every label in a layer whose name contains ``"KEE-"``,
        * ``kes`` / ``kes_idx``: the same for layers whose name contains
          ``"KE-"``.
    """
    root = ET.parse(xml_file).getroot()
    constr_id = int(root.attrib.get('id'))

    sentences_data = []

    for sentence in root.findall('.//sentence'):
        uid = sentence.attrib.get('uid')
        if uid is None:
            continue

        text = sentence.find('.//text').text.strip()

        text_pos, text_xpos, text_dep, text_head = [], [], [], []
        kees, kees_idx = [], []
        kes, kes_idx = [], []

        # Layers whose label names are copied verbatim into a per-sentence list.
        token_layers = {
            'UPOS': text_pos,
            'XPOS': text_xpos,
            'DEP_REL': text_dep,
            'DEP_HEAD': text_head,
        }

        # Loop through annotations:
        for layer in sentence.findall('.//layer'):
            # A layer without a name cannot be classified; treat it as unknown
            # instead of failing on a substring test against None.
            layer_name = layer.attrib.get('name') or ''
            # './/label' is required here: a path starting with '..' cannot
            # reach the parent of the element findall() is called on, so it
            # would always yield an empty result.
            labels = layer.findall('.//label')

            if 'KEE-' in layer_name or 'KE-' in layer_name:
                is_kee = 'KEE-' in layer_name
                for label in labels:
                    start = int(label.attrib.get('start'))
                    end = int(label.attrib.get('end'))
                    # The offsets are applied to the stripped sentence text.
                    if is_kee:
                        kees.append(text[start:end])
                        kees_idx.append((start, end))
                    else:
                        kes.append(text[start:end])
                        kes_idx.append((start, end))
            elif layer_name in token_layers:
                token_layers[layer_name].extend(
                    label.attrib.get('name') for label in labels
                )

        sentences_data.append({
            'uid': uid,
            'constr_id': constr_id,
            'text': text,
            'text_pos': text_pos,
            'text_xpos': text_xpos,
            'text_dep': text_dep,
            'text_head': text_head,
            'kees': kees,
            'kees_idx': kees_idx,
            'kes': kes,
            'kes_idx': kes_idx,
        })
    return sentences_data
    
# Parse every construction XML file in `xml_directory` whose construction page
# is still available online and gather all sentences in one DataFrame.
sentence_list = []
xml_directory = '../../data/constructicon/construction'

for filename in tqdm(list(os.listdir(xml_directory))):
    if not filename.endswith('.xml'):
        continue

    constr_id = Path(filename).stem  # the file name (without .xml) is used as the online id
    # A timeout keeps one stalled connection from hanging the whole cell.
    response = requests.get(
        f"https://gsw.phil.hhu.de/constructicon/construction?id={constr_id}",
        timeout=30,
    )
    # Pause after *every* request, including the ones for missing constructions.
    time.sleep(.5)

    # NOTE(review): the page apparently embeds this icon class when the id is unknown — confirm.
    if b"fa-triangle-exclamation" in response.content:
        print(constr_id, "does not exist online!")
        continue

    sentence_list += parse_masked_sentences(os.path.join(xml_directory, filename))

sentences = pd.DataFrame(sentence_list)
sentences

In [None]:
# Unnest the parallel list columns so that each row holds exactly one
# KEE / KE combination (first the KEE pair of columns, then the KE pair).
sentences = (
    sentences
    .explode(["kees", "kees_idx"], ignore_index=True)
    .explode(["kes", "kes_idx"], ignore_index=True)
)
sentences

In [None]:
# Build one masked query per whitespace token of every KE and pair it with the
# KEE tokens of the same sentence. Results are collected for a JSON export
# (written at the end of this cell) and in `csv_comapp` for a CSV export.
json_comapp = []
csv_comapp = []
errors = 0
problematic_constructions = set()
unproblematic_constructions = set()
tokenizer = BertTokenizer.from_pretrained('bert-base-german-cased')

for _, row in tqdm(sentences.iterrows(), total=len(sentences)):
    tokenized = str(row["text"]).split()

    # After explode(), sentences without a KEE carry NaN here. str(NaN) would
    # silently become the token "nan", so skip those rows explicitly: without
    # a KEE we can't use this example.
    if not isinstance(row["kees"], str):
        continue

    try:
        ke_start, ke_end = row["kes_idx"]
    except TypeError:
        continue  # NaN instead of an offset pair: there is nothing to mask.

    tokenized_kees = row["kees"].split()  # Split the KEEs if there are multi-word KEEs
    tokenized_kes = str(row["text"][ke_start:ke_end]).split()

    masked_texts = []
    tokenized_masked_list = []

    # Replace one KE token at a time by [MASK]; the rest of the sentence is kept.
    for k, tokenized_ke in enumerate(tokenized_kes):
        masked_text = (
                row["text"][:ke_start] + " "
                + " ".join(tokenized_kes[:k])
                + " [MASK] "
                + " ".join(tokenized_kes[k+1:])
                + " " + row["text"][ke_end:]
        ).replace("  ", " ").replace("  ", " ")
        masked_texts.append(masked_text)
        tokenized_masked_list.append(str(masked_text).split())

    for masked_text, tokenized_masked in zip(masked_texts, tokenized_masked_list):
        try:
            # Position of the first whitespace token equal to each KEE token.
            kee_idx = [tokenized.index(tokenized_kee) for tokenized_kee in tokenized_kees]
            kee_query_idx = list(kee_idx)
            # Masking must not change the number of whitespace tokens, otherwise
            # the KEE positions above would not be valid for the query.
            assert len(masked_text.split()) == len(row["text"].split())

        except (ValueError, AssertionError) as e:
            print(row["constr_id"], type(e), e, "... Continuing ...")
            errors += 1
            problematic_constructions.add(row["constr_id"])
            continue

        unproblematic_constructions.add(row["constr_id"])

        assert "[MASK]" in masked_text

        out_json = [{
            "label": kee + str(row["constr_id"]),
            "target1": row["text"],
            "target1_idx": idx,
            "query": masked_text,
            "query_idx": q
        } for kee, idx, q in zip(tokenized_kees, kee_idx, kee_query_idx)]  # One entry per KEE token
        out_csv = [{
            "text": row["text"],
            "pos_tags": row["text_pos"],
            "xpos_tags": row["text_xpos"],
            "dep_rels": row["text_dep"],
            "dep_heads": row["text_head"],
            "mask": row["text"][ke_start:ke_end],
            "ambiguous_word": kee,
            "label": kee + str(row["constr_id"])  # This will be the new token that we will add to the LLM (named "<kee><i>", where <kee> is the KEE token and <i> is the id of the construction it appears in).
        } for kee in tokenized_kees]

        json_comapp += out_json
        csv_comapp += out_csv

# ensure_ascii=False writes non-ASCII characters verbatim, so the file encoding
# must be fixed explicitly instead of depending on the platform default.
with open("../../data/pseudowords/CoMaPP_all_bert.json", "w", encoding="utf-8") as file:
    json.dump(json_comapp, file, ensure_ascii=False)

f"{errors} elements from {len(problematic_constructions)} different constructions could not be saved as intended."

In [None]:
import csv

# Write the per-KEE rows collected in `csv_comapp` to disk. The encoding is
# given explicitly so that non-ASCII text does not depend on the platform
# default; newline="" is what the csv module expects for files it writes to.
with open("../../data/pseudowords/CoMapp_Dataset_bert.csv", "w", newline="", encoding="utf-8") as file:
    writer = csv.DictWriter(file, fieldnames=["label", "text", "pos_tags", "xpos_tags", "dep_rels", "dep_heads", "mask", "ambiguous_word"])
    writer.writeheader()
    writer.writerows(csv_comapp)