In [5]:
import pandas as pd
import json

In [6]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType, IntegerType, DateType, BinaryType, BooleanType, ByteType, MapType
from pyspark.sql.functions import expr, lit
from pyspark.sql.functions import udf
from pyspark.sql import functions as sf

In [7]:
#from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, NGram, HashingTF, MinHashLSH

In [8]:
spark = SparkSession.builder.appName('Entity-Resolution-Matching').getOrCreate()
sparkContext=spark.sparkContext

2022-04-25 13:37:51 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


2022-04-25 13:37:52 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Master data

In [13]:
## This is going to be input from CLI / json
data_id = 'hashed_output_bloomhash_1650943904'
key = '_bloomhash'
timestamp = '1650943904'
dedup = True
if not dedup:
    data_id_2 = 'SPECIFY'
else:
    data_id_2 = data_id 

In [10]:
df = spark.read.option("header",True).csv(data_id)

## Candidates dataset

In [11]:
#trivial matching on the same data
df2 = spark.read.option("header", True).csv(data_id_2)

In [18]:
# df.take(3)

[Row(person_id='1000', service_date='2020-10-20', first_name='vicky', middle_name='Kathy', last_name='Singh', dob='1984-04-01', ssn='895-37-4654', name_phonetic='FK', ssn_clean='895-37-4654', name_phonetic_bloomhash='[B@282985d2', ssn_clean_bloomhash='[B@5c5ea3f0'),
 Row(person_id='1001', service_date='2021-09-06', first_name='Gary', middle_name='Stacey', last_name='Watson', dob='1984-10-22', ssn='220-72-9331', name_phonetic='KR', ssn_clean='220-72-9331', name_phonetic_bloomhash='[B@4ff9867', ssn_clean_bloomhash='[B@5299460b'),
 Row(person_id='1002', service_date='2020-11-07', first_name='Anna', middle_name='Joseph', last_name='Camacho', dob='1996-04-11', ssn='404-44-6841', name_phonetic='AN', ssn_clean='404-44-6841', name_phonetic_bloomhash='[B@617aeb05', ssn_clean_bloomhash='[B@56a6c880')]

## Parameter Alignment 

In [19]:
# Here, import set of unique identifiers and fuzzy identifiers that are agreed upon in the matching
f = open(timestamp + "_param_config.json")
fields = json.load(f)
# fields

{'fields': [{'field_origin': 'first_name',
   'field_name': 'name_phonetic',
   'preprocess_steps': ['drop_chars', 'sort', 'phonetic'],
   'matching_type': 'string',
   'ngrams': True,
   'weight': 0.5},
  {'field_origin': 'ssn',
   'field_name': 'ssn_clean',
   'preprocess_steps': ['drop_chars'],
   'matching_type': 'exact',
   'ngrams': False,
   'weight': 0.5}]}

In [21]:
matching_fields = [x['field_name']+ key for x in fields['fields']]

In [22]:
matching_fields

['name_phonetic_bloomhash', 'ssn_clean_bloomhash']

### Match new records df2 to existing records df

In [None]:
def concat_hashes(data, fields_for_matching):
    data.withColumn('long_hash', )
    for col in matching_fields:
    

In [24]:
from pyspark.sql.functions import concat

In [31]:
df_left = df.select(concat(*matching_fields).alias("long_hash"), 'person_id')  ##OOPS also add initial index
df_right = df2.select(concat(*matching_fields).alias("long_hash"), 'person_id')

In [32]:
model = Pipeline(stages=[
    RegexTokenizer(
        pattern="", inputCol="long_hash", outputCol="tokens", minTokenLength=1
    ),
    NGram(n=3, inputCol="tokens", outputCol="ngrams"),
    HashingTF(inputCol="ngrams", outputCol="vectors"),
    MinHashLSH(inputCol="vectors", outputCol="lsh")
]).fit(df_left)



In [33]:
db_hashed = model.transform(df_left)
query_hashed = model.transform(df_right)

In [26]:
db_hashed = model.transform(df_feat)

In [49]:
#model.stages[-1].approxSimilarityJoin(db_hashed, query_hashed, 0.5).show()

In [40]:
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on MinHash coefficient distance smaller than 1.5:")
candidates = model.stages[-1].approxSimilarityJoin(db_hashed, query_hashed, 1.5, distCol="MinHash")\
    .select(col("datasetA.person_id").alias("idA"),
            col("datasetB.person_id").alias("idB"),
            col("MinHash"))

candidates.take(5)


Approximately joining dfA and dfB on Jaccard coefficient distance smaller than 1.5:


[Row(idA='1012', idB='1012', Jaccard=0.0),
 Row(idA='1002', idB='1002', Jaccard=0.0),
 Row(idA='1015', idB='1015', Jaccard=0.0),
 Row(idA='1017', idB='1017', Jaccard=0.0),
 Row(idA='1020', idB='1020', Jaccard=0.0)]

## FIND MATCH: trivial match: find id with minimum distance

In [43]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

