<a href="https://colab.research.google.com/github/brpnunes/cimatec/blob/master/TrabalhoDataLinkage.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
#Instala e configura pyspark
!wget -q http://archive.apache.org/dist/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz
!tar xf spark-2.4.2-bin-hadoop2.7.tgz
!pip install -q findspark

!pip install jellyfish

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-1.11.0-openjdk-amd64/"
os.environ["SPARK_HOME"] = "/content/spark-2.4.2-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Collecting jellyfish
[?25l  Downloading https://files.pythonhosted.org/packages/69/9f/ae6f6ad509725b71d45bb408953c850da7a2ecc3dbdad4063a825702ba29/jellyfish-0.7.1.tar.gz (131kB)
[K     |████████████████████████████████| 133kB 2.8MB/s 
[?25hBuilding wheels for collected packages: jellyfish
  Building wheel for jellyfish (setup.py) ... [?25l[?25hdone
  Stored in directory: /root/.cache/pip/wheels/eb/33/cf/c7ce9866a02202b1f8ca45595e20f5145bf56c0262cfa2daf1
Successfully built jellyfish
Installing collected packages: jellyfish
Successfully installed jellyfish-0.7.1


In [0]:
#Funções de similaridade
import jellyfish

#Para os campos nome e mae, será usada comparação via jaro_winkler dos códigos fonéticos das strings
def get_metaphone_similarity(s1, s2):
  return jellyfish.jaro_winkler(jellyfish.metaphone(s1), jellyfish.metaphone(s2))

#Para as datas, faremos a comparação como strings. Não calcularemos a similaridade usando o tipo data, porque uma simples troca de caracteres causada por erro de digitação poderia dar uma distância muito grande. Calcularemos a similaridade usando jaro_winkler e tratando como string.
def get_string_similarity(s1, s2):
  return jellyfish.jaro_winkler(s1, s2)

#Caso a data de nascimento seja informada, a similaridade será calculada usando o nome, mae e  data_nasc, com maior peso para nome e data_nasc. Caso a data_nasc não seja informada, usaremos apenas o nome e mae, com maior peso para o nome.
def compose_weighted_similarity_strategy_1(sim_nome, sim_mae, sim_data):
  if (sim_data > 0):
    return (2*sim_nome+ sim_mae +2*sim_data)/5
  return (2*sim_nome+ sim_mae)/3

#Caso a data de nascimento seja informada, a similaridade será calculada usando o nome, mae e  data_nasc, com maior peso para nome e data_nasc. Caso a data_nasc não seja informada, usaremos apenas o nome e mae, com maior peso para o nome.
def compose_weighted_similarity_strategy_2(sim_nome, sim_mae, sim_data, sim_tipo_sanguineo):
  if(sim_nome > 0):
    if (sim_data > 0):
      return (2*sim_nome+ sim_mae +2*sim_data)/5  
    return (2*sim_nome+ sim_mae)/3
  return (2*sim_tipo_sanguineo + sim_mae + 2*sim_data)/5

def compose_weighted_similarity_strategy_3(sim_1, sim_2, sim_data):
  if (sim_data > 0):
    return (0.2*sim_1 + 0.1*sim_2 + 0.7*sim_data)
  return 2/3*sim_1 + 1/3*sim_2




#Funções "split"
def criaUltimoNome(col):
    return col.split(' ')[-1]

def criaPrimeiroNome(col):
    return col.split(' ')[0]

In [0]:
#Cria as udfs
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

get_metaphone_similarity_udf = udf(get_metaphone_similarity, FloatType())

get_string_similarity_udf = udf(get_string_similarity, FloatType())

compose_weighted_similarity_strategy_1_udf = udf(compose_weighted_similarity_strategy_1, FloatType())

compose_weighted_similarity_strategy_2_udf = udf(compose_weighted_similarity_strategy_2, FloatType())

compose_weighted_similarity_strategy_3_udf = udf(compose_weighted_similarity_strategy_3, FloatType())

udf_criaUltimoNome = F.udf(criaUltimoNome, StringType())

udf_criaPrimeiroNome = F.udf(criaPrimeiroNome, StringType())

In [0]:
#Leitura das bases
df_a = spark.read.csv("base_sintetica_ascii_a.csv", sep=',', header=True)
df_b = spark.read.csv("base_sintetica_ascii_b.csv", sep=',', header=True)

In [0]:
from pyspark.sql.functions import trim


#Renomeia colunas para não haver conflito na hora do join
df_a = df_a.withColumnRenamed("nome", "nome_a")
df_a = df_a.withColumnRenamed("mae", "mae_a")
df_a = df_a.withColumnRenamed("data_nasc", "data_nasc_a")
df_a = df_a.withColumnRenamed("cpf", "cpf_a")
df_a = df_a.withColumnRenamed("tipo_sanguineo", "tipo_sanguineo_a")

df_b = df_b.withColumnRenamed("nome", "nome_b")
df_b = df_b.withColumnRenamed("mae", "mae_b")
df_b = df_b.withColumnRenamed("data_nasc", "data_nasc_b")
df_b = df_b.withColumnRenamed("cpf", "cpf_b")
df_b = df_b.withColumnRenamed("tipo_sanguineo", "tipo_sanguineo_b")


#Preenche os campos não informados com a string " "
df_a = df_a.na.fill({"nome_a": " ", "mae_a" : " ", "data_nasc_a": " ", "tipo_sanguineo_a": " "})
df_b = df_b.na.fill({"nome_b": " ", "mae_b" : " ", "data_nasc_b": " ", "tipo_sanguineo_b": " "})

#Faz o crossjoin entre as bases
df_join = df_b.select("nome_b", "mae_b", "data_nasc_b", "cpf_b", "tipo_sanguineo_b").crossJoin(df_a.select("nome_a", "mae_a", "data_nasc_a", "cpf_a", "tipo_sanguineo_a"))

df_join = df_join.withColumn('ultimo_nome_a', udf_criaUltimoNome(trim(F.col('nome_a'))))
df_join = df_join.withColumn('ultimo_nome_b', udf_criaUltimoNome(trim(F.col('nome_b'))))

df_join = df_join.withColumn('primeiro_nome_a', udf_criaPrimeiroNome(trim(F.col('nome_a'))))
df_join = df_join.withColumn('primeiro_nome_b', udf_criaPrimeiroNome(trim(F.col('nome_b'))))

df_join = df_join.withColumn('ultimo_nome_mae_a', udf_criaUltimoNome(trim(F.col('mae_a'))))
df_join = df_join.withColumn('ultimo_nome_mae_b', udf_criaUltimoNome(trim(F.col('mae_b'))))

df_join = df_join.withColumn('primeiro_nome_mae_a', udf_criaPrimeiroNome(trim(F.col('mae_a'))))
df_join = df_join.withColumn('primeiro_nome_mae_b', udf_criaPrimeiroNome(trim(F.col('mae_b'))))


#Cria as novas colunas com as similaridades
df_join = df_join.withColumn("sim_nome", get_metaphone_similarity_udf(df_join["nome_a"], df_join["nome_b"]))
df_join = df_join.withColumn("sim_mae", get_metaphone_similarity_udf(df_join["mae_a"], df_join["mae_b"]))
df_join = df_join.withColumn("sim_data_nasc", get_string_similarity_udf(df_join["data_nasc_a"], df_join["data_nasc_b"]))
df_join = df_join.withColumn("sim_tipo_sanguineo", get_string_similarity_udf(df_join["tipo_sanguineo_a"], df_join["tipo_sanguineo_b"]))
df_join = df_join.withColumn("sim_primeiro_nome", get_metaphone_similarity_udf(df_join["primeiro_nome_a"], df_join["primeiro_nome_b"]))
df_join = df_join.withColumn("sim_ultimo_nome", get_metaphone_similarity_udf(df_join["ultimo_nome_a"], df_join["ultimo_nome_b"]))
df_join = df_join.withColumn("sim_primeiro_nome_mae", get_metaphone_similarity_udf(df_join["primeiro_nome_mae_a"], df_join["primeiro_nome_mae_b"]))
df_join = df_join.withColumn("sim_ultimo_nome_mae", get_metaphone_similarity_udf(df_join["ultimo_nome_mae_a"], df_join["ultimo_nome_mae_b"]))



Estratégia 1: Utilizando a similaridade do nome, nome da mãe e data de nascimento 

In [0]:
def do_linkage_strategy_1(df_join):
  
  #A coluna similarity guarda a similaridade ponderada considerando nome, mae e data_nasc
  df_final = df_join.withColumn("similarity", compose_weighted_similarity_strategy_1_udf(df_join["sim_nome"], df_join["sim_mae"], df_join["sim_data_nasc"]))

  #Ordena o dataset pela similaridade, agrupando pelo nome_b e mae_b
  df_final = df_final.orderBy(["cpf_b", "similarity"], ascending=[1, 0])
  
  #Exclui as entradas duplicadas, mantendo apenas as linhas que tiverem a maior similaridade para cada nome_b e mae_b
  df_final = df_final.dropDuplicates(["cpf_b"])

  #Gera o dataset final e grava no arquivo results_for_accuracy.csv
  df_final = df_final.toPandas()

  df_final = df_final[["cpf_a", "cpf_b", "similarity"]]

  df_final.to_csv("results_for_accuracy_calc_strategy_1.csv")

  df_final.count()
 
#do_linkage_strategy_1(df_join)

Estratégia 2: Utilizando a similaridade do nome, nome da mãe, data de nascimento e tipo sanguíneo

In [0]:
def do_linkage_strategy_2(df_join):
  

  #A coluna similarity guarda a similaridade ponderada considerando nome, mae e data_nasc
  df_final = df_join.withColumn("similarity", compose_weighted_similarity_strategy_2_udf(df_join["sim_nome"], df_join["sim_mae"], df_join["sim_data_nasc"], df_join["sim_tipo_sanguineo"]))

  #Ordena o dataset pela similaridade, agrupando pelo nome_b e mae_b
  df_final = df_final.orderBy(["cpf_b", "similarity"], ascending=[1, 0])
  
  #Exclui as entradas duplicadas, mantendo apenas as linhas que tiverem a maior similaridade para cada nome_b e mae_b
  df_final = df_final.dropDuplicates(["cpf_b"])

  #Gera o dataset final e grava no arquivo results_for_accuracy.csv
  df_final = df_final.toPandas()

  df_final = df_final[["cpf_a", "cpf_b", "similarity"]]

  df_final.to_csv("results_for_accuracy_calc_strategy_2.csv")

  df_final.count()
 
#do_linkage_strategy_2(df_join)

Estratégia 3: Utilizando a similaridade do primeiro nome e do primeiro nome da mãe


In [0]:
def do_linkage_strategy_3(df_join):
  

  #A coluna similarity guarda a similaridade ponderada considerando nome, mae e data_nasc
  df_final = df_join.withColumn("similarity", compose_weighted_similarity_strategy_3_udf(df_join["sim_primeiro_nome"], df_join["sim_primeiro_nome_mae"], df_join["sim_data_nasc"]))

  #Ordena o dataset pela similaridade, agrupando pelo nome_b e mae_b
  df_final = df_final.orderBy(["cpf_b", "similarity"], ascending=[1, 0])
  
  #Exclui as entradas duplicadas, mantendo apenas as linhas que tiverem a maior similaridade para cada nome_b e mae_b
  df_final = df_final.dropDuplicates(["cpf_b"])

  #Gera o dataset final e grava no arquivo results_for_accuracy.csv
  df_final = df_final.toPandas()

  df_final = df_final[["cpf_a", "cpf_b", "similarity"]]

  df_final.to_csv("results_for_accuracy_calc_strategy_3.csv")

  df_final.count()
 
#do_linkage_strategy_2(df_join)

Cálculo de acurácia

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd

def inspect_pairs(cpf_a, cpf_b, match):
    if match == '1':
        if cpf_a == cpf_b:
            return "TP"
        else:
            return "FP"
    else:
        if cpf_a != cpf_b:
            return "TN"
        else: 
            return "FN"
          
udf_inspect_pairs = F.udf(inspect_pairs, StringType())

def get_results(filename):
  data = spark.read.csv(filename, header=True)
  
  #O valor do ponto de corte (cutoff) deve ser o número que separa os 100  primeiros do restante de sua base que resultou do linkage  
  cutoff = data.orderBy('similarity', ascending=False).toPandas().iloc[99, 3]
  
  
  # Let us consider a cuttoff point set as 0.85
  #cutoff = 0.9

  # sorting and deduplicating the resulting dataset
  data = data.withColumn('similarity', F.col('similarity').cast(DoubleType()))
  data = data.orderBy('similarity', ascending=False).dropDuplicates(['cpf_b'])
  
  data = data.withColumn('match', F.when(F.col('similarity') >= cutoff, '1').otherwise('0'))
  
  data = data.withColumn('perf', udf_inspect_pairs(F.col('cpf_a'), F.col('cpf_b'), F.col('match')))
    
  dic_results = {}
  TP = data.filter(F.col('perf') == "TP").count()
  TN = data.filter(F.col('perf') == "TN").count()
  FP = data.filter(F.col('perf') == "FP").count()
  FN = data.filter(F.col('perf') == "FN").count()


  dic_results['accuracy'] =  float(TP + TN) / (FP + TP + FN + TN)
  dic_results['ppv'] = float(TP) / (TP + FP)
  dic_results['npv'] = float(TN) / (TN + FN)
  dic_results['sens'] = float(TP) / (TP + FN)
  dic_results['spec'] = float(TN) / (TN + FP)
  
  final_results = pd.DataFrame(dic_results, index=[0])
  
  return final_results
  
 
#


3 Estratégias de Linkage

In [0]:
do_linkage_strategy_1(df_join)
print("Estratégia 1: \n %s " % get_results("results_for_accuracy_calc_strategy_1.csv"))

do_linkage_strategy_2(df_join)
print("\nEstratégia 2: \n %s " % get_results("results_for_accuracy_calc_strategy_2.csv"))

do_linkage_strategy_3(df_join)
print("\nEstratégia 3: \n %s " % get_results("results_for_accuracy_calc_strategy_3.csv"))

Estratégia 1: 
    accuracy      ppv       npv      sens      spec
0  0.915254  0.92381  0.846154  0.979798  0.578947 

Estratégia 2: 
    accuracy   ppv  npv  sens      spec
0  0.991525  0.99  1.0   1.0  0.947368 

Estratégia 3: 
    accuracy   ppv       npv      sens      spec
0  0.932203  0.99  0.611111  0.933962  0.916667 
