In [None]:
import pymupdf # imports the pymupdf library
import os
import numpy as np
from tqdm import tqdm
import csv
import pandas as pd

# pip install clean-text
# pip install unidecode
from cleantext import clean

# Extract Text from PDF

In [None]:
# function to extract text from pdfs, returns the text from the passed file
def PDFtoText(filename):
  """Return the plain text of all pages of a PDF file as one string.

  Args:
    filename: Path of the PDF file to open with pymupdf.

  Returns:
    The text of every page, concatenated in page order. A document
    without pages yields an empty string.
  """
  # The context manager closes the document again, even if extraction fails.
  with pymupdf.open(filename) as doc:
    # Join the text of every page. Assigning inside a loop instead would keep
    # only the last page and leave nothing to return for an empty document.
    return "".join(page.get_text() for page in doc)

In [None]:
# single file demo
# print(PDFtoText("combinedfiles/000e3869f09a1f4e92d466218fa2de17-cv.pdf"))

In [None]:
def clean_text(text):
    """Normalise `text` with cleantext.clean using this notebook's fixed settings.

    Per the option names: line breaks, currency symbols and punctuation are
    removed, and URLs, e-mail addresses and phone numbers are replaced by
    placeholder tokens. Lower-casing, ASCII transliteration and number/digit
    replacement are switched off.
    """
    options = {
        # what gets normalised or replaced
        "fix_unicode": True,           # fix various unicode errors
        "to_ascii": False,             # no transliteration to ASCII
        "lower": False,                # keep the original case
        "no_line_breaks": True,        # remove line breaks
        "no_urls": True,               # replace URLs with a token
        "no_emails": True,             # replace e-mail addresses with a token
        "no_phone_numbers": True,      # replace phone numbers with a token
        "no_numbers": False,           # keep numbers
        "no_digits": False,            # keep digits
        "no_currency_symbols": True,   # replace currency symbols with a token
        "no_punct": True,              # remove punctuation
        # replacement tokens
        "replace_with_url": "<URL>",
        "replace_with_email": "<EMAIL>",
        "replace_with_phone_number": "<PHONE_NUMBER>",
        "replace_with_number": "<NUMBER>",
        "replace_with_digit": "0",
        "replace_with_currency_symbol": "<CUR>",
        # language of the text
        "lang": "en",
    }
    return clean(text, **options)

In [None]:
def extract_id(filename):
    """Return the first 32 characters of `filename`.

    The extraction code in this notebook stores this prefix as ID_Nominations.
    """
    id_length = 32
    return filename[:id_length]

In [None]:
# # get filelist from folder
# pdffiles = os.listdir("combinedfiles")

# directory_path = "combinedfiles"

# extracted_texts = [] # list to store the texts
# for file in tqdm(pdffiles, desc="Progress"):
#     file_path = os.path.join(directory_path, file)
#     text_result = PDFtoText(file_path)
#     # cleaned_text = text_result.replace('\n', ' ')
#     cleaned_text = clean_text(text_result) # this function is called here, and again below; seems that calling it multiple times improves the results.
#     # cleaned_text = text_result
#     ID_Nominations = extract_id(file)
#     extracted_texts.append((ID_Nominations, file, cleaned_text))

In [None]:
# print(extracted_texts)

In [None]:
# # # Convert the list to a pandas DataFrame
# df_extracted_texts = pd.DataFrame(extracted_texts, columns=['ID_Nominations', 'Filename', 'Result'])

In [None]:
# # Save the DataFrame to a CSV file with specified options
# # df_extracted_texts.to_csv("extracted_texts_utf16.csv", index=False, quoting=1, encoding='utf-16')
# df_extracted_texts.to_csv("texts_extracted_raw_utf8.csv", index=False, quoting=1, encoding='utf-8')

# df_extracted_texts.head(20)

# Clean Text

In [None]:
# Load the raw extracted texts from the CSV file that the (commented-out) extraction cells above write,
# so the PDF extraction does not have to be repeated.
df_extracted_texts = pd.read_csv("texts_extracted_raw_utf8.csv") # use this when re-running the notebook to load the texts from file

In [None]:
# This code is unnecessary if we apply Scrubadub below.

# Apply cleaning to the 'Result' column.
# Work on a copy: a plain assignment only creates a second name for the same
# DataFrame, so the raw texts in df_extracted_texts would be overwritten too.
df_cleaned_texts = df_extracted_texts.copy()

