In [1]:
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import torch
from torch import nn
import numpy as np
from transformers import BertForSequenceClassification, Trainer, TrainingArguments
import re
import nltk
from nltk.stem import WordNetLemmatizer
import seaborn as sns
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import nlpaug.augmenter.word as naw

In [2]:
# Fetch the WordNet corpus and the Open Multilingual WordNet data through NLTK.
# WordNetLemmatizer (imported above) and the 'wordnet' synonym source used for
# augmentation later in the notebook presumably rely on these — confirm.
nltk.download('wordnet')
nltk.download('omw-1.4')

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\gduln001\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     C:\Users\gduln001\AppData\Roaming\nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


True

In [3]:
#Step 1: Data Preprocessing

In [4]:
# Location of the source workbook on the network share
dataset_path = '\\\\vi240c060002.woc.prod\\e$\\datasets\\WCMLDataset12_18.xlsx'

# `data` stays as the untouched frame read from Excel; every later step
# operates on the independent copy `example_data`
data = pd.read_excel(dataset_path)
example_data = data.copy()

In [5]:
# Single shared lemmatizer instance; clean_text() reads this module-level name
lemmatizer = WordNetLemmatizer()

In [6]:
def clean_text(text):
    """Normalize one free-text value before tokenization.

    Steps, in order: lowercase, drop every character that is not a lowercase
    letter, digit, whitespace or one of ``. , ! ? ' -``, collapse whitespace,
    collapse runs of ``!`` / ``?`` to a single character, then lemmatize each
    whitespace-separated token with the module-level ``lemmatizer``.

    Args:
        text: The raw value. ``None`` and float NaN are treated as empty text;
            any other non-string value (e.g. a numeric Excel cell) is
            converted with ``str()`` first.

    Returns:
        The cleaned string (``''`` for missing input).
    """
    # Cells read from Excel are not guaranteed to be strings; calling .lower()
    # on None, NaN or a number would raise AttributeError.
    if text is None or (isinstance(text, float) and text != text):
        return ''
    if not isinstance(text, str):
        text = str(text)

    # Lowercase
    text = text.lower()

    # Keep only letters, digits, whitespace and basic punctuation (. , ! ? ' -).
    # NOTE(review): removed characters are deleted, not replaced by a space, so
    # "slip/fall" becomes "slipfall" — confirm this is intended.
    text = re.sub(r"[^a-z0-9.,!?'\s-]", '', text)

    # Replace multiple spaces with a single space
    text = re.sub(r"\s+", " ", text).strip()

    # Normalize excessive punctuation
    text = re.sub(r"!+", "!", text)
    text = re.sub(r"\?+", "?", text)

    # Lemmatize each whitespace-separated token. lemmatize() is called without
    # a POS argument; a POS-aware variant would need
    # nltk.download('averaged_perceptron_tagger') and a tag -> WordNet POS map.
    tokens = [lemmatizer.lemmatize(token) for token in text.split()]

    # Rejoin after lemmatization
    return " ".join(tokens)

In [7]:
# Free-text columns of the dataset that are cleaned below and later joined
# into the single 'Combined_Text' model input
text_fields = [
    'Incident Description',
    'Activity Engaged in During Accident',
    'General HS Comments',
    'Injury Description'
]

In [8]:
# Replace missing values in the free-text columns with empty strings so the
# string operations applied to these columns in the following cells do not
# receive NaN
example_data[text_fields] = example_data[text_fields].fillna('')

In [9]:
# Run every free-text column through clean_text, one column at a time,
# overwriting the column with its cleaned version
for column_name in text_fields:
    cleaned_column = example_data[column_name].apply(clean_text)
    example_data[column_name] = cleaned_column

In [10]:
# Build the single model input: the four cleaned text columns joined with
# single spaces, with leading/trailing whitespace trimmed afterwards
incident_text = example_data['Incident Description']
activity_text = example_data['Activity Engaged in During Accident']
comments_text = example_data['General HS Comments']
injury_text = example_data['Injury Description']

