In [None]:
import pandas as pd
import torch
import datasets
import pandas as pd
import evaluate
import numpy as np
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer, DataCollatorWithPadding, AutoTokenizer, BitsAndBytesConfig
from transformers import DataCollatorWithPadding
import evaluate
import numpy as np
from datasets import load_dataset_builder, load_dataset
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer
import pandas as pd
from datasets import Dataset
import os
import glob
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer
from transformers import DataCollatorForTokenClassification
from datasets import Dataset, load_metric
from sklearn.model_selection import train_test_split
import numpy as np

# Configure Weights & Biases through environment variables.
# NOTE(review): "x" looks like a placeholder API key - supply the real key
# from outside the source file rather than committing it here.
os.environ.update({
    'WANDB_API_KEY': "x",
    "WANDB_PROJECT": "LLLM",
    'WANDB_WATCH': "all",
    'WANDB_LOG_MODEL': "true",
})

In [None]:
# NER fine-tuning with metrics reported per NER tag



# Constants

# Checkpoint that is fine-tuned, and the output directory name used for the result.
MODEL_NAME = "m3rg-iitd/matscibert"
publishing_name = "MatSciBERT800abstractsNER"

# Training data file and the tokenizer truncation length.
FILE_PATH = r"C:\Users\alan\train.txt"  # Update this with the actual path to your data file
MAX_LENGTH = 128

# Optimisation hyper-parameters passed to TrainingArguments.
LEARNING_RATE = 2e-5
BATCH_SIZE = 16  # used for both the train and eval per-device batch size
EPOCHS = 4

def read_file(file_path):
    """Return the full contents of ``file_path`` decoded as UTF-8 text.

    Args:
        file_path: Path of the file to read.

    Returns:
        The whole file as a single string.
    """
    with open(file_path, mode='r', encoding='utf-8') as handle:
        contents = handle.read()
    return contents

def read_ner_data(file_content):
    """Parse whitespace-separated token/label lines into sentences.

    Each non-blank line contributes one token: the first field is the word and
    the last field is its label. Blank (or whitespace-only) lines end the
    current sentence. Non-blank lines with fewer than two fields are ignored
    and do not end the sentence.

    Args:
        file_content: The raw text of the annotated file.

    Returns:
        A ``(sentences, labels)`` tuple of parallel lists; ``sentences[i]`` is
        the list of words and ``labels[i]`` the list of label strings for
        sentence ``i``.
    """
    all_words, all_tags = [], []
    words, tags = [], []

    # The trailing empty string acts as a final separator so the last sentence
    # is flushed by the same code path as every other one.
    for raw_line in file_content.split('\n') + ['']:
        fields = raw_line.strip().split()

        if not fields:
            if words:
                all_words.append(words)
                all_tags.append(tags)
                words, tags = [], []
            continue

        if len(fields) < 2:
            continue

        words.append(fields[0])
        tags.append(fields[-1])

    return all_words, all_tags

def tokenize_and_align_labels(examples):
    """Tokenize a batch of pre-split sentences and align word labels to tokens.

    Uses the module-level ``tokenizer`` with truncation at ``MAX_LENGTH``. For
    every produced token, the label is that of its source word if the token is
    the first piece of that word; tokens without a source word and
    continuation pieces get ``-100``.

    Args:
        examples: A batch with ``"tokens"`` (list of word lists) and
            ``"ner_tags"`` (list of per-word label id lists).

    Returns:
        The tokenizer output with an added ``"labels"`` entry holding one
        aligned label list per example.
    """
    encoded = tokenizer(
        examples["tokens"],
        truncation=True,
        is_split_into_words=True,
        max_length=MAX_LENGTH,
    )

    aligned = []
    for row, word_tags in enumerate(examples["ner_tags"]):
        ids = encoded.word_ids(batch_index=row)
        # Pair every token's word id with the word id of the token before it
        # (None for the first token) to detect where a new word starts.
        preceding = [None] + ids[:-1]
        aligned.append([
            -100 if (current is None or current == before) else word_tags[current]
            for current, before in zip(ids, preceding)
        ])

    encoded["labels"] = aligned
    return encoded
