In [None]:
import json
from dataclasses import asdict
from typing import List
import re
from CTnlp.patient import load_patients_from_xml
from CTnlp.patient import Patient

In [None]:

def convert_patients_to_jsonl(patients: List[Patient], outfile):
    """Dump patients to `outfile` in JSON Lines format.

    Each patient is converted with `dataclasses.asdict` and written as one
    JSON object per line. An existing file at `outfile` is overwritten.

    Args:
        patients: Patient dataclass instances to serialise.
        outfile: Path of the file to (over)write.
    """
    with open(outfile, 'w') as sink:
        for record in patients:
            sink.write(json.dumps(asdict(record)) + '\n')

In [None]:
# Topic files to load, each paired with the extra keyword arguments its loader
# call needs (only the 2014 file is read with input_type="CSIRO").
topic_sources = (
    ("../data/external/topics2014.xml", {"input_type": "CSIRO"}),
    ("../data/external/topics2021.xml", {}),
    ("../data/external/topics2022.xml", {}),
)

patients = []
for topics_path, loader_kwargs in topic_sources:
    patients.extend(load_patients_from_xml(topics_path, **loader_kwargs))

In [None]:
patients[0]

In [None]:
def extract_past_medical_history(patient):
    """Locate the past-medical-history passage in a patient's description.

    Three patterns are tried in order and the first one that hits wins:

    1. a sentence containing "medical history";
    2. a sentence containing "has [no ][a ][positive ]history";
    3. a "past medical history" heading followed by the lines below it.

    Every pattern begins at the '.' or '!' that closes the preceding
    sentence, so a passage at the very start of the description (with no
    delimiter in front of it) is not matched.

    The search runs on the unmodified description with `re.IGNORECASE`.
    Lowercasing first is unnecessary and can change the string length for
    some Unicode characters, which would make the returned offsets point at
    the wrong place in `patient.description`.

    Args:
        patient: Object exposing a `description` string.

    Returns:
        The `re.Match` of the first pattern that matches (offsets are valid
        indices into `patient.description`), or None when nothing matches.
    """
    description = patient.description

    patterns = (
        r"[!\.][^!\.]*medical history.*?\.",
        r"[!\.][^!\.]*has (no )?(a )?(positive )?history.*?\.",
        # NOTE(review): `[\d|-]?` is optional and followed by `[^\n]*`, so this
        # group accepts any newline-terminated line, not only bulleted ones.
        r"[!\.][^!\.]*past medical history:?\n([\d|-]?[^\n]*\n)*",
    )
    for pattern in patterns:
        match = re.search(pattern, description, re.IGNORECASE)
        if match:
            return match
    return None

In [None]:
def extract_family_history(patient):
    """Locate the sentence mentioning "family history" in a patient's description.

    The pattern begins at the '.' that closes the preceding sentence and runs
    to the next '.', so a mention at the very start of the description (with
    no '.' in front of it) is not matched.

    The search runs on the unmodified description with `re.IGNORECASE`.
    Lowercasing first is unnecessary and can change the string length for
    some Unicode characters, which would make the returned offsets point at
    the wrong place in `patient.description`.

    Args:
        patient: Object exposing a `description` string.

    Returns:
        The `re.Match` (offsets are valid indices into `patient.description`),
        or None when there is no match.
    """
    return re.search(
        r"\.[^\.]*family history.*?\.",
        patient.description,
        re.IGNORECASE,
    )

