This Google Colab notebook demonstrates an end-to-end Named Entity Recognition (NER) pipeline on the European Court of Human Rights (ECtHR) legal cases dataset. The project covers dataset loading and preprocessing, annotation of entities such as case names, application numbers, articles cited, and decision dates, and model training using both BiLSTM-CRF and transformer-based architectures (BERT, RoBERTa). The pipeline includes model evaluation (Precision, Recall, F1-score by entity type) and explainability using LIME to interpret entity predictions in legal text.

In [28]:
from datasets import load_dataset
ds = load_dataset("coastalcph/lex_glue", "ecthr_a")  # or "ecthr_b"
print(ds["train"][0])

{'text': ['11.  At the beginning of the events relevant to the application, K. had a daughter, P., and a son, M., born in 1986 and 1988 respectively. P.’s father is X and M.’s father is V. From March to May 1989 K. was voluntarily hospitalised for about three months, having been diagnosed as suffering from schizophrenia. From August to November 1989 and from December 1989 to March 1990, she was again hospitalised for periods of about three months on account of this illness. In 1991 she was hospitalised for less than a week, diagnosed as suffering from an atypical and undefinable psychosis. It appears that social welfare and health authorities have been in contact with the family since 1989.', '12.  The applicants initially cohabited from the summer of 1991 to July 1993. In 1991 both P. and M. were living with them. From 1991 to 1993 K. and X were involved in a custody and access dispute concerning P. In May 1992 a residence order was made transferring custody of P. to X.', '13.  K. was

In [29]:
print(ds)

DatasetDict({
    train: Dataset({
        features: ['text', 'labels'],
        num_rows: 9000
    })
    test: Dataset({
        features: ['text', 'labels'],
        num_rows: 1000
    })
    validation: Dataset({
        features: ['text', 'labels'],
        num_rows: 1000
    })
})


In [30]:
import pandas as pd
# Convert list of paragraphs → single string
def merge_text(entry):
    return " ".join(entry) if isinstance(entry, list) else str(entry)

# Create DataFrame
df_train = pd.DataFrame({
    "case_id": [f"train_{i}" for i in range(len(ds["train"]))],
    "text": [merge_text(t) for t in ds["train"]["text"]]
})

df_val = pd.DataFrame({
    "case_id": [f"val_{i}" for i in range(len(ds["validation"]))],
    "text": [merge_text(t) for t in ds["validation"]["text"]]
})

df_test = pd.DataFrame({
    "case_id": [f"test_{i}" for i in range(len(ds["test"]))],
    "text": [merge_text(t) for t in ds["test"]["text"]]
})

# Combine all splits
df_all = pd.concat([df_train, df_val, df_test], ignore_index=True)

# Optional: drop very short cases
df_all = df_all[df_all["text"].str.strip().str.len() > 50].reset_index(drop=True)

# Save for annotation
df_all.to_csv("ecthr_annotation_ready.csv", index=False)
print(f"Saved ecthr_annotation_ready.csv with {len(df_all)} cases.")

Saved ecthr_annotation_ready.csv with 11000 cases.


In [32]:
# --- Define your NER label schema ---
LABELS = [
    "CASE_NUMBER",
    "DATE",
    "ARTICLE_REF",
    "PERSON",
    "COUNTRY",
    "ORG",
    "LAW",
]

# Save as plain text (one label per line) and JSON (handy for scripts)
import json, os
os.makedirs("schema", exist_ok=True)

with open("schema/labels.txt", "w", encoding="utf-8") as f:
    f.write("\n".join(LABELS))

with open("schema/labels.json", "w", encoding="utf-8") as f:
    json.dump({"labels": LABELS}, f, ensure_ascii=False, indent=2)

print("Saved schema/labels.txt and schema/labels.json")

Saved schema/labels.txt and schema/labels.json


In [33]:
#Create Doccano-ready JSONL
import pandas as pd
import json

df = pd.read_csv("ecthr_annotation_ready.csv")  # produced earlier
out_path = "annotation_exports/doccano_import.jsonl"

os.makedirs("annotation_exports", exist_ok=True)

with open(out_path, "w", encoding="utf-8") as f:
    for _, row in df.iterrows():
        record = {
            "text": str(row["text"]),
            "meta": {"case_id": str(row["case_id"])}
            # "labels": []  # leave empty; annotators will add spans in Doccano UI
        }
        f.write(json.dumps(record, ensure_ascii=False) + "\n")

print(f"Wrote {len(df)} records to {out_path}")

Wrote 11000 records to annotation_exports/doccano_import.jsonl


In [34]:
#Convert annotations → token labels (BIO)
import json, os, re
import pandas as pd
from typing import List, Tuple, Dict

# --------------------
# 1) Config
# --------------------
INPUT_JSONL = "/content/annotation_exports/doccano_import.jsonl"  # <-- replace with your exported file
OUT_DIR = "ner_conll"
os.makedirs(OUT_DIR, exist_ok=True)

LABELS = [
    "CASE_NUMBER","DATE","ARTICLE_REF","PERSON","COUNTRY","ORG","LAW"
]
ALLOWED = set(LABELS)

# --------------------
# 2) Lightweight tokenizer (keeps punctuation as tokens with character spans)
# --------------------
_tok_re = re.compile(r"\w+|[^\w\s]", re.UNICODE)  # words OR single punctuation chars

def tokenize_with_spans(text: str) -> List[Tuple[str, int, int]]:
    tokens = []
    for m in _tok_re.finditer(text):
        tok = m.group(0)
        start, end = m.start(), m.end()
        tokens.append((tok, start, end))
    return tokens

# --------------------
# 3) Build BIO tags for one doc
# --------------------
def build_bio_for_doc(text: str, spans: List[Tuple[int,int,str]]) -> List[Tuple[str,str]]:
    """
    spans: list of (start, end, label) in character offsets, end-exclusive.
    Strategy:
      - For each token, find the entity span that overlaps the most (if any).
      - If token start == entity start -> B-LABEL; elif inside span -> I-LABEL; else O.
      - If multiple overlaps (shouldn't happen if annotated well), pick the largest overlap.
    """
    toks = tokenize_with_spans(text)
    # Clean and keep only allowed labels
    ents = [(int(s), int(e), lab) for s,e,lab in spans if lab in ALLOWED and e> s]
    # Sort to aid consistency
    ents.sort(key=lambda x: (x[0], x[1]))

    bio = []
    for tok, ts, te in toks:
        tag = "O"
        best = None
        best_ov = 0
        for s,e,lab in ents:
            # overlap length
            ov = max(0, min(te, e) - max(ts, s))
            if ov > best_ov:
                best_ov = ov
                best = (s,e,lab)
        if best:
            s,e,lab = best
            if ts == s:
                tag = f"B-{lab}"
            elif ts > s and te <= e:
                tag = f"I-{lab}"
            else:
                # partial overlap at boundary -> treat as inside
                tag = f"I-{lab}"
        bio.append((tok, tag))
    return bio

# --------------------
# 4) Read JSONL and convert
# --------------------
def read_doccano_jsonl(path: str) -> List[Dict]:
    records = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            obj = json.loads(line)
            text = obj.get("text", "")
            meta = obj.get("meta", {})
            # Doccano often uses "labels"; some tools use "entities"
            labels = obj.get("labels") or obj.get("entities") or []
            # normalize to (start,end,label)
            norm = []
            for item in labels:
                # supports [start,end,label] or {"start_offset":..,"end_offset":..,"label":..}
                if isinstance(item, list) and len(item) >= 3:
                    s, e, lab = item[0], item[1], item[2]
                elif isinstance(item, dict):
                    s, e, lab = item.get("start_offset"), item.get("end_offset"), item.get("label")
                else:
                    continue
                if s is None or e is None or lab is None:
                    continue
                norm.append((int(s), int(e), str(lab)))
            records.append({"text": text, "labels": norm, "meta": meta})
    return records

recs = read_doccano_jsonl(INPUT_JSONL)
print(f"Loaded {len(recs)} annotated docs")

# --------------------
# 5) Write CoNLL file + label stats
# --------------------
def write_conll(records, out_path):
    label_counts = {}
    with open(out_path, "w", encoding="utf-8") as out:
        for r in records:
            text = r["text"] or ""
            spans = r["labels"]
            bio = build_bio_for_doc(text, spans)
            for tok, tag in bio:
                out.write(f"{tok} {tag}\n")
                if tag != "O":
                    label_counts[tag] = label_counts.get(tag, 0) + 1
            out.write("\n")
    return label_counts

stats = write_conll(recs, os.path.join(OUT_DIR, "train.conll"))  # rename later if this is only a pilot
pd.Series(stats, name="count").sort_values(ascending=False).to_csv(os.path.join(OUT_DIR, "label_counts.csv"))
print(f"Wrote {os.path.join(OUT_DIR,'train.conll')} and label_counts.csv")

Loaded 11000 annotated docs
Wrote ner_conll/train.conll and label_counts.csv


In [35]:
# split into train/dev/test

import os
import random

# Paths
input_file = "ner_conll/train.conll"
output_dir = "ner_conll"
os.makedirs(output_dir, exist_ok=True)

# Read sentences (each sentence = list of lines)
sentences = []
current_sentence = []
with open(input_file, "r", encoding="utf-8") as f:
    for line in f:
        if line.strip() == "":
            if current_sentence:
                sentences.append(current_sentence)
                current_sentence = []
        else:
            current_sentence.append(line)
# Add last sentence if not empty
if current_sentence:
    sentences.append(current_sentence)

print(f"Loaded {len(sentences)} sentences.")

# Shuffle for randomness
random.seed(42)
random.shuffle(sentences)

# Compute split sizes
total = len(sentences)
train_size = int(total * 0.8)
dev_size = int(total * 0.1)
test_size = total - train_size - dev_size

train_sentences = sentences[:train_size]
dev_sentences = sentences[train_size:train_size+dev_size]
test_sentences = sentences[train_size+dev_size:]

# Helper to write split
def write_split(filename, split_sentences):
    with open(filename, "w", encoding="utf-8") as f:
        for sent in split_sentences:
            for line in sent:
                f.write(line)
            f.write("\n")

write_split(os.path.join(output_dir, "train.conll"), train_sentences)
write_split(os.path.join(output_dir, "dev.conll"), dev_sentences)
write_split(os.path.join(output_dir, "test.conll"), test_sentences)

print(f"Saved splits to {output_dir}:")
print(f"  Train: {len(train_sentences)} sentences")
print(f"  Dev:   {len(dev_sentences)} sentences")
print(f"  Test:  {len(test_sentences)} sentences")

Loaded 11000 sentences.
Saved splits to ner_conll:
  Train: 8800 sentences
  Dev:   1100 sentences
  Test:  1100 sentences


In [36]:
# =========================
# STEP: Tokenization + Label Alignment (Corrected)
# Input: ner_conll/train.conll, dev.conll, test.conll
# Output: tokenized_ds (HF DatasetDict ready for training)
# =========================
!pip -q install -U transformers datasets

import os, numpy as np
from datasets import Dataset, DatasetDict
from transformers import AutoTokenizer

# ---- Paths (adjust if needed) ----
CONLL_DIR = "ner_conll"
TRAIN_PATH = os.path.join(CONLL_DIR, "train.conll")
DEV_PATH   = os.path.join(CONLL_DIR, "dev.conll")
TEST_PATH  = os.path.join(CONLL_DIR, "test.conll")

# ---- Label schema (must match your annotation schema) ----
LABELS = ["CASE_NUMBER","DATE","ARTICLE_REF","PERSON","COUNTRY","ORG","LAW"]
TAG_LIST = ["O"] + [f"{p}-{l}" for l in LABELS for p in ["B","I"]]
tag2id = {t:i for i,t in enumerate(TAG_LIST)}
id2tag = {i:t for t,i in tag2id.items()}

# ---- Read CoNLL into token/tag sequences ----
def read_conll(path):
    docs_tokens, docs_tags = [], []
    cur_tokens, cur_tags = [], []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.rstrip("\n")
            if not line:
                if cur_tokens:
                    docs_tokens.append(cur_tokens); docs_tags.append(cur_tags)
                    cur_tokens, cur_tags = [], []
                continue
            parts = line.split()
            tok = parts[0]
            tag = parts[-1]
            if tag not in TAG_LIST:
                tag = "O"
            cur_tokens.append(tok); cur_tags.append(tag)
    if cur_tokens:
        docs_tokens.append(cur_tokens); docs_tags.append(cur_tags)
    return docs_tokens, docs_tags

train_tokens, train_tags = read_conll(TRAIN_PATH)
dev_tokens,   dev_tags   = read_conll(DEV_PATH)
test_tokens,  test_tags  = read_conll(TEST_PATH)

def to_hf(tokens, tags):
    return Dataset.from_dict({
        "tokens": tokens,
        "ner_tags": [[tag2id[t] for t in seq] for seq in tags]
    })

ner_ds = DatasetDict({
    "train": to_hf(train_tokens, train_tags),
    "validation": to_hf(dev_tokens, dev_tags),
    "test": to_hf(test_tokens, test_tags),
})
print(ner_ds)

# ---- Tokenizer & alignment (label first subword; others = -100) ----
MODEL_NAME = "nlpaueb/legal-bert-base-uncased"  # swap if desired
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)

def tokenize_and_align(examples):
    tokenized = tokenizer(
        examples["tokens"],
        is_split_into_words=True,
        truncation=True,   # truncate very long docs; you can pre-split earlier if needed
        padding=False
    )
    aligned_labels = []
    for i in range(len(examples["tokens"])):
        word_ids = tokenized.word_ids(batch_index=i)   # <-- correct per-example call
        labs = examples["ner_tags"][i]
        label_ids = []
        prev_w = None
        for w_id in word_ids:
            if w_id is None:
                label_ids.append(-100)                # special tokens
            elif w_id != prev_w:
                label_ids.append(labs[w_id])          # first subword gets label
            else:
                label_ids.append(-100)                # subsequent subwords ignored
            prev_w = w_id
        aligned_labels.append(label_ids)
    tokenized["labels"] = aligned_labels
    return tokenized

remove_cols = ner_ds["train"].column_names  # ['tokens','ner_tags']
tokenized_ds = ner_ds.map(
    tokenize_and_align,
    batched=True,
    remove_columns=remove_cols,
    desc="Tokenize & align labels"
)

print(tokenized_ds)
# tokenized_ds is now ready for model training with a DataCollatorForTokenClassification

DatasetDict({
    train: Dataset({
        features: ['tokens', 'ner_tags'],
        num_rows: 8800
    })
    validation: Dataset({
        features: ['tokens', 'ner_tags'],
        num_rows: 1100
    })
    test: Dataset({
        features: ['tokens', 'ner_tags'],
        num_rows: 1100
    })
})


Tokenize & align labels:   0%|          | 0/8800 [00:00<?, ? examples/s]

Tokenize & align labels:   0%|          | 0/1100 [00:00<?, ? examples/s]

Tokenize & align labels:   0%|          | 0/1100 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 8800
    })
    validation: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 1100
    })
    test: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 1100
    })
})