combined_text = incident_text + ' ' + activity_text + ' ' + comments_text + ' ' + injury_text
example_data['Combined_Text'] = combined_text.str.strip()


In [12]:
# WordNet synonym replacement: between 1 and 3 words per text, 10% of words
syn_aug = naw.SynonymAug(aug_src='wordnet', aug_min=1, aug_max=3, aug_p=0.1)

def augment_text(text, augmenter=syn_aug):
    """Return one augmented variant of ``text`` as a plain string.

    Args:
        text: The text to augment.
        augmenter: Object exposing ``augment(text)``; defaults to ``syn_aug``.

    Returns:
        The augmented string. Falls back to the original ``text`` when the
        augmenter yields nothing usable (e.g. an empty result).
    """
    augmented = augmenter.augment(text)
    # Recent nlpaug releases return a list of strings even for a single input;
    # storing that list in 'Combined_Text' would hand the tokenizer a list
    # instead of a sentence, so unwrap it here.
    if isinstance(augmented, (list, tuple)):
        augmented = augmented[0] if augmented else text
    return augmented if isinstance(augmented, str) else text

# Apply augmentation to rare class samples
rare_threshold = 50

# Compute the frequency of each class
class_counts = example_data['Source of Injury Desc'].value_counts()

# Identify which classes are rare (fewer than `rare_threshold` rows)
rare_classes_list = class_counts[class_counts < rare_threshold].index.tolist()

# Select the rows that belong to a rare class
rare_class_filter = example_data['Source of Injury Desc'].isin(rare_classes_list)
rare_class_data = example_data[rare_class_filter]

# One augmented copy per rare-class row; every column except 'Combined_Text'
# is carried over unchanged
augmented_samples = []
for _, row in rare_class_data.iterrows():
    new_row = row.copy()
    new_row['Combined_Text'] = augment_text(row['Combined_Text'])
    augmented_samples.append(new_row)

augmented_df = pd.DataFrame(augmented_samples)

# NOTE(review): this cell appends to `example_data`, so re-running it stacks
# another round of augmented rows. The augmented rows are also added before the
# train/test split further down, so an original row and its augmented copy can
# land on opposite sides of the split and inflate the evaluation metrics.
example_data = pd.concat([example_data, augmented_df], ignore_index=True)

