## Setup notebook

In [0]:
import os

# Read the Prodigy licence key from the environment and fail fast when it is
# missing or empty, so the install cell below never runs with a blank key.
prodigy_key = os.environ.get("MY_PRODIGY_KEY")
if prodigy_key is None or prodigy_key == "":
    raise ValueError("Environment variable MY_PRODIGY_KEY is not set.")

In [0]:
# Install pinned library versions, then Prodigy, then restart Python.
# - The first install redirects stdout and stderr to /dev/null, so a failed install is not visible here.
# - Prodigy is installed with the key held in `prodigy_key` interpolated into the download URL.
#   NOTE(review): prodigy is not version-pinned, unlike the other packages.
# - dbutils.library.restartPython() restarts the Python process (presumably so the freshly installed
#   packages can be imported); variables defined before this point, including `prodigy_key`, are lost.
%pip install spacy==3.7.5 spacy-transformers==1.3.5 spacy-loggers==1.0.5 mlflow==2.20.1 > /dev/null 2>&1
%pip install prodigy -f https://{prodigy_key}@download.prodi.gy/
dbutils.library.restartPython()

In [0]:
# Core Utilities
import os
import sys
import logging
import ast
from pathlib import Path
import warnings

# Data Handling
import json
import pandas as pd

# MLflow Components
import mlflow
import mlflow.spacy

# SpaCy / NLP Components
import spacy
from spacy.language import Language
from spacy.training import Example
from spacy.training.initialize import init_nlp
from spacy.training.loop import train
from spacy.util import load_config
# NOTE(review): this import rebinds the name `train`, shadowing the one imported from
# spacy.training.loop above. Any later call to train(...) resolves to spacy.cli.train.train.
from spacy.cli.train import train

## Train a NER model to detect benefits in job descriptions
1. Create a ground-truth dataset to train model, using already annotated data
2. Prepare training materials i.e. transform data into format required for spaCy model training
3. Train the model, logging the run to MLflow so the experiment is documented

### Create the 'ground-truth' dataset for this model. This is usually done by combining the previous model's ground-truth with the additional data you've annotated

In [0]:
# JSONL inputs: the previous model's ground truth and the newly annotated examples.
old_ground_truth = "/v5.2_fixing_dataset.jsonl"
new_annotated_data = "/v5.5_ground_truth.jsonl"

# Read both files the same way (one JSON record per line), old ground truth first.
df_old_ground_truth, df_new_annotated_data = (
    pd.read_json(jsonl_path, lines=True)
    for jsonl_path in (old_ground_truth, new_annotated_data)
)

## Some checks for data before training model

### 1. Drop annotated examples that are 'reject' or 'skip', or where 'spans' is null, as they won't be used to train the model

In [0]:
def drop_non_accept_answers(df, df_name='DataFrame'):
    """Remove rows that cannot be used for training and return them.

    A row is removed when its 'answer' is anything other than 'accept' or when
    its 'spans' value is null.

    Side effects:
        * ``df`` is modified in place (rows are dropped by index label).
        * When rows are dropped, a copy of them is also stored as a module-level
          global named ``filtered_df_<df_name>``, with every non-alphanumeric
          character of ``df_name`` replaced by an underscore.

    Args:
        df: DataFrame with 'answer' and 'spans' columns.
        df_name: Label used in the printed messages and in the global name.

    Returns:
        A DataFrame holding the dropped rows, or None when nothing was dropped.
    """
    unusable_mask = (df['answer'] != 'accept') | df['spans'].isna()
    unusable_rows = df.loc[unusable_mask]
    n_unusable = len(unusable_rows)

    # Guard clause: nothing to remove, leave the frame untouched.
    if n_unusable == 0:
        print(f"'{df_name}': No rows to drop. All 'answer' values are 'accept' and 'spans' are non-null.")
        return None

    dropped_rows = unusable_rows.copy()

    # Build an identifier-safe suffix; '_' is not alphanumeric, so it maps to itself.
    safe_suffix = ''.join(ch if ch.isalnum() else '_' for ch in df_name)
    global_name = f"filtered_df_{safe_suffix}"
    globals()[global_name] = dropped_rows

    # Remove the offending rows from the caller's frame.
    df.drop(unusable_rows.index, inplace=True)
    print(f"'{df_name}': Dropped {n_unusable} row(s) where 'answer' is not 'accept' or 'spans' is null.")
    print(f"Created '{global_name}' containing the dropped rows.")

    return dropped_rows
    
# Apply the filter to df_old_ground_truth (the frame is modified in place).
# The return value is the frame of dropped rows, or None when nothing was dropped.
filtered_df_df_old_ground_truth = drop_non_accept_answers(df_old_ground_truth, df_name='df_old_ground_truth')

