In [2]:
#!/usr/bin/env python

"""rule based anonymisation"""

# __author__      = "Silverash Wu"
# __copyright__   = "Copyright 2020, Planet Earth"

import utils
import json
import re
from os.path import isfile, join
from os import listdir
import logging



class AnonymiseRule(object):
    """Regular-expression rule set used to locate sensitive spans in text.

    The rules are read from a JSON file through ``utils.load_json_data`` and
    kept in ``self._rules``. Each individual rule is a dict that is consumed
    by :meth:`rul_extraction`.
    """

    def __init__(self, rule_file):
        """Load the rule definitions.

        :param rule_file: path handed to ``utils.load_json_data``
        """
        self._rules = utils.load_json_data(rule_file)

    @staticmethod
    def rul_extraction(full_text, re_objs):
        """Run every enabled rule over ``full_text`` and collect the matches.

        A rule dict is read as follows:

        * ``pattern`` - the regular expression to search for
        * ``data_type`` - copied to the ``type`` of every result
        * ``flags`` (optional) - container in which ``'multiline'`` and
          ``'ignorecase'`` switch on the matching ``re`` flags
        * ``disabled`` (optional) - a truthy value skips the rule
        * ``data_labels`` (optional) - names given, in order, to capture
          groups 1, 2, ...

        :param full_text: text to search
        :param re_objs: iterable of rule dicts
        :return: list of ``{'type': ..., 'attrs': {...}, 'pos': (start, end)}``
            where ``attrs`` holds ``full_match`` plus one entry per data
            label and ``pos`` is the span of the whole match in ``full_text``
        """
        results = []
        for ro in re_objs:
            if ro.get('disabled'):
                continue
            # A rule without a 'flags' entry simply uses no regex flags.
            rule_flags = ro.get('flags') or ()
            flag = 0
            if 'multiline' in rule_flags:
                flag |= re.MULTILINE
            if 'ignorecase' in rule_flags:
                flag |= re.IGNORECASE
            labels = ro.get('data_labels') or ()
            for m in re.finditer(ro['pattern'], full_text, flag):
                attrs = {'full_match': m.group(0)}
                # The n-th label names the n-th capture group (1-based).
                for group_no, attr in enumerate(labels, 1):
                    attrs[attr] = m.group(group_no)
                results.append({'type': ro['data_type'],
                                'attrs': attrs,
                                'pos': m.span()})
        return results

    def do_letter_parsing(self, full_text):
        """Extract sensitive data from the header and tail of a letter.

        The text before the first ``letter_header_splitter`` match is scanned
        with the ``phone`` and ``doctor`` rules; the text after the first
        ``letter_end_splitter`` match is scanned with the rules referenced by
        ``sent_rules``.

        :param full_text: the letter text
        :return: ``(results, header_pos, tail_pos)``; ``header_pos`` is the
            start of the first header splitter match and ``tail_pos`` the end
            of the first end splitter match, each ``-1`` when not found
        """
        re_exps = self._rules
        results = []
        header_pos = -1
        tail_pos = -1
        header_result = self.rul_extraction(full_text, [re_exps['letter_header_splitter']])
        tail_result = self.rul_extraction(full_text, [re_exps['letter_end_splitter']])
        results += header_result
        if header_result:
            header_pos = header_result[0]['pos'][0]
            header_text = full_text[:header_pos]
            results += self.rul_extraction(header_text, re_exps['phone'])
            results += self.rul_extraction(header_text, [re_exps['doctor']])
        if tail_result:
            tail_pos = tail_result[0]['pos'][1]
            tail_text = full_text[tail_pos:]
            # NOTE(review): here every entry of 'sent_rules' is used as a
            # top-level key of the rule file, whereas do_full_text_parsing
            # reads the rules from inside 'sent_rules' - confirm which schema
            # the rule file follows.
            # NOTE(review): 'pos' of these matches is relative to tail_text,
            # not to full_text.
            for sent_type in re_exps['sent_rules']:
                results += self.rul_extraction(tail_text, re_exps[sent_type])
        return results, header_pos, tail_pos

    def do_full_text_parsing(self, full_text):
        """Apply every rule under ``sent_rules`` to the whole text.

        Each value of ``sent_rules`` may be a single rule dict or a list of
        rule dicts.

        :param full_text: text to search
        :return: ``(matches, 0, 0)``; the two zeros stand in for the
            header/tail positions returned by :meth:`do_letter_parsing`
        """
        sent_rules = self._rules['sent_rules']
        matched_rets = []
        for st in sent_rules:
            rules = sent_rules[st]
            if not isinstance(rules, list):
                rules = [rules]
            matched_rets += self.rul_extraction(full_text, rules)
        return matched_rets, 0, 0

    @staticmethod
    def do_replace(text, pos, sent_text, replace_char='x'):
        """Mask ``sent_text`` inside ``text`` starting at offset ``pos``.

        Every non-whitespace character of ``sent_text`` is swapped for
        ``replace_char``; whitespace (including newlines) is kept so the
        layout of the text is preserved.

        :param text: text to modify
        :param pos: offset in ``text`` where ``sent_text`` starts
        :param sent_text: the sensitive string to mask
        :param replace_char: replacement for each masked character
        :return: the masked copy of ``text``
        """
        # A function replacement keeps replace_char literal, so a backslash
        # in it is not interpreted as a regex escape.
        masked = re.sub(r'[^\n\s]', lambda _m: replace_char, sent_text)
        return text[:pos] + masked + text[pos + len(sent_text):]