[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     C:\Users\gduln001\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping taggers\averaged_perceptron_tagger.zip.


In [None]:
# Label columns to encode. Only 'Source of Injury Desc' is used for training
# in the cells visible in this notebook; the rest are encoded alongside it.
targets = [
    'Event of Injury Desc',
    'Source of Injury Desc',
    'Event of Incident Desc',
    'Source of Incident Desc',
    'EDI Cause Desc'
]

In [None]:
# Fit one LabelEncoder per target column, store the integer codes in a
# '<target>_Encoded' column and keep the fitted encoder for decoding later
label_encoders = {}
for target in targets:
    encoder = LabelEncoder()
    encoded_values = encoder.fit_transform(example_data[target])
    example_data[f'{target}_Encoded'] = encoded_values
    label_encoders[target] = encoder

In [None]:
# Initialize the BERT tokenizer from the same 'bert-base-uncased' checkpoint
# name that the classification model is loaded from further down
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
# Hold out 20% of the rows (fixed seed) for the "Source of Injury Desc" target
source_texts = example_data['Combined_Text']
source_targets = example_data['Source of Injury Desc_Encoded']

X_train, X_test, y_train, y_test = train_test_split(
    source_texts,
    source_targets,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Tokenize both splits with identical settings: truncate to at most 512
# tokens and pad within each call
tokenizer_options = {'truncation': True, 'padding': True, 'max_length': 512}

train_encodings = tokenizer(list(X_train), **tokenizer_options)
test_encodings = tokenizer(list(X_test), **tokenizer_options)

In [None]:
# Size the weight vector by the encoder's full class list, not by the labels
# that happen to be in y_train: if the highest-indexed classes are missing from
# the training split, a plain bincount would be shorter than the model's number
# of labels and CrossEntropyLoss would reject the weight tensor.
num_classes = len(label_encoders['Source of Injury Desc'].classes_)

# Number of training samples per encoded class (0 for classes absent from y_train)
class_counts = np.bincount(y_train, minlength=num_classes)
total_samples = len(y_train)

# Inverse-frequency weights: total / (num_classes * count). Counts of zero are
# treated as one so an absent class gets a finite weight instead of inf.
safe_counts = np.maximum(class_counts, 1).astype(float)
class_weights = total_samples / (num_classes * safe_counts)
class_weights = torch.tensor(class_weights, dtype=torch.float)

print("Class Weights:", class_weights)

In [None]:
class WeightedTrainer(Trainer):
    """Trainer whose loss is class-weighted cross-entropy over the model's logits."""

    def __init__(self, class_weights, *args, **kwargs):
        """Create the trainer.

        Args:
            class_weights: 1-D float tensor passed as ``weight`` to
                ``nn.CrossEntropyLoss``.
            *args, **kwargs: Forwarded unchanged to ``Trainer.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.loss_fn = nn.CrossEntropyLoss(weight=class_weights)

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the weighted loss for one batch.

        Args:
            model: The model being trained.
            inputs: Batch dict; must contain ``"labels"``, which is withheld
                from the forward pass so the loss is computed here instead.
            return_outputs: When true, return ``(loss, outputs)``.
            **kwargs: Accepted and ignored.

        Returns:
            The loss tensor, or ``(loss, outputs)`` if ``return_outputs``.
        """
        labels = inputs.get("labels")
        # forward pass without labels
        outputs = model(**{k: v for k, v in inputs.items() if k != "labels"})
        logits = outputs.get("logits")

        # The weight tensor is created on the CPU; move the loss module to the
        # logits' device so training on an accelerator does not fail with a
        # device-mismatch error. This is a no-op once it is already there.
        self.loss_fn.to(logits.device)

        loss = self.loss_fn(logits, labels)
        return (loss, outputs) if return_outputs else loss

In [None]:
def compute_metrics(eval_pred):
    """Compute accuracy and weighted precision/recall/F1 for an evaluation pass.

    Args:
        eval_pred: A ``(logits, labels)`` pair; the predicted class is the
            argmax of ``logits`` along axis 1.

    Returns:
        Dict with keys ``accuracy``, ``f1``, ``precision`` and ``recall``.
    """
    logits, labels = eval_pred
    predictions = logits.argmax(axis=1)
    # zero_division=0 matches the classification_report call used elsewhere in
    # this notebook and avoids an UndefinedMetricWarning on every evaluation
    # when some class is never predicted.
    precision, recall, f1, _ = precision_recall_fscore_support(
        labels, predictions, average='weighted', zero_division=0
    )
    acc = accuracy_score(labels, predictions)
    return {
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall
    }

In [None]:
class Dataset(torch.utils.data.Dataset):
    """Pairs tokenizer encodings with labels and serves them as tensors."""

    def __init__(self, encodings, labels):
        # encodings: mapping of field name -> per-sample sequences
        # labels: per-sample labels, indexable by position
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # One sample per label
        return len(self.labels)

    def __getitem__(self, idx):
        # Convert each encoding field of sample `idx` to a tensor, then add
        # the sample's label under 'labels'
        sample = {}
        for field_name, field_values in self.encodings.items():
            sample[field_name] = torch.tensor(field_values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

In [None]:
# The classification head needs one output per class known to the encoder
source_encoder = label_encoders['Source of Injury Desc']
num_labels = len(source_encoder.classes_)

# Wrap the tokenized splits and their labels for the Trainer
train_dataset = Dataset(train_encodings, list(y_train))
test_dataset = Dataset(test_encodings, list(y_test))

model = BertForSequenceClassification.from_pretrained(
    'bert-base-uncased',
    num_labels=num_labels,
)

In [None]:
# Hyperparameters and bookkeeping settings for the Trainer run
training_args = TrainingArguments(
    output_dir='./results',  # output directory for the run
    num_train_epochs=4,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=64,
    warmup_steps=500,  # NOTE(review): confirm this is below the total number of optimizer steps
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
    eval_strategy="epoch",  # evaluate once per epoch
    save_strategy="epoch",  # checkpoint once per epoch, same cadence as evaluation
    load_best_model_at_end=True,  # NOTE(review): no metric_for_best_model is set, so "best" presumably uses the Trainer default — confirm
)

In [None]:
# Create an instance of WeightedTrainer: class_weights feeds the weighted
# loss, every other argument is forwarded to the base Trainer.
# NOTE(review): the evaluation set is the same 20% hold-out that is later used
# for the final confusion matrix and classification report.
trainer = WeightedTrainer(
    class_weights=class_weights,
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
)

In [None]:
# Run fine-tuning with the arguments configured in training_args
trainer.train()

In [None]:
# Save the fine-tuned model to the network share.
# NOTE(review): only the model is saved here; the cells visible in this
# notebook never persist the tokenizer or label_encoders, which are needed to
# turn raw text into predictions and predictions back into class names.
model.save_pretrained('\\\\vi240c060002.woc.prod\\e$\\Machine Learning\\fine_tuned_source_of_injury')

In [None]:
# Inference example: reload the model from the directory written above.
# This rebinds `model`; `trainer` keeps its own reference to the model it was
# constructed with.
model = BertForSequenceClassification.from_pretrained('\\\\vi240c060002.woc.prod\\e$\\Machine Learning\\fine_tuned_source_of_injury')

In [None]:
# Sample narrative to classify, as a one-element list
new_text = ["food borne illness FSH Incident per guest email dated 12/08/24 ppw to be sent. tmg. Guest states that 3 guests in their party became sick with food poisoning symptoms after eating at the buffet and one of them needed medical attention for the symptoms after returning home."]
new_text = [clean_text(t) for t in new_text]  # Clean the new text before prediction
# Same tokenizer settings as the training data, but returning PyTorch tensors
# so the result can be passed straight to the model
new_encodings = tokenizer(new_text, truncation=True, padding=True, max_length=512, return_tensors='pt')

In [None]:
# Switch to evaluation mode and run the forward pass without building an
# autograd graph — gradients are not needed for inference
model.eval()
with torch.no_grad():
    outputs = model(**new_encodings)

# Highest-scoring class index for the single input, mapped back to its label
predicted_class = torch.argmax(outputs.logits, dim=1).item()
decoded_class = label_encoders['Source of Injury Desc'].inverse_transform([predicted_class])
print(f"Predicted Source of Injury: {decoded_class[0]}")

In [None]:
# Evaluation comes first: the heatmap at the end of this cell needs `cm`,
# which previously was only defined in a later cell (NameError on a fresh
# Restart & Run All).
source_encoder = label_encoders['Source of Injury Desc']
class_names = source_encoder.classes_

# Get predictions from the model on the test dataset
predictions = trainer.predict(test_dataset)
preds = np.argmax(predictions.predictions, axis=1)
labels = predictions.label_ids

# Inverse transform labels (Human Readable)
decoded_preds = source_encoder.inverse_transform(preds)
decoded_labels = source_encoder.inverse_transform(labels)

# Confusion matrix. Passing labels=class_names fixes the row/column order to
# the encoder's full class list, so the matrix always lines up with the tick
# labels below even when a class is absent from the test split.
cm = confusion_matrix(decoded_labels, decoded_preds, labels=class_names)
print("Confusion Matrix:")
print(cm)

# Classification report
report = classification_report(decoded_labels, decoded_preds, zero_division=0)
print("Classification Report:")
print(report)

# Heatmap of the confusion matrix (rows: true class, columns: predicted class)
plt.figure(figsize=(12, 10))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()