def _per_tag_token_metrics(predicted_tags, reference_tags, tags):
    """Compute one-vs-rest token-level scores for every tag in ``tags``.

    Args:
        predicted_tags: List of predicted tag-string sequences.
        reference_tags: List of gold tag-string sequences, parallel to
            ``predicted_tags``.
        tags: Iterable of all tag strings to report on; every tag occurring in
            the sequences must be included.

    Returns:
        Dict mapping each tag to a dict with ``precision``, ``recall``, ``f1``
        and ``accuracy`` floats. A score whose denominator is zero is 0.0.
    """
    true_pos = dict.fromkeys(tags, 0)
    n_predicted = dict.fromkeys(tags, 0)
    n_reference = dict.fromkeys(tags, 0)
    total = 0

    # Single pass over all tokens; per-tag FP/FN/TN are derived afterwards.
    for pred_seq, ref_seq in zip(predicted_tags, reference_tags):
        for pred, ref in zip(pred_seq, ref_seq):
            total += 1
            n_predicted[pred] += 1
            n_reference[ref] += 1
            if pred == ref:
                true_pos[pred] += 1

    scores = {}
    for tag, tp in true_pos.items():
        false_pos = n_predicted[tag] - tp
        false_neg = n_reference[tag] - tp
        support = n_predicted[tag] + n_reference[tag]
        scores[tag] = {
            "precision": tp / n_predicted[tag] if n_predicted[tag] else 0.0,
            "recall": tp / n_reference[tag] if n_reference[tag] else 0.0,
            # F1 = 2TP / (2TP + FP + FN) = 2TP / (#predicted + #reference)
            "f1": 2 * tp / support if support else 0.0,
            # Every token that is neither a false positive nor a false
            # negative for this tag is a correct one-vs-rest decision.
            "accuracy": (total - false_pos - false_neg) / total if total else 0.0,
        }
    return scores


def compute_metrics(p):
    """Compute overall seqeval metrics plus token-level metrics per tag.

    Relies on the module-level ``label_list`` (id -> tag string) and ``metric``
    (object whose ``compute`` returns the ``overall_*`` keys read below).

    Per-tag scores are computed directly as one-vs-rest token-level statistics
    rather than through ``metric.compute``: the previous implementation passed
    0/1 integer sequences to the sequence-labelling metric, which expects tag
    strings.

    Args:
        p: A ``(predictions, labels)`` pair; ``predictions`` holds per-token
            class scores (argmax is taken over axis 2) and ``labels`` holds
            label ids with ``-100`` marking positions to ignore.

    Returns:
        Dict with overall ``precision``, ``recall``, ``f1``, ``accuracy`` and a
        ``per_tag_metrics`` dict mapping each tag in ``label_list`` to its own
        ``precision``/``recall``/``f1``/``accuracy``.
    """
    scores, label_ids = p
    predicted_ids = np.argmax(scores, axis=2)

    # Drop ignored positions (label id -100) and map ids back to tag strings.
    true_predictions = []
    true_labels = []
    for pred_row, label_row in zip(predicted_ids, label_ids):
        kept = [
            (pred_id, label_id)
            for pred_id, label_id in zip(pred_row, label_row)
            if label_id != -100
        ]
        true_predictions.append([label_list[pred_id] for pred_id, _ in kept])
        true_labels.append([label_list[label_id] for _, label_id in kept])

    # Overall metrics from the configured sequence-labelling metric.
    results = metric.compute(predictions=true_predictions, references=true_labels)

    # Per-tag metrics, in label_list order so the report is deterministic.
    per_tag_metrics = _per_tag_token_metrics(true_predictions, true_labels, label_list)

    return {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
        "per_tag_metrics": per_tag_metrics
    }

