In [None]:
import os
import json
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
    pipeline
)

def is_local_model(model_path):
    """Return True when ``model_path`` exists on the local filesystem.

    Any existing path counts (file or directory); a path that does not exist
    is treated by the callers as a model name to be fetched remotely.
    """
    path_exists = os.path.exists(model_path)
    return path_exists

def load_model_config(model_path, debug=False):
    """
    Load label mappings from ``model_config.json`` inside a model directory.

    If the file is absent, the default person-NER mappings are returned.
    If the file provides only one of ``id2label`` / ``label2id``, the other
    one is derived by inverting the mapping that is present, so the two
    always describe the same label set. If it provides neither, the defaults
    are used.

    Args:
        model_path: Path to model directory
        debug: Whether to print debug information

    Returns:
        dict: Configuration with ``id2label`` (integer keys) and ``label2id``
        mappings
    """
    default_id2label = {0: "O", 1: "B-PERSON", 2: "I-PERSON"}
    default_label2id = {label: idx for idx, label in default_id2label.items()}

    config_path = os.path.join(model_path, "model_config.json")

    if not os.path.exists(config_path):
        if debug:
            print("No custom config found, using default settings")
        return {"id2label": default_id2label, "label2id": default_label2id}

    if debug:
        print(f"Found custom config at {config_path}")

    # JSON is UTF-8 by specification; do not rely on the platform default.
    with open(config_path, "r", encoding="utf-8") as f:
        model_config = json.load(f)

    raw_id2label = model_config.get("id2label")
    raw_label2id = model_config.get("label2id")

    if raw_id2label is not None:
        # JSON object keys are always strings; the model expects integer ids.
        id2label = {int(k): v for k, v in raw_id2label.items()}
    elif raw_label2id is not None:
        # Only label2id given: invert it so both maps agree.
        id2label = {int(idx): label for label, idx in raw_label2id.items()}
    else:
        id2label = default_id2label

    if raw_label2id is not None:
        label2id = raw_label2id
    else:
        # Derive from id2label instead of falling back to unrelated defaults.
        label2id = {label: idx for idx, label in id2label.items()}

    return {"id2label": id2label, "label2id": label2id}

def load_tokenizer(model_path, debug=False):
    """Load and return the tokenizer for ``model_path``.

    Args:
        model_path: Path or name handed to ``AutoTokenizer.from_pretrained``
        debug: Whether to print which path is being loaded
    """
    if debug:
        print(f"Loading tokenizer from {model_path}")
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    return tokenizer

def configure_base_model(model_path, debug=False):
    """
    Decide, from the model name alone, whether it is a base model that
    needs NER label configuration.

    A model counts as "base" when its lower-cased path contains "base"
    and does not contain "ner".

    Args:
        model_path: Path or name of the model
        debug: Whether to print debug info

    Returns:
        bool: Whether this is a base model needing configuration
    """
    lowered = model_path.lower()

    # Guard clause: anything without "base", or already tagged "ner", is not base.
    if "base" not in lowered or "ner" in lowered:
        return False

    if debug:
        print(f"Detected {model_path} as base model requiring NER configuration")
    return True

