# Similarity Search for Product Validation Use-Case

In [1]:
import pandas as pd
import json
import time
import asyncio
import requests
import google.auth
import google.auth.transport.requests
from google.cloud import storage
from google.cloud import aiplatform
from google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint import Namespace
from google import genai
from google.genai.types import EmbedContentConfig

In [2]:
# Google Cloud project and region passed to the Gen AI client and to aiplatform.init.
PROJECT_ID = "sandbox-401718" # @param
REGION = "us-central1" # @param
# Local JSONL file; each line is parsed as one JSON record by main_async_runner.
INPUT_FILE = "product_embeddings.jsonl" # @param

# Cloud Storage locations derived from the project and region above.
BUCKET_URI = f"gs://{PROJECT_ID}-product-textembedding-{REGION}"
INPUT_URI = f"{BUCKET_URI}/input-test"
OUTPUT_URI = f"{BUCKET_URI}/output-test"

### Create buckets

In [4]:
# ! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

Creating gs://sandbox-401718-product-textembedding-us-central1/...


## Textembeddings on GCS

The Gemini call is wrapped in a helper function (`embed_batch_sync`), and `asyncio.to_thread` is used to run multiple batches concurrently. This gives the throughput of asynchronous processing while still using the SDK's synchronous embedding call.

In [3]:
# Gen AI SDK client routed through Vertex AI for the configured project and region.
client = genai.Client(
    project=PROJECT_ID,
    location=REGION,
    vertexai=True,
)

In [4]:
# Maximum number of input records grouped into one embed_batch_sync call.
BATCH_SIZE = 100


def embed_batch_sync(batch_of_items: list[dict]) -> list[dict]:
    """
    Embed one batch of items with a single synchronous Gemini API call.

    Args:
        batch_of_items: Dictionaries that each carry an 'id' and a 'content' key.

    Returns:
        One dictionary per input item holding its original 'id', its 'content'
        and the 'embedding' values. An empty list is returned for an empty
        batch or when the API call fails; failures are printed rather than
        raised so that one bad batch does not abort the other batches.
    """
    # Nothing to embed: skip the network round trip entirely.
    if not batch_of_items:
        return []

    # 1. Extract just the content strings for the API call
    contents_to_embed = [item["content"] for item in batch_of_items]

    print(f"Embedding a batch of {len(contents_to_embed)} items...")
    try:
        # This is the synchronous API call
        response = client.models.embed_content(
            model="gemini-embedding-001",
            contents=contents_to_embed,  # Use the extracted content
            config=EmbedContentConfig(
                output_dimensionality=3072,  # Optional
                task_type="RETRIEVAL_DOCUMENT",  # Optional
            ),
        )

        # zip() would silently drop items on a count mismatch and could pair an
        # id with the wrong vector, so treat a mismatch as a failed batch.
        embeddings = response.embeddings or []
        if len(embeddings) != len(batch_of_items):
            raise ValueError(
                f"expected {len(batch_of_items)} embeddings, got {len(embeddings)}"
            )

        # 2. Re-associate the original ID and content with the new embedding
        return [
            {
                "id": item["id"],
                "content": item["content"],
                "embedding": embedding.values,
            }
            for item, embedding in zip(batch_of_items, embeddings)
        ]
    except Exception as e:
        # Best-effort by design: report the failure and let other batches proceed.
        print(f"An error occurred with a batch: {e}")
        return []


async def main_async_runner(input_file: str):
    """
    Read a JSONL file, split it into batches and embed the batches concurrently.

    Each non-blank line is parsed as one JSON record. Records are grouped into
    batches of BATCH_SIZE and every batch is handed to embed_batch_sync through
    asyncio.to_thread so the blocking calls overlap.

    Args:
        input_file: Path to the JSONL file to read.

    Returns:
        A flat list with the results of all batches, in batch order.
    """
    start_time = time.perf_counter()

    tasks = []
    # This batch holds the parsed dictionaries ({'id':..., 'content':...})
    item_batch = []
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(input_file, "r", encoding="utf-8") as f:
        for line in f:
            # Tolerate blank lines (e.g. a trailing newline at end of file),
            # which json.loads would otherwise reject.
            if not line.strip():
                continue

            item_batch.append(json.loads(line))  # Keep both id and content

            if len(item_batch) >= BATCH_SIZE:
                # Use asyncio.to_thread to run the blocking function in a separate thread
                tasks.append(asyncio.to_thread(embed_batch_sync, item_batch))
                item_batch = []

    # Process remaining items in the last batch
    if item_batch:
        tasks.append(asyncio.to_thread(embed_batch_sync, item_batch))

    print(f"Created {len(tasks)} concurrent tasks.")

    # asyncio.gather will wait for all the threads to complete
    all_batch_results = await asyncio.gather(*tasks)

    final_results = [item for batch in all_batch_results for item in batch]

    # perf_counter is monotonic, so the elapsed time is immune to clock changes.
    elapsed = time.perf_counter() - start_time
    print("--- Process Finished ---")
    print(f"Total time taken: {elapsed:.2f} seconds")
    return final_results