def anonymise_doc(doc_id, text, failed_docs, anonymis_inst, sent_container):
    """
    anonymise a document

    Every ``name`` / ``number`` attribute found by the rules is masked in the
    text (when it passes ``is_valid_place_holder``) and recorded in
    ``sent_container``.

    :param doc_id: identifier of the document, used for logging and for
        ``failed_docs``
    :param text: the document text
    :param failed_docs: list that receives ``doc_id`` when parsing reports a
        negative header or tail position
    :param anonymis_inst: anonymise_rule instance
    :param sent_container: list that receives one
        ``{'type': ..., 'sent': ...}`` dict per sensitive string found
    :return: ``(anonymised_text, matches)``, or ``(None, None)`` on failure
    """
    # rets = do_letter_parsing(text)
    sen_data, header_pos, tail_pos = anonymis_inst.do_full_text_parsing(text)
    if header_pos < 0 or tail_pos < 0:
        failed_docs.append(doc_id)
        logging.info('````````````` %s failed', doc_id)
        return None, None

    anonymised_text = text
    for d in sen_data:
        attrs = d['attrs']
        full_match = attrs['full_match']
        match_start = d['pos'][0]
        for label in ('name', 'number'):
            sensitive = attrs.get(label)
            # A capture group that did not take part in the match is None;
            # there is nothing to mask or record for it.
            if sensitive is None:
                continue
            if label == 'name':
                logging.debug('removing %s [%s] ', sensitive, d['type'])
            else:
                logging.debug('removing %s ', sensitive)
            if is_valid_place_holder(sensitive):
                # 'pos' is the span of the whole match, so the captured
                # string has to be located inside it before masking.
                # Masking keeps the text length, so offsets computed from
                # the original matches stay valid.
                offset = match_start + full_match.find(sensitive)
                anonymised_text = AnonymiseRule.do_replace(anonymised_text, offset, sensitive)
            sent_container.append({'type': d['type'], 'sent': sensitive})
    return anonymised_text, sen_data


def is_valid_place_holder(s):
    """Tell whether ``s`` is long enough to be masked.

    :param s: candidate sensitive string
    :return: ``True`` when ``s`` has at least two characters
    """
    min_length = 2
    return not len(s) < min_length


def dir_anonymisation(folder, rule_file, output_folder=None):
    """Anonymise every ``.txt`` file directly inside ``folder``.

    Files are read as cp1252 text. With an ``output_folder`` each anonymised
    text is saved there under its original file name and the collected
    sensitive data is saved as ``sensitive_data.json``; without one, both are
    written to the log instead.

    :param folder: directory holding the documents
    :param rule_file: rule file used to build the ``AnonymiseRule`` instance
    :param output_folder: destination directory, or ``None`` to only log
    :return: the collected sensitive data
    """
    anonymis_inst = AnonymiseRule(rule_file)
    onlyfiles = [f for f in listdir(folder) if isfile(join(folder, f)) and f.endswith('.txt')]
    failed_docs = []
    # NOTE(review): this list receives the flat {'type', 'sent'} entries
    # appended by anonymise_doc as well as the per-document match list
    # appended below - confirm the mixed layout is what consumers expect.
    sent_data = []
    for f in onlyfiles:
        text = utils.read_text_file_as_string(join(folder, f), encoding='cp1252')
        anonymised, sensitive_data = anonymise_doc(f, text, failed_docs, anonymis_inst, sent_data)
        if anonymised is None:
            # anonymise_doc reports failure as (None, None) and has already
            # logged it; there is nothing to save for this document.
            continue
        if output_folder is not None:
            utils.save_string(anonymised, join(output_folder, f))
            logging.info('anonymised %s saved to %s', f, output_folder)
        else:
            logging.info('[anonymised %s]:\n%s\n\n', f, anonymised)
        sent_data.append(sensitive_data)
    if output_folder is not None:
        utils.save_json_array(sent_data, join(output_folder, 'sensitive_data.json'))
        logging.info('sensitive data saved to %s', output_folder)
    else:
        logging.info('sensitive data:\n%s', json.dumps(sent_data))
    return sent_data


if __name__ == "__main__":
    # Script entry point: log at INFO level, then anonymise the documents in
    # ./files/ with the rules in ./conf/ and write the results to
    # ./anonymised/.
    log_format = '[%(filename)s:%(lineno)d] %(asctime)s %(message)s'
    logging.basicConfig(level='INFO', format=log_format)

    input_dir = './files/'
    rules_path = './conf/anonymise_rules.json'
    output_dir = './anonymised/'
    dir_anonymisation(input_dir, rules_path, output_dir)


[<ipython-input-2-a2a9d3c5f3b3>:127] 2021-02-02 19:13:31,619 anonymised EDT_1344208.txt saved to ./anonymised/
[<ipython-input-2-a2a9d3c5f3b3>:127] 2021-02-02 19:13:31,642 anonymised EDT_884307.txt saved to ./anonymised/
[<ipython-input-2-a2a9d3c5f3b3>:127] 2021-02-02 19:13:31,660 anonymised EDT_1327876.txt saved to ./anonymised/
[<ipython-input-2-a2a9d3c5f3b3>:133] 2021-02-02 19:13:31,680 sensitive data saved to ./anonymised/
