# Similarity Search for Fuzzy Matching Use-Case

This notebook outlines how to deduplicate customer account data using text embeddings and semantic similarity matching. It uses synthetic data with the following columns: 'temp_cust_id', 'business_name_hash_key', 'address_hash_key', 'business_name', 'line1', 'line2', 'city', 'state', 'zip', 'zip_ext', 'country', 'gpc1_source_id'

### The steps performed include:
* Parameters, variables, and any helper functions are defined
* Sample preprocessing to remove exact duplicates, then combine and transform the relevant data columns
* Generate text embeddings with a Vertex AI batch prediction job submitted through the REST API
* Create and deploy an index for Vector Search
* Query the index and map the ranked results back to the original dataset

### Import Libraries & Define Parameters

In [None]:
# # Install the packages
# ! pip3 install --upgrade --quiet google-cloud-aiplatform \
#                                  google-cloud-storage

In [61]:
import pandas as pd
import json
import google.auth.transport.requests
import google.auth
from google.cloud import storage
import requests
from google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint import \
    Namespace

In [62]:
PROJECT_ID = "sandbox-401718" # @param
REGION = "us-central1" # @param
BUCKET_URI = f"gs://{PROJECT_ID}-fuzzymatch-textembedding-{REGION}"
INPUT_URI = f"{BUCKET_URI}/input-test"
OUTPUT_URI = f"{BUCKET_URI}/output-test"

### Create buckets

In [9]:
# ! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

Creating gs://sandbox-401718-fuzzymatch-textembedding-us-central1/...


## Pre-processing Logic to deduplicate data

Preprocessing involves cleaning, transforming, and standardizing the raw data to reduce noise and inconsistencies. It helps downstream matching algorithms perform well and return accurate results.

Sample preprocessing techniques to consider:
* Remove exact duplicate records (included below) 
* Normalize keyword variations in spelling and abbreviations (e.g., "Ave" to "Avenue")
* Ensure consistent formatting across all data points
* Identify and correct invalid or inaccurate domain information
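The normalization bullets above can be sketched in plain Python. This is a minimal example, not the notebook's method: `normalize_text` and the `ABBREVIATIONS` map are hypothetical names, and the map covers only a handful of street-suffix abbreviations.

```python
import math
import re

# Hypothetical abbreviation map: extend it with the variants found in your own data.
ABBREVIATIONS = {
    "ave": "avenue",
    "st": "street",
    "rd": "road",
    "blvd": "boulevard",
    "ste": "suite",
}


def normalize_text(value):
    """Lowercase, replace punctuation with spaces, collapse whitespace, expand abbreviations."""
    # Treat None and NaN (how pandas represents missing values) as empty strings.
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return ""
    text = re.sub(r"[^\w\s]", " ", str(value).lower())
    return " ".join(ABBREVIATIONS.get(token, token) for token in text.split())


print(normalize_text("123 Main St., Ste #4"))  # 123 main street suite 4
```

It could be applied per column before the address fields are combined, e.g. `df["line1"] = df["line1"].map(normalize_text)`. Token-level expansion is deliberately naive: "St" can also mean "Saint" (as in "St Louis"), so review the map against your own data before relying on it.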

In [63]:
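# Read ZIP fields as strings so leading zeros survive and zip_ext can be concatenated with '-' below
df = pd.read_csv("synthetic-data.csv", dtype={"zip": str, "zip_ext": str})
# df = df.head(1000)  # optional: work on a sample while testing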

In [64]:
df_hashed = df

# Create a boolean mask indicating duplicate rows based on hashed columns.
duplicate_mask = df_hashed.duplicated(subset=['business_name_hash_key', 'address_hash_key'])

# Invert the mask to select non-duplicate rows.
unique_mask = ~duplicate_mask

# Create a new DataFrame with only unique rows.
df_hashed_unique = df_hashed[unique_mask].reset_index(drop=True)

In [65]:
original_rows = len(df)
unique_rows = len(df_hashed_unique)
num_duplicates = original_rows - unique_rows

print("Number of rows in original DataFrame:", original_rows)
print("Number of rows in de-duplicated DataFrame:", unique_rows)
print("Number of duplicate rows removed:", num_duplicates)

df = df_hashed_unique

Number of rows in original DataFrame: 100000
Number of rows in de-duplicated DataFrame: 99769
Number of duplicate rows removed: 231


In [None]:
df.head()

## Text Embeddings on GCS

### Combine and transform relevant data columns

Prepare the data for embedding generation. For instance, merge granular address components such as street, city, and ZIP code, together with the business name, into a single text field so that each record's embedding reflects the full account rather than isolated fragments.

In [68]:
# Create the Address column, handling NaN in line2 and zip_ext and including business_name

df["Address"] = df.apply(
    lambda row: f"{row['business_name']}, {row['line1']} {row['line2'] if not pd.isna(row['line2']) else ''}, "
    f"{row['city']}, {row['state']} "
    f"{row['zip']}{'-' + row['zip_ext'] if not pd.isna(row['zip_ext']) else ''}, "
    f"{row['country']}".strip(),
    axis=1,
)
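The lambda above leaves a stray space before the comma when `line2` is missing (e.g. `1 Main St , Springfield`). A hypothetical helper, `build_address`, sketches one way to join only the non-empty parts. It assumes `zip` and `zip_ext` hold strings; a numeric `zip_ext` read by pandas would otherwise render as `1234.0`.

```python
import math


def _clean(value):
    """Return a stripped string, or '' for None / NaN (pandas' missing value)."""
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return ""
    return str(value).strip()


def _join(parts, sep):
    """Join only the non-empty parts with the given separator."""
    return sep.join(p for p in parts if p)


def build_address(record):
    """Build 'business, street, city, state zip-ext, country' without empty segments."""

    def get(key):
        return _clean(record.get(key))

    zip_code = get("zip")
    if zip_code and get("zip_ext"):
        zip_code = f"{zip_code}-{get('zip_ext')}"
    return _join(
        [
            get("business_name"),
            _join([get("line1"), get("line2")], " "),
            get("city"),
            _join([get("state"), zip_code], " "),
            get("country"),
        ],
        ", ",
    )
```

`df["Address"] = df.apply(build_address, axis=1)` works because a pandas row (a `Series`) supports `.get()`. Changing the address text changes the embedding inputs, so every downstream step would need to be rerun.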


In [69]:
# Create JSON file
# Create a list of dictionaries from the 'Address' column
address_data = [{"content": address} for address in df['Address']]

# Save to a JSONL file
with open('input.jsonl', 'w') as outfile:
    for entry in address_data:
        json.dump(entry, outfile)
        outfile.write('\n') 

In [70]:
# save to GCS 
storage_client = storage.Client()

BUCKET_NAME = "/".join(INPUT_URI.split("/")[:3])
bucket = storage_client.bucket(BUCKET_NAME[5:])

# Define the blob including any folders from INPUT_URI
blob_name = "/".join(INPUT_URI.split("/")[3:])+"/input.jsonl"
blob = bucket.blob(blob_name)

# Upload the file 
blob.upload_from_filename("input.jsonl")

print(f"File uploaded to cloud storage in {INPUT_URI}")

File uploaded to cloud storage in gs://sandbox-401718-fuzzymatch-textembedding-us-central1/input-test
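The notebook derives the bucket and object names by slicing `INPUT_URI` (`[:3]`, then `[5:]`). A small helper makes that parsing explicit and reusable. `split_gcs_uri` and `upload_file` are hypothetical names, and `upload_file` expects a `google.cloud.storage.Client` such as `storage_client`.

```python
def split_gcs_uri(uri):
    """Split 'gs://bucket/some/path' into ('bucket', 'some/path')."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a gs:// URI: {uri}")
    bucket_name, _, path = uri[len(prefix):].partition("/")
    return bucket_name, path.strip("/")


def upload_file(client, local_path, destination_uri):
    """Upload a local file to a full gs:// object URI using a google.cloud.storage Client."""
    bucket_name, blob_name = split_gcs_uri(destination_uri)
    client.bucket(bucket_name).blob(blob_name).upload_from_filename(local_path)
```

For example, `upload_file(storage_client, "input.jsonl", f"{INPUT_URI}/input.jsonl")` performs the same upload as the cell above.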


### REST request for Batch Prediction Job

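This section submits a batch prediction job to Vertex AI through the REST API to generate the text embeddings. The job runs asynchronously: the response only confirms that it was created (`JOB_STATE_PENDING`), and the embeddings are written to `OUTPUT_URI` once it finishes.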

In [71]:
# Credentials

# Set up Application Default Credentials (ADC)
credentials, project_id = google.auth.default()
auth_req = google.auth.transport.requests.Request()
credentials.refresh(auth_req)
access_token = credentials.token

In [72]:
MODEL = "publishers/google/models/text-embedding-004"
url = f"https://{REGION}-aiplatform.googleapis.com/v1/projects/{PROJECT_ID}/locations/{REGION}/batchPredictionJobs"

headers = {
    "Authorization": "Bearer " + access_token,
    "Content-Type": "application/json; charset=utf-8",
}

In [73]:
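# Serialize with json.dumps: str() would produce a Python repr (single quotes), which is not valid JSON
request_body = json.dumps(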
    {
        "name": "batch-test",
        "displayName": "batch-test",
        "model": MODEL,
        "inputConfig": {
            "instancesFormat": "jsonl",
            "gcs_source": {"uris": [f"{INPUT_URI}/input.jsonl"]},
        },
        "outputConfig": {
            "predictionsFormat": "jsonl",
            "gcs_destination": {"output_uri_prefix": OUTPUT_URI},
        },
    }
)


In [74]:
# request_body = '{"name": "test", "displayName": "test", "model": "publishers/google/models/text-embedding-004", "inputConfig": {"instancesFormat": "jsonl", "gcs_source": {"uris": ["gs://sandbox-401718-fuzzymatch-textembedding/input-test/input.jsonl"]}}, "outputConfig": {"predictionsFormat": "jsonl", "gcs_destination": {"output_uri_prefix": "gs://sandbox-401718-fuzzymatch-textembedding/output-test"}}}'
r = requests.post(url, data=request_body, headers=headers)

In [75]:
print(r.status_code)

200


In [76]:
print(r.content)

b'{\n  "name": "projects/757654702990/locations/us-central1/batchPredictionJobs/4021747624389378048",\n  "displayName": "test",\n  "model": "publishers/google/models/text-embedding-004",\n  "inputConfig": {\n    "instancesFormat": "jsonl",\n    "gcsSource": {\n      "uris": [\n        "gs://sandbox-401718-fuzzymatch-textembedding-us-central1/input-test/input.jsonl"\n      ]\n    }\n  },\n  "outputConfig": {\n    "predictionsFormat": "jsonl",\n    "gcsDestination": {\n      "outputUriPrefix": "gs://sandbox-401718-fuzzymatch-textembedding-us-central1/output-test"\n    }\n  },\n  "state": "JOB_STATE_PENDING",\n  "createTime": "2024-07-03T04:16:45.381697Z",\n  "updateTime": "2024-07-03T04:16:45.381697Z",\n  "modelVersionId": "1"\n}\n'
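The response above reports `JOB_STATE_PENDING`: the job is only queued, and the output files do not exist until it finishes. A minimal polling sketch follows, assuming the job resource can be fetched with a `GET` on `https://{REGION}-aiplatform.googleapis.com/v1/{job_name}` using the same bearer token. `get_job_state` and `wait_for_job` are hypothetical helpers; `get_job_state` uses the standard library's `urllib` instead of `requests` only to stay self-contained.

```python
import json
import time
import urllib.request

# Terminal values of the Vertex AI JobState enum; any other state means "keep waiting".
TERMINAL_STATES = {
    "JOB_STATE_SUCCEEDED",
    "JOB_STATE_PARTIALLY_SUCCEEDED",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_EXPIRED",
}


def get_job_state(job_name, region, access_token):
    """GET the batch prediction job resource and return its 'state' field."""
    url = f"https://{region}-aiplatform.googleapis.com/v1/{job_name}"
    request = urllib.request.Request(url, headers={"Authorization": f"Bearer {access_token}"})
    with urllib.request.urlopen(request) as resp:
        return json.loads(resp.read().decode("utf-8"))["state"]


def wait_for_job(get_state, poll_seconds=60, timeout_seconds=3600, sleep=time.sleep):
    """Call get_state() until it returns a terminal state; raise TimeoutError otherwise."""
    waited = 0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError(f"Job still {state} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds
```

With the notebook's variables: `job_name = r.json()["name"]`, then `wait_for_job(lambda: get_job_state(job_name, REGION, access_token))`. The ADC access token expires after a while (typically about an hour), so long jobs may need `credentials.refresh(auth_req)` between polls.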


### Import JSON From Path and Map

This section retrieves the embedding files written by the batch job and maps each embedding back to its account in the original data. Set `EMBED_FILES` to the output files produced by your job.

* Download embed file: The embedding results from the batch prediction job are downloaded from GCS.
* Map embeddings to original dataset: The downloaded embeddings are associated with their corresponding addresses in the original DataFrame.
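Instead of copying each output path into `EMBED_FILES`, the files can be discovered by listing the bucket. This sketch assumes `client` is a `google.cloud.storage.Client` (its `list_blobs(bucket_name, prefix=...)` method) and that the predictions are the `.jsonl` objects under the output prefix; `find_prediction_files` is a hypothetical name.

```python
def find_prediction_files(client, output_uri):
    """List the .jsonl files a batch prediction job wrote under output_uri (sorted gs:// URIs)."""
    bucket_name, _, prefix = output_uri[len("gs://"):].partition("/")
    # A trailing slash keeps e.g. 'output-test' from also matching 'output-test-2/'.
    prefix = prefix.strip("/") + "/" if prefix else ""
    blobs = client.list_blobs(bucket_name, prefix=prefix)
    return sorted(
        f"gs://{bucket_name}/{blob.name}" for blob in blobs if blob.name.endswith(".jsonl")
    )
```

Usage: `EMBED_FILES = find_prediction_files(storage_client, OUTPUT_URI)`. Each run writes to its own timestamped `prediction-model-...` folder, so if `OUTPUT_URI` holds several runs, pass the specific run folder instead.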

In [78]:
EMBED_FILES = [
    "gs://sandbox-401718-fuzzymatch-textembedding-us-central1/output-test/prediction-model-2024-07-03T04:16:45.355581Z/000000000000.jsonl",  # @param
    "gs://sandbox-401718-fuzzymatch-textembedding-us-central1/output-test/prediction-model-2024-07-03T04:16:45.355581Z/000000000001.jsonl",
]

# Write to the local JSON Lines file directly
with open("embeddings.jsonl", "w", encoding="utf-8") as outfile:
    for embed_file in EMBED_FILES:
        bucket_name = embed_file.split("/")[2]
        blob_name = "/".join(embed_file.split("/")[3:])
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)

        # Download the entire file as text (download_as_string is deprecated)
        file_content = blob.download_as_text(encoding="utf-8")
        for line in file_content.splitlines():
            outfile.write(line + "\n")  # Write each line as a separate JSON object
print("Combined JSON Lines data saved to `embeddings.jsonl`")


Combined JSON Lines data saved to `embeddings.jsonl`


In [79]:
# Load the JSON data from your local .jsonl file
response_json = []
with open("embeddings.jsonl", "r") as f:
    for line in f:
        response_json.append(json.loads(line))  # Parse each line

# Create a dictionary to map addresses to embeddings
address_embedding_map = {}
for item in response_json:
    address = item['instance']['content']
    embedding = item['predictions'][0]['embeddings']['values'] 
    address_embedding_map[address] = embedding

# Map embeddings to the DataFrame using the address lookup
df['Embeddings'] = df['Address'].map(address_embedding_map)
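`Series.map` silently leaves NaN for any address that has no entry in `address_embedding_map`, for example when a prediction file was left out of `EMBED_FILES`. A quick check before building the index might look like this; `check_embeddings` is a hypothetical helper.

```python
import math


def check_embeddings(embeddings):
    """Count missing embeddings and confirm all present ones share one dimension.

    Returns (missing_count, dimension); raises ValueError on mixed dimensions.
    """
    missing = 0
    dims = set()
    for emb in embeddings:
        # Series.map leaves NaN (a float) where no embedding was found.
        if emb is None or (isinstance(emb, float) and math.isnan(emb)):
            missing += 1
        else:
            dims.add(len(emb))
    if len(dims) > 1:
        raise ValueError(f"Inconsistent embedding dimensions: {sorted(dims)}")
    return missing, (dims.pop() if dims else 0)
```

`missing, dims = check_embeddings(df["Embeddings"])` should report zero missing rows before the index input file is written.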

In [None]:
df.head() # Embeddings column now included

In [81]:
# Check that every temp_cust_id is unique, since it is used as the datapoint ID in the index
unique_cust_ids = df['temp_cust_id'].unique()
len(unique_cust_ids) == len(df)

True

## Create Index for Vector Search

This section creates an index for Vertex AI Vector Search. This example uses a brute-force (exhaustive) index, which compares the query against every stored vector to find the exact nearest neighbors. Brute-force search is more computationally expensive than approximate nearest neighbor (ANN) search, which trades some accuracy for faster, more efficient retrieval.

For more information about these methods and their tradeoffs: https://cloud.google.com/vertex-ai/docs/vector-search/create-manage-index
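To make "exhaustive" concrete, here is a minimal pure-Python sketch of exact top-k search by dot product. `brute_force_top_k` is a hypothetical helper that illustrates the technique, not how Vector Search is implemented internally. Every stored vector is scored against the query, so the cost per query grows with N × d; ANN indexes such as Tree-AH avoid this by searching only a fraction of leaf nodes.

```python
import heapq


def brute_force_top_k(query, vectors, k):
    """Exact k-nearest-neighbor search: score every vector by dot product, keep the k best.

    vectors maps an ID to its embedding; returns [(id, score), ...] sorted by score descending.
    """
    scores = ((sum(q * v for q, v in zip(query, vec)), vid) for vid, vec in vectors.items())
    return [(vid, score) for score, vid in heapq.nlargest(k, scores)]


vectors = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [0.6, 0.8]}
print(brute_force_top_k([1.0, 0.0], vectors, 2))  # [('a', 1.0), ('c', 0.6)]
```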

### Format data
https://cloud.google.com/vertex-ai/docs/vector-search/setup/format-structure

In [82]:
# Build one Vector Search record per customer: its ID and embedding vector
data = []
for index, row in df.iterrows(): 
    data.append({
        "id": str(row['temp_cust_id']),
        "embedding": row['Embeddings'],
        # "address_hash_key": row['address_hash_key']
    })
    
# Export the data as a JSON Lines file
with open("index_input_data.json", "w", encoding="utf-8") as f:
    for entry in data:
        json.dump(entry, f)  # Write each dictionary as JSON
        f.write('\n')        # Add a newline to separate objects 
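The format documentation linked above also allows an optional `restricts` list per datapoint (a namespace plus allowed tokens), and the `Namespace` class imported at the top of the notebook is the query-side counterpart, passed through the `filter` argument of `match()`. As a sketch, assuming you want to limit candidate matches to records in the same state, each record could carry a `state` restrict. `make_index_record` is a hypothetical helper and `"CA"` is an example value; note that blocking on state would miss duplicates whose state field is wrong or missing, and the index would need to be rebuilt from the new file.

```python
def make_index_record(cust_id, embedding, state=None):
    """Build one Vector Search input record with an optional 'state' token restrict."""
    record = {"id": str(cust_id), "embedding": list(embedding)}
    # Only attach a restrict for real string values (NaN from pandas is a float).
    if isinstance(state, str) and state.strip():
        record["restricts"] = [{"namespace": "state", "allow": [state.strip()]}]
    return record


# Query-side sketch (needs the deployed endpoint, so left as comments):
# my_index_endpoint.match(
#     deployed_index_id=DEPLOYED_BRUTE_FORCE_INDEX_ID,
#     queries=[query],
#     num_neighbors=10,
#     filter=[Namespace(name="state", allow_tokens=["CA"])],
# )
```

The records could then be built with `data = [make_index_record(r.temp_cust_id, r.Embeddings, r.state) for r in df.itertuples()]`.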

In [83]:
# save to GCS 
BUCKET_NAME = "/".join(INPUT_URI.split("/")[:3])
bucket = storage_client.bucket(BUCKET_NAME[5:])

# Define the blob including any folders from INPUT_URI
blob_name = "/".join(INPUT_URI.split("/")[3:])+"/initial/index_input_data.json"
blob = bucket.blob(blob_name)

# Upload the file 
blob.upload_from_filename("index_input_data.json")

print(f"File uploaded to cloud storage in {INPUT_URI}/initial/")

File uploaded to cloud storage in gs://sandbox-401718-fuzzymatch-textembedding-us-central1/input-test/initial/


### Create Index

For similarity calculations, the Vector Search documentation strongly recommends using `DOT_PRODUCT_DISTANCE` with `UNIT_L2_NORM` instead of `COSINE_DISTANCE`. The search algorithms are more heavily optimized for dot product distance, and once every vector is normalized to unit L2 length, the dot product equals cosine similarity, so the ranking is mathematically the same.
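A short numeric check illustrates why the normalization matters. `dot`, `l2_normalize`, and `cosine_similarity` are illustrative helpers and the vectors are made up: the raw dot product favors the long vector `y`, while after unit L2 normalization the scores match cosine similarity exactly and the ranking changes accordingly.

```python
import math


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def l2_normalize(v):
    """Scale v to unit L2 length (the effect of UNIT_L2_NORM on each vector)."""
    norm = math.sqrt(dot(v, v))
    return [x / norm for x in v]


def cosine_similarity(a, b):
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


query = [3.0, 4.0]
candidates = {"x": [4.0, 3.0], "y": [30.0, 1.0], "z": [0.5, 0.5]}

raw = {k: dot(query, v) for k, v in candidates.items()}
normalized = {k: dot(l2_normalize(query), l2_normalize(v)) for k, v in candidates.items()}
cosine = {k: cosine_similarity(query, v) for k, v in candidates.items()}

print(sorted(raw, key=raw.get, reverse=True))                # ['y', 'x', 'z']
print(sorted(normalized, key=normalized.get, reverse=True))  # ['z', 'x', 'y']
```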

In [84]:
import os
from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=INPUT_URI)

In [85]:
DIMENSIONS = len(df["Embeddings"][0])
DISPLAY_NAME = "index_fuzzy_match"
DISPLAY_NAME_BRUTE_FORCE = DISPLAY_NAME + "_brute_force"

In [86]:
brute_force_index = aiplatform.MatchingEngineIndex.create_brute_force_index(
    display_name=DISPLAY_NAME_BRUTE_FORCE,
    contents_delta_uri=f"{INPUT_URI}/initial/",
    dimensions=DIMENSIONS,
    distance_measure_type="DOT_PRODUCT_DISTANCE",
    feature_norm_type="UNIT_L2_NORM",
    description="Fuzzy Match index (brute force)",
)

Creating MatchingEngineIndex
Create MatchingEngineIndex backing LRO: projects/757654702990/locations/us-central1/indexes/7759715352098897920/operations/3180206300940206080
MatchingEngineIndex created. Resource name: projects/757654702990/locations/us-central1/indexes/7759715352098897920
To use this MatchingEngineIndex in another session:
index = aiplatform.MatchingEngineIndex('projects/757654702990/locations/us-central1/indexes/7759715352098897920')


In [87]:
INDEX_BRUTE_FORCE_RESOURCE_NAME = brute_force_index.resource_name #'projects/757654702990/locations/us-central1/indexes/9080369554546229248'

brute_force_index = aiplatform.MatchingEngineIndex(
    index_name=INDEX_BRUTE_FORCE_RESOURCE_NAME
)

In [None]:
# Alternative: approximate nearest neighbor (ANN) index using the Tree-AH algorithm
# tree_ah_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
#     display_name=DISPLAY_NAME,
#     contents_delta_uri=f"{INPUT_URI}/initial/",
#     dimensions=DIMENSIONS,
#     approximate_neighbors_count=150,
#     distance_measure_type="DOT_PRODUCT_DISTANCE",
#     feature_norm_type="UNIT_L2_NORM",
#     leaf_node_embedding_count=500,
#     leaf_nodes_to_search_percent=7,
#     description="Fuzzy Match index (ANN)",
#     labels={"label_name": "label_value"},
# )

### Deploy Index to Endpoint

In [88]:
# Retrieve the project number
PROJECT_NUMBER = !gcloud projects list --filter="PROJECT_ID:'{PROJECT_ID}'" --format='value(PROJECT_NUMBER)'
PROJECT_NUMBER = PROJECT_NUMBER[0]
# PROJECT_NUMBER = 757654702990

VPC_NETWORK = "beusebio-network"  # @param: a VPC network peered with Vertex AI (required for the private endpoint)
VPC_NETWORK_FULL = f"projects/{PROJECT_NUMBER}/global/networks/{VPC_NETWORK}"

In [89]:
# Endpoint
my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name="index_endpoint_for_fuzzy_match",
    description="fuzzy matching index",
    network=VPC_NETWORK_FULL,
)

INDEX_ENDPOINT_NAME = my_index_endpoint.resource_name

Creating MatchingEngineIndexEndpoint
Create MatchingEngineIndexEndpoint backing LRO: projects/757654702990/locations/us-central1/indexEndpoints/4631376072657600512/operations/3168947301871779840
MatchingEngineIndexEndpoint created. Resource name: projects/757654702990/locations/us-central1/indexEndpoints/4631376072657600512
To use this MatchingEngineIndexEndpoint in another session:
index_endpoint = aiplatform.MatchingEngineIndexEndpoint('projects/757654702990/locations/us-central1/indexEndpoints/4631376072657600512')


In [90]:
brute_force_index

<google.cloud.aiplatform.matching_engine.matching_engine_index.MatchingEngineIndex object at 0x7fa10ac5d030> 
resource name: projects/757654702990/locations/us-central1/indexes/7759715352098897920

In [91]:
# Deploy
DEPLOYED_BRUTE_FORCE_INDEX_ID = "fuzzy_match_brute_force_deploy_full"
my_index_endpoint = my_index_endpoint.deploy_index(
    index=brute_force_index, deployed_index_id=DEPLOYED_BRUTE_FORCE_INDEX_ID
)

my_index_endpoint.deployed_indexes

Deploying index MatchingEngineIndexEndpoint index_endpoint: projects/757654702990/locations/us-central1/indexEndpoints/4631376072657600512
Deploy index MatchingEngineIndexEndpoint index_endpoint backing LRO: projects/757654702990/locations/us-central1/indexEndpoints/4631376072657600512/operations/3310810690133950464
MatchingEngineIndexEndpoint index_endpoint Deployed index. Resource name: projects/757654702990/locations/us-central1/indexEndpoints/4631376072657600512


[id: "fuzzy_match_brute_force_deploy_full"
index: "projects/757654702990/locations/us-central1/indexes/7759715352098897920"
create_time {
  seconds: 1720046077
  nanos: 116225000
}
private_endpoints {
  match_grpc_address: "10.116.0.5"
}
index_sync_time {
  seconds: 1720046291
  nanos: 278326000
}
automatic_resources {
  min_replica_count: 2
  max_replica_count: 2
}
deployment_group: "default"
]

### Query and get ranked results
Query the deployed index to find similar addresses.

In [182]:
query = df["Embeddings"][9000] # @param

In [183]:
# Test query
response = my_index_endpoint.match(
    deployed_index_id=DEPLOYED_BRUTE_FORCE_INDEX_ID,
    queries=[query],
    num_neighbors=10,
)

response

[[MatchNeighbor(id='020ec9083f20ab04cb779b8990967817', distance=0.9999999403953552, feature_vector=None, crowding_tag=None, restricts=None, numeric_restricts=None),
  MatchNeighbor(id='0a1be698f1d315c5dba9b3581a931710', distance=0.9554516077041626, feature_vector=None, crowding_tag=None, restricts=None, numeric_restricts=None),
  MatchNeighbor(id='0d7c94165eaf73c39cf02f01db468d2f', distance=0.7347900867462158, feature_vector=None, crowding_tag=None, restricts=None, numeric_restricts=None),
  MatchNeighbor(id='0bbbb1ac1580e80eb8a800b93eaf6080', distance=0.7340426445007324, feature_vector=None, crowding_tag=None, restricts=None, numeric_restricts=None),
  MatchNeighbor(id='097c65235d1a20fc7475fe933d83d66c', distance=0.7240421772003174, feature_vector=None, crowding_tag=None, restricts=None, numeric_restricts=None),
  MatchNeighbor(id='15a186457d087e2379429d750eee11f4', distance=0.7237255573272705, feature_vector=None, crowding_tag=None, restricts=None, numeric_restricts=None),
  MatchNei
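The query above reuses an embedding that is already in the index. To match a brand-new record, it would first need to be embedded, ideally as text built the same way as the `Address` column. A minimal sketch, assuming any `embed` callable that returns a list of floats; `match_new_record` is a hypothetical helper, and the Vertex AI SDK-based `embed` is shown only as comments because it needs credentials.

```python
def match_new_record(text, embed, endpoint, deployed_index_id, num_neighbors=10):
    """Embed a free-text record and return (id, score) pairs from a deployed index.

    `embed` maps a string to a list of floats; `endpoint` is a
    MatchingEngineIndexEndpoint (or anything with the same match() method).
    """
    response = endpoint.match(
        deployed_index_id=deployed_index_id,
        queries=[embed(text)],
        num_neighbors=num_neighbors,
    )
    # match() returns one list of neighbors per query; there is a single query here.
    return [(neighbor.id, neighbor.distance) for neighbor in response[0]]


# One possible `embed` (assumes the Vertex AI SDK is installed and initialized):
# from vertexai.language_models import TextEmbeddingModel
# model = TextEmbeddingModel.from_pretrained("text-embedding-004")
# embed = lambda text: model.get_embeddings([text])[0].values
```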

### Map Response back to Address

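The Vector Search response contains only datapoint IDs (customer IDs) and scores, so look up each ID in the original DataFrame to retrieve its address. With `DOT_PRODUCT_DISTANCE` on unit-normalized vectors, the `distance` field behaves as a similarity score: higher means more similar, which is why the query's own record scores about 1.0 above.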

In [None]:
# Build an ID -> address lookup once instead of filtering the DataFrame for every neighbor
id_to_address = dict(zip(df["temp_cust_id"].astype(str), df["Address"]))

matched_data = []
for neighbor in response[0]:  # response holds one list of neighbors per query
    matched_data.append(
        {
            "ID": neighbor.id,
            "Address": id_to_address.get(neighbor.id),
            "Distance": neighbor.distance,
        }
    )
matched_df = pd.DataFrame(matched_data)
matched_df["Address"].tolist()

### Considerations for Refining and Improving the Fuzzy Matching Results
* **Threshold Selection:** Choose a similarity threshold that defines a match, balancing accuracy against the tolerance for false matches.
* **Integration:** Integrate this similarity search step into your existing fuzzy matching workflows and systems.
* **Post-Processing:** Apply additional techniques and rules to further refine the matching results.
* **Domain Heuristics:** Incorporate domain knowledge and rules to improve the quality of the fuzzy matching.
* **Fine-Tuning:** Consider tuning the text embedding model (`text-embedding-004` in this notebook) on domain-specific data to make the embeddings more effective for this task.
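Combining the threshold and post-processing considerations, one possible sketch turns per-record neighbor lists into duplicate groups with union-find. `find_duplicate_groups` is a hypothetical helper; it assumes `neighbors_by_id` was collected from one `match()` call per record, e.g. `neighbors_by_id[cust_id] = [(n.id, n.distance) for n in response[0]]`, and that higher scores mean more similar.

```python
def find_duplicate_groups(neighbors_by_id, threshold):
    """Cluster records into duplicate groups with union-find.

    neighbors_by_id maps each query record ID to a list of (neighbor_id, score)
    pairs, where a higher score means more similar. Pairs scoring at or above
    `threshold` are linked; linked records end up in the same group.
    """
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving keeps trees shallow
            x = parent[x]
        return x

    def union(a, b):
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_b] = root_a

    for query_id, neighbors in neighbors_by_id.items():
        find(query_id)  # register records that have no match
        for neighbor_id, score in neighbors:
            if neighbor_id != query_id and score >= threshold:
                union(query_id, neighbor_id)

    groups = {}
    for record_id in list(parent):
        groups.setdefault(find(record_id), []).append(record_id)
    # Only groups with more than one record are duplicate candidates.
    return [sorted(members) for members in groups.values() if len(members) > 1]
```

Grouping is transitive: if A matches B and B matches C, all three land in one group even when A and C alone would fall below the threshold. Reviewing large groups, or requiring an additional rule (such as matching ZIP codes) before linking, is one way to limit that chaining.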