In [None]:
# Install the `datasets` package; `load_dataset` is imported from it further down.
!pip install datasets

In [None]:
# Install the `bioc` package. NOTE(review): it is not imported directly in the
# cells shown here -- presumably a dataset loading script needs it; confirm.
!pip install bioc

In [None]:
from datasets import load_dataset, Dataset
import re
import os
import json
import spacy
import random
from tqdm import tqdm
import numpy as np

# spaCy pipeline shared by the tokenization helpers below, which call `nlp.make_doc`.
nlp = spacy.load('en_core_web_sm')

# Module-level list. NOTE(review): none of the functions visible in this
# section read or write it -- confirm whether it is still needed.
dataset = []

# Names of the three tag ids in the unified tagging scheme.
global_id2label = {0: 'O', 1: 'B-ENT', 2: 'I-ENT'}

# Maps a tag prefix to its id in the unified scheme. 'S' shares the id of 'B'
# and 'E' shares the id of 'I'.
prefix2id = {'O': 0, 'B': 1, 'I': 2, 'S': 1, 'E':2}

def tokenize(text):
    """Return the token strings produced by running ``nlp.make_doc`` on ``text``."""
    doc = nlp.make_doc(text)
    return [token.text for token in doc]

def tokenize_and_match(text):
    """Tokenize ``text`` and index the tokens by their character boundaries.

    Returns:
        A tuple ``(tokens, s2t, e2t)`` where ``tokens`` is the list of token
        strings, ``s2t`` maps each token's start character offset to its token
        index, and ``e2t`` maps each token's end character offset (start plus
        token length) to its token index.
    """
    words = []
    start_to_token = {}
    end_to_token = {}
    for position, token in enumerate(nlp.make_doc(text)):
        surface = token.text
        begin = token.idx
        words.append(surface)
        start_to_token[begin] = position
        end_to_token[begin + len(surface)] = position
    return words, start_to_token, end_to_token

def search_span(text, label, s2t, e2t):
    """Find case-insensitive occurrences of ``label`` in ``text`` that align to tokens.

    Args:
        text: The string to search in.
        label: The literal string to look for (it is regex-escaped before matching).
        s2t: Dict mapping a token's start character offset to its token index.
        e2t: Dict mapping a token's end character offset to its token index.

    Returns:
        A tuple ``(sts, ets)`` of parallel lists holding the start and end token
        index of every match whose character span begins exactly on a token start
        and ends exactly on a token end. Both lists are empty when ``label`` is
        empty or when ``label``/``text`` cannot be searched.
    """
    sts = []
    ets = []

    # An empty pattern matches with zero width at every offset. Wherever one
    # token ends exactly where the next begins, that would produce an inverted
    # span (start token after end token), so an empty label matches nothing.
    if not label:
        return sts, ets

    try:
        searches = re.finditer(re.escape(label), text, re.IGNORECASE)
    except (TypeError, re.error):
        # Non-string label or text: best effort, report no matches.
        return sts, ets

    for search in searches:
        s, e = search.span()
        # Keep only matches whose boundaries coincide with token boundaries.
        if s in s2t and e in e2t:
            sts.append(s2t[s])
            ets.append(e2t[e])
    return sts, ets

