# Script python para anonimização de dados 

O objetivo deste script é ser importado como um job no Glue e então ser inserido no seu pipeline de dados para remoção de informações identificáveis (PII, PHI, etc).

**Atenção/Disclaimer:** Este é um script de exemplo e deverá ser adaptado e testado de acordo com as necessidades da sua organização antes de estar pronto para entrar em produção.


Para execução deste notebook em seu ambiente, escolha o kernel "Sparkmagic (PySpark)".

Será necessário também executar o notebook a partir de um Glue Development Endpoint. Para mais informações: https://docs.aws.amazon.com/pt_br/glue/latest/dg/dev-endpoint.html



## Importação de bibliotecas

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = {'JOB_NAME':'anonimizacao'}

## Caminho de output para armazenamento do arquivo anonimizado no S3
## TODO: substituir pelo caminho do seu bucket
s3_output_path = "s3://<<caminho onde o arquivo final será armazenado>>"

## Database do Glue Data Catalog com dados da Origem
## TODO: substituir pelo seu database
glue_database = "<<seu database com os dados de origem no glue data catalog>>"

## Tabela do Glue Data Catalog com dados da Origem
## TODO: substituir por sua tabela
glue_table_name = "<<sua tabela com os dados de origem no glue data catalog>>"


## Leitura do arquivo original

Observe que "database" e "table name" são os nomes que foram criados no Glue Data Catalog


In [None]:

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = glue_database, table_name = glue_table_name]
## @return: datasource
## @inputs: []
datasource = glueContext.create_dynamic_frame.from_catalog(database = glue_database, table_name = glue_table_name)
## @type: ApplyMapping
## @args: [mapping = [("nome", "string", "nome", "string"), ("cpf", "string", "cpf", "string"), ("email", "string", "email", "string"), ("cep", "string", "cep", "string"), ("rua", "string", "rua", "string"), ("numero", "long", "numero", "long"), ("cidade", "string", "cidade", "string"), ("estado", "string", "estado", "string"), ("nascimento", "string", "nascimento", "string"), ("peso", "int", "peso", "int")]]
## @return: applymapping
## @inputs: [frame = datasource]
applymapping = ApplyMapping.apply(frame = datasource, mappings = [("nome", "string", "nome", "string"), ("cpf", "string", "cpf", "string"), ("email", "string", "email", "string"), ("cep", "string", "cep", "string"), ("rua", "string", "rua", "string"), ("numero", "long", "numero", "long"), ("cidade", "string", "cidade", "string"), ("estado", "string", "estado", "string"), ("nascimento", "string", "nascimento", "string"), ("peso", "long", "peso", "long")])
## @type: ResolveChoice
## @args: [choice = "make_struct"]
## @return: originalData
## @inputs: [frame = applymapping]
originalData = ResolveChoice.apply(frame = applymapping, choice = "make_struct")



### Dado raw


Para fins de visualização, vamos imprimir o dado raw, ou seja, na forma como ele chegou ao pipeline.

In [None]:
originalData.toDF().show()

### Supressão

Vamos aplicar agora supressão em alguns dados para exemplificar a técnica.
Primeiramente, vamos suprimir os campos: "rua" e "número" da residência.


In [None]:
supressao = DropFields.apply(frame = originalData, 
                             paths=['rua', 'numero'])

supressao.toDF().show()

### Generalização

Vamos aplicar agora generalização em alguns dados para exemplificar a técnica.

Os campos generalizados são: "nome", "email", "CEP" e "nascimento".

No exemplo abaixo, a função f = RecuperaUltimoSobrenome será aplicada ao DynamicFrame "supressao". O resultado será um novo DynamicFrame chamado "nomeGeneralizado" cuja coluna "nome" conterá apenas o sobrenome do paciente.

No próximo passo, a função f = RecuperaDominioEmail será aplicada ao DynamicFrame "nomeGeneralizado". O resultado será um novo DynamicFrame chamado "emailGeneralizado" cuja coluna "email" conterá apenas o domínio do endereço de email.

