In [1]:
import random
import re

import numpy as np
import pandas as pd

from __future__ import annotations
from codealltag_data_processor_v2025 import CodealltagDataProcessor
from concurrent.futures import ThreadPoolExecutor, as_completed
from pandas import DataFrame
from pandas.core.series import Series
from tqdm import tqdm
from typing import Any, Dict, Generator, List, Tuple

In [2]:
# Data processor for data version '20220513', configured through the YAML file
# listed in config_path. Later cells pass it to the worker function, which uses
# it to read the email text, the annotation frame and the random seed.
cdp_2022 = CodealltagDataProcessor(data_version='20220513', config_path=['codealltag_data_processor.yml'])

In [3]:
# In the cells shown here these two values only feed the CSV file name that is
# built in the next cell.
# NOTE(review): presumably the sample size and the number of predicted versions
# of the experiment that produced the CSV — confirm.
sample_size = 10_000
k = 5

In [4]:
# File name pattern: PredictedText_DF_<data version>_<sample_size in thousands>K_k<k>.csv
predicted_text_df_name = f'PredictedText_DF_{cdp_2022.get_data_version()}_{sample_size // 1000}K_k{k}.csv'
# The first CSV column is used as the index. Later cells read the 'FilePath'
# column and every column whose name matches ^V\d+$ (the predicted versions).
predicted_text_df = pd.read_csv(predicted_text_df_name, index_col=0)

In [5]:
# Entity labels that may open an item of a predicted text ("LABEL: token ...").
# select_version_and_pseudonymized_text reads this list as a module-level global
# and forwards it to the predicted-text parser.
labels = ['CITY', 'DATE', 'EMAIL', 'FAMILY', 'FEMALE', 'MALE', 'ORG', 
          'PHONE', 'STREET', 'STREETNO', 'UFID', 'URL', 'USER', 'ZIP']

In [6]:
def get_annotation_df_with_input_text_and_predicted_text(input_text: str, 
                                                         predicted_text: str,
                                                         labels: List[str]) -> DataFrame:
    """Locate the entities listed in ``predicted_text`` inside ``input_text``.

    ``predicted_text`` is parsed as items separated by ``"; "``. A usable item
    has the form ``"LABEL: token **pseudonym**"``; the ``**pseudonym**`` part is
    optional, in which case the pseudonym is the empty string. Items are matched
    left to right: each token is searched only in the part of ``input_text``
    that follows the previously matched token, and items that cannot be found
    from there on are dropped.

    Args:
        input_text: Text in which the predicted tokens are located.
        predicted_text: Serialized prediction. A non-string value (for example
            the NaN pandas produces for an empty CSV cell) yields an empty frame.
        labels: Accepted entity labels. An item is used only if it starts with
            one of them followed by ``": "``.

    Returns:
        DataFrame with the columns ``Token_ID`` ("T1", "T2", ...), ``Label``,
        ``Start``, ``End`` (offsets into ``input_text``), ``Token`` (the slice
        ``input_text[Start:End]``) and ``Pseudonym``; one row per located item.
    """
    item_delim = "; "
    token_delim = ": "
    pseudonym_delim = " **"
    pseudonym_end = pseudonym_delim.strip()
    columns = ["Token_ID", "Label", "Start", "End", "Token", "Pseudonym"]

    # An empty CSV cell is read back as NaN (a float), which has no .split().
    if not isinstance(predicted_text, str):
        return pd.DataFrame([], columns=columns)

    rows = []
    cursor = 0  # absolute offset in input_text up to which tokens were consumed

    for item in predicted_text.split(item_delim):

        # Match the label together with its delimiter so that a label which is a
        # prefix of another one (STREET / STREETNO) cannot win by list order.
        label = next(
            (candidate for candidate in labels if item.startswith(candidate + token_delim)),
            ""
        )
        if not label:
            continue

        token_pseudonym = item[len(label) + len(token_delim):]

        if pseudonym_delim in token_pseudonym and token_pseudonym.endswith(pseudonym_end):
            # Split at the first delimiter only; everything after it, minus the
            # closing "**", is the pseudonym even if it contains " **" again.
            token, _, remainder = token_pseudonym.partition(pseudonym_delim)
            pseudonym = remainder[:-len(pseudonym_end)]
        else:
            token, pseudonym = token_pseudonym, ""

        if not token.strip():
            continue

        remaining_text = input_text[cursor:]
        start = remaining_text.find(token)
        if start == -1 and ' ' in token:
            # Fallback heuristic: anchor on the first word of the token and use
            # the length of the token without blanks for the span. Empty pieces
            # are skipped because find('') would always "match" at offset 0.
            first_word = next(word for word in token.split(' ') if word)
            start = remaining_text.find(first_word)
            token = token.replace(' ', '')

        if start == -1:
            continue

        absolute_start = cursor + start
        absolute_end = absolute_start + len(token)
        cursor = absolute_end

        rows.append((
            'T' + str(len(rows) + 1),
            label,
            absolute_start,
            absolute_end,
            input_text[absolute_start:absolute_end],
            pseudonym
        ))

    return pd.DataFrame(rows, columns=columns)

