In [2]:
import csv
import os
import re
from typing import Dict, List, Optional

from tqdm import tqdm

In [3]:
# Folder that receives the generated TSV file (relative path, resolved against the working directory)
OUTPUT_DIR = os.path.join('..', 'processed_data')
# Root folder that is scanned recursively for annotated documents (.ann files)
DATA_PATH = os.path.join('..', '..', '2021-national-archives-data-annotation-project', 'data')

In [4]:
def should_use_file(filepath) -> bool:
    '''
        Tell whether `filepath` exists on disk and has a size greater than zero.
        Missing paths and empty files are both rejected.
    '''
    # short-circuits on a missing path, so os.stat is only called for existing entries
    return os.path.exists(filepath) and os.stat(filepath).st_size != 0

In [5]:
def get_filepaths(data_path, folders_to_ignore = None):
    '''
        Recursively collect the usable `.ann` files below `data_path`.

        Arguments:
         - data_path: folder to scan
         - folders_to_ignore: optional collection of entry names that are skipped
           wherever they occur in the tree (defaults to ignoring nothing)

        Returns a list with the path of every usable `.ann` file, with the
        extension removed. Entries are visited in sorted order so that the
        result does not depend on the order in which the OS lists a folder.
    '''
    # avoid a mutable default argument; None means "ignore nothing"
    ignored = folders_to_ignore if folders_to_ignore is not None else ()

    result = []
    # os.listdir gives no ordering guarantee, so sort for a reproducible output
    for entry_name in sorted(os.listdir(data_path)):
        if entry_name in ignored:
            continue

        entry_path = os.path.join(data_path, entry_name)

        if os.path.isdir(entry_path):
            # pass the ignore list down so it also applies to nested folders
            result.extend(get_filepaths(entry_path, folders_to_ignore))
        elif entry_path.endswith('.ann') and should_use_file(entry_path): # only work with .ann files
            result.append(os.path.splitext(entry_path)[0])

    return result

In [6]:
# Collect the extension-less paths of all usable documents under DATA_PATH; '6847' and 'Charles' are passed as names to ignore
filepaths = get_filepaths(DATA_PATH, folders_to_ignore=['6847', 'Charles'])

In [7]:
# Sanity check: number of documents that were found
len(filepaths)

2889

In [8]:
def validate_line(line_text):
    '''
        Tell whether an annotation line is usable, i.e. it is marked neither as a
        'transcription error' nor as a duplicated page.
        When this returns False the whole document must be skipped.
    '''
    rejected_markers = ('TranscriptionError', 'DuplicatePage')
    return not line_text.startswith(rejected_markers)

In [9]:
class Constants:
    '''
        Literal markers shared by the annotation parsing and the tagging code.
    '''
    # tag written for a token that carries no entity
    Empty = 'O'
    # prefix for a label that was not present on the previous token
    Beginning = 'B-'
    # prefix for a label that was already present on the previous token
    Inside = 'I-'

    # first letter of the line key of entity lines in .ann files
    MainEntityPrefix = 'T'
    # first letter of the line key of attribute (sub entity) lines in .ann files
    SubEntityPrefix = 'A'

# Maps each internal annotation key to the name of its column in the output TSV.
# 'main' holds the entity types, 'gender' / 'legalstatus' / 'role' are the lowercased
# attribute names found in the .ann files, and 'misc' is the free-form comment column.
entities_to_cols = {
    'main': 'NE-MAIN',
    'gender': 'NE-PER-GENDER',
    'legalstatus': 'NE-PER-LEGAL-STATUS',
    'role': 'NE-PER-ROLE',
    'misc': 'MISC',
}

In [10]:
def prepare_output_file():
    '''
        Create (or truncate) the output file 'train-nl.tsv' inside OUTPUT_DIR and
        write the header row to it.

        Returns a tuple (file_handler, csv_writer); the caller is responsible for
        closing `file_handler` once all rows have been written.
    '''
    # the output folder may not exist yet (e.g. fresh checkout); open() would fail without it
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    output_file = os.path.join(OUTPUT_DIR, 'train-nl.tsv')
    # newline='' lets the csv module control the line endings itself
    file_handler = open(output_file, 'w', encoding='utf-8', newline='')
    csv_writer = csv.DictWriter(file_handler, fieldnames=['TOKEN', 'NE-MAIN', 'NE-PER-GENDER', 'NE-PER-LEGAL-STATUS', 'NE-PER-ROLE', 'MISC'], delimiter='\t')
    csv_writer.writeheader()
    return (file_handler, csv_writer)

In [11]:
def get_annotator_from_filepath(filepath:str):
    '''
        Extract the annotator name from a document path: it is the path component
        found two levels below the '2021-national-archives-data-annotation-project' folder.
        Raises ValueError when that folder is not part of the path.
    '''
    path_parts = os.path.normpath(filepath).split(os.sep)
    project_position = path_parts.index('2021-national-archives-data-annotation-project')
    return path_parts[project_position + 2]

