In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Intro & Disclaimer

**author**: elenamatay@google.com

**Purpose**: This notebook is intended as a simple example to help or inspire customers who are implementing an Entity Resolution solution. It should in no way be treated as a production-ready or state-of-the-art asset, but rather used as a starting point.


# Part 1: Create and Query Vector Search index

## Setup - Imports & Env variables

In [51]:
import os
import json
import pickle
import pandas as pd
from datetime import datetime
from pathlib import Path
from google.cloud import storage
from google.cloud import aiplatform
from google.cloud import aiplatform_v1


PROJECT_ID = "my-project" # @param - the GCP project ID
LOCATION = "my-region" # @param - the GCP region (e.g. europe-west4)

# NOTE(review): the four folder placeholders below share the same default ("my-folder").
# The index is later built from OUTPUT_FOLDER, so it presumably needs to be a folder of its own - confirm.
BUCKET_NAME = "my-bucket" # @param - the GCS bucket where the product JSONs are stored
BUCKET_URI = f"gs://{BUCKET_NAME}"
NER_JSON_FOLDER = "my-folder" # @param - the folder in the bucket where the product JSONs are stored
OUTPUT_FOLDER = "my-folder" # @param - the folder in the bucket where the output JSONs (products enriched with data) will be stored
TEXT_EMBEDDING_FOLDER = "my-folder" # @param - the folder in the bucket where the text embeddings will be stored
IMAGES_FOLDER = "my-folder" # @param - Optional, the folder where product images will be stored


# Timestamp suffix (month, day, hour, minute).
# NOTE(review): UID is not referenced anywhere else in the visible notebook.
UID = datetime.now().strftime("%m%d%H%M")

# Set variables for the current deployed index.
INDEX_URI = BUCKET_URI+"/my_index" # @param - the Vector Search index URI
DISPLAY_NAME = "my-index" # @param - the Vector Search index display name
ENDPOINT_DISPLAY_NAME = "my-endpoint" # @param - the Vector Search index endpoint display name
DEPLOYED_INDEX_ID = "my-index-id" # @param - the Vector Search index ID
APPROXIMATE_NEIGHBORS_COUNT = 10 # @param - the number of approximate neighbors to return
DIMENSIONS = 256 # @param - the number of dimensions for the embeddings
API_ENDPOINT = "manynumbers.vdb.vertexai.goog" # @param - the Vector Search API endpoint
# NOTE(review): INDEX_ENDPOINT is passed as-is to FindNeighborsRequest.index_endpoint; presumably it has to be
# the full resource name (projects/.../locations/.../indexEndpoints/...) - confirm.
INDEX_ENDPOINT = "my-index-endpoint" # @param - the Vector Search index endpoint ID

# Initialize Vertex AI
aiplatform.init(project=PROJECT_ID, location=LOCATION)

# Initialize the Google Cloud Storage client
storage_client = storage.Client(project=PROJECT_ID)
BUCKET = storage_client.bucket(BUCKET_NAME)

# Configure Vector Search client
# The match client is pointed at the index endpoint's own API domain (API_ENDPOINT).
client_options = {
  "api_endpoint": API_ENDPOINT
}
vector_search_client = aiplatform_v1.MatchServiceClient(
  client_options=client_options,
)

## Helper functions

