In [1]:
! pip3 install --upgrade google-cloud-aiplatform \
                        google-cloud-storage \
                        'google-cloud-bigquery[pandas]'

Collecting google-cloud-aiplatform
  Downloading google_cloud_aiplatform-1.61.0-py2.py3-none-any.whl.metadata (31 kB)
Collecting google-cloud-storage
  Downloading google_cloud_storage-2.18.2-py2.py3-none-any.whl.metadata (9.1 kB)
Collecting google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,<3.0.0dev,>=1.34.1 (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,<3.0.0dev,>=1.34.1->google-cloud-aiplatform)
  Downloading google_api_core-2.19.1-py3-none-any.whl.metadata (2.7 kB)
Collecting google-resumable-media>=2.7.2 (from google-cloud-storage)
  Downloading google_resumable_media-2.7.2-py2.py3-none-any.whl.metadata (2.2 kB)
Downloading google_cloud_aiplatform-1.61.0-py2.py3-none-any.whl (5.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.1/5.1 MB[0m [31m37.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading google_cloud_storage-2.18.2-py2.py3-none-any.whl (130 kB)
[2K   [90m━━━━━

Restart kernel after installs so that your environment can access the new packages


In [2]:
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)


{'status': 'ok', 'restart': True}

Setup the environment values for your project.

In [1]:
PROJECT = !gcloud config get-value project
PROJECT_ID = PROJECT[0]
REGION = "us-east1"

Import and initialize the Vertex AI Python SDK.

In [2]:
import vertexai
vertexai.init(project = PROJECT_ID,
              location = REGION)

The dataset used for this lab is the StackOverflow dataset. This public dataset is hosted in Google BigQuery and is included in BigQuery's 1TB/mo of free tier processing. This means that each user receives 1TB of free BigQuery processing every month, which can be used to run queries on this public dataset.

Stack Overflow is the largest online community for programmers to learn, share their knowledge, and advance their careers. Updated on a quarterly basis, this BigQuery dataset includes an archive of Stack Overflow content, including posts, votes, tags, and badges. This dataset is updated to mirror the Stack Overflow content on the Internet Archive, and is also available through the Stack Exchange Data Explorer.

The BigQuery table is too large to fit into memory, so you need to write a generator called query_bigquery_chunks to yield chunks of the dataframe for processing. Additionally, an extra column title_with_body is added, which is a concatenation of the question title and body that will be used for creating embeddings.

Import the libraries and initialize the BigQuery client.


In [3]:
import math
from typing import Any, Generator

import pandas as pd
from google.cloud import bigquery

client = bigquery.Client(project=PROJECT_ID)

Define the BigQuery query for the remote dataset.

In [4]:
QUERY_TEMPLATE = """
        SELECT distinct q.id, q.title, q.body
        FROM (SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions` where Score>0 ORDER BY View_Count desc) AS q
        LIMIT {limit} OFFSET {offset};
        """

Create a function to access the BigQuery data in chunks.

In [5]:
def query_bigquery_chunks(
    max_rows: int, rows_per_chunk: int, start_chunk: int = 0
) -> Generator[pd.DataFrame, Any, None]:
    for offset in range(start_chunk, max_rows, rows_per_chunk):
        query = QUERY_TEMPLATE.format(limit=rows_per_chunk, offset=offset)
        query_job = client.query(query)
        rows = query_job.result()
        df = rows.to_dataframe()
        df["title_with_body"] = df.title + "\n" + df.body
        yield df

Get a dataframe of 1000 rows for demonstration purposes.

In [6]:
df = next(query_bigquery_chunks(max_rows=1000, rows_per_chunk=1000))

# Examine the data
df.head()

I0000 00:00:1723298849.983210     405 config.cc:230] gRPC experiments enabled: call_status_override_on_cancellation, event_engine_dns, event_engine_listener, http2_stats_fix, monitoring_experiment, pick_first_new, trace_record_callops, work_serializer_clears_time_cache


Unnamed: 0,id,title,body,title_with_body
0,17836143,"I use string.split(""|"") and I am getting array...","<p>My code:</p>\n\n<pre><code> result =""sub||...","I use string.split(""|"") and I am getting array..."
1,17819298,"Jade HTML attribute called ""attribute""","<p>Working with Polymer Project markup, the <c...","Jade HTML attribute called ""attribute""\n<p>Wor..."
2,18075375,Create custom ListView with shadow,<p>How could I create custom <code>ListView</c...,Create custom ListView with shadow\n<p>How cou...
3,18179726,how to open pdf file to another tab in browser...,<pre><code>im making currently making my thesi...,how to open pdf file to another tab in browser...
4,18173191,UICollectionReusableView method not being called,<p>I want my sections in the <code>UICollectio...,UICollectionReusableView method not being call...


Load the Vertex AI Embeddings for Text model.

In [7]:
from typing import List, Optional
from vertexai.preview.language_models import TextEmbeddingModel

model = TextEmbeddingModel.from_pretrained("text-embedding-004")

Define an embedding method that uses the model.

In [8]:
def encode_texts_to_embeddings(sentences: List[str]) -> List[Optional[List[float]]]:
    try:
        embeddings = model.get_embeddings(sentences)
        return [embedding.values for embedding in embeddings]
    except Exception:
        return [None for _ in range(len(sentences))]

According to the documentation, each request can handle up to 5 text instances. So we will need to split the BigQuery question results in batches of 5 before sending to the embedding API.

Create a generate_batches to split results in batches of 5 to be sent to the embeddings API.

In [9]:
import functools
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Generator, List, Tuple

import numpy as np
from tqdm.auto import tqdm


# Generator function to yield batches of sentences
def generate_batches(
    sentences: List[str], batch_size: int
) -> Generator[List[str], None, None]:
    for i in range(0, len(sentences), batch_size):
        yield sentences[i : i + batch_size]

Encapsulate the process of generating batches and calling the embeddings API in a method called encode_text_to_embedding_batched. This method also handles rate-limiting using time.sleep. For production use cases, you would want a more sophisticated rate-limiting mechanism that takes retries into account.

In [10]:
def encode_text_to_embedding_batched(
    sentences: List[str], api_calls_per_second: int = 10, batch_size: int = 5
) -> Tuple[List[bool], np.ndarray]:

    embeddings_list: List[List[float]] = []

    # Prepare the batches using a generator
    batches = generate_batches(sentences, batch_size)

    seconds_per_job = 1 / api_calls_per_second

    with ThreadPoolExecutor() as executor:
        futures = []
        for batch in tqdm(
            batches, total=math.ceil(len(sentences) / batch_size), position=0
        ):
            futures.append(
                executor.submit(functools.partial(encode_texts_to_embeddings), batch)
            )
            time.sleep(seconds_per_job)

        for future in futures:
            embeddings_list.extend(future.result())

    is_successful = [
        embedding is not None for sentence, embedding in zip(sentences, embeddings_list)
    ]
    embeddings_list_successful = np.squeeze(
        np.stack([embedding for embedding in embeddings_list if embedding is not None])
    )
    return is_successful, embeddings_list_successful

Test the encoding function by encoding a subset of data and see if the embeddings and distance metrics make sense.


In [11]:
# Encode a subset of questions for validation
questions = df.title.tolist()[:500]
is_successful, question_embeddings = encode_text_to_embedding_batched(
    sentences=df.title.tolist()[:500]
)

# Filter for successfully embedded sentences
questions = np.array(questions)[is_successful]

  0%|          | 0/100 [00:00<?, ?it/s]

Save the dimension size for later usage when creating the Vertex AI Vector Search index.

In [12]:
DIMENSIONS = len(question_embeddings[0])

print(DIMENSIONS)

768


Sort questions in order of similarity. According to the embedding documentation, the similarity of embeddings is calculated using the dot-product, with np.dot. Once you have the similarity score, sort the results and print them for inspection. 1 means very similar, 0 means very different.

In [13]:
import random

question_index = random.randint(0, 99)

print(f"Query question = {questions[question_index]}")

# Get similarity scores for each embedding by using dot-product.
scores = np.dot(question_embeddings[question_index], question_embeddings.T)

# Print top 20 matches
for index, (question, score) in enumerate(
    sorted(zip(questions, scores), key=lambda x: x[1], reverse=True)[:20]
):
    print(f"\t{index}: {question}: {score}")

Query question = Having issues preserving variable contents in closure
	0: Having issues preserving variable contents in closure: 0.9999981469995682
	1: Return reference to argument object: 0.542115084178388
	2: Global vars with jsFiddle: 0.5274585932955009
	3: Session variable is lost on RedirectToAction in IE: 0.4960334294011826
	4: Uncaught TypeError: Object function (){return i.apply(this,arguments)} has no method 'on': 0.4859915756741863
	5: How do I reference the outer "$(this)" in jquery?: 0.48440742705501616
	6: Saving function evaluations while using std::min_element(): 0.4759444377204304
	7: Dictionary is empty on deserialization: 0.4735764952108908
	8: Why do I see duplicate javascript async responses?: 0.4700778842273614
	9: Java regex finds only last match when I want all: 0.46294989592238034
	10: flot chart plothover event fires but item is always null in bar chart: 0.45219314193317717
	11: I use string.split("|") and I am getting array that have every char in one array e

Save the embeddings in JSONL format. The data must be formatted in JSONL format, which means each embedding dictionary is written as an individual JSON object on its own line.

In [14]:
import tempfile
from pathlib import Path

# Create temporary file to write embeddings to
embeddings_file_path = Path(tempfile.mkdtemp())

print(f"Embeddings directory: {embeddings_file_path}")

Embeddings directory: /tmp/tmp1fe4ylub


Write embeddings in batches to prevent out-of-memory errors. Notice we are only using 5000 questions so that the embedding creation process and indexing is faster. The dataset contains more than 50,000 questions. This step will take around 5 minutes.


In [15]:
import gc
import json

BQ_NUM_ROWS = 5000
BQ_CHUNK_SIZE = 1000
BQ_NUM_CHUNKS = math.ceil(BQ_NUM_ROWS / BQ_CHUNK_SIZE)

START_CHUNK = 0

# Create a rate limit of 300 requests per minute. Adjust this depending on your quota.
API_CALLS_PER_SECOND = 300 / 60
# According to the docs, each request can process 5 instances per request
ITEMS_PER_REQUEST = 5

# Loop through each generated dataframe, convert
for i, df in tqdm(
    enumerate(
        query_bigquery_chunks(
            max_rows=BQ_NUM_ROWS, rows_per_chunk=BQ_CHUNK_SIZE, start_chunk=START_CHUNK
        )
    ),
    total=BQ_NUM_CHUNKS - START_CHUNK,
    position=-1,
    desc="Chunk of rows from BigQuery",
):
    # Create a unique output file for each chunk
    chunk_path = embeddings_file_path.joinpath(
        f"{embeddings_file_path.stem}_{i+START_CHUNK}.json"
    )
    with open(chunk_path, "a") as f:
        id_chunk = df.id

        # Convert batch to embeddings
        is_successful, question_chunk_embeddings = encode_text_to_embedding_batched(
            sentences=df.title_with_body.to_list(),
            api_calls_per_second=API_CALLS_PER_SECOND,
            batch_size=ITEMS_PER_REQUEST,
        )

        # Append to file
        embeddings_formatted = [
            json.dumps(
                {
                    "id": str(id),
                    "embedding": [str(value) for value in embedding],
                }
            )
            + "\n"
            for id, embedding in zip(id_chunk[is_successful], question_chunk_embeddings)
        ]
        f.writelines(embeddings_formatted)

        # Delete the DataFrame and any other large data structures
        del df
        gc.collect()

Chunk of rows from BigQuery:   0%|          | 0/5 [00:00<?, ?it/s]

  0%|          | 0/200 [00:00<?, ?it/s]

  0%|          | 0/200 [00:00<?, ?it/s]

  0%|          | 0/200 [00:00<?, ?it/s]

  0%|          | 0/200 [00:00<?, ?it/s]

  0%|          | 0/200 [00:00<?, ?it/s]

Upload the text-embeddings to Cloud Storage, so that Vertex AI Vector Search can access them later.



Define a bucket where you will store your embeddings.


In [16]:
BUCKET_URI = f"gs://{PROJECT_ID}-unique"

Create your Cloud Storage bucket.


In [17]:
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

I0000 00:00:1723299513.278119     405 work_stealing_thread_pool.cc:320] WorkStealingThreadPoolImpl::PrepareFork


Creating gs://qwiklabs-gcp-03-a735eb03bf93-unique/...


Upload the training data to a Google Cloud Storage bucket.


In [18]:
remote_folder = f"{BUCKET_URI}/{embeddings_file_path.stem}/"
! gsutil -m cp -r {embeddings_file_path}/* {remote_folder}

I0000 00:00:1723299543.706102     405 work_stealing_thread_pool.cc:320] WorkStealingThreadPoolImpl::PrepareFork


Copying file:///tmp/tmp1fe4ylub/tmp1fe4ylub_2.json [Content-Type=application/json]...
Copying file:///tmp/tmp1fe4ylub/tmp1fe4ylub_0.json [Content-Type=application/json]...
Copying file:///tmp/tmp1fe4ylub/tmp1fe4ylub_1.json [Content-Type=application/json]...
Copying file:///tmp/tmp1fe4ylub/tmp1fe4ylub_3.json [Content-Type=application/json]...
Copying file:///tmp/tmp1fe4ylub/tmp1fe4ylub_4.json [Content-Type=application/json]...
- [5/5 files][  6.2 MiB/  6.2 MiB] 100% Done                                    
Operation completed over 5 objects/6.2 MiB.                                      


Setup your index name and description.

In [19]:
DISPLAY_NAME = "stack_overflow"
DESCRIPTION = "question titles and bodies from stackoverflow"

Create the index. Notice that the index reads the embeddings from the Cloud Storage bucket. The indexing process can take from 45 minutes up to 60 minutes. Wait for completion, and then proceed. You can open a different Google Cloud Console page, navigate to Vertex AI Vector search, and see how the index is being created.


In [None]:
from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

DIMENSIONS = 768

tree_ah_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name=DISPLAY_NAME,
    contents_delta_uri=remote_folder,
    dimensions=DIMENSIONS,
    approximate_neighbors_count=150,
    distance_measure_type="DOT_PRODUCT_DISTANCE",
    leaf_node_embedding_count=500,
    leaf_nodes_to_search_percent=80,
    description=DESCRIPTION,
)

Creating MatchingEngineIndex
Create MatchingEngineIndex backing LRO: projects/965332343123/locations/us-east1/indexes/7312517584803332096/operations/8007708138159472640
MatchingEngineIndex created. Resource name: projects/965332343123/locations/us-east1/indexes/7312517584803332096
To use this MatchingEngineIndex in another session:
index = aiplatform.MatchingEngineIndex('projects/965332343123/locations/us-east1/indexes/7312517584803332096')


Reference the index name to make sure it got created successfully.

In [21]:
INDEX_RESOURCE_NAME = tree_ah_index.resource_name
INDEX_RESOURCE_NAME

'projects/965332343123/locations/us-east1/indexes/7312517584803332096'

Using the resource name, you can retrieve an existing MatchingEngineIndex.


In [22]:
tree_ah_index = aiplatform.MatchingEngineIndex(index_name=INDEX_RESOURCE_NAME)

Create an IndexEndpoint so that it can be accessed via an API.


In [23]:
my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name=DISPLAY_NAME,
    description=DISPLAY_NAME,
    public_endpoint_enabled=True,
)

Creating MatchingEngineIndexEndpoint
Create MatchingEngineIndexEndpoint backing LRO: projects/965332343123/locations/us-east1/indexEndpoints/3060310296007540736/operations/7460520783433957376
MatchingEngineIndexEndpoint created. Resource name: projects/965332343123/locations/us-east1/indexEndpoints/3060310296007540736
To use this MatchingEngineIndexEndpoint in another session:
index_endpoint = aiplatform.MatchingEngineIndexEndpoint('projects/965332343123/locations/us-east1/indexEndpoints/3060310296007540736')


Deploy your index to the created endpoint. This can take up to 15 minutes.


In [27]:
DEPLOYED_INDEX_ID = "deployed_index_id_unique"

DEPLOYED_INDEX_ID


my_index_endpoint = my_index_endpoint.deploy_index(
    index=tree_ah_index, deployed_index_id=DEPLOYED_INDEX_ID
)

my_index_endpoint.deployed_indexes

Deploying index MatchingEngineIndexEndpoint index_endpoint: projects/965332343123/locations/us-east1/indexEndpoints/3060310296007540736


AlreadyExists: 409 There already exists a DeployedIndex with same ID "deployed_index_id_unique" deployed or being deployed at the following IndexEndpoint: projects/965332343123/locations/us-east1/indexEndpoints/3060310296007540736. Please use a different ID.

Verify number of declared items matches the number of embeddings. Each IndexEndpoint can have multiple indexes deployed to it. For each index, you can retrieve the number of deployed vectors using the index_endpoint._gca_resource.index_stats.vectors_count. The numbers may not match exactly due to potential rate-limiting failures incurred when using the embedding service.

In [28]:
number_of_vectors = sum(
    aiplatform.MatchingEngineIndex(
        deployed_index.index
    )._gca_resource.index_stats.vectors_count
    for deployed_index in my_index_endpoint.deployed_indexes
)

print(f"Expected: {BQ_NUM_ROWS}, Actual: {number_of_vectors}")

Expected: 5000, Actual: 350


After you build your indexes, you may query against the deployed index to find nearest neighbors.

Note: For the DOT_PRODUCT_DISTANCE distance type, the "distance" property returned with each MatchNeighbor actually refers to the similarity.


Create an embedding for a test question.


In [29]:
test_embeddings = encode_texts_to_embeddings(sentences=["Install GPU for Tensorflow"])

Test the query to retrieve the similar embeddings.


In [30]:
NUM_NEIGHBOURS = 10

response = my_index_endpoint.find_neighbors(
    deployed_index_id=DEPLOYED_INDEX_ID,
    queries=test_embeddings,
    num_neighbors=NUM_NEIGHBOURS,
)

response

[[MatchNeighbor(id='50607994', distance=0.43464401364326477, sparse_distance=None, feature_vector=[], crowding_tag='0', restricts=[], numeric_restricts=[], sparse_embedding_values=[], sparse_embedding_dimensions=[]),
  MatchNeighbor(id='31090638', distance=0.4175715446472168, sparse_distance=None, feature_vector=[], crowding_tag='0', restricts=[], numeric_restricts=[], sparse_embedding_values=[], sparse_embedding_dimensions=[]),
  MatchNeighbor(id='43732991', distance=0.4172176718711853, sparse_distance=None, feature_vector=[], crowding_tag='0', restricts=[], numeric_restricts=[], sparse_embedding_values=[], sparse_embedding_dimensions=[]),
  MatchNeighbor(id='73118840', distance=0.4167698621749878, sparse_distance=None, feature_vector=[], crowding_tag='0', restricts=[], numeric_restricts=[], sparse_embedding_values=[], sparse_embedding_dimensions=[]),
  MatchNeighbor(id='13104630', distance=0.4123101234436035, sparse_distance=None, feature_vector=[], crowding_tag='0', restricts=[], nu

Verify that the retrieved results are relevant by checking the StackOverflow links.

In [31]:
for match_index, neighbor in enumerate(response[0]):
    print(f"https://stackoverflow.com/questions/{neighbor.id}")

https://stackoverflow.com/questions/50607994
https://stackoverflow.com/questions/31090638
https://stackoverflow.com/questions/43732991
https://stackoverflow.com/questions/73118840
https://stackoverflow.com/questions/13104630
https://stackoverflow.com/questions/53183869
https://stackoverflow.com/questions/5897308
https://stackoverflow.com/questions/50972198
https://stackoverflow.com/questions/50541851
https://stackoverflow.com/questions/23581440


Clean Up the environment

In [None]:
import os

delete_bucket = False

# Force undeployment of indexes and delete endpoint
my_index_endpoint.delete(force=True)

# Delete indexes
tree_ah_index.delete()

if delete_bucket or os.getenv("IS_TESTING"):
    ! gsutil rm -rf {BUCKET_URI}