# Main function
def main():
    """Fine-tune ``MODEL_NAME`` for token classification and report metrics.

    Reads the annotated file at ``FILE_PATH``, builds the label vocabulary,
    splits the sentences 80/20 into train/validation sets, fine-tunes the
    model with ``Trainer``, prints overall and per-tag validation metrics,
    saves the model to ``./ner_model`` and pushes it to the Hub.

    Sets the module-level ``tokenizer``, ``label_list`` and ``metric`` that
    ``tokenize_and_align_labels`` and ``compute_metrics`` read.

    Raises:
        ValueError: If no labelled sentences could be parsed from
            ``FILE_PATH``.
    """
    global tokenizer, label_list, metric

    # Load and preprocess data
    file_content = read_file(FILE_PATH)
    sentences, labels = read_ner_data(file_content)
    if not sentences:
        # Fail early with a clear message instead of a downstream split error.
        raise ValueError(f"No labelled sentences found in {FILE_PATH!r}")

    # Create label mappings (sorted so ids are stable across runs)
    label_list = sorted({tag for sent_labels in labels for tag in sent_labels})
    label2id = {label: i for i, label in enumerate(label_list)}
    id2label = {i: label for i, label in enumerate(label_list)}

    # Prepare dataset; a fixed seed keeps the validation set, and therefore
    # the reported metrics, comparable between runs.
    train_texts, val_texts, train_labels, val_labels = train_test_split(
        sentences, labels, test_size=0.2, random_state=42
    )

    train_dataset = Dataset.from_dict({
        "tokens": train_texts,
        "ner_tags": [[label2id[tag] for tag in sent] for sent in train_labels],
    })
    val_dataset = Dataset.from_dict({
        "tokens": val_texts,
        "ner_tags": [[label2id[tag] for tag in sent] for sent in val_labels],
    })

    # Initialize tokenizer and model
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForTokenClassification.from_pretrained(
        MODEL_NAME, num_labels=len(label_list), id2label=id2label, label2id=label2id
    )

    # Tokenize and align labels
    train_tokenized = train_dataset.map(tokenize_and_align_labels, batched=True)
    val_tokenized = val_dataset.map(tokenize_and_align_labels, batched=True)

    # Data collator
    data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

    # Evaluation metric. `datasets.load_metric` is deprecated and removed in
    # recent `datasets` releases; `evaluate.load` is its replacement.
    metric = evaluate.load("seqeval")

    # Training arguments
    training_args = TrainingArguments(
        output_dir=publishing_name,
        evaluation_strategy="epoch",
        learning_rate=LEARNING_RATE,
        per_device_train_batch_size=BATCH_SIZE,
        per_device_eval_batch_size=BATCH_SIZE,
        num_train_epochs=EPOCHS,
        weight_decay=0.01,
    )

    # Initialize Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_tokenized,
        eval_dataset=val_tokenized,
        tokenizer=tokenizer,
        data_collator=data_collator,
        compute_metrics=compute_metrics
    )

    # Train the model
    trainer.train()

    # Evaluate the model
    results = trainer.evaluate()

    # Print overall metrics
    print("Overall Metrics:")
    print(f"Precision: {results['eval_precision']:.4f}")
    print(f"Recall: {results['eval_recall']:.4f}")
    print(f"F1 Score: {results['eval_f1']:.4f}")
    print(f"Accuracy: {results['eval_accuracy']:.4f}")

    # Print per-tag metrics
    print("\nPer-tag Metrics:")
    for tag, tag_scores in results['eval_per_tag_metrics'].items():
        print(f"\n{tag}:")
        print(f"  Precision: {tag_scores['precision']:.4f}")
        print(f"  Recall: {tag_scores['recall']:.4f}")
        print(f"  F1 Score: {tag_scores['f1']:.4f}")
        print(f"  Accuracy: {tag_scores['accuracy']:.4f}")

    # Save the model
    trainer.save_model("./ner_model")
    trainer.push_to_hub()

# Run the training pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()