# clean_text is applied twice on purpose ("better safe than sorry").
df_cleaned_texts['Result'] = df_cleaned_texts['Result'].apply(clean_text)
df_cleaned_texts['Result'] = df_cleaned_texts['Result'].apply(clean_text)

In [None]:
# Persist the cleaned texts as UTF-8 CSV with every field quoted
# (csv.QUOTE_ALL is the named constant behind quoting=1).
# UTF-16 alternative:
# df_cleaned_texts.to_csv("cleaned_texts_utf16.csv", index=False, quoting=1, encoding='utf-16')
df_cleaned_texts.to_csv(
    "texts_cleaned_utf8.csv",
    index=False,
    quoting=csv.QUOTE_ALL,
    encoding='utf-8',
)

df_cleaned_texts.head(20)

We could also remove stopwords and do stemming...
https://github.com/prasanthg3/cleantext

## Replace Names with Pseudo-Names in array

### This code is unnecessary if we apply Scrubadub below.

In [None]:
# Load the table that maps old (real) names to new (fake) names.
# The file is read as ';'-separated strings; the header line is presumably
# the first row of the array (it is dropped in the next cell).

namereplacements = np.genfromtxt("name-replacement_table.csv", delimiter=";", dtype=str)
namereplacements.dtype  # has no visible effect: only the last expression of a cell is displayed
print(namereplacements)  # NOTE(review): this prints the real names into the notebook output - clear it before sharing

In [None]:
# Turn the raw string array into a DataFrame with descriptive column names.

df_namereplacements = pd.DataFrame(
    namereplacements,
    columns=[
        "ID_Nominations",
        "ID_Persons",
        "new_namefirst",
        "new_namelast",
        "old_namefirst_1",
        "old_namelast_1",
        "old_namefirst_2",
        "old_namelast_2",
    ],
).drop(index=0)  # row 0 holds the header line of the file
df_namereplacements.head()

In [None]:
# Reload the cleaned texts from disk.
df_texts_to_be_anonymized = pd.read_csv("texts_cleaned_utf8.csv")

# Left-join the name replacement table onto the texts via ID_Nominations,
# so that every text row carries the name columns that belong to it.
df_merged_texts = pd.merge(
    df_texts_to_be_anonymized,
    df_namereplacements,
    how='left',
    on='ID_Nominations',
)
df_merged_texts.head(6)

In [None]:
# replaces the old_namefirst_2, which is the Public Directory name from the Persons table
# this is relevant for different spellings of a name and in case of name changes
# it is performed first, because name_2 is more current than name_1



# replaces the old_namefirst_2, which is the Public Directory name from the Persons table
def _replace_name(row, old_column, new_column):
    """Replace one real name in a row's text with its pseudonym.

    Matching is a plain, case-sensitive substring search.

    Args:
        row: Mapping-like row (e.g. a DataFrame row) with the keys 'Result',
            `old_column` and `new_column`.
        old_column: Key of the real name to search for.
        new_column: Key of the pseudonym to insert.

    Returns:
        The text of row['Result'] with every occurrence of the old name
        replaced, or row['Result'] unchanged when the old name is missing,
        blank, or does not occur in the text.
    """
    original = row['Result']

    old_value = row[old_column]
    if pd.isna(old_value):
        return original
    # Strip padding: a whitespace-only name would otherwise replace every
    # blank in the text, and a padded name would miss its occurrences.
    name_old = str(old_value).strip()
    if not name_old:
        return original

    if pd.isna(original):
        return original
    text = str(original)
    if name_old not in text:
        return original

    # A missing pseudonym removes the name instead of inserting "nan".
    new_value = row[new_column]
    name_new = "" if pd.isna(new_value) else str(new_value).strip()
    return text.replace(name_old, name_new)


def replace_names1(row):
    """Replace old_namefirst_2 (Public Directory first name) with new_namefirst."""
    return _replace_name(row, 'old_namefirst_2', 'new_namefirst')


# replaces the old_namefirst_1, which is the main name from the Persons table
def replace_names2(row):
    """Replace old_namefirst_1 (main first name) with new_namefirst."""
    return _replace_name(row, 'old_namefirst_1', 'new_namefirst')


def replace_names3(row):
    """Replace old_namelast_2 with new_namelast."""
    return _replace_name(row, 'old_namelast_2', 'new_namelast')


def replace_names4(row):
    """Replace old_namelast_1 with new_namelast."""
    return _replace_name(row, 'old_namelast_1', 'new_namelast')