In [None]:
def extract_sections(patient):
    """Split a patient's description into remainder, medical history and family history.

    The past-medical-history and family-history passages are located with
    `extract_past_medical_history` and `extract_family_history`. Each match
    starts on the delimiter that closes the previous sentence, hence:

    * the section text is the match without its first character, stripped;
    * the span cut from the description is `[start + 2, end + 1)`, which
      keeps the leading delimiter plus the character after it and drops the
      character following the match (both presumably a single whitespace —
      TODO confirm against the data).

    When both passages are found their cut spans may overlap (for example one
    sentence mentioning both "medical history" and "family history").
    Overlapping or touching spans are merged and removed once; cutting them
    one after the other with the original offsets would delete unrelated text
    that follows the passage.

    Args:
        patient: Object exposing a `description` string.

    Returns:
        Tuple `(rest, pmh_text, fh_text)`; a section that was not found is
        returned as an empty string, and `rest` is the full description when
        neither section was found.
    """
    description = patient.description

    pmh = extract_past_medical_history(patient)
    fh = extract_family_history(patient)

    pmh_text = description[pmh.start() + 1:pmh.end()].strip() if pmh else ''
    fh_text = description[fh.start() + 1:fh.end()].strip() if fh else ''

    # Collect the spans to cut, ordered by position, merging any that overlap
    # or touch so every character is removed at most once.
    cut_spans = []
    for lo, hi in sorted((m.start() + 2, m.end() + 1) for m in (pmh, fh) if m):
        if cut_spans and lo <= cut_spans[-1][1]:
            cut_spans[-1][1] = max(cut_spans[-1][1], hi)
        else:
            cut_spans.append([lo, hi])

    # Cut from right to left so earlier offsets stay valid.
    rest = description
    for lo, hi in reversed(cut_spans):
        rest = rest[:lo] + rest[hi:]

    # Debug trace of the raw matches, kept so the notebook output is unchanged.
    print(f"{pmh=}\t{fh=}")

    return rest, pmh_text, fh_text

In [None]:
# Split every description into its sections and collect one JSON-ready record
# per patient. The loop variables (`patient`, `rest`, `pmh`, `fh`) are kept
# under these names because later cells display them.
out_dict = []
for patient in patients:
    rest, pmh, fh = extract_sections(patient)

    # Keep the extracted sections on the patient object as well.
    patient.pmh, patient.fh, patient.rest = pmh, fh, rest

    record = dict(patient_id=patient.patient_id, description=rest, pmh=pmh, fh=fh)
    out_dict.append(record)

In [None]:
# Destination of the per-patient section records written by the next cell.
OUTFILE = '../data/external/sections.jsonl'
# Disabled alternative: dump the full patient dataclasses instead of `out_dict`.
# convert_patients_to_jsonl(patients=patients, outfile=OUTFILE)

In [None]:
# Write the section records as JSON Lines: one JSON object per line.
with open(OUTFILE, 'w') as fp:
    for item in out_dict:
        fp.write(json.dumps(item) + '\n')

In [None]:
rest

In [None]:
pmh

# Medspacy testing

In [None]:
import medspacy
from medspacy.context import ConTextRule, ConTextComponent
# from medspacy.visualization import visualize_dep, visualize_ent
# from scispacy.linking import EntityLinker


In [None]:
# Load "en_ner_bc5cdr_md" through medspacy.load, enabling only the
# "sentencizer" and "context" components.
nlp = medspacy.load("en_ner_bc5cdr_md", enable=["sentencizer", "context"])

In [None]:
# nlp.add_pipe("scispacy_linker", config={"resolve_abbreviations": True, "linker_name": "umls"})

In [None]:
# nlp = medspacy.load(enable=["sentencizer", "context"])

In [None]:
# Stand-alone ConText component created with rules="default"; a later cell
# calls it directly on `doc` to inspect its output.
context = ConTextComponent(nlp, rules="default")

In [None]:
context.rules

In [None]:
# Toy note mixing "no ..." statements with plain findings, used in the
# following cells to inspect the context graph and the visualisations.
doc = nlp("She has no allergies to any food or drugs. There is abscess in the abdomen. There is a collection of fluid in the jejunum. hematomas are seen around the right lower quadrant. There is no cancer.")

In [None]:
context(doc)

In [None]:
doc._.context_graph

In [None]:
doc._.context_graph.targets

In [None]:
doc._.context_graph.modifiers

In [None]:
from medspacy.visualization import visualize_dep, visualize_ent

In [None]:
visualize_ent(doc)

In [None]:
visualize_dep(doc)

In [None]:
# NOTE: this rebinds the global `nlp` that an earlier cell loaded from
# "en_ner_bc5cdr_md". Every cell executed after this one, including
# `get_entities`, which reads the global `nlp`, uses this pipeline instead.
nlp = medspacy.load("en_info_3700_i2b2_2012", enable=['sentencizer', 'tagger', 'parser',
                                                      'ner', 'target_matcher', 'context',
                                                     'sectionizer'])

