In [1]:
from datasets import Dataset, DatasetDict
import xml.etree.ElementTree as ET
import os
import re
from utils import preprocess_xml_illegal_chars

  from .autonotebook import tqdm as notebook_tqdm


### Specify Src and Tar Dir

Note: make sure the following files:
- obesity_test_annotation.xml
- obesity_test_record.xml
- obesity_training_2_annotation.xml
- obesity_training_2.xml
- obesity_training_annotation.xml
- obesity_training.xml

are saved in the source directory (`src_path`).

In [2]:
# Directory containing the raw record and annotation XML files read by this notebook.
src_path = "../datasets/n2c2_raw/2008/obesity/"
# Directory the assembled DatasetDict is saved to at the end of the notebook.
tar_path = "../datasets/n2c2/obesity-classification-2008"

### Data Cleaning

The record files downloaded directly from N2C2 contain characters that are illegal in XML. Replace them with their escaped equivalents.

In [4]:
# Record files whose XML-illegal characters must be escaped before they can be parsed.
train_record_file = os.path.join(src_path, "obesity_training.xml")
train2_record_file = os.path.join(src_path, "obesity_training_2.xml")
test_record_file = os.path.join(src_path, "obesity_test_record.xml")
record_files = [train_record_file, train2_record_file, test_record_file]

# Regexes matching the structural tags of the record files; they are handed to
# the helper as `escaped_patterns`.
# NOTE(review): presumably this is the markup the helper leaves untouched while
# escaping everything else -- confirm against utils.preprocess_xml_illegal_chars.
# Built once here instead of being re-created for every file.
patterns = [
    r"\<doc id\=\"\d+\"\>", 
    r"\</doc[s]?\>", 
    r"\<text\>", 
    r"\<\/text\>", 
    r"\<root\>", 
    r"\<\/root\>", 
    r"\<doc[s]?\>"
]

# Validate every input up front so a missing file is reported before any file
# has been handed to the preprocessing helper. An explicit raise is used
# because `assert` statements are skipped when Python runs with -O.
for file in record_files:
    if not os.path.exists(file):
        raise FileNotFoundError(f"File {file} doesn't exist!")

for file in record_files:
    # Pass a fresh copy so every call receives the same pattern list,
    # whatever the helper does with its argument.
    preprocess_xml_illegal_chars(file, escaped_patterns=list(patterns))

### Run

In [27]:
def _load_records(record_file):
    """Parse a record XML file into a list of ``{"id": ..., "text": ...}`` dicts.

    The tree is walked as root -> docs -> doc -> child. One record is produced
    for every child element of a ``doc``, carrying the ``id`` attribute of the
    ``doc`` and the text content of the child.

    Raises:
        KeyError: if a ``doc`` element has no ``id`` attribute.
    """
    records = []
    for docs in ET.parse(record_file).getroot():
        for doc in docs:
            doc_id = doc.attrib["id"]
            for text in doc:
                records.append({"id": doc_id, "text": text.text})
    return records


def _attach_annotations(records, annotation_file):
    """Add the judgments from an annotation XML file to ``records`` in place.

    The tree is walked as root -> diseaseset -> disease -> doc. For every
    annotated ``doc``, each record with the same id receives a field named
    ``"{disease name}_{diseaseset source}"`` whose value is the ``judgment``
    attribute. Annotated ids that match no record are ignored.

    Raises:
        KeyError: if a required attribute (``source``, ``name``, ``id`` or
            ``judgment``) is missing.
        AssertionError: if a record would receive the same field twice.
    """
    # Index records by id once so each annotation is a dictionary lookup
    # instead of a scan over every record. A list is kept per id because
    # several records can share one id (one record per child of a doc).
    records_by_id = {}
    for record in records:
        records_by_id.setdefault(record["id"], []).append(record)

    for diseaseset in ET.parse(annotation_file).getroot():
        judgment_type = diseaseset.attrib["source"]
        for disease in diseaseset:
            disease_name = disease.attrib["name"]
            feature_name = f"{disease_name}_{judgment_type}"
            for doc in disease:
                doc_id = doc.attrib["id"]
                doc_judgment = doc.attrib["judgment"]
                for record in records_by_id.get(doc_id, ()):
                    assert feature_name not in record, f"Feature {feature_name} already exists!"
                    record[feature_name] = doc_judgment


train_record_file = os.path.join(src_path, "obesity_training.xml")
train_annotation_file = os.path.join(src_path, "obesity_training_annotation.xml")

train2_record_file = os.path.join(src_path, "obesity_training_2.xml")
train2_annotation_file = os.path.join(src_path, "obesity_training_2_annotation.xml")

# Each training split is annotated against its own annotation file only.
train1_set = _load_records(train_record_file)
_attach_annotations(train1_set, train_annotation_file)

train2_set = _load_records(train2_record_file)
_attach_annotations(train2_set, train2_annotation_file)

# The final training set is the first split followed by the second.
train_set = train1_set + train2_set

In [28]:
test_record_file = os.path.join(src_path, "obesity_test_record.xml")
test_annotation_file = os.path.join(src_path, "obesity_test_annotation.xml")

# Build one {"id", "text"} record per child element of every doc in the
# record file (tree shape: root -> docs -> doc -> child).
test_set = []

test_tree = ET.parse(test_record_file)
test_root = test_tree.getroot()

for docs in test_root:
    for doc in docs:
        doc_id = doc.attrib["id"]
        for text in doc:
            test_set.append({"id": doc_id, "text": text.text})

# Index the records by id once so each annotation below is a dictionary lookup
# rather than a scan over the whole test set. A list is kept per id because
# several records can share one id (one record per child of a doc).
test_records_by_id = {}
for record in test_set:
    test_records_by_id.setdefault(record["id"], []).append(record)

test_annotation_tree = ET.parse(test_annotation_file)
test_annotation_root = test_annotation_tree.getroot()

# Annotation tree shape: root -> diseaseset -> disease -> doc. Every annotated
# doc adds a "{disease name}_{diseaseset source}" field holding its judgment to
# the matching records; annotated ids with no matching record are ignored.
for diseaseset in test_annotation_root:
    judgment_type = diseaseset.attrib["source"]
    for disease in diseaseset:
        disease_name = disease.attrib["name"]
        feature_name = f"{disease_name}_{judgment_type}"
        for doc in disease:
            doc_id = doc.attrib["id"]
            doc_judgment = doc.attrib["judgment"]
            for record in test_records_by_id.get(doc_id, ()):
                # A record must not receive the same field twice.
                assert feature_name not in record, f"Feature {feature_name} already exists!"
                record[feature_name] = doc_judgment

In [35]:
# Convert the lists of record dicts into Dataset objects. Note that this
# rebinds `train_set` and `test_set` from lists to Datasets.
train_set = Dataset.from_list(train_set)
test_set = Dataset.from_list(test_set)

# Bundle both splits under the "train" and "test" keys.
dataset = DatasetDict({"train": train_set, "test": test_set})

# Create the target directory (including parents) if needed; exist_ok makes
# this a no-op when it is already there, replacing the separate existence check.
os.makedirs(tar_path, exist_ok=True)
dataset.save_to_disk(tar_path)

                                                                                            