# Run the four replacement passes in order; each pass works on the 'Result'
# column written by the previous one.
for replace_pass in (replace_names1, replace_names2, replace_names3, replace_names4):
    df_merged_texts['Result'] = df_merged_texts.apply(replace_pass, axis=1)

# Save the texts with replaced names (UTF-8, every field quoted).
df_merged_texts.to_csv(
    "texts_anonymized_utf8.csv",
    index=False,
    quoting=csv.QUOTE_ALL,
    encoding='utf-8',
)

df_merged_texts.head(10)


In [None]:
# Drop the helper columns with the real and replacement names; they are no
# longer needed once the names have been replaced in the text.

df_final_texts = df_merged_texts.drop(
    columns=[
        "new_namefirst",
        "new_namelast",
        "old_namefirst_1",
        "old_namelast_1",
        "old_namefirst_2",
        "old_namelast_2",
        "ID_Persons",
    ]
)

df_final_texts.to_csv(
    "texts_final_without_scrubbing_utf8.csv",
    index=False,
    quoting=csv.QUOTE_ALL,
    encoding='utf-8',
)

df_final_texts.head()

# df_final_texts = df_cleaned_texts

## Anonymization via NER using Scrubadub / Spacy
Scrubadub removes all names, organizations and other PII. That is more than we need, because we want to keep the names.

In [None]:
import scrubadub, scrubadub_spacy

In [None]:
# spacy.load('en_core_web_trf')

# Create a scrubber with scrubadub's default setup and additionally register
# the spaCy entity detector from scrubadub_spacy.
scrubber = scrubadub.Scrubber()
scrubber.add_detector(scrubadub_spacy.detectors.SpacyEntityDetector)
# Quick check of the scrubber on a made-up sentence.
print(scrubber.clean("My name is Alex, I work at LifeGuard in London, and my eMail is alex@lifeguard.com btw. my super secret twitter login is username: alex_2000 password: g-dragon180888"))
# Output noted by the author:
# My name is {{NAME}}, I work at {{ORGANIZATION}} in {{LOCATION}}, and my eMail is {{EMAIL}} btw. my super secret twitter login is username: {{USERNAME}} password: {{PASSWORD}}

In [None]:
# Preview the non-scrubbed texts before the 'Type' column is added.
df_final_texts.head()

In [None]:

def clean_filename(filename):
    """Return `filename` without its first 33 and its last 4 characters.

    For a name of the form '<32-character id>-<type>.pdf' this is '<type>'.
    """
    type_span = slice(33, -4)
    return filename[type_span]

# Derive the document type from each file name and store it in a new 'Type' column.
df_final_texts['Type'] = df_final_texts['Filename'].apply(clean_filename)

# Save the non-scrubbed texts (UTF-8, every field quoted).
df_final_texts.to_csv(
    'final_texts_non-scrubbed_full.csv',
    index=False,
    quoting=csv.QUOTE_ALL,
    encoding='utf-8',
)

df_final_texts.head(20)

## Alternative Methods

https://github.com/kylemclaren/scrub
https://microsoft.github.io/presidio/analyzer/
AWS comprehend
pytesseract for ocr

In [None]:

# Scrubbing takes long (approx. 4 hours for the full data set).
# To try it out, only use a smaller subset of the data:
# df_scrub = df_final_texts.iloc[:10, :]
df_scrub = df_final_texts


ProcessedTexts = []

# Walk over the three needed columns in parallel, with a progress bar.
scrub_columns = zip(df_scrub['ID_Nominations'], df_scrub['Type'], df_scrub['Result'])
for ID_Nominations, Type, Result in tqdm(scrub_columns, desc="Processing text", total=df_scrub.shape[0]):
    # Only strings are handed to the scrubber; a missing text (NaN) is kept
    # as empty text instead.
    processed_result = "" if pd.isna(Result) else scrubber.clean(Result)
    ProcessedTexts.append((ID_Nominations, Type, processed_result))

# Convert the list to a pandas DataFrame.
# The second element of each tuple is the document type, so the column is
# labelled 'Type' (it was mislabelled 'Filename' before).
df_texts_scrubbed = pd.DataFrame(ProcessedTexts, columns=['ID_Nominations', 'Type', 'Result'])

# Save the DataFrame to a CSV file (UTF-8, every field quoted)
df_texts_scrubbed.to_csv('final_texts_scrubbed_full.csv', index=False, quoting=csv.QUOTE_ALL, encoding='utf-8')

df_texts_scrubbed.head(20)