In [None]:
# For a sample of ten patients, print every (target, modifier) edge of the
# context graph and collect a "no_<target>" token for each edge whose
# modifier category is NEGATED_EXISTENCE.
expanded = []
for patient in patients[120:130]:
    doc = nlp(patient.description)
    for target, modifier in doc._.context_graph.edges:
        print("[{0}] is modified by [{1}]".format(target, modifier))
        if modifier.category != 'NEGATED_EXISTENCE':
            continue
        # Lowercase the target and join its whitespace-separated words with "_".
        target_words = str(target).lower().split()
        expanded.append("no_" + "_".join(target_words))

In [None]:
doc._.context_graph.targets

In [None]:
doc._.context_graph.targets

In [None]:
doc.ents

In [None]:
visualize_ent(doc)

In [None]:
expanded

In [None]:
from CTnlp.parsers import parse_clinical_trials_from_folder

# Use the same relative data directory as the other cells of this notebook
# rather than a user-specific absolute path, so the cell also runs on other
# machines.
# NOTE(review): assumes the notebook's working directory is a sibling of
# `data/` — confirm this is the folder the absolute path pointed to.
cts = parse_clinical_trials_from_folder("../data/external/")
len(cts)

In [None]:
# Run the pipeline on the `criteria` text of every item in `cts` and render
# its entity visualisation.
# NOTE(review): this produces one visualisation per trial and leaves `doc`
# bound to the last trial's document; consider slicing `cts` when exploring.
for ct in cts:
    doc = nlp(ct.criteria)
    visualize_ent(doc)

In [None]:
def get_entities(text, pipeline=None):
    """Extract entities from `text` and bucket them by their context flags.

    Each entity in `doc.ents` lands in exactly one bucket, checked in this
    order of precedence:

    * `negated_entities` when `ent._.is_negated` is set;
    * `pmh_entities` when `ent._.is_historical` is set;
    * `fh_entities` when `ent._.is_family` is set;
    * `entities` otherwise (this includes entities that are only flagged as
      uncertain or hypothetical).

    Args:
        text: Text to analyse.
        pipeline: Callable turning text into a document exposing `.ents`.
            Defaults to the notebook-level `nlp`. Passing it explicitly
            avoids depending on whichever pipeline `nlp` is currently bound
            to, since the notebook rebinds `nlp` more than once.

    Returns:
        Dict with the keys "entities", "negated_entities", "pmh_entities" and
        "fh_entities", each mapping to a list of entity strings in document
        order.
    """
    if pipeline is None:
        pipeline = nlp
    doc = pipeline(text)

    entities = []
    negated_entities = []
    pmh_entities = []
    fh_entities = []
    for ent in doc.ents:
        flags = ent._
        if flags.is_negated:
            bucket = negated_entities
        elif flags.is_historical:
            bucket = pmh_entities
        elif flags.is_family:
            bucket = fh_entities
        else:
            bucket = entities
        bucket.append(str(ent))

    return {
        "entities": entities,
        "negated_entities": negated_entities,
        "pmh_entities": pmh_entities,
        "fh_entities": fh_entities,
    }

In [None]:
# Extract the bucketed entities for every patient and tag each result with the
# patient's id. `patient` stays bound to the last patient after the loop and
# is used by later cells.
output_entities = []
for patient in patients:
    entities_dict = get_entities(patient.description)
    entities_dict["id"] = patient.patient_id
    output_entities.append(entities_dict)

In [None]:
output_entities

In [None]:
# Destination of the entity records written by the next cell.
# NOTE: rebinds OUTFILE, which an earlier cell pointed at sections.jsonl.
OUTFILE = '../data/external/entities.jsonl'

In [None]:
# Write the entity records as JSON Lines: one JSON object per line.
with open(OUTFILE, 'w') as fp:
    for item in output_entities:
        fp.write(json.dumps(item) + '\n')

In [None]:
cts[2].conditions

In [None]:
patient.description

In [None]:
doc = nlp(patient.description)

In [None]:
# Print every entity that carries at least one context flag, followed by its
# flags in the order: negated, uncertain, historical, family, hypothetical.
for ent in doc.ents:
    context_flags = [
        ent._.is_negated,
        ent._.is_uncertain,
        ent._.is_historical,
        ent._.is_family,
        ent._.is_hypothetical,
    ]
    if any(context_flags):
        print(ent, context_flags)

In [None]:
doc.ents