def load_model(model_path, config=None, debug=False):
    """
    Load a token-classification model, optionally overriding its labels.

    Models whose name contains one of the known pre-trained NER markers
    ("conll", "ner-english", "ner-large", "ontonotes") are loaded as-is and
    ``config`` is ignored for them. Every other model is loaded with the
    label mappings from ``config``; when ``config`` is None the default
    person-NER mappings are used.

    Args:
        model_path: Path to model
        config: Optional configuration dictionary with id2label and label2id
        debug: Whether to print debug info

    Returns:
        Model: The loaded model
    """
    if debug:
        print(f"Loading model from {model_path}")

    lowered_path = model_path.lower()
    pretrained_markers = ("conll", "ner-english", "ner-large", "ontonotes")

    # Pre-trained NER checkpoints keep their own label set untouched.
    if any(marker in lowered_path for marker in pretrained_markers):
        if debug:
            print(f"Detected pre-trained NER model: {model_path}")
            print("Loading with original configuration")

        pretrained = AutoModelForTokenClassification.from_pretrained(model_path)

        if debug:
            print(f"Model loaded with {len(pretrained.config.id2label)} labels: {pretrained.config.id2label}")
        return pretrained

    # Fall back to the person-NER label set when the caller supplied nothing.
    if config is None:
        config = {
            "id2label": {0: "O", 1: "B-PERSON", 2: "I-PERSON"},
            "label2id": {"O": 0, "B-PERSON": 1, "I-PERSON": 2}
        }

    id2label = config["id2label"]
    label2id = config["label2id"]

    configured = AutoModelForTokenClassification.from_pretrained(
        model_path,
        id2label=id2label,
        label2id=label2id,
        num_labels=len(id2label)
    )

    if debug:
        print(f"Model loaded with {len(id2label)} labels")

    return configured

def create_ner_pipeline(model, tokenizer, debug=False):
    """
    Build an "ner" pipeline around an already-loaded model and tokenizer.

    The pipeline is created with ``aggregation_strategy="simple"`` and placed
    on device 0 when CUDA is available, otherwise on device -1.

    Args:
        model: The model to use
        tokenizer: The tokenizer to use
        debug: Whether to print debug info

    Returns:
        pipeline: NER pipeline
    """
    if torch.cuda.is_available():
        device = 0
    else:
        device = -1

    if debug:
        print(f"Creating NER pipeline on device: {device}")

    pipeline_kwargs = {
        "model": model,
        "tokenizer": tokenizer,
        "aggregation_strategy": "simple",
        "device": device,
    }
    return pipeline("ner", **pipeline_kwargs)

def test_pipeline(pipeline, test_text="Barack Obama was president of the United States.", debug=False):
    """
    Test NER pipeline on a sample text
    
    Args:
        pipeline: NER pipeline to test
        test_text: Sample text for testing
        debug: Whether to print full results
    
    Returns:
        list: Detected entities
    """
    if debug:
        print(f"Testing pipeline on: '{test_text}'")
    
    entities = pipeline(test_text)
    
    if debug:
        print(f"Detected entities: {entities}")
    
    return entities

def print_model_info(model, tokenizer):
    """Print a short summary of the model and tokenizer.

    Reports the model class name, total parameter count, label map
    (``model.config.id2label``), tokenizer class name and
    ``tokenizer.vocab_size``.
    """
    architecture = model.__class__.__name__
    print(f"Model architecture: {architecture}")

    parameter_count = 0
    for param in model.parameters():
        parameter_count += param.numel()
    print(f"Number of parameters: {parameter_count}")

    labels = model.config.id2label
    print(f"Labels: {labels}")

    tokenizer_name = tokenizer.__class__.__name__
    print(f"Tokenizer: {tokenizer_name}")

    vocab_size = tokenizer.vocab_size
    print(f"Vocabulary size: {vocab_size}")

