In [None]:
import os
from glob import glob
import torch
from datasets import load_dataset

# Path to the local LLM checkpoint handed to vLLM below.
model_name =  '../../saved_models/Llama3_70B_Instruct/'

# Collect the original .bio file names; they are reused later to name the
# per-file result files. CHANGE folder_path to the directory with the BIO test files.
# NOTE(review): os.listdir returns names in arbitrary order, and `files` is later
# paired positionally with the model outputs - confirm the dataset rows were
# built in the same order.
folder_path =  'example_test/'
files = [f for f in os.listdir(folder_path) if f.endswith('.bio')]

# Load the processed test dataset (Hugging Face datasets format).
# NOTE(review): absolute, user-specific path - change it for your environment.
data = load_dataset('/home/jupyter/20000360102458359xu/LingfeiQian/saved_dataset/YBXL/Bilingual_example_test/')
out_dir = "llm_results" # directory the generated outputs are written to

gpu_number = 2 # number of GPUs; passed to vLLM as tensor_parallel_size

from vllm import LLM, SamplingParams
# Generation settings: at most 512 new tokens, stop at the '<EOS>' string, temperature 0.
sampling_params = SamplingParams(max_tokens=512,stop='<EOS>',temperature=0)
# Build the vLLM engine from the checkpoint above (bfloat16 weights, 2000-token model length limit).
llm = LLM(model=f"{model_name}", tensor_parallel_size = gpu_number, dtype=torch.bfloat16,device = 'auto',max_model_len=2000)

In [2]:
def batch_list(input_list, batch_size):
    """Cut ``input_list`` into consecutive slices of at most ``batch_size`` items.

    Args:
        input_list: Any sliceable sequence (list, string, ...).
        batch_size: Maximum number of items per slice; also the step between
            slice start offsets.

    Returns:
        A list of slices in their original order. The final slice may be
        shorter than ``batch_size``; an empty input yields an empty list.
    """
    start_offsets = range(0, len(input_list), batch_size)
    return [input_list[start:start + batch_size] for start in start_offsets]

In [None]:
# Number of prompts handed to the engine per generate() call.
batch_size = 100

# Run the LLM over every test query, one batch at a time.
prompts_list = batch_list(data['test']['query'], batch_size)

# Flat list of generation results, in the same order as the queries.
outputs = []
total_batches = len(prompts_list)
for batch_no, batch_prompts in enumerate(prompts_list, start=1):
    print (f'batch:{batch_no} of total:{total_batches}', flush=True)
    outputs.extend(llm.generate(batch_prompts, sampling_params))

In [None]:
# FOR NER TASK
# Create the directory named by the `out_dir` variable. A bare `! mkdir out_dir`
# would create a directory literally called "out_dir" (the shell does not see
# Python variables without {} interpolation) and would fail on a re-run.
os.makedirs(out_dir, exist_ok=True)

# Outputs are matched to file names purely by position, and zip() silently
# stops at the shorter input, so surface a length mismatch instead of hiding it.
# NOTE(review): this also assumes `files` is in the same order as the dataset rows - confirm.
if len(outputs) != len(files):
    print(f'WARNING: {len(outputs)} outputs but {len(files)} .bio files; '
          'extra items are ignored and the pairing may be misaligned.', flush=True)

for seq, file_name in zip(outputs, files):
    # Result file shares the .bio file's name, with '.bio' removed.
    file_name = file_name.replace('.bio','')
    with open(f'{out_dir}/{file_name}.html','w',encoding='utf-8') as f_write:
        # Only the first candidate completion of each request is saved.
        f_write.write(seq.outputs[0].text)

In [5]:
# Spot-check: text of the first completion generated for the test example at index 1.
outputs[1].outputs[0].text

' <span class="Language_Other">Tibetan</span> interpreter - # '

In [6]:
# Spot-check: the query (prompt) of the test example at index 1.
data['test']['query'][1]