In [7]:
def get_pseudonymized_text(input_text: str, predicted_annotation_df: DataFrame) -> str:
    """Replace every annotated span of ``input_text`` with its pseudonym.

    The rows of ``predicted_annotation_df`` are applied in frame order. Each row
    supplies ``Start``/``End`` offsets into the original text plus ``Token`` and
    ``Pseudonym``; a running shift accounts for the length difference caused by
    the replacements made so far.
    """
    pseudonymized = input_text
    shift = 0
    for _, annotation in predicted_annotation_df.iterrows():
        head = pseudonymized[:(annotation.Start + shift)]
        tail = pseudonymized[(annotation.End + shift):]
        pseudonymized = head + annotation.Pseudonym + tail
        # later offsets move by however much this replacement grew or shrank the text
        shift = shift + len(annotation.Pseudonym) - len(annotation.Token)
    return pseudonymized

In [8]:
def _label_token_pairs(annotation_df: DataFrame) -> List[Tuple[str, str]]:
    """Return the (Label, Token) pair of every annotation row, in row order."""
    # zip works for a frame without rows as well; a row-wise
    # DataFrame.agg('-'.join, axis=1) is not guaranteed to return a Series there.
    return list(zip(annotation_df['Label'], annotation_df['Token']))


def select_version_and_pseudonymized_text(predicted_text_df: DataFrame, 
                                          idx: int, 
                                          cdp: CodealltagDataProcessor) -> Dict[str, Tuple]:
    """Pick the best predicted version for one row and pseudonymize its email.

    For the row at position ``idx`` the email text and its annotation frame are
    read through ``cdp``. Every column named ``V<digits>`` is parsed into an
    annotation frame (using the module-level ``labels`` list) and rated:

    * priority ``'high'`` if its (Label, Token) sequence equals the sequence of
      the annotation frame read from ``cdp``, otherwise ``'low'``;
    * count = number of annotated (Label, Token) pairs that also occur in the
      version and whose first matching row has a pseudonym different from the
      token.

    If any version is ``'high'`` only those compete; the highest count wins and
    ties are resolved with ``random.Random(cdp.get_random_seed()).choice``.

    Args:
        predicted_text_df: Frame with a ``FilePath`` column and ``V<digits>``
            columns holding predicted texts.
        idx: Positional row index into ``predicted_text_df``.
        cdp: Data processor used to read the email, its annotations and the seed.

    Returns:
        ``{file_path: (selected_version, pseudonymized_text)}``.

    Raises:
        ValueError: If ``predicted_text_df`` has no ``V<digits>`` column.
    """
    row: Series = predicted_text_df.iloc[idx]
    file_path: str = row.FilePath
    input_text: str = cdp.read_email(file_path)[1]
    true_adf: DataFrame = cdp.get_annotation_df_by_file(file_path)
    true_pairs: List[Tuple[str, str]] = _label_token_pairs(true_adf)

    versions: List[str] = [col for col in predicted_text_df.columns if re.match(r'^V\d+$', col)]
    version_match_dict: Dict[str, Any] = dict()
    for version in versions:
        version_adf: DataFrame = get_annotation_df_with_input_text_and_predicted_text(input_text, row[version], labels)
        version_pairs: List[Tuple[str, str]] = _label_token_pairs(version_adf)

        # Pseudonym of the first row per (Label, Token) pair, so each annotated
        # pair is checked with one dict lookup instead of a frame scan.
        first_pseudonym: Dict[Tuple[str, str], str] = dict()
        for pair, pseudonym in zip(version_pairs, version_adf['Pseudonym']):
            first_pseudonym.setdefault(pair, pseudonym)

        match_count: int = sum(
            1 for pair in true_pairs
            if pair in first_pseudonym and first_pseudonym[pair] != pair[1]
        )
        priority: str = 'high' if true_pairs == version_pairs else 'low'
        version_match_dict[version] = {'priority': priority, 'count': match_count, 'adf': version_adf}

    if not version_match_dict:
        raise ValueError("predicted_text_df has no version column matching ^V\\d+$")

    high_only: Dict[str, Any] = {
        name: info for name, info in version_match_dict.items() if info['priority'] == 'high'
    }
    # Exact matches take precedence; otherwise every version stays in the race.
    filtered: Dict[str, Any] = high_only if high_only else version_match_dict
    max_count: int = max(info['count'] for info in filtered.values())
    candidates: List[str] = [name for name, info in filtered.items() if info['count'] == max_count]
    # A fresh generator seeded per call keeps the tie-break reproducible.
    selected_version: str = random.Random(cdp.get_random_seed()).choice(candidates)
    pseudonymized_text: str = get_pseudonymized_text(input_text, version_match_dict[selected_version]['adf'])

    return {file_path: (selected_version, pseudonymized_text)}

