In [1]:
import re
from pathlib import Path
import spacy
from scripts.docloader import DocLoader

In [2]:
def pseudonymise_content(doc):
    """
    Replaces person names in a doc's text with ``NAME_X`` placeholders.

    Names are collected from three sources: SpaCy NER entities labelled ``PERSON``,
    the header line (``name: ...``) and the signature (``parole board ...``); the
    latter two are found with regular expressions. Every distinct name string gets
    one placeholder, numbered in order of first appearance in the text, and every
    detected occurrence of that string is replaced by the same placeholder.

    Args:
    - doc: A SpaCy document object (only ``doc.text`` and ``doc.ents`` are read).

    Returns:
        str: The pseudonymised content.
    """
    # Strings that must never be treated as a name
    ignore_list = [
        's', 'a', 'i', 'prison', 'distribution', 'and', 'now', 'known', 'as', 'formerly',
        'aka', 'the', 'secretary', 'of', 'state', '#', '&', ',', '-', ' '
    ]

    # The text is used exactly as the doc provides it: the regular expressions and the
    # ignore_list checks below are case-sensitive.
    letter_text = doc.text

    # PERSON entities found by SpaCy NER, skipping entities whose text is in ignore_list
    ner_names_filtered = [
        (ent.text, (ent.start_char, ent.end_char))
        for ent in doc.ents
        if ent.label_ == 'PERSON' and ent.text not in ignore_list
    ]

    # Name on the header line
    header_match = re.search(r'name:(.*)\n', letter_text)
    header_tuple = _trimmed_group(header_match, ignore_list) if header_match else None

    # Name in the signature. The "parole board (member): ..." form takes precedence; the
    # "parole board ... distribution" form is only consulted when the first form did not
    # match at all (not when it matched an ignored string).
    sig_match = (
        re.search(r'parole board(?: member)?:(.*)\n', letter_text)
        or re.search(r'parole board(?::)?(.*)(\s*)distribution', letter_text)
    )
    sig_tuple = _trimmed_group(sig_match, ignore_list) if sig_match else None

    regex_matches = [found for found in (header_tuple, sig_tuple) if found]

    # The regex spans win over NER: an entity overlapping the header or signature span is
    # dropped so the same characters are not replaced twice.
    all_matches = regex_matches + [
        (name, span)
        for name, span in ner_names_filtered
        if not any(_overlaps(span, regex_span) for _, regex_span in regex_matches)
    ]

    # Sort all matches based on the start index
    all_matches_sorted = sorted(all_matches, key=lambda x: x[1][0])

    # Rebuild the text left to right, copying the untouched stretches between matches.
    # Header and signature matches are replaced here as well as the NER entities.
    name_mapping = {}
    pieces = []
    cursor = 0
    for match_text, (start_index, end_index) in all_matches_sorted:
        if start_index < cursor:
            # Overlaps the previous replacement; skipping keeps the output well-formed
            continue
        if match_text not in name_mapping:
            name_mapping[match_text] = f'NAME_{len(name_mapping) + 1}'
        pieces.append(letter_text[cursor:start_index])
        pieces.append(name_mapping[match_text])
        cursor = end_index
    pieces.append(letter_text[cursor:])

    return ''.join(pieces)


def _trimmed_group(match, ignore_list):
    """
    Returns ``(text, (start, end))`` for group 1 of a regex match, with surrounding
    whitespace excluded from both the text and the span, or None when the trimmed
    text is empty or listed in ignore_list.
    """
    raw_text = match.group(1)
    text = raw_text.strip()
    if not text or text in ignore_list:
        return None
    # Shift the start past the leading whitespace so only the name itself is replaced
    start = match.start(1) + len(raw_text) - len(raw_text.lstrip())
    return text, (start, start + len(text))


def _overlaps(span_a, span_b):
    """Returns True when two half-open ``(start, end)`` spans share at least one character."""
    return span_a[0] < span_b[1] and span_a[1] > span_b[0]


