In [1]:
import pandas as pd
import numpy as np
import re
import json
from typing import List, Tuple, Dict
import warnings
warnings.filterwarnings('ignore')

In [24]:
df = pd.read_csv("full_data.csv")

In [3]:
def clean_entity_column(col):
    # Remove digits after commas (e.g., "John Doe,123" → "John Doe")
    return col.apply(lambda x: [re.sub(r',\d+', '', ent) for ent in str(x).split(';') if ent.strip()])

# Clean each entity column
df['persons'] = clean_entity_column(df['persons'])
df['organizations'] = clean_entity_column(df['organizations'])
df['locations'] = clean_entity_column(df['locations'])

In [4]:
df[["text", "persons", "organizations","locations"]].head(5)

Unnamed: 0,text,persons,organizations,locations
0,"""articleBody"":""A federal judge has ruled again...","[Louie Gohmert, Timothy Kelly, Andrew Clyde, L...",[Dc District Court],"[Georgia,Pennsylvania,Texas]"
1,"""articleBody"":""More than a dozen major news or...","[Laura Lee Prather, Haynes Boone, Nicole Carroll]","[Texas Department Of Public Safety, Texas Depa...","[Robb Elementary School,Texas Department Of Pu..."
2,"""articleBody"":""Comedian Jon Stewart and vetera...","[Pat Toomey, Kate Bolduan, Matt Zeller, Jon St...","[Senate Majority Leader Chuck Schumer, While S...","[Iraq,America,Pennsylvania]"
3,"""articleBody"":""A federal judge has ruled again...","[Louie Gohmert, Timothy Kelly, Andrew Clyde, L...",[Dc District Court],"[Georgia,Pennsylvania,Texas]"
4,"""articleBody"":""A version of this story appears...","[Pat Toomey, Joe Manchin, Paul Leblanc, Jake T...","[Union On, Senate Republicans, Veterans Affair...","[Pennsylvania,Capitol Hill,West Virginia,Ameri..."


In [5]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import (
    AutoTokenizer, AutoModelForTokenClassification,
    TrainingArguments, Trainer, DataCollatorForTokenClassification)

from sklearn.metrics import classification_report, f1_score
import numpy as np
from typing import List, Dict, Tuple
import pandas as pd
from seqeval.metrics import classification_report as seq_classification_report
from seqeval.metrics import f1_score as seq_f1_score
import json
import os
from datetime import datetime

In [6]:
class NERDataset(Dataset):
    def __init__(self, texts, tags, tokenizer, max_length=512):
        self.texts = texts
        self.tags = tags
        self.tokenizer = tokenizer
        self.max_length = max_length
        
        # Create tag to id mapping
        unique_tags = set()
        for tag_list in tags:
            unique_tags.update(tag_list)
        
        self.tag2id = {tag: i for i, tag in enumerate(sorted(unique_tags))}
        self.id2tag = {i: tag for tag, i in self.tag2id.items()}
        
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        text = self.texts[idx]
        tags = self.tags[idx]
        
        # Tokenize
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_offsets_mapping=True,
            is_split_into_words=True
        )
        
        # Align tags with tokens
        labels = []
        word_ids = encoding.word_ids()
        previous_word_idx = None
        
        for word_idx in word_ids:
            if word_idx is None:
                labels.append(-100)
            elif word_idx != previous_word_idx:
                if word_idx < len(tags):
                    labels.append(self.tag2id[tags[word_idx]])
                else:
                    labels.append(self.tag2id['O'])
            else:
                labels.append(-100)
            previous_word_idx = word_idx
        
        encoding['labels'] = labels
        del encoding['offset_mapping']
        
        return {key: torch.tensor(val) for key, val in encoding.items()}

class ExperimentTracker:
    def __init__(self, experiment_dir='experiments'):
        self.experiment_dir = experiment_dir
        os.makedirs(experiment_dir, exist_ok=True)
        self.experiments = []
        
    def log_experiment(self, experiment_name: str, model_name: str, 
                      hyperparameters: dict, metrics: dict, notes: str = ""):
        experiment = {
            'timestamp': datetime.now().isoformat(),
            'experiment_name': experiment_name,
            'model_name': model_name,
            'hyperparameters': hyperparameters,
            'metrics': metrics,
            'notes': notes
        }
        
        self.experiments.append(experiment)
        
        # Save to file
        with open(os.path.join(self.experiment_dir, 'experiments.json'), 'w') as f:
            json.dump(self.experiments, f, indent=2)
            
        print(f"Experiment '{experiment_name}' logged successfully")
        
    def get_best_experiment(self, metric='f1_score'):
        if not self.experiments:
            return None
        return max(self.experiments, key=lambda x: x['metrics'].get(metric, 0))


In [7]:
import nltk
from nltk.tokenize import word_tokenize

nltk.download('punkt')
nltk.download('punkt_tab')

def create_bio_format(df):
    all_data = []
    sentence_id = 0

    for _, row in df.iterrows():
        text = row['text']
        tokens = word_tokenize(text)
        tags = ['O'] * len(tokens)

        def tag_entities(entities, label_prefix):
            for entity in entities:
                entity_tokens = word_tokenize(entity)
                for i in range(len(tokens) - len(entity_tokens) + 1):
                    if tokens[i:i + len(entity_tokens)] == entity_tokens:
                        tags[i] = f'B-{label_prefix}'
                        for j in range(1, len(entity_tokens)):
                            tags[i + j] = f'I-{label_prefix}'

        tag_entities(row['persons'], 'PER')
        tag_entities(row['organizations'], 'ORG')
        tag_entities(row['locations'], 'LOC')

        for token, tag in zip(tokens, tags):
            all_data.append({
                'sentence_id': sentence_id,
                'word': token,
                'tag': tag
            })

        sentence_id += 1

    return pd.DataFrame(all_data)

bio_df = create_bio_format(df)

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\paicr\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\paicr\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


In [8]:
class NERModel:
    def __init__(self,experiment_name : str, model_name: str, experiment_tracker: ExperimentTracker):
        self.experiment_name = experiment_name
        self.model_name = model_name
        self.tokenizer = None
        self.model = None
        self.experiment_tracker = experiment_tracker
        
    def prepare_data(self, bio_df: pd.DataFrame):
        """Prepare data for training"""
        # Group by sentence_id
        sentences = []
        sentence_tags = []
        
        for sent_id in bio_df['sentence_id'].unique():
            sent_data = bio_df[bio_df['sentence_id'] == sent_id]
            sentences.append(sent_data['word'].tolist())
            sentence_tags.append(sent_data['tag'].tolist())
        
        return sentences, sentence_tags
    
    def train_model(self, train_texts, train_tags, val_texts, val_tags):
        """Train baseline BERT model"""
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        
        # Create datasets
        train_dataset = NERDataset(train_texts, train_tags, self.tokenizer)
        val_dataset = NERDataset(val_texts, val_tags, self.tokenizer)
        
        # Initialize model
        self.model = AutoModelForTokenClassification.from_pretrained(
            self.model_name,
            num_labels=len(train_dataset.tag2id),
            id2label=train_dataset.id2tag,
            label2id=train_dataset.tag2id
        )

        def compute_metrics(p):
            """Computes F1, precision, and recall for seqeval."""
            predictions, labels = p
            predictions = np.argmax(predictions, axis=2)

            true_predictions = [
                [train_dataset.id2tag[p] for (p, l) in zip(prediction, label) if l != -100]
                for prediction, label in zip(predictions, labels)
            ]
            true_labels = [
                [train_dataset.id2tag[l] for (p, l) in zip(prediction, label) if l != -100]
                for prediction, label in zip(predictions, labels)
            ]
            
            report = seq_classification_report(true_labels, true_predictions, output_dict=True, zero_division=0)
            f1 = seq_f1_score(true_labels, true_predictions, zero_division=0)

            return {
                "precision": report["weighted avg"]["precision"],
                "recall": report["weighted avg"]["recall"],
                "f1": f1,
            }
        
        # Training arguments
        training_args = TrainingArguments(
            output_dir='./bert_ner_results',
            num_train_epochs=3,
            per_device_train_batch_size=8,
            per_device_eval_batch_size=8,
            warmup_steps=500,
            weight_decay=0.01,
            logging_dir='./logs',
            logging_steps= 10,
            eval_strategy= "epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model="f1",
        )
        
        # Data collator
        data_collator = DataCollatorForTokenClassification(self.tokenizer)
        
        # Trainer
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            data_collator=data_collator,
            tokenizer=self.tokenizer,
            compute_metrics= compute_metrics
        )
        
        # Train
        trainer.train()
        
        # Evaluate
        eval_results = trainer.evaluate()
        
        # Log experiment
        hyperparams = training_args.to_dict()
        
        metrics = {
            'eval_loss': eval_results['eval_loss'],
            'f1_score': eval_results.get('eval_f1', 0)
        }
        
        self.experiment_tracker.log_experiment(
            'BERT_Training', 'bert-base-uncased', hyperparams, metrics
        )
        
        return trainer

In [9]:
# Shuffle again for safety
bio_df = bio_df.sample(frac=1).reset_index(drop=True)

# Split by sentence IDs
train_ids = bio_df['sentence_id'].unique()[:600]
test_ids = bio_df['sentence_id'].unique()[600:]

train_df = bio_df[bio_df['sentence_id'].isin(train_ids)]
test_df = bio_df[bio_df['sentence_id'].isin(test_ids)]

ner_model = NERModel('Experiment_with_bert_N1','bert-base-uncased', ExperimentTracker())

train_texts, train_tags = ner_model.prepare_data(train_df)
test_texts, test_tags = ner_model.prepare_data(test_df)

In [10]:
trainer = ner_model.train_model(train_texts, train_tags, test_texts, test_tags)

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch,Training Loss,Validation Loss,Precision,Recall,F1
1,0.1456,0.163687,0.0,0.0,0.0
2,0.0718,0.10338,0.346283,0.059072,0.108213
3,0.0568,0.07691,0.539467,0.344937,0.43484


Experiment 'BERT_Training' logged successfully


## NER INFERENCE PIPELINE

In [14]:
LABEL_MAP = {'B-PER': 'persons', 'I-PER': 'persons',
             'B-ORG': 'organizations', 'I-ORG': 'organizations',
             'B-LOC': 'locations', 'I-LOC': 'locations'}

class NERInferencePipeline:
    def __init__(self, model_dir: str):
        self.tokenizer = AutoTokenizer.from_pretrained(model_dir)
        self.model = AutoModelForTokenClassification.from_pretrained(model_dir)
        self.model.eval()
        self.label_map = self.model.config.id2label

    def predict(self, text: str):
        tokens = word_tokenize(text)
        inputs = self.tokenizer(tokens, is_split_into_words=True, return_tensors="pt", truncation=True)
        with torch.no_grad():
            outputs = self.model(**inputs).logits
        preds = torch.argmax(outputs, dim=-1).squeeze().tolist()

        word_ids = inputs.word_ids()
        grouped_preds = {}
        current_entity = ""
        current_type = ""
        entities = {'persons': [], 'organizations': [], 'locations': []}

        for idx, word_id in enumerate(word_ids):
            if word_id is None:
                continue
            label = self.label_map[preds[idx]]
            if label == "O":
                if current_entity and current_type:
                    entities[current_type].append(current_entity.strip())
                current_entity = ""
                current_type = ""
            else:
                entity_type = LABEL_MAP.get(label, "")
                if label.startswith("B-") or entity_type != current_type:
                    if current_entity and current_type:
                        entities[current_type].append(current_entity.strip())
                    current_entity = tokens[word_id]
                    current_type = entity_type
                else:
                    current_entity += " " + tokens[word_id]

        if current_entity and current_type:
            entities[current_type].append(current_entity.strip())

        return {k: ";".join(list(dict.fromkeys(v))) for k, v in entities.items()}

In [18]:
df.iloc[75].text

'"articleBody":"US officials believe Russia is preparing to falsify evidence to blame Ukrainian forces for last week’s deadly blast at the Olenivka prison ahead of visits to the site by outside parties.  An administration official told CNN they expect Russia will falsify evidence, blame Ukrainian forces, and even have “reason to believe that Russia would go so far as to make it appear that Ukrainian HIMARS were to blame before journalists arrive.” John Kirby, National Security Council coordinator for strategic communications, confirmed that reporting Thursday.  “We anticipate that Russian officials will try to frame the Ukrainian Armed Forces in anticipation of journalists and potential investigators visiting the site of the attack,” Kirby said. “In fact, we’ve already seen some spurious press reports to this effect, where they have planted evidence. We have reason to believe that Russia would go so far as to make it appear that Ukrainian HIMARS – the high mobility advanced rocket syst

In [19]:
inp_sent = df.iloc[75].text 
inf_pipe = NERInferencePipeline(model_dir="bert_ner_results/checkpoint-225")
res =inf_pipe.predict(inp_sent)
print(res)

{'persons': 'John Kirby;Jeremy;Zelensky', 'organizations': '', 'locations': ''}


In [21]:
print(inp_sent)
df.iloc[75].persons

"articleBody":"US officials believe Russia is preparing to falsify evidence to blame Ukrainian forces for last week’s deadly blast at the Olenivka prison ahead of visits to the site by outside parties.  An administration official told CNN they expect Russia will falsify evidence, blame Ukrainian forces, and even have “reason to believe that Russia would go so far as to make it appear that Ukrainian HIMARS were to blame before journalists arrive.” John Kirby, National Security Council coordinator for strategic communications, confirmed that reporting Thursday.  “We anticipate that Russian officials will try to frame the Ukrainian Armed Forces in anticipation of journalists and potential investigators visiting the site of the attack,” Kirby said. “In fact, we’ve already seen some spurious press reports to this effect, where they have planted evidence. We have reason to believe that Russia would go so far as to make it appear that Ukrainian HIMARS – the high mobility advanced rocket syste

['Vladimir Putin', 'Volodymyr Zelensky']

In [23]:
df.iloc[75].persons ,df.iloc[75].organizations , df.iloc[75].locations

(['Cnn', 'Cnn', 'Cnn', 'Cnn', 'United Nations'],
 ['Russian,Volnovakha,Kyiv,Kremlin,Ukrainian,Azovstal,Olenivka,Donbas,Washington,Russia,Ukraine,Luhansk,Russians'])

In [25]:
df.iloc[75].persons ,df.iloc[75].organizations , df.iloc[75].locations

('Vladimir Putin,2487;Volodymyr Zelensky,602',
 'Cnn,29;Cnn,261;Cnn,1007;Cnn,1596;United Nations,1099',
 'Russian,Volnovakha,Kyiv,Kremlin,Ukrainian,Azovstal,Olenivka,Donbas,Washington,Russia,Ukraine,Luhansk,Russians')

In [None]:
# class NERModel:
#     def __init__(self, model_name: str, experiment_tracker: ExperimentTracker):
#         self.model_name = model_name
#         self.tokenizer = None
#         self.model = None
#         self.experiment_tracker = experiment_tracker
        
#     def prepare_data(self, bio_df: pd.DataFrame):
#         """Prepare data for training"""
#         # Group by sentence_id
#         sentences = []
#         sentence_tags = []
        
#         for sent_id in bio_df['sentence_id'].unique():
#             sent_data = bio_df[bio_df['sentence_id'] == sent_id]
#             sentences.append(sent_data['word'].tolist())
    
#     def train_roberta_model(self, train_texts, train_tags, val_texts, val_tags):
#         """Train RoBERTa model"""
#         self.tokenizer = AutoTokenizer.from_pretrained('roberta-base')
        
#         train_dataset = NERDataset(train_texts, train_tags, self.tokenizer)
#         val_dataset = NERDataset(val_texts, val_tags, self.tokenizer)
        
#         self.model = AutoModelForTokenClassification.from_pretrained(
#             'roberta-base',
#             num_labels=len(train_dataset.tag2id)
#         )
        
#         training_args = TrainingArguments(
#             output_dir='./roberta_ner',
#             num_train_epochs=4,
#             per_device_train_batch_size=16,
#             per_device_eval_batch_size=64,
#             warmup_steps=1000,
#             weight_decay=0.01,
#             learning_rate=2e-5,
#             logging_dir='./logs',
#             evaluation_strategy="epoch",
#             save_strategy="epoch",
#             load_best_model_at_end=True,
#         )
        
#         data_collator = DataCollatorForTokenClassification(self.tokenizer)
        
#         trainer = Trainer(
#             model=self.model,
#             args=training_args,
#             train_dataset=train_dataset,
#             eval_dataset=val_dataset,
#             data_collator=data_collator,
#             tokenizer=self.tokenizer,
#         )
        
#         trainer.train()
#         eval_results = trainer.evaluate()
        
#         hyperparams = {
#             'model': 'roberta-base',
#             'epochs': 4,
#             'batch_size': 16,
#             'learning_rate': 2e-5,
#             'warmup_steps': 1000
#         }
        
#         metrics = {
#             'eval_loss': eval_results['eval_loss'],
#             'f1_score': eval_results.get('eval_f1', 0)
#         }
        
#         self.experiment_tracker.log_experiment(
#             'RoBERTa_Enhanced', 'roberta-base', hyperparams, metrics
#         )
        
#         return trainer

# class AdvancedNERModel:
#     def __init__(self, experiment_tracker: ExperimentTracker):
#         self.experiment_tracker = experiment_tracker
        
#     def train_deberta_model(self, train_texts, train_tags, val_texts, val_tags):
#         """Train DeBERTa model (state-of-the-art)"""
#         tokenizer = AutoTokenizer.from_pretrained('microsoft/deberta-base')
        
#         train_dataset = NERDataset(train_texts, train_tags, tokenizer)
#         val_dataset = NERDataset(val_texts, val_tags, tokenizer)
        
#         model = AutoModelForTokenClassification.from_pretrained(
#             'microsoft/deberta-base',
#             num_labels=len(train_dataset.tag2id)
#         )
        
#         training_args = TrainingArguments(
#             output_dir='./deberta_ner',
#             num_train_epochs=5,
#             per_device_train_batch_size=12,
#             per_device_eval_batch_size=32,
#             warmup_steps=1500,
#             weight_decay=0.01,
#             learning_rate=1e-5,
#             logging_dir='./logs',
#             evaluation_strategy="epoch",
#             save_strategy="epoch",
#             load_best_model_at_end=True,
#             gradient_accumulation_steps=2,
#         )
        
#         data_collator = DataCollatorForTokenClassification(tokenizer)
        
#         trainer = Trainer(
#             model=model,
#             args=training_args,
#             train_dataset=train_dataset,
#             eval_dataset=val_dataset,
#             data_collator=data_collator,
#             tokenizer=tokenizer,
#         )
        
#         trainer.train()
#         eval_results = trainer.evaluate()
        
#         hyperparams = {
#             'model': 'microsoft/deberta-base',
#             'epochs': 5,
#             'batch_size': 12,
#             'learning_rate': 1e-5,
#             'warmup_steps': 1500,
#             'gradient_accumulation_steps': 2
#         }
        
#         metrics = {
#             'eval_loss': eval_results['eval_loss'],
#             'f1_score': eval_results.get('eval_f1', 0)
#         }
        
#         self.experiment_tracker.log_experiment(
#             'DeBERTa_Advanced', 'microsoft/deberta-base', hyperparams, metrics
#         )
        
#         return trainer, tokenizer
    
#     def train_electra_model(self, train_texts, train_tags, val_texts, val_tags):
#         """Train ELECTRA model"""
#         tokenizer = AutoTokenizer.from_pretrained('google/electra-base-discriminator')
        
#         train_dataset = NERDataset(train_texts, train_tags, tokenizer)
#         val_dataset = NERDataset(val_texts, val_tags, tokenizer)
        
#         model = AutoModelForTokenClassification.from_pretrained(
#             'google/electra-base-discriminator',
#             num_labels=len(train_dataset.tag2id)
#         )
        
#         training_args = TrainingArguments(
#             output_dir='./electra_ner',
#             num_train_epochs=4,
#             per_device_train_batch_size=16,
#             per_device_eval_batch_size=64,
#             warmup_steps=800,
#             weight_decay=0.01,
#             learning_rate=3e-5,
#             logging_dir='./logs',
#             evaluation_strategy="epoch",
#             save_strategy="epoch",
#             load_best_model_at_end=True,
#         )
        
#         data_collator = DataCollatorForTokenClassification(tokenizer)
        
#         trainer = Trainer(
#             model=model,
#             args=training_args,
#             train_dataset=train_dataset,
#             eval_dataset=val_dataset,
#             data_collator=data_collator,
#             tokenizer=tokenizer,
#         )
        
#         trainer.train()
#         eval_results = trainer.evaluate()
        
#         hyperparams = {
#             'model': 'google/electra-base-discriminator',
#             'epochs': 4,
#             'batch_size': 16,
#             'learning_rate': 3e-5,
#             'warmup_steps': 800
#         }
        
#         metrics = {
#             'eval_loss': eval_results['eval_loss'],
#             'f1_score': eval_results.get('eval_f1', 0)
#         }
        
#         self.experiment_tracker.log_experiment(
#             'ELECTRA_Model', 'google/electra-base-discriminator', hyperparams, metrics
#         )
        
#         return trainer, tokenizer