Pseudonymisation pipeline

The pipeline pseudonymises the data using five functions:
* Encrypt ID - concatenates each value with a secret salt key, hashes the result with SHA-256 (Spark's sha2 function), then removes the original PII column
* Encrypt Date - removes the day from dates of the format DD/MM/YYYY, leaving MM/YYYY (Note: this can be adapted in the code if the date format changes)
* Encrypt Timestamp - replaces a time of format HH:MM:SS with a band number from 1 to 12, one band per two-hour window of the day (Note: this can be adapted in the code if the timestamp format changes)
* Encrypt Freetext - uses Flair to identify Person, Organisation or Location entities in free text and replaces each one with its entity type, e.g. "Clara goes on a walk" -> "<PERSON> goes on a walk"
* Encrypt Postcode - replaces the postcode with its Lower Layer Super Output Area (LSOA) code, taken from the LSOA01 column of the postcode lookup

Actions if schema changes:
* Replace metadata.csv in the bronze container of the adlsinframricdev storage account with the new schema, taken from catalogue.json in the mc-data-catalogue repo https://github.com/M-RIC-TRE/mc-data-catalogue
* Update PII_columns.csv to match the new metadata.csv: fill the first column with the entries from "other_identifiable_columns" in metadata.csv, remove duplicates, then manually assign each column an encryption method in the second column
* Adapt the encrypt_postcode function to the relevant column(s); it currently handles only the column "ResidentPostcode"
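The first half of the PII_columns.csv update can be scripted. The sketch below assumes (unverified) that each row of metadata.csv stores its identifiable columns as a comma-separated string in "other_identifiable_columns". It writes a de-duplicated CSV with the same Column / Pseudo method headers as df_PII_metadata, leaving the method column blank for manual assignment. `build_pii_columns_csv` and the example rows are hypothetical.

```python
import csv
import io


def build_pii_columns_csv(metadata_csv_text):
    """Return PII_columns.csv text with one row per unique identifiable column.

    Assumes each metadata row lists its identifiable columns as a
    comma-separated string in "other_identifiable_columns".
    """
    reader = csv.DictReader(io.StringIO(metadata_csv_text))
    unique_columns = {}  # dict keeps first-seen order and drops duplicates
    for row in reader:
        for name in (row.get("other_identifiable_columns") or "").split(","):
            name = name.strip()
            if name:
                unique_columns[name] = None
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["Column", "Pseudo method"])
    for name in unique_columns:
        # The method (e.g. encrypt_ID) is still assigned manually afterwards
        writer.writerow([name, ""])
    return out.getvalue()


# Hypothetical metadata rows
example = 'table,other_identifiable_columns\ndbo.Clients,"ClientID,Surname"\ndbo.Contacts,"ClientID,UserID"\n'
print(build_pii_columns_csv(example))
```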

Other risks:
* Unresponsive kernel - the Flair library loads a large pre-trained Named Entity Recognition (NER) model, which is computationally expensive; the kernel can become unresponsive if the model is loaded repeatedly or if the text being analysed is too large or complex
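One way to reduce this risk is to load the tagger once and reuse it, rather than loading it on every call. The sketch below shows the pattern with `functools.lru_cache`; `load_model` is a hypothetical stand-in for `SequenceTagger.load("flair/ner-english-large")` so the example runs without Flair installed. On Spark executors the cache would live in each Python worker, so this is a mitigation rather than a guarantee of a single load.

```python
from functools import lru_cache

LOAD_CALLS = []  # records each real load, to show the cache at work


def load_model(name):
    """Hypothetical stand-in for an expensive loader such as SequenceTagger.load(name)."""
    LOAD_CALLS.append(name)
    return {"model": name}


@lru_cache(maxsize=None)
def get_model(name="flair/ner-english-large"):
    # The first call loads the model; later calls with the same name reuse the cached object
    return load_model(name)


for _ in range(3):
    model = get_model()
print(len(LOAD_CALLS))  # 1
```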

### Importing Packages

In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

## import hashlib for sha-256 encryption
import hashlib

# import packages for free text anonymisation
import logging
import presidio_analyzer
import re

In [0]:
# Upgrade numpy to resolve a dependency conflict with flair

%pip install numpy --upgrade

# %pip install --force-reinstall numpy==1.22.4

Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.
Collecting numpy
  Downloading numpy-1.24.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 17.3/17.3 MB 56.0 MB/s eta 0:00:00
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 1.21.5
    Not uninstalling numpy at /databricks/python3/lib/python3.10/site-packages, outside environment /local_disk0/.ephemeral_nfs/envs/pythonEnv-ef064bb0-a24c-4354-b576-a5c6bca7b893
    Can't uninstall 'numpy'. No files were found to uninstall.
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
petastorm 0.12.1 requires pyspark>=2.1.0, which is not installed.
databricks-feature-store 0.11.1 requires pyspark<4,>=3.1.2, which is not installed.
ydata-profil

In [0]:
dbutils.library.restartPython()

In [0]:
# import packages for free text anonymisation
import flair

from flair.models import SequenceTagger
from flair.data import Sentence

### Ingestion configuration
NOTE: Any changes to the database schema or PII columns must be reflected in the CSV files metadata.csv and PII_columns.csv

In [0]:
#service_credential = dbutils.secrets.get(scope="flowehr-secrets",key="<service-credential-key>")

client_id = dbutils.secrets.get(scope="flowehr-secrets",key="flowehr-dbks-adls-app-id")
client_secret = dbutils.secrets.get(scope="flowehr-secrets", key="flowehr-dbks-adls-app-secret")


spark.conf.set("fs.azure.account.auth.type.adlsinframricdev.dfs.core.windows.net", "OAuth")

spark.conf.set("fs.azure.account.oauth.provider.type.adlsinframricdev.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")

spark.conf.set("fs.azure.account.oauth2.client.id.adlsinframricdev.dfs.core.windows.net", client_id)

spark.conf.set("fs.azure.account.oauth2.client.secret.adlsinframricdev.dfs.core.windows.net", client_secret)

spark.conf.set("fs.azure.account.oauth2.client.endpoint.adlsinframricdev.dfs.core.windows.net", "https://login.microsoftonline.com/71c35f80-28a0-4c98-8472-f29f22f6614d/oauth2/token") # put your tenant id
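The five spark.conf.set calls above follow one pattern keyed on the storage account name. A sketch that builds them as a dict, so the account name and tenant ID appear only once; `adls_oauth_conf` is a hypothetical helper, and on Databricks each pair would then be passed to `spark.conf.set(key, value)`. The placeholder credentials are not real values.

```python
def adls_oauth_conf(account_name, client_id, client_secret, tenant_id):
    """Return the ABFS OAuth (client credentials) settings for one ADLS Gen2 account."""
    suffix = f"{account_name}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{suffix}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{suffix}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{suffix}": client_id,
        f"fs.azure.account.oauth2.client.secret.{suffix}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{suffix}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


conf = adls_oauth_conf("adlsinframricdev", "<client-id>", "<client-secret>", "<tenant-id>")
# On Databricks: for key, value in conf.items(): spark.conf.set(key, value)
```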

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Set up the SparkSession
spark = SparkSession.builder.appName("ReadCSVFilesFromBlobStorage").getOrCreate()

# Set up the Azure Blob Storage account details
account_name = "adlsinframricdev"
account_key = ""

In [0]:
# Read metadata files

# Each row corresponds to a table, with table name, column name and description, PII columns and free text columns, taken from catalogue repo
df_metadata = spark.read.format("csv").option("header", True).load("abfs://bronze@adlsinframricdev.dfs.core.windows.net/metadata/metadata.csv")
# PII columns collated for all tables and assigned an encryption method, built manually
df_PII_metadata = spark.read.format("csv").option("header", True).load("abfs://bronze@adlsinframricdev.dfs.core.windows.net/metadata/PII_columns.csv")

# Converting dataframe to dictionary to access columns by their encryption method
rows = df_PII_metadata.collect()
PII_META_DICT = {}
for row in rows:
    key = row[0]
    value = row[1]
    PII_META_DICT[key] = value
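Each encrypt_* function below scans PII_META_DICT for its own method name. A sketch of inverting the dictionary once into method -> columns; the sample entries are copied from the PII_META_DICT output at the end of this notebook, and `columns_by_method` is a hypothetical helper.

```python
from collections import defaultdict


def columns_by_method(pii_meta_dict):
    """Invert {column: method} into {method: [columns]}, preserving column order."""
    grouped = defaultdict(list)
    for column, method in pii_meta_dict.items():
        grouped[method].append(column)
    return dict(grouped)


sample = {"ClientID": "encrypt_ID", "DateOfBirth": "encrypt_date",
          "Surname": "encrypt_ID", "EstimatedDOB": "encrypt_date"}
by_method = columns_by_method(sample)
print(by_method["encrypt_ID"])  # ['ClientID', 'Surname']
```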

In [0]:
df_PII_metadata

DataFrame[Column: string, Pseudo method: string]

### Encrypt IDs

In [0]:
def encrypt_id(df_spark):
    salt_key_name = 'saltkey-mric-pseudo-dev'
    # Read the salt from the secret scope instead of hard-coding it in the notebook
    # (assumes the salt is stored under salt_key_name in the "flowehr-secrets" scope)
    secretValue = dbutils.secrets.get(scope="flowehr-secrets", key=salt_key_name)
    # Creating a list of columns in PII metadata to be encrypted by this function
    encrypt_ID_columns = []
    # Loop over the items in the PII dictionary and check if the value is "encrypt_ID"
    for key, value in PII_META_DICT.items():
        if value == "encrypt_ID":
            encrypt_ID_columns.append(key)
    # Loop through columns in the dataframe
    for column in df_spark.columns:
        # Finding columns in the dataframe to be encrypted by this function
        if column in encrypt_ID_columns:
            encrypted_column_name = f'encrypted{column}'
            # Concatenating the value with the salt; no intermediate "salted" column is kept,
            # because it would still contain the original identifier and the salt
            salted = concat(df_spark[column].cast("string"), lit(secretValue))
            # Hashing the salted value with SHA-256
            df_spark = df_spark.withColumn(encrypted_column_name, sha2(salted.cast("binary"), 256))
            # Removing the original column to leave only the encrypted column
            df_spark = df_spark.drop(column)
    return df_spark
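sha2(concat(value, salt), 256) is a salted SHA-256 hash, a one-way function rather than reversible encryption. The sketch below reproduces it with hashlib to show two properties the pipeline relies on: the same ID and salt always give the same digest, so pseudonymised IDs can still be joined across tables, while a different salt gives an unrelated digest. It assumes IDs are strings encoded as UTF-8, which matches Spark's string-to-binary cast; the salt values are placeholders.

```python
import hashlib


def salted_sha256(value, salt):
    """Hex SHA-256 of value + salt, mirroring sha2(concat(value, salt), 256)."""
    return hashlib.sha256((value + salt).encode("utf-8")).hexdigest()


salt = "placeholder-salt"  # in the notebook the salt comes from the secret store
a = salted_sha256("12345", salt)
b = salted_sha256("12345", salt)
c = salted_sha256("12345", "another-salt")
print(a == b, a == c, len(a))  # True False 64
```

Note that Spark's concat returns null when any input is null, so null IDs stay null rather than being hashed.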

### Encrypt Dates
Removing the day from dates. NOTE: this relies on the data being in the format DD/MM/YYYY, but the substr call in encrypt_dates can be adapted for other formats

In [0]:
# from pyspark.sql.functions import *

In [0]:
def encrypt_dates(df_spark):
    # Creating a list of columns in PII metadata to be encrypted by this function
    encrypt_date_columns = []
    # Loop over the items in the dictionary and check if the value is "encrypt_date"
    for key, value in PII_META_DICT.items():
        if value == "encrypt_date":
            encrypt_date_columns.append(key)
    # Loop through columns in the dataframe
    for column in df_spark.columns:
        # Finding columns in the dataframe to be encrypted by this function
        if column in encrypt_date_columns:
            # Casting to string (reassigned, since withColumn returns a new dataframe) to remove the day
            df_spark = df_spark.withColumn(column, col(column).cast(StringType()))
            # Removing the day to leave MM/YYYY; substr is 1-based, so position 4 is the first month digit
            df_spark = df_spark.withColumn(column, df_spark[column].substr(4,10))
    return df_spark
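Spark's Column.substr(startPos, length) is 1-based, so substr(4, 10) starts at the month in DD/MM/YYYY and returns MM/YYYY. A pure-Python sketch of the same transformation that also rejects values not in the expected format instead of silently slicing them; `drop_day` is a hypothetical helper, not part of the pipeline.

```python
from datetime import datetime


def drop_day(date_text, fmt="%d/%m/%Y"):
    """Return 'MM/YYYY' for a DD/MM/YYYY string, or None if it does not parse."""
    try:
        parsed = datetime.strptime(date_text, fmt)
    except (TypeError, ValueError):
        return None
    return parsed.strftime("%m/%Y")


print(drop_day("07/03/1985"))  # 03/1985
print("07/03/1985"[3:])        # 03/1985, the 0-based slice equivalent of substr(4, 10)
print(drop_day("1985-03-07"))  # None
```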

### Encrypt Timestamps

In [0]:
def encrypt_timestamps(df_spark):

    # Identify columns that need to be encrypted by this method
    encrypt_timestamp_columns = []
    # Loop over the items in the dictionary and check if the value is "encrypt_timestamp"
    for key, value in PII_META_DICT.items():
        if value == "encrypt_timestamp":
            encrypt_timestamp_columns.append(key)
    # # For dummy data
    #encrypt_timestamp_columns.append("ActivityTimestamps")

    # Start time of each two-hour band: band 1 is 00:00:00-01:59:59, ..., band 12 is 22:00:00-23:59:59
    band_starts = [
        "00:00:00", "02:00:00", "04:00:00", "06:00:00", "08:00:00", "10:00:00",
        "12:00:00", "14:00:00", "16:00:00", "18:00:00", "20:00:00", "22:00:00",
    ]
    for column in df_spark.columns:
        if column in encrypt_timestamp_columns:
            encrypted_column = f'encrypted {column}'
            # Zero-padded HH:MM:SS strings compare correctly as text. Each band includes its
            # start time and excludes the next band's start, so boundary values such as
            # "02:00:00" are banded correctly; nulls stay null instead of falling into band 12.
            band = None
            for i, start in enumerate(band_starts):
                in_band = col(column) >= start
                if i + 1 < len(band_starts):
                    in_band = in_band & (col(column) < band_starts[i + 1])
                band = when(in_band, str(i + 1)) if band is None else band.when(in_band, str(i + 1))
            df_spark = df_spark.withColumn(encrypted_column, band)
            df_spark = df_spark.drop(column)
    return df_spark
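The bands are two hours wide, so for a well-formed HH:MM:SS value the band number is simply hour // 2 + 1. A pure-Python sketch that is handy for checking the Spark expression at band boundaries such as 02:00:00; `timestamp_band` is a hypothetical helper and returns the band as a string, as the pipeline does.

```python
def timestamp_band(time_text):
    """Return the two-hour band ("1" to "12") for an 'HH:MM:SS' string, or None if malformed."""
    try:
        hours, minutes, seconds = (int(part) for part in time_text.split(":"))
    except (AttributeError, ValueError):
        return None
    if not (0 <= hours < 24 and 0 <= minutes < 60 and 0 <= seconds < 60):
        return None
    return str(hours // 2 + 1)


for t in ["00:00:00", "01:59:59", "02:00:00", "18:30:00", "23:59:59", "18:00,00"]:
    print(t, timestamp_band(t))
```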

### Encrypt free text

In [0]:
import re
import logging

In [0]:
logger = logging.getLogger("presidio-analyzer")

ENTITIES = [
    "LOCATION",
    "PERSON",
    "ORGANIZATION",
]

CHECK_LABEL_GROUPS = [
        ({"LOCATION"}, {"LOC", "LOCATION"}),
        ({"PERSON"}, {"PER", "PERSON"}),
        ({"ORGANIZATION"}, {"ORG"}),
    ]

MODEL_LANGUAGES = {
        "en": "flair/ner-english-large",
        "es": "flair/ner-spanish-large",
        "de": "flair/ner-german-large",
        "nl": "flair/ner-dutch-large",
    }

PRESIDIO_EQUIVALENCES = {
        "PER": "PERSON",
        "LOC": "LOCATION",
        "ORG": "ORGANIZATION",
    }

def check_label(entity,label,check_label_groups) -> bool:
    return any(
        [entity in egrp and label in lgrp for egrp, lgrp in check_label_groups]
    )

def convert_to_recognizer_result(entity):

    entity_type = PRESIDIO_EQUIVALENCES.get(entity.tag, entity.tag)
    start=entity.start_position
    end=entity.end_position

    return [entity_type,start,end]


def encrypt(text,entity,start,end):
    # Remove PII information based on start and end
    # return f'{text[:start]}<{entity}>{text[start + len(entity) + (end-start):]}'
    return f'{text[:start]}<{entity}>{text[end:]}'


# Cache for the NER model, so it is loaded on first use and reused instead of reloaded for every row
_NER_MODEL = None


def get_ner_model():
    global _NER_MODEL
    if _NER_MODEL is None:
        _NER_MODEL = SequenceTagger.load(MODEL_LANGUAGES.get("en"))
    return _NER_MODEL


def run_encryption(text):

    # Clean text to remove html tags
    text = re.sub('<.*?>','',text)

    # Tag the text with the (cached) model
    model = get_ner_model()
    sentences = Sentence(text)
    model.predict(sentences)

    # Iterate through the tagged spans to determine the PII entities to be removed
    results = []

    for entity in ENTITIES:
        for ent in sentences.get_spans("ner"):
            if not check_label(
                entity, ent.labels[0].value, CHECK_LABEL_GROUPS
            ):
                continue
            # Record the entity type and its start and end character positions
            flair_result = convert_to_recognizer_result(ent)
            results.append(flair_result)

    # Order the entities by start position descending, so replacing one entity
    # does not shift the offsets of entities earlier in the text
    results = sorted(results, key=lambda x:x[1], reverse=True)

    # Replace each PII entity in the text with its entity name
    for entity, start, end in results:
        text = encrypt(text, entity, start, end)
    # If no entities were found, the cleaned text is returned unchanged
    return text
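run_encryption replaces spans from the end of the string backwards. Each replacement changes the string's length, so working right to left keeps the start/end offsets of the remaining, earlier spans valid. A sketch with hand-written spans standing in for Flair's output (the offsets are hypothetical, not model predictions); `redact` is a hypothetical helper.

```python
def redact(text, spans):
    """Replace (entity_type, start, end) character spans with '<entity_type>'.

    Spans are applied from the highest start offset down, so earlier offsets stay valid.
    """
    for entity_type, start, end in sorted(spans, key=lambda s: s[1], reverse=True):
        text = f"{text[:start]}<{entity_type}>{text[end:]}"
    return text


sentence = "Clara met Sam in Leeds"
spans = [("PERSON", 0, 5), ("PERSON", 10, 13), ("LOCATION", 17, 22)]
print(redact(sentence, spans))  # <PERSON> met <PERSON> in <LOCATION>
```

With no spans, the text is returned unchanged.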

In [0]:
def encrypt_freetext(df_spark):
    rows = df_metadata.collect()

    encrypt_freetext_columns = []

    # Loop over the metadata rows and collect the free text column names
    for row in rows:
        encrypt_freetext_columns.append(row['free_text_columns'])

    encrypt_freetext_columns.append('FreeText')

    # Null cells stay null instead of being converted to the string "None" by str()
    free_text_anon_udf = udf(lambda x: run_encryption(str(x)) if x is not None else None, StringType())

    for column in df_spark.columns:
        if column in encrypt_freetext_columns:
            encrypted_column = f'encrypted {column}'
            df_spark = df_spark.withColumn(encrypted_column, free_text_anon_udf(col(column)))
            df_spark = df_spark.drop(column)
    return df_spark

### Encrypt postcode

In [0]:
def encrypt_postcode(df_spark):
    #Identify columns that need to be encrypted by this method
    encrypt_postcode_columns = []
    # Loop over the items in the dictionary and check if the value is "encrypt_address"
    for key, value in PII_META_DICT.items():
        if value == "encrypt_address":
            encrypt_postcode_columns.append(key)

    # There is only one column in the catalogue to be encrypted by this method
    postcode_column = "ResidentPostcode"
    if postcode_column not in encrypt_postcode_columns or postcode_column not in df_spark.columns:
        return df_spark

    # Read files
    df_gridlink = spark.read.format("csv").option("header", True).load("abfs://bronze@adlsinframricdev.dfs.core.windows.net/metadata/gridlink_header.csv")
    df_grid = spark.read.format("csv").option("header", True).load("abfs://bronze@adlsinframricdev.dfs.core.windows.net/metadata/gridall.csv")
    # Merging both CSVs
    df_grid_all = df_gridlink.union(df_grid)
    reduced_df_grid_all = df_grid_all[['PCDS','LSOA01']]
    # Left join, so rows whose postcode is missing or not found are kept (with a null LSOA01)
    df_final = df_spark.join(reduced_df_grid_all, df_spark[postcode_column] == reduced_df_grid_all["PCDS"], "left")
    # Drop the original postcode and also PCDS, which holds the same postcode value
    return df_final.drop(postcode_column, "PCDS")
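The join maps each postcode to its LSOA through the PCDS column. A pure-Python sketch of the same left-join lookup, assuming postcodes should be compared after upper-casing and collapsing whitespace; this normalisation is an assumption (PCDS typically uses the single-space form such as "SW1A 1AA") and is not done by the Spark code. The helpers and lookup data are hypothetical.

```python
import re


def normalise_postcode(postcode):
    """Upper-case and reduce whitespace to one space, e.g. ' sw1a  1aa ' -> 'SW1A 1AA'."""
    if postcode is None:
        return None
    return re.sub(r"\s+", " ", postcode.strip().upper())


def postcode_to_lsoa(records, lookup, postcode_field="ResidentPostcode"):
    """Replace the postcode field with an LSOA code; unmatched postcodes give None (left join)."""
    result = []
    for record in records:
        record = dict(record)  # copy so the input is not modified
        postcode = normalise_postcode(record.pop(postcode_field, None))
        record["LSOA01"] = lookup.get(postcode)
        result.append(record)
    return result


lookup = {"AB1 2CD": "LSOA_A"}  # hypothetical PCDS -> LSOA01 pairs
rows = [{"ClientID": "1", "ResidentPostcode": "ab1  2cd"},
        {"ClientID": "2", "ResidentPostcode": "ZZ9 9ZZ"}]
print(postcode_to_lsoa(rows, lookup))
```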

#### Pseudo function

In [0]:
# Pipeline of pseudo functions that encrypts the table according to the table columns and PII_metadata
def pseudo(df_spark):        
    df_spark = encrypt_id(df_spark)
    df_spark = encrypt_dates(df_spark)
    df_spark = encrypt_timestamps(df_spark)
    df_spark = encrypt_freetext(df_spark)
    df_spark = encrypt_postcode(df_spark)
    return df_spark
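pseudo applies each step to the output of the previous one. The same pattern can be written with functools.reduce over a list of steps, so a step is added or reordered in one place (for example `apply_steps(df, [encrypt_id, encrypt_dates, ...])`). The toy steps below operate on dicts and are hypothetical stand-ins for the Spark functions.

```python
from functools import reduce


def apply_steps(data, steps):
    """Apply each step in order, feeding each one the previous step's output."""
    return reduce(lambda acc, step: step(acc), steps, data)


def drop_name(record):
    return {k: v for k, v in record.items() if k != "Firstname"}


def mask_dob(record):
    return {**record, "DateOfBirth": record["DateOfBirth"][3:]}


print(apply_steps({"Firstname": "Clara", "DateOfBirth": "07/03/1985"}, [drop_name, mask_dob]))
# {'DateOfBirth': '03/1985'}
```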

#### Run pipeline for dummy data

In [0]:
from pyspark.sql.functions import *

# dummy_df = spark.read.format("csv").option("header", True).load(f"abfs://bronze@adlsinframricdev.dfs.core.windows.net/metadata/dummyPIDData.csv")

In [0]:
# dummy_df = pseudo(dummy_df)

# dummy_df.display()

In [0]:
# (dummy_df
# .write.format("com.databricks.spark.csv")
# .mode("overwrite")
# .option("header", "true")
# .save(f"abfs://silver@adlsinframricdev.dfs.core.windows.net/dummy_outputs.csv"))

In [0]:
#dummy_df = spark.read.format("csv").option("header", True).load(f"abfs://silver@adlsinframricdev.dfs.core.windows.net/dummy_outputs.csv")


#### Run pipeline for each table

In [0]:
# df_metadata = spark.read.format("csv").option("header", True).load("abfs://bronze@adlsinframricdev.dfs.core.windows.net/metadata/metadata.csv")

# Creating the list of tables from metadata dataframe
df_metadata_rows = df_metadata.collect()

table_list = []
for row in df_metadata_rows:
    table_list.append(row[0])

table_list = [t.replace("dbo.","") for t in table_list]
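str.replace("dbo.", "") removes "dbo." anywhere in a name, and if metadata.csv holds more than one row per table (for example one row per column), table_list would contain repeats and those tables would be processed more than once. A sketch that strips only a leading "dbo." and de-duplicates while keeping order; whether duplicates occur depends on the metadata layout, which is an assumption here. `unique_table_names` and the names are hypothetical, and str.removeprefix needs Python 3.9+.

```python
def unique_table_names(raw_names, schema_prefix="dbo."):
    """Strip a leading schema prefix and drop repeated names, keeping first-seen order."""
    stripped = (name.removeprefix(schema_prefix) for name in raw_names if name)
    return list(dict.fromkeys(stripped))


names = ["dbo.Clients", "dbo.Clients", "dbo.Contacts", "Referrals"]
print(unique_table_names(names))  # ['Clients', 'Contacts', 'Referrals']
```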

In [0]:
# Iterating through each table in bronze, running the pseudo pipeline and writing the result to silver
for table in table_list:
    df = spark.read.format("parquet").load(f"abfs://bronze@adlsinframricdev.dfs.core.windows.net/raw/{table}.parquet")
    df = pseudo(df)
    output_filename = f'{table}.encrypted'
    (df.coalesce(1)
    .write.format("parquet")
    .mode("overwrite")
    .save(f"abfs://silver@adlsinframricdev.dfs.core.windows.net/transformed/{output_filename}"))

In [0]:
PII_META_DICT

{'UBRN': 'encrypt_ID',
 'USRN': 'encrypt_ID',
 'ClientID': 'encrypt_ID',
 'ContactTransportID': 'encrypt_ID',
 'ContactInterpreterID': 'encrypt_ID',
 'StartUserID': 'encrypt_ID',
 'EndReason': 'encrypt_ID',
 'UserID': 'encrypt_ID',
 'EnteredBy': 'encrypt_timestamp',
 'RemovalUserID': 'encrypt_ID',
 'Path': 'encrypt_ID',
 'Author': 'encrypt_ID',
 'NNN': 'encrypt_ID',
 'NNNStatus': 'encrypt_ID',
 'AlternativeID': 'encrypt_ID',
 'Surname': 'encrypt_ID',
 'SurnameSoundex': 'encrypt_ID',
 'Firstname': 'encrypt_ID',
 'FirstnameSoundex': 'encrypt_ID',
 'Title': 'encrypt_ID',
 'DateOfBirth': 'encrypt_date',
 'EstimatedDOB': 'encrypt_date',
 'DaytimePhone': 'encrypt_ID',
 'EveningPhone': 'encrypt_ID',
 'MotherLink': 'encrypt_ID',
 'FatherLink': 'encrypt_ID',
 'EMailAddress': 'encrypt_ID',
 'MobilePhone': 'encrypt_ID',
 'MainCarer': 'encrypt_ID',
 'NINumber': 'encrypt_ID',
 'NNNLastTape': 'encrypt_ID',
 'OtherCarer': 'encrypt_ID',
 'SpineID': 'encrypt_ID',
 'SupersedingNNN': 'encrypt_ID',
 'Resi