# MVP Campofrío - Fuzzy Entity Matching


Good reference: [Efficiently fuzzy match strings with machine learning in PySpark](http://gregbiegel.net/blog/using-machine-learning-in-python-to-fuzzily-match-strings)


![](img/mmds.png)

In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
import sys
import os

In [4]:
# add parent directory to path
parent_dir = '/'.join(os.getcwd().split('/')[:-1])
sys.path.append(parent_dir)

In [5]:
parent_dir

'/home/acalle/PycharmProjects/campofrio'

In [6]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark import StorageLevel

In [7]:
spark = SparkSession.builder.appName("campofrio").config("spark.driver.memory", "8g") \
    .config("spark.cleaner.referenceTracking.cleanCheckpoints","true") \
    .config("spark.checkpoint.compress","true") \
    .config("spark.io.compression.codec","lz4") \
    .config("spark.shuffle.mapStatus.compression.codec","lz4") \
    .config("spark.shuffle.spill.compress","true") \
    .config("spark.shuffle.compress","true") \
    .config("spark.rdd.compress","true") \
    .getOrCreate()

22/03/30 16:58:53 WARN Utils: Your hostname, acalle resolves to a loopback address: 127.0.1.1; using 192.168.1.155 instead (on interface wlp59s0)
22/03/30 16:58:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/03/30 16:58:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [8]:
spark

## Data

In [13]:
! ls ../data/maktx/

part-00000-a04248b5-e914-4e26-83ff-9a8db5679fdb-c000.snappy.parquet  _SUCCESS


In [14]:
df = spark.read.parquet("../data/maktx")\
            .repartition(2*spark.getActiveSession().sparkContext.defaultParallelism,"row_id")\
            .persist(StorageLevel.MEMORY_AND_DISK)

In [15]:
df.count()

7808

In [17]:
df.show(3, truncate=False)

+-----------+-----------------------------+----------------------------------------+---------+--------+
|row_id     |E1MARCM[Plants]--WERKS[Plant]|MAKTX                                   |SPRAS_ISO|MATNR   |
+-----------+-----------------------------+----------------------------------------+---------+--------+
|60129543143|[FB16]                       |POLEA BRIDA INOX ARMORINOX SPDIV0164511 |EN       |50190829|
|60129543310|[FD01]                       |KIT 4 BOCAIS P/PISTOLA JATO AREIA       |PT       |50190917|
|60129543348|[FB12]                       |RODAMIENTO (401965) LOGITRANS HL-1004-RF|ES       |50190938|
+-----------+-----------------------------+----------------------------------------+---------+--------+
only showing top 3 rows



## Data cleansing & normalization

In [16]:
# ojo! tenemos que ver como limpiamos y normalizamos el texto
#df = df.withColumn('maktx_ndf', 
#                   F.trim(F.lower(F.regexp_replace('MAKTX', "[^a-zA-Z\\s]", ""))))

In [19]:
df.show(truncate=False)

+-----------+-----------------------------+----------------------------------------+---------+--------+
|row_id     |E1MARCM[Plants]--WERKS[Plant]|MAKTX                                   |SPRAS_ISO|MATNR   |
+-----------+-----------------------------+----------------------------------------+---------+--------+
|60129543143|[FB16]                       |POLEA BRIDA INOX ARMORINOX SPDIV0164511 |EN       |50190829|
|60129543310|[FD01]                       |KIT 4 BOCAIS P/PISTOLA JATO AREIA       |PT       |50190917|
|60129543348|[FB12]                       |RODAMIENTO (401965) LOGITRANS HL-1004-RF|ES       |50190938|
|60129543786|[FD01]                       |PORCA CRAVAR PRETA M6 094875 60 005     |EN       |50191153|
|60129544382|[FA05]                       |DETECTEUR SECURITE EPINUS2KNT/MKTS      |FR       |50191490|
|60129544653|[FB11]                       |PERFIL GUIA DISCO 3 CORTE PRIM 2005X1905|EN       |50191627|
|60129545237|[FB07]                       |RETEN 45x65x8 NBR    

## Fuzzy Model

In [23]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, NGram, HashingTF, MinHashLSH, CountVectorizer

### Ejemplo

---

In [21]:
import pandas as pd

data = [[1 , "John Smyth",  "Bob Jones"],
[2 , "John Smith",  "Ned Flanders"],
[3 , "Jo Smith",    "Lisa Short"],
[4 , "Bob Jones",   "Joe Tan"],
[5 , "Tim Jones",   "Jim Jones"],
[6 , "Laura Tully", "John Smith"],
[7 , "Sheena Easton",   "John Smith"],
[8 , "Hilary Jones",    "Jon Smithers"],
[9 , "Hannah Short",    "Chris Smith"],
[10, "Greg Norman", "Norm Smith"]]


schema = T.StructType([
    T.StructField("Index", T.IntegerType(), True),
    T.StructField("Column_1", T.StringType(), True),
    T.StructField("Column_2", T.StringType(), True)])

df = spark.createDataFrame(pd.DataFrame(data, columns=['index', 'col_1', 'col_2']), schema=schema)
df.show(truncate=False)

+-----+-------------+------------+
|Index|Column_1     |Column_2    |
+-----+-------------+------------+
|1    |John Smyth   |Bob Jones   |
|2    |John Smith   |Ned Flanders|
|3    |Jo Smith     |Lisa Short  |
|4    |Bob Jones    |Joe Tan     |
|5    |Tim Jones    |Jim Jones   |
|6    |Laura Tully  |John Smith  |
|7    |Sheena Easton|John Smith  |
|8    |Hilary Jones |Jon Smithers|
|9    |Hannah Short |Chris Smith |
|10   |Greg Norman  |Norm Smith  |
+-----+-------------+------------+



In [38]:
#hashing model
model = Pipeline(stages=[

    RegexTokenizer(pattern="", inputCol="name", outputCol="tokens", minTokenLength=1),    
    NGram(n=3, inputCol="tokens", outputCol="ngrams"),
    HashingTF(inputCol="ngrams", outputCol="vectors",),
    MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=5)
])

