In [None]:
!pip install transformers accelerate bitsandbytes

In [None]:
import json
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM, GenerationConfig
import torch

# Mount Google Drive at /content/drive; later cells read the dataset from
# and write model outputs to paths under /content/drive/MyDrive.
# NOTE(review): google.colab is presumably only importable inside a Colab
# runtime - confirm before running this cell elsewhere.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os

def load_data(file_path):
    """
    Load one dataset file and collect its texts and entity labels.

    The file must contain a JSON array of records (dicts). For each record
    the "title" field becomes the text id (falling back to ``doc_<index>``
    when absent) and the "doc" field becomes the text (falling back to "").

    Args:
        file_path: Path to the JSON file to read.

    Returns:
        A tuple ``(texts, unique_labels, file_name)`` where
        - ``texts`` is a list of ``{"id": ..., "text": ...}`` dicts, one per
          record, in file order;
        - ``unique_labels`` is the set of every label found in the records'
          "entity_label_set" fields;
        - ``file_name`` is the base name of ``file_path`` without extension.
    """
    # Read explicitly as UTF-8 so the result does not depend on the platform's
    # default encoding (the notebook writes its JSON outputs as UTF-8 too).
    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    texts = [
        {"id": record.get("title", f"doc_{i}"), "text": record.get("doc", "")}
        for i, record in enumerate(data)
    ]

    unique_labels = {
        label
        for record in data
        # `or []` also covers a record whose label set is explicitly null.
        for label in (record.get("entity_label_set") or [])
    }

    print(f"Loaded {len(texts)} texts.")
    file_name = os.path.splitext(os.path.basename(file_path))[0]

    return texts, unique_labels, file_name

In [None]:
import transformers
import torch

# Hugging Face model identifier used for every generation in this notebook.
model_id = "meta-llama/Meta-Llama-3-8B-Instruct"

# Build the text-generation pipeline in bfloat16 with automatic device placement.
# NOTE: this assignment rebinds the global name `pipeline`, which an earlier cell
# imported from `transformers` as a function; from here on `pipeline` is the
# pipeline object, and extract_entities() relies on that global.
pipeline = transformers.pipeline(
    "text-generation",
    model=model_id,
    model_kwargs={"torch_dtype": torch.bfloat16},
    device_map="auto",
)

# Smoke-test conversation: one system message and one user message.
messages = [
    {"role": "system", "content": "You are a pirate chatbot who always responds in pirate speak!"},
    {"role": "user", "content": "Who are you?"},
]

# Token ids passed as `eos_token_id` below: the tokenizer's EOS id plus the id
# of the "<|eot_id|>" token (presumably the model's end-of-turn marker - confirm
# against the model card).
terminators = [
    pipeline.tokenizer.eos_token_id,
    pipeline.tokenizer.convert_tokens_to_ids("<|eot_id|>")
]

# Sampling is enabled and no seed is set in this cell, so the output is not
# expected to be reproducible between runs.
outputs = pipeline(
    messages,
    max_new_tokens=256,
    eos_token_id=terminators,
    do_sample=True,
    temperature=0.6,
    top_p=0.9,
)
# Print the last element of the returned "generated_text"
# (presumably the assistant's reply message - confirm).
print(outputs[0]["generated_text"][-1])

In [None]:
import re

