In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [2]:
datasetA = spark.read.csv('handson_spark/dataset_a_v3.csv', header=True, sep=';')
datasetB = spark.read.csv('handson_spark/dataset_b_v3.csv', header=True, sep=';')

In [3]:
print(datasetA.count())
print(datasetB.count())
datasetA.limit(5).show()
datasetB.limit(5).show()

100
20
+-----+--------------------+---------+------+--------------------+--------+---------------+
|cod_a|              nome_a|     dn_a|sexo_a|               mae_a|cidade_a|primeiro_nome_a|
+-----+--------------------+---------+------+--------------------+--------+---------------+
|    1|EDSON GOMES    DO...|1-01-2007|     1|WEDILAINE VIEIRA ...|  280030|          EDSON|
|    2|ALESSANDRA KAUANÈ...|1-02-2008|     2|VITORIA LUCIA AMO...|  280740|     ALESSANDRA|
|    3|DAVI GONÇALVES DA...|1-04-2007|     1| VILMA GOMES MOREIRA|  280030|           DAVI|
|    4|ALISON DE JESUS T...|1-05-2007|     2|VERA LUCIA FRANCI...|  280030|         ALISON|
|    5|DANNYEL COSTA DE ...|1-12-2006|     1|VANILDA TRINDADE ...|  280030|        DANNYEL|
+-----+--------------------+---------+------+--------------------+--------+---------------+

+-----+--------------------+----------+------+--------------------+--------+---------------+
|cod_b|              nome_b|      dn_b|sexo_b|               mae_b|cida

In [6]:
import jellyfish
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType

In [9]:
def criaMetaphone(col):
    return jellyfish.metaphone(col)
udf_criaMetaphone = F.udf(criaMetaphone, StringType())

In [10]:
datasetA = datasetA.withColumn('phonetic_nome_a', udf_criaMetaphone(F.col('nome_a')))
datasetA = datasetA.withColumn('phonetic_mae_a', udf_criaMetaphone(F.col('mae_a')))
datasetB = datasetB.withColumn('phonetic_nome_b', udf_criaMetaphone(F.col('nome_b')))
datasetB = datasetB.withColumn('phonetic_mae_b', udf_criaMetaphone(F.col('mae_b')))

+-----+--------------------+---------+------+--------------------+--------+---------------+--------------------+----------------+
|cod_a|              nome_a|     dn_a|sexo_a|               mae_a|cidade_a|primeiro_nome_a|     phonetic_nome_a|  phonetic_mae_a|
+-----+--------------------+---------+------+--------------------+--------+---------------+--------------------+----------------+
|    1|EDSON GOMES    DO...|1-01-2007|     1|WEDILAINE VIEIRA ...|  280030|          EDSON|    ETSN KMS TS SNTS|     WTLN FR BSR|
|    2|ALESSANDRA KAUANÈ...|1-02-2008|     2|VITORIA LUCIA AMO...|  280740|     ALESSANDRA|ALSNTR KN SS TS SNTS|FTR LX AMRM T SS|
|    3|DAVI GONÇALVES DA...|1-04-2007|     1| VILMA GOMES MOREIRA|  280030|           DAVI|     TF KNKLFS T  RX|     FLM KMS MRR|
+-----+--------------------+---------+------+--------------------+--------+---------------+--------------------+----------------+

+-----+--------------------+----------+------+--------------------+--------+-------------

In [16]:
datasetA.limit(3).toPandas()

Unnamed: 0,cod_a,nome_a,dn_a,sexo_a,mae_a,cidade_a,primeiro_nome_a,phonetic_nome_a,phonetic_mae_a,ultimo_nome_a
0,1,EDSON GOMES DOS SANTOS,1-01-2007,1,WEDILAINE VIEIRA BEZERRA,280030,EDSON,ETSN KMS TS SNTS,WTLN FR BSR,SANTOS
1,2,ALESSANDRA KAUANÈ SOUZA DOS SANTOS,1-02-2008,2,VITORIA LUCIA AMORIM DE SOUZA,280740,ALESSANDRA,ALSNTR KN SS TS SNTS,FTR LX AMRM T SS,SANTOS
2,3,DAVI GONÇALVES DA &Ä!! ROCHA,1-04-2007,1,VILMA GOMES MOREIRA,280030,DAVI,TF KNKLFS T RX,FLM KMS MRR,ROCHA


In [17]:
datasetB.limit(3).toPandas()

Unnamed: 0,cod_b,nome_b,dn_b,sexo_b,mae_b,cidade_b,primeiro_nome_b,phonetic_nome_b,phonetic_mae_b,ultimo_nome_b
0,1,EDU PEREIRA TAVACHO,09/13/2007,1,SIMONE ANTONIA RODRIGUES,280030,EDU,ET PRR TFX,SMN ANTN RTRKS,TAVACHO
1,2,JOSE IGOR SANTANA DOS SANTOS,05/17/2007,1,NICOLINA FATIMA MACHADO,280030,JOSE,JS IKR SNTN TS SNTS,NKLN FTM MXT,SANTOS
2,3,JOSE ITALO SANTANA SANTOS,09/17/2006,1,NADJA MARINHO DOS SANTOS,280030,JOSE,JS ITL SNTN SNTS,NTJ MRNH TS SNTS,SANTOS


