In [7]:
# Import required libraries
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import (
    AutoTokenizer, 
    AutoModelForTokenClassification, 
    TrainingArguments, 
    Trainer,
    DataCollatorForTokenClassification
)
from datasets import Dataset
from sklearn.metrics import classification_report
import json
import ast
from pathlib import Path
import os

In [8]:
# Locate the folder this code runs from: the file's own folder when `__file__`
# is defined, otherwise (e.g. inside a notebook kernel) the working directory.
if '__file__' in globals():
    current_dir = Path(__file__).parent
else:
    current_dir = Path.cwd()

# The datasets live in a `data` folder that is a sibling of that directory.
DATA_DIR = current_dir.parent / 'data'

# One shared seed for both PyTorch and NumPy so runs are repeatable.
RANDOM_SEED = 42
torch.manual_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

print(f"Data directory path: {DATA_DIR}")
print(f"Data directory exists: {DATA_DIR.exists()}")

Data directory path: c:\Users\gorkemozkan\Desktop\gorkDrive\finnews-insights\data
Data directory exists: True


In [9]:
# Read the train / validation / test splits from their Excel workbooks
# (same order as the names on the left-hand side).
df_train, df_valid, df_test = (
    pd.read_excel(DATA_DIR / workbook)
    for workbook in (
        'final_train_data.xlsx',
        'final_valid_data.xlsx',
        'final_test_data.xlsx',
    )
)

# Convert string lists to actual lists
def convert_string_to_list(string_list):
    """Parse a stringified Python list (e.g. ``"['a', 'b']"``) into a real list.

    Args:
        string_list: Cell value to convert. Normally a string holding a list
            literal; a value that is already a list is returned unchanged so
            the conversion can safely be applied more than once.

    Returns:
        list: The parsed list (tuples are converted to lists). An empty list
        is returned when the value cannot be parsed as a literal (malformed
        text, NaN, None, ...) or when the literal is not a list/tuple.
    """
    # Already converted: passing a list to literal_eval would raise and the
    # data would silently be replaced by [].
    if isinstance(string_list, list):
        return string_list

    try:
        parsed = ast.literal_eval(string_list)
    # Only the failures literal_eval is documented to raise on malformed input
    # are treated as "unparseable"; a bare `except:` would also swallow
    # KeyboardInterrupt / SystemExit.
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        return []

    if isinstance(parsed, tuple):
        return list(parsed)
    # Callers iterate/extend the result as a list, so scalars, dicts, etc.
    # are treated like any other unparseable cell.
    return parsed if isinstance(parsed, list) else []

# Parse the stringified `tokens` / `tags` columns of every split into lists.
for _split_frame in (df_train, df_valid, df_test):
    for _column in ('tokens', 'tags'):
        _split_frame[_column] = _split_frame[_column].apply(convert_string_to_list)

# Report how many samples each split contains.
print("Dataset dimensions:")
for _split_name, _split_frame in (("Train", df_train), ("Valid", df_valid), ("Test", df_test)):
    print(f"{_split_name}: {len(_split_frame)} samples")

Dataset dimensions:
Train: 23603 samples
Valid: 3062 samples
Test: 3239 samples


In [None]:
# Check label distribution
def check_label_distribution(df, name):
    """Print how often each label occurs in the ``tags`` column of ``df``.

    Args:
        df: DataFrame whose ``tags`` column holds one list of labels per row.
        name: Dataset name shown in the printed header.

    Prints one line per distinct label (in the sorted order produced by
    ``np.unique``) with its count and its share of all labels in percent.
    """
    # Flatten the per-row label lists into a single sequence.
    flat_tags = [tag for row_tags in df['tags'] for tag in row_tags]

    labels, occurrences = np.unique(flat_tags, return_counts=True)
    grand_total = occurrences.sum()

    print(f"\nLabel distribution in {name} dataset:")
    for position in range(len(labels)):
        label = labels[position]
        count = occurrences[position]
        print(f"Label {label}: {count} occurrences ({(count/grand_total)*100:.2f}%)")

# Print the label distribution of each split under a common header.
print("\nLabel distribution after filtering:")
for _split_frame, _split_name in (
    (df_train, "Train"),
    (df_valid, "Validation"),
    (df_test, "Test"),
):
    check_label_distribution(_split_frame, _split_name)

In [10]:
# Load label file and create label mappings.
# The encoding is pinned to UTF-8 so the file is decoded the same way on every
# platform instead of depending on the locale's default code page.
with open(DATA_DIR / 'label.json', 'r', encoding='utf-8') as f:
    label_dict = json.load(f)

# label2id is the mapping exactly as stored in label.json (label -> id);
# id2label is its inverse (id -> label).
label2id = label_dict
id2label = {v: k for k, v in label2id.items()}

print("\nLabel mappings:")
for label, idx in label2id.items():
    print(f"{label}: {idx}")


Label mappings:
O: 0
B-DATE: 1
I-DATE: 2
B-PERSON: 3
I-PERSON: 4
B-ORG: 5
I-ORG: 6
B-PERCENT: 7
I-PERCENT: 8
B-MONEY: 9
I-MONEY: 10


