# Recipe Vector Search Index Creation

This notebook demonstrates how to create a vector search index for recipe similarity matching using Google Cloud Vertex AI. The process includes generating embeddings for recipes, creating a vector search index, and deploying it for real-time similarity queries.

## Setup and Dependencies

Import required libraries for data processing, Google Cloud services, and asynchronous operations.

In [None]:
import pandas as pd
import json
from google.cloud import storage
import asyncio
import time

## Project Configuration

Set up project parameters including Google Cloud project ID, region, and storage bucket URIs for input and output data.

In [77]:
PROJECT_ID = "sandbox-401718" # @param
REGION = "us-central1" # @param
INPUT_FILE = "recipes_content.jsonl" # @param
BUCKET_URI = f"gs://{PROJECT_ID}-recipe-textembedding-{REGION}"
INPUT_URI = f"{BUCKET_URI}/input-test"
OUTPUT_URI = f"{BUCKET_URI}/output-test"

## Cloud Storage Setup

Create the Google Cloud Storage bucket that stores the recipe data and embeddings. The same bucket is used throughout the indexing process. The command is commented out; uncomment it on the first run if the bucket does not exist yet.

In [78]:
# ! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

## Embedding Generation with Concurrent Processing

Generate text embeddings for recipe content using the Gemini embedding model (`gemini-embedding-001`). This section sends batches of recipes concurrently instead of one after another, which shortens the total processing time for large recipe sets.

### Initialize Gemini Client

Set up the Gemini AI client for embedding generation with Vertex AI integration.

In [79]:
from google import genai
from google.genai.types import EmbedContentConfig

client = genai.Client(vertexai=True, project=PROJECT_ID, location=REGION)

### Sample Data Generation (Optional)

Commented code for creating sample recipe data if needed for testing purposes.

In [81]:
# import json

# # Create a dummy JSONL file
# dummy_data = [
#     {"content": "Title:Miso-Butter Roast Chicken With Acorn Squash Panzanella,Ingredients:1 (3½–4-lb.) whole chicken, 2¾ tsp. kosher salt..."},
#     {"content": "Title:Classic Chocolate Chip Cookies,Ingredients:1 cup butter, 3/4 cup white sugar, 3/4 cup brown sugar, 2 eggs, 1 tsp vanilla..."},
#     {"content": "Title:Spaghetti Carbonara,Ingredients:1 lb spaghetti, 2 large eggs, 1/2 cup grated Pecorino Romano cheese, 4 slices guanciale..."},
#     {"content": "Title:Vegan Lentil Soup,Ingredients:1 tbsp olive oil, 1 large onion, 2 carrots, 2 celery stalks, 2 cloves garlic, 1 tsp dried thyme..."},
#     {"content": "Title:Simple Guacamole,Ingredients:3 ripe avocados, 1/2 small onion, 1 lime, juiced, 1/2 tsp salt, 2 tbsp chopped cilantro..."},
# ]

# INPUT_FILE = "recipes.jsonl"
# with open(INPUT_FILE, "w") as f:
#     for item in dummy_data:
#         f.write(json.dumps(item) + "\n")

# print(f"Created '{INPUT_FILE}' with {len(dummy_data)} rows.")

### Asynchronous Embedding Functions

Define functions for concurrent embedding generation:
- `embed_batch_sync()`: Synchronous function to embed a single batch
- `main_async_runner()`: Main asynchronous coordinator that processes multiple batches concurrently

In [83]:
BATCH_SIZE = 100


def embed_batch_sync(batch_of_content: list[str]) -> list[dict]:
    """
    Embed one batch of recipe texts with a blocking (synchronous) API call.
    Returns one {"content", "embedding"} dict per input, or an empty list if the call fails.
    """
    print(f"Embedding a batch of {len(batch_of_content)} items...")
    try:
        # This is the synchronous API call

        response = client.models.embed_content(
            model="gemini-embedding-001",
            contents=batch_of_content,
            config=EmbedContentConfig(
                output_dimensionality=3072,  # Optional
                task_type="RETRIEVAL_DOCUMENT",  # Optional
            ),
        )

        results = [
            {"content": content, "embedding": embedding.values}
            for content, embedding in zip(batch_of_content, response.embeddings)
        ]
        return results
    except Exception as e:
        print(f"An error occurred with a batch: {e}")
        return []


async def main_async_runner(input_file: str):
    """Main function to read the file and run synchronous jobs concurrently."""
    start_time = time.time()

    tasks = []
    with open(input_file, "r") as f:
        content_batch = []
        for line in f:
            original_data = json.loads(line)
            content_batch.append(original_data["content"])

            if len(content_batch) >= BATCH_SIZE:
                # Use asyncio.to_thread to run the blocking function in a separate thread

                task = asyncio.to_thread(embed_batch_sync, content_batch)
                tasks.append(task)
                content_batch = []
    if content_batch:
        task = asyncio.to_thread(embed_batch_sync, content_batch)
        tasks.append(task)
    print(f"Created {len(tasks)} concurrent tasks.")

    # asyncio.gather will wait for all the threads to complete

    all_batch_results = await asyncio.gather(*tasks)

    final_results = [item for batch in all_batch_results for item in batch]

    end_time = time.time()
    print("--- Process Finished ---")
    print(f"Total time taken: {end_time - start_time:.2f} seconds")
    return final_results
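
The concurrency pattern used above can be tried without calling the API. The following is a minimal sketch, assuming a hypothetical stand-in `fake_embed_batch()` in place of `embed_batch_sync()`; `chunked()` and `embed_all()` are illustrative names that do not appear in the notebook. It shows that `asyncio.gather()` returns batch results in submission order, so the flattened rows line up with the input.

```python
import asyncio
import time


def chunked(items: list[str], size: int) -> list[list[str]]:
    """Split items into consecutive batches of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def fake_embed_batch(batch: list[str]) -> list[dict]:
    """Hypothetical stand-in for embed_batch_sync: blocks briefly, returns dummy vectors."""
    time.sleep(0.1)  # simulates the latency of a blocking API call
    return [{"content": text, "embedding": [float(len(text))]} for text in batch]


async def embed_all(items: list[str], batch_size: int) -> list[dict]:
    # Each to_thread call wraps the blocking function in a coroutine;
    # gather runs them concurrently and returns results in submission order.
    coros = [asyncio.to_thread(fake_embed_batch, b) for b in chunked(items, batch_size)]
    batch_results = await asyncio.gather(*coros)
    return [row for batch in batch_results for row in batch]


recipes = [f"recipe {i}" for i in range(10)]
rows = asyncio.run(embed_all(recipes, batch_size=3))
print(len(rows), [r["content"] for r in rows[:3]])
```

Inside a notebook, where an event loop is already running, call `await embed_all(...)` instead of `asyncio.run(...)`.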


In [85]:
# Run the main asynchronous function
all_embeddings = await main_async_runner(INPUT_FILE)

# Convert the list of dictionaries to a pandas DataFrame for easy analysis
if all_embeddings:
    df = pd.DataFrame(all_embeddings)
    df['embedding_dim'] = df['embedding'].apply(len)

    print(f"\nSuccessfully generated {len(df)} embeddings.")

    # Create a unique ID for each recipe using its index
    df['id'] = df.index.astype(str)

    # Reorder columns to have 'id' first for clarity
    df = df[['id', 'content', 'embedding', 'embedding_dim']]

    # Display the DataFrame with the new 'id' column
    print("DataFrame with unique 'id' column:")
    display(df.head())
else:
    # df is never created in this branch, so the DataFrame steps stay inside the if block
    print("\nNo embeddings were generated. Please check for errors in the logs above.")

Created 10 concurrent tasks.
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
Embedding a batch of 100 items...
--- Process Finished ---
Total time taken: 9.95 seconds

Successfully generated 1000 embeddings.
DataFrame with unique 'id' column:


| | id | content | embedding | embedding_dim |
|---|---|---|---|---|
| 0 | 0 | Title:Miso-Butter Roast Chicken With Acorn Squ... | [-0.0023760846816003323, -0.011549703776836395... | 3072 |
| 1 | 1 | Title:Crispy Salt and Pepper Potatoes,Ingredie... | [0.01674710586667061, 0.010629256255924702, 0.... | 3072 |
| 2 | 2 | Title:Thanksgiving Mac and Cheese,Ingredients:... | [-0.0013591762399300933, -0.002013501012697816... | 3072 |
| 3 | 3 | Title:Italian Sausage and Bread Stuffing,Ingre... | [0.006325311027467251, -0.005718541797250509, ... | 3072 |
| 4 | 4 | Title:Newton's Law,Ingredients:1 teaspoon dark... | [0.011500250548124313, -0.005415198393166065, ... | 3072 |
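
`embed_batch_sync()` returns an empty list when a call fails, so a failed batch silently shrinks the result set. One possible mitigation is to retry the batch and surface the error if every attempt fails. The sketch below assumes a hypothetical helper `with_retries()` and a hypothetical flaky embedder; neither is part of the notebook, and the wrapped function must let exceptions propagate rather than catch them itself.

```python
import time
from typing import Callable


def with_retries(fn: Callable[[list[str]], list[dict]], batch: list[str],
                 max_attempts: int = 3, base_delay: float = 0.01) -> list[dict]:
    """Call fn(batch), retrying with exponential backoff when it raises."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn(batch)
        except Exception as exc:  # broad on purpose: this is only a sketch
            if attempt == max_attempts:
                raise  # surface the failure instead of returning []
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.2f}s")
            time.sleep(delay)
    raise ValueError("max_attempts must be at least 1")


# Hypothetical flaky embedder: fails twice, then succeeds.
calls = {"count": 0}


def flaky_embed(batch: list[str]) -> list[dict]:
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary error")
    return [{"content": text, "embedding": [0.0]} for text in batch]


result = with_retries(flaky_embed, ["a", "b"])
print(len(result), calls["count"])
```

A simpler safeguard is to compare the number of returned rows with the number of input lines before building the index.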


In [86]:
df.to_csv("recipes_embeddings.csv", index=False)
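
A CSV export stores each embedding list as text, so the vectors come back as strings when the file is reloaded. The sketch below illustrates the round trip with the standard `csv` module as a stand-in for the pandas export; the sample row is made up for the demonstration.

```python
import csv
import io
import json

rows = [{"id": "0", "embedding": [0.25, -0.5, 1.0]}]

# Write: the csv module calls str() on the list, so the vector is stored as text.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["id", "embedding"])
writer.writeheader()
writer.writerows(rows)

# Read: every field comes back as a string.
buffer.seek(0)
loaded = list(csv.DictReader(buffer))
print(type(loaded[0]["embedding"]).__name__)

# Parse the text back into a list of floats.
restored = json.loads(loaded[0]["embedding"])
print(restored)
```

With pandas, the equivalent step after `pd.read_csv("recipes_embeddings.csv")` would be `df["embedding"] = df["embedding"].apply(json.loads)`.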

## Vector Search Index Creation

Create a vector search index for efficient similarity matching.

### Prepare Index Data

Format the embeddings for Vertex AI Vector Search: write one JSON object per line, each holding a recipe `id` and its `embedding`.

In [87]:
# Prepare the data in the required JSONL format for Vertex AI Vector Search

data_for_index = []
for index, row in df.iterrows():
    data_for_index.append({
        "id": str(row['id']),
        "embedding": row['embedding']
    })

# Define the output file name for the index data.
# The content is JSON Lines (one object per line), saved under a .json extension.
INDEX_DATA_FILE_NAME = "recipes_index_data.json"

# Write one JSON object per line
with open(INDEX_DATA_FILE_NAME, "w") as f:
    for item in data_for_index:
        f.write(json.dumps(item) + "\n")
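
Before uploading, it can help to verify that every record has the expected shape. The following is a minimal sketch, assuming a hypothetical helper `validate_index_file()` that is not part of the notebook; it checks for unique string IDs and a fixed vector length, and runs here against a small temporary file.

```python
import json
import os
import tempfile


def validate_index_file(path: str, expected_dim: int) -> int:
    """Check each line for a unique string id and a numeric vector of expected_dim; return the record count."""
    seen_ids = set()
    with open(path, "r") as f:
        for line_no, line in enumerate(f, start=1):
            record = json.loads(line)
            rec_id, vector = record["id"], record["embedding"]
            if not isinstance(rec_id, str) or rec_id in seen_ids:
                raise ValueError(f"line {line_no}: id must be a unique string")
            if len(vector) != expected_dim:
                raise ValueError(f"line {line_no}: expected {expected_dim} values, got {len(vector)}")
            if not all(isinstance(v, (int, float)) for v in vector):
                raise ValueError(f"line {line_no}: embedding contains non-numeric values")
            seen_ids.add(rec_id)
    return len(seen_ids)


# Demo on a tiny temporary file with 3-dimensional vectors.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo_index_data.json")
    with open(demo_path, "w") as f:
        f.write(json.dumps({"id": "0", "embedding": [0.1, 0.2, 0.3]}) + "\n")
        f.write(json.dumps({"id": "1", "embedding": [0.4, 0.5, 0.6]}) + "\n")
    count = validate_index_file(demo_path, expected_dim=3)
print(count)
```

For the file written above, the call would be `validate_index_file(INDEX_DATA_FILE_NAME, expected_dim=3072)`.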

### Upload to Cloud Storage

Upload the prepared index data to Google Cloud Storage for use by the Vector Search service.

In [89]:
# Initialize the GCS client. It will infer the project from your authenticated environment.
storage_client = storage.Client()

# Get the bucket name by simply removing the "gs://" prefix from your BUCKET_URI.
bucket_name = BUCKET_URI.replace("gs://", "")
bucket = storage_client.bucket(bucket_name)

# Get the destination folder path from the INPUT_URI variable.
destination_folder = INPUT_URI.replace(BUCKET_URI, "").strip("/")

# Combine the folder path with the local filename to create the full blob name.
# This will result in a path like "input-test/recipes_index_data.json"
destination_blob_name = f"{destination_folder}/{INDEX_DATA_FILE_NAME}"
blob = bucket.blob(destination_blob_name)

# Upload the local file you just created.
blob.upload_from_filename(INDEX_DATA_FILE_NAME)

# Store the full GCS path of the uploaded file. This will now point to your INPUT_URI folder.
index_data_gcs_uri = f"gs://{bucket_name}/{destination_blob_name}"

print(f"File '{INDEX_DATA_FILE_NAME}' successfully uploaded to:")
print(index_data_gcs_uri)

File 'recipes_index_data.json' successfully uploaded to:
gs://sandbox-401718-recipe-textembedding-us-central1/input-test/recipes_index_data.json
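
The upload cell derives the bucket name and folder with `str.replace()`. An alternative is a small parsing helper that also rejects malformed URIs. This is a sketch only; `split_gcs_uri()` is a hypothetical function and the bucket name in the demo is made up.

```python
def split_gcs_uri(uri: str) -> tuple[str, str]:
    """Split 'gs://bucket/path/to/blob' into ('bucket', 'path/to/blob')."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a GCS URI: {uri!r}")
    bucket, _, blob_path = uri[len(prefix):].partition("/")
    if not bucket:
        raise ValueError(f"Missing bucket name: {uri!r}")
    return bucket, blob_path.strip("/")


bucket_name, folder = split_gcs_uri("gs://example-bucket/input-test")
print(bucket_name, folder)
```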


## Create Vector Search Index

Configure and create the vector search index. The documentation recommends `DOT_PRODUCT_DISTANCE` with `UNIT_L2_NORM` instead of `COSINE_DISTANCE`: once every vector is normalized to unit length, the dot product of two vectors equals their cosine similarity, so the neighbor ranking is the same while the index uses its better-optimized dot-product path.
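
The equivalence can be checked numerically. The sketch below uses two made-up vectors and plain Python helpers (all names are illustrative) to compare the cosine similarity of the raw vectors with the dot product of their L2-normalized versions.

```python
import math


def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def l2_normalize(vec: list[float]) -> list[float]:
    """Scale vec to unit length (what UNIT_L2_NORM refers to)."""
    norm = math.sqrt(dot(vec, vec))
    return [x / norm for x in vec]


def cosine_similarity(a: list[float], b: list[float]) -> float:
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


a = [3.0, 4.0, 0.0]
b = [1.0, 2.0, 2.0]

cos_ab = cosine_similarity(a, b)
dot_unit = dot(l2_normalize(a), l2_normalize(b))
print(cos_ab, dot_unit)  # both values agree up to floating-point rounding
```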

### Initialize AI Platform

Set up Google Cloud AI Platform with project configuration and staging bucket.

In [90]:
import os
from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=INPUT_URI)

### Configure Index Parameters

Set the embedding dimensions and display name for the vector search index.

In [91]:
DIMENSIONS = len(df["embedding"].iloc[0])  # positional access, independent of the index labels
DISPLAY_NAME = "index_recipe_match"

### Create the Index

Create the Tree-AH index (a tree-based index that uses asymmetric hashing for approximate nearest neighbor search) with the distance settings described above.

In [92]:
ann_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name=DISPLAY_NAME,
    contents_delta_uri=INPUT_URI,
    dimensions=DIMENSIONS,
    approximate_neighbors_count=200,
    distance_measure_type="DOT_PRODUCT_DISTANCE",
    feature_norm_type="UNIT_L2_NORM",
    description="Similar Recipe match index",
)

Creating MatchingEngineIndex
Create MatchingEngineIndex backing LRO: projects/757654702990/locations/us-central1/indexes/6928561527314186240/operations/1185869212346744832
MatchingEngineIndex created. Resource name: projects/757654702990/locations/us-central1/indexes/6928561527314186240
To use this MatchingEngineIndex in another session:
index = aiplatform.MatchingEngineIndex('projects/757654702990/locations/us-central1/indexes/6928561527314186240')


In [93]:
INDEX_RESOURCE_NAME = ann_index.resource_name #'projects/757654702990/locations/us-central1/indexes/9080369554546229248'

ann_index = aiplatform.MatchingEngineIndex(
    index_name=INDEX_RESOURCE_NAME
)

## Index Deployment

Deploy the created vector search index to an endpoint for real-time querying. This involves creating an index endpoint and deploying the index to it.

### Create Index Endpoint

Create a Matching Engine Index Endpoint that will host the deployed index for querying.

In [94]:
# Retrieve the project number
PROJECT_NUMBER = !gcloud projects list --filter="PROJECT_ID:'{PROJECT_ID}'" --format='value(PROJECT_NUMBER)'
PROJECT_NUMBER = PROJECT_NUMBER[0]
# PROJECT_NUMBER = 757654702990

VPC_NETWORK = "beusebio-network"
VPC_NETWORK_FULL = f"projects/{PROJECT_NUMBER}/global/networks/{VPC_NETWORK}"

In [95]:
# Endpoint
my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name="index_endpoint_recipe",
    description="recipe index",
    network=VPC_NETWORK_FULL,
)

INDEX_ENDPOINT_NAME = my_index_endpoint.resource_name

Creating MatchingEngineIndexEndpoint
Create MatchingEngineIndexEndpoint backing LRO: projects/757654702990/locations/us-central1/indexEndpoints/3153243217810423808/operations/7168901317308448768
MatchingEngineIndexEndpoint created. Resource name: projects/757654702990/locations/us-central1/indexEndpoints/3153243217810423808
To use this MatchingEngineIndexEndpoint in another session:
index_endpoint = aiplatform.MatchingEngineIndexEndpoint('projects/757654702990/locations/us-central1/indexEndpoints/3153243217810423808')


In [96]:
ann_index

<google.cloud.aiplatform.matching_engine.matching_engine_index.MatchingEngineIndex object at 0x7f22bd4a5a80> 
resource name: projects/757654702990/locations/us-central1/indexes/6928561527314186240

### Deploy Index to Endpoint

Deploy the vector search index to the endpoint, making it available for similarity queries. The deployment creates the final infrastructure needed for recipe similarity matching.

In [97]:
# Deploy
DEPLOYED_INDEX_ID = "recipe_index"
my_index_endpoint = my_index_endpoint.deploy_index(
    index=ann_index, deployed_index_id=DEPLOYED_INDEX_ID
)

my_index_endpoint.deployed_indexes

Deploying index MatchingEngineIndexEndpoint index_endpoint: projects/757654702990/locations/us-central1/indexEndpoints/3153243217810423808
Deploy index MatchingEngineIndexEndpoint index_endpoint backing LRO: projects/757654702990/locations/us-central1/indexEndpoints/3153243217810423808/operations/7826426862904541184
MatchingEngineIndexEndpoint index_endpoint Deployed index. Resource name: projects/757654702990/locations/us-central1/indexEndpoints/3153243217810423808


[id: "recipe_index"
index: "projects/757654702990/locations/us-central1/indexes/6928561527314186240"
create_time {
  seconds: 1756927700
  nanos: 743241000
}
private_endpoints {
  match_grpc_address: "10.116.0.14"
}
index_sync_time {
  seconds: 1756928971
  nanos: 493166000
}
automatic_resources {
  min_replica_count: 2
  max_replica_count: 2
}
deployment_group: "default"
]
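
The deployed index can now answer similarity queries. The following is a minimal sketch, assuming the SDK's `MatchingEngineIndexEndpoint.match()` method, which targets private endpoints such as this one and therefore has to be called from inside the VPC network. `find_similar_recipes()`, `embed_query`, and `id_to_content` are hypothetical names, and `FakeEndpoint` is an offline stand-in so the example runs without a deployed index.

```python
from types import SimpleNamespace
from typing import Callable, Sequence


def find_similar_recipes(
    endpoint,  # expected: the deployed MatchingEngineIndexEndpoint
    deployed_index_id: str,
    embed_query: Callable[[str], Sequence[float]],  # hypothetical helper: text -> vector
    id_to_content: dict[str, str],
    query_text: str,
    num_neighbors: int = 5,
) -> list[tuple[str, float, str]]:
    """Return (id, distance, content) for the nearest recipes to query_text."""
    query_vector = list(embed_query(query_text))
    # match() takes a list of query vectors and returns one neighbor list per query.
    neighbors = endpoint.match(
        deployed_index_id=deployed_index_id,
        queries=[query_vector],
        num_neighbors=num_neighbors,
    )[0]
    return [(n.id, n.distance, id_to_content.get(n.id, "")) for n in neighbors]


class FakeEndpoint:
    """Offline stand-in that mimics the shape of match() results."""

    def match(self, deployed_index_id, queries, num_neighbors):
        hits = [SimpleNamespace(id="2", distance=0.91), SimpleNamespace(id="0", distance=0.80)]
        return [hits[:num_neighbors] for _ in queries]


results = find_similar_recipes(
    FakeEndpoint(),
    "recipe_index",
    lambda text: [0.0, 1.0],  # dummy embedding function
    {"0": "Title:A", "2": "Title:B"},
    "mac and cheese",
    num_neighbors=2,
)
print(results)
```

With the notebook's objects, `endpoint` would be `my_index_endpoint`, `embed_query` could wrap `client.models.embed_content(...)` with the same model and dimensionality used for the documents, and `id_to_content` could be built as `dict(zip(df['id'], df['content']))`.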