In [7]:
def criaUltimoNome(col):
    return col.split(' ')[-1]
udf_criaUltimoNome = F.udf(criaUltimoNome, StringType())

In [11]:
datasetA = datasetA.withColumn('ultimo_nome_a', udf_criaUltimoNome(F.col('nome_a')))
datasetB = datasetB.withColumn('ultimo_nome_b', udf_criaUltimoNome(F.col('nome_b')))

In [13]:
datasetA.limit(3).toPandas()

Unnamed: 0,cod_a,nome_a,dn_a,sexo_a,mae_a,cidade_a,primeiro_nome_a,phonetic_nome_a,phonetic_mae_a,ultimo_nome_a
0,1,EDSON GOMES DOS SANTOS,1-01-2007,1,WEDILAINE VIEIRA BEZERRA,280030,EDSON,ETSN KMS TS SNTS,WTLN FR BSR,SANTOS
1,2,ALESSANDRA KAUANÈ SOUZA DOS SANTOS,1-02-2008,2,VITORIA LUCIA AMORIM DE SOUZA,280740,ALESSANDRA,ALSNTR KN SS TS SNTS,FTR LX AMRM T SS,SANTOS
2,3,DAVI GONÇALVES DA &Ä!! ROCHA,1-04-2007,1,VILMA GOMES MOREIRA,280030,DAVI,TF KNKLFS T RX,FLM KMS MRR,ROCHA


In [21]:
datasetB.limit(3).toPandas()

Unnamed: 0,cod_b,dn_b,sexo_b,cidade_b,primeiro_nome_b,ultimo_nome_b,phonetic_nome_b,phonetic_mae_b
0,1,09/13/2007,1,280030,EDU,TAVACHO,ET PRR TFX,SMN ANTN RTRKS
1,2,05/17/2007,1,280030,JOSE,SANTOS,JS IKR SNTN TS SNTS,NKLN FTM MXT
2,3,09/17/2006,1,280030,JOSE,SANTOS,JS ITL SNTN SNTS,NTJ MRNH TS SNTS


In [18]:
datasetA = datasetA.select(['cod_a', 'dn_a', 
                            'sexo_a', 'cidade_a', 
                            'primeiro_nome_a', 'ultimo_nome_a', 
                            'phonetic_nome_a', 'phonetic_mae_a'])

datasetB = datasetB.select(['cod_b', 'dn_b', 
                            'sexo_b', 'cidade_b', 
                            'primeiro_nome_b', 'ultimo_nome_b', 
                            'phonetic_nome_b', 'phonetic_mae_b']) 

In [19]:
dataset_linkage = datasetA.crossJoin(datasetB)

In [22]:
dataset_linkage.limit(10).toPandas()

Unnamed: 0,cod_a,dn_a,sexo_a,cidade_a,primeiro_nome_a,ultimo_nome_a,phonetic_nome_a,phonetic_mae_a,cod_b,dn_b,sexo_b,cidade_b,primeiro_nome_b,ultimo_nome_b,phonetic_nome_b,phonetic_mae_b
0,1,1-01-2007,1,280030,EDSON,SANTOS,ETSN KMS TS SNTS,WTLN FR BSR,1,09/13/2007,1,280030,EDU,TAVACHO,ET PRR TFX,SMN ANTN RTRKS
1,1,1-01-2007,1,280030,EDSON,SANTOS,ETSN KMS TS SNTS,WTLN FR BSR,2,05/17/2007,1,280030,JOSE,SANTOS,JS IKR SNTN TS SNTS,NKLN FTM MXT
2,1,1-01-2007,1,280030,EDSON,SANTOS,ETSN KMS TS SNTS,WTLN FR BSR,3,09/17/2006,1,280030,JOSE,SANTOS,JS ITL SNTN SNTS,NTJ MRNH TS SNTS
3,1,1-01-2007,1,280030,EDSON,SANTOS,ETSN KMS TS SNTS,WTLN FR BSR,4,02/18/2008,1,280030,CARLOS,RODRIGUES,KRLS ALKSNTR S RTRKS,MRN BRBS T ARJ
4,1,1-01-2007,1,280030,EDSON,SANTOS,ETSN KMS TS SNTS,WTLN FR BSR,5,11/18/2007,1,280030,LEON,TELES,LN WNR T ARJ TLS,MLN SR RX TS RS MT
5,1,1-01-2007,1,280030,EDSON,SANTOS,ETSN KMS TS SNTS,WTLN FR BSR,6,02/03/2007,2,280740,JOAO,COSTA,J FTR T SLF KST,MR KRSXN TS SNTS
6,1,1-01-2007,1,280030,EDSON,SANTOS,ETSN KMS TS SNTS,WTLN FR BSR,7,02/10/2006,1,280030,FELIPE,RODRIGUES,FLP LTN RTRKS,MR KNSK SNTN
7,1,1-01-2007,1,280030,EDSON,SANTOS,ETSN KMS TS SNTS,WTLN FR BSR,8,01/20/2008,1,280030,KAIO,MATOS,K LKS SS T MTS,MR BTN B TS SNTS
8,1,1-01-2007,1,280030,EDSON,SANTOS,ETSN KMS TS SNTS,WTLN FR BSR,9,11/30/2006,2,280030,MARIA,MARCO,MR KTLP T NSSMNT MRK,TN NSSMNT MRNH
9,1,1-01-2007,1,280030,EDSON,SANTOS,ETSN KMS TS SNTS,WTLN FR BSR,10,08/31/2007,1,280670,JAILSON,SANTOS,JLSN KFLKNT SNTS,TYN ALMT T SS