In [9]:
def collect_pseudonymized_texts(max_workers: int = 10) -> Generator[Dict[str, Tuple], None, None]:
    """Run the version selection for every row of ``predicted_text_df`` in threads.

    One task per row position is submitted to a thread pool; the module-level
    ``predicted_text_df`` and ``cdp_2022`` are passed to each task. Results are
    yielded in completion order (not row order), and a progress bar advances
    once per finished task. An exception raised by a task propagates from the
    generator when that task's result is fetched.

    Args:
        max_workers: Size of the thread pool.

    Yields:
        The ``{file_path: (selected_version, pseudonymized_text)}`` dict returned
        for one row.
    """
    row_count = len(predicted_text_df)
    with tqdm(total=row_count, smoothing=0) as progress_bar:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(select_version_and_pseudonymized_text, predicted_text_df, idx, cdp_2022)
                for idx in range(row_count)
            ]
            for future in as_completed(futures):
                progress_bar.update(1)
                yield future.result()

In [10]:
# Merge the per-email {file_path: (selected_version, pseudonymized_text)} dicts
# into one lookup. Results arrive in completion order, so they are keyed by
# file path instead of relying on their position.
merged_dict: Dict[str, Tuple] = {}
for result in collect_pseudonymized_texts():
    merged_dict.update(result)

100%|███████████████████████████████████████| 2000/2000 [00:33<00:00, 59.57it/s]


In [11]:
# Attach the selected version and its pseudonymized text to every row, looked up
# by file path. Plain lists are assigned column by column: packing the
# (version, text) tuples into np.array would build a fixed-width unicode array
# padded to the longest text, and a file path without a result would only show
# up as a shape/assignment error instead of a KeyError naming the path.
predicted_text_df['UseVersion'] = [
    merged_dict[file_path][0] for file_path in predicted_text_df['FilePath']
]
predicted_text_df['PseudonymizedText'] = [
    merged_dict[file_path][1] for file_path in predicted_text_df['FilePath']
]

In [12]:
# Writes to the same file name the frame was loaded from, so the input CSV is
# overwritten with the frame that now also holds the 'UseVersion' and
# 'PseudonymizedText' columns.
predicted_text_df.to_csv(predicted_text_df_name)