# Clean - Clean Dataset
This example illustrate how to perform meta-blocking on a clean-clean dataset (data linkage), so when we have two datasets that do not contain duplicates and we want to discover the duplicates between them. 

In [22]:
import sparker
import random

## Load the data
sparkER provides wrappers to load CSV and JSON files.

First, we load the first dataset, and we extract the maximum id. The profiles ids of the second dataset will be assigned starting from this one.

*real_id_field* is the field that contains the identifier of the record.

In [23]:
# Profiles contained in the first dataset
profiles1 = sparker.JSONWrapper.load_profiles('../datasets/clean/abtBuy/dataset1.json', 
                                              real_id_field = "realProfileID")
# Max profile id in the first dataset, used to separate the profiles in the next phases
separator_id = profiles1.map(lambda profile: profile.profile_id).max()
# Separators, used during blocking to understand from which dataset a profile belongs. It is an array because sparkER
# could work with multiple datasets
separator_ids = [separator_id]

Let's visualize a profile to check if they are correctly loaded

In [24]:
print(profiles1.take(1)[0])

{'profile_id': 0, 'attributes': [{'key': 'name', 'value': 'Sony Turntable - PSLX350H'}, {'key': 'description', 'value': 'Sony Turntable - PSLX350H/ Belt Drive System/ 33-1/3 and 45 RPM Speeds/ Servo Speed Control/ Supplied Moving Magnet Phono Cartridge/ Bonded Diamond Stylus/ Static Balance Tonearm/ Pitch Control'}], 'original_id': '0', 'source_id': 0}


Loads the second dataset and extract the max id (it will be used later)

In [25]:
profiles2 = sparker.JSONWrapper.load_profiles('../datasets/clean/abtBuy/dataset2.json', 
                                              start_id_from = separator_id+1, 
                                              real_id_field = "realProfileID")
# Max profile id
max_profile_id = profiles2.map(lambda profile: profile.profile_id).max()

Finally, concatenate the two RDDs

In [26]:
profiles = profiles1.union(profiles2)

### Groundtruth (optional)
If you have a groundtruth you can measure the performance of each step.

When you load the groundtruth it contains the original profiles IDs, it is necessary to convert it to use the IDs assigned to each profile by Spark.

In [27]:
# Loads the groundtruth, takes as input the path of the file and the names of the attributes that represent
# respectively the id of profiles of the first dataset and the id of profiles of the second dataset
gt = sparker.JSONWrapper.load_groundtruth('../datasets/clean/abtBuy/groundtruth.json', 'id1', 'id2')

In [28]:
# Converts the groundtruth by replacing original IDs with those given by Spark
new_gt = sparker.Converters.convert_groundtruth(gt, profiles1, profiles2)

In [29]:
# We can explore some pairs
random.sample(new_gt, 2)

[(373, 1254), (23, 1555)]

## Blocking
Now we can perform blocking.

By default sparkER performs token blocking, but it is possible to provide a different blocking function.

In the following example each token is splitted in ngrams of size 4 that are used for blocking.

In [30]:
blocks = sparker.Blocking.create_blocks(profiles, separator_ids, 
                                        blocking_method=sparker.BlockingKeysStrategies.ngrams_blocking,
                                        ngram_size=4)
print("Number of blocks",blocks.count())

Number of blocks 9159


Let's continue by using the standard token blocking

In [31]:
blocks = sparker.Blocking.create_blocks(profiles, separator_ids)
print("Number of blocks",blocks.count())

Number of blocks 2132


## Block cleaning

sparkER implements two block cleaning strategies:

