In [None]:
import os
from medcat.cat import CAT
from medcat import cat
import pandas as pd
import json
from tqdm.notebook import tqdm
import re
import pickle

In [None]:
# Add file logger
import logging
medcat_logger = logging.getLogger('medcat')
# Loggers are process-wide singletons: re-running this cell in the same kernel
# would attach one more FileHandler each time, and every record would then be
# written to 'medcat.log' once per attached handler. Reuse a handler that
# already targets the same file instead of stacking a new one.
_medcat_log_file = os.path.abspath('medcat.log')
fh = next(
    (handler for handler in medcat_logger.handlers
     if isinstance(handler, logging.FileHandler)
     and handler.baseFilename == _medcat_log_file),
    None,
)
if fh is None:
    fh = logging.FileHandler('medcat.log')
    medcat_logger.addHandler(fh)

# Paths and Config

In [None]:
# The working_with_cogstack folder is addressed as three parent-directory
# hops; abspath() resolves them against the current working directory.
_rel_path = os.path.join(*([os.pardir] * 3))
base_path = os.path.abspath(_rel_path)
# Vocabulary folder inside base_path
vocab_dir = os.path.join(base_path, "models", "vocab")

In [None]:
# Change these according to your project
project_name = 'test_project' # Name of your project. Annotated documents relating to this project will be stored here.
documents_to_annotate = "cogstack_search_results/example documents to annotate.csv" # Add your data file here

modelpack = ''  # Enter your model here. Should be the output of a trained 'output_modelpack'.
snomed_filter_path = None  # Optional path to a JSON file of concept ids to whitelist; a falsy value means no filter file.


# Constants (nothing to change below)
data_dir = 'working_with_cogstack/data'

# Input CSV and the two columns that are read from it
data_path = os.path.join(base_path, data_dir, documents_to_annotate)
doc_id_column = "id"
doc_text_column = "description"

model_dir = 'working_with_cogstack/models/modelpack'
model_pack_path = os.path.join(base_path, model_dir, modelpack)

# Per-project output folder for annotations
ann_folder_path = os.path.join(base_path, data_dir, 'annotated_docs', project_name)
# Remember whether the folder is new so the message is only printed on creation.
# exist_ok=True avoids the check-then-create race; if the path exists but is a
# regular file, makedirs raises FileExistsError here instead of failing later.
_ann_folder_is_new = not os.path.exists(ann_folder_path)
os.makedirs(ann_folder_path, exist_ok=True)
if _ann_folder_is_new:
    print(f'Created folder to store annotations here: {ann_folder_path}')

# ann_folder_path is already absolute (it is built from base_path), so it is
# not joined onto base_path a second time.
save_path_annotations_per_doc = os.path.join(ann_folder_path, "<output_filename>.json")


# Load MedCAT model

In [None]:
# Create CAT - the main class from medcat used for concept annotation.
# The model pack is read from `model_pack_path`.
# NOTE(review): this rebinds the name `cat`, which the import cell also binds
# to the `medcat.cat` module (`from medcat import cat`). From this line on,
# `cat` refers to the object returned by `CAT.load_model_pack`, not the module.
cat = CAT.load_model_pack(model_pack_path)

# Annotate

In [None]:
# Set snomed filter if needed
# This is a white list filter of concepts
if snomed_filter_path:
    # Context manager closes the file handle as soon as the JSON is parsed
    # (the bare open() call left it to the garbage collector).
    with open(snomed_filter_path) as filter_file:
        snomed_filter = set(json.load(filter_file))
else:
    print('There is no concept filter set')
    # No filter file: allow every concept id known to the model's cdb
    snomed_filter = set(cat.cdb.cui2info.keys())

cat.config.linking.filters.cuis = snomed_filter


In [None]:
# Parse only the id and text columns instead of reading the whole CSV and
# slicing afterwards. `usecols` returns columns in file order, so the trailing
# selection pins the order to (id, text) exactly as before.
# A missing column now fails inside read_csv (ValueError) rather than at the
# selection step (KeyError).
_required_columns = [doc_id_column, doc_text_column]
df = pd.read_csv(data_path, usecols=_required_columns)[_required_columns]


In [None]:
# Create generator object
def data_iterator(data, doc_name, doc_text):
    """Yield ``(document id, document text)`` pairs, one per row of ``data``.

    The two columns are iterated directly rather than through ``iterrows()``.
    ``iterrows()`` builds a Series for every row and upcasts the row's values
    to a common dtype (e.g. an integer id next to a float column comes back as
    a float); column-wise iteration keeps each column's own values.

    Args:
        data: DataFrame holding the documents.
        doc_name: Name of the column with the document identifier.
        doc_text: Name of the column with the document text.

    Yields:
        Tuples ``(data[doc_name][i], data[doc_text][i])`` in row order.

    Raises:
        KeyError: If either column is missing (raised on first iteration).
    """
    yield from zip(data[doc_name], data[doc_text])

In [None]:
batch_char_size = 50000  # Batch size (BS) in number of characters
# Keep every document's annotations, keyed by document id. The loop used to
# call get_entities and drop the return value, so nothing it computed was
# available afterwards.
# NOTE(review): nothing in this loop writes to `ann_folder_path`, while the
# completeness check further down reads 'annotated_ids.pickle' from that
# folder - confirm how the results are meant to be persisted.
annotations_by_doc_id = {}
for text_id, text in data_iterator(df, doc_id_column, doc_text_column):
    annotations_by_doc_id[text_id] = cat.get_entities(
        text,
        only_cui=False,
        # Options of the former batch call, currently not in use:
        #  nproc=8, # Number of processors
        #  out_split_size_chars=20*batch_char_size,
        #  save_dir_path=ann_folder_path,
        #  min_free_memory=0.1,
    )

# NOTE(review): logged at WARNING level, presumably so the message passes the
# logger's default threshold and reaches 'medcat.log' - confirm.
medcat_logger.warning('Annotation process complete!')


### Double check if everything has been annotated.

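This check only compares the number of input documents with the number of entries recorded in `annotated_ids.pickle`; it does not check meta-annotations.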

In [None]:
# Check if everything has run smoothly. If an error has been raised check the logs
# Path to your pickle file
pickle_file_path = os.path.join(ann_folder_path, "annotated_ids.pickle")
# SECURITY NOTE: pickle.load can execute arbitrary code, so only open pickle
# files that were produced by this project.
with open(pickle_file_path, "rb") as pickle_file:
    loaded_data = pickle.load(pickle_file)
# Explicit comparison instead of assert-inside-try: asserts are removed when
# Python runs with -O, and the except clause would also have hidden an
# AssertionError raised anywhere else in the block.
# NOTE(review): loaded_data[0] is presumably the collection of annotated
# document ids - confirm against whatever writes the pickle.
if len(df) != len(loaded_data[0]):
    print("Error:", "There are documents which havent been annotated! Check 'medcat.log' for more info")


END OF SCRIPT

### Inspect the model

In [None]:
# Short example sentence used to try the model out
text = "He was diagnosed with heart failure"
# Calling the model object on raw text returns a document object; `doc` is
# reused by the inspection cells that follow.
doc = cat(text)
# NOTE(review): `final_ents` presumably holds the entities kept after
# linking - confirm against the installed medcat version.
print(doc.final_ents)

In [None]:
# Show each detected entity next to its Snomed code and the preferred name
# stored for that code in the concept database
for ent in doc.final_ents:
    concept_info = cat.cdb.cui2info[ent.cui]
    print(ent, ent.cui, concept_info['preferred_name'], sep="  -  ")

In [None]:
# List the semantic type ids recorded for the concept behind each entity
for ent in doc.final_ents:
    type_ids = cat.cdb.cui2info[ent.cui]['type_ids']
    print(ent, type_ids, sep="  -  ")

In [None]:
# Display
# Render the annotated example inline in the notebook with spaCy's displaCy
# visualiser, using its entity ('ent') style.
from spacy import displacy
# NOTE(review): `_delegate` is a private attribute; presumably it is the
# underlying spaCy Doc that displaCy needs - confirm.
displacy.render(doc._delegate, style='ent', jupyter=True)

# Alternative approach

In [None]:
# Single-process alternative: walk the dataset row by row and annotate each
# text on its own (no multiprocessing involved).

docs = {}
print(f"Len of df: {len(df)}")

for _row_index, row in tqdm(df.iterrows(), total=len(df)):
    text = str(row[doc_text_column])
    doc_id = row[doc_id_column]
    # Texts of 10 characters or fewer are not sent to the model; their entry
    # is an empty list.
    docs[doc_id] = cat.get_entities(text) if len(text) > 10 else []

In [None]:
# Ask the model's concept database (cdb) for its basic info.
# NOTE(review): whether this prints/logs the summary or returns it (and is
# shown as the cell's output) is not visible from this notebook - confirm.
cat.cdb.get_basic_info()

In [None]:
# Save to file (docs maps each document id to its annotations)
# The context manager flushes and closes the file when the block exits; the
# bare open() call left that to the garbage collector, so the JSON on disk
# could be incomplete while the kernel was still running.
with open(save_path_annotations_per_doc, "w") as annotations_file:
    json.dump(docs, annotations_file)