def extract_entities(text: str, labels: list) -> dict:
    """
    Ask the global LLaMA chat ``pipeline`` to extract named entities from ``text``.

    Args:
        text: Document text, sent as the user message.
        labels: Entity labels embedded in the system prompt as the schema.
            A list is used as given; a set/frozenset is converted to a sorted
            list first so the prompt text is identical from run to run.

    Returns:
        The last element of the generated conversation, i.e.
        ``outputs[0]["generated_text"][-1]``. Callers in this notebook read the
        raw model reply from its ``"content"`` key. The reply is NOT parsed
        here; use ``convert_model_output_to_json`` for that.
    """
    # Iteration order of a set of strings can change between interpreter runs,
    # which would silently change the prompt. Sorting makes it deterministic.
    if isinstance(labels, (set, frozenset)):
        labels = sorted(labels)

    # NOTE(review): the system prompt contains a literal `", "schema":` fragment
    # (with a stray double quote). It is kept byte-for-byte because changing the
    # prompt changes model behaviour - confirm whether it is intended.
    messages = [
      {"role": "system", "content": f"You are an expert in Named Entity Recognition. Please extract entities that match the schema definition from the input. Return an empty list if the entity type does not exist. Please respond in the format of a JSON string.\", \"schema\": {labels}"},
      {"role": "user", "content": text},
    ]

    # Generation stops on either the tokenizer's EOS id or the "<|eot_id|>" id.
    terminators = [
        pipeline.tokenizer.eos_token_id,
        pipeline.tokenizer.convert_tokens_to_ids("<|eot_id|>")
    ]

    # Sampling is enabled, so repeated calls on the same text may differ.
    outputs = pipeline(
        messages,
        max_new_tokens=512,
        eos_token_id=terminators,
        do_sample=True,
        temperature=0.6,
        top_p=0.9,
    )
    return outputs[0]["generated_text"][-1]

In [None]:
import re
import json
from typing import Optional, Dict, Any