'### Your task is to generate an HTML version of an input text, using HTML <span> tags to mark up specific entities.\n\n### Entity Markup Guides:\nUse <span class="Language_Fluent"> to denote a language spoken by the patient fluently.\nUse <span class="Language_Some"> to denote a language spoken by the patient in moderate level.\nUse <span class="Language_No"> To denote a language which cannot be spoken or can only be spoken a little by the patient.\nUse <span class="Language_Other"> to denote a language that not related to patient.\n\n### Entity Definitions:\nLanguage_Fluent: The person speaks the language fluently, including native speakers and those who have achieved nearnative fluency. They can use the language effectively in  various contexts with complete fluency and cultural understanding. Instances where a patient’s fluency is not explicitly stated but can be directly inferred (e.g. mention of interpreter/translator, preference of language on prescription) are included in this 

# Evaluation

In [7]:
from bs4 import BeautifulSoup as bs
from bs4 import NavigableString, Tag
from glob import glob
import spacy
import random,os
import pandas as pd
import time
from ner_metrics import classification_report

# NOTE(review): presumably meant to turn off Hugging Face tokenizers parallelism
# (and its fork warning) - confirm that the value 'False' is accepted.
os.environ['TOKENIZERS_PARALLELISM'] = 'False'

# Load the large English spaCy pipeline.
# NOTE(review): `py_nlp` is not referenced by the evaluation code visible in
# this notebook - confirm it is still needed before paying the load cost.
py_nlp = spacy.load ("en_core_web_lg")

In [9]:
def split_then_concatnate_tokens(text):
    """Tokenize ``text`` with the global ``tokenizer`` and re-join with single spaces.

    Tokens that start with ``'##'`` are treated as continuation pieces: the
    marker is removed and the remainder is glued onto the previous token.

    Args:
        text: The string to normalise.

    Returns:
        The tokens joined by one space each (an empty string when there are
        no tokens).

    Note:
        Relies on a module-level ``tokenizer`` object exposing ``tokenize(text)``;
        it must be defined before this function is called.
    """
    merged_tokens = []
    for token in tokenizer.tokenize(text):
        if not token.startswith('##'):
            # Regular token: starts a new word.
            merged_tokens.append(token)
            continue

        piece = token[2:]
        if merged_tokens:
            # Continuation piece: attach it to the word it continues.
            merged_tokens[-1] += piece
        elif piece:
            # A continuation piece with nothing before it would have raised
            # IndexError; keep its text as the first word instead.
            merged_tokens.append(piece)

    return ' '.join(merged_tokens)

In [10]:
def bio2html(file):
    """Convert a BIO-tagged file into text with ``<span class="TYPE">`` entity markup.

    Each non-blank line must be ``token<TAB>tag``. ``O`` tokens are emitted as
    plain text, a ``B-TYPE`` token opens ``<span class="TYPE">``, and the span
    is closed after the last token of the entity, i.e. when the next line is
    blank, tagged ``O``, tagged ``B-...``, or when the file ends. Every emitted
    token is followed by a single space. Tokens whose tag is neither ``O``,
    ``B-...`` nor ``I-...`` are not emitted.

    Blank lines (e.g. sentence separators) are skipped rather than crashing
    the conversion.

    Args:
        file: Path to the BIO file.

    Returns:
        The marked-up text as one string.

    Raises:
        ValueError: If a non-blank line does not consist of exactly two
            tab-separated fields.
    """
    with open(file,'r') as f_read:
        lines = f_read.readlines()

    def _tag_of(line):
        # Tag column of a BIO line, or None when the line is blank or has no tag column.
        fields = line.strip().split('\t')
        return fields[1] if len(fields) > 1 else None

    pieces = []
    for i, line in enumerate(lines):
        stripped = line.strip()
        if not stripped:
            # Blank separator line: contributes no token.
            continue

        fields = stripped.split('\t')
        if len(fields) != 2:
            raise ValueError(
                f'{file}: line {i + 1} is not "token<TAB>tag": {line!r}'
            )
        token, e_type = fields

        # The current entity ends here unless the next line continues it.
        next_tag = _tag_of(lines[i + 1]) if i + 1 < len(lines) else None
        entity_ends = next_tag is None or next_tag == 'O' or next_tag.startswith('B-')
        tail = '</span> ' if entity_ends else ' '

        if e_type == 'O':
            pieces.append(token + ' ')
        elif e_type.startswith('B-'):
            pieces.append(f'<span class="{e_type[2:]}">' + token + tail)
        elif e_type.startswith('I-'):
            pieces.append(token + tail)

    # Join once at the end instead of growing a string inside the loop.
    return ''.join(pieces)