# Apply the same filter to df_new_annotated_data
filtered_df_df_new_annotated_data = drop_non_accept_answers(df_new_annotated_data, df_name='df_new_annotated_data')

'df_old_ground_truth': No rows to drop. All 'answer' values are 'accept' and 'spans' are non-null.
'df_new_annotated_data': No rows to drop. All 'answer' values are 'accept' and 'spans' are non-null.


### 2. Final check to make sure the columns are all the same and in a consistent order

In [0]:
def compare_and_clean_columns(df1, df2, df1_name='df1', df2_name='df2'):
    """Print a report of how the columns of two DataFrames differ.

    Despite its name, this function only reports: neither DataFrame is modified.
    Columns unique to one frame are listed in that frame's own column order, so
    the report is the same on every run.

    Args:
        df1: First DataFrame.
        df2: Second DataFrame.
        df1_name: Label for ``df1`` in the printed report.
        df2_name: Label for ``df2`` in the printed report.

    Returns:
        None. All results are printed.
    """
    cols1 = list(df1.columns)
    cols2 = list(df2.columns)

    # Identical names in identical order: nothing more to report.
    if cols1 == cols2:
        print("Both DataFrames have the same column names in the same order.")
        return

    print("DataFrames have different column names or different column orders.")

    names1 = set(cols1)
    names2 = set(cols2)

    # Walk the original column lists instead of the sets so the output order is
    # deterministic; dict.fromkeys removes repeated labels while keeping order.
    only_in_df1 = list(dict.fromkeys(col for col in cols1 if col not in names2))
    only_in_df2 = list(dict.fromkeys(col for col in cols2 if col not in names1))

    for frame_name, extras in ((df1_name, only_in_df1), (df2_name, only_in_df2)):
        if extras:
            print(f"\nColumns only in {frame_name}:")
            for col in extras:
                print(f" - {col}")
        else:
            print(f"\nNo extra columns in {frame_name}.")

    # Compare the relative order of the columns both frames share.
    shared = names1 & names2
    if shared:
        shared_order_df1 = [col for col in cols1 if col in shared]
        shared_order_df2 = [col for col in cols2 if col in shared]
        if shared_order_df1 != shared_order_df2:
            print("\nCommon columns are in different orders:")
            print(f" - {df1_name} order: {shared_order_df1}")
            print(f" - {df2_name} order: {shared_order_df2}")
        else:
            print("\nCommon columns are in the same order.")

# Report column differences between the two datasets (prints only; neither frame is modified).
compare_and_clean_columns(
    df_old_ground_truth, 
    df_new_annotated_data, 
    df1_name='df_old_ground_truth', 
    df2_name='df_new_annotated_data'
)

DataFrames have different column names or different column orders.

Columns only in df_old_ground_truth:
 - len
 - id
 - JobDescriptioninHTML
 - CompanyName
 - country
 - title

No extra columns in df_new_annotated_data.

Common columns are in the same order.


### If the check above reports 'DataFrames have different column names or different column orders', continue below. Otherwise, skip the code in this section

In [0]:
# INSTRUCTION: List the columns to drop from each DataFrame, based on the differences reported above.
# Names not present in a DataFrame are ignored by drop_unwanted_columns, so the placeholder
# entries for df2 have no effect if left unchanged.
columns_to_drop_df1 = ['len', 'id', 'JobDescriptioninHTML', 'CompanyName', 'country', 'title']
columns_to_drop_df2 = ['unwanted_column1_df2', 'unwanted_column2_df2']

def drop_unwanted_columns(df, columns_to_drop, df_name='DataFrame'):
    """Return ``df`` without the listed columns, ignoring names it does not have.

    Args:
        df: DataFrame to trim.
        columns_to_drop: Iterable of column names; names absent from ``df`` are skipped.
        df_name: Label used in the printed message.

    Returns:
        A new DataFrame without the matching columns, or ``df`` itself (the same
        object) when none of the requested columns are present.
    """
    present = [name for name in columns_to_drop if name in df.columns]

    if present:
        trimmed = df.drop(columns=present)
        print(f"Dropped columns from '{df_name}': {present}")
        return trimmed

    print(f"No specified columns to drop in '{df_name}'.")
    return df

# Drop the unwanted columns from df_old_ground_truth (rebinding the name to the trimmed frame)
df_old_ground_truth = drop_unwanted_columns(
    df_old_ground_truth, 
    columns_to_drop=columns_to_drop_df1, 
    df_name='df_old_ground_truth'
)

