# GDPR implementation using pseudonymization and data masking

> Prerequisites:
> - A mount point named /mnt/gdpr, already created against an ADLS Gen2 file system.
> - An open-source dataset from Kaggle to provide the user data: https://www.kaggle.com/omercolakoglu/10m-rows-fake-turkish-names-and-address-dataset (the data is in Excel format; convert it to CSV before loading it into the Delta table).

## Store the raw data in a Delta table [gdpr.raw_customer_data] with a pseudo key

### Read the raw data

In [0]:
dbutils.fs.ls ("/mnt/gdpr/rawfile")

In [0]:
df_csv = spark.read.csv("dbfs:/mnt/gdpr/rawfile/Customers_1M_Rows.csv", header=True)

### Pseudonymize the email address
> - Here we assume that the email address uniquely identifies a customer, so the pseudo key derived from it uniquely identifies the customer as well.

In [0]:
import pyspark.sql.functions as F
df = df_csv.withColumn("customer_pseudo_id", F.sha2(F.col("email"), 256))
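
To see what the pseudo key looks like outside Spark, here is a minimal sketch using Python's standard hashlib. It assumes that Spark's `sha2(col, 256)` hashes the UTF-8 bytes of the string and returns a lowercase hex digest, which is what `hashlib.sha256(...).hexdigest()` produces. The helper name `pseudonymize` and the sample addresses are illustrative only.

```python
import hashlib

def pseudonymize(email: str) -> str:
    """Return the SHA-256 hex digest of an email address (hypothetical helper)."""
    return hashlib.sha256(email.encode("utf-8")).hexdigest()

# Illustrative addresses, not taken from the dataset
first = pseudonymize("alice@example.com")
second = pseudonymize("bob@example.com")

print(first)
print(len(first))                                  # 64 hex characters
print(first == pseudonymize("alice@example.com"))  # same input, same key
print(first == second)                             # different inputs, different keys
```

Because the hash is deterministic, the same email always maps to the same customer_pseudo_id, which is what allows the key to stand in for the email in joins. The hash is case- and whitespace-sensitive, so addresses would need to be normalized first if the source data is inconsistent.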


### Write the customer table [gdpr.raw_customer_data] to the Delta lake

In [0]:

spark.sql('''create schema if not exists gdpr ''')
# drop the table if it exists
dbutils.fs.rm("/mnt/gdpr/deltalake/gdpr/raw_customer_data", recurse=True)
spark.sql('''drop table if exists gdpr.raw_customer_data''')

# write the dataframe as delta 
df.write.format("delta").mode("overwrite").save("/mnt/gdpr/deltalake/gdpr/raw_customer_data")
# create the delta table
spark.sql('''create table gdpr.raw_customer_data using delta location "/mnt/gdpr/deltalake/gdpr/raw_customer_data"''')

In [0]:
%sql

select ID, Email, customer_pseudo_id from gdpr.raw_customer_data limit 5

ID,Email,customer_pseudo_id
1,mel_ozipek@fakeyahoo.com,1e42d957fb4ed5ac23fc5cc69b0790523a322b4cfcbd51028924708c15c75b3d
2,nur_zara@fakeyahoo.com,49f59d8afc59b4fded60b19eac7267a6c7496fbc990066cc3c6cb559de0e4555
3,ser_ozalvuc@fakelive.com,4666e8f6431be0c0f80a8486d253e47fd381c179478300f999708e0ce1bbc872
4,ela_cetinturk@fakelive.com,30cd5fb28bb85af594441dd67a2e016db07042d5f26cc160211b1c5f8d7b7e66
5,elm_okkaci@fakeoutlook.com,9cb099a5541148c2537d1cdac0d115fc1c112c606701f9896150877ce7fa0bdc


## Data Masking

### Generating the Cryptographic Key
> **(If you are running this in a Databricks environment, the steps below are not required. They apply only when running on a local machine.)**
> - Run the commands below in the local environment to set up the key. The key can be generated on any system.
> - First upgrade pip to the latest version: python -m pip install --upgrade pip
> - pip install fernet
> - pip install cryptography
> - Generate a key in the local environment:

```
from cryptography.fernet import Fernet
key = Fernet.generate_key()
```

In [0]:
# creating the user defined function to create the encryption key 
def generate_encrypt_key():
    from cryptography.fernet import Fernet
    key = Fernet.generate_key()
    return key.decode("utf-8")
spark.udf.register("generate_key_using_Fernet", generate_encrypt_key)

### The gdpr.encryption_keys table keeps the mapping between customer IDs and encryption keys

In [0]:
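import pyspark.sql.functions as F
from pyspark.sql.types import StringType

# Wrap the key generator as a Spark UDF. Marking it non-deterministic tells the
# optimizer that each call can return a different value (one key per row).
generate_key_using_Fernet = F.udf(generate_encrypt_key, StringType()).asNondeterministic()
df_distinct_record = spark.sql('''select distinct ID from gdpr.raw_customer_data''')
df_distinct_record = df_distinct_record.withColumn("encryption_key", generate_key_using_Fernet())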

dbutils.fs.rm("/mnt/gdpr/deltalake/gdpr/encryption_keys", recurse=True)
spark.sql('''drop table if exists gdpr.encryption_keys''')
df_distinct_record.write.format("delta").mode("overwrite").save("/mnt/gdpr/deltalake/gdpr/encryption_keys")
spark.sql('''create table gdpr.encryption_keys using delta location "/mnt/gdpr/deltalake/gdpr/encryption_keys"''')

In [0]:
%sql
select * from gdpr.encryption_keys limit 5

ID,encryption_key
296,fSk7b586TrJTj0asBJT4knjHVIsddy1OGzU51dKE_vQ=
467,e-KeUTScy8BDb23nJ7lnx5GPYIrfrDJwV3jvIz1gp6A=
675,CaiL6oWP-AhoaZQ-eTgrkZa3-ergxqCUeFhsLlet0As=
691,qKhX5gmgefKFMIZCjh3WGUfQcDc1rsCEb427EiIVLTw=
829,omEZBqSbfNJeeMnr0Kf4UvqQDNIJscGRhU8z_O-ny7w=
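
The keys in the output above are 44-character URL-safe base64 strings, each encoding 32 random bytes. The sketch below reproduces that format with the standard library only, as an illustration of a per-customer key mapping without Spark. `make_key` and the customer IDs are hypothetical; in the notebook itself the keys come from `Fernet.generate_key()`.

```python
import base64
import os

def make_key() -> str:
    """Build a key in the Fernet key format: 32 random bytes, URL-safe base64."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode("utf-8")

# Hypothetical customer IDs; one independent key per customer
customer_ids = [296, 467, 675]
key_by_customer = {cid: make_key() for cid in customer_ids}

for cid, key in key_by_customer.items():
    print(cid, key, len(key))
```

Each customer gets an independent key, so a key only ever unlocks the rows of the customer it belongs to.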


### Create the Spark UDFs to encrypt and decrypt a column

In [0]:
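# Encrypt function: encrypts a string with the given Fernet key
def encrypt_val(clear_text, MASTER_KEY):
    from cryptography.fernet import Fernet
    # Pass nulls through unchanged instead of failing on them
    if clear_text is None:
        return None
    f = Fernet(MASTER_KEY)
    clear_text_b = bytes(clear_text, 'utf-8')
    cipher_text = f.encrypt(clear_text_b)
    # Fernet tokens are URL-safe base64, so they decode cleanly as ASCII
    return cipher_text.decode('ascii')

# Decrypt function: recovers the original string from a Fernet token
def decrypt_val(cipher_text, MASTER_KEY):
    from cryptography.fernet import Fernet
    if cipher_text is None:
        return None
    f = Fernet(MASTER_KEY)
    clear_val = f.decrypt(cipher_text.encode()).decode()
    return clear_val

# Register the decrypt function so it can be called from SQL in this session
spark.udf.register("decrypt_val", decrypt_val)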
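
The two functions can be sanity-checked without Spark. The following is a minimal sketch, assuming the cryptography package is installed. It re-declares simplified copies of `encrypt_val` and `decrypt_val` so that the block is self-contained, and the sample email is made up.

```python
from cryptography.fernet import Fernet, InvalidToken

def encrypt_val(clear_text: str, master_key: str) -> str:
    """Encrypt a string with a Fernet key and return the token as text."""
    return Fernet(master_key).encrypt(clear_text.encode("utf-8")).decode("ascii")

def decrypt_val(cipher_text: str, master_key: str) -> str:
    """Decrypt a Fernet token back to the original string."""
    return Fernet(master_key).decrypt(cipher_text.encode("ascii")).decode("utf-8")

key = Fernet.generate_key().decode("utf-8")
other_key = Fernet.generate_key().decode("utf-8")

token = encrypt_val("someone@example.com", key)  # made-up address
round_trip = decrypt_val(token, key)
print(round_trip == "someone@example.com")

# Encrypting the same text twice gives different tokens (random IV and timestamp)
tokens_differ = encrypt_val("someone@example.com", key) != token
print(tokens_differ)

# A token cannot be decrypted with another customer's key
try:
    decrypt_val(token, other_key)
    wrong_key_rejected = False
except InvalidToken:
    wrong_key_rejected = True
print(wrong_key_rejected)
```

Because Fernet tokens are randomized, the encrypted EMAIL column cannot be used for equality joins or lookups; that role is left to customer_pseudo_id.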

### Encryption

- We are going to encrypt the column Email.

In [0]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Wrap the Python functions as Spark UDFs
encrypt = udf(encrypt_val, StringType())
decrypt = udf(decrypt_val, StringType())

# Join each customer row to its key and encrypt the email column
df = spark.sql('''select a.*,e.encryption_key from gdpr.raw_customer_data as a 
inner join gdpr.encryption_keys as e on e.ID=a.ID''')
encrypted = df.withColumn("EMAIL", encrypt("EMAIL", col("encryption_key"))).drop("encryption_key")
# display(encrypted.limit(10))

# Save the encrypted data, replacing the clear-text table
encrypted.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("gdpr.raw_customer_data")

### Masked data

In [0]:
%sql

select ID, Email, customer_pseudo_id from gdpr.raw_customer_data limit 5

ID,Email,customer_pseudo_id
100010,gAAAAABhK-2tIRKFDVEZhKfaEhbsp-NyJ406f6kNc037H7R3Ot4hjmJx7H7SabCx2MDKCz7zsGVUv7V0uEY75FMIFqbAHZlEw7aNkBUKoJgoKBVJkjN0rsgLvvEUqGWhdQmz-VVbT_oj,ac6d8801cb69b7ea3691dc8a531117169fb5994589c9574d78db27e331e1d513
100014,gAAAAABhK-2t_d15zv3VvHYAaYHteweflr9B6ddF0J16v5Ne0W6o2_WmIbRT4GsKj2jjRjMnUkh8SW2annIC_cjWV4hcd2FAuG6Mj8fmiawCpFcUzYxxPuc=,a5beb51bda2bd813a8b6a3d47f5b07965b5181b914235d704406e18a3942784e
100021,gAAAAABhK-2t7TyAXJ6BXmUFFtL0QYkeEAPuFZVtyar4T3XaxAuY9TPGlhrcBkcZizBbfNiX_lrXOfNY7yDHzVFad0-vE_9ipMBCey1KCxCzjPNkoEoXmjY=,b307b451b3c021a44168474800498b0333a63d1b8b61b63a5da581e8d151c5a6
100062,gAAAAABhK-2tSUBSkYD8y0xyZNsBkY3hxdPzjC4voCXbcDAB_3wGHQmSzIyyQoZp8vb7at4C4aP23xtj3CivTVqrvnePE0isDwIPZ6aJ1h2SAfWQIDSAnck=,556053a03989b6738bbc5545c8db95774ad9ee6f96c2789573d083c3452ff756
100070,gAAAAABhK-2t9cLQN3XeHfv120CsamOiEx-wg_-aumMp56XMVVKHekjTzqMMyD-Mso4hrugs0cF4wGQ39yFSCXDTVqwUD48o48wJsS6hliOHLXX4SUFS8bo=,b4b737890b7c2a1bfe1ec7feba3bce0a64b5247a1a69849b73fe6e0a7a67b8cd


### Decrypt the data

#### Using PySpark

In [0]:
encrypted = spark.sql('''select a.*,e.encryption_key from gdpr.raw_customer_data as a 
inner join gdpr.encryption_keys as e on e.ID=a.ID''')
decrypted = encrypted.withColumn("EMAIL", decrypt("EMAIL",(col("encryption_Key")))).drop("encryption_Key")
display(decrypted.select("ID", "EMAIL","customer_pseudo_id" ).limit(5))

ID,EMAIL,customer_pseudo_id
100010,ser_turkbay@fakegmail.com@fakeyahoo.com,ac6d8801cb69b7ea3691dc8a531117169fb5994589c9574d78db27e331e1d513
100014,nis_akcealan@fakeoutlook.com,a5beb51bda2bd813a8b6a3d47f5b07965b5181b914235d704406e18a3942784e
100021,ayk_cevirme@fakelive.com,b307b451b3c021a44168474800498b0333a63d1b8b61b63a5da581e8d151c5a6
100062,erd_bayburtlu@fakeyahoo.com,556053a03989b6738bbc5545c8db95774ad9ee6f96c2789573d083c3452ff756
100070,zey_akboga@fakehotmail.com,b4b737890b7c2a1bfe1ec7feba3bce0a64b5247a1a69849b73fe6e0a7a67b8cd


#### Using Databricks SQL

In [0]:
%sql
select a.ID, decrypt_val(a.EMAIL,e.encryption_Key) as email, a.customer_pseudo_id
from gdpr.raw_customer_data as a 
inner join gdpr.encryption_keys as e on e.ID=a.ID
limit 5

ID,email,customer_pseudo_id
100010,ser_turkbay@fakegmail.com@fakeyahoo.com,ac6d8801cb69b7ea3691dc8a531117169fb5994589c9574d78db27e331e1d513
100014,nis_akcealan@fakeoutlook.com,a5beb51bda2bd813a8b6a3d47f5b07965b5181b914235d704406e18a3942784e
100021,ayk_cevirme@fakelive.com,b307b451b3c021a44168474800498b0333a63d1b8b61b63a5da581e8d151c5a6
100062,erd_bayburtlu@fakeyahoo.com,556053a03989b6738bbc5545c8db95774ad9ee6f96c2789573d083c3452ff756
100070,zey_akboga@fakehotmail.com,b4b737890b7c2a1bfe1ec7feba3bce0a64b5247a1a69849b73fe6e0a7a67b8cd


### Build the Hive function
> We would like to create a persistent view for admins so that they can see the actual email address whenever required. A function registered with spark.udf.register is session-scoped and does not persist across sessions, so the view needs a Hive function instead.

- You can use VS Code to create the Hive function.
Here is the folder structure:
```
│   build.sbt
├───src
│   └───main
│       └───scala
│               decryptUDF.scala
```

- Build the Scala package with the ```sbt package``` command.
- Upload the JAR to the Databricks cluster.
- Copy the JAR path from the cluster. It is needed to register the Hive function.
- Install the Maven package for Fernet in Java on the Databricks cluster: "com.macasaet.fernet:fernet-java8:1.5.0"

In [0]:
# Content of the file decryptUDF.scala (it does not need to be run here)
%scala

# import com.macasaet.fernet.{Key, StringValidator, Token}
# import org.apache.hadoop.hive.ql.exec.UDF;
# import java.time.{Duration, Instant}
# class Validator extends StringValidator {

#   override def getTimeToLive() : java.time.temporal.TemporalAmount = {
#     Duration.ofSeconds(Instant.MAX.getEpochSecond());
#   }
# }

# class udfDecrypt extends UDF {

#   def evaluate(inputVal: String, sparkKey : String): String = {

#     if( inputVal != null && inputVal!="" ) {
#       val keys: Key = new Key(sparkKey)
#       val token = Token.fromString(inputVal)
#       val validator = new Validator() {}
#       val payload = token.validateAndDecrypt(keys, validator)
#       payload
#     } else return inputVal
#   }
# }
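
The Scala Validator above overrides getTimeToLive so that tokens are accepted regardless of their age. The Python cryptography library exposes the same idea through the `ttl` argument of `decrypt`, which is not checked when left at its default of `None`; that is how `decrypt_val` is called earlier in this notebook. A minimal sketch, assuming cryptography 3.0 or later (for `encrypt_at_time` and `decrypt_at_time`) and a made-up address:

```python
from cryptography.fernet import Fernet, InvalidToken

f = Fernet(Fernet.generate_key())
issued_at = 1_600_000_000  # arbitrary fixed Unix timestamp for the example
token = f.encrypt_at_time(b"someone@example.com", current_time=issued_at)

# Within the time-to-live: decryption succeeds
fresh = f.decrypt_at_time(token, ttl=60, current_time=issued_at + 30)
print(fresh)

# Past the time-to-live: the token is rejected
try:
    f.decrypt_at_time(token, ttl=60, current_time=issued_at + 3600)
    expired_rejected = False
except InvalidToken:
    expired_rejected = True
print(expired_rejected)

# No ttl given: the age of the token is not checked
no_ttl = f.decrypt(token)
print(no_ttl)
```

For data at rest, such as the encrypted EMAIL column, an age check would make old rows undecryptable, which is why the check is effectively disabled on both the Python and the Scala side.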

In [0]:
# Content of the file build.sbt (it does not need to be run here)

# name := "decryptUDF"
# version := "1.0"
# scalaVersion := "2.12.10"
# libraryDependencies += "org.apache.hive" % "hive-exec" % "0.13.1"
# libraryDependencies += "com.macasaet.fernet" % "fernet-java8" % "1.5.0"

### Create the views for the normal user and the admin user. Access is segregated using ACLs

In [0]:
%sql
drop function if exists udfPIIDecrypt;
create function if not exists udfPIIDecrypt as 'udfDecrypt' using jar 'dbfs:/FileStore/jars/be50d23a_6c5f_4f8b_9150_5462f989342e-decryptudf_2_12_1_0-5d7b8.jar' -- the jar file location in the cluster

In [0]:
%sql
create schema if not exists gdpr_admin;
drop view if  exists gdpr_admin.Test_Encryption_PII_for_admins_v2;
create view  gdpr_admin.Test_Encryption_PII_for_admins_v2 as select a.ID, a.NAME_, a.SURNAME, a.NAMESURNAME, a.GENDER, a.BIRTHDATE, udfPIIDecrypt(a.EMAIL, e.encryption_Key) as EMAIL, a.customer_pseudo_id
from gdpr.raw_customer_data as a 
inner join gdpr.encryption_keys as e on e.ID=a.ID

In [0]:
%sql

select * from gdpr_admin.Test_Encryption_PII_for_admins_v2 limit 5

ID,NAME_,SURNAME,NAMESURNAME,GENDER,BIRTHDATE,EMAIL,customer_pseudo_id
100010,Serdar,TÜRKBAY,Serdar TÜRKBAY,E,1989-09-17,ser_turkbay@fakegmail.com@fakeyahoo.com,ac6d8801cb69b7ea3691dc8a531117169fb5994589c9574d78db27e331e1d513
100014,Nisanur,AKÇEALAN,Nisanur AKÇEALAN,K,1965-02-05,nis_akcealan@fakeoutlook.com,a5beb51bda2bd813a8b6a3d47f5b07965b5181b914235d704406e18a3942784e
100021,Aykut,ÇEVİRME,Aykut ÇEVİRME,E,1985-07-31,ayk_cevirme@fakelive.com,b307b451b3c021a44168474800498b0333a63d1b8b61b63a5da581e8d151c5a6
100062,Erdal,BAYBURTLU,Erdal BAYBURTLU,E,1968-04-26,erd_bayburtlu@fakeyahoo.com,556053a03989b6738bbc5545c8db95774ad9ee6f96c2789573d083c3452ff756
100070,Zeynep Naz,AKBOĞA,Zeynep Naz AKBOĞA,K,1966-09-21,zey_akboga@fakehotmail.com,b4b737890b7c2a1bfe1ec7feba3bce0a64b5247a1a69849b73fe6e0a7a67b8cd


## (ACL) Provide access control

We first need to enable table access control at the workspace level. This is a prerequisite for access control in Databricks.

> 2 personas (admin, reportinguser)

> 2 clusters (devcluster, reportingcluster)

- **devcluster**: no extra settings. It is used for development and by admins.

- **reportingcluster**:
  - 1) Create the reporting_users group in Databricks.
  - 2) Set **spark.databricks.acl.sqlOnly true** in the cluster's Spark config. This setting prevents users from running any Python or Scala code on the cluster.
  - 3) Add the reporting users to the reporting_users group.
  - 4) Allow the reporting_users group to see only the reportingcluster.
  - 5) Grant the reporting_users group access to the database or objects.
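
Step 5 can be sketched as a handful of GRANT statements. The snippet below only builds the SQL strings. The object list is an assumption based on the tables created earlier in this notebook, and the privilege names (USAGE, SELECT) and the SCHEMA keyword should be checked against the Data object privileges page linked below for your runtime. `build_grants` is a hypothetical helper.

```python
def build_grants(group: str, schema: str, tables: list) -> list:
    """Return GRANT statements giving a group read access to the given tables."""
    statements = [f"GRANT USAGE ON SCHEMA {schema} TO `{group}`"]
    for table in tables:
        statements.append(f"GRANT SELECT ON TABLE {schema}.{table} TO `{group}`")
    return statements

# Assumed setup: reporting users read the encrypted table only
grants = build_grants("reporting_users", "gdpr", ["raw_customer_data"])
for statement in grants:
    print(statement)
    # On a cluster with table access control enabled, an admin would run:
    # spark.sql(statement)
```

In this sketch gdpr.encryption_keys and the gdpr_admin schema are deliberately left out: a group that can read both the keys and the udfPIIDecrypt function could decrypt the emails.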

> If you would like to use a single cluster but still use table access control, use a High Concurrency cluster and enable table access control on it.

**Important Links**

[Table Access Control](https://docs.microsoft.com/en-us/azure/databricks/security/access-control/table-acls/table-acl)

[Data object privileges](https://docs.microsoft.com/en-us/azure/databricks/security/access-control/table-acls/object-privileges)