# Sequence labeling annotation aggregation with the RASA model

## Setup

#### Colab

In [None]:
# check if on colab
COLAB = True
try:
    import google.colab
except:
    COLAB=False

if COLAB:
    # shallow clone of current state of main branch 
    !git clone --branch main --single-branch --depth 1 --filter=blob:none https://github.com/haukelicht/advanced_text_analysis.git
    # make repo root findable for python
    import sys
    sys.path.append("/content/advanced_content_analysis/")

#### Required libraries

In [3]:
!pip install -q crowd-kit==1.4.1

In [5]:
from pathlib import Path
import numpy as np
import pandas as pd
from crowdkit.aggregation import SegmentationRASA

#### Data paths

In [6]:
# repository root: the Colab clone location, or two levels up when run locally
if COLAB:
    base_path = Path("/content/advanced_text_analysis/")
else:
    base_path = Path("../../")
# folder holding the labeled data used in this notebook
data_path = base_path.joinpath("data", "labeled", "fornaciari_we_2021")

## Read the data

In [11]:
annotations_path = data_path / 'annotations' / 'extraction' / 'llms'

# one JSONL file per annotator; sorted so the read order does not depend on the
# order in which the filesystem happens to list the files
fps = sorted(annotations_path.glob('*.jsonl'))
if not fps:
    # fail early with a readable message instead of pd.concat's "No objects to concatenate"
    raise FileNotFoundError(f"No '*.jsonl' annotation files found in {annotations_path}")

# stack the per-file frames; the file stem (dict key) becomes the 'annotator' column
annotations = pd.concat(
    {fp.stem: pd.read_json(fp, lines=True) for fp in fps},
    ignore_index=False,
).reset_index(level=0, names=['annotator'])

# add metadata and reformat the DataFrame
if 'metadata' in annotations.columns:
    # expand the 'metadata' entries into columns; a 'label' entry (if present) is dropped
    # so that it cannot overwrite the annotators' own 'label' column
    metadata = annotations['metadata'].apply(pd.Series).drop(columns=['label'], errors='ignore')
    annotations[metadata.columns] = metadata
    annotations = annotations.drop(columns=['metadata'])

annotations = annotations.sort_values(by=['text_id', 'annotator']).reset_index(drop=True)
# explicit copy so the column assignments below act on an independent frame
annotations = annotations[['text_id', 'text', 'annotator', 'label']].copy()

# discard entity type (keep only the first two elements of each annotation: the offsets)
annotations['label'] = annotations['label'].apply(lambda x: [anno[:2] for anno in x])

# induce spans from character offsets
annotations['spans'] = annotations.apply(lambda x: [x['text'][slice(*lab)] for lab in x['label']], axis=1)

# list unique annotators
annotations.annotator.unique().tolist()

['DeepSeek-V3-0324',
 'Llama-4-Maverick-17B-128E-Instruct',
 'Qwen3-235B-A22B-Instruct-2507',
 'gpt-oss-120b']

### Process character spans into token masks 

In [12]:
from nltk.tokenize import TreebankWordTokenizer
tokenizer = TreebankWordTokenizer()

# list(tokenizer.span_tokenize(text))
def character_to_token_spans(text: str, spans: list[tuple[int, int]]) -> list[tuple[int, int]]:
    """Translate character-offset spans into token-index spans.

    The text is tokenized with the module-level `tokenizer`. For each
    (start_char, end_char) pair the result holds (first_token, last_token + 1),
    where `first_token` is the first token starting at or after `start_char` and
    `last_token` is the last token ending at or before `end_char`. Pairs for which
    no such tokens exist, or for which the first token lies after the last one,
    are left out of the result.
    """
    offsets = list(tokenizer.span_tokenize(text))
    converted = []
    for start_char, end_char in spans:
        # scan forward for the first token that begins inside the span
        first = None
        for idx, (tok_start, _) in enumerate(offsets):
            if tok_start >= start_char:
                first = idx
                break
        # scan backward for the last token that ends inside the span
        last = None
        for idx in range(len(offsets) - 1, -1, -1):
            if offsets[idx][1] <= end_char:
                last = idx
                break
        if first is None or last is None or first > last:
            continue
        converted.append((first, last + 1))  # exclusive end index
    return converted

