# Automated NER Annotation Pipeline for Legislative Text

### Description
This Jupyter notebook implements an automated Named Entity Recognition (NER) annotation pipeline for legislative text using Claude 3.5 Sonnet and Diffgram. The pipeline automates the process of identifying and labeling entities in legal documents, focusing on elements such as section references, subsection references, and metadata fields. The system integrates with AWS Bedrock for AI processing and Diffgram for annotation management.
### Key Components

### Configuration and Setup
- AWS Bedrock integration for Claude 3.5 Sonnet
- Diffgram project setup and configuration
- Schema management for NER labels

### Core Functionality
- Automated text processing and entity recognition
- Integration with Diffgram's annotation system
- Batch processing capabilities for multiple documents
- Custom NER prompt engineering for legislative text

#### Install required dependencies for the annotation pipeline
- Core ML and data processing libraries
- AWS and API integration libraries
- Observability libraries (Arize Phoenix and OpenTelemetry instrumentation for Bedrock)

In [None]:
!pip install torch transformers diffgram neo4j anthropic pandas tqdm
!pip install llama_index
!pip install boto3

In [None]:
!pip install arize-phoenix-otel
!pip install openinference-instrumentation-bedrock opentelemetry-exporter-otlp

In [15]:
import torch
from transformers import BertTokenizerFast, BertForTokenClassification
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from diffgram import Project
from typing import List, Dict, Optional
import anthropic
import json
from neo4j import GraphDatabase
from tqdm import tqdm
import logging
import os
import sys
import boto3
import requests
import pprint

## Connect to Arize
Start the Arize Phoenix container before running the cells below. Phoenix provides observability for the AI annotation step by recording a trace of every model call.

In [16]:
# Import the new tracking code
#from bedrock_output_fix import setup_claude_tracking, get_response_with_tracking
from opentelemetry.trace import get_tracer_provider

In [17]:
from phoenix.otel import register

tracer_provider = register(
  project_name="pre-annotation-with-AI", # Default is 'default'
  endpoint="http://phoenix:6006/v1/traces",
)

🔭 OpenTelemetry Tracing Details 🔭
|  Phoenix Project: pre-annotation-with-AI
|  Span Processor: SimpleSpanProcessor
|  Collector Endpoint: http://phoenix:6006/v1/traces
|  Transport: HTTP + protobuf
|  Transport Headers: {}
|  
|  Using a default SpanProcessor. `add_span_processor` will overwrite this default.
|  
|  `register` has set this TracerProvider as the global OpenTelemetry default.
|  To disable this behavior, call `register` with `set_global_tracer_provider=False`.



In [18]:
from openinference.instrumentation.bedrock import BedrockInstrumentor
BedrockInstrumentor().instrument(tracer_provider=tracer_provider,
                                capture_response_body=True  # Enable response capture
                                )

## Connect to Bedrock

In [19]:
# Use os.getcwd() since __file__ is not available in interactive environments
current_dir = os.getcwd()

# If your structure is such that the package is in the parent directory, compute the parent directory:
parent_dir = os.path.abspath(os.path.join(current_dir, '..'))

# Add the parent directory to sys.path if it's not already there
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)

In [20]:
from AgenticWorkflow.bedrock_session import get_boto_session

In [21]:
import get_claude_kwargs
from get_claude_kwargs import get_response

In [22]:
prompt = "Does this work?"

In [23]:
def get_response_with_tracking(prompt, job_data=None):
    """Call Claude via get_response() inside an OpenTelemetry span."""
    tracer = get_tracer_provider().get_tracer(__name__)
    with tracer.start_as_current_span("claude_request") as span:
        # Span attributes must be flat primitives, so job metadata is written
        # with dot-notation keys and the string "None" as the missing-value default.
        job_data = job_data or {}
        span.set_attribute("openinference.span.kind", "LLM")
        span.set_attribute("input.value", prompt)
        span.set_attribute("task.name", job_data.get('nickname', "None"))
        span.set_attribute("task.index", job_data.get('index', "None"))
        span.set_attribute("task.fileID", job_data.get('file', "None"))

        try:
            # Get the response using the original (untracked) function
            output = get_response(prompt)
            span.set_attribute("output.value", output if output else "None")
            return output
        except Exception as e:
            # Record the failure on the span, then re-raise for the caller
            span.set_attribute("error.message", str(e))
            span.set_attribute("error.type", e.__class__.__name__)
            raise
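OpenTelemetry attribute values are expected to be strings, booleans, numbers, or sequences of these, never `None`, which is why the wrapper above flattens job metadata into dot-notation keys with a `"None"` string default. A minimal sketch, assuming the same `job_data` keys (`nickname`, `index`, `file`), that builds those attributes in one place so a partially populated dict cannot raise `KeyError`. The helper `build_task_attributes` is hypothetical and not part of OpenTelemetry or Phoenix.

```python
from typing import Dict, Optional, Union

AttrValue = Union[str, int]

def build_task_attributes(job_data: Optional[Dict]) -> Dict[str, AttrValue]:
    """Map job metadata onto the flat span attribute keys used in this notebook."""
    job_data = job_data or {}
    # Missing keys fall back to the string "None" because span attributes
    # cannot hold a Python None value.
    return {
        "task.name": job_data.get("nickname", "None"),
        "task.index": job_data.get("index", "None"),
        "task.fileID": job_data.get("file", "None"),
        "openinference.span.kind": "LLM",
    }

attrs = build_task_attributes({"nickname": "NER_train_batch_218", "index": 31})
print(attrs)
# With a live span: for key, value in attrs.items(): span.set_attribute(key, value)
```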

In [24]:
response = get_response_with_tracking(prompt)

In [25]:
response

'I apologize, but I don\'t have enough context to determine what "this" refers to or what you\'re asking about. Could you please provide more information or context about what you\'re trying to do or what you\'re asking about? That way, I\'ll be able to give you a more accurate and helpful response.'

## Connect to Diffgram

In [26]:
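# Diffgram project configuration.
# Credentials are read from environment variables so that secrets are not stored
# in the notebook. The variable names below are placeholders; adjust them to your setup.
DIFFGRAM_CONFIG = {
    "host": os.environ.get("DIFFGRAM_HOST", "http://dispatcher:8085"),
    "project_string_id": os.environ.get("DIFFGRAM_PROJECT_STRING_ID", "translucenttracker"),
    "client_id": os.environ.get("DIFFGRAM_CLIENT_ID"),
    "client_secret": os.environ.get("DIFFGRAM_CLIENT_SECRET"),
}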

In [27]:
# Initialize connection to Diffgram project using the values in DIFFGRAM_CONFIG
project = Project(
    host=DIFFGRAM_CONFIG["host"],
    project_string_id=DIFFGRAM_CONFIG["project_string_id"],
    client_id=DIFFGRAM_CONFIG["client_id"],
    client_secret=DIFFGRAM_CONFIG["client_secret"],
)
project_local = project

## Fetch Schema

In [28]:
# Define and manage NER schema in Diffgram
# Retrieve and process existing schema labels
NER_schema_name = 'ENTITY_TRAINING_SCHEMA'

In [29]:
def get_schema_list(schema_id):
    """Return the labels attached to a schema, or None if the request fails."""
    auth = project.session.auth
    url = f"{DIFFGRAM_CONFIG['host']}/api/project/{DIFFGRAM_CONFIG['project_string_id']}/labels?schema_id={schema_id}"
    # GET request authenticated with the SDK's session credentials
    response = requests.get(url, auth=auth)
    if response.status_code == 200:
        return response.json()
    print(f"Error: {response.status_code}")
    print(response.text)  # Print error details for debugging
    return None

In [30]:
schema_id = None

# List the existing schemas in your Diffgram project.
schemas = project.schema.list()
print("Existing Schemas in Diffgram:")
print(json.dumps(schemas, indent=2))

# Check if a schema with the name NER_schema_name already exists.
for schema in schemas:
    if schema.get('name') == NER_schema_name:
        schema_id = schema.get('id')
        break

# If the schema does not exist, create a new one.
if schema_id is None:
    print(f"Schema '{NER_schema_name}' not found. Creating a new one...")
    json_response = project.new_schema(name=NER_schema_name)
    schema_id = json_response.get("id")
    print(f"Created new schema with id: {schema_id}")
else:
    print(f"Schema '{NER_schema_name}' already exists with id: {schema_id}")

schema_labels = get_schema_list(schema_id)

# Retrieve existing labels for the schema to avoid duplicates.
schema_label_id_value = []
if schema_labels is not None:
    labels = schema_labels['labels_out']
    for label in labels:
        value = {}
        value['id'] = label['id']
        value['name'] = label['label']['name']
        schema_label_id_value.append(value)

existing_label_names = {label['name'] for label in schema_label_id_value if label.get('name')}
if existing_label_names:
    print(existing_label_names)
else:
    print("There are no schema labels")

Existing Schemas in Diffgram:
[
  {
    "archived": false,
    "id": 8,
    "is_default": true,
    "member_created_id": 1,
    "member_updated_id": null,
    "name": "Default Schema",
    "project_id": 4,
    "time_created": "2025-02-04 22:16:17",
    "time_updated": null
  },
  {
    "archived": false,
    "id": 9,
    "is_default": false,
    "member_created_id": 10,
    "member_updated_id": null,
    "name": "NER_TRAINING_SCHEMA",
    "project_id": 4,
    "time_created": "2025-02-05 17:08:24",
    "time_updated": null
  },
  {
    "archived": false,
    "id": 11,
    "is_default": false,
    "member_created_id": 10,
    "member_updated_id": null,
    "name": "ENTITY_TRAINING_SCHEMA",
    "project_id": 4,
    "time_created": "2025-02-05 17:20:02",
    "time_updated": null
  }
]
Schema 'ENTITY_TRAINING_SCHEMA' already exists with id: 11
{'I-SEQUENCE_ID', 'I-REGULATION_ID', 'B-ACT_NAME', 'I-DEFINITION', 'I-CHUNK_ID', 'I-ACT_NAME', 'O', 'B-REGULATION_ID', 'B-SECTION_ID', 'I-REQUIREMENT
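The labels listed above follow the BIO scheme: `B-` marks the first word of an entity, `I-` a continuation word of the same entity, and `O` a word outside any entity. A minimal sketch of decoding the word/tag list the prompt asks the model to return into entity spans. The function `bio_to_spans` is hypothetical, and treating a stray `I-` tag (one without a preceding `B-` of the same type) as the start of a new span is one common convention among several.

```python
from typing import Dict, List, Tuple

def bio_to_spans(tagged: List[Dict[str, str]]) -> List[Tuple[str, str]]:
    """Collapse [{"word": ..., "tag": ...}, ...] into (entity_type, text) spans."""
    spans: List[Tuple[str, List[str]]] = []
    current_type = None
    for item in tagged:
        tag = item["tag"]
        if tag == "O":
            current_type = None  # outside any entity: close the open span
            continue
        prefix, _, entity_type = tag.partition("-")
        if prefix == "B" or entity_type != current_type:
            # B- tag, or an I- tag that cannot continue the open span
            spans.append((entity_type, [item["word"]]))
            current_type = entity_type
        else:
            spans[-1][1].append(item["word"])
    return [(etype, " ".join(words)) for etype, words in spans]

example = [
    {"word": "section", "tag": "B-SECTION_REF"},
    {"word": "46", "tag": "I-SECTION_REF"},
    {"word": ",", "tag": "O"},
    {"word": "(", "tag": "O"},
    {"word": "2", "tag": "B-SUBSECTION_REF"},
    {"word": ")", "tag": "O"},
]
print(bio_to_spans(example))  # [('SECTION_REF', 'section 46'), ('SUBSECTION_REF', '2')]
```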

In [31]:
schema_list = project.schema.list()
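The annotation loops later resolve each model tag to a Diffgram label id by scanning `schema_label_id_value`. A minimal sketch, assuming the `labels_out` response shape used above (`{"labels_out": [{"id": ..., "label": {"name": ...}}]}`), of building a name-to-id dictionary once so each lookup is constant time. The function name `build_label_lookup` is hypothetical, and the ids in the sample are made up for illustration.

```python
from typing import Dict, Optional

def build_label_lookup(schema_labels: Optional[Dict]) -> Dict[str, int]:
    """Return {label name: label id} from a Diffgram labels response."""
    lookup: Dict[str, int] = {}
    for entry in (schema_labels or {}).get("labels_out", []):
        name = entry.get("label", {}).get("name")
        if name:
            lookup[name] = entry["id"]
    return lookup

# Illustrative response; the ids are invented.
sample = {"labels_out": [
    {"id": 101, "label": {"name": "O"}},
    {"id": 102, "label": {"name": "B-SECTION_ID"}},
]}
label_lookup = build_label_lookup(sample)
print(label_lookup.get("B-SECTION_ID"))  # 102
print(label_lookup.get("B-UNKNOWN"))     # None: tag not in the schema
```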

## NER Prompt

In [32]:
# Create structured prompt for NER tagging
# Include rules and formatting guidelines for entity recognition
def create_ner_prompt(text: str, schema: List[Dict]) -> str:
    """
    Creates a prompt for NER tagging that defaults to O tag for most text.
    """
    schema_tags = sorted(set(s['name'] for s in schema))
    
    prompt = """You are a Named Entity Recognition system for legislative text.
Your task is to label each word using only the following allowed tags:
{allowed_tags}

Rules:
1. Default to 'O' tag - most words should be tagged as Outside
2. Only tag these specific elements:
   - Section references (e.g., "section 46" -> B-SECTION_REF, I-SECTION_REF)
   - Subsection references (e.g., "(2)", "(a)" -> B-SUBSECTION_REF)
   - Essential metadata fields and their values
3. Tag principles:
   - All punctuation should be 'O'
   - Use B- prefix only for start of key references
   - Use I- prefix for continuing words of same reference
   - When in doubt, use 'O' tag

Annotate each space-separated token separately. For example, if you see child 's, annotate child and 's as two different words because there is a space between them. Always treat 's as a single word.
If there are no spaces inside a token, such as Act-Motor, 2subsection, 1in or b., treat it as one word. If b. and c. are separated by a space, then b. is one word and c. is another word. If there is a space between characters, as in en ., then en is one word and . is another word that gets its own label.

Output format:
[
    {{"word": "section", "tag": "B-SECTION_REF"}},
    {{"word": "46", "tag": "I-SECTION_REF"}},
    {{"word": ",", "tag": "O"}},
    {{"word": "subsection", "tag": "O"}},
    {{"word": "(", "tag": "O"}},
    {{"word": "2", "tag": "B-SUBSECTION_REF"}},
    {{"word": "'s", "tag": "O"}},
    {{"word": ")", "tag": "O"}}
]

Text to label:
{text}

Provide only the JSON output with no additional explanation."""

    tags_str = "\n".join(f"- {tag}" for tag in schema_tags)
    return prompt.format(allowed_tags=tags_str, text=text)
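The prompt asks for bare JSON, but `json.loads` fails whenever the model wraps its reply in a Markdown code fence, and the batch loop then skips the file. A hedged sketch of a more tolerant parser, assuming the reply either is a JSON array or starts with a fence around one; it does not try to recover JSON embedded in free prose. `parse_ner_response` is a hypothetical helper, and mapping unknown tags to `"O"` (instead of dropping the word) is a design choice that keeps the word sequence aligned with the source text.

```python
import json
from typing import Dict, List, Set

FENCE = "`" * 3  # built this way so the sketch can sit inside a Markdown cell

def parse_ner_response(raw: str, allowed_tags: Set[str]) -> List[Dict[str, str]]:
    """Parse the model's JSON reply, tolerating an optional Markdown code fence."""
    text = raw.strip()
    if text.startswith(FENCE):
        # Drop the opening fence line (e.g. a json language marker) and the closing fence
        text = text.split("\n", 1)[1] if "\n" in text else ""
        text = text.rsplit(FENCE, 1)[0]
    items = json.loads(text)
    if not isinstance(items, list):
        raise ValueError("expected a JSON array of {word, tag} objects")
    cleaned = []
    for item in items:
        if isinstance(item, dict) and "word" in item and "tag" in item:
            tag = item["tag"] if item["tag"] in allowed_tags else "O"
            cleaned.append({"word": str(item["word"]), "tag": tag})
    return cleaned

reply = FENCE + 'json\n[{"word": "section", "tag": "B-SECTION_REF"}, {"word": "46", "tag": "I-BOGUS"}]\n' + FENCE
parsed = parse_ner_response(reply, {"O", "B-SECTION_REF", "I-SECTION_REF"})
print(parsed)
```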

## Import all the tasks

In [33]:
results = project_local.job

In [34]:
get_job = project_local.job.list(limit=10000, page_number=1)

In [35]:
jobs_with_data_index = []
for job_key, job_list in enumerate(get_job):
    try:
        nickname = job_list['attached_directories_dict']['attached_directories_list'][0]['nickname']
        if nickname:
            job_value = {}
            job_value['nickname'] = nickname
            job_value['index'] = job_key
            jobs_with_data_index.append(job_value)
        #print(nickname)
    except KeyError:
        print("Key not found.")
    except IndexError:
        print("List index out of range.")

In [None]:
jobs_with_data_index

## Diffgram get utility

In [37]:
# Utility for reading a Diffgram job export.
# Keys of the export dict are either file IDs (numeric strings) or metadata
# entries such as 'label_map', 'export_info' and 'readme'.
def get_file_number(completed_annotations, files_index_in_job):
    """Append every file ID found in the export to files_index_in_job and return it."""
    for key in completed_annotations:
        try:
            int(key)  # file entries are keyed by a numeric file ID
        except (TypeError, ValueError):
            continue  # metadata entry, not a file
        files_index_in_job.append(key)
    return files_index_in_job

In [38]:
def extract_word_data(url):
    """Download a file's tokenized text (JSON) from its signed URL, or return None."""
    # The signed URL may point at localhost; rewrite it to the configured Diffgram host
    file_url = url.replace("http://localhost:8085", DIFFGRAM_CONFIG['host'])

    response = requests.get(file_url)
    if response.status_code == 200:
        return response.json()  # the tokens file is JSON
    print(f"Failed to retrieve the file. Status code: {response.status_code}")
    return None

## Diffgram Send Utility

In [39]:
# Send pre-annotations back to Diffgram
def send_preannotation_to_diffgram(file):
    """POST the file's instance_list to Diffgram's annotation update endpoint.

    Returns True on HTTP 200, False otherwise.
    """
    # Authenticate with the SDK session's credentials (client_id / client_secret)
    auth = project_local.session.auth
    url = f"{DIFFGRAM_CONFIG['host']}/api/project/{DIFFGRAM_CONFIG['project_string_id']}/file/{file.id}/annotation/update"

    # Payload: the instance_list built by the annotation loop
    data = {"instance_list": file.__dict__['instance_list']}

    response = requests.post(url, json=data, auth=auth)
    if response.status_code == 200:
        print("Annotation update successful!")
        return True
    print(f"Error: {response.status_code}")
    print(response.text)  # Print error details for debugging
    return False
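The update endpoint receives `{"instance_list": [...]}`, where, in this notebook, every instance labels exactly one token (`start_token == end_token`, `type` set to `text_token`). A minimal sketch of building that payload from `(token index, tag)` pairs and a name-to-id lookup. `build_instance_list` is hypothetical, the keys simply mirror the instances this notebook already constructs (they are not checked against Diffgram's own documentation), and the label ids are invented.

```python
from typing import Dict, List, Tuple

def build_instance_list(word_tags: List[Tuple[int, str]],
                        label_lookup: Dict[str, int]) -> List[Dict]:
    """Turn (token index, tag) pairs into single-token text_token instances."""
    instances = []
    for word_idx, tag in word_tags:
        label_id = label_lookup.get(tag)
        if label_id is None:
            continue  # tag not in the schema: leave the token unlabelled
        instances.append({
            "label_file_id": label_id,
            "start_token": word_idx,
            "end_token": word_idx,  # one instance per token
            "type": "text_token",
        })
    return instances

payload = {"instance_list": build_instance_list(
    [(0, "B-SECTION_ID"), (1, "I-SECTION_ID"), (2, "NOT_A_LABEL")],
    {"B-SECTION_ID": 102, "I-SECTION_ID": 103},
)}
print(payload)
```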

## Run annotation

In [None]:
job_index = 15

In [None]:
results.refresh_from_dict(get_job[job_index])

In [None]:
completed_annotations = results.generate_export()

In [None]:
print(len(completed_annotations))

In [None]:
files_index_in_job = []

In [None]:
get_file_number(completed_annotations, files_index_in_job)

In [None]:
files_index_in_job

## Automate the A.I annotation

In [40]:
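def normalize_string(s):
    """Normalize a token so document words and model output compare equal."""
    # NLTK-style quote tokens (`` and '') become a plain double quote
    s = s.replace('``', '"').replace("''", '"')
    # Curly double quotes become straight double quotes
    s = s.replace('\u201c', '"').replace('\u201d', '"')
    s = s.replace('\\', '')
    s = s.replace("'s", 'is')
    return s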

In [41]:
def process_per_task_preannotation(file_index, job_data, start_index=None):
    """Pre-annotate every file in file_index with Claude and upload the result.

    start_index is accepted for compatibility with existing calls but is not
    used: every file in file_index is processed.
    """
    label_lookup = {label['name']: label['id'] for label in schema_label_id_value}

    for file_id in file_index:
        print(file_id)
        job_data['file'] = file_id
        file = project_local.file.get_by_id(file_id, with_instances=True)
        data = extract_word_data(file.__dict__['text']['tokens_url_signed'])
        if data is None:
            continue  # tokens could not be downloaded

        doc_words = data['nltk']['words']
        text = ' '.join(word['value'] for word in doc_words)
        prompt = create_ner_prompt(text, schema_label_id_value)
        try:
            response = get_response_with_tracking(prompt, job_data)
            ai_annotation = json.loads(response)
        except Exception as e:
            # Skip files whose model call fails or whose reply is not valid JSON
            print(f"Skipping file {file_id}: {e}")
            continue

        # Greedy alignment: advance through the model's tokens only when the
        # current document word matches; unmatched document words are skipped.
        annotate_index = 0
        instance_list = []
        for word_idx, word in enumerate(doc_words):
            if annotate_index >= len(ai_annotation):
                print("no more annotation from ai")
                break
            if normalize_string(word['value']) == normalize_string(ai_annotation[annotate_index]['word']):
                label_id = label_lookup.get(ai_annotation[annotate_index]['tag'])
                if label_id:
                    instance_list.append({
                        "label_file_id": label_id,
                        "start_token": word_idx,
                        "end_token": word_idx,
                        "type": 'text_token'
                    })
                annotate_index += 1
        file.__dict__['instance_list'] = instance_list
        send_preannotation_to_diffgram(file)
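The alignment step in the loop above is easiest to reason about as a pure function. A minimal sketch that reproduces the same greedy rule on plain lists (the model pointer only advances on a match; any non-matching document word is skipped), so it can be tested without Diffgram or Bedrock. `align_tags` is hypothetical. The test also shows the rule's weakness: if the model emits a token that never appears in the document, the pointer stalls and every later word stays unlabelled.

```python
from typing import Callable, Dict, List, Tuple

def align_tags(doc_words: List[str],
               ai_tokens: List[Dict[str, str]],
               normalize: Callable[[str], str] = lambda s: s) -> List[Tuple[int, str]]:
    """Greedy left-to-right alignment of model output onto document tokens."""
    pairs: List[Tuple[int, str]] = []
    ai_idx = 0
    for word_idx, word in enumerate(doc_words):
        if ai_idx >= len(ai_tokens):
            break  # model output exhausted; remaining words stay unlabelled
        if normalize(word) == normalize(ai_tokens[ai_idx]["word"]):
            pairs.append((word_idx, ai_tokens[ai_idx]["tag"]))
            ai_idx += 1
        # otherwise skip this document word (e.g. a newline token) and retry
    return pairs

doc = ["Under", "section", "46", "\n", "(", "2", ")"]
ai = [{"word": "Under", "tag": "O"}, {"word": "section", "tag": "B-SECTION_REF"},
      {"word": "46", "tag": "I-SECTION_REF"}, {"word": "(", "tag": "O"},
      {"word": "2", "tag": "B-SUBSECTION_REF"}, {"word": ")", "tag": "O"}]
print(align_tags(doc, ai))
```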

In [None]:
# Main processing loop for automated annotation
# Handle text normalization and annotation matching
# Process annotations batch-wise across multiple files
for job_data in jobs_with_data_index[100:101]:
    print(f"The job nickname is {job_data['nickname']} and the index is {job_data['index']}")
    results.refresh_from_dict(get_job[job_data['index']])
    completed_annotations = results.generate_export()
    files_index_in_job = []
    get_file_number(completed_annotations, files_index_in_job)
    process_per_task_preannotation(files_index_in_job, job_data, 12705)

### Parallel Pre-annotation

In [None]:
from concurrent.futures import ThreadPoolExecutor

def parallel_process_jobs(jobs_with_data_index, jobs, get_job, max_workers=4):
    """Process jobs concurrently with a thread pool (the work is I/O-bound HTTP calls)."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(process_single_job, job, jobs, get_job)
            for job in jobs_with_data_index
        ]

        # Collect results in submission order
        processed_jobs = []
        for future in futures:
            try:
                result = future.result()
                processed_jobs.append(result)
                if result['status'] == 'success':
                    print(f"Completed job {result['nickname']}, processed {result['files_processed']} files")
                else:
                    print(f"Failed job {result['nickname']}: {result.get('error')}")
            except Exception as e:
                print(f"Exception in worker thread: {e}")

    return processed_jobs

In [None]:
import threading

# results_obj is one Job object shared by every worker thread, so loading a job
# into it and generating that job's export must happen atomically.
_job_export_lock = threading.Lock()

def process_single_job(job_data: Dict, results_obj, get_job):
    """Process a single job; its files are handled sequentially."""
    try:
        print(f"Starting job: {job_data['nickname']} (index: {job_data['index']})")

        # Load this job into the shared object and export its file list under the lock
        with _job_export_lock:
            results_obj.refresh_from_dict(get_job[job_data['index']])
            completed_annotations = results_obj.generate_export()

        files_index_in_job = []
        get_file_number(completed_annotations, files_index_in_job)

        # Process files sequentially
        process_per_task_preannotation(files_index_in_job, job_data, 12705)

        return {
            'status': 'success',
            'job_index': job_data['index'],
            'nickname': job_data['nickname'],
            'files_processed': len(files_index_in_job)
        }
    except Exception as e:
        print(f"Error in job {job_data['nickname']}: {e}")
        return {
            'status': 'error',
            'job_index': job_data['index'],
            'nickname': job_data['nickname'],
            'error': str(e)
        }


To pre-annotate only specific jobs, uncomment one of the cells below and set the slice or the list of job indices.

In [1]:
# jobs = project_local.job
#results = parallel_process_jobs(jobs_with_data_index[100:101], jobs, get_job, 30)

In [2]:
#indices = [59, 97, 99, 104, 117, 120, 121, 130, 131, 144]
#selected_jobs = [jobs_with_data_index[i] for i in indices]
#results = parallel_process_jobs(selected_jobs, jobs, get_job, 30)
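Threads suit this workload because each job spends most of its time waiting on HTTP calls to Bedrock and Diffgram, and Python releases the GIL during blocking I/O. A minimal, self-contained sketch of the same fan-out pattern using `as_completed`, which reports each job as soon as it finishes rather than in submission order, and converts a worker exception into an error record. `run_jobs` and `fake_worker` are hypothetical stand-ins for `parallel_process_jobs` and `process_single_job`.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List

def run_jobs(jobs: List[Dict], worker: Callable[[Dict], Dict], max_workers: int = 4) -> List[Dict]:
    """Run worker(job) on a thread pool and collect results as they finish."""
    results: List[Dict] = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_job = {executor.submit(worker, job): job for job in jobs}
        for future in as_completed(future_to_job):
            job = future_to_job[future]
            try:
                results.append(future.result())
            except Exception as exc:  # worker raised instead of returning an error dict
                results.append({"status": "error", "nickname": job.get("nickname"), "error": str(exc)})
    return results

def fake_worker(job: Dict) -> Dict:
    # Stand-in for process_single_job; fails on odd indices to exercise error handling
    if job["index"] % 2:
        raise RuntimeError("simulated failure")
    return {"status": "success", "nickname": job["nickname"], "files_processed": 0}

demo = run_jobs([{"nickname": f"batch_{i}", "index": i} for i in range(4)], fake_worker, max_workers=2)
print(sorted(r["status"] for r in demo))
```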

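### Splitting long inputs
When the expected output is very long, which can happen when nearly every token needs its own annotation, the model's reply may be cut off at its output token limit. In that case it is best to split the input into two prompts and concatenate the two responses.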
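Splitting the joined text at a character offset can cut a word in half, which then never matches a document token during alignment. A minimal sketch, assuming the input is the list of token values from the tokens file, that splits on word boundaries into at most `n_chunks` pieces; concatenating the parsed responses in chunk order keeps the word sequence intact for the alignment loop. `split_words` is a hypothetical helper.

```python
from typing import List

def split_words(words: List[str], n_chunks: int = 2) -> List[str]:
    """Split a token list into at most n_chunks roughly equal texts on word boundaries."""
    if n_chunks < 1:
        raise ValueError("n_chunks must be >= 1")
    if not words:
        return []
    size = -(-len(words) // n_chunks)  # ceiling division
    return [" ".join(words[i:i + size]) for i in range(0, len(words), size)]

words = "section 46 , subsection ( 2 )".split()
chunks = split_words(words, 2)
print(chunks)  # ['section 46 , subsection', '( 2 )']
# Each chunk would go through create_ner_prompt(); the JSON lists are then concatenated.
```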

In [50]:
def process_per_file_annotation(job_data):
    """Pre-annotate one file by sending its text to Claude in two halves."""
    file_id = job_data.get('file')
    try:
        print(f"Processing file id {file_id} of task {job_data['nickname']} of index {job_data['index']}")
        file = project_local.file.get_by_id(file_id, with_instances=True)
        data = extract_word_data(file.__dict__['text']['tokens_url_signed'])
        doc_words = data['nltk']['words']

        # Split on word boundaries so no token is cut in half or dropped
        values = [word['value'] for word in doc_words]
        half = len(values) // 2
        ai_annotation = []
        for chunk in (values[:half], values[half:]):
            prompt = create_ner_prompt(' '.join(chunk), schema_label_id_value)
            ai_annotation += json.loads(get_response_with_tracking(prompt, job_data))

        # Same greedy alignment as in process_per_task_preannotation
        label_lookup = {label['name']: label['id'] for label in schema_label_id_value}
        annotate_index = 0
        instance_list = []
        for word_idx, word in enumerate(doc_words):
            if annotate_index >= len(ai_annotation):
                print("no more annotation from ai")
                break
            if normalize_string(word['value']) == normalize_string(ai_annotation[annotate_index]['word']):
                label_id = label_lookup.get(ai_annotation[annotate_index]['tag'])
                if label_id:
                    instance_list.append({
                        "label_file_id": label_id,
                        "start_token": word_idx,
                        "end_token": word_idx,
                        "type": 'text_token'
                    })
                annotate_index += 1
        file.__dict__['instance_list'] = instance_list
        send_preannotation_to_diffgram(file)
        return {
            'status': 'success',
            'job_index': job_data['index'],
            'nickname': job_data['nickname'],
            'file_id': file_id
        }
    except Exception as e:
        print(f"Error in job {job_data['nickname']}: {e}")
        print(f"file id {file_id} of task {job_data['nickname']} of index {job_data['index']}: not able to split the data")
        return {
            'status': 'error',
            'job_index': job_data['index'],
            'nickname': job_data['nickname'],
            'file_id': file_id,
            'error': str(e)
        }

In [48]:
from concurrent.futures import ThreadPoolExecutor

def process_failed_annotations(failed_files, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all files for processing
        futures = [
            executor.submit(process_per_file_annotation, file)
            for file in failed_files
        ]
        
        # Track results
        processed_files = []
        for future in futures:
            try:
                result = future.result()
                processed_files.append(result)
                
                # Assuming process_per_file_annotation returns a dict with status
                if result.get('status') == 'success':
                    print(f"Successfully processed file {result.get('file_id')}")
                else:
                    print(f"Failed to process file {result.get('file_id')}: {result.get('error')}")
                    
            except Exception as e:
                print(f"Exception in worker thread: {str(e)}")
                
        return processed_files

To re-annotate specific files (for example, files that failed during the batch run), list them in the array below. Each entry needs the task `nickname`, the job `index`, and the `file` ID.

In [59]:
failed_to_annotate = []

In [60]:
len(failed_to_annotate)

0

In [61]:
# Usage
failed_to_annotate = [{'nickname': 'NER_train_batch_218', 'index': 31, 'file': '19747'}]
retry_results = process_failed_annotations(failed_to_annotate, max_workers=4)  # keep `results` (the Job object) intact

Processing file id 19747 of task NER_train_batch_218 of index 31
Annotation update successful!
Successfully processed file 19747
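The `failed_to_annotate` entries can also be rebuilt from the result dicts these helpers return, instead of being typed by hand. A minimal sketch, assuming each result carries `status`, `job_index`, `nickname` and, for per-file runs, `file_id`; results without a file id are skipped because there is nothing to retry at file level. `build_retry_list` is hypothetical, and the sample results (including file `19748`) are invented for illustration.

```python
from typing import Dict, List

def build_retry_list(results: List[Dict]) -> List[Dict]:
    """Turn failed per-file results back into inputs for process_failed_annotations."""
    retry = []
    for r in results:
        if r.get("status") == "success" or r.get("file_id") is None:
            continue  # succeeded, or no file id recorded to retry
        retry.append({"nickname": r.get("nickname"), "index": r.get("job_index"), "file": r["file_id"]})
    return retry

sample_results = [
    {"status": "success", "job_index": 31, "nickname": "NER_train_batch_218", "file_id": "19747"},
    {"status": "error", "job_index": 31, "nickname": "NER_train_batch_218", "file_id": "19748", "error": "timeout"},
    {"status": "error", "job_index": 32, "nickname": "NER_train_batch_219", "error": "no file id"},
]
print(build_retry_list(sample_results))
# e.g. failed_to_annotate = build_retry_list(<list returned by process_failed_annotations>)
```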