# Drop the unwanted columns from df_new_annotated_data (rebinding the name to the trimmed frame)
df_new_annotated_data = drop_unwanted_columns(
    df_new_annotated_data, 
    columns_to_drop=columns_to_drop_df2, 
    df_name='df_new_annotated_data'
)

Dropped columns from 'df_old_ground_truth': ['len', 'id', 'JobDescriptioninHTML', 'CompanyName', 'country', 'title']
No specified columns to drop in 'df_new_annotated_data'.


### Final bit of column alignment

In [0]:
def align_column_order(df_reference, df_to_align, df_to_align_name='df_to_align'):
    """Select from ``df_to_align`` the columns it shares with ``df_reference``.

    The shared columns are computed as
    ``df_reference.columns.intersection(df_to_align.columns)`` and used to index
    ``df_to_align``; columns that exist only in ``df_to_align`` are therefore
    not part of the result.

    Args:
        df_reference: DataFrame whose columns define the target layout.
        df_to_align: DataFrame to re-select columns from.
        df_to_align_name: Label used in the printed message.

    Returns:
        The column-selected view/copy of ``df_to_align``.
    """
    shared_columns = df_reference.columns.intersection(df_to_align.columns)
    realigned = df_to_align.loc[:, shared_columns]
    print(f"Aligned column order of '{df_to_align_name}' to match the reference DataFrame.")
    return realigned

# Re-select the new data's columns using the old ground truth as the reference layout
df_new_annotated_data = align_column_order(
    df_reference=df_old_ground_truth, 
    df_to_align=df_new_annotated_data, 
    df_to_align_name='df_new_annotated_data'
)

# Re-run the comparison to confirm both frames now have matching columns
compare_and_clean_columns(
    df_old_ground_truth, 
    df_new_annotated_data, 
    df1_name='df_old_ground_truth', 
    df2_name='df_new_annotated_data'
)

Aligned column order of 'df_new_annotated_data' to match the reference DataFrame.
Both DataFrames have the same column names in the same order.


## Prep training materials

In [0]:
# Combine both datasets into the new ground truth. ignore_index=True gives the result a
# fresh 0..n-1 index instead of carrying over each frame's own row labels, which would
# otherwise repeat in the combined frame.
df_new_ground_truth = pd.concat(
    [df_old_ground_truth, df_new_annotated_data],
    ignore_index=True,
)

# Sanity check: the combined frame should hold every row from both inputs.
expected_rows = df_old_ground_truth.shape[0] + df_new_annotated_data.shape[0]
actual_rows = df_new_ground_truth.shape[0]

if expected_rows == actual_rows:
    print("The number of rows in df_new_ground_truth is correct.")
else:
    print("The number of rows in df_new_ground_truth is incorrect.")

The number of rows in df_new_ground_truth is correct.


In [0]:
# Save the combined ground truth as JSONL (orient='records' with lines=True writes one
# JSON record per line); this file is what gets loaded into Prodigy in the next cell.

ground_truth_path = "/TEMP2.jsonl"
df_new_ground_truth.to_json(ground_truth_path,lines=True,orient='records')

In [0]:
# INSTRUCTION: Name the Prodigy session (dataset) you're creating to house the training data for this model.
# The session name and the JSONL path are substituted into the shell command below from the
# Python variables of the same names.

prodigy_session_name = "benefits_TEST2"

!python -m prodigy db-in $prodigy_session_name "$ground_truth_path" --overwrite

In [0]:
# INSTRUCTION: Set the folder location that will house all the training data.
# Keep --eval-split at 0.1 (90% training : 10% evaluation) unless you want more evaluation data,
# in which case change it to 0.2 (80 : 20).

training_data_path = "/Workspace/Users/justin.ngam@towerswatson.com/1_Benefits/1_NER/Notebooks/temp/ner"