In [11]:
def html2bio(html_path,entity_list):
    """Convert span-annotated HTML into a list of BIO lines.

    Only the top-level nodes of the parsed document are visited. Text nodes
    become ``O`` tokens. For a tag, its full text (nested markup flattened via
    ``get_text()``) is tokenized and labelled ``B-<class>`` / ``I-<class>``
    when the tag's first ``class`` value is in ``entity_list``; otherwise all
    of its tokens are labelled ``O``.

    Args:
        html_path: Path to the HTML file.
        entity_list: Entity type names that may appear as span classes.

    Returns:
        A list of strings of the form ``"token\\tTAG\\n"``.
    """
    # NOTE(review): the file is read with the platform default encoding, while
    # predictions are written as UTF-8 elsewhere in this notebook - confirm the
    # default matches on your system.
    with open(html_path) as f:
        html = f.read()

    soup = bs(html, "html.parser")
    bio_format = []

    for child in soup.children:
        if isinstance(child, NavigableString):
            # Plain text between tags: everything is outside any entity.
            for word in split_then_concatnate_tokens(child).split():
                bio_format.append(f"{word}\tO\n")
        elif isinstance(child, Tag):
            words = split_then_concatnate_tokens(child.get_text()).split()
            try:
                entity = child.attrs['class'][0]
            except (KeyError, IndexError, TypeError):
                # No usable class attribute: treat the tag's content as plain text.
                entity = 'O'

            if not words:
                continue

            if entity != 'O' and entity in entity_list:
                bio_format.append(f"{words[0]}\tB-{entity}\n")
                for word in words[1:]:
                    bio_format.append(f"{word}\tI-{entity}\n")
            else:
                # Unknown or missing entity type: keep the tokens, untagged.
                for word in words:
                    bio_format.append(f"{word}\tO\n")
    return bio_format

In [None]:
#from transformers import AutoTokenizer
#tokenizer = AutoTokenizer.from_pretrained("microsoft/BiomedNLP-BiomedBERT-base-uncased-abstract-fulltext", use_fast=True)
import nltk
import re
from nltk.tokenize import word_tokenize, WordPunctTokenizer, RegexpTokenizer

# NOTE(review): the RegexpTokenizer below is driven only by its regex; the
# 'punkt' data is presumably only needed if word_tokenize is used - confirm.
nltk.download('punkt')
# Global tokenizer looked up by split_then_concatnate_tokens at call time, so
# this cell must run before the evaluation is executed. Each token is either
# a run of word characters or a single non-word, non-space character.
tokenizer = RegexpTokenizer(r'\w+|[/;\-]|[^\w\s]', flags=re.UNICODE)

In [13]:
import pandas as pd
from ner_metrics import classification_report

