# 01 - Quality Filtering for Legal Text

## Why Cleaning Matters: Garbage In, Garbage Out

The quality of an LLM's training data has a direct, measurable impact on model
performance. The **RefinedWeb** paper (Penedo et al., 2023) demonstrated that a
carefully filtered and deduplicated web corpus could match or exceed the
performance of curated datasets like The Pile, despite being drawn from Common
Crawl alone.
The **FineWeb** paper (Penedo et al., 2024) pushed this further, showing that
systematic quality filtering at scale produces state-of-the-art pretraining
corpora.

### Legal Text Has Unique Noise

For a legal AI system like CoCounsel, data cleaning is especially important.
Legal documents collected from court filing systems and public records often
contain:

- **OCR artifacts** -- page numbers, headers, footers, and garbled text from
  scanned PDF opinions
- **Boilerplate** -- standard jurisdictional headers, procedural notices, and
  filing stamps that repeat across thousands of opinions
- **Non-English content** -- foreign-language documents from court systems
  that operate in more than one language
- **PII leakage** -- social security numbers, phone numbers, and addresses that
  should be redacted before training

In this notebook we build a quality filtering pipeline for legal text, drawing
on techniques from RefinedWeb and FineWeb.

## Setup

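Install and import the libraries we need. Only `langdetect` is a third-party
package; the rest ship with Python:
- **langdetect** -- language identification
- **json** / **pathlib** -- loading JSONL data
- **re** / **collections** -- regex patterns and counting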

In [None]:
# Install dependencies (uncomment if needed)
# %pip install langdetect

In [None]:
import json
import re
from collections import Counter
from pathlib import Path
from typing import Any

from langdetect import detect, DetectorFactory

# Make langdetect deterministic
DetectorFactory.seed = 0

## Loading Data

We load court opinions from our sample dataset. Each record has `id`,
`case_name`, `court`, `date_filed`, `text`, and `citations` fields.

In [None]:
DATA_PATH = Path("../../datasets/sample/court_opinions.jsonl")


def load_opinions(path: Path = DATA_PATH) -> list[dict[str, Any]]:
    """Load court opinions from a JSONL file."""
    opinions = []
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line:
                opinions.append(json.loads(line))
    return opinions


opinions = load_opinions()
print(f"Loaded {len(opinions)} opinions.")

In [None]:
# Display a sample record
sample = opinions[0]
print(f"Case : {sample['case_name']}")
print(f"Court: {sample['court']}")
print(f"Date : {sample['date_filed']}")
print(f"Text length: {len(sample['text']):,} characters")
print(f"\nFirst 500 characters:\n{sample['text'][:500]}")

## Language Detection

The first step in any multilingual pipeline is to identify and filter by
language. Even datasets that are nominally English-only can contain
foreign-language documents -- especially in legal contexts where courts may
reproduce testimony, contracts, or statutes in their original language.

We use the `langdetect` library, a Python port of Google's language-detection
library. It works well on longer passages (roughly 50+ words) but can be
unreliable on very short texts.
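
Because short snippets are unreliable, one option is to skip detection below a word-count floor. The following is a minimal sketch, not part of the pipeline below: `detect_with_length_guard` is a hypothetical helper, the 50-word default simply reuses the rule of thumb above, and `fake_detector` is a stand-in so the block runs without `langdetect` installed. In practice you would pass `langdetect.detect` as the detector.

```python
from typing import Callable


def detect_with_length_guard(
    text: str,
    detector: Callable[[str], str],
    min_words: int = 50,
) -> str:
    """Return the detector's label, or 'unknown' when the text is too short."""
    if len(text.split()) < min_words:
        return "unknown"
    try:
        return detector(text)
    except Exception:
        # Detectors such as langdetect raise on empty or feature-less input.
        return "unknown"


def fake_detector(text: str) -> str:
    """Stand-in detector (an assumption for this sketch); always says 'en'."""
    return "en"


short_label = detect_with_length_guard("So ordered.", fake_detector)
long_label = detect_with_length_guard("word " * 60, fake_detector)
print(short_label, long_label)
```

Documents labelled `unknown` can then be routed to manual review rather than silently dropped.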

In [None]:
def detect_language(text: str) -> str:
    """Detect the language of a text string.

    Returns a language code, usually two-letter ISO 639-1 (e.g. 'en' for
    English); langdetect reports Chinese as 'zh-cn' or 'zh-tw'.
    Returns 'unknown' if detection fails.
    """
    try:
        return detect(text)
    except Exception:
        return "unknown"


# Detect languages for all opinions
for opinion in opinions:
    opinion["language"] = detect_language(opinion["text"])

lang_counts = Counter(op["language"] for op in opinions)
print("Language distribution:")
for lang, count in lang_counts.most_common():
    print(f"  {lang}: {count}")

In [None]:
def filter_by_language(
    docs: list[dict[str, Any]], target_lang: str = "en"
) -> list[dict[str, Any]]:
    """Keep only documents in the target language."""
    return [doc for doc in docs if doc.get("language") == target_lang]


english_opinions = filter_by_language(opinions)
print(f"Before language filter: {len(opinions)} documents")
print(f"After language filter:  {len(english_opinions)} documents")
print(
    f"Removed: {len(opinions) - len(english_opinions)} non-English documents"
)

## Content Quality Heuristics

RefinedWeb and FineWeb apply a battery of heuristic filters to remove
low-quality documents. We adapt four of these for legal text:

1. **Line length filter** -- documents with very short average line lengths
   are often OCR noise or formatting artifacts.
2. **Symbol ratio filter** -- a high ratio of special characters suggests
   corrupted text or code, not prose.
3. **Repetition filter** -- repeated n-grams indicate boilerplate or
   extraction errors.
4. **Boilerplate filter** -- standard legal headers that appear verbatim
   across many documents.

### Filter 1: Line Length

Documents with very short average line lengths are often the result of OCR
errors, misformatted tables, or extracted metadata rather than prose. We
flag documents where the average line length falls below a threshold.
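
To see why the signal works, here is a small sketch on two toy strings. The helper name `mean_nonblank_line_length` and both sample texts are made up for illustration, and ignoring blank lines is an assumption of this sketch.

```python
def mean_nonblank_line_length(text: str) -> float:
    """Mean character length of the non-blank lines in `text`."""
    lines = [line for line in text.splitlines() if line.strip()]
    if not lines:
        return 0.0
    return sum(len(line) for line in lines) / len(lines)


# A fragment of the kind a scanned caption page might produce.
ocr_fragment = "12\nSMITH\nv.\nJONES\n- 3 -"

# Two lines of ordinary opinion prose.
prose = (
    "The district court granted summary judgment for the defendant.\n"
    "We review that decision de novo and affirm for the reasons below."
)

ocr_avg = mean_nonblank_line_length(ocr_fragment)
prose_avg = mean_nonblank_line_length(prose)
print(f"OCR fragment: {ocr_avg:.1f} chars | prose: {prose_avg:.1f} chars")
```

The fragment averages only a few characters per line, while the prose sits well above a 40-character threshold.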

In [None]:
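def avg_line_length(text: str) -> float:
    """Compute the average length (in characters) of non-blank lines."""
    # Skip blank lines so paragraph breaks do not drag the average down.
    lines = [line for line in text.splitlines() if line.strip()]
    if not lines:
        return 0.0
    return sum(len(line) for line in lines) / len(lines)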


def line_length_filter(
    docs: list[dict[str, Any]], min_avg_length: float = 40.0
) -> list[dict[str, Any]]:
    """Remove documents with average line length below the threshold."""
    return [
        doc for doc in docs if avg_line_length(doc["text"]) >= min_avg_length
    ]


# Show line length statistics before filtering
print("Average line lengths:")
for op in english_opinions:
    avg = avg_line_length(op["text"])
    print(f"  {op['case_name'][:50]:50s}  {avg:>8.1f} chars")

after_line = line_length_filter(english_opinions)
print(f"\nBefore: {len(english_opinions)} | After: {len(after_line)}")
print(f"Removed: {len(english_opinions) - len(after_line)}")

### Filter 2: Symbol Ratio

Documents with an unusually high share of special characters (symbols and
punctuation, measured against the total character count) are often corrupted
or non-prose content. Letters, digits, and whitespace do not count as symbols.
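
Legal citations are punctuation-heavy, so it helps to see where they fall on this scale. The sketch below uses a hypothetical helper `non_alnum_ratio` and three made-up strings to compare plain prose, a citation string, and garbled text.

```python
def non_alnum_ratio(text: str) -> float:
    """Share of characters that are neither alphanumeric nor whitespace."""
    if not text:
        return 0.0
    symbols = sum(1 for ch in text if not ch.isalnum() and not ch.isspace())
    return symbols / len(text)


prose = "The court affirmed the judgment."
citation = "See 42 U.S.C. § 1983; Fed. R. Civ. P. 12(b)(6)."
garbled = "~~|| #### ^^^^ ]]]] %%%% a1"

prose_ratio = non_alnum_ratio(prose)
citation_ratio = non_alnum_ratio(citation)
garbled_ratio = non_alnum_ratio(garbled)

for label, ratio in [
    ("prose", prose_ratio),
    ("citation", citation_ratio),
    ("garbled", garbled_ratio),
]:
    print(f"{label:10s} {ratio:.3f}")
```

A bare citation string scores far higher than ordinary prose, which is a reason to check any threshold against real opinions before relying on it.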

In [None]:
def symbol_ratio(text: str) -> float:
    """Compute the ratio of non-alphanumeric, non-space characters to total."""
    if not text:
        return 0.0
    symbols = sum(1 for c in text if not c.isalnum() and not c.isspace())
    return symbols / len(text)


def symbol_ratio_filter(
    docs: list[dict[str, Any]], max_ratio: float = 0.3
) -> list[dict[str, Any]]:
    """Remove documents where the symbol ratio exceeds the threshold."""
    return [doc for doc in docs if symbol_ratio(doc["text"]) <= max_ratio]


# Show symbol ratios
print("Symbol ratios:")
for op in after_line:
    ratio = symbol_ratio(op["text"])
    print(f"  {op['case_name'][:50]:50s}  {ratio:.4f}")

after_symbol = symbol_ratio_filter(after_line)
print(f"\nBefore: {len(after_line)} | After: {len(after_symbol)}")
print(f"Removed: {len(after_line) - len(after_symbol)}")

### Filter 3: Repetition

Repeated n-grams are a strong signal of extraction errors or boilerplate.
For example, a page header that appears on every page of a scanned PDF
will produce many repeated 5-grams. RefinedWeb filters documents where
more than a threshold fraction of n-grams are duplicates.
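
As a worked illustration, the sketch below repeats a made-up docket header ten times and compares it with a sentence that has no repeated 5-grams. The helper name `duplicate_ngram_fraction` and both strings are assumptions for this example only.

```python
from collections import Counter


def duplicate_ngram_fraction(text: str, n: int = 5) -> float:
    """Fraction of n-gram occurrences that repeat an earlier n-gram."""
    words = text.lower().split()
    if len(words) < n:
        return 0.0
    ngrams = [tuple(words[i : i + n]) for i in range(len(words) - n + 1)]
    counts = Counter(ngrams)
    # Every occurrence beyond the first counts as a duplicate.
    duplicates = sum(count - 1 for count in counts.values())
    return duplicates / len(ngrams)


# A six-word page header repeated as if it appeared on ten scanned pages.
header = "Case 1:20-cv-00001 Document 45 Filed 01/02/21 "
scanned = header * 10

unique = "the quick brown fox jumps over the lazy dog near the river bank"

scanned_fraction = duplicate_ngram_fraction(scanned)
unique_fraction = duplicate_ngram_fraction(unique)
print(f"scanned: {scanned_fraction:.3f} | unique: {unique_fraction:.3f}")
```

The repeated header yields 56 five-grams of which only 6 are distinct, so 50 of 56 occurrences are duplicates; the second string has none.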

In [None]:
def repeated_ngram_ratio(text: str, n: int = 5) -> float:
    """Compute the fraction of n-gram occurrences that are duplicates.

    Each n-gram's first occurrence is not counted, only its repeats.
    A high value means the text is repetitive.
    """
    words = text.lower().split()
    if len(words) < n:
        return 0.0
    ngrams = [tuple(words[i : i + n]) for i in range(len(words) - n + 1)]
    counts = Counter(ngrams)
    repeated = sum(c - 1 for c in counts.values() if c > 1)
    return repeated / len(ngrams)


def repetition_filter(
    docs: list[dict[str, Any]], max_repetition: float = 0.3
) -> list[dict[str, Any]]:
    """Remove documents with excessive n-gram repetition."""
    return [
        doc
        for doc in docs
        if repeated_ngram_ratio(doc["text"]) <= max_repetition
    ]


# Show repetition ratios
print("Repetition ratios (5-gram):")
for op in after_symbol:
    ratio = repeated_ngram_ratio(op["text"])
    print(f"  {op['case_name'][:50]:50s}  {ratio:.4f}")

after_rep = repetition_filter(after_symbol)
print(f"\nBefore: {len(after_symbol)} | After: {len(after_rep)}")
print(f"Removed: {len(after_symbol) - len(after_rep)}")

### Filter 4: Boilerplate Detection

Legal opinions often begin with standardized text that is specific to the
court or filing system rather than the substantive content of the opinion.
We detect common boilerplate patterns and flag documents that match many of
them. The match count is only a rough proxy for a document that consists
primarily of boilerplate.
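
A more direct way to ask whether a document is mostly boilerplate is to measure the share of its lines that match. The sketch below is an alternative to the pattern-count score, not the approach used in this notebook; `LINE_PATTERNS` and `boilerplate_line_fraction` are hypothetical names, and the patterns are anchored to whole lines as an assumption.

```python
import re

# Hypothetical whole-line patterns; a real list would be tuned per court.
LINE_PATTERNS = [
    re.compile(r"^\s*NOT FOR PUBLICATION\s*$", re.IGNORECASE),
    re.compile(r"^\s*Page\s+\d+\s+of\s+\d+\s*$", re.IGNORECASE),
    re.compile(r"^\s*FILED\s+\w+\s+\d{1,2},?\s+\d{4}\s*$", re.IGNORECASE),
]


def boilerplate_line_fraction(text: str) -> float:
    """Fraction of non-blank lines that match a boilerplate pattern."""
    lines = [line for line in text.splitlines() if line.strip()]
    if not lines:
        return 0.0
    hits = sum(
        1 for line in lines if any(pat.match(line) for pat in LINE_PATTERNS)
    )
    return hits / len(lines)


stub = "NOT FOR PUBLICATION\nFILED March 3, 2021\nPage 1 of 1"
opinion = (
    "NOT FOR PUBLICATION\n"
    "The plaintiff appeals the dismissal of her complaint.\n"
    "We conclude that the district court applied the correct standard.\n"
    "The judgment is affirmed."
)

stub_fraction = boilerplate_line_fraction(stub)
opinion_fraction = boilerplate_line_fraction(opinion)
print(f"stub: {stub_fraction:.2f} | opinion: {opinion_fraction:.2f}")
```

Here the stub consists entirely of boilerplate lines, while the short opinion has one boilerplate line out of four.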

In [None]:
# Common boilerplate patterns found in court opinions
BOILERPLATE_PATTERNS = [
    re.compile(r"NOT FOR PUBLICATION", re.IGNORECASE),
    re.compile(r"THIS OPINION IS NOT PRECEDENTIAL", re.IGNORECASE),
    re.compile(r"FILED\s+\w+\s+\d{1,2},?\s+\d{4}", re.IGNORECASE),
    re.compile(r"Page\s+\d+\s+of\s+\d+", re.IGNORECASE),
    re.compile(
        r"Case\s+\d+:\d+-\w+-\d+\s+Document\s+\d+", re.IGNORECASE
    ),
    re.compile(r"UNITED STATES (?:DISTRICT|CIRCUIT) COURT", re.IGNORECASE),
]


def boilerplate_score(text: str) -> int:
    """Count how many boilerplate patterns match in the text."""
    return sum(1 for pat in BOILERPLATE_PATTERNS if pat.search(text))


def boilerplate_filter(
    docs: list[dict[str, Any]], max_matches: int = 4
) -> list[dict[str, Any]]:
    """Remove documents that match too many boilerplate patterns.

    A high number of matches suggests the document is mostly boilerplate
    rather than substantive legal reasoning.
    """
    return [
        doc for doc in docs if boilerplate_score(doc["text"]) <= max_matches
    ]


# Show boilerplate scores
print("Boilerplate pattern matches:")
for op in after_rep:
    score = boilerplate_score(op["text"])
    print(f"  {op['case_name'][:50]:50s}  {score} matches")

after_boilerplate = boilerplate_filter(after_rep)
print(f"\nBefore: {len(after_rep)} | After: {len(after_boilerplate)}")
print(f"Removed: {len(after_rep) - len(after_boilerplate)}")

## PII Detection

Training data should be scrubbed of personally identifiable information (PII)
to avoid the model memorizing and reproducing sensitive data. We implement
regex-based detectors for three common PII types:

- **Social Security Numbers** (XXX-XX-XXXX)
- **Phone numbers** (various US formats)
- **Email addresses**
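
A plain `XXX-XX-XXXX` pattern also matches number ranges that are never issued as SSNs. The sketch below shows a stricter variant; `STRICT_SSN` is a hypothetical name, and the exclusions (area 000, 666, or 900-999; group 00; serial 0000) are encoded with negative lookaheads.

```python
import re

# Assumption for this sketch: skip ranges that are never issued as SSNs
# (area 000, 666, or 900-999; group 00; serial 0000).
STRICT_SSN = re.compile(
    r"\b(?!000|666|9\d{2})\d{3}-(?!00)\d{2}-(?!0000)\d{4}\b"
)

candidates = [
    "123-45-6789",
    "000-12-3456",
    "666-12-3456",
    "912-34-5678",
    "123-00-4567",
    "123-45-0000",
]

accepted = [c for c in candidates if STRICT_SSN.fullmatch(c)]
print(accepted)
```

The trade-off is recall: placeholder or mistyped numbers in the excluded ranges, which may still be worth redacting, no longer match. For a redaction pipeline the looser pattern can be the safer default.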

> **Note:** We demonstrate on synthetic examples below. The sample court
> opinions do not contain real PII.

In [None]:
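PII_PATTERNS = {
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    # (?<!\w) rather than \b so a leading "(" or "+" is part of the match
    "phone": re.compile(
        r"(?<!\w)(?:\+?1[-.\s]?)?"
        r"(?:\(?\d{3}\)?[-.\s]?)"
        r"\d{3}[-.\s]?\d{4}\b"
    ),
    # "|" inside a character class is a literal pipe, so it is left out
    "email": re.compile(
        r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"
    ),
}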


def detect_pii(text: str) -> dict[str, list[str]]:
    """Find all PII matches in a text string.

    Returns a dictionary mapping PII type to list of matches.
    """
    results = {}
    for pii_type, pattern in PII_PATTERNS.items():
        matches = pattern.findall(text)
        if matches:
            results[pii_type] = matches
    return results


def redact_pii(text: str) -> str:
    """Replace all detected PII with redaction markers."""
    redacted = text
    redacted = PII_PATTERNS["ssn"].sub("[SSN REDACTED]", redacted)
    redacted = PII_PATTERNS["phone"].sub("[PHONE REDACTED]", redacted)
    redacted = PII_PATTERNS["email"].sub("[EMAIL REDACTED]", redacted)
    return redacted

In [None]:
# Demonstrate PII detection on synthetic examples
# (These are fictional -- never put real PII in notebooks!)
synthetic_texts = [
    (
        "The plaintiff, John Doe (SSN: 123-45-6789), filed his claim "
        "on January 15, 2024. He can be reached at john.doe@example.com "
        "or (555) 123-4567."
    ),
    (
        "Witness testimony was provided by Jane Smith, whose contact "
        "information is on file with the court."
    ),
    (
        "Records indicate the account holder's SSN is 987-65-4321 and "
        "the alternate phone number listed is 555-987-6543. Email "
        "correspondence was sent to legal.team@lawfirm.org."
    ),
]

for i, text in enumerate(synthetic_texts):
    print(f"--- Example {i + 1} ---")
    pii_found = detect_pii(text)
    if pii_found:
        for pii_type, matches in pii_found.items():
            print(f"  {pii_type}: {matches}")
        print(f"\n  Redacted: {redact_pii(text)}")
    else:
        print("  No PII detected.")
    print()

In [None]:
# Check our real sample data for PII
# (Report counts only, so raw matches never end up in notebook output.)
print("PII scan of sample court opinions:")
for op in opinions:
    pii_found = detect_pii(op["text"])
    status = "PII FOUND" if pii_found else "clean"
    print(f"  {op['case_name'][:50]:50s}  [{status}]")
    if pii_found:
        for pii_type, matches in pii_found.items():
            print(f"    {pii_type}: {len(matches)} match(es)")

## Full Pipeline

Now we chain all the filters together into a single `clean_document`
function and run the complete pipeline on our sample corpus.
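
The chaining pattern itself is simple: run the stages in order and stop at the first one that rejects the document. The sketch below shows that control flow on toy predicates; `Stage`, `first_failing_stage`, and `toy_stages` are hypothetical names and do not correspond to the real filters.

```python
from typing import Callable, Optional

# Each stage is a (name, predicate) pair; the predicate returns True to keep.
Stage = tuple[str, Callable[[str], bool]]


def first_failing_stage(text: str, stages: list[Stage]) -> Optional[str]:
    """Return the name of the first stage that rejects `text`, or None."""
    for name, keep in stages:
        if not keep(text):
            return name
    return None


# Toy predicates standing in for the real filters.
toy_stages: list[Stage] = [
    ("non_empty", lambda t: bool(t.strip())),
    ("min_words", lambda t: len(t.split()) >= 5),
]

rejected_by = first_failing_stage("Affirmed.", toy_stages)
passed = first_failing_stage(
    "The judgment of the district court is affirmed.", toy_stages
)
print(rejected_by, passed)
```

Because evaluation stops at the first rejection, the order of the stages determines which filter gets credited for each removal in the statistics.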

In [None]:
def clean_document(
    text: str,
    *,
    target_lang: str = "en",
    min_avg_line_length: float = 40.0,
    max_symbol_ratio: float = 0.3,
    max_repetition_ratio: float = 0.3,
    max_boilerplate_matches: int = 4,
    redact: bool = True,
) -> tuple[str | None, dict[str, Any]]:
    """Run the full cleaning pipeline on a single document.

    Returns:
        A tuple of (cleaned_text, metadata). If the document is filtered
        out, cleaned_text is None and metadata contains the reason.
    """
    metadata: dict[str, Any] = {"filters_passed": []}

    # 1. Language detection
    lang = detect_language(text)
    metadata["language"] = lang
    if lang != target_lang:
        metadata["filtered_by"] = "language"
        return None, metadata
    metadata["filters_passed"].append("language")

    # 2. Line length
    avg_ll = avg_line_length(text)
    metadata["avg_line_length"] = avg_ll
    if avg_ll < min_avg_line_length:
        metadata["filtered_by"] = "line_length"
        return None, metadata
    metadata["filters_passed"].append("line_length")

    # 3. Symbol ratio
    sr = symbol_ratio(text)
    metadata["symbol_ratio"] = sr
    if sr > max_symbol_ratio:
        metadata["filtered_by"] = "symbol_ratio"
        return None, metadata
    metadata["filters_passed"].append("symbol_ratio")

    # 4. Repetition
    rr = repeated_ngram_ratio(text)
    metadata["repetition_ratio"] = rr
    if rr > max_repetition_ratio:
        metadata["filtered_by"] = "repetition"
        return None, metadata
    metadata["filters_passed"].append("repetition")

    # 5. Boilerplate
    bp = boilerplate_score(text)
    metadata["boilerplate_matches"] = bp
    if bp > max_boilerplate_matches:
        metadata["filtered_by"] = "boilerplate"
        return None, metadata
    metadata["filters_passed"].append("boilerplate")

    # 6. PII detection and redaction
    pii_found = detect_pii(text)
    metadata["pii_found"] = bool(pii_found)
    if redact and pii_found:
        text = redact_pii(text)
        metadata["pii_redacted"] = True
    metadata["filters_passed"].append("pii")

    return text, metadata

In [None]:
# Run the full pipeline on the sample corpus
cleaned_opinions = []
pipeline_stats: dict[str, int] = {
    "total": len(opinions),
    "passed": 0,
    "filtered_language": 0,
    "filtered_line_length": 0,
    "filtered_symbol_ratio": 0,
    "filtered_repetition": 0,
    "filtered_boilerplate": 0,
    "pii_redacted": 0,
}

total_chars_before = 0
total_chars_after = 0

for op in opinions:
    total_chars_before += len(op["text"])
    cleaned_text, meta = clean_document(op["text"])

    if cleaned_text is not None:
        pipeline_stats["passed"] += 1
        total_chars_after += len(cleaned_text)
        cleaned_opinions.append({**op, "text": cleaned_text, "meta": meta})
        if meta.get("pii_redacted"):
            pipeline_stats["pii_redacted"] += 1
    else:
        reason = meta.get("filtered_by", "unknown")
        key = f"filtered_{reason}"
        pipeline_stats[key] = pipeline_stats.get(key, 0) + 1

print("=" * 60)
print("PIPELINE STATISTICS")
print("=" * 60)
print(f"Total documents:        {pipeline_stats['total']}")
print(f"Passed all filters:     {pipeline_stats['passed']}")
print(f"Filtered (language):    {pipeline_stats['filtered_language']}")
print(f"Filtered (line len):    {pipeline_stats['filtered_line_length']}")
print(f"Filtered (symbols):     {pipeline_stats['filtered_symbol_ratio']}")
print(f"Filtered (repetition):  {pipeline_stats['filtered_repetition']}")
print(f"Filtered (boilerplate): {pipeline_stats['filtered_boilerplate']}")
print(f"PII redacted:           {pipeline_stats['pii_redacted']}")
print("\nText volume:")
print(f"  Before: {total_chars_before:>10,} characters")
print(f"  After:  {total_chars_after:>10,} characters")
if total_chars_before > 0:
    pct = (1 - total_chars_after / total_chars_before) * 100
    print(f"  Removed: {pct:.1f}%")

## Exercises

### Exercise (a): Custom Boilerplate Filter

Add a custom filter for court-specific boilerplate. Many courts use
standardized headers that appear verbatim in every opinion from that court.
For example:

```
UNITED STATES COURT OF APPEALS FOR THE SEVENTH CIRCUIT
No. 24-1234
```

Write a function that:
1. Groups opinions by court.
2. Identifies text patterns that appear in **all** opinions from a given court.
3. Strips those patterns from the text before further processing.

Test it on the sample data and measure how much text is removed.

### Exercise (b): Threshold Tuning

The quality filters use hard-coded thresholds (`min_avg_line_length=40`,
`max_symbol_ratio=0.3`, etc.). Experiment with different values:

1. Sweep each threshold across a range of values.
2. For each setting, record how many documents pass and the total text volume.
3. Plot corpus size vs. threshold to understand the trade-off between
   aggressive filtering (higher quality but less data) and permissive
   filtering (more data but potentially lower quality).

Which thresholds have the most impact on your legal corpus?