# Harpin AI Identity Resolution - 50k Data Sample


## Prerequisites
1. **Note**: This notebook contains elements that render correctly in the Jupyter interface. Open this notebook from an Amazon SageMaker Notebook Instance or Amazon SageMaker Studio.
1. Ensure that the IAM role you use has the **AmazonSageMakerFullAccess** policy.
1. Some hands-on experience using [Amazon SageMaker](https://aws.amazon.com/sagemaker/).
1. To use this algorithm successfully, ensure that:
    1. Either your IAM role has these three permissions and you have authority to make AWS Marketplace subscriptions in the AWS account used:
        1. **aws-marketplace:ViewSubscriptions**
        1. **aws-marketplace:Unsubscribe**
        1. **aws-marketplace:Subscribe**
    1. or your AWS account has already subscribed to this free product from AWS Marketplace: [Identity Resolution](https://aws.amazon.com/marketplace/pp/prodview-etnavzupbnthk?sr=0-7&ref_=beagle&applicationId=AWSMPContessa).
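If you need to grant the three AWS Marketplace permissions yourself, they can be expressed as a single IAM policy statement. The sketch below builds such a policy document in Python. The helper name `marketplace_policy` and the `"Resource": "*"` scope are assumptions, and attaching the policy to your role (through the IAM console or your own tooling) is left to you.

```python
import json

# The three AWS Marketplace actions listed in the prerequisites above.
MARKETPLACE_ACTIONS = [
    "aws-marketplace:ViewSubscriptions",
    "aws-marketplace:Unsubscribe",
    "aws-marketplace:Subscribe",
]


def marketplace_policy(resource="*"):
    """Build an IAM policy document allowing the Marketplace subscription actions.

    Assumption: Resource "*" is a placeholder scope; narrow it if your
    organization requires it.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": list(MARKETPLACE_ACTIONS),
                "Resource": resource,
            }
        ],
    }


# Serialize the document so it can be pasted into the IAM console.
policy_json = json.dumps(marketplace_policy(), indent=2)
print(policy_json)
```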

## Set up Amazon SageMaker environment

The SageMaker session stores our connection parameters for Amazon SageMaker. We'll use it to perform all of our Amazon SageMaker operations.

In [1]:
import sagemaker
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()

In [2]:
# Set up your S3 bucket, or use the SageMaker default S3 bucket
s3_bucket = 'YOUR S3 BUCKET'
# s3_bucket = sagemaker_session.default_bucket()

# Set up the algorithm ARN from your algorithm subscription
algorithm_arn = 'YOUR ALGORITHM ARN'

## Input Data Description
For this example, the input dataset contains 50,000 rows of sample identity data in CSV format. The data is located in the `data` folder of the Git repository that was cloned into this notebook environment. Other notebooks show how to load data from other sources, such as Amazon S3. Data can be read from files in CSV, Avro, or Parquet format.
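Before uploading, it can be useful to confirm locally that the sample folder holds the expected number of rows. The helper below is a hypothetical sketch; it assumes each CSV file starts with a single header row and skips blank lines.

```python
import csv
from pathlib import Path


def count_csv_rows(directory):
    """Return (file_count, data_row_count) for all *.csv files under `directory`.

    Assumption: every file has exactly one header row, which is not counted.
    """
    files = sorted(Path(directory).rglob("*.csv"))
    rows = 0
    for path in files:
        with path.open(newline="") as handle:
            reader = csv.reader(handle)
            next(reader, None)  # skip the header row
            rows += sum(1 for row in reader if row)  # ignore blank lines
    return len(files), rows


# Example against the notebook's local data folder:
# print(count_csv_rows("data"))
```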

In [3]:
# Set up the common S3 key prefix (no leading slash, so the S3 URIs below do not contain "//")
common_prefix = 'harpin/batch-resolution/sample-data-50k'

# Upload the configuration file (channel_config.yml) and the sample data folder to S3
config_local = 'channel_config.yml'
data_local = 'data'
config_prefix = common_prefix + '/config'
data_prefix = common_prefix + '/data'
source_config = sagemaker_session.upload_data(config_local, bucket=s3_bucket, key_prefix=config_prefix)
clustering_data = sagemaker_session.upload_data(data_local, bucket=s3_bucket, key_prefix=data_prefix)

# Set up the output S3 location for the identity graph
identity_graph = 's3://' + s3_bucket + '/' + common_prefix + '/identity-graph'
print('Input identity data location: ', clustering_data)
print('Source config file location: ', source_config)
print('Output identity graph location: ', identity_graph)
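The output location above is an `s3://bucket/key` URI built by string concatenation. If a prefix starts or ends with `/`, concatenation can produce a doubled slash (for example `s3://bucket//harpin/...`), and S3 treats `/harpin/...` and `harpin/...` as different keys. A small hypothetical helper that normalizes the slashes:

```python
def s3_uri(bucket, *parts):
    """Join a bucket name and key parts into an s3:// URI without doubled slashes.

    Hypothetical helper: empty parts and leading/trailing slashes are dropped.
    """
    cleaned = [part.strip("/") for part in parts]
    key = "/".join(part for part in cleaned if part)
    return f"s3://{bucket}/{key}" if key else f"s3://{bucket}"


# Usage: identity_graph = s3_uri(s3_bucket, common_prefix, 'identity-graph')
```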

## Create Identity Resolution SageMaker TrainingJob using Algorithm ARN

We will use the tools provided by the Amazon SageMaker Python SDK to create the [AlgorithmEstimator](https://sagemaker.readthedocs.io/en/stable/api/training/algorithm.html) to perform the job.

In [4]:
from sagemaker.algorithm import AlgorithmEstimator

algo = AlgorithmEstimator(
    algorithm_arn=algorithm_arn,
    role=role,
    instance_count=1,
    instance_type='ml.m5.2xlarge',
    base_job_name='harpin-ai-identity-resolution-50k-sample',
    output_path=identity_graph
)

## Run Identity Resolution Clustering with SageMaker TrainingJob
Note that the TrainingJob actually performs a clustering process. The clustering process produces an identity graph, which groups the records in the input dataset into a set of disjoint customer profiles.
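The idea of grouping records into disjoint profiles can be illustrated with connected components: link any two records that share an identifier, and every connected group becomes one profile. This is a swapped-in illustrative technique, not a description of how the Harpin AI algorithm works; the exact-match rule, the choice of `email` and `phone` as linking keys, and the helper `cluster_records` are all assumptions.

```python
def cluster_records(records, keys=("email", "phone"), first_pin=1):
    """Assign a PIN to each record by linking records that share a key value.

    Illustrative union-find sketch only. Records with no usable key value get
    the default PIN -1.
    """
    parent = list(range(len(records)))

    def find(i):
        # Follow parent links to the component root, halving the path as we go.
        while parent[i] != i:
            parent[i] = parent[parent[i]]
            i = parent[i]
        return i

    def union(i, j):
        root_i, root_j = find(i), find(j)
        if root_i != root_j:
            parent[root_j] = root_i

    first_seen = {}  # (key, normalized value) -> index of first record with it
    has_key = [False] * len(records)
    for i, record in enumerate(records):
        for key in keys:
            value = (record.get(key) or "").strip().lower()
            if not value:
                continue
            has_key[i] = True
            if (key, value) in first_seen:
                union(first_seen[(key, value)], i)
            else:
                first_seen[(key, value)] = i

    # Number the components in order of first appearance.
    pins, next_pin, result = {}, first_pin, []
    for i in range(len(records)):
        if not has_key[i]:
            result.append(-1)
            continue
        root = find(i)
        if root not in pins:
            pins[root] = next_pin
            next_pin += 1
        result.append(pins[root])
    return result
```

In the usage check, record 0 and record 2 share no value directly, but both link to record 1, so all three end up in one profile. That transitivity is what keeps the resulting profiles disjoint.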

In [5]:
# Specify the input data sources for up to 3 channels (clustering, clustering2 and clustering3) and a channel config file,
# then run the identity resolution process by calling the fit() method
print('Now run the identity resolution clustering using Algorithm ARN %s in region %s' % (algorithm_arn, sagemaker_session.boto_region_name))
algo.fit({"clustering": clustering_data, 
          "channel_config": source_config})

INFO:sagemaker:Creating training-job with name: harpin-ai-identity-resolution-50k-sampl-2024-09-30-19-31-49-222


Now run the identity resolution clustering using Algorithm ARN arn:aws:sagemaker:us-west-2:594846645681:algorithm/identity-resolution13-1d953864e3d2303e932e639006777330 in region us-west-2
2024-09-30 19:31:50 Starting - Starting the training job...
2024-09-30 19:32:05 Starting - Preparing the instances for training...
2024-09-30 19:32:38 Downloading - Downloading the training image...
2024-09-30 19:33:04 Training - Training image download completed. Training in progress.
24/09/30 19:33:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/30 19:33:15 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
Starting the i
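The dictionary passed to `fit()` maps channel names to S3 URIs: up to three data channels (`clustering`, `clustering2`, `clustering3`) plus `channel_config`. The hypothetical helper below builds that mapping from a list of data locations; requiring at least one data channel is an assumption.

```python
# Channel names as described in the fit() cell comment.
DATA_CHANNELS = ("clustering", "clustering2", "clustering3")
CONFIG_CHANNEL = "channel_config"


def build_channels(data_uris, config_uri):
    """Map 1-3 input data S3 URIs and one config URI to fit() channel names."""
    if not 1 <= len(data_uris) <= len(DATA_CHANNELS):
        raise ValueError(
            f"expected 1-{len(DATA_CHANNELS)} data sources, got {len(data_uris)}"
        )
    channels = dict(zip(DATA_CHANNELS, data_uris))
    channels[CONFIG_CHANNEL] = config_uri
    return channels


# Usage: algo.fit(build_channels([clustering_data], source_config))
```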

## Identity Graph Data and Format
The identity graph is stored in a folder containing one or more files in the same format as the input files (CSV, Avro, or Parquet). If the input files are CSVs, the output files are CSVs too. All the fields in the input files are retained in the output files, along with one additional field called PIN (column `pin`). The PIN is the assigned unique customer profile identifier: customer records with the same non-default PIN are considered to refer to the same customer profile. The default PIN value is -1, meaning that the input record does not contain enough information to determine which customer profile it belongs to.
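The PIN rule can be sketched in plain Python: group record ids by PIN and leave out the default value. The column names `rid` and `pin` match the identity graph schema listed later in this notebook; the helper `group_profiles` and the sample CSV text in the check are hypothetical.

```python
import csv
import io
from collections import defaultdict

DEFAULT_PIN = "-1"  # records that could not be assigned to a profile


def group_profiles(csv_text, id_field="rid", pin_field="pin"):
    """Group record ids by PIN, skipping records that carry the default PIN."""
    profiles = defaultdict(list)
    for row in csv.DictReader(io.StringIO(csv_text)):
        pin = row[pin_field].strip()
        if pin != DEFAULT_PIN:
            profiles[pin].append(row[id_field])
    return dict(profiles)
```

Two records with PIN -1 are not treated as the same customer, which is why they are dropped rather than grouped together.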

In [6]:
# List the runs stored under the algorithm's output path (one prefix per training job)
path = algo.output_path
!aws s3 ls $path/

                           PRE harpin-ai-identity-resolution-50k-sampl-2024-09-30-19-04-31-044/
                           PRE harpin-ai-identity-resolution-50k-sampl-2024-09-30-19-31-49-222/
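The prefixes listed above are training job names. They contain `sampl` rather than `sample` because the job name is the `base_job_name` plus a millisecond timestamp, and the base is trimmed so the whole name fits SageMaker's 63-character limit for training job names. The sketch below reconstructs that trimming rule from the observed names; the SDK's own helper may differ in details. In practice, `algo.latest_training_job.name` returns the actual name of the most recent job.

```python
MAX_JOB_NAME_LENGTH = 63  # SageMaker training job names are limited to 63 characters


def job_name_from_base(base, timestamp):
    """Trim `base` so that '<base>-<timestamp>' fits in MAX_JOB_NAME_LENGTH characters."""
    trimmed = base[: MAX_JOB_NAME_LENGTH - len(timestamp) - 1]
    return f"{trimmed}-{timestamp}"


print(job_name_from_base("harpin-ai-identity-resolution-50k-sample", "2024-09-30-19-31-49-222"))
```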


In [7]:
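# Set specific_run to the training job name you want to analyze (one of the prefixes listed above).
# The job name is the (possibly truncated) base_job_name plus a timestamp, e.g. ...-50k-sampl-TIMESTAMP.
specific_run = 'harpin-ai-identity-resolution-50k-sampl-TIMESTAMP'
# Alternatively, use the most recent job started by this estimator:
# specific_run = algo.latest_training_job.name

# Specify a temporary directory, then download the job output from S3 and extract the identity graph into it for analysis
temp_data = './temp_data'
!rm -rf $temp_data
!mkdir -p $temp_data
!aws s3 cp $path/$specific_run/output/output.tar.gz $temp_data/
!tar -xzvf $temp_data/output.tar.gz -C $temp_data/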
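Outside IPython, where `!` shell commands are not available, the extraction step can be done with the standard `tarfile` module. The helper `extract_job_output` is a hypothetical sketch; it refuses archive members that would land outside the destination folder and uses the `data` extraction filter when the running Python provides it.

```python
import tarfile
from pathlib import Path


def extract_job_output(archive_path, dest_dir):
    """Extract a job's output.tar.gz into dest_dir and return the member names."""
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    root = dest.resolve()
    with tarfile.open(archive_path, "r:gz") as archive:
        names = archive.getnames()
        # Reject absolute paths or ".." entries that would escape dest_dir.
        for name in names:
            target = (dest / name).resolve()
            if target != root and root not in target.parents:
                raise ValueError(f"unsafe path in archive: {name}")
        if hasattr(tarfile, "data_filter"):
            archive.extractall(dest, filter="data")
        else:
            archive.extractall(dest)
    return names


# Usage: extract_job_output(temp_data + '/output.tar.gz', temp_data)
```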

## Identity Graph Analysis
Now the identity resolution clustering process is finished and we have the identity graph. We can perform some simple analysis on it, such as counting records, counting unique customer profiles, and analyzing duplicate records.

In [8]:
# Import the PySpark libraries and create the Spark session
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/09/30 19:49:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
# Load the CSV identity graph into a Spark DataFrame (inferSchema='false' reads every column as a string)
identity_graph = spark.read.format('csv') \
                      .options(header='true', inferSchema='false', delimiter=',') \
                      .option('mode', 'DROPMALFORMED') \
                      .load(temp_data + '/identity_graph/')

In [11]:
#List the fields and their data types in the identity graph
#The identity graph will contain the union of fields from all the input data sources, plus an additional field "pin"
identity_graph.dtypes

[('rid', 'string'),
 ('given_name', 'string'),
 ('middle_name', 'string'),
 ('sur_name', 'string'),
 ('dob', 'string'),
 ('email', 'string'),
 ('street_address', 'string'),
 ('city', 'string'),
 ('zip', 'string'),
 ('state', 'string'),
 ('country', 'string'),
 ('phone', 'string'),
 ('gender', 'string'),
 ('email2', 'string'),
 ('phone2', 'string'),
 ('ip_address', 'string'),
 ('loyalty_id', 'string'),
 ('pin', 'string')]

In [12]:
#Count the number of records in the identity graph
identity_graph.count()

50000

In [13]:
#Count the unique number of customer profiles in the identity graph
identity_graph.filter(F.col('pin') != '-1') \
              .select('pin') \
              .distinct() \
              .count()

                                                                                

26258

In [14]:
# Perform a duplicate-record analysis on the identity graph. For example, in the run shown below, 9 records are assigned the same PIN (10000000129);
# those records are considered to refer to the same customer. The 833 records with PIN -1 could not be assigned to a profile.
identity_graph.groupBy('pin') \
              .count() \
              .orderBy(F.desc('count')) \
              .show(20)

+-----------+-----+
|        pin|count|
+-----------+-----+
|         -1|  833|
|10000000129|    9|
|10000005343|    8|
|10000003761|    8|
|10000002045|    8|
|10000009889|    8|
|10000003635|    8|
|10000003935|    8|
|10000009518|    8|
|10000001604|    8|
|10000005128|    7|
|10000010153|    7|
|10000006268|    7|
|10000006251|    7|
|10000001798|    7|
|10000005264|    7|
|10000012096|    7|
|10000012324|    7|
|10000000566|    7|
|10000011963|    7|
+-----------+-----+
only showing top 20 rows
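The same metrics can be computed without Spark, which can help when checking a small slice of the graph by hand. The hypothetical helper `graph_summary` works on a plain list of `pin` strings (for example, read with the `csv` module) and reports the default PIN separately instead of mixing it into the profile ranking.

```python
from collections import Counter


def graph_summary(pins, default_pin="-1", top_n=20):
    """Compute record, profile, and duplicate statistics from a list of PIN strings."""
    counts = Counter(pins)
    assigned = Counter({pin: n for pin, n in counts.items() if pin != default_pin})
    return {
        "records": len(pins),
        "unassigned_records": counts.get(default_pin, 0),
        "profiles": len(assigned),
        "profiles_with_duplicates": sum(1 for n in assigned.values() if n > 1),
        "mean_profile_size": sum(assigned.values()) / len(assigned) if assigned else 0.0,
        "largest_profiles": assigned.most_common(top_n),
    }
```

Plugging in the figures from this run (50,000 records, 833 with PIN -1, 26,258 profiles) gives a mean profile size of 49,167 / 26,258, or about 1.87 records per assigned profile.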



In [15]:
#Clean up the temporary directory
!rm -rf $temp_data