def handle_dataset(example, id2ent, id2global_label, dataset=None, curr_ents=None):
    """Convert one tag-sequence example into a token-span record.

    Args:
        example: Mapping with a ``'tokens'`` list and a per-token tag id list
            stored under ``'tags'`` or ``'ner_tags'``.
        id2ent: Maps a dataset tag id to its entity type name.
        id2global_label: Maps a dataset tag id to the unified tag id
            (0 = outside, 1 = begin, 2 = inside).
        dataset: List that receives the converted record. When omitted, a fresh
            list is used and the record is discarded.
        curr_ents: Entity type names to keep. Tokens whose entity type is not
            listed are treated as outside any entity.

    Returns:
        ``example`` unchanged, so the function can be passed to ``Dataset.map``.

    Raises:
        KeyError: If ``example`` has neither a ``'tags'`` nor a ``'ner_tags'`` field.

    The appended record is ``{"tokenized_text": tokens, "ner": spans}`` where each
    span is ``[start_token, end_token, entity_name]`` with an inclusive end index.
    """
    # Avoid sharing mutable default arguments between calls.
    if dataset is None:
        dataset = []
    if curr_ents is None:
        curr_ents = []

    tokens = example['tokens']
    if 'tags' in example:
        old_tags = example['tags']
    elif 'ner_tags' in example:
        old_tags = example['ner_tags']
    else:
        raise KeyError("example has neither 'tags' nor 'ner_tags'")

    tags = []
    open_span = None  # [start, end, label] of the entity currently being read
    for i, tag_id in enumerate(old_tags):
        ent = id2ent[tag_id]
        # Entity types that were not requested behave like the outside tag.
        global_id = id2global_label[tag_id] if ent in curr_ents else 0

        # An inside tag extends the open span only when it has the same type.
        if global_id == 2 and open_span is not None and open_span[2] == ent:
            open_span[1] = i
            continue

        # Every other tag ends the open span. This also flushes single-token
        # entities, i.e. a begin tag directly followed by an outside or begin tag.
        if open_span is not None:
            tags.append(open_span)
            open_span = None

        # A begin tag opens a new span; so does an inside tag that has nothing
        # to continue, rather than being attached to an unrelated span.
        if global_id in (1, 2):
            open_span = [i, i, ent]

    if open_span is not None:
        tags.append(open_span)

    result = {"tokenized_text": tokens, "ner": tags}
    dataset.append(result)
    return example

def process_ncbi():
    """Load the NCBI Disease dataset and convert each split to token-span records.

    Returns:
        A tuple ``(train, test, validation)`` of lists; each list holds the
        records appended by ``handle_dataset`` for that split.
    """
    ncbi = load_dataset('ncbi_disease')

    label2id = {'O': 0, 'B-Disease': 1, 'I-Disease': 2}

    label2ent = {'O': 'other', 'Disease': 'disease'}

    id2label = {id: label for label, id in label2id.items()}
    id2global_label = {id: prefix2id[label.split('-')[0]] for id, label in id2label.items()}
    id2ent = {id: label2ent[label.split('-')[-1]] for id, label in id2label.items()}

    all_ents = ['disease']

    converted = {}
    for split in ('train', 'test', 'validation'):
        records = []
        # The records are collected as a side effect of the mapped function.
        # If `map` reused a cached result it would not call the function and
        # `records` would stay empty, so caching is disabled for this call.
        ncbi[split].map(
            handle_dataset,
            fn_kwargs={"id2ent": id2ent, "id2global_label": id2global_label,
                       "curr_ents": all_ents, "dataset": records},
            keep_in_memory=True,
            load_from_cache_file=False,
        )
        converted[split] = records

    return converted['train'], converted['test'], converted['validation']


def process_bc5cdr():
    """Load the BC5CDR dataset and convert each split to token-span records.

    Returns:
        A tuple ``(train, test, validation)`` of lists; each list holds the
        records appended by ``handle_dataset`` for that split.
    """
    bc5cdr = load_dataset('tner/bc5cdr')

    label2id = {"O": 0, "B-Chemical": 1, "B-Disease": 2, "I-Disease": 3, "I-Chemical": 4}

    label2ent = {'O': 'other', 'Chemical': 'chemical', 'Disease': 'disease'}

    id2label = {id: label for label, id in label2id.items()}
    id2global_label = {id: prefix2id[label.split('-')[0]] for id, label in id2label.items()}
    id2ent = {id: label2ent[label.split('-')[-1]] for id, label in id2label.items()}

    all_ents = list(label2ent.values())

    converted = {}
    for split in ('train', 'test', 'validation'):
        records = []
        # The records are collected as a side effect of the mapped function.
        # If `map` reused a cached result it would not call the function and
        # `records` would stay empty, so caching is disabled for this call.
        bc5cdr[split].map(
            handle_dataset,
            fn_kwargs={"id2ent": id2ent, "id2global_label": id2global_label,
                       "curr_ents": all_ents, "dataset": records},
            keep_in_memory=True,
            load_from_cache_file=False,
        )
        converted[split] = records

    return converted['train'], converted['test'], converted['validation']



In [None]:
# process_ncbi returns its lists in (train, test, validation) order.
ncbi_train, ncbi_test, ncbi_val = process_ncbi()

In [None]:
# process_bc5cdr returns its lists in (train, test, validation) order.
bc5cdr_gliner_train, bc5cdr_gliner_test, bc5cdr_gliner_validation = process_bc5cdr()

In [None]:
def bigbio_proc_example(example, ent2label, dataset=None):
    """Convert one BigBio-style example into a token-span record.

    Args:
        example: Mapping with ``'passages'`` (each having a ``'text'`` list whose
            first item is used) and ``'entities'`` (each having ``'offsets'`` and
            ``'type'``).
        ent2label: Maps an entity ``'type'`` value to the label written to the record.
        dataset: List that receives the converted record. When omitted, a fresh
            list is used and the record is discarded.

    Returns:
        ``example`` unchanged, so the function can be passed to ``map``.

    Raises:
        KeyError: If an entity's type is missing from ``ent2label``.

    The appended record is ``{"tokenized_text": tokens, "ner": spans}`` where each
    span is ``[start_token, end_token, label]`` with an inclusive end index. Only
    the first offset pair of each entity is used, and entities whose character
    offsets do not fall exactly on token boundaries are left out.
    """
    # Avoid sharing a mutable default argument between calls.
    if dataset is None:
        dataset = []

    # Passage texts are joined with one space, so entity offsets are looked up
    # in this joined string.
    text = ' '.join([passage['text'][0] for passage in example['passages']])
    tokens, s2t, e2t = tokenize_and_match(text)

    tags = []
    for ent in example['entities']:
        ent_label = ent2label[ent['type']]

        offsets = ent['offsets']
        if not offsets:
            # Nothing to align for an entity without offsets.
            continue
        start, end = offsets[0][0], offsets[0][1]

        if start in s2t and end in e2t:
            st = s2t[start]
            et = e2t[end]
            # Drop only inverted spans. The end index is inclusive, so a
            # multi-token entity has st < et and must be kept.
            if et < st:
                continue
            tags.append([st, et, ent_label])

    dataset.append({"tokenized_text": tokens, "ner": tags})
    return example

def get_bigbio_dicts(dataset):
    """Collect the entity types of a BigBio-style dataset and derive labels for them.

    The ``'train'`` split is always scanned; ``'test'`` and ``'validation'`` are
    scanned only when present in ``dataset``.

    Returns:
        A tuple ``(all_ents, ent2label)``. ``ent2label`` maps each entity type to
        its lowercased name with underscores turned into spaces, plus the empty
        string mapped to ``'other'``. ``all_ents`` is the list of its values.
    """
    entity_types = set()

    split_names = ['train']
    split_names += [name for name in ('test', 'validation') if name in dataset]
    for split_name in split_names:
        for record in dataset[split_name]:
            entity_types.update(entity['type'] for entity in record['entities'])

    ent2label = {}
    for entity_type in entity_types:
        ent2label[entity_type] = entity_type.replace('_', ' ').lower()
    ent2label[""] = 'other'

    return list(ent2label.values()), ent2label

def process_chia():
    """Load the CHIA dataset (BigBio KB schema) and convert it to token-span records.

    Returns:
        One list holding the records appended by ``bigbio_proc_example`` for
        every split of the loaded dataset.
    """
    bigbio = load_dataset('bigbio/chia', 'chia_bigbio_kb')
    _, ent2label = get_bigbio_dicts(bigbio)

    gliner_dataset = []
    # The records are collected as a side effect of the mapped function. If
    # `map` reused a cached result it would not call the function and the list
    # would stay empty, so caching is disabled for this call.
    bigbio.map(
        bigbio_proc_example,
        fn_kwargs={"ent2label": ent2label, "dataset": gliner_dataset},
        keep_in_memory=True,
        load_from_cache_file=False,
    )

    return gliner_dataset


def process_biored():
    """Load the BioRED dataset (BigBio KB schema) and convert each split.

    Returns:
        A tuple ``(train, test, validation)`` of lists; each list holds the
        records appended by ``bigbio_proc_example`` for that split.
    """
    biored = load_dataset('bigbio/biored', 'biored_bigbio_kb')

    # Hand-written label names for the entity types handled here. An entity
    # type missing from this mapping makes bigbio_proc_example raise KeyError.
    ent2label = {
        'CellLine': 'cell line',
        'ChemicalEntity': 'chemical entity',
        'DiseaseOrPhenotypicFeature': 'disease or phenotype',
        'GeneOrGeneProduct': 'gene or gene product',
        'OrganismTaxon': 'organism',
        "SequenceVariant": 'sequence variant'
    }

    converted = {}
    for split in ('train', 'test', 'validation'):
        records = []
        # The records are collected as a side effect of the mapped function.
        # If `map` reused a cached result it would not call the function and
        # `records` would stay empty, so caching is disabled for this call.
        biored[split].map(
            bigbio_proc_example,
            fn_kwargs={"ent2label": ent2label, "dataset": records},
            keep_in_memory=True,
            load_from_cache_file=False,
        )
        converted[split] = records

    return converted['train'], converted['test'], converted['validation']


In [None]:
# process_chia returns a single list of converted records.
chia = process_chia()

In [None]:
# process_biored returns its lists in (train, test, validation) order.
biored_gliner_train, biored_gliner_test, biored_gliner_validation = process_biored()

In [None]:
def annotate_bio(example, label2ent, dataset=None):
    """Convert one character-offset annotated example into a token-span record.

    Args:
        example: Mapping with a ``'text'`` string and an ``'entities'`` list whose
            items have ``'start'``, ``'end'`` and ``'class'`` fields.
        label2ent: Maps an entity ``'class'`` value to the label written to the
            record; entities with other classes are left out.
        dataset: List that receives the converted record. When omitted, a fresh
            list is used and the record is discarded.

    Returns:
        ``example`` when a record was appended, otherwise ``None``. ``None`` is
        returned for examples without entities and for examples that raise
        while being processed.

    The appended record is ``{"tokenized_text": tokens, "ner": spans}`` where each
    span is ``[start_token, end_token, label]`` with an inclusive end index.
    """
    # Avoid sharing a mutable default argument between calls.
    if dataset is None:
        dataset = []

    try:
        text = example['text']
        ents = example['entities']

        # Examples without entities produce no record. This used to happen
        # implicitly through an IndexError; the check keeps that outcome.
        if not ents:
            return None

        tokens, s2t, e2t = tokenize_and_match(text)
        tags = []
        for ent in ents:
            es = ent['start']
            ee = ent['end']

            # Token index 0 is a valid start, so "not found" is represented by
            # None instead of 0; otherwise first-token entities would be dropped.
            ts = s2t.get(es)
            if ts is None:
                continue

            # When the end offset is not on a token boundary, or lands before
            # the start token, fall back to a single-token span.
            te = e2t.get(ee)
            if te is None or te < ts:
                te = ts

            if ent['class'] in label2ent:
                tags.append([ts, te, label2ent[ent['class']]])

        dataset.append({"tokenized_text": tokens, "ner": tags})
        return example
    except Exception:
        # Best effort: skip malformed examples, but let KeyboardInterrupt and
        # SystemExit through so a long-running map can still be stopped.
        return None
def process_biomed_ner():
    """Load the train split of ``knowledgator/biomed_NER`` and convert it.

    Returns:
        The list of records appended by ``annotate_bio`` while mapping over the split.
    """
    train_split = load_dataset("knowledgator/biomed_NER")['train']

    classes = ["CHEMICALS", "CLINICAL DRUG", "BODY SUBSTANCE", "ANATOMICAL STRUCTURE", "CELLS AND THEIR COMPONENTS",
        "GENE AND GENE PRODUCTS", "INTELLECTUAL PROPERTY", "LANGUAGE", "REGULATION OR LAW",
        "GEOGRAPHICAL AREAS", "ORGANISM", "GROUP", "PERSON", "ORGANIZATION", "PRODUCT", "LOCATION", "PHENOTYPE",
          "DISORDER", "SIGNALING MOLECULES", "EVENT", "MEDICAL PROCEDURE", "ACTIVITY", "FUNCTION", "MONEY"]
    # Each class name maps to its lowercase form, which is the label written to the records.
    label2ent = {name: name.lower() for name in classes}

    converted = []
    # Caching is disabled so the mapped function runs on every call and fills `converted`.
    map_options = {"keep_in_memory": True, "load_from_cache_file": False}
    train_split.map(
        annotate_bio,
        fn_kwargs={"label2ent": label2ent, "dataset": converted},
        **map_options,
    )
    return converted

In [None]:
# process_biomed_ner returns the list of records converted from the train split.
biomed_ner_dataset = process_biomed_ner()