In [45]:
w2 = Window.partitionBy("idB").orderBy(col("MinHash"))
matches = candidates.withColumn("row",row_number().over(w2)) \
  .filter(col("row") == 1).drop("row") 

In [46]:
matches.where(col("idA") == col("idB")).count()

26

In [47]:
matches.where(col("idA") != col("idB")).count()

0

In [48]:
df_left.count()

26

------------------------------------------------------------------------------------------------------------

In [31]:
# Now that we have our hashes, we need to decide on a similarity distance and a approximate matching algorithm

In [None]:
## Option 1: home made dice coefficients

In [43]:
def sum_digits(bits):
    digits = [int(x) for x in list(bits)]
    return sum(digits)

def dice_coeff(bits1, bits2):
    return '10'
#     bits = list(zip(bits1, bits2))
#     commons = sum([x[0] == x[1] for x in bits])
#     return (2 * commons) / (sum_digits(bits1) + sum_digits(bits2))

In [44]:
udf_dice = udf(dice_coeff, StringType())

In [34]:
left = df1.alias("left")
right = df2.alias("right")

In [50]:
left.crossJoin(right).select(df1.person_id,
      udf_dice(left.bloom_1, right.bloom_1).alias("dx")).take(5)

[Row(person_id='1000', dx='10'),
 Row(person_id='1000', dx='10'),
 Row(person_id='1000', dx='10'),
 Row(person_id='1000', dx='10'),
 Row(person_id='1000', dx='10')]

self join code for reference:

In [5]:
df = sparkContext.parallelize(
    [("a", 1,2),("a",1,4),("b",5,6),("b",10,2),("c",1,1)]
  ).toDF()#"id","x","y" )

                                                                                

In [6]:
df.collect()

                                                                                

[Row(_1='a', _2=1, _3=2),
 Row(_1='a', _2=1, _3=4),
 Row(_1='b', _2=5, _3=6),
 Row(_1='b', _2=10, _3=2),
 Row(_1='c', _2=1, _3=1)]

In [7]:
left = df.alias("left")
right = df.alias("right")

left.join(right,"_1").select(df._1,
      (left._2-right._2).alias("dx"),
      (left._3-right._3).alias("dy")).collect()

                                                                                

[Row(_1='c', dx=0, dy=0),
 Row(_1='b', dx=0, dy=0),
 Row(_1='b', dx=0, dy=0),
 Row(_1='b', dx=0, dy=0),
 Row(_1='b', dx=0, dy=0),
 Row(_1='a', dx=0, dy=0),
 Row(_1='a', dx=0, dy=0),
 Row(_1='a', dx=0, dy=0),
 Row(_1='a', dx=0, dy=0)]

## Approx LSH

In [59]:
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])



In [65]:
brp = MinHashLSH(inputCol="features", outputCol="hashes",
                                  numHashTables=3)
model = brp.fit(dfA)

brp.fit(dfB)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()



The hashed dataset where hashed values are stored in the column 'hashes':
+---+-----------+--------------------+
| id|   features|              hashes|
+---+-----------+--------------------+
|  0|  [1.0,1.0]|[[6.70322104E8], ...|
|  1| [1.0,-1.0]|[[6.70322104E8], ...|
|  2|[-1.0,-1.0]|[[6.70322104E8], ...|
|  3| [-1.0,1.0]|[[6.70322104E8], ...|
+---+-----------+--------------------+



In [66]:
model.transform(dfB).show()

+---+----------+--------------------+
| id|  features|              hashes|
+---+----------+--------------------+
|  4| [1.0,0.0]|[[6.70322104E8], ...|
|  5|[-1.0,0.0]|[[6.70322104E8], ...|
|  6| [0.0,1.0]|[[1.055109162E9],...|
|  7|[0.0,-1.0]|[[1.055109162E9],...|
+---+----------+--------------------+



In [63]:
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Jaccard coefficient distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="Jaccard")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("Jaccard")).show()



Approximately joining dfA and dfB on Jaccard coefficient distance smaller than 1.5:


                                                                                

+---+---+-------+
|idA|idB|Jaccard|
+---+---+-------+
|  2|  6|    0.5|
|  3|  6|    0.5|
|  0|  5|    0.5|
|  1|  7|    0.5|
|  0|  4|    0.5|
|  2|  7|    0.5|
|  1|  5|    0.5|
|  2|  5|    0.5|
|  0|  7|    0.5|
|  1|  4|    0.5|
|  0|  6|    0.5|
|  2|  4|    0.5|
|  3|  7|    0.5|
|  1|  6|    0.5|
|  3|  4|    0.5|
|  3|  5|    0.5|
+---+---+-------+



In [68]:
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
candidates = model.approxNearestNeighbors(dfA, key, 2)
candidates.filter(candidates.distCol > 0.4).show()

Approximately searching dfA for 2 nearest neighbors of the key:
+---+----------+--------------------+-------+
| id|  features|              hashes|distCol|
+---+----------+--------------------+-------+
|  0| [1.0,1.0]|[[6.70322104E8], ...|    0.5|
|  1|[1.0,-1.0]|[[6.70322104E8], ...|    0.5|
+---+----------+--------------------+-------+