In [9]:
# Extract string metadata from JSON to restricts (format needed for Vector Search filtering)
def extract_restrictions_from_dict(parent_namespace, data_dict):
    """Recursively flatten a nested attribute dict into restrict entries.

    Every key is appended to ``parent_namespace`` with a ``/`` separator to
    build the namespace of the entries produced for that key.

    Args:
        parent_namespace: Namespace prefix for every key of ``data_dict``.
        data_dict: Mapping of attribute name to value. Supported values are
            str, bool, int, float, None, dict, and lists of those.

    Returns:
        list[dict]: One entry per extracted value:
            * str            -> ``{"namespace": ns, "allow": [value]}``
            * bool           -> ``{"namespace": ns, "allow": [str(value)]}``
            * int / float    -> ``{"namespace": ns, "value_int" | "value_float": value}``
            * None           -> ``{"namespace": ns, "allow": []}``
            * dict           -> entries of the nested dict, under ``ns``
            * list           -> one entry per non-None item (dict items are
                                flattened, other items become ``str(item)`` tokens)
        Values of any other type are skipped.
    """
    # NOTE(review): numeric entries are returned in the same list as token entries, and the
    # callers in this notebook add the whole list to "restricts" - confirm whether numeric
    # entries should be routed to "numeric_restricts" instead.
    restrictions = []
    for key, value in data_dict.items():
        namespace = f"{parent_namespace}/{key}"
        if isinstance(value, str):
            restrictions.append({"namespace": namespace, "allow": [value]})
        elif isinstance(value, bool):
            # bool is a subclass of int: without this branch it would be emitted under a
            # "value_bool" key. Store it as a string token, like bools found inside lists.
            restrictions.append({"namespace": namespace, "allow": [str(value)]})
        elif isinstance(value, dict):
            # Directly nested dicts were previously dropped; flatten them under this namespace.
            restrictions.extend(extract_restrictions_from_dict(namespace, value))
        elif isinstance(value, list):
            for item in value:
                if isinstance(item, dict):
                    restrictions.extend(extract_restrictions_from_dict(namespace, item))
                elif item is not None:
                    restrictions.append({"namespace": namespace, "allow": [str(item)]})
        elif isinstance(value, (int, float)):
            # Key becomes "value_int" or "value_float" depending on the Python type.
            restrictions.append({"namespace": namespace,
                                 "value_" + type(value).__name__: value})
        elif value is None:
            restrictions.append({"namespace": namespace, "allow": []})
    return restrictions

# Process data to be able to filter

## For data in GCS bucket (one folder with embeddings, other with attributes)