* Block purging: discard the largest blocks that involve too many comparisons, the parameter must be >= 1. A lower value mean a more aggressive purging.
* Block cleaning: removes for every profile the largest blocks in which it appears. The parameter is in range ]0, 1\[. A lower value mean a more aggressive cleaning.

In [32]:
# Perfoms the purging
blocks_purged = sparker.BlockPurging.block_purging(blocks, 1.025)

In [33]:
# Performs the cleaning
(profile_blocks, profile_blocks_filtered, blocks_after_filtering) = sparker.BlockFiltering.block_filtering_quick(blocks_purged, 0.8, separator_ids)

If you have the groundtruth, after every blocking step it is possible to check which are the performance of the blocking collection.

In [34]:
recall, precision, cmp_n = sparker.Utils.get_statistics(blocks_after_filtering, max_profile_id, new_gt, separator_ids)

print("Recall", recall)
print("Precision", precision)
print("Number of comparisons", cmp_n)

Recall 0.9953531598513011
Precision 0.009352813266847726
Number of comparisons 114511


## Meta-blocking
Meta-blocking can be used to further refine the block collection removing superfluous comparisons.

SparkER implements different kind of meta-blocking algorithms, you can find the descriptions in our paper.


For every partition of the RDD the pruning algorithm returns as output a triplet that contains:

* The number of edges
* The number of matches (only if the groundtruth is provided)
* The retained edges

To perform the meta-blocking first some data structures have to be created.

In [37]:
block_index_map = blocks_after_filtering.map(lambda b : (b.block_id, b.profiles)).collectAsMap()
block_index = sc.broadcast(block_index_map)

# This is only needed for certain weight measures
profile_blocks_size_index = sc.broadcast(profile_blocks_filtered.map(lambda pb : (pb.profile_id, len(pb.blocks))).collectAsMap())

# Broadcasted groundtruth
gt_broadcast = sc.broadcast(new_gt)

### Weighted Node Pruning

In [38]:
results = sparker.WNP.wnp(
                          profile_blocks_filtered,
                          block_index,
                          max_profile_id,
                          separator_ids,
                          weight_type=sparker.WeightTypes.CBS,
                          groundtruth=gt_broadcast,
                          profile_blocks_size_index=profile_blocks_size_index,
                          comparison_type=sparker.ComparisonTypes.OR
                         )
num_edges = results.map(lambda x: x[0]).sum()
num_matches = results.map(lambda x: x[1]).sum()
print("Recall", num_matches/len(new_gt))
print("Precision", num_matches/num_edges)
print("Number of comparisons",num_edges)

Recall 0.9693308550185874
Precision 0.036182612918892666


### Reciprocal Weighted Node Pruning

In [39]:
results = sparker.WNP.wnp(
                          profile_blocks_filtered,
                          block_index,
                          max_profile_id,
                          separator_ids,
                          weight_type=sparker.WeightTypes.CBS,
                          groundtruth=gt_broadcast,
                          profile_blocks_size_index=profile_blocks_size_index,
                          comparison_type=sparker.ComparisonTypes.AND
                         )
num_edges = results.map(lambda x: x[0]).sum()
num_matches = results.map(lambda x: x[1]).sum()
print("Recall", num_matches/len(new_gt))
print("Precision", num_matches/num_edges)
print("Number of comparisons",num_edges)

Recall 0.9628252788104089
Precision 0.03915047993348953


### Weighted Edge Pruning

In [42]:
results = sparker.WEP.wep(
                          profile_blocks_filtered,
                          block_index,
                          max_profile_id,
                          separator_ids,
                          weight_type=sparker.WeightTypes.CBS,
                          groundtruth=gt_broadcast,
                          profile_blocks_size_index=profile_blocks_size_index
                         )
num_edges = results.map(lambda x: x[0]).sum()
num_matches = results.map(lambda x: x[1]).sum()
print("Recall", num_matches/len(new_gt))
print("Precision", num_matches/num_edges)
print("Number of comparisons",num_edges)

Recall 0.9618959107806692
Precision 0.03620906801007557


### Cardinality Node Pruning

In [46]:
results = sparker.CNP.cnp(
                          blocks_after_filtering,
                          profiles.count(),
                          profile_blocks_filtered,
                          block_index,
                          max_profile_id,
                          separator_ids,
                          weight_type=sparker.WeightTypes.CBS,
                          groundtruth=gt_broadcast,
                          profile_blocks_size_index=profile_blocks_size_index,
                          comparison_type=sparker.ComparisonTypes.OR
                         )
num_edges = results.map(lambda x: x[0]).sum()
num_matches = results.map(lambda x: x[1]).sum()
print("Recall", num_matches/len(new_gt))
print("Precision", num_matches/num_edges)
print("Number of comparisons",num_edges)

Recall 0.9600371747211895
Precision 0.05844743691297952


### Reciprocal Cardinality Node Pruning

In [48]:
results = sparker.CNP.cnp(
                          blocks_after_filtering,
                          profiles.count(),
                          profile_blocks_filtered,
                          block_index,
                          max_profile_id,
                          separator_ids,
                          weight_type=sparker.WeightTypes.CBS,
                          groundtruth=gt_broadcast,
                          profile_blocks_size_index=profile_blocks_size_index,
                          comparison_type=sparker.ComparisonTypes.AND
                         )
num_edges = results.map(lambda x: x[0]).sum()
num_matches = results.map(lambda x: x[1]).sum()
print("Recall", num_matches/len(new_gt))
print("Precision", num_matches/num_edges)
print("Number of comparisons",num_edges)

Recall 0.8485130111524164
Precision 0.15801315334025615
Number of comparisons 5778


### Cardinality Edge Pruning

In [49]:
results = sparker.CEP.cep(
                          profile_blocks_filtered,
                          block_index,
                          max_profile_id,
                          separator_ids,
                          weight_type=sparker.WeightTypes.CBS,
                          groundtruth=gt_broadcast,
                          profile_blocks_size_index=profile_blocks_size_index
                         )
num_edges = results.map(lambda x: x[0]).sum()
num_matches = results.map(lambda x: x[1]).sum()
print("Recall", num_matches/len(new_gt))
print("Precision", num_matches/num_edges)
print("Number of comparisons",num_edges)

Recall 0.8680297397769516
Precision 0.06737844466887895
Number of comparisons 13862


## Collecting edges after meta-blocking
As mentioned before, the third element of the tuples returned by the meta-blocking contains the edges.


Edges are weighted according to the weight strategy provided to the meta-blocking.

In [50]:
edges = results.flatMap(lambda x: x[2])

edges.take(10)

[(0, 1281, 2),
 (0, 1773, 2),
 (0, 1269, 2),
 (0, 1129, 2),
 (572, 1773, 4),
 (572, 1385, 3),
 (572, 1940, 3),
 (572, 2064, 2),
 (572, 2070, 2),
 (572, 2062, 2)]