!python -m prodigy data-to-spacy "$training_data_path" --ner $prodigy_session_name --eval-split 0.1

  _torch_pytree._register_pytree_node(
[38;5;4mℹ Using language 'en'[0m
  _torch_pytree._register_pytree_node(
[1m
Components: ner
Merging training and evaluation data for 1 components
  - [ner] Training: 7110 | Evaluation: 786 (10% split)
Training: 1906 | Evaluation: 211
Labels: ner (1)
[38;5;2m✔ Saved 1906 training examples[0m
/Workspace/Users/justin.ngam@towerswatson.com/1_Benefits/1_NER/Notebooks/temp/ner/train.spacy
[38;5;2m✔ Saved 211 evaluation examples[0m
/Workspace/Users/justin.ngam@towerswatson.com/1_Benefits/1_NER/Notebooks/temp/ner/dev.spacy
[1m
[38;5;4mℹ Auto-generating config with spaCy[0m
[38;5;2m✔ Generated training config[0m
[1m
[38;5;2m✔ Saving label data for component 'ner'[0m
/Workspace/Users/justin.ngam@towerswatson.com/1_Benefits/1_NER/Notebooks/temp/ner/labels/ner.json
[1m
[38;5;2m✔ Saved training config[0m
/Workspace/Users/justin.ngam@towerswatson.com/1_Benefits/1_NER/Notebooks/temp/ner/config.cfg

To use this data for tr

In [0]:
# Create a folder in DBFS to hold the trained model (the model is large, so DBFS is preferred).
# NOTE(review): the output_path defined in the next cell does not reference '/NER' — confirm
# this folder is the intended destination.

dbutils.fs.mkdirs('/NER')

True

In [0]:
# Output location (DBFS folder) for the trained model
output_path = Path("/mlflow_log_test")

# train.spacy and dev.spacy files produced by the 'data-to-spacy' command run above
train_path = "/train.spacy"
dev_path = "/dev.spacy"

In [0]:
# Training config file, and the path handed to the config as the transformer model name
config_path = "/ner_config_v2.cfg"
local_model_path = "/jdbert-384"

# Dotted keys passed to train() as config overrides.
# NOTE(review): presumably ner_config_v2.cfg declares paths.ground_truth and
# variables.transformer_model_name — confirm against the config file.
overrides = {
    "paths.train": train_path,
    "paths.dev": dev_path,
    "paths.ground_truth": ground_truth_path,
    "variables.transformer_model_name": local_model_path
}

In [0]:
# Run to train the model. View the experiment and use the 'run_id' to call the best model from this training.
# `train` is the name bound by the last `train` import in the imports cell (spacy.cli.train.train);
# it is called with the config path, the output directory and the config overrides.
train(config_path, output_path, overrides=overrides)

[38;5;4mℹ Saving to output directory:
/dbfs/Workspace/Users/justin.ngam@towerswatson.com/mlflow_log_test[0m
[38;5;4mℹ Using CPU[0m
[1m


Some weights of RobertaModel were not initialized from the model checkpoint at /Workspace/Users/justin.ngam@towerswatson.com/1_Benefits/1_NER/Data/jdbert-384 and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


[38;5;2m✔ Initialized pipeline[0m
[1m
[38;5;4mℹ Pipeline: ['transformer', 'ner'][0m
[38;5;4mℹ Initial learn rate: 0.0[0m
E    #       LOSS TRANS...  LOSS NER  ENTS_F  ENTS_P  ENTS_R  SCORE 
---  ------  -------------  --------  ------  ------  ------  ------
  0       0        2120.26   1449.26    0.20    0.10    4.26    0.00


Uploading artifacts:   0%|          | 0/21 [00:00<?, ?it/s]

  0     200      142738.71  56289.62   21.26   24.58   18.73    0.21


Uploading artifacts:   0%|          | 0/21 [00:00<?, ?it/s]

  0     400        3652.59   4504.70   51.11   51.34   50.88    0.51


Uploading artifacts:   0%|          | 0/21 [00:00<?, ?it/s]

  0     600        2042.91   3205.74   65.19   74.74   57.80    0.65


Uploading artifacts:   0%|          | 0/21 [00:00<?, ?it/s]

  0     800        1835.24   3474.09   69.42   70.54   68.33    0.69


Uploading artifacts:   0%|          | 0/21 [00:00<?, ?it/s]

  0    1000        1871.81   3195.99   71.22   77.56   65.84    0.71


Uploading artifacts:   0%|          | 0/21 [00:00<?, ?it/s]

  1    1200        1395.91   2823.55   71.61   73.91   69.45    0.72


Uploading artifacts:   0%|          | 0/21 [00:00<?, ?it/s]

  1    1400        1372.56   2631.39   74.22   75.23   73.23    0.74


Uploading artifacts:   0%|          | 0/21 [00:00<?, ?it/s]

  1    1600        1469.83   2713.49   77.55   80.00   75.24    0.78


Uploading artifacts:   0%|          | 0/21 [00:00<?, ?it/s]

  1    1800        1514.50   2928.56   77.10   79.05   75.24    0.77
  1    2000        1401.63   3102.66   73.29   71.66   75.00    0.73
  2    2200        1039.25   2206.35   78.36   80.91   75.96    0.78


Uploading artifacts:   0%|          | 0/21 [00:00<?, ?it/s]

  2    2400        1129.41   2534.85   77.48   76.07   78.94    0.77


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:730)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:448)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:448)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecutio

## The Benefits NER model is now trained and saved within MLflow. All that's needed to use this model in the future is its unique 'run_id'. Easy peasy.