def convert_model_output_to_json(
    title: str,
    model_output: str,
    output_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Extracts a JSON object from a model output string and returns it as a Python dict.
    If `output_path` is provided, also appends ``{title: <parsed JSON>}`` to the
    JSON list stored in that file (creating the file, and its folder, if needed).

    Args:
        title: Key under which the parsed JSON is stored in the output file.
        model_output: The raw string returned by the model, containing a JSON snippet.
        output_path: Optional path (including '.json') to save the extracted JSON.

    Returns:
        A Python dict representing the JSON data (without the ``title`` wrapper).

    Raises:
        ValueError: If no ``{...}`` span can be found, or if the span is not valid JSON.
    """
    # 1. Try to grab anything between ```...``` fences first
    fence_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", model_output, re.DOTALL)
    if fence_match:
        json_str = fence_match.group(1)
    else:
        # 2. Fallback: grab from first '{' to last '}'
        start = model_output.find('{')
        end = model_output.rfind('}')
        # find/rfind return -1 when the character is missing. `end < start`
        # covers both a missing '}' and a '}' that only occurs before the
        # first '{' (either way there is no object to slice out).
        if start == -1 or end < start:
            raise ValueError("No JSON object found in the model output.")
        json_str = model_output[start:end + 1]

    # 3. Parse into dict
    try:
        data = json.loads(json_str)
    except json.JSONDecodeError as e:
        # Chain the original error so the traceback keeps the parse position.
        raise ValueError(f"Failed to parse JSON: {e}") from e

    new_entry = {title: data}

    # 4. Optionally write or append to file.
    if output_path:
        if os.path.exists(output_path):
            with open(output_path, 'r', encoding='utf-8') as f:
                try:
                    existing_data = json.load(f)
                    # A file holding a single object is promoted to a list.
                    if not isinstance(existing_data, list):
                        existing_data = [existing_data]
                except json.JSONDecodeError:
                    # Best effort: an unreadable existing file is replaced.
                    existing_data = []
            existing_data.append(new_entry)
            data_to_write = existing_data
        else:
            # First write: make sure the target folder exists.
            parent = os.path.dirname(output_path)
            if parent:
                os.makedirs(parent, exist_ok=True)
            data_to_write = [new_entry]

        print(f"Writing JSON to {output_path}")
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data_to_write, f, ensure_ascii=False, indent=4)

    return data



In [None]:
base_folder = "/content/drive/MyDrive/dataset/dev"  # replace with your folder path
output_folder = "/content/drive/MyDrive/dataset/baseline_output"

# Create the output folder up front so the first write cannot fail on a
# missing directory.
os.makedirs(output_folder, exist_ok=True)

# loop through all files in the given folder
for root, dirs, files in os.walk(base_folder):
    for filename in files:
        # Only JSON dataset files can be loaded; skip anything else found in
        # the folder tree.
        if not filename.lower().endswith(".json"):
            continue

        path = os.path.join(root, filename)

        texts, label_set, file_name = load_data(path)
        print(file_name)

        # Sort once per file so every prompt for this file lists the labels in
        # the same, run-independent order.
        labels = sorted(label_set)
        # All documents of one input file are appended to the same output file.
        output_file = os.path.join(output_folder, f"{file_name}.json")

        # loop through all texts in the given file
        for item in texts:
            doc_id = item["id"]  # not named `id`, which would shadow the builtin
            text = item["text"]

            # extract entities from the text
            result = extract_entities(text, labels)

            try:
                convert_model_output_to_json(doc_id, result["content"], output_file)
            except Exception as exc:
                # `except Exception` (rather than a bare `except`) lets
                # KeyboardInterrupt / SystemExit stop a long run.
                print(f"Could not convert {doc_id} ({exc}). Saving raw content")
                # Document ids are free text; path separators in them would
                # point outside the output folder.
                safe_name = str(doc_id).replace("/", "_").replace("\\", "_")
                raw_path = os.path.join(output_folder, f"{safe_name}.json")
                with open(raw_path, "w", encoding="utf-8") as f:
                    f.write(result["content"])


In [None]:
import json
import os
from utils.NEREvaluator import NEREvaluator

base_folder = "data/processed/NER/llama_baseline"  # replace with your folder path
gt_folder = "data/raw/dev"
result_folder = "data/processed/NER/baseline_result"

# Make sure the folder for the metric files exists before the first write.
os.makedirs(result_folder, exist_ok=True)

# loop through all files in the given folder
for root, dirs, files in os.walk(base_folder):
    for file_name in files:
        gt_path = os.path.join(gt_folder, file_name)
        # Use `root` from os.walk so prediction files inside sub-folders are
        # opened from where they actually live.
        pred_path = os.path.join(root, file_name)

        # A prediction file without a ground-truth file of the same name
        # cannot be scored (e.g. a raw-output dump saved next to the
        # predictions); report it and move on.
        if not os.path.exists(gt_path):
            print(f"No ground-truth file found for {file_name}, skipping")
            continue

        with open(gt_path, "r", encoding="utf-8") as f:
            gt = json.load(f)

        try:
            with open(pred_path, "r", encoding="utf-8") as f:
                preds = json.load(f)
        except json.JSONDecodeError as exc:
            print(f"Could not parse predictions in {pred_path} ({exc}), skipping")
            continue

        # Index ground truth by document id once instead of scanning the list
        # for every prediction. Ids mirror load_data(): the "title" field, or
        # "doc_<index>" when a record has no title. setdefault keeps the first
        # record when a title occurs more than once.
        gt_by_id = {}
        for i, entry in enumerate(gt):
            gt_by_id.setdefault(entry.get("title", f"doc_{i}"), entry)

        results = []

        for p in preds:
            # Each prediction is a single-key dict: {doc_id: model_output}.
            doc_id = list(p.keys())[0]
            pred = p[doc_id]["entities"]

            gt_entry = gt_by_id.get(doc_id)

            if gt_entry is None:
                print(f"No matching entry found for {doc_id}")
                continue

            doc = gt_entry["doc"]
            gold_entities = gt_entry["entities"]

            evaluator = NEREvaluator(
                doc,
                gold_entities,
                pred
            )

            metrics = evaluator.evaluate()
            results.append(
                {
                    doc_id: metrics
                }
            )

        result_path = os.path.join(result_folder, f"baseline_{file_name}")
        with open(result_path, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=4)