In [13]:
# tokenize every text ...
annotations['tokens'] = annotations.apply(
    lambda row: tokenizer.tokenize(row['text']),
    axis=1,
)
# ... and re-express the character-level labels as token index ranges
annotations['token_spans'] = annotations.apply(
    lambda row: character_to_token_spans(row['text'], row['label']),
    axis=1,
)

In [14]:
def _token_mask(n_tokens, token_spans):
    """Return an integer 0/1 vector of length `n_tokens` with 1s inside the token spans."""
    # np.zeros with an explicit dtype keeps the mask integer-typed even when n_tokens == 0
    mask = np.zeros(n_tokens, dtype=int)
    for start, end in token_spans:  # `end` is exclusive
        mask[start:end] = 1
    return mask

# build each mask in one go instead of mutating array objects through `.at` in a row loop
annotations['segmentation'] = pd.Series(
    [
        _token_mask(len(tokens), token_spans)
        for tokens, token_spans in zip(annotations['tokens'], annotations['token_spans'])
    ],
    index=annotations.index,
    dtype=object,
)

In [15]:
# one row per (task, worker) pair; each mask gets a leading singleton axis: (n,) -> (1, n)
df = (
    annotations[['text_id', 'annotator', 'segmentation']]
    .rename(columns={'text_id': 'task', 'annotator': 'worker'})
    .assign(
        segmentation=lambda frame: frame['segmentation'].map(
            lambda mask: np.expand_dims(mask, axis=0)
        )
    )
)

## Fit the model

In [16]:
# RASA aggregator from crowd-kit; `n_iter` and `tol` presumably cap the number of
# iterations and set the convergence tolerance — confirm against the crowd-kit docs
aggregator = SegmentationRASA(n_iter=1000, tol=1e-8)

In [17]:
import warnings

with warnings.catch_warnings():
    # silence RuntimeWarnings attributed to crowdkit modules while fitting
    warnings.filterwarnings("ignore", category=RuntimeWarning, module="crowdkit")
    posterior_labels = aggregator.fit_predict(df)
# remove only the leading singleton axis that was added to the input masks; a bare
# `squeeze()` would also collapse the token axis of single-token texts into a 0-d array
posterior_labels = posterior_labels.apply(lambda x: np.squeeze(x, axis=0))

n_annotators = annotations.annotator.nunique()
# one row per text: keep the first row of every text_id rather than striding by the
# number of annotators, which selects wrong rows as soon as a text lacks an annotator
docs = (
    annotations
    .sort_values(['text_id', 'annotator'])
    .drop_duplicates(subset='text_id')
    .loc[:, ['text_id', 'text', 'tokens']]
)
docs = docs.merge(
    posterior_labels.to_frame(name='mask').reset_index(names='text_id'),
    on='text_id',
    validate='one_to_one',  # exactly one aggregated mask per text
)

### Convert posterior token masks into token- and character-level spans

In [18]:
from typing import List, Tuple
def extract_spans(tokens: List[int], mask: List[bool]) -> List[Tuple[int, int]]:
    """Collect the (start, end) index pairs of all runs of truthy values in `mask`.

    `start` is the index of the first truthy element of a run and `end` is the
    index just past its last element (exclusive). A run that reaches the end of
    the mask is closed at `len(mask)`. `tokens` is accepted for interface
    compatibility but is not read by this function.
    """
    runs = []
    open_at = -1  # index where the current run began; -1 while outside a run
    for pos, flag in enumerate(mask):
        inside = open_at >= 0
        if flag:
            if not inside:
                open_at = pos
        elif inside:
            # first falsy element after a run closes it
            runs.append((open_at, pos))
            open_at = -1
    if open_at >= 0:
        runs.append((open_at, len(mask)))
    return runs

