In [1]:
# Install dependencies.
!pip install asyncio==3.4.3 asyncpg==0.27.0 cloud-sql-python-connector["asyncpg"]==1.2.3
!pip install numpy==1.22.4 pandas==1.5.3
!pip install pgvector==0.1.8
!pip install langchain==0.0.196 transformers==4.30.1
!pip install google-cloud-aiplatform==1.26.0

Collecting asyncio==3.4.3
  Downloading asyncio-3.4.3-py3-none-any.whl.metadata (1.7 kB)
Collecting asyncpg==0.27.0
  Downloading asyncpg-0.27.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl.metadata (4.7 kB)
Collecting cloud-sql-python-connector==1.2.3 (from cloud-sql-python-connector[asyncpg]==1.2.3)
  Downloading cloud_sql_python_connector-1.2.3-py2.py3-none-any.whl.metadata (22 kB)
Downloading asyncio-3.4.3-py3-none-any.whl (101 kB)
Downloading asyncpg-0.27.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl (2.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.6/2.6 MB[0m [31m41.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading cloud_sql_python_connector-1.2.3-py2.py3-none-any.whl (35 kB)
Installing collected packages: asyncio, asyncpg, cloud-sql-python-connector
Successfully installed asyncio-3.4.3 asyncpg-0.27.0 cloud-sql-python-connector-1.2.3
Collecting numpy==1.22.4
  Downloading numpy-1.22

In [2]:
# Automatically restart kernel after installs so that your environment
# can access the new packages.
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)


{'status': 'ok', 'restart': True}

In [None]:
#### Task 3. Create a Cloud SQL Instance and PostgreSQL database.

In [1]:
project_id = "qwiklabs-gcp-01-ab2d59322a73"
database_password = "cymbal@123!"
region =  "us-east1"
instance_name = "retail-ins"
database_name = "retail"
database_user = "retail-admin"

In [2]:
#@markdown Create and setup a Cloud SQL PostgreSQL instance, if not done already.
database_version = !gcloud sql instances describe {instance_name} --format="value(databaseVersion)"
if database_version[0].startswith("POSTGRES"):
  print("Found an existing Postgres Cloud SQL Instance!")
else:
  print("Creating new Cloud SQL instance...")
  !gcloud sql instances create {instance_name} --database-version=POSTGRES_15 \
    --region={region} --cpu=1 --memory=4GB --root-password={database_password}

# Create the database, if it does not exist.
out = !gcloud sql databases list --instance={instance_name} --filter="NAME:{database_name}" --format="value(NAME)"
if ''.join(out) == database_name:
  print("Database %s already exists, skipping creation." % database_name)
else:
  !gcloud sql databases create {database_name} --instance={instance_name}

# Create the database user for accessing the database.
!gcloud sql users create {database_user} \
  --instance={instance_name} \
  --password={database_password}

Creating new Cloud SQL instance...
Creating Cloud SQL instance for POSTGRES_15...done.                            
Created [https://sqladmin.googleapis.com/sql/v1beta4/projects/qwiklabs-gcp-01-ab2d59322a73/instances/retail-ins].
NAME        DATABASE_VERSION  LOCATION    TIER              PRIMARY_ADDRESS  PRIVATE_ADDRESS  STATUS
retail-ins  POSTGRES_15       us-east1-d  db-custom-1-4096  34.73.58.162     -                RUNNABLE
Creating Cloud SQL database...done.                                            
Created database [retail].
instance: retail-ins
name: retail
project: qwiklabs-gcp-01-ab2d59322a73
Creating Cloud SQL user...done.                                                
Created user [retail-admin].


In [3]:
# @markdown Verify that you are able to connect to the database. Executing this block should print the current PostgreSQL server version.

import asyncio
import asyncpg
from google.cloud.sql.connector import Connector


async def main():
    # get current running event loop to be used with Connector
    loop = asyncio.get_running_loop()
    # initialize Connector object as async context manager
    async with Connector(loop=loop) as connector:
        # create connection to Cloud SQL database
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}"
            # ... additional database driver args
        )

        # query Cloud SQL database
        results = await conn.fetch("SELECT version()")
        print(results[0]["version"])

        # close asyncpg connection
        await conn.close()


# Test connection with `asyncio`
await main()  # type: ignore

PostgreSQL 15.10 on x86_64-pc-linux-gnu, compiled by Debian clang version 12.0.1, 64-bit


  expiration = x509.not_valid_after


In [4]:
#### Task 4. Download and load the dataset in PostgreSQL

In [5]:
# Load dataset from a web URL and store it in a pandas dataframe.

import pandas as pd
import os

DATASET_URL = "https://github.com/GoogleCloudPlatform/python-docs-samples/raw/main/cloud-sql/postgres/pgvector/data/retail_toy_dataset.csv"
df = pd.read_csv(DATASET_URL)
df = df.loc[:, ["product_id", "product_name", "description", "list_price"]]
df = df.dropna()
df.head(10)

Unnamed: 0,product_id,product_name,description,list_price
0,7e8697b5b7cdb5a40daf54caf1435cd5,"Koplow Games Set of 2 D12 12-Sided Rock, Paper...","Rock, paper, scissors is a great way to resolv...",3.56
1,7de8b315b3cb91f3680eb5b88a20dcee,"12""-20"" Schwinn Training Wheels",Turn any small bicycle into an instrument for ...,28.17
2,fb9535c103d7d717f0414b2b111cfaaa,Bicycle Pinochle Jumbo Index Playing Cards - 1...,Purchase includes 1 blue deck and 1 red deck. ...,6.49
3,c73ea622b3be6a3ffa3b0b5490e4929e,Step2 Woodland Adventure Playhouse & Slide,The Step2 Woodland Climber Adventure Playhouse...,499.99
4,dec7bd1f983887650715c6fafaa5b593,Step2 Naturally Playful Welcome Home Playhouse...,Children can play and explore in the Step2 Nat...,600.0
5,74a695e3675efc2aad11ed73c46db29b,Slip N Slide Triple Racer with Slide Boogies,Triple Racer Slip and Slide with Boogie Boards...,37.21
6,3eae5293b56c25f63b47cb8a89fb4813,Hydro Tools Digital Pool/Spa Thermometer,The solar-powered Swimline Floating Digital Th...,15.92
7,ed85bf829a36c67042503ffd9b6ab475,Full Bucket Swing With Coated Chain Toddler Sw...,Safe Kids&Children Full Bucket Swing With Coa...,102.26
8,55820fa53f0583cb637d5cb2b051d78c,Banzai Water Park Splash Zone,Dive into fun in your own backyard with the B...,397.82
9,0e26a9e92e4036bfaa68eb2040a8ec97,Polaris 39-310 5-Liter Zippered Super Bag for ...,Keep your pool water sparkling clean all seaso...,39.47


In [6]:
# Save the Pandas dataframe in a PostgreSQL table.

import asyncio
import asyncpg
from google.cloud.sql.connector import Connector


async def main():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # Create connection to Cloud SQL database
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}",
        )

        await conn.execute("DROP TABLE IF EXISTS products CASCADE")
        # Create the `products` table.
        await conn.execute(
            """CREATE TABLE products(
                                product_id VARCHAR(1024) PRIMARY KEY,
                                product_name TEXT,
                                description TEXT,
                                list_price NUMERIC)"""
        )

        # Copy the dataframe to the `products` table.
        tuples = list(df.itertuples(index=False))
        await conn.copy_records_to_table(
            "products", records=tuples, columns=list(df), timeout=10
        )
        await conn.close()