In [3]:
def pseudonymise_letters(letter_paths, folder_path_pseudon, loader):
    """
    Pseudonymises the content per letter from a list of letter paths and saves the content in a pseudon folder.

    Each letter is written to ``<folder_path_pseudon>/<letter file name>`` with the
    suffix replaced by ``.txt``. The output folder is created if it does not exist yet.

    Args:
    - letter_paths (iterable of Path): Letter paths to be pseudonymised.
    - folder_path_pseudon (str): Path to the folder where pseudonymised letters will be saved.
    - loader (DocLoader): A DocLoader instance to load documents.
    """
    folder_pseudon = Path(folder_path_pseudon)
    # Create the output folder up front; otherwise the first write below raises
    # FileNotFoundError when the folder is missing (e.g. on a fresh checkout).
    folder_pseudon.mkdir(parents=True, exist_ok=True)

    for letter_path in letter_paths:
        # Load the doc and pseudonymise its text
        doc = loader.load_pseudon(letter_path)
        letter_content_pseudon = pseudonymise_content(doc)

        # Save the pseudonymised content as a .txt file named after the source letter
        output_path = (folder_pseudon / letter_path.name).with_suffix('.txt')
        with open(output_path, 'w', encoding='utf-8') as letter_pseudon:
            letter_pseudon.write(letter_content_pseudon)

In [4]:
def run_pseudonymisation(loader, folder_path_pseudon):
    '''
    Runs the pseudonymisation process on letters that haven't been pseudonymised yet.

    A letter counts as already pseudonymised when the pseudon folder contains a file
    with the letter's file name and a ``.txt`` suffix, which is the name the
    pseudonymised output is saved under.

    Args:
    - loader (DocLoader): A DocLoader instance to load docs.
    - folder_path_pseudon (str): Path to the folder where pseudonymised letters will be stored.
    '''
    # File names of the letters that have already been pseudonymised
    already_pseudonymised_names = {
        path.name for path in Path(folder_path_pseudon).glob('*.txt')
    }

    # Only process new letters. Source letters and pseudonymised letters live in different
    # folders, so they are matched on the output file name rather than on the full path
    # (full paths from the two folders can never be equal).
    letter_paths = sorted(
        letter_path
        for letter_path in set(loader.all_letter_paths_pseudon())
        if Path(letter_path).with_suffix('.txt').name not in already_pseudonymised_names
    )

    if not letter_paths:
        # Everything is up to date; nothing to do
        return

    # Perform the pseudonymisation process
    pseudonymise_letters(letter_paths, folder_path_pseudon, loader)

In [5]:
# Load the SpaCy model once; the same pipeline instance is passed to both DocLoader
# instances created further down.
nlp = spacy.load('en_core_web_trf')

In [6]:
# Define paths for letters, caches, and pseudonymised folders.
# There is one set of folders per letter collection (mcadl and ohdl); all paths are
# relative, so they resolve against the kernel's working directory.

# Original letters (input)
folder_path_letters_mcadl = 'data/primary_data/letters/original_dls/mcadl/'
folder_path_letters_ohdl = 'data/primary_data/letters/original_dls/ohdl/'

# Cache locations handed to DocLoader
folder_path_cache_mcadl = 'data/primary_data/letters/caches/mcadl/en_core_web_trf_pseudon'
folder_path_cache_ohdl = 'data/primary_data/letters/caches/ohdl/en_core_web_trf_pseudon'

# Pseudonymised letters (output)
folder_path_pseudon_mcadl = 'data/primary_data/letters/pseudon_dls/mcadl/'
folder_path_pseudon_ohdl = 'data/primary_data/letters/pseudon_dls/ohdl/'

In [7]:
# Initialize the DocLoader instances: one per letter collection, each given the shared
# SpaCy pipeline, the collection's letter folder and its cache folder.
loader_mcadl = DocLoader(nlp, folder_path_letters_mcadl, folder_path_cache_mcadl)
loader_ohdl = DocLoader(nlp, folder_path_letters_ohdl, folder_path_cache_ohdl)

In [8]:
# Run the pseudonymisation process on all new documents, one collection at a time;
# each collection writes into its own pseudon folder.
run_pseudonymisation(loader_mcadl, folder_path_pseudon_mcadl)
run_pseudonymisation(loader_ohdl, folder_path_pseudon_ohdl)