In [19]:
def token_to_character_spans(text, spans):
    """Translate token-index spans (end exclusive) into character-offset spans.

    The text is tokenized with the module-level `tokenizer`; each (start, end)
    token pair maps to the character start of token `start` and the character
    end of token `end - 1`.
    """
    offsets = list(tokenizer.span_tokenize(text))
    return [
        (offsets[first][0], offsets[stop - 1][1])
        for first, stop in spans
    ]

In [20]:
# runs of positive tokens in the aggregated mask, as token index ranges
docs['token_label'] = docs.apply(
    lambda row: extract_spans(row['tokens'], row['mask']),
    axis=1,
)
# the same ranges expressed as character offsets into the text
docs['label'] = docs.apply(
    lambda row: token_to_character_spans(row['text'], row['token_label']),
    axis=1,
)

In [21]:
# cut the labeled character ranges out of each text
docs['spans'] = docs.apply(
    lambda row: [row['text'][slice(*char_span)] for char_span in row['label']],
    axis=1,
)

In [22]:
# how many texts ended up with 0, 1, ... aggregated spans
docs['spans'].map(len).value_counts()

spans
0    34
1    16
Name: count, dtype: int64

In [23]:
# texts for which the aggregated mask yields at least one span
positive_instances = docs.loc[docs['spans'].map(len).gt(0)]
for row in positive_instances.itertuples(index=False):
    print(f"Text: {row.text}")
    for span in row.spans:
        print(f" - {repr(span)}")
    print()

Text: We will also introduce a multi - purpose identity card for all citizens .
 - 'introduce a multi - purpose identity card for all citizens'

Text: a . We will ensure the passage of the Women’s Reservation Bill .
 - 'We will ensure the passage of the Women’s Reservation Bill'

Text: We will restrict foreign equity holding in private television broadcasting to 20% (and prevent cross holding to avoid emergence of monopolies in the media) .
 - 'We will restrict foreign equity holding in private television broadcasting to 20% (and prevent cross holding to avoid emergence of monopolies in the media)'

Text: India’s indigenous thorium technology programme will be expedited and given all financial assistance, correcting the grievous wrong done by the UPA Government .
 - 'India’s indigenous thorium technology programme will be expedited and given all financial assistance'

Text: Immediately after forming the governments in Chhattisgarh, Madhya Pradesh and Rajasthan, as promised, the 3 Congr

## Evaluation

In [24]:
# load the labels used for evaluation
# NOTE: this rebinds `df`, which previously held the aggregator input passed to
# `fit_predict`; re-run the earlier cells before fitting the model again
fp = data_path / "annotation_set_01.csv"
df = pd.read_csv(fp)

In [25]:
# join reference labels with the aggregated spans; a text counts as predicted
# positive (pred = 1) when it has at least one span
tmp = (
    df[['text_id', 'label']]
    .merge(docs[['text_id', 'spans']], on='text_id')
    .assign(pred=lambda merged: merged['spans'].map(len).gt(0).astype(int))
)

In [26]:
# cross-tabulate reference labels (rows) against predictions (columns)
(
    tmp
    .value_counts(['label', 'pred'])
    .unstack()
    .fillna(0)
    .astype(int)
)

pred,0,1
label,Unnamed: 1_level_1,Unnamed: 2_level_1
0,25,0
1,9,16


Let's quantify this alignment:

In [27]:
from sklearn.metrics import classification_report
# per-class precision / recall / F1 of `pred` against `label`; zero_division=0 makes
# metrics with a zero denominator report 0 instead of raising a warning
print(classification_report(tmp['label'], tmp['pred'], zero_division=0))

              precision    recall  f1-score   support

           0       0.74      1.00      0.85        25
           1       1.00      0.64      0.78        25

    accuracy                           0.82        50
   macro avg       0.87      0.82      0.81        50
weighted avg       0.87      0.82      0.81        50

