In [1]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as f
from pyspark.sql.types import FloatType, BooleanType

# Goal

The goal is to perform entity resolution, i.e. to identify whether records are true matches.

The dataframe represents a set of edges which connect different records (profiles).

The goal is to keep only the most relevant edges for each profile (prune the graph).

Each edge has a set of features; using these features, each edge is assigned the probability of being a true match (*p_match*).

*is_match* comes from the ground truth and is used to compute the recall/precision of the method employed to prune the graph.


Records come from two different data sources: *p1* holds the identifiers of the records from source 1, and *p2* holds the identifiers of the records from source 2.

In [15]:
# Load the edge features. The CSV has a header row, and column types are
# inferred from the data.
features = spark.read.csv('features.csv', header=True, inferSchema=True)
features.show(2)

+---+----+---------+------------+------------+---------+---------+-----------+---------+------------+-----------+--------+
| p1|  p2|    cfibf|       raccb|          js|numCompP1|numCompP2|         rs|     aejs|         nrs|        wjs|is_match|
+---+----+---------+------------+------------+---------+---------+-----------+---------+------------+-----------+--------+
|  0|3461| 41.13539|0.0019762847| 0.004032258|      143|       74|0.022222223|1.0641375|0.0075938664|5.493571E-4|       0|
|  0|3336|39.005108|0.0019762847|0.0028818443|      143|      109|0.022222223|0.7368643| 0.009265238|9.440924E-4|       0|
+---+----+---------+------------+------------+---------+---------+-----------+---------+------------+-----------+--------+
only showing top 2 rows



## Scoring the edges
By using a probabilistic classifier (logistic regression) we assign to each pair (edge of the meta-blocking graph) the probability of being a match.

In [3]:
# Edge features given to the classifier: every CSV column except the record
# identifiers (p1, p2) and the is_match label.
features_set = [
    "cfibf", "raccb", "js",
    "numCompP1", "numCompP2",
    "rs", "aejs", "nrs", "wjs",
]

# Pack the feature columns into the single vector column used by Spark ML.
va = VectorAssembler(outputCol="features", inputCols=features_set)
df = va.transform(features)

### Split the data in train/test
This generates a roughly balanced training set, with about the same number of positive and negative samples; all remaining pairs form the test set.

In [4]:
# Target number of training samples per class. randomSplit samples rows
# independently, so Spark does not guarantee this exact count.
n_samples = 20
# Fixed seed so the train/test split, and every result derived from it,
# is reproducible on Restart & Run All.
split_seed = 42


def train_fraction(n_wanted, n_total):
    """Return the fraction of `n_total` rows needed to draw about `n_wanted`.

    The fraction is capped at 1.0. This keeps randomSplit from receiving a
    negative complementary weight (1 - fraction) when a class has fewer than
    `n_wanted` rows.

    Raises:
        ValueError: if the class has no rows, so there is nothing to sample.
    """
    if n_total <= 0:
        raise ValueError("cannot sample training rows from an empty class")
    return min(1.0, n_wanted / n_total)


# Sampling of matching pairs
matches = df.where("is_match == 1")
m_n = train_fraction(n_samples, matches.count())
m_train, m_test = matches.randomSplit([m_n, 1 - m_n], seed=split_seed)

# Sampling of non-matching pairs
non_matches = df.where("is_match == 0")
nm_n = train_fraction(n_samples, non_matches.count())
nm_train, nm_test = non_matches.randomSplit([nm_n, 1 - nm_n], seed=split_seed)

# Train/Test: each class contributes about n_samples rows to train; the rest go to test.
train = m_train.union(nm_train)
test = m_test.union(nm_test)

### Training the classifier and getting the probabilities

In [5]:
# Logistic-regression classifier that predicts is_match from the assembled
# feature vector.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='is_match',
    predictionCol='prediction',
    probabilityCol='probability',
    maxIter=1000,
)

# Fit on the small training sample, then score the held-out pairs.
model = lr.fit(train)
predictions = model.transform(test)


def _probability_of_match(probability_vector):
    # Entry 1 of the class-probability vector is the probability of label 1,
    # i.e. of the pair being a true match.
    return float(probability_vector[1])


get_p_match = f.udf(_probability_of_match, FloatType())

# One row per scored edge: its endpoints, its match probability, and the
# ground-truth label (kept for evaluating the pruning).
edges = (
    predictions
    .withColumn("p_match", get_p_match("probability"))
    .select("p1", "p2", "p_match", "is_match")
)

## Pruning

First, keep only the edges with a score greater than or equal to 0.5.

In [6]:
# Minimum classifier score an edge needs in order to be considered at all.
p_match_threshold = 0.5

# Cache the surviving edges. They feed three separate Spark actions (the two
# per-record max-score lookups and the final pruning). Without the cache,
# each action would re-run the classifier and the p_match UDF on the test set.
over_t = edges.filter(f.col("p_match") >= p_match_threshold).cache()

For the records of source 1, create a hashmap that maps each record id to the maximum weight (*p_match*) among its connected edges, and broadcast it to the executors.

In [7]:
# Highest p_match among the retained edges of each source-1 record. It is
# collected to the driver as {p1: max p_match}, then broadcast so the pruning
# UDF can look it up on the executors.
p1_best_scores = over_t.groupBy('p1').agg(f.max('p_match')).rdd.collectAsMap()
profiles1_max_proba = sc.broadcast(p1_best_scores)

Same for the records from source 2

In [8]:
# Highest p_match among the retained edges of each source-2 record. It is
# collected to the driver as {p2: max p_match}, then broadcast so the pruning
# UDF can look it up on the executors.
p2_best_scores = over_t.groupBy('p2').agg(f.max('p_match')).rdd.collectAsMap()
profiles2_max_proba = sc.broadcast(p2_best_scores)

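Prune the graph by combining the maximum weights of each edge's two records: an edge is kept only if its *p_match* is at least 0.35 times the sum of the maximum weights of its two endpoints.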

In [9]:
def do_pruning(p1, p2, p_match, weight=0.35):
    """Return True if the edge (p1, p2) should be kept in the pruned graph.

    The edge survives when its score is at least ``weight`` times the sum of
    the maximum scores of its two endpoints. With the default ``weight=0.35``
    this means at least 70% of the average of the two maxima.

    Args:
        p1: identifier of the source-1 record. It must be a key of the
            broadcast ``profiles1_max_proba`` map, otherwise KeyError is raised.
        p2: identifier of the source-2 record. It must be a key of the
            broadcast ``profiles2_max_proba`` map, otherwise KeyError is raised.
        p_match: classifier score of the edge.
        weight: multiplier applied to the sum of the endpoint maxima
            (previously hard-coded as 0.35).

    Returns:
        bool: whether the edge is kept.
    """
    best_p1 = profiles1_max_proba.value[p1]
    best_p2 = profiles2_max_proba.value[p2]
    return p_match >= weight * (best_p1 + best_p2)

# Wrap the pruning rule so Spark can evaluate it row by row.
pruning_udf = f.udf(do_pruning, BooleanType())

# Flag each retained edge with the pruning decision, keep the flagged ones,
# and drop the helper column again.
edge_cols = ["p1", "p2", "p_match", "is_match"]
res = (
    over_t
    .withColumn("keep", pruning_udf("p1", "p2", "p_match"))
    .where(f.col("keep"))
    .select(*edge_cols)
)

In [11]:
# Preview the first 20 edges that survive pruning (long values truncated).
res.show(n=20, truncate=True)

+---+----+-------+--------+
| p1|  p2|p_match|is_match|
+---+----+-------+--------+
|  0|2733|    1.0|       1|
|  4|4066|    1.0|       1|
|  8|4375|    1.0|       1|
| 12|3829|    1.0|       1|
| 16|2968|    1.0|       1|
| 20|3358|    1.0|       1|
| 24|2775|    1.0|       1|
| 28|3792|    1.0|       1|
| 32|3560|    1.0|       1|
| 40|4174|    1.0|       1|
| 44|3022|    1.0|       1|
| 48|2976|    1.0|       1|
| 52|4296|    1.0|       1|
| 56|2879|    1.0|       1|
| 60|3448|    1.0|       1|
| 64|3872|    1.0|       1|
| 68|4664|    1.0|       1|
| 72|3703|    1.0|       1|
| 76|3825|    1.0|       1|
| 80|4077|    1.0|       1|
+---+----+-------+--------+
only showing top 20 rows

