# Data Deduplication & Record linkage

The solution intelligently links & combines records from two related data sources into a single master record. The module acts as a crucial component in Master Data Management. It assigns a common ID to records belonging to the same entity/duplicates. The probabilistic linkage algorithm provides the flexibility to find duplicate records based on a required match percentage given the columns of interest.

<b>The module thus takes two datasets as input along with column names from each dataset to link/deduplicate on. The output is a unified dataset with a new unique ID generated for each record. Records that are duplicates will share a common ID.</b>

### Prerequisite

The kernel comes pre-installed with the required packages. Else ensure to have the following Python Packages in your environment at minimum:

    - scikit-learn
    - numpy
    - pandas
    - recordlinkage
    - uuid

 ### Contents

1. [Input Data](#Input-Format)
2. [Creating Model](#Creating-Model)
3. [Batch Transform](#Batch-Transform)
4. [Output](#Output)
5. [Invoking through Endpoint](#Invoking-through-Endpoint)

___
### Input Format

The solution takes a zip file as input. <br><br>
The zip must conatin: <br>
1. two 'CSV' files corresponding to the two sources of data and 
2. "config.json: - a configuration file. (List of column pairs for linking/deduplication along with %threshold match required)

Ensure Content-Type is 'application/zip'


- input.zip  
    - config.json  
    - dataset1.csv  
    - dataset2.csv


 ### Input instructions

* The solution works with csv files only.


* Current version limitation: Both the csv files should contain less than 2000 rows


* "config.json" should be uploaded with the two csv files.


* The input zip must have two csv and the config.json


* The "config.json" must follow the below format<br>
{<br>
<b>"filename_1"</b>: filename of the first csv,<br>
<b>"filename_2"</b>: filename of the second csv,<br> 
<b>"comparision_pairs"</b>: [[column name from first csv,column name from second csv,threshold]]<br>
}

* Threshold (% matching criteria) value ranges from 0 to 1 <b> [1 is Exact Match] </b>



### Output interpretation

* Output file will be a csv that conatins rows and columns from both the csv files.
* The output csv will have 3 additional columns:<br>

|MasterID |Source |SourceID|
|-----|-------|------|

* MasterID: ID assigned to each record and records that are duplicates will have the same "MasterID"

* Source: indicates filename from which the record is populated.

* SourceID: row index from the corresponding source.

___

## Importing libraries for runtime

In [25]:
import pandas as pd
import boto3
import re
import json

In [26]:
Df1=pd.read_csv("dataset1.csv")
Df1.head(2)

Unnamed: 0,name,address,suburb,postcode,state,date_of_birth,soc_sec_id
0,michaela neumann,8 stanley street miami,winston hills,4223,nsw,19151111,5304218
1,courtney painter,12 pinkerton circuit bega flats,richlands,4560,vic,19161214,4066625


In [27]:
Df2=pd.read_csv("dataset2.csv")
Df2.head(2)

Unnamed: 0,employee_name,employee_address,suburb,postcode,date_of_birth,soc_sec_id
0,elton,3 light setreet pinehill,windermere,3212,19651013,1551941
1,mitchell maxon,47 edkins street lochaoair,north ryde,3355,19390212,8859999


In [28]:
with open("config.json","r") as fp:
    configFile=json.load(fp)
configFile

{'filename_1': 'dataset1.csv',
 'filename_2': 'dataset2.csv',
 'comparision_pairs': [['name', 'employee_name', 1],
  ['address', 'employee_address', 0.85],
  ['soc_sec_id', 'soc_sec_id', 1]]}

## Creating Model

In [29]:
model_package_arn = 'arn:aws:sagemaker:us-east-2:786796469737:model-package/data-deduplication-v1'

In [30]:
from sagemaker import ModelPackage
import sagemaker as sage
from sagemaker import get_execution_role

role = get_execution_role()
sagemaker_session = sage.Session()

In [31]:
model = ModelPackage(model_package_arn=model_package_arn,
                    role = role,
                    sagemaker_session = sagemaker_session)

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.


## Batch Transform

Now that model is ready, we can deploy the model and make predictions.

### Prediction Classes - Batch Transform Job

<b>Output (csv) file will combine both Dataframes and duplicate rows will have same MasterID  </b>

In [32]:
import json 
import uuid

OutputPath='s3://mphasis-marketplace/data-deduplication/'

transformer = model.transformer(1, 'ml.m5.xlarge',output_path=OutputPath)
transformer.transform('s3://mphasis-marketplace/data-deduplication/Input_data.zip', content_type='application/zip')
transformer.wait()

print("Batch Transform complete")
bucketFolder = transformer.output_path.rsplit('/')[3]

......................[32m2020-10-23T16:28:51.162:[sagemaker logs]: MaxConcurrentTransforms=1, MaxPayloadInMB=6, BatchStrategy=MULTI_RECORD[0m
[34m * Serving Flask app "serve" (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://0.0.0.0:8080/ (Press CTRL+C to quit)[0m
[34m169.254.255.130 - - [23/Oct/2020 16:28:51] "#033[37mGET /ping HTTP/1.1#033[0m" 200 -[0m
[34m169.254.255.130 - - [23/Oct/2020 16:28:51] "#033[33mGET /execution-parameters HTTP/1.1#033[0m" 404 -[0m
[34m169.254.255.130 - - [23/Oct/2020 16:28:55] "#033[37mPOST /invocations HTTP/1.1#033[0m" 200 -[0m
[35m * Serving Flask app "serve" (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://0.0.0.0:8080/ (Press CTRL+C to quit)[0m
[35m169.254.255.130 - - [23/Oct/2020 16:28:51] "#033[37mGET /ping HTTP/1.1#033[0m" 200 -[0m
[35m169.254.255.130 - - [23/Oct/2020 16:28:51] "#033[33mGE

In [39]:
s3_conn = boto3.client("s3")

bucket_name="mphasis-marketplace"

with open('Output_data.csv', 'wb') as f:
    s3_conn.download_fileobj(bucket_name, bucketFolder+'/Input_data.zip.out', f)
    print("Output file loaded from bucket")

Output file loaded from bucket


## Output
  
• Output (csv) file will combine both Dataframes and duplicate rows will have same MasterID

    

In [2]:
import pandas as pd
df_output=pd.read_csv("Output_data.csv")
df_output.head(5)

Unnamed: 0,MasterID,Source,SourceID,name_1_employee_name_2,address_1_employee_address_2,soc_sec_id_1_soc_sec_id_2,suburb_1,postcode_1,state_1,date_of_birth_1,suburb_2,postcode_2,date_of_birth_2
0,d5b7a9e8-154c-11eb-a372-0242a9feff83,dataset1.csv,0,michaela neumann,8 stanley street miami,5304218,winston hills,4223.0,nsw,19151111,,,
1,d5b7ab78-154c-11eb-a372-0242a9feff83,dataset1.csv,1,courtney painter,12 pinkerton circuit bega flats,4066625,richlands,4560.0,vic,19161214,,,
2,d5b7ac2c-154c-11eb-a372-0242a9feff83,dataset1.csv,2,charles green,38 salkauskas crescent kela,4365168,dapto,4566.0,nsw,19480930,,,
3,d5b7acc2-154c-11eb-a372-0242a9feff83,dataset1.csv,3,vanessa parr,905 macquoid place broadbridge manor,9239102,south grafton,2135.0,sa,19951119,,,
4,d5b7ad44-154c-11eb-a372-0242a9feff83,dataset1.csv,4,mikayla malloney,37 randwick road avalind,7207688,hoppers crossing,4552.0,vic,19860208,,,


### Duplicate Records - Example

In [23]:
df_output.groupby(['MasterID']).filter(lambda x: len(x) == 2).sort_values(by='MasterID').head(8)

Unnamed: 0,MasterID,Source,SourceID,name_1_employee_name_2,address_1_employee_address_2,soc_sec_id_1_soc_sec_id_2,suburb_1,postcode_1,state_1,date_of_birth_1,suburb_2,postcode_2,date_of_birth_2
7,d7c15b4e-154c-11eb-a372-0242a9feff83,dataset1.csv,7,blakeston broadby,53 traeger street valley of springs,4308555,north ward,3083.0,qld,19120907.0,,,
2492,d7c15b4e-154c-11eb-a372-0242a9feff83,dataset2.csv,992,blakeston broadby,53 traeger street valley of springs,4308555,,,,,north ward,3083.0,19120907.0
15,d7c1d6a0-154c-11eb-a372-0242a9feff83,dataset1.csv,15,lachlan godfrey,45 lienhop street sorrento,9107241,lalor,2350.0,sa,19080429.0,,,
2973,d7c1d6a0-154c-11eb-a372-0242a9feff83,dataset2.csv,1473,lachlan godfrey,45 lienhop street sorretno,9107241,,,,,lalor,2350.0,19080429.0
19,d7c230d2-154c-11eb-a372-0242a9feff83,dataset1.csv,19,caitlin basey,414 pelsart street st francis vlge,7811311,daisy hill,2113.0,nsw,19110711.0,,,
1764,d7c230d2-154c-11eb-a372-0242a9feff83,dataset2.csv,264,caitlin basey,414 pelsart street st francis vlge,7811311,,,,,daisyh ill,2113.0,19110711.0
26,d7c2894c-154c-11eb-a372-0242a9feff83,dataset1.csv,26,joselyn dakin,19 abernethy street kanangra hostel,4852063,manunda,3028.0,wa,19261205.0,,,
2650,d7c2894c-154c-11eb-a372-0242a9feff83,dataset2.csv,1150,dakin joselyn,57 aberneth ystreet kanangra hostel,4852063,,,,,manundza,3028.0,19261205.0


## Invoking through Endpoint

In [42]:
import json 
import uuid
from sagemaker import ModelPackage
import sagemaker as sage
from sagemaker import get_execution_role
from sagemaker import ModelPackage
import boto3
from IPython.display import Image
from PIL import Image as ImageEdit

role = get_execution_role()

sagemaker_session = sage.Session()
bucket=sagemaker_session.default_bucket()

In [43]:
content_type='application/zip'
model_name='data-deduplication-v1'
real_time_inference_instance_type='ml.m5.xlarge'

In [44]:
#model_package_arn = 'put your arn here'
model_package_arn = 'arn:aws:sagemaker:us-east-2:786796469737:model-package/data-deduplication-v1'

In [45]:
from sagemaker import ModelPackage
import sagemaker as sage
from sagemaker import get_execution_role

role = get_execution_role()
sagemaker_session = sage.Session()

In [46]:
#Define predictor wrapper class
def predict_wrapper(endpoint, session):
    return sage.RealTimePredictor(endpoint, session,content_type=content_type)
#create a deployable model from the model package.
model = ModelPackage(role=role,
                    model_package_arn=model_package_arn,
                    sagemaker_session=sagemaker_session,
                    predictor_cls=predict_wrapper)

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.


In [47]:
predictor = model.deploy(1, real_time_inference_instance_type, endpoint_name=model_name)

-----------!

### Invoking endpoint result through CLI command

In [48]:
file_name="Input_data.zip"

In [49]:
!aws sagemaker-runtime invoke-endpoint --endpoint-name $model_name --body fileb://$file_name --content-type 'application/zip' --region us-east-2 output.csv

{
    "ContentType": "text/csv; charset=utf-8",
    "InvokedProductionVariant": "AllTraffic"
}


In [None]:
predictor.delete_endpoint()