In [76]:
def process_annotation_file(filepath) -> Optional[Dict[int, Dict[str, List[str]]]]:
    '''
        Parse the stand-off annotation file `<filepath>.ann` into per-character annotations.

        Returns a dictionary keyed by character position. Positions are 1-based, i.e.
        position 1 is the first character of the text, which is the numbering that
        process_txt_file uses while walking over the tokens. Each value maps
         - 'main' to the list of entity types covering that character, and
         - the lowercased attribute name (e.g. 'gender') to the list of attribute values.

        Returns None when the document is flagged as invalid (see validate_line),
        in which case the whole document must be skipped.
    '''
    annotations_by_pos = {}

    # This contains the character offsets for any main entity. Example: { 'T1' : [[100, 111], [112, 120]] }
    positions_by_main_entity = {}

    # Check for lines whose key starts with T followed by a digit
    main_regex = re.compile(f'^[{Constants.MainEntityPrefix}][1-9]+')

    # Check for lines whose key starts with A followed by a digit
    sub_regex = re.compile(f'^[{Constants.SubEntityPrefix}][1-9]+')

    with open(f'{filepath}.ann', 'r', encoding='utf-8') as file_handle:
        for file_line in file_handle:
            split_line = file_line.split('\t')
            line_key = split_line[0]

            if main_regex.match(line_key): # Main entity type
                assert len(split_line) > 1, f'File line is invalid. Not enough tokens were found\n - Original split line: {split_line}\n - Filepath: "{filepath}"'

                # Skip documents that are not valid
                if not validate_line(split_line[1]):
                    return None

                annotation = split_line[1].split()
                main_entity_type = annotation[0]

                # some positions are doubled, e.g. '100 110; 111 120'
                positions = [[int(pos) for pos in x.strip().split()] for x in ' '.join(annotation[1:]).split(';')]
                positions_by_main_entity[line_key] = positions

                assert len(positions) > 0, f'Positions are invalid.\n - Original line: {file_line}\n - Positions: {positions}'
                for position_pair in positions:
                    assert len(position_pair) == 2, f'Position pair is invalid.\n - Position pair: {position_pair}'
                    start, end = position_pair
                    # Stand-off offsets are 0-based with an exclusive end, so the span
                    # [start, end) covers the 1-based positions start + 1 .. end.
                    # Starting at `start` would also mark the character before the entity.
                    for idx in range(start + 1, end + 1):
                        annotations_by_pos.setdefault(idx, {'main': []})['main'].append(main_entity_type)
            elif sub_regex.match(line_key): # Sub entity type
                assert len(split_line) > 1, f'{split_line}, {filepath}'
                if not validate_line(split_line[1]):
                    return None

                attribute = split_line[1].split()
                assert len(attribute) == 3, f'Attribute line is invalid. Expected "<type> <entity> <value>"\n - Original line: {file_line}\n - Filepath: "{filepath}"'
                sub_entity_type, main_entity, sub_entity_value = attribute
                lowered_sub_entity_type = sub_entity_type.lower()

                assert main_entity in positions_by_main_entity, f'Attribute refers to an unknown entity.\n - Original line: {file_line}\n - Filepath: "{filepath}"'
                # attach the attribute value to every character of the entity it refers to
                for start, end in positions_by_main_entity[main_entity]:
                    for idx in range(start + 1, end + 1):
                        annotations_by_pos[idx].setdefault(lowered_sub_entity_type, []).append(sub_entity_value)

    return annotations_by_pos

def get_first_pos_in_annotations(current_position, word_length, annotations, entity_type):
    '''
        Find the first annotated character position inside a word.

        The word spans `word_length` positions starting at `current_position`. A position
        qualifies when it is present in `annotations` and carries `entity_type`; for the
        'misc' type any annotated position qualifies. Returns None when nothing qualifies.
    '''
    any_annotation_counts = entity_type == 'misc'
    word_end = current_position + word_length

    matching_positions = (
        pos for pos in range(current_position, word_end)
        if pos in annotations and (any_annotation_counts or entity_type in annotations[pos])
    )
    return next(matching_positions, None)

def get_misc_comment(current_position, word_length, annotations, entity_type):
    '''
        Build the comment describing entities that cover only part of a word.

        For every main entity type that touches the word but does not reach both its
        first and last character, a note 'partial-<type><first:last>' is produced, where
        first/last are the offsets (relative to `current_position`) of the first and last
        covered positions. Notes are concatenated; '_' is returned when there are none.
        `entity_type` is not used by this function.
    '''
    word_end = current_position + word_length

    # covered positions inside the word, grouped by main entity type
    hits_by_tag = {}
    for pos in range(current_position, word_end):
        for tag in annotations.get(pos, {}).get('main', ()):
            hits_by_tag.setdefault(tag, []).append(pos)

    notes = [
        f'partial-{tag}<{hits[0] - current_position}:{hits[-1] - current_position}>'
        for tag, hits in hits_by_tag.items()
        if hits[0] > current_position or hits[-1] < word_end - 1
    ]

    return ''.join(notes) if notes else '_'