### Create new JSONs including product ID, embedding, and metadata in restricts format
**Source** - [Docs: Data format required for filtering](https://cloud.google.com/vertex-ai/docs/vector-search/filtering#specify-namespaces-tokens-or-values)

#### Just print with one product (to test it works)

In [None]:
def process_product(product_id):
    """Build and print the merged record (id, embedding, restricts) for one product.

    Reads ``{NER_JSON_FOLDER}/{product_id}.json`` (attributes under its ``data`` key)
    and ``{TEXT_EMBEDDING_FOLDER}/{product_id}.json`` (its ``embedding`` key) from
    BUCKET, converts the attributes to restrict entries and prints the result as JSON.

    Args:
        product_id: Identifier used both as the JSON file name and as the record id.

    Returns:
        dict | None: The merged record, or None if anything failed (the error is printed).
    """
    try:
        # Load NER data
        ner_blob = BUCKET.blob(os.path.join(NER_JSON_FOLDER, f'{product_id}.json'))
        ner_data = json.loads(ner_blob.download_as_string())

        # Load embedding data
        embedding_blob = BUCKET.blob(os.path.join(TEXT_EMBEDDING_FOLDER, f'{product_id}.json'))
        embedding_data = json.loads(embedding_blob.download_as_string())

        # Extract embedding
        embedding = embedding_data['embedding']

        # Prepare restrictions: string tokens go to `restricts`, numbers to `numeric_restricts`
        restricts = []
        numeric_restricts = []

        # Iterate through attributes in 'data'
        for attribute_name, attribute_value in ner_data.get('data', {}).items():
            # Determine attribute type and handle accordingly
            if isinstance(attribute_value, str):
                restricts.append({"namespace": attribute_name, "allow": [attribute_value]})
            elif isinstance(attribute_value, bool):
                # bool is a subclass of int: without this branch it would land in
                # numeric_restricts under a "value_bool" key. Store it as a string token,
                # like bools found inside lists.
                restricts.append({"namespace": attribute_name, "allow": [str(attribute_value)]})
            elif isinstance(attribute_value, dict):
                # Handle nested dictionaries recursively
                restricts.extend(extract_restrictions_from_dict(attribute_name, attribute_value))
            elif isinstance(attribute_value, list):
                # Handle lists (could contain dictionaries, strings, or numbers)
                for item in attribute_value:
                    if isinstance(item, dict):
                        restricts.extend(extract_restrictions_from_dict(attribute_name, item))
                    elif item is not None:  # Include non-null values
                        restricts.append({"namespace": attribute_name, "allow": [str(item)]})
            elif isinstance(attribute_value, (int, float)):
                # Key becomes "value_int" or "value_float" depending on the Python type
                numeric_restricts.append({"namespace": attribute_name,
                                          "value_" + type(attribute_value).__name__: attribute_value})
            elif attribute_value is None:
                restricts.append({"namespace": attribute_name, "allow": []})

        # Create the merged data
        merged_data = {
            "id": product_id,
            "embedding": embedding,
            "restricts": restricts,
            "numeric_restricts": numeric_restricts
        }

        print(f"Processed product {product_id}: {json.dumps(merged_data)}")
        # Hand the record back so it can be inspected or asserted on, not only read from stdout
        return merged_data

    except Exception as e:
        # Best-effort: report the failure and keep going so one bad product does not stop a batch
        print(f"Error processing product {product_id}: {e}")
        return None

#### Upload results to GCS

-Code commented out because the results were already uploaded; uncomment it to run a new upload-

In [None]:
# def process_product(product_id):
#     """Processes a single product to create a merged JSON with embeddings and restrictions."""
#     try:
#         # Load NER data
#         ner_blob = BUCKET.blob(os.path.join(NER_JSON_FOLDER, f'{product_id}.json'))
#         ner_data = json.loads(ner_blob.download_as_string())

#         # Load embedding data
#         embedding_blob = BUCKET.blob(os.path.join(TEXT_EMBEDDING_FOLDER, f'{product_id}.json'))
#         embedding_data = json.loads(embedding_blob.download_as_string())

#         # Extract embedding
#         embedding = embedding_data['embedding']

#         # Prepare restrictions
#         restricts = []
#         numeric_restricts = []

#         # Iterate through attributes in 'data'
#         for attribute_name, attribute_value in ner_data.get('data', {}).items():
#             # Determine attribute type and handle accordingly
#             if isinstance(attribute_value, str):
#                 restricts.append({"namespace": attribute_name, "allow": [attribute_value]})
#             elif isinstance(attribute_value, dict):
#                 # Handle nested dictionaries recursively
#                 restricts.extend(extract_restrictions_from_dict(attribute_name, attribute_value))
#             elif isinstance(attribute_value, list):
#                 # Handle lists (could contain dictionaries, strings, or numbers)
#                 for item in attribute_value:
#                     if isinstance(item, dict):
#                         restricts.extend(extract_restrictions_from_dict(attribute_name, item))
#                     elif item is not None:  # Include non-null values
#                         restricts.append({"namespace": attribute_name, "allow": [str(item)]})
#             elif isinstance(attribute_value, (int, float)):
#                 numeric_restricts.append({"namespace": attribute_name, 
#                                           "value_" + type(attribute_value).__name__: attribute_value})
#             elif attribute_value is None:
#                 restricts.append({"namespace": attribute_name, "allow": []})
                
#         # Create the merged data
#         merged_data = {
#             "id": product_id,
#             "embedding": embedding,
#             "restricts": restricts,
#             "numeric_restricts": numeric_restricts
#         }

#         # Upload the merged data to the output folder
#         output_blob = BUCKET.blob(os.path.join(OUTPUT_FOLDER, f'{product_id}.json'))
#         output_blob.upload_from_string(json.dumps(merged_data))

#         print(f"Processed product {product_id}")

#     except Exception as e:
#         print(f"Error processing product {product_id}: {e}")

# # Get a list of all product IDs (assuming filenames are product IDs)
# blobs = BUCKET.list_blobs(prefix=NER_JSON_FOLDER)
# product_ids = [blob.name.split('/')[-1].split('.')[0] for blob in blobs]

# # Process each product
# for product_id in product_ids[:30]:
#     process_product(product_id)

### For data in GCS (embeddings) + pickle (attributes)

#### Just print - for testing purposes
Create new JSONs including product ID, embedding, and metadata in restricts format.

**Source** - [Docs: Data format required for filtering](https://cloud.google.com/vertex-ai/docs/vector-search/filtering#specify-namespaces-tokens-or-values)

In [1]:
# Path to the pickled pandas DataFrame read by process_products (its 'id' and 'categories' columns are used)
product_data_pickle = 'product_with_embeddings.pkl'

In [17]:
def _normalise_categories(raw_categories):
    """Return the categories as a sorted list of unique string tokens ([] if missing)."""
    if raw_categories is None:
        return []
    if isinstance(raw_categories, float) and raw_categories != raw_categories:
        # NaN is how pandas represents a missing cell
        return []
    if isinstance(raw_categories, (str, bytes)) or not hasattr(raw_categories, "__iter__"):
        # A single value: wrap it, otherwise set("shoes") would split it into characters
        raw_categories = [raw_categories]
    return sorted({str(category) for category in raw_categories if category is not None})


def process_products(product_data_pickle):
    """Print the merged record (id, embedding, category restrict) of the first 5 products.

    For each of the first 5 rows of the pickled DataFrame, the embedding is read from
    ``{TEXT_EMBEDDING_FOLDER}/{id}.json`` in BUCKET and all of the row's categories are
    combined into a single ``category`` restrict. Failures are printed per product and
    do not stop the loop.

    Args:
        product_data_pickle: Path to a pickled DataFrame with an 'id' column and,
            optionally, a 'categories' column.
    """
    # SECURITY: unpickling can execute arbitrary code - only load pickle files you trust.
    product_metadata = pd.read_pickle(product_data_pickle).head(5)

    for _, row in product_metadata.iterrows():
        product_id = row['id']
        try:
            embedding_blob = BUCKET.blob(os.path.join(TEXT_EMBEDDING_FOLDER, f'{product_id}.json'))
            embedding = json.loads(embedding_blob.download_as_string())['embedding']

            # Combine categories under the same namespace.
            # Normalising first avoids the ambiguous truth value of array-like cells and
            # gives a deterministic token order.
            restricts = []
            categories = _normalise_categories(row.get('categories'))

            if categories:
                restricts.append({"namespace": "category", "allow": categories})

            # Use json.dumps with double quotes.
            # str() keeps non-string ids (e.g. numpy integers) JSON-serialisable.
            merged_data_str = json.dumps({
                "id": str(product_id),
                "embedding": embedding,
                "restricts": restricts,
            })

            # Just printing:
            print(f"Processed product {product_id}: {merged_data_str}")

        except Exception as e:
            print(f"Error processing product {product_id}: {e}")


process_products(product_data_pickle)


Processed product 7014542: {"id": "7014542", "embedding": [-0.038705796003341675, -0.013637830503284931, 0.008773560635745525, -0.0010383558692410588, 0.005564272869378328, 0.027952738106250763, 0.010825978592038155, -0.03772643953561783, 0.011578280478715897, 0.013310451991856098, 0.009192606434226036, 4.7592118789907545e-05, 0.027162015438079834, 0.010100962594151497, 0.008223128505051136, -0.058381155133247375, 0.03186623007059097, 0.028071442618966103, -0.12360720336437225, 0.024802975356578827, 0.05325445532798767, -0.019877633079886436, 0.042826808989048004, 0.0047453222796320915, -0.03647266700863838, -0.005100157577544451, 0.04150101915001869, 0.03186649829149246, -0.06308475881814957, -0.04534391313791275, -0.016683271154761314, 0.043205954134464264, 0.00488685816526413, -0.04089771583676338, 0.011693982407450676, 0.024341866374015808, -0.009603875689208508, -0.016349272802472115, 0.005081045441329479, -0.060591790825128555, -0.006151492707431316, -0.02098490670323372, 0.01457

#### Upload results to GCS (50 products only, to check that filtering works)

-Code commented out because the results were already uploaded (to [this folder](https://console.cloud.google.com/storage/browser/elodin-europe-4/elena_test_index_for_filtering?e=13802955&mods=dm_deploy_from_gcs&project=elodin-426209&pageState=(%22StorageObjectListTable%22:(%22f%22:%22%255B%255D%22)))); uncomment it to run a new upload-

In [20]:
# def process_products(product_data_pickle):
#     """Processes the first 50 products to create JSONs with embeddings and combined category restrictions."""
#     product_metadata = pd.read_pickle(product_data_pickle).head(50)  

#     for _, row in product_metadata.iterrows():
#         product_id = row['id']
#         try:
#             embedding_blob = BUCKET.blob(os.path.join(TEXT_EMBEDDING_FOLDER, f'{product_id}.json'))
#             embedding = json.loads(embedding_blob.download_as_string())['embedding']

#             # Combine categories under the same namespace
#             restricts = []
#             categories = row.get('categories', []) 

#             if categories:
#                 restricts.append({"namespace": "category", "allow": list(set(categories))})

#             # Use json.dumps with double quotes
#             merged_data_str = json.dumps({
#                 "id": product_id,
#                 "embedding": embedding,
#                 "restricts": restricts,
#             })

#             # Upload the merged data to the output folder
#             output_blob = BUCKET.blob(os.path.join(OUTPUT_FOLDER, f'{product_id}.json'))
#             output_blob.upload_from_string(merged_data_str)

#         except Exception as e:
#             print(f"Error processing product {product_id}: {e}")


# process_products(product_data_pickle)


## Create & Deploy Index with ready-to-filter data

In [21]:
# Create Index
# Built from the merged JSONs in OUTPUT_FOLDER. Dimensions and neighbour count come from the
# setup cell constants so that changing them there actually takes effect here.
filtering_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name=DISPLAY_NAME,
    contents_delta_uri=f"{BUCKET_URI}/{OUTPUT_FOLDER}",
    dimensions=DIMENSIONS,
    approximate_neighbors_count=APPROXIMATE_NEIGHBORS_COUNT,
)

Creating MatchingEngineIndex
Create MatchingEngineIndex backing LRO: projects/787645211092/locations/europe-west4/indexes/7405958480978247680/operations/1248819534391934976
MatchingEngineIndex created. Resource name: projects/787645211092/locations/europe-west4/indexes/7405958480978247680
To use this MatchingEngineIndex in another session:
index = aiplatform.MatchingEngineIndex('projects/787645211092/locations/europe-west4/indexes/7405958480978247680')


In [28]:
# Create the Index Endpoint (with the public endpoint enabled) that the index is deployed to
filtering_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    public_endpoint_enabled=True,
    display_name=ENDPOINT_DISPLAY_NAME,
)

Creating MatchingEngineIndexEndpoint
Create MatchingEngineIndexEndpoint backing LRO: projects/787645211092/locations/europe-west4/indexEndpoints/2582392173831913472/operations/8554221079940300800
MatchingEngineIndexEndpoint created. Resource name: projects/787645211092/locations/europe-west4/indexEndpoints/2582392173831913472
To use this MatchingEngineIndexEndpoint in another session:
index_endpoint = aiplatform.MatchingEngineIndexEndpoint('projects/787645211092/locations/europe-west4/indexEndpoints/2582392173831913472')


In [29]:
# Deploy the Index to the Index Endpoint
# Relies on `filtering_index` and `filtering_index_endpoint` created by the two cells above,
# so both of them must have been run in the current kernel session.
filtering_index_endpoint.deploy_index(
    index = filtering_index,
    deployed_index_id = DEPLOYED_INDEX_ID
)

Deploying index MatchingEngineIndexEndpoint index_endpoint: projects/787645211092/locations/europe-west4/indexEndpoints/2582392173831913472
Deploy index MatchingEngineIndexEndpoint index_endpoint backing LRO: projects/787645211092/locations/europe-west4/indexEndpoints/2582392173831913472/operations/8568857778729254912
MatchingEngineIndexEndpoint index_endpoint Deployed index. Resource name: projects/787645211092/locations/europe-west4/indexEndpoints/2582392173831913472


<google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint.MatchingEngineIndexEndpoint object at 0x10ad90f70> 
resource name: projects/787645211092/locations/europe-west4/indexEndpoints/2582392173831913472

## Query index

### Helper

In [35]:

def get_product_embedding(product_id):
    """Return the 'embedding' value of the product's merged JSON stored in OUTPUT_FOLDER."""
    blob_name = os.path.join(OUTPUT_FOLDER, f'{product_id}.json')
    raw_json = BUCKET.blob(blob_name).download_as_string()
    return json.loads(raw_json)['embedding']

def get_product_categories(product_id):
    """Return the category tokens stored in the product's merged JSON in OUTPUT_FOLDER.

    Args:
        product_id: Identifier used as the JSON file name.

    Returns:
        list: The 'allow' list of the first restrict whose namespace is 'category'.
        An empty list is returned when the product has no restricts or no such entry
        (previously this raised UnboundLocalError or returned None).
    """
    product_blob = BUCKET.blob(os.path.join(OUTPUT_FOLDER, f'{product_id}.json'))
    product_data = json.loads(product_blob.download_as_string())

    # `or []` covers both a missing 'restricts' key and an empty or null value
    for restrict_item in product_data.get('restricts') or []:
        if restrict_item.get('namespace') == 'category' and 'allow' in restrict_item:
            return restrict_item['allow']

    return []


In [88]:
def find_neighbours_ids(product_id, num_neighbours):
    """Query the deployed index for the nearest neighbours of a product.

    The query uses the product's stored embedding and is restricted to the product's
    own categories (namespace "category").

    Args:
        product_id: Product whose neighbours are requested.
        num_neighbours: Number of neighbours requested from the index. The query
            product itself is removed from the result when it is returned.

    Returns:
        list[int]: Datapoint ids of the neighbours, in the order returned by the
        service, without ``product_id`` itself.
    """
    # Build FindNeighborsRequest object
    datapoint = aiplatform_v1.IndexDatapoint(
        feature_vector=get_product_embedding(product_id),
        restricts=[
            aiplatform_v1.IndexDatapoint.Restriction(
                namespace="category",
                allow_list=get_product_categories(product_id),
                deny_list=[],
            )
        ],
    )

    query = aiplatform_v1.FindNeighborsRequest.Query(
        datapoint=datapoint,
        # The number of nearest neighbors to be retrieved
        neighbor_count=num_neighbours,
    )
    request = aiplatform_v1.FindNeighborsRequest(
        index_endpoint=INDEX_ENDPOINT,
        deployed_index_id=DEPLOYED_INDEX_ID,
        # Request can have multiple queries
        queries=[query],
        return_full_datapoint=False,
    )

    # Execute the request (once - the same request used to be sent twice)
    response = vector_search_client.find_neighbors(request)

    # Drop the query product by id instead of blindly dropping the first hit: with an exact
    # duplicate in the index, the first hit may be the duplicate rather than the query itself.
    query_id = str(product_id)
    datapoint_ids = [
        int(neighbor.datapoint.datapoint_id)
        for nearest_neighbor in response.nearest_neighbors
        for neighbor in nearest_neighbor.neighbors
        if neighbor.datapoint.datapoint_id != query_id
    ]

    return datapoint_ids

In [89]:
# Request 10 neighbours for product 7133276; `candidate_neighbours` is reused by the Part 2 cells below
candidate_neighbours = find_neighbours_ids(7133276, 10)
print(candidate_neighbours)

[7209394, 7167926, 7133446, 7096421, 7248310, 7059758, 7208753, 7096723, 7096919]


# Part 2: Call Gemini to check which of the top N neighbours are really the same product

### Setup and Env variables definition

In [23]:
import os
import json

import vertexai
import vertexai.preview.generative_models as generative_models
from google.cloud import storage
from google.cloud import aiplatform
from vertexai.generative_models import GenerativeModel, Part, FinishReason

# Initialize Vertex AI
# NOTE: PROJECT_ID, LOCATION and BUCKET_NAME are defined in the Part 1 setup cell, which must be run first.
aiplatform.init(project=PROJECT_ID, location=LOCATION)

# Initialize the Google Cloud Storage client
# (re-creates the `storage_client` and `BUCKET` objects already created in the Part 1 setup cell)
storage_client = storage.Client(project=PROJECT_ID)
BUCKET = storage_client.bucket(BUCKET_NAME)

### Helper

In [56]:
def get_product_json(product_id):
    """Download the product's JSON from NER_JSON_FOLDER and return it re-serialised as a JSON string."""
    blob_name = os.path.join(NER_JSON_FOLDER, f'{product_id}.json')
    parsed = json.loads(BUCKET.blob(blob_name).download_as_string())
    return json.dumps(parsed)

def load_product_image(product_id):
    """Return an image/png `Part` pointing at the product's image in GCS.

    The URI is ``gs://{BUCKET_NAME}/{IMAGES_FOLDER}/{product_id}.png``; it is printed
    before the Part is created.

    NOTE(review): there is no check that the object exists. An existence check with a
    "[No images available for this product]" fallback was sketched here but left disabled.
    """
    image_uri = f"gs://{BUCKET_NAME}/{IMAGES_FOLDER}/{product_id}.png"
    print(image_uri)
    return Part.from_uri(uri=image_uri, mime_type="image/png")

# Generation parameters handed to the model by gen_neighbours_verification
generation_config = dict(
    max_output_tokens=8192,
    temperature=0.1,
    top_p=0.95,
)

# Every listed harm category gets the same BLOCK_ONLY_HIGH threshold
safety_settings = {
    harm_category: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH
    for harm_category in (
        generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
        generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
        generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
        generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT,
    )
}

def gen_neighbours_verification(prompt, model_name="gemini-1.5-flash-001"):
    """Send the verification prompt to a Gemini model and print the streamed answer.

    The module-level ``generation_config`` and ``safety_settings`` are read when the
    function is called, so the most recent assignment of each is the one used.

    Args:
        prompt: Content passed to ``GenerativeModel.generate_content``.
        model_name: Model identifier to use. Defaults to the model that was
            previously hard-coded.

    Returns:
        None. The response text is printed chunk by chunk as it arrives.
    """
    vertexai.init(project=PROJECT_ID, location=LOCATION)
    model = GenerativeModel(model_name)
    responses = model.generate_content(
        prompt,
        generation_config=generation_config,
        safety_settings=safety_settings,
        stream=True,
    )

    # Streaming: print each chunk without a newline so the answer reads as continuous text
    for response in responses:
        print(response.text, end="")

# NOTE(review): `image1` is not referenced anywhere else in the visible notebook - it looks like
# a leftover from sample code; confirm before removing.
image1 = Part.from_uri(
    mime_type="image/png",
    uri=f"{BUCKET_URI}/images/7072341.png")

# NOTE(review): this reassigns the `generation_config` defined earlier in this same cell
# (temperature 0.1). gen_neighbours_verification reads the global when it is called, so
# temperature 1 is the value actually used. Confirm which one is intended and drop the other.
generation_config = {
    "max_output_tokens": 8192,
    "temperature": 1,
    "top_p": 0.95,
}



In [90]:
def create_candidate_products_list(candidate_neighbours):
    """Build, print and return the text block that describes every candidate product.

    Candidates are numbered from 1 in the order given. Each one contributes a header
    with its JSON (from get_product_json) followed by the string form of its image Part
    (from load_product_image); all pieces are joined with newlines.

    NOTE(review): the image Part is formatted into text, so the result is a plain string
    and the image itself is not attached as a separate prompt part - confirm this is intended.
    """
    sections = []

    for position, candidate_id in enumerate(candidate_neighbours, start=1):
        candidate_json = get_product_json(candidate_id)

        # Text describing the candidate (the indentation is part of the prompt text)
        description = f"""Candidate product {position}:
        Candidate product {position} JSON:
        {candidate_json}
        Candidate product {position} image:
        """
        image_text = f"{load_product_image(candidate_id)}"
        sections.extend([description, image_text])

    # Single newline-joined string used as part of the prompt
    prompt = "\n".join(sections)

    print(prompt)
    return prompt

In [None]:
# Preview of the candidates text block (it is printed by the function).
# NOTE(review): `candidate_products_list` is not used by the later cells shown here -
# generate_prompt calls create_candidate_products_list itself.
candidate_products_list = create_candidate_products_list(candidate_neighbours)

In [None]:
def generate_prompt(input_product_id, candidate_neighbours):
    """Assemble the verification prompt for one input product and its candidates.

    Args:
        input_product_id: Product that the candidates are compared against.
        candidate_neighbours: Ids of the candidate products.

    Returns:
        list: Prompt parts in order - instruction text, the input product JSON string
        (get_product_json), a heading, the input product image Part (load_product_image),
        a heading, the candidates text block (create_candidate_products_list), and the
        closing instructions.
    """
    prompt = ["""
        You are a Retail assistant that checks if retailer products that are candidate to be repeated, are actually the same product or not.
        You will now see a list of products defined by a JSON file describing the product attributes, and sometimes (optional) a product image.
        You will need to compare the first product to all the other (candidate) products, and verify which of the candidates are actually the same than the initial product.
        Not all products have images about them, but this doesn't mean they are not the same product. If a product doesn't have an image, just take into account the JSONs comparison for your evaluation.
                
        Perform the verification, and return the products that are identified as actually the same as the input one. Return only the product IDs for these products.

        Input product:
        Input product JSON:
        """
        , get_product_json(input_product_id),
        """
        Input product image:
        """
        , load_product_image(input_product_id),
        """
        
        Candidate products to be verified if the same than input product:
        """
        
        , create_candidate_products_list(candidate_neighbours),
        
        """
        If you don't get any candidate products, request these to the user.

        If you did get the candidate products, now it's your turn for verifying.
        Compare the first product to each of the candidate products, one by one, and verify if any of them are actually the same than the initial product based on their attributes and optional images.
        It could happen that more than one candidate is the same product than the input product. Return all products that are identified as the same as the input.
        Not all products have images about them, but this doesn't mean they are not the same product. If a product doesn't have an image, just take into account the JSONs comparison for your evaluation.
        Return the IDs for just the products identified to be exactly the same as the input one. If no match with the input products, explain what each product (input and candidates) is about, with the ID and a couple of words for the product.

    """]
    return prompt

# Build the prompt for the same product that was queried in Part 1 (7133276) and its candidate neighbours
prompt = generate_prompt(7133276, candidate_neighbours)

(previous cell output was cleared to anonymise the notebook, but was showing the closest neighbours)

In [95]:
# Send the assembled prompt to Gemini; the streamed answer is printed to the cell output
gen_neighbours_verification(prompt)

The products with ID: **7209394** are the same as the input product. 

The product with ID 7133276 is Pro Plan EN Gastroenteric dog food.
The product with ID 7209394 is Pro Plan veterinary diets EN gastroenteric dog food.
The product with ID 7167926 is Purina Dog Chow Adult Mini and Small Breed Dog Food.
The product with ID 7133446 is Sportsman's Choice Dog Food for Adult Dogs.
The product with ID 7096421 is Purina Dog Chow Adult Mini and Small Breed Dry Dog Food.
The product with ID 7248310 is Purina Dog Chow Adult Mini and Small Breed Dry Food.
The product with ID 7059758 is Purina Dog Chow Adult Mini and Small Breed Dog Food.
The product with ID 7208753 is Dog Chow Croquetas Adult Mini/Small Breed.
The product with ID 7096723 is Sportsman's Choice Dog Food for Adult Dogs.
The product with ID 7096919 is Sportsman's Choice Dog Food. 