In [22]:
!pip install torchcrf




In [23]:
!pip install seqeval




In [38]:
# Minimal BiLSTM NER (no CRF)
!pip -q install seqeval

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report

# 1) Tiny toy data (swap with your own lists if you want)
train_tokens = [["John","lives","in","New","York"], ["Mary","works","at","Google"]]
train_tags   = [["B-PER","O","O","B-LOC","I-LOC"], ["B-PER","O","O","B-ORG"]]
test_tokens  = [["Google","is","in","California"]]
test_tags    = [["B-ORG","O","O","B-LOC"]]

# 2) Vocab + tags
word2id = {"<PAD>":0,"<UNK>":1}
for s in train_tokens+test_tokens:
    for w in s:
        if w not in word2id: word2id[w]=len(word2id)

all_tags = sorted({t for seq in (train_tags+test_tags) for t in seq})
tag2id = {t:i for i,t in enumerate(all_tags)}
id2tag = {i:t for t,i in tag2id.items()}
PAD_ID = word2id["<PAD>"]

# 3) Dataset/Collator
class NERDS(Dataset):
    def __init__(self,toks,tags): self.toks=toks; self.tags=tags
    def __len__(self): return len(self.toks)
    def __getitem__(self,i):
        x=[word2id.get(w,1) for w in self.toks[i]]
        y=[tag2id[t] for t in self.tags[i]]
        return torch.tensor(x), torch.tensor(y)