In [6]:
# Run the main asynchronous function
all_embeddings = await main_async_runner(INPUT_FILE)

# Convert the list of dictionaries to a pandas DataFrame
if all_embeddings:
    # 4. The 'id' column is now automatically created from your source data
    df = pd.DataFrame(all_embeddings)
    df['embedding_dim'] = df['embedding'].apply(len)
    print(f"\nSuccessfully generated {len(df)} embeddings.")
else:
    print("\nNo embeddings were generated. Please check for errors in the logs above.")

# 5. Reorder columns for clarity (The 'id' is the original one from the file)
df = df[['id', 'content', 'embedding', 'embedding_dim']]

# Display the DataFrame to verify the original 'id' is used
print("DataFrame with original 'id' from JSONL file:")
display(df.head())

Created 10 concurrent tasks.
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
--- Process Finished ---
Total time taken: 8.38 seconds

Successfully generated 1000 embeddings.


In [11]:
# Save the embeddings table as CSV; the DataFrame index is left out of the file.
df.to_csv(path_or_buf="product_embeddings_output.csv", index=False)

## Create Index for Vector Search

This section describes the process of creating an index for Vertex AI Vector Search. Two index types are available: a brute-force (exhaustive) index, which finds the exact nearest neighbors to the query vector, and an approximate nearest neighbor (ANN) index, which focuses on performant approximations and retrieval efficiency. Brute-force search is computationally more expensive than ANN. Note that the code in this notebook creates a Tree-AH (ANN) index via `create_tree_ah_index`.

For more information about the methods and their tradeoff: https://cloud.google.com/vertex-ai/docs/vector-search/create-manage-index

In [12]:
# Build the records for Vertex AI Vector Search: one JSON object per product,
# carrying the id as a string together with its embedding vector.
data_for_index = [
    {"id": str(product_id), "embedding": vector}
    for product_id, vector in zip(df['id'], df['embedding'])
]

# Local file that receives the index data
INDEX_DATA_FILE_NAME = "product_index_data.json"

# Serialize in JSON Lines layout: one record per line
with open(INDEX_DATA_FILE_NAME, "w") as f:
    f.writelines(json.dumps(record) + "\n" for record in data_for_index)

In [13]:
# Initialize the GCS client. It will infer the project from your authenticated environment.
storage_client = storage.Client()

# Get the bucket name by simply removing the "gs://" prefix from your BUCKET_URI.
bucket_name = BUCKET_URI.replace("gs://", "")
bucket = storage_client.bucket(bucket_name)

# Get the destination folder path from the INPUT_URI variable.
destination_folder = INPUT_URI.replace(BUCKET_URI, "").strip("/")

# Combine the folder path with the local filename to create the full blob name.
# This will result in a path like "input-test/recipes_index_data.jsonl"
destination_blob_name = f"{destination_folder}/{INDEX_DATA_FILE_NAME}"
blob = bucket.blob(destination_blob_name)

# Upload the local file you just created.
blob.upload_from_filename(INDEX_DATA_FILE_NAME)

# Store the full GCS path of the uploaded file. This will now point to your INPUT_URI folder.
index_data_gcs_uri = f"gs://{bucket_name}/{destination_blob_name}"

print(f"File '{INDEX_DATA_FILE_NAME}' successfully uploaded to:")
print(index_data_gcs_uri)

File 'product_index_data.json' successfully uploaded to:
gs://sandbox-401718-product-textembedding-us-central1/input-test/product_index_data.json


