# Set up the Jupyter notebook environment

Install the Google Cloud Vertex AI, Cloud Storage and BigQuery SDKs.

In [1]:
! pip3 install --upgrade google-cloud-aiplatform \
                        google-cloud-storage \
                        'google-cloud-bigquery[pandas]'

Collecting google-cloud-storage
  Downloading google_cloud_storage-3.1.0-py2.py3-none-any.whl.metadata (12 kB)


Restart kernel 

In [2]:
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)


{'status': 'ok', 'restart': True}

Setup the environment values for your project

Setting up variables

In [6]:
PROJECT = !gcloud config get-value project
PROJECT_ID = PROJECT[0]
REGION = "us-west1"

Import and initialize the Vertex AI Python SDK.

In [7]:
import vertexai
vertexai.init(project = PROJECT_ID,
              location = REGION)

# Prepare the data in BigQuery

The dataset used for this lab is the [StackOverflow dataset](https://console.cloud.google.com/marketplace/product/stack-exchange/stack-overflow?inv=1&invt=Abylhg&project=qwiklabs-gcp-02-9c2eecad5f04). This public dataset is hosted in Google BigQuery and is included in BigQuery's 1TB/mo of free tier processing. This means that each user receives 1TB of free BigQuery processing every month, which can be used to run queries on this public dataset.

Stack Overflow is the largest online community for programmers to learn, share their knowledge, and advance their careers. Updated on a quarterly basis, this BigQuery dataset includes an archive of Stack Overflow content, including posts, votes, tags, and badges. This dataset is updated to mirror the Stack Overflow content on the Internet Archive, and is also available through the Stack Exchange Data Explorer.

The BigQuery table is too large to fit into memory, so you need to write a generator called `query_bigquery_chunks` to yield chunks of the dataframe for processing. Additionally, an extra column `title_with_body` is added, which is a concatenation of the question title and body that will be used for creating embeddings.

Import the libraries and initialize the BigQuery client.

In [8]:
import math
from typing import Any, Generator

import pandas as pd
from google.cloud import bigquery

client = bigquery.Client(project=PROJECT_ID)

Define the BigQuery query for the remote dataset.

In [9]:
QUERY_TEMPLATE = """
        SELECT distinct q.id, q.title, q.body
        FROM (SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions` where Score>0 ORDER BY View_Count desc) AS q
        LIMIT {limit} OFFSET {offset};
        """

Create a function to access the BigQuery data in chunks

In [10]:
def query_bigquery_chunks(
    max_rows: int, rows_per_chunk: int, start_chunk: int = 0
) -> Generator[pd.DataFrame, Any, None]:
    for offset in range(start_chunk, max_rows, rows_per_chunk):
        query = QUERY_TEMPLATE.format(limit=rows_per_chunk, offset=offset)
        query_job = client.query(query)
        rows = query_job.result()
        df = rows.to_dataframe()
        df["title_with_body"] = df.title + "\n" + df.body
        yield df

Get a dataframe of 1000 rows for demonstration purposes

In [11]:
df = next(query_bigquery_chunks(max_rows=1000, rows_per_chunk=1000))

# Examine the data
df.head()

Unnamed: 0,id,title,body,title_with_body
0,9257858,Get mutual subscriptions between two users,<p>I have a table as follows</p>\n\n<pre><code...,Get mutual subscriptions between two users\n<p...
1,9243623,UITableView not selecting properly,"<p>I am working on an iPhone app, where I have...",UITableView not selecting properly\n<p>I am wo...
2,9459273,".Ajax method works on local machine, but not o...",<p>I have an .Ajax method that calls a method ...,".Ajax method works on local machine, but not o..."
3,4569123,Content is not allowed in Prolog SAXParserExce...,<p>I am trying to call a web service but facin...,Content is not allowed in Prolog SAXParserExce...
4,4308934,How to delete last character from a string usi...,<p>How to delete last character from a string ...,How to delete last character from a string usi...


# Create text embeddings from BigQuery data

Load the `Vertex AI Embeddings` for Text model.

In [12]:
from typing import List, Optional
from vertexai.preview.language_models import TextEmbeddingModel

model = TextEmbeddingModel.from_pretrained("text-embedding-004")

Define an embedding method that uses the model.

In [13]:
def encode_texts_to_embeddings(sentences: List[str]) -> List[Optional[List[float]]]:
    try:
        embeddings = model.get_embeddings(sentences)
        return [embedding.values for embedding in embeddings]
    except Exception:
        return [None for _ in range(len(sentences))]

According to the documentation, each request can handle up to 5 text instances. So we will need to split the BigQuery question results in batches of 5 before sending to the embedding API.

Create a `generate_batches` to split results in batches of 5 to be sent to the embeddings API.

In [14]:
import functools
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Generator, List, Tuple

import numpy as np
from tqdm.auto import tqdm


# Generator function to yield batches of sentences
def generate_batches(
    sentences: List[str], batch_size: int
) -> Generator[List[str], None, None]:
    for i in range(0, len(sentences), batch_size):
        yield sentences[i : i + batch_size]

Encapsulate the process of generating batches and calling the embeddings API in a method called `encode_text_to_embedding_batched`. 
This method also handles `rate-limiting` using `time.sleep`. 
For production use cases, you would want a more sophisticated rate-limiting mechanism that takes retries into account.

In [15]:
def encode_text_to_embedding_batched(
    sentences: List[str], api_calls_per_second: int = 10, batch_size: int = 5
) -> Tuple[List[bool], np.ndarray]:

    embeddings_list: List[List[float]] = []

    # Prepare the batches using a generator
    batches = generate_batches(sentences, batch_size)

    seconds_per_job = 1 / api_calls_per_second

    with ThreadPoolExecutor() as executor:
        futures = []
        for batch in tqdm(
            batches, total=math.ceil(len(sentences) / batch_size), position=0
        ):
            futures.append(
                executor.submit(functools.partial(encode_texts_to_embeddings), batch)
            )
            time.sleep(seconds_per_job)

        for future in futures:
            embeddings_list.extend(future.result())

    is_successful = [
        embedding is not None for sentence, embedding in zip(sentences, embeddings_list)
    ]
    embeddings_list_successful = np.squeeze(
        np.stack([embedding for embedding in embeddings_list if embedding is not None])
    )
    return is_successful, embeddings_list_successful

Test the encoding function by encoding a subset of data and see if the embeddings and distance metrics make sense

In [16]:
# Encode a subset of questions for validation
questions = df.title.tolist()[:500]
is_successful, question_embeddings = encode_text_to_embedding_batched(
    sentences=df.title.tolist()[:500]
)

# Filter for successfully embedded sentences
questions = np.array(questions)[is_successful]

  0%|          | 0/100 [00:00<?, ?it/s]

Save the dimension size for later usage when creating the Vertex AI Vector Search index.

In [17]:
DIMENSIONS = len(question_embeddings[0])

print(DIMENSIONS)

768


Sort questions in order of similarity. According to the [embedding documentation](https://cloud.google.com/vertex-ai/docs/generative-ai/embeddings/get-text-embeddings#colab_example_of_semantic_search_using_embeddings), the similarity of embeddings is calculated using the dot-product, with np.dot. Once you have the similarity score, sort the results and print them for inspection. 1 means very similar, 0 means very different.

In [18]:
import random

question_index = random.randint(0, 99)

print(f"Query question = {questions[question_index]}")

# Get similarity scores for each embedding by using dot-product.
scores = np.dot(question_embeddings[question_index], question_embeddings.T)

# Print top 20 matches
for index, (question, score) in enumerate(
    sorted(zip(questions, scores), key=lambda x: x[1], reverse=True)[:20]
):
    print(f"\t{index}: {question}: {score}")

Query question = DI: Register-Resolve-Release when using child containers
	0: DI: Register-Resolve-Release when using child containers: 0.9999978110458627
	1: C# not releasing memory after task complete: 0.5136752460923943
	2: Embed xml into dll: 0.4803991893181705
	3: Load whole *ui file in an frame/widget of another *.ui file: 0.4545462072696377
	4: Using a 'using alias = class' with generic types?: 0.44325575692733843
	5: Returning values from MyBatis <insert> mapped methods: 0.4405694235804478
	6: How inefficient is passing Collections.unmodifiable* an instance which is already wrapped with Collections.unmodifiable*?: 0.4169665087870481
	7: Is a jbyteArray a jobject (i.e.: as a parameter to DeleteLocalRef)?: 0.3929884675899803
	8: Regex.Replace and String immutability: 0.3880198360882109
	9: PHP internationalization with intl: 0.3860386004774648
	10: Render List returned via JQuery: 0.3849918594436284
	11: Where to Store Encryption Keys MVC Application: 0.38287309031907313
	12: Aut

Save the embeddings in JSONL format. The data must be formatted in JSONL format, which means each embedding dictionary is written as an individual JSON object on its own line.

In [19]:
import tempfile
from pathlib import Path

# Create temporary file to write embeddings to
embeddings_file_path = Path(tempfile.mkdtemp())

print(f"Embeddings directory: {embeddings_file_path}")

Embeddings directory: /var/tmp/tmpzx18w_a0


Write embeddings in batches to prevent out-of-memory errors. Notice we are only using 5000 questions so that the embedding creation process and indexing is faster. The dataset contains more than 50,000 questions. This step will take around 5 minutes.

In [24]:
import gc
import json

BQ_NUM_ROWS = 5000
BQ_CHUNK_SIZE = 1000
BQ_NUM_CHUNKS = math.ceil(BQ_NUM_ROWS / BQ_CHUNK_SIZE)

START_CHUNK = 0

# Create a rate limit of 300 requests per minute. Adjust this depending on your quota.
API_CALLS_PER_SECOND = 300 / 60
# According to the docs, each request can process 5 instances per request
ITEMS_PER_REQUEST = 5

# Loop through each generated dataframe, convert
for i, df in tqdm(
    enumerate(
        query_bigquery_chunks(
            max_rows=BQ_NUM_ROWS, rows_per_chunk=BQ_CHUNK_SIZE, start_chunk=START_CHUNK
        )
    ),
    total=BQ_NUM_CHUNKS - START_CHUNK,
    position=-1,
    desc="Chunk of rows from BigQuery",
):
    # Create a unique output file for each chunk
    chunk_path = embeddings_file_path.joinpath(
        f"{embeddings_file_path.stem}_{i+START_CHUNK}.json"
    )
    with open(chunk_path, "a") as f:
        id_chunk = df.id

        # Convert batch to embeddings
        is_successful, question_chunk_embeddings = encode_text_to_embedding_batched(
            sentences=df.title_with_body.to_list(),
            api_calls_per_second=API_CALLS_PER_SECOND,
            batch_size=ITEMS_PER_REQUEST,
        )

        # Append to file
        embeddings_formatted = [
            json.dumps(
                {
                    "id": str(id),
                    "embedding": [str(value) for value in embedding],
                }
            )
            + "\n"
            for id, embedding in zip(id_chunk[is_successful], question_chunk_embeddings)
        ]
        f.writelines(embeddings_formatted)

        # Delete the DataFrame and any other large data structures
        del df
        gc.collect()

Chunk of rows from BigQuery:   0%|          | 0/5 [00:00<?, ?it/s]

  0%|          | 0/200 [00:00<?, ?it/s]

  0%|          | 0/200 [00:00<?, ?it/s]

  0%|          | 0/200 [00:00<?, ?it/s]

  0%|          | 0/200 [00:00<?, ?it/s]

  0%|          | 0/200 [00:00<?, ?it/s]

# Upload embeddings to Cloud Storage

Upload the text-embeddings to Cloud Storage, so that Vertex AI Vector Search can access them later.

Define a bucket where you will store your embeddings.

In [25]:
BUCKET_URI = f"gs://{PROJECT_ID}-unique"

Create your Cloud Storage bucket.

In [26]:
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

Creating gs://qwiklabs-gcp-02-9c2eecad5f04-unique/...


Upload the training data to a Google Cloud Storage bucket.

In [27]:
remote_folder = f"{BUCKET_URI}/{embeddings_file_path.stem}/"
! gsutil -m cp -r {embeddings_file_path}/* {remote_folder}

Copying file:///var/tmp/tmpzx18w_a0/tmpzx18w_a0_0.json [Content-Type=application/json]...
Copying file:///var/tmp/tmpzx18w_a0/tmpzx18w_a0_1.json [Content-Type=application/json]...
Copying file:///var/tmp/tmpzx18w_a0/tmpzx18w_a0_2.json [Content-Type=application/json]...
Copying file:///var/tmp/tmpzx18w_a0/tmpzx18w_a0_3.json [Content-Type=application/json]...
Copying file:///var/tmp/tmpzx18w_a0/tmpzx18w_a0_4.json [Content-Type=application/json]...
- [5/5 files][ 27.9 MiB/ 27.9 MiB] 100% Done                                    
Operation completed over 5 objects/27.9 MiB.                                     


# Create an Index in Vertex AI Vector Search for your embeddings

Setup your index name and description.

In [28]:
DISPLAY_NAME = "stack_overflow"
DESCRIPTION = "question titles and bodies from stackoverflow"

Create the index. 
Notice that the index reads the embeddings from the Cloud Storage bucket. 
The indexing process can take from 45 minutes up to 60 minutes. 
Wait for completion, and then proceed. You can open a different Google Cloud Console page, navigate to Vertex AI Vector search, and see how the index is being created.

In [29]:
from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

DIMENSIONS = 768

tree_ah_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name=DISPLAY_NAME,
    contents_delta_uri=remote_folder,
    dimensions=DIMENSIONS,
    approximate_neighbors_count=150,
    distance_measure_type="DOT_PRODUCT_DISTANCE",
    leaf_node_embedding_count=500,
    leaf_nodes_to_search_percent=80,
    description=DESCRIPTION,
)

Creating MatchingEngineIndex
Create MatchingEngineIndex backing LRO: projects/545248027730/locations/us-west1/indexes/6648183863208050688/operations/3743299981147111424
MatchingEngineIndex created. Resource name: projects/545248027730/locations/us-west1/indexes/6648183863208050688
To use this MatchingEngineIndex in another session:
index = aiplatform.MatchingEngineIndex('projects/545248027730/locations/us-west1/indexes/6648183863208050688')


Reference the index name to make sure it got created successfully

In [30]:
INDEX_RESOURCE_NAME = tree_ah_index.resource_name
INDEX_RESOURCE_NAME

'projects/545248027730/locations/us-west1/indexes/6648183863208050688'

Using the resource name, you can retrieve an existing MatchingEngineIndex.

In [31]:
tree_ah_index = aiplatform.MatchingEngineIndex(index_name=INDEX_RESOURCE_NAME)

Create an IndexEndpoint so that it can be accessed via an API

In [32]:
my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name=DISPLAY_NAME,
    description=DISPLAY_NAME,
    public_endpoint_enabled=True,
)

Creating MatchingEngineIndexEndpoint
Create MatchingEngineIndexEndpoint backing LRO: projects/545248027730/locations/us-west1/indexEndpoints/2908542507002363904/operations/8625201977216729088
MatchingEngineIndexEndpoint created. Resource name: projects/545248027730/locations/us-west1/indexEndpoints/2908542507002363904
To use this MatchingEngineIndexEndpoint in another session:
index_endpoint = aiplatform.MatchingEngineIndexEndpoint('projects/545248027730/locations/us-west1/indexEndpoints/2908542507002363904')


Deploy your index to the created endpoint. This can take up to 15 minutes

In [33]:
DEPLOYED_INDEX_ID = "deployed_index_id_unique"

DEPLOYED_INDEX_ID


my_index_endpoint = my_index_endpoint.deploy_index(
    index=tree_ah_index, deployed_index_id=DEPLOYED_INDEX_ID
)

my_index_endpoint.deployed_indexes

Deploying index MatchingEngineIndexEndpoint index_endpoint: projects/545248027730/locations/us-west1/indexEndpoints/2908542507002363904
Deploy index MatchingEngineIndexEndpoint index_endpoint backing LRO: projects/545248027730/locations/us-west1/indexEndpoints/2908542507002363904/operations/2257112104114847744
MatchingEngineIndexEndpoint index_endpoint Deployed index. Resource name: projects/545248027730/locations/us-west1/indexEndpoints/2908542507002363904


[id: "deployed_index_id_unique"
index: "projects/545248027730/locations/us-west1/indexes/6648183863208050688"
create_time {
  seconds: 1748425073
  nanos: 774808000
}
index_sync_time {
  seconds: 1748426978
  nanos: 935213000
}
automatic_resources {
  min_replica_count: 2
  max_replica_count: 2
}
deployment_group: "default"
]

---
Verify number of declared items matches the number of embeddings. 

Each IndexEndpoint can have multiple indexes deployed to it. 

For each index, you can retrieve the number of deployed vectors using the index_endpoint._gca_resource.index_stats.vectors_count. 

The numbers may not match exactly due to potential rate-limiting failures incurred when using the embedding service.

In [None]:
number_of_vectors = sum(
    aiplatform.MatchingEngineIndex(
        deployed_index.index
    )._gca_resource.index_stats.vectors_count
    for deployed_index in my_index_endpoint.deployed_indexes
)

print(f"Expected: {BQ_NUM_ROWS}, Actual: {number_of_vectors}")

# Create online queries

After you build your indexes, you may query against the deployed index to find nearest neighbors.

Note: For the `DOT_PRODUCT_DISTANCE` distance type, the "distance" property returned with each MatchNeighbor actually refers to the similarity.

Create an embedding for a test question.

In [None]:
test_embeddings = encode_texts_to_embeddings(sentences=["Install GPU for Tensorflow"])

Test the query to retrieve the similar embeddings.

In [None]:
NUM_NEIGHBOURS = 10

response = my_index_endpoint.find_neighbors(
    deployed_index_id=DEPLOYED_INDEX_ID,
    queries=test_embeddings,
    num_neighbors=NUM_NEIGHBOURS,
)

response

Verify that the retrieved results are relevant by checking the StackOverflow links

In [None]:
for match_index, neighbor in enumerate(response[0]):
    print(f"https://stackoverflow.com/questions/{neighbor.id}")

# Clean up the Google Cloud environment

To clean up all Google Cloud resources used in this project, you can delete the Google Cloud project you used for the lab. 
You can also manually delete resources that you created by running the following code.

In [None]:
import os

delete_bucket = False

# Force undeployment of indexes and delete endpoint
my_index_endpoint.delete(force=True)

# Delete indexes
tree_ah_index.delete()

if delete_bucket or os.getenv("IS_TESTING"):
    ! gsutil rm -rf {BUCKET_URI}