In [11]:
# Pretrained checkpoint shared by the tokenizer and the model.
model_checkpoint = "bert-base-cased"  # BERT model for English
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

# Token-classification model: the number of classes comes from the label
# mapping, and both directions of the mapping are passed along with it.
model = AutoModelForTokenClassification.from_pretrained(
    model_checkpoint,
    label2id=label2id,
    id2label=id2label,
    num_labels=len(label2id),
)

print(f"Model loaded: {model_checkpoint}")
print(f"Total number of parameters: {model.num_parameters():,}")

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Model loaded: bert-base-cased
Total number of parameters: 107,728,139


In [12]:
# Helper function to tokenize and align labels
def tokenize_and_align_labels(examples):
    """Tokenize pre-split words and align their word-level tags to sub-word tokens.

    Args:
        examples: Batch with a ``"tokens"`` entry (one list of words per
            example) and a ``"ner_tags"`` entry (one tag per word).

    Returns:
        The tokenizer output for the batch with an added ``"labels"`` entry.
        For each example, the first sub-word token of every word carries that
        word's tag (cast to ``int``); positions without a word id and the
        remaining sub-word tokens of a word are set to -100.
    """
    tokenized_inputs = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)

    def _align(word_ids, word_tags):
        # Walk the token positions, labelling only the token that opens a word.
        aligned = []
        last_word_idx = None
        for current_word_idx in word_ids:
            opens_new_word = current_word_idx is not None and current_word_idx != last_word_idx
            aligned.append(int(word_tags[current_word_idx]) if opens_new_word else -100)
            last_word_idx = current_word_idx
        return aligned

    tokenized_inputs["labels"] = [
        _align(tokenized_inputs.word_ids(batch_index=row), row_tags)
        for row, row_tags in enumerate(examples["ner_tags"])
    ]
    return tokenized_inputs

# Pull the word lists and tag lists out of the train / validation frames.
train_sentences, train_labels = df_train['tokens'].tolist(), df_train['tags'].tolist()
valid_sentences, valid_labels = df_valid['tokens'].tolist(), df_valid['tags'].tolist()

# Wrap them as datasets. `Dataset` is the one imported from `datasets`: that
# import comes after the torch.utils.data one and therefore shadows it.
train_dataset = Dataset.from_dict(dict(tokens=train_sentences, ner_tags=train_labels))
val_dataset = Dataset.from_dict(dict(tokens=valid_sentences, ner_tags=valid_labels))

# Tokenize batch-wise and attach the aligned labels.
train_tokenized = train_dataset.map(tokenize_and_align_labels, batched=True)
val_tokenized = val_dataset.map(tokenize_and_align_labels, batched=True)

print("Training set size:", len(train_tokenized))
print("Validation set size:", len(val_tokenized))

Map: 100%|██████████| 23603/23603 [00:01<00:00, 12327.96 examples/s]
Map: 100%|██████████| 3062/3062 [00:00<00:00, 12770.36 examples/s]

Training set size: 23603
Validation set size: 3062





In [13]:
# Training configuration, grouped by concern (values unchanged).
training_args = TrainingArguments(
    # Output location; the Path is converted to a plain string.
    output_dir=str(DATA_DIR / "results"),

    # Optimisation
    optim="adamw_torch",
    learning_rate=5e-5,
    weight_decay=0.001,
    warmup_steps=200,
    label_smoothing_factor=0.1,
    num_train_epochs=5,

    # Batching and throughput
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    gradient_accumulation_steps=2,
    dataloader_num_workers=2,
    # NOTE(review): fp16 presumably needs a supported accelerator - confirm
    # before running this cell on a CPU-only machine.
    fp16=True,

    # Evaluate and checkpoint once per epoch; the "f1" metric (higher is
    # better) selects the model reloaded at the end of training.
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,

    # Logging and reporting
    logging_steps=50,
    report_to=["none"],
    hub_strategy="end",
)

# Collator that pads each batch and returns PyTorch tensors.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer, padding=True, return_tensors="pt")

# Define metric function for evaluation
def compute_metrics(eval_preds):
    """Compute token-level precision, recall, F1 and accuracy for an evaluation run.

    Args:
        eval_preds: Pair ``(predictions, labels)``. The arg-max over axis 2 of
            ``predictions`` gives the predicted label id per position;
            positions whose gold label is -100 are ignored.

    Returns:
        dict: ``precision``, ``recall`` and ``f1`` taken from the
        ``weighted avg`` row of sklearn's classification report, plus
        ``accuracy``.

    NOTE(review): the scores are per token and the weighted average covers
    every label present, including 'O' - confirm this is the intended metric
    for model selection.
    """
    scores, gold_ids = eval_preds
    predicted_ids = np.argmax(scores, axis=2)

    # Walk every position once, keeping only those with a real gold label and
    # translating both ids to label names.
    flat_predictions = []
    flat_labels = []
    for predicted_row, gold_row in zip(predicted_ids, gold_ids):
        for predicted_id, gold_id in zip(predicted_row, gold_row):
            if gold_id == -100:
                continue
            flat_predictions.append(id2label[predicted_id])
            flat_labels.append(id2label[gold_id])

    report = classification_report(flat_labels, flat_predictions, output_dict=True)
    weighted = report['weighted avg']

    return {
        'precision': weighted['precision'],
        'recall': weighted['recall'],
        'f1': weighted['f1-score'],
        'accuracy': report['accuracy']
    }