# Run the SQL commands now.
await main()  # type: ignore

  expiration = x509.not_valid_after


In [7]:
#### Task 5. Generate vector embeddings using a Text Embedding model

In [8]:
# Split long text descriptions into smaller chunks that can fit into
# the API request size limit, as expected by the LLM providers.

from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    separators=[".", "\n"],
    chunk_size=500,
    chunk_overlap=0,
    length_function=len,
)
chunked = []
for index, row in df.iterrows():
    product_id = row["product_id"]
    desc = row["description"]
    splits = text_splitter.create_documents([desc])
    for s in splits:
        r = {"product_id": product_id, "content": s.page_content}
        chunked.append(r)

In [9]:
# Generate the vector embeddings for each chunk of text.
# This code snippet may run for a few minutes.

from langchain.embeddings import VertexAIEmbeddings
from google.cloud import aiplatform
import time

aiplatform.init(project=f"{project_id}", location=f"{region}")
embeddings_service = VertexAIEmbeddings()


# Helper function to retry failed API requests with exponential backoff.
def retry_with_backoff(func, *args, retry_delay=5, backoff_factor=2, **kwargs):
    max_attempts = 10
    retries = 0
    for i in range(max_attempts):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            print(f"error: {e}")
            retries += 1
            wait = retry_delay * (backoff_factor**retries)
            print(f"Retry after waiting for {wait} seconds...")
            time.sleep(wait)