## Create Index
For similarity calculations, the documentation strongly recommends using DOT_PRODUCT_DISTANCE + UNIT_L2_NORM instead of the COSINE distance. The algorithms are better optimized for DOT_PRODUCT_DISTANCE, and combining it with UNIT_L2_NORM gives the same ranking as, and is mathematically equivalent to, the COSINE distance.

In [14]:
# Set the SDK-wide defaults; INPUT_URI is also passed as the staging bucket.
aiplatform.init(
    staging_bucket=INPUT_URI,
    project=PROJECT_ID,
    location=REGION,
)

In [15]:
# Vector length read positionally from the first row, so it does not depend on
# the DataFrame carrying an index label equal to 0.
DIMENSIONS = len(df["embedding"].iloc[0])
DISPLAY_NAME = "index_product_match"

In [16]:
# Create a Tree-AH index from the data under INPUT_URI. Vectors are
# L2-normalized and compared by dot product.
ann_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name=DISPLAY_NAME,
    description="Similar Product match index",
    contents_delta_uri=INPUT_URI,
    dimensions=DIMENSIONS,
    distance_measure_type="DOT_PRODUCT_DISTANCE",
    feature_norm_type="UNIT_L2_NORM",
    approximate_neighbors_count=200,
)

Creating MatchingEngineIndex
Create MatchingEngineIndex backing LRO: projects/757654702990/locations/us-central1/indexes/5403556491774918656/operations/7735117954135621632
MatchingEngineIndex created. Resource name: projects/757654702990/locations/us-central1/indexes/5403556491774918656
To use this MatchingEngineIndex in another session:
index = aiplatform.MatchingEngineIndex('projects/757654702990/locations/us-central1/indexes/5403556491774918656')


In [17]:
# Keep the resource name and re-instantiate the index handle from it.
INDEX_RESOURCE_NAME = ann_index.resource_name
ann_index = aiplatform.MatchingEngineIndex(index_name=INDEX_RESOURCE_NAME)

## Deploy

In [18]:
# Retrieve the project number
project_number_result = !gcloud projects list --filter="PROJECT_ID:'{PROJECT_ID}'" --format='value(PROJECT_NUMBER)'
PROJECT_NUMBER = project_number_result[0]
# PROJECT_NUMBER = 757654702990

VPC_NETWORK = "beusebio-network"
VPC_NETWORK_FULL = f"projects/{PROJECT_NUMBER}/global/networks/{VPC_NETWORK}"

In [19]:
# Endpoint
my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name="index_endpoint_product",
    description="product index",
    network=VPC_NETWORK_FULL,
)

INDEX_ENDPOINT_NAME = my_index_endpoint.resource_name

Creating MatchingEngineIndexEndpoint
Create MatchingEngineIndexEndpoint backing LRO: projects/757654702990/locations/us-central1/indexEndpoints/3966107766178709504/operations/4224139832135254016
MatchingEngineIndexEndpoint created. Resource name: projects/757654702990/locations/us-central1/indexEndpoints/3966107766178709504
To use this MatchingEngineIndexEndpoint in another session:
index_endpoint = aiplatform.MatchingEngineIndexEndpoint('projects/757654702990/locations/us-central1/indexEndpoints/3966107766178709504')


In [20]:
# Deploy
DEPLOYED_INDEX_ID = "product_index"
my_index_endpoint = my_index_endpoint.deploy_index(
    index=ann_index, deployed_index_id=DEPLOYED_INDEX_ID
)

my_index_endpoint.deployed_indexes

Deploying index MatchingEngineIndexEndpoint index_endpoint: projects/757654702990/locations/us-central1/indexEndpoints/3966107766178709504
Deploy index MatchingEngineIndexEndpoint index_endpoint backing LRO: projects/757654702990/locations/us-central1/indexEndpoints/3966107766178709504/operations/6961061768181317632
MatchingEngineIndexEndpoint index_endpoint Deployed index. Resource name: projects/757654702990/locations/us-central1/indexEndpoints/3966107766178709504


[id: "product_index"
index: "projects/757654702990/locations/us-central1/indexes/5403556491774918656"
create_time {
  seconds: 1764436645
  nanos: 921789000
}
private_endpoints {
  match_grpc_address: "10.116.0.14"
}
index_sync_time {
  seconds: 1764437975
  nanos: 656273000
}
automatic_resources {
  min_replica_count: 2
  max_replica_count: 2
}
deployment_group: "default"
]