In [40]:
ngrams_length = 3
df_1 = df.select('Index', 'Column_1').withColumnRenamed('Column_1', 'name')
df_1 = df_1.filter(F.length('name')>=ngrams_length)
df_1.show()

+-----+-------------+
|Index|         name|
+-----+-------------+
|    1|   John Smyth|
|    2|   John Smith|
|    3|     Jo Smith|
|    4|    Bob Jones|
|    5|    Tim Jones|
|    6|  Laura Tully|
|    7|Sheena Easton|
|    8| Hilary Jones|
|    9| Hannah Short|
|   10|  Greg Norman|
+-----+-------------+



In [42]:
ngrams_length = 3
df_2 = df.select('Index', 'Column_2').withColumnRenamed('Column_2', 'name')
df_2 = df_2.filter(F.length('name')>=ngrams_length)
df_2.show()

+-----+------------+
|Index|        name|
+-----+------------+
|    1|   Bob Jones|
|    2|Ned Flanders|
|    3|  Lisa Short|
|    4|     Joe Tan|
|    5|   Jim Jones|
|    6|  John Smith|
|    7|  John Smith|
|    8|Jon Smithers|
|    9| Chris Smith|
|   10|  Norm Smith|
+-----+------------+



In [41]:
model = model.fit(df_1)    

In [43]:
df_1_hashed = model.transform(df_1)
df_2_hashed = model.transform(df_2)

In [45]:
df_1_hashed.show()