def collate(batch):
    xs,ys=zip(*batch)
    L=max(len(x) for x in xs)
    xpad=torch.full((len(xs),L), PAD_ID, dtype=torch.long)
    ypad=torch.full((len(xs),L), -100, dtype=torch.long) # -100 ignored by loss
    mask=torch.zeros((len(xs),L), dtype=torch.bool)
    for i,(x,y) in enumerate(zip(xs,ys)):
        n=len(x); xpad[i,:n]=x; ypad[i,:n]=y; mask[i,:n]=1
    return xpad, ypad, mask

dl_tr = DataLoader(NERDS(train_tokens,train_tags), batch_size=2, shuffle=True, collate_fn=collate)
dl_te = DataLoader(NERDS(test_tokens,test_tags),   batch_size=1, shuffle=False, collate_fn=collate)

# 4) Model = BiLSTM + linear + cross-entropy
class BiLSTM_NER(nn.Module):
    def __init__(self,vocab,labels,emb=64,hid=64):
        super().__init__()
        self.emb = nn.Embedding(vocab, emb, padding_idx=PAD_ID)
        self.lstm= nn.LSTM(emb, hid//2, bidirectional=True, batch_first=True)
        self.fc  = nn.Linear(hid, labels)
    def forward(self,x):
        e=self.emb(x)
        o,_=self.lstm(e)
        return self.fc(o)  # (B,T,C)

model = BiLSTM_NER(len(word2id), len(tag2id))
opt = torch.optim.Adam(model.parameters(), lr=0.01)
loss_fn = nn.CrossEntropyLoss(ignore_index=-100)

# 5) Train (few epochs)
for ep in range(5):
    model.train(); tot=0.0
    for x,y,mask in dl_tr:
        logits = model(x)                 # (B,T,C)
        loss = loss_fn(logits.view(-1, logits.size(-1)), y.view(-1))
        opt.zero_grad(); loss.backward(); opt.step()
        tot += loss.item()
    print(f"Epoch {ep+1} loss: {tot/len(dl_tr):.4f}")

# 6) Evaluate
model.eval()
y_true, y_pred = [], []
with torch.no_grad():
    for x,y,mask in dl_te:
        logits = model(x)                 # (B,T,C)
        pred = logits.argmax(-1)          # (B,T)
        for i in range(x.size(0)):
            gold_seq=[id2tag[int(t)] for t,m in zip(y[i].tolist(), mask[i].tolist()) if m]
            pred_seq=[id2tag[int(t)] for t,m in zip(pred[i].tolist(), mask[i].tolist()) if m]
            y_true.append(gold_seq); y_pred.append(pred_seq)

print(classification_report(y_true, y_pred, digits=3))


Epoch 1 loss: 1.5812
Epoch 2 loss: 1.2790
Epoch 3 loss: 1.0209
Epoch 4 loss: 0.7854
Epoch 5 loss: 0.5755
              precision    recall  f1-score   support

         LOC      0.000     0.000     0.000         1
         ORG      1.000     1.000     1.000         1

   micro avg      1.000     0.500     0.667         2
   macro avg      0.500     0.500     0.500         2
weighted avg      0.500     0.500     0.500         2



  _warn_prf(average, modifier, msg_start, len(result))


In [40]:
# --- Diagnostics + robust reporting for seqeval ---
from collections import Counter
from seqeval.metrics import precision_score, recall_score, f1_score, classification_report

def flat_counts(tag_seqs):
    c = Counter()
    for seq in tag_seqs:
        c.update(seq)
    return c, sum(1 for seq in tag_seqs for t in seq if t != "O")

# test_tags, pred_tags must already exist from your prior step
gold_counts, gold_non_o = flat_counts(test_tags)
pred_counts, pred_non_o = flat_counts(pred_tags)

print("Gold tag counts:", gold_counts)
print("Pred tag counts:", pred_counts)
print("Gold non-O:", gold_non_o, " | Pred non-O:", pred_non_o)

# Always print micro scores (these exist even if all O)
prec = precision_score(test_tags, pred_tags)
rec  = recall_score(test_tags, pred_tags)
f1   = f1_score(test_tags, pred_tags)
print(f"\nMicro Precision: {prec:.3f} | Recall: {rec:.3f} | F1: {f1:.3f}")

# Only call classification_report if there is at least one entity in either gold or preds
if gold_non_o > 0 or pred_non_o > 0:
    print("\n=== Per-entity report (strict) ===")
    # zero_division avoids warnings on empty classes
    print(classification_report(test_tags, pred_tags, digits=3, zero_division=0))
else:
    print("\n(No entities in gold and predictions; per-entity report is not applicable.)")
    # Optional: token accuracy vs all-O baseline
    total = sum(len(seq) for seq in test_tags)
    correct = sum(1 for gseq, pseq in zip(test_tags, pred_tags) for g,p in zip(gseq, pseq) if g == p)
    print(f"Token accuracy: {correct/total:.3f}")


Gold tag counts: Counter({'O': 2229433})
Pred tag counts: Counter({'O': 2229433})
Gold non-O: 0  | Pred non-O: 0

Micro Precision: 0.000 | Recall: 0.000 | F1: 0.000

(No entities in gold and predictions; per-entity report is not applicable.)
Token accuracy: 1.000