def load_and_setup_ner_model(model_path, test=False, debug=False):
    """
    Load tokenizer and model for ``model_path`` and wrap them in an NER pipeline.

    The label configuration is chosen as follows: a local path gets whatever
    ``load_model_config`` returns; a remote model detected as a base model
    gets the default person-NER mappings; anything else passes ``None`` to
    ``load_model``.

    Args:
        model_path: Path or name of model
        test: Whether to run a quick test
        debug: Whether to print debug info

    Returns:
        tuple: (tokenizer, model, ner_pipeline)
    """
    # Local directory or remote model name?
    local = is_local_model(model_path)
    if debug:
        origin = "local" if local else "from HuggingFace"
        print(f"Model path {model_path} is {origin}")

    tokenizer = load_tokenizer(model_path, debug)

    # Name-based detection of checkpoints that are already trained for NER.
    lowered_path = model_path.lower()
    ner_markers = ["conll", "ner-english", "ner-large", "ontonotes"]
    is_pretrained_ner = any(marker in lowered_path for marker in ner_markers)

    # The base-model heuristic is only consulted for non-pretrained models.
    is_base = (not is_pretrained_ner) and configure_base_model(model_path, debug)

    # Pick the label configuration.
    config = None  # default: no explicit label config is passed on
    if local:
        config = load_model_config(model_path, debug)
    elif is_base:
        config = {
            "id2label": {0: "O", 1: "B-PERSON", 2: "I-PERSON"},
            "label2id": {"O": 0, "B-PERSON": 1, "I-PERSON": 2}
        }

    model = load_model(model_path, config, debug)
    ner_pipe = create_ner_pipeline(model, tokenizer, debug)

    # Optional smoke test on the built-in sample sentence.
    if test:
        sample_entities = test_pipeline(ner_pipe, debug=debug)
        if debug:
            print(f"Found {len(sample_entities)} entities in test text")

    if debug:
        print_model_info(model, tokenizer)

    return tokenizer, model, ner_pipe
# Example usage:
# tokenizer, model, pipeline = load_and_setup_ner_model("../models/roberta-finetuned-ner", debug=True)

def extract_and_display_names(text, ner_pipeline, debug=False):
    """
    Extract person names from text using an already-built NER pipeline.

    Despite its name, this function does not display anything (other than the
    optional debug print); it only returns the names.

    Args:
        text (str): Text to analyze
        ner_pipeline: Callable that takes the text and returns a list of
            entity dicts, each with at least "entity_group" and "word" keys
        debug (bool): Whether to print all detected entities

    Returns:
        list: Detected person names (whitespace-stripped), in the order the
        pipeline returned them; empty if no person entity was found
    """
    entities = ner_pipeline(text)

    if debug:
        print(f"All entities detected: {entities}")

    # Accept the different person labels used by different labeling schemes.
    person_labels = ("PERSON", "PER", "B-PERSON", "I-PERSON", "B-PER", "I-PER")

    return [
        entity["word"].strip()
        for entity in entities
        if entity["entity_group"] in person_labels
    ]
def names_from_ner_tags(entry):
    """
    Rebuild the expected person names from a tagged dataset entry.

    Tag 1 starts a new name, tag 2 continues the current name, and any other
    tag ends it. (These ids match the default label map used elsewhere in
    this file: 1 = "B-PERSON", 2 = "I-PERSON".) A tag 2 with no name in
    progress is ignored.

    The entry is not modified.

    Args:
        entry: Mapping with parallel sequences "tokens" and "ner_tags"

    Returns:
        list: Names in order of appearance, tokens joined by single spaces
    """
    names = []
    current = []  # tokens of the name currently being assembled

    for token, tag in zip(entry["tokens"], entry["ner_tags"]):
        if tag == 1:
            # A new name begins; flush the previous one first so that two
            # adjacent names are not merged or lost.
            if current:
                names.append(" ".join(current))
            current = [token]
        elif tag == 2 and current:
            current.append(token)
        elif current:
            names.append(" ".join(current))
            current = []

    # Flush a name that runs to the very end of the sentence.
    if current:
        names.append(" ".join(current))

    return names
            
   
   