batch_size = 5
for i in range(0, len(chunked), batch_size):
    request = [x["content"] for x in chunked[i : i + batch_size]]
    response = retry_with_backoff(embeddings_service.embed_documents, request)
    # Store the retrieved vector embeddings for each chunk back.
    for x, e in zip(chunked[i : i + batch_size], response):
        x["embedding"] = e

# Store the generated embeddings in a pandas dataframe.
product_embeddings = pd.DataFrame(chunked)
product_embeddings.head()

AttributeError: module 'shapely' has no attribute 'strtree'

In [10]:
#### Task 6. Use pgvector to store the generated embeddings within PostgreSQL

In [11]:
# Store the generated vector embeddings in a PostgreSQL table.
# This code may run for a few minutes.

import asyncio
import asyncpg
from google.cloud.sql.connector import Connector
import numpy as np
from pgvector.asyncpg import register_vector


async def main():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # Create connection to Cloud SQL database.
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}",
        )

        await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
        await register_vector(conn)

        await conn.execute("DROP TABLE IF EXISTS product_embeddings")
        # Create the `product_embeddings` table to store vector embeddings.
        await conn.execute(
            """CREATE TABLE product_embeddings(
                                product_id VARCHAR(1024) NOT NULL REFERENCES products(product_id),
                                content TEXT,
                                embedding vector(768))"""
        )

        # Store all the generated embeddings back into the database.
        for index, row in product_embeddings.iterrows():
            await conn.execute(
                "INSERT INTO product_embeddings (product_id, content, embedding) VALUES ($1, $2, $3)",
                row["product_id"],
                row["content"],
                np.array(row["embedding"]),
            )

        await conn.close()


# Run the SQL commands now.
await main()  # type: ignore

NameError: name 'product_embeddings' is not defined

In [12]:
#### Task 7. Finding similar toy products using pgvector cosine search operator

In [13]:
# @markdown Enter a short description of the toy to search for within a specified price range:
toy = "playing card games"  # @param {type:"string"}
min_price = 25  # @param {type:"integer"}
max_price = 100  # @param {type:"integer"}

# Quick input validations.
assert toy, "⚠️ Please input a valid input search text"

from langchain.embeddings import VertexAIEmbeddings
from google.cloud import aiplatform

aiplatform.init(project=f"{project_id}", location=f"{region}")

embeddings_service = VertexAIEmbeddings()
qe = embeddings_service.embed_query([toy])
from pgvector.asyncpg import register_vector
import asyncio
import asyncpg
from google.cloud.sql.connector import Connector

matches = []


async def main():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # Create connection to Cloud SQL database.
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}",
        )

        await register_vector(conn)
        similarity_threshold = 0.1
        num_matches = 50

        # Find similar products to the query using cosine similarity search
        # over all vector embeddings. This new feature is provided by `pgvector`.
        results = await conn.fetch(
            """
                            WITH vector_matches AS (
                              SELECT product_id, 1 - (embedding <=> $1) AS similarity
                              FROM product_embeddings
                              WHERE 1 - (embedding <=> $1) > $2
                              ORDER BY similarity DESC
                              LIMIT $3
                            )
                            SELECT product_name, list_price, description FROM products
                            WHERE product_id IN (SELECT product_id FROM vector_matches)
                            AND list_price >= $4 AND list_price <= $5
                            """,
            qe,
            similarity_threshold,
            num_matches,
            min_price,
            max_price,
        )

        if len(results) == 0:
            raise Exception("Did not find any results. Adjust the query parameters.")

        for r in results:
            # Collect the description for all the matched similar toy products.
            matches.append(
                {
                    "product_name": r["product_name"],
                    "description": r["description"],
                    "list_price": round(r["list_price"], 2),
                }
            )

        await conn.close()


# Run the SQL commands now.
await main()  # type: ignore

# Show the results for similar products that matched the user query.
matches = pd.DataFrame(matches)
matches.head(5)

AttributeError: module 'shapely' has no attribute 'strtree'

In [14]:
#### Task 8. LLMs and LangChain

In [15]:
# @markdown Enter the user search query in a simple English text. The price filters are shown separately here for demo purposes. These filters may represent additional input from your frontend application.
# Please fill in these values.
user_query = "Do you have a beach toy set that teaches numbers and letters to kids?"  # @param {type:"string"}
min_price = 20  # @param {type:"integer"}
max_price = 100  # @param {type:"integer"}

# Quick input validations.
assert user_query, "⚠️ Please input a valid input search text"

In [16]:
qe = embeddings_service.embed_query([user_query])

NameError: name 'embeddings_service' is not defined