# OCR Post-Processing & Text Restoration Pipeline

This documentation provides a technical overview of the `final_preprocessing_pipeline.ipynb`, which is designed to clean, standardise, and restore historical text data degraded by Optical Character Recognition (OCR) errors. The pipeline focuses on art reviews and employs a multi-stage approach combining regex heuristics, Named Entity Recognition (NER), and symmetric-delete spelling correction (SymSpell).

---

## 1. Environment & Dependencies
The script relies on several key libraries for NLP and data processing:
* **Data Handling**: `pandas`, `datasets` (Hugging Face), and `pathlib`.
* **NLP Models**: `transformers` (utilising the `dslim/bert-large-NER` model), `torch`, and `symspellpy`.
* **Visualisation**: `matplotlib` for performance tracking.
* **Utilities**: `re` for pattern matching and `pickle` for wordlist management.

## 2. Pipeline Architecture
The `pipeline_driver()` function orchestrates three primary stages of processing:

| Stage | Function | Purpose |
| :--- | :--- | :--- |
| **1. Regex Cleaning** | `clean_text_piece_batched` | Corrects systematic OCR misreads and removes metadata or "junk" characters. |
| **2. NER & Spelling** | `capitalization_and_correction_batched` | Protects proper nouns and applies length-based spellchecking. |
| **3. Polishing** | `final_polishing_batched` | Standardises honorifics and fixes punctuation/currency spacing. |

---

## 3. Detailed Component Breakdown

### A. Heuristic OCR Patching
Before statistical correction, `apply_common_ocr_patches` and `apply_aggressive_ocr_patches` target high-frequency failures:
* **Common Substitutions**: e.g., `alvays` $\rightarrow$ `always`, `thle` $\rightarrow$ `the`, and `vith` $\rightarrow$ `with`.
* **Systematic Fixes**: Resolves character confusions such as `m` being read as `rn`, or `w` being read as `wv`/`vw`.

These patterns were identified by inspecting the non-dictionary words in the full uncorrected corpus.
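
The word-level patches are anchored with `\b` so that they only fire on whole tokens. A minimal sketch of that behaviour, using two patterns taken from `apply_common_ocr_patches`; the dictionary layout, the helper name `patch_words`, and the sample strings are illustrative assumptions, not the notebook's structure:

```python
import re

# Two whole-word patches copied from apply_common_ocr_patches.
# The dict and the helper below are hypothetical, for illustration only.
WORD_PATCHES = {
    r'\bvith\b': 'with',
    r'\bthle\b': 'the',
}

def patch_words(text):
    """Apply each whole-word patch in turn."""
    for pattern, replacement in WORD_PATCHES.items():
        text = re.sub(pattern, replacement, text)
    return text

patched = patch_words("He played vith thle orchestra")
# The \b anchors leave a surname that merely contains "vith" untouched.
untouched = patch_words("Horvith")
print(patched)
print(untouched)
```

The unanchored systematic rules (such as `wv` to `w`) deliberately omit `\b`, so they apply anywhere inside a word.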

### B. NER Shielding
To prevent the spellchecker from erroneously altering rare surnames or historical locations, the pipeline implements a "shielding" mechanism:
1. **Extraction**: The BERT NER model identifies entities (PER, ORG, LOC).
2. **Merging**: `merge_entities_generic` joins fragmented tokens (e.g., "B.", "B.", "C.") into unified entities.
3. **Protection**: Words within these entity boundaries are checked against a specialised `ENTITY_WORDLIST`, with a budget of at most one edit, or ignored by the general spellchecker.
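
The shielding test itself is a character-index overlap check. The sketch below reproduces that idea without loading the NER model: the entity spans are hand-written stand-ins for model output, and the helper name `shielded_words` and the sample sentence are assumptions made for illustration.

```python
import re

# Same word pattern as the notebook's WORD_REGEX.
WORD_REGEX = re.compile(r"\b[a-zA-Z'0-9-]+\b")

def shielded_words(text, entities):
    """Return (word, is_shielded) pairs. A word counts as shielded when
    any of its characters falls inside an entity span."""
    entity_indices = set()
    for ent in entities:
        entity_indices.update(range(ent['start'], ent['end']))
    result = []
    for match in WORD_REGEX.finditer(text):
        covered = any(i in entity_indices
                      for i in range(match.start(), match.end()))
        result.append((match.group(0), covered))
    return result

text = "Mr Paderewski playcd in London"
# Hand-written spans standing in for NER output (no model is loaded here).
entities = [
    {'entity_group': 'PER', 'start': 3, 'end': 13},
    {'entity_group': 'LOC', 'start': 24, 'end': 30},
]
flags = shielded_words(text, entities)
print(flags)
```

Only `playcd`, `Mr`, and `in` are left open to the general spellchecker in this example; the surname and the place name are flagged as shielded.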

### C. Staircase Edit Distance
The `get_staircase_edit_distance` function assigns an edit budget based on word length to minimise over-correction:
* For word length $L < 4$: 1 edit allowed.
* For $4 \le L < 6$: 2 edits allowed.
* For $6 \le L < 10$: 3 edits allowed.
* For $10 \le L < 15$: 4 edits allowed.
* For $L \ge 15$: 6 edits allowed.
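
As a worked example, the thresholds above can be applied to a few tokens of different lengths. The function body mirrors the notebook's definition; the sample words are illustrative.

```python
def get_staircase_edit_distance(word_len):
    """Return the edit budget for a word of the given length."""
    if word_len < 4:
        return 1
    elif word_len < 6:
        return 2
    elif word_len < 10:
        return 3
    elif word_len < 15:
        return 4
    else:
        return 6

# Sample tokens (illustrative) with lengths 3, 5, 7, 11 and 18.
words = ["thc", "stvle", "thcatre", "performance", "characteristically"]
budgets = {w: get_staircase_edit_distance(len(w)) for w in words}
for word, budget in budgets.items():
    print(f"{word!r} (length {len(word)}): up to {budget} edit(s)")
```

Note that the budget jumps from 4 to 6 at length 15; there is no length that receives a budget of 5.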

### D. Digit-to-Letter Correction
The `find_best_digit_correction` function addresses cases where numbers were substituted for letters (e.g., "M0re" for "More"). It maps digits to visually similar letters (e.g., $0 \rightarrow o, d, q$) and checks the resulting candidates against the frequency dictionary.
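
The candidate generation can be sketched with `itertools.product`. This is a simplified illustration under stated assumptions: it uses a reduced digit map and a toy frequency dictionary with exact matching, whereas the notebook looks candidates up through SymSpell with an edit distance of up to 2. The names `DIGIT_MAP`, `TOY_FREQUENCIES`, and `best_digit_correction` are hypothetical.

```python
from itertools import product

# Reduced, illustrative subset of the notebook's OCR_DIGIT_MAP.
DIGIT_MAP = {'0': ['o', 'd', 'q'], '1': ['i', 'l', 't'], '5': ['s', 'i']}

# Toy frequency dictionary standing in for the SymSpell dictionary.
TOY_FREQUENCIES = {'more': 5000, 'music': 3000, 'mist': 200}

def best_digit_correction(word, frequencies):
    """Expand every mapped digit into its letter options and return the
    most frequent candidate found in the dictionary, or None."""
    options = [DIGIT_MAP.get(ch, [ch]) for ch in word.lower()]
    candidates = ("".join(parts) for parts in product(*options))
    known = [c for c in candidates if c in frequencies]
    return max(known, key=frequencies.get) if known else None

fixed_more = best_digit_correction("M0re", TOY_FREQUENCIES)
fixed_music = best_digit_correction("mu51c", TOY_FREQUENCIES)
no_match = best_digit_correction("x9z", TOY_FREQUENCIES)
print(fixed_more, fixed_music, no_match)
```

The number of candidates grows multiplicatively with each digit in the token, which is why the digit map is kept conservative.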

---

## 4. Standardisation & Polishing
The final stage ensures consistency, specifically following British English conventions:
* **Honorifics**: `standardize_honorifics` manages titles like Mr, Mrs, and Dr, with an option for the modern British "no-dot" style.
* **Spacing**: `reapply_punctuation_spacing` corrects errors like "word ' s" $\rightarrow$ "word's" and standardises currency formatting (e.g., "£ 100" $\rightarrow$ "£100").
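
The two spacing fixes named above can be approximated with a pair of substitutions. The patterns and the helper name `tidy_spacing` below are assumptions made for illustration; they are not the patterns used inside `reapply_punctuation_spacing`, which is not shown in this document.

```python
import re

def tidy_spacing(text):
    """Hypothetical sketch of the possessive and currency spacing fixes."""
    # "word ' s" -> "word's": drop stray spaces around a possessive apostrophe.
    text = re.sub(r"(\w)\s*'\s*s\b", r"\1's", text)
    # "£ 100" -> "£100": attach the pound sign to the following digit.
    text = re.sub(r"£\s+(\d)", r"£\1", text)
    return text

tidied = tidy_spacing("The artist ' s fee was £ 100")
print(tidied)
```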

---

## 5. Evaluation & Metrics
The `evaluate_pipeline_oov` function assesses the pipeline's effectiveness:
* **OOV Reduction**: Tracks the percentage of Out-Of-Vocabulary words before and after processing.
* **Historical Grouping**: Groups results into 50-year periods to analyse performance across different eras of print quality.
* **Visualisation**: Generates dual-axis plots comparing document volume with mean improvement percentages.
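
A minimal sketch of the two calculations behind these metrics, assuming OOV is measured as the share of tokens missing from a vocabulary set and that periods are bucketed by flooring the year to a multiple of 50. The helpers `oov_rate` and `period_start`, the toy vocabulary, and the sample year are hypothetical; the internals of `evaluate_pipeline_oov` may differ.

```python
import re

# Same word pattern as the notebook's WORD_REGEX.
WORD_REGEX = re.compile(r"\b[a-zA-Z'0-9-]+\b")

def oov_rate(text, vocabulary):
    """Percentage of tokens in `text` that are not in `vocabulary`."""
    words = [w.lower() for w in WORD_REGEX.findall(text)]
    if not words:
        return 0.0
    unknown = sum(1 for w in words if w not in vocabulary)
    return 100.0 * unknown / len(words)

def period_start(year, width=50):
    """First year of the fixed-width period containing `year`."""
    return (year // width) * width

vocabulary = {"the", "music", "was", "very", "fine"}  # toy vocabulary
before = oov_rate("thc mnsic was vcry fine", vocabulary)
after = oov_rate("the music was very fine", vocabulary)
improvement = before - after
print(f"OOV before: {before:.1f}%, after: {after:.1f}%, "
      f"improvement: {improvement:.1f} points")
print(period_start(1887))
```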

---

## 6. Execution Example
To run the pipeline in a Jupyter Notebook cell:

```python
# 1. Run the main processing driver
wordlist, output_folder = pipeline_driver()

# 2. Evaluate performance on the output
stats, eval_path = evaluate_pipeline_oov(wordlist, output_folder)

# 3. Generate summary plots
plot_merged_performance(stats, eval_path)
```

In [1]:
import pandas as pd
import os
from pathlib import Path
import numpy as np
import re
from itertools import product
import pickle
import shutil
import textwrap
import torch
from datasets import load_dataset
from transformers import pipeline
from symspellpy import SymSpell, Verbosity
import logging
import matplotlib.pyplot as plt

In [46]:
# Directories and file paths
DICTIONARY_DATA_DIR = './Dictionary_data/'

ART_WORDLIST_NAME = 'artist_wordlist.pkl' 
NER_WORDLIST_NAME = 'ner_wordlist.pkl'

GUTENBERG_WORDS_PATH = 'project_gutenberg_word_count.txt'
GUTENBERG_BIGRAMS_PATH = 'project_gutenberg_bigrams.txt'

INPUT_PATH = './Data'
OUTPUT_PATH = './Cleaned_data/' + os.path.basename(os.path.normpath(INPUT_PATH)) + '/'

In [3]:
# Setup basic logging to see warnings
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# Regex to match words (including hyphenated and apostrophized words)
WORD_REGEX = re.compile(r"\b[a-zA-Z'0-9-]+\b")

# Check for CUDA and set the device
device = 0 if torch.cuda.is_available() else -1

# --- Load NER Model and Tokenizer ---
model_name_ner = "dslim/bert-large-NER"
try:
    log.info(f"Loading NER model: {model_name_ner}...")
    # Using aggregation_strategy="simple" helps merge B-TAG I-TAG sequences
    ner_pipeline = pipeline("ner", model=model_name_ner, tokenizer=model_name_ner, aggregation_strategy="simple", device=device)
    log.info("NER model loaded successfully.")
except Exception as e:
    log.error(f"Failed to load NER model: {e}")
    raise e

INFO:__main__:Loading NER model: dslim/bert-large-NER...
Some weights of the model checkpoint at dslim/bert-large-NER were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Device set to use cuda:0
INFO:__main__:NER model loaded successfully.


In [4]:
# Reading wordlists from files and creating master
with open(DICTIONARY_DATA_DIR + ART_WORDLIST_NAME, 'rb') as file:
    ART_WORDLIST = pickle.load(file)
print("Total 'art' words loaded:", len(ART_WORDLIST))

with open(DICTIONARY_DATA_DIR + NER_WORDLIST_NAME, 'rb') as file:
    NER_WORDLIST = pickle.load(file)
print("Total 'ner' words loaded:", len(NER_WORDLIST))

ENTITY_WORDLIST = ART_WORDLIST.union(NER_WORDLIST)
print(f"The 'ENTITY' wordlist consists of {len(ENTITY_WORDLIST)} words.")



Total 'art' words loaded: 33188
Total 'ner' words loaded: 14874
The 'ENTITY' wordlist consists of 45111 words.


In [5]:
# Read a CSV file into a HuggingFace dataset
def load_csv_file(file_path):
    try:
        ds = load_dataset('csv', data_files=file_path)
        file = Path(file_path)

        return {'name': file.name, 
                'data': ds,
                'length': ds.get('train').num_rows # ds is a dict like dataset while train is an arrow dataset
                }
    
    except Exception as e:
        print(f"Caught error when loading csv file from path: {file_path} \n\n {e}")


In [6]:
def setup_spellchecker(master_words=True, artist_set=True, global_max_edits=6):

    log.info(f"Setting up SymSpellPy...")
    symspell = SymSpell(max_dictionary_edit_distance=global_max_edits, prefix_length=7)
    master_words_added = set()
    art_words_added = set()

    # Load Project Gutenberg dictionary
    if master_words:
        symspell.load_dictionary(DICTIONARY_DATA_DIR + GUTENBERG_WORDS_PATH, term_index=0, count_index=1, separator='\t')
        symspell.load_bigram_dictionary(DICTIONARY_DATA_DIR + GUTENBERG_BIGRAMS_PATH, term_index=0, count_index=1, separator='\t') 
        master_words_added = set(symspell.words.keys())

    # Add artist and NER words with high frequency
    if artist_set: 
        try:
            # Use the max frequency from the existing dictionary to set a high frequency for art words
            # this ensures they are prioritised in suggestions
            art_word_count = (max(symspell.words.values())*10) or 1000000000 
        except ValueError:
            art_word_count = 1000000000
        for word in ENTITY_WORDLIST:
            # Raise the frequency to emphasise the genre importance
            # (create_dictionary_entry adds this count to any existing entry)
            symspell.create_dictionary_entry(word, art_word_count)
        art_words_added = ENTITY_WORDLIST
            
    log.info(f"Added {len(art_words_added)} words from ENTITY wordlist to SymSpell.")    
    log.info(f"Added {len(master_words_added)} words from Project Gutenberg library to SymSpell.")
    log.info(f"SymSpell setup complete. Total unique terms: {symspell.word_count}")
    return symspell, master_words_added, art_words_added

In [7]:
def apply_common_ocr_patches(text):
    # Fix common title misreads
    text = re.sub(r'\bMar\.\s', 'Mr. ', text)
    text = re.sub(r'\bAlr\.\s', 'Mr. ', text) 
    
    # Common misspellings based on frequency analysis of 
    # unidentified words from the whole dataset.
    text = re.sub(r'\balvays\b', 'always', text)

    text = re.sub(r'\banld\b', 'and', text)
    text = re.sub(r'\bnnd\b', 'and', text)
    text = re.sub(r'\bsnd\b', 'and', text)
    text = re.sub(r'\bahd\b', 'and', text)
    text = re.sub(r'\banv\b', 'and', text)

    text = re.sub(r'\bcen\b', 'been', text)
    text = re.sub(r'\becn\b', 'been', text)

    text = re.sub(r'\bflrst\b', 'first', text)
    text = re.sub(r'\blirst\b', 'first', text)

    text = re.sub(r'\birom\b', 'from', text)
    text = re.sub(r'\bfiom\b', 'from', text)
    text = re.sub(r'\bfronm\b', 'from', text)
    text = re.sub(r'\btrom\b', 'from', text)

    text = re.sub(r'\bhavc\b', 'have', text)

    text = re.sub(r'\bhiis\b', 'his', text)

    text = re.sub(r'\bmladame\b', 'madame', text)
    text = re.sub(r'\bmiadame\b', 'madame', text)

    text = re.sub(r'\bmanv\b', 'many', text)

    text = re.sub(r'\bmav\b', 'may', text)
    text = re.sub(r'\bmnay\b', 'may', text)

    text = re.sub(r'\bmnore\b', 'more', text)
    text = re.sub(r'\bnmore\b', 'more', text)

    text = re.sub(r'\bmnost\b', 'most', text)

    text = re.sub(r'\bmedtner\b', 'mother', text)

    text = re.sub(r'\bmnuch\b', 'much', text)

    text = re.sub(r'\bmlusic\b', 'music', text)
    text = re.sub(r'\bmnsic\b', 'music', text)

    text = re.sub(r'\bonlv\b', 'only', text)

    text = re.sub(r'\bplav\b', 'play', text)

    text = re.sub(r'\bplaved\b', 'played', text)

    text = re.sub(r'\bprogrammc\b', 'programme', text)

    text = re.sub(r'\bsomc\b', 'some', text)

    text = re.sub(r'\bstvle\b', 'style', text)

    text = re.sub(r'\btbat\b', 'that', text)
    text = re.sub(r'\bthlat\b', 'that', text)
    text = re.sub(r'\bthiat\b', 'that', text)
    text = re.sub(r'\btlhat\b', 'that', text)

    text = re.sub(r'\bthc\b', 'the', text)
    text = re.sub(r'\btbe\b', 'the', text)
    text = re.sub(r'\bthle\b', 'the', text)
    text = re.sub(r'\blhe\b', 'the', text)
    text = re.sub(r'\bt\'he\b', 'the', text)
    text = re.sub(r'\btthe\b', 'the', text)
    text = re.sub(r'\bthr\b', 'the', text)
    text = re.sub(r'\bfhe\b', 'the', text)

    text = re.sub(r'\bteatro\b', 'theatre', text)
    text = re.sub(r'\btbeatre\b', 'theatre', text)
    text = re.sub(r'\bthcatre\b', 'theatre', text)

    text = re.sub(r'\bthemn\b', 'them', text)
    text = re.sub(r'\bthenm\b', 'them', text)

    text = re.sub(r'\bthel\b', 'they', text)

    text = re.sub(r'\bthoso\b', 'those', text)

    text = re.sub(r'\btimc\b', 'time', text)

    text = re.sub(r'\btvo\b', 'two', text)

    text = re.sub(r'\bvcry\b', 'very', text)

    text = re.sub(r'wv', 'w', text)
    text = re.sub(r'vw', 'w', text)

    text = re.sub(r'\bwvas\b', 'was', text)
    text = re.sub(r'\bwias\b', 'was', text)

    text = re.sub(r'\bwerc\b', 'were', text)

    text = re.sub(r'\bwlhat\b', 'what', text)

    text = re.sub(r'\bvhen\b', 'when', text)
    text = re.sub(r'\bwlhen\b', 'when', text)
    text = re.sub(r'\bwben\b', 'when', text)

    text = re.sub(r'\bwvhich\b', 'which', text)
    text = re.sub(r'\bwbich\b', 'which', text)
    text = re.sub(r'\bwhicb\b', 'which', text)
    text = re.sub(r'\bwhieh\b', 'which', text)
    text = re.sub(r'\bwhichl\b', 'which', text)
    text = re.sub(r'\bwhiclh\b', 'which', text)
    text = re.sub(r'\bwhlich\b', 'which', text)
    text = re.sub(r'\bwhicl\b', 'which', text)
    text = re.sub(r'\bwhiich\b', 'which', text)
    text = re.sub(r'\bwrhich\b', 'which', text)

    text = re.sub(r'\bwlho\b', 'who', text)
    text = re.sub(r'\bvho\b', 'who', text)
    text = re.sub(r'\bwbo\b', 'who', text)
    text = re.sub(r'\bwhlo\b', 'who', text)
    text = re.sub(r'\bwhio\b', 'who', text)

    text = re.sub(r'\bwtih\b', 'with', text)
    text = re.sub(r'\bwvith\b', 'with', text)
    text = re.sub(r'\bvith\b', 'with', text)
    text = re.sub(r'\bwitb\b', 'with', text)
    text = re.sub(r'\bwithl\b', 'with', text)
    text = re.sub(r'\bwitl\b', 'with', text)

    text = re.sub(r'\bvould\b', 'would', text)

    text = re.sub(r'\bvears\b', 'years', text)

    text = re.sub(r'\bvou\b', 'you', text)

    return text

In [8]:
def apply_aggressive_ocr_patches(text):
    """
    Applies regex-based fixes for systematic OCR errors (wv->w, rn->m)
    and high-frequency specific word errors found in the dataset.
    """
    # --- 1. Systematic 'w' and 'm' Fixes ---
    # Fix "wv" or "vw" anywhere (e.g. "twvo", "wvas")
    text = re.sub(r'wv|vw', 'w', text, flags=re.IGNORECASE)
    # Fix "wl" at start of word (e.g. "wlhich", "wlho")
    text = re.sub(r'\bwl', 'w', text, flags=re.IGNORECASE)
    # Fix "w" being read as "v" in specific common words
    text = re.sub(r'\bv(hich|ith|hen|here|ho|hose|ould)\b', r'w\1', text, flags=re.IGNORECASE)
    
    # Fix "mn", "nm" or "mm" at start (e.g. "mnore", "nmore")
    text = re.sub(r'\b(?:mn|[mn]m)', 'm', text, flags=re.IGNORECASE)
    # Fix "rn" -> "m" at start (e.g. "rnore")
    text = re.sub(r'\brn(?=[aeiou])', 'm', text, flags=re.IGNORECASE)

    # --- 2. 'The' / 'That' variants ---
    text = re.sub(r'\bt[bkn]e\b', 'the', text, flags=re.IGNORECASE) 
    text = re.sub(r'\bt[li]he\b', 'the', text, flags=re.IGNORECASE)
    text = re.sub(r'\b[li]he\b', 'the', text, flags=re.IGNORECASE)
    text = re.sub(r'\bt[li](at|is|ose)\b', r'th\1', text, flags=re.IGNORECASE)

    # --- 3. High-Frequency Specific Fixes (Safe List) ---
    # Common misreads of "and"
    text = re.sub(r'\ba[ni][li]d\b', 'and', text, flags=re.IGNORECASE) 
    # Honorifics
    text = re.sub(r'\bml(iss|rs|r)\b', r'M\1', text, flags=re.IGNORECASE) # Mliss -> Miss
    # Common words
    text = re.sub(r'\bMar\.\s', 'Mr. ', text)  # Fixes Mar. 
    text = re.sub(r'\bAlr\.\s', 'Mr. ', text)  # Fixes Alr. 
    text = re.sub(r'\bMlr\.\s', 'Mr. ', text)  # Fixes Mlr. 
    text = re.sub(r'\b[fl]irst\b', 'first', text, flags=re.IGNORECASE)
    text = re.sub(r'\bhiis\b', 'his', text, flags=re.IGNORECASE)
    text = re.sub(r'\balvays\b', 'always', text, flags=re.IGNORECASE)
    text = re.sub(r'\bprogrammc\b', 'programme', text, flags=re.IGNORECASE)
    
    return text

In [9]:
def clean_text_piece_batched(batch, text_column="Full_text"):
    input_texts = batch[text_column]
    cleaned_texts = []

    # --- Regex Patterns ---
    backslash_re = re.compile(r'\\')
    newpage_re = re.compile(r'<NEWPAGE>')
    junk_char_re = re.compile(r'[^a-zA-Z0-9\s\.,;:?!£\'"()-]')
    double_single_quote_re = re.compile(r"''")
    dehyphen_eol_re = re.compile(r'([a-zA-Z])-\s*\n\s*([a-zA-Z])')
    internal_hyphen_re = re.compile(r'-')
    newline_re = re.compile(r'\n')
    
    # Currency/Context
    currency_l_dot_re = re.compile(r'\s+l\.(?=\s+|\d)')
    num_l00_re = re.compile(r'\sl00(?=\s|[,.])')
    num_O0_re = re.compile(r'\sO0(?=\s|[,.])')

    # Spacing
    space_before_punct_re = re.compile(r'\s+([,\.;:?!])')
    space_after_punct_re = re.compile(r'([,\.;:?!])\s*')
    space_around_brackets_open_re = re.compile(r'\s*\(\s*')
    space_around_brackets_close_re = re.compile(r'\s*\)\s*')
    multi_space_re = re.compile(r'\s+')

    for text in input_texts:
        if not text:
            cleaned_texts.append("")
            continue

        # 1. Metadata & Junk
        text = backslash_re.sub('', text)
        text = newpage_re.sub('', text)
        text = junk_char_re.sub(' ', text)
        text = double_single_quote_re.sub('"', text)

        # 2. Aggressive Regex Patches (Systematic Fixes)
        text = apply_common_ocr_patches(text)
        text = apply_aggressive_ocr_patches(text)

        # 3. De-hyphenation
        new_text = text
        while True:
            processed_text = dehyphen_eol_re.sub(r'\1\2', new_text)
            if processed_text == new_text: break
            new_text = processed_text
        text = new_text

        # Remove internal hyphens (non-EOL noise like 'in-the' -> 'in the')
        text = internal_hyphen_re.sub(' ', text)

        # 4. Join Lines
        text = newline_re.sub(' ', text)

        # 5. Currency & Context
        text = currency_l_dot_re.sub(' £', text)
        text = num_l00_re.sub(' 100', text)
        text = num_O0_re.sub(' 00', text)

        # 6. Pre-Correction Spacing 
        text = space_before_punct_re.sub(r'\1', text)
        text = space_after_punct_re.sub(r'\1 ', text)
        text = space_around_brackets_open_re.sub(' (', text)
        text = space_around_brackets_close_re.sub(') ', text)
        text = multi_space_re.sub(' ', text)

        # 7. Final Cleanup (whitespace was already collapsed in step 6)
        cleaned_texts.append(text.strip())

    return {'Regex_cleaned_text': cleaned_texts}

In [10]:
def merge_entities_generic(ner_results, text, entity_types_to_merge={'PER', 'ORG', 'LOC'}):
    """
    Merges adjacent, fragmented entities (PER, ORG, LOC by default)
    from a NER model's output if separated by common connectors or whitespace.
    Handles initials specifically for PER entities.

    Args:
        ner_results (list): The raw output from the NER pipeline (ideally
                            with an aggregation strategy like "simple" already applied).
        text (str): The original text that was analyzed.
        entity_types_to_merge (set): A set of entity group labels to consider for merging
                                     (e.g., {'PER', 'ORG', 'LOC'}).

    Returns:
        list: A new list of entities with fragmented ones merged.
    """
    if not ner_results:
        return []

    merged_results = []
    i = 0
    while i < len(ner_results):
        current_entity = ner_results[i]

        # Check if the current entity is one we want to merge and if there's a next entity
        if current_entity['entity_group'] in entity_types_to_merge and i + 1 < len(ner_results):
            next_entity = ner_results[i+1]

            # --- MERGING LOGIC ---
            # Condition 1: Check if the next entity is of the SAME type
            if next_entity['entity_group'] == current_entity['entity_group']:
                # Get the text between the two entities
                start = current_entity['end']
                end = next_entity['start']
                text_between = text[start:end]
                stripped_text_between = text_between.strip()

                should_merge = False
                # Merge initials specifically for PER entities. This check must come
                # first: the generic rule below also matches an empty separator and
                # would otherwise make this branch unreachable.
                if current_entity['entity_group'] == 'PER' and \
                   current_entity['word'].endswith('.') and \
                   len(current_entity['word'].strip('.')) <= 1 and \
                   stripped_text_between == '':
                    # Ensure a space is added between initials
                    text_between = " "
                    should_merge = True
                # Merge based on simple separators ('and', '&', space/nothing)
                elif stripped_text_between in ['and', '&', '']:
                    should_merge = True

                if should_merge:
                    # Combine words, average scores, update indices
                    # Add space if original separation was just whitespace but not empty
                    if stripped_text_between == '' and text_between != '':
                        new_word = current_entity['word'] + ' ' + next_entity['word']
                    else:
                        new_word = current_entity['word'] + text_between + next_entity['word']

                    new_score = (current_entity['score'] + next_entity['score']) / 2

                    # Create a new merged entity dictionary
                    merged_entity = {
                        'entity_group': current_entity['entity_group'],
                        'word': new_word.replace(" ##", ""), # Clean up sub-word tokens if necessary
                        'score': new_score,
                        'start': current_entity['start'],
                        'end': next_entity['end']
                    }

                    # Replace the current entity with the merged one for the next iteration/append step
                    current_entity = merged_entity
                    # Increment 'i' to skip the next entity that we just merged
                    i += 1

        # Append the current (potentially merged) entity to the results
        merged_results.append(current_entity)
        i += 1 # Move to the next entity

    # --- Final Filtering ---
    # Keep only the desired entity types and apply basic length filtering
    # Allows single initials ending with '.' for PER, otherwise length > 1
    final_results = [
        ent for ent in merged_results
        if ent['entity_group'] in entity_types_to_merge and \
           (len(ent['word'].strip()) > 1 or \
            (ent['entity_group'] == 'PER' and ent['word'].strip().endswith('.') and len(ent['word'].strip()) <= 2)
           )
    ]

    return final_results

In [11]:
def apply_capitalization(text, ner_entities):
    """
    Applies sentence case and capitalizes recognized entities,
    handling common abbreviations/acronyms as special cases (keeping them uppercase).
    """
    if not text or text.isspace():
        return text

    # Convert text to list of characters for easier modification
    text_chars = list(text.lower())

    # --- Apply Sentence Capitalization ---
    capitalize_next = True 
    for i, char in enumerate(text_chars):
        if capitalize_next and char.isalpha():
            text_chars[i] = char.upper()
            capitalize_next = False
        elif char in '.?!' and i + 1 < len(text_chars) and text_chars[i+1].isspace():
             capitalize_next = True
        elif not char.isspace():
            capitalize_next = False

    # Ensure first letter is capitalized even if text starts with non-alpha
    for i, char in enumerate(text_chars):
        if char.isalpha():
            text_chars[i] = char.upper()
            break

    # --- Lock Existing Title Case Words ---
    # If a word is already title case in the original text,
    # preserve its casing in case it is not
    # captured by NER.
    for match in WORD_REGEX.finditer(text):
        start, end = match.start(), match.end()

        if match.group(0).istitle():
            text_chars[start:end] = list(match.group(0))

    # --- Apply Entity Capitalization ---
    # Sort reversed to handle indices safely if modifications were to change length
    sorted_entities = sorted(ner_entities, key=lambda x: x['start'], reverse=True)
    modified_indices = set()

    known_abbreviations = {
        'llc', 'ltd', 'inc', 'co', 'plc', 'corp',
        'ra', 'mp', 'esq', 't5', 'kbe', 'cbe', 'obe', 'mbe', 
        'mph', 'kph', 'm', 'km', 'hh', 'kg', 
        'lb', 'oz', 'in', 'ft', 'yd', 'am', 'pm', 'cm', 
        'fps', 'mm', 'ns', 'ms'
    }
    initials_pattern = re.compile(r'^([A-Z]\.\s?)+$')

    # Process each entity for capitalization.
    # Iterate in reverse order to avoid index shifting issues.
    # These overwrite the sentence case applied earlier.
    for entity in sorted_entities:
        start = entity['start']
        end = entity['end']

        # If the entity is followed by 's or ', extend the end boundary 
        # to ensure the possessive suffix matches the entity's casing.
        if end < len(text_chars) and text_chars[end] == "'":
            if end + 1 < len(text_chars) and text_chars[end+1] == "s":
                end += 2  # Expand span to include 's
            else:
                end += 1
    
        # --- Index Boundary Safety Check ---
        if start < 0 or end > len(text_chars) or start >= end:
            log.warning(f"Skipping entity '{entity.get('word')}' due to out-of-bounds indices: [{start}:{end}]")
            continue

        entity_group = entity['entity_group']
        original_word = entity['word'] 

        if any(idx in modified_indices for idx in range(start, end)):
            continue

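        current_span_list = text_chars[start:end]
        # Lowercase explicitly: sentence casing and the title-case lock above
        # may already have capitalised characters in this span.
        current_span_lower = "".join(current_span_list).lower()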

        # --- Capitalization Rules ---
        capitalized_span = ""
        check_word = current_span_lower.rstrip('.,;:?!')
        
        if check_word in known_abbreviations:
            capitalized_span = current_span_lower.upper()
        elif initials_pattern.match(original_word):
             capitalized_span = current_span_lower.upper()
        elif entity_group in ('ORG', 'LOC') and len(original_word) > 0:
             caps_count = sum(1 for char in original_word if char.isupper())
             if caps_count / len(original_word) >= 0.4: 
                capitalized_span = current_span_lower.upper()
        
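        # Default fallback
        if not capitalized_span:
            capitalized_span = current_span_lower.title()
            # str.title() capitalises the letter after an apostrophe ("Smith'S"),
            # so restore a lowercase possessive suffix.
            if capitalized_span.endswith("'S"):
                capitalized_span = capitalized_span[:-1] + "s"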

        # --- Force Length Alignment ---
        # This prevents the 'Length mismatch' warning by ensuring the replacement
        # fits exactly into the original character slice.
        expected_len = end - start
        if len(capitalized_span) != expected_len:
            if len(capitalized_span) > expected_len:
                capitalized_span = capitalized_span[:expected_len]
            else:
                capitalized_span = capitalized_span.ljust(expected_len)

        text_chars[start:end] = list(capitalized_span)
        modified_indices.update(range(start, end))

    return "".join(text_chars)

In [12]:
def is_garbage(word, alpha_threshold=0.6):
    """
    Identifies tokens likely to be OCR noise.
    Returns True if the token should be ignored by the spellchecker.
    """
    # Do not filter short tokens or dates
    if len(word) < 4 or word.isdigit():
        return False
        
    letters = sum(c.isalpha() for c in word)
    alpha_ratio = letters / len(word)
    
    # High ratio of symbols/numbers usually indicates a failed OCR read
    if alpha_ratio < alpha_threshold:
        return True
        
    # Check for vowel presence in alphabetic tokens longer than 3 chars
    if letters > 3:
        has_vowel = any(c in 'aåäeëêiïîoöôuüûyÿŷ' for c in word.lower())
        if not has_vowel:
            return True
            
    return False

In [13]:
def get_staircase_edit_distance(word_len):
    if word_len < 4:
        return 1  
    elif word_len < 6:
        return 2
    elif word_len < 10:
        return 3
    elif word_len < 15:
        return 4
    else:
        return 6 # Only long words get a budget of 6

In [14]:
# Define the most common OCR digit-to-letter confusion map
# We keep this conservative to limit the number of combinations generated.
OCR_DIGIT_MAP = {
    # 0 (Zero): Visually similar to O, D, and Q.
    '0': ['o', 'd', 'q'], 

    # 1 (One): Visually similar to I, L, T, A, and sometimes J.
    '1': ['i', 'l', 't', 'a', 'j'],

    # 2 (Two): Visually similar to Z, R, E, and sometimes N.
    '2': ['z', 'r', 'e', 'n'],

    # 3 (Three): Very common substitute for E, A, I, and sometimes B/P (mirrored).
    '3': ['e', 'a', 'i', 'b', 'p'],

    # 4 (Four): Visually similar to A, H, T, and sometimes Y (inverted).
    '4': ['a', 'h', 't', 'y'],

    # 5 (Five): Visually similar to S and sometimes I.
    '5': ['s', 'i'],

    # 6 (Six): Very common substitute for G, B, and sometimes E (inverted 9).
    '6': ['g', 'b', 'e'], 

    # 7 (Seven): Most commonly substituted for T, L, or I.
    '7': ['t', 'l', 'i'],

    # 8 (Eight): Visually similar to B, O, and sometimes G/S.
    '8': ['b', 'o', 'g', 's'],

    # 9 (Nine): Most often substituted for F or G.
    '9': ['f', 'g']
}

DIGIT_CHECK_REGEX = re.compile(r'\d')


def generate_candidates(word):
    """
    Generates all possible word candidates by replacing digits with mapped letters.
    """
    # Prepare the word structure: list of possible characters for each position
    word_parts = []
    for char in word:
        # If the character is a mapped digit, use its substitutes; otherwise, use the character itself.
        word_parts.append(OCR_DIGIT_MAP.get(char, [char]))
            
    # Use itertools.product to generate all possible combinations efficiently
    # The list comprehension at the end joins the characters back into strings.
    candidates = ["".join(p) for p in product(*word_parts)]
    return candidates


def find_best_digit_correction(word, symspell_instance):
    """
    Generates digit-substituted candidates, looks each one up in the SymSpell
    dictionary (closest match within edit distance 2), and returns the term
    with the highest frequency count, or None if nothing matches.
    """
    candidates = generate_candidates(word.lower())
    best_suggestion = None
    best_count = -1
    
    # Allow some flexibility with a max_edit_distance=2 since a lot of the words are misspelled.
    for candidate in candidates:
        suggestions = symspell_instance.lookup(
            candidate, 
            Verbosity.CLOSEST, # Use CLOSEST to check frequency if distance is 1
            max_edit_distance=2, 
            include_unknown=False # Must exist in the dictionary
        )
        
        if suggestions:
            suggestion = suggestions[0]
            
            # Prioritize the suggestion with the highest frequency count
            if suggestion.count > best_count:
                best_count = suggestion.count
                best_suggestion = suggestion.term
                
    return best_suggestion

In [15]:
def preserve_case(original_word, new_word):
    """Matches the case of the new word to the original word."""
    if original_word.isupper():
        return new_word.upper()
    if original_word.istitle():
        return new_word.title()
    if original_word[0].isupper() and (len(original_word) == 1 or original_word[1:].islower()):
        return new_word.title()
    return new_word.lower()

In [16]:
def correct_spelling_safe(text, ner_entities, ner_symspell=None, symspell=None,
                          master_word_set=None, 
                          segmentation_ratio=6):
    """
    Corrects misspellings using a staircase edit-distance model and NER shielding.
    Maintains digit-to-letter checks and possessive logic.
    """
    if ner_symspell is None or symspell is None or master_word_set is None or not text:
        return text

    entity_indices = set()
    for ent in ner_entities:
        entity_indices.update(range(ent['start'], ent['end']))

    corrected_parts = []
    last_index = 0
    
    for match in WORD_REGEX.finditer(text):
        word = match.group(0)
        word_lower = word.lower()
        match_start = match.start()
        match_end = match.end()

        # --- ALPHA DENSITY CHECK ---
        if is_garbage(word):
            corrected_parts.append(text[last_index:match_start])
            corrected_parts.append(word)
            last_index = match_end
            continue
        
        corrected_word = word 
        is_known = False
        is_possessive = False
        
        # --- 1. Possessive Check ---
        if word_lower.endswith("'s"):
            root_word = word_lower[:-2]
            is_possessive = True
        elif word_lower.endswith("'"):
            root_word = word_lower[:-1]
            is_possessive = True
        
        if is_possessive:
            if root_word in master_word_set:
                is_known = True
        else:
            if word_lower in master_word_set:
                is_known = True

        # --- 2. Staircase Budget Assignment ---
        # Calculate the budget once per word based on its length.
        staircase_budget = get_staircase_edit_distance(len(word))

        if not is_known:
            # --- 3. High-Priority Digit-to-Letter Correction ---
            if DIGIT_CHECK_REGEX.search(word):
                digit_corrected = find_best_digit_correction(word, symspell)
                if digit_corrected:
                    corrected_word = preserve_case(word, digit_corrected)
                    # Update word_lower for subsequent steps if digit-corrected
                    word_lower = corrected_word.lower()

            # --- 4. NER Shielded Correction ---
            # If word is still uncorrected and is shielded by NER
            is_shielded = any(idx in entity_indices for idx in range(match_start, match_end))
            
            if corrected_word == word and is_shielded:
                # Use a very strict budget for names (max 1 edit) to prevent
                # changing one person's name into another's.
                suggestions = ner_symspell.lookup(word, Verbosity.CLOSEST, max_edit_distance=1)
                if suggestions:
                    corrected_word = preserve_case(word, suggestions[0].term)

            # --- 5. General SymSpell Correction (Unshielded) ---
            if corrected_word == word and not is_shielded and staircase_budget > 0:
                # Try standard lookup first using the staircase budget
                suggestions = symspell.lookup(
                    word, 
                    Verbosity.CLOSEST, 
                    max_edit_distance=staircase_budget,
                    include_unknown=False
                )
                
                if suggestions:
                    corrected_word = preserve_case(word, suggestions[0].term)
                
                # --- 6. Word Segmentation (Fallback for long tokens) ---
                elif len(word) > 10: 
                    # A length-based ratio is used but capped at the staircase budget
                    dynamic_max_edits = min(staircase_budget, (len(word) // segmentation_ratio))
                    if dynamic_max_edits < 1: dynamic_max_edits = 1

                    try:
                        segment_suggestion = symspell.word_segmentation(
                            word,
                            max_edit_distance=dynamic_max_edits
                        )
                        if segment_suggestion.corrected_string and \
                           segment_suggestion.corrected_string.lower() != word_lower:
                            corrected_word = preserve_case(word, segment_suggestion.corrected_string)
                    except Exception as e:
                        log.warning(f"Segmentation failed for '{word}': {e}")


        # --- 7. Reassemble ---
        corrected_parts.append(text[last_index:match_start])
        corrected_parts.append(corrected_word)
        last_index = match_end

    corrected_parts.append(text[last_index:])
    return "".join(corrected_parts)
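
The shielding test used in step 4 can be sketched in isolation. The `WORD_REGEX` below is an assumed stand-in for the notebook's own pattern (defined in an earlier cell), and the entity span is a hypothetical NER result.

```python
import re

# Assumed stand-in for the notebook's WORD_REGEX (defined in an earlier cell).
WORD_REGEX = re.compile(r"[A-Za-z0-9']+")


def shielded_words(text, ner_entities):
    """Return (word, is_shielded) pairs using character-index overlap."""
    entity_indices = set()
    for ent in ner_entities:
        entity_indices.update(range(ent["start"], ent["end"]))

    result = []
    for match in WORD_REGEX.finditer(text):
        is_shielded = any(
            idx in entity_indices for idx in range(match.start(), match.end())
        )
        result.append((match.group(0), is_shielded))
    return result


text = "Mr Whistler exhibited a nocturne"
entities = [{"start": 3, "end": 11}]  # hypothetical NER span covering "Whistler"
print(shielded_words(text, entities))
```

A word counts as shielded if any of its characters falls inside an entity span, so partial overlaps are enough to route it to the stricter one-edit lookup.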

In [17]:
# NOTE: This function is not currently part of pipeline_driver.
# If it is used, some experimentation is needed to decide how much
# freedom the SymSpell instance should have, because it over-corrects easily.
# It can be integrated into capitalization_and_correction_batched
# as a last step:
#
# final_segmented_text = fix_targeted_segmentation(
#     spell_checked_text_v2,
#     merged_entities,
#     symspell
# )
# final_texts.append(final_segmented_text)
#

def fix_targeted_segmentation(text, ner_entities, symspell, max_edit_distance=4):
    """
    Finds and corrects *only* specific, targeted segmentation errors
    (like "wa ter") using regex and SymSpellPy, AND SKIPPING matches
    that overlap with NER entities.
    
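    Args:
        text (str): The text to correct (output of 1-to-1 correction).
        ner_entities (list): The list of merged/filtered NER entities for this text.
        symspell (SymSpell): The pre-configured SymSpellPy instance.
        max_edit_distance (int): Edit-distance budget passed to lookup_compound.
            symspellpy rejects values above the instance's
            max_dictionary_edit_distance, so the two must be chosen together.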
    
    Returns:
        str: The text with targeted segmentation corrections.
    """
    if symspell is None or not text:
        return text

    # --- Step 1: Create the "shield" set of all indices ---
    entity_indices = set()
    for ent in ner_entities:
        entity_indices.update(range(ent['start'], ent['end']))

    # Define the patterns (excluding the risky p3)
    p1 = r"\b([a-zA-Z']{1,2})\s([a-zA-Z']{3,})\b"
    p2 = r"\b([a-zA-Z']{3,})\s([a-zA-Z']{1,2})\b"
    candidate_pattern = re.compile(f"({p1})|({p2})", flags=re.IGNORECASE)
    
    matches = list(candidate_pattern.finditer(text))
    corrected_text = text
    
    for match in reversed(matches):
        original_phrase = match.group(0)
        start_index = match.start()
        end_index = match.end()

        # --- Step 2: The Shield Check ---
        # Check if *any* part of this match overlaps with the NER shield
        is_shielded = False
        for i in range(start_index, end_index):
            if i in entity_indices:
                is_shielded = True
                break
        
        if is_shielded:
            continue # This match is part of a NER entity, do not correct.

        # --- Step 3: The Decider (SymSpellPy) ---
        # (This part is only reached if the phrase is NOT shielded)
        try:
            suggestions = symspell.lookup_compound(
                original_phrase,
                max_edit_distance=max_edit_distance,
                transfer_casing=True
            )
            
            if suggestions:
                best_suggestion = suggestions[0].term
                if best_suggestion != original_phrase and " " not in best_suggestion:
                    corrected_text = (
                        corrected_text[:start_index] + 
                        best_suggestion + 
                        corrected_text[end_index:]
                    )
        except Exception as e:
            log.error(f"Error during lookup_compound on phrase '{original_phrase}': {e}")
            # Continue without correcting this phrase

    return corrected_text
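
To see which phrases the two candidate patterns actually select, the following sketch runs them on a few illustrative strings. The patterns are copied from the function above; the sample inputs are made up.

```python
import re

# Patterns copied from fix_targeted_segmentation.
p1 = r"\b([a-zA-Z']{1,2})\s([a-zA-Z']{3,})\b"
p2 = r"\b([a-zA-Z']{3,})\s([a-zA-Z']{1,2})\b"
candidate_pattern = re.compile(f"({p1})|({p2})", flags=re.IGNORECASE)


def find_candidates(text):
    """Return the phrases that would be sent to lookup_compound."""
    return [m.group(0) for m in candidate_pattern.finditer(text)]


print(find_candidates("wa ter"))      # ['wa ter']
print(find_candidates("colou r"))     # ['colou r']
print(find_candidates("pic ture"))    # []
print(find_candidates("the wa ter"))  # ['the wa']
```

Only splits that leave a one- or two-letter fragment are candidates, so `pic ture` is ignored. The last case shows one reason the function can over-correct: matching is left to right and non-overlapping, so a real word followed by a short fragment (`the wa`) is picked up before the intended pair (`wa ter`).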

In [18]:
def capitalization_and_correction_batched(batch, text_column="Regex_cleaned_text", ner_symspell=None, symspell=None, master_word_set=ENTITY_WORDLIST, run=""):
    """
    Stage 2: NER, entity merging, capitalization, and two spell-check passes.

    Returns a single column, `Symspell_corrected_text_v{run}`. The same column
    name is used on the early-exit paths so every batch has the same schema.
    """
    output_column = f"Symspell_corrected_text_v{run}"

    if ner_pipeline is None:
        log.error("NER pipeline not loaded.")
        return {output_column: batch[text_column]}

    input_texts = batch[text_column]
    final_texts = []
    all_merged_entities = []

    # Run NER
    try:
        all_ner_raw_results = ner_pipeline(input_texts)
    except Exception as e:
        log.error(f"NER failed: {e}")
        return {output_column: input_texts}

    for i, text in enumerate(input_texts):
        if not text or text.isspace():
             final_texts.append(text)
             all_merged_entities.append([])
             continue

        ner_results = all_ner_raw_results[i]

        # 1. Merge Entities
        merged_entities = merge_entities_generic(ner_results, text)
        
        # 2. Apply Capitalization
        capitalized_text = apply_capitalization(text, merged_entities)
        
        # 3. Spell check
        spell_checked_text_v1 = correct_spelling_safe(capitalized_text,
                                                   merged_entities,
                                                   ner_symspell,
                                                   symspell,
                                                   master_word_set)
        
        # 4. Spell check again to enable correction on previously merged words
        spell_checked_text_v2 = correct_spelling_safe(spell_checked_text_v1,
                                                   merged_entities,
                                                   ner_symspell,
                                                   symspell,
                                                   master_word_set)
        
        final_texts.append(spell_checked_text_v2)
        all_merged_entities.append(merged_entities)

    return {f"Symspell_corrected_text_v{run}": final_texts}


In [19]:
def standardize_honorifics(text, style='dot'):
    """
    Standardizes common honorifics (Mr, Mrs, Sir, etc.) to a consistent
    capitalization and punctuation style, running as a final polish.
    
    This function should be applied *after* all NER and spelling corrections.
    
    Args:
        text (str): The text to process.
        style (str): 
            'dot'   -> Enforces a dot: 'Mr.', 'Mrs.', 'Dr.', 'Esq.'
            'no_dot' -> Enforces no dot: 'Mr', 'Mrs', 'Dr', 'Esq' 
                         (Closer to modern British English style)
    
    Returns:
        str: The text with standardized honorifics.
    """
    if not text:
        return text

    # --- Define Replacements ---
    
    # These titles are always capitalized but never take a dot.
    # These are run first.
    no_dot_titles = {
        r'\bsir\b': 'Sir',
        r'\bdame\b': 'Dame',
        r'\blord\b': 'Lord',
        r'\blady\b': 'Lady',
        r'\bmiss\b': 'Miss', # Miss is a full word, not an abbreviation
    }

    # These are abbreviations, and their punctuation depends on the style.
    # The optional dot is matched *after* the word boundary (\b\.?). With
    # \.?\b the boundary cannot follow a dot that precedes a space, so
    # "Mr. Smith" would keep its dot and become "Mr.. Smith" in 'dot' style.
    abbreviations = [
        'Mr', 'Mrs', 'Ms', 'Dr', 'Rev', 'Prof',
        'Capt', 'Col', 'Gen', 'Esq',
        'Wm',  # For William
    ]
    # 'dot' appends a full stop; any other value is treated as 'no_dot'
    # (modern British style).
    suffix = '.' if style == 'dot' else ''
    style_dependent_titles = {
        rf'\b{abbr.lower()}\b\.?': f'{abbr}{suffix}' for abbr in abbreviations
    }

    # Apply the replacements, starting with the non-dotted ones.
    # re.IGNORECASE is used to catch all variants (e.g., 'mr', 'Mr', 'MR').
    
    for pattern, replacement in no_dot_titles.items():
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
        
    for pattern, replacement in style_dependent_titles.items():
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
            
    return text
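
The position of the optional dot relative to `\b` matters for abbreviation patterns of this kind. The sketch below compares the two shapes on an illustrative string; it is a standalone demonstration, not part of the pipeline.

```python
import re

text = "mr. Turner and MRS Siddons"

# Optional dot BEFORE the boundary: there is no word boundary between "." and
# " ", so the regex backtracks, replaces only "mr", and the old dot survives.
dot_then_boundary = re.sub(r'\bmr\.?\b', 'Mr.', text, flags=re.IGNORECASE)

# Optional dot AFTER the boundary: "mr." is consumed as a whole.
boundary_then_dot = re.sub(r'\bmr\b\.?', 'Mr.', text, flags=re.IGNORECASE)

print(dot_then_boundary)  # Mr.. Turner and MRS Siddons
print(boundary_then_dot)  # Mr. Turner and MRS Siddons
```

In both cases `MRS` is untouched, because the boundary after `mr` fails when another letter follows.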

In [20]:
def reapply_punctuation_spacing(text):
    """
    Standardises spacing around punctuation, preserves acronyms, 
    and heals split initials.
    """
    if not text:
        return text

    # 1. Heal Split Initials: "B. B. C." -> "B.B.C."
    # Matches a single letter + dot + space, followed by another single letter + dot
    text = re.sub(r'\b([a-zA-Z])\.\s+(?=([a-zA-Z]\.))', r'\1.', text)

    # 2. Standardise Possessives: "word ' s" -> "word's"
    text = re.sub(r'(\w)\s*\'\s*s\b', r"\1's", text, flags=re.IGNORECASE)

    # 3. Standardise Plural Possessives: "artists ' " -> "artists' "
    # Only fires when the apostrophe is followed by whitespace or the end of
    # the text, so contractions ("don't") and opening quotes are left alone.
    text = re.sub(r"(\w)\s*'(?=\s|$)\s*", r"\1' ", text)

    # 4. Standardise space BEFORE punctuation (Remove it)
    text = re.sub(r'\s+([,\.;:?!])', r'\1', text)

    # 5. Standardise space AFTER punctuation (Ensure it)
    # 5a. Semicolons, colons, and exclamation/question marks
    text = re.sub(r'([;:?!])([a-zA-Z0-9])', r'\1 \2', text)
    # Commas: skip digit,digit so thousands separators ("1,000") stay intact
    text = re.sub(r',(?=[a-zA-Z])|(?<!\d),(?=\d)', ', ', text)
    
    # 5b. Initial-aware dots: Adds space only if NOT an initial
    # If the dot is preceded by a single letter (an initial), do not add a space.
    # If the dot is preceded by a full word, add a space.
    # Dots between two digits ("3.5") are treated as decimal points and kept.
    text = re.sub(r'(?<!\b[a-zA-Z])\.(?=[a-zA-Z]|(?<!\d\.)\d)', '. ', text)

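    # 6. Clean up spaces around brackets
    text = re.sub(r'\s*\(\s*', ' (', text)
    # Drop the space before ")" and add one after only when a word follows,
    # so "(x)." does not become "(x) ." after step 4 has already run.
    text = re.sub(r'\s+\)', ')', text)
    text = re.sub(r'\)(?=[a-zA-Z0-9])', ') ', text)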

    # 7. Final cleanup of multiple spaces
    text = re.sub(r'\s+', ' ', text)

    # 8. Fix spaces in common currencies
    # 8a. Symbol + Number: "$ 100", "£ 40. 2", "€ 5"
    text = re.sub(
        r'([$£€])\s*(\d+(?:,\d+)*)(?:\s*[\.]\s*(\d+))?',
        lambda m: f"{m.group(1)}{m.group(2)}{'.' + m.group(3) if m.group(3) else ''}",
        text
    )

    # 8b. Number + Symbol: "100 $", "2,500 €", "5 £"
    text = re.sub(
        r'(\d+(?:,\d+)*)(?:\s*[\.]\s*(\d+))?\s*([$£€])',
        lambda m: f"{m.group(1)}{'.' + m.group(2) if m.group(2) else ''}{m.group(3)}",
        text
    )

    # Fix spaces in times
    # 9a. Heal fragmented times: "10 . 30" or "10 : 30" -> "10:30" / "10.30"
    # This handles the colon found in modern reviews and the dot common in older ones.
    text = re.sub(r'(\b\d{1,2})\s*([.:])\s*(\d{2})\b', r'\1\2\3', text)

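    # 9b. Standardise a.m. and p.m.
    # Matches digits + optional space + a/p + optional dot + m + optional dot.
    # Output is standardised to lowercase "a.m." or "p.m." for dictionary consistency.
    # The trailing dot is matched after the word boundary (m\b\.?) so that an
    # existing dot is consumed instead of being doubled ("10 a.m..").
    text = re.sub(
        r'(\d+)\s*([ap])\.?\s*m\b\.?',
        lambda m: f"{m.group(1)} {m.group(2).lower()}.m.",
        text,
        flags=re.IGNORECASE
    )

    # 10. Standardise Units (m, kg, mph, etc.)
    # Ensures a single space between digits and units to prevent OOV "word+unit" tokens.
    # No leading \b on the unit: there is no word boundary between a digit and a
    # letter, so "5km" would otherwise never match.
    units = r'(?:m|km|hh|kph|mph|kg|lb|oz|in|ft|yd|cm|mm|fps)\b'
    text = re.sub(rf'(\d+)\s*({units})', r'\1 \2', text, flags=re.IGNORECASE)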
    
    return text.strip()
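
The currency and time healing steps (8a and 9a) can be tried on their own. The sketch below copies those two regexes into a hypothetical helper, `heal_currency_and_time`, and runs them on made-up fragments.

```python
import re


def heal_currency_and_time(text):
    """Apply only steps 8a and 9a from reapply_punctuation_spacing."""
    # 8a. Symbol + number, with an optional spaced-out decimal part
    text = re.sub(
        r'([$£€])\s*(\d+(?:,\d+)*)(?:\s*[\.]\s*(\d+))?',
        lambda m: f"{m.group(1)}{m.group(2)}{'.' + m.group(3) if m.group(3) else ''}",
        text,
    )
    # 9a. Fragmented clock times: one or two digits, separator, two digits
    text = re.sub(r'(\b\d{1,2})\s*([.:])\s*(\d{2})\b', r'\1\2\3', text)
    return text


print(heal_currency_and_time("sold for £ 40. 2 at 10 : 30"))
print(heal_currency_and_time("price $ 1,000 today"))
```

The first call yields `sold for £40.2 at 10:30` and the second `price $1,000 today`. Because the time pattern requires exactly two trailing digits, the single-digit decimal in `£40.2` is not mistaken for a clock time.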

In [21]:
def final_polishing_batched(batch, text_column="MLM_Corrected_text", style='dot'):
    """
    Stage 3 wrapper: applies honorific standardization and then
    punctuation, currency, and time spacing cleanup to a dataset batch.
    """
    input_texts = batch[text_column]
    
    polished_texts = [
        standardize_honorifics(text, style=style) for text in input_texts
    ]
    
    polished_texts = [
        reapply_punctuation_spacing(text) for text in polished_texts
    ]

    # Return in a new column to preserve the previous step
    return {"Final_Polished_Text": polished_texts}

In [22]:
def display_record(record):
    """
    Displays a Hugging Face dataset record with metadata followed by two
    text fields (Full_text, Final_Polished_Text) in side-by-side
    columns in the terminal.
    
    This function works with both dictionaries and pandas Series
    (e.g., a row from a DataFrame like df.loc[0]).
    """
    
    # --- 1. Display Metadata ---
    print("-" * 80)
    print("METADATA")
    print("-" * 80)
    
    # Define metadata keys to extract and display
    metadata_keys = [
        'Author', 'Title', 'Publication', 'Date', 'Place', 'URL'
    ]
    
    for key in metadata_keys:
        # Use .get() to safely access keys, providing 'N/A' if key is missing
        value = record.get(key, 'N/A')
        print(f"{key+':':<15} {value}")
        
    print("\n" + "=" * 80)
    print("TEXT COMPARISON")
    print("=" * 80)

    # --- 2. Prepare Columnar Text ---
    
    # Get text fields
    full_text = record.get('Full_text', '')
    capitalized_text = record.get('Final_Polished_Text', '')

    # Get terminal width to make columns responsive
    try:
        terminal_width = shutil.get_terminal_size().columns
    except OSError:
        # Fallback if not in a real terminal (e.g., CI/CD)
        terminal_width = 120

    # Define spacing: 2 columns + 1 separator (" | ")
    # We give a little extra buffer for the separator
    padding = 4 
    col_width = (terminal_width - padding) // 2
    
    if col_width < 10:
        print("Terminal is too narrow to display columns effectively.")
        print("\nFull_text:\n", full_text)
        print("\nFinal_Polished_Text:\n", capitalized_text)
        return
        return

    # Wrap text for each column
    wrapped_full = textwrap.wrap(full_text, width=col_width)
    wrapped_capitalized = textwrap.wrap(capitalized_text, width=col_width)

    # Find the maximum number of lines needed
    max_lines = max(len(wrapped_full), len(wrapped_capitalized))

    # --- 3. Print Headers and Rows ---
    
    # Create header strings, left-aligned and truncated if necessary
    header_full = "Full_text".ljust(col_width)
    header_capitalized = "Final_Polished_Text".ljust(col_width)

    print(f"{header_full} | {header_capitalized}")
    print(f"{'-' * col_width} | {'-' * col_width}")

    # Print each row
    for i in range(max_lines):
        # Get the line for each column, or an empty string if text is shorter
        line_full = wrapped_full[i] if i < len(wrapped_full) else ""
        line_capitalized = wrapped_capitalized[i] if i < len(wrapped_capitalized) else ""
        
        # Print the formatted row, ensuring each part adheres to the column width
        print(f"{line_full.ljust(col_width)} | {line_capitalized.ljust(col_width)}")

    print("=" * 80)
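
The column layout used by `display_record` can be reduced to a few lines. This is a minimal sketch with a hypothetical helper name (`side_by_side`) and a fixed column width instead of the detected terminal width.

```python
import textwrap
from itertools import zip_longest


def side_by_side(left, right, col_width=30):
    """Wrap two texts to col_width and pair their lines into rows."""
    left_lines = textwrap.wrap(left, width=col_width)
    right_lines = textwrap.wrap(right, width=col_width)
    return [
        f"{l.ljust(col_width)} | {r.ljust(col_width)}"
        for l, r in zip_longest(left_lines, right_lines, fillvalue="")
    ]


rows = side_by_side(
    "Thle exhibition vvas alvays crowded vith visitors.",
    "The exhibition was always crowded with visitors.",
    col_width=20,
)
for row in rows:
    print(row)
```

`zip_longest` plays the role of the `max_lines` loop above: the shorter column is padded with empty strings so both sides stay aligned.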

In [23]:
def pipeline_driver():
    """
    Main driver function to run the full pre-processing pipeline.

    - Finds all .csv files in a specified input directory.
    - Sequentially applies the three main processing stages:
      1. Initial text cleaning (`clean_text_piece_batched`).
      2. NER-driven correction and capitalization (`capitalization_and_correction_batched`).
      3. Honorific standardization and punctuation spacing cleanup (`final_polishing_batched`).
    - Saves the fully processed data to new .csv files in an output directory.
    - Returns the combined word set and the output folder, which
      `evaluate_pipeline_oov` takes as input. Returns None if no CSV files are found.
    """
    # --- Configuration ---
    # Define the folder where your raw .csv files are located
    input_folder = Path(INPUT_PATH)  
    # Define the folder where processed files will be saved
    output_folder = Path(OUTPUT_PATH)
    
    # --- Setup ---
    # Create the output directory if it doesn't exist
    output_folder.mkdir(parents=True, exist_ok=True)
    log.info(f"Input folder: '{input_folder}'")
    log.info(f"Output folder: '{output_folder}'")

    # Setup SymSpellPy with the master word set
    general_symspell_checker, master_words_added, art_words_added = setup_spellchecker(master_words=True, artist_set=True)
    # Create a separate SymSpell instance for NER-shielded corrections
    ner_symspell_checker, _, _ = setup_spellchecker(master_words=False, artist_set=True)
    all_words = master_words_added | art_words_added

    # Find all CSV files in the input directory
    csv_files = list(input_folder.glob('*.csv'))
    if not csv_files:
        log.warning(f"No CSV files found in '{input_folder}'. Exiting.")
        return

    log.info(f"Found {len(csv_files)} CSV file(s) to process: {[f.name for f in csv_files]}")

    # --- Processing Loop ---
    for file_path in csv_files:
        log.info(f"\n{'='*20} Processing file: {file_path.name} {'='*20}")
        
        # 1. Load the data using the helper function
        loaded_data = load_csv_file(str(file_path))
        if not loaded_data:
            log.error(f"Failed to load {file_path.name}. Skipping.")
            continue
        
        raw_dataset = loaded_data['data']
        log.info(f"Loaded {loaded_data['length']} rows.")

        # --- 2. RUN THE FULL PIPELINE ---
        log.info("Starting text cleaning pipeline...")

        log.info("Step 1/3: Initial regex cleaning and pattern correction...")
        regex_cleaned_dataset = raw_dataset['train'].map(
            clean_text_piece_batched, 
            batched=True,
            fn_kwargs={'text_column': 'Full_text'}
        )

        log.info("Step 2/3: NER, capitalisation and Symspell corrections...")
        symspell_corrected_dataset_v1 = regex_cleaned_dataset.map(
            capitalization_and_correction_batched, 
            batched=True,
            batch_size=8,
            fn_kwargs={'text_column': 'Regex_cleaned_text', 
                       'ner_symspell': ner_symspell_checker,
                       'symspell': general_symspell_checker, 
                       'master_word_set': all_words,
                       'run': 1}
        )

        log.info("Step 3/3: Final polishing...")
        final_dataset = symspell_corrected_dataset_v1.map(
            final_polishing_batched,
            batched=True,
            fn_kwargs={
                'text_column': 'Symspell_corrected_text_v1',
                'style': 'no_dot' 
            }
        )

        log.info("Pipeline complete. Final text is in 'Final_Polished_Text'.")

        # 3. Save the results to a new CSV file
        log.info("Saving processed data...")
        output_filename = f"{file_path.stem}_processed.csv"
        output_path = output_folder / output_filename
        

        # Convert the final dataset to a pandas DataFrame to save as CSV
        final_df = final_dataset.to_pandas()
        
        final_df.to_csv(output_path, index=False)
        log.info(f"Successfully saved processed data to '{output_path}'")

    log.info(f"\n{'='*20} Pipeline finished. {'='*20}")

    return all_words, output_folder
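
Each stage in the driver follows the same contract: a function receives a batch as a dict of column lists and returns a dict of new columns with one value per input row. The sketch below imitates that contract in plain Python. `apply_batched` is a hypothetical stand-in for `Dataset.map(batched=True)`, and the two toy stages are illustrative, not the pipeline's real functions.

```python
def apply_batched(columns, fn, batch_size=2, **fn_kwargs):
    """Hypothetical stand-in for a batched map over a dict of column lists."""
    n_rows = len(next(iter(columns.values())))
    new_columns = {}
    for start in range(0, n_rows, batch_size):
        batch = {name: values[start:start + batch_size] for name, values in columns.items()}
        expected = len(next(iter(batch.values())))
        for name, values in fn(batch, **fn_kwargs).items():
            # Existing columns are kept, so each stage must return one value per row.
            if len(values) != expected:
                raise ValueError(f"{name}: got {len(values)} values for {expected} rows")
            new_columns.setdefault(name, []).extend(values)
    return {**columns, **new_columns}


def toy_clean(batch, text_column):
    """Toy stage 1: a single hard-coded OCR patch."""
    return {"Regex_cleaned_text": [t.replace("vith", "with") for t in batch[text_column]]}


def toy_polish(batch, text_column):
    """Toy final stage: trim and capitalise."""
    return {"Final_Polished_Text": [t.strip().capitalize() for t in batch[text_column]]}


data = {"Full_text": ["  a room vith a view", "vith care", "no change"]}
data = apply_batched(data, toy_clean, text_column="Full_text")
data = apply_batched(data, toy_polish, text_column="Regex_cleaned_text")
print(data["Final_Polished_Text"])
```

Each stage reads the column written by the previous one and adds its own, which is why `Full_text` is still available for the later OOV comparison.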

In [24]:
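def evaluate_oov_corrections_batched(batch, text_column, corrected_text_column, master_word_set):
    """
    Scores each row by the relative reduction in out-of-vocabulary (OOV) words.

    The score is a fraction, (original_oov - corrected_oov) / original_oov,
    not multiplied by 100. Negative values mean the corrected text contains
    more OOV words than the original.
    """
    oov_scores = list()

    for original_text, corrected_text in zip(batch[text_column], batch[corrected_text_column]):
        if not original_text or not corrected_text:
            # Keep one score per input row: a batched map must return
            # as many values as the batch has rows.
            oov_scores.append(float('nan'))
            continue

        original_words = WORD_REGEX.findall(original_text.lower())
        corrected_words = WORD_REGEX.findall(corrected_text.lower())

        original_oov = sum(1 for word in original_words if word not in master_word_set)
        corrected_oov = sum(1 for word in corrected_words if word not in master_word_set)

        # Calculate the OOV reduction as a fraction of the original OOV count
        # Deal with division by zero if there were no OOVs originally
        if original_oov == 0:
            oov_scores.append(0.0)
        else:
            oov_difference_percentage = (original_oov - corrected_oov) / original_oov 
            oov_scores.append(oov_difference_percentage)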

    return {'OOV_Correction_Improvement_Percentage': oov_scores}
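
A worked example of the score computed above, using a toy vocabulary and an assumed stand-in for `WORD_REGEX`. The helper name `oov_reduction` is hypothetical.

```python
import re

# Assumed stand-in for the notebook's WORD_REGEX (defined in an earlier cell).
WORD_REGEX = re.compile(r"[a-z']+")


def oov_reduction(original, corrected, vocabulary):
    """Fraction of original OOV words removed by correction."""
    original_oov = sum(1 for w in WORD_REGEX.findall(original.lower()) if w not in vocabulary)
    corrected_oov = sum(1 for w in WORD_REGEX.findall(corrected.lower()) if w not in vocabulary)
    if original_oov == 0:
        return 0.0
    return (original_oov - corrected_oov) / original_oov


vocab = {"the", "painting", "is", "always", "with", "us"}

# 3 OOV words before, 1 after: (3 - 1) / 3
print(oov_reduction("thle painting is alvays vith us", "the painting is always vith us", vocab))

# 1 OOV word before, 2 after: (1 - 2) / 1 = -1.0
print(oov_reduction("vith us", "vith uz", vocab))

# No OOV words before: the score is 0.0 even though correction introduced one
print(oov_reduction("the painting", "the paintinq", vocab))
```

The last case shows a blind spot of the metric: a text that starts with no OOV words always scores zero, so damage done to clean texts does not appear as a negative value.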
    

In [25]:
def evaluate_pipeline_oov(wordlist, output_folder):
    """
    Evaluates the pipeline's effectiveness on out-of-vocabulary (OOV) words
    by comparing pre- and post-correction texts against a master word list.
    Outputs statistics on corrections made to OOV words.
    """

    # Determine number of CPU cores to use (leaving 2 free).
    # os.cpu_count() can return None, so fall back to a single core.
    num_cores = max(1, (os.cpu_count() or 1) - 2)

    # Define the folder where processed files will be saved
    # The input path is the same as output path from pipeline_driver
    # The output path will save the evaluation results in a separate folder
    input_folder = Path(output_folder)
    output_folder = Path(input_folder) / "evaluation_results"
    
    # --- Setup ---
    # Create the output directory if it doesn't exist
    output_folder.mkdir(parents=True, exist_ok=True)
    log.info(f"Input folder: '{input_folder}'")
    log.info(f"Output folder: '{output_folder}'")

    # If no wordlist provided, setup SymSpellPy to get the master word set
    if not wordlist:
        # Setup SymSpellPy with the master word set
        # Same setup as in pipeline_driver
        _, master_words_added, art_words_added = setup_spellchecker(master_words=True, artist_set=True)
        wordlist = master_words_added | art_words_added

    # This list will hold smaller dataframes for final reduction
    all_metrics_storage = []
    negative_metric_storage = []
    periods_min_texts = dict()
    periods_max_texts = dict()

    # Find all CSV files in the input directory
    csv_files = list(input_folder.glob('*.csv'))
    if not csv_files:
        log.warning(f"No CSV files found in '{input_folder}'. Exiting.")
        return

    log.info(f"Found {len(csv_files)} CSV file(s) to process: {[f.name for f in csv_files]}")

    # --- Processing Loop ---
    for file_path in csv_files:
        log.info(f"\n{'='*20} Processing file: {file_path.name} {'='*20}")
        
        # 1. Load the data using the helper function
        loaded_data = load_csv_file(str(file_path))
        if not loaded_data:
            log.error(f"Failed to load {file_path.name}. Skipping.")
            continue
        
        raw_dataset = loaded_data['data']
        log.info(f"Loaded {loaded_data['length']} rows.")

        log.info("Evaluating OOV correction effectiveness...")
        oov_dataset = raw_dataset['train'].map(
            evaluate_oov_corrections_batched,
            batched=True,
            num_proc=num_cores,
            batch_size=500,
            fn_kwargs={
                'text_column': 'Full_text',
                'corrected_text_column': 'Final_Polished_Text',
                'master_word_set': wordlist
            }
        )
        
        oov_df = oov_dataset.to_pandas()


        metrics_df = oov_df[['Date', 'Publication', 'OOV_Correction_Improvement_Percentage']].copy()
        metrics_df.loc[metrics_df['Date'] == "The Times", 'Date'] = metrics_df['Publication']
        # Extraction logic for the year
        # Converts "2008-Nov-22" -> 2008
        metrics_df['Year'] = pd.to_datetime(metrics_df['Date'], errors='coerce').dt.year
        
        # If any rows failed datetime conversion, fall back to the first 4-digit run (YYYY) in the string
        mask = metrics_df['Year'].isna()
        metrics_df.loc[mask, 'Year'] = metrics_df.loc[mask, 'Date'].str.extract(r'(\d{4})')[0].astype(float)

        metrics_df['Period'] = (metrics_df['Year'] // 50) * 50

        metrics_df.drop(columns=['Publication', 'Date', 'Year'], inplace=True)
        all_metrics_storage.append(metrics_df)

        # Isolate all rows with a negative improvement score
        negative_impact_df = oov_df[oov_df['OOV_Correction_Improvement_Percentage'] < 0].copy()
        negative_metric_storage.append(negative_impact_df)


        # Determine min/max OOV improvement for each period.
        # Rows without a parsable year have a NaN period and are skipped.
        for period in metrics_df['Period'].dropna().unique():
            period_data = metrics_df[metrics_df['Period'] == period]
            scores = period_data['OOV_Correction_Improvement_Percentage']
            if scores.isna().all():
                continue
            min_idx = scores.idxmin()
            max_idx = scores.idxmax()
            min_oov = scores.min()
            max_oov = scores.max()

            # Keep the most extreme example seen so far across all files
            if period not in periods_min_texts or min_oov < periods_min_texts[period][0]:
                periods_min_texts[period] = (min_oov, oov_df.loc[min_idx, 'Full_text'], oov_df.loc[min_idx, 'Final_Polished_Text'])

            if period not in periods_max_texts or max_oov > periods_max_texts[period][0]:
                periods_max_texts[period] = (max_oov, oov_df.loc[max_idx, 'Full_text'], oov_df.loc[max_idx, 'Final_Polished_Text'])



    # Combine the negative-impact records from all files and save them
    negative_metric_storage_df = pd.concat(negative_metric_storage, ignore_index=True)
    negative_metric_storage_df.to_csv(output_folder / "negative_oov_impact_records.csv", index=False)

    log.info("Aggregating results across all files...")
    master_df = pd.concat(all_metrics_storage, ignore_index=True)

    # Groupby acts as the 'reduce' function
    summary_stats = master_df.groupby('Period')['OOV_Correction_Improvement_Percentage'].agg(
        ['count', 'mean', 'median', 'std']
    ).reset_index()

    log.info("Final summary statistics computed.")
    # Save and Print Summary
    summary_stats.to_csv(output_folder / "global_oov_summary.csv", index=False)
    log.info("Saved global summary statistics to 'global_oov_summary.csv'.")

    rows = []

    for period in sorted(periods_min_texts.keys()):
        min_score, original_text_min, cleaned_text_min = periods_min_texts[period]
        max_score, original_text_max, cleaned_text_max = periods_max_texts[period]

        rows.append({
            'Period': period,
            'Min_OOV_Improvement': min_score,
            'Original_Text_Min': original_text_min,
            'Cleaned_Text_Min': cleaned_text_min,
            'Max_OOV_Improvement': max_score,
            'Original_Text_Max': original_text_max,
            'Cleaned_Text_Max': cleaned_text_max,
        })

        print(f"\n--- Period: {period} ---")
        print(f"Min OOV Improvement: {min_score:.4f}")
        
        display_record({
            'Full_text': original_text_min,
            'Final_Polished_Text': cleaned_text_min,
        })

        print(f"\nMax OOV Improvement: {max_score:.4f}")
        display_record({
            'Full_text': original_text_max,
            'Final_Polished_Text': cleaned_text_max,
        })

    period_df = pd.DataFrame(rows)

    # Save the per-period extreme cases to CSV
    period_df.to_csv(output_folder / "extreme_oov_cases_per_period.csv", index=False)

    return summary_stats, output_folder
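
The 50-year bucketing used for the `Period` column is plain integer arithmetic. The sketch below reproduces it without pandas; it is a simplification that relies only on the regex fallback (first four-digit run), whereas the function above tries `pd.to_datetime` first. The helper name `period_start` is hypothetical.

```python
import re


def period_start(date_string, bucket_years=50):
    """Return the start year of the bucket a date falls in, or None."""
    match = re.search(r"\d{4}", date_string or "")
    if match is None:
        return None
    year = int(match.group(0))
    return (year // bucket_years) * bucket_years


print(period_start("2008-Nov-22"))  # 2000
print(period_start("1897-05-01"))   # 1850
print(period_start("undated"))      # None
```

Floor division maps every year from 1850 to 1899 onto 1850, which is the value plotted on the x-axis as "Period (Start Year)".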

In [26]:
def plot_oov_performance(summary_df, output_folder):
    """
    Creates two plots: 
    1. A line plot showing Mean/Median OOV improvement with standard deviation.
    2. A bar chart showing the volume of data processed per period.
    """
    # Plot 1: Improvement Metrics
    plt.errorbar(
        summary_df['Period'], 
        summary_df['mean'], 
        yerr=summary_df['std'], 
        fmt='-o', 
        label='Mean Improvement (%)', 
        capsize=5, 
        color='#1f77b4'
    )
    plt.plot(
        summary_df['Period'], 
        summary_df['median'], 
        '-s', 
        label='Median Improvement (%)', 
        alpha=0.7, 
        color='#ff7f0e'
    )
    
    plt.title('OOV Correction Improvement by Historical Period')
    plt.xlabel('Period (Start Year)')
    plt.ylabel('Improvement Percentage (%)')
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.tight_layout()
    img_dir = Path(output_folder) / 'images'
    img_dir.mkdir(parents=True, exist_ok=True)
    plt.savefig(img_dir / 'oov_improvement_trends.png')
    plt.close()

    # Plot 2: Processing Volume
    plt.bar(
        summary_df['Period'], 
        summary_df['count'], 
        width=40, 
        color='#aec7e8', 
        edgecolor='#1f77b4', 
        alpha=0.8
    )
    plt.title('Review Volume per 50-Year Period')
    plt.xlabel('Period (Start Year)')
    plt.ylabel('Number of Reviews')
    plt.xticks(summary_df['Period'])
    plt.grid(axis='y', linestyle='--', alpha=0.6)
    plt.tight_layout()
    plt.savefig(img_dir / 'data_volume_per_period.png')
    plt.close()



In [27]:
def plot_merged_performance(summary_df, output_folder):
    """
    Creates a single dual-axis plot showing both OOV improvement trends
    and data volume per period.
    """
    fig, ax1 = plt.subplots(figsize=(12, 7))

    # --- Plot 1 (Primary Axis - Left): Improvement Percentage ---
    # Plot error bars for Mean OOV Improvement
    line1 = ax1.errorbar(
        summary_df['Period'],
        summary_df['mean'],
        yerr=summary_df['std'],
        fmt='-o',
        label='Mean OOV Improvement (%)',
        capsize=5,
        color='#1f77b4',  # Standard blue
        zorder=2  # Ensure lines are on top of bars
    )
    
    ax1.set_xlabel('Period (Start Year)', fontsize=12)
    ax1.set_ylabel('Improvement Percentage (%)', fontsize=12, color='#1f77b4')
    ax1.tick_params(axis='y', labelcolor='#1f77b4')
    # Set x-ticks to match the periods for clarity
    ax1.set_xticks(summary_df['Period'])
    # Add a light grid based on the primary axis
    ax1.grid(True, linestyle='--', alpha=0.6)

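    # --- Plot 2 (Secondary Axis - Right): Data Volume ---
    # Create a second y-axis that shares the same x-axis
    ax2 = ax1.twinx()
    # Artist zorder only applies within a single Axes. Raise ax1 above ax2 so the
    # line is drawn over the bars, and hide ax1's background so the bars show through.
    ax1.set_zorder(ax2.get_zorder() + 1)
    ax1.patch.set_visible(False)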

    # Plot bars for Document Count
    bar1 = ax2.bar(
        summary_df['Period'],
        summary_df['count'],
        width=35,  # Adjust width for visibility
        color='#aec7e8',  # Lighter blue
        edgecolor='#1f77b4',
        alpha=0.6,
        label='Document Count',
        zorder=1  # Ensure bars are behind lines
    )
    
    ax2.set_ylabel('Number of Documents', fontsize=12, color='navy')
    ax2.tick_params(axis='y', labelcolor='navy')

    # --- Formatting ---
    plt.title('Pipeline Effectiveness and Data Volume by Historical Period', fontsize=14)

    # Combine legends from both axes into a single legend box
    lines, labels = ax1.get_legend_handles_labels()
    bars, bar_labels = ax2.get_legend_handles_labels()
    ax1.legend(lines + bars, labels + bar_labels, loc='upper right')

    img_dir = Path(output_folder) / 'images'
    img_dir.mkdir(parents=True, exist_ok=True)
    plt.tight_layout()
    plt.savefig(img_dir / 'merged_oov_volume_plot.png', dpi=300)
    plt.close()

In [33]:
def export_to_latex(summary_df, output_folder, filename='oov_results.tex'):
    """
    Writes the summary statistics to a LaTeX table and returns the LaTeX source.
    """
    # Copy and rename columns for the table header.
    # Assumes summary_df columns are ordered: Period, count, mean, median, std.
    latex_df = summary_df.copy()
    latex_df.columns = ['Period', 'Count', 'Mean Imp. (%)', 'Median Imp. (%)', 'Std Dev']
    
    latex_str = latex_df.to_latex(
        index=False, 
        caption='Summary of OOV Correction Improvement across Historical Art Reviews',
        label='tab:oov_pipeline_summary',
        column_format='lcccc',
        float_format="%.2f",
        escape=True  # '%' starts a LaTeX comment, so the header labels must be escaped
    )
    
    with open(Path(output_folder) / filename, 'w', encoding='utf-8') as f:
        f.write(latex_str)
    
    return latex_str
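
The header labels above contain `%`, which starts a comment in LaTeX, so an unescaped header row loses everything after the first `%`, including the closing `\\`. The following is a minimal, pandas-independent sketch of the escaping involved. `LATEX_SPECIALS` and `escape_latex` are hypothetical helpers written for illustration and are not part of the notebook.

```python
# Characters with special meaning in LaTeX text mode and their escaped forms.
LATEX_SPECIALS = {
    "\\": r"\textbackslash{}",
    "&": r"\&",
    "%": r"\%",
    "$": r"\$",
    "#": r"\#",
    "_": r"\_",
    "{": r"\{",
    "}": r"\}",
    "~": r"\textasciitilde{}",
    "^": r"\textasciicircum{}",
}


def escape_latex(text):
    """Escape one character at a time so replacements are never re-escaped."""
    return "".join(LATEX_SPECIALS.get(ch, ch) for ch in text)


headers = ["Period", "Count", "Mean Imp. (%)", "Median Imp. (%)", "Std Dev"]
header_row = " & ".join(escape_latex(h) for h in headers) + r" \\"
print(header_row)
```

The column separators are added after escaping, so the `&` between cells stays a real alignment character while any `&` inside a label would be escaped.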

In [29]:
# --- Execute the main function ---
# Run the full cleaning pipeline. The returned wordlist and output folder
# are reused by the evaluation cell further down.
wordlist, output_folder = pipeline_driver()

INFO:__main__:Input folder: '1-sample-test'
INFO:__main__:Output folder: 'Cleaned_data/1-sample-test'
INFO:__main__:Setting up SymSpellPy...
INFO:__main__:Added 45111 words from ENTITY wordlist to SymSpell.
INFO:__main__:Added 1337241 words from Project Gutenberg library to SymSpell.
INFO:__main__:SymSpell setup complete. Total unique terms: 1363716
INFO:__main__:Setting up SymSpellPy...
INFO:__main__:Added 45111 words from ENTITY wordlist to SymSpell.
INFO:__main__:Added 0 words from Project Gutenberg library to SymSpell.
INFO:__main__:SymSpell setup complete. Total unique terms: 45111
INFO:__main__:Found 1 CSV file(s) to process: ['test_1_sample copy.csv']
INFO:__main__:
INFO:__main__:Loaded 76 rows.
INFO:__main__:Starting text cleaning pipeline...
INFO:__main__:Step 1/3: Initial regex cleaning and pattern correction...
INFO:__main__:Step 2/3: NER, capitalisation and Symspell corrections...
INFO:__main__:Step 3/3: Final polishing...
INFO:__main__:Pipeline complete. Final text is in 'Final_Polished_Text'.
INFO:__main__:Saving processed data...
INFO:__main__:Successfully saved processed data to 'Cleaned_data/1-sample-test/test_1_sample copy_processed.csv'
INFO:__main__:


In [48]:
stats, output_folder = evaluate_pipeline_oov(wordlist, output_folder)

INFO:__main__:Input folder: 'Cleaned_data/Data'
INFO:__main__:Output folder: 'Cleaned_data/Data/evaluation_results'
INFO:__main__:Found 7 CSV file(s) to process: ['Operas_gale_processed.csv', 'Poetry_gale_processed.csv', 'Books_gale_processed.csv', 'Theater_gale_processed.csv', 'Art_exhibitions_gale_processed.csv', 'Concerts_gale_processed.csv', 'Dance_gale_processed.csv']
INFO:__main__:
INFO:__main__:Loaded 972 rows.
INFO:__main__:Evaluating OOV correction effectiveness...
INFO:__main__:
INFO:__main__:Loaded 629 rows.
INFO:__main__:Evaluating OOV correction effectiveness...
INFO:__main__:
INFO:__main__:Loaded 2283 rows.
INFO:__main__:Evaluating OOV correction effectiveness...
INFO:__main__:
INFO:__main__:Loaded 18036 rows.
INFO:__main__:Evaluating OOV correction effectiveness...
INFO:__main__:
INFO:__main__:Loaded 2651 rows.
INFO:__main__:Evaluating OOV correction effectiveness...
INFO:__main__:
INFO:__main__:Loaded 21907 rows.
INFO:__main__:Evaluating OOV correction effectiveness...
INFO:__main__:
INFO:__main__:Loaded 3766 rows.
INFO:__main__:Evaluating OOV correction effectiveness...
INFO:__main__:Aggregating results across all files...
INFO:__main__:Final summary statistics computed.
INFO:__main__:Saved global summary statistics to 'global_oov_summary.csv'.



--- Period: 1750 ---
Min OOV Improvement: 0.0000
--------------------------------------------------------------------------------
METADATA
--------------------------------------------------------------------------------
Author:         N/A
Title:          N/A
Publication:    N/A
Date:           N/A
Place:          N/A
URL:            N/A

TEXT COMPARISON
Full_text                              | Final_Polished_Text                   
-------------------------------------- | --------------------------------------
The Beth Theatre closed on Saturday    | The Beth Theatre closed on saturday   
la#, after what is termed a profitable | la, after what is termed a profitable 
season. Friday evening Mrs. Martyr and | season. Friday evening Mrs. Martyr and
Mr, I)Arley commenced their regular    | Mr, I) Arley commenced their regular  
engagements at Vauxhall, their         | engagements at Vauxhall, their        
performance at the Theatre having      | performance at the Theatre having     
pr

In [49]:
plot_oov_performance(stats, output_folder)
plot_merged_performance(stats, output_folder)
latex_table = export_to_latex(stats, output_folder)
print("Generated LaTeX Table:\n", latex_table)

Generated LaTeX Table:
 \begin{table}
\caption{Summary of OOV Correction Improvement across Historical Art Reviews}
\label{tab:oov_pipeline_summary}
\begin{tabular}{lcccc}
\toprule
Period & Count & Mean Imp. (%) & Median Imp. (%) & Std Dev \\
\midrule
1750 & 422 & 0.81 & 0.84 & 0.15 \\
1800 & 2542 & 0.86 & 0.89 & 0.12 \\
1850 & 4260 & 0.81 & 0.84 & 0.12 \\
1900 & 19977 & 0.68 & 0.71 & 0.21 \\
1950 & 18050 & 0.53 & 0.54 & 0.21 \\
2000 & 4993 & 0.48 & 0.48 & 0.17 \\
\bottomrule
\end{tabular}
\end{table}
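
The table can be cross-checked against the log output: the per-period counts should add up to the rows loaded across the seven processed files. The sketch below does that arithmetic with the figures copied from the logs and the table, and also derives a count-weighted overall mean. Because the per-period means are rounded to two decimals, the weighted mean is only approximate. The variable names are illustrative.

```python
# Rows reported by the "Loaded N rows." log lines, one entry per processed file.
rows_loaded = [972, 629, 2283, 18036, 2651, 21907, 3766]

# (count, mean improvement) per period, copied from the LaTeX table.
period_summary = {
    1750: (422, 0.81),
    1800: (2542, 0.86),
    1850: (4260, 0.81),
    1900: (19977, 0.68),
    1950: (18050, 0.53),
    2000: (4993, 0.48),
}

total_loaded = sum(rows_loaded)
total_counted = sum(count for count, _ in period_summary.values())

# Count-weighted mean across periods (approximate: inputs are rounded means).
weighted_mean = (
    sum(count * mean for count, mean in period_summary.values()) / total_counted
)

print(total_loaded, total_counted)  # 50244 50244
print(round(weighted_mean, 2))
```

Both totals come to 50244, so every loaded row is assigned to a period. The 1900 and 1950 buckets hold most of the documents and therefore dominate the overall figure.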