def benchmark(models, text_json, debug=False):
    """
    Benchmark one or more NER models against a tagged dataset.

    For every entry, the tokens are joined with spaces, run through the
    model's pipeline, and each expected name (from the entry's NER tags) is
    counted as correct when the exact same string appears among the detected
    names.

    Args:
        models: Iterable of model paths or names
        text_json: Iterable of entries, each with "tokens" and "ner_tags"
        debug: Whether to print per-entry details

    Returns:
        dict: Empty if ``models`` is empty. Otherwise the keys "model",
        "precision" and "missed" describe the LAST model (as before), and
        "all_models" holds a list with one such dict per model, in order.
        "precision" is the fraction of expected names that were detected
        (0.0 when the dataset contains no expected names).
    """
    all_models = []

    for model in models:
        print(f"Loading model {model}...")
        print("-" * 50)
        correct = 0
        total = 0
        miss = []
        _, _, ner_pipe = load_and_setup_ner_model(model, test=False, debug=debug)
        print(f"----- Benchmarking {model} ------------")

        for entry in text_json:
            text = " ".join(entry["tokens"])
            if debug:
                print(f"Text: {text}")
                print("-" * 50)

            model_names = extract_and_display_names(
                text,
                ner_pipe,
                debug=False
            )
            # Hand over copies so the dataset stays identical for every model,
            # even if the helper modifies the lists it receives.
            names = names_from_ner_tags(
                dict(
                    entry,
                    tokens=list(entry["tokens"]),
                    ner_tags=list(entry["ner_tags"]),
                )
            )
            if debug:
                print(f"Expected names: {names}")
                print(f"Detected names: {model_names}")
                print("-" * 50)

            detected = set(model_names)
            for name in names:
                if debug:
                    print("-" * 50)
                    print(f"Checking FOR {name}...")
                total += 1
                if name in detected:
                    if debug:
                        print(f"Correctly detected {name}")
                    correct += 1
                else:
                    miss.append(name)

        all_models.append({
            "model": model,
            # Avoid ZeroDivisionError when there is nothing to detect.
            "precision": correct / total if total else 0.0,
            "missed": miss,
        })

    if not all_models:
        return {}

    # Keep the historical top-level keys (last model) and add the full list
    # so results of earlier models are no longer lost.
    result = dict(all_models[-1])
    result["all_models"] = all_models
    return result

# Load the benchmark dataset from its JSON file.
# JSON is UTF-8 by specification, so do not depend on the platform default.
with open("../data/json/conllpp_train.json", "r", encoding="utf-8") as f:
    text_json = json.load(f)

print(benchmark(
    ["../models/roberta-finetuned-ner"],
    text_json
))
        

Device set to use cuda:0


Loading model ../models/roberta-finetuned-ner...
--------------------------------------------------
----- Benchmarking ../models/roberta-finetuned-ner ------------
Loading model dbmdz/bert-large-cased-finetuned-conll03-english...
--------------------------------------------------


Some weights of the model checkpoint at dbmdz/bert-large-cased-finetuned-conll03-english were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Device set to use cuda:0


----- Benchmarking dbmdz/bert-large-cased-finetuned-conll03-english ------------
{'model': 'dbmdz/bert-large-cased-finetuned-conll03-english', 'precision': 0.8826561552456034, 'missed': ['Loyola de Palacio', 'Shen Guofang', 'Eliahu Ben-Elissar', 'Hafez al-', 'Phil', 'D.A. Weibring', 'Mark', "O'Meara", 'PIVOTAL', 'PRESCOTT', 'Eveningperformance', 'Shi-Ting Wang', 'DALGLISH', 'Inzamam-ul-Haq', 'FERGUSON', "Alex O'Brien", 'Hwang', 'Jean-Philippe Fleurian', 'Carl-Uwe Steeb', "Alex O'Brien", 'DAVEY JOHNSON', 'GREER', 'COCU', 'KNUP', 'David J. Russell', 'Mats', 'Greg', 'Derrick', 'Cooper', 'Jose-Maria Canizares', 'Jamie', 'Hassan al-Turabi', 'Hans-Otto Sieg', 'Kennedys', 'I.K. Gujral', 'H.D. Deve Gowda', 'C. Rangarajan', 'Chua Jui', 'Gerhard Berger', 'David Coulthard', 'Jacques Villeneuve', 'Mika Hakkinen', 'Heinz-Harald Frentzen', 'Jean Alesi', 'Damon Hill', 'Michael Schumacher', 'Martin Brundle', 'Rubens Barrichello', 'Johnny Herbert', 'Olivier Panis', 'Inzamam-ul-Haq', 'Inzamam-ul-Haq', '