Na sequência, a função f = GeneralizaCep será aplicada ao DynamicFrame "emailGeneralizado". O resultado será um novo DynamicFrame chamado "cepGeneralizado" cuja coluna "cep" conterá apenas os 5 primeiros dígitos do CEP.

Finalmente, a função f = AnoNascimento será aplicada ao DynamicFrame "cepGeneralizado". O resultado será um novo DynamicFrame chamado "anoNascimento" cuja coluna "nascimento" conterá apenas o ano da data de nascimento.


In [None]:
def RecuperaUltimoSobrenome(rec):
    nomeCompleto = rec["nome"]
    nomeSeparado = nomeCompleto.split()
    rec["nome"] = nomeSeparado[len(nomeSeparado)-1]
    return rec

nomeGeneralizado = Map.apply(frame = supressao, 
                             f = RecuperaUltimoSobrenome)

nomeGeneralizado.toDF().show()

In [None]:
def RecuperaDominioEmail(rec):
    emailCompleto = rec["email"]
    dominioEmail = emailCompleto.split("@")
    rec["email"] = dominioEmail[1]
    return rec

emailGeneralizado = Map.apply(frame = nomeGeneralizado, 
                              f = RecuperaDominioEmail)

emailGeneralizado.toDF().show()

Então, vamos suprimir dia e mês do campo "nascimento", deixando apenas o ano de nascimento da pessoa.

In [None]:
def GeneralizaCep(rec):
    cepOriginal = rec["cep"]
    rec["cep"] = cepOriginal[0:5]
    return rec

cepGeneralizado = Map.apply(frame = emailGeneralizado, 
                            f = GeneralizaCep)

cepGeneralizado.toDF().show()

In [None]:
def AnoNascimento(rec):
    nascimentoOriginal = rec["nascimento"]
    anoNascimento = nascimentoOriginal.split("/")
    rec["nascimento"] = anoNascimento[2]
    return rec

anoNascimento = Map.apply(frame = cepGeneralizado, 
                          f = AnoNascimento)

anoNascimento.toDF().show()

### Randomização/Perturbação/Ruído

Voltamos a usar o transformador Map para a operação de perturbação. A coluna a ser perturbada é "peso", com base na função f = AddRuidoPeso, que arredondará o peso para o valor mais próximo da base 5.


In [None]:
def Round(x, base=5):
    return base * round(x/base)

def AddRuidoPeso(rec):
    pesoOriginal = rec["peso"]
    rec["peso"] = Round(pesoOriginal)
    return rec

pesoArredondado = Map.apply(frame = anoNascimento, 
                            f = AddRuidoPeso)

pesoArredondado.toDF().show()

### Hashing

O último passo será aplicar a função f = HashCPF no campo "cpf" também por meio do transformador Map. Para tanto, é necessária a importação das bibliotecas random (https://docs.python.org/3/library/random.html) e hashlib (https://docs.python.org/3/library/hashlib.html) do Python para posterior uso de suas funções randrange e sha256, respectivamente.




In [None]:
import random
import hashlib


def HashCPF(rec):
    cpfOriginal = str(rec["cpf"]) + str(random.randrange(100,999))
    rec["cpf"] = hashlib.sha256(cpfOriginal.encode()).hexdigest()
    return rec


cpfHasheado = Map.apply(frame = pesoArredondado, 
                        f = HashCPF)

cpfHasheado.toDF().show()

## Escreve o arquivo anonimizado em formato parquet no S3

Após a execução de todos os passos acima, finalizamos com um DynamicFrame totalmente anonimizado. Seu conteúdo então poderá ser salvo em um novo bucket no Amazon S3 (https://aws.amazon.com/s3/), o qual poderá ser consumido pelo seu time de cientistas de dados.

In [None]:
## @args: [connection_type = "s3", connection_options = {"path": s3_output_path}, format = "parquet"]
## @inputs: [frame = cpfHasheado]

glueContext.write_dynamic_frame.from_options(frame = cpfHasheado, 
                                             connection_type = "s3", 
                                             connection_options = {"path": s3_output_path},
                                             format = "parquet")
job.commit()