In [23]:
def compare(cod_a, dn_a, sexo_a, cidade_a, primeiro_nome_a, ultimo_nome_a, phonetic_nome_a, phonetic_mae_a,
           cod_b, dn_b, sexo_b, cidade_b, primeiro_nome_b, ultimo_nome_b, phonetic_nome_b, phonetic_mae_b):
    sim = 0
    
    # Comparando atributos nominais
    sim_nominais = jellyfish.jaro_winkler(str(primeiro_nome_a), str(primeiro_nome_b))
    sim_nominais += jellyfish.jaro_winkler(str(ultimo_nome_a), str(ultimo_nome_b))
    sim_nominais += jellyfish.jaro_winkler(str(phonetic_nome_a), str(phonetic_nome_b))
    sim_nominais += jellyfish.jaro_winkler(str(phonetic_mae_a), str(phonetic_mae_b))
    
    # Comparando categorias
    # Note que Hamming é uma distancia, então para saber a similiarade, precisamos
    # encontrar o complemento da medida. 
    sim_cat = 1 - (jellyfish.hamming_distance(str(sexo_a), str(sexo_b)))
    sim_cat += 1 - (jellyfish.hamming_distance(str(dn_a), str(sexo_b)))
    sim_cat += 1 - (jellyfish.hamming_distance(str(cidade_a), str(cidade_b)))
    
    # Media aritmetica simples
    sim = abs(float(sim_nominais + sim_cat)/7)
    
    return sim
udf_compare = F.udf(compare, FloatType())

In [24]:
result_linkage = dataset_linkage.withColumn('similaridade', udf_compare(F.col('cod_a'), F.col('dn_a'), F.col('sexo_a'), F.col('cidade_a'), F.col('primeiro_nome_a'), F.col('ultimo_nome_a'), F.col('phonetic_nome_a'), F.col('phonetic_mae_a'),
                                                                       F.col('cod_b'), F.col('dn_b'), F.col('sexo_b'), F.col('cidade_b'), F.col('primeiro_nome_b'), F.col('ultimo_nome_b'), F.col('phonetic_nome_b'), F.col('phonetic_mae_b')))

In [25]:
result_linkage.select(['cod_a', 'cod_b', 'similaridade']).show()

+-----+-----+------------+
|cod_a|cod_b|similaridade|
+-----+-----+------------+
|    1|    1|  0.37114254|
|    1|    2|  0.31348473|
|    1|    3|   0.3117244|
|    1|    4|  0.40457723|
|    1|    5|  0.37878615|
|    1|    6|  0.97144663|
|    1|    7|   0.4313893|
|    1|    8|  0.34897444|
|    1|    9|   0.7393321|
|    1|   10|  0.57529056|
|    1|   11|   0.6647277|
|    1|   12|  0.69759583|
|    1|   13|  0.43655542|
|    1|   14|   0.8468892|
|    1|   15|  0.71266234|
|    1|   16|  0.34705988|
|    1|   17|  0.48796898|
|    1|   18|   0.6880837|
|    1|   19|  0.40749013|
|    1|   20|  0.71381074|
+-----+-----+------------+
only showing top 20 rows



In [31]:
result_linkage.select(['cod_a','cod_b','similaridade']).orderBy(['similaridade'], ascending=False).dropDuplicates(['cod_b']).show()

+-----+-----+------------+
|cod_a|cod_b|similaridade|
+-----+-----+------------+
|   78|    7|   1.1433725|
|   22|   15|    1.134425|
|   52|   11|   1.2207842|
|   62|    3|   1.2613997|
|   78|    8|   1.2331666|
|   78|   16|    1.165733|
|   62|    5|   1.1375446|
|   66|   18|   1.1578609|
|   68|   17|   1.2574726|
|   16|    6|   1.2056123|
|   62|   19|   1.1350102|
|   78|    9|   1.2543484|
|   60|    1|   1.2105134|
|   63|   20|    1.258467|
|   62|   10|   1.1786258|
|   78|    4|   1.1902939|
|   68|   12|   1.2202948|
|   78|   13|   1.1830667|
|   80|   14|   1.2107688|
|   62|    2|      1.2695|
+-----+-----+------------+