def calculate_entity_tag(annotations, entity_type, prev_entities, current_pos, word_length):
    '''
        Compute the value of one output column for a single word.

        For the 'misc' type the value is the partial-entity comment ('_' when the word is
        not annotated). For every other type it is the comma-joined list of labels found
        at the first annotated position of the word, each prefixed with Constants.Inside
        when the label was also present on the previous word and with
        Constants.Beginning otherwise; Constants.Empty when the word is not annotated.

        `prev_entities` is updated in place with the labels of this word and returned
        together with the computed value.
    '''
    is_misc = entity_type == 'misc'
    anchor = get_first_pos_in_annotations(current_pos, word_length, annotations, entity_type)

    # nothing annotated in this word: reset the memory for this type
    if anchor is None:
        prev_entities[entity_type] = Constants.Empty
        return ('_' if is_misc else Constants.Empty), prev_entities

    # the comment column does not take part in the Beginning/Inside bookkeeping
    if is_misc:
        return get_misc_comment(current_pos, word_length, annotations, entity_type), prev_entities

    current_labels = annotations[anchor][entity_type]
    previous_labels = prev_entities[entity_type]

    tagged_labels = [
        f'{Constants.Inside if label in previous_labels else Constants.Beginning}{label}'
        for label in current_labels
    ]

    prev_entities[entity_type] = current_labels
    return ','.join(tagged_labels), prev_entities

def get_word_annotations(word, annotations, prev_entities, current_pos, word_length):
    '''
        Build the output row for one word.

        The row holds the word itself under 'TOKEN' plus one value per entry of
        entities_to_cols, computed by calculate_entity_tag. When `prev_entities` is None
        (first word of a document) it is initialised with Constants.Empty for every type.
        Returns the row together with the updated `prev_entities`.
    '''
    if prev_entities is None:
        prev_entities = dict.fromkeys(entities_to_cols, Constants.Empty)

    row = {'TOKEN': word, 'MISC': '_'}
    for entity_type, col_name in entities_to_cols.items():
        tag, prev_entities = calculate_entity_tag(annotations, entity_type, prev_entities, current_pos, word_length)
        row[col_name] = tag

    return row, prev_entities

def process_txt_file(filepath, annotations, csv_writer: csv.DictWriter):
    '''
        Write one document to the output: a separator row, three metadata comment rows
        (language, document path, annotator) and then one row per word of `<filepath>.txt`.

        Arguments:
         - filepath: path of the document without extension
         - annotations: per-character annotations as returned by process_annotation_file
         - csv_writer: writer that receives the rows

        Words are obtained by replacing line breaks with spaces and splitting on single
        spaces. Character positions are counted from 1, and every separator counts as
        one character.
    '''
    # empty row that separates this document from the previous one, then the metadata rows
    csv_writer.writerow({})
    csv_writer.writerow({'TOKEN': '# language = nl'})
    csv_writer.writerow({'TOKEN': f'# document_path = {filepath}.txt'})
    annotator = get_annotator_from_filepath(filepath)
    csv_writer.writerow({'TOKEN': f'# annotator = {annotator}'})

    with open(f'{filepath}.txt', 'r', encoding='utf-8') as file_handle:
        file_content = file_handle.read().replace('\n', ' ')

    # position (1-based) of the first character of the word being processed
    char_counter = 1
    prev_entities = None

    # must specify ' ' in order not to remove empty strings: they stand for extra separators
    for word in file_content.split(' '):
        if word.strip() != '':
            word_annotations, prev_entities = get_word_annotations(word, annotations, prev_entities, char_counter, len(word))
            csv_writer.writerow(word_annotations)

        # Advance past the word and the separator that follows it. This also holds for
        # skipped words: an empty string has length 0, while a whitespace-only word
        # (e.g. a tab) still occupies its own characters in the text.
        char_counter += len(word) + 1


def process_files(filepaths):
    '''
        Convert every document in `filepaths` (paths without extension) and append it
        to the output file created by prepare_output_file.

        Documents whose annotation file is flagged as invalid (process_annotation_file
        returns None) are skipped. The output file is closed even when processing a
        document raises, so rows written so far are not lost.
    '''
    file_handler, csv_writer = prepare_output_file()
    try:
        for filepath in tqdm(filepaths, desc='Processing files'):
            annotations = process_annotation_file(filepath)
            if annotations is None:
                continue

            process_txt_file(filepath, annotations, csv_writer)
    finally:
        file_handler.close()

In [77]:
# Convert all collected documents into the single TSV file inside OUTPUT_DIR
process_files(filepaths)

Processing files: 100%|██████████| 2889/2889 [00:05<00:00, 517.78it/s]