+-----+-------------+--------------------+--------------------+--------------------+--------------------+
|Index|         name|              tokens|              ngrams|             vectors|                 lsh|
+-----+-------------+--------------------+--------------------+--------------------+--------------------+
|    1|   John Smyth|[j, o, h, n,  , s...|[j o h, o h n, h ...|(262144,[3028,767...|[[4.6847381E7], [...|
|    2|   John Smith|[j, o, h, n,  , s...|[j o h, o h n, h ...|(262144,[3028,767...|[[4.6847381E7], [...|
|    3|     Jo Smith|[j, o,  , s, m, i...|[j o  , o   s,   ...|(262144,[4272,767...|[[2.06700422E8], ...|
|    4|    Bob Jones|[b, o, b,  , j, o...|[b o b, o b  , b ...|(262144,[4162,632...|[[6.6092098E7], [...|
|    5|    Tim Jones|[t, i, m,  , j, o...|[t i m, i m  , m ...|(262144,[10366,77...|[[6.6092098E7], [...|
|    6|  Laura Tully|[l, a, u, r, a,  ...|[l a u, a u r, u ...|(262144,[14214,82...|[[3.9339039E7], [...|
|    7|Sheena Easton|[s, h, e, e, n, a...|[s h

In [46]:
df_2_hashed.show()

+-----+------------+--------------------+--------------------+--------------------+--------------------+
|Index|        name|              tokens|              ngrams|             vectors|                 lsh|
+-----+------------+--------------------+--------------------+--------------------+--------------------+
|    1|   Bob Jones|[b, o, b,  , j, o...|[b o b, o b  , b ...|(262144,[4162,632...|[[6.6092098E7], [...|
|    2|Ned Flanders|[n, e, d,  , f, l...|[n e d, e d  , d ...|(262144,[18620,39...|[[1.10767686E8], ...|
|    3|  Lisa Short|[l, i, s, a,  , s...|[l i s, i s a, s ...|(262144,[23423,48...|[[5224079.0], [1....|
|    4|     Joe Tan|[j, o, e,  , t, a...|[j o e, o e  , e ...|(262144,[58656,88...|[[1.42790588E8], ...|
|    5|   Jim Jones|[j, i, m,  , j, o...|[j i m, i m  , m ...|(262144,[47949,77...|[[6.6092098E7], [...|
|    6|  John Smith|[j, o, h, n,  , s...|[j o h, o h n, h ...|(262144,[3028,767...|[[4.6847381E7], [...|
|    7|  John Smith|[j, o, h, n,  , s...|[j o h, o h n,

In [37]:
model.stages

[RegexTokenizer_229dc81fb0b7,
 NGram_c30577eb8a41,
 HashingTF_a5ddddc50e0b,
 MinHashLSHModel: uid=MinHashLSH_c12dbdfa26ac, numHashTables=5]

In [49]:
#usa el modelo minhash para encontrar similitud:

threshold = .8
results_names = model.stages[-1]\
                 .approxSimilarityJoin(df_1_hashed, df_2_hashed, threshold, distCol="dist_jaccard")\
                 .select(                 #       F.col("datasetA.id_originally"),
                        F.col("datasetA.name"),
                        F.col("datasetB.name"),
                        F.col("dist_jaccard"), 
                        F.col("datasetA.Index"),
                        F.col("datasetB.Index"))\
                  .filter(F.col("datasetA.Index") != F.col("datasetB.Index"))\
                  .orderBy(F.col("dist_jaccard").desc())
                    
results_names.show(100, truncate=False)

+------------+------------+------------------+-----+-----+
|name        |name        |dist_jaccard      |Index|Index|
+------------+------------+------------------+-----+-----+
|Hannah Short|Lisa Short  |0.7142857142857143|9    |3    |
|Hilary Jones|Jim Jones   |0.6923076923076923|8    |5    |
|Hilary Jones|Bob Jones   |0.6923076923076923|8    |1    |
|John Smith  |Chris Smith |0.6923076923076923|2    |9    |
|John Smith  |Norm Smith  |0.6666666666666667|2    |10   |
|Jo Smith    |Jon Smithers|0.6666666666666667|3    |8    |
|Jo Smith    |Chris Smith |0.6363636363636364|3    |9    |
|John Smith  |Jon Smithers|0.6153846153846154|2    |8    |
|Jo Smith    |John Smith  |0.6               |3    |6    |
|Tim Jones   |Bob Jones   |0.6               |5    |1    |
|Jo Smith    |John Smith  |0.6               |3    |7    |
|Jo Smith    |Norm Smith  |0.6               |3    |10   |
|Bob Jones   |Jim Jones   |0.6               |4    |5    |
|John Smyth  |John Smith  |0.5454545454545454|1    |7   

---

In [28]:
ngrams_length = 3
df_1 = df.select('row_id', 'MAKTX').withColumnRenamed('MAKTX', 'name')
df_1 = df_1.filter(F.length("MAKTX")>=ngrams_length)

###df_1 = df.select('maktx_ndf').withColumnRenamed('maktx_ndf', 'name').limit(1000)
###df_2 = df.select('maktx_ndf').withColumnRenamed('maktx_ndf', 'name')

In [29]:
df_1.show(truncate=False) 

+-----------+----------------------------------------+
|row_id     |name                                    |
+-----------+----------------------------------------+
|60129543143|POLEA BRIDA INOX ARMORINOX SPDIV0164511 |
|60129543310|KIT 4 BOCAIS P/PISTOLA JATO AREIA       |
|60129543348|RODAMIENTO (401965) LOGITRANS HL-1004-RF|
|60129543786|PORCA CRAVAR PRETA M6 094875 60 005     |
|60129544382|DETECTEUR SECURITE EPINUS2KNT/MKTS      |
|60129544653|PERFIL GUIA DISCO 3 CORTE PRIM 2005X1905|
|60129545237|RETEN 45x65x8 NBR                       |
|60129545658|REJA HIG.PRO BLUCHER 150x500 69725015020|
|60129545989|KIT PUNCOES ALFABET/NUMER. 5/16         |
|60129546010|PULSADOR ABRIR PINZA GEA (4016021014)   |
|60129546274|CILINDRO FAC (XFA28118 081B) 120*100*350|
|60129546745|RODILLO INTERROLL RD-11DZS20D03 RL: 95;A|
|60129546950|GALET DE CAME INOX (SSLRN25ENS)         |
|60129547160|COURROIE PFM (8709947) 50ATN20/10800 TPU|
|60129547364|GUIDE PFM (GT20DYH006) SC=50 NOIR       |
|601295474

In [30]:
#hashing model
model = Pipeline(stages=[

    RegexTokenizer(pattern="", inputCol="name", outputCol="tokens", minTokenLength=1),    
    NGram(n=3, inputCol="tokens", outputCol="ngrams"),
    HashingTF(inputCol="ngrams", outputCol="vectors",),
    MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=5)
])

In [31]:
model = model.fit(df_1)
df_hashed = model.transform(df_1)

In [32]:
df_hashed.show()

+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     row_id|                name|              tokens|              ngrams|             vectors|                 lsh|
+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|60129543143|POLEA BRIDA INOX ...|[p, o, l, e, a,  ...|[p o l, o l e, l ...|(262144,[924,2866...|[[2496939.0], [8....|
|60129543310|KIT 4 BOCAIS P/PI...|[k, i, t,  , 4,  ...|[k i t, i t  , t ...|(262144,[2735,965...|[[4.6850384E7], [...|
|60129543348|RODAMIENTO (40196...|[r, o, d, a, m, i...|[r o d, o d a, d ...|(262144,[4445,961...|[[6.7745083E7], [...|
|60129543786|PORCA CRAVAR PRET...|[p, o, r, c, a,  ...|[p o r, o r c, r ...|(262144,[5926,171...|[[1.66041761E8], ...|
|60129544382|DETECTEUR SECURIT...|[d, e, t, e, c, t...|[d e t, e t e, t ...|(262144,[15761,16...|[[1.6836316E7], [...|
|60129544653|PERFIL GUIA DISCO...|[p, e, r, f, i

In [26]:
model.stages

[RegexTokenizer_eba7d65d9886,
 NGram_0432f1d695d1,
 HashingTF_24f535d5bd53,
 MinHashLSHModel: uid=MinHashLSH_29b2a72df97d, numHashTables=5]

In [35]:
#similitud
threshold = .2
results_names = model.stages[-1]\
                 .approxSimilarityJoin(df_hashed, df_hashed, threshold, distCol="dist_jaccard")\
                 .select(                 #       F.col("datasetA.id_originally"),
                        F.col("datasetA.name"),
                        F.col("datasetB.name"),
                        F.col("dist_jaccard"), 
                        F.col("datasetA.row_id"),
                        F.col("datasetB.row_id"))\
                  .filter(F.col("datasetA.row_id") != F.col("datasetB.row_id"))\
                  .orderBy(F.col("dist_jaccard").desc())
                    

results_names.show(100, truncate=False)




+----------------------------------------+----------------------------------------+-------------------+-----------+-----------+
|name                                    |name                                    |dist_jaccard       |row_id     |row_id     |
+----------------------------------------+----------------------------------------+-------------------+-----------+-----------+
|FIRST SLICE REF. 3000008949 - CUTTER FUM|FIRST SLICE REF. 3000008967 - CUTTER FUM|0.19999999999999996|60129548888|60129548953|
|FUNDA TERMORETRACTIL 13MM               |FUNDA TERMORETRACTIL 3MM                |0.19999999999999996|60129544666|60129544658|
|FIRST SLICE REF. 3000008980 - CUTTER FUM|FIRST SLICE REF. 3000008967 - CUTTER FUM|0.19999999999999996|60129548958|60129548954|
|DETECTOR SECURITE EPINUS2KNT/MKTS       |DETECTEUR SECURITE EPINUS2KNT/MKTS      |0.19999999999999996|60129544381|60129544382|
|FUNDA TERMORETRACTIL 3MM                |FUNDA TERMORETRACTIL 13MM               |0.19999999999999996|6


                                                                                