# Create Trainer
# The tokenizer is passed as `processing_class`: recent transformers releases
# deprecate the `tokenizer=` keyword of Trainer.__init__ in favour of it (the
# old keyword triggers a FutureWarning pointing at this call).
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_tokenized,
    eval_dataset=val_tokenized,
    processing_class=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics
)

print("Trainer ready, training can begin.")

Trainer ready, training can begin.


  trainer = Trainer(


In [None]:
# Train the model with the Trainer configured above. As the last expression
# of the cell, the value returned by train() is displayed as the cell output.
trainer.train()

In [None]:
# Run the trained model over the validation set.
predictions = trainer.predict(val_tokenized)

# Predicted label id per position (arg-max over the last axis), computed once.
predicted_ids = predictions.predictions.argmax(-1)

# For every example keep only the (prediction, gold) pairs whose gold label
# is not the ignored index -100.
kept_pairs = [
    [(p, l) for (p, l) in zip(pred, label) if l != -100]
    for pred, label in zip(predicted_ids, predictions.label_ids)
]

# Translate ids to label names, one list per example.
true_predictions = [[id2label[p] for (p, _) in pairs] for pairs in kept_pairs]
true_labels = [[id2label[l] for (_, l) in pairs] for pairs in kept_pairs]

# Token-level classification report over the flattened sequences.
report = classification_report(
    [gold for sentence in true_labels for gold in sentence],
    [guess for sentence in true_predictions for guess in sentence]
)

print("Model Evaluation Results:\n")
print(report)

In [None]:
def predict_entities(text, hardware='cpu'):
    """
    Identifies entities in the text.

    The text is split on whitespace and every word receives the label
    predicted for its FIRST sub-word token. This mirrors the label alignment
    used for training in this notebook, where only the first sub-word of a
    word carries a label. Taking one prediction per sub-word and pairing them
    with words positionally would shift labels as soon as any word is split
    into several word pieces.

    Args:
        text (str): Text to process
        hardware (str): Hardware to use - 'cpu' or 'gpu'

    Returns:
        list[tuple[str, str]]: ``(word, label)`` pairs in text order for every
        word whose predicted label is not 'O'. Empty or whitespace-only text
        yields ``[]`` without running the model. Words cut off by tokenizer
        truncation are not reported.
    """
    # Split text into words; nothing to predict for empty input.
    tokens = text.split()
    if not tokens:
        return []

    # Device selection
    device = 'cuda' if hardware.lower() == 'gpu' else 'cpu'

    # If GPU is selected but not available, warn and switch to CPU
    if device == 'cuda' and not torch.cuda.is_available():
        print("GPU not found, using CPU...")
        device = 'cpu'

    # Move model to selected device and switch to inference mode so that
    # training-only layers such as dropout are disabled.
    model.to(device)
    model.eval()

    # Tokenize the pre-split words; word_ids maps each token position back to
    # the index of the word it came from (None for special tokens).
    encoding = tokenizer(tokens, truncation=True, is_split_into_words=True, return_tensors="pt")
    word_ids = encoding.word_ids()

    # Move input tensors to selected device
    inputs = {k: v.to(device) for k, v in encoding.items()}

    # Predict
    with torch.no_grad():  # No gradient calculation
        outputs = model(**inputs)
    predicted_ids = outputs.logits.argmax(-1)[0].tolist()

    # One label per word: use the prediction at the first sub-word of each word.
    results = []
    previous_word_idx = None
    for position, word_idx in enumerate(word_ids):
        if word_idx is not None and word_idx != previous_word_idx:
            label = id2label[predicted_ids[position]]
            if label != 'O':
                results.append((tokens[word_idx], label))
        previous_word_idx = word_idx

    return results

In [None]:
# Sample sentence for a quick smoke test of the predictor.
test_text = "Apple CEO Tim Cook introduced the new iPhone model at a conference held in San Francisco."

# Hardware the demo runs on.
hardware = 'cpu'  # can be changed to 'gpu'
print(f"\nTesting with {hardware.upper()}:")

# Run the predictor on the sample sentence.
results = predict_entities(text=test_text, hardware=hardware)

# Echo the input, then list every entity that was found.
print("\nTest text:", test_text)
print("\nFound entities:")
for entity_token, entity_label in results:
    print(f"{entity_token}: {entity_label}")

In [None]:
# Persist the model and then the tokenizer side by side in one folder.
output_dir = DATA_DIR / "ner_model"
for _artifact in (model, tokenizer):
    # save_pretrained is given a plain string rather than a Path.
    _artifact.save_pretrained(str(output_dir))

print(f"Model and tokenizer saved to directory: {output_dir}")