def get_performance(files, entity_list):
    """Score the saved predictions against gold BIO files and tabulate the result.

    For every gold file, the gold annotation is round-tripped through
    ``bio2html`` -> ``./tmp_html.html`` -> ``html2bio`` so that gold and
    prediction are tokenized the same way. The prediction is read from
    ``{out_dir}/<name>.html`` (``out_dir`` is a module-level variable). When
    both sides have the same number of tokens, tags are compared position by
    position; otherwise each gold token takes the tag of the nearest predicted
    token at the same or an earlier index whose text contains, or is contained
    in, the gold token (``O`` when there is none).

    Args:
        files: Paths of the gold ``.bio`` files.
        entity_list: Entity type names recognised as span classes.

    Returns:
        A pandas DataFrame with one row per entry of the strict report
        (``macro avg`` / ``micro avg`` excluded) holding strict and lenient
        precision, recall and f1-score formatted to four decimals. The table
        is also printed.

    Side effects:
        Writes ``./tmp_html.html`` in the working directory and leaves it there.
    """
    all_pre_tags = []
    all_gold_tags = []

    for file in files:
        # Derive the stem exactly as the prediction-writing step does
        # (file name with '.bio' removed); splitting on the first '.' would
        # pick the wrong prediction for names that contain extra dots.
        file_name = os.path.basename(file).replace('.bio', '')

        gold_html = bio2html(file)
        with open('./tmp_html.html', 'w') as f:
            f.write(gold_html)
        gold_bio = html2bio('./tmp_html.html', entity_list)
        tokens = [line.strip().split('\t')[0] for line in gold_bio]
        tags = [line.strip().split('\t')[-1] for line in gold_bio]

        prediction = f'{out_dir}/{file_name}.html'
        bio_2 = html2bio(prediction, entity_list)

        if len(gold_bio) == len(bio_2):
            # Same token count: compare tags position by position.
            all_gold_tags.extend(tags)
            all_pre_tags.extend(line.strip().split('\t')[-1] for line in bio_2)
            continue

        # Token counts differ: fall back to a heuristic backward search.
        # Pre-split the predicted lines once; malformed lines become None.
        pred_pairs = []
        for line in bio_2:
            fields = line.strip().split('\t')
            pred_pairs.append(tuple(fields) if len(fields) == 2 else None)

        for i, token in enumerate(tokens):
            if token == '':
                tag_2 = ''
            else:
                tag_2 = 'O'  # default when no predicted token matches
                # Start at the gold index (clamped to the prediction length)
                # and walk back towards the beginning.
                for i2 in range(min(i, len(pred_pairs) - 1), -1, -1):
                    pair = pred_pairs[i2]
                    if pair is None:
                        continue
                    token_2, candidate_tag = pair
                    if token in token_2 or token_2 in token:
                        tag_2 = candidate_tag
                        break
            all_gold_tags.append(tags[i])
            all_pre_tags.append(tag_2)

    # Get classification reports
    lenient = classification_report(tags_true=all_gold_tags, tags_pred=all_pre_tags, mode="lenient")
    strict = classification_report(tags_true=all_gold_tags, tags_pred=all_pre_tags, mode="strict")

    no_scores = {'precision': 0.0, 'recall': 0.0, 'f1-score': 0.0}
    rows = []
    for entity, strict_scores in strict.items():
        if entity in ('macro avg', 'micro avg'):
            # Only per-entity rows are tabulated; the averages are left out.
            continue
        lenient_scores = lenient.get(entity, no_scores)
        rows.append({
            'entity': entity,
            'strict_precision': f"{float(strict_scores['precision']):.4f}",
            'strict_recall': f"{float(strict_scores['recall']):.4f}",
            'strict_f1-score': f"{float(strict_scores['f1-score']):.4f}",
            'lenient_precision': f"{float(lenient_scores['precision']):.4f}",
            'lenient_recall': f"{float(lenient_scores['recall']):.4f}",
            'lenient_f1-score': f"{float(lenient_scores['f1-score']):.4f}",
        })

    df = pd.DataFrame(rows)

    # Display
    print(df.to_string(index=False, justify='left', line_width=1000))
    print()
    return df

In [None]:
# Evaluate the predictions (70b instruct)
entity_list = ['Language_Fluent','Language_Some','Language_Other','Language_No']

folder_path =  'example_test'   # CHANGE TO PATH OF THE DATA FOR NAMED ENTITY RECOGNITION; THE FORMAT IS BIO FILES IN THIS CASE

# Gold .bio paths for NER scoring. Kept in its own variable: the global `files`
# holds the bare file names that the prediction-writing cell pairs with
# `outputs`, and overwriting it with prefixed paths would break re-running that
# cell. Sorted because os.listdir returns names in arbitrary order.
bio_files = sorted(folder_path+'/'+f for f in os.listdir(folder_path) if f.endswith('.bio'))